diff options
author | sergeyu <sergeyu@chromium.org> | 2015-12-01 09:08:25 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-12-01 17:09:08 +0000 |
commit | 824c1056e444395a43870a02c9f9c04cc6a58dcd (patch) | |
tree | 5b3de5227751f9e8e2e9de4eefd2b308e0ebd41f | |
parent | 0edd36dd4ac523ad6989fd2e387b15c68d22379e (diff) | |
download | chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.zip chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.tar.gz chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.tar.bz2 |
Implement data channel support in WebrtcTransport.
Now WebrtcTransport::GetStreamChannelFactory() returns a valid factory
that creates streams using WebRTC data channel.
BUG=547158
Review URL: https://codereview.chromium.org/1488723002
Cr-Commit-Position: refs/heads/master@{#362438}
-rw-r--r-- | remoting/protocol/webrtc_data_stream_adapter.cc | 276 | ||||
-rw-r--r-- | remoting/protocol/webrtc_data_stream_adapter.h | 59 | ||||
-rw-r--r-- | remoting/protocol/webrtc_transport.cc | 12 | ||||
-rw-r--r-- | remoting/protocol/webrtc_transport.h | 3 | ||||
-rw-r--r-- | remoting/protocol/webrtc_transport_unittest.cc | 121 | ||||
-rw-r--r-- | remoting/remoting_srcs.gypi | 2 |
6 files changed, 436 insertions, 37 deletions
diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc new file mode 100644 index 0000000..9c1dac1 --- /dev/null +++ b/remoting/protocol/webrtc_data_stream_adapter.cc @@ -0,0 +1,276 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/webrtc_data_stream_adapter.h" + +#include "base/callback.h" +#include "base/callback_helpers.h" +#include "base/location.h" +#include "base/thread_task_runner_handle.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "remoting/base/compound_buffer.h" +#include "remoting/protocol/p2p_stream_socket.h" + +static const int kMaxSendBufferSize = 256 * 1024; + +namespace remoting { +namespace protocol { + +class WebrtcDataStreamAdapter::Channel : public P2PStreamSocket, + public webrtc::DataChannelObserver { + public: + typedef base::Callback<void(Channel* adapter, bool success)> + ConnectedCallback; + + Channel(const ConnectedCallback& connected_callback); + ~Channel() override; + + void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); + + std::string name() { return channel_->label(); } + + // P2PStreamSocket interface. + int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback &callback) override; + int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback &callback) override; + + private: + // webrtc::DataChannelObserver interface. + void OnStateChange() override; + void OnMessage(const webrtc::DataBuffer& buffer) override; + void OnBufferedAmountChange(uint64_t previous_amount) override; + + int DoWrite(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size); + + rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; + ConnectedCallback connected_callback_; + + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + net::CompletionCallback write_callback_; + + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + net::CompletionCallback read_callback_; + + CompoundBuffer received_data_buffer_; + + base::WeakPtrFactory<Channel> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(Channel); +}; + +WebrtcDataStreamAdapter::Channel::Channel( + const ConnectedCallback& connected_callback) + : connected_callback_(connected_callback), weak_factory_(this) {} + +WebrtcDataStreamAdapter::Channel::~Channel() { + if (channel_) { + channel_->UnregisterObserver(); + channel_->Close(); + } +} + +void WebrtcDataStreamAdapter::Channel::Start( + rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { + DCHECK(!channel_); + + channel_ = channel; + channel_->RegisterObserver(this); + + if (channel_->state() == webrtc::DataChannelInterface::kOpen) { + base::ResetAndReturn(&connected_callback_).Run(this, true); + } else { + DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); + } +} + +int WebrtcDataStreamAdapter::Channel::Read( + const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(read_callback_.is_null()); + + if (received_data_buffer_.total_bytes() == 0) { + read_buffer_ = buffer; + read_buffer_size_ = buffer_size; + read_callback_ = callback; + return net::ERR_IO_PENDING; + } + + int bytes_to_copy = + std::min(buffer_size, received_data_buffer_.total_bytes()); + received_data_buffer_.CopyTo(buffer->data(), bytes_to_copy); + received_data_buffer_.CropFront(bytes_to_copy); + return bytes_to_copy; +} + +int WebrtcDataStreamAdapter::Channel::Write( + const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(write_callback_.is_null()); + + if (channel_->buffered_amount() >= kMaxSendBufferSize) { + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + write_callback_ = callback; + return net::ERR_IO_PENDING; + } + + return DoWrite(buffer, buffer_size); +} + +void WebrtcDataStreamAdapter::Channel::OnStateChange() { + switch (channel_->state()) { + case webrtc::DataChannelInterface::kConnecting: + break; + + case webrtc::DataChannelInterface::kOpen: + DCHECK(!connected_callback_.is_null()); + base::ResetAndReturn(&connected_callback_).Run(this, true); + break; + + case webrtc::DataChannelInterface::kClosing: { + // Hold weak pointer for self to detect when one of the callbacks deletes + // the channel. + base::WeakPtr<Channel> self = weak_factory_.GetWeakPtr(); + if (!connected_callback_.is_null()) { + base::ResetAndReturn(&connected_callback_).Run(this, false); + } + + if (self && !read_callback_.is_null()) { + read_buffer_ = nullptr; + base::ResetAndReturn(&read_callback_).Run(net::ERR_CONNECTION_CLOSED); + } + + if (self && !write_callback_.is_null()) { + write_buffer_ = nullptr; + base::ResetAndReturn(&write_callback_).Run(net::ERR_CONNECTION_CLOSED); + } + break; + } + case webrtc::DataChannelInterface::kClosed: + DCHECK(connected_callback_.is_null()); + break; + } +} + +void WebrtcDataStreamAdapter::Channel::OnMessage( + const webrtc::DataBuffer& buffer) { + const char* data = reinterpret_cast<const char*>(buffer.data.data()); + int data_size = buffer.data.size(); + + // If there is no outstanding read request then just copy the data to + // |received_data_buffer_|. + if (read_callback_.is_null()) { + received_data_buffer_.AppendCopyOf(data, data_size); + return; + } + + DCHECK(received_data_buffer_.total_bytes() == 0); + int bytes_to_copy = std::min(read_buffer_size_, data_size); + memcpy(read_buffer_->data(), buffer.data.data(), bytes_to_copy); + + if (bytes_to_copy < data_size) { + received_data_buffer_.AppendCopyOf(data + bytes_to_copy, + data_size - bytes_to_copy); + } + read_buffer_ = nullptr; + base::ResetAndReturn(&read_callback_).Run(bytes_to_copy); +} + +void WebrtcDataStreamAdapter::Channel::OnBufferedAmountChange( + uint64_t previous_amount) { + if (channel_->buffered_amount() < kMaxSendBufferSize) { + base::ResetAndReturn(&write_callback_) + .Run(DoWrite(write_buffer_, write_buffer_size_)); + } +} + +int WebrtcDataStreamAdapter::Channel::DoWrite( + const scoped_refptr<net::IOBuffer>& buffer, + int buffer_size) { + webrtc::DataBuffer data_buffer(rtc::Buffer(buffer->data(), buffer_size), + true /* binary */); + if (channel_->Send(data_buffer)) { + return buffer_size; + } else { + return net::ERR_FAILED; + } +} + +WebrtcDataStreamAdapter::WebrtcDataStreamAdapter() : weak_factory_(this) {} + +WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { + DCHECK(pending_channels_.empty()); +} + +void WebrtcDataStreamAdapter::Initialize( + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection, + bool is_server) { + peer_connection_ = peer_connection; + is_server_ = is_server; + + if (!is_server_) { + for (auto& channel : pending_channels_) { + webrtc::DataChannelInit config; + config.reliable = true; + channel.second->Start( + peer_connection_->CreateDataChannel(channel.first, &config)); + } + } +} + +void WebrtcDataStreamAdapter::OnIncomingDataChannel( + webrtc::DataChannelInterface* data_channel) { + auto it = pending_channels_.find(data_channel->label()); + if (!is_server_ || it == pending_channels_.end()) { + LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); + return; + } + it->second->Start(data_channel); +} + +void WebrtcDataStreamAdapter::CreateChannel( + const std::string& name, + const ChannelCreatedCallback& callback) { + DCHECK(pending_channels_.find(name) == pending_channels_.end()); + + Channel* channel = + new Channel(base::Bind(&WebrtcDataStreamAdapter::OnChannelConnected, + base::Unretained(this), callback)); + pending_channels_[name] = channel; + + if (peer_connection_ && !is_server_) { + webrtc::DataChannelInit config; + config.reliable = true; + channel->Start(peer_connection_->CreateDataChannel(name, &config)); + } +} + +void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { + auto it = pending_channels_.find(name); + DCHECK(it != pending_channels_.end()); + delete it->second; + pending_channels_.erase(it); +} + +void WebrtcDataStreamAdapter::OnChannelConnected( + const ChannelCreatedCallback& connected_callback, + Channel* channel, + bool connected) { + auto it = pending_channels_.find(channel->name()); + DCHECK(it != pending_channels_.end()); + pending_channels_.erase(it); + + // The callback can delete the channel which also holds the callback + // object which may cause crash if the callback carries some arguments. Copy + // the callback to stack to avoid this problem. + ChannelCreatedCallback callback = connected_callback; + callback.Run(make_scoped_ptr(channel)); +} + +} // namespace protocol +} // namespace remoting diff --git a/remoting/protocol/webrtc_data_stream_adapter.h b/remoting/protocol/webrtc_data_stream_adapter.h new file mode 100644 index 0000000..1200db6 --- /dev/null +++ b/remoting/protocol/webrtc_data_stream_adapter.h @@ -0,0 +1,59 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_WEBRTC_DATA_STREAM_ADAPTER_H_ +#define REMOTING_PROTOCOL_WEBRTC_DATA_STREAM_ADAPTER_H_ + +#include <string> + +#include "base/macros.h" +#include "base/memory/weak_ptr.h" +#include "remoting/protocol/stream_channel_factory.h" +#include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h" +#include "third_party/webrtc/base/refcount.h" + +namespace rtc { +class PeerConnectionInterface; +} // namespace rtc + +namespace remoting { +namespace protocol { + +class WebrtcDataStreamAdapter : public StreamChannelFactory { + public: + WebrtcDataStreamAdapter(); + ~WebrtcDataStreamAdapter() override; + + void Initialize( + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection, + bool is_server); + + void OnIncomingDataChannel(webrtc::DataChannelInterface* data_channel); + + // StreamChannelFactory interface. + void CreateChannel(const std::string& name, + const ChannelCreatedCallback& callback) override; + void CancelChannelCreation(const std::string& name) override; + + private: + class Channel; + + void OnChannelConnected(const ChannelCreatedCallback& connected_callback, + Channel* channel, + bool connected); + + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_; + bool is_server_ = false; + + std::map<std::string, Channel*> pending_channels_; + + base::WeakPtrFactory<WebrtcDataStreamAdapter> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(WebrtcDataStreamAdapter); +}; + +} // namespace protocol +} // namespace remoting + +#endif // REMOTING_PROTOCOL_WEBRTC_DATA_STREAM_ADAPTER_H_ diff --git a/remoting/protocol/webrtc_transport.cc b/remoting/protocol/webrtc_transport.cc index 4bc629b..295ef9c 100644 --- a/remoting/protocol/webrtc_transport.cc +++ b/remoting/protocol/webrtc_transport.cc @@ -225,9 +225,7 @@ DatagramChannelFactory* WebrtcTransport::GetDatagramChannelFactory() { StreamChannelFactory* WebrtcTransport::GetStreamChannelFactory() { DCHECK(thread_checker_.CalledOnValidThread()); - // TODO(sergeyu): Implement data stream support. - NOTIMPLEMENTED(); - return nullptr; + return &data_stream_adapter_; } StreamChannelFactory* WebrtcTransport::GetMultiplexedChannelFactory() { @@ -261,6 +259,9 @@ void WebrtcTransport::DoStart(rtc::Thread* worker_thread) { peer_connection_ = peer_connection_factory_->CreatePeerConnection( rtc_config, &constraints, port_allocator_factory_, nullptr, this); + data_stream_adapter_.Initialize(peer_connection_, + role_ == TransportRole::SERVER); + if (role_ == TransportRole::CLIENT) { webrtc::FakeConstraints offer_config; offer_config.AddMandatory( @@ -269,6 +270,9 @@ void WebrtcTransport::DoStart(rtc::Thread* worker_thread) { offer_config.AddMandatory( webrtc::MediaConstraintsInterface::kOfferToReceiveAudio, webrtc::MediaConstraintsInterface::kValueFalse); + offer_config.AddMandatory( + webrtc::MediaConstraintsInterface::kEnableDtlsSrtp, + webrtc::MediaConstraintsInterface::kValueTrue); peer_connection_->CreateOffer( CreateSessionDescriptionObserver::Create( base::Bind(&WebrtcTransport::OnLocalSessionDescriptionCreated, @@ -386,7 +390,7 @@ void WebrtcTransport::OnRemoveStream(webrtc::MediaStreamInterface* stream) { void WebrtcTransport::OnDataChannel( webrtc::DataChannelInterface* data_channel) { DCHECK(thread_checker_.CalledOnValidThread()); - // TODO(sergeyu): Use the data channel. + data_stream_adapter_.OnIncomingDataChannel(data_channel); } void WebrtcTransport::OnRenegotiationNeeded() { diff --git a/remoting/protocol/webrtc_transport.h b/remoting/protocol/webrtc_transport.h index cbc6458..f480ee6 100644 --- a/remoting/protocol/webrtc_transport.h +++ b/remoting/protocol/webrtc_transport.h @@ -13,6 +13,7 @@ #include "base/threading/thread_checker.h" #include "base/timer/timer.h" #include "remoting/protocol/transport.h" +#include "remoting/protocol/webrtc_data_stream_adapter.h" #include "remoting/signaling/signal_strategy.h" #include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h" @@ -90,6 +91,8 @@ class WebrtcTransport : public Transport, std::list<rtc::scoped_refptr<webrtc::MediaStreamInterface>> unclaimed_streams_; + WebrtcDataStreamAdapter data_stream_adapter_; + base::WeakPtrFactory<WebrtcTransport> weak_factory_; DISALLOW_COPY_AND_ASSIGN(WebrtcTransport); diff --git a/remoting/protocol/webrtc_transport_unittest.cc b/remoting/protocol/webrtc_transport_unittest.cc index 8f21cca..d327085 100644 --- a/remoting/protocol/webrtc_transport_unittest.cc +++ b/remoting/protocol/webrtc_transport_unittest.cc @@ -7,10 +7,13 @@ #include "base/message_loop/message_loop.h" #include "base/run_loop.h" #include "jingle/glue/thread_wrapper.h" +#include "net/base/io_buffer.h" #include "net/url_request/url_request_context_getter.h" #include "remoting/protocol/chromium_port_allocator_factory.h" +#include "remoting/protocol/connection_tester.h" #include "remoting/protocol/fake_authenticator.h" #include "remoting/protocol/network_settings.h" +#include "remoting/protocol/p2p_stream_socket.h" #include "remoting/signaling/fake_signal_strategy.h" #include "testing/gtest/include/gtest/gtest.h" #include "third_party/webrtc/libjingle/xmllite/xmlelement.h" @@ -21,6 +24,7 @@ namespace protocol { namespace { const char kTestJid[] = "client@gmail.com/321"; +const char kChannelName[] = "test_channel"; class TestTransportEventHandler : public Transport::EventHandler { public: @@ -82,12 +86,49 @@ class WebrtcTransportTest : public testing::Test { } protected: - void WaitUntilConnected() { + void InitializeConnection() { + signal_strategy_.reset(new FakeSignalStrategy(kTestJid)); + + host_transport_factory_.reset(new WebrtcTransportFactory( + signal_strategy_.get(), + ChromiumPortAllocatorFactory::Create(network_settings_, nullptr), + TransportRole::SERVER)); + host_transport_ = host_transport_factory_->CreateTransport(); + host_authenticator_.reset(new FakeAuthenticator( + FakeAuthenticator::HOST, 0, FakeAuthenticator::ACCEPT, false)); + + client_transport_factory_.reset(new WebrtcTransportFactory( + signal_strategy_.get(), + ChromiumPortAllocatorFactory::Create(network_settings_, nullptr), + TransportRole::CLIENT)); + client_transport_ = client_transport_factory_->CreateTransport(); + host_authenticator_.reset(new FakeAuthenticator( + FakeAuthenticator::CLIENT, 0, FakeAuthenticator::ACCEPT, false)); + + // Connect signaling between the two WebrtcTransport objects. + host_event_handler_.set_transport_info_callback( + base::Bind(&WebrtcTransportTest::ProcessTransportInfo, + base::Unretained(this), &client_transport_)); + client_event_handler_.set_transport_info_callback( + base::Bind(&WebrtcTransportTest::ProcessTransportInfo, + base::Unretained(this), &host_transport_)); + } + + void StartConnection() { + host_event_handler_.set_connected_callback(base::Bind(&base::DoNothing)); + client_event_handler_.set_connected_callback(base::Bind(&base::DoNothing)); + host_event_handler_.set_error_callback(base::Bind( - &WebrtcTransportTest::QuitRunLoopOnError, base::Unretained(this))); + &WebrtcTransportTest::OnSessionError, base::Unretained(this))); client_event_handler_.set_error_callback(base::Bind( - &WebrtcTransportTest::QuitRunLoopOnError, base::Unretained(this))); + &WebrtcTransportTest::OnSessionError, base::Unretained(this))); + + host_transport_->Start(&host_event_handler_, host_authenticator_.get()); + client_transport_->Start(&client_event_handler_, + client_authenticator_.get()); + } + void WaitUntilConnected() { int counter = 2; host_event_handler_.set_connected_callback( base::Bind(&WebrtcTransportTest::QuitRunLoopOnCounter, @@ -102,7 +143,28 @@ class WebrtcTransportTest : public testing::Test { EXPECT_EQ(OK, error_); } - void QuitRunLoopOnError(ErrorCode error) { + void CreateDataStream() { + client_transport_->GetStreamChannelFactory()->CreateChannel( + kChannelName, base::Bind(&WebrtcTransportTest::OnClientChannelCreated, + base::Unretained(this))); + host_transport_->GetStreamChannelFactory()->CreateChannel( + kChannelName, base::Bind(&WebrtcTransportTest::OnHostChannelCreated, + base::Unretained(this))); + } + + void OnClientChannelCreated(scoped_ptr<P2PStreamSocket> socket) { + client_socket_ = socket.Pass(); + if (run_loop_ && host_socket_) + run_loop_->Quit(); + } + + void OnHostChannelCreated(scoped_ptr<P2PStreamSocket> socket) { + host_socket_ = socket.Pass(); + if (run_loop_ && client_socket_) + run_loop_->Quit(); + } + + void OnSessionError(ErrorCode error) { error_ = error; run_loop_->Quit(); } @@ -131,41 +193,34 @@ class WebrtcTransportTest : public testing::Test { TestTransportEventHandler client_event_handler_; scoped_ptr<FakeAuthenticator> client_authenticator_; + scoped_ptr<P2PStreamSocket> client_socket_; + scoped_ptr<P2PStreamSocket> host_socket_; + ErrorCode error_ = OK; }; TEST_F(WebrtcTransportTest, Connects) { - signal_strategy_.reset(new FakeSignalStrategy(kTestJid)); - - host_transport_factory_.reset(new WebrtcTransportFactory( - signal_strategy_.get(), - ChromiumPortAllocatorFactory::Create(network_settings_, nullptr), - TransportRole::SERVER)); - host_transport_ = host_transport_factory_->CreateTransport(); - host_authenticator_.reset(new FakeAuthenticator( - FakeAuthenticator::HOST, 0, FakeAuthenticator::ACCEPT, false)); - - client_transport_factory_.reset(new WebrtcTransportFactory( - signal_strategy_.get(), - ChromiumPortAllocatorFactory::Create(network_settings_, nullptr), - TransportRole::CLIENT)); - client_transport_ = client_transport_factory_->CreateTransport(); - host_authenticator_.reset(new FakeAuthenticator( - FakeAuthenticator::CLIENT, 0, FakeAuthenticator::ACCEPT, false)); - - // Connect signaling between the two WebrtcTransport objects. - host_event_handler_.set_transport_info_callback( - base::Bind(&WebrtcTransportTest::ProcessTransportInfo, - base::Unretained(this), &client_transport_)); - client_event_handler_.set_transport_info_callback( - base::Bind(&WebrtcTransportTest::ProcessTransportInfo, - base::Unretained(this), &host_transport_)); - - host_transport_->Start(&host_event_handler_, host_authenticator_.get()); - client_transport_->Start(&client_event_handler_, client_authenticator_.get()); - + InitializeConnection(); + StartConnection(); WaitUntilConnected(); } +TEST_F(WebrtcTransportTest, DataStream) { + InitializeConnection(); + CreateDataStream(); + StartConnection(); + + run_loop_.reset(new base::RunLoop()); + run_loop_->Run(); + + const int kMessageSize = 1024; + const int kMessages = 100; + StreamConnectionTester tester(host_socket_.get(), client_socket_.get(), + kMessageSize, kMessages); + tester.Start(); + message_loop_.Run(); + tester.CheckResults(); +} + } // namespace protocol } // namespace remoting diff --git a/remoting/remoting_srcs.gypi b/remoting/remoting_srcs.gypi index 69f9bbc..3286efe 100644 --- a/remoting/remoting_srcs.gypi +++ b/remoting/remoting_srcs.gypi @@ -216,6 +216,8 @@ 'protocol/ice_connection_to_client.h', 'protocol/video_frame_pump.cc', 'protocol/video_frame_pump.h', + 'protocol/webrtc_data_stream_adapter.cc', + 'protocol/webrtc_data_stream_adapter.h', 'protocol/webrtc_transport.cc', 'protocol/webrtc_transport.h', 'protocol/webrtc_video_capturer_adapter.cc', |