summaryrefslogtreecommitdiffstats
path: root/remoting/protocol
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-10-06 22:46:00 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-10-06 22:46:00 +0000
commitc3af26f3314bf48f478cec8128b5c15cc3f98940 (patch)
tree0d3d0802a3a9b8e05487626f90c7dbf0dcecdea9 /remoting/protocol
parent5bcab699da1cedb4fc666c9f5d0099574a27c2fe (diff)
downloadchromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.zip
chromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.tar.gz
chromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.tar.bz2
Use new Chromotocol code in host andclient.
1. ProtocolDecoder renamed to MessagesDecoder and moved to remoting/protocol. 2. base/protocol_util.[h|cc] split into base/util.[h|cc] and protocol/util.[h|cc]. 3. Added StreamReader and StreamWriter classes for events and video channels. 4. Client and host changed to use the new protocol code. BUG=None TEST=Unittests Review URL: http://codereview.chromium.org/3595012 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@61723 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting/protocol')
-rw-r--r--remoting/protocol/buffered_socket_writer.cc116
-rw-r--r--remoting/protocol/buffered_socket_writer.h74
-rw-r--r--remoting/protocol/chromoting_connection.h13
-rw-r--r--remoting/protocol/fake_connection.cc112
-rw-r--r--remoting/protocol/fake_connection.h94
-rw-r--r--remoting/protocol/jingle_chromoting_connection.cc34
-rw-r--r--remoting/protocol/jingle_chromoting_connection.h7
-rw-r--r--remoting/protocol/messages_decoder.cc146
-rw-r--r--remoting/protocol/messages_decoder.h89
-rw-r--r--remoting/protocol/messages_decoder_unittest.cc121
-rw-r--r--remoting/protocol/stream_reader.cc102
-rw-r--r--remoting/protocol/stream_reader.h97
-rw-r--r--remoting/protocol/stream_writer.cc47
-rw-r--r--remoting/protocol/stream_writer.h53
-rw-r--r--remoting/protocol/util.cc28
-rw-r--r--remoting/protocol/util.h25
16 files changed, 1140 insertions, 18 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
new file mode 100644
index 0000000..6c60a4a
--- /dev/null
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -0,0 +1,116 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/buffered_socket_writer.h"
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+
+namespace remoting {
+
+BufferedSocketWriter::BufferedSocketWriter()
+ : socket_(NULL),
+ message_loop_(NULL),
+ buffer_size_(0),
+ write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ written_callback_(this, &BufferedSocketWriter::OnWritten)),
+ closed_(false) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() { }
+
+void BufferedSocketWriter::Init(net::Socket* socket,
+ WriteFailedCallback* callback) {
+ AutoLock auto_lock(lock_);
+ message_loop_ = MessageLoop::current();
+ socket_ = socket;
+}
+
+bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) {
+ AutoLock auto_lock(lock_);
+ if (!socket_)
+ return false;
+ queue_.push_back(data);
+ buffer_size_ += data->size();
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ return true;
+}
+
+void BufferedSocketWriter::DoWrite() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ DCHECK(socket_);
+
+ // Don't try to write if the writer not initialized or closed, or
+ // there is already a write pending.
+ if (write_pending_ || closed_)
+ return;
+
+ while (true) {
+ while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ AutoLock auto_lock(lock_);
+ if (queue_.empty())
+ return; // Nothing to write.
+ current_buf_ =
+ new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
+ queue_.pop_front();
+ }
+
+ int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(),
+ &written_callback_);
+ if (result >= 0) {
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ } else {
+ if (result == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ } else {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ return;
+ }
+ }
+}
+
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ write_pending_ = false;
+
+ if (result < 0) {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ // Schedule next write.
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+}
+
+int BufferedSocketWriter::GetBufferSize() {
+ AutoLock auto_lock(lock_);
+ return buffer_size_;
+}
+
+int BufferedSocketWriter::GetBufferChunks() {
+ AutoLock auto_lock(lock_);
+ return queue_.size();
+}
+
+void BufferedSocketWriter::Close() {
+ AutoLock auto_lock(lock_);
+ closed_ = true;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h
new file mode 100644
index 0000000..3d4709d
--- /dev/null
+++ b/remoting/protocol/buffered_socket_writer.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
+#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
+
+#include <deque>
+
+#include "base/lock.h"
+#include "base/ref_counted.h"
+#include "net/base/io_buffer.h"
+#include "net/socket/socket.h"
+
+class MessageLoop;
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class BufferedSocketWriter
+ : public base::RefCountedThreadSafe<BufferedSocketWriter> {
+ public:
+ typedef Callback1<int>::Type WriteFailedCallback;
+
+ BufferedSocketWriter();
+ virtual ~BufferedSocketWriter();
+
+ // Initializes the writer. Must be called on the thread that will be used
+ // to access the socket in the future. |callback| will be called after each
+ // failed write.
+ void Init(net::Socket* socket, WriteFailedCallback* callback);
+
+ // Puts a new data chunk in the buffer. Returns false and doesn't enqueue
+ // the data if called before Init(). Can be called on any thread.
+ bool Write(scoped_refptr<net::IOBufferWithSize> buffer);
+
+ // Returns current size of the buffer. Can be called on any thread.
+ int GetBufferSize();
+
+ // Returns number of chunks that are currently in the buffer waiting
+ // to be written. Can be called on any thread.
+ int GetBufferChunks();
+
+ // Stops writing and drops current buffers.
+ void Close();
+
+ private:
+ typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue;
+
+ void DoWrite();
+ void OnWritten(int result);
+
+ net::Socket* socket_;
+ MessageLoop* message_loop_;
+ scoped_ptr<WriteFailedCallback> write_failed_callback_;
+
+ Lock lock_;
+
+ DataQueue queue_;
+ int buffer_size_;
+ scoped_refptr<net::DrainableIOBuffer> current_buf_;
+ bool write_pending_;
+
+ net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_;
+
+ bool closed_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
diff --git a/remoting/protocol/chromoting_connection.h b/remoting/protocol/chromoting_connection.h
index e9c0f42d..397b633 100644
--- a/remoting/protocol/chromoting_connection.h
+++ b/remoting/protocol/chromoting_connection.h
@@ -9,6 +9,7 @@
#include "base/callback.h"
+class MessageLoop;
class Task;
namespace net {
@@ -17,7 +18,10 @@ class Socket;
namespace remoting {
-// Generic interface for Chromoting connection.
+// Generic interface for Chromoting connection used by both client and host.
+// Provides access to the connection channels, but doesn't depend on the
+// protocol used for each channel.
+// TODO(sergeyu): Remove refcounting?
class ChromotingConnection
: public base::RefCountedThreadSafe<ChromotingConnection> {
public:
@@ -37,6 +41,7 @@ class ChromotingConnection
// Reliable PseudoTCP channels for this connection.
// TODO(sergeyu): Remove VideoChannel, and use RTP channels instead.
+ // TODO(sergeyu): Make it possible to create/destroy new channels on-fly?
virtual net::Socket* GetVideoChannel() = 0;
virtual net::Socket* GetEventsChannel() = 0;
@@ -47,7 +52,11 @@ class ChromotingConnection
// JID of the other side.
virtual const std::string& jid() = 0;
- // Closed connection. Callbacks are guaranteed not to be called after
+ // Message loop that must be used for to access the channels of this
+ // connection.
+ virtual MessageLoop* message_loop() = 0;
+
+ // Closes connection. Callbacks are guaranteed not to be called after
// |closed_task| is executed.
virtual void Close(Task* closed_task) = 0;
diff --git a/remoting/protocol/fake_connection.cc b/remoting/protocol/fake_connection.cc
new file mode 100644
index 0000000..cffacb5
--- /dev/null
+++ b/remoting/protocol/fake_connection.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/fake_connection.h"
+
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+
+namespace remoting {
+
+const char kTestJid[] = "host1@gmail.com/chromoting123";
+
+FakeSocket::FakeSocket()
+ : read_pending_(false),
+ input_pos_(0) {
+}
+
+FakeSocket::~FakeSocket() {
+}
+
+void FakeSocket::AppendInputData(char* data, int data_size) {
+ input_data_.insert(input_data_.end(), data, data + data_size);
+ // Complete pending read if any.
+ if (read_pending_) {
+ read_pending_ = false;
+ int result = std::min(read_buffer_size_,
+ static_cast<int>(input_data_.size() - input_pos_));
+ CHECK(result > 0);
+ memcpy(read_buffer_->data(),
+ &(*input_data_.begin()) + input_pos_, result);
+ input_pos_ += result;
+ read_callback_->Run(result);
+ read_buffer_ = NULL;
+ }
+}
+
+int FakeSocket::Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) {
+ if (input_pos_ < static_cast<int>(input_data_.size())) {
+ int result = std::min(buf_len,
+ static_cast<int>(input_data_.size()) - input_pos_);
+ memcpy(buf->data(), &(*input_data_.begin()) + input_pos_, result);
+ input_pos_ += result;
+ return result;
+ } else {
+ read_pending_ = true;
+ read_buffer_ = buf;
+ read_buffer_size_ = buf_len;
+ read_callback_ = callback;
+ return net::ERR_IO_PENDING;
+ }
+}
+
+int FakeSocket::Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) {
+ written_data_.insert(written_data_.end(),
+ buf->data(), buf->data() + buf_len);
+ return buf_len;
+}
+
+bool FakeSocket::SetReceiveBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+bool FakeSocket::SetSendBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+FakeChromotingConnection::FakeChromotingConnection()
+ : message_loop_(NULL),
+ jid_(kTestJid) {
+}
+
+FakeChromotingConnection::~FakeChromotingConnection() { }
+
+void FakeChromotingConnection::SetStateChangeCallback(
+ StateChangeCallback* callback) {
+ callback_.reset(callback);
+}
+
+FakeSocket* FakeChromotingConnection::GetVideoChannel() {
+ return &video_channel_;
+}
+FakeSocket* FakeChromotingConnection::GetEventsChannel() {
+ return &events_channel_;
+}
+
+FakeSocket* FakeChromotingConnection::GetVideoRtpChannel() {
+ return &video_rtp_channel_;
+}
+FakeSocket* FakeChromotingConnection::GetVideoRtcpChannel() {
+ return &video_rtcp_channel_;
+}
+
+const std::string& FakeChromotingConnection::jid() {
+ return jid_;
+}
+
+MessageLoop* FakeChromotingConnection::message_loop() {
+ return message_loop_;
+}
+
+void FakeChromotingConnection::Close(Task* closed_task) {
+ closed_ = true;
+ closed_task->Run();
+ delete closed_task;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/fake_connection.h b/remoting/protocol/fake_connection.h
new file mode 100644
index 0000000..cabce4a
--- /dev/null
+++ b/remoting/protocol/fake_connection.h
@@ -0,0 +1,94 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_FAKE_CONNECTION_H_
+#define REMOTING_PROTOCOL_FAKE_CONNECTION_H_
+
+#include <vector>
+
+#include "base/scoped_ptr.h"
+#include "net/socket/socket.h"
+#include "remoting/protocol/chromoting_connection.h"
+
+namespace remoting {
+
+extern const char kTestJid[];
+
+// FakeSocket implement net::Socket interface for FakeConnection. All data
+// written to FakeSocket is stored in a buffer returned by written_data().
+// Read() reads data from another buffer that can be set with AppendInputData().
+// Pending reads are supported, so if there is a pending read AppendInputData()
+// calls the read callback.
+class FakeSocket : public net::Socket {
+ public:
+ FakeSocket();
+ virtual ~FakeSocket();
+
+ const std::vector<char>& written_data() { return written_data_; }
+
+ void AppendInputData(char* data, int data_size);
+ int input_pos() { return input_pos_; }
+
+ // net::Socket interface.
+ virtual int Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+ virtual int Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ private:
+ bool read_pending_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+ net::CompletionCallback* read_callback_;
+
+ std::vector<char> written_data_;
+ std::vector<char> input_data_;
+ int input_pos_;
+};
+
+// FakeChromotingConnection is a dummy ChromotingConnection that uses
+// FakeSocket for all channels.
+class FakeChromotingConnection : public ChromotingConnection {
+ public:
+ FakeChromotingConnection();
+ virtual ~FakeChromotingConnection();
+
+ StateChangeCallback* get_state_change_callback() { return callback_.get(); }
+
+ void set_message_loop(MessageLoop* message_loop) {
+ message_loop_ = message_loop;
+ }
+
+ bool is_closed() { return closed_; }
+
+ virtual void SetStateChangeCallback(StateChangeCallback* callback);
+
+ virtual FakeSocket* GetVideoChannel();
+ virtual FakeSocket* GetEventsChannel();
+
+ virtual FakeSocket* GetVideoRtpChannel();
+ virtual FakeSocket* GetVideoRtcpChannel();
+
+ virtual const std::string& jid();
+
+ virtual MessageLoop* message_loop();
+ virtual void Close(Task* closed_task);
+
+ public:
+ scoped_ptr<StateChangeCallback> callback_;
+ MessageLoop* message_loop_;
+ FakeSocket video_channel_;
+ FakeSocket events_channel_;
+ FakeSocket video_rtp_channel_;
+ FakeSocket video_rtcp_channel_;
+ std::string jid_;
+ bool closed_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_FAKE_CONNECTION_H_
diff --git a/remoting/protocol/jingle_chromoting_connection.cc b/remoting/protocol/jingle_chromoting_connection.cc
index 5c2e246..2412627 100644
--- a/remoting/protocol/jingle_chromoting_connection.cc
+++ b/remoting/protocol/jingle_chromoting_connection.cc
@@ -28,8 +28,8 @@ const char kEventsChannelName[] = "events";
} // namespace
JingleChromotingConnection::JingleChromotingConnection(
- JingleChromotingServer* session_client)
- : session_client_(session_client),
+ JingleChromotingServer* server)
+ : server_(server),
state_(INITIALIZING),
closed_(false),
session_(NULL),
@@ -42,7 +42,7 @@ JingleChromotingConnection::~JingleChromotingConnection() {
}
void JingleChromotingConnection::Init(Session* session) {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
session_ = session;
jid_ = session_->remote_name();
@@ -55,7 +55,7 @@ bool JingleChromotingConnection::HasSession(cricket::Session* session) {
}
Session* JingleChromotingConnection::ReleaseSession() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
SetState(CLOSED);
Session* session = session_;
@@ -68,35 +68,45 @@ Session* JingleChromotingConnection::ReleaseSession() {
void JingleChromotingConnection::SetStateChangeCallback(
StateChangeCallback* callback) {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
DCHECK(callback);
state_change_callback_.reset(callback);
}
// TODO(sergeyu): Remove this method after we switch to RTP.
net::Socket* JingleChromotingConnection::GetVideoChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_channel_adapter_.get();
}
net::Socket* JingleChromotingConnection::GetEventsChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return events_channel_adapter_.get();
}
net::Socket* JingleChromotingConnection::GetVideoRtpChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_rtp_channel_.get();
}
net::Socket* JingleChromotingConnection::GetVideoRtcpChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_rtcp_channel_.get();
}
+const std::string& JingleChromotingConnection::jid() {
+ // No synchronization is needed because jid_ is not changed
+ // after new connection is passed to JingleChromotingServer callback.
+ return jid_;
+}
+
+MessageLoop* JingleChromotingConnection::message_loop() {
+ return server_->message_loop();
+}
+
void JingleChromotingConnection::Close(Task* closed_task) {
- if (MessageLoop::current() != session_client_->message_loop()) {
- session_client_->message_loop()->PostTask(
+ if (MessageLoop::current() != server_->message_loop()) {
+ server_->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &JingleChromotingConnection::Close,
closed_task));
return;
@@ -171,7 +181,7 @@ void JingleChromotingConnection::OnSessionState(
void JingleChromotingConnection::OnInitiate(bool incoming) {
jid_ = session_->remote_name();
if (incoming)
- session_client_->AcceptConnection(this, session_);
+ server_->AcceptConnection(this, session_);
SetState(CONNECTING);
}
diff --git a/remoting/protocol/jingle_chromoting_connection.h b/remoting/protocol/jingle_chromoting_connection.h
index ada4ead..cc54182 100644
--- a/remoting/protocol/jingle_chromoting_connection.h
+++ b/remoting/protocol/jingle_chromoting_connection.h
@@ -41,9 +41,8 @@ class JingleChromotingConnection : public ChromotingConnection,
virtual net::Socket* GetVideoRtpChannel();
virtual net::Socket* GetVideoRtcpChannel();
- // No synchronization is needed because jid_ is not changed
- // after new connection is passed to JingleChromotingServer callback.
- virtual const std::string& jid() { return jid_; };
+ virtual const std::string& jid();
+ virtual MessageLoop* message_loop();
virtual void Close(Task* closed_task);
@@ -68,7 +67,7 @@ class JingleChromotingConnection : public ChromotingConnection,
void SetState(State new_state);
// JingleChromotingServer that created this connection.
- scoped_refptr<JingleChromotingServer> session_client_;
+ scoped_refptr<JingleChromotingServer> server_;
State state_;
scoped_ptr<StateChangeCallback> state_change_callback_;
diff --git a/remoting/protocol/messages_decoder.cc b/remoting/protocol/messages_decoder.cc
new file mode 100644
index 0000000..f7c7439
--- /dev/null
+++ b/remoting/protocol/messages_decoder.cc
@@ -0,0 +1,146 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/messages_decoder.h"
+
+#include "base/logging.h"
+#include "remoting/base/multiple_array_input_stream.h"
+#include "talk/base/byteorder.h"
+
+namespace remoting {
+
+MessagesDecoder::MessagesDecoder()
+ : last_read_position_(0),
+ available_bytes_(0),
+ next_payload_(0),
+ next_payload_known_(false) {
+}
+
+MessagesDecoder::~MessagesDecoder() {}
+
+void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ ClientMessageList* messages) {
+ ParseMessages<ChromotingClientMessage>(data, data_size, messages);
+}
+
+void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ HostMessageList* messages) {
+ ParseMessages<ChromotingHostMessage>(data, data_size, messages);
+}
+
+template <typename T>
+void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ std::list<T*>* messages) {
+ // If this is the first data in the processing queue, then set the
+ // last read position to 0.
+ if (data_list_.empty())
+ last_read_position_ = 0;
+
+ // First enqueue the data received.
+ data_list_.push_back(DataChunk(data, data_size));
+ available_bytes_ += data_size;
+
+ // Then try to parse one message until we can't parse anymore.
+ T* message;
+ while (ParseOneMessage<T>(&message)) {
+ messages->push_back(message);
+ }
+}
+
+template <typename T>
+bool MessagesDecoder::ParseOneMessage(T** message) {
+ // Determine the payload size. If we already know it, then skip this
+ // part.
+ // We have the value set to -1 for checking later.
+ int next_payload = -1;
+ if (!next_payload_known_ && GetPayloadSize(&next_payload)) {
+ DCHECK_NE(-1, next_payload);
+ next_payload_ = next_payload;
+ next_payload_known_ = true;
+ }
+
+ // If the next payload size is still not known or we don't have enough
+ // data for parsing then exit.
+ if (!next_payload_known_ || available_bytes_ < next_payload_)
+ return false;
+ next_payload_known_ = false;
+
+ // Create a MultipleArrayInputStream for parsing.
+ MultipleArrayInputStream stream;
+ std::vector<scoped_refptr<net::IOBuffer> > buffers;
+ while (next_payload_ > 0 && !data_list_.empty()) {
+ DataChunk* buffer = &(data_list_.front());
+ size_t read_bytes = std::min(buffer->data_size - last_read_position_,
+ next_payload_);
+
+ buffers.push_back(buffer->data);
+ stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes);
+
+ // Adjust counters.
+ last_read_position_ += read_bytes;
+ next_payload_ -= read_bytes;
+ available_bytes_ -= read_bytes;
+
+ // If the front buffer is fully read, remove it from the queue.
+ if (buffer->data_size == last_read_position_) {
+ data_list_.pop_front();
+ last_read_position_ = 0;
+ }
+ }
+ DCHECK_EQ(0UL, next_payload_);
+
+ // And finally it is parsing.
+ *message = new T();
+ bool ret = (*message)->ParseFromZeroCopyStream(&stream);
+ if (!ret) {
+ LOG(ERROR) << "Received invalid message.";
+ delete *message;
+ }
+ return ret;
+}
+
+bool MessagesDecoder::GetPayloadSize(int* size) {
+ // The header has a size of 4 bytes.
+ const size_t kHeaderSize = sizeof(int32);
+
+ if (available_bytes_ < kHeaderSize)
+ return false;
+
+ std::string header;
+ while (header.length() < kHeaderSize && !data_list_.empty()) {
+ DataChunk* buffer = &(data_list_.front());
+
+ // Find out how many bytes we need and how many bytes are available in this
+ // buffer.
+ int needed_bytes = kHeaderSize - header.length();
+ int available_bytes = buffer->data_size - last_read_position_;
+
+ // Then append the required bytes into the header and advance the last
+ // read position.
+ int read_bytes = std::min(needed_bytes, available_bytes);
+ header.append(
+ reinterpret_cast<char*>(buffer->data->data()) + last_read_position_,
+ read_bytes);
+ last_read_position_ += read_bytes;
+ available_bytes_ -= read_bytes;
+
+ // If the buffer is depleted then remove it from the queue.
+ if (last_read_position_ == buffer->data_size) {
+ last_read_position_ = 0;
+ data_list_.pop_front();
+ }
+ }
+
+ if (header.length() == kHeaderSize) {
+ *size = talk_base::GetBE32(header.c_str());
+ return true;
+ }
+ NOTREACHED() << "Unable to extract payload size";
+ return false;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/messages_decoder.h b/remoting/protocol/messages_decoder.h
new file mode 100644
index 0000000..66014f6
--- /dev/null
+++ b/remoting/protocol/messages_decoder.h
@@ -0,0 +1,89 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_
+#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_
+
+#include <deque>
+#include <list>
+
+#include "base/ref_counted.h"
+#include "google/protobuf/message_lite.h"
+#include "net/base/io_buffer.h"
+#include "remoting/base/protocol/chromotocol.pb.h"
+
+namespace remoting {
+
+typedef std::list<ChromotingHostMessage*> HostMessageList;
+typedef std::list<ChromotingClientMessage*> ClientMessageList;
+
+// A protocol decoder is used to decode data transmitted in the chromoting
+// network.
+// TODO(hclam): Defines the interface and implement methods.
+class MessagesDecoder {
+ public:
+ MessagesDecoder();
+ virtual ~MessagesDecoder();
+
+ // Parse data received from network into ClientMessages. Output is written
+ // to |messages|.
+ virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ ClientMessageList* messages);
+
+ // Parse data received from network into HostMessages. Output is
+ // written to |messages|.
+ virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ HostMessageList* messages);
+
+ private:
+ // DataChunk stores reference to a net::IOBuffer and size of the data
+ // stored in that buffer.
+ struct DataChunk {
+ DataChunk(net::IOBuffer* data, size_t data_size)
+ : data(data),
+ data_size(data_size) {
+ }
+
+ scoped_refptr<net::IOBuffer> data;
+ size_t data_size;
+ };
+
+ // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer
+ // instead of storing chunks in dqueue.
+ typedef std::deque<DataChunk> DataList;
+
+ // A private method used to parse data received from network into protocol
+ // buffers.
+ template <typename T>
+ void ParseMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
+ std::list<T*>* messages);
+
+ // Parse one message from |data_list_|. Return true if sucessful.
+ template <typename T>
+ bool ParseOneMessage(T** messages);
+
+ // A utility method to read payload size of the protocol buffer from the
+ // data list. Return false if we don't have enough data.
+ bool GetPayloadSize(int* size);
+
+ DataList data_list_;
+ size_t last_read_position_;
+
+ // Count the number of bytes in |data_list_| not read.
+ size_t available_bytes_;
+
+ // Stores the size of the next payload if known.
+ size_t next_payload_;
+
+ // True if the size of the next payload is known. After one payload is read,
+ // this is reset to false.
+ bool next_payload_known_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_
diff --git a/remoting/protocol/messages_decoder_unittest.cc b/remoting/protocol/messages_decoder_unittest.cc
new file mode 100644
index 0000000..7c5616d
--- /dev/null
+++ b/remoting/protocol/messages_decoder_unittest.cc
@@ -0,0 +1,121 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <string>
+
+#include "base/scoped_ptr.h"
+#include "base/stl_util-inl.h"
+#include "net/base/io_buffer.h"
+#include "remoting/protocol/messages_decoder.h"
+#include "remoting/protocol/util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace remoting {
+
+static const int kWidth = 640;
+static const int kHeight = 480;
+static const std::string kTestData = "Chromoting rockz";
+
+static void AppendMessage(const ChromotingHostMessage& msg,
+ std::string* buffer) {
+ // Contains one encoded message.
+ scoped_refptr<net::IOBufferWithSize> encoded_msg;
+ encoded_msg = SerializeAndFrameMessage(msg);
+ buffer->append(encoded_msg->data(), encoded_msg->size());
+}
+
+// Construct and prepare data in the |output_stream|.
+static void PrepareData(uint8** buffer, int* size) {
+ // Contains all encoded messages.
+ std::string encoded_data;
+
+ // The first message is InitClient.
+ ChromotingHostMessage msg;
+ msg.mutable_init_client()->set_width(kWidth);
+ msg.mutable_init_client()->set_height(kHeight);
+ AppendMessage(msg, &encoded_data);
+ msg.Clear();
+
+ // Then append 10 update sequences to the data.
+ for (int i = 0; i < 10; ++i) {
+ msg.mutable_begin_update_stream();
+ AppendMessage(msg, &encoded_data);
+ msg.Clear();
+
+ msg.mutable_update_stream_packet()->mutable_rect_data()->
+ set_sequence_number(0);
+ msg.mutable_update_stream_packet()->mutable_rect_data()->
+ set_data(kTestData);
+ AppendMessage(msg, &encoded_data);
+ msg.Clear();
+
+ msg.mutable_end_update_stream();
+ AppendMessage(msg, &encoded_data);
+ msg.Clear();
+ }
+
+ *size = encoded_data.length();
+ *buffer = new uint8[*size];
+ memcpy(*buffer, encoded_data.c_str(), *size);
+}
+
+TEST(MessagesDecoderTest, BasicOperations) {
+ // Prepare encoded data for testing.
+ int size;
+ uint8* test_data;
+ PrepareData(&test_data, &size);
+ scoped_array<uint8> memory_deleter(test_data);
+
+ // Then simulate using MessagesDecoder to decode variable
+ // size of encoded data.
+ // The first thing to do is to generate a variable size of data. This is done
+ // by iterating the following array for read sizes.
+ const int kReadSizes[] = {1, 2, 3, 1};
+
+ MessagesDecoder decoder;
+
+ // Then feed the protocol decoder using the above generated data and the
+ // read pattern.
+ HostMessageList message_list;
+ for (int i = 0; i < size;) {
+ // First generate the amount to feed the decoder.
+ int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]);
+
+ // And then prepare a DataBuffer for feeding it.
+ scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read);
+ memcpy(buffer->data(), test_data + i, read);
+ decoder.ParseHostMessages(buffer, read, &message_list);
+ i += read;
+ }
+
+ // Then verify the decoded messages.
+ EXPECT_EQ(31u, message_list.size());
+ ASSERT_TRUE(message_list.size() > 0);
+ EXPECT_TRUE(message_list.front()->has_init_client());
+ delete message_list.front();
+ message_list.pop_front();
+
+ int index = 0;
+ for (HostMessageList::iterator it = message_list.begin();
+ it != message_list.end(); ++it) {
+ ChromotingHostMessage* message = *it;
+ int type = index % 3;
+ ++index;
+ if (type == 0) {
+ // Begin update stream.
+ EXPECT_TRUE(message->has_begin_update_stream());
+ } else if (type == 1) {
+ // Partial update stream.
+ EXPECT_TRUE(message->has_update_stream_packet());
+ EXPECT_EQ(kTestData,
+ message->update_stream_packet().rect_data().data());
+ } else if (type == 2) {
+ // End update stream.
+ EXPECT_TRUE(message->has_end_update_stream());
+ }
+ }
+ STLDeleteElements(&message_list);
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc
new file mode 100644
index 0000000..bfceb3e
--- /dev/null
+++ b/remoting/protocol/stream_reader.cc
@@ -0,0 +1,102 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/stream_reader.h"
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+#include "remoting/protocol/chromoting_connection.h"
+
+namespace remoting {
+
+namespace {
+int kReadBufferSize = 4096;
+} // namespace
+
+StreamReaderBase::StreamReaderBase()
+ : socket_(NULL),
+ closed_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &StreamReaderBase::OnRead)) {
+}
+
+StreamReaderBase::~StreamReaderBase() { }
+
+void StreamReaderBase::Close() {
+ closed_ = true;
+}
+
+void StreamReaderBase::Init(net::Socket* socket) {
+ DCHECK(socket);
+ socket_ = socket;
+ DoRead();
+}
+
+void StreamReaderBase::DoRead() {
+ while (true) {
+ read_buffer_ = new net::IOBuffer(kReadBufferSize);
+ int result = socket_->Read(
+ read_buffer_, kReadBufferSize, &read_callback_);
+ HandleReadResult(result);
+ if (result < 0)
+ break;
+ }
+}
+
+void StreamReaderBase::OnRead(int result) {
+ if (!closed_) {
+ HandleReadResult(result);
+ DoRead();
+ }
+}
+
+void StreamReaderBase::HandleReadResult(int result) {
+ if (result > 0) {
+ OnDataReceived(read_buffer_, result);
+ } else {
+ if (result != net::ERR_IO_PENDING)
+ LOG(ERROR) << "Read() returned error " << result;
+ }
+}
+
+// EventsStreamReader class.
+EventsStreamReader::EventsStreamReader() { }
+EventsStreamReader::~EventsStreamReader() { }
+
+void EventsStreamReader::Init(net::Socket* socket,
+ OnMessageCallback* on_message_callback) {
+ on_message_callback_.reset(on_message_callback);
+ StreamReaderBase::Init(socket);
+}
+
+void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ ClientMessageList messages_list;
+ messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list);
+ for (ClientMessageList::iterator it = messages_list.begin();
+ it != messages_list.end(); ++it) {
+ on_message_callback_->Run(*it);
+ }
+}
+
+// VideoStreamReader class.
+VideoStreamReader::VideoStreamReader() { }
+VideoStreamReader::~VideoStreamReader() { }
+
+void VideoStreamReader::Init(net::Socket* socket,
+ OnMessageCallback* on_message_callback) {
+ on_message_callback_.reset(on_message_callback);
+ StreamReaderBase::Init(socket);
+}
+
+void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ HostMessageList messages_list;
+ messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list);
+ for (HostMessageList::iterator it = messages_list.begin();
+ it != messages_list.end(); ++it) {
+ on_message_callback_->Run(*it);
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h
new file mode 100644
index 0000000..b71f4b0
--- /dev/null
+++ b/remoting/protocol/stream_reader.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+
+#include "base/callback.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "net/base/completion_callback.h"
+#include "remoting/protocol/messages_decoder.h"
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class ChromotingConnection;
+
+class StreamReaderBase {
+ public:
+ StreamReaderBase();
+ virtual ~StreamReaderBase();
+
+ // Stops reading. Must be called on the same thread as Init().
+ void Close();
+
+ protected:
+ void Init(net::Socket* socket);
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0;
+
+ private:
+ void DoRead();
+ void OnRead(int result);
+ void HandleReadResult(int result);
+
+ net::Socket* socket_;
+ bool closed_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ net::CompletionCallbackImpl<StreamReaderBase> read_callback_;
+};
+
+class EventsStreamReader : public StreamReaderBase {
+ public:
+ EventsStreamReader();
+ ~EventsStreamReader();
+
+ // The OnMessageCallback is called whenever a new message is received.
+ // Ownership of the message is passed the callback.
+ typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback;
+
+ // Initialize the reader and start reading. Must be called on the thread
+ // |socket| belongs to. The callback will be called when a new message is
+ // received. EventsStreamReader owns |on_message_callback|, doesn't own
+ // |socket|.
+ void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
+
+ protected:
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
+
+ private:
+ MessagesDecoder messages_decoder_;
+ scoped_ptr<OnMessageCallback> on_message_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(EventsStreamReader);
+};
+
+class VideoStreamReader : public StreamReaderBase {
+ public:
+ VideoStreamReader();
+ ~VideoStreamReader();
+
+ // The OnMessageCallback is called whenever a new message is received.
+ // Ownership of the message is passed the callback.
+ typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback;
+
+ // Initialize the reader and start reading. Must be called on the thread
+ // |socket| belongs to. The callback will be called when a new message is
+ // received. VideoStreamReader owns |on_message_callback|, doesn't own
+ // |socket|.
+ void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
+
+ protected:
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
+
+ private:
+ MessagesDecoder messages_decoder_;
+ scoped_ptr<OnMessageCallback> on_message_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(VideoStreamReader);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
diff --git a/remoting/protocol/stream_writer.cc b/remoting/protocol/stream_writer.cc
new file mode 100644
index 0000000..3269a1b
--- /dev/null
+++ b/remoting/protocol/stream_writer.cc
@@ -0,0 +1,47 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/stream_writer.h"
+
+#include "base/message_loop.h"
+#include "remoting/protocol/chromoting_connection.h"
+#include "remoting/protocol/util.h"
+
+namespace remoting {
+
+StreamWriterBase::StreamWriterBase()
+ : socket_(NULL),
+ buffered_writer_(new BufferedSocketWriter()) {
+}
+
+StreamWriterBase::~StreamWriterBase() { }
+
+void StreamWriterBase::Init(net::Socket* socket) {
+ socket_ = socket;
+ buffered_writer_->Init(socket, NULL);
+}
+
+int StreamWriterBase::GetBufferSize() {
+ return buffered_writer_->GetBufferSize();
+}
+
+int StreamWriterBase::GetPendingMessages() {
+ return buffered_writer_->GetBufferChunks();
+}
+
+void StreamWriterBase::Close() {
+ buffered_writer_->Close();
+}
+
+bool EventsStreamWriter::SendMessage(
+ const ChromotingClientMessage& message) {
+ return buffered_writer_->Write(SerializeAndFrameMessage(message));
+}
+
+bool VideoStreamWriter::SendMessage(
+ const ChromotingHostMessage& message) {
+ return buffered_writer_->Write(SerializeAndFrameMessage(message));
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h
new file mode 100644
index 0000000..4bc2b76
--- /dev/null
+++ b/remoting/protocol/stream_writer.h
@@ -0,0 +1,53 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_
+#define REMOTING_PROTOCOL_STREAM_WRITER_H_
+
+#include "remoting/protocol/buffered_socket_writer.h"
+#include "remoting/protocol/messages_decoder.h"
+
+namespace remoting {
+
+class ChromotingConnection;
+
+class StreamWriterBase {
+ public:
+ StreamWriterBase();
+ virtual ~StreamWriterBase();
+
+ // Initializes the writer. Must be called on the thread the |socket| belongs
+ // to.
+ void Init(net::Socket* socket);
+
+ // Return current buffer state. Can be called from any thread.
+ int GetBufferSize();
+ int GetPendingMessages();
+
+ // Stop writing and drop pending data. Must be called from the same thread as
+ // Init().
+ void Close();
+
+ protected:
+ net::Socket* socket_;
+ scoped_refptr<BufferedSocketWriter> buffered_writer_;
+};
+
+class EventsStreamWriter : public StreamWriterBase {
+ public:
+ // Sends the |message| or returns false if called before Init().
+ // Can be called on any thread.
+ bool SendMessage(const ChromotingClientMessage& message);
+};
+
+class VideoStreamWriter : public StreamWriterBase {
+ public:
+ // Sends the |message| or returns false if called before Init().
+ // Can be called on any thread.
+ bool SendMessage(const ChromotingHostMessage& message);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_STREAM_WRITER_H_
diff --git a/remoting/protocol/util.cc b/remoting/protocol/util.cc
new file mode 100644
index 0000000..5a23c93
--- /dev/null
+++ b/remoting/protocol/util.cc
@@ -0,0 +1,28 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/util.h"
+
+#include "base/basictypes.h"
+#include "base/hash_tables.h"
+#include "base/logging.h"
+#include "net/base/io_buffer.h"
+#include "third_party/libjingle/source/talk/base/byteorder.h"
+
+namespace remoting {
+
+scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage(
+ const google::protobuf::MessageLite& msg) {
+ // Create a buffer with 4 extra bytes. This is used as prefix to write an
+ // int32 of the serialized message size for framing.
+ const int kExtraBytes = sizeof(int32);
+ int size = msg.ByteSize() + kExtraBytes;
+ scoped_refptr<net::IOBufferWithSize> buffer = new net::IOBufferWithSize(size);
+ talk_base::SetBE32(buffer->data(), msg.GetCachedSize());
+ msg.SerializeWithCachedSizesToArray(
+ reinterpret_cast<uint8*>(buffer->data()) + kExtraBytes);
+ return buffer;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/util.h b/remoting/protocol/util.h
new file mode 100644
index 0000000..ef15e45
--- /dev/null
+++ b/remoting/protocol/util.h
@@ -0,0 +1,25 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_UTIL_H_
+#define REMOTING_PROTOCOL_UTIL_H_
+
+#include "google/protobuf/message_lite.h"
+#include "net/base/io_buffer.h"
+#include "remoting/base/protocol/chromotocol.pb.h"
+
+// This file defines utility methods used for encoding and decoding the protocol
+// used in Chromoting.
+namespace remoting {
+
+// Serialize the Protocol Buffer message and provide sufficient framing for
+// sending it over the wire.
+// This will provide sufficient prefix and suffix for the receiver side to
+// decode the message.
+scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage(
+ const google::protobuf::MessageLite& msg);
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_UTIL_H_