diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-06 23:16:42 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-06 23:16:42 +0000 |
commit | 00f0b81e684035f705e44b818bb039bbeae2c274 (patch) | |
tree | 6357372615e1822f5e98fdcdffe6c07d814cd801 /content | |
parent | f0dbe56c5e204e4b2945a33732faabf9aafc3229 (diff) | |
download | chromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.zip chromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.tar.gz chromium_src-00f0b81e684035f705e44b818bb039bbeae2c274.tar.bz2 |
P2P Transport implementation.
TEST=Unittests.
BUG=None
Review URL: http://codereview.chromium.org/6791023
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@80717 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'content')
-rw-r--r-- | content/content_renderer.gypi | 2 | ||||
-rw-r--r-- | content/renderer/p2p/ipc_network_manager.h | 6 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.cc | 161 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl.h | 77 | ||||
-rw-r--r-- | content/renderer/p2p/p2p_transport_impl_unittest.cc | 304 |
5 files changed, 547 insertions, 3 deletions
diff --git a/content/content_renderer.gypi b/content/content_renderer.gypi index 255523c..32c4271 100644 --- a/content/content_renderer.gypi +++ b/content/content_renderer.gypi @@ -63,6 +63,8 @@ 'renderer/p2p/ipc_network_manager.h', 'renderer/p2p/ipc_socket_factory.cc', 'renderer/p2p/ipc_socket_factory.h', + 'renderer/p2p/p2p_transport_impl.cc', + 'renderer/p2p/p2p_transport_impl.h', 'renderer/p2p/socket_client.cc', 'renderer/p2p/socket_client.h', 'renderer/p2p/socket_dispatcher.cc', diff --git a/content/renderer/p2p/ipc_network_manager.h b/content/renderer/p2p/ipc_network_manager.h index 9a1bbcb..a22cb81 100644 --- a/content/renderer/p2p/ipc_network_manager.h +++ b/content/renderer/p2p/ipc_network_manager.h @@ -22,9 +22,9 @@ class IpcNetworkManager : public talk_base::NetworkManager { protected: // Fills the supplied list with all usable networks. - virtual bool EnumNetworks(bool include_ignored, - std::vector<talk_base::Network*>* networks) - OVERRIDE; + virtual bool EnumNetworks( + bool include_ignored, + std::vector<talk_base::Network*>* networks) OVERRIDE; P2PSocketDispatcher* socket_dispatcher_; }; diff --git a/content/renderer/p2p/p2p_transport_impl.cc b/content/renderer/p2p/p2p_transport_impl.cc new file mode 100644 index 0000000..d5f2afe --- /dev/null +++ b/content/renderer/p2p/p2p_transport_impl.cc @@ -0,0 +1,161 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/renderer/p2p/p2p_transport_impl.h" + +#include "base/values.h" +#include "content/renderer/p2p/ipc_network_manager.h" +#include "content/renderer/p2p/ipc_socket_factory.h" +#include "content/renderer/render_view.h" +#include "chrome/common/json_value_serializer.h" +#include "jingle/glue/channel_socket_adapter.h" +#include "jingle/glue/thread_wrapper.h" +#include "third_party/libjingle/source/talk/p2p/base/p2ptransportchannel.h" +#include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h" + +P2PTransportImpl::P2PTransportImpl( + talk_base::NetworkManager* network_manager, + talk_base::PacketSocketFactory* socket_factory) + : event_handler_(NULL), + state_(STATE_NONE), + network_manager_(network_manager), + socket_factory_(socket_factory) { +} + +P2PTransportImpl::~P2PTransportImpl() { +} + +bool P2PTransportImpl::Init(const std::string& name, const std::string& config, + EventHandler* event_handler) { + DCHECK(event_handler); + + // Before proceeding, ensure we have libjingle thread wrapper for + // the current thread. + jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); + + name_ = name; + event_handler_ = event_handler; + + // TODO(sergeyu): Implement PortAllocator that can parse |config| + // and use it here instead of BasicPortAllocator. + allocator_.reset(new cricket::BasicPortAllocator( + network_manager_, socket_factory_)); + + channel_.reset(new cricket::P2PTransportChannel( + name, "", NULL, allocator_.get())); + channel_->SignalRequestSignaling.connect( + this, &P2PTransportImpl::OnRequestSignaling); + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnReadableState); + channel_->SignalWritableState.connect( + this, &P2PTransportImpl::OnWriteableState); + channel_->SignalCandidateReady.connect( + this, &P2PTransportImpl::OnCandidateReady); + + channel_adapter_.reset(new jingle_glue::TransportChannelSocketAdapter( + channel_.get())); + + channel_->Connect(); + + return true; +} + +bool P2PTransportImpl::AddRemoteCandidate(const std::string& address) { + cricket::Candidate candidate; + if (!DeserializeCandidate(address, &candidate)) { + return false; + } + + channel_->OnCandidate(candidate); + return true; +} + +void P2PTransportImpl::OnRequestSignaling() { + channel_->OnSignalingReady(); +} + +void P2PTransportImpl::OnCandidateReady( + cricket::TransportChannelImpl* channel, + const cricket::Candidate& candidate) { + event_handler_->OnCandidateReady(SerializeCandidate(candidate)); +} + +void P2PTransportImpl::OnReadableState(cricket::TransportChannel* channel) { + state_ = static_cast<State>(state_ | STATE_READABLE); + event_handler_->OnStateChange(state_); +} + +void P2PTransportImpl::OnWriteableState(cricket::TransportChannel* channel) { + state_ = static_cast<State>(state_ | STATE_WRITABLE); + event_handler_->OnStateChange(state_); +} + +std::string P2PTransportImpl::SerializeCandidate( + const cricket::Candidate& candidate) { + + // TODO(sergeyu): Use SDP to format candidates? + DictionaryValue value; + value.SetString("name", candidate.name()); + value.SetString("ip", candidate.address().IPAsString()); + value.SetInteger("port", candidate.address().port()); + value.SetString("type", candidate.type()); + value.SetString("protocol", candidate.protocol()); + value.SetString("username", candidate.username()); + value.SetString("password", candidate.password()); + value.SetDouble("preference", candidate.preference()); + value.SetInteger("generation", candidate.generation()); + + std::string result; + JSONStringValueSerializer serializer(&result); + serializer.Serialize(value); + return result; +} + +bool P2PTransportImpl::DeserializeCandidate(const std::string& address, + cricket::Candidate* candidate) { + JSONStringValueSerializer deserializer(address); + scoped_ptr<Value> value(deserializer.Deserialize(NULL, NULL)); + if (!value.get() || !value->IsType(Value::TYPE_DICTIONARY)) { + return false; + } + + DictionaryValue* dic_value = static_cast<DictionaryValue*>(value.get()); + + std::string name; + std::string ip; + int port; + std::string type; + std::string protocol; + std::string username; + std::string password; + double preference; + int generation; + + if (!dic_value->GetString("name", &name) || + !dic_value->GetString("ip", &ip) || + !dic_value->GetInteger("port", &port) || + !dic_value->GetString("type", &type) || + !dic_value->GetString("protocol", &protocol) || + !dic_value->GetString("username", &username) || + !dic_value->GetString("password", &password) || + !dic_value->GetDouble("preference", &preference) || + !dic_value->GetInteger("generation", &generation)) { + return false; + } + + candidate->set_name(name); + candidate->set_address(talk_base::SocketAddress(ip, port)); + candidate->set_type(type); + candidate->set_protocol(protocol); + candidate->set_username(username); + candidate->set_password(password); + candidate->set_preference(static_cast<float>(preference)); + candidate->set_generation(generation); + + return true; +} + +net::Socket* P2PTransportImpl::GetChannel() { + return channel_adapter_.get(); +} diff --git a/content/renderer/p2p/p2p_transport_impl.h b/content/renderer/p2p/p2p_transport_impl.h new file mode 100644 index 0000000..d473614 --- /dev/null +++ b/content/renderer/p2p/p2p_transport_impl.h @@ -0,0 +1,77 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_ +#define CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_ + +#include "base/basictypes.h" +#include "base/scoped_ptr.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" +#include "webkit/glue/p2p_transport.h" + +class RenderView; + +namespace cricket { +class Candidate; +class PortAllocator; +class P2PTransportChannel; +class TransportChannel; +class TransportChannelImpl; +} // namespace cricket + +namespace jingle_glue { +class TransportChannelSocketAdapter; +} // namespace jingle_glue + +namespace talk_base { +class NetworkManager; +class PacketSocketFactory; +} // namespace talk_base + +class P2PTransportImpl : public webkit_glue::P2PTransport, + public sigslot::has_slots<> { + public: + // Create P2PTransportImpl using specified NetworkManager and + // PacketSocketFactory. Caller keeps ownership of |network_manager| + // and |socket_factory|. + P2PTransportImpl(talk_base::NetworkManager* network_manager, + talk_base::PacketSocketFactory* socket_factory); + + virtual ~P2PTransportImpl(); + + // webkit_glue::P2PTransport interface. + virtual bool Init(const std::string& name, const std::string& config, + EventHandler* event_handler) OVERRIDE; + virtual bool AddRemoteCandidate(const std::string& address) OVERRIDE; + virtual net::Socket* GetChannel() OVERRIDE; + + private: + class ChannelAdapter; + + void OnRequestSignaling(); + void OnCandidateReady(cricket::TransportChannelImpl* channel, + const cricket::Candidate& candidate); + void OnReadableState(cricket::TransportChannel* channel); + void OnWriteableState(cricket::TransportChannel* channel); + + std::string SerializeCandidate(const cricket::Candidate& candidate); + bool DeserializeCandidate(const std::string& address, + cricket::Candidate* candidate); + + std::string name_; + EventHandler* event_handler_; + State state_; + + talk_base::NetworkManager* network_manager_; + talk_base::PacketSocketFactory* socket_factory_; + + scoped_ptr<cricket::PortAllocator> allocator_; + scoped_ptr<cricket::P2PTransportChannel> channel_; + + scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter_; + + DISALLOW_COPY_AND_ASSIGN(P2PTransportImpl); +}; + +#endif // CONTENT_RENDERER_P2P_P2P_TRANSPORT_IMPL_H_ diff --git a/content/renderer/p2p/p2p_transport_impl_unittest.cc b/content/renderer/p2p/p2p_transport_impl_unittest.cc new file mode 100644 index 0000000..fc87fb6 --- /dev/null +++ b/content/renderer/p2p/p2p_transport_impl_unittest.cc @@ -0,0 +1,304 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop.h" +#include "base/test/test_timeouts.h" +#include "content/renderer/p2p/p2p_transport_impl.h" +#include "jingle/glue/fake_network_manager.h" +#include "jingle/glue/fake_socket_factory.h" +#include "jingle/glue/thread_wrapper.h" +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/net_util.h" +#include "net/socket/socket.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::AtMost; +using testing::Exactly; +using testing::Invoke; + +using webkit_glue::P2PTransport; + +namespace { +const char kTestAddress1[] = "192.168.15.12"; +const char kTestAddress2[] = "192.168.15.33"; + +const char kTransportName1[] = "tr1"; +const char kTransportName2[] = "tr2"; + +const char kTestConfig[] = ""; + +// Send 10 packets 10 bytes each. Packets are sent with 10ms delay +// between packets (about 100 ms for 10 messages). +const int kMessageSize = 10; +const int kMessages = 10; +const int kUdpWriteDelayMs = 10; + +class ChannelTester : public base::RefCountedThreadSafe<ChannelTester> { + public: + ChannelTester(MessageLoop* message_loop, + net::Socket* write_socket, + net::Socket* read_socket) + : message_loop_(message_loop), + write_socket_(write_socket), + read_socket_(read_socket), + done_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_cb_(this, &ChannelTester::OnWritten)), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_cb_(this, &ChannelTester::OnRead)), + write_errors_(0), + read_errors_(0), + packets_sent_(0), + packets_received_(0), + broken_packets_(0) { + } + + virtual ~ChannelTester() { } + + void Start() { + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoStart)); + } + + void CheckResults() { + EXPECT_EQ(0, write_errors_); + EXPECT_EQ(0, read_errors_); + + EXPECT_EQ(0, broken_packets_); + + // Verify that we've received at least one packet. + EXPECT_GT(packets_received_, 0); + LOG(INFO) << "Received " << packets_received_ << " packets out of " + << kMessages; + } + + protected: + void Done() { + done_ = true; + message_loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask()); + } + + void DoStart() { + DoRead(); + DoWrite(); + } + + void DoWrite() { + if (packets_sent_ >= kMessages) { + Done(); + return; + } + + scoped_refptr<net::IOBuffer> packet(new net::IOBuffer(kMessageSize)); + memset(packet->data(), 123, kMessageSize); + sent_packets_[packets_sent_] = packet; + // Put index of this packet in the beginning of the packet body. + memcpy(packet->data(), &packets_sent_, sizeof(packets_sent_)); + + int result = write_socket_->Write(packet, kMessageSize, &write_cb_); + HandleWriteResult(result); + } + + void OnWritten(int result) { + HandleWriteResult(result); + } + + void HandleWriteResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + LOG(ERROR) << "Received error " << result << " when trying to write"; + write_errors_++; + Done(); + } else if (result > 0) { + EXPECT_EQ(kMessageSize, result); + packets_sent_++; + message_loop_->PostDelayedTask( + FROM_HERE, NewRunnableMethod(this, &ChannelTester::DoWrite), + kUdpWriteDelayMs); + } + } + + void DoRead() { + int result = 1; + while (result > 0) { + int kReadSize = kMessageSize * 2; + read_buffer_ = new net::IOBuffer(kReadSize); + + result = read_socket_->Read(read_buffer_, kReadSize, &read_cb_); + HandleReadResult(result); + }; + } + + void OnRead(int result) { + HandleReadResult(result); + DoRead(); + } + + void HandleReadResult(int result) { + if (result <= 0 && result != net::ERR_IO_PENDING) { + // Error will be received after the socket is closed. + if (!done_) { + LOG(ERROR) << "Received error " << result << " when trying to read"; + read_errors_++; + Done(); + } + } else if (result > 0) { + packets_received_++; + if (kMessageSize != result) { + // Invalid packet size; + broken_packets_++; + } else { + // Validate packet body. + int packet_id; + memcpy(&packet_id, read_buffer_->data(), sizeof(packet_id)); + if (packet_id < 0 || packet_id >= kMessages) { + broken_packets_++; + } else { + if (memcmp(read_buffer_->data(), sent_packets_[packet_id]->data(), + kMessageSize) != 0) + broken_packets_++; + } + } + } + } + + private: + MessageLoop* message_loop_; + net::Socket* write_socket_; + net::Socket* read_socket_; + bool done_; + + scoped_refptr<net::IOBuffer> sent_packets_[kMessages]; + scoped_refptr<net::IOBuffer> read_buffer_; + + net::CompletionCallbackImpl<ChannelTester> write_cb_; + net::CompletionCallbackImpl<ChannelTester> read_cb_; + int write_errors_; + int read_errors_; + int packets_sent_; + int packets_received_; + int broken_packets_; +}; + +} // namespace + +class MockP2PEventHandler : public P2PTransport::EventHandler { + public: + MOCK_METHOD1(OnCandidateReady, void(const std::string& address)); + MOCK_METHOD1(OnStateChange, void(P2PTransport::State state)); +}; + +class P2PTransportImplTest : public testing::Test { + public: + + protected: + void SetUp() OVERRIDE { + socket_manager_ = new jingle_glue::FakeSocketManager(); + + net::IPAddressNumber ip; + ASSERT(net::ParseIPLiteralToNumber(kTestAddress1, &ip)); + network_manager1_.reset(new jingle_glue::FakeNetworkManager(ip)); + socket_factory1_.reset( + new jingle_glue::FakeSocketFactory(socket_manager_, ip)); + transport1_.reset(new P2PTransportImpl(network_manager1_.get(), + socket_factory1_.get())); + + ASSERT(net::ParseIPLiteralToNumber(kTestAddress2, &ip)); + network_manager2_.reset(new jingle_glue::FakeNetworkManager(ip)); + socket_factory2_.reset( + new jingle_glue::FakeSocketFactory(socket_manager_, ip)); + transport2_.reset(new P2PTransportImpl(network_manager2_.get(), + socket_factory2_.get())); + } + + MessageLoop message_loop_; + + scoped_ptr<jingle_glue::FakeNetworkManager> network_manager1_; + scoped_ptr<jingle_glue::FakeNetworkManager> network_manager2_; + scoped_refptr<jingle_glue::FakeSocketManager> socket_manager_; + scoped_ptr<jingle_glue::FakeSocketFactory> socket_factory1_; + scoped_ptr<jingle_glue::FakeSocketFactory> socket_factory2_; + + scoped_ptr<P2PTransportImpl> transport1_; + MockP2PEventHandler event_handler1_; + scoped_ptr<P2PTransportImpl> transport2_; + MockP2PEventHandler event_handler2_; +}; + +TEST_F(P2PTransportImplTest, Create) { + ASSERT_TRUE(transport1_->Init( + kTransportName1, kTestConfig, &event_handler1_)); + ASSERT_TRUE(transport2_->Init( + kTransportName2, kTestConfig, &event_handler2_)); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)); + + message_loop_.RunAllPending(); +} + +ACTION_P(AddRemoteCandidate, transport) { + EXPECT_TRUE(transport->AddRemoteCandidate(arg0)); +} + +TEST_F(P2PTransportImplTest, Connect) { + ASSERT_TRUE(transport1_->Init( + kTransportName1, kTestConfig, &event_handler1_)); + ASSERT_TRUE(transport2_->Init( + kTransportName2, kTestConfig, &event_handler2_)); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + message_loop_.RunAllPending(); +} + +TEST_F(P2PTransportImplTest, SendData) { + ASSERT_TRUE(transport1_->Init( + kTransportName1, kTestConfig, &event_handler1_)); + ASSERT_TRUE(transport2_->Init( + kTransportName2, kTestConfig, &event_handler2_)); + + EXPECT_CALL(event_handler1_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport2_.get())); + EXPECT_CALL(event_handler2_, OnCandidateReady(_)).WillRepeatedly( + AddRemoteCandidate(transport1_.get())); + + // Transport may first become ether readable or writable, but + // eventually it must be readable and writable. + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler1_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_READABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange(P2PTransport::STATE_WRITABLE)) + .Times(AtMost(1)); + EXPECT_CALL(event_handler2_, OnStateChange( + static_cast<P2PTransport::State>(P2PTransport::STATE_READABLE | + P2PTransport::STATE_WRITABLE))) + .Times(Exactly(1)); + + scoped_refptr<ChannelTester> channel_tester = new ChannelTester( + &message_loop_, transport1_->GetChannel(), transport2_->GetChannel()); + + message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), + TestTimeouts::action_max_timeout_ms()); + + channel_tester->Start(); + message_loop_.Run(); + channel_tester->CheckResults(); +} |