summaryrefslogtreecommitdiffstats
path: root/remoting/protocol
diff options
context:
space:
mode:
authorhclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-10-28 18:43:37 +0000
committerhclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-10-28 18:43:37 +0000
commit4d10edeb5f3db4366b4521c2346bfb4741c21e6f (patch)
tree98a0a1d7d295cc31c0840977dc15134c1271de31 /remoting/protocol
parent45b10f0917d6449aa53f66a6ea7fa6e8a4aea079 (diff)
downloadchromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.zip
chromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.tar.gz
chromium_src-4d10edeb5f3db4366b4521c2346bfb4741c21e6f.tar.bz2
HostMessageDispatcher to parse control messages
Changed MessageReader and MessageDecoder to support parsing in HostMessageDispatcher. BUG=None TEST=None Review URL: http://codereview.chromium.org/4017002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@64283 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting/protocol')
-rw-r--r--remoting/protocol/host_message_dispatcher.cc57
-rw-r--r--remoting/protocol/host_message_dispatcher.h67
-rw-r--r--remoting/protocol/message_decoder.cc112
-rw-r--r--remoting/protocol/message_decoder.h110
-rw-r--r--remoting/protocol/message_decoder_unittest.cc (renamed from remoting/protocol/messages_decoder_unittest.cc)39
-rw-r--r--remoting/protocol/message_reader.cc69
-rw-r--r--remoting/protocol/message_reader.h108
-rw-r--r--remoting/protocol/messages_decoder.cc154
-rw-r--r--remoting/protocol/messages_decoder.h90
-rw-r--r--remoting/protocol/socket_reader_base.h2
-rw-r--r--remoting/protocol/stream_reader.cc50
-rw-r--r--remoting/protocol/stream_reader.h66
-rw-r--r--remoting/protocol/stream_writer.h2
13 files changed, 516 insertions, 410 deletions
diff --git a/remoting/protocol/host_message_dispatcher.cc b/remoting/protocol/host_message_dispatcher.cc
index 834b5f8..818934f 100644
--- a/remoting/protocol/host_message_dispatcher.cc
+++ b/remoting/protocol/host_message_dispatcher.cc
@@ -2,27 +2,64 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "base/message_loop_proxy.h"
+#include "net/base/io_buffer.h"
+#include "remoting/base/multiple_array_input_stream.h"
#include "remoting/proto/control.pb.h"
#include "remoting/proto/event.pb.h"
+#include "remoting/protocol/chromotocol_connection.h"
#include "remoting/protocol/host_message_dispatcher.h"
#include "remoting/protocol/host_control_message_handler.h"
#include "remoting/protocol/host_event_message_handler.h"
-#include "remoting/protocol/stream_reader.h"
+#include "remoting/protocol/message_reader.h"
namespace remoting {
-HostMessageDispatcher::HostMessageDispatcher(
- base::MessageLoopProxy* message_loop_proxy,
- ChromotingConnection* connection,
- HostControlMessageHandler* control_message_handler,
- HostEventMessageHandler* event_message_handler)
- : message_loop_proxy_(message_loop_proxy),
- control_message_handler_(control_message_handler),
- event_message_handler_(event_message_handler) {
+HostMessageDispatcher::HostMessageDispatcher() {
}
HostMessageDispatcher::~HostMessageDispatcher() {
}
+bool HostMessageDispatcher::Initialize(
+ ChromotocolConnection* connection,
+ HostControlMessageHandler* control_message_handler,
+ HostEventMessageHandler* event_message_handler) {
+ if (!connection || !control_message_handler || !event_message_handler ||
+ !connection->event_channel() || !connection->control_channel()) {
+ return false;
+ }
+
+ control_message_reader_.reset(new MessageReader());
+ event_message_reader_.reset(new MessageReader());
+ control_message_handler_.reset(control_message_handler);
+ event_message_handler_.reset(event_message_handler);
+
+ // Initialize the readers on the sockets provided by channels.
+ event_message_reader_->Init<ClientEventMessage>(
+ connection->event_channel(),
+ NewCallback(this, &HostMessageDispatcher::OnEventMessageReceived));
+ control_message_reader_->Init<ClientControlMessage>(
+ connection->control_channel(),
+ NewCallback(this, &HostMessageDispatcher::OnControlMessageReceived));
+ return true;
+}
+
+void HostMessageDispatcher::OnControlMessageReceived(
+ ClientControlMessage* message) {
+ scoped_refptr<RefCountedMessage<ClientControlMessage> > ref_msg =
+ new RefCountedMessage<ClientControlMessage>(message);
+ if (message->has_suggest_screen_resolution_request()) {
+ control_message_handler_->OnSuggestScreenResolutionRequest(
+ message->suggest_screen_resolution_request(),
+ NewRunnableFunction(
+ &DeleteMessage<RefCountedMessage<ClientControlMessage> >,
+ ref_msg));
+ }
+}
+
+void HostMessageDispatcher::OnEventMessageReceived(
+ ClientEventMessage* message) {
+ // TODO(hclam): Implement.
+}
+
} // namespace remoting
diff --git a/remoting/protocol/host_message_dispatcher.h b/remoting/protocol/host_message_dispatcher.h
index aab7fbf..23dc55a 100644
--- a/remoting/protocol/host_message_dispatcher.h
+++ b/remoting/protocol/host_message_dispatcher.h
@@ -9,58 +9,73 @@
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
-namespace base {
-class MessageLoopProxy;
-} // namespace base
-
namespace remoting {
-class ChromotingClientMessage;
-class ChromotingConnection;
-class EventStreamReader;
+class ChromotocolConnection;
+class ClientControlMessage;
+class ClientEventMessage;
class HostControlMessageHandler;
class HostEventMessageHandler;
+class MessageReader;
// A message dispatcher used to listen for messages received in
-// ChromotingConnection. It dispatches messages to the corresponding
+// ChromotocolConnection. It dispatches messages to the corresponding
// handler.
//
// Internally it contains an EventStreamReader that decodes data on
// communications channels into protocol buffer messages.
-// EventStreamReader is registered with ChromotingConnection given to it.
+// EventStreamReader is registered with ChromotocolConnection given to it.
//
// Object of this class is owned by ChromotingHost to dispatch messages
// to itself.
-class HostMessageDispatcher {
+class HostMessageDispatcher :
+ public base::RefCountedThreadSafe<HostMessageDispatcher> {
public:
- // Construct a message dispatcher that dispatches messages received
- // in ChromotingConnection.
- HostMessageDispatcher(base::MessageLoopProxy* message_loop_proxy,
- ChromotingConnection* connection,
- HostControlMessageHandler* control_message_handler,
- HostEventMessageHandler* event_message_handler);
-
+ // Construct a message dispatcher.
+ HostMessageDispatcher();
virtual ~HostMessageDispatcher();
+ // Initialize the message dispatcher with the given connection and
+ // message handlers.
+ // Return true if initalization was successful.
+ bool Initialize(ChromotocolConnection* connection,
+ HostControlMessageHandler* control_message_handler,
+ HostEventMessageHandler* event_message_handler);
+
private:
+ // A single protobuf can contain multiple messages that will be handled by
+ // different message handlers. We use this wrapper to ensure that the
+ // protobuf is only deleted after all the handlers have finished executing.
+ template <typename T>
+ class RefCountedMessage : public base::RefCounted<RefCountedMessage<T> > {
+ public:
+ RefCountedMessage(T* message) : message_(message) { }
+
+ T* message() { return message_.get(); }
+
+ private:
+ scoped_ptr<T> message_;
+ };
+
// This method is called by |control_channel_reader_| when a control
// message is received.
- void OnControlMessageReceived(ChromotingClientMessage* message);
+ void OnControlMessageReceived(ClientControlMessage* message);
// This method is called by |event_channel_reader_| when a event
// message is received.
- void OnEventMessageReceived(ChromotingClientMessage* message);
+ void OnEventMessageReceived(ClientEventMessage* message);
- // Message loop to dispatch the messages.
- scoped_refptr<base::MessageLoopProxy> message_loop_proxy_;
+ // Dummy methods to destroy messages.
+ template <class T>
+ static void DeleteMessage(scoped_refptr<T> message) { }
- // EventStreamReader that runs on the control channel. It runs a loop
- // that parses data on the channel and then delegate the message to this
+ // MessageReader that runs on the control channel. It runs a loop
+ // that parses data on the channel and then delegates the message to this
// class.
- scoped_ptr<EventStreamReader> control_channel_reader_;
+ scoped_ptr<MessageReader> control_message_reader_;
- // EventStreamReader that runs on the event channel.
- scoped_ptr<EventStreamReader> event_channel_reader_;
+ // MessageReader that runs on the event channel.
+ scoped_ptr<MessageReader> event_message_reader_;
// Event handlers for control channel and event channel respectively.
// Method calls to these objects are made on the message loop given.
diff --git a/remoting/protocol/message_decoder.cc b/remoting/protocol/message_decoder.cc
new file mode 100644
index 0000000..4a31ee1
--- /dev/null
+++ b/remoting/protocol/message_decoder.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/message_decoder.h"
+
+#include "base/logging.h"
+#include "net/base/io_buffer.h"
+#include "remoting/base/multiple_array_input_stream.h"
+#include "remoting/proto/internal.pb.h"
+#include "talk/base/byteorder.h"
+
+namespace remoting {
+
+MessageDecoder::MessageDecoder()
+ : available_bytes_(0),
+ next_payload_(0),
+ next_payload_known_(false) {
+}
+
+MessageDecoder::~MessageDecoder() {}
+
+void MessageDecoder::AddBuffer(scoped_refptr<net::IOBuffer> data,
+ int data_size) {
+ buffer_list_.push_back(new net::DrainableIOBuffer(data, data_size));
+ available_bytes_ += data_size;
+}
+
+MultipleArrayInputStream* MessageDecoder::CreateInputStreamFromData() {
+ // Determine the payload size. If we already know it then skip this part.
+ // We may not have enough data to determine the payload size so use a
+ // utility function to find out.
+ int next_payload = -1;
+ if (!next_payload_known_ && GetPayloadSize(&next_payload)) {
+ DCHECK_NE(-1, next_payload);
+ next_payload_ = next_payload;
+ next_payload_known_ = true;
+ }
+
+ // If the next payload size is still not known or we don't have enough
+ // data for parsing then exit.
+ if (!next_payload_known_ || available_bytes_ < next_payload_)
+ return NULL;
+ next_payload_known_ = false;
+
+ // The following loop gather buffers in |buffer_list_| that sum up to
+ // |next_payload_| bytes. These buffers are added to |stream|.
+
+ // Create a MultipleArrayInputStream for parsing.
+ // TODO(hclam): Avoid creating this object everytime.
+ MultipleArrayInputStream* stream = new MultipleArrayInputStream();
+ while (next_payload_ > 0 && !buffer_list_.empty()) {
+ scoped_refptr<net::DrainableIOBuffer> buffer = buffer_list_.front();
+ int read_bytes = std::min(buffer->BytesRemaining(), next_payload_);
+
+ // This call creates a new instance of DrainableIOBuffer internally.
+ // This will reference the same base pointer but maintain it's own
+ // version of data pointer.
+ stream->AddBuffer(buffer, read_bytes);
+
+ // Adjust counters.
+ buffer->DidConsume(read_bytes);
+ next_payload_ -= read_bytes;
+ available_bytes_ -= read_bytes;
+
+ // If the front buffer is fully read then remove it from the queue.
+ if (!buffer->BytesRemaining())
+ buffer_list_.pop_front();
+ }
+ DCHECK_EQ(0, next_payload_);
+ DCHECK_LE(0, available_bytes_);
+ return stream;
+}
+
+static int GetHeaderSize(const std::string& header) {
+ return header.length();
+}
+
+bool MessageDecoder::GetPayloadSize(int* size) {
+ // The header has a size of 4 bytes.
+ const int kHeaderSize = sizeof(int32);
+
+ if (available_bytes_ < kHeaderSize)
+ return false;
+
+ std::string header;
+ while (GetHeaderSize(header) < kHeaderSize && !buffer_list_.empty()) {
+ scoped_refptr<net::DrainableIOBuffer> buffer = buffer_list_.front();
+
+ // Find out how many bytes we need and how many bytes are available in this
+ // buffer.
+ int needed_bytes = kHeaderSize - GetHeaderSize(header);
+ int available_bytes = buffer->BytesRemaining();
+
+ // Then append the required bytes into the header and advance the last
+ // read position.
+ int read_bytes = std::min(needed_bytes, available_bytes);
+ header.append(buffer->data(), read_bytes);
+ buffer->DidConsume(read_bytes);
+ available_bytes_ -= read_bytes;
+
+ // If the buffer is depleted then remove it from the queue.
+ if (!buffer->BytesRemaining()) {
+ buffer_list_.pop_front();
+ }
+ }
+
+ *size = talk_base::GetBE32(header.c_str());
+ return true;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/message_decoder.h b/remoting/protocol/message_decoder.h
new file mode 100644
index 0000000..d28d9e0
--- /dev/null
+++ b/remoting/protocol/message_decoder.h
@@ -0,0 +1,110 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_
+#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_
+
+#include <deque>
+#include <list>
+
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "google/protobuf/message_lite.h"
+
+namespace net {
+class DrainableIOBuffer;
+class IOBuffer;
+} // namespace net
+
+namespace remoting {
+
+class ChromotingClientMessage;
+class ChromotingHostMessage;
+class ClientControlMessage;
+class ClientEventMessage;
+class HostControlMessage;
+class HostEventMessage;
+class MultipleArrayInputStream;
+
+// MessageDecoder uses MultipleArrayInputStream to decode bytes into
+// protocol buffer messages. This can be used to decode bytes received from
+// the network.
+//
+// It provides ParseMessages() which accepts an IOBuffer. If enough bytes
+// are collected to produce protocol buffer messages then the bytes will be
+// consumed and the generated protocol buffer messages are added to the output
+// list.
+//
+// It retains ownership of IOBuffer given to this object and keeps it alive
+// until it is full consumed.
+class MessageDecoder {
+ public:
+ MessageDecoder();
+ virtual ~MessageDecoder();
+
+ // Parses the bytes in |data| into a protobuf of type MessageType. The bytes
+ // in |data| are conceptually a stream of bytes to be parsed into a series of
+ // protobufs. Each parsed protouf is appended into the |messages|. All calls
+ // to ParseMessages should use same MessageType for any single instace of
+ // MessageDecoder.
+
+ // This function retains |data| until all its bytes are consumed.
+ // Ownership of the produced protobufs is passed to the caller via the
+ // |messages| list.
+ template <class MessageType>
+ void ParseMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size, std::list<MessageType*>* messages) {
+ AddBuffer(data, data_size);
+
+ // Then try to parse the next message until we can't parse anymore.
+ MessageType* message;
+ while (ParseOneMessage<MessageType>(&message)) {
+ messages->push_back(message);
+ }
+ }
+
+ private:
+ // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer
+ // instead of storing chunks in dqueue.
+ typedef std::deque<scoped_refptr<net::DrainableIOBuffer> > BufferList;
+
+ // Parse one message from |buffer_list_|. Return true if sucessful.
+ template <class MessageType>
+ bool ParseOneMessage(MessageType** message) {
+ scoped_ptr<MultipleArrayInputStream> stream(CreateInputStreamFromData());
+ if (!stream.get())
+ return false;
+
+ *message = new MessageType();
+ bool ret = (*message)->ParseFromZeroCopyStream(stream.get());
+ if (!ret) {
+ delete *message;
+ }
+ return ret;
+ }
+
+ void AddBuffer(scoped_refptr<net::IOBuffer> data, int data_size);
+
+ MultipleArrayInputStream* CreateInputStreamFromData();
+
+ // Retrieves the read payload size of the current protocol buffer via |size|.
+ // Returns false and leaves |size| unmodified, if we do not have enough data
+ // to retrieve the current size.
+ bool GetPayloadSize(int* size);
+
+ BufferList buffer_list_;
+
+ // The number of bytes in |buffer_list_| not consumed.
+ int available_bytes_;
+
+ // |next_payload_| stores the size of the next payload if known.
+ // |next_payload_known_| is true if the size of the next payload is known.
+ // After one payload is read this is reset to false.
+ int next_payload_;
+ bool next_payload_known_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_
diff --git a/remoting/protocol/messages_decoder_unittest.cc b/remoting/protocol/message_decoder_unittest.cc
index 7c5616d..c2772c8 100644
--- a/remoting/protocol/messages_decoder_unittest.cc
+++ b/remoting/protocol/message_decoder_unittest.cc
@@ -7,7 +7,9 @@
#include "base/scoped_ptr.h"
#include "base/stl_util-inl.h"
#include "net/base/io_buffer.h"
-#include "remoting/protocol/messages_decoder.h"
+#include "remoting/base/multiple_array_input_stream.h"
+#include "remoting/proto/internal.pb.h"
+#include "remoting/protocol/message_decoder.h"
#include "remoting/protocol/util.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -60,44 +62,42 @@ static void PrepareData(uint8** buffer, int* size) {
memcpy(*buffer, encoded_data.c_str(), *size);
}
-TEST(MessagesDecoderTest, BasicOperations) {
+void SimulateReadSequence(const int read_sequence[], int sequence_size) {
// Prepare encoded data for testing.
int size;
uint8* test_data;
PrepareData(&test_data, &size);
scoped_array<uint8> memory_deleter(test_data);
- // Then simulate using MessagesDecoder to decode variable
+ // Then simulate using MessageDecoder to decode variable
// size of encoded data.
// The first thing to do is to generate a variable size of data. This is done
// by iterating the following array for read sizes.
- const int kReadSizes[] = {1, 2, 3, 1};
-
- MessagesDecoder decoder;
+ MessageDecoder decoder;
// Then feed the protocol decoder using the above generated data and the
// read pattern.
- HostMessageList message_list;
+ std::list<ChromotingHostMessage*> message_list;
for (int i = 0; i < size;) {
// First generate the amount to feed the decoder.
- int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]);
+ int read = std::min(size - i, read_sequence[i % sequence_size]);
- // And then prepare a DataBuffer for feeding it.
+ // And then prepare an IOBuffer for feeding it.
scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read);
memcpy(buffer->data(), test_data + i, read);
- decoder.ParseHostMessages(buffer, read, &message_list);
+ decoder.ParseMessages(buffer, read, &message_list);
i += read;
}
// Then verify the decoded messages.
EXPECT_EQ(31u, message_list.size());
- ASSERT_TRUE(message_list.size() > 0);
EXPECT_TRUE(message_list.front()->has_init_client());
delete message_list.front();
message_list.pop_front();
int index = 0;
- for (HostMessageList::iterator it = message_list.begin();
+ for (std::list<ChromotingHostMessage*>::iterator it =
+ message_list.begin();
it != message_list.end(); ++it) {
ChromotingHostMessage* message = *it;
int type = index % 3;
@@ -118,4 +118,19 @@ TEST(MessagesDecoderTest, BasicOperations) {
STLDeleteElements(&message_list);
}
+TEST(MessageDecoderTest, SmallReads) {
+ const int kReads[] = {1, 2, 3, 1};
+ SimulateReadSequence(kReads, arraysize(kReads));
+}
+
+TEST(MessageDecoderTest, LargeReads) {
+ const int kReads[] = {50, 50, 5};
+ SimulateReadSequence(kReads, arraysize(kReads));
+}
+
+TEST(MessageDecoderTest, EmptyReads) {
+ const int kReads[] = {4, 0, 50, 0};
+ SimulateReadSequence(kReads, arraysize(kReads));
+}
+
} // namespace remoting
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc
new file mode 100644
index 0000000..79e2b7a
--- /dev/null
+++ b/remoting/protocol/message_reader.cc
@@ -0,0 +1,69 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/message_reader.h"
+
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+#include "remoting/base/multiple_array_input_stream.h"
+#include "remoting/proto/internal.pb.h"
+#include "remoting/protocol/chromotocol_connection.h"
+
+namespace remoting {
+
+static const int kReadBufferSize = 4096;
+
+MessageReader::MessageReader()
+ : socket_(NULL),
+ closed_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &MessageReader::OnRead)) {
+}
+
+MessageReader::~MessageReader() {
+ // Destroy MessageReaderPrivate if it was created.
+ if (destruction_callback_.get())
+ destruction_callback_->Run();
+}
+
+void MessageReader::Close() {
+ closed_ = true;
+}
+
+void MessageReader::Init(net::Socket* socket) {
+ DCHECK(socket);
+ socket_ = socket;
+ DoRead();
+}
+
+void MessageReader::DoRead() {
+ while (true) {
+ read_buffer_ = new net::IOBuffer(kReadBufferSize);
+ int result = socket_->Read(
+ read_buffer_, kReadBufferSize, &read_callback_);
+ HandleReadResult(result);
+ if (result < 0)
+ break;
+ }
+}
+
+void MessageReader::OnRead(int result) {
+ if (!closed_) {
+ HandleReadResult(result);
+ DoRead();
+ }
+}
+
+void MessageReader::HandleReadResult(int result) {
+ if (result > 0) {
+ data_received_callback_->Run(read_buffer_, result);
+ } else {
+ if (result != net::ERR_IO_PENDING)
+ LOG(ERROR) << "Read() returned error " << result;
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
new file mode 100644
index 0000000..d3540bb
--- /dev/null
+++ b/remoting/protocol/message_reader.h
@@ -0,0 +1,108 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_
+#define REMOTING_PROTOCOL_MESSAGE_READER_H_
+
+#include "base/callback.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "base/task.h"
+#include "net/base/completion_callback.h"
+#include "remoting/protocol/message_decoder.h"
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class ChromotocolConnection;
+class ChromotingClientMessage;
+class ChromotingHostMessage;
+class MessageReader;
+
+namespace internal {
+
+template <class T>
+class MessageReaderPrivate {
+ private:
+ friend class remoting::MessageReader;
+
+ typedef typename Callback1<T*>::Type MessageReceivedCallback;
+
+ MessageReaderPrivate(MessageReceivedCallback* callback)
+ : message_received_callback_(callback) {
+ }
+
+ ~MessageReaderPrivate() { }
+
+ void OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ typedef typename std::list<T*>::iterator MessageListIterator;
+
+ std::list<T*> message_list;
+ message_decoder_.ParseMessages(buffer, data_size, &message_list);
+ for (MessageListIterator it = message_list.begin();
+ it != message_list.end(); ++it) {
+ message_received_callback_->Run(*it);
+ }
+ }
+
+ void Destroy() {
+ delete this;
+ }
+
+ // Message decoder is used to decode bytes into protobuf message.
+ MessageDecoder message_decoder_;
+
+ // Callback is called when a message is received.
+ scoped_ptr<MessageReceivedCallback> message_received_callback_;
+};
+
+} // namespace internal
+
+// MessageReader reads data from the socket asynchronously and uses
+// MessageReaderPrivate to decode the data received.
+class MessageReader {
+ public:
+ MessageReader();
+ virtual ~MessageReader();
+
+ // Stops reading. Must be called on the same thread as Init().
+ void Close();
+
+ // Initialize the MessageReader with a socket. If a message is received
+ // |callback| is called.
+ template <class T>
+ void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) {
+ internal::MessageReaderPrivate<T>* reader =
+ new internal::MessageReaderPrivate<T>(callback);
+ data_received_callback_.reset(
+ ::NewCallback(
+ reader, &internal::MessageReaderPrivate<T>::OnDataReceived));
+ destruction_callback_.reset(
+ ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy));
+ Init(socket);
+ }
+
+ private:
+ void Init(net::Socket* socket);
+
+ void DoRead();
+ void OnRead(int result);
+ void HandleReadResult(int result);
+
+ net::Socket* socket_;
+
+ bool closed_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ net::CompletionCallbackImpl<MessageReader> read_callback_;
+
+ scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_;
+ scoped_ptr<Callback0::Type> destruction_callback_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGE_READER_H_
diff --git a/remoting/protocol/messages_decoder.cc b/remoting/protocol/messages_decoder.cc
deleted file mode 100644
index 73e9ea4..0000000
--- a/remoting/protocol/messages_decoder.cc
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "remoting/protocol/messages_decoder.h"
-
-#include "base/logging.h"
-#include "net/base/io_buffer.h"
-#include "remoting/base/multiple_array_input_stream.h"
-#include "talk/base/byteorder.h"
-
-namespace remoting {
-
-MessagesDecoder::MessagesDecoder()
- : last_read_position_(0),
- available_bytes_(0),
- next_payload_(0),
- next_payload_known_(false) {
-}
-
-MessagesDecoder::~MessagesDecoder() {}
-
-void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- ClientMessageList* messages) {
- ParseMessages<ChromotingClientMessage>(data, data_size, messages);
-}
-
-void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- HostMessageList* messages) {
- ParseMessages<ChromotingHostMessage>(data, data_size, messages);
-}
-
-MessagesDecoder::DataChunk::DataChunk(net::IOBuffer* data, size_t data_size)
- : data(data),
- data_size(data_size) {
-}
-
-MessagesDecoder::DataChunk::~DataChunk() {}
-
-template <typename T>
-void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- std::list<T*>* messages) {
- // If this is the first data in the processing queue, then set the
- // last read position to 0.
- if (data_list_.empty())
- last_read_position_ = 0;
-
- // First enqueue the data received.
- data_list_.push_back(DataChunk(data, data_size));
- available_bytes_ += data_size;
-
- // Then try to parse one message until we can't parse anymore.
- T* message;
- while (ParseOneMessage<T>(&message)) {
- messages->push_back(message);
- }
-}
-
-template <typename T>
-bool MessagesDecoder::ParseOneMessage(T** message) {
- // Determine the payload size. If we already know it, then skip this
- // part.
- // We have the value set to -1 for checking later.
- int next_payload = -1;
- if (!next_payload_known_ && GetPayloadSize(&next_payload)) {
- DCHECK_NE(-1, next_payload);
- next_payload_ = next_payload;
- next_payload_known_ = true;
- }
-
- // If the next payload size is still not known or we don't have enough
- // data for parsing then exit.
- if (!next_payload_known_ || available_bytes_ < next_payload_)
- return false;
- next_payload_known_ = false;
-
- // Create a MultipleArrayInputStream for parsing.
- MultipleArrayInputStream stream;
- std::vector<scoped_refptr<net::IOBuffer> > buffers;
- while (next_payload_ > 0 && !data_list_.empty()) {
- DataChunk* buffer = &(data_list_.front());
- size_t read_bytes = std::min(buffer->data_size - last_read_position_,
- next_payload_);
-
- buffers.push_back(buffer->data);
- stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes);
-
- // Adjust counters.
- last_read_position_ += read_bytes;
- next_payload_ -= read_bytes;
- available_bytes_ -= read_bytes;
-
- // If the front buffer is fully read, remove it from the queue.
- if (buffer->data_size == last_read_position_) {
- data_list_.pop_front();
- last_read_position_ = 0;
- }
- }
- DCHECK_EQ(0UL, next_payload_);
-
- // And finally it is parsing.
- *message = new T();
- bool ret = (*message)->ParseFromZeroCopyStream(&stream);
- if (!ret) {
- LOG(ERROR) << "Received invalid message.";
- delete *message;
- }
- return ret;
-}
-
-bool MessagesDecoder::GetPayloadSize(int* size) {
- // The header has a size of 4 bytes.
- const size_t kHeaderSize = sizeof(int32);
-
- if (available_bytes_ < kHeaderSize)
- return false;
-
- std::string header;
- while (header.length() < kHeaderSize && !data_list_.empty()) {
- DataChunk* buffer = &(data_list_.front());
-
- // Find out how many bytes we need and how many bytes are available in this
- // buffer.
- int needed_bytes = kHeaderSize - header.length();
- int available_bytes = buffer->data_size - last_read_position_;
-
- // Then append the required bytes into the header and advance the last
- // read position.
- int read_bytes = std::min(needed_bytes, available_bytes);
- header.append(
- reinterpret_cast<char*>(buffer->data->data()) + last_read_position_,
- read_bytes);
- last_read_position_ += read_bytes;
- available_bytes_ -= read_bytes;
-
- // If the buffer is depleted then remove it from the queue.
- if (last_read_position_ == buffer->data_size) {
- last_read_position_ = 0;
- data_list_.pop_front();
- }
- }
-
- if (header.length() == kHeaderSize) {
- *size = talk_base::GetBE32(header.c_str());
- return true;
- }
- NOTREACHED() << "Unable to extract payload size";
- return false;
-}
-
-} // namespace remoting
diff --git a/remoting/protocol/messages_decoder.h b/remoting/protocol/messages_decoder.h
deleted file mode 100644
index b22f6ba..0000000
--- a/remoting/protocol/messages_decoder.h
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_
-#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_
-
-#include <deque>
-#include <list>
-
-#include "base/ref_counted.h"
-#include "google/protobuf/message_lite.h"
-#include "remoting/proto/internal.pb.h"
-
-namespace net {
-class IOBuffer;
-}
-
-namespace remoting {
-
-typedef std::list<ChromotingHostMessage*> HostMessageList;
-typedef std::list<ChromotingClientMessage*> ClientMessageList;
-
-// A protocol decoder is used to decode data transmitted in the chromoting
-// network.
-// TODO(hclam): Defines the interface and implement methods.
-class MessagesDecoder {
- public:
- MessagesDecoder();
- virtual ~MessagesDecoder();
-
- // Parse data received from network into ClientMessages. Output is written
- // to |messages|.
- virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- ClientMessageList* messages);
-
- // Parse data received from network into HostMessages. Output is
- // written to |messages|.
- virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- HostMessageList* messages);
-
- private:
- // DataChunk stores reference to a net::IOBuffer and size of the data
- // stored in that buffer.
- struct DataChunk {
- DataChunk(net::IOBuffer* data, size_t data_size);
- ~DataChunk();
-
- scoped_refptr<net::IOBuffer> data;
- size_t data_size;
- };
-
- // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer
- // instead of storing chunks in dqueue.
- typedef std::deque<DataChunk> DataList;
-
- // A private method used to parse data received from network into protocol
- // buffers.
- template <typename T>
- void ParseMessages(scoped_refptr<net::IOBuffer> data,
- int data_size,
- std::list<T*>* messages);
-
- // Parse one message from |data_list_|. Return true if sucessful.
- template <typename T>
- bool ParseOneMessage(T** messages);
-
- // A utility method to read payload size of the protocol buffer from the
- // data list. Return false if we don't have enough data.
- bool GetPayloadSize(int* size);
-
- DataList data_list_;
- size_t last_read_position_;
-
- // Count the number of bytes in |data_list_| not read.
- size_t available_bytes_;
-
- // Stores the size of the next payload if known.
- size_t next_payload_;
-
- // True if the size of the next payload is known. After one payload is read,
- // this is reset to false.
- bool next_payload_known_;
-};
-
-} // namespace remoting
-
-#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_
diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h
index 9c3d805..7dd3dee 100644
--- a/remoting/protocol/socket_reader_base.h
+++ b/remoting/protocol/socket_reader_base.h
@@ -7,9 +7,9 @@
#include "base/ref_counted.h"
#include "net/base/completion_callback.h"
-#include "remoting/protocol/messages_decoder.h"
namespace net {
+class IOBuffer;
class Socket;
} // namespace net
diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc
deleted file mode 100644
index 23d3dec..0000000
--- a/remoting/protocol/stream_reader.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "remoting/protocol/stream_reader.h"
-
-#include "net/base/completion_callback.h"
-#include "net/base/io_buffer.h"
-
-namespace remoting {
-
-// EventStreamReader class.
-EventStreamReader::EventStreamReader() { }
-EventStreamReader::~EventStreamReader() { }
-
-void EventStreamReader::Init(net::Socket* socket,
- OnMessageCallback* on_message_callback) {
- on_message_callback_.reset(on_message_callback);
- SocketReaderBase::Init(socket);
-}
-
-void EventStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
- ClientMessageList messages_list;
- messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list);
- for (ClientMessageList::iterator it = messages_list.begin();
- it != messages_list.end(); ++it) {
- on_message_callback_->Run(*it);
- }
-}
-
-// VideoStreamReader class.
-VideoStreamReader::VideoStreamReader() { }
-VideoStreamReader::~VideoStreamReader() { }
-
-void VideoStreamReader::Init(net::Socket* socket,
- OnMessageCallback* on_message_callback) {
- on_message_callback_.reset(on_message_callback);
- SocketReaderBase::Init(socket);
-}
-
-void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
- HostMessageList messages_list;
- messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list);
- for (HostMessageList::iterator it = messages_list.begin();
- it != messages_list.end(); ++it) {
- on_message_callback_->Run(*it);
- }
-}
-
-} // namespace remoting
diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h
deleted file mode 100644
index 7f8e004..0000000
--- a/remoting/protocol/stream_reader.h
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef REMOTING_PROTOCOL_STREAM_READER_H_
-#define REMOTING_PROTOCOL_STREAM_READER_H_
-
-#include "base/callback.h"
-#include "base/scoped_ptr.h"
-#include "remoting/protocol/socket_reader_base.h"
-
-namespace remoting {
-
-class EventStreamReader : public SocketReaderBase {
- public:
- EventStreamReader();
- ~EventStreamReader();
-
- // The OnMessageCallback is called whenever a new message is received.
- // Ownership of the message is passed the callback.
- typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback;
-
- // Initialize the reader and start reading. Must be called on the thread
- // |socket| belongs to. The callback will be called when a new message is
- // received. EventStreamReader owns |on_message_callback|, doesn't own
- // |socket|.
- void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
-
- protected:
- virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
-
- private:
- MessagesDecoder messages_decoder_;
- scoped_ptr<OnMessageCallback> on_message_callback_;
-
- DISALLOW_COPY_AND_ASSIGN(EventStreamReader);
-};
-
-class VideoStreamReader : public SocketReaderBase {
- public:
- VideoStreamReader();
- ~VideoStreamReader();
-
- // The OnMessageCallback is called whenever a new message is received.
- // Ownership of the message is passed the callback.
- typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback;
-
- // Initialize the reader and start reading. Must be called on the thread
- // |socket| belongs to. The callback will be called when a new message is
- // received. VideoStreamReader owns |on_message_callback|, doesn't own
- // |socket|.
- void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
-
- protected:
- virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
-
- private:
- MessagesDecoder messages_decoder_;
- scoped_ptr<OnMessageCallback> on_message_callback_;
-
- DISALLOW_COPY_AND_ASSIGN(VideoStreamReader);
-};
-
-} // namespace remoting
-
-#endif // REMOTING_PROTOCOL_STREAM_READER_H_
diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h
index cc1c4b1..75f6180 100644
--- a/remoting/protocol/stream_writer.h
+++ b/remoting/protocol/stream_writer.h
@@ -5,8 +5,8 @@
#ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_
#define REMOTING_PROTOCOL_STREAM_WRITER_H_
+#include "remoting/proto/internal.pb.h"
#include "remoting/protocol/buffered_socket_writer.h"
-#include "remoting/protocol/messages_decoder.h"
namespace remoting {