diff options
Diffstat (limited to 'net/curvecp')
-rw-r--r-- | net/curvecp/curvecp_client_socket.cc | 5 | ||||
-rw-r--r-- | net/curvecp/curvecp_client_socket.h | 3 | ||||
-rw-r--r-- | net/curvecp/curvecp_server_socket.cc | 5 | ||||
-rw-r--r-- | net/curvecp/curvecp_server_socket.h | 3 | ||||
-rw-r--r-- | net/curvecp/messenger.cc | 41 | ||||
-rw-r--r-- | net/curvecp/messenger.h | 4 |
6 files changed, 53 insertions, 8 deletions
diff --git a/net/curvecp/curvecp_client_socket.cc b/net/curvecp/curvecp_client_socket.cc index 6772e38..d6889cf 100644 --- a/net/curvecp/curvecp_client_socket.cc +++ b/net/curvecp/curvecp_client_socket.cc @@ -116,6 +116,11 @@ int CurveCPClientSocket::Write(IOBuffer* buf, OldCompletionCallback* callback) { return messenger_.Write(buf, buf_len, callback); } +int CurveCPClientSocket::Write(IOBuffer* buf, + int buf_len, + const CompletionCallback& callback) { + return messenger_.Write(buf, buf_len, callback); +} bool CurveCPClientSocket::SetReceiveBufferSize(int32 size) { return true; diff --git a/net/curvecp/curvecp_client_socket.h b/net/curvecp/curvecp_client_socket.h index 8062c87..90e2253 100644 --- a/net/curvecp/curvecp_client_socket.h +++ b/net/curvecp/curvecp_client_socket.h @@ -51,6 +51,9 @@ class CurveCPClientSocket : public StreamSocket { virtual int Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) OVERRIDE; + virtual int Write(IOBuffer* buf, + int buf_len, + const CompletionCallback& callback) OVERRIDE; virtual bool SetReceiveBufferSize(int32 size) OVERRIDE; virtual bool SetSendBufferSize(int32 size) OVERRIDE; diff --git a/net/curvecp/curvecp_server_socket.cc b/net/curvecp/curvecp_server_socket.cc index 6e7934c..b48f82f 100644 --- a/net/curvecp/curvecp_server_socket.cc +++ b/net/curvecp/curvecp_server_socket.cc @@ -63,6 +63,11 @@ int CurveCPServerSocket::Write(IOBuffer* buf, OldCompletionCallback* callback) { return messenger_.Write(buf, buf_len, callback); } +int CurveCPServerSocket::Write(IOBuffer* buf, + int buf_len, + const CompletionCallback& callback) { + return messenger_.Write(buf, buf_len, callback); +} bool CurveCPServerSocket::SetReceiveBufferSize(int32 size) { return true; diff --git a/net/curvecp/curvecp_server_socket.h b/net/curvecp/curvecp_server_socket.h index a2ea1d7..bd005f3 100644 --- a/net/curvecp/curvecp_server_socket.h +++ b/net/curvecp/curvecp_server_socket.h @@ -43,6 +43,9 @@ class CurveCPServerSocket : public Socket, virtual int Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) OVERRIDE; + virtual int Write(IOBuffer* buf, + int buf_len, + const CompletionCallback& callback) OVERRIDE; virtual bool SetReceiveBufferSize(int32 size) OVERRIDE; virtual bool SetSendBufferSize(int32 size) OVERRIDE; diff --git a/net/curvecp/messenger.cc b/net/curvecp/messenger.cc index a99abda..fc9adf5 100644 --- a/net/curvecp/messenger.cc +++ b/net/curvecp/messenger.cc @@ -58,7 +58,7 @@ static const size_t kReceiveBufferSize = (128 * 1024); Messenger::Messenger(Packetizer* packetizer) : packetizer_(packetizer), send_buffer_(kSendBufferSize), - send_complete_callback_(NULL), + old_send_complete_callback_(NULL), old_receive_complete_callback_(NULL), pending_receive_length_(0), send_message_in_progress_(false), @@ -104,10 +104,31 @@ int Messenger::Read(IOBuffer* buf, int buf_len, return bytes_read; } -int Messenger::Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) { +int Messenger::Write( + IOBuffer* buf, int buf_len, OldCompletionCallback* callback) { DCHECK(CalledOnValidThread()); DCHECK(!pending_send_.get()); // Already a write pending! - DCHECK(!send_complete_callback_); + DCHECK(!old_send_complete_callback_ && send_complete_callback_.is_null()); + DCHECK_LT(0, buf_len); + + int len = send_buffer_.write(buf->data(), buf_len); + if (!send_timer_.IsRunning()) + send_timer_.Start(FROM_HERE, base::TimeDelta(), + this, &Messenger::OnSendTimer); + if (len) + return len; + + // We couldn't add data to the send buffer, so block the application. + pending_send_ = buf; + pending_send_length_ = buf_len; + old_send_complete_callback_ = callback; + return ERR_IO_PENDING; +} +int Messenger::Write( + IOBuffer* buf, int buf_len, const CompletionCallback& callback) { + DCHECK(CalledOnValidThread()); + DCHECK(!pending_send_.get()); // Already a write pending! + DCHECK(!old_send_complete_callback_ && send_complete_callback_.is_null()); DCHECK_LT(0, buf_len); int len = send_buffer_.write(buf->data(), buf_len); @@ -168,15 +189,21 @@ IOBufferWithSize* Messenger::CreateBufferFromSendQueue() { DCHECK_EQ(bytes, length); // We consumed data, check to see if someone is waiting to write more data. - if (send_complete_callback_) { + if (old_send_complete_callback_ || !send_complete_callback_.is_null()) { DCHECK(pending_send_.get()); int len = send_buffer_.write(pending_send_->data(), pending_send_length_); if (len) { pending_send_ = NULL; - OldCompletionCallback* callback = send_complete_callback_; - send_complete_callback_ = NULL; - callback->Run(len); + if (old_send_complete_callback_) { + OldCompletionCallback* callback = old_send_complete_callback_; + old_send_complete_callback_ = NULL; + callback->Run(len); + } else { + CompletionCallback callback = send_complete_callback_; + send_complete_callback_.Reset(); + callback.Run(len); + } } } diff --git a/net/curvecp/messenger.h b/net/curvecp/messenger.h index bb67946..b71c684 100644 --- a/net/curvecp/messenger.h +++ b/net/curvecp/messenger.h @@ -36,6 +36,7 @@ class Messenger : public base::NonThreadSafe, int Read(IOBuffer* buf, int buf_len, OldCompletionCallback* callback); int Read(IOBuffer* buf, int buf_len, const CompletionCallback& callback); int Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback); + int Write(IOBuffer* buf, int buf_len, const CompletionCallback& callback); // Packetizer::Listener implementation. virtual void OnConnection(ConnectionKey key) OVERRIDE; @@ -71,7 +72,8 @@ class Messenger : public base::NonThreadSafe, // The send_buffer is a list of pending data to pack into messages and send // to the remote. CircularBuffer send_buffer_; - OldCompletionCallback* send_complete_callback_; + OldCompletionCallback* old_send_complete_callback_; + CompletionCallback send_complete_callback_; scoped_refptr<IOBuffer> pending_send_; int pending_send_length_; |