summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsergeyu <sergeyu@chromium.org>2016-02-03 13:11:30 -0800
committerCommit bot <commit-bot@chromium.org>2016-02-03 21:13:28 +0000
commitf1005f6e964841763ae3792ff6aea399f9514a67 (patch)
treec3f82c915b13c1fa30319a24de6d53a74a3937bf
parenta5501522684dbb5968df45fc0119855fcc3ac900 (diff)
downloadchromium_src-f1005f6e964841763ae3792ff6aea399f9514a67.zip
chromium_src-f1005f6e964841763ae3792ff6aea399f9514a67.tar.gz
chromium_src-f1005f6e964841763ae3792ff6aea399f9514a67.tar.bz2
Add MessagePipe interface. Use it in ChannelDispatcherBase.
In WebRTC data streams are message-based, so the message framing mechanism use for the old protocol doesn't make sense for the WebRTC-based protocol. The new MessagePipe represents an abstract messaging channel. StreamMessagePipeAdapter implements framing for the old protocol. A different MessagePipe is going to be used for WebRTC-based protocol. BUG=547158 Review URL: https://codereview.chromium.org/1649063003 Cr-Commit-Position: refs/heads/master@{#373340}
-rw-r--r--remoting/protocol/audio_reader.cc1
-rw-r--r--remoting/protocol/audio_writer.cc13
-rw-r--r--remoting/protocol/channel_dispatcher_base.cc21
-rw-r--r--remoting/protocol/channel_dispatcher_base.h16
-rw-r--r--remoting/protocol/client_control_dispatcher.cc16
-rw-r--r--remoting/protocol/client_event_dispatcher.cc18
-rw-r--r--remoting/protocol/client_video_dispatcher.cc4
-rw-r--r--remoting/protocol/client_video_dispatcher_unittest.cc1
-rw-r--r--remoting/protocol/host_control_dispatcher.cc12
-rw-r--r--remoting/protocol/host_event_dispatcher.cc1
-rw-r--r--remoting/protocol/host_video_dispatcher.cc4
-rw-r--r--remoting/protocol/message_pipe.h45
-rw-r--r--remoting/protocol/stream_message_pipe_adapter.cc58
-rw-r--r--remoting/protocol/stream_message_pipe_adapter.h49
-rw-r--r--remoting/protocol/webrtc_connection_to_client.cc6
-rw-r--r--remoting/remoting_srcs.gypi3
16 files changed, 215 insertions, 53 deletions
diff --git a/remoting/protocol/audio_reader.cc b/remoting/protocol/audio_reader.cc
index e7511f6..272a1f1 100644
--- a/remoting/protocol/audio_reader.cc
+++ b/remoting/protocol/audio_reader.cc
@@ -6,6 +6,7 @@
#include "base/bind.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/audio.pb.h"
#include "remoting/protocol/audio_stub.h"
diff --git a/remoting/protocol/audio_writer.cc b/remoting/protocol/audio_writer.cc
index 3ab624b..be40f91 100644
--- a/remoting/protocol/audio_writer.cc
+++ b/remoting/protocol/audio_writer.cc
@@ -6,25 +6,22 @@
#include "base/bind.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/audio.pb.h"
-#include "remoting/protocol/message_serialization.h"
+#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/session.h"
#include "remoting/protocol/session_config.h"
namespace remoting {
namespace protocol {
-AudioWriter::AudioWriter()
- : ChannelDispatcherBase(kAudioChannelName) {
-}
-
-AudioWriter::~AudioWriter() {
-}
+AudioWriter::AudioWriter() : ChannelDispatcherBase(kAudioChannelName) {}
+AudioWriter::~AudioWriter() {}
void AudioWriter::ProcessAudioPacket(scoped_ptr<AudioPacket> packet,
const base::Closure& done) {
- writer()->Write(SerializeAndFrameMessage(*packet), done);
+ message_pipe()->Send(packet.get(), done);
}
// static
diff --git a/remoting/protocol/channel_dispatcher_base.cc b/remoting/protocol/channel_dispatcher_base.cc
index 7679460..206d1ff 100644
--- a/remoting/protocol/channel_dispatcher_base.cc
+++ b/remoting/protocol/channel_dispatcher_base.cc
@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "remoting/protocol/p2p_stream_socket.h"
#include "remoting/protocol/stream_channel_factory.h"
+#include "remoting/protocol/stream_message_pipe_adapter.h"
namespace remoting {
namespace protocol {
@@ -16,8 +17,7 @@ namespace protocol {
ChannelDispatcherBase::ChannelDispatcherBase(const char* channel_name)
: channel_name_(channel_name),
channel_factory_(nullptr),
- event_handler_(nullptr) {
-}
+ event_handler_(nullptr) {}
ChannelDispatcherBase::~ChannelDispatcherBase() {
if (channel_factory_)
@@ -41,21 +41,16 @@ void ChannelDispatcherBase::OnChannelReady(
}
channel_factory_ = nullptr;
- channel_ = std::move(socket);
- writer_.Start(
- base::Bind(&P2PStreamSocket::Write, base::Unretained(channel_.get())),
- base::Bind(&ChannelDispatcherBase::OnReadWriteFailed,
- base::Unretained(this)));
- reader_.StartReading(channel_.get(),
- base::Bind(&ChannelDispatcherBase::OnIncomingMessage,
- base::Unretained(this)),
- base::Bind(&ChannelDispatcherBase::OnReadWriteFailed,
- base::Unretained(this)));
+ message_pipe_.reset(new StreamMessagePipeAdapter(
+ std::move(socket),
+ base::Bind(&ChannelDispatcherBase::OnPipeError, base::Unretained(this))));
+ message_pipe_->StartReceiving(base::Bind(
+ &ChannelDispatcherBase::OnIncomingMessage, base::Unretained(this)));
event_handler_->OnChannelInitialized(this);
}
-void ChannelDispatcherBase::OnReadWriteFailed(int error) {
+void ChannelDispatcherBase::OnPipeError(int error) {
event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR);
}
diff --git a/remoting/protocol/channel_dispatcher_base.h b/remoting/protocol/channel_dispatcher_base.h
index eca2d12..2e940c6 100644
--- a/remoting/protocol/channel_dispatcher_base.h
+++ b/remoting/protocol/channel_dispatcher_base.h
@@ -12,11 +12,15 @@
#include "base/memory/scoped_ptr.h"
#include "remoting/base/buffered_socket_writer.h"
#include "remoting/protocol/errors.h"
-#include "remoting/protocol/message_reader.h"
namespace remoting {
+
+class CompoundBuffer;
+
namespace protocol {
+class MessagePipe;
+class P2PStreamSocket;
class StreamChannelFactory;
// Base class for channel message dispatchers. It's responsible for
@@ -49,27 +53,25 @@ class ChannelDispatcherBase {
const std::string& channel_name() { return channel_name_; }
// Returns true if the channel is currently connected.
- bool is_connected() { return channel_ != nullptr; }
+ bool is_connected() { return message_pipe() != nullptr; }
protected:
explicit ChannelDispatcherBase(const char* channel_name);
- BufferedSocketWriter* writer() { return &writer_; }
+ MessagePipe* message_pipe() { return message_pipe_.get(); }
// Child classes must override this method to handle incoming messages.
virtual void OnIncomingMessage(scoped_ptr<CompoundBuffer> message) = 0;
private:
void OnChannelReady(scoped_ptr<P2PStreamSocket> socket);
- void OnReadWriteFailed(int error);
+ void OnPipeError(int error);
std::string channel_name_;
StreamChannelFactory* channel_factory_;
EventHandler* event_handler_;
- scoped_ptr<P2PStreamSocket> channel_;
- BufferedSocketWriter writer_;
- MessageReader reader_;
+ scoped_ptr<MessagePipe> message_pipe_;
DISALLOW_COPY_AND_ASSIGN(ChannelDispatcherBase);
};
diff --git a/remoting/protocol/client_control_dispatcher.cc b/remoting/protocol/client_control_dispatcher.cc
index 6ee4256..e228b0a 100644
--- a/remoting/protocol/client_control_dispatcher.cc
+++ b/remoting/protocol/client_control_dispatcher.cc
@@ -9,10 +9,12 @@
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/control.pb.h"
#include "remoting/proto/internal.pb.h"
#include "remoting/protocol/client_stub.h"
+#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/message_serialization.h"
namespace remoting {
@@ -66,47 +68,47 @@ void ClientControlDispatcher::InjectClipboardEvent(
const ClipboardEvent& event) {
ControlMessage message;
message.mutable_clipboard_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::NotifyClientResolution(
const ClientResolution& resolution) {
ControlMessage message;
message.mutable_client_resolution()->CopyFrom(resolution);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::ControlVideo(const VideoControl& video_control) {
ControlMessage message;
message.mutable_video_control()->CopyFrom(video_control);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::ControlAudio(const AudioControl& audio_control) {
ControlMessage message;
message.mutable_audio_control()->CopyFrom(audio_control);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
ControlMessage message;
message.mutable_capabilities()->CopyFrom(capabilities);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::RequestPairing(
const PairingRequest& pairing_request) {
ControlMessage message;
message.mutable_pairing_request()->CopyFrom(pairing_request);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientControlDispatcher::DeliverClientMessage(
const ExtensionMessage& message) {
ControlMessage control_message;
control_message.mutable_extension_message()->CopyFrom(message);
- writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
+ message_pipe()->Send(&control_message, base::Closure());
}
void ClientControlDispatcher::OnIncomingMessage(
diff --git a/remoting/protocol/client_event_dispatcher.cc b/remoting/protocol/client_event_dispatcher.cc
index 87ae9b3..90d2304 100644
--- a/remoting/protocol/client_event_dispatcher.cc
+++ b/remoting/protocol/client_event_dispatcher.cc
@@ -6,20 +6,18 @@
#include "base/time/time.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h"
-#include "remoting/protocol/message_serialization.h"
+#include "remoting/protocol/message_pipe.h"
namespace remoting {
namespace protocol {
ClientEventDispatcher::ClientEventDispatcher()
- : ChannelDispatcherBase(kEventChannelName) {
-}
-
-ClientEventDispatcher::~ClientEventDispatcher() {
-}
+ : ChannelDispatcherBase(kEventChannelName) {}
+ClientEventDispatcher::~ClientEventDispatcher() {}
void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
DCHECK(event.has_usb_keycode());
@@ -27,7 +25,7 @@ void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
EventMessage message;
message.set_timestamp(base::TimeTicks::Now().ToInternalValue());
message.mutable_key_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
@@ -35,21 +33,21 @@ void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
EventMessage message;
message.set_timestamp(base::TimeTicks::Now().ToInternalValue());
message.mutable_text_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientEventDispatcher::InjectMouseEvent(const MouseEvent& event) {
EventMessage message;
message.set_timestamp(base::TimeTicks::Now().ToInternalValue());
message.mutable_mouse_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientEventDispatcher::InjectTouchEvent(const TouchEvent& event) {
EventMessage message;
message.set_timestamp(base::TimeTicks::Now().ToInternalValue());
message.mutable_touch_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void ClientEventDispatcher::OnIncomingMessage(
diff --git a/remoting/protocol/client_video_dispatcher.cc b/remoting/protocol/client_video_dispatcher.cc
index f82b7bd..8b21bd6 100644
--- a/remoting/protocol/client_video_dispatcher.cc
+++ b/remoting/protocol/client_video_dispatcher.cc
@@ -9,8 +9,10 @@
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h"
+#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/message_serialization.h"
#include "remoting/protocol/video_stub.h"
@@ -66,7 +68,7 @@ void ClientVideoDispatcher::OnPacketDone(
while (!pending_frames_.empty() && pending_frames_.front().done) {
VideoAck ack_message;
ack_message.set_frame_id(pending_frames_.front().frame_id);
- writer()->Write(SerializeAndFrameMessage(ack_message), base::Closure());
+ message_pipe()->Send(&ack_message, base::Closure());
pending_frames_.pop_front();
}
}
diff --git a/remoting/protocol/client_video_dispatcher_unittest.cc b/remoting/protocol/client_video_dispatcher_unittest.cc
index bc31d1a..fb39b46 100644
--- a/remoting/protocol/client_video_dispatcher_unittest.cc
+++ b/remoting/protocol/client_video_dispatcher_unittest.cc
@@ -12,6 +12,7 @@
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/fake_stream_socket.h"
+#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/message_serialization.h"
#include "remoting/protocol/video_stub.h"
#include "testing/gtest/include/gtest/gtest.h"
diff --git a/remoting/protocol/host_control_dispatcher.cc b/remoting/protocol/host_control_dispatcher.cc
index fb8aee9..627eb7e 100644
--- a/remoting/protocol/host_control_dispatcher.cc
+++ b/remoting/protocol/host_control_dispatcher.cc
@@ -6,11 +6,13 @@
#include "base/callback_helpers.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/control.pb.h"
#include "remoting/proto/internal.pb.h"
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/host_stub.h"
+#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/message_serialization.h"
namespace remoting {
@@ -24,34 +26,34 @@ void HostControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
ControlMessage message;
message.mutable_capabilities()->CopyFrom(capabilities);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void HostControlDispatcher::SetPairingResponse(
const PairingResponse& pairing_response) {
ControlMessage message;
message.mutable_pairing_response()->CopyFrom(pairing_response);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void HostControlDispatcher::DeliverHostMessage(
const ExtensionMessage& message) {
ControlMessage control_message;
control_message.mutable_extension_message()->CopyFrom(message);
- writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
+ message_pipe()->Send(&control_message, base::Closure());
}
void HostControlDispatcher::InjectClipboardEvent(const ClipboardEvent& event) {
ControlMessage message;
message.mutable_clipboard_event()->CopyFrom(event);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void HostControlDispatcher::SetCursorShape(
const CursorShapeInfo& cursor_shape) {
ControlMessage message;
message.mutable_cursor_shape()->CopyFrom(cursor_shape);
- writer()->Write(SerializeAndFrameMessage(message), base::Closure());
+ message_pipe()->Send(&message, base::Closure());
}
void HostControlDispatcher::OnIncomingMessage(
diff --git a/remoting/protocol/host_event_dispatcher.cc b/remoting/protocol/host_event_dispatcher.cc
index d67f651..4032c0a 100644
--- a/remoting/protocol/host_event_dispatcher.cc
+++ b/remoting/protocol/host_event_dispatcher.cc
@@ -5,6 +5,7 @@
#include "remoting/protocol/host_event_dispatcher.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/event.pb.h"
#include "remoting/proto/internal.pb.h"
diff --git a/remoting/protocol/host_video_dispatcher.cc b/remoting/protocol/host_video_dispatcher.cc
index ca9b634..759f9d6 100644
--- a/remoting/protocol/host_video_dispatcher.cc
+++ b/remoting/protocol/host_video_dispatcher.cc
@@ -8,8 +8,10 @@
#include "base/bind.h"
#include "net/socket/stream_socket.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/proto/video.pb.h"
+#include "remoting/protocol/message_pipe.h"
#include "remoting/protocol/message_serialization.h"
#include "remoting/protocol/video_feedback_stub.h"
@@ -22,7 +24,7 @@ HostVideoDispatcher::~HostVideoDispatcher() {}
void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
const base::Closure& done) {
- writer()->Write(SerializeAndFrameMessage(*packet), done);
+ message_pipe()->Send(packet.get(), done);
}
void HostVideoDispatcher::OnIncomingMessage(
diff --git a/remoting/protocol/message_pipe.h b/remoting/protocol/message_pipe.h
new file mode 100644
index 0000000..107a510
--- /dev/null
+++ b/remoting/protocol/message_pipe.h
@@ -0,0 +1,45 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGE_PIPE_H_
+#define REMOTING_PROTOCOL_MESSAGE_PIPE_H_
+
+#include "base/callback_forward.h"
+#include "base/memory/scoped_ptr.h"
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace remoting {
+
+class CompoundBuffer;
+
+namespace protocol {
+
+// Represents a bi-directional pipe that allows to send and receive messages.
+class MessagePipe {
+ public:
+ typedef base::Callback<void(scoped_ptr<CompoundBuffer> message)>
+ MessageReceivedCallback;
+
+ virtual ~MessagePipe() {}
+
+ // Starts receiving incoming messages and calls |callback| for each message.
+ virtual void StartReceiving(const MessageReceivedCallback& callback) = 0;
+
+ // Sends a message. |done| is called when the message has been sent to the
+ // client, but it doesn't mean that the client has received it. |done| is
+ // never called if the message is never sent (e.g. if the pipe is destroyed
+ // before the message is sent).
+ virtual void Send(google::protobuf::MessageLite* message,
+ const base::Closure& done) = 0;
+};
+
+} // namespace protocol
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGE_PIPE_H_
diff --git a/remoting/protocol/stream_message_pipe_adapter.cc b/remoting/protocol/stream_message_pipe_adapter.cc
new file mode 100644
index 0000000..8266304
--- /dev/null
+++ b/remoting/protocol/stream_message_pipe_adapter.cc
@@ -0,0 +1,58 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/stream_message_pipe_adapter.h"
+
+#include <utility>
+
+#include "base/bind.h"
+#include "base/callback_helpers.h"
+#include "remoting/base/buffered_socket_writer.h"
+#include "remoting/base/compound_buffer.h"
+#include "remoting/protocol/message_serialization.h"
+#include "remoting/protocol/p2p_stream_socket.h"
+
+namespace remoting {
+namespace protocol {
+
+StreamMessagePipeAdapter::StreamMessagePipeAdapter(
+ scoped_ptr<P2PStreamSocket> socket,
+ const ErrorCallback& error_callback)
+ : socket_(std::move(socket)),
+ error_callback_(error_callback),
+ writer_(new BufferedSocketWriter()) {
+ DCHECK(socket_);
+ DCHECK(!error_callback_.is_null());
+
+ writer_->Start(
+ base::Bind(&P2PStreamSocket::Write, base::Unretained(socket_.get())),
+ base::Bind(&StreamMessagePipeAdapter::CloseOnError,
+ base::Unretained(this)));
+}
+
+StreamMessagePipeAdapter::~StreamMessagePipeAdapter() {}
+
+void StreamMessagePipeAdapter::StartReceiving(
+ const MessageReceivedCallback& callback) {
+ reader_.StartReading(socket_.get(), callback,
+ base::Bind(&StreamMessagePipeAdapter::CloseOnError,
+ base::Unretained(this)));
+}
+
+void StreamMessagePipeAdapter::Send(google::protobuf::MessageLite* message,
+ const base::Closure& done) {
+ if (writer_)
+ writer_->Write(SerializeAndFrameMessage(*message), done);
+}
+
+void StreamMessagePipeAdapter::CloseOnError(int error) {
+ // Stop writing on error.
+ writer_.reset();
+
+ if (!error_callback_.is_null())
+ base::ResetAndReturn(&error_callback_).Run(error);
+}
+
+} // namespace protocol
+} // namespace remoting
diff --git a/remoting/protocol/stream_message_pipe_adapter.h b/remoting/protocol/stream_message_pipe_adapter.h
new file mode 100644
index 0000000..15f7e1d
--- /dev/null
+++ b/remoting/protocol/stream_message_pipe_adapter.h
@@ -0,0 +1,49 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_STREAM_MESSAGE_PIPE_ADAPTER_H_
+#define REMOTING_PROTOCOL_STREAM_MESSAGE_PIPE_ADAPTER_H_
+
+#include "base/callback.h"
+#include "remoting/protocol/message_pipe.h"
+#include "remoting/protocol/message_reader.h"
+
+namespace remoting {
+class BufferedSocketWriter;
+
+namespace protocol {
+
+class P2PStreamSocket;
+
+// MessagePipe implementation that sends and receives messages over a
+// P2PStreamChannel.
+class StreamMessagePipeAdapter : public MessagePipe {
+ public:
+ typedef base::Callback<void(int)> ErrorCallback;
+
+ StreamMessagePipeAdapter(scoped_ptr<P2PStreamSocket> socket,
+ const ErrorCallback& error_callback);
+ ~StreamMessagePipeAdapter() override;
+
+ // MessagePipe interface.
+ void StartReceiving(const MessageReceivedCallback& callback) override;
+ void Send(google::protobuf::MessageLite* message,
+ const base::Closure& done) override;
+
+ private:
+ void CloseOnError(int error);
+
+ scoped_ptr<P2PStreamSocket> socket_;
+ ErrorCallback error_callback_;
+
+ MessageReader reader_;
+ scoped_ptr<BufferedSocketWriter> writer_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamMessagePipeAdapter);
+};
+
+} // namespace protocol
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_STREAM_MESSAGE_PIPE_ADAPTER_H_
diff --git a/remoting/protocol/webrtc_connection_to_client.cc b/remoting/protocol/webrtc_connection_to_client.cc
index 1c23e2d..e845137 100644
--- a/remoting/protocol/webrtc_connection_to_client.cc
+++ b/remoting/protocol/webrtc_connection_to_client.cc
@@ -149,7 +149,6 @@ void WebrtcConnectionToClient::OnWebrtcTransportConnecting() {
void WebrtcConnectionToClient::OnWebrtcTransportConnected() {
DCHECK(thread_checker_.CalledOnValidThread());
- event_handler_->OnConnectionChannelsConnected(this);
}
void WebrtcConnectionToClient::OnWebrtcTransportError(ErrorCode error) {
@@ -168,6 +167,11 @@ void WebrtcConnectionToClient::OnWebrtcTransportMediaStreamRemoved(
void WebrtcConnectionToClient::OnChannelInitialized(
ChannelDispatcherBase* channel_dispatcher) {
DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (control_dispatcher_ && control_dispatcher_->is_connected() &&
+ event_dispatcher_ && event_dispatcher_->is_connected()) {
+ event_handler_->OnConnectionChannelsConnected(this);
+ }
}
void WebrtcConnectionToClient::OnChannelError(
diff --git a/remoting/remoting_srcs.gypi b/remoting/remoting_srcs.gypi
index 5c2db03..cb4b1ff 100644
--- a/remoting/remoting_srcs.gypi
+++ b/remoting/remoting_srcs.gypi
@@ -139,6 +139,7 @@
'protocol/me2me_host_authenticator_factory.h',
'protocol/message_decoder.cc',
'protocol/message_decoder.h',
+ 'protocol/message_pipe.h',
'protocol/message_reader.cc',
'protocol/message_reader.h',
'protocol/message_serialization.cc',
@@ -187,6 +188,8 @@
'protocol/ssl_hmac_channel_authenticator.cc',
'protocol/ssl_hmac_channel_authenticator.h',
'protocol/stream_channel_factory.h',
+ 'protocol/stream_message_pipe_adapter.cc',
+ 'protocol/stream_message_pipe_adapter.h',
'protocol/third_party_authenticator_base.cc',
'protocol/third_party_authenticator_base.h',
'protocol/third_party_client_authenticator.cc',