diff options
author | sergeyu <sergeyu@chromium.org> | 2015-01-10 13:44:09 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-01-10 21:44:59 +0000 |
commit | 203832d43af2d2c0cb92ca2a6ad3e59193c6836d (patch) | |
tree | 44cb36fdca1dcb557781260a676cf374ad0ff94b /remoting | |
parent | f4f051fe183f2595bb09d6ddcc9da895460c4429 (diff) | |
download | chromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.zip chromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.tar.gz chromium_src-203832d43af2d2c0cb92ca2a6ad3e59193c6836d.tar.bz2 |
Cleanup channel dispatchers
Now ChannelDispatcherBase handles initialization of reader and writer,
which makes all dispatcher classes simpler. ChannelDispatcherBase now
can handle writer errors. Also renamed
ProtobufMessageReader -> ProtobufMessageParser.
Review URL: https://codereview.chromium.org/841773005
Cr-Commit-Position: refs/heads/master@{#310993}
Diffstat (limited to 'remoting')
28 files changed, 243 insertions, 297 deletions
diff --git a/remoting/protocol/audio_reader.cc b/remoting/protocol/audio_reader.cc index c4f036d..02f8eb1 100644 --- a/remoting/protocol/audio_reader.cc +++ b/remoting/protocol/audio_reader.cc @@ -13,31 +13,15 @@ namespace remoting { namespace protocol { -AudioReader::AudioReader(AudioPacket::Encoding encoding) +AudioReader::AudioReader(AudioStub* audio_stub) : ChannelDispatcherBase(kAudioChannelName), - encoding_(encoding), - audio_stub_(nullptr) { + parser_(base::Bind(&AudioStub::ProcessAudioPacket, + base::Unretained(audio_stub)), + reader()) { } AudioReader::~AudioReader() { } -// static -scoped_ptr<AudioReader> AudioReader::Create(const SessionConfig& config) { - if (!config.is_audio_enabled()) - return nullptr; - return make_scoped_ptr(new AudioReader(AudioPacket::ENCODING_RAW)); -} - -void AudioReader::OnInitialized() { - reader_.Init(channel(), base::Bind(&AudioReader::OnNewData, - base::Unretained(this))); -} - -void AudioReader::OnNewData(scoped_ptr<AudioPacket> packet, - const base::Closure& done_task) { - audio_stub_->ProcessAudioPacket(packet.Pass(), done_task); -} - } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/audio_reader.h b/remoting/protocol/audio_reader.h index e78392b..c4bced7 100644 --- a/remoting/protocol/audio_reader.h +++ b/remoting/protocol/audio_reader.h @@ -5,46 +5,22 @@ #ifndef REMOTING_PROTOCOL_AUDIO_READER_H_ #define REMOTING_PROTOCOL_AUDIO_READER_H_ -#include "base/callback.h" #include "base/compiler_specific.h" #include "remoting/proto/audio.pb.h" #include "remoting/protocol/audio_stub.h" -#include "remoting/protocol/message_reader.h" #include "remoting/protocol/channel_dispatcher_base.h" - -namespace net { -class StreamSocket; -} // namespace net +#include "remoting/protocol/protobuf_message_parser.h" namespace remoting { namespace protocol { -class Session; -class SessionConfig; - class AudioReader : public ChannelDispatcherBase { public: - static scoped_ptr<AudioReader> Create(const SessionConfig& config); - + explicit AudioReader(AudioStub* audio_stub); ~AudioReader() override; - void set_audio_stub(AudioStub* audio_stub) { audio_stub_ = audio_stub; } - - protected: - void OnInitialized() override; - private: - explicit AudioReader(AudioPacket::Encoding encoding); - - void OnNewData(scoped_ptr<AudioPacket> packet, - const base::Closure& done_task); - - AudioPacket::Encoding encoding_; - - ProtobufMessageReader<AudioPacket> reader_; - - // The stub that processes all received packets. - AudioStub* audio_stub_; + ProtobufMessageParser<AudioPacket> parser_; DISALLOW_COPY_AND_ASSIGN(AudioReader); }; diff --git a/remoting/protocol/audio_writer.cc b/remoting/protocol/audio_writer.cc index a6c8f4b..be60d2e 100644 --- a/remoting/protocol/audio_writer.cc +++ b/remoting/protocol/audio_writer.cc @@ -22,15 +22,9 @@ AudioWriter::AudioWriter() AudioWriter::~AudioWriter() { } -void AudioWriter::OnInitialized() { - // TODO(sergeyu): Provide a non-null WriteFailedCallback for the writer. - buffered_writer_.Init( - channel(), BufferedSocketWriter::WriteFailedCallback()); -} - void AudioWriter::ProcessAudioPacket(scoped_ptr<AudioPacket> packet, const base::Closure& done) { - buffered_writer_.Write(SerializeAndFrameMessage(*packet), done); + writer()->Write(SerializeAndFrameMessage(*packet), done); } // static diff --git a/remoting/protocol/audio_writer.h b/remoting/protocol/audio_writer.h index c5cac15..fb2f68c 100644 --- a/remoting/protocol/audio_writer.h +++ b/remoting/protocol/audio_writer.h @@ -38,14 +38,9 @@ class AudioWriter : public ChannelDispatcherBase, void ProcessAudioPacket(scoped_ptr<AudioPacket> packet, const base::Closure& done) override; - protected: - void OnInitialized() override; - private: AudioWriter(); - BufferedSocketWriter buffered_writer_; - DISALLOW_COPY_AND_ASSIGN(AudioWriter); }; diff --git a/remoting/protocol/channel_dispatcher_base.cc b/remoting/protocol/channel_dispatcher_base.cc index 0209cc2..ee90ee5 100644 --- a/remoting/protocol/channel_dispatcher_base.cc +++ b/remoting/protocol/channel_dispatcher_base.cc @@ -15,17 +15,19 @@ namespace protocol { ChannelDispatcherBase::ChannelDispatcherBase(const char* channel_name) : channel_name_(channel_name), - channel_factory_(nullptr) { + channel_factory_(nullptr), + event_handler_(nullptr) { } ChannelDispatcherBase::~ChannelDispatcherBase() { + writer()->Close(); if (channel_factory_) channel_factory_->CancelChannelCreation(channel_name_); } void ChannelDispatcherBase::Init(Session* session, const ChannelConfig& config, - const InitializedCallback& callback) { + EventHandler* event_handler) { DCHECK(session); switch (config.transport) { case ChannelConfig::TRANSPORT_MUX_STREAM: @@ -37,12 +39,10 @@ void ChannelDispatcherBase::Init(Session* session, break; default: - NOTREACHED(); - callback.Run(false); - return; + LOG(FATAL) << "Unknown transport type: " << config.transport; } - initialized_callback_ = callback; + event_handler_ = event_handler; channel_factory_->CreateChannel(channel_name_, base::Bind( &ChannelDispatcherBase::OnChannelReady, base::Unretained(this))); @@ -51,16 +51,21 @@ void ChannelDispatcherBase::Init(Session* session, void ChannelDispatcherBase::OnChannelReady( scoped_ptr<net::StreamSocket> socket) { if (!socket.get()) { - initialized_callback_.Run(false); + event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR); return; } channel_factory_ = nullptr; channel_ = socket.Pass(); + writer_.Init(channel_.get(), base::Bind(&ChannelDispatcherBase::OnWriteFailed, + base::Unretained(this))); + reader_.StartReading(channel_.get()); - OnInitialized(); + event_handler_->OnChannelInitialized(this); +} - initialized_callback_.Run(true); +void ChannelDispatcherBase::OnWriteFailed(int error) { + event_handler_->OnChannelError(this, CHANNEL_CONNECTION_ERROR); } } // namespace protocol diff --git a/remoting/protocol/channel_dispatcher_base.h b/remoting/protocol/channel_dispatcher_base.h index 705b224..0c80b30 100644 --- a/remoting/protocol/channel_dispatcher_base.h +++ b/remoting/protocol/channel_dispatcher_base.h @@ -10,6 +10,9 @@ #include "base/basictypes.h" #include "base/callback.h" #include "base/memory/scoped_ptr.h" +#include "remoting/protocol/buffered_socket_writer.h" +#include "remoting/protocol/errors.h" +#include "remoting/protocol/message_reader.h" namespace net { class StreamSocket; @@ -28,6 +31,17 @@ class Session; // messages. class ChannelDispatcherBase { public: + class EventHandler { + public: + EventHandler() {} + virtual ~EventHandler() {} + + virtual void OnChannelInitialized( + ChannelDispatcherBase* channel_dispatcher) = 0; + virtual void OnChannelError(ChannelDispatcherBase* channel_dispatcher, + ErrorCode error) = 0; + }; + // The callback is called when initialization is finished. The // parameter is set to true on success. typedef base::Callback<void(bool)> InitializedCallback; @@ -38,28 +52,31 @@ class ChannelDispatcherBase { // |session|. Caller retains ownership of the Session. void Init(Session* session, const ChannelConfig& config, - const InitializedCallback& callback); + EventHandler* event_handler); + + const std::string& channel_name() { return channel_name_; } // Returns true if the channel is currently connected. - bool is_connected() { return channel() != nullptr; } + bool is_connected() { return channel_ != nullptr; } protected: explicit ChannelDispatcherBase(const char* channel_name); - net::StreamSocket* channel() { return channel_.get(); } - - // Called when channel is initialized. Must be overriden in the - // child classes. Should not delete the dispatcher. - virtual void OnInitialized() = 0; + BufferedSocketWriter* writer() { return &writer_; } + MessageReader* reader() { return &reader_; } private: void OnChannelReady(scoped_ptr<net::StreamSocket> socket); + void OnWriteFailed(int error); std::string channel_name_; StreamChannelFactory* channel_factory_; - InitializedCallback initialized_callback_; + EventHandler* event_handler_; scoped_ptr<net::StreamSocket> channel_; + BufferedSocketWriter writer_; + MessageReader reader_; + DISALLOW_COPY_AND_ASSIGN(ChannelDispatcherBase); }; diff --git a/remoting/protocol/channel_multiplexer.cc b/remoting/protocol/channel_multiplexer.cc index 62bb9d2..8d885fe 100644 --- a/remoting/protocol/channel_multiplexer.cc +++ b/remoting/protocol/channel_multiplexer.cc @@ -348,6 +348,9 @@ ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, : base_channel_factory_(factory), base_channel_name_(base_channel_name), next_channel_id_(0), + parser_(base::Bind(&ChannelMultiplexer::OnIncomingPacket, + base::Unretained(this)), + &reader_), weak_factory_(this) { } @@ -400,9 +403,7 @@ void ChannelMultiplexer::OnBaseChannelReady( if (base_channel_.get()) { // Initialize reader and writer. - reader_.Init(base_channel_.get(), - base::Bind(&ChannelMultiplexer::OnIncomingPacket, - base::Unretained(this))); + reader_.StartReading(base_channel_.get()); writer_.Init(base_channel_.get(), base::Bind(&ChannelMultiplexer::OnWriteFailed, base::Unretained(this))); diff --git a/remoting/protocol/channel_multiplexer.h b/remoting/protocol/channel_multiplexer.h index 2b93d2e..c83b0f3 100644 --- a/remoting/protocol/channel_multiplexer.h +++ b/remoting/protocol/channel_multiplexer.h @@ -9,6 +9,7 @@ #include "remoting/proto/mux.pb.h" #include "remoting/protocol/buffered_socket_writer.h" #include "remoting/protocol/message_reader.h" +#include "remoting/protocol/protobuf_message_parser.h" #include "remoting/protocol/stream_channel_factory.h" namespace remoting { @@ -78,7 +79,8 @@ class ChannelMultiplexer : public StreamChannelFactory { std::map<int, MuxChannel*> channels_by_receive_id_; BufferedSocketWriter writer_; - ProtobufMessageReader<MultiplexPacket> reader_; + MessageReader reader_; + ProtobufMessageParser<MultiplexPacket> parser_; base::WeakPtrFactory<ChannelMultiplexer> weak_factory_; diff --git a/remoting/protocol/client_control_dispatcher.cc b/remoting/protocol/client_control_dispatcher.cc index f08e6cf..d13e53f 100644 --- a/remoting/protocol/client_control_dispatcher.cc +++ b/remoting/protocol/client_control_dispatcher.cc @@ -61,69 +61,65 @@ bool CursorShapeIsValid(const CursorShapeInfo& cursor_shape) { ClientControlDispatcher::ClientControlDispatcher() : ChannelDispatcherBase(kControlChannelName), client_stub_(nullptr), - clipboard_stub_(nullptr) { + clipboard_stub_(nullptr), + parser_(base::Bind(&ClientControlDispatcher::OnMessageReceived, + base::Unretained(this)), + reader()) { } ClientControlDispatcher::~ClientControlDispatcher() { - writer_.Close(); -} - -void ClientControlDispatcher::OnInitialized() { - // TODO(garykac): Set write failed callback. - writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback()); - reader_.Init(channel(), base::Bind( - &ClientControlDispatcher::OnMessageReceived, base::Unretained(this))); } void ClientControlDispatcher::InjectClipboardEvent( const ClipboardEvent& event) { ControlMessage message; message.mutable_clipboard_event()->CopyFrom(event); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::NotifyClientResolution( const ClientResolution& resolution) { ControlMessage message; message.mutable_client_resolution()->CopyFrom(resolution); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::ControlVideo(const VideoControl& video_control) { ControlMessage message; message.mutable_video_control()->CopyFrom(video_control); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::ControlAudio(const AudioControl& audio_control) { ControlMessage message; message.mutable_audio_control()->CopyFrom(audio_control); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::SetCapabilities( const Capabilities& capabilities) { ControlMessage message; message.mutable_capabilities()->CopyFrom(capabilities); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::RequestPairing( const PairingRequest& pairing_request) { ControlMessage message; message.mutable_pairing_request()->CopyFrom(pairing_request); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientControlDispatcher::DeliverClientMessage( const ExtensionMessage& message) { ControlMessage control_message; control_message.mutable_extension_message()->CopyFrom(message); - writer_.Write(SerializeAndFrameMessage(control_message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(control_message), base::Closure()); } void ClientControlDispatcher::OnMessageReceived( - scoped_ptr<ControlMessage> message, const base::Closure& done_task) { + scoped_ptr<ControlMessage> message, + const base::Closure& done_task) { DCHECK(client_stub_); DCHECK(clipboard_stub_); base::ScopedClosureRunner done_runner(done_task); diff --git a/remoting/protocol/client_control_dispatcher.h b/remoting/protocol/client_control_dispatcher.h index e4612c0..dc9035a 100644 --- a/remoting/protocol/client_control_dispatcher.h +++ b/remoting/protocol/client_control_dispatcher.h @@ -11,7 +11,7 @@ #include "remoting/protocol/clipboard_stub.h" #include "remoting/protocol/cursor_shape_stub.h" #include "remoting/protocol/host_stub.h" -#include "remoting/protocol/message_reader.h" +#include "remoting/protocol/protobuf_message_parser.h" namespace remoting { namespace protocol { @@ -51,10 +51,6 @@ class ClientControlDispatcher : public ChannelDispatcherBase, clipboard_stub_ = clipboard_stub; } - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: void OnMessageReceived(scoped_ptr<ControlMessage> message, const base::Closure& done_task); @@ -62,8 +58,7 @@ class ClientControlDispatcher : public ChannelDispatcherBase, ClientStub* client_stub_; ClipboardStub* clipboard_stub_; - ProtobufMessageReader<ControlMessage> reader_; - BufferedSocketWriter writer_; + ProtobufMessageParser<ControlMessage> parser_; DISALLOW_COPY_AND_ASSIGN(ClientControlDispatcher); }; diff --git a/remoting/protocol/client_event_dispatcher.cc b/remoting/protocol/client_event_dispatcher.cc index 983cba6..89e4bcf 100644 --- a/remoting/protocol/client_event_dispatcher.cc +++ b/remoting/protocol/client_event_dispatcher.cc @@ -20,13 +20,6 @@ ClientEventDispatcher::ClientEventDispatcher() } ClientEventDispatcher::~ClientEventDispatcher() { - writer_.Close(); -} - -void ClientEventDispatcher::OnInitialized() { - // TODO(garykac): Set write failed callback. - writer_.Init(channel(), - BufferedSocketWriter::WriteFailedCallback()); } void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) { @@ -35,7 +28,7 @@ void ClientEventDispatcher::InjectKeyEvent(const KeyEvent& event) { EventMessage message; message.set_timestamp(base::Time::Now().ToInternalValue()); message.mutable_key_event()->CopyFrom(event); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) { @@ -43,14 +36,14 @@ void ClientEventDispatcher::InjectTextEvent(const TextEvent& event) { EventMessage message; message.set_timestamp(base::Time::Now().ToInternalValue()); message.mutable_text_event()->CopyFrom(event); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void ClientEventDispatcher::InjectMouseEvent(const MouseEvent& event) { EventMessage message; message.set_timestamp(base::Time::Now().ToInternalValue()); message.mutable_mouse_event()->CopyFrom(event); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } } // namespace protocol diff --git a/remoting/protocol/client_event_dispatcher.h b/remoting/protocol/client_event_dispatcher.h index ca61740..6a3c54f 100644 --- a/remoting/protocol/client_event_dispatcher.h +++ b/remoting/protocol/client_event_dispatcher.h @@ -25,13 +25,7 @@ class ClientEventDispatcher : public ChannelDispatcherBase, public InputStub { void InjectTextEvent(const TextEvent& event) override; void InjectMouseEvent(const MouseEvent& event) override; - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: - BufferedSocketWriter writer_; - DISALLOW_COPY_AND_ASSIGN(ClientEventDispatcher); }; diff --git a/remoting/protocol/client_video_dispatcher.cc b/remoting/protocol/client_video_dispatcher.cc index bca586c..287bba3 100644 --- a/remoting/protocol/client_video_dispatcher.cc +++ b/remoting/protocol/client_video_dispatcher.cc @@ -15,16 +15,13 @@ namespace protocol { ClientVideoDispatcher::ClientVideoDispatcher(VideoStub* video_stub) : ChannelDispatcherBase(kVideoChannelName), - video_stub_(video_stub) { + parser_(base::Bind(&VideoStub::ProcessVideoPacket, + base::Unretained(video_stub)), + reader()) { } ClientVideoDispatcher::~ClientVideoDispatcher() { } -void ClientVideoDispatcher::OnInitialized() { - reader_.Init(channel(), base::Bind(&VideoStub::ProcessVideoPacket, - base::Unretained(video_stub_))); -} - } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/client_video_dispatcher.h b/remoting/protocol/client_video_dispatcher.h index fb726f0..b2b1e31 100644 --- a/remoting/protocol/client_video_dispatcher.h +++ b/remoting/protocol/client_video_dispatcher.h @@ -8,7 +8,7 @@ #include "base/compiler_specific.h" #include "remoting/proto/video.pb.h" #include "remoting/protocol/channel_dispatcher_base.h" -#include "remoting/protocol/message_reader.h" +#include "remoting/protocol/protobuf_message_parser.h" namespace remoting { namespace protocol { @@ -20,15 +20,8 @@ class ClientVideoDispatcher : public ChannelDispatcherBase { explicit ClientVideoDispatcher(VideoStub* video_stub); ~ClientVideoDispatcher() override; - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: - ProtobufMessageReader<VideoPacket> reader_; - - // The stub to which VideoPackets are passed for processing. - VideoStub* video_stub_; + ProtobufMessageParser<VideoPacket> parser_; DISALLOW_COPY_AND_ASSIGN(ClientVideoDispatcher); }; diff --git a/remoting/protocol/connection_to_client.cc b/remoting/protocol/connection_to_client.cc index 536fa75..6894a9c 100644 --- a/remoting/protocol/connection_to_client.cc +++ b/remoting/protocol/connection_to_client.cc @@ -119,34 +119,26 @@ void ConnectionToClient::OnSessionStateChange(Session::State state) { case Session::AUTHENTICATED: // Initialize channels. control_dispatcher_.reset(new HostControlDispatcher()); - control_dispatcher_->Init( - session_.get(), session_->config().control_config(), - base::Bind(&ConnectionToClient::OnChannelInitialized, - base::Unretained(this))); + control_dispatcher_->Init(session_.get(), + session_->config().control_config(), this); control_dispatcher_->set_clipboard_stub(clipboard_stub_); control_dispatcher_->set_host_stub(host_stub_); event_dispatcher_.reset(new HostEventDispatcher()); - event_dispatcher_->Init( - session_.get(), session_->config().event_config(), - base::Bind(&ConnectionToClient::OnChannelInitialized, - base::Unretained(this))); + event_dispatcher_->Init(session_.get(), session_->config().event_config(), + this); event_dispatcher_->set_input_stub(input_stub_); event_dispatcher_->set_event_timestamp_callback(base::Bind( &ConnectionToClient::OnEventTimestamp, base::Unretained(this))); video_dispatcher_.reset(new HostVideoDispatcher()); - video_dispatcher_->Init( - session_.get(), session_->config().video_config(), - base::Bind(&ConnectionToClient::OnChannelInitialized, - base::Unretained(this))); + video_dispatcher_->Init(session_.get(), session_->config().video_config(), + this); audio_writer_ = AudioWriter::Create(session_->config()); if (audio_writer_.get()) { - audio_writer_->Init( - session_.get(), session_->config().audio_config(), - base::Bind(&ConnectionToClient::OnChannelInitialized, - base::Unretained(this))); + audio_writer_->Init(session_.get(), session_->config().audio_config(), + this); } // Notify the handler after initializing the channels, so that @@ -170,28 +162,33 @@ void ConnectionToClient::OnSessionRouteChange( handler_->OnRouteChange(this, channel_name, route); } -void ConnectionToClient::OnChannelInitialized(bool successful) { +void ConnectionToClient::OnChannelInitialized( + ChannelDispatcherBase* channel_dispatcher) { DCHECK(CalledOnValidThread()); - if (!successful) { - LOG(ERROR) << "Failed to connect a channel"; - Close(CHANNEL_CONNECTION_ERROR); - return; - } - NotifyIfChannelsReady(); } +void ConnectionToClient::OnChannelError( + ChannelDispatcherBase* channel_dispatcher, + ErrorCode error) { + DCHECK(CalledOnValidThread()); + + LOG(ERROR) << "Failed to connect channel " + << channel_dispatcher->channel_name(); + Close(CHANNEL_CONNECTION_ERROR); +} + void ConnectionToClient::NotifyIfChannelsReady() { DCHECK(CalledOnValidThread()); - if (!control_dispatcher_.get() || !control_dispatcher_->is_connected()) + if (!control_dispatcher_ || !control_dispatcher_->is_connected()) return; - if (!event_dispatcher_.get() || !event_dispatcher_->is_connected()) + if (!event_dispatcher_ || !event_dispatcher_->is_connected()) return; - if (!video_dispatcher_.get() || !video_dispatcher_->is_connected()) + if (!video_dispatcher_ || !video_dispatcher_->is_connected()) return; - if ((!audio_writer_.get() || !audio_writer_->is_connected()) && + if ((!audio_writer_ || !audio_writer_->is_connected()) && session_->config().is_audio_enabled()) { return; } diff --git a/remoting/protocol/connection_to_client.h b/remoting/protocol/connection_to_client.h index 8f075b7..11f4a34 100644 --- a/remoting/protocol/connection_to_client.h +++ b/remoting/protocol/connection_to_client.h @@ -31,7 +31,8 @@ class VideoStub; // host. It sets up all protocol channels and connects them to the // stubs. class ConnectionToClient : public base::NonThreadSafe, - public Session::EventHandler { + public Session::EventHandler, + public ChannelDispatcherBase::EventHandler { public: class EventHandler { public: @@ -105,10 +106,12 @@ class ConnectionToClient : public base::NonThreadSafe, void OnSessionRouteChange(const std::string& channel_name, const TransportRoute& route) override; - private: - // Callback for channel initialization. - void OnChannelInitialized(bool successful); + // ChannelDispatcherBase::EventHandler interface. + void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override; + void OnChannelError(ChannelDispatcherBase* channel_dispatcher, + ErrorCode error) override; + private: void NotifyIfChannelsReady(); void Close(ErrorCode error); diff --git a/remoting/protocol/connection_to_host.cc b/remoting/protocol/connection_to_host.cc index 9217202..e2346e6 100644 --- a/remoting/protocol/connection_to_host.cc +++ b/remoting/protocol/connection_to_host.cc @@ -184,31 +184,24 @@ void ConnectionToHost::OnSessionStateChange( SetState(AUTHENTICATED, OK); control_dispatcher_.reset(new ClientControlDispatcher()); - control_dispatcher_->Init( - session_.get(), session_->config().control_config(), - base::Bind(&ConnectionToHost::OnChannelInitialized, - base::Unretained(this))); + control_dispatcher_->Init(session_.get(), + session_->config().control_config(), this); control_dispatcher_->set_client_stub(client_stub_); control_dispatcher_->set_clipboard_stub(clipboard_stub_); event_dispatcher_.reset(new ClientEventDispatcher()); - event_dispatcher_->Init( - session_.get(), session_->config().event_config(), - base::Bind(&ConnectionToHost::OnChannelInitialized, - base::Unretained(this))); + event_dispatcher_->Init(session_.get(), session_->config().event_config(), + this); video_dispatcher_.reset( new ClientVideoDispatcher(monitored_video_stub_.get())); video_dispatcher_->Init(session_.get(), session_->config().video_config(), - base::Bind(&ConnectionToHost::OnChannelInitialized, - base::Unretained(this))); + this); - audio_reader_ = AudioReader::Create(session_->config()); - if (audio_reader_.get()) { + if (session_->config().is_audio_enabled()) { + audio_reader_.reset(new AudioReader(audio_stub_)); audio_reader_->Init(session_.get(), session_->config().audio_config(), - base::Bind(&ConnectionToHost::OnChannelInitialized, - base::Unretained(this))); - audio_reader_->set_audio_stub(audio_stub_); + this); } break; @@ -241,6 +234,19 @@ void ConnectionToHost::OnSessionRouteChange(const std::string& channel_name, event_callback_->OnRouteChanged(channel_name, route); } +void ConnectionToHost::OnChannelInitialized( + ChannelDispatcherBase* channel_dispatcher) { + NotifyIfChannelsReady(); +} + +void ConnectionToHost::OnChannelError( + ChannelDispatcherBase* channel_dispatcher, + ErrorCode error) { + LOG(ERROR) << "Failed to connect channel " << channel_dispatcher; + CloseOnError(CHANNEL_CONNECTION_ERROR); + return; +} + void ConnectionToHost::OnVideoChannelStatus(bool active) { event_callback_->OnConnectionReady(active); } @@ -249,16 +255,6 @@ ConnectionToHost::State ConnectionToHost::state() const { return state_; } -void ConnectionToHost::OnChannelInitialized(bool successful) { - if (!successful) { - LOG(ERROR) << "Failed to connect video channel"; - CloseOnError(CHANNEL_CONNECTION_ERROR); - return; - } - - NotifyIfChannelsReady(); -} - void ConnectionToHost::NotifyIfChannelsReady() { if (!control_dispatcher_.get() || !control_dispatcher_->is_connected()) return; diff --git a/remoting/protocol/connection_to_host.h b/remoting/protocol/connection_to_host.h index 7bc648a..605bea3 100644 --- a/remoting/protocol/connection_to_host.h +++ b/remoting/protocol/connection_to_host.h @@ -13,6 +13,7 @@ #include "base/memory/scoped_ptr.h" #include "base/threading/non_thread_safe.h" #include "remoting/proto/internal.pb.h" +#include "remoting/protocol/channel_dispatcher_base.h" #include "remoting/protocol/clipboard_filter.h" #include "remoting/protocol/errors.h" #include "remoting/protocol/input_filter.h" @@ -47,6 +48,7 @@ class VideoStub; class ConnectionToHost : public SignalStrategy::Listener, public SessionManager::Listener, public Session::EventHandler, + public ChannelDispatcherBase::EventHandler, public base::NonThreadSafe { public: // The UI implementations maintain corresponding definitions of this @@ -133,6 +135,11 @@ class ConnectionToHost : public SignalStrategy::Listener, void OnSessionRouteChange(const std::string& channel_name, const TransportRoute& route) override; + // ChannelDispatcherBase::EventHandler interface. + void OnChannelInitialized(ChannelDispatcherBase* channel_dispatcher) override; + void OnChannelError(ChannelDispatcherBase* channel_dispatcher, + ErrorCode error) override; + // MonitoredVideoStub::EventHandler interface. virtual void OnVideoChannelStatus(bool active); @@ -140,9 +147,6 @@ class ConnectionToHost : public SignalStrategy::Listener, State state() const; private: - // Callbacks for channel initialization - void OnChannelInitialized(bool successful); - void NotifyIfChannelsReady(); void CloseOnError(ErrorCode error); diff --git a/remoting/protocol/host_control_dispatcher.cc b/remoting/protocol/host_control_dispatcher.cc index 0c19d17..e323d09 100644 --- a/remoting/protocol/host_control_dispatcher.cc +++ b/remoting/protocol/host_control_dispatcher.cc @@ -20,51 +20,47 @@ namespace protocol { HostControlDispatcher::HostControlDispatcher() : ChannelDispatcherBase(kControlChannelName), clipboard_stub_(nullptr), - host_stub_(nullptr) { + host_stub_(nullptr), + parser_(base::Bind(&HostControlDispatcher::OnMessageReceived, + base::Unretained(this)), + reader()) { } HostControlDispatcher::~HostControlDispatcher() { - writer_.Close(); -} - -void HostControlDispatcher::OnInitialized() { - reader_.Init(channel(), base::Bind( - &HostControlDispatcher::OnMessageReceived, base::Unretained(this))); - writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback()); } void HostControlDispatcher::SetCapabilities( const Capabilities& capabilities) { ControlMessage message; message.mutable_capabilities()->CopyFrom(capabilities); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void HostControlDispatcher::SetPairingResponse( const PairingResponse& pairing_response) { ControlMessage message; message.mutable_pairing_response()->CopyFrom(pairing_response); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void HostControlDispatcher::DeliverHostMessage( const ExtensionMessage& message) { ControlMessage control_message; control_message.mutable_extension_message()->CopyFrom(message); - writer_.Write(SerializeAndFrameMessage(control_message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(control_message), base::Closure()); } void HostControlDispatcher::InjectClipboardEvent(const ClipboardEvent& event) { ControlMessage message; message.mutable_clipboard_event()->CopyFrom(event); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void HostControlDispatcher::SetCursorShape( const CursorShapeInfo& cursor_shape) { ControlMessage message; message.mutable_cursor_shape()->CopyFrom(cursor_shape); - writer_.Write(SerializeAndFrameMessage(message), base::Closure()); + writer()->Write(SerializeAndFrameMessage(message), base::Closure()); } void HostControlDispatcher::OnMessageReceived( diff --git a/remoting/protocol/host_control_dispatcher.h b/remoting/protocol/host_control_dispatcher.h index eab64c2..3fe8e73 100644 --- a/remoting/protocol/host_control_dispatcher.h +++ b/remoting/protocol/host_control_dispatcher.h @@ -10,7 +10,7 @@ #include "remoting/protocol/client_stub.h" #include "remoting/protocol/clipboard_stub.h" #include "remoting/protocol/cursor_shape_stub.h" -#include "remoting/protocol/message_reader.h" +#include "remoting/protocol/protobuf_message_parser.h" namespace net { class StreamSocket; @@ -54,10 +54,6 @@ class HostControlDispatcher : public ChannelDispatcherBase, // message. |host_stub| must outlive this object. void set_host_stub(HostStub* host_stub) { host_stub_ = host_stub; } - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: void OnMessageReceived(scoped_ptr<ControlMessage> message, const base::Closure& done_task); @@ -65,8 +61,7 @@ class HostControlDispatcher : public ChannelDispatcherBase, ClipboardStub* clipboard_stub_; HostStub* host_stub_; - ProtobufMessageReader<ControlMessage> reader_; - BufferedSocketWriter writer_; + ProtobufMessageParser<ControlMessage> parser_; DISALLOW_COPY_AND_ASSIGN(HostControlDispatcher); }; diff --git a/remoting/protocol/host_event_dispatcher.cc b/remoting/protocol/host_event_dispatcher.cc index 7bc93fb..b1addbd 100644 --- a/remoting/protocol/host_event_dispatcher.cc +++ b/remoting/protocol/host_event_dispatcher.cc @@ -16,19 +16,17 @@ namespace protocol { HostEventDispatcher::HostEventDispatcher() : ChannelDispatcherBase(kEventChannelName), - input_stub_(nullptr) { + input_stub_(nullptr), + parser_(base::Bind(&HostEventDispatcher::OnMessageReceived, + base::Unretained(this)), + reader()) { } HostEventDispatcher::~HostEventDispatcher() { } -void HostEventDispatcher::OnInitialized() { - reader_.Init(channel(), base::Bind( - &HostEventDispatcher::OnMessageReceived, base::Unretained(this))); -} - -void HostEventDispatcher::OnMessageReceived( - scoped_ptr<EventMessage> message, const base::Closure& done_task) { +void HostEventDispatcher::OnMessageReceived(scoped_ptr<EventMessage> message, + const base::Closure& done_task) { DCHECK(input_stub_); base::ScopedClosureRunner done_runner(done_task); diff --git a/remoting/protocol/host_event_dispatcher.h b/remoting/protocol/host_event_dispatcher.h index aaa5fe9..3f1bd88 100644 --- a/remoting/protocol/host_event_dispatcher.h +++ b/remoting/protocol/host_event_dispatcher.h @@ -6,7 +6,7 @@ #define REMOTING_PROTOCOL_HOST_EVENT_DISPATCHER_H_ #include "remoting/protocol/channel_dispatcher_base.h" -#include "remoting/protocol/message_reader.h" +#include "remoting/protocol/protobuf_message_parser.h" namespace remoting { namespace protocol { @@ -34,10 +34,6 @@ class HostEventDispatcher : public ChannelDispatcherBase { event_timestamp_callback_ = value; } - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: void OnMessageReceived(scoped_ptr<EventMessage> message, const base::Closure& done_task); @@ -45,7 +41,7 @@ class HostEventDispatcher : public ChannelDispatcherBase { InputStub* input_stub_; EventTimestampCallback event_timestamp_callback_; - ProtobufMessageReader<EventMessage> reader_; + ProtobufMessageParser<EventMessage> parser_; DISALLOW_COPY_AND_ASSIGN(HostEventDispatcher); }; diff --git a/remoting/protocol/host_video_dispatcher.cc b/remoting/protocol/host_video_dispatcher.cc index 08ac857..b9a12c0 100644 --- a/remoting/protocol/host_video_dispatcher.cc +++ b/remoting/protocol/host_video_dispatcher.cc @@ -20,14 +20,9 @@ HostVideoDispatcher::HostVideoDispatcher() HostVideoDispatcher::~HostVideoDispatcher() { } -void HostVideoDispatcher::OnInitialized() { - // TODO(sergeyu): Provide WriteFailedCallback for the buffered writer. - writer_.Init(channel(), BufferedSocketWriter::WriteFailedCallback()); -} - void HostVideoDispatcher::ProcessVideoPacket(scoped_ptr<VideoPacket> packet, const base::Closure& done) { - writer_.Write(SerializeAndFrameMessage(*packet), done); + writer()->Write(SerializeAndFrameMessage(*packet), done); } } // namespace protocol diff --git a/remoting/protocol/host_video_dispatcher.h b/remoting/protocol/host_video_dispatcher.h index 36b9c69..c1a9573 100644 --- a/remoting/protocol/host_video_dispatcher.h +++ b/remoting/protocol/host_video_dispatcher.h @@ -24,13 +24,7 @@ class HostVideoDispatcher : public ChannelDispatcherBase, public VideoStub { void ProcessVideoPacket(scoped_ptr<VideoPacket> packet, const base::Closure& done) override; - protected: - // ChannelDispatcherBase overrides. - void OnInitialized() override; - private: - BufferedSocketWriter writer_; - DISALLOW_COPY_AND_ASSIGN(HostVideoDispatcher); }; diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc index 9528b83..6e3d536 100644 --- a/remoting/protocol/message_reader.cc +++ b/remoting/protocol/message_reader.cc @@ -29,18 +29,22 @@ MessageReader::MessageReader() weak_factory_(this) { } -void MessageReader::Init(net::Socket* socket, - const MessageReceivedCallback& callback) { +MessageReader::~MessageReader() { +} + +void MessageReader::SetMessageReceivedCallback( + const MessageReceivedCallback& callback) { DCHECK(CalledOnValidThread()); message_received_callback_ = callback; +} + +void MessageReader::StartReading(net::Socket* socket) { + DCHECK(CalledOnValidThread()); DCHECK(socket); socket_ = socket; DoRead(); } -MessageReader::~MessageReader() { -} - void MessageReader::DoRead() { DCHECK(CalledOnValidThread()); // Don't try to read again if there is another read pending or we @@ -104,9 +108,11 @@ void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { } void MessageReader::RunCallback(scoped_ptr<CompoundBuffer> message) { - message_received_callback_.Run( - message.Pass(), base::Bind(&MessageReader::OnMessageDone, - weak_factory_.GetWeakPtr())); + if (!message_received_callback_.is_null()){ + message_received_callback_.Run( + message.Pass(), + base::Bind(&MessageReader::OnMessageDone, weak_factory_.GetWeakPtr())); + } } void MessageReader::OnMessageDone() { diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h index fb6c92e..65d52d0 100644 --- a/remoting/protocol/message_reader.h +++ b/remoting/protocol/message_reader.h @@ -5,12 +5,10 @@ #ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_ #define REMOTING_PROTOCOL_MESSAGE_READER_H_ -#include "base/bind.h" #include "base/callback.h" #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/threading/non_thread_safe.h" -#include "net/base/completion_callback.h" #include "remoting/base/compound_buffer.h" #include "remoting/protocol/message_decoder.h" @@ -41,9 +39,11 @@ class MessageReader : public base::NonThreadSafe { MessageReader(); virtual ~MessageReader(); - // Initialize the MessageReader with a socket. If a message is received - // |callback| is called. - void Init(net::Socket* socket, const MessageReceivedCallback& callback); + // Sets the callback to be called for each incoming message. + void SetMessageReceivedCallback(const MessageReceivedCallback& callback); + + // Starts reading from |socket|. + void StartReading(net::Socket* socket); private: void DoRead(); @@ -77,47 +77,6 @@ class MessageReader : public base::NonThreadSafe { DISALLOW_COPY_AND_ASSIGN(MessageReader); }; -// Version of MessageReader for protocol buffer messages, that parses -// each incoming message. -template <class T> -class ProtobufMessageReader { - public: - // The callback that is called when a new message is received. |done_task| - // must be called by the callback when it's done processing the |message|. - typedef typename base::Callback<void(scoped_ptr<T> message, - const base::Closure& done_task)> - MessageReceivedCallback; - - ProtobufMessageReader() { }; - ~ProtobufMessageReader() { }; - - void Init(net::Socket* socket, const MessageReceivedCallback& callback) { - DCHECK(!callback.is_null()); - message_received_callback_ = callback; - message_reader_.reset(new MessageReader()); - message_reader_->Init( - socket, base::Bind(&ProtobufMessageReader<T>::OnNewData, - base::Unretained(this))); - } - - private: - void OnNewData(scoped_ptr<CompoundBuffer> buffer, - const base::Closure& done_task) { - scoped_ptr<T> message(new T()); - CompoundBufferInputStream stream(buffer.get()); - bool ret = message->ParseFromZeroCopyStream(&stream); - if (!ret) { - LOG(WARNING) << "Received message that is not a valid protocol buffer."; - } else { - DCHECK_EQ(stream.position(), buffer->total_bytes()); - message_received_callback_.Run(message.Pass(), done_task); - } - } - - scoped_ptr<MessageReader> message_reader_; - MessageReceivedCallback message_received_callback_; -}; - } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/message_reader_unittest.cc b/remoting/protocol/message_reader_unittest.cc index 5c43788..68bcde6 100644 --- a/remoting/protocol/message_reader_unittest.cc +++ b/remoting/protocol/message_reader_unittest.cc @@ -67,13 +67,16 @@ class MessageReaderTest : public testing::Test { } protected: - void SetUp() override { reader_.reset(new MessageReader()); } + void SetUp() override { + reader_.reset(new MessageReader()); + } void TearDown() override { STLDeleteElements(&messages_); } void InitReader() { - reader_->Init(&socket_, base::Bind( - &MessageReaderTest::OnMessage, base::Unretained(this))); + reader_->SetMessageReceivedCallback( + base::Bind(&MessageReaderTest::OnMessage, base::Unretained(this))); + reader_->StartReading(&socket_); } void AddMessage(const std::string& message) { diff --git a/remoting/protocol/protobuf_message_parser.h b/remoting/protocol/protobuf_message_parser.h new file mode 100644 index 0000000..68840b1 --- /dev/null +++ b/remoting/protocol/protobuf_message_parser.h @@ -0,0 +1,62 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_ +#define REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_ + +#include "base/bind.h" +#include "base/callback.h" +#include "base/memory/scoped_ptr.h" +#include "remoting/base/compound_buffer.h" +#include "remoting/protocol/message_reader.h" + +namespace remoting { +namespace protocol { + +// Version of MessageReader for protocol buffer messages, that parses +// each incoming message. +template <class T> +class ProtobufMessageParser { + public: + // The callback that is called when a new message is received. |done_task| + // must be called by the callback when it's done processing the |message|. + typedef typename base::Callback<void(scoped_ptr<T> message, + const base::Closure& done_task)> + MessageReceivedCallback; + + // |message_reader| must outlive ProtobufMessageParser. + ProtobufMessageParser(const MessageReceivedCallback& callback, + MessageReader* message_reader) + : message_reader_(message_reader), + message_received_callback_(callback) { + message_reader->SetMessageReceivedCallback(base::Bind( + &ProtobufMessageParser<T>::OnNewData, base::Unretained(this))); + } + ~ProtobufMessageParser() { + message_reader_->SetMessageReceivedCallback( + MessageReader::MessageReceivedCallback()); + } + + private: + void OnNewData(scoped_ptr<CompoundBuffer> buffer, + const base::Closure& done_task) { + scoped_ptr<T> message(new T()); + CompoundBufferInputStream stream(buffer.get()); + bool ret = message->ParseFromZeroCopyStream(&stream); + if (!ret) { + LOG(WARNING) << "Received message that is not a valid protocol buffer."; + } else { + DCHECK_EQ(stream.position(), buffer->total_bytes()); + message_received_callback_.Run(message.Pass(), done_task); + } + } + + MessageReader* message_reader_; + MessageReceivedCallback message_received_callback_; +}; + +} // namespace protocol +} // namespace remoting + +#endif // REMOTING_PROTOCOL_PROTOBUF_MESSAGE_PARSER_H_ |