diff options
author | sergeyu <sergeyu@chromium.org> | 2015-12-01 09:08:25 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-12-01 17:09:08 +0000 |
commit | 824c1056e444395a43870a02c9f9c04cc6a58dcd (patch) | |
tree | 5b3de5227751f9e8e2e9de4eefd2b308e0ebd41f /remoting/protocol/webrtc_data_stream_adapter.cc | |
parent | 0edd36dd4ac523ad6989fd2e387b15c68d22379e (diff) | |
download | chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.zip chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.tar.gz chromium_src-824c1056e444395a43870a02c9f9c04cc6a58dcd.tar.bz2 |
Implement data channel support in WebrtcTransport.
Now WebrtcTransport::GetStreamChannelFactory() returns a valid factory
that creates streams using WebRTC data channel.
BUG=547158
Review URL: https://codereview.chromium.org/1488723002
Cr-Commit-Position: refs/heads/master@{#362438}
Diffstat (limited to 'remoting/protocol/webrtc_data_stream_adapter.cc')
-rw-r--r-- | remoting/protocol/webrtc_data_stream_adapter.cc | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc new file mode 100644 index 0000000..9c1dac1 --- /dev/null +++ b/remoting/protocol/webrtc_data_stream_adapter.cc @@ -0,0 +1,276 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/webrtc_data_stream_adapter.h" + +#include "base/callback.h" +#include "base/callback_helpers.h" +#include "base/location.h" +#include "base/thread_task_runner_handle.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "remoting/base/compound_buffer.h" +#include "remoting/protocol/p2p_stream_socket.h" + +static const int kMaxSendBufferSize = 256 * 1024; + +namespace remoting { +namespace protocol { + +class WebrtcDataStreamAdapter::Channel : public P2PStreamSocket, + public webrtc::DataChannelObserver { + public: + typedef base::Callback<void(Channel* adapter, bool success)> + ConnectedCallback; + + Channel(const ConnectedCallback& connected_callback); + ~Channel() override; + + void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); + + std::string name() { return channel_->label(); } + + // P2PStreamSocket interface. + int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback &callback) override; + int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback &callback) override; + + private: + // webrtc::DataChannelObserver interface. + void OnStateChange() override; + void OnMessage(const webrtc::DataBuffer& buffer) override; + void OnBufferedAmountChange(uint64_t previous_amount) override; + + int DoWrite(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size); + + rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; + ConnectedCallback connected_callback_; + + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + net::CompletionCallback write_callback_; + + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + net::CompletionCallback read_callback_; + + CompoundBuffer received_data_buffer_; + + base::WeakPtrFactory<Channel> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(Channel); +}; + +WebrtcDataStreamAdapter::Channel::Channel( + const ConnectedCallback& connected_callback) + : connected_callback_(connected_callback), weak_factory_(this) {} + +WebrtcDataStreamAdapter::Channel::~Channel() { + if (channel_) { + channel_->UnregisterObserver(); + channel_->Close(); + } +} + +void WebrtcDataStreamAdapter::Channel::Start( + rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { + DCHECK(!channel_); + + channel_ = channel; + channel_->RegisterObserver(this); + + if (channel_->state() == webrtc::DataChannelInterface::kOpen) { + base::ResetAndReturn(&connected_callback_).Run(this, true); + } else { + DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); + } +} + +int WebrtcDataStreamAdapter::Channel::Read( + const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(read_callback_.is_null()); + + if (received_data_buffer_.total_bytes() == 0) { + read_buffer_ = buffer; + read_buffer_size_ = buffer_size; + read_callback_ = callback; + return net::ERR_IO_PENDING; + } + + int bytes_to_copy = + std::min(buffer_size, received_data_buffer_.total_bytes()); + received_data_buffer_.CopyTo(buffer->data(), bytes_to_copy); + received_data_buffer_.CropFront(bytes_to_copy); + return bytes_to_copy; +} + +int WebrtcDataStreamAdapter::Channel::Write( + const scoped_refptr<net::IOBuffer>& buffer, int buffer_size, + const net::CompletionCallback& callback) { + DCHECK(write_callback_.is_null()); + + if (channel_->buffered_amount() >= kMaxSendBufferSize) { + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + write_callback_ = callback; + return net::ERR_IO_PENDING; + } + + return DoWrite(buffer, buffer_size); +} + +void WebrtcDataStreamAdapter::Channel::OnStateChange() { + switch (channel_->state()) { + case webrtc::DataChannelInterface::kConnecting: + break; + + case webrtc::DataChannelInterface::kOpen: + DCHECK(!connected_callback_.is_null()); + base::ResetAndReturn(&connected_callback_).Run(this, true); + break; + + case webrtc::DataChannelInterface::kClosing: { + // Hold weak pointer for self to detect when one of the callbacks deletes + // the channel. + base::WeakPtr<Channel> self = weak_factory_.GetWeakPtr(); + if (!connected_callback_.is_null()) { + base::ResetAndReturn(&connected_callback_).Run(this, false); + } + + if (self && !read_callback_.is_null()) { + read_buffer_ = nullptr; + base::ResetAndReturn(&read_callback_).Run(net::ERR_CONNECTION_CLOSED); + } + + if (self && !write_callback_.is_null()) { + write_buffer_ = nullptr; + base::ResetAndReturn(&write_callback_).Run(net::ERR_CONNECTION_CLOSED); + } + break; + } + case webrtc::DataChannelInterface::kClosed: + DCHECK(connected_callback_.is_null()); + break; + } +} + +void WebrtcDataStreamAdapter::Channel::OnMessage( + const webrtc::DataBuffer& buffer) { + const char* data = reinterpret_cast<const char*>(buffer.data.data()); + int data_size = buffer.data.size(); + + // If there is no outstanding read request then just copy the data to + // |received_data_buffer_|. + if (read_callback_.is_null()) { + received_data_buffer_.AppendCopyOf(data, data_size); + return; + } + + DCHECK(received_data_buffer_.total_bytes() == 0); + int bytes_to_copy = std::min(read_buffer_size_, data_size); + memcpy(read_buffer_->data(), buffer.data.data(), bytes_to_copy); + + if (bytes_to_copy < data_size) { + received_data_buffer_.AppendCopyOf(data + bytes_to_copy, + data_size - bytes_to_copy); + } + read_buffer_ = nullptr; + base::ResetAndReturn(&read_callback_).Run(bytes_to_copy); +} + +void WebrtcDataStreamAdapter::Channel::OnBufferedAmountChange( + uint64_t previous_amount) { + if (channel_->buffered_amount() < kMaxSendBufferSize) { + base::ResetAndReturn(&write_callback_) + .Run(DoWrite(write_buffer_, write_buffer_size_)); + } +} + +int WebrtcDataStreamAdapter::Channel::DoWrite( + const scoped_refptr<net::IOBuffer>& buffer, + int buffer_size) { + webrtc::DataBuffer data_buffer(rtc::Buffer(buffer->data(), buffer_size), + true /* binary */); + if (channel_->Send(data_buffer)) { + return buffer_size; + } else { + return net::ERR_FAILED; + } +} + +WebrtcDataStreamAdapter::WebrtcDataStreamAdapter() : weak_factory_(this) {} + +WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { + DCHECK(pending_channels_.empty()); +} + +void WebrtcDataStreamAdapter::Initialize( + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection, + bool is_server) { + peer_connection_ = peer_connection; + is_server_ = is_server; + + if (!is_server_) { + for (auto& channel : pending_channels_) { + webrtc::DataChannelInit config; + config.reliable = true; + channel.second->Start( + peer_connection_->CreateDataChannel(channel.first, &config)); + } + } +} + +void WebrtcDataStreamAdapter::OnIncomingDataChannel( + webrtc::DataChannelInterface* data_channel) { + auto it = pending_channels_.find(data_channel->label()); + if (!is_server_ || it == pending_channels_.end()) { + LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); + return; + } + it->second->Start(data_channel); +} + +void WebrtcDataStreamAdapter::CreateChannel( + const std::string& name, + const ChannelCreatedCallback& callback) { + DCHECK(pending_channels_.find(name) == pending_channels_.end()); + + Channel* channel = + new Channel(base::Bind(&WebrtcDataStreamAdapter::OnChannelConnected, + base::Unretained(this), callback)); + pending_channels_[name] = channel; + + if (peer_connection_ && !is_server_) { + webrtc::DataChannelInit config; + config.reliable = true; + channel->Start(peer_connection_->CreateDataChannel(name, &config)); + } +} + +void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { + auto it = pending_channels_.find(name); + DCHECK(it != pending_channels_.end()); + delete it->second; + pending_channels_.erase(it); +} + +void WebrtcDataStreamAdapter::OnChannelConnected( + const ChannelCreatedCallback& connected_callback, + Channel* channel, + bool connected) { + auto it = pending_channels_.find(channel->name()); + DCHECK(it != pending_channels_.end()); + pending_channels_.erase(it); + + // The callback can delete the channel which also holds the callback + // object which may cause crash if the callback carries some arguments. Copy + // the callback to stack to avoid this problem. + ChannelCreatedCallback callback = connected_callback; + callback.Run(make_scoped_ptr(channel)); +} + +} // namespace protocol +} // namespace remoting |