diff options
author | hclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-28 18:43:37 +0000 |
---|---|---|
committer | hclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-28 18:43:37 +0000 |
commit | 4d10edeb5f3db4366b4521c2346bfb4741c21e6f (patch) | |
tree | 98a0a1d7d295cc31c0840977dc15134c1271de31 | |
parent | 45b10f0917d6449aa53f66a6ea7fa6e8a4aea079 (diff) | |
download | chromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.zip chromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.tar.gz chromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.tar.bz2 |
HostMessageDispatcher to parse control messages
Changed MessageReader and MessageDecoder to support parsing in
HostMessageDispatcher.
BUG=None
TEST=None
Review URL: http://codereview.chromium.org/4017002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@64283 0039d316-1c4b-4281-b951-d872f2087c98
29 files changed, 579 insertions, 465 deletions
diff --git a/remoting/base/multiple_array_input_stream.cc b/remoting/base/multiple_array_input_stream.cc index ab0ba45..d2127c4 100644 --- a/remoting/base/multiple_array_input_stream.cc +++ b/remoting/base/multiple_array_input_stream.cc @@ -5,13 +5,13 @@ #include <functional> #include "base/logging.h" +#include "net/base/io_buffer.h" #include "remoting/base/multiple_array_input_stream.h" namespace remoting { MultipleArrayInputStream::MultipleArrayInputStream() : current_buffer_(0), - current_buffer_offset_(0), position_(0), last_returned_size_(0) { } @@ -19,24 +19,21 @@ MultipleArrayInputStream::MultipleArrayInputStream() MultipleArrayInputStream::~MultipleArrayInputStream() { } -void MultipleArrayInputStream::AddBuffer( - const char* buffer, int size) { +void MultipleArrayInputStream::AddBuffer(net::IOBuffer* buffer, int size) { DCHECK_EQ(position_, 0); // Haven't started reading. - buffers_.push_back(buffer); - buffer_sizes_.push_back(size); - DCHECK_EQ(buffers_.size(), buffer_sizes_.size()); + buffers_.push_back(new net::DrainableIOBuffer(buffer, size)); } bool MultipleArrayInputStream::Next(const void** data, int* size) { if (current_buffer_ < buffers_.size()) { - // Also reply with that is remaining in the current buffer. - last_returned_size_ = - buffer_sizes_[current_buffer_] - current_buffer_offset_; - *data = buffers_[current_buffer_] + current_buffer_offset_; + // Reply with the number of bytes remaining in the current buffer. + scoped_refptr<net::DrainableIOBuffer> buffer = buffers_[current_buffer_]; + last_returned_size_ = buffer->BytesRemaining(); + *data = buffer->data(); *size = last_returned_size_; // After reading the current buffer then advance to the next buffer. - current_buffer_offset_ = 0; + buffer->DidConsume(last_returned_size_); ++current_buffer_; position_ += last_returned_size_; return true; @@ -52,14 +49,13 @@ bool MultipleArrayInputStream::Next(const void** data, int* size) { void MultipleArrayInputStream::BackUp(int count) { DCHECK_LE(count, last_returned_size_); - DCHECK_EQ(0, current_buffer_offset_); DCHECK_GT(current_buffer_, 0u); - // Rewind one buffer. + // Rewind one buffer and rewind data offset by |count| bytes. --current_buffer_; - current_buffer_offset_ = buffer_sizes_[current_buffer_] - count; + scoped_refptr<net::DrainableIOBuffer> buffer = buffers_[current_buffer_]; + buffer->SetOffset(buffer->size() - count); position_ -= count; - DCHECK_GE(current_buffer_offset_, 0); DCHECK_GE(position_, 0); } @@ -68,20 +64,17 @@ bool MultipleArrayInputStream::Skip(int count) { last_returned_size_ = 0; while (count && current_buffer_ < buffers_.size()) { - int read = std::min( - count, - buffer_sizes_[current_buffer_] - current_buffer_offset_); + scoped_refptr<net::DrainableIOBuffer> buffer = buffers_[current_buffer_]; + int read = std::min(count, buffer->BytesRemaining()); // Advance the current buffer offset and position. - current_buffer_offset_ += read; + buffer->DidConsume(read); position_ += read; count -= read; // If the current buffer is fully read, then advance to the next buffer. - if (current_buffer_offset_ == buffer_sizes_[current_buffer_]) { + if (!buffer->BytesRemaining()) ++current_buffer_; - current_buffer_offset_ = 0; - } } return count == 0; } diff --git a/remoting/base/multiple_array_input_stream.h b/remoting/base/multiple_array_input_stream.h index 7747da5..a848248 100644 --- a/remoting/base/multiple_array_input_stream.h +++ b/remoting/base/multiple_array_input_stream.h @@ -2,29 +2,39 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// MultipleArrayInputStream implements ZeroCopyInputStream to be used by +// protobuf to decode bytes into a protocol buffer message. +// +// This input stream is made of multiple IOBuffers received from the network. +// This object retains the IOBuffers added to it. +// +// Internally, we wrap each added IOBuffer in a DrainableIOBuffer. This allows +// us to track how much data has been consumed from each IOBuffer. + #ifndef REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_ #define REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_ #include <vector> #include "base/basictypes.h" +#include "base/ref_counted.h" #include "google/protobuf/io/zero_copy_stream.h" +namespace net { +class DrainableIOBuffer; +class IOBuffer; +} // namespace net + namespace remoting { -// A MultipleArrayInputStream provides a ZeroCopyInputStream with multiple -// backing arrays. class MultipleArrayInputStream : public google::protobuf::io::ZeroCopyInputStream { public: - // Construct a MultipleArrayInputStream with |count| backing arrays. - // TODO(hclam): Consider adding block size to see if it has a performance - // gain. MultipleArrayInputStream(); virtual ~MultipleArrayInputStream(); - // Add a new buffer to the list. - void AddBuffer(const char* buffer, int size); + // Add a buffer to the list. |buffer| is retained by this object. + void AddBuffer(net::IOBuffer* buffer, int size); // google::protobuf::io::ZeroCopyInputStream interface. virtual bool Next(const void** data, int* size); @@ -33,11 +43,9 @@ class MultipleArrayInputStream : virtual int64 ByteCount() const; private: - std::vector<const char*> buffers_; - std::vector<int> buffer_sizes_; + std::vector<scoped_refptr<net::DrainableIOBuffer> > buffers_; size_t current_buffer_; - int current_buffer_offset_; int position_; int last_returned_size_; diff --git a/remoting/base/multiple_array_input_stream_unittest.cc b/remoting/base/multiple_array_input_stream_unittest.cc index 1d705fe..4a840e4 100644 --- a/remoting/base/multiple_array_input_stream_unittest.cc +++ b/remoting/base/multiple_array_input_stream_unittest.cc @@ -5,6 +5,7 @@ #include <string> #include "base/scoped_ptr.h" +#include "net/base/io_buffer.h" #include "remoting/base/multiple_array_input_stream.h" #include "testing/gtest/include/gtest/gtest.h" @@ -71,7 +72,7 @@ static void PrepareData(scoped_ptr<MultipleArrayInputStream>* stream) { const char* data = kTestData.c_str(); for (int i = 0; i < segments; ++i) { int size = i % 2 == 0 ? 1 : 2; - mstream->AddBuffer(data, size); + mstream->AddBuffer(new net::StringIOBuffer(std::string(data, size)), size); data += size; } stream->reset(mstream); diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc index af3d8f1..95271be 100644 --- a/remoting/client/chromoting_client.cc +++ b/remoting/client/chromoting_client.cc @@ -11,6 +11,7 @@ #include "remoting/client/host_connection.h" #include "remoting/client/input_handler.h" #include "remoting/client/rectangle_update_decoder.h" +#include "remoting/proto/internal.pb.h" namespace remoting { diff --git a/remoting/client/chromoting_client.h b/remoting/client/chromoting_client.h index 74fa35d..e476f31 100644 --- a/remoting/client/chromoting_client.h +++ b/remoting/client/chromoting_client.h @@ -7,11 +7,12 @@ #ifndef REMOTING_CLIENT_CHROMOTING_CLIENT_H #define REMOTING_CLIENT_CHROMOTING_CLIENT_H +#include <list> + #include "base/task.h" #include "remoting/client/host_connection.h" #include "remoting/client/client_config.h" #include "remoting/client/chromoting_view.h" -#include "remoting/protocol/messages_decoder.h" class MessageLoop; @@ -89,7 +90,7 @@ class ChromotingClient : public HostConnection::HostEventCallback { // processed. // // Used to serialize sending of messages to the client. - HostMessageList received_messages_; + std::list<ChromotingHostMessage*> received_messages_; // True if a message is being processed. Can be used to determine if it is // safe to dispatch another message. diff --git a/remoting/client/host_connection.h b/remoting/client/host_connection.h index 74c2a6d..7121025 100644 --- a/remoting/client/host_connection.h +++ b/remoting/client/host_connection.h @@ -7,7 +7,8 @@ #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "remoting/protocol/messages_decoder.h" +#include "remoting/proto/internal.pb.h" +#include "remoting/protocol/message_decoder.h" namespace remoting { diff --git a/remoting/client/jingle_host_connection.cc b/remoting/client/jingle_host_connection.cc index 76c17d5..9a5fe65 100644 --- a/remoting/client/jingle_host_connection.cc +++ b/remoting/client/jingle_host_connection.cc @@ -5,6 +5,8 @@ #include "base/callback.h" #include "base/message_loop.h" #include "remoting/base/constants.h" +// TODO(hclam): Remove this header once MessageDispatcher is used. +#include "remoting/base/multiple_array_input_stream.h" #include "remoting/client/client_config.h" #include "remoting/client/jingle_host_connection.h" #include "remoting/jingle_glue/jingle_thread.h" @@ -149,7 +151,7 @@ void JingleHostConnection::OnConnectionStateChange( case ChromotocolConnection::CONNECTED: // Initialize reader and writer. event_writer_.Init(connection_->event_channel()); - video_reader_.Init( + video_reader_.Init<ChromotingHostMessage>( connection_->video_channel(), NewCallback(this, &JingleHostConnection::OnVideoMessage)); event_callback_->OnConnectionOpened(this); diff --git a/remoting/client/jingle_host_connection.h b/remoting/client/jingle_host_connection.h index ee5240f..df37702 100644 --- a/remoting/client/jingle_host_connection.h +++ b/remoting/client/jingle_host_connection.h @@ -25,9 +25,9 @@ #include "remoting/client/client_context.h" #include "remoting/client/host_connection.h" #include "remoting/jingle_glue/jingle_client.h" +#include "remoting/protocol/message_reader.h" #include "remoting/protocol/chromotocol_connection.h" #include "remoting/protocol/chromotocol_server.h" -#include "remoting/protocol/stream_reader.h" #include "remoting/protocol/stream_writer.h" class MessageLoop; @@ -85,7 +85,7 @@ class JingleHostConnection : public HostConnection, scoped_refptr<ChromotocolConnection> connection_; EventStreamWriter event_writer_; - VideoStreamReader video_reader_; + MessageReader video_reader_; HostEventCallback* event_callback_; diff --git a/remoting/host/chromoting_host.cc b/remoting/host/chromoting_host.cc index 113e3b0..21a95be 100644 --- a/remoting/host/chromoting_host.cc +++ b/remoting/host/chromoting_host.cc @@ -14,7 +14,6 @@ #include "remoting/host/event_executor.h" #include "remoting/host/host_config.h" #include "remoting/host/session_manager.h" -#include "remoting/protocol/messages_decoder.h" #include "remoting/protocol/jingle_chromotocol_server.h" namespace remoting { diff --git a/remoting/host/client_connection.cc b/remoting/host/client_connection.cc index 44d3fbf..8063d04 100644 --- a/remoting/host/client_connection.cc +++ b/remoting/host/client_connection.cc @@ -2,11 +2,12 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// TODO(hclam): Remove this header once MessageDispatcher is used. +#include "remoting/base/multiple_array_input_stream.h" #include "remoting/host/client_connection.h" #include "google/protobuf/message.h" #include "net/base/io_buffer.h" -#include "remoting/protocol/messages_decoder.h" #include "remoting/protocol/util.h" namespace remoting { @@ -85,7 +86,7 @@ ClientConnection::ClientConnection() {} void ClientConnection::OnConnectionStateChange( ChromotocolConnection::State state) { if (state == ChromotocolConnection::CONNECTED) { - event_reader_.Init( + event_reader_.Init<ChromotingClientMessage>( connection_->event_channel(), NewCallback(this, &ClientConnection::OnMessageReceived)); video_writer_.Init(connection_->video_channel()); diff --git a/remoting/host/client_connection.h b/remoting/host/client_connection.h index d8cf305..69283e2 100644 --- a/remoting/host/client_connection.h +++ b/remoting/host/client_connection.h @@ -13,7 +13,7 @@ #include "base/scoped_ptr.h" #include "remoting/proto/internal.pb.h" #include "remoting/protocol/chromotocol_connection.h" -#include "remoting/protocol/stream_reader.h" +#include "remoting/protocol/message_reader.h" #include "remoting/protocol/stream_writer.h" namespace remoting { @@ -87,7 +87,7 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection> { // Callback for ChromotocolConnection. void OnConnectionStateChange(ChromotocolConnection::State state); - // Callback for EventsStreamReader. + // Callback for MessageReader. void OnMessageReceived(ChromotingClientMessage* message); // Process a libjingle state change event on the |loop_|. @@ -101,7 +101,7 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection> { // The libjingle channel used to send and receive data from the remote client. scoped_refptr<ChromotocolConnection> connection_; - EventStreamReader event_reader_; + MessageReader event_reader_; VideoStreamWriter video_writer_; // The message loop that this object runs on. diff --git a/remoting/host/event_executor_linux.cc b/remoting/host/event_executor_linux.cc index 44c4c55..6f3acf2 100644 --- a/remoting/host/event_executor_linux.cc +++ b/remoting/host/event_executor_linux.cc @@ -4,7 +4,8 @@ #include "remoting/host/event_executor_linux.h" -#include "remoting/protocol/messages_decoder.h" +#include "remoting/proto/internal.pb.h" +#include "remoting/protocol/message_decoder.h" namespace remoting { diff --git a/remoting/host/mock_objects.h b/remoting/host/mock_objects.h index b02d02f..50c94ef 100644 --- a/remoting/host/mock_objects.h +++ b/remoting/host/mock_objects.h @@ -8,7 +8,7 @@ #include "remoting/host/capturer.h" #include "remoting/host/client_connection.h" #include "remoting/host/event_executor.h" -#include "remoting/protocol/messages_decoder.h" +#include "remoting/proto/internal.pb.h" #include "testing/gmock/include/gmock/gmock.h" namespace remoting { diff --git a/remoting/host/session_manager.cc b/remoting/host/session_manager.cc index 34af1c1..a61a6bdd 100644 --- a/remoting/host/session_manager.cc +++ b/remoting/host/session_manager.cc @@ -12,7 +12,7 @@ #include "remoting/base/capture_data.h" #include "remoting/base/tracer.h" #include "remoting/host/client_connection.h" -#include "remoting/protocol/messages_decoder.h" +#include "remoting/protocol/message_decoder.h" namespace remoting { diff --git a/remoting/proto/control.proto b/remoting/proto/control.proto index 299d8cc..99b5d37 100644 --- a/remoting/proto/control.proto +++ b/remoting/proto/control.proto @@ -18,7 +18,7 @@ message SuggestScreenResolutionRequest { // Represents a control message that sent from the client to the host. // This message is transmitted on the control channel. message ClientControlMessage { - optional SuggestScreenResolutionRequest suggestScreenResolutionRequest = 1; + optional SuggestScreenResolutionRequest suggest_screen_resolution_request = 1; } message SetScreenResolutionRequest { @@ -29,5 +29,5 @@ message SetScreenResolutionRequest { // Represents a control message that sent from host to the client. // This message is transmitted on the control channel. message HostControlMessage { - optional SetScreenResolutionRequest setScreenResolutionRequest = 1; + optional SetScreenResolutionRequest set_screen_resolution_request = 1; } diff --git a/remoting/protocol/host_message_dispatcher.cc b/remoting/protocol/host_message_dispatcher.cc index 834b5f8..818934f 100644 --- a/remoting/protocol/host_message_dispatcher.cc +++ b/remoting/protocol/host_message_dispatcher.cc @@ -2,27 +2,64 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "base/message_loop_proxy.h" +#include "net/base/io_buffer.h" +#include "remoting/base/multiple_array_input_stream.h" #include "remoting/proto/control.pb.h" #include "remoting/proto/event.pb.h" +#include "remoting/protocol/chromotocol_connection.h" #include "remoting/protocol/host_message_dispatcher.h" #include "remoting/protocol/host_control_message_handler.h" #include "remoting/protocol/host_event_message_handler.h" -#include "remoting/protocol/stream_reader.h" +#include "remoting/protocol/message_reader.h" namespace remoting { -HostMessageDispatcher::HostMessageDispatcher( - base::MessageLoopProxy* message_loop_proxy, - ChromotingConnection* connection, - HostControlMessageHandler* control_message_handler, - HostEventMessageHandler* event_message_handler) - : message_loop_proxy_(message_loop_proxy), - control_message_handler_(control_message_handler), - event_message_handler_(event_message_handler) { +HostMessageDispatcher::HostMessageDispatcher() { } HostMessageDispatcher::~HostMessageDispatcher() { } +bool HostMessageDispatcher::Initialize( + ChromotocolConnection* connection, + HostControlMessageHandler* control_message_handler, + HostEventMessageHandler* event_message_handler) { + if (!connection || !control_message_handler || !event_message_handler || + !connection->event_channel() || !connection->control_channel()) { + return false; + } + + control_message_reader_.reset(new MessageReader()); + event_message_reader_.reset(new MessageReader()); + control_message_handler_.reset(control_message_handler); + event_message_handler_.reset(event_message_handler); + + // Initialize the readers on the sockets provided by channels. + event_message_reader_->Init<ClientEventMessage>( + connection->event_channel(), + NewCallback(this, &HostMessageDispatcher::OnEventMessageReceived)); + control_message_reader_->Init<ClientControlMessage>( + connection->control_channel(), + NewCallback(this, &HostMessageDispatcher::OnControlMessageReceived)); + return true; +} + +void HostMessageDispatcher::OnControlMessageReceived( + ClientControlMessage* message) { + scoped_refptr<RefCountedMessage<ClientControlMessage> > ref_msg = + new RefCountedMessage<ClientControlMessage>(message); + if (message->has_suggest_screen_resolution_request()) { + control_message_handler_->OnSuggestScreenResolutionRequest( + message->suggest_screen_resolution_request(), + NewRunnableFunction( + &DeleteMessage<RefCountedMessage<ClientControlMessage> >, + ref_msg)); + } +} + +void HostMessageDispatcher::OnEventMessageReceived( + ClientEventMessage* message) { + // TODO(hclam): Implement. +} + } // namespace remoting diff --git a/remoting/protocol/host_message_dispatcher.h b/remoting/protocol/host_message_dispatcher.h index aab7fbf..23dc55a 100644 --- a/remoting/protocol/host_message_dispatcher.h +++ b/remoting/protocol/host_message_dispatcher.h @@ -9,58 +9,73 @@ #include "base/ref_counted.h" #include "base/scoped_ptr.h" -namespace base { -class MessageLoopProxy; -} // namespace base - namespace remoting { -class ChromotingClientMessage; -class ChromotingConnection; -class EventStreamReader; +class ChromotocolConnection; +class ClientControlMessage; +class ClientEventMessage; class HostControlMessageHandler; class HostEventMessageHandler; +class MessageReader; // A message dispatcher used to listen for messages received in -// ChromotingConnection. It dispatches messages to the corresponding +// ChromotocolConnection. It dispatches messages to the corresponding // handler. // // Internally it contains an EventStreamReader that decodes data on // communications channels into protocol buffer messages. -// EventStreamReader is registered with ChromotingConnection given to it. +// EventStreamReader is registered with ChromotocolConnection given to it. // // Object of this class is owned by ChromotingHost to dispatch messages // to itself. -class HostMessageDispatcher { +class HostMessageDispatcher : + public base::RefCountedThreadSafe<HostMessageDispatcher> { public: - // Construct a message dispatcher that dispatches messages received - // in ChromotingConnection. - HostMessageDispatcher(base::MessageLoopProxy* message_loop_proxy, - ChromotingConnection* connection, - HostControlMessageHandler* control_message_handler, - HostEventMessageHandler* event_message_handler); - + // Construct a message dispatcher. + HostMessageDispatcher(); virtual ~HostMessageDispatcher(); + // Initialize the message dispatcher with the given connection and + // message handlers. + // Return true if initalization was successful. + bool Initialize(ChromotocolConnection* connection, + HostControlMessageHandler* control_message_handler, + HostEventMessageHandler* event_message_handler); + private: + // A single protobuf can contain multiple messages that will be handled by + // different message handlers. We use this wrapper to ensure that the + // protobuf is only deleted after all the handlers have finished executing. + template <typename T> + class RefCountedMessage : public base::RefCounted<RefCountedMessage<T> > { + public: + RefCountedMessage(T* message) : message_(message) { } + + T* message() { return message_.get(); } + + private: + scoped_ptr<T> message_; + }; + // This method is called by |control_channel_reader_| when a control // message is received. - void OnControlMessageReceived(ChromotingClientMessage* message); + void OnControlMessageReceived(ClientControlMessage* message); // This method is called by |event_channel_reader_| when a event // message is received. - void OnEventMessageReceived(ChromotingClientMessage* message); + void OnEventMessageReceived(ClientEventMessage* message); - // Message loop to dispatch the messages. - scoped_refptr<base::MessageLoopProxy> message_loop_proxy_; + // Dummy methods to destroy messages. + template <class T> + static void DeleteMessage(scoped_refptr<T> message) { } - // EventStreamReader that runs on the control channel. It runs a loop - // that parses data on the channel and then delegate the message to this + // MessageReader that runs on the control channel. It runs a loop + // that parses data on the channel and then delegates the message to this // class. - scoped_ptr<EventStreamReader> control_channel_reader_; + scoped_ptr<MessageReader> control_message_reader_; - // EventStreamReader that runs on the event channel. - scoped_ptr<EventStreamReader> event_channel_reader_; + // MessageReader that runs on the event channel. + scoped_ptr<MessageReader> event_message_reader_; // Event handlers for control channel and event channel respectively. // Method calls to these objects are made on the message loop given. diff --git a/remoting/protocol/message_decoder.cc b/remoting/protocol/message_decoder.cc new file mode 100644 index 0000000..4a31ee1 --- /dev/null +++ b/remoting/protocol/message_decoder.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/message_decoder.h" + +#include "base/logging.h" +#include "net/base/io_buffer.h" +#include "remoting/base/multiple_array_input_stream.h" +#include "remoting/proto/internal.pb.h" +#include "talk/base/byteorder.h" + +namespace remoting { + +MessageDecoder::MessageDecoder() + : available_bytes_(0), + next_payload_(0), + next_payload_known_(false) { +} + +MessageDecoder::~MessageDecoder() {} + +void MessageDecoder::AddBuffer(scoped_refptr<net::IOBuffer> data, + int data_size) { + buffer_list_.push_back(new net::DrainableIOBuffer(data, data_size)); + available_bytes_ += data_size; +} + +MultipleArrayInputStream* MessageDecoder::CreateInputStreamFromData() { + // Determine the payload size. If we already know it then skip this part. + // We may not have enough data to determine the payload size so use a + // utility function to find out. + int next_payload = -1; + if (!next_payload_known_ && GetPayloadSize(&next_payload)) { + DCHECK_NE(-1, next_payload); + next_payload_ = next_payload; + next_payload_known_ = true; + } + + // If the next payload size is still not known or we don't have enough + // data for parsing then exit. + if (!next_payload_known_ || available_bytes_ < next_payload_) + return NULL; + next_payload_known_ = false; + + // The following loop gather buffers in |buffer_list_| that sum up to + // |next_payload_| bytes. These buffers are added to |stream|. + + // Create a MultipleArrayInputStream for parsing. + // TODO(hclam): Avoid creating this object everytime. + MultipleArrayInputStream* stream = new MultipleArrayInputStream(); + while (next_payload_ > 0 && !buffer_list_.empty()) { + scoped_refptr<net::DrainableIOBuffer> buffer = buffer_list_.front(); + int read_bytes = std::min(buffer->BytesRemaining(), next_payload_); + + // This call creates a new instance of DrainableIOBuffer internally. + // This will reference the same base pointer but maintain it's own + // version of data pointer. + stream->AddBuffer(buffer, read_bytes); + + // Adjust counters. + buffer->DidConsume(read_bytes); + next_payload_ -= read_bytes; + available_bytes_ -= read_bytes; + + // If the front buffer is fully read then remove it from the queue. + if (!buffer->BytesRemaining()) + buffer_list_.pop_front(); + } + DCHECK_EQ(0, next_payload_); + DCHECK_LE(0, available_bytes_); + return stream; +} + +static int GetHeaderSize(const std::string& header) { + return header.length(); +} + +bool MessageDecoder::GetPayloadSize(int* size) { + // The header has a size of 4 bytes. + const int kHeaderSize = sizeof(int32); + + if (available_bytes_ < kHeaderSize) + return false; + + std::string header; + while (GetHeaderSize(header) < kHeaderSize && !buffer_list_.empty()) { + scoped_refptr<net::DrainableIOBuffer> buffer = buffer_list_.front(); + + // Find out how many bytes we need and how many bytes are available in this + // buffer. + int needed_bytes = kHeaderSize - GetHeaderSize(header); + int available_bytes = buffer->BytesRemaining(); + + // Then append the required bytes into the header and advance the last + // read position. + int read_bytes = std::min(needed_bytes, available_bytes); + header.append(buffer->data(), read_bytes); + buffer->DidConsume(read_bytes); + available_bytes_ -= read_bytes; + + // If the buffer is depleted then remove it from the queue. + if (!buffer->BytesRemaining()) { + buffer_list_.pop_front(); + } + } + + *size = talk_base::GetBE32(header.c_str()); + return true; +} + +} // namespace remoting diff --git a/remoting/protocol/message_decoder.h b/remoting/protocol/message_decoder.h new file mode 100644 index 0000000..d28d9e0 --- /dev/null +++ b/remoting/protocol/message_decoder.h @@ -0,0 +1,110 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_ +#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_ + +#include <deque> +#include <list> + +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "google/protobuf/message_lite.h" + +namespace net { +class DrainableIOBuffer; +class IOBuffer; +} // namespace net + +namespace remoting { + +class ChromotingClientMessage; +class ChromotingHostMessage; +class ClientControlMessage; +class ClientEventMessage; +class HostControlMessage; +class HostEventMessage; +class MultipleArrayInputStream; + +// MessageDecoder uses MultipleArrayInputStream to decode bytes into +// protocol buffer messages. This can be used to decode bytes received from +// the network. +// +// It provides ParseMessages() which accepts an IOBuffer. If enough bytes +// are collected to produce protocol buffer messages then the bytes will be +// consumed and the generated protocol buffer messages are added to the output +// list. +// +// It retains ownership of IOBuffer given to this object and keeps it alive +// until it is full consumed. +class MessageDecoder { + public: + MessageDecoder(); + virtual ~MessageDecoder(); + + // Parses the bytes in |data| into a protobuf of type MessageType. The bytes + // in |data| are conceptually a stream of bytes to be parsed into a series of + // protobufs. Each parsed protouf is appended into the |messages|. All calls + // to ParseMessages should use same MessageType for any single instace of + // MessageDecoder. + + // This function retains |data| until all its bytes are consumed. + // Ownership of the produced protobufs is passed to the caller via the + // |messages| list. + template <class MessageType> + void ParseMessages(scoped_refptr<net::IOBuffer> data, + int data_size, std::list<MessageType*>* messages) { + AddBuffer(data, data_size); + + // Then try to parse the next message until we can't parse anymore. + MessageType* message; + while (ParseOneMessage<MessageType>(&message)) { + messages->push_back(message); + } + } + + private: + // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer + // instead of storing chunks in dqueue. + typedef std::deque<scoped_refptr<net::DrainableIOBuffer> > BufferList; + + // Parse one message from |buffer_list_|. Return true if sucessful. + template <class MessageType> + bool ParseOneMessage(MessageType** message) { + scoped_ptr<MultipleArrayInputStream> stream(CreateInputStreamFromData()); + if (!stream.get()) + return false; + + *message = new MessageType(); + bool ret = (*message)->ParseFromZeroCopyStream(stream.get()); + if (!ret) { + delete *message; + } + return ret; + } + + void AddBuffer(scoped_refptr<net::IOBuffer> data, int data_size); + + MultipleArrayInputStream* CreateInputStreamFromData(); + + // Retrieves the read payload size of the current protocol buffer via |size|. + // Returns false and leaves |size| unmodified, if we do not have enough data + // to retrieve the current size. + bool GetPayloadSize(int* size); + + BufferList buffer_list_; + + // The number of bytes in |buffer_list_| not consumed. + int available_bytes_; + + // |next_payload_| stores the size of the next payload if known. + // |next_payload_known_| is true if the size of the next payload is known. + // After one payload is read this is reset to false. + int next_payload_; + bool next_payload_known_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_ diff --git a/remoting/protocol/messages_decoder_unittest.cc b/remoting/protocol/message_decoder_unittest.cc index 7c5616d..c2772c8 100644 --- a/remoting/protocol/messages_decoder_unittest.cc +++ b/remoting/protocol/message_decoder_unittest.cc @@ -7,7 +7,9 @@ #include "base/scoped_ptr.h" #include "base/stl_util-inl.h" #include "net/base/io_buffer.h" -#include "remoting/protocol/messages_decoder.h" +#include "remoting/base/multiple_array_input_stream.h" +#include "remoting/proto/internal.pb.h" +#include "remoting/protocol/message_decoder.h" #include "remoting/protocol/util.h" #include "testing/gtest/include/gtest/gtest.h" @@ -60,44 +62,42 @@ static void PrepareData(uint8** buffer, int* size) { memcpy(*buffer, encoded_data.c_str(), *size); } -TEST(MessagesDecoderTest, BasicOperations) { +void SimulateReadSequence(const int read_sequence[], int sequence_size) { // Prepare encoded data for testing. int size; uint8* test_data; PrepareData(&test_data, &size); scoped_array<uint8> memory_deleter(test_data); - // Then simulate using MessagesDecoder to decode variable + // Then simulate using MessageDecoder to decode variable // size of encoded data. // The first thing to do is to generate a variable size of data. This is done // by iterating the following array for read sizes. - const int kReadSizes[] = {1, 2, 3, 1}; - - MessagesDecoder decoder; + MessageDecoder decoder; // Then feed the protocol decoder using the above generated data and the // read pattern. - HostMessageList message_list; + std::list<ChromotingHostMessage*> message_list; for (int i = 0; i < size;) { // First generate the amount to feed the decoder. - int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]); + int read = std::min(size - i, read_sequence[i % sequence_size]); - // And then prepare a DataBuffer for feeding it. + // And then prepare an IOBuffer for feeding it. scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read); memcpy(buffer->data(), test_data + i, read); - decoder.ParseHostMessages(buffer, read, &message_list); + decoder.ParseMessages(buffer, read, &message_list); i += read; } // Then verify the decoded messages. EXPECT_EQ(31u, message_list.size()); - ASSERT_TRUE(message_list.size() > 0); EXPECT_TRUE(message_list.front()->has_init_client()); delete message_list.front(); message_list.pop_front(); int index = 0; - for (HostMessageList::iterator it = message_list.begin(); + for (std::list<ChromotingHostMessage*>::iterator it = + message_list.begin(); it != message_list.end(); ++it) { ChromotingHostMessage* message = *it; int type = index % 3; @@ -118,4 +118,19 @@ TEST(MessagesDecoderTest, BasicOperations) { STLDeleteElements(&message_list); } +TEST(MessageDecoderTest, SmallReads) { + const int kReads[] = {1, 2, 3, 1}; + SimulateReadSequence(kReads, arraysize(kReads)); +} + +TEST(MessageDecoderTest, LargeReads) { + const int kReads[] = {50, 50, 5}; + SimulateReadSequence(kReads, arraysize(kReads)); +} + +TEST(MessageDecoderTest, EmptyReads) { + const int kReads[] = {4, 0, 50, 0}; + SimulateReadSequence(kReads, arraysize(kReads)); +} + } // namespace remoting diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc new file mode 100644 index 0000000..79e2b7a --- /dev/null +++ b/remoting/protocol/message_reader.cc @@ -0,0 +1,69 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/message_reader.h" + +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "remoting/base/multiple_array_input_stream.h" +#include "remoting/proto/internal.pb.h" +#include "remoting/protocol/chromotocol_connection.h" + +namespace remoting { + +static const int kReadBufferSize = 4096; + +MessageReader::MessageReader() + : socket_(NULL), + closed_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &MessageReader::OnRead)) { +} + +MessageReader::~MessageReader() { + // Destroy MessageReaderPrivate if it was created. + if (destruction_callback_.get()) + destruction_callback_->Run(); +} + +void MessageReader::Close() { + closed_ = true; +} + +void MessageReader::Init(net::Socket* socket) { + DCHECK(socket); + socket_ = socket; + DoRead(); +} + +void MessageReader::DoRead() { + while (true) { + read_buffer_ = new net::IOBuffer(kReadBufferSize); + int result = socket_->Read( + read_buffer_, kReadBufferSize, &read_callback_); + HandleReadResult(result); + if (result < 0) + break; + } +} + +void MessageReader::OnRead(int result) { + if (!closed_) { + HandleReadResult(result); + DoRead(); + } +} + +void MessageReader::HandleReadResult(int result) { + if (result > 0) { + data_received_callback_->Run(read_buffer_, result); + } else { + if (result != net::ERR_IO_PENDING) + LOG(ERROR) << "Read() returned error " << result; + } +} + +} // namespace remoting diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h new file mode 100644 index 0000000..d3540bb --- /dev/null +++ b/remoting/protocol/message_reader.h @@ -0,0 +1,108 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_ +#define REMOTING_PROTOCOL_MESSAGE_READER_H_ + +#include "base/callback.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "base/task.h" +#include "net/base/completion_callback.h" +#include "remoting/protocol/message_decoder.h" + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class ChromotocolConnection; +class ChromotingClientMessage; +class ChromotingHostMessage; +class MessageReader; + +namespace internal { + +template <class T> +class MessageReaderPrivate { + private: + friend class remoting::MessageReader; + + typedef typename Callback1<T*>::Type MessageReceivedCallback; + + MessageReaderPrivate(MessageReceivedCallback* callback) + : message_received_callback_(callback) { + } + + ~MessageReaderPrivate() { } + + void OnDataReceived(net::IOBuffer* buffer, int data_size) { + typedef typename std::list<T*>::iterator MessageListIterator; + + std::list<T*> message_list; + message_decoder_.ParseMessages(buffer, data_size, &message_list); + for (MessageListIterator it = message_list.begin(); + it != message_list.end(); ++it) { + message_received_callback_->Run(*it); + } + } + + void Destroy() { + delete this; + } + + // Message decoder is used to decode bytes into protobuf message. + MessageDecoder message_decoder_; + + // Callback is called when a message is received. + scoped_ptr<MessageReceivedCallback> message_received_callback_; +}; + +} // namespace internal + +// MessageReader reads data from the socket asynchronously and uses +// MessageReaderPrivate to decode the data received. +class MessageReader { + public: + MessageReader(); + virtual ~MessageReader(); + + // Stops reading. Must be called on the same thread as Init(). + void Close(); + + // Initialize the MessageReader with a socket. If a message is received + // |callback| is called. + template <class T> + void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) { + internal::MessageReaderPrivate<T>* reader = + new internal::MessageReaderPrivate<T>(callback); + data_received_callback_.reset( + ::NewCallback( + reader, &internal::MessageReaderPrivate<T>::OnDataReceived)); + destruction_callback_.reset( + ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy)); + Init(socket); + } + + private: + void Init(net::Socket* socket); + + void DoRead(); + void OnRead(int result); + void HandleReadResult(int result); + + net::Socket* socket_; + + bool closed_; + scoped_refptr<net::IOBuffer> read_buffer_; + net::CompletionCallbackImpl<MessageReader> read_callback_; + + scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_; + scoped_ptr<Callback0::Type> destruction_callback_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_MESSAGE_READER_H_ diff --git a/remoting/protocol/messages_decoder.cc b/remoting/protocol/messages_decoder.cc deleted file mode 100644 index 73e9ea4..0000000 --- a/remoting/protocol/messages_decoder.cc +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "remoting/protocol/messages_decoder.h" - -#include "base/logging.h" -#include "net/base/io_buffer.h" -#include "remoting/base/multiple_array_input_stream.h" -#include "talk/base/byteorder.h" - -namespace remoting { - -MessagesDecoder::MessagesDecoder() - : last_read_position_(0), - available_bytes_(0), - next_payload_(0), - next_payload_known_(false) { -} - -MessagesDecoder::~MessagesDecoder() {} - -void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - ClientMessageList* messages) { - ParseMessages<ChromotingClientMessage>(data, data_size, messages); -} - -void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - HostMessageList* messages) { - ParseMessages<ChromotingHostMessage>(data, data_size, messages); -} - -MessagesDecoder::DataChunk::DataChunk(net::IOBuffer* data, size_t data_size) - : data(data), - data_size(data_size) { -} - -MessagesDecoder::DataChunk::~DataChunk() {} - -template <typename T> -void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - std::list<T*>* messages) { - // If this is the first data in the processing queue, then set the - // last read position to 0. - if (data_list_.empty()) - last_read_position_ = 0; - - // First enqueue the data received. - data_list_.push_back(DataChunk(data, data_size)); - available_bytes_ += data_size; - - // Then try to parse one message until we can't parse anymore. - T* message; - while (ParseOneMessage<T>(&message)) { - messages->push_back(message); - } -} - -template <typename T> -bool MessagesDecoder::ParseOneMessage(T** message) { - // Determine the payload size. If we already know it, then skip this - // part. - // We have the value set to -1 for checking later. - int next_payload = -1; - if (!next_payload_known_ && GetPayloadSize(&next_payload)) { - DCHECK_NE(-1, next_payload); - next_payload_ = next_payload; - next_payload_known_ = true; - } - - // If the next payload size is still not known or we don't have enough - // data for parsing then exit. - if (!next_payload_known_ || available_bytes_ < next_payload_) - return false; - next_payload_known_ = false; - - // Create a MultipleArrayInputStream for parsing. - MultipleArrayInputStream stream; - std::vector<scoped_refptr<net::IOBuffer> > buffers; - while (next_payload_ > 0 && !data_list_.empty()) { - DataChunk* buffer = &(data_list_.front()); - size_t read_bytes = std::min(buffer->data_size - last_read_position_, - next_payload_); - - buffers.push_back(buffer->data); - stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes); - - // Adjust counters. - last_read_position_ += read_bytes; - next_payload_ -= read_bytes; - available_bytes_ -= read_bytes; - - // If the front buffer is fully read, remove it from the queue. - if (buffer->data_size == last_read_position_) { - data_list_.pop_front(); - last_read_position_ = 0; - } - } - DCHECK_EQ(0UL, next_payload_); - - // And finally it is parsing. - *message = new T(); - bool ret = (*message)->ParseFromZeroCopyStream(&stream); - if (!ret) { - LOG(ERROR) << "Received invalid message."; - delete *message; - } - return ret; -} - -bool MessagesDecoder::GetPayloadSize(int* size) { - // The header has a size of 4 bytes. - const size_t kHeaderSize = sizeof(int32); - - if (available_bytes_ < kHeaderSize) - return false; - - std::string header; - while (header.length() < kHeaderSize && !data_list_.empty()) { - DataChunk* buffer = &(data_list_.front()); - - // Find out how many bytes we need and how many bytes are available in this - // buffer. - int needed_bytes = kHeaderSize - header.length(); - int available_bytes = buffer->data_size - last_read_position_; - - // Then append the required bytes into the header and advance the last - // read position. - int read_bytes = std::min(needed_bytes, available_bytes); - header.append( - reinterpret_cast<char*>(buffer->data->data()) + last_read_position_, - read_bytes); - last_read_position_ += read_bytes; - available_bytes_ -= read_bytes; - - // If the buffer is depleted then remove it from the queue. - if (last_read_position_ == buffer->data_size) { - last_read_position_ = 0; - data_list_.pop_front(); - } - } - - if (header.length() == kHeaderSize) { - *size = talk_base::GetBE32(header.c_str()); - return true; - } - NOTREACHED() << "Unable to extract payload size"; - return false; -} - -} // namespace remoting diff --git a/remoting/protocol/messages_decoder.h b/remoting/protocol/messages_decoder.h deleted file mode 100644 index b22f6ba..0000000 --- a/remoting/protocol/messages_decoder.h +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_ -#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_ - -#include <deque> -#include <list> - -#include "base/ref_counted.h" -#include "google/protobuf/message_lite.h" -#include "remoting/proto/internal.pb.h" - -namespace net { -class IOBuffer; -} - -namespace remoting { - -typedef std::list<ChromotingHostMessage*> HostMessageList; -typedef std::list<ChromotingClientMessage*> ClientMessageList; - -// A protocol decoder is used to decode data transmitted in the chromoting -// network. -// TODO(hclam): Defines the interface and implement methods. -class MessagesDecoder { - public: - MessagesDecoder(); - virtual ~MessagesDecoder(); - - // Parse data received from network into ClientMessages. Output is written - // to |messages|. - virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - ClientMessageList* messages); - - // Parse data received from network into HostMessages. Output is - // written to |messages|. - virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - HostMessageList* messages); - - private: - // DataChunk stores reference to a net::IOBuffer and size of the data - // stored in that buffer. - struct DataChunk { - DataChunk(net::IOBuffer* data, size_t data_size); - ~DataChunk(); - - scoped_refptr<net::IOBuffer> data; - size_t data_size; - }; - - // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer - // instead of storing chunks in dqueue. - typedef std::deque<DataChunk> DataList; - - // A private method used to parse data received from network into protocol - // buffers. - template <typename T> - void ParseMessages(scoped_refptr<net::IOBuffer> data, - int data_size, - std::list<T*>* messages); - - // Parse one message from |data_list_|. Return true if sucessful. - template <typename T> - bool ParseOneMessage(T** messages); - - // A utility method to read payload size of the protocol buffer from the - // data list. Return false if we don't have enough data. - bool GetPayloadSize(int* size); - - DataList data_list_; - size_t last_read_position_; - - // Count the number of bytes in |data_list_| not read. - size_t available_bytes_; - - // Stores the size of the next payload if known. - size_t next_payload_; - - // True if the size of the next payload is known. After one payload is read, - // this is reset to false. - bool next_payload_known_; -}; - -} // namespace remoting - -#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_ diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h index 9c3d805..7dd3dee 100644 --- a/remoting/protocol/socket_reader_base.h +++ b/remoting/protocol/socket_reader_base.h @@ -7,9 +7,9 @@ #include "base/ref_counted.h" #include "net/base/completion_callback.h" -#include "remoting/protocol/messages_decoder.h" namespace net { +class IOBuffer; class Socket; } // namespace net diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc deleted file mode 100644 index 23d3dec..0000000 --- a/remoting/protocol/stream_reader.cc +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "remoting/protocol/stream_reader.h" - -#include "net/base/completion_callback.h" -#include "net/base/io_buffer.h" - -namespace remoting { - -// EventStreamReader class. -EventStreamReader::EventStreamReader() { } -EventStreamReader::~EventStreamReader() { } - -void EventStreamReader::Init(net::Socket* socket, - OnMessageCallback* on_message_callback) { - on_message_callback_.reset(on_message_callback); - SocketReaderBase::Init(socket); -} - -void EventStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { - ClientMessageList messages_list; - messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list); - for (ClientMessageList::iterator it = messages_list.begin(); - it != messages_list.end(); ++it) { - on_message_callback_->Run(*it); - } -} - -// VideoStreamReader class. -VideoStreamReader::VideoStreamReader() { } -VideoStreamReader::~VideoStreamReader() { } - -void VideoStreamReader::Init(net::Socket* socket, - OnMessageCallback* on_message_callback) { - on_message_callback_.reset(on_message_callback); - SocketReaderBase::Init(socket); -} - -void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { - HostMessageList messages_list; - messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list); - for (HostMessageList::iterator it = messages_list.begin(); - it != messages_list.end(); ++it) { - on_message_callback_->Run(*it); - } -} - -} // namespace remoting diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h deleted file mode 100644 index 7f8e004..0000000 --- a/remoting/protocol/stream_reader.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef REMOTING_PROTOCOL_STREAM_READER_H_ -#define REMOTING_PROTOCOL_STREAM_READER_H_ - -#include "base/callback.h" -#include "base/scoped_ptr.h" -#include "remoting/protocol/socket_reader_base.h" - -namespace remoting { - -class EventStreamReader : public SocketReaderBase { - public: - EventStreamReader(); - ~EventStreamReader(); - - // The OnMessageCallback is called whenever a new message is received. - // Ownership of the message is passed the callback. - typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback; - - // Initialize the reader and start reading. Must be called on the thread - // |socket| belongs to. The callback will be called when a new message is - // received. EventStreamReader owns |on_message_callback|, doesn't own - // |socket|. - void Init(net::Socket* socket, OnMessageCallback* on_message_callback); - - protected: - virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); - - private: - MessagesDecoder messages_decoder_; - scoped_ptr<OnMessageCallback> on_message_callback_; - - DISALLOW_COPY_AND_ASSIGN(EventStreamReader); -}; - -class VideoStreamReader : public SocketReaderBase { - public: - VideoStreamReader(); - ~VideoStreamReader(); - - // The OnMessageCallback is called whenever a new message is received. - // Ownership of the message is passed the callback. - typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback; - - // Initialize the reader and start reading. Must be called on the thread - // |socket| belongs to. The callback will be called when a new message is - // received. VideoStreamReader owns |on_message_callback|, doesn't own - // |socket|. - void Init(net::Socket* socket, OnMessageCallback* on_message_callback); - - protected: - virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); - - private: - MessagesDecoder messages_decoder_; - scoped_ptr<OnMessageCallback> on_message_callback_; - - DISALLOW_COPY_AND_ASSIGN(VideoStreamReader); -}; - -} // namespace remoting - -#endif // REMOTING_PROTOCOL_STREAM_READER_H_ diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h index cc1c4b1..75f6180 100644 --- a/remoting/protocol/stream_writer.h +++ b/remoting/protocol/stream_writer.h @@ -5,8 +5,8 @@ #ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_ #define REMOTING_PROTOCOL_STREAM_WRITER_H_ +#include "remoting/proto/internal.pb.h" #include "remoting/protocol/buffered_socket_writer.h" -#include "remoting/protocol/messages_decoder.h" namespace remoting { diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp index 84794e1..ec32165 100644 --- a/remoting/remoting.gyp +++ b/remoting/remoting.gyp @@ -361,12 +361,14 @@ 'protocol/host_event_message_handler.h', 'protocol/host_message_dispatcher.cc', 'protocol/host_message_dispatcher.h', + 'protocol/message_decoder.cc', + 'protocol/message_decoder.h', + 'protocol/message_reader.cc', + 'protocol/message_reader.h', 'protocol/jingle_chromotocol_connection.cc', 'protocol/jingle_chromotocol_connection.h', 'protocol/jingle_chromotocol_server.cc', 'protocol/jingle_chromotocol_server.h', - 'protocol/messages_decoder.cc', - 'protocol/messages_decoder.h', 'protocol/rtp_reader.cc', 'protocol/rtp_reader.h', 'protocol/rtp_utils.cc', @@ -375,8 +377,6 @@ 'protocol/rtp_writer.h', 'protocol/socket_reader_base.cc', 'protocol/socket_reader_base.h', - 'protocol/stream_reader.cc', - 'protocol/stream_reader.h', 'protocol/stream_writer.cc', 'protocol/stream_writer.h', 'protocol/util.cc', @@ -448,7 +448,7 @@ 'jingle_glue/mock_objects.h', 'jingle_glue/stream_socket_adapter_unittest.cc', 'protocol/jingle_chromotocol_connection_unittest.cc', - 'protocol/messages_decoder_unittest.cc', + 'protocol/message_decoder_unittest.cc', 'protocol/fake_connection.cc', 'protocol/fake_connection.h', 'protocol/session_manager_pair.cc', |