summaryrefslogtreecommitdiffstats
path: root/remoting
diff options
context:
space:
mode:
authorsergeyu <sergeyu@chromium.org>2015-01-10 13:44:09 -0800
committerCommit bot <commit-bot@chromium.org>2015-01-10 21:44:59 +0000
commit203832d43af2d2c0cb92ca2a6ad3e59193c6836d (patch)
tree44cb36fdca1dcb557781260a676cf374ad0ff94b /remoting
parentf4f051fe183f2595bb09d6ddcc9da895460c4429 (diff)
downloadchromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.zip
chromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.tar.gz
chromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.tar.bz2
Cleanup channel dispatchers
Now ChannelDispatcherBase handles initialization of reader and writer, which makes all dispatcher classes simpler. ChannelDispatcherBase now can handle writer errors. Also renamed ProtobufMessageReader -> ProtobufMessageParser. Review URL: https://codereview.chromium.org/841773005 Cr-Commit-Position: refs/heads/master@{#310993}
Diffstat (limited to 'remoting')
-rw-r--r--remoting/protocol/audio_reader.cc24
-rw-r--r--remoting/protocol/audio_reader.h30
-rw-r--r--remoting/protocol/audio_writer.cc8
-rw-r--r--remoting/protocol/audio_writer.h5
-rw-r--r--remoting/protocol/channel_dispatcher_base.cc23
-rw-r--r--remoting/protocol/channel_dispatcher_base.h33
-rw-r--r--remoting/protocol/channel_multiplexer.cc7
-rw-r--r--remoting/protocol/channel_multiplexer.h4
-rw-r--r--remoting/protocol/client_control_dispatcher.cc30
-rw-r--r--remoting/protocol/client_control_dispatcher.h9
-rw-r--r--remoting/protocol/client_event_dispatcher.cc13
-rw-r--r--remoting/protocol/client_event_dispatcher.h6
-rw-r--r--remoting/protocol/client_video_dispatcher.cc9
-rw-r--r--remoting/protocol/client_video_dispatcher.h11
-rw-r--r--remoting/protocol/connection_to_client.cc51
-rw-r--r--remoting/protocol/connection_to_client.h11
-rw-r--r--remoting/protocol/connection_to_host.cc46
-rw-r--r--remoting/protocol/connection_to_host.h10
-rw-r--r--remoting/protocol/host_control_dispatcher.cc22
-rw-r--r--remoting/protocol/host_control_dispatcher.h9
-rw-r--r--remoting/protocol/host_event_dispatcher.cc14
-rw-r--r--remoting/protocol/host_event_dispatcher.h8
-rw-r--r--remoting/protocol/host_video_dispatcher.cc7
-rw-r--r--remoting/protocol/host_video_dispatcher.h6
-rw-r--r--remoting/protocol/message_reader.cc22
-rw-r--r--remoting/protocol/message_reader.h51
-rw-r--r--remoting/protocol/message_reader_unittest.cc9
-rw-r--r--remoting/protocol/protobuf_message_parser.h62
28 files changed, 243 insertions, 297 deletions
diff --git a/remoting/protocol/audio_reader.cc b/remoting/protocol/audio_reader.cc
index c4f036d..02f8eb1 100644
--- a/remoting/protocol/audio_reader.cc
+++ b/remoting/protocol/audio_reader.cc
@@ -13,31 +13,15 @@
namespace remoting {
namespace protocol {
-AudioReader::AudioReader(AudioPacket::Encoding encoding)
+AudioReader::AudioReader(AudioStub* audio_stub)
: ChannelDispatcherBase(kAudioChannelName),
- encoding_(encoding),
- audio_stub_(nullptr) {
+ parser_(base::Bind(&AudioStub::ProcessAudioPacket,
+ base::Unretained(audio_stub)),
+ reader()) {
}
AudioReader::~AudioReader() {
}
-// static
-scoped_ptr<AudioReader> AudioReader::Create(const SessionConfig& config) {
- if (!config.is_audio_enabled())
- return nullptr;
- return make_scoped_ptr(new AudioReader(AudioPacket::ENCODING_RAW));
-}
-
-void AudioReader::OnInitialized() {
- reader_.Init(channel(), base::Bind(&AudioReader::OnNewData,
- base::Unretained(this)));
-}
-
-void AudioReader::OnNewData(scoped_ptr<AudioPacket> packet,
- const base::Closure& done_task) {
- audio_stub_->ProcessAudioPacket(packet.Pass(), done_task);
-}
-
} // namespace protocol
} // namespace remoting
diff --git a/remoting/protocol/audio_reader.h b/remoting/protocol/audio_reader.h
index e78392b..c4bced7 100644
--- a/remoting/protocol/audio_reader.h
+++ b/remoting/protocol/audio_reader.h
@@ -5,46 +5,22 @@
#ifndef REMOTING_PROTOCOL_AUDIO_READER_H_
#define REMOTING_PROTOCOL_AUDIO_READER_H_
-#include "base/callback.h"
#include "base/compiler_specific.h"
#include "remoting/proto/audio.pb.h"
#include "remoting/protocol/audio_stub.h"
-#include "remoting/protocol/message_reader.h"
#include "remoting/protocol/channel_dispatcher_base.h"
-
-namespace net {
-class StreamSocket;
-} // namespace net
+#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
-class Session;
-class SessionConfig;
-
class AudioReader : public ChannelDispatcherBase {
public:
- static scoped_ptr<AudioReader> Create(const SessionConfig& config);
-
+ explicit AudioReader(AudioStub* audio_stub);
~AudioReader() override;
- void set_audio_stub(AudioStub* audio_stub) { audio_stub_ = audio_stub; }
-
- protected:
- void OnInitialized() override;
-
private:
- explicit AudioReader(AudioPacket::Encoding encoding);
-
- void OnNewData(scoped_ptr<AudioPacket> packet,
- const base::Closure& done_task);
-
- AudioPacket::Encoding encoding_;
-
- ProtobufMessageReader<AudioPacket> reader_;
-
- // The stub that processes all received packets.
- AudioStub* audio_stub_;
+ ProtobufMessageParser<AudioPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(AudioReader);
};
diff --git a/remoting/protocol/audio_writer.cc b/remoting/protocol/audio_writer.cc
index a6c8f4b..be60d2e 100644
--- a/remoting/protocol/audio_writer.cc
+++ b/remoting/protocol/audio_writer.cc
@@ -22,15 +22,9 @@ AudioWriter::AudioWriter()
AudioWriter::~AudioWriter() {
}
-void AudioWriter::OnInitialized() {
- // TODO(sergeyu): Provide a non-null WriteFailedCallback for the writer.
- buffered_writer_.Init(
- channel(), BufferedSocketWriter::WriteFailedCallback());
-}
-
void AudioWriter::ProcessAudioPacket(scoped_ptr<AudioPacket> packet,
const base::Closure& done) {
- buffered_writer_.Write(SerializeAndFrameMessage(*packet), done);
+ writer()->Write(SerializeAndFrameMessage(*packet), done);
}
// static
diff --git a/remoting/protocol/audio_writer.h b/remoting/protocol/audio_writer.h
index c5cac15..fb2f68c 100644
--- a/remoting/protocol/audio_writer.h
+++ b/remoting/protocol/audio_writer.h
@@ -38,14 +38,9 @@ class AudioWriter : public ChannelDispatcherBase,
void ProcessAudioPacket(scoped_ptr<AudioPacket> packet,
const base::Closure& done) override;
- protected:
- void OnInitialized() override;
-
private:
AudioWriter();
- BufferedSocketWriter buffered_writer_;
-
DISALLOW_COPY_AND_ASSIGN(AudioWriter);
};
diff --git a/remoting/protocol/channel_dispatcher_base.cc b/remoting/protocol/channel_dispatcher_base.cc
index 0209cc2..ee90ee5 100644
--- a/remoting/protocol/channel_dispatcher_base.cc
+++ b/remoting/protocol/channel_dispatcher_base.cc
@@ -15,17 +15,19 @@ namespace protocol {
ChannelDispatcherBase::ChannelDispatcherBase(const char* channel_name)
: channel_name_(channel_name),
- channel_factory_(nullptr) {
+ channel_factory_(nullptr),
+ event_handler_(nullptr) {
}
ChannelDispatcherBase::~ChannelDispatcherBase() {
+ writer()->Close();
if (channel_factory_)
channel_factory_->CancelChannelCreation(channel_name_);
}
void ChannelDispatcherBase::Init(Session* session,
const ChannelConfig& config,
- const InitializedCallback& callback) {
+ EventHandler* event_handler) {
DCHECK(session);
switch (config.transport) {
case ChannelConfig::TRANSPORT_MUX_STREAM:
@@ -37,12 +39,10 @@ void ChannelDispatcherBase::Init(Session* session,
break;
default:
- NOTREACHED();
- callback.Run(false);
- return;
+ LOG(FATAL) << "Unknown transport type: " << config.transport;
}
- initialized_callback_ = callback;
+ event_handler_ = event_handler;
channel_factory_->CreateChannel(channel_name_, base::Bind(
&ChannelDispatcherBase::OnChannelReady, base::Unretained(this)));
@@ -51,16 +51,21 @@ void ChannelDispatcherBase::Init(Session* session,
void ChannelDispatcherBase::OnChannelReady(
scoped_ptr<net::StreamSocket> socket) {
if (!socket.get()) {
- initialized_callback_.Run(false);
+ event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR);
return;
}
channel_factory_ = nullptr;
channel_ = socket.Pass();
+ writer_.Init(channel_.get(), base::Bind(&ChannelDispatcherBase::OnWriteFailed,
+ base::Unretained(this)));
+ reader_.StartReading(channel_.get());
- OnInitialized();
+ event_handler_->OnChannelInitialized(this);
+}
- initialized_callback_.Run(true);
+void ChannelDispatcherBase::OnWriteFailed(int error) {
+ event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR);
}
} // namespace protocol
diff --git a/remoting/protocol/channel_dispatcher_base.h b/remoting/protocol/channel_dispatcher_base.h
index 705b224..0c80b30 100644
--- a/remoting/protocol/channel_dispatcher_base.h
+++ b/remoting/protocol/channel_dispatcher_base.h
@@ -10,6 +10,9 @@
#include "base/basictypes.h"
#include "base/callback.h"
#include "base/memory/scoped_ptr.h"
+#include "remoting/protocol/buffered_socket_writer.h"
+#include "remoting/protocol/errors.h"
+#include "remoting/protocol/message_reader.h"
namespace net {
class StreamSocket;
@@ -28,6 +31,17 @@ class Session;
// messages.
class ChannelDispatcherBase {
public:
+ class EventHandler {
+ public:
+ EventHandler() {}
+ virtual ~EventHandler() {}
+
+ virtual void OnChannelInitialized(
+ ChannelDispatcherBase* channel_dispatcher) = 0;
+ virtual void OnChannelError(ChannelDispatcherBase* channel_dispatcher,
+ ErrorCode error) = 0;
+ };
+
// The callback is called when initialization is finished. The
// parameter is set to true on success.
typedef base::Callback<void(bool)> InitializedCallback;
@@ -38,28 +52,31 @@ class ChannelDispatcherBase {
// |session|. Caller retains ownership of the Session.
void Init(Session* session,
const ChannelConfig& config,
- const InitializedCallback& callback);
+ EventHandler* event_handler);
+
+ const std::string& channel_name() { return channel_name_; }
// Returns true if the channel is currently connected.
- bool is_connected() { return channel() != nullptr; }
+ bool is_connected() { return channel_ != nullptr; }
protected:
explicit ChannelDispatcherBase(const char* channel_name);
- net::StreamSocket* channel() { return channel_.get(); }
-
- // Called when channel is initialized. Must be overriden in the
- // child classes. Should not delete the dispatcher.
- virtual void OnInitialized() = 0;
+ BufferedSocketWriter* writer() { return &writer_; }
+ MessageReader* reader() { return &reader_; }
private:
void OnChannelReady(scoped_ptr<net::StreamSocket> socket);
+ void OnWriteFailed(int error);
std::string channel_name_;
StreamChannelFactory* channel_factory_;
- InitializedCallback initialized_callback_;
+ EventHandler* event_handler_;
scoped_ptr<net::StreamSocket> channel_;
+ BufferedSocketWriter writer_;
+ MessageReader reader_;
+
DISALLOW_COPY_AND_ASSIGN(ChannelDispatcherBase);
};
diff --git a/remoting/protocol/channel_multiplexer.cc b/remoting/protocol/channel_multiplexer.cc
index 62bb9d2..8d885fe 100644
--- a/remoting/protocol/channel_multiplexer.cc
+++ b/remoting/protocol/channel_multiplexer.cc
@@ -348,6 +348,9 @@ ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory,
: base_channel_factory_(factory),
base_channel_name_(base_channel_name),
next_channel_id_(0),
+ parser_(base::Bind(&ChannelMultiplexer::OnIncomingPacket,
+ base::Unretained(this)),
+ &reader_),
weak_factory_(this) {
}
@@ -400,9 +403,7 @@ void ChannelMultiplexer::OnBaseChannelReady(
if (base_channel_.get()) {
// Initialize reader and writer.
- reader_.Init(base_channel_.get(),
- base::Bind(&ChannelMultiplexer::OnIncomingPacket,
- base::Unretained(this)));
+ reader_.StartReading(base_channel_.get());
writer_.Init(base_channel_.get(),
base::Bind(&ChannelMultiplexer::OnWriteFailed,
base::Unretained(this)));
diff --git a/remoting/protocol/channel_multiplexer.h b/remoting/protocol/channel_multiplexer.h
index 2b93d2e..c83b0f3 100644
--- a/remoting/protocol/channel_multiplexer.h
+++ b/remoting/protocol/channel_multiplexer.h
@@ -9,6 +9,7 @@
#include "remoting/proto/mux.pb.h"
#include "remoting/protocol/buffered_socket_writer.h"
#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/protobuf_message_parser.h"
#include "remoting/protocol/stream_channel_factory.h"
namespace remoting {
@@ -78,7 +79,8 @@ class ChannelMultiplexer : public StreamChannelFactory {
std::map<int, MuxChannel*> channels_by_receive_id_;
BufferedSocketWriter writer_;
- ProtobufMessageReader<MultiplexPacket> reader_;
+ MessageReader reader_;
+ ProtobufMessageParser<MultiplexPacket> parser_;
base::WeakPtrFactory<ChannelMultiplexer> weak_factory_;
diff --git a/remoting/protocol/client_control_dispatcher.cc b/remoting/protocol/client_control_dispatcher.cc
index f08e6cf..d13e53f 100644
--- a/remoting/protocol/client_control_dispatcher.cc
+++ b/remoting/protocol/client_control_dispatcher.cc
@@ -61,69 +61,65 @@ bool CursorShapeIsValid(const CursorShapeInfo& cursor_shape) {
ClientControlDispatcher::ClientControlDispatcher()
: ChannelDispatcherBase(kControlChannelName),
client_stub_(nullptr),
- clipboard_stub_(nullptr) {
+ clipboard_stub_(nullptr),
+ parser_(base::Bind(&ClientControlDispatcher::OnMessageReceived,
+ base::Unretained(this)),
+ reader()) {
}
ClientControlDispatcher::~ClientControlDispatcher() {
- writer_.Close();
-}
-
-void ClientControlDispatcher::OnInitialized() {
- // TODO(garykac): Set write failed callback.
- writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback());
- reader_.Init(channel(), base::Bind(
- &ClientControlDispatcher::OnMessageReceived, base::Unretained(this)));
}
void ClientControlDispatcher::InjectClipboardEvent(
const ClipboardEvent& event) {
ControlMessage message;
message.mutable_clipboard_event()->CopyFrom(event);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::NotifyClientResolution(
const ClientResolution& resolution) {
ControlMessage message;
message.mutable_client_resolution()->CopyFrom(resolution);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::ControlVideo(const VideoControl& video_control) {
ControlMessage message;
message.mutable_video_control()->CopyFrom(video_control);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::ControlAudio(const AudioControl& audio_control) {
ControlMessage message;
message.mutable_audio_control()->CopyFrom(audio_control);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
ControlMessage message;
message.mutable_capabilities()->CopyFrom(capabilities);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::RequestPairing(
const PairingRequest& pairing_request) {
ControlMessage message;
message.mutable_pairing_request()->CopyFrom(pairing_request);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientControlDispatcher::DeliverClientMessage(
const ExtensionMessage& message) {
ControlMessage control_message;
control_message.mutable_extension_message()->CopyFrom(message);
- writer_.Write(SerializeAndFrameMessage(control_message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
}
void ClientControlDispatcher::OnMessageReceived(
- scoped_ptr<ControlMessage> message, const base::Closure& done_task) {
+ scoped_ptr<ControlMessage> message,
+ const base::Closure& done_task) {
DCHECK(client_stub_);
DCHECK(clipboard_stub_);
base::ScopedClosureRunner done_runner(done_task);
diff --git a/remoting/protocol/client_control_dispatcher.h b/remoting/protocol/client_control_dispatcher.h
index e4612c0..dc9035a 100644
--- a/remoting/protocol/client_control_dispatcher.h
+++ b/remoting/protocol/client_control_dispatcher.h
@@ -11,7 +11,7 @@
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/cursor_shape_stub.h"
#include "remoting/protocol/host_stub.h"
-#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
@@ -51,10 +51,6 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
clipboard_stub_ = clipboard_stub;
}
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message,
const base::Closure& done_task);
@@ -62,8 +58,7 @@ class ClientControlDispatcher : public ChannelDispatcherBase,
ClientStub* client_stub_;
ClipboardStub* clipboard_stub_;
- ProtobufMessageReader<ControlMessage> reader_;
- BufferedSocketWriter writer_;
+ ProtobufMessageParser<ControlMessage> parser_;
DISALLOW_COPY_AND_ASSIGN(ClientControlDispatcher);
};
diff --git a/remoting/protocol/client_event_dispatcher.cc b/remoting/protocol/client_event_dispatcher.cc
index 983cba6..89e4bcf 100644
--- a/remoting/protocol/client_event_dispatcher.cc
+++ b/remoting/protocol/client_event_dispatcher.cc
@@ -20,13 +20,6 @@ ClientEventDispatcher::ClientEventDispatcher()
}
ClientEventDispatcher::~ClientEventDispatcher() {
- writer_.Close();
-}
-
-void ClientEventDispatcher::OnInitialized() {
- // TODO(garykac): Set write failed callback.
- writer_.Init(channel(),
- BufferedSocketWriter::WriteFailedCallback());
}
void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
@@ -35,7 +28,7 @@ void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_key_event()->CopyFrom(event);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
@@ -43,14 +36,14 @@ void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_text_event()->CopyFrom(event);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void ClientEventDispatcher::InjectMouseEvent(const MouseEvent& event) {
EventMessage message;
message.set_timestamp(base::Time::Now().ToInternalValue());
message.mutable_mouse_event()->CopyFrom(event);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
} // namespace protocol
diff --git a/remoting/protocol/client_event_dispatcher.h b/remoting/protocol/client_event_dispatcher.h
index ca61740..6a3c54f 100644
--- a/remoting/protocol/client_event_dispatcher.h
+++ b/remoting/protocol/client_event_dispatcher.h
@@ -25,13 +25,7 @@ class ClientEventDispatcher : public ChannelDispatcherBase, public InputStub {
void InjectTextEvent(const TextEvent& event) override;
void InjectMouseEvent(const MouseEvent& event) override;
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
- BufferedSocketWriter writer_;
-
DISALLOW_COPY_AND_ASSIGN(ClientEventDispatcher);
};
diff --git a/remoting/protocol/client_video_dispatcher.cc b/remoting/protocol/client_video_dispatcher.cc
index bca586c..287bba3 100644
--- a/remoting/protocol/client_video_dispatcher.cc
+++ b/remoting/protocol/client_video_dispatcher.cc
@@ -15,16 +15,13 @@ namespace protocol {
ClientVideoDispatcher::ClientVideoDispatcher(VideoStub* video_stub)
: ChannelDispatcherBase(kVideoChannelName),
- video_stub_(video_stub) {
+ parser_(base::Bind(&VideoStub::ProcessVideoPacket,
+ base::Unretained(video_stub)),
+ reader()) {
}
ClientVideoDispatcher::~ClientVideoDispatcher() {
}
-void ClientVideoDispatcher::OnInitialized() {
- reader_.Init(channel(), base::Bind(&VideoStub::ProcessVideoPacket,
- base::Unretained(video_stub_)));
-}
-
} // namespace protocol
} // namespace remoting
diff --git a/remoting/protocol/client_video_dispatcher.h b/remoting/protocol/client_video_dispatcher.h
index fb726f0..b2b1e31 100644
--- a/remoting/protocol/client_video_dispatcher.h
+++ b/remoting/protocol/client_video_dispatcher.h
@@ -8,7 +8,7 @@
#include "base/compiler_specific.h"
#include "remoting/proto/video.pb.h"
#include "remoting/protocol/channel_dispatcher_base.h"
-#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
@@ -20,15 +20,8 @@ class ClientVideoDispatcher : public ChannelDispatcherBase {
explicit ClientVideoDispatcher(VideoStub* video_stub);
~ClientVideoDispatcher() override;
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
- ProtobufMessageReader<VideoPacket> reader_;
-
- // The stub to which VideoPackets are passed for processing.
- VideoStub* video_stub_;
+ ProtobufMessageParser<VideoPacket> parser_;
DISALLOW_COPY_AND_ASSIGN(ClientVideoDispatcher);
};
diff --git a/remoting/protocol/connection_to_client.cc b/remoting/protocol/connection_to_client.cc
index 536fa75..6894a9c 100644
--- a/remoting/protocol/connection_to_client.cc
+++ b/remoting/protocol/connection_to_client.cc
@@ -119,34 +119,26 @@ void ConnectionToClient::OnSessionStateChange(Session::State state) {
case Session::AUTHENTICATED:
// Initialize channels.
control_dispatcher_.reset(new HostControlDispatcher());
- control_dispatcher_->Init(
- session_.get(), session_->config().control_config(),
- base::Bind(&ConnectionToClient::OnChannelInitialized,
- base::Unretained(this)));
+ control_dispatcher_->Init(session_.get(),
+ session_->config().control_config(), this);
control_dispatcher_->set_clipboard_stub(clipboard_stub_);
control_dispatcher_->set_host_stub(host_stub_);
event_dispatcher_.reset(new HostEventDispatcher());
- event_dispatcher_->Init(
- session_.get(), session_->config().event_config(),
- base::Bind(&ConnectionToClient::OnChannelInitialized,
- base::Unretained(this)));
+ event_dispatcher_->Init(session_.get(), session_->config().event_config(),
+ this);
event_dispatcher_->set_input_stub(input_stub_);
event_dispatcher_->set_event_timestamp_callback(base::Bind(
&ConnectionToClient::OnEventTimestamp, base::Unretained(this)));
video_dispatcher_.reset(new HostVideoDispatcher());
- video_dispatcher_->Init(
- session_.get(), session_->config().video_config(),
- base::Bind(&ConnectionToClient::OnChannelInitialized,
- base::Unretained(this)));
+ video_dispatcher_->Init(session_.get(), session_->config().video_config(),
+ this);
audio_writer_ = AudioWriter::Create(session_->config());
if (audio_writer_.get()) {
- audio_writer_->Init(
- session_.get(), session_->config().audio_config(),
- base::Bind(&ConnectionToClient::OnChannelInitialized,
- base::Unretained(this)));
+ audio_writer_->Init(session_.get(), session_->config().audio_config(),
+ this);
}
// Notify the handler after initializing the channels, so that
@@ -170,28 +162,33 @@ void ConnectionToClient::OnSessionRouteChange(
handler_->OnRouteChange(this, channel_name, route);
}
-void ConnectionToClient::OnChannelInitialized(bool successful) {
+void ConnectionToClient::OnChannelInitialized(
+ ChannelDispatcherBase* channel_dispatcher) {
DCHECK(CalledOnValidThread());
- if (!successful) {
- LOG(ERROR) << "Failed to connect a channel";
- Close(CHANNEL_CONNECTION_ERROR);
- return;
- }
-
NotifyIfChannelsReady();
}
+void ConnectionToClient::OnChannelError(
+ ChannelDispatcherBase* channel_dispatcher,
+ ErrorCode error) {
+ DCHECK(CalledOnValidThread());
+
+ LOG(ERROR) << "Failed to connect channel "
+ << channel_dispatcher->channel_name();
+ Close(CHANNEL_CONNECTION_ERROR);
+}
+
void ConnectionToClient::NotifyIfChannelsReady() {
DCHECK(CalledOnValidThread());
- if (!control_dispatcher_.get() || !control_dispatcher_->is_connected())
+ if (!control_dispatcher_ || !control_dispatcher_->is_connected())
return;
- if (!event_dispatcher_.get() || !event_dispatcher_->is_connected())
+ if (!event_dispatcher_ || !event_dispatcher_->is_connected())
return;
- if (!video_dispatcher_.get() || !video_dispatcher_->is_connected())
+ if (!video_dispatcher_ || !video_dispatcher_->is_connected())
return;
- if ((!audio_writer_.get() || !audio_writer_->is_connected()) &&
+ if ((!audio_writer_ || !audio_writer_->is_connected()) &&
session_->config().is_audio_enabled()) {
return;
}
diff --git a/remoting/protocol/connection_to_client.h b/remoting/protocol/connection_to_client.h
index 8f075b7..11f4a34 100644
--- a/remoting/protocol/connection_to_client.h
+++ b/remoting/protocol/connection_to_client.h
@@ -31,7 +31,8 @@ class VideoStub;
// host. It sets up all protocol channels and connects them to the
// stubs.
class ConnectionToClient : public base::NonThreadSafe,
- public Session::EventHandler {
+ public Session::EventHandler,
+ public ChannelDispatcherBase::EventHandler {
public:
class EventHandler {
public:
@@ -105,10 +106,12 @@ class ConnectionToClient : public base::NonThreadSafe,
void OnSessionRouteChange(const std::string& channel_name,
const TransportRoute& route) override;
- private:
- // Callback for channel initialization.
- void OnChannelInitialized(bool successful);
+ // ChannelDispatcherBase::EventHandler interface.
+ void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
+ void OnChannelError(ChannelDispatcherBase* channel_dispatcher,
+ ErrorCode error) override;
+ private:
void NotifyIfChannelsReady();
void Close(ErrorCode error);
diff --git a/remoting/protocol/connection_to_host.cc b/remoting/protocol/connection_to_host.cc
index 9217202..e2346e6 100644
--- a/remoting/protocol/connection_to_host.cc
+++ b/remoting/protocol/connection_to_host.cc
@@ -184,31 +184,24 @@ void ConnectionToHost::OnSessionStateChange(
SetState(AUTHENTICATED, OK);
control_dispatcher_.reset(new ClientControlDispatcher());
- control_dispatcher_->Init(
- session_.get(), session_->config().control_config(),
- base::Bind(&ConnectionToHost::OnChannelInitialized,
- base::Unretained(this)));
+ control_dispatcher_->Init(session_.get(),
+ session_->config().control_config(), this);
control_dispatcher_->set_client_stub(client_stub_);
control_dispatcher_->set_clipboard_stub(clipboard_stub_);
event_dispatcher_.reset(new ClientEventDispatcher());
- event_dispatcher_->Init(
- session_.get(), session_->config().event_config(),
- base::Bind(&ConnectionToHost::OnChannelInitialized,
- base::Unretained(this)));
+ event_dispatcher_->Init(session_.get(), session_->config().event_config(),
+ this);
video_dispatcher_.reset(
new ClientVideoDispatcher(monitored_video_stub_.get()));
video_dispatcher_->Init(session_.get(), session_->config().video_config(),
- base::Bind(&ConnectionToHost::OnChannelInitialized,
- base::Unretained(this)));
+ this);
- audio_reader_ = AudioReader::Create(session_->config());
- if (audio_reader_.get()) {
+ if (session_->config().is_audio_enabled()) {
+ audio_reader_.reset(new AudioReader(audio_stub_));
audio_reader_->Init(session_.get(), session_->config().audio_config(),
- base::Bind(&ConnectionToHost::OnChannelInitialized,
- base::Unretained(this)));
- audio_reader_->set_audio_stub(audio_stub_);
+ this);
}
break;
@@ -241,6 +234,19 @@ void ConnectionToHost::OnSessionRouteChange(const std::string& channel_name,
event_callback_->OnRouteChanged(channel_name, route);
}
+void ConnectionToHost::OnChannelInitialized(
+ ChannelDispatcherBase* channel_dispatcher) {
+ NotifyIfChannelsReady();
+}
+
+void ConnectionToHost::OnChannelError(
+ ChannelDispatcherBase* channel_dispatcher,
+ ErrorCode error) {
+ LOG(ERROR) << "Failed to connect channel " << channel_dispatcher;
+ CloseOnError(CHANNEL_CONNECTION_ERROR);
+ return;
+}
+
void ConnectionToHost::OnVideoChannelStatus(bool active) {
event_callback_->OnConnectionReady(active);
}
@@ -249,16 +255,6 @@ ConnectionToHost::State ConnectionToHost::state() const {
return state_;
}
-void ConnectionToHost::OnChannelInitialized(bool successful) {
- if (!successful) {
- LOG(ERROR) << "Failed to connect video channel";
- CloseOnError(CHANNEL_CONNECTION_ERROR);
- return;
- }
-
- NotifyIfChannelsReady();
-}
-
void ConnectionToHost::NotifyIfChannelsReady() {
if (!control_dispatcher_.get() || !control_dispatcher_->is_connected())
return;
diff --git a/remoting/protocol/connection_to_host.h b/remoting/protocol/connection_to_host.h
index 7bc648a..605bea3 100644
--- a/remoting/protocol/connection_to_host.h
+++ b/remoting/protocol/connection_to_host.h
@@ -13,6 +13,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/threading/non_thread_safe.h"
#include "remoting/proto/internal.pb.h"
+#include "remoting/protocol/channel_dispatcher_base.h"
#include "remoting/protocol/clipboard_filter.h"
#include "remoting/protocol/errors.h"
#include "remoting/protocol/input_filter.h"
@@ -47,6 +48,7 @@ class VideoStub;
class ConnectionToHost : public SignalStrategy::Listener,
public SessionManager::Listener,
public Session::EventHandler,
+ public ChannelDispatcherBase::EventHandler,
public base::NonThreadSafe {
public:
// The UI implementations maintain corresponding definitions of this
@@ -133,6 +135,11 @@ class ConnectionToHost : public SignalStrategy::Listener,
void OnSessionRouteChange(const std::string& channel_name,
const TransportRoute& route) override;
+ // ChannelDispatcherBase::EventHandler interface.
+ void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override;
+ void OnChannelError(ChannelDispatcherBase* channel_dispatcher,
+ ErrorCode error) override;
+
// MonitoredVideoStub::EventHandler interface.
virtual void OnVideoChannelStatus(bool active);
@@ -140,9 +147,6 @@ class ConnectionToHost : public SignalStrategy::Listener,
State state() const;
private:
- // Callbacks for channel initialization
- void OnChannelInitialized(bool successful);
-
void NotifyIfChannelsReady();
void CloseOnError(ErrorCode error);
diff --git a/remoting/protocol/host_control_dispatcher.cc b/remoting/protocol/host_control_dispatcher.cc
index 0c19d17..e323d09 100644
--- a/remoting/protocol/host_control_dispatcher.cc
+++ b/remoting/protocol/host_control_dispatcher.cc
@@ -20,51 +20,47 @@ namespace protocol {
HostControlDispatcher::HostControlDispatcher()
: ChannelDispatcherBase(kControlChannelName),
clipboard_stub_(nullptr),
- host_stub_(nullptr) {
+ host_stub_(nullptr),
+ parser_(base::Bind(&HostControlDispatcher::OnMessageReceived,
+ base::Unretained(this)),
+ reader()) {
}
HostControlDispatcher::~HostControlDispatcher() {
- writer_.Close();
-}
-
-void HostControlDispatcher::OnInitialized() {
- reader_.Init(channel(), base::Bind(
- &HostControlDispatcher::OnMessageReceived, base::Unretained(this)));
- writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback());
}
void HostControlDispatcher::SetCapabilities(
const Capabilities& capabilities) {
ControlMessage message;
message.mutable_capabilities()->CopyFrom(capabilities);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void HostControlDispatcher::SetPairingResponse(
const PairingResponse& pairing_response) {
ControlMessage message;
message.mutable_pairing_response()->CopyFrom(pairing_response);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void HostControlDispatcher::DeliverHostMessage(
const ExtensionMessage& message) {
ControlMessage control_message;
control_message.mutable_extension_message()->CopyFrom(message);
- writer_.Write(SerializeAndFrameMessage(control_message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(control_message), base::Closure());
}
void HostControlDispatcher::InjectClipboardEvent(const ClipboardEvent& event) {
ControlMessage message;
message.mutable_clipboard_event()->CopyFrom(event);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void HostControlDispatcher::SetCursorShape(
const CursorShapeInfo& cursor_shape) {
ControlMessage message;
message.mutable_cursor_shape()->CopyFrom(cursor_shape);
- writer_.Write(SerializeAndFrameMessage(message), base::Closure());
+ writer()->Write(SerializeAndFrameMessage(message), base::Closure());
}
void HostControlDispatcher::OnMessageReceived(
diff --git a/remoting/protocol/host_control_dispatcher.h b/remoting/protocol/host_control_dispatcher.h
index eab64c2..3fe8e73 100644
--- a/remoting/protocol/host_control_dispatcher.h
+++ b/remoting/protocol/host_control_dispatcher.h
@@ -10,7 +10,7 @@
#include "remoting/protocol/client_stub.h"
#include "remoting/protocol/clipboard_stub.h"
#include "remoting/protocol/cursor_shape_stub.h"
-#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/protobuf_message_parser.h"
namespace net {
class StreamSocket;
@@ -54,10 +54,6 @@ class HostControlDispatcher : public ChannelDispatcherBase,
// message. |host_stub| must outlive this object.
void set_host_stub(HostStub* host_stub) { host_stub_ = host_stub; }
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
void OnMessageReceived(scoped_ptr<ControlMessage> message,
const base::Closure& done_task);
@@ -65,8 +61,7 @@ class HostControlDispatcher : public ChannelDispatcherBase,
ClipboardStub* clipboard_stub_;
HostStub* host_stub_;
- ProtobufMessageReader<ControlMessage> reader_;
- BufferedSocketWriter writer_;
+ ProtobufMessageParser<ControlMessage> parser_;
DISALLOW_COPY_AND_ASSIGN(HostControlDispatcher);
};
diff --git a/remoting/protocol/host_event_dispatcher.cc b/remoting/protocol/host_event_dispatcher.cc
index 7bc93fb..b1addbd 100644
--- a/remoting/protocol/host_event_dispatcher.cc
+++ b/remoting/protocol/host_event_dispatcher.cc
@@ -16,19 +16,17 @@ namespace protocol {
HostEventDispatcher::HostEventDispatcher()
: ChannelDispatcherBase(kEventChannelName),
- input_stub_(nullptr) {
+ input_stub_(nullptr),
+ parser_(base::Bind(&HostEventDispatcher::OnMessageReceived,
+ base::Unretained(this)),
+ reader()) {
}
HostEventDispatcher::~HostEventDispatcher() {
}
-void HostEventDispatcher::OnInitialized() {
- reader_.Init(channel(), base::Bind(
- &HostEventDispatcher::OnMessageReceived, base::Unretained(this)));
-}
-
-void HostEventDispatcher::OnMessageReceived(
- scoped_ptr<EventMessage> message, const base::Closure& done_task) {
+void HostEventDispatcher::OnMessageReceived(scoped_ptr<EventMessage> message,
+ const base::Closure& done_task) {
DCHECK(input_stub_);
base::ScopedClosureRunner done_runner(done_task);
diff --git a/remoting/protocol/host_event_dispatcher.h b/remoting/protocol/host_event_dispatcher.h
index aaa5fe9..3f1bd88 100644
--- a/remoting/protocol/host_event_dispatcher.h
+++ b/remoting/protocol/host_event_dispatcher.h
@@ -6,7 +6,7 @@
#define REMOTING_PROTOCOL_HOST_EVENT_DISPATCHER_H_
#include "remoting/protocol/channel_dispatcher_base.h"
-#include "remoting/protocol/message_reader.h"
+#include "remoting/protocol/protobuf_message_parser.h"
namespace remoting {
namespace protocol {
@@ -34,10 +34,6 @@ class HostEventDispatcher : public ChannelDispatcherBase {
event_timestamp_callback_ = value;
}
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
void OnMessageReceived(scoped_ptr<EventMessage> message,
const base::Closure& done_task);
@@ -45,7 +41,7 @@ class HostEventDispatcher : public ChannelDispatcherBase {
InputStub* input_stub_;
EventTimestampCallback event_timestamp_callback_;
- ProtobufMessageReader<EventMessage> reader_;
+ ProtobufMessageParser<EventMessage> parser_;
DISALLOW_COPY_AND_ASSIGN(HostEventDispatcher);
};
diff --git a/remoting/protocol/host_video_dispatcher.cc b/remoting/protocol/host_video_dispatcher.cc
index 08ac857..b9a12c0 100644
--- a/remoting/protocol/host_video_dispatcher.cc
+++ b/remoting/protocol/host_video_dispatcher.cc
@@ -20,14 +20,9 @@ HostVideoDispatcher::HostVideoDispatcher()
HostVideoDispatcher::~HostVideoDispatcher() {
}
-void HostVideoDispatcher::OnInitialized() {
- // TODO(sergeyu): Provide WriteFailedCallback for the buffered writer.
- writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback());
-}
-
void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
const base::Closure& done) {
- writer_.Write(SerializeAndFrameMessage(*packet), done);
+ writer()->Write(SerializeAndFrameMessage(*packet), done);
}
} // namespace protocol
diff --git a/remoting/protocol/host_video_dispatcher.h b/remoting/protocol/host_video_dispatcher.h
index 36b9c69..c1a9573 100644
--- a/remoting/protocol/host_video_dispatcher.h
+++ b/remoting/protocol/host_video_dispatcher.h
@@ -24,13 +24,7 @@ class HostVideoDispatcher : public ChannelDispatcherBase, public VideoStub {
void ProcessVideoPacket(scoped_ptr<VideoPacket> packet,
const base::Closure& done) override;
- protected:
- // ChannelDispatcherBase overrides.
- void OnInitialized() override;
-
private:
- BufferedSocketWriter writer_;
-
DISALLOW_COPY_AND_ASSIGN(HostVideoDispatcher);
};
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc
index 9528b83..6e3d536 100644
--- a/remoting/protocol/message_reader.cc
+++ b/remoting/protocol/message_reader.cc
@@ -29,18 +29,22 @@ MessageReader::MessageReader()
weak_factory_(this) {
}
-void MessageReader::Init(net::Socket* socket,
- const MessageReceivedCallback& callback) {
+MessageReader::~MessageReader() {
+}
+
+void MessageReader::SetMessageReceivedCallback(
+ const MessageReceivedCallback& callback) {
DCHECK(CalledOnValidThread());
message_received_callback_ = callback;
+}
+
+void MessageReader::StartReading(net::Socket* socket) {
+ DCHECK(CalledOnValidThread());
DCHECK(socket);
socket_ = socket;
DoRead();
}
-MessageReader::~MessageReader() {
-}
-
void MessageReader::DoRead() {
DCHECK(CalledOnValidThread());
// Don't try to read again if there is another read pending or we
@@ -104,9 +108,11 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
}
void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) {
- message_received_callback_.Run(
- message.Pass(), base::Bind(&MessageReader::OnMessageDone,
- weak_factory_.GetWeakPtr()));
+ if (!message_received_callback_.is_null()){
+ message_received_callback_.Run(
+ message.Pass(),
+ base::Bind(&MessageReader::OnMessageDone, weak_factory_.GetWeakPtr()));
+ }
}
void MessageReader::OnMessageDone() {
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
index fb6c92e..65d52d0 100644
--- a/remoting/protocol/message_reader.h
+++ b/remoting/protocol/message_reader.h
@@ -5,12 +5,10 @@
#ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_
#define REMOTING_PROTOCOL_MESSAGE_READER_H_
-#include "base/bind.h"
#include "base/callback.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
-#include "net/base/completion_callback.h"
#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_decoder.h"
@@ -41,9 +39,11 @@ class MessageReader : public base::NonThreadSafe {
MessageReader();
virtual ~MessageReader();
- // Initialize the MessageReader with a socket. If a message is received
- // |callback| is called.
- void Init(net::Socket* socket, const MessageReceivedCallback& callback);
+ // Sets the callback to be called for each incoming message.
+ void SetMessageReceivedCallback(const MessageReceivedCallback& callback);
+
+ // Starts reading from |socket|.
+ void StartReading(net::Socket* socket);
private:
void DoRead();
@@ -77,47 +77,6 @@ class MessageReader : public base::NonThreadSafe {
DISALLOW_COPY_AND_ASSIGN(MessageReader);
};
-// Version of MessageReader for protocol buffer messages, that parses
-// each incoming message.
-template <class T>
-class ProtobufMessageReader {
- public:
- // The callback that is called when a new message is received. |done_task|
- // must be called by the callback when it's done processing the |message|.
- typedef typename base::Callback<void(scoped_ptr<T> message,
- const base::Closure& done_task)>
- MessageReceivedCallback;
-
- ProtobufMessageReader() { };
- ~ProtobufMessageReader() { };
-
- void Init(net::Socket* socket, const MessageReceivedCallback& callback) {
- DCHECK(!callback.is_null());
- message_received_callback_ = callback;
- message_reader_.reset(new MessageReader());
- message_reader_->Init(
- socket, base::Bind(&ProtobufMessageReader<T>::OnNewData,
- base::Unretained(this)));
- }
-
- private:
- void OnNewData(scoped_ptr<CompoundBuffer> buffer,
- const base::Closure& done_task) {
- scoped_ptr<T> message(new T());
- CompoundBufferInputStream stream(buffer.get());
- bool ret = message->ParseFromZeroCopyStream(&stream);
- if (!ret) {
- LOG(WARNING) << "Received message that is not a valid protocol buffer.";
- } else {
- DCHECK_EQ(stream.position(), buffer->total_bytes());
- message_received_callback_.Run(message.Pass(), done_task);
- }
- }
-
- scoped_ptr<MessageReader> message_reader_;
- MessageReceivedCallback message_received_callback_;
-};
-
} // namespace protocol
} // namespace remoting
diff --git a/remoting/protocol/message_reader_unittest.cc b/remoting/protocol/message_reader_unittest.cc
index 5c43788..68bcde6 100644
--- a/remoting/protocol/message_reader_unittest.cc
+++ b/remoting/protocol/message_reader_unittest.cc
@@ -67,13 +67,16 @@ class MessageReaderTest : public testing::Test {
}
protected:
- void SetUp() override { reader_.reset(new MessageReader()); }
+ void SetUp() override {
+ reader_.reset(new MessageReader());
+ }
void TearDown() override { STLDeleteElements(&messages_); }
void InitReader() {
- reader_->Init(&socket_, base::Bind(
- &MessageReaderTest::OnMessage, base::Unretained(this)));
+ reader_->SetMessageReceivedCallback(
+ base::Bind(&MessageReaderTest::OnMessage, base::Unretained(this)));
+ reader_->StartReading(&socket_);
}
void AddMessage(const std::string& message) {
diff --git a/remoting/protocol/protobuf_message_parser.h b/remoting/protocol/protobuf_message_parser.h
new file mode 100644
index 0000000..68840b1
--- /dev/null
+++ b/remoting/protocol/protobuf_message_parser.h
@@ -0,0 +1,62 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_
+#define REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/memory/scoped_ptr.h"
+#include "remoting/base/compound_buffer.h"
+#include "remoting/protocol/message_reader.h"
+
+namespace remoting {
+namespace protocol {
+
+// Version of MessageReader for protocol buffer messages, that parses
+// each incoming message.
+template <class T>
+class ProtobufMessageParser {
+ public:
+ // The callback that is called when a new message is received. |done_task|
+ // must be called by the callback when it's done processing the |message|.
+ typedef typename base::Callback<void(scoped_ptr<T> message,
+ const base::Closure& done_task)>
+ MessageReceivedCallback;
+
+ // |message_reader| must outlive ProtobufMessageParser.
+ ProtobufMessageParser(const MessageReceivedCallback& callback,
+ MessageReader* message_reader)
+ : message_reader_(message_reader),
+ message_received_callback_(callback) {
+ message_reader->SetMessageReceivedCallback(base::Bind(
+ &ProtobufMessageParser<T>::OnNewData, base::Unretained(this)));
+ }
+ ~ProtobufMessageParser() {
+ message_reader_->SetMessageReceivedCallback(
+ MessageReader::MessageReceivedCallback());
+ }
+
+ private:
+ void OnNewData(scoped_ptr<CompoundBuffer> buffer,
+ const base::Closure& done_task) {
+ scoped_ptr<T> message(new T());
+ CompoundBufferInputStream stream(buffer.get());
+ bool ret = message->ParseFromZeroCopyStream(&stream);
+ if (!ret) {
+ LOG(WARNING) << "Received message that is not a valid protocol buffer.";
+ } else {
+ DCHECK_EQ(stream.position(), buffer->total_bytes());
+ message_received_callback_.Run(message.Pass(), done_task);
+ }
+ }
+
+ MessageReader* message_reader_;
+ MessageReceivedCallback message_received_callback_;
+};
+
+} // namespace protocol
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_