summaryrefslogtreecommitdiffstats
path: root/content
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-04-06 23:16:42 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-04-06 23:16:42 +0000
commit00f0b81e684035f705e44b818bb039bbeae2c274 (patch)
tree6357372615e1822f5e98fdcdffe6c07d814cd801 /content
parentf0dbe56c5e204e4b2945a33732faabf9aafc3229 (diff)
downloadchromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.zip
chromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.tar.gz
chromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.tar.bz2
P2P Transport implementation.
TEST=Unittests. BUG=None Review URL: http://codereview.chromium.org/6791023 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@80717 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'content')
-rw-r--r--content/content_renderer.gypi2
-rw-r--r--content/renderer/p2p/ipc_network_manager.h6
-rw-r--r--content/renderer/p2p/p2p_transport_impl.cc161
-rw-r--r--content/renderer/p2p/p2p_transport_impl.h77
-rw-r--r--content/renderer/p2p/p2p_transport_impl_unittest.cc304
5 files changed, 547 insertions, 3 deletions
diff --git a/content/content_renderer.gypi b/content/content_renderer.gypi
index 255523c..32c4271 100644
--- a/content/content_renderer.gypi
+++ b/content/content_renderer.gypi
@@ -63,6 +63,8 @@
'renderer/p2p/ipc_network_manager.h',
'renderer/p2p/ipc_socket_factory.cc',
'renderer/p2p/ipc_socket_factory.h',
+ 'renderer/p2p/p2p_transport_impl.cc',
+ 'renderer/p2p/p2p_transport_impl.h',
'renderer/p2p/socket_client.cc',
'renderer/p2p/socket_client.h',
'renderer/p2p/socket_dispatcher.cc',
diff --git a/content/renderer/p2p/ipc_network_manager.h b/content/renderer/p2p/ipc_network_manager.h
index 9a1bbcb..a22cb81 100644
--- a/content/renderer/p2p/ipc_network_manager.h
+++ b/content/renderer/p2p/ipc_network_manager.h
@@ -22,9 +22,9 @@ class IpcNetworkManager : public talk_base::NetworkManager {
protected:
// Fills the supplied list with all usable networks.
- virtual bool EnumNetworks(bool include_ignored,
- std::vector<talk_base::Network*>* networks)
- OVERRIDE;
+ virtual bool EnumNetworks(
+ bool include_ignored,
+ std::vector<talk_base::Network*>* networks) OVERRIDE;
P2PSocketDispatcher* socket_dispatcher_;
};
diff --git a/content/renderer/p2p/p2p_transport_impl.cc b/content/renderer/p2p/p2p_transport_impl.cc
new file mode 100644
index 0000000..d5f2afe
--- /dev/null
+++ b/content/renderer/p2p/p2p_transport_impl.cc
@@ -0,0 +1,161 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/renderer/p2p/p2p_transport_impl.h"
+
+#include "base/values.h"
+#include "content/renderer/p2p/ipc_network_manager.h"
+#include "content/renderer/p2p/ipc_socket_factory.h"
+#include "content/renderer/render_view.h"
+#include "chrome/common/json_value_serializer.h"
+#include "jingle/glue/channel_socket_adapter.h"
+#include "jingle/glue/thread_wrapper.h"
+#include "third_party/libjingle/source/talk/p2p/base/p2ptransportchannel.h"
+#include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h"
+
+P2PTransportImpl::P2PTransportImpl(
+ talk_base::NetworkManager* network_manager,
+ talk_base::PacketSocketFactory* socket_factory)
+ : event_handler_(NULL),
+ state_(STATE_NONE),
+ network_manager_(network_manager),
+ socket_factory_(socket_factory) {
+}
+
+P2PTransportImpl::~P2PTransportImpl() {
+}
+
+bool P2PTransportImpl::Init(const std::string& name, const std::string& config,
+ EventHandler* event_handler) {
+ DCHECK(event_handler);
+
+ // Before proceeding, ensure we have libjingle thread wrapper for
+ // the current thread.
+ jingle_glue::JingleThreadWrapper::EnsureForCurrentThread();
+
+ name_ = name;
+ event_handler_ = event_handler;
+
+ // TODO(sergeyu): Implement PortAllocator that can parse |config|
+ // and use it here instead of BasicPortAllocator.
+ allocator_.reset(new cricket::BasicPortAllocator(
+ network_manager_, socket_factory_));
+
+ channel_.reset(new cricket::P2PTransportChannel(
+ name, "", NULL, allocator_.get()));
+ channel_->SignalRequestSignaling.connect(
+ this, &P2PTransportImpl::OnRequestSignaling);
+ channel_->SignalWritableState.connect(
+ this, &P2PTransportImpl::OnReadableState);
+ channel_->SignalWritableState.connect(
+ this, &P2PTransportImpl::OnWriteableState);
+ channel_->SignalCandidateReady.connect(
+ this, &P2PTransportImpl::OnCandidateReady);
+
+ channel_adapter_.reset(new jingle_glue::TransportChannelSocketAdapter(
+ channel_.get()));
+
+ channel_->Connect();
+
+ return true;
+}
+
+bool P2PTransportImpl::AddRemoteCandidate(const std::string& address) {
+ cricket::Candidate candidate;
+ if (!DeserializeCandidate(address, &candidate)) {
+ return false;
+ }
+
+ channel_->OnCandidate(candidate);
+ return true;
+}
+
+void P2PTransportImpl::OnRequestSignaling() {
+ channel_->OnSignalingReady();
+}
+
+void P2PTransportImpl::OnCandidateReady(
+ cricket::TransportChannelImpl* channel,
+ const cricket::Candidate& candidate) {
+ event_handler_->OnCandidateReady(SerializeCandidate(candidate));
+}
+
+void P2PTransportImpl::OnReadableState(cricket::TransportChannel* channel) {
+ state_ = static_cast<State>(state_ | STATE_READABLE);
+ event_handler_->OnStateChange(state_);
+}
+
+void P2PTransportImpl::OnWriteableState(cricket::TransportChannel* channel) {
+ state_ = static_cast<State>(state_ | STATE_WRITABLE);
+ event_handler_->OnStateChange(state_);
+}
+
+std::string P2PTransportImpl::SerializeCandidate(
+ const cricket::Candidate& candidate) {
+
+ // TODO(sergeyu): Use SDP to format candidates?
+ DictionaryValue value;
+ value.SetString("name", candidate.name());
+ value.SetString("ip", candidate.address().IPAsString());
+ value.SetInteger("port", candidate.address().port());
+ value.SetString("type", candidate.type());
+ value.SetString("protocol", candidate.protocol());
+ value.SetString("username", candidate.username());
+ value.SetString("password", candidate.password());
+ value.SetDouble("preference", candidate.preference());
+ value.SetInteger("generation", candidate.generation());
+
+ std::string result;
+ JSONStringValueSerializer serializer(&result);
+ serializer.Serialize(value);
+ return result;
+}
+
+bool P2PTransportImpl::DeserializeCandidate(const std::string& address,
+ cricket::Candidate* candidate) {
+ JSONStringValueSerializer deserializer(address);
+ scoped_ptr<Value> value(deserializer.Deserialize(NULL, NULL));
+ if (!value.get() || !value->IsType(Value::TYPE_DICTIONARY)) {
+ return false;
+ }
+
+ DictionaryValue* dic_value = static_cast<DictionaryValue*>(value.get());
+
+ std::string name;
+ std::string ip;
+ int port;
+ std::string type;
+ std::string protocol;
+ std::string username;
+ std::string password;
+ double preference;
+ int generation;
+
+ if (!dic_value->GetString("name", &name) ||
+ !dic_value->GetString("ip", &ip) ||
+ !dic_value->GetInteger("port", &port) ||
+ !dic_value->GetString("type", &type) ||
+ !dic_value->GetString("protocol", &protocol) ||
+ !dic_value->GetString("username", &username) ||
+ !dic_value->GetString("password", &password) ||
+ !dic_value->GetDouble("preference", &preference) ||
+ !dic_value->GetInteger("generation", &generation)) {
+ return false;
+ }
+
+ candidate->set_name(name);
+ candidate->set_address(talk_base::SocketAddress(ip, port));
+ candidate->set_type(type);
+ candidate->set_protocol(protocol);
+ candidate->set_username(username);
+ candidate->set_password(password);
+ candidate->set_preference(static_cast<float>(preference));
+ candidate->set_generation(generation);
+
+ return true;
+}
+
+net::Socket* P2PTransportImpl::GetChannel() {
+ return channel_adapter_.get();
+}
diff --git a/content/renderer/p2p/p2p_transport_impl.h b/content/renderer/p2p/p2p_transport_impl.h
new file mode 100644
index 0000000..d473614
--- /dev/null
+++ b/content/renderer/p2p/p2p_transport_impl.h
@@ -0,0 +1,77 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_
+#define CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_
+
+#include "base/basictypes.h"
+#include "base/scoped_ptr.h"
+#include "third_party/libjingle/source/talk/base/sigslot.h"
+#include "webkit/glue/p2p_transport.h"
+
+class RenderView;
+
+namespace cricket {
+class Candidate;
+class PortAllocator;
+class P2PTransportChannel;
+class TransportChannel;
+class TransportChannelImpl;
+} // namespace cricket
+
+namespace jingle_glue {
+class TransportChannelSocketAdapter;
+} // namespace jingle_glue
+
+namespace talk_base {
+class NetworkManager;
+class PacketSocketFactory;
+} // namespace talk_base
+
+class P2PTransportImpl : public webkit_glue::P2PTransport,
+ public sigslot::has_slots<> {
+ public:
+ // Create P2PTransportImpl using specified NetworkManager and
+ // PacketSocketFactory. Caller keeps ownership of |network_manager|
+ // and |socket_factory|.
+ P2PTransportImpl(talk_base::NetworkManager* network_manager,
+ talk_base::PacketSocketFactory* socket_factory);
+
+ virtual ~P2PTransportImpl();
+
+ // webkit_glue::P2PTransport interface.
+ virtual bool Init(const std::string& name, const std::string& config,
+ EventHandler* event_handler) OVERRIDE;
+ virtual bool AddRemoteCandidate(const std::string& address) OVERRIDE;
+ virtual net::Socket* GetChannel() OVERRIDE;
+
+ private:
+ class ChannelAdapter;
+
+ void OnRequestSignaling();
+ void OnCandidateReady(cricket::TransportChannelImpl* channel,
+ const cricket::Candidate& candidate);
+ void OnReadableState(cricket::TransportChannel* channel);
+ void OnWriteableState(cricket::TransportChannel* channel);
+
+ std::string SerializeCandidate(const cricket::Candidate& candidate);
+ bool DeserializeCandidate(const std::string& address,
+ cricket::Candidate* candidate);
+
+ std::string name_;
+ EventHandler* event_handler_;
+ State state_;
+
+ talk_base::NetworkManager* network_manager_;
+ talk_base::PacketSocketFactory* socket_factory_;
+
+ scoped_ptr<cricket::PortAllocator> allocator_;
+ scoped_ptr<cricket::P2PTransportChannel> channel_;
+
+ scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter_;
+
+ DISALLOW_COPY_AND_ASSIGN(P2PTransportImpl);
+};
+
+#endif // CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_
diff --git a/content/renderer/p2p/p2p_transport_impl_unittest.cc b/content/renderer/p2p/p2p_transport_impl_unittest.cc
new file mode 100644
index 0000000..fc87fb6
--- /dev/null
+++ b/content/renderer/p2p/p2p_transport_impl_unittest.cc
@@ -0,0 +1,304 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/message_loop.h"
+#include "base/test/test_timeouts.h"
+#include "content/renderer/p2p/p2p_transport_impl.h"
+#include "jingle/glue/fake_network_manager.h"
+#include "jingle/glue/fake_socket_factory.h"
+#include "jingle/glue/thread_wrapper.h"
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
+#include "net/socket/socket.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::AtMost;
+using testing::Exactly;
+using testing::Invoke;
+
+using webkit_glue::P2PTransport;
+
+namespace {
+const char kTestAddress1[] = "192.168.15.12";
+const char kTestAddress2[] = "192.168.15.33";
+
+const char kTransportName1[] = "tr1";
+const char kTransportName2[] = "tr2";
+
+const char kTestConfig[] = "";
+
+// Send 10 packets 10 bytes each. Packets are sent with 10ms delay
+// between packets (about 100 ms for 10 messages).
+const int kMessageSize = 10;
+const int kMessages = 10;
+const int kUdpWriteDelayMs = 10;
+
+class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> {
+ public:
+ ChannelTester(MessageLoop* message_loop,
+ net::Socket* write_socket,
+ net::Socket* read_socket)
+ : message_loop_(message_loop),
+ write_socket_(write_socket),
+ read_socket_(read_socket),
+ done_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ write_cb_(this, &ChannelTester::OnWritten)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_cb_(this, &ChannelTester::OnRead)),
+ write_errors_(0),
+ read_errors_(0),
+ packets_sent_(0),
+ packets_received_(0),
+ broken_packets_(0) {
+ }
+
+ virtual ~ChannelTester() { }
+
+ void Start() {
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoStart));
+ }
+
+ void CheckResults() {
+ EXPECT_EQ(0, write_errors_);
+ EXPECT_EQ(0, read_errors_);
+
+ EXPECT_EQ(0, broken_packets_);
+
+ // Verify that we've received at least one packet.
+ EXPECT_GT(packets_received_, 0);
+ LOG(INFO) << "Received " << packets_received_ << " packets out of "
+ << kMessages;
+ }
+
+ protected:
+ void Done() {
+ done_ = true;
+ message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask());
+ }
+
+ void DoStart() {
+ DoRead();
+ DoWrite();
+ }
+
+ void DoWrite() {
+ if (packets_sent_ >= kMessages) {
+ Done();
+ return;
+ }
+
+ scoped_refptr<net::IOBuffer> packet(new net::IOBuffer(kMessageSize));
+ memset(packet->data(), 123, kMessageSize);
+ sent_packets_[packets_sent_] = packet;
+ // Put index of this packet in the beginning of the packet body.
+ memcpy(packet->data(), &packets_sent_, sizeof(packets_sent_));
+
+ int result = write_socket_->Write(packet, kMessageSize, &write_cb_);
+ HandleWriteResult(result);
+ }
+
+ void OnWritten(int result) {
+ HandleWriteResult(result);
+ }
+
+ void HandleWriteResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ LOG(ERROR) << "Received error " << result << " when trying to write";
+ write_errors_++;
+ Done();
+ } else if (result > 0) {
+ EXPECT_EQ(kMessageSize, result);
+ packets_sent_++;
+ message_loop_->PostDelayedTask(
+ FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoWrite),
+ kUdpWriteDelayMs);
+ }
+ }
+
+ void DoRead() {
+ int result = 1;
+ while (result > 0) {
+ int kReadSize = kMessageSize * 2;
+ read_buffer_ = new net::IOBuffer(kReadSize);
+
+ result = read_socket_->Read(read_buffer_, kReadSize, &read_cb_);
+ HandleReadResult(result);
+ };
+ }
+
+ void OnRead(int result) {
+ HandleReadResult(result);
+ DoRead();
+ }
+
+ void HandleReadResult(int result) {
+ if (result <= 0 && result != net::ERR_IO_PENDING) {
+ // Error will be received after the socket is closed.
+ if (!done_) {
+ LOG(ERROR) << "Received error " << result << " when trying to read";
+ read_errors_++;
+ Done();
+ }
+ } else if (result > 0) {
+ packets_received_++;
+ if (kMessageSize != result) {
+ // Invalid packet size;
+ broken_packets_++;
+ } else {
+ // Validate packet body.
+ int packet_id;
+ memcpy(&packet_id, read_buffer_->data(), sizeof(packet_id));
+ if (packet_id < 0 || packet_id >= kMessages) {
+ broken_packets_++;
+ } else {
+ if (memcmp(read_buffer_->data(), sent_packets_[packet_id]->data(),
+ kMessageSize) != 0)
+ broken_packets_++;
+ }
+ }
+ }
+ }
+
+ private:
+ MessageLoop* message_loop_;
+ net::Socket* write_socket_;
+ net::Socket* read_socket_;
+ bool done_;
+
+ scoped_refptr<net::IOBuffer> sent_packets_[kMessages];
+ scoped_refptr<net::IOBuffer> read_buffer_;
+
+ net::CompletionCallbackImpl<ChannelTester> write_cb_;
+ net::CompletionCallbackImpl<ChannelTester> read_cb_;
+ int write_errors_;
+ int read_errors_;
+ int packets_sent_;
+ int packets_received_;
+ int broken_packets_;
+};
+
+} // namespace
+
+class MockP2PEventHandler : public P2PTransport::EventHandler {
+ public:
+ MOCK_METHOD1(OnCandidateReady, void(const std::string& address));
+ MOCK_METHOD1(OnStateChange, void(P2PTransport::State state));
+};
+
+class P2PTransportImplTest : public testing::Test {
+ public:
+
+ protected:
+ void SetUp() OVERRIDE {
+ socket_manager_ = new jingle_glue::FakeSocketManager();
+
+ net::IPAddressNumber ip;
+ ASSERT(net::ParseIPLiteralToNumber(kTestAddress1, &ip));
+ network_manager1_.reset(new jingle_glue::FakeNetworkManager(ip));
+ socket_factory1_.reset(
+ new jingle_glue::FakeSocketFactory(socket_manager_, ip));
+ transport1_.reset(new P2PTransportImpl(network_manager1_.get(),
+ socket_factory1_.get()));
+
+ ASSERT(net::ParseIPLiteralToNumber(kTestAddress2, &ip));
+ network_manager2_.reset(new jingle_glue::FakeNetworkManager(ip));
+ socket_factory2_.reset(
+ new jingle_glue::FakeSocketFactory(socket_manager_, ip));
+ transport2_.reset(new P2PTransportImpl(network_manager2_.get(),
+ socket_factory2_.get()));
+ }
+
+ MessageLoop message_loop_;
+
+ scoped_ptr<jingle_glue::FakeNetworkManager> network_manager1_;
+ scoped_ptr<jingle_glue::FakeNetworkManager> network_manager2_;
+ scoped_refptr<jingle_glue::FakeSocketManager> socket_manager_;
+ scoped_ptr<jingle_glue::FakeSocketFactory> socket_factory1_;
+ scoped_ptr<jingle_glue::FakeSocketFactory> socket_factory2_;
+
+ scoped_ptr<P2PTransportImpl> transport1_;
+ MockP2PEventHandler event_handler1_;
+ scoped_ptr<P2PTransportImpl> transport2_;
+ MockP2PEventHandler event_handler2_;
+};
+
+TEST_F(P2PTransportImplTest, Create) {
+ ASSERT_TRUE(transport1_->Init(
+ kTransportName1, kTestConfig, &event_handler1_));
+ ASSERT_TRUE(transport2_->Init(
+ kTransportName2, kTestConfig, &event_handler2_));
+
+ EXPECT_CALL(event_handler1_, OnCandidateReady(_));
+ EXPECT_CALL(event_handler2_, OnCandidateReady(_));
+
+ message_loop_.RunAllPending();
+}
+
+ACTION_P(AddRemoteCandidate, transport) {
+ EXPECT_TRUE(transport->AddRemoteCandidate(arg0));
+}
+
+TEST_F(P2PTransportImplTest, Connect) {
+ ASSERT_TRUE(transport1_->Init(
+ kTransportName1, kTestConfig, &event_handler1_));
+ ASSERT_TRUE(transport2_->Init(
+ kTransportName2, kTestConfig, &event_handler2_));
+
+ EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport2_.get()));
+ EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport1_.get()));
+
+ message_loop_.RunAllPending();
+}
+
+TEST_F(P2PTransportImplTest, SendData) {
+ ASSERT_TRUE(transport1_->Init(
+ kTransportName1, kTestConfig, &event_handler1_));
+ ASSERT_TRUE(transport2_->Init(
+ kTransportName2, kTestConfig, &event_handler2_));
+
+ EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport2_.get()));
+ EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly(
+ AddRemoteCandidate(transport1_.get()));
+
+ // Transport may first become ether readable or writable, but
+ // eventually it must be readable and writable.
+ EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_READABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_WRITABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler1_, OnStateChange(
+ static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE |
+ P2PTransport::STATE_WRITABLE)))
+ .Times(Exactly(1));
+
+ EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_READABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_WRITABLE))
+ .Times(AtMost(1));
+ EXPECT_CALL(event_handler2_, OnStateChange(
+ static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE |
+ P2PTransport::STATE_WRITABLE)))
+ .Times(Exactly(1));
+
+ scoped_refptr<ChannelTester> channel_tester = new ChannelTester(
+ &message_loop_, transport1_->GetChannel(), transport2_->GetChannel());
+
+ message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(),
+ TestTimeouts::action_max_timeout_ms());
+
+ channel_tester->Start();
+ message_loop_.Run();
+ channel_tester->CheckResults();
+}