summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--content/renderer/p2p/ipc_network_manager.cc10
-rw-r--r--content/renderer/p2p/p2p_transport_impl.cc54
-rw-r--r--content/renderer/p2p/p2p_transport_impl.h11
-rw-r--r--content/renderer/p2p/p2p_transport_impl_unittest.cc232
-rw-r--r--content/renderer/p2p/socket_dispatcher.cc6
-rw-r--r--content/renderer/p2p/socket_dispatcher.h4
-rw-r--r--ppapi/tests/test_transport.cc95
-rw-r--r--ppapi/tests/test_transport.h8
-rw-r--r--ppapi/tests/test_utils.cc13
-rw-r--r--ppapi/tests/test_utils.h6
-rw-r--r--tools/valgrind/gtest_exclude/unit_tests.gtest.txt8
-rw-r--r--webkit/glue/p2p_transport.h11
-rw-r--r--webkit/plugins/ppapi/ppb_transport_impl.cc33
-rw-r--r--webkit/plugins/ppapi/ppb_transport_impl.h3
14 files changed, 427 insertions, 67 deletions
diff --git a/content/renderer/p2p/ipc_network_manager.cc b/content/renderer/p2p/ipc_network_manager.cc
index 3cb7124..ef5f550 100644
--- a/content/renderer/p2p/ipc_network_manager.cc
+++ b/content/renderer/p2p/ipc_network_manager.cc
@@ -8,8 +8,15 @@
#include "net/base/sys_byteorder.h"
#include "content/renderer/p2p/socket_dispatcher.h"
+// TODO(sergeyu): Currently the NetworkManager interface is
+// syncronous, but it gets list of networks from the browser process
+// asyncrhonously, so EnumNetworks() may return an empty list if we
+// haven't received list of networks from the browser. Make
+// NetworkManager interface asynchronous to avoid this problem.
+
IpcNetworkManager::IpcNetworkManager(P2PSocketDispatcher* socket_dispatcher)
: socket_dispatcher_(socket_dispatcher) {
+ socket_dispatcher_->RequestNetworks();
}
IpcNetworkManager::~IpcNetworkManager() {
@@ -18,7 +25,8 @@ IpcNetworkManager::~IpcNetworkManager() {
bool IpcNetworkManager::EnumNetworks(
bool include_ignored, std::vector<talk_base::Network*>* networks) {
socket_dispatcher_->RequestNetworks();
- const net::NetworkInterfaceList& list = socket_dispatcher_->networks();
+ net::NetworkInterfaceList list;
+ socket_dispatcher_->GetNetworks(&list);
for (net::NetworkInterfaceList::const_iterator it = list.begin();
it != list.end(); it++) {
uint32 address;
diff --git a/content/renderer/p2p/p2p_transport_impl.cc b/content/renderer/p2p/p2p_transport_impl.cc
index ad542d7..d467342 100644
--- a/content/renderer/p2p/p2p_transport_impl.cc
+++ b/content/renderer/p2p/p2p_transport_impl.cc
@@ -10,7 +10,9 @@
#include "content/renderer/p2p/ipc_socket_factory.h"
#include "content/renderer/render_view.h"
#include "jingle/glue/channel_socket_adapter.h"
+#include "jingle/glue/pseudotcp_adapter.h"
#include "jingle/glue/thread_wrapper.h"
+#include "net/base/net_errors.h"
#include "third_party/libjingle/source/talk/p2p/base/p2ptransportchannel.h"
#include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h"
@@ -20,18 +22,26 @@ P2PTransportImpl::P2PTransportImpl(
: event_handler_(NULL),
state_(STATE_NONE),
network_manager_(network_manager),
- socket_factory_(socket_factory) {
+ socket_factory_(socket_factory),
+ ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_(
+ this, &P2PTransportImpl::OnTcpConnected)) {
}
P2PTransportImpl::P2PTransportImpl(P2PSocketDispatcher* socket_dispatcher)
- : network_manager_(new IpcNetworkManager(socket_dispatcher)),
- socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)) {
+ : event_handler_(NULL),
+ state_(STATE_NONE),
+ network_manager_(new IpcNetworkManager(socket_dispatcher)),
+ socket_factory_(new IpcPacketSocketFactory(socket_dispatcher)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_(
+ this, &P2PTransportImpl::OnTcpConnected)) {
}
P2PTransportImpl::~P2PTransportImpl() {
}
-bool P2PTransportImpl::Init(const std::string& name, const std::string& config,
+bool P2PTransportImpl::Init(const std::string& name,
+ Protocol protocol,
+ const std::string& config,
EventHandler* event_handler) {
DCHECK(event_handler);
@@ -52,18 +62,29 @@ bool P2PTransportImpl::Init(const std::string& name, const std::string& config,
name, "", NULL, allocator_.get()));
channel_->SignalRequestSignaling.connect(
this, &P2PTransportImpl::OnRequestSignaling);
- channel_->SignalWritableState.connect(
- this, &P2PTransportImpl::OnReadableState);
- channel_->SignalWritableState.connect(
- this, &P2PTransportImpl::OnWriteableState);
channel_->SignalCandidateReady.connect(
this, &P2PTransportImpl::OnCandidateReady);
+ if (protocol == PROTOCOL_UDP) {
+ channel_->SignalWritableState.connect(
+ this, &P2PTransportImpl::OnReadableState);
+ channel_->SignalWritableState.connect(
+ this, &P2PTransportImpl::OnWriteableState);
+ }
+
channel_adapter_.reset(new jingle_glue::TransportChannelSocketAdapter(
channel_.get()));
channel_->Connect();
+ if (protocol == PROTOCOL_TCP) {
+ pseudo_tcp_adapter_.reset(new jingle_glue::PseudoTcpAdapter(
+ channel_adapter_.release()));
+ int result = pseudo_tcp_adapter_->Connect(&connect_callback_);
+ if (result != net::ERR_IO_PENDING)
+ OnTcpConnected(result);
+ }
+
return true;
}
@@ -163,5 +184,20 @@ bool P2PTransportImpl::DeserializeCandidate(const std::string& address,
}
net::Socket* P2PTransportImpl::GetChannel() {
- return channel_adapter_.get();
+ if (pseudo_tcp_adapter_.get()) {
+ DCHECK(!channel_adapter_.get());
+ return pseudo_tcp_adapter_.get();
+ } else {
+ DCHECK(channel_adapter_.get());
+ return channel_adapter_.get();
+ }
+}
+
+void P2PTransportImpl::OnTcpConnected(int result) {
+ if (result < 0) {
+ event_handler_->OnError(result);
+ return;
+ }
+ state_ = static_cast<State>(STATE_READABLE | STATE_WRITABLE);
+ event_handler_->OnStateChange(state_);
}
diff --git a/content/renderer/p2p/p2p_transport_impl.h b/content/renderer/p2p/p2p_transport_impl.h
index 2c8b947..e427820 100644
--- a/content/renderer/p2p/p2p_transport_impl.h
+++ b/content/renderer/p2p/p2p_transport_impl.h
@@ -7,6 +7,7 @@
#include "base/basictypes.h"
#include "base/scoped_ptr.h"
+#include "net/base/completion_callback.h"
#include "third_party/libjingle/source/talk/base/sigslot.h"
#include "webkit/glue/p2p_transport.h"
@@ -22,6 +23,7 @@ class TransportChannelImpl;
namespace jingle_glue {
class TransportChannelSocketAdapter;
+class PseudoTcpAdapter;
} // namespace jingle_glue
namespace talk_base {
@@ -46,7 +48,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport,
virtual ~P2PTransportImpl();
// webkit_glue::P2PTransport interface.
- virtual bool Init(const std::string& name, const std::string& config,
+ virtual bool Init(const std::string& name,
+ Protocol protocol,
+ const std::string& config,
EventHandler* event_handler) OVERRIDE;
virtual bool AddRemoteCandidate(const std::string& address) OVERRIDE;
virtual net::Socket* GetChannel() OVERRIDE;
@@ -64,6 +68,8 @@ class P2PTransportImpl : public webkit_glue::P2PTransport,
bool DeserializeCandidate(const std::string& address,
cricket::Candidate* candidate);
+ void OnTcpConnected(int result);
+
std::string name_;
EventHandler* event_handler_;
State state_;
@@ -75,6 +81,9 @@ class P2PTransportImpl : public webkit_glue::P2PTransport,
scoped_ptr<cricket::P2PTransportChannel> channel_;
scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter_;
+ scoped_ptr<jingle_glue::PseudoTcpAdapter> pseudo_tcp_adapter_;
+
+ net::CompletionCallbackImpl<P2PTransportImpl> connect_callback_;
DISALLOW_COPY_AND_ASSIGN(P2PTransportImpl);
};
diff --git a/content/renderer/p2p/p2p_transport_impl_unittest.cc b/content/renderer/p2p/p2p_transport_impl_unittest.cc
index 393fd0d..718111b 100644
--- a/content/renderer/p2p/p2p_transport_impl_unittest.cc
+++ b/content/renderer/p2p/p2p_transport_impl_unittest.cc
@@ -39,10 +39,12 @@ const char kTestConfig[] = "";
const int kMessageSize = 10;
const int kMessages = 10;
const int kUdpWriteDelayMs = 10;
+const int kTcpDataSize = 10 * 1024;
+const int kTcpWriteDelayMs = 1;
-class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
+class UdpChannelTester : public base::RefCountedThreadSafe<UdpChannelTester> {
public:
- ChannelTester(MessageLoop* message_loop,
+ UdpChannelTester(MessageLoop* message_loop,
net::Socket* write_socket,
net::Socket* read_socket)
: message_loop_(message_loop),
@@ -50,9 +52,9 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
read_socket_(read_socket),
done_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(
- write_cb_(this, &ChannelTester::OnWritten)),
+ write_cb_(this, &UdpChannelTester::OnWritten)),
ALLOW_THIS_IN_INITIALIZER_LIST(
- read_cb_(this, &ChannelTester::OnRead)),
+ read_cb_(this, &UdpChannelTester::OnRead)),
write_errors_(0),
read_errors_(0),
packets_sent_(0),
@@ -60,11 +62,11 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
broken_packets_(0) {
}
- virtual ~ChannelTester() { }
+ virtual ~UdpChannelTester() { }
void Start() {
message_loop_->PostTask(
- FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoStart));
+ FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoStart));
}
void CheckResults() {
@@ -119,7 +121,7 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
EXPECT_EQ(kMessageSize, result);
packets_sent_++;
message_loop_->PostDelayedTask(
- FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoWrite),
+ FROM_HERE, NewRunnableMethod(this, &UdpChannelTester::DoWrite),
kUdpWriteDelayMs);
}
}
@@ -177,8 +179,8 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
scoped_refptr<net::IOBuffer> sent_packets_[kMessages];
scoped_refptr<net::IOBuffer> read_buffer_;
- net::CompletionCallbackImpl<ChannelTester> write_cb_;
- net::CompletionCallbackImpl<ChannelTester> read_cb_;
+ net::CompletionCallbackImpl<UdpChannelTester> write_cb_;
+ net::CompletionCallbackImpl<UdpChannelTester> read_cb_;
int write_errors_;
int read_errors_;
int packets_sent_;
@@ -186,19 +188,151 @@ class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
int broken_packets_;
};
+class TcpChannelTester : public base::RefCountedThreadSafe<TcpChannelTester> {
+ public:
+ TcpChannelTester(MessageLoop* message_loop,
+ net::Socket* write_socket,
+ net::Socket* read_socket)
+ : message_loop_(message_loop),
+ write_socket_(write_socket),
+ read_socket_(read_socket),
+ done_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_cb_(this, &TcpChannelTester::OnWritten)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_cb_(this, &TcpChannelTester::OnRead)),
+ write_errors_(0),
+ read_errors_(0) {
+ }
+
+ virtual ~TcpChannelTester() { }
+
+ void Start() {
+ // Initialize |send_buffer_|.
+ send_buffer_ = new net::DrainableIOBuffer(new net::IOBuffer(kTcpDataSize),
+ kTcpDataSize);
+ for (int i = 0; i < kTcpDataSize; ++i) {
+ send_buffer_->data()[i] = rand() % 256;
+ }
+
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoStart));
+ }
+
+ void CheckResults() {
+ EXPECT_EQ(0, write_errors_);
+ EXPECT_EQ(0, read_errors_);
+
+ EXPECT_EQ(0, send_buffer_->BytesRemaining());
+
+ send_buffer_->SetOffset(0);
+ EXPECT_EQ(kTcpDataSize, static_cast<int>(received_data_.size()));
+ EXPECT_EQ(0, memcmp(send_buffer_->data(),
+ &received_data_[0], received_data_.size()));
+ }
+
+ protected:
+ void Done() {
+ done_ = true;
+ message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask());
+ }
+
+ void DoStart() {
+ DoRead();
+ DoWrite();
+ }
+
+ void DoWrite() {
+ if (send_buffer_->BytesRemaining() == 0) {
+ return;
+ }
+
+ int result = write_socket_->Write(
+ send_buffer_, send_buffer_->BytesRemaining(), &write_cb_);
+ HandleWriteResult(result);
+ }
+
+ void OnWritten(int result) {
+ HandleWriteResult(result);
+ }
+
+ void HandleWriteResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ LOG(ERROR) << "Received error " << result << " when trying to write";
+ write_errors_++;
+ Done();
+ } else if (result > 0) {
+ send_buffer_->DidConsume(result);
+ message_loop_->PostDelayedTask(
+ FROM_HERE, NewRunnableMethod(this, &TcpChannelTester::DoWrite),
+ kTcpWriteDelayMs);
+ }
+ }
+
+ void DoRead() {
+ int result = 1;
+ while (result > 0) {
+ int kReadSize = kMessageSize * 2;
+ read_buffer_ = new net::IOBuffer(kReadSize);
+
+ result = read_socket_->Read(read_buffer_, kReadSize, &read_cb_);
+ HandleReadResult(result);
+ };
+ }
+
+ void OnRead(int result) {
+ HandleReadResult(result);
+ DoRead();
+ }
+
+ void HandleReadResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ // Error will be received after the socket is closed.
+ if (!done_) {
+ LOG(ERROR) << "Received error " << result << " when trying to read";
+ read_errors_++;
+ Done();
+ }
+ } else if (result > 0) {
+ received_data_.insert(received_data_.end(), read_buffer_->data(),
+ read_buffer_->data() + result);
+ if (static_cast<int>(received_data_.size()) == kTcpDataSize)
+ Done();
+ }
+ }
+
+ private:
+ MessageLoop* message_loop_;
+ net::Socket* write_socket_;
+ net::Socket* read_socket_;
+ bool done_;
+
+ scoped_refptr<net::DrainableIOBuffer> send_buffer_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+
+ std::vector<char> sent_data_;
+ std::vector<char> received_data_;
+
+ net::CompletionCallbackImpl<TcpChannelTester> write_cb_;
+ net::CompletionCallbackImpl<TcpChannelTester> read_cb_;
+ int write_errors_;
+ int read_errors_;
+};
+
} // namespace
class MockP2PEventHandler : public P2PTransport::EventHandler {
public:
MOCK_METHOD1(OnCandidateReady, void(const std::string& address));
MOCK_METHOD1(OnStateChange, void(P2PTransport::State state));
+ MOCK_METHOD1(OnError, void(int error));
};
class P2PTransportImplTest : public testing::Test {
public:
protected:
- void SetUp() OVERRIDE {
+ virtual void SetUp() OVERRIDE {
socket_manager_ = new jingle_glue::FakeSocketManager();
net::IPAddressNumber ip;
@@ -213,6 +347,13 @@ class P2PTransportImplTest : public testing::Test {
new jingle_glue::FakeSocketFactory(socket_manager_, ip)));
}
+ void Init(P2PTransport::Protocol protocol) {
+ ASSERT_TRUE(transport1_->Init(
+ kTransportName1, protocol, kTestConfig, &event_handler1_));
+ ASSERT_TRUE(transport2_->Init(
+ kTransportName2, protocol, kTestConfig, &event_handler2_));
+ }
+
MessageLoop message_loop_;
scoped_refptr<jingle_glue::FakeSocketManager> socket_manager_;
@@ -223,10 +364,7 @@ class P2PTransportImplTest : public testing::Test {
};
TEST_F(P2PTransportImplTest, Create) {
- ASSERT_TRUE(transport1_->Init(
- kTransportName1, kTestConfig, &event_handler1_));
- ASSERT_TRUE(transport2_->Init(
- kTransportName2, kTestConfig, &event_handler2_));
+ Init(P2PTransport::PROTOCOL_UDP);
EXPECT_CALL(event_handler1_, OnCandidateReady(_));
EXPECT_CALL(event_handler2_, OnCandidateReady(_));
@@ -238,11 +376,19 @@ ACTION_P(AddRemoteCandidate, transport) {
EXPECT_TRUE(transport->AddRemoteCandidate(arg0));
}
-TEST_F(P2PTransportImplTest, Connect) {
- ASSERT_TRUE(transport1_->Init(
- kTransportName1, kTestConfig, &event_handler1_));
- ASSERT_TRUE(transport2_->Init(
- kTransportName2, kTestConfig, &event_handler2_));
+TEST_F(P2PTransportImplTest, ConnectUdp) {
+ Init(P2PTransport::PROTOCOL_UDP);
+
+ EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport2_.get()));
+ EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport1_.get()));
+
+ message_loop_.RunAllPending();
+}
+
+TEST_F(P2PTransportImplTest, ConnectTcp) {
+ Init(P2PTransport::PROTOCOL_UDP);
EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
AddRemoteCandidate(transport2_.get()));
@@ -252,11 +398,47 @@ TEST_F(P2PTransportImplTest, Connect) {
message_loop_.RunAllPending();
}
-TEST_F(P2PTransportImplTest, SendData) {
- ASSERT_TRUE(transport1_->Init(
- kTransportName1, kTestConfig, &event_handler1_));
- ASSERT_TRUE(transport2_->Init(
- kTransportName2, kTestConfig, &event_handler2_));
+TEST_F(P2PTransportImplTest, SendDataUdp) {
+ Init(P2PTransport::PROTOCOL_UDP);
+
+ EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport2_.get()));
+ EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport1_.get()));
+
+ // Transport may first become ether readable or writable, but
+ // eventually it must be readable and writable.
+ EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_READABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_WRITABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler1_, OnStateChange(
+ static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE |
+ P2PTransport::STATE_WRITABLE)))
+ .Times(Exactly(1));
+
+ EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_READABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_WRITABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler2_, OnStateChange(
+ static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE |
+ P2PTransport::STATE_WRITABLE)))
+ .Times(Exactly(1));
+
+ scoped_refptr<UdpChannelTester> channel_tester = new UdpChannelTester(
+ &message_loop_, transport1_->GetChannel(), transport2_->GetChannel());
+
+ message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(),
+ TestTimeouts::action_max_timeout_ms());
+
+ channel_tester->Start();
+ message_loop_.Run();
+ channel_tester->CheckResults();
+}
+
+TEST_F(P2PTransportImplTest, SendDataTcp) {
+ Init(P2PTransport::PROTOCOL_TCP);
EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
AddRemoteCandidate(transport2_.get()));
@@ -283,7 +465,7 @@ TEST_F(P2PTransportImplTest, SendData) {
P2PTransport::STATE_WRITABLE)))
.Times(Exactly(1));
- scoped_refptr<ChannelTester> channel_tester = new ChannelTester(
+ scoped_refptr<TcpChannelTester> channel_tester = new TcpChannelTester(
&message_loop_, transport1_->GetChannel(), transport2_->GetChannel());
message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(),
diff --git a/content/renderer/p2p/socket_dispatcher.cc b/content/renderer/p2p/socket_dispatcher.cc
index 065bd23..429d7bc 100644
--- a/content/renderer/p2p/socket_dispatcher.cc
+++ b/content/renderer/p2p/socket_dispatcher.cc
@@ -23,6 +23,11 @@ void P2PSocketDispatcher::RequestNetworks() {
Send(new P2PHostMsg_GetNetworkList(routing_id()));
}
+void P2PSocketDispatcher::GetNetworks(net::NetworkInterfaceList* networks) {
+ base::AutoLock auto_lock(networks_lock_);
+ *networks = networks_;
+}
+
bool P2PSocketDispatcher::OnMessageReceived(const IPC::Message& message) {
bool handled = true;
IPC_BEGIN_MESSAGE_MAP(P2PSocketDispatcher, message)
@@ -55,6 +60,7 @@ base::MessageLoopProxy* P2PSocketDispatcher::message_loop() {
void P2PSocketDispatcher::OnNetworkList(
const net::NetworkInterfaceList& networks) {
+ base::AutoLock auto_lock(networks_lock_);
networks_ = networks;
}
diff --git a/content/renderer/p2p/socket_dispatcher.h b/content/renderer/p2p/socket_dispatcher.h
index 4d0c273..b91b847 100644
--- a/content/renderer/p2p/socket_dispatcher.h
+++ b/content/renderer/p2p/socket_dispatcher.h
@@ -22,6 +22,7 @@
#include <vector>
#include "base/id_map.h"
+#include "base/synchronization/lock.h"
#include "content/common/p2p_sockets.h"
#include "content/renderer/p2p/socket_client.h"
#include "content/renderer/render_view_observer.h"
@@ -39,7 +40,7 @@ class P2PSocketDispatcher : public RenderViewObserver {
virtual ~P2PSocketDispatcher();
void RequestNetworks();
- const net::NetworkInterfaceList& networks() const { return networks_; }
+ void GetNetworks(net::NetworkInterfaceList* networks);
// RenderViewObserver overrides.
virtual bool OnMessageReceived(const IPC::Message& message);
@@ -66,6 +67,7 @@ class P2PSocketDispatcher : public RenderViewObserver {
scoped_refptr<base::MessageLoopProxy> message_loop_;
IDMap<P2PSocketClient> clients_;
net::NetworkInterfaceList networks_;
+ base::Lock networks_lock_;
DISALLOW_COPY_AND_ASSIGN(P2PSocketDispatcher);
};
diff --git a/ppapi/tests/test_transport.cc b/ppapi/tests/test_transport.cc
index bcf9701..80f9475 100644
--- a/ppapi/tests/test_transport.cc
+++ b/ppapi/tests/test_transport.cc
@@ -22,7 +22,7 @@
REGISTER_TEST_CASE(Transport);
#define RUN_SUBTEST(function) { \
- std::string result = function(); \
+ std::string result = function; \
if (!result.empty()) \
return result; \
}
@@ -96,13 +96,15 @@ bool TestTransport::Init() {
void TestTransport::RunTest() {
RUN_TEST(Create);
RUN_TEST(Connect);
- RUN_TEST(SendData);
- RUN_TEST(ConnectAndClose);
+ RUN_TEST(SendDataUdp);
+ RUN_TEST(SendDataTcp);
+ RUN_TEST(ConnectAndCloseUdp);
+ RUN_TEST(ConnectAndCloseTcp);
}
-std::string TestTransport::InitTargets() {
- transport1_.reset(new pp::Transport_Dev(instance_, kTestChannelName, ""));
- transport2_.reset(new pp::Transport_Dev(instance_, kTestChannelName, ""));
+std::string TestTransport::InitTargets(const char* proto) {
+ transport1_.reset(new pp::Transport_Dev(instance_, kTestChannelName, proto));
+ transport2_.reset(new pp::Transport_Dev(instance_, kTestChannelName, proto));
ASSERT_TRUE(transport1_.get() != NULL);
ASSERT_TRUE(transport2_.get() != NULL);
@@ -149,7 +151,7 @@ std::string TestTransport::Clean() {
}
std::string TestTransport::TestCreate() {
- RUN_SUBTEST(InitTargets);
+ RUN_SUBTEST(InitTargets("udp"));
Clean();
@@ -157,17 +159,17 @@ std::string TestTransport::TestCreate() {
}
std::string TestTransport::TestConnect() {
- RUN_SUBTEST(InitTargets);
- RUN_SUBTEST(Connect);
+ RUN_SUBTEST(InitTargets("udp"));
+ RUN_SUBTEST(Connect());
Clean();
PASS();
}
-std::string TestTransport::TestSendData() {
- RUN_SUBTEST(InitTargets);
- RUN_SUBTEST(Connect);
+std::string TestTransport::TestSendDataUdp() {
+ RUN_SUBTEST(InitTargets("udp"));
+ RUN_SUBTEST(Connect());
StreamReader reader(transport1_.get());
@@ -206,9 +208,72 @@ std::string TestTransport::TestSendData() {
PASS();
}
-std::string TestTransport::TestConnectAndClose() {
- RUN_SUBTEST(InitTargets);
- RUN_SUBTEST(Connect);
+std::string TestTransport::TestSendDataTcp() {
+ RUN_SUBTEST(InitTargets("tcp"));
+ RUN_SUBTEST(Connect());
+
+ StreamReader reader(transport1_.get());
+
+ std::vector<char> sent_data;
+ for (int i = 0; i < kNumPackets; ++i) {
+ std::vector<char> send_buffer(kSendBufferSize);
+ for (size_t j = 0; j < send_buffer.size(); ++j) {
+ send_buffer[j] = rand() % 256;
+ }
+
+ TestCompletionCallback send_cb(instance_->pp_instance());
+ int result = transport2_->Send(&send_buffer[0], send_buffer.size(),
+ send_cb);
+ if (result == PP_OK_COMPLETIONPENDING)
+ result = send_cb.WaitForResult();
+ ASSERT_TRUE(result > 0);
+ sent_data.insert(sent_data.end(), send_buffer.begin(),
+ send_buffer.begin() + result);
+ }
+
+ // Wait for 1 second.
+ TestCompletionCallback wait_cb(instance_->pp_instance());
+ pp::Module::Get()->core()->CallOnMainThread(1000, wait_cb);
+ ASSERT_EQ(wait_cb.WaitForResult(), PP_OK);
+
+ ASSERT_TRUE(reader.errors().size() == 0);
+
+ std::vector<char> received_data;
+ for (std::list<std::vector<char> >::const_iterator it =
+ reader.received().begin(); it != reader.received().end(); ++it) {
+ received_data.insert(received_data.end(), it->begin(), it->end());
+ }
+ ASSERT_EQ(sent_data, received_data);
+
+ Clean();
+
+ PASS();
+}
+
+std::string TestTransport::TestConnectAndCloseUdp() {
+ RUN_SUBTEST(InitTargets("udp"));
+ RUN_SUBTEST(Connect());
+
+ std::vector<char> recv_buffer(kReadBufferSize);
+ TestCompletionCallback recv_cb(instance_->pp_instance());
+ ASSERT_EQ(
+ transport1_->Recv(&recv_buffer[0], recv_buffer.size(), recv_cb),
+ PP_OK_COMPLETIONPENDING);
+
+ // Close the transport and verify that callback is aborted.
+ ASSERT_EQ(transport1_->Close(), PP_OK);
+
+ ASSERT_EQ(recv_cb.run_count(), 1);
+ ASSERT_EQ(recv_cb.result(), PP_ERROR_ABORTED);
+
+ Clean();
+
+ PASS();
+}
+
+std::string TestTransport::TestConnectAndCloseTcp() {
+ RUN_SUBTEST(InitTargets("tcp"));
+ RUN_SUBTEST(Connect());
std::vector<char> recv_buffer(kReadBufferSize);
TestCompletionCallback recv_cb(instance_->pp_instance());
diff --git a/ppapi/tests/test_transport.h b/ppapi/tests/test_transport.h
index f883b6a..2f3a8147 100644
--- a/ppapi/tests/test_transport.h
+++ b/ppapi/tests/test_transport.h
@@ -25,14 +25,16 @@ class TestTransport : public TestCase {
virtual void RunTest();
private:
- std::string InitTargets();
+ std::string InitTargets(const char* proto);
std::string Connect();
std::string Clean();
std::string TestCreate();
std::string TestConnect();
- std::string TestSendData();
- std::string TestConnectAndClose();
+ std::string TestSendDataTcp();
+ std::string TestSendDataUdp();
+ std::string TestConnectAndCloseTcp();
+ std::string TestConnectAndCloseUdp();
// Used by the tests that access the C API directly.
const PPB_Transport_Dev* transport_interface_;
diff --git a/ppapi/tests/test_utils.cc b/ppapi/tests/test_utils.cc
index 622c758..7e0a351 100644
--- a/ppapi/tests/test_utils.cc
+++ b/ppapi/tests/test_utils.cc
@@ -27,16 +27,20 @@ std::string ReportError(const char* method, int32_t error) {
}
TestCompletionCallback::TestCompletionCallback(PP_Instance instance)
- : result_(PP_OK_COMPLETIONPENDING),
+ : have_result_(false),
+ result_(PP_OK_COMPLETIONPENDING),
post_quit_task_(false),
run_count_(0),
instance_(instance) {
}
int32_t TestCompletionCallback::WaitForResult() {
- result_ = PP_OK_COMPLETIONPENDING; // Reset
- post_quit_task_ = true;
- GetTestingInterface()->RunMessageLoop(instance_);
+ if (!have_result_) {
+ result_ = PP_OK_COMPLETIONPENDING; // Reset
+ post_quit_task_ = true;
+ GetTestingInterface()->RunMessageLoop(instance_);
+ }
+ have_result_ = false;
return result_;
}
@@ -50,6 +54,7 @@ void TestCompletionCallback::Handler(void* user_data, int32_t result) {
TestCompletionCallback* callback =
static_cast<TestCompletionCallback*>(user_data);
callback->result_ = result;
+ callback->have_result_ = true;
callback->run_count_++;
if (callback->post_quit_task_) {
callback->post_quit_task_ = false;
diff --git a/ppapi/tests/test_utils.h b/ppapi/tests/test_utils.h
index b71cc05..721a048 100644
--- a/ppapi/tests/test_utils.h
+++ b/ppapi/tests/test_utils.h
@@ -19,6 +19,11 @@ class TestCompletionCallback {
public:
TestCompletionCallback(PP_Instance instance);
+ // Waits for the callback to be called and returns the
+ // result. Returns immediately if the callback was previously called
+ // and the result wasn't returned (i.e. each result value received
+ // by the callback is returned by WaitForResult() once and only
+ // once).
int32_t WaitForResult();
operator pp::CompletionCallback() const;
@@ -31,6 +36,7 @@ class TestCompletionCallback {
private:
static void Handler(void* user_data, int32_t result);
+ bool have_result_;
int32_t result_;
bool post_quit_task_;
unsigned run_count_;
diff --git a/tools/valgrind/gtest_exclude/unit_tests.gtest.txt b/tools/valgrind/gtest_exclude/unit_tests.gtest.txt
index ab9bc2b..6884731 100644
--- a/tools/valgrind/gtest_exclude/unit_tests.gtest.txt
+++ b/tools/valgrind/gtest_exclude/unit_tests.gtest.txt
@@ -9,12 +9,14 @@ PredictorTest.MassiveConcurrentLookupTest
# Pure virtual method called: see http://crbug.com/50950
ConnectionTesterTest.RunAllTests
-# Following two tests fail under valgrind because libjingle has hardcoded
+# Following tests fail under valgrind because libjingle has hardcoded
# timeouts for P2P connections, and it makes these tests fail under valgrind.
# TODO(sergeyu): Remove hardcoded timeouts from libjingle.
P2PTransportImplTest.Create
-P2PTransportImplTest.Connect
-P2PTransportImplTest.SendData
+P2PTransportImplTest.ConnectUdp
+P2PTransportImplTest.ConnectTcp
+P2PTransportImplTest.SendDataUdp
+P2PTransportImplTest.SendDataTcp
# Failing on CrOS, see http://crbug.com/79657
SignedSettingsTest.StorePolicyNoPolicyData
diff --git a/webkit/glue/p2p_transport.h b/webkit/glue/p2p_transport.h
index e80ffb5..2c3200f 100644
--- a/webkit/glue/p2p_transport.h
+++ b/webkit/glue/p2p_transport.h
@@ -22,6 +22,11 @@ class P2PTransport {
STATE_READABLE = 2,
};
+ enum Protocol {
+ PROTOCOL_UDP = 0,
+ PROTOCOL_TCP = 1,
+ };
+
class EventHandler {
public:
virtual ~EventHandler() {}
@@ -31,6 +36,11 @@ class P2PTransport {
// Called when readable of writable state of the stream changes.
virtual void OnStateChange(State state) = 0;
+
+ // Called when an error occures (e.g. TCP handshake
+ // failed). P2PTransport object is not usable after that and
+ // should be destroyed.
+ virtual void OnError(int error) = 0;
};
virtual ~P2PTransport() {}
@@ -38,6 +48,7 @@ class P2PTransport {
// Initialize transport using specified configuration. Returns true
// if initialization succeeded.
virtual bool Init(const std::string& name,
+ Protocol protocol,
const std::string& config,
EventHandler* event_handler) = 0;
diff --git a/webkit/plugins/ppapi/ppb_transport_impl.cc b/webkit/plugins/ppapi/ppb_transport_impl.cc
index 3f48721..a37f299 100644
--- a/webkit/plugins/ppapi/ppb_transport_impl.cc
+++ b/webkit/plugins/ppapi/ppb_transport_impl.cc
@@ -5,6 +5,7 @@
#include "webkit/plugins/ppapi/ppb_transport_impl.h"
#include "base/message_loop.h"
+#include "base/string_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/socket/socket.h"
@@ -16,11 +17,16 @@
#include "webkit/plugins/ppapi/ppapi_plugin_instance.h"
#include "webkit/plugins/ppapi/var.h"
+using webkit_glue::P2PTransport;
+
namespace webkit {
namespace ppapi {
namespace {
+const char kUdpProtocolName[] = "udp";
+const char kTcpProtocolName[] = "tcp";
+
PP_Resource CreateTransport(PP_Instance instance_id, const char* name,
const char* proto) {
PluginInstance* instance = ResourceTracker::Get()->GetInstance(instance_id);
@@ -137,7 +143,16 @@ PPB_Transport_Impl* PPB_Transport_Impl::AsPPB_Transport_Impl() {
bool PPB_Transport_Impl::Init(const char* name, const char* proto) {
name_ = name;
- proto_ = proto;
+
+ if (base::strcasecmp(proto, kUdpProtocolName) == 0) {
+ use_tcp_ = false;
+ } else if (base::strcasecmp(proto, kTcpProtocolName) == 0) {
+ use_tcp_ = true;
+ } else {
+ LOG(WARNING) << "Unknown protocol: " << proto;
+ return false;
+ }
+
p2p_transport_.reset(instance()->delegate()->CreateP2PTransport());
return p2p_transport_.get() != NULL;
}
@@ -153,13 +168,14 @@ int32_t PPB_Transport_Impl::Connect(PP_CompletionCallback callback) {
if (!p2p_transport_.get())
return PP_ERROR_FAILED;
- // TODO(sergeyu): Use |proto_| here.
-
// Connect() has already been called.
if (started_)
return PP_ERROR_INPROGRESS;
- if (!p2p_transport_->Init(name_, "", this))
+ P2PTransport::Protocol protocol = use_tcp_ ?
+ P2PTransport::PROTOCOL_TCP : P2PTransport::PROTOCOL_UDP;
+
+ if (!p2p_transport_->Init(name_, protocol, "", this))
return PP_ERROR_FAILED;
started_ = true;
@@ -285,6 +301,15 @@ void PPB_Transport_Impl::OnStateChange(webkit_glue::P2PTransport::State state) {
}
}
+void PPB_Transport_Impl::OnError(int error) {
+ writable_ = false;
+ if (connect_callback_.get() && !connect_callback_->completed()) {
+ scoped_refptr<TrackedCompletionCallback> callback;
+ callback.swap(connect_callback_);
+ callback->Run(PP_ERROR_FAILED);
+ }
+}
+
void PPB_Transport_Impl::OnRead(int result) {
DCHECK(recv_callback_.get() && !recv_callback_->completed());
diff --git a/webkit/plugins/ppapi/ppb_transport_impl.h b/webkit/plugins/ppapi/ppb_transport_impl.h
index c368b07..ca4818f 100644
--- a/webkit/plugins/ppapi/ppb_transport_impl.h
+++ b/webkit/plugins/ppapi/ppb_transport_impl.h
@@ -43,13 +43,14 @@ class PPB_Transport_Impl : public Resource,
// webkit_glue::P2PTransport::EventHandler implementation.
virtual void OnCandidateReady(const std::string& address) OVERRIDE;
virtual void OnStateChange(webkit_glue::P2PTransport::State state) OVERRIDE;
+ virtual void OnError(int error) OVERRIDE;
private:
void OnRead(int result);
void OnWritten(int result);
std::string name_;
- std::string proto_;
+ bool use_tcp_;
bool started_;
scoped_ptr<webkit_glue::P2PTransport> p2p_transport_;
bool writable_;