diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-06 22:46:00 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-06 22:46:00 +0000 |
commit | c3af26f3314bf48f478cec8128b5c15cc3f98940 (patch) | |
tree | 0d3d0802a3a9b8e05487626f90c7dbf0dcecdea9 /remoting/protocol | |
parent | 5bcab699da1cedb4fc666c9f5d0099574a27c2fe (diff) | |
download | chromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.zip chromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.tar.gz chromium_src-c3af26f3314bf48f478cec8128b5c15cc3f98940.tar.bz2 |
Use new Chromotocol code in host andclient.
1. ProtocolDecoder renamed to MessagesDecoder and moved to remoting/protocol.
2. base/protocol_util.[h|cc] split into base/util.[h|cc] and protocol/util.[h|cc].
3. Added StreamReader and StreamWriter classes for events and video channels.
4. Client and host changed to use the new protocol code.
BUG=None
TEST=Unittests
Review URL: http://codereview.chromium.org/3595012
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@61723 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting/protocol')
-rw-r--r-- | remoting/protocol/buffered_socket_writer.cc | 116 | ||||
-rw-r--r-- | remoting/protocol/buffered_socket_writer.h | 74 | ||||
-rw-r--r-- | remoting/protocol/chromoting_connection.h | 13 | ||||
-rw-r--r-- | remoting/protocol/fake_connection.cc | 112 | ||||
-rw-r--r-- | remoting/protocol/fake_connection.h | 94 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_connection.cc | 34 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_connection.h | 7 | ||||
-rw-r--r-- | remoting/protocol/messages_decoder.cc | 146 | ||||
-rw-r--r-- | remoting/protocol/messages_decoder.h | 89 | ||||
-rw-r--r-- | remoting/protocol/messages_decoder_unittest.cc | 121 | ||||
-rw-r--r-- | remoting/protocol/stream_reader.cc | 102 | ||||
-rw-r--r-- | remoting/protocol/stream_reader.h | 97 | ||||
-rw-r--r-- | remoting/protocol/stream_writer.cc | 47 | ||||
-rw-r--r-- | remoting/protocol/stream_writer.h | 53 | ||||
-rw-r--r-- | remoting/protocol/util.cc | 28 | ||||
-rw-r--r-- | remoting/protocol/util.h | 25 |
16 files changed, 1140 insertions, 18 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc new file mode 100644 index 0000000..6c60a4a --- /dev/null +++ b/remoting/protocol/buffered_socket_writer.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/buffered_socket_writer.h" + +#include "base/message_loop.h" +#include "net/base/net_errors.h" + +namespace remoting { + +BufferedSocketWriter::BufferedSocketWriter() + : socket_(NULL), + message_loop_(NULL), + buffer_size_(0), + write_pending_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + written_callback_(this, &BufferedSocketWriter::OnWritten)), + closed_(false) { +} + +BufferedSocketWriter::~BufferedSocketWriter() { } + +void BufferedSocketWriter::Init(net::Socket* socket, + WriteFailedCallback* callback) { + AutoLock auto_lock(lock_); + message_loop_ = MessageLoop::current(); + socket_ = socket; +} + +bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) { + AutoLock auto_lock(lock_); + if (!socket_) + return false; + queue_.push_back(data); + buffer_size_ += data->size(); + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + return true; +} + +void BufferedSocketWriter::DoWrite() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(socket_); + + // Don't try to write if the writer not initialized or closed, or + // there is already a write pending. + if (write_pending_ || closed_) + return; + + while (true) { + while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + AutoLock auto_lock(lock_); + if (queue_.empty()) + return; // Nothing to write. + current_buf_ = + new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); + queue_.pop_front(); + } + + int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(), + &written_callback_); + if (result >= 0) { + { + AutoLock auto_lock(lock_); + buffer_size_ -= result; + } + current_buf_->DidConsume(result); + } else { + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + } else { + if (write_failed_callback_.get()) + write_failed_callback_->Run(result); + } + + return; + } + } +} + +void BufferedSocketWriter::OnWritten(int result) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + write_pending_ = false; + + if (result < 0) { + if (write_failed_callback_.get()) + write_failed_callback_->Run(result); + } + + { + AutoLock auto_lock(lock_); + buffer_size_ -= result; + } + current_buf_->DidConsume(result); + // Schedule next write. + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); +} + +int BufferedSocketWriter::GetBufferSize() { + AutoLock auto_lock(lock_); + return buffer_size_; +} + +int BufferedSocketWriter::GetBufferChunks() { + AutoLock auto_lock(lock_); + return queue_.size(); +} + +void BufferedSocketWriter::Close() { + AutoLock auto_lock(lock_); + closed_ = true; +} + +} // namespace remoting diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h new file mode 100644 index 0000000..3d4709d --- /dev/null +++ b/remoting/protocol/buffered_socket_writer.h @@ -0,0 +1,74 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ +#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ + +#include <deque> + +#include "base/lock.h" +#include "base/ref_counted.h" +#include "net/base/io_buffer.h" +#include "net/socket/socket.h" + +class MessageLoop; + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class BufferedSocketWriter + : public base::RefCountedThreadSafe<BufferedSocketWriter> { + public: + typedef Callback1<int>::Type WriteFailedCallback; + + BufferedSocketWriter(); + virtual ~BufferedSocketWriter(); + + // Initializes the writer. Must be called on the thread that will be used + // to access the socket in the future. |callback| will be called after each + // failed write. + void Init(net::Socket* socket, WriteFailedCallback* callback); + + // Puts a new data chunk in the buffer. Returns false and doesn't enqueue + // the data if called before Init(). Can be called on any thread. + bool Write(scoped_refptr<net::IOBufferWithSize> buffer); + + // Returns current size of the buffer. Can be called on any thread. + int GetBufferSize(); + + // Returns number of chunks that are currently in the buffer waiting + // to be written. Can be called on any thread. + int GetBufferChunks(); + + // Stops writing and drops current buffers. + void Close(); + + private: + typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue; + + void DoWrite(); + void OnWritten(int result); + + net::Socket* socket_; + MessageLoop* message_loop_; + scoped_ptr<WriteFailedCallback> write_failed_callback_; + + Lock lock_; + + DataQueue queue_; + int buffer_size_; + scoped_refptr<net::DrainableIOBuffer> current_buf_; + bool write_pending_; + + net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_; + + bool closed_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ diff --git a/remoting/protocol/chromoting_connection.h b/remoting/protocol/chromoting_connection.h index e9c0f42d..397b633 100644 --- a/remoting/protocol/chromoting_connection.h +++ b/remoting/protocol/chromoting_connection.h @@ -9,6 +9,7 @@ #include "base/callback.h" +class MessageLoop; class Task; namespace net { @@ -17,7 +18,10 @@ class Socket; namespace remoting { -// Generic interface for Chromoting connection. +// Generic interface for Chromoting connection used by both client and host. +// Provides access to the connection channels, but doesn't depend on the +// protocol used for each channel. +// TODO(sergeyu): Remove refcounting? class ChromotingConnection : public base::RefCountedThreadSafe<ChromotingConnection> { public: @@ -37,6 +41,7 @@ class ChromotingConnection // Reliable PseudoTCP channels for this connection. // TODO(sergeyu): Remove VideoChannel, and use RTP channels instead. + // TODO(sergeyu): Make it possible to create/destroy new channels on-fly? virtual net::Socket* GetVideoChannel() = 0; virtual net::Socket* GetEventsChannel() = 0; @@ -47,7 +52,11 @@ class ChromotingConnection // JID of the other side. virtual const std::string& jid() = 0; - // Closed connection. Callbacks are guaranteed not to be called after + // Message loop that must be used for to access the channels of this + // connection. + virtual MessageLoop* message_loop() = 0; + + // Closes connection. Callbacks are guaranteed not to be called after // |closed_task| is executed. virtual void Close(Task* closed_task) = 0; diff --git a/remoting/protocol/fake_connection.cc b/remoting/protocol/fake_connection.cc new file mode 100644 index 0000000..cffacb5 --- /dev/null +++ b/remoting/protocol/fake_connection.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/fake_connection.h" + +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" + +namespace remoting { + +const char kTestJid[] = "host1@gmail.com/chromoting123"; + +FakeSocket::FakeSocket() + : read_pending_(false), + input_pos_(0) { +} + +FakeSocket::~FakeSocket() { +} + +void FakeSocket::AppendInputData(char* data, int data_size) { + input_data_.insert(input_data_.end(), data, data + data_size); + // Complete pending read if any. + if (read_pending_) { + read_pending_ = false; + int result = std::min(read_buffer_size_, + static_cast<int>(input_data_.size() - input_pos_)); + CHECK(result > 0); + memcpy(read_buffer_->data(), + &(*input_data_.begin()) + input_pos_, result); + input_pos_ += result; + read_callback_->Run(result); + read_buffer_ = NULL; + } +} + +int FakeSocket::Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + if (input_pos_ < static_cast<int>(input_data_.size())) { + int result = std::min(buf_len, + static_cast<int>(input_data_.size()) - input_pos_); + memcpy(buf->data(), &(*input_data_.begin()) + input_pos_, result); + input_pos_ += result; + return result; + } else { + read_pending_ = true; + read_buffer_ = buf; + read_buffer_size_ = buf_len; + read_callback_ = callback; + return net::ERR_IO_PENDING; + } +} + +int FakeSocket::Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + written_data_.insert(written_data_.end(), + buf->data(), buf->data() + buf_len); + return buf_len; +} + +bool FakeSocket::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} +bool FakeSocket::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +FakeChromotingConnection::FakeChromotingConnection() + : message_loop_(NULL), + jid_(kTestJid) { +} + +FakeChromotingConnection::~FakeChromotingConnection() { } + +void FakeChromotingConnection::SetStateChangeCallback( + StateChangeCallback* callback) { + callback_.reset(callback); +} + +FakeSocket* FakeChromotingConnection::GetVideoChannel() { + return &video_channel_; +} +FakeSocket* FakeChromotingConnection::GetEventsChannel() { + return &events_channel_; +} + +FakeSocket* FakeChromotingConnection::GetVideoRtpChannel() { + return &video_rtp_channel_; +} +FakeSocket* FakeChromotingConnection::GetVideoRtcpChannel() { + return &video_rtcp_channel_; +} + +const std::string& FakeChromotingConnection::jid() { + return jid_; +} + +MessageLoop* FakeChromotingConnection::message_loop() { + return message_loop_; +} + +void FakeChromotingConnection::Close(Task* closed_task) { + closed_ = true; + closed_task->Run(); + delete closed_task; +} + +} // namespace remoting diff --git a/remoting/protocol/fake_connection.h b/remoting/protocol/fake_connection.h new file mode 100644 index 0000000..cabce4a --- /dev/null +++ b/remoting/protocol/fake_connection.h @@ -0,0 +1,94 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_FAKE_CONNECTION_H_ +#define REMOTING_PROTOCOL_FAKE_CONNECTION_H_ + +#include <vector> + +#include "base/scoped_ptr.h" +#include "net/socket/socket.h" +#include "remoting/protocol/chromoting_connection.h" + +namespace remoting { + +extern const char kTestJid[]; + +// FakeSocket implement net::Socket interface for FakeConnection. All data +// written to FakeSocket is stored in a buffer returned by written_data(). +// Read() reads data from another buffer that can be set with AppendInputData(). +// Pending reads are supported, so if there is a pending read AppendInputData() +// calls the read callback. +class FakeSocket : public net::Socket { + public: + FakeSocket(); + virtual ~FakeSocket(); + + const std::vector<char>& written_data() { return written_data_; } + + void AppendInputData(char* data, int data_size); + int input_pos() { return input_pos_; } + + // net::Socket interface. + virtual int Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + bool read_pending_; + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + net::CompletionCallback* read_callback_; + + std::vector<char> written_data_; + std::vector<char> input_data_; + int input_pos_; +}; + +// FakeChromotingConnection is a dummy ChromotingConnection that uses +// FakeSocket for all channels. +class FakeChromotingConnection : public ChromotingConnection { + public: + FakeChromotingConnection(); + virtual ~FakeChromotingConnection(); + + StateChangeCallback* get_state_change_callback() { return callback_.get(); } + + void set_message_loop(MessageLoop* message_loop) { + message_loop_ = message_loop; + } + + bool is_closed() { return closed_; } + + virtual void SetStateChangeCallback(StateChangeCallback* callback); + + virtual FakeSocket* GetVideoChannel(); + virtual FakeSocket* GetEventsChannel(); + + virtual FakeSocket* GetVideoRtpChannel(); + virtual FakeSocket* GetVideoRtcpChannel(); + + virtual const std::string& jid(); + + virtual MessageLoop* message_loop(); + virtual void Close(Task* closed_task); + + public: + scoped_ptr<StateChangeCallback> callback_; + MessageLoop* message_loop_; + FakeSocket video_channel_; + FakeSocket events_channel_; + FakeSocket video_rtp_channel_; + FakeSocket video_rtcp_channel_; + std::string jid_; + bool closed_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_FAKE_CONNECTION_H_ diff --git a/remoting/protocol/jingle_chromoting_connection.cc b/remoting/protocol/jingle_chromoting_connection.cc index 5c2e246..2412627 100644 --- a/remoting/protocol/jingle_chromoting_connection.cc +++ b/remoting/protocol/jingle_chromoting_connection.cc @@ -28,8 +28,8 @@ const char kEventsChannelName[] = "events"; } // namespace JingleChromotingConnection::JingleChromotingConnection( - JingleChromotingServer* session_client) - : session_client_(session_client), + JingleChromotingServer* server) + : server_(server), state_(INITIALIZING), closed_(false), session_(NULL), @@ -42,7 +42,7 @@ JingleChromotingConnection::~JingleChromotingConnection() { } void JingleChromotingConnection::Init(Session* session) { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); session_ = session; jid_ = session_->remote_name(); @@ -55,7 +55,7 @@ bool JingleChromotingConnection::HasSession(cricket::Session* session) { } Session* JingleChromotingConnection::ReleaseSession() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); SetState(CLOSED); Session* session = session_; @@ -68,35 +68,45 @@ Session* JingleChromotingConnection::ReleaseSession() { void JingleChromotingConnection::SetStateChangeCallback( StateChangeCallback* callback) { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); DCHECK(callback); state_change_callback_.reset(callback); } // TODO(sergeyu): Remove this method after we switch to RTP. net::Socket* JingleChromotingConnection::GetVideoChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_channel_adapter_.get(); } net::Socket* JingleChromotingConnection::GetEventsChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return events_channel_adapter_.get(); } net::Socket* JingleChromotingConnection::GetVideoRtpChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_rtp_channel_.get(); } net::Socket* JingleChromotingConnection::GetVideoRtcpChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_rtcp_channel_.get(); } +const std::string& JingleChromotingConnection::jid() { + // No synchronization is needed because jid_ is not changed + // after new connection is passed to JingleChromotingServer callback. + return jid_; +} + +MessageLoop* JingleChromotingConnection::message_loop() { + return server_->message_loop(); +} + void JingleChromotingConnection::Close(Task* closed_task) { - if (MessageLoop::current() != session_client_->message_loop()) { - session_client_->message_loop()->PostTask( + if (MessageLoop::current() != server_->message_loop()) { + server_->message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &JingleChromotingConnection::Close, closed_task)); return; @@ -171,7 +181,7 @@ void JingleChromotingConnection::OnSessionState( void JingleChromotingConnection::OnInitiate(bool incoming) { jid_ = session_->remote_name(); if (incoming) - session_client_->AcceptConnection(this, session_); + server_->AcceptConnection(this, session_); SetState(CONNECTING); } diff --git a/remoting/protocol/jingle_chromoting_connection.h b/remoting/protocol/jingle_chromoting_connection.h index ada4ead..cc54182 100644 --- a/remoting/protocol/jingle_chromoting_connection.h +++ b/remoting/protocol/jingle_chromoting_connection.h @@ -41,9 +41,8 @@ class JingleChromotingConnection : public ChromotingConnection, virtual net::Socket* GetVideoRtpChannel(); virtual net::Socket* GetVideoRtcpChannel(); - // No synchronization is needed because jid_ is not changed - // after new connection is passed to JingleChromotingServer callback. - virtual const std::string& jid() { return jid_; }; + virtual const std::string& jid(); + virtual MessageLoop* message_loop(); virtual void Close(Task* closed_task); @@ -68,7 +67,7 @@ class JingleChromotingConnection : public ChromotingConnection, void SetState(State new_state); // JingleChromotingServer that created this connection. - scoped_refptr<JingleChromotingServer> session_client_; + scoped_refptr<JingleChromotingServer> server_; State state_; scoped_ptr<StateChangeCallback> state_change_callback_; diff --git a/remoting/protocol/messages_decoder.cc b/remoting/protocol/messages_decoder.cc new file mode 100644 index 0000000..f7c7439 --- /dev/null +++ b/remoting/protocol/messages_decoder.cc @@ -0,0 +1,146 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/messages_decoder.h" + +#include "base/logging.h" +#include "remoting/base/multiple_array_input_stream.h" +#include "talk/base/byteorder.h" + +namespace remoting { + +MessagesDecoder::MessagesDecoder() + : last_read_position_(0), + available_bytes_(0), + next_payload_(0), + next_payload_known_(false) { +} + +MessagesDecoder::~MessagesDecoder() {} + +void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + ClientMessageList* messages) { + ParseMessages<ChromotingClientMessage>(data, data_size, messages); +} + +void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + HostMessageList* messages) { + ParseMessages<ChromotingHostMessage>(data, data_size, messages); +} + +template <typename T> +void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + std::list<T*>* messages) { + // If this is the first data in the processing queue, then set the + // last read position to 0. + if (data_list_.empty()) + last_read_position_ = 0; + + // First enqueue the data received. + data_list_.push_back(DataChunk(data, data_size)); + available_bytes_ += data_size; + + // Then try to parse one message until we can't parse anymore. + T* message; + while (ParseOneMessage<T>(&message)) { + messages->push_back(message); + } +} + +template <typename T> +bool MessagesDecoder::ParseOneMessage(T** message) { + // Determine the payload size. If we already know it, then skip this + // part. + // We have the value set to -1 for checking later. + int next_payload = -1; + if (!next_payload_known_ && GetPayloadSize(&next_payload)) { + DCHECK_NE(-1, next_payload); + next_payload_ = next_payload; + next_payload_known_ = true; + } + + // If the next payload size is still not known or we don't have enough + // data for parsing then exit. + if (!next_payload_known_ || available_bytes_ < next_payload_) + return false; + next_payload_known_ = false; + + // Create a MultipleArrayInputStream for parsing. + MultipleArrayInputStream stream; + std::vector<scoped_refptr<net::IOBuffer> > buffers; + while (next_payload_ > 0 && !data_list_.empty()) { + DataChunk* buffer = &(data_list_.front()); + size_t read_bytes = std::min(buffer->data_size - last_read_position_, + next_payload_); + + buffers.push_back(buffer->data); + stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes); + + // Adjust counters. + last_read_position_ += read_bytes; + next_payload_ -= read_bytes; + available_bytes_ -= read_bytes; + + // If the front buffer is fully read, remove it from the queue. + if (buffer->data_size == last_read_position_) { + data_list_.pop_front(); + last_read_position_ = 0; + } + } + DCHECK_EQ(0UL, next_payload_); + + // And finally it is parsing. + *message = new T(); + bool ret = (*message)->ParseFromZeroCopyStream(&stream); + if (!ret) { + LOG(ERROR) << "Received invalid message."; + delete *message; + } + return ret; +} + +bool MessagesDecoder::GetPayloadSize(int* size) { + // The header has a size of 4 bytes. + const size_t kHeaderSize = sizeof(int32); + + if (available_bytes_ < kHeaderSize) + return false; + + std::string header; + while (header.length() < kHeaderSize && !data_list_.empty()) { + DataChunk* buffer = &(data_list_.front()); + + // Find out how many bytes we need and how many bytes are available in this + // buffer. + int needed_bytes = kHeaderSize - header.length(); + int available_bytes = buffer->data_size - last_read_position_; + + // Then append the required bytes into the header and advance the last + // read position. + int read_bytes = std::min(needed_bytes, available_bytes); + header.append( + reinterpret_cast<char*>(buffer->data->data()) + last_read_position_, + read_bytes); + last_read_position_ += read_bytes; + available_bytes_ -= read_bytes; + + // If the buffer is depleted then remove it from the queue. + if (last_read_position_ == buffer->data_size) { + last_read_position_ = 0; + data_list_.pop_front(); + } + } + + if (header.length() == kHeaderSize) { + *size = talk_base::GetBE32(header.c_str()); + return true; + } + NOTREACHED() << "Unable to extract payload size"; + return false; +} + +} // namespace remoting diff --git a/remoting/protocol/messages_decoder.h b/remoting/protocol/messages_decoder.h new file mode 100644 index 0000000..66014f6 --- /dev/null +++ b/remoting/protocol/messages_decoder.h @@ -0,0 +1,89 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_ +#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_ + +#include <deque> +#include <list> + +#include "base/ref_counted.h" +#include "google/protobuf/message_lite.h" +#include "net/base/io_buffer.h" +#include "remoting/base/protocol/chromotocol.pb.h" + +namespace remoting { + +typedef std::list<ChromotingHostMessage*> HostMessageList; +typedef std::list<ChromotingClientMessage*> ClientMessageList; + +// A protocol decoder is used to decode data transmitted in the chromoting +// network. +// TODO(hclam): Defines the interface and implement methods. +class MessagesDecoder { + public: + MessagesDecoder(); + virtual ~MessagesDecoder(); + + // Parse data received from network into ClientMessages. Output is written + // to |messages|. + virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + ClientMessageList* messages); + + // Parse data received from network into HostMessages. Output is + // written to |messages|. + virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + HostMessageList* messages); + + private: + // DataChunk stores reference to a net::IOBuffer and size of the data + // stored in that buffer. + struct DataChunk { + DataChunk(net::IOBuffer* data, size_t data_size) + : data(data), + data_size(data_size) { + } + + scoped_refptr<net::IOBuffer> data; + size_t data_size; + }; + + // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer + // instead of storing chunks in dqueue. + typedef std::deque<DataChunk> DataList; + + // A private method used to parse data received from network into protocol + // buffers. + template <typename T> + void ParseMessages(scoped_refptr<net::IOBuffer> data, + int data_size, + std::list<T*>* messages); + + // Parse one message from |data_list_|. Return true if sucessful. + template <typename T> + bool ParseOneMessage(T** messages); + + // A utility method to read payload size of the protocol buffer from the + // data list. Return false if we don't have enough data. + bool GetPayloadSize(int* size); + + DataList data_list_; + size_t last_read_position_; + + // Count the number of bytes in |data_list_| not read. + size_t available_bytes_; + + // Stores the size of the next payload if known. + size_t next_payload_; + + // True if the size of the next payload is known. After one payload is read, + // this is reset to false. + bool next_payload_known_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_ diff --git a/remoting/protocol/messages_decoder_unittest.cc b/remoting/protocol/messages_decoder_unittest.cc new file mode 100644 index 0000000..7c5616d --- /dev/null +++ b/remoting/protocol/messages_decoder_unittest.cc @@ -0,0 +1,121 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <string> + +#include "base/scoped_ptr.h" +#include "base/stl_util-inl.h" +#include "net/base/io_buffer.h" +#include "remoting/protocol/messages_decoder.h" +#include "remoting/protocol/util.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace remoting { + +static const int kWidth = 640; +static const int kHeight = 480; +static const std::string kTestData = "Chromoting rockz"; + +static void AppendMessage(const ChromotingHostMessage& msg, + std::string* buffer) { + // Contains one encoded message. + scoped_refptr<net::IOBufferWithSize> encoded_msg; + encoded_msg = SerializeAndFrameMessage(msg); + buffer->append(encoded_msg->data(), encoded_msg->size()); +} + +// Construct and prepare data in the |output_stream|. +static void PrepareData(uint8** buffer, int* size) { + // Contains all encoded messages. + std::string encoded_data; + + // The first message is InitClient. + ChromotingHostMessage msg; + msg.mutable_init_client()->set_width(kWidth); + msg.mutable_init_client()->set_height(kHeight); + AppendMessage(msg, &encoded_data); + msg.Clear(); + + // Then append 10 update sequences to the data. + for (int i = 0; i < 10; ++i) { + msg.mutable_begin_update_stream(); + AppendMessage(msg, &encoded_data); + msg.Clear(); + + msg.mutable_update_stream_packet()->mutable_rect_data()-> + set_sequence_number(0); + msg.mutable_update_stream_packet()->mutable_rect_data()-> + set_data(kTestData); + AppendMessage(msg, &encoded_data); + msg.Clear(); + + msg.mutable_end_update_stream(); + AppendMessage(msg, &encoded_data); + msg.Clear(); + } + + *size = encoded_data.length(); + *buffer = new uint8[*size]; + memcpy(*buffer, encoded_data.c_str(), *size); +} + +TEST(MessagesDecoderTest, BasicOperations) { + // Prepare encoded data for testing. + int size; + uint8* test_data; + PrepareData(&test_data, &size); + scoped_array<uint8> memory_deleter(test_data); + + // Then simulate using MessagesDecoder to decode variable + // size of encoded data. + // The first thing to do is to generate a variable size of data. This is done + // by iterating the following array for read sizes. + const int kReadSizes[] = {1, 2, 3, 1}; + + MessagesDecoder decoder; + + // Then feed the protocol decoder using the above generated data and the + // read pattern. + HostMessageList message_list; + for (int i = 0; i < size;) { + // First generate the amount to feed the decoder. + int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]); + + // And then prepare a DataBuffer for feeding it. + scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read); + memcpy(buffer->data(), test_data + i, read); + decoder.ParseHostMessages(buffer, read, &message_list); + i += read; + } + + // Then verify the decoded messages. + EXPECT_EQ(31u, message_list.size()); + ASSERT_TRUE(message_list.size() > 0); + EXPECT_TRUE(message_list.front()->has_init_client()); + delete message_list.front(); + message_list.pop_front(); + + int index = 0; + for (HostMessageList::iterator it = message_list.begin(); + it != message_list.end(); ++it) { + ChromotingHostMessage* message = *it; + int type = index % 3; + ++index; + if (type == 0) { + // Begin update stream. + EXPECT_TRUE(message->has_begin_update_stream()); + } else if (type == 1) { + // Partial update stream. + EXPECT_TRUE(message->has_update_stream_packet()); + EXPECT_EQ(kTestData, + message->update_stream_packet().rect_data().data()); + } else if (type == 2) { + // End update stream. + EXPECT_TRUE(message->has_end_update_stream()); + } + } + STLDeleteElements(&message_list); +} + +} // namespace remoting diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc new file mode 100644 index 0000000..bfceb3e --- /dev/null +++ b/remoting/protocol/stream_reader.cc @@ -0,0 +1,102 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/stream_reader.h" + +#include "base/message_loop.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "remoting/protocol/chromoting_connection.h" + +namespace remoting { + +namespace { +int kReadBufferSize = 4096; +} // namespace + +StreamReaderBase::StreamReaderBase() + : socket_(NULL), + closed_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &StreamReaderBase::OnRead)) { +} + +StreamReaderBase::~StreamReaderBase() { } + +void StreamReaderBase::Close() { + closed_ = true; +} + +void StreamReaderBase::Init(net::Socket* socket) { + DCHECK(socket); + socket_ = socket; + DoRead(); +} + +void StreamReaderBase::DoRead() { + while (true) { + read_buffer_ = new net::IOBuffer(kReadBufferSize); + int result = socket_->Read( + read_buffer_, kReadBufferSize, &read_callback_); + HandleReadResult(result); + if (result < 0) + break; + } +} + +void StreamReaderBase::OnRead(int result) { + if (!closed_) { + HandleReadResult(result); + DoRead(); + } +} + +void StreamReaderBase::HandleReadResult(int result) { + if (result > 0) { + OnDataReceived(read_buffer_, result); + } else { + if (result != net::ERR_IO_PENDING) + LOG(ERROR) << "Read() returned error " << result; + } +} + +// EventsStreamReader class. +EventsStreamReader::EventsStreamReader() { } +EventsStreamReader::~EventsStreamReader() { } + +void EventsStreamReader::Init(net::Socket* socket, + OnMessageCallback* on_message_callback) { + on_message_callback_.reset(on_message_callback); + StreamReaderBase::Init(socket); +} + +void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { + ClientMessageList messages_list; + messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list); + for (ClientMessageList::iterator it = messages_list.begin(); + it != messages_list.end(); ++it) { + on_message_callback_->Run(*it); + } +} + +// VideoStreamReader class. +VideoStreamReader::VideoStreamReader() { } +VideoStreamReader::~VideoStreamReader() { } + +void VideoStreamReader::Init(net::Socket* socket, + OnMessageCallback* on_message_callback) { + on_message_callback_.reset(on_message_callback); + StreamReaderBase::Init(socket); +} + +void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { + HostMessageList messages_list; + messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list); + for (HostMessageList::iterator it = messages_list.begin(); + it != messages_list.end(); ++it) { + on_message_callback_->Run(*it); + } +} + +} // namespace remoting diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h new file mode 100644 index 0000000..b71f4b0 --- /dev/null +++ b/remoting/protocol/stream_reader.h @@ -0,0 +1,97 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ +#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ + +#include "base/callback.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "net/base/completion_callback.h" +#include "remoting/protocol/messages_decoder.h" + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class ChromotingConnection; + +class StreamReaderBase { + public: + StreamReaderBase(); + virtual ~StreamReaderBase(); + + // Stops reading. Must be called on the same thread as Init(). + void Close(); + + protected: + void Init(net::Socket* socket); + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0; + + private: + void DoRead(); + void OnRead(int result); + void HandleReadResult(int result); + + net::Socket* socket_; + bool closed_; + scoped_refptr<net::IOBuffer> read_buffer_; + net::CompletionCallbackImpl<StreamReaderBase> read_callback_; +}; + +class EventsStreamReader : public StreamReaderBase { + public: + EventsStreamReader(); + ~EventsStreamReader(); + + // The OnMessageCallback is called whenever a new message is received. + // Ownership of the message is passed the callback. + typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback; + + // Initialize the reader and start reading. Must be called on the thread + // |socket| belongs to. The callback will be called when a new message is + // received. EventsStreamReader owns |on_message_callback|, doesn't own + // |socket|. + void Init(net::Socket* socket, OnMessageCallback* on_message_callback); + + protected: + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); + + private: + MessagesDecoder messages_decoder_; + scoped_ptr<OnMessageCallback> on_message_callback_; + + DISALLOW_COPY_AND_ASSIGN(EventsStreamReader); +}; + +class VideoStreamReader : public StreamReaderBase { + public: + VideoStreamReader(); + ~VideoStreamReader(); + + // The OnMessageCallback is called whenever a new message is received. + // Ownership of the message is passed the callback. + typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback; + + // Initialize the reader and start reading. Must be called on the thread + // |socket| belongs to. The callback will be called when a new message is + // received. VideoStreamReader owns |on_message_callback|, doesn't own + // |socket|. + void Init(net::Socket* socket, OnMessageCallback* on_message_callback); + + protected: + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); + + private: + MessagesDecoder messages_decoder_; + scoped_ptr<OnMessageCallback> on_message_callback_; + + DISALLOW_COPY_AND_ASSIGN(VideoStreamReader); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ diff --git a/remoting/protocol/stream_writer.cc b/remoting/protocol/stream_writer.cc new file mode 100644 index 0000000..3269a1b --- /dev/null +++ b/remoting/protocol/stream_writer.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/stream_writer.h" + +#include "base/message_loop.h" +#include "remoting/protocol/chromoting_connection.h" +#include "remoting/protocol/util.h" + +namespace remoting { + +StreamWriterBase::StreamWriterBase() + : socket_(NULL), + buffered_writer_(new BufferedSocketWriter()) { +} + +StreamWriterBase::~StreamWriterBase() { } + +void StreamWriterBase::Init(net::Socket* socket) { + socket_ = socket; + buffered_writer_->Init(socket, NULL); +} + +int StreamWriterBase::GetBufferSize() { + return buffered_writer_->GetBufferSize(); +} + +int StreamWriterBase::GetPendingMessages() { + return buffered_writer_->GetBufferChunks(); +} + +void StreamWriterBase::Close() { + buffered_writer_->Close(); +} + +bool EventsStreamWriter::SendMessage( + const ChromotingClientMessage& message) { + return buffered_writer_->Write(SerializeAndFrameMessage(message)); +} + +bool VideoStreamWriter::SendMessage( + const ChromotingHostMessage& message) { + return buffered_writer_->Write(SerializeAndFrameMessage(message)); +} + +} // namespace remoting diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h new file mode 100644 index 0000000..4bc2b76 --- /dev/null +++ b/remoting/protocol/stream_writer.h @@ -0,0 +1,53 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_ +#define REMOTING_PROTOCOL_STREAM_WRITER_H_ + +#include "remoting/protocol/buffered_socket_writer.h" +#include "remoting/protocol/messages_decoder.h" + +namespace remoting { + +class ChromotingConnection; + +class StreamWriterBase { + public: + StreamWriterBase(); + virtual ~StreamWriterBase(); + + // Initializes the writer. Must be called on the thread the |socket| belongs + // to. + void Init(net::Socket* socket); + + // Return current buffer state. Can be called from any thread. + int GetBufferSize(); + int GetPendingMessages(); + + // Stop writing and drop pending data. Must be called from the same thread as + // Init(). + void Close(); + + protected: + net::Socket* socket_; + scoped_refptr<BufferedSocketWriter> buffered_writer_; +}; + +class EventsStreamWriter : public StreamWriterBase { + public: + // Sends the |message| or returns false if called before Init(). + // Can be called on any thread. + bool SendMessage(const ChromotingClientMessage& message); +}; + +class VideoStreamWriter : public StreamWriterBase { + public: + // Sends the |message| or returns false if called before Init(). + // Can be called on any thread. + bool SendMessage(const ChromotingHostMessage& message); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_STREAM_WRITER_H_ diff --git a/remoting/protocol/util.cc b/remoting/protocol/util.cc new file mode 100644 index 0000000..5a23c93 --- /dev/null +++ b/remoting/protocol/util.cc @@ -0,0 +1,28 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/util.h" + +#include "base/basictypes.h" +#include "base/hash_tables.h" +#include "base/logging.h" +#include "net/base/io_buffer.h" +#include "third_party/libjingle/source/talk/base/byteorder.h" + +namespace remoting { + +scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage( + const google::protobuf::MessageLite& msg) { + // Create a buffer with 4 extra bytes. This is used as prefix to write an + // int32 of the serialized message size for framing. + const int kExtraBytes = sizeof(int32); + int size = msg.ByteSize() + kExtraBytes; + scoped_refptr<net::IOBufferWithSize> buffer = new net::IOBufferWithSize(size); + talk_base::SetBE32(buffer->data(), msg.GetCachedSize()); + msg.SerializeWithCachedSizesToArray( + reinterpret_cast<uint8*>(buffer->data()) + kExtraBytes); + return buffer; +} + +} // namespace remoting diff --git a/remoting/protocol/util.h b/remoting/protocol/util.h new file mode 100644 index 0000000..ef15e45 --- /dev/null +++ b/remoting/protocol/util.h @@ -0,0 +1,25 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_UTIL_H_ +#define REMOTING_PROTOCOL_UTIL_H_ + +#include "google/protobuf/message_lite.h" +#include "net/base/io_buffer.h" +#include "remoting/base/protocol/chromotocol.pb.h" + +// This file defines utility methods used for encoding and decoding the protocol +// used in Chromoting. +namespace remoting { + +// Serialize the Protocol Buffer message and provide sufficient framing for +// sending it over the wire. +// This will provide sufficient prefix and suffix for the receiver side to +// decode the message. +scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage( + const google::protobuf::MessageLite& msg); + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_UTIL_H_ |