diff options
-rw-r--r-- | content/renderer/p2p/ipc_network_manager.cc | 10 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.cc | 54 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.h | 11 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl_unittest.cc | 232 | ||||
-rw-r--r-- | content/renderer/p2p/socket_dispatcher.cc | 6 | ||||
-rw-r--r-- | content/renderer/p2p/socket_dispatcher.h | 4 | ||||
-rw-r--r-- | ppapi/tests/test_transport.cc | 95 | ||||
-rw-r--r-- | ppapi/tests/test_transport.h | 8 | ||||
-rw-r--r-- | ppapi/tests/test_utils.cc | 13 | ||||
-rw-r--r-- | ppapi/tests/test_utils.h | 6 | ||||
-rw-r--r-- | tools/valgrind/gtest_exclude/unit_tests.gtest.txt | 8 | ||||
-rw-r--r-- | webkit/glue/p2p_transport.h | 11 | ||||
-rw-r--r-- | webkit/plugins/ppapi/ppb_transport_impl.cc | 33 | ||||
-rw-r--r-- | webkit/plugins/ppapi/ppb_transport_impl.h | 3 |
14 files changed, 427 insertions, 67 deletions
diff --git a/content/renderer/p2p/ipc_network_manager.cc b/content/renderer/p2p/ipc_network_manager.cc index 3cb7124..ef5f550 100644 --- a/content/renderer/p2p/ipc_network_manager.cc +++ b/content/renderer/p2p/ipc_network_manager.cc @@ -8,8 +8,15 @@ #include "net/base/sys_byteorder.h" #include "content/renderer/p2p/socket_dispatcher.h" +// TODO(sergeyu): Currently the NetworkManager interface is +// syncronous, but it gets list of networks from the browser process +// asyncrhonously, so EnumNetworks() may return an empty list if we +// haven't received list of networks from the browser. Make +// NetworkManager interface asynchronous to avoid this problem. + IpcNetworkManager::IpcNetworkManager(P2PSocketDispatcher* socket_dispatcher) : socket_dispatcher_(socket_dispatcher) { + socket_dispatcher_->RequestNetworks(); } IpcNetworkManager::~IpcNetworkManager() { @@ -18,7 +25,8 @@ IpcNetworkManager::~IpcNetworkManager() { bool IpcNetworkManager::EnumNetworks( bool include_ignored, std::vector<talk_base::Network*>* networks) { socket_dispatcher_->RequestNetworks(); - const net::NetworkInterfaceList& list = socket_dispatcher_->networks(); + net::NetworkInterfaceList list; + socket_dispatcher_->GetNetworks(&list); for (net::NetworkInterfaceList::const_iterator it = list.begin(); it != list.end(); it++) { uint32 address; diff --git a/content/renderer/p2p/p2p_transport_impl.cc b/content/renderer/p2p/p2p_transport_impl.cc index ad542d7..d467342 100644 --- a/content/renderer/p2p/p2p_transport_impl.cc +++ b/content/renderer/p2p/p2p_transport_impl.cc @@ -10,7 +10,9 @@ #include "content/renderer/p2p/ipc_socket_factory.h" #include "content/renderer/render_view.h" #include "jingle/glue/channel_socket_adapter.h" +#include "jingle/glue/pseudotcp_adapter.h" #include "jingle/glue/thread_wrapper.h" +#include "net/base/net_errors.h" #include "third_party/libjingle/source/talk/p2p/base/p2ptransportchannel.h" #include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h" @@ -20,18 +22,26 @@ P2PTransportImpl::P2PTransportImpl( : event_handler_(NULL), state_(STATE_NONE), network_manager_(network_manager), - socket_factory_(socket_factory) { + socket_factory_(socket_factory), + ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_( + this, &P2PTransportImpl::OnTcpConnected)) { } P2PTransportImpl::P2PTransportImpl(P2PSocketDispatcher* socket_dispatcher) - : network_manager_(new IpcNetworkManager(socket_dispatcher)), - socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)) { + : event_handler_(NULL), + state_(STATE_NONE), + network_manager_(new IpcNetworkManager(socket_dispatcher)), + socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)), + ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_( + this, &P2PTransportImpl::OnTcpConnected)) { } P2PTransportImpl::~P2PTransportImpl() { } -bool P2PTransportImpl::Init(const std::string& name, const std::string& config, +bool P2PTransportImpl::Init(const std::string& name, + Protocol protocol, + const std::string& config, EventHandler* event_handler) { DCHECK(event_handler); @@ -52,18 +62,29 @@ bool P2PTransportImpl::Init(const std::string& name, const std::string& config, name, "", NULL, allocator_.get())); channel_->SignalRequestSignaling.connect( this, &P2PTransportImpl::OnRequestSignaling); - channel_->SignalWritableState.connect( - this, &P2PTransportImpl::OnReadableState); - channel_->SignalWritableState.connect( - this, &P2PTransportImpl::OnWriteableState); channel_->SignalCandidateReady.connect( this, &P2PTransportImpl::OnCandidateReady); + if (protocol == PROTOCOL_UDP) { + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnReadableState); + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnWriteableState); + } + channel_adapter_.reset(new jingle_glue::TransportChannelSocketAdapter( channel_.get())); channel_->Connect(); + if (protocol == PROTOCOL_TCP) { + pseudo_tcp_adapter_.reset(new jingle_glue::PseudoTcpAdapter( + channel_adapter_.release())); + int result = pseudo_tcp_adapter_->Connect(&connect_callback_); + if (result != net::ERR_IO_PENDING) + OnTcpConnected(result); + } + return true; } @@ -163,5 +184,20 @@ bool P2PTransportImpl::DeserializeCandidate(const std::string& address, } net::Socket* P2PTransportImpl::GetChannel() { - return channel_adapter_.get(); + if (pseudo_tcp_adapter_.get()) { + DCHECK(!channel_adapter_.get()); + return pseudo_tcp_adapter_.get(); + } else { + DCHECK(channel_adapter_.get()); + return channel_adapter_.get(); + } +} + +void P2PTransportImpl::OnTcpConnected(int result) { + if (result < 0) { + event_handler_->OnError(result); + return; + } + state_ = static_cast<State>(STATE_READABLE | STATE_WRITABLE); + event_handler_->OnStateChange(state_); } diff --git a/content/renderer/p2p/p2p_transport_impl.h b/content/renderer/p2p/p2p_transport_impl.h index 2c8b947..e427820 100644 --- a/content/renderer/p2p/p2p_transport_impl.h +++ b/content/renderer/p2p/p2p_transport_impl.h @@ -7,6 +7,7 @@ #include "base/basictypes.h" #include "base/scoped_ptr.h" +#include "net/base/completion_callback.h" #include "third_party/libjingle/source/talk/base/sigslot.h" #include "webkit/glue/p2p_transport.h" @@ -22,6 +23,7 @@ class TransportChannelImpl; namespace jingle_glue { class TransportChannelSocketAdapter; +class PseudoTcpAdapter; } // namespace jingle_glue namespace talk_base { @@ -46,7 +48,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, virtual ~P2PTransportImpl(); // webkit_glue::P2PTransport interface. - virtual bool Init(const std::string& name, const std::string& config, + virtual bool Init(const std::string& name, + Protocol protocol, + const std::string& config, EventHandler* event_handler) OVERRIDE; virtual bool AddRemoteCandidate(const std::string& address) OVERRIDE; virtual net::Socket* GetChannel() OVERRIDE; @@ -64,6 +68,8 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, bool DeserializeCandidate(const std::string& address, cricket::Candidate* candidate); + void OnTcpConnected(int result); + std::string name_; EventHandler* event_handler_; State state_; @@ -75,6 +81,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport, scoped_ptr<cricket::P2PTransportChannel> channel_; scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter_; + scoped_ptr<jingle_glue::PseudoTcpAdapter> pseudo_tcp_adapter_; + + net::CompletionCallbackImpl<P2PTransportImpl> connect_callback_; DISALLOW_COPY_AND_ASSIGN(P2PTransportImpl); }; diff --git a/content/renderer/p2p/p2p_transport_impl_unittest.cc b/content/renderer/p2p/p2p_transport_impl_unittest.cc index 393fd0d..718111b 100644 --- a/content/renderer/p2p/p2p_transport_impl_unittest.cc +++ b/content/renderer/p2p/p2p_transport_impl_unittest.cc @@ -39,10 +39,12 @@ const char kTestConfig[] = ""; const int kMessageSize = 10; const int kMessages = 10; const int kUdpWriteDelayMs = 10; +const int kTcpDataSize = 10 * 1024; +const int kTcpWriteDelayMs = 1; -class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { +class UdpChannelTester : public base::RefCountedThreadSafe<UdpChannelTester> { public: - ChannelTester(MessageLoop* message_loop, + UdpChannelTester(MessageLoop* message_loop, net::Socket* write_socket, net::Socket* read_socket) : message_loop_(message_loop), @@ -50,9 +52,9 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { read_socket_(read_socket), done_(false), ALLOW_THIS_IN_INITIALIZER_LIST( - write_cb_(this, &ChannelTester::OnWritten)), + write_cb_(this, &UdpChannelTester::OnWritten)), ALLOW_THIS_IN_INITIALIZER_LIST( - read_cb_(this, &ChannelTester::OnRead)), + read_cb_(this, &UdpChannelTester::OnRead)), write_errors_(0), read_errors_(0), packets_sent_(0), @@ -60,11 +62,11 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { broken_packets_(0) { } - virtual ~ChannelTester() { } + virtual ~UdpChannelTester() { } void Start() { message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoStart)); + FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoStart)); } void CheckResults() { @@ -119,7 +121,7 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { EXPECT_EQ(kMessageSize, result); packets_sent_++; message_loop_->PostDelayedTask( - FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoWrite), + FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoWrite), kUdpWriteDelayMs); } } @@ -177,8 +179,8 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { scoped_refptr<net::IOBuffer> sent_packets_[kMessages]; scoped_refptr<net::IOBuffer> read_buffer_; - net::CompletionCallbackImpl<ChannelTester> write_cb_; - net::CompletionCallbackImpl<ChannelTester> read_cb_; + net::CompletionCallbackImpl<UdpChannelTester> write_cb_; + net::CompletionCallbackImpl<UdpChannelTester> read_cb_; int write_errors_; int read_errors_; int packets_sent_; @@ -186,19 +188,151 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { int broken_packets_; }; +class TcpChannelTester : public base::RefCountedThreadSafe<TcpChannelTester> { + public: + TcpChannelTester(MessageLoop* message_loop, + net::Socket* write_socket, + net::Socket* read_socket) + : message_loop_(message_loop), + write_socket_(write_socket), + read_socket_(read_socket), + done_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_cb_(this, &TcpChannelTester::OnWritten)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_cb_(this, &TcpChannelTester::OnRead)), + write_errors_(0), + read_errors_(0) { + } + + virtual ~TcpChannelTester() { } + + void Start() { + // Initialize |send_buffer_|. + send_buffer_ = new net::DrainableIOBuffer(new net::IOBuffer(kTcpDataSize), + kTcpDataSize); + for (int i = 0; i < kTcpDataSize; ++i) { + send_buffer_->data()[i] = rand() % 256; + } + + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoStart)); + } + + void CheckResults() { + EXPECT_EQ(0, write_errors_); + EXPECT_EQ(0, read_errors_); + + EXPECT_EQ(0, send_buffer_->BytesRemaining()); + + send_buffer_->SetOffset(0); + EXPECT_EQ(kTcpDataSize, static_cast<int>(received_data_.size())); + EXPECT_EQ(0, memcmp(send_buffer_->data(), + &received_data_[0], received_data_.size())); + } + + protected: + void Done() { + done_ = true; + message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask()); + } + + void DoStart() { + DoRead(); + DoWrite(); + } + + void DoWrite() { + if (send_buffer_->BytesRemaining() == 0) { + return; + } + + int result = write_socket_->Write( + send_buffer_, send_buffer_->BytesRemaining(), &write_cb_); + HandleWriteResult(result); + } + + void OnWritten(int result) { + HandleWriteResult(result); + } + + void HandleWriteResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + LOG(ERROR) << "Received error " << result << " when trying to write"; + write_errors_++; + Done(); + } else if (result > 0) { + send_buffer_->DidConsume(result); + message_loop_->PostDelayedTask( + FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoWrite), + kTcpWriteDelayMs); + } + } + + void DoRead() { + int result = 1; + while (result > 0) { + int kReadSize = kMessageSize * 2; + read_buffer_ = new net::IOBuffer(kReadSize); + + result = read_socket_->Read(read_buffer_, kReadSize, &read_cb_); + HandleReadResult(result); + }; + } + + void OnRead(int result) { + HandleReadResult(result); + DoRead(); + } + + void HandleReadResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + // Error will be received after the socket is closed. + if (!done_) { + LOG(ERROR) << "Received error " << result << " when trying to read"; + read_errors_++; + Done(); + } + } else if (result > 0) { + received_data_.insert(received_data_.end(), read_buffer_->data(), + read_buffer_->data() + result); + if (static_cast<int>(received_data_.size()) == kTcpDataSize) + Done(); + } + } + + private: + MessageLoop* message_loop_; + net::Socket* write_socket_; + net::Socket* read_socket_; + bool done_; + + scoped_refptr<net::DrainableIOBuffer> send_buffer_; + scoped_refptr<net::IOBuffer> read_buffer_; + + std::vector<char> sent_data_; + std::vector<char> received_data_; + + net::CompletionCallbackImpl<TcpChannelTester> write_cb_; + net::CompletionCallbackImpl<TcpChannelTester> read_cb_; + int write_errors_; + int read_errors_; +}; + } // namespace class MockP2PEventHandler : public P2PTransport::EventHandler { public: MOCK_METHOD1(OnCandidateReady, void(const std::string& address)); MOCK_METHOD1(OnStateChange, void(P2PTransport::State state)); + MOCK_METHOD1(OnError, void(int error)); }; class P2PTransportImplTest : public testing::Test { public: protected: - void SetUp() OVERRIDE { + virtual void SetUp() OVERRIDE { socket_manager_ = new jingle_glue::FakeSocketManager(); net::IPAddressNumber ip; @@ -213,6 +347,13 @@ class P2PTransportImplTest : public testing::Test { new jingle_glue::FakeSocketFactory(socket_manager_, ip))); } + void Init(P2PTransport::Protocol protocol) { + ASSERT_TRUE(transport1_->Init( + kTransportName1, protocol, kTestConfig, &event_handler1_)); + ASSERT_TRUE(transport2_->Init( + kTransportName2, protocol, kTestConfig, &event_handler2_)); + } + MessageLoop message_loop_; scoped_refptr<jingle_glue::FakeSocketManager> socket_manager_; @@ -223,10 +364,7 @@ class P2PTransportImplTest : public testing::Test { }; TEST_F(P2PTransportImplTest, Create) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); + Init(P2PTransport::PROTOCOL_UDP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)); EXPECT_CALL(event_handler2_, OnCandidateReady(_)); @@ -238,11 +376,19 @@ ACTION_P(AddRemoteCandidate, transport) { EXPECT_TRUE(transport->AddRemoteCandidate(arg0)); } -TEST_F(P2PTransportImplTest, Connect) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); +TEST_F(P2PTransportImplTest, ConnectUdp) { + Init(P2PTransport::PROTOCOL_UDP); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + message_loop_.RunAllPending(); +} + +TEST_F(P2PTransportImplTest, ConnectTcp) { + Init(P2PTransport::PROTOCOL_UDP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( AddRemoteCandidate(transport2_.get())); @@ -252,11 +398,47 @@ TEST_F(P2PTransportImplTest, Connect) { message_loop_.RunAllPending(); } -TEST_F(P2PTransportImplTest, SendData) { - ASSERT_TRUE(transport1_->Init( - kTransportName1, kTestConfig, &event_handler1_)); - ASSERT_TRUE(transport2_->Init( - kTransportName2, kTestConfig, &event_handler2_)); +TEST_F(P2PTransportImplTest, SendDataUdp) { + Init(P2PTransport::PROTOCOL_UDP); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + // Transport may first become ether readable or writable, but + // eventually it must be readable and writable. + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + scoped_refptr<UdpChannelTester> channel_tester = new UdpChannelTester( + &message_loop_, transport1_->GetChannel(), transport2_->GetChannel()); + + message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), + TestTimeouts::action_max_timeout_ms()); + + channel_tester->Start(); + message_loop_.Run(); + channel_tester->CheckResults(); +} + +TEST_F(P2PTransportImplTest, SendDataTcp) { + Init(P2PTransport::PROTOCOL_TCP); EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( AddRemoteCandidate(transport2_.get())); @@ -283,7 +465,7 @@ TEST_F(P2PTransportImplTest, SendData) { P2PTransport::STATE_WRITABLE))) .Times(Exactly(1)); - scoped_refptr<ChannelTester> channel_tester = new ChannelTester( + scoped_refptr<TcpChannelTester> channel_tester = new TcpChannelTester( &message_loop_, transport1_->GetChannel(), transport2_->GetChannel()); message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), diff --git a/content/renderer/p2p/socket_dispatcher.cc b/content/renderer/p2p/socket_dispatcher.cc index 065bd23..429d7bc 100644 --- a/content/renderer/p2p/socket_dispatcher.cc +++ b/content/renderer/p2p/socket_dispatcher.cc @@ -23,6 +23,11 @@ void P2PSocketDispatcher::RequestNetworks() { Send(new P2PHostMsg_GetNetworkList(routing_id())); } +void P2PSocketDispatcher::GetNetworks(net::NetworkInterfaceList* networks) { + base::AutoLock auto_lock(networks_lock_); + *networks = networks_; +} + bool P2PSocketDispatcher::OnMessageReceived(const IPC::Message& message) { bool handled = true; IPC_BEGIN_MESSAGE_MAP(P2PSocketDispatcher, message) @@ -55,6 +60,7 @@ base::MessageLoopProxy* P2PSocketDispatcher::message_loop() { void P2PSocketDispatcher::OnNetworkList( const net::NetworkInterfaceList& networks) { + base::AutoLock auto_lock(networks_lock_); networks_ = networks; } diff --git a/content/renderer/p2p/socket_dispatcher.h b/content/renderer/p2p/socket_dispatcher.h index 4d0c273..b91b847 100644 --- a/content/renderer/p2p/socket_dispatcher.h +++ b/content/renderer/p2p/socket_dispatcher.h @@ -22,6 +22,7 @@ #include <vector> #include "base/id_map.h" +#include "base/synchronization/lock.h" #include "content/common/p2p_sockets.h" #include "content/renderer/p2p/socket_client.h" #include "content/renderer/render_view_observer.h" @@ -39,7 +40,7 @@ class P2PSocketDispatcher : public RenderViewObserver { virtual ~P2PSocketDispatcher(); void RequestNetworks(); - const net::NetworkInterfaceList& networks() const { return networks_; } + void GetNetworks(net::NetworkInterfaceList* networks); // RenderViewObserver overrides. virtual bool OnMessageReceived(const IPC::Message& message); @@ -66,6 +67,7 @@ class P2PSocketDispatcher : public RenderViewObserver { scoped_refptr<base::MessageLoopProxy> message_loop_; IDMap<P2PSocketClient> clients_; net::NetworkInterfaceList networks_; + base::Lock networks_lock_; DISALLOW_COPY_AND_ASSIGN(P2PSocketDispatcher); }; diff --git a/ppapi/tests/test_transport.cc b/ppapi/tests/test_transport.cc index bcf9701..80f9475 100644 --- a/ppapi/tests/test_transport.cc +++ b/ppapi/tests/test_transport.cc @@ -22,7 +22,7 @@ REGISTER_TEST_CASE(Transport); #define RUN_SUBTEST(function) { \ - std::string result = function(); \ + std::string result = function; \ if (!result.empty()) \ return result; \ } @@ -96,13 +96,15 @@ bool TestTransport::Init() { void TestTransport::RunTest() { RUN_TEST(Create); RUN_TEST(Connect); - RUN_TEST(SendData); - RUN_TEST(ConnectAndClose); + RUN_TEST(SendDataUdp); + RUN_TEST(SendDataTcp); + RUN_TEST(ConnectAndCloseUdp); + RUN_TEST(ConnectAndCloseTcp); } -std::string TestTransport::InitTargets() { - transport1_.reset(new pp::Transport_Dev(instance_, kTestChannelName, "")); - transport2_.reset(new pp::Transport_Dev(instance_, kTestChannelName, "")); +std::string TestTransport::InitTargets(const char* proto) { + transport1_.reset(new pp::Transport_Dev(instance_, kTestChannelName, proto)); + transport2_.reset(new pp::Transport_Dev(instance_, kTestChannelName, proto)); ASSERT_TRUE(transport1_.get() != NULL); ASSERT_TRUE(transport2_.get() != NULL); @@ -149,7 +151,7 @@ std::string TestTransport::Clean() { } std::string TestTransport::TestCreate() { - RUN_SUBTEST(InitTargets); + RUN_SUBTEST(InitTargets("udp")); Clean(); @@ -157,17 +159,17 @@ std::string TestTransport::TestCreate() { } std::string TestTransport::TestConnect() { - RUN_SUBTEST(InitTargets); - RUN_SUBTEST(Connect); + RUN_SUBTEST(InitTargets("udp")); + RUN_SUBTEST(Connect()); Clean(); PASS(); } -std::string TestTransport::TestSendData() { - RUN_SUBTEST(InitTargets); - RUN_SUBTEST(Connect); +std::string TestTransport::TestSendDataUdp() { + RUN_SUBTEST(InitTargets("udp")); + RUN_SUBTEST(Connect()); StreamReader reader(transport1_.get()); @@ -206,9 +208,72 @@ std::string TestTransport::TestSendData() { PASS(); } -std::string TestTransport::TestConnectAndClose() { - RUN_SUBTEST(InitTargets); - RUN_SUBTEST(Connect); +std::string TestTransport::TestSendDataTcp() { + RUN_SUBTEST(InitTargets("tcp")); + RUN_SUBTEST(Connect()); + + StreamReader reader(transport1_.get()); + + std::vector<char> sent_data; + for (int i = 0; i < kNumPackets; ++i) { + std::vector<char> send_buffer(kSendBufferSize); + for (size_t j = 0; j < send_buffer.size(); ++j) { + send_buffer[j] = rand() % 256; + } + + TestCompletionCallback send_cb(instance_->pp_instance()); + int result = transport2_->Send(&send_buffer[0], send_buffer.size(), + send_cb); + if (result == PP_OK_COMPLETIONPENDING) + result = send_cb.WaitForResult(); + ASSERT_TRUE(result > 0); + sent_data.insert(sent_data.end(), send_buffer.begin(), + send_buffer.begin() + result); + } + + // Wait for 1 second. + TestCompletionCallback wait_cb(instance_->pp_instance()); + pp::Module::Get()->core()->CallOnMainThread(1000, wait_cb); + ASSERT_EQ(wait_cb.WaitForResult(), PP_OK); + + ASSERT_TRUE(reader.errors().size() == 0); + + std::vector<char> received_data; + for (std::list<std::vector<char> >::const_iterator it = + reader.received().begin(); it != reader.received().end(); ++it) { + received_data.insert(received_data.end(), it->begin(), it->end()); + } + ASSERT_EQ(sent_data, received_data); + + Clean(); + + PASS(); +} + +std::string TestTransport::TestConnectAndCloseUdp() { + RUN_SUBTEST(InitTargets("udp")); + RUN_SUBTEST(Connect()); + + std::vector<char> recv_buffer(kReadBufferSize); + TestCompletionCallback recv_cb(instance_->pp_instance()); + ASSERT_EQ( + transport1_->Recv(&recv_buffer[0], recv_buffer.size(), recv_cb), + PP_OK_COMPLETIONPENDING); + + // Close the transport and verify that callback is aborted. + ASSERT_EQ(transport1_->Close(), PP_OK); + + ASSERT_EQ(recv_cb.run_count(), 1); + ASSERT_EQ(recv_cb.result(), PP_ERROR_ABORTED); + + Clean(); + + PASS(); +} + +std::string TestTransport::TestConnectAndCloseTcp() { + RUN_SUBTEST(InitTargets("tcp")); + RUN_SUBTEST(Connect()); std::vector<char> recv_buffer(kReadBufferSize); TestCompletionCallback recv_cb(instance_->pp_instance()); diff --git a/ppapi/tests/test_transport.h b/ppapi/tests/test_transport.h index f883b6a..2f3a8147 100644 --- a/ppapi/tests/test_transport.h +++ b/ppapi/tests/test_transport.h @@ -25,14 +25,16 @@ class TestTransport : public TestCase { virtual void RunTest(); private: - std::string InitTargets(); + std::string InitTargets(const char* proto); std::string Connect(); std::string Clean(); std::string TestCreate(); std::string TestConnect(); - std::string TestSendData(); - std::string TestConnectAndClose(); + std::string TestSendDataTcp(); + std::string TestSendDataUdp(); + std::string TestConnectAndCloseTcp(); + std::string TestConnectAndCloseUdp(); // Used by the tests that access the C API directly. const PPB_Transport_Dev* transport_interface_; diff --git a/ppapi/tests/test_utils.cc b/ppapi/tests/test_utils.cc index 622c758..7e0a351 100644 --- a/ppapi/tests/test_utils.cc +++ b/ppapi/tests/test_utils.cc @@ -27,16 +27,20 @@ std::string ReportError(const char* method, int32_t error) { } TestCompletionCallback::TestCompletionCallback(PP_Instance instance) - : result_(PP_OK_COMPLETIONPENDING), + : have_result_(false), + result_(PP_OK_COMPLETIONPENDING), post_quit_task_(false), run_count_(0), instance_(instance) { } int32_t TestCompletionCallback::WaitForResult() { - result_ = PP_OK_COMPLETIONPENDING; // Reset - post_quit_task_ = true; - GetTestingInterface()->RunMessageLoop(instance_); + if (!have_result_) { + result_ = PP_OK_COMPLETIONPENDING; // Reset + post_quit_task_ = true; + GetTestingInterface()->RunMessageLoop(instance_); + } + have_result_ = false; return result_; } @@ -50,6 +54,7 @@ void TestCompletionCallback::Handler(void* user_data, int32_t result) { TestCompletionCallback* callback = static_cast<TestCompletionCallback*>(user_data); callback->result_ = result; + callback->have_result_ = true; callback->run_count_++; if (callback->post_quit_task_) { callback->post_quit_task_ = false; diff --git a/ppapi/tests/test_utils.h b/ppapi/tests/test_utils.h index b71cc05..721a048 100644 --- a/ppapi/tests/test_utils.h +++ b/ppapi/tests/test_utils.h @@ -19,6 +19,11 @@ class TestCompletionCallback { public: TestCompletionCallback(PP_Instance instance); + // Waits for the callback to be called and returns the + // result. Returns immediately if the callback was previously called + // and the result wasn't returned (i.e. each result value received + // by the callback is returned by WaitForResult() once and only + // once). int32_t WaitForResult(); operator pp::CompletionCallback() const; @@ -31,6 +36,7 @@ class TestCompletionCallback { private: static void Handler(void* user_data, int32_t result); + bool have_result_; int32_t result_; bool post_quit_task_; unsigned run_count_; diff --git a/tools/valgrind/gtest_exclude/unit_tests.gtest.txt b/tools/valgrind/gtest_exclude/unit_tests.gtest.txt index ab9bc2b..6884731 100644 --- a/tools/valgrind/gtest_exclude/unit_tests.gtest.txt +++ b/tools/valgrind/gtest_exclude/unit_tests.gtest.txt @@ -9,12 +9,14 @@ PredictorTest.MassiveConcurrentLookupTest # Pure virtual method called: see http://crbug.com/50950 ConnectionTesterTest.RunAllTests -# Following two tests fail under valgrind because libjingle has hardcoded +# Following tests fail under valgrind because libjingle has hardcoded # timeouts for P2P connections, and it makes these tests fail under valgrind. # TODO(sergeyu): Remove hardcoded timeouts from libjingle. P2PTransportImplTest.Create -P2PTransportImplTest.Connect -P2PTransportImplTest.SendData +P2PTransportImplTest.ConnectUdp +P2PTransportImplTest.ConnectTcp +P2PTransportImplTest.SendDataUdp +P2PTransportImplTest.SendDataTcp # Failing on CrOS, see http://crbug.com/79657 SignedSettingsTest.StorePolicyNoPolicyData diff --git a/webkit/glue/p2p_transport.h b/webkit/glue/p2p_transport.h index e80ffb5..2c3200f 100644 --- a/webkit/glue/p2p_transport.h +++ b/webkit/glue/p2p_transport.h @@ -22,6 +22,11 @@ class P2PTransport { STATE_READABLE = 2, }; + enum Protocol { + PROTOCOL_UDP = 0, + PROTOCOL_TCP = 1, + }; + class EventHandler { public: virtual ~EventHandler() {} @@ -31,6 +36,11 @@ class P2PTransport { // Called when readable of writable state of the stream changes. virtual void OnStateChange(State state) = 0; + + // Called when an error occures (e.g. TCP handshake + // failed). P2PTransport object is not usable after that and + // should be destroyed. + virtual void OnError(int error) = 0; }; virtual ~P2PTransport() {} @@ -38,6 +48,7 @@ class P2PTransport { // Initialize transport using specified configuration. Returns true // if initialization succeeded. virtual bool Init(const std::string& name, + Protocol protocol, const std::string& config, EventHandler* event_handler) = 0; diff --git a/webkit/plugins/ppapi/ppb_transport_impl.cc b/webkit/plugins/ppapi/ppb_transport_impl.cc index 3f48721..a37f299 100644 --- a/webkit/plugins/ppapi/ppb_transport_impl.cc +++ b/webkit/plugins/ppapi/ppb_transport_impl.cc @@ -5,6 +5,7 @@ #include "webkit/plugins/ppapi/ppb_transport_impl.h" #include "base/message_loop.h" +#include "base/string_util.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/socket/socket.h" @@ -16,11 +17,16 @@ #include "webkit/plugins/ppapi/ppapi_plugin_instance.h" #include "webkit/plugins/ppapi/var.h" +using webkit_glue::P2PTransport; + namespace webkit { namespace ppapi { namespace { +const char kUdpProtocolName[] = "udp"; +const char kTcpProtocolName[] = "tcp"; + PP_Resource CreateTransport(PP_Instance instance_id, const char* name, const char* proto) { PluginInstance* instance = ResourceTracker::Get()->GetInstance(instance_id); @@ -137,7 +143,16 @@ PPB_Transport_Impl* PPB_Transport_Impl::AsPPB_Transport_Impl() { bool PPB_Transport_Impl::Init(const char* name, const char* proto) { name_ = name; - proto_ = proto; + + if (base::strcasecmp(proto, kUdpProtocolName) == 0) { + use_tcp_ = false; + } else if (base::strcasecmp(proto, kTcpProtocolName) == 0) { + use_tcp_ = true; + } else { + LOG(WARNING) << "Unknown protocol: " << proto; + return false; + } + p2p_transport_.reset(instance()->delegate()->CreateP2PTransport()); return p2p_transport_.get() != NULL; } @@ -153,13 +168,14 @@ int32_t PPB_Transport_Impl::Connect(PP_CompletionCallback callback) { if (!p2p_transport_.get()) return PP_ERROR_FAILED; - // TODO(sergeyu): Use |proto_| here. - // Connect() has already been called. if (started_) return PP_ERROR_INPROGRESS; - if (!p2p_transport_->Init(name_, "", this)) + P2PTransport::Protocol protocol = use_tcp_ ? + P2PTransport::PROTOCOL_TCP : P2PTransport::PROTOCOL_UDP; + + if (!p2p_transport_->Init(name_, protocol, "", this)) return PP_ERROR_FAILED; started_ = true; @@ -285,6 +301,15 @@ void PPB_Transport_Impl::OnStateChange(webkit_glue::P2PTransport::State state) { } } +void PPB_Transport_Impl::OnError(int error) { + writable_ = false; + if (connect_callback_.get() && !connect_callback_->completed()) { + scoped_refptr<TrackedCompletionCallback> callback; + callback.swap(connect_callback_); + callback->Run(PP_ERROR_FAILED); + } +} + void PPB_Transport_Impl::OnRead(int result) { DCHECK(recv_callback_.get() && !recv_callback_->completed()); diff --git a/webkit/plugins/ppapi/ppb_transport_impl.h b/webkit/plugins/ppapi/ppb_transport_impl.h index c368b07..ca4818f 100644 --- a/webkit/plugins/ppapi/ppb_transport_impl.h +++ b/webkit/plugins/ppapi/ppb_transport_impl.h @@ -43,13 +43,14 @@ class PPB_Transport_Impl : public Resource, // webkit_glue::P2PTransport::EventHandler implementation. virtual void OnCandidateReady(const std::string& address) OVERRIDE; virtual void OnStateChange(webkit_glue::P2PTransport::State state) OVERRIDE; + virtual void OnError(int error) OVERRIDE; private: void OnRead(int result); void OnWritten(int result); std::string name_; - std::string proto_; + bool use_tcp_; bool started_; scoped_ptr<webkit_glue::P2PTransport> p2p_transport_; bool writable_; |