diff options
author | jhawkins@chromium.org <jhawkins@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-12-07 02:03:33 +0000 |
---|---|---|
committer | jhawkins@chromium.org <jhawkins@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-12-07 02:03:33 +0000 |
commit | 3f55aa10587b3eaa629d7e95de87998b399fe3e2 (patch) | |
tree | 0f407a7e7fc837bed337a9a5af787edbd9473ef6 /jingle/glue | |
parent | b456003a580041d83e7f5a998c15f62ce380560f (diff) | |
download | chromium_src-3f55aa10587b3eaa629d7e95de87998b399fe3e2.zip chromium_src-3f55aa10587b3eaa629d7e95de87998b399fe3e2.tar.gz chromium_src-3f55aa10587b3eaa629d7e95de87998b399fe3e2.tar.bz2 |
base::Bind: Convert Socket::Read.
BUG=none
TEST=none
R=csilv
Review URL: http://codereview.chromium.org/8801005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@113326 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle/glue')
-rw-r--r-- | jingle/glue/channel_socket_adapter.cc | 52 | ||||
-rw-r--r-- | jingle/glue/channel_socket_adapter.h | 8 | ||||
-rw-r--r-- | jingle/glue/pseudotcp_adapter.cc | 67 | ||||
-rw-r--r-- | jingle/glue/pseudotcp_adapter.h | 2 | ||||
-rw-r--r-- | jingle/glue/pseudotcp_adapter_unittest.cc | 46 |
5 files changed, 141 insertions, 34 deletions
diff --git a/jingle/glue/channel_socket_adapter.cc b/jingle/glue/channel_socket_adapter.cc index 64cfa7f..3e22a35 100644 --- a/jingle/glue/channel_socket_adapter.cc +++ b/jingle/glue/channel_socket_adapter.cc @@ -18,7 +18,7 @@ TransportChannelSocketAdapter::TransportChannelSocketAdapter( cricket::TransportChannel* channel) : message_loop_(MessageLoop::current()), channel_(channel), - read_callback_(NULL), + old_read_callback_(NULL), write_callback_(NULL), closed_error_code_(net::OK) { DCHECK(channel_); @@ -39,7 +39,26 @@ int TransportChannelSocketAdapter::Read( DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(buf); DCHECK(callback); - CHECK(!read_callback_); + CHECK(!old_read_callback_ && read_callback_.is_null()); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + old_read_callback_ = callback; + read_buffer_ = buf; + read_buffer_size_ = buffer_size; + + return net::ERR_IO_PENDING; +} +int TransportChannelSocketAdapter::Read( + net::IOBuffer* buf, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK(buf); + DCHECK(!callback.is_null()); + CHECK(!old_read_callback_ && read_callback_.is_null()); if (!channel_) { DCHECK(closed_error_code_ != net::OK); @@ -110,11 +129,16 @@ void TransportChannelSocketAdapter::Close(int error_code) { channel_->SignalDestroyed.disconnect(this); channel_ = NULL; - if (read_callback_) { - net::OldCompletionCallback* callback = read_callback_; - read_callback_ = NULL; + if (old_read_callback_) { + net::OldCompletionCallback* callback = old_read_callback_; + old_read_callback_ = NULL; read_buffer_ = NULL; callback->Run(error_code); + } else if (!read_callback_.is_null()) { + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + callback.Run(error_code); } if (write_callback_) { @@ -129,7 +153,7 @@ void TransportChannelSocketAdapter::OnNewPacket( cricket::TransportChannel* channel, const char* data, size_t data_size) { DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK_EQ(channel, channel_); - if (read_callback_) { + if (old_read_callback_ || !read_callback_.is_null()) { DCHECK(read_buffer_); CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); @@ -141,11 +165,17 @@ void TransportChannelSocketAdapter::OnNewPacket( memcpy(read_buffer_->data(), data, data_size); - net::OldCompletionCallback* callback = read_callback_; - read_callback_ = NULL; - read_buffer_ = NULL; - - callback->Run(data_size); + if (old_read_callback_) { + net::OldCompletionCallback* callback = old_read_callback_; + old_read_callback_ = NULL; + read_buffer_ = NULL; + callback->Run(data_size); + } else { + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + callback.Run(data_size); + } } else { LOG(WARNING) << "Data was received without a callback. Dropping the packet."; diff --git a/jingle/glue/channel_socket_adapter.h b/jingle/glue/channel_socket_adapter.h index 43a63f0..1f367e8 100644 --- a/jingle/glue/channel_socket_adapter.h +++ b/jingle/glue/channel_socket_adapter.h @@ -6,6 +6,7 @@ #define JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ #include "base/compiler_specific.h" +#include "net/base/completion_callback.h" #include "net/socket/socket.h" #include "third_party/libjingle/source/talk/base/socketaddress.h" #include "third_party/libjingle/source/talk/base/sigslot.h" @@ -33,9 +34,11 @@ class TransportChannelSocketAdapter : public net::Socket, // Must be called before the session and the channel are destroyed. void Close(int error_code); - // Socket interface. + // Socket implementation. virtual int Read(net::IOBuffer* buf, int buf_len, net::OldCompletionCallback* callback) OVERRIDE; + virtual int Read(net::IOBuffer* buf, int buf_len, + const net::CompletionCallback& callback) OVERRIDE; virtual int Write(net::IOBuffer* buf, int buf_len, net::OldCompletionCallback* callback) OVERRIDE; @@ -52,7 +55,8 @@ class TransportChannelSocketAdapter : public net::Socket, cricket::TransportChannel* channel_; - net::OldCompletionCallback* read_callback_; // Not owned. + net::OldCompletionCallback* old_read_callback_; // Not owned. + net::CompletionCallback read_callback_; scoped_refptr<net::IOBuffer> read_buffer_; int read_buffer_size_; diff --git a/jingle/glue/pseudotcp_adapter.cc b/jingle/glue/pseudotcp_adapter.cc index 6f3ddd1..7807eed 100644 --- a/jingle/glue/pseudotcp_adapter.cc +++ b/jingle/glue/pseudotcp_adapter.cc @@ -31,6 +31,8 @@ class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, // Functions used to implement net::StreamSocket. int Read(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback); + int Read(net::IOBuffer* buffer, int buffer_size, + const net::CompletionCallback& callback); int Write(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback); int Connect(net::OldCompletionCallback* callback); @@ -71,7 +73,8 @@ class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, net::OldCompletionCallback* old_connect_callback_; net::CompletionCallback connect_callback_; - net::OldCompletionCallback* read_callback_; + net::OldCompletionCallback* old_read_callback_; + net::CompletionCallback read_callback_; net::OldCompletionCallback* write_callback_; cricket::PseudoTcp pseudo_tcp_; @@ -96,7 +99,7 @@ class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, PseudoTcpAdapter::Core::Core(net::Socket* socket) : old_connect_callback_(NULL), - read_callback_(NULL), + old_read_callback_(NULL), write_callback_(NULL), ALLOW_THIS_IN_INITIALIZER_LIST(pseudo_tcp_(this, 0)), socket_(socket), @@ -114,7 +117,30 @@ PseudoTcpAdapter::Core::~Core() { int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback) { - DCHECK(!read_callback_); + DCHECK(!old_read_callback_ && read_callback_.is_null()); + + // Reference the Core in case a callback deletes the adapter. + scoped_refptr<Core> core(this); + + int result = pseudo_tcp_.Recv(buffer->data(), buffer_size); + if (result < 0) { + result = net::MapSystemError(pseudo_tcp_.GetError()); + DCHECK(result < 0); + } + + if (result == net::ERR_IO_PENDING) { + read_buffer_ = buffer; + read_buffer_size_ = buffer_size; + old_read_callback_ = callback; + } + + AdjustClock(); + + return result; +} +int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(!old_read_callback_ && read_callback_.is_null()); // Reference the Core in case a callback deletes the adapter. scoped_refptr<Core> core(this); @@ -202,7 +228,8 @@ int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) { void PseudoTcpAdapter::Core::Disconnect() { // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket. - read_callback_ = NULL; + old_read_callback_ = NULL; + read_callback_.Reset(); read_buffer_ = NULL; write_callback_ = NULL; write_buffer_ = NULL; @@ -242,7 +269,7 @@ void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) { void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) { DCHECK_EQ(tcp, &pseudo_tcp_); - if (!read_callback_) + if (!old_read_callback_ && read_callback_.is_null()) return; int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_); @@ -255,10 +282,17 @@ void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) { AdjustClock(); - net::OldCompletionCallback* callback = read_callback_; - read_callback_ = NULL; - read_buffer_ = NULL; - callback->Run(result); + if (old_read_callback_) { + net::OldCompletionCallback* callback = old_read_callback_; + old_read_callback_ = NULL; + read_buffer_ = NULL; + callback->Run(result); + } else { + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + callback.Run(result); + } } void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) { @@ -295,10 +329,14 @@ void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) { callback.Run(net::MapSystemError(error)); } - if (read_callback_) { - net::OldCompletionCallback* callback = read_callback_; - read_callback_ = NULL; + if (old_read_callback_) { + net::OldCompletionCallback* callback = old_read_callback_; + old_read_callback_ = NULL; callback->Run(net::MapSystemError(error)); + } else if (!read_callback_.is_null()) { + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + callback.Run(net::MapSystemError(error)); } if (write_callback_) { @@ -431,6 +469,11 @@ int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, DCHECK(CalledOnValidThread()); return core_->Read(buffer, buffer_size, callback); } +int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(CalledOnValidThread()); + return core_->Read(buffer, buffer_size, callback); +} int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback) { diff --git a/jingle/glue/pseudotcp_adapter.h b/jingle/glue/pseudotcp_adapter.h index fa48b15..f0d27ee 100644 --- a/jingle/glue/pseudotcp_adapter.h +++ b/jingle/glue/pseudotcp_adapter.h @@ -32,6 +32,8 @@ class PseudoTcpAdapter : public net::StreamSocket, base::NonThreadSafe { // net::Socket implementation. virtual int Read(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback) OVERRIDE; + virtual int Read(net::IOBuffer* buffer, int buffer_size, + const net::CompletionCallback& callback) OVERRIDE; virtual int Write(net::IOBuffer* buffer, int buffer_size, net::OldCompletionCallback* callback) OVERRIDE; virtual bool SetReceiveBufferSize(int32 size) OVERRIDE; diff --git a/jingle/glue/pseudotcp_adapter_unittest.cc b/jingle/glue/pseudotcp_adapter_unittest.cc index 959348e..6d803a5 100644 --- a/jingle/glue/pseudotcp_adapter_unittest.cc +++ b/jingle/glue/pseudotcp_adapter_unittest.cc @@ -10,6 +10,7 @@ #include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "jingle/glue/thread_wrapper.h" +#include "net/base/completion_callback.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/base/test_completion_callback.h" @@ -80,7 +81,7 @@ class LeakyBucket : public RateLimiter { class FakeSocket : public net::Socket { public: FakeSocket() - : read_callback_(NULL), + : old_read_callback_(NULL), rate_limiter_(NULL), latency_ms_(0) { } @@ -90,13 +91,20 @@ class FakeSocket : public net::Socket { if (rate_limiter_ && rate_limiter_->DropNextPacket()) return; // Lose the packet. - if (read_callback_) { + if (old_read_callback_ || !read_callback_.is_null()) { int size = std::min(read_buffer_size_, static_cast<int>(data.size())); memcpy(read_buffer_->data(), &data[0], data.size()); - net::OldCompletionCallback* cb = read_callback_; - read_callback_ = NULL; - read_buffer_ = NULL; - cb->Run(size); + if (old_read_callback_) { + net::OldCompletionCallback* cb = old_read_callback_; + old_read_callback_ = NULL; + read_buffer_ = NULL; + cb->Run(size); + } else { + net::CompletionCallback cb = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + cb.Run(size); + } } else { incoming_packets_.push_back(data); } @@ -112,10 +120,29 @@ class FakeSocket : public net::Socket { void set_latency(int latency_ms) { latency_ms_ = latency_ms; }; - // net::Socket interface. + // net::Socket implementation. virtual int Read(net::IOBuffer* buf, int buf_len, net::OldCompletionCallback* callback) { - CHECK(!read_callback_); + CHECK(!old_read_callback_ && read_callback_.is_null()); + CHECK(buf); + + if (incoming_packets_.size() > 0) { + scoped_refptr<net::IOBuffer> buffer(buf); + int size = std::min( + static_cast<int>(incoming_packets_.front().size()), buf_len); + memcpy(buffer->data(), &*incoming_packets_.front().begin(), size); + incoming_packets_.pop_front(); + return size; + } else { + old_read_callback_ = callback; + read_buffer_ = buf; + read_buffer_size_ = buf_len; + return net::ERR_IO_PENDING; + } + } + virtual int Read(net::IOBuffer* buf, int buf_len, + const net::CompletionCallback& callback) { + CHECK(!old_read_callback_ && read_callback_.is_null()); CHECK(buf); if (incoming_packets_.size() > 0) { @@ -160,7 +187,8 @@ class FakeSocket : public net::Socket { private: scoped_refptr<net::IOBuffer> read_buffer_; int read_buffer_size_; - net::OldCompletionCallback* read_callback_; + net::OldCompletionCallback* old_read_callback_; + net::CompletionCallback read_callback_; std::deque<std::vector<char> > incoming_packets_; |