diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-05-02 17:28:10 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-05-02 17:28:10 +0000 |
commit | 26d912d33fc645ff2499e3537adad66d46e76b33 (patch) | |
tree | 809d8c79564ba2d697b608347cabcf24a8e8857b /content/renderer | |
parent | c0022e306a82f29b471bc5d0e7700030de3f9cc5 (diff) | |
download | chromium_src-26d912d33fc645ff2499e3537adad66d46e76b33.zip chromium_src-26d912d33fc645ff2499e3537adad66d46e76b33.tar.gz chromium_src-26d912d33fc645ff2499e3537adad66d46e76b33.tar.bz2 |
Implement PseudoTCP support in P2P Transport Pepper API.
BUG=None
TEST=Unittests
Review URL: http://codereview.chromium.org/6893101
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@83736 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'content/renderer')
-rw-r--r-- | content/renderer/p2p/ipc_network_manager.cc | 10 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.cc | 54 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.h | 11 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl_unittest.cc | 232 | ||||
-rw-r--r-- | content/renderer/p2p/socket_dispatcher.cc | 6 | ||||
-rw-r--r-- | content/renderer/p2p/socket_dispatcher.h | 4 |
6 files changed, 280 insertions, 37 deletions
diff --git a/content/renderer/p2p/ipc_network_manager.cc b/content/renderer/p2p/ipc_network_manager.cc index 3cb7124..ef5f550 100644 --- a/content/renderer/p2p/ipc_network_manager.cc +++ b/content/renderer/p2p/ipc_network_manager.cc @@ -8,8 +8,15 @@ #include "net/base/sys_byteorder.h" #include "content/renderer/p2p/socket_dispatcher.h" +// TODO(sergeyu): Currently the NetworkManager interface is +// syncronous, but it gets list of networks from the browser process +// asyncrhonously, so EnumNetworks() may return an empty list if we +// haven't received list of networks from the browser. Make +// NetworkManager interface asynchronous to avoid this problem. + IpcNetworkManager::IpcNetworkManager(P2PSocketDispatcher* socket_dispatcher) : socket_dispatcher_(socket_dispatcher) { + socket_dispatcher_->RequestNetworks(); } IpcNetworkManager::~IpcNetworkManager() { @@ -18,7 +25,8 @@ IpcNetworkManager::~IpcNetworkManager() { bool IpcNetworkManager::EnumNetworks( bool include_ignored, std::vector<talk_base::Network*>* networks) { socket_dispatcher_->RequestNetworks(); - const net::NetworkInterfaceList& list = socket_dispatcher_->networks(); + net::NetworkInterfaceList list; + socket_dispatcher_->GetNetworks(&list); for (net::NetworkInterfaceList::const_iterator it = list.begin(); it != list.end(); it++) { uint32 address; diff --git a/content/renderer/p2p/p2p_transport_impl.cc b/content/renderer/p2p/p2p_transport_impl.cc index ad542d7..d467342 100644 --- a/content/renderer/p2p/p2p_transport_impl.cc +++ b/content/renderer/p2p/p2p_transport_impl.cc @@ -10,7 +10,9 @@ #include "content/renderer/p2p/ipc_socket_factory.h" #include "content/renderer/render_view.h" #include "jingle/glue/channel_socket_adapter.h" +#include "jingle/glue/pseudotcp_adapter.h" #include "jingle/glue/thread_wrapper.h" +#include "net/base/net_errors.h" #include "third_party/libjingle/source/talk/p2p/base/p2ptransportchannel.h" #include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h" @@ -20,18 +22,26 @@ P2PTransportImpl::P2PTransportImpl( : event_handler_(NULL), state_(STATE_NONE), network_manager_(network_manager), - socket_factory_(socket_factory) { + socket_factory_(socket_factory), + ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_( + this, &P2PTransportImpl::OnTcpConnected)) { } P2PTransportImpl::P2PTransportImpl(P2PSocketDispatcher* socket_dispatcher) - : network_manager_(new IpcNetworkManager(socket_dispatcher)), - socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)) { + : event_handler_(NULL), + state_(STATE_NONE), + network_manager_(new IpcNetworkManager(socket_dispatcher)), + socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)), + ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_( + this, &P2PTransportImpl::OnTcpConnected)) { } P2PTransportImpl::~P2PTransportImpl() { } -bool P2PTransportImpl::Init(const std::string& name, const std::string& config, +bool P2PTransportImpl::Init(const std::string& name, + Protocol protocol, + const std::string& config, EventHandler* event_handler) { DCHECK(event_handler); @@ -52,18 +62,29 @@ bool P2PTransportImpl::Init(const std::string& name, const std::string& config, name, "", NULL, allocator_.get())); channel_->SignalRequestSignaling.connect( this, &P2PTransportImpl::OnRequestSignaling); - channel_->SignalWritableState.connect( - this, &P2PTransportImpl::OnReadableState); - channel_->SignalWritableState.connect( - this, &P2PTransportImpl::OnWriteableState); channel_->SignalCandidateReady.connect( this, &P2PTransportImpl::OnCandidateReady); + if (protocol == PROTOCOL_UDP) { + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnReadableState); + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnWriteableState); + } + channel_adapter_.reset(new jingle_glue::TransportChannelSocketAdapter( channel_.get())); channel_->Connect(); + if (protocol == PROTOCOL_TCP) { + pseudo_tcp_adapter_.reset(new jingle_glue::PseudoTcpAdapter( + channel_adapter_.release())); + int result = pseudo_tcp_adapter_->Connect(&connect_callback_); + if (result != net::ERR_IO_PENDING) + OnTcpConnected(result); + } + return true; } @@ -163,5 +184,20 @@ bool P2PTransportImpl::DeserializeCandidate(const std::string& address, } net::Socket* P2PTransportImpl::GetChannel() { - return channel_adapter_.get(); + if (pseudo_tcp_adapter_.get()) { + DCHECK(!channel_adapter_.get()); + return pseudo_tcp_adapter_.get(); + } else { + DCHECK(channel_adapter_.get()); + return channel_adapter_.get(); + } +} + +void P2PTransportImpl::OnTcpConnected(int result) { + if (result < 0) { + event_handler_->OnError(result); + return; + } + state_ = static_cast<State>(STATE_READABLE | STATE_WRITABLE); + event_handler_->OnStateChange(state_); } diff --git a/content/renderer/p2p/p2p_transport_impl.h b/content/renderer/p2p/p2p_transport_impl.h index 2c8b947..e427820 100644 --- a/content/renderer/p2p/p2p_transport_impl.h +++ b/content/renderer/p2p/p2p_transport_impl.h @@ -7,6 +7,7 @@ #include "base/basictypes.h" #include "base/scoped_ptr.h" +#include "net/base/completion_callback.h" #include "third_party/libjingle/source/talk/base/sigslot.h" #include "webkit/glue/p2p_transport.h" @@ -22,6 +23,7 @@ class TransportChannelImpl; namespace jingle_glue { class TransportChannelSocketAdapter; +class PseudoTcpAdapter; } // namespace jingle_glue namespace talk_base { @@ -46,7 +48,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, virtual ~P2PTransportImpl(); // webkit_glue::P2PTransport interface. - virtual bool Init(const std::string& name, const std::string& config, + virtual bool Init(const std::string& name, + Protocol protocol, + const std::string& config, EventHandler* event_handler) OVERRIDE; virtual bool AddRemoteCandidate(const std::string& address) OVERRIDE; virtual net::Socket* GetChannel() OVERRIDE; @@ -64,6 +68,8 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, bool DeserializeCandidate(const std::string& address, cricket::Candidate* candidate); + void OnTcpConnected(int result); + std::string name_; EventHandler* event_handler_; State state_; @@ -75,6 +81,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, scoped_ptr<cricket::P2PTransportChannel> channel_; scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter_; + scoped_ptr<jingle_glue::PseudoTcpAdapter> pseudo_tcp_adapter_; + + net::CompletionCallbackImpl<P2PTransportImpl> connect_callback_; DISALLOW_COPY_AND_ASSIGN(P2PTransportImpl); }; diff --git a/content/renderer/p2p/p2p_transport_impl_unittest.cc b/content/renderer/p2p/p2p_transport_impl_unittest.cc index 393fd0d..718111b 100644 --- a/content/renderer/p2p/p2p_transport_impl_unittest.cc +++ b/content/renderer/p2p/p2p_transport_impl_unittest.cc @@ -39,10 +39,12 @@ const char kTestConfig[] = ""; const int kMessageSize = 10; const int kMessages = 10; const int kUdpWriteDelayMs = 10; +const int kTcpDataSize = 10 * 1024; +const int kTcpWriteDelayMs = 1; -class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { +class UdpChannelTester : public base::RefCountedThreadSafe<UdpChannelTester> { public: - ChannelTester(MessageLoop* message_loop, + UdpChannelTester(MessageLoop* message_loop, net::Socket* write_socket, net::Socket* read_socket) : message_loop_(message_loop), @@ -50,9 +52,9 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { read_socket_(read_socket), done_(false), ALLOW_THIS_IN_INITIALIZER_LIST( - write_cb_(this, &ChannelTester::OnWritten)), + write_cb_(this, &UdpChannelTester::OnWritten)), ALLOW_THIS_IN_INITIALIZER_LIST( - read_cb_(this, &ChannelTester::OnRead)), + read_cb_(this, &UdpChannelTester::OnRead)), write_errors_(0), read_errors_(0), packets_sent_(0), @@ -60,11 +62,11 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { broken_packets_(0) { } - virtual ~ChannelTester() { } + virtual ~UdpChannelTester() { } void Start() { message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoStart)); + FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoStart)); } void CheckResults() { @@ -119,7 +121,7 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { EXPECT_EQ(kMessageSize, result); packets_sent_++; message_loop_->PostDelayedTask( - FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoWrite), + FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoWrite), kUdpWriteDelayMs); } } @@ -177,8 +179,8 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { scoped_refptr<net::IOBuffer> sent_packets_[kMessages]; scoped_refptr<net::IOBuffer> read_buffer_; - net::CompletionCallbackImpl<ChannelTester> write_cb_; - net::CompletionCallbackImpl<ChannelTester> read_cb_; + net::CompletionCallbackImpl<UdpChannelTester> write_cb_; + net::CompletionCallbackImpl<UdpChannelTester> read_cb_; int write_errors_; int read_errors_; int packets_sent_; @@ -186,19 +188,151 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { int broken_packets_; }; +class TcpChannelTester : public base::RefCountedThreadSafe<TcpChannelTester> { + public: + TcpChannelTester(MessageLoop* message_loop, + net::Socket* write_socket, + net::Socket* read_socket) + : message_loop_(message_loop), + write_socket_(write_socket), + read_socket_(read_socket), + done_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_cb_(this, &TcpChannelTester::OnWritten)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_cb_(this, &TcpChannelTester::OnRead)), + write_errors_(0), + read_errors_(0) { + } + + virtual ~TcpChannelTester() { } + + void Start() { + // Initialize |send_buffer_|. + send_buffer_ = new net::DrainableIOBuffer(new net::IOBuffer(kTcpDataSize), + kTcpDataSize); + for (int i = 0; i < kTcpDataSize; ++i) { + send_buffer_->data()[i] = rand() % 256; + } + + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoStart)); + } + + void CheckResults() { + EXPECT_EQ(0, write_errors_); + EXPECT_EQ(0, read_errors_); + + EXPECT_EQ(0, send_buffer_->BytesRemaining()); + + send_buffer_->SetOffset(0); + EXPECT_EQ(kTcpDataSize, static_cast<int>(received_data_.size())); + EXPECT_EQ(0, memcmp(send_buffer_->data(), + &received_data_[0], received_data_.size())); + } + + protected: + void Done() { + done_ = true; + message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask()); + } + + void DoStart() { + DoRead(); + DoWrite(); + } + + void DoWrite() { + if (send_buffer_->BytesRemaining() == 0) { + return; + } + + int result = write_socket_->Write( + send_buffer_, send_buffer_->BytesRemaining(), &write_cb_); + HandleWriteResult(result); + } + + void OnWritten(int result) { + HandleWriteResult(result); + } + + void HandleWriteResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + LOG(ERROR) << "Received error " << result << " when trying to write"; + write_errors_++; + Done(); + } else if (result > 0) { + send_buffer_->DidConsume(result); + message_loop_->PostDelayedTask( + FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoWrite), + kTcpWriteDelayMs); + } + } + + void DoRead() { + int result = 1; + while (result > 0) { + int kReadSize = kMessageSize * 2; + read_buffer_ = new net::IOBuffer(kReadSize); + + result = read_socket_->Read(read_buffer_, kReadSize, &read_cb_); + HandleReadResult(result); + }; + } + + void OnRead(int result) { + HandleReadResult(result); + DoRead(); + } + + void HandleReadResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + // Error will be received after the socket is closed. + if (!done_) { + LOG(ERROR) << "Received error " << result << " when trying to read"; + read_errors_++; + Done(); + } + } else if (result > 0) { + received_data_.insert(received_data_.end(), read_buffer_->data(), + read_buffer_->data() + result); + if (static_cast<int>(received_data_.size()) == kTcpDataSize) + Done(); + } + } + + private: + MessageLoop* message_loop_; + net::Socket* write_socket_; + net::Socket* read_socket_; + bool done_; + + scoped_refptr<net::DrainableIOBuffer> send_buffer_; + scoped_refptr<net::IOBuffer> read_buffer_; + + std::vector<char> sent_data_; + std::vector<char> received_data_; + + net::CompletionCallbackImpl<TcpChannelTester> write_cb_; + net::CompletionCallbackImpl<TcpChannelTester> read_cb_; + int write_errors_; + int read_errors_; +}; + } // namespace class MockP2PEventHandler : public P2PTransport::EventHandler { public: MOCK_METHOD1(OnCandidateReady, void(const std::string& address)); MOCK_METHOD1(OnStateChange, void(P2PTransport::State state)); + MOCK_METHOD1(OnError, void(int error)); }; class P2PTransportImplTest : public testing::Test { public: protected: - void SetUp() OVERRIDE { + virtual void SetUp() OVERRIDE { socket_manager_ = new jingle_glue::FakeSocketManager(); net::IPAddressNumber ip; @@ -213,6 +347,13 @@ class P2PTransportImplTest : public testing::Test { new jingle_glue::FakeSocketFactory(socket_manager_, ip))); } + void Init(P2PTransport::Protocol protocol) { + ASSERT_TRUE(transport1_->Init( + kTransportName1, protocol, kTestConfig, &event_handler1_)); + ASSERT_TRUE(transport2_->Init( + kTransportName2, protocol, kTestConfig, &event_handler2_)); + } + MessageLoop message_loop_; scoped_refptr<jingle_glue::FakeSocketManager> socket_manager_; @@ -223,10 +364,7 @@ class P2PTransportImplTest : public testing::Test { }; TEST_F(P2PTransportImplTest, Create) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); + Init(P2PTransport::PROTOCOL_UDP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)); EXPECT_CALL(event_handler2_, OnCandidateReady(_)); @@ -238,11 +376,19 @@ ACTION_P(AddRemoteCandidate, transport) { EXPECT_TRUE(transport->AddRemoteCandidate(arg0)); } -TEST_F(P2PTransportImplTest, Connect) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); +TEST_F(P2PTransportImplTest, ConnectUdp) { + Init(P2PTransport::PROTOCOL_UDP); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + message_loop_.RunAllPending(); +} + +TEST_F(P2PTransportImplTest, ConnectTcp) { + Init(P2PTransport::PROTOCOL_UDP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( AddRemoteCandidate(transport2_.get())); @@ -252,11 +398,47 @@ TEST_F(P2PTransportImplTest, Connect) { message_loop_.RunAllPending(); } -TEST_F(P2PTransportImplTest, SendData) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); +TEST_F(P2PTransportImplTest, SendDataUdp) { + Init(P2PTransport::PROTOCOL_UDP); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + // Transport may first become ether readable or writable, but + // eventually it must be readable and writable. + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + scoped_refptr<UdpChannelTester> channel_tester = new UdpChannelTester( + &message_loop_, transport1_->GetChannel(), transport2_->GetChannel()); + + message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), + TestTimeouts::action_max_timeout_ms()); + + channel_tester->Start(); + message_loop_.Run(); + channel_tester->CheckResults(); +} + +TEST_F(P2PTransportImplTest, SendDataTcp) { + Init(P2PTransport::PROTOCOL_TCP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( AddRemoteCandidate(transport2_.get())); @@ -283,7 +465,7 @@ TEST_F(P2PTransportImplTest, SendData) { P2PTransport::STATE_WRITABLE))) .Times(Exactly(1)); - scoped_refptr<ChannelTester> channel_tester = new ChannelTester( + scoped_refptr<TcpChannelTester> channel_tester = new TcpChannelTester( &message_loop_, transport1_->GetChannel(), transport2_->GetChannel()); message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), diff --git a/content/renderer/p2p/socket_dispatcher.cc b/content/renderer/p2p/socket_dispatcher.cc index 065bd23..429d7bc 100644 --- a/content/renderer/p2p/socket_dispatcher.cc +++ b/content/renderer/p2p/socket_dispatcher.cc @@ -23,6 +23,11 @@ void P2PSocketDispatcher::RequestNetworks() { Send(new P2PHostMsg_GetNetworkList(routing_id())); } +void P2PSocketDispatcher::GetNetworks(net::NetworkInterfaceList* networks) { + base::AutoLock auto_lock(networks_lock_); + *networks = networks_; +} + bool P2PSocketDispatcher::OnMessageReceived(const IPC::Message& message) { bool handled = true; IPC_BEGIN_MESSAGE_MAP(P2PSocketDispatcher, message) @@ -55,6 +60,7 @@ base::MessageLoopProxy* P2PSocketDispatcher::message_loop() { void P2PSocketDispatcher::OnNetworkList( const net::NetworkInterfaceList& networks) { + base::AutoLock auto_lock(networks_lock_); networks_ = networks; } diff --git a/content/renderer/p2p/socket_dispatcher.h b/content/renderer/p2p/socket_dispatcher.h index 4d0c273..b91b847 100644 --- a/content/renderer/p2p/socket_dispatcher.h +++ b/content/renderer/p2p/socket_dispatcher.h @@ -22,6 +22,7 @@ #include <vector> #include "base/id_map.h" +#include "base/synchronization/lock.h" #include "content/common/p2p_sockets.h" #include "content/renderer/p2p/socket_client.h" #include "content/renderer/render_view_observer.h" @@ -39,7 +40,7 @@ class P2PSocketDispatcher : public RenderViewObserver { virtual ~P2PSocketDispatcher(); void RequestNetworks(); - const net::NetworkInterfaceList& networks() const { return networks_; } + void GetNetworks(net::NetworkInterfaceList* networks); // RenderViewObserver overrides. virtual bool OnMessageReceived(const IPC::Message& message); @@ -66,6 +67,7 @@ class P2PSocketDispatcher : public RenderViewObserver { scoped_refptr<base::MessageLoopProxy> message_loop_; IDMap<P2PSocketClient> clients_; net::NetworkInterfaceList networks_; + base::Lock networks_lock_; DISALLOW_COPY_AND_ASSIGN(P2PSocketDispatcher); }; |