diff options
50 files changed, 1286 insertions, 601 deletions
diff --git a/remoting/base/codec_test.cc b/remoting/base/codec_test.cc index 2736c3e..62b98607 100644 --- a/remoting/base/codec_test.cc +++ b/remoting/base/codec_test.cc @@ -11,7 +11,7 @@ #include "remoting/base/decoder.h" #include "remoting/base/encoder.h" #include "remoting/base/mock_objects.h" -#include "remoting/base/protocol_util.h" +#include "remoting/base/util.h" #include "testing/gtest/include/gtest/gtest.h" static const int kWidth = 320; diff --git a/remoting/base/decoder_row_based.cc b/remoting/base/decoder_row_based.cc index 2f4290a..65c74c5 100644 --- a/remoting/base/decoder_row_based.cc +++ b/remoting/base/decoder_row_based.cc @@ -7,7 +7,7 @@ #include "remoting/base/decompressor.h" #include "remoting/base/decompressor_zlib.h" #include "remoting/base/decompressor_verbatim.h" -#include "remoting/base/protocol_util.h" +#include "remoting/base/util.h" namespace remoting { diff --git a/remoting/base/decoder_vp8.cc b/remoting/base/decoder_vp8.cc index 0ede4ab..452cd1f 100644 --- a/remoting/base/decoder_vp8.cc +++ b/remoting/base/decoder_vp8.cc @@ -6,7 +6,7 @@ #include "media/base/media.h" #include "media/base/yuv_convert.h" -#include "remoting/base/protocol_util.h" +#include "remoting/base/util.h" extern "C" { #define VPX_CODEC_DISABLE_COMPAT 1 diff --git a/remoting/base/encoder_verbatim.cc b/remoting/base/encoder_verbatim.cc index d831006..b636670 100644 --- a/remoting/base/encoder_verbatim.cc +++ b/remoting/base/encoder_verbatim.cc @@ -8,7 +8,7 @@ #include "gfx/rect.h" #include "media/base/data_buffer.h" #include "remoting/base/capture_data.h" -#include "remoting/base/protocol_util.h" +#include "remoting/base/util.h" #include "remoting/base/protocol/chromotocol.pb.h" namespace remoting { diff --git a/remoting/base/encoder_zlib.cc b/remoting/base/encoder_zlib.cc index 390a46ad..f807e85 100644 --- a/remoting/base/encoder_zlib.cc +++ b/remoting/base/encoder_zlib.cc @@ -9,7 +9,7 @@ #include "media/base/data_buffer.h" #include "remoting/base/capture_data.h" #include "remoting/base/compressor_zlib.h" -#include "remoting/base/protocol_util.h" +#include "remoting/base/util.h" #include "remoting/base/protocol/chromotocol.pb.h" namespace remoting { diff --git a/remoting/base/mock_objects.h b/remoting/base/mock_objects.h index 7565f22..12143ad 100644 --- a/remoting/base/mock_objects.h +++ b/remoting/base/mock_objects.h @@ -8,26 +8,10 @@ #include "remoting/base/capture_data.h" #include "remoting/base/decoder.h" #include "remoting/base/encoder.h" -#include "remoting/base/protocol_decoder.h" #include "testing/gmock/include/gmock/gmock.h" namespace remoting { -class MockProtocolDecoder : public ProtocolDecoder { - public: - MockProtocolDecoder() {} - - MOCK_METHOD2(ParseClientMessages, - void(scoped_refptr<media::DataBuffer> data, - ClientMessageList* messages)); - MOCK_METHOD2(ParseHostMessages, - void(scoped_refptr<media::DataBuffer> data, - HostMessageList* messages)); - - private: - DISALLOW_COPY_AND_ASSIGN(MockProtocolDecoder); -}; - class MockEncoder : public Encoder { public: MockEncoder() {} diff --git a/remoting/base/multiple_array_input_stream.cc b/remoting/base/multiple_array_input_stream.cc index e5b8f94..ab0ba45 100644 --- a/remoting/base/multiple_array_input_stream.cc +++ b/remoting/base/multiple_array_input_stream.cc @@ -9,22 +9,26 @@ namespace remoting { -MultipleArrayInputStream::MultipleArrayInputStream(int count) - : buffer_count_(count), - current_buffer_(0), +MultipleArrayInputStream::MultipleArrayInputStream() + : current_buffer_(0), current_buffer_offset_(0), position_(0), last_returned_size_(0) { - DCHECK_GT(buffer_count_, 0); - buffers_.reset(new const uint8*[buffer_count_]); - buffer_sizes_.reset(new int[buffer_count_]); } MultipleArrayInputStream::~MultipleArrayInputStream() { } +void MultipleArrayInputStream::AddBuffer( + const char* buffer, int size) { + DCHECK_EQ(position_, 0); // Haven't started reading. + buffers_.push_back(buffer); + buffer_sizes_.push_back(size); + DCHECK_EQ(buffers_.size(), buffer_sizes_.size()); +} + bool MultipleArrayInputStream::Next(const void** data, int* size) { - if (current_buffer_ < buffer_count_) { + if (current_buffer_ < buffers_.size()) { // Also reply with that is remaining in the current buffer. last_returned_size_ = buffer_sizes_[current_buffer_] - current_buffer_offset_; @@ -49,7 +53,7 @@ bool MultipleArrayInputStream::Next(const void** data, int* size) { void MultipleArrayInputStream::BackUp(int count) { DCHECK_LE(count, last_returned_size_); DCHECK_EQ(0, current_buffer_offset_); - DCHECK_GT(current_buffer_, 0); + DCHECK_GT(current_buffer_, 0u); // Rewind one buffer. --current_buffer_; @@ -63,7 +67,7 @@ bool MultipleArrayInputStream::Skip(int count) { DCHECK_GE(count, 0); last_returned_size_ = 0; - while (count && current_buffer_ < buffer_count_) { + while (count && current_buffer_ < buffers_.size()) { int read = std::min( count, buffer_sizes_[current_buffer_] - current_buffer_offset_); @@ -86,11 +90,4 @@ int64 MultipleArrayInputStream::ByteCount() const { return position_; } -void MultipleArrayInputStream::SetBuffer( - int n, const uint8* buffer, int size) { - CHECK(n < buffer_count_); - buffers_[n] = buffer; - buffer_sizes_[n] = size; -} - } // namespace remoting diff --git a/remoting/base/multiple_array_input_stream.h b/remoting/base/multiple_array_input_stream.h index a1b5f01..7747da5 100644 --- a/remoting/base/multiple_array_input_stream.h +++ b/remoting/base/multiple_array_input_stream.h @@ -5,8 +5,9 @@ #ifndef REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_ #define REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_ +#include <vector> + #include "base/basictypes.h" -#include "base/scoped_ptr.h" #include "google/protobuf/io/zero_copy_stream.h" namespace remoting { @@ -19,24 +20,23 @@ class MultipleArrayInputStream : // Construct a MultipleArrayInputStream with |count| backing arrays. // TODO(hclam): Consider adding block size to see if it has a performance // gain. - explicit MultipleArrayInputStream(int count); + MultipleArrayInputStream(); virtual ~MultipleArrayInputStream(); + // Add a new buffer to the list. + void AddBuffer(const char* buffer, int size); + + // google::protobuf::io::ZeroCopyInputStream interface. virtual bool Next(const void** data, int* size); virtual void BackUp(int count); virtual bool Skip(int count); virtual int64 ByteCount() const; - // Set the n-th buffer to be |buffer|. - void SetBuffer(int n, const uint8* buffer, int size); - private: - scoped_array<const uint8*> buffers_; - scoped_array<int> buffer_sizes_; - - const int buffer_count_; + std::vector<const char*> buffers_; + std::vector<int> buffer_sizes_; - int current_buffer_; + size_t current_buffer_; int current_buffer_offset_; int position_; int last_returned_size_; diff --git a/remoting/base/multiple_array_input_stream_unittest.cc b/remoting/base/multiple_array_input_stream_unittest.cc index d8397c7..1d705fe 100644 --- a/remoting/base/multiple_array_input_stream_unittest.cc +++ b/remoting/base/multiple_array_input_stream_unittest.cc @@ -4,11 +4,14 @@ #include <string> +#include "base/scoped_ptr.h" #include "remoting/base/multiple_array_input_stream.h" #include "testing/gtest/include/gtest/gtest.h" namespace remoting { +// TODO(sergeyu): Add SCOPED_TRACE() for ReadFromInput() and ReadString(). + static size_t ReadFromInput(MultipleArrayInputStream* input, void* data, size_t size) { uint8* out = reinterpret_cast<uint8*>(data); @@ -64,11 +67,11 @@ static void PrepareData(scoped_ptr<MultipleArrayInputStream>* stream) { segments += 2; } - MultipleArrayInputStream* mstream = new MultipleArrayInputStream(segments); + MultipleArrayInputStream* mstream = new MultipleArrayInputStream(); const char* data = kTestData.c_str(); for (int i = 0; i < segments; ++i) { int size = i % 2 == 0 ? 1 : 2; - mstream->SetBuffer(i, reinterpret_cast<const uint8*>(data), size); + mstream->AddBuffer(data, size); data += size; } stream->reset(mstream); diff --git a/remoting/base/protocol_util.cc b/remoting/base/protocol_util.cc deleted file mode 100644 index f009eb3..0000000 --- a/remoting/base/protocol_util.cc +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "remoting/base/protocol_util.h" - -#include "base/basictypes.h" -#include "base/hash_tables.h" -#include "base/logging.h" -#include "talk/base/byteorder.h" - -namespace remoting { - -scoped_refptr<media::DataBuffer> SerializeAndFrameMessage( - const google::protobuf::MessageLite& msg) { - - // Create a buffer with 4 extra bytes. This is used as prefix to write an - // int32 of the serialized message size for framing. - const int kExtraBytes = sizeof(int32); - int size = msg.ByteSize() + kExtraBytes; - scoped_refptr<media::DataBuffer> buffer = new media::DataBuffer(size); - talk_base::SetBE32(buffer->GetWritableData(), msg.GetCachedSize()); - msg.SerializeWithCachedSizesToArray(buffer->GetWritableData() + kExtraBytes); - buffer->SetDataSize(size); - return buffer; -} - -int GetBytesPerPixel(PixelFormat format) { - // Note: The order is important here for performance. This is sorted from the - // most common to the less common (PixelFormatAscii is mostly used - // just for testing). - switch (format) { - case PixelFormatRgb24: return 3; - case PixelFormatRgb565: return 2; - case PixelFormatRgb32: return 4; - case PixelFormatAscii: return 1; - default: - NOTREACHED() << "Pixel format not supported"; - return 0; - } -} - -} // namespace remoting diff --git a/remoting/base/util.cc b/remoting/base/util.cc new file mode 100644 index 0000000..8ccbd23 --- /dev/null +++ b/remoting/base/util.cc @@ -0,0 +1,26 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/base/util.h" + +#include "base/logging.h" + +namespace remoting { + +int GetBytesPerPixel(PixelFormat format) { + // Note: The order is important here for performance. This is sorted from the + // most common to the less common (PixelFormatAscii is mostly used + // just for testing). + switch (format) { + case PixelFormatRgb24: return 3; + case PixelFormatRgb565: return 2; + case PixelFormatRgb32: return 4; + case PixelFormatAscii: return 1; + default: + NOTREACHED() << "Pixel format not supported"; + return 0; + } +} + +} // namespace remoting diff --git a/remoting/base/util.h b/remoting/base/util.h new file mode 100644 index 0000000..adfbc89 --- /dev/null +++ b/remoting/base/util.h @@ -0,0 +1,16 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_BASE_UTIL_H_ +#define REMOTING_BASE_UTIL_H_ + +#include "remoting/base/protocol/chromotocol.pb.h" + +namespace remoting { + +int GetBytesPerPixel(PixelFormat format); + +} // namespace remoting + +#endif // REMOTING_BASE_UTIL_H_ diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc index e4f93ab..7669e3f 100644 --- a/remoting/client/chromoting_client.cc +++ b/remoting/client/chromoting_client.cc @@ -96,18 +96,18 @@ void ChromotingClient::SetViewport(int x, int y, int width, int height) { view_->SetViewport(x, y, width, height); } -void ChromotingClient::HandleMessages(HostConnection* conn, - HostMessageList* messages) { +void ChromotingClient::HandleMessage(HostConnection* conn, + ChromotingHostMessage* msg) { if (message_loop() != MessageLoop::current()) { message_loop()->PostTask( FROM_HERE, - NewRunnableMethod(this, &ChromotingClient::HandleMessages, - conn, messages)); + NewRunnableMethod(this, &ChromotingClient::HandleMessage, + conn, msg)); return; } // Put all messages in the queue. - received_messages_.splice(received_messages_.end(), *messages); + received_messages_.push_back(msg); if (!message_being_processed_) { DispatchMessage(); @@ -204,18 +204,18 @@ void ChromotingClient::SetState(State s) { Repaint(); } -void ChromotingClient::OnMessageDone(ChromotingHostMessage* msg) { +void ChromotingClient::OnMessageDone(ChromotingHostMessage* message) { if (message_loop() != MessageLoop::current()) { message_loop()->PostTask( FROM_HERE, - NewTracedMethod(this, &ChromotingClient::OnMessageDone, msg)); + NewTracedMethod(this, &ChromotingClient::OnMessageDone, message)); return; } TraceContext::tracer()->PrintString("Message done"); message_being_processed_ = false; - delete msg; + delete message; DispatchMessage(); } diff --git a/remoting/client/chromoting_client.h b/remoting/client/chromoting_client.h index 31870ed..f55ec78 100644 --- a/remoting/client/chromoting_client.h +++ b/remoting/client/chromoting_client.h @@ -10,6 +10,7 @@ #include "base/task.h" #include "remoting/client/host_connection.h" #include "remoting/client/client_config.h" +#include "remoting/protocol/messages_decoder.h" class MessageLoop; @@ -50,7 +51,8 @@ class ChromotingClient : public HostConnection::HostEventCallback { virtual void SetViewport(int x, int y, int width, int height); // HostConnection::HostEventCallback implementation. - virtual void HandleMessages(HostConnection* conn, HostMessageList* messages); + virtual void HandleMessage(HostConnection* conn, + ChromotingHostMessage* messages); virtual void OnConnectionOpened(HostConnection* conn); virtual void OnConnectionClosed(HostConnection* conn); virtual void OnConnectionFailed(HostConnection* conn); diff --git a/remoting/client/host_connection.h b/remoting/client/host_connection.h index 7af8de3..74c2a6d 100644 --- a/remoting/client/host_connection.h +++ b/remoting/client/host_connection.h @@ -7,7 +7,7 @@ #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "remoting/base/protocol_decoder.h" +#include "remoting/protocol/messages_decoder.h" namespace remoting { @@ -19,12 +19,10 @@ class HostConnection { public: virtual ~HostEventCallback() {} - // Handles an event received by the HostConnection. Receiver will own the - // HostMessages in HostMessageList and needs to delete them. - // Note that the sender of messages will not reference messages - // again so it is okay to clear |messages| in this method. - virtual void HandleMessages(HostConnection* conn, - HostMessageList* messages) = 0; + // Handles an event received by the HostConnection. Ownership of + // the message is passed to the callee. + virtual void HandleMessage(HostConnection* conn, + ChromotingHostMessage* message) = 0; // Called when the network connection is opened. virtual void OnConnectionOpened(HostConnection* conn) = 0; diff --git a/remoting/client/jingle_host_connection.cc b/remoting/client/jingle_host_connection.cc index 4892883..a4519df 100644 --- a/remoting/client/jingle_host_connection.cc +++ b/remoting/client/jingle_host_connection.cc @@ -2,12 +2,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/callback.h" #include "base/message_loop.h" #include "remoting/base/constants.h" -#include "remoting/base/protocol_util.h" #include "remoting/client/client_config.h" #include "remoting/client/jingle_host_connection.h" #include "remoting/jingle_glue/jingle_thread.h" +#include "remoting/protocol/jingle_chromoting_server.h" +#include "remoting/protocol/util.h" namespace remoting { @@ -21,67 +23,82 @@ JingleHostConnection::~JingleHostConnection() { void JingleHostConnection::Connect(const ClientConfig& config, HostEventCallback* event_callback) { - message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &JingleHostConnection::DoConnect, - config, event_callback)); -} + event_callback_ = event_callback; -void JingleHostConnection::Disconnect() { - message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &JingleHostConnection::DoDisconnect)); + // Initialize |jingle_client_|. + jingle_client_ = new JingleClient(context_->jingle_thread()); + jingle_client_->Init(config.username, config.auth_token, + kChromotingTokenServiceName, this); + + // Save jid of the host. The actual connection is created later after + // |jingle_client_| is connected. + host_jid_ = config.host_jid; } -void JingleHostConnection::SendEvent(const ChromotingClientMessage& msg) { - if (message_loop() != MessageLoop::current()) { +void JingleHostConnection::Disconnect() { + if (MessageLoop::current() != message_loop()) { message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &JingleHostConnection::SendEvent, msg)); + FROM_HERE, NewRunnableMethod(this, + &JingleHostConnection::Disconnect)); return; } - // Don't send messages if we're disconnected. - if (jingle_channel_ == NULL) { - return; + events_writer_.Close(); + video_reader_.Close(); + + if (connection_) { + connection_->Close( + NewRunnableMethod(this, &JingleHostConnection::OnDisconnected)); + } else { + OnDisconnected(); } +} - jingle_channel_->Write(SerializeAndFrameMessage(msg)); +void JingleHostConnection::OnVideoMessage( + ChromotingHostMessage* msg) { + event_callback_->HandleMessage(this, msg); } -void JingleHostConnection::OnStateChange(JingleChannel* channel, - JingleChannel::State state) { +void JingleHostConnection::InitConnection() { DCHECK_EQ(message_loop(), MessageLoop::current()); - DCHECK(event_callback_); - - switch (state) { - case JingleChannel::FAILED: - event_callback_->OnConnectionFailed(this); - break; - case JingleChannel::CLOSED: - event_callback_->OnConnectionClosed(this); - break; + // Initialize |chromotocol_server_|. + JingleChromotingServer* chromotocol_server = + new JingleChromotingServer(message_loop()); + chromotocol_server->Init( + jingle_client_->GetFullJid(), + jingle_client_->session_manager(), + NewCallback(this, &JingleHostConnection::OnNewChromotocolConnection)); + chromotocol_server_ = chromotocol_server; + + // Initialize |connection_|. + connection_ = chromotocol_server_->Connect( + host_jid_, + NewCallback(this, &JingleHostConnection::OnConnectionStateChange)); +} - case JingleChannel::OPEN: - event_callback_->OnConnectionOpened(this); - break; +void JingleHostConnection::OnDisconnected() { + connection_ = NULL; - default: - // Ignore the other states by default. - break; + if (chromotocol_server_) { + chromotocol_server_->Close( + NewRunnableMethod(this, &JingleHostConnection::OnServerClosed)); + } else { + OnServerClosed(); } } -void JingleHostConnection::OnPacketReceived( - JingleChannel* channel, - scoped_refptr<media::DataBuffer> buffer) { - DCHECK_EQ(message_loop(), MessageLoop::current()); - DCHECK(event_callback_); +void JingleHostConnection::OnServerClosed() { + chromotocol_server_ = NULL; + if (jingle_client_) { + jingle_client_->Close(); + jingle_client_ = NULL; + } +} - HostMessageList list; - decoder_.ParseHostMessages(buffer, &list); - event_callback_->HandleMessages(this, &list); +void JingleHostConnection::SendEvent(const ChromotingClientMessage& msg) { + // This drops the message if we are not connected yet. + events_writer_.SendMessage(msg); } // JingleClient::Callback interface. @@ -93,6 +110,7 @@ void JingleHostConnection::OnStateChange(JingleClient* client, if (state == JingleClient::CONNECTED) { LOG(INFO) << "Connected as: " << client->GetFullJid(); + InitConnection(); } else if (state == JingleClient::CLOSED) { LOG(INFO) << "Connection closed."; event_callback_->OnConnectionClosed(this); @@ -110,8 +128,8 @@ bool JingleHostConnection::OnAcceptConnection( } void JingleHostConnection::OnNewConnection( - JingleClient* client, - scoped_refptr<JingleChannel> channel) { + JingleClient* client, + scoped_refptr<JingleChannel> channel) { DCHECK_EQ(message_loop(), MessageLoop::current()); // TODO(ajwong): Should we log more aggressively on this and above? We @@ -119,34 +137,44 @@ void JingleHostConnection::OnNewConnection( NOTREACHED() << "Clients can't accept inbound connections."; } -MessageLoop* JingleHostConnection::message_loop() { - return context_->jingle_thread()->message_loop(); +void JingleHostConnection::OnNewChromotocolConnection( + ChromotingConnection* connection, bool* accept) { + DCHECK_EQ(message_loop(), MessageLoop::current()); + // Client always rejects incoming connections. + *accept = false; } -void JingleHostConnection::DoConnect(const ClientConfig& config, - HostEventCallback* event_callback) { +void JingleHostConnection::OnConnectionStateChange( + ChromotingConnection::State state) { DCHECK_EQ(message_loop(), MessageLoop::current()); + DCHECK(event_callback_); - event_callback_ = event_callback; + switch (state) { + case ChromotingConnection::FAILED: + event_callback_->OnConnectionFailed(this); + break; - jingle_client_ = new JingleClient(context_->jingle_thread()); - jingle_client_->Init(config.username, config.auth_token, - kChromotingTokenServiceName, this); - jingle_channel_ = jingle_client_->Connect(config.host_jid, this); -} + case ChromotingConnection::CLOSED: + event_callback_->OnConnectionClosed(this); + break; -void JingleHostConnection::DoDisconnect() { - DCHECK_EQ(message_loop(), MessageLoop::current()); + case ChromotingConnection::CONNECTED: + // Initialize reader and writer. + events_writer_.Init(connection_->GetEventsChannel()); + video_reader_.Init( + connection_->GetVideoChannel(), + NewCallback(this, &JingleHostConnection::OnVideoMessage)); + event_callback_->OnConnectionOpened(this); + break; - if (jingle_channel_.get()) { - jingle_channel_->Close(); - jingle_channel_ = NULL; + default: + // Ignore the other states by default. + break; } +} - if (jingle_client_.get()) { - jingle_client_->Close(); - jingle_client_ = NULL; - } +MessageLoop* JingleHostConnection::message_loop() { + return context_->jingle_thread()->message_loop(); } } // namespace remoting diff --git a/remoting/client/jingle_host_connection.h b/remoting/client/jingle_host_connection.h index 885c63c..bbbbcab 100644 --- a/remoting/client/jingle_host_connection.h +++ b/remoting/client/jingle_host_connection.h @@ -21,11 +21,13 @@ #include "base/ref_counted.h" #include "base/scoped_ptr.h" #include "base/task.h" -#include "remoting/base/protocol_decoder.h" #include "remoting/client/client_context.h" #include "remoting/client/host_connection.h" -#include "remoting/jingle_glue/jingle_channel.h" #include "remoting/jingle_glue/jingle_client.h" +#include "remoting/protocol/chromoting_connection.h" +#include "remoting/protocol/chromoting_server.h" +#include "remoting/protocol/stream_reader.h" +#include "remoting/protocol/stream_writer.h" class MessageLoop; @@ -36,7 +38,6 @@ class JingleThread; struct ClientConfig; class JingleHostConnection : public HostConnection, - public JingleChannel::Callback, public JingleClient::Callback { public: explicit JingleHostConnection(ClientContext* context); @@ -48,34 +49,50 @@ class JingleHostConnection : public HostConnection, virtual void SendEvent(const ChromotingClientMessage& msg); - // JingleChannel::Callback interface. - virtual void OnStateChange(JingleChannel* channel, - JingleChannel::State state); - virtual void OnPacketReceived(JingleChannel* channel, - scoped_refptr<media::DataBuffer> buffer); - // JingleClient::Callback interface. virtual void OnStateChange(JingleClient* client, JingleClient::State state); virtual bool OnAcceptConnection(JingleClient* client, const std::string& jid, JingleChannel::Callback** callback); - virtual void OnNewConnection(JingleClient* client, - scoped_refptr<JingleChannel> channel); + void OnNewConnection(JingleClient* client, + scoped_refptr<JingleChannel> channel); + + // Callback for ChromotingServer. + void OnNewChromotocolConnection( + ChromotingConnection* connection, bool* accept); + + // Callback for ChromotingConnection. + void OnConnectionStateChange(ChromotingConnection::State state); private: + // The message loop for the jingle thread this object works on. MessageLoop* message_loop(); - void DoConnect(const ClientConfig& config, - HostEventCallback* event_callback); - void DoDisconnect(); + // Called on the jingle thread after we've successfully to XMPP server. Starts + // P2P connection to the host. + void InitConnection(); + + // Callback for |video_reader_|. + // TODO(sergeyu): This should be replaced with RTP/RTCP handler. + void OnVideoMessage(ChromotingHostMessage* msg); + + // Used by Disconnect() to disconnect chromoting connection, stop chromoting + // server, and then disconnect XMPP connection. + void OnDisconnected(); + void OnServerClosed(); ClientContext* context_; scoped_refptr<JingleClient> jingle_client_; - scoped_refptr<JingleChannel> jingle_channel_; + scoped_refptr<ChromotingServer> chromotocol_server_; + scoped_refptr<ChromotingConnection> connection_; + + EventsStreamWriter events_writer_; + VideoStreamReader video_reader_; - ProtocolDecoder decoder_; HostEventCallback* event_callback_; + std::string host_jid_; + DISALLOW_COPY_AND_ASSIGN(JingleHostConnection); }; diff --git a/remoting/client/rectangle_update_decoder.cc b/remoting/client/rectangle_update_decoder.cc index 44bac08..aa87c4a 100644 --- a/remoting/client/rectangle_update_decoder.cc +++ b/remoting/client/rectangle_update_decoder.cc @@ -10,8 +10,8 @@ #include "remoting/base/decoder.h" #include "remoting/base/decoder_row_based.h" #include "remoting/base/protocol/chromotocol.pb.h" -#include "remoting/base/protocol_util.h" #include "remoting/base/tracer.h" +#include "remoting/base/util.h" #include "remoting/client/frame_consumer.h" using media::AutoTaskRunner; diff --git a/remoting/host/chromoting_host.cc b/remoting/host/chromoting_host.cc index 656327fb..d0b7bb7 100644 --- a/remoting/host/chromoting_host.cc +++ b/remoting/host/chromoting_host.cc @@ -9,13 +9,14 @@ #include "build/build_config.h" #include "remoting/base/constants.h" #include "remoting/base/encoder.h" -#include "remoting/base/protocol_decoder.h" #include "remoting/host/chromoting_host_context.h" #include "remoting/host/capturer.h" #include "remoting/host/event_executor.h" #include "remoting/host/host_config.h" #include "remoting/host/session_manager.h" #include "remoting/jingle_glue/jingle_channel.h" +#include "remoting/protocol/messages_decoder.h" +#include "remoting/protocol/jingle_chromoting_server.h" namespace remoting { @@ -54,9 +55,6 @@ void ChromotingHost::Start(Task* shutdown_task) { state_ = kStarted; } - // Get capturer to set up it's initial configuration. - capturer_->ScreenConfigurationChanged(); - // Save the shutdown task. shutdown_task_.reset(shutdown_task); @@ -64,11 +62,12 @@ void ChromotingHost::Start(Task* shutdown_task) { std::string xmpp_auth_token; if (!config_->GetString(kXmppLoginConfigPath, &xmpp_login) || !config_->GetString(kXmppAuthTokenConfigPath, &xmpp_auth_token)) { - LOG(ERROR) << "XMPP credentials are not defined in config."; + LOG(ERROR) << "XMPP credentials are not defined in the config."; return; } - access_verifier_.Init(config_); + if (!access_verifier_.Init(config_)) + return; // Connect to the talk network with a JingleClient. jingle_client_ = new JingleClient(context_->jingle_thread()); @@ -94,8 +93,10 @@ void ChromotingHost::Shutdown() { // No-op if this object is not started yet. { AutoLock auto_lock(lock_); - if (state_ != kStarted) + if (state_ != kStarted) { + state_ = kStopped; return; + } state_ = kStopped; } @@ -110,16 +111,22 @@ void ChromotingHost::Shutdown() { client_->Disconnect(); } - // Disconnect from the talk network. - if (jingle_client_) { - jingle_client_->Close(); - } - // Stop the heartbeat sender. if (heartbeat_sender_) { heartbeat_sender_->Stop(); } + // Stop chromotocol server. + if (chromotocol_server_) { + chromotocol_server_->Close( + NewRunnableMethod(this, &ChromotingHost::OnServerClosed)); + } + + // Disconnect from the talk network. + if (jingle_client_) { + jingle_client_->Close(); + } + // Lastly call the shutdown task. if (shutdown_task_.get()) { shutdown_task_->Run(); @@ -168,15 +175,14 @@ void ChromotingHost::OnClientDisconnected(ClientConnection* client) { //////////////////////////////////////////////////////////////////////////// // ClientConnection::EventHandler implementations -void ChromotingHost::HandleMessages(ClientConnection* client, - ClientMessageList* messages) { +void ChromotingHost::HandleMessage(ClientConnection* client, + ChromotingClientMessage* message) { DCHECK_EQ(context_->main_message_loop(), MessageLoop::current()); // Delegate the messages to EventExecutor and delete the unhandled // messages. DCHECK(executor_.get()); - executor_->HandleInputEvents(messages); - STLDeleteElements<ClientMessageList>(messages); + executor_->HandleInputEvent(message); } void ChromotingHost::OnConnectionOpened(ClientConnection* client) { @@ -210,7 +216,15 @@ void ChromotingHost::OnStateChange(JingleClient* jingle_client, LOG(INFO) << "Host connected as " << jingle_client->GetFullJid() << "." << std::endl; - // Start heartbeating after we've connected. + // Create and start |chromotocol_server_|. + chromotocol_server_ = + new JingleChromotingServer(context_->jingle_thread()->message_loop()); + chromotocol_server_->Init( + jingle_client->GetFullJid(), + jingle_client->session_manager(), + NewCallback(this, &ChromotingHost::OnNewClientConnection)); + + // Start heartbeating. heartbeat_sender_->Start(); } else if (state == JingleClient::CLOSED) { LOG(INFO) << "Host disconnected from talk network." << std::endl; @@ -224,45 +238,44 @@ void ChromotingHost::OnStateChange(JingleClient* jingle_client, } } -bool ChromotingHost::OnAcceptConnection( - JingleClient* jingle_client, const std::string& jid, - JingleChannel::Callback** channel_callback) { +void ChromotingHost::OnNewClientConnection( + ChromotingConnection* connection, bool* accept) { AutoLock auto_lock(lock_); - if (state_ != kStarted) - return false; - - DCHECK_EQ(jingle_client_.get(), jingle_client); - // TODO(hclam): Allow multiple clients to connect to the host. - if (client_.get()) - return false; + if (client_.get() || state_ != kStarted) { + *accept = false; + return; + } // Check that the user has access to the host. - if (!access_verifier_.VerifyPermissions(jid)) - return false; + if (!access_verifier_.VerifyPermissions(connection->jid())) { + *accept = false; + return; + } - LOG(INFO) << "Client connected: " << jid << std::endl; + *accept = true; + + LOG(INFO) << "Client connected: " << connection->jid() << std::endl; // If we accept the connected then create a client object and set the // callback. - client_ = new ClientConnection(context_->main_message_loop(), - new ProtocolDecoder(), this); - *channel_callback = client_.get(); - return true; + client_ = new ClientConnection(context_->main_message_loop(), this); + client_->Init(connection); +} + +bool ChromotingHost::OnAcceptConnection( + JingleClient* jingle_client, const std::string& jid, + JingleChannel::Callback** channel_callback) { + return false; } void ChromotingHost::OnNewConnection(JingleClient* jingle_client, scoped_refptr<JingleChannel> channel) { - AutoLock auto_lock(lock_); - if (state_ != kStarted) - return; - - DCHECK_EQ(jingle_client_.get(), jingle_client); + NOTREACHED(); +} - // Since the session manager has not started, it is still safe to access - // the client directly. Note that we give the ownership of the channel - // to the client. - client_->set_jingle_channel(channel); +void ChromotingHost::OnServerClosed() { + // Don't need to do anything here. } } // namespace remoting diff --git a/remoting/host/chromoting_host.h b/remoting/host/chromoting_host.h index ce3bdda..c1773cb 100644 --- a/remoting/host/chromoting_host.h +++ b/remoting/host/chromoting_host.h @@ -27,6 +27,7 @@ class Encoder; class EventExecutor; class MutableHostConfig; class SessionManager; +class JingleChromotingServer; // A class to implement the functionality of a host process. // @@ -83,8 +84,8 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, //////////////////////////////////////////////////////////////////////////// // ClientConnection::EventHandler implementations - virtual void HandleMessages(ClientConnection* client, - ClientMessageList* messages); + virtual void HandleMessage(ClientConnection* client, + ChromotingClientMessage* message); virtual void OnConnectionOpened(ClientConnection* client); virtual void OnConnectionClosed(ClientConnection* client); virtual void OnConnectionFailed(ClientConnection* client); @@ -99,6 +100,9 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, JingleClient* jingle, scoped_refptr<JingleChannel> channel); + // Callback for ChromotingServer. + void OnNewClientConnection(ChromotingConnection* connection, bool* accept); + private: enum State { kInitial, @@ -106,6 +110,13 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, kStopped, }; + // This method connects to the talk network and start listening for incoming + // connections. + void DoStart(Task* shutdown_task); + + // Callback for ChromotingServer::Close(). + void OnServerClosed(); + // The context that the chromoting host runs on. ChromotingHostContext* context_; @@ -126,6 +137,8 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, // receive connection requests from chromoting client. scoped_refptr<JingleClient> jingle_client_; + scoped_refptr<JingleChromotingServer> chromotocol_server_; + // Objects that takes care of sending heartbeats to the chromoting bot. scoped_refptr<HeartbeatSender> heartbeat_sender_; diff --git a/remoting/host/client_connection.cc b/remoting/host/client_connection.cc index 2cbbed6..8c8bc94 100644 --- a/remoting/host/client_connection.cc +++ b/remoting/host/client_connection.cc @@ -5,11 +5,9 @@ #include "remoting/host/client_connection.h" #include "google/protobuf/message.h" -#include "media/base/data_buffer.h" -#include "remoting/base/protocol_decoder.h" -#include "remoting/base/protocol_util.h" - -using media::DataBuffer; +#include "net/base/io_buffer.h" +#include "remoting/protocol/messages_decoder.h" +#include "remoting/protocol/util.h" namespace remoting { @@ -18,165 +16,100 @@ namespace remoting { static const size_t kAverageUpdateStream = 10; ClientConnection::ClientConnection(MessageLoop* message_loop, - ProtocolDecoder* decoder, EventHandler* handler) : loop_(message_loop), - decoder_(decoder), - size_in_queue_(0), - update_stream_size_(0), handler_(handler) { DCHECK(loop_); - DCHECK(decoder_.get()); DCHECK(handler_); } ClientConnection::~ClientConnection() { // TODO(hclam): When we shut down the viewer we may have to close the - // jingle channel. + // connection. } -// static -scoped_refptr<media::DataBuffer> - ClientConnection::CreateWireFormatDataBuffer( - const ChromotingHostMessage* msg) { - // TODO(hclam): Instead of serializing |msg| create an DataBuffer - // object that wraps around it. - scoped_ptr<const ChromotingHostMessage> message_deleter(msg); - return SerializeAndFrameMessage(*msg); +void ClientConnection::Init(ChromotingConnection* connection) { + DCHECK_EQ(connection->message_loop(), MessageLoop::current()); + + connection_ = connection; + connection_->SetStateChangeCallback( + NewCallback(this, &ClientConnection::OnConnectionStateChange)); } void ClientConnection::SendInitClientMessage(int width, int height) { DCHECK_EQ(loop_, MessageLoop::current()); - DCHECK(!update_stream_size_); // If we are disconnected then return. - if (!channel_) + if (!connection_) return; ChromotingHostMessage msg; msg.mutable_init_client()->set_width(width); msg.mutable_init_client()->set_height(height); DCHECK(msg.IsInitialized()); - channel_->Write(SerializeAndFrameMessage(msg)); -} - -void ClientConnection::SendBeginUpdateStreamMessage() { - DCHECK_EQ(loop_, MessageLoop::current()); - - // If we are disconnected then return. - if (!channel_) - return; - - ChromotingHostMessage msg; - msg.mutable_begin_update_stream(); - DCHECK(msg.IsInitialized()); - - scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg); - DCHECK(!update_stream_size_); - update_stream_size_ += data->GetDataSize(); - channel_->Write(data); + video_writer_.SendMessage(msg); } void ClientConnection::SendUpdateStreamPacketMessage( - scoped_refptr<DataBuffer> data) { + const ChromotingHostMessage& message) { DCHECK_EQ(loop_, MessageLoop::current()); // If we are disconnected then return. - if (!channel_) + if (!connection_) return; - update_stream_size_ += data->GetDataSize(); - channel_->Write(data); -} - -void ClientConnection::SendEndUpdateStreamMessage() { - DCHECK_EQ(loop_, MessageLoop::current()); - - // If we are disconnected then return. - if (!channel_) - return; - - ChromotingHostMessage msg; - msg.mutable_end_update_stream(); - DCHECK(msg.IsInitialized()); - - scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg); - update_stream_size_ += data->GetDataSize(); - channel_->Write(data); - - // Here's some logic to help finding the average update stream size. - size_in_queue_ += update_stream_size_; - size_queue_.push_back(update_stream_size_); - if (size_queue_.size() > kAverageUpdateStream) { - size_in_queue_ -= size_queue_.front(); - size_queue_.pop_front(); - DCHECK_GE(size_in_queue_, 0); - } - update_stream_size_ = 0; -} - -void ClientConnection::MarkEndOfUpdate() { - // This is some logic to help calculate the average update stream size. - size_in_queue_ += update_stream_size_; - size_queue_.push_back(update_stream_size_); - if (size_queue_.size() > kAverageUpdateStream) { - size_in_queue_ -= size_queue_.front(); - size_queue_.pop_front(); - DCHECK_GE(size_in_queue_, 0); - } - update_stream_size_ = 0; + video_writer_.SendMessage(message); } int ClientConnection::GetPendingUpdateStreamMessages() { DCHECK_EQ(loop_, MessageLoop::current()); - - if (!size_queue_.size()) - return 0; - int average_size = size_in_queue_ / size_queue_.size(); - if (!average_size) - return 0; - return channel_->write_buffer_size() / average_size; + return video_writer_.GetPendingMessages(); } void ClientConnection::Disconnect() { DCHECK_EQ(loop_, MessageLoop::current()); // If there is a channel then close it and release the reference. - if (channel_) { - channel_->Close(); - channel_ = NULL; + if (connection_) { + connection_->Close(NewRunnableMethod(this, &ClientConnection::OnClosed)); + connection_ = NULL; } } -void ClientConnection::OnStateChange(JingleChannel* channel, - JingleChannel::State state) { - DCHECK(channel); +void ClientConnection::OnConnectionStateChange( + ChromotingConnection::State state) { + if (state == ChromotingConnection::CONNECTED) { + events_reader_.Init( + connection_->GetEventsChannel(), + NewCallback(this, &ClientConnection::OnMessageReceived)); + video_writer_.Init(connection_->GetVideoChannel()); + } + loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &ClientConnection::StateChangeTask, state)); } -void ClientConnection::OnPacketReceived(JingleChannel* channel, - scoped_refptr<DataBuffer> data) { - DCHECK_EQ(channel_.get(), channel); +void ClientConnection::OnMessageReceived(ChromotingClientMessage* message) { loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &ClientConnection::PacketReceivedTask, data)); + NewRunnableMethod(this, &ClientConnection::MessageReceivedTask, + message)); } -void ClientConnection::StateChangeTask(JingleChannel::State state) { +void ClientConnection::StateChangeTask(ChromotingConnection::State state) { DCHECK_EQ(loop_, MessageLoop::current()); DCHECK(handler_); switch(state) { - case JingleChannel::CONNECTING: + case ChromotingConnection::CONNECTING: break; // Don't care about this message. - case JingleChannel::OPEN: + case ChromotingConnection::CONNECTED: handler_->OnConnectionOpened(this); break; - case JingleChannel::CLOSED: + case ChromotingConnection::CLOSED: handler_->OnConnectionClosed(this); break; - case JingleChannel::FAILED: + case ChromotingConnection::FAILED: handler_->OnConnectionFailed(this); break; default: @@ -185,17 +118,14 @@ void ClientConnection::StateChangeTask(JingleChannel::State state) { } } -void ClientConnection::PacketReceivedTask(scoped_refptr<DataBuffer> data) { +void ClientConnection::MessageReceivedTask(ChromotingClientMessage* message) { DCHECK_EQ(loop_, MessageLoop::current()); - - // Use the decoder to parse incoming data. - DCHECK(decoder_.get()); - ClientMessageList list; - decoder_->ParseClientMessages(data, &list); - - // Then submit the messages to the handler. DCHECK(handler_); - handler_->HandleMessages(this, &list); + handler_->HandleMessage(this, message); +} + +// OnClosed() is used as a callback for ChromotingConnection::Close(). +void ClientConnection::OnClosed() { } } // namespace remoting diff --git a/remoting/host/client_connection.h b/remoting/host/client_connection.h index 0ea2b26..44bb761 100644 --- a/remoting/host/client_connection.h +++ b/remoting/host/client_connection.h @@ -11,9 +11,10 @@ #include "base/message_loop.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "remoting/base/protocol_decoder.h" #include "remoting/base/protocol/chromotocol.pb.h" -#include "remoting/jingle_glue/jingle_channel.h" +#include "remoting/protocol/chromoting_connection.h" +#include "remoting/protocol/stream_reader.h" +#include "remoting/protocol/stream_writer.h" namespace media { @@ -28,8 +29,7 @@ namespace remoting { // screen updates and other messages to the remote viewer. It is also // responsible for receiving and parsing data from the remote viewer and // delegating events to the event handler. -class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>, - public JingleChannel::Callback { +class ClientConnection : public base::RefCountedThreadSafe<ClientConnection> { public: class EventHandler { public: @@ -39,8 +39,8 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>, // ClientMessages in ClientMessageList and needs to delete them. // Note that the sender of messages will not reference messages // again so it is okay to clear |messages| in this method. - virtual void HandleMessages(ClientConnection* viewer, - ClientMessageList* messages) = 0; + virtual void HandleMessage(ClientConnection* viewer, + ChromotingClientMessage* message) = 0; // Called when the network connection is opened. virtual void OnConnectionOpened(ClientConnection* viewer) = 0; @@ -57,45 +57,21 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>, // a libjingle channel, these events are delegated to |handler|. // It is guranteed that |handler| is called only on the |message_loop|. ClientConnection(MessageLoop* message_loop, - ProtocolDecoder* decoder, EventHandler* handler); virtual ~ClientConnection(); - // Creates a DataBuffer object that wraps around ChromotingHostMessage. The - // DataBuffer object will be responsible for serializing and framing the - // message. DataBuffer will also own |msg| after this call. - static scoped_refptr<media::DataBuffer> CreateWireFormatDataBuffer( - const ChromotingHostMessage* msg); + virtual void Init(ChromotingConnection* connection); - virtual void set_jingle_channel(JingleChannel* channel) { - channel_ = channel; - } - - // Returns the channel in use. - virtual JingleChannel* jingle_channel() { return channel_; } + // Returns the connection in use. + virtual ChromotingConnection* connection() { return connection_; } // Send information to the client for initialization. virtual void SendInitClientMessage(int width, int height); - // Notifies the viewer the start of an update stream. - virtual void SendBeginUpdateStreamMessage(); - // Send encoded update stream data to the viewer. - // - // |data| is the actual bytes in wire format. That means it is fully framed - // and serialized from a ChromotingHostMessage. This is a special case only - // for UpdateStreamPacket to reduce the amount of memory copies. - // - // |data| should be created by calling to - // CreateWireFormatDataBuffer(ChromotingHostMessage). virtual void SendUpdateStreamPacketMessage( - scoped_refptr<media::DataBuffer> data); - - // Notifies the viewer the update stream has ended. - virtual void SendEndUpdateStreamMessage(); - - virtual void MarkEndOfUpdate(); + const ChromotingHostMessage& message); // Gets the number of update stream messages not yet transmitted. // Note that the value returned is an estimate using average size of the @@ -109,43 +85,34 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>, // After this method is called all the send method calls will be ignored. virtual void Disconnect(); - ///////////////////////////////////////////////////////////////////////////// - // JingleChannel::Callback implmentations - virtual void OnStateChange(JingleChannel* channel, - JingleChannel::State state); - virtual void OnPacketReceived(JingleChannel* channel, - scoped_refptr<media::DataBuffer> data); - protected: // Protected constructor used by unit test. ClientConnection() {} private: + // Callback for ChromotingConnection. + void OnConnectionStateChange(ChromotingConnection::State state); + + // Callback for EventsStreamReader. + void OnMessageReceived(ChromotingClientMessage* message); + // Process a libjingle state change event on the |loop_|. - void StateChangeTask(JingleChannel::State state); + void StateChangeTask(ChromotingConnection::State state); // Process a data buffer received from libjingle. - void PacketReceivedTask(scoped_refptr<media::DataBuffer> data); + void MessageReceivedTask(ChromotingClientMessage* message); + + void OnClosed(); // The libjingle channel used to send and receive data from the remote client. - scoped_refptr<JingleChannel> channel_; + scoped_refptr<ChromotingConnection> connection_; + + EventsStreamReader events_reader_; + VideoStreamWriter video_writer_; // The message loop that this object runs on. MessageLoop* loop_; - // An object used by the ClientConnection to decode data received from the - // network. - scoped_ptr<ProtocolDecoder> decoder_; - - // A queue to count the sizes of the last 10 update streams. - std::deque<int> size_queue_; - - // Count the sum of sizes in the queue. - int size_in_queue_; - - // Measure the number of bytes of the current upstream stream. - int update_stream_size_; - // Event handler for handling events sent from this object. EventHandler* handler_; diff --git a/remoting/host/client_connection_unittest.cc b/remoting/host/client_connection_unittest.cc index 9c3bcb3..317d7fa 100644 --- a/remoting/host/client_connection_unittest.cc +++ b/remoting/host/client_connection_unittest.cc @@ -7,7 +7,7 @@ #include "remoting/base/mock_objects.h" #include "remoting/host/client_connection.h" #include "remoting/host/mock_objects.h" -#include "remoting/jingle_glue/mock_objects.h" +#include "remoting/protocol/fake_connection.h" #include "testing/gmock/include/gmock/gmock.h" using ::testing::_; @@ -23,86 +23,67 @@ class ClientConnectionTest : public testing::Test { protected: virtual void SetUp() { - decoder_ = new MockProtocolDecoder(); - channel_ = new StrictMock<MockJingleChannel>(); - - // Allocate a ClientConnection object with the mock objects. we give the - // ownership of decoder to the viewer. - viewer_ = new ClientConnection(&message_loop_, - decoder_, - &handler_); - - viewer_->set_jingle_channel(channel_.get()); + connection_ = new FakeChromotingConnection(); + connection_->set_message_loop(&message_loop_); + + // Allocate a ClientConnection object with the mock objects. + viewer_ = new ClientConnection(&message_loop_, &handler_); + viewer_->Init(connection_); + EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get())); + connection_->get_state_change_callback()->Run( + ChromotingConnection::CONNECTED); + message_loop_.RunAllPending(); } MessageLoop message_loop_; - MockProtocolDecoder* decoder_; MockClientConnectionEventHandler handler_; scoped_refptr<ClientConnection> viewer_; - // |channel_| is wrapped with StrictMock because we limit strictly the calls - // to it. - scoped_refptr<StrictMock<MockJingleChannel> > channel_; + scoped_refptr<FakeChromotingConnection> connection_; private: DISALLOW_COPY_AND_ASSIGN(ClientConnectionTest); }; TEST_F(ClientConnectionTest, SendUpdateStream) { - // Tell the viewer we are starting an update stream. - EXPECT_CALL(*channel_, Write(_)); - viewer_->SendBeginUpdateStreamMessage(); - // Then send the actual data. - EXPECT_CALL(*channel_, Write(_)); - scoped_refptr<media::DataBuffer> data = new media::DataBuffer(10); - viewer_->SendUpdateStreamPacketMessage(data); - - // Send the end of update message. - EXPECT_CALL(*channel_, Write(_)); - viewer_->SendEndUpdateStreamMessage(); + ChromotingHostMessage message; + viewer_->SendUpdateStreamPacketMessage(message); // And then close the connection to ClientConnection. - EXPECT_CALL(*channel_, Close()); viewer_->Disconnect(); -} -TEST_F(ClientConnectionTest, StateChange) { - EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get())); - viewer_->OnStateChange(channel_.get(), JingleChannel::OPEN); message_loop_.RunAllPending(); + // Verify that something has been written. + // TODO(sergeyu): Verify that the correct data has been written. + EXPECT_GT(connection_->GetVideoChannel()->written_data().size(), 0u); +} + +TEST_F(ClientConnectionTest, StateChange) { EXPECT_CALL(handler_, OnConnectionClosed(viewer_.get())); - viewer_->OnStateChange(channel_.get(), JingleChannel::CLOSED); + connection_->get_state_change_callback()->Run(ChromotingConnection::CLOSED); message_loop_.RunAllPending(); EXPECT_CALL(handler_, OnConnectionFailed(viewer_.get())); - viewer_->OnStateChange(channel_.get(), JingleChannel::FAILED); - message_loop_.RunAllPending(); -} - -TEST_F(ClientConnectionTest, ParseMessages) { - scoped_refptr<media::DataBuffer> data; - - // Give the data to the ClientConnection, it will use ProtocolDecoder to - // decode the messages. - EXPECT_CALL(*decoder_, ParseClientMessages(data, NotNull())); - EXPECT_CALL(handler_, HandleMessages(viewer_.get(), NotNull())); - - viewer_->OnPacketReceived(channel_.get(), data); + connection_->get_state_change_callback()->Run(ChromotingConnection::FAILED); message_loop_.RunAllPending(); } // Test that we can close client connection more than once and operations // after the connection is closed has no effect. TEST_F(ClientConnectionTest, Close) { - EXPECT_CALL(*channel_, Close()); viewer_->Disconnect(); + message_loop_.RunAllPending(); + EXPECT_TRUE(connection_->is_closed()); - scoped_refptr<media::DataBuffer> data = new media::DataBuffer(10); - viewer_->SendUpdateStreamPacketMessage(data); - viewer_->SendEndUpdateStreamMessage(); + ChromotingHostMessage message; + viewer_->SendUpdateStreamPacketMessage(message); viewer_->Disconnect(); + message_loop_.RunAllPending(); + + // Verify that nothing has been written. + EXPECT_EQ(connection_->GetVideoChannel()->written_data().size(), 0u); } } // namespace remoting diff --git a/remoting/host/event_executor.h b/remoting/host/event_executor.h index bfdf38f..3f87ea7 100644 --- a/remoting/host/event_executor.h +++ b/remoting/host/event_executor.h @@ -7,11 +7,12 @@ #include <vector> -#include "remoting/base/protocol_decoder.h" +#include "base/basictypes.h" namespace remoting { class Capturer; +class ChromotingClientMessage; // An interface that defines the behavior of an event executor object. // An event executor is to perform actions on the host machine. For example @@ -19,14 +20,14 @@ class Capturer; // clipboards. class EventExecutor { public: - EventExecutor(Capturer* capturer) + explicit EventExecutor(Capturer* capturer) : capturer_(capturer) { } virtual ~EventExecutor() {} // Handles input events from ClientMessageList and removes them from the // list. - virtual void HandleInputEvents(ClientMessageList* messages) = 0; + virtual void HandleInputEvent(ChromotingClientMessage* message) = 0; // TODO(hclam): Define actions for clipboards. protected: diff --git a/remoting/host/event_executor_linux.cc b/remoting/host/event_executor_linux.cc index d2e4a13..44c4c55 100644 --- a/remoting/host/event_executor_linux.cc +++ b/remoting/host/event_executor_linux.cc @@ -4,6 +4,8 @@ #include "remoting/host/event_executor_linux.h" +#include "remoting/protocol/messages_decoder.h" + namespace remoting { EventExecutorLinux::EventExecutorLinux(Capturer* capturer) @@ -13,7 +15,8 @@ EventExecutorLinux::EventExecutorLinux(Capturer* capturer) EventExecutorLinux::~EventExecutorLinux() { } -void EventExecutorLinux::HandleInputEvents(ClientMessageList* messages) { +void EventExecutorLinux::HandleInputEvent(ChromotingClientMessage* message) { + delete message; } } // namespace remoting diff --git a/remoting/host/event_executor_linux.h b/remoting/host/event_executor_linux.h index 44b8209..bdb3e73 100644 --- a/remoting/host/event_executor_linux.h +++ b/remoting/host/event_executor_linux.h @@ -17,7 +17,7 @@ class EventExecutorLinux : public EventExecutor { EventExecutorLinux(Capturer* capturer); virtual ~EventExecutorLinux(); - virtual void HandleInputEvents(ClientMessageList* messages); + virtual void HandleInputEvent(ChromotingClientMessage* message); private: DISALLOW_COPY_AND_ASSIGN(EventExecutorLinux); diff --git a/remoting/host/event_executor_mac.cc b/remoting/host/event_executor_mac.cc index 3a1256b..89acece 100644 --- a/remoting/host/event_executor_mac.cc +++ b/remoting/host/event_executor_mac.cc @@ -13,7 +13,8 @@ EventExecutorMac::EventExecutorMac(Capturer* capturer) EventExecutorMac::~EventExecutorMac() { } -void EventExecutorMac::HandleInputEvents(ClientMessageList* messages) { +void EventExecutorMac::HandleInputEvent(ChromotingClientMessage* message) { + delete message; } } // namespace remoting diff --git a/remoting/host/event_executor_mac.h b/remoting/host/event_executor_mac.h index 854d274..f8934b1 100644 --- a/remoting/host/event_executor_mac.h +++ b/remoting/host/event_executor_mac.h @@ -17,7 +17,7 @@ class EventExecutorMac : public EventExecutor { EventExecutorMac(Capturer* capturer); virtual ~EventExecutorMac(); - virtual void HandleInputEvents(ClientMessageList* messages); + virtual void HandleInputEvent(ChromotingClientMessage* message); private: DISALLOW_COPY_AND_ASSIGN(EventExecutorMac); diff --git a/remoting/host/event_executor_win.cc b/remoting/host/event_executor_win.cc index 24aa841..762cd0c 100644 --- a/remoting/host/event_executor_win.cc +++ b/remoting/host/event_executor_win.cc @@ -355,28 +355,21 @@ EventExecutorWin::EventExecutorWin(Capturer* capturer) EventExecutorWin::~EventExecutorWin() { } -void EventExecutorWin::HandleInputEvents(ClientMessageList* messages) { - for (ClientMessageList::iterator it = messages->begin(); - it != messages->end(); - ++it) { - ChromotingClientMessage* msg = *it; - if (msg->has_mouse_set_position_event()) { - HandleMouseSetPosition(msg); - } else if (msg->has_mouse_move_event()) { - HandleMouseMove(msg); - } else if (msg->has_mouse_wheel_event()) { - HandleMouseWheel(msg); - } else if (msg->has_mouse_down_event()) { - HandleMouseButtonDown(msg); - } else if (msg->has_mouse_up_event()) { - HandleMouseButtonUp(msg); - } else if (msg->has_key_event()) { - HandleKey(msg); - } +void EventExecutorWin::HandleInputEvent(ChromotingClientMessage* msg) { + if (msg->has_mouse_set_position_event()) { + HandleMouseSetPosition(msg); + } else if (msg->has_mouse_move_event()) { + HandleMouseMove(msg); + } else if (msg->has_mouse_wheel_event()) { + HandleMouseWheel(msg); + } else if (msg->has_mouse_down_event()) { + HandleMouseButtonDown(msg); + } else if (msg->has_mouse_up_event()) { + HandleMouseButtonUp(msg); + } else if (msg->has_key_event()) { + HandleKey(msg); } - // We simply delete all messages. - // TODO(hclam): Delete messages processed. - STLDeleteElements<ClientMessageList>(messages); + delete msg; } void EventExecutorWin::HandleMouseSetPosition(ChromotingClientMessage* msg) { diff --git a/remoting/host/event_executor_win.h b/remoting/host/event_executor_win.h index 4a354e6..e9a0269 100644 --- a/remoting/host/event_executor_win.h +++ b/remoting/host/event_executor_win.h @@ -17,7 +17,7 @@ class EventExecutorWin : public EventExecutor { EventExecutorWin(Capturer* capturer); virtual ~EventExecutorWin(); - virtual void HandleInputEvents(ClientMessageList* messages); + virtual void HandleInputEvent(ChromotingClientMessage* message); private: void HandleMouseSetPosition(ChromotingClientMessage* msg); diff --git a/remoting/host/mock_objects.h b/remoting/host/mock_objects.h index 9e6b631..64dcc55 100644 --- a/remoting/host/mock_objects.h +++ b/remoting/host/mock_objects.h @@ -6,10 +6,10 @@ #define REMOTING_HOST_MOCK_OBJECTS_H_ #include "media/base/data_buffer.h" -#include "remoting/base/protocol_decoder.h" #include "remoting/host/capturer.h" #include "remoting/host/client_connection.h" #include "remoting/host/event_executor.h" +#include "remoting/protocol/messages_decoder.h" #include "testing/gmock/include/gmock/gmock.h" namespace remoting { @@ -36,7 +36,7 @@ class MockEventExecutor : public EventExecutor { public: MockEventExecutor(Capturer* capturer) : EventExecutor(capturer) {} - MOCK_METHOD1(HandleInputEvents, void(ClientMessageList* messages)); + MOCK_METHOD1(HandleInputEvent, void(ChromotingClientMessage* messages)); private: DISALLOW_COPY_AND_ASSIGN(MockEventExecutor); @@ -46,17 +46,14 @@ class MockClientConnection : public ClientConnection { public: MockClientConnection(){} + MOCK_METHOD1(Init, void(ChromotingConnection* connection)); MOCK_METHOD2(SendInitClientMessage, void(int width, int height)); MOCK_METHOD0(SendBeginUpdateStreamMessage, void()); MOCK_METHOD1(SendUpdateStreamPacketMessage, - void(scoped_refptr<media::DataBuffer> data)); + void(const ChromotingHostMessage& message)); MOCK_METHOD0(SendEndUpdateStreamMessage, void()); MOCK_METHOD0(GetPendingUpdateStreamMessages, int()); - - MOCK_METHOD2(OnStateChange, void(JingleChannel* channel, - JingleChannel::State state)); - MOCK_METHOD2(OnPacketReceived, void(JingleChannel* channel, - scoped_refptr<media::DataBuffer> data)); + MOCK_METHOD0(Disconnect, void()); private: DISALLOW_COPY_AND_ASSIGN(MockClientConnection); @@ -66,8 +63,9 @@ class MockClientConnectionEventHandler : public ClientConnection::EventHandler { public: MockClientConnectionEventHandler() {} - MOCK_METHOD2(HandleMessages, - void(ClientConnection* viewer, ClientMessageList* messages)); + MOCK_METHOD2(HandleMessage, + void(ClientConnection* viewer, + ChromotingClientMessage* message)); MOCK_METHOD1(OnConnectionOpened, void(ClientConnection* viewer)); MOCK_METHOD1(OnConnectionClosed, void(ClientConnection* viewer)); MOCK_METHOD1(OnConnectionFailed, void(ClientConnection* viewer)); diff --git a/remoting/host/session_manager.cc b/remoting/host/session_manager.cc index 7ab3bc5..4aacfc8 100644 --- a/remoting/host/session_manager.cc +++ b/remoting/host/session_manager.cc @@ -11,9 +11,9 @@ #include "base/stl_util-inl.h" #include "media/base/data_buffer.h" #include "remoting/base/capture_data.h" -#include "remoting/base/protocol_decoder.h" #include "remoting/base/tracer.h" #include "remoting/host/client_connection.h" +#include "remoting/protocol/messages_decoder.h" namespace remoting { @@ -333,24 +333,15 @@ void SessionManager::DoSendUpdate(ChromotingHostMessage* message, Encoder::EncodingState state) { DCHECK_EQ(network_loop_, MessageLoop::current()); - // TODO(ajwong): We shouldn't need EncodingState. Just inspect message. - bool is_end_of_update = (message->rectangle_update().flags() | - RectangleUpdatePacket::LAST_PACKET) != 0; - TraceContext::tracer()->PrintString("DoSendUpdate"); - // Create a data buffer in wire format from |message|. - // Note that this takes ownership of |message|. - scoped_refptr<media::DataBuffer> data = - ClientConnection::CreateWireFormatDataBuffer(message); - for (ClientConnectionList::const_iterator i = clients_.begin(); i < clients_.end(); ++i) { - (*i)->SendUpdateStreamPacketMessage(data); - - if (is_end_of_update) - (*i)->MarkEndOfUpdate(); + (*i)->SendUpdateStreamPacketMessage(*message); } + + delete message; + TraceContext::tracer()->PrintString("DoSendUpdate done"); } diff --git a/remoting/jingle_glue/stream_socket_adapter.cc b/remoting/jingle_glue/stream_socket_adapter.cc index cb84941..3155328 100644 --- a/remoting/jingle_glue/stream_socket_adapter.cc +++ b/remoting/jingle_glue/stream_socket_adapter.cc @@ -65,6 +65,7 @@ int StreamSocketAdapter::Write( if (result == net::ERR_CONNECTION_CLOSED && stream_->GetState() == talk_base::SS_OPENING) result = net::ERR_IO_PENDING; + if (result == net::ERR_IO_PENDING) { write_pending_ = true; write_callback_ = callback; diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc new file mode 100644 index 0000000..6c60a4a --- /dev/null +++ b/remoting/protocol/buffered_socket_writer.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/buffered_socket_writer.h" + +#include "base/message_loop.h" +#include "net/base/net_errors.h" + +namespace remoting { + +BufferedSocketWriter::BufferedSocketWriter() + : socket_(NULL), + message_loop_(NULL), + buffer_size_(0), + write_pending_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + written_callback_(this, &BufferedSocketWriter::OnWritten)), + closed_(false) { +} + +BufferedSocketWriter::~BufferedSocketWriter() { } + +void BufferedSocketWriter::Init(net::Socket* socket, + WriteFailedCallback* callback) { + AutoLock auto_lock(lock_); + message_loop_ = MessageLoop::current(); + socket_ = socket; +} + +bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) { + AutoLock auto_lock(lock_); + if (!socket_) + return false; + queue_.push_back(data); + buffer_size_ += data->size(); + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + return true; +} + +void BufferedSocketWriter::DoWrite() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(socket_); + + // Don't try to write if the writer not initialized or closed, or + // there is already a write pending. + if (write_pending_ || closed_) + return; + + while (true) { + while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + AutoLock auto_lock(lock_); + if (queue_.empty()) + return; // Nothing to write. + current_buf_ = + new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); + queue_.pop_front(); + } + + int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(), + &written_callback_); + if (result >= 0) { + { + AutoLock auto_lock(lock_); + buffer_size_ -= result; + } + current_buf_->DidConsume(result); + } else { + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + } else { + if (write_failed_callback_.get()) + write_failed_callback_->Run(result); + } + + return; + } + } +} + +void BufferedSocketWriter::OnWritten(int result) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + write_pending_ = false; + + if (result < 0) { + if (write_failed_callback_.get()) + write_failed_callback_->Run(result); + } + + { + AutoLock auto_lock(lock_); + buffer_size_ -= result; + } + current_buf_->DidConsume(result); + // Schedule next write. + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); +} + +int BufferedSocketWriter::GetBufferSize() { + AutoLock auto_lock(lock_); + return buffer_size_; +} + +int BufferedSocketWriter::GetBufferChunks() { + AutoLock auto_lock(lock_); + return queue_.size(); +} + +void BufferedSocketWriter::Close() { + AutoLock auto_lock(lock_); + closed_ = true; +} + +} // namespace remoting diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h new file mode 100644 index 0000000..3d4709d --- /dev/null +++ b/remoting/protocol/buffered_socket_writer.h @@ -0,0 +1,74 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ +#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ + +#include <deque> + +#include "base/lock.h" +#include "base/ref_counted.h" +#include "net/base/io_buffer.h" +#include "net/socket/socket.h" + +class MessageLoop; + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class BufferedSocketWriter + : public base::RefCountedThreadSafe<BufferedSocketWriter> { + public: + typedef Callback1<int>::Type WriteFailedCallback; + + BufferedSocketWriter(); + virtual ~BufferedSocketWriter(); + + // Initializes the writer. Must be called on the thread that will be used + // to access the socket in the future. |callback| will be called after each + // failed write. + void Init(net::Socket* socket, WriteFailedCallback* callback); + + // Puts a new data chunk in the buffer. Returns false and doesn't enqueue + // the data if called before Init(). Can be called on any thread. + bool Write(scoped_refptr<net::IOBufferWithSize> buffer); + + // Returns current size of the buffer. Can be called on any thread. + int GetBufferSize(); + + // Returns number of chunks that are currently in the buffer waiting + // to be written. Can be called on any thread. + int GetBufferChunks(); + + // Stops writing and drops current buffers. + void Close(); + + private: + typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue; + + void DoWrite(); + void OnWritten(int result); + + net::Socket* socket_; + MessageLoop* message_loop_; + scoped_ptr<WriteFailedCallback> write_failed_callback_; + + Lock lock_; + + DataQueue queue_; + int buffer_size_; + scoped_refptr<net::DrainableIOBuffer> current_buf_; + bool write_pending_; + + net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_; + + bool closed_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ diff --git a/remoting/protocol/chromoting_connection.h b/remoting/protocol/chromoting_connection.h index e9c0f42d..397b633 100644 --- a/remoting/protocol/chromoting_connection.h +++ b/remoting/protocol/chromoting_connection.h @@ -9,6 +9,7 @@ #include "base/callback.h" +class MessageLoop; class Task; namespace net { @@ -17,7 +18,10 @@ class Socket; namespace remoting { -// Generic interface for Chromoting connection. +// Generic interface for Chromoting connection used by both client and host. +// Provides access to the connection channels, but doesn't depend on the +// protocol used for each channel. +// TODO(sergeyu): Remove refcounting? class ChromotingConnection : public base::RefCountedThreadSafe<ChromotingConnection> { public: @@ -37,6 +41,7 @@ class ChromotingConnection // Reliable PseudoTCP channels for this connection. // TODO(sergeyu): Remove VideoChannel, and use RTP channels instead. + // TODO(sergeyu): Make it possible to create/destroy new channels on-fly? virtual net::Socket* GetVideoChannel() = 0; virtual net::Socket* GetEventsChannel() = 0; @@ -47,7 +52,11 @@ class ChromotingConnection // JID of the other side. virtual const std::string& jid() = 0; - // Closed connection. Callbacks are guaranteed not to be called after + // Message loop that must be used for to access the channels of this + // connection. + virtual MessageLoop* message_loop() = 0; + + // Closes connection. Callbacks are guaranteed not to be called after // |closed_task| is executed. virtual void Close(Task* closed_task) = 0; diff --git a/remoting/protocol/fake_connection.cc b/remoting/protocol/fake_connection.cc new file mode 100644 index 0000000..cffacb5 --- /dev/null +++ b/remoting/protocol/fake_connection.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/fake_connection.h" + +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" + +namespace remoting { + +const char kTestJid[] = "host1@gmail.com/chromoting123"; + +FakeSocket::FakeSocket() + : read_pending_(false), + input_pos_(0) { +} + +FakeSocket::~FakeSocket() { +} + +void FakeSocket::AppendInputData(char* data, int data_size) { + input_data_.insert(input_data_.end(), data, data + data_size); + // Complete pending read if any. + if (read_pending_) { + read_pending_ = false; + int result = std::min(read_buffer_size_, + static_cast<int>(input_data_.size() - input_pos_)); + CHECK(result > 0); + memcpy(read_buffer_->data(), + &(*input_data_.begin()) + input_pos_, result); + input_pos_ += result; + read_callback_->Run(result); + read_buffer_ = NULL; + } +} + +int FakeSocket::Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + if (input_pos_ < static_cast<int>(input_data_.size())) { + int result = std::min(buf_len, + static_cast<int>(input_data_.size()) - input_pos_); + memcpy(buf->data(), &(*input_data_.begin()) + input_pos_, result); + input_pos_ += result; + return result; + } else { + read_pending_ = true; + read_buffer_ = buf; + read_buffer_size_ = buf_len; + read_callback_ = callback; + return net::ERR_IO_PENDING; + } +} + +int FakeSocket::Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + written_data_.insert(written_data_.end(), + buf->data(), buf->data() + buf_len); + return buf_len; +} + +bool FakeSocket::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} +bool FakeSocket::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +FakeChromotingConnection::FakeChromotingConnection() + : message_loop_(NULL), + jid_(kTestJid) { +} + +FakeChromotingConnection::~FakeChromotingConnection() { } + +void FakeChromotingConnection::SetStateChangeCallback( + StateChangeCallback* callback) { + callback_.reset(callback); +} + +FakeSocket* FakeChromotingConnection::GetVideoChannel() { + return &video_channel_; +} +FakeSocket* FakeChromotingConnection::GetEventsChannel() { + return &events_channel_; +} + +FakeSocket* FakeChromotingConnection::GetVideoRtpChannel() { + return &video_rtp_channel_; +} +FakeSocket* FakeChromotingConnection::GetVideoRtcpChannel() { + return &video_rtcp_channel_; +} + +const std::string& FakeChromotingConnection::jid() { + return jid_; +} + +MessageLoop* FakeChromotingConnection::message_loop() { + return message_loop_; +} + +void FakeChromotingConnection::Close(Task* closed_task) { + closed_ = true; + closed_task->Run(); + delete closed_task; +} + +} // namespace remoting diff --git a/remoting/protocol/fake_connection.h b/remoting/protocol/fake_connection.h new file mode 100644 index 0000000..cabce4a --- /dev/null +++ b/remoting/protocol/fake_connection.h @@ -0,0 +1,94 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_FAKE_CONNECTION_H_ +#define REMOTING_PROTOCOL_FAKE_CONNECTION_H_ + +#include <vector> + +#include "base/scoped_ptr.h" +#include "net/socket/socket.h" +#include "remoting/protocol/chromoting_connection.h" + +namespace remoting { + +extern const char kTestJid[]; + +// FakeSocket implement net::Socket interface for FakeConnection. All data +// written to FakeSocket is stored in a buffer returned by written_data(). +// Read() reads data from another buffer that can be set with AppendInputData(). +// Pending reads are supported, so if there is a pending read AppendInputData() +// calls the read callback. +class FakeSocket : public net::Socket { + public: + FakeSocket(); + virtual ~FakeSocket(); + + const std::vector<char>& written_data() { return written_data_; } + + void AppendInputData(char* data, int data_size); + int input_pos() { return input_pos_; } + + // net::Socket interface. + virtual int Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + bool read_pending_; + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + net::CompletionCallback* read_callback_; + + std::vector<char> written_data_; + std::vector<char> input_data_; + int input_pos_; +}; + +// FakeChromotingConnection is a dummy ChromotingConnection that uses +// FakeSocket for all channels. +class FakeChromotingConnection : public ChromotingConnection { + public: + FakeChromotingConnection(); + virtual ~FakeChromotingConnection(); + + StateChangeCallback* get_state_change_callback() { return callback_.get(); } + + void set_message_loop(MessageLoop* message_loop) { + message_loop_ = message_loop; + } + + bool is_closed() { return closed_; } + + virtual void SetStateChangeCallback(StateChangeCallback* callback); + + virtual FakeSocket* GetVideoChannel(); + virtual FakeSocket* GetEventsChannel(); + + virtual FakeSocket* GetVideoRtpChannel(); + virtual FakeSocket* GetVideoRtcpChannel(); + + virtual const std::string& jid(); + + virtual MessageLoop* message_loop(); + virtual void Close(Task* closed_task); + + public: + scoped_ptr<StateChangeCallback> callback_; + MessageLoop* message_loop_; + FakeSocket video_channel_; + FakeSocket events_channel_; + FakeSocket video_rtp_channel_; + FakeSocket video_rtcp_channel_; + std::string jid_; + bool closed_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_FAKE_CONNECTION_H_ diff --git a/remoting/protocol/jingle_chromoting_connection.cc b/remoting/protocol/jingle_chromoting_connection.cc index 5c2e246..2412627 100644 --- a/remoting/protocol/jingle_chromoting_connection.cc +++ b/remoting/protocol/jingle_chromoting_connection.cc @@ -28,8 +28,8 @@ const char kEventsChannelName[] = "events"; } // namespace JingleChromotingConnection::JingleChromotingConnection( - JingleChromotingServer* session_client) - : session_client_(session_client), + JingleChromotingServer* server) + : server_(server), state_(INITIALIZING), closed_(false), session_(NULL), @@ -42,7 +42,7 @@ JingleChromotingConnection::~JingleChromotingConnection() { } void JingleChromotingConnection::Init(Session* session) { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); session_ = session; jid_ = session_->remote_name(); @@ -55,7 +55,7 @@ bool JingleChromotingConnection::HasSession(cricket::Session* session) { } Session* JingleChromotingConnection::ReleaseSession() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); SetState(CLOSED); Session* session = session_; @@ -68,35 +68,45 @@ Session* JingleChromotingConnection::ReleaseSession() { void JingleChromotingConnection::SetStateChangeCallback( StateChangeCallback* callback) { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); DCHECK(callback); state_change_callback_.reset(callback); } // TODO(sergeyu): Remove this method after we switch to RTP. net::Socket* JingleChromotingConnection::GetVideoChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_channel_adapter_.get(); } net::Socket* JingleChromotingConnection::GetEventsChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return events_channel_adapter_.get(); } net::Socket* JingleChromotingConnection::GetVideoRtpChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_rtp_channel_.get(); } net::Socket* JingleChromotingConnection::GetVideoRtcpChannel() { - DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK_EQ(server_->message_loop(), MessageLoop::current()); return video_rtcp_channel_.get(); } +const std::string& JingleChromotingConnection::jid() { + // No synchronization is needed because jid_ is not changed + // after new connection is passed to JingleChromotingServer callback. + return jid_; +} + +MessageLoop* JingleChromotingConnection::message_loop() { + return server_->message_loop(); +} + void JingleChromotingConnection::Close(Task* closed_task) { - if (MessageLoop::current() != session_client_->message_loop()) { - session_client_->message_loop()->PostTask( + if (MessageLoop::current() != server_->message_loop()) { + server_->message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &JingleChromotingConnection::Close, closed_task)); return; @@ -171,7 +181,7 @@ void JingleChromotingConnection::OnSessionState( void JingleChromotingConnection::OnInitiate(bool incoming) { jid_ = session_->remote_name(); if (incoming) - session_client_->AcceptConnection(this, session_); + server_->AcceptConnection(this, session_); SetState(CONNECTING); } diff --git a/remoting/protocol/jingle_chromoting_connection.h b/remoting/protocol/jingle_chromoting_connection.h index ada4ead..cc54182 100644 --- a/remoting/protocol/jingle_chromoting_connection.h +++ b/remoting/protocol/jingle_chromoting_connection.h @@ -41,9 +41,8 @@ class JingleChromotingConnection : public ChromotingConnection, virtual net::Socket* GetVideoRtpChannel(); virtual net::Socket* GetVideoRtcpChannel(); - // No synchronization is needed because jid_ is not changed - // after new connection is passed to JingleChromotingServer callback. - virtual const std::string& jid() { return jid_; }; + virtual const std::string& jid(); + virtual MessageLoop* message_loop(); virtual void Close(Task* closed_task); @@ -68,7 +67,7 @@ class JingleChromotingConnection : public ChromotingConnection, void SetState(State new_state); // JingleChromotingServer that created this connection. - scoped_refptr<JingleChromotingServer> session_client_; + scoped_refptr<JingleChromotingServer> server_; State state_; scoped_ptr<StateChangeCallback> state_change_callback_; diff --git a/remoting/base/protocol_decoder.cc b/remoting/protocol/messages_decoder.cc index 10d2413..f7c7439 100644 --- a/remoting/base/protocol_decoder.cc +++ b/remoting/protocol/messages_decoder.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "remoting/base/protocol_decoder.h" +#include "remoting/protocol/messages_decoder.h" #include "base/logging.h" #include "remoting/base/multiple_array_input_stream.h" @@ -10,27 +10,30 @@ namespace remoting { -ProtocolDecoder::ProtocolDecoder() +MessagesDecoder::MessagesDecoder() : last_read_position_(0), available_bytes_(0), next_payload_(0), next_payload_known_(false) { } -ProtocolDecoder::~ProtocolDecoder() {} +MessagesDecoder::~MessagesDecoder() {} -void ProtocolDecoder::ParseClientMessages(scoped_refptr<media::DataBuffer> data, +void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data, + int data_size, ClientMessageList* messages) { - ParseMessages<ChromotingClientMessage>(data, messages); + ParseMessages<ChromotingClientMessage>(data, data_size, messages); } -void ProtocolDecoder::ParseHostMessages(scoped_refptr<media::DataBuffer> data, +void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data, + int data_size, HostMessageList* messages) { - ParseMessages<ChromotingHostMessage>(data, messages); + ParseMessages<ChromotingHostMessage>(data, data_size, messages); } template <typename T> -void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data, +void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data, + int data_size, std::list<T*>* messages) { // If this is the first data in the processing queue, then set the // last read position to 0. @@ -38,8 +41,8 @@ void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data, last_read_position_ = 0; // First enqueue the data received. - data_list_.push_back(data); - available_bytes_ += data->GetDataSize(); + data_list_.push_back(DataChunk(data, data_size)); + available_bytes_ += data_size; // Then try to parse one message until we can't parse anymore. T* message; @@ -49,7 +52,7 @@ void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data, } template <typename T> -bool ProtocolDecoder::ParseOneMessage(T** message) { +bool MessagesDecoder::ParseOneMessage(T** message) { // Determine the payload size. If we already know it, then skip this // part. // We have the value set to -1 for checking later. @@ -66,18 +69,16 @@ bool ProtocolDecoder::ParseOneMessage(T** message) { return false; next_payload_known_ = false; - // Extract data from |data_list_| used to form a full protocol buffer. - DataList buffers; - std::deque<const uint8*> buffer_pointers; - std::deque<int> buffer_sizes; + // Create a MultipleArrayInputStream for parsing. + MultipleArrayInputStream stream; + std::vector<scoped_refptr<net::IOBuffer> > buffers; while (next_payload_ > 0 && !data_list_.empty()) { - scoped_refptr<media::DataBuffer> buffer = data_list_.front(); - size_t read_bytes = std::min(buffer->GetDataSize() - last_read_position_, + DataChunk* buffer = &(data_list_.front()); + size_t read_bytes = std::min(buffer->data_size - last_read_position_, next_payload_); - buffers.push_back(buffer); - buffer_pointers.push_back(buffer->GetData() + last_read_position_); - buffer_sizes.push_back(read_bytes); + buffers.push_back(buffer->data); + stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes); // Adjust counters. last_read_position_ += read_bytes; @@ -85,30 +86,24 @@ bool ProtocolDecoder::ParseOneMessage(T** message) { available_bytes_ -= read_bytes; // If the front buffer is fully read, remove it from the queue. - if (buffer->GetDataSize() == last_read_position_) { + if (buffer->data_size == last_read_position_) { data_list_.pop_front(); last_read_position_ = 0; } } DCHECK_EQ(0UL, next_payload_); - DCHECK_EQ(buffers.size(), buffer_pointers.size()); - DCHECK_EQ(buffers.size(), buffer_sizes.size()); - - // Create a MultipleArrayInputStream for parsing. - MultipleArrayInputStream stream(buffers.size()); - for (size_t i = 0; i < buffers.size(); ++i) { - stream.SetBuffer(i, buffer_pointers[i], buffer_sizes[i]); - } // And finally it is parsing. *message = new T(); bool ret = (*message)->ParseFromZeroCopyStream(&stream); - if (!ret) + if (!ret) { + LOG(ERROR) << "Received invalid message."; delete *message; + } return ret; } -bool ProtocolDecoder::GetPayloadSize(int* size) { +bool MessagesDecoder::GetPayloadSize(int* size) { // The header has a size of 4 bytes. const size_t kHeaderSize = sizeof(int32); @@ -117,24 +112,24 @@ bool ProtocolDecoder::GetPayloadSize(int* size) { std::string header; while (header.length() < kHeaderSize && !data_list_.empty()) { - scoped_refptr<media::DataBuffer> buffer = data_list_.front(); + DataChunk* buffer = &(data_list_.front()); // Find out how many bytes we need and how many bytes are available in this // buffer. int needed_bytes = kHeaderSize - header.length(); - int available_bytes = buffer->GetDataSize() - last_read_position_; + int available_bytes = buffer->data_size - last_read_position_; // Then append the required bytes into the header and advance the last // read position. int read_bytes = std::min(needed_bytes, available_bytes); header.append( - reinterpret_cast<const char*>(buffer->GetData()) + last_read_position_, + reinterpret_cast<char*>(buffer->data->data()) + last_read_position_, read_bytes); last_read_position_ += read_bytes; available_bytes_ -= read_bytes; // If the buffer is depleted then remove it from the queue. - if (last_read_position_ == buffer->GetDataSize()) { + if (last_read_position_ == buffer->data_size) { last_read_position_ = 0; data_list_.pop_front(); } diff --git a/remoting/base/protocol_decoder.h b/remoting/protocol/messages_decoder.h index fed196f..66014f6 100644 --- a/remoting/base/protocol_decoder.h +++ b/remoting/protocol/messages_decoder.h @@ -2,15 +2,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef REMOTING_BASE_PROTOCOL_DECODER_H_ -#define REMOTING_BASE_PROTOCOL_DECODER_H_ +#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_ +#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_ #include <deque> #include <list> #include "base/ref_counted.h" #include "google/protobuf/message_lite.h" -#include "media/base/data_buffer.h" +#include "net/base/io_buffer.h" #include "remoting/base/protocol/chromotocol.pb.h" namespace remoting { @@ -21,27 +21,45 @@ typedef std::list<ChromotingClientMessage*> ClientMessageList; // A protocol decoder is used to decode data transmitted in the chromoting // network. // TODO(hclam): Defines the interface and implement methods. -class ProtocolDecoder { +class MessagesDecoder { public: - ProtocolDecoder(); + MessagesDecoder(); + virtual ~MessagesDecoder(); - virtual ~ProtocolDecoder(); - - // Parse data received from network into ClientMessages. Ownership of |data| - // is passed to this object and output is written to |messages|. - virtual void ParseClientMessages(scoped_refptr<media::DataBuffer> data, + // Parse data received from network into ClientMessages. Output is written + // to |messages|. + virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data, + int data_size, ClientMessageList* messages); - // Parse data received from network into HostMessages. Ownership of |data| - // is passed to this object and output is written to |messages|. - virtual void ParseHostMessages(scoped_refptr<media::DataBuffer> data, + // Parse data received from network into HostMessages. Output is + // written to |messages|. + virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data, + int data_size, HostMessageList* messages); private: + // DataChunk stores reference to a net::IOBuffer and size of the data + // stored in that buffer. + struct DataChunk { + DataChunk(net::IOBuffer* data, size_t data_size) + : data(data), + data_size(data_size) { + } + + scoped_refptr<net::IOBuffer> data; + size_t data_size; + }; + + // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer + // instead of storing chunks in dqueue. + typedef std::deque<DataChunk> DataList; + // A private method used to parse data received from network into protocol // buffers. template <typename T> - void ParseMessages(scoped_refptr<media::DataBuffer> data, + void ParseMessages(scoped_refptr<net::IOBuffer> data, + int data_size, std::list<T*>* messages); // Parse one message from |data_list_|. Return true if sucessful. @@ -52,7 +70,6 @@ class ProtocolDecoder { // data list. Return false if we don't have enough data. bool GetPayloadSize(int* size); - typedef std::deque<scoped_refptr<media::DataBuffer> > DataList; DataList data_list_; size_t last_read_position_; @@ -69,4 +86,4 @@ class ProtocolDecoder { } // namespace remoting -#endif // REMOTING_BASE_PROTOCOL_DECODER_H_ +#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_ diff --git a/remoting/base/protocol_decoder_unittest.cc b/remoting/protocol/messages_decoder_unittest.cc index ae4ba85..7c5616d 100644 --- a/remoting/base/protocol_decoder_unittest.cc +++ b/remoting/protocol/messages_decoder_unittest.cc @@ -6,9 +6,9 @@ #include "base/scoped_ptr.h" #include "base/stl_util-inl.h" -#include "media/base/data_buffer.h" -#include "remoting/base/protocol_decoder.h" -#include "remoting/base/protocol_util.h" +#include "net/base/io_buffer.h" +#include "remoting/protocol/messages_decoder.h" +#include "remoting/protocol/util.h" #include "testing/gtest/include/gtest/gtest.h" namespace remoting { @@ -20,10 +20,9 @@ static const std::string kTestData = "Chromoting rockz"; static void AppendMessage(const ChromotingHostMessage& msg, std::string* buffer) { // Contains one encoded message. - scoped_refptr<media::DataBuffer> encoded_msg; + scoped_refptr<net::IOBufferWithSize> encoded_msg; encoded_msg = SerializeAndFrameMessage(msg); - buffer->append(reinterpret_cast<const char*>(encoded_msg->GetData()), - encoded_msg->GetDataSize()); + buffer->append(encoded_msg->data(), encoded_msg->size()); } // Construct and prepare data in the |output_stream|. @@ -61,20 +60,20 @@ static void PrepareData(uint8** buffer, int* size) { memcpy(*buffer, encoded_data.c_str(), *size); } -TEST(ProtocolDecoderTest, BasicOperations) { +TEST(MessagesDecoderTest, BasicOperations) { // Prepare encoded data for testing. int size; uint8* test_data; PrepareData(&test_data, &size); scoped_array<uint8> memory_deleter(test_data); - // Then simulate using ProtocolDecoder to decode variable + // Then simulate using MessagesDecoder to decode variable // size of encoded data. // The first thing to do is to generate a variable size of data. This is done // by iterating the following array for read sizes. const int kReadSizes[] = {1, 2, 3, 1}; - ProtocolDecoder decoder; + MessagesDecoder decoder; // Then feed the protocol decoder using the above generated data and the // read pattern. @@ -84,10 +83,9 @@ TEST(ProtocolDecoderTest, BasicOperations) { int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]); // And then prepare a DataBuffer for feeding it. - scoped_refptr<media::DataBuffer> buffer = new media::DataBuffer(read); - memcpy(buffer->GetWritableData(), test_data + i, read); - buffer->SetDataSize(read); - decoder.ParseHostMessages(buffer, &message_list); + scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read); + memcpy(buffer->data(), test_data + i, read); + decoder.ParseHostMessages(buffer, read, &message_list); i += read; } @@ -98,10 +96,12 @@ TEST(ProtocolDecoderTest, BasicOperations) { delete message_list.front(); message_list.pop_front(); + int index = 0; for (HostMessageList::iterator it = message_list.begin(); it != message_list.end(); ++it) { ChromotingHostMessage* message = *it; - int type = (i - 1) % 3; + int type = index % 3; + ++index; if (type == 0) { // Begin update stream. EXPECT_TRUE(message->has_begin_update_stream()); diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc new file mode 100644 index 0000000..bfceb3e --- /dev/null +++ b/remoting/protocol/stream_reader.cc @@ -0,0 +1,102 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/stream_reader.h" + +#include "base/message_loop.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "remoting/protocol/chromoting_connection.h" + +namespace remoting { + +namespace { +int kReadBufferSize = 4096; +} // namespace + +StreamReaderBase::StreamReaderBase() + : socket_(NULL), + closed_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &StreamReaderBase::OnRead)) { +} + +StreamReaderBase::~StreamReaderBase() { } + +void StreamReaderBase::Close() { + closed_ = true; +} + +void StreamReaderBase::Init(net::Socket* socket) { + DCHECK(socket); + socket_ = socket; + DoRead(); +} + +void StreamReaderBase::DoRead() { + while (true) { + read_buffer_ = new net::IOBuffer(kReadBufferSize); + int result = socket_->Read( + read_buffer_, kReadBufferSize, &read_callback_); + HandleReadResult(result); + if (result < 0) + break; + } +} + +void StreamReaderBase::OnRead(int result) { + if (!closed_) { + HandleReadResult(result); + DoRead(); + } +} + +void StreamReaderBase::HandleReadResult(int result) { + if (result > 0) { + OnDataReceived(read_buffer_, result); + } else { + if (result != net::ERR_IO_PENDING) + LOG(ERROR) << "Read() returned error " << result; + } +} + +// EventsStreamReader class. +EventsStreamReader::EventsStreamReader() { } +EventsStreamReader::~EventsStreamReader() { } + +void EventsStreamReader::Init(net::Socket* socket, + OnMessageCallback* on_message_callback) { + on_message_callback_.reset(on_message_callback); + StreamReaderBase::Init(socket); +} + +void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { + ClientMessageList messages_list; + messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list); + for (ClientMessageList::iterator it = messages_list.begin(); + it != messages_list.end(); ++it) { + on_message_callback_->Run(*it); + } +} + +// VideoStreamReader class. +VideoStreamReader::VideoStreamReader() { } +VideoStreamReader::~VideoStreamReader() { } + +void VideoStreamReader::Init(net::Socket* socket, + OnMessageCallback* on_message_callback) { + on_message_callback_.reset(on_message_callback); + StreamReaderBase::Init(socket); +} + +void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { + HostMessageList messages_list; + messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list); + for (HostMessageList::iterator it = messages_list.begin(); + it != messages_list.end(); ++it) { + on_message_callback_->Run(*it); + } +} + +} // namespace remoting diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h new file mode 100644 index 0000000..b71f4b0 --- /dev/null +++ b/remoting/protocol/stream_reader.h @@ -0,0 +1,97 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ +#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ + +#include "base/callback.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" +#include "net/base/completion_callback.h" +#include "remoting/protocol/messages_decoder.h" + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class ChromotingConnection; + +class StreamReaderBase { + public: + StreamReaderBase(); + virtual ~StreamReaderBase(); + + // Stops reading. Must be called on the same thread as Init(). + void Close(); + + protected: + void Init(net::Socket* socket); + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0; + + private: + void DoRead(); + void OnRead(int result); + void HandleReadResult(int result); + + net::Socket* socket_; + bool closed_; + scoped_refptr<net::IOBuffer> read_buffer_; + net::CompletionCallbackImpl<StreamReaderBase> read_callback_; +}; + +class EventsStreamReader : public StreamReaderBase { + public: + EventsStreamReader(); + ~EventsStreamReader(); + + // The OnMessageCallback is called whenever a new message is received. + // Ownership of the message is passed the callback. + typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback; + + // Initialize the reader and start reading. Must be called on the thread + // |socket| belongs to. The callback will be called when a new message is + // received. EventsStreamReader owns |on_message_callback|, doesn't own + // |socket|. + void Init(net::Socket* socket, OnMessageCallback* on_message_callback); + + protected: + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); + + private: + MessagesDecoder messages_decoder_; + scoped_ptr<OnMessageCallback> on_message_callback_; + + DISALLOW_COPY_AND_ASSIGN(EventsStreamReader); +}; + +class VideoStreamReader : public StreamReaderBase { + public: + VideoStreamReader(); + ~VideoStreamReader(); + + // The OnMessageCallback is called whenever a new message is received. + // Ownership of the message is passed the callback. + typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback; + + // Initialize the reader and start reading. Must be called on the thread + // |socket| belongs to. The callback will be called when a new message is + // received. VideoStreamReader owns |on_message_callback|, doesn't own + // |socket|. + void Init(net::Socket* socket, OnMessageCallback* on_message_callback); + + protected: + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); + + private: + MessagesDecoder messages_decoder_; + scoped_ptr<OnMessageCallback> on_message_callback_; + + DISALLOW_COPY_AND_ASSIGN(VideoStreamReader); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ diff --git a/remoting/protocol/stream_writer.cc b/remoting/protocol/stream_writer.cc new file mode 100644 index 0000000..3269a1b --- /dev/null +++ b/remoting/protocol/stream_writer.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/stream_writer.h" + +#include "base/message_loop.h" +#include "remoting/protocol/chromoting_connection.h" +#include "remoting/protocol/util.h" + +namespace remoting { + +StreamWriterBase::StreamWriterBase() + : socket_(NULL), + buffered_writer_(new BufferedSocketWriter()) { +} + +StreamWriterBase::~StreamWriterBase() { } + +void StreamWriterBase::Init(net::Socket* socket) { + socket_ = socket; + buffered_writer_->Init(socket, NULL); +} + +int StreamWriterBase::GetBufferSize() { + return buffered_writer_->GetBufferSize(); +} + +int StreamWriterBase::GetPendingMessages() { + return buffered_writer_->GetBufferChunks(); +} + +void StreamWriterBase::Close() { + buffered_writer_->Close(); +} + +bool EventsStreamWriter::SendMessage( + const ChromotingClientMessage& message) { + return buffered_writer_->Write(SerializeAndFrameMessage(message)); +} + +bool VideoStreamWriter::SendMessage( + const ChromotingHostMessage& message) { + return buffered_writer_->Write(SerializeAndFrameMessage(message)); +} + +} // namespace remoting diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h new file mode 100644 index 0000000..4bc2b76 --- /dev/null +++ b/remoting/protocol/stream_writer.h @@ -0,0 +1,53 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_ +#define REMOTING_PROTOCOL_STREAM_WRITER_H_ + +#include "remoting/protocol/buffered_socket_writer.h" +#include "remoting/protocol/messages_decoder.h" + +namespace remoting { + +class ChromotingConnection; + +class StreamWriterBase { + public: + StreamWriterBase(); + virtual ~StreamWriterBase(); + + // Initializes the writer. Must be called on the thread the |socket| belongs + // to. + void Init(net::Socket* socket); + + // Return current buffer state. Can be called from any thread. + int GetBufferSize(); + int GetPendingMessages(); + + // Stop writing and drop pending data. Must be called from the same thread as + // Init(). + void Close(); + + protected: + net::Socket* socket_; + scoped_refptr<BufferedSocketWriter> buffered_writer_; +}; + +class EventsStreamWriter : public StreamWriterBase { + public: + // Sends the |message| or returns false if called before Init(). + // Can be called on any thread. + bool SendMessage(const ChromotingClientMessage& message); +}; + +class VideoStreamWriter : public StreamWriterBase { + public: + // Sends the |message| or returns false if called before Init(). + // Can be called on any thread. + bool SendMessage(const ChromotingHostMessage& message); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_STREAM_WRITER_H_ diff --git a/remoting/protocol/util.cc b/remoting/protocol/util.cc new file mode 100644 index 0000000..5a23c93 --- /dev/null +++ b/remoting/protocol/util.cc @@ -0,0 +1,28 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/util.h" + +#include "base/basictypes.h" +#include "base/hash_tables.h" +#include "base/logging.h" +#include "net/base/io_buffer.h" +#include "third_party/libjingle/source/talk/base/byteorder.h" + +namespace remoting { + +scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage( + const google::protobuf::MessageLite& msg) { + // Create a buffer with 4 extra bytes. This is used as prefix to write an + // int32 of the serialized message size for framing. + const int kExtraBytes = sizeof(int32); + int size = msg.ByteSize() + kExtraBytes; + scoped_refptr<net::IOBufferWithSize> buffer = new net::IOBufferWithSize(size); + talk_base::SetBE32(buffer->data(), msg.GetCachedSize()); + msg.SerializeWithCachedSizesToArray( + reinterpret_cast<uint8*>(buffer->data()) + kExtraBytes); + return buffer; +} + +} // namespace remoting diff --git a/remoting/base/protocol_util.h b/remoting/protocol/util.h index 94cdc2f..ef15e45 100644 --- a/remoting/base/protocol_util.h +++ b/remoting/protocol/util.h @@ -2,11 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef REMOTING_BASE_PROTOCOL_UTIL_H_ -#define REMOTING_BASE_PROTOCOL_UTIL_H_ +#ifndef REMOTING_PROTOCOL_UTIL_H_ +#define REMOTING_PROTOCOL_UTIL_H_ #include "google/protobuf/message_lite.h" -#include "media/base/data_buffer.h" +#include "net/base/io_buffer.h" #include "remoting/base/protocol/chromotocol.pb.h" // This file defines utility methods used for encoding and decoding the protocol @@ -17,11 +17,9 @@ namespace remoting { // sending it over the wire. // This will provide sufficient prefix and suffix for the receiver side to // decode the message. -scoped_refptr<media::DataBuffer> SerializeAndFrameMessage( +scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage( const google::protobuf::MessageLite& msg); -int GetBytesPerPixel(PixelFormat format); - } // namespace remoting -#endif // REMOTING_BASE_PROTOCOL_UTIL_H_ +#endif // REMOTING_PROTOCOL_UTIL_H_ diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp index 716b3e2..bcde3f5 100644 --- a/remoting/remoting.gyp +++ b/remoting/remoting.gyp @@ -116,9 +116,11 @@ '../third_party/protobuf/protobuf.gyp:protobuf_lite', '../third_party/libvpx/libvpx.gyp:libvpx_include', '../third_party/zlib/zlib.gyp:zlib', + 'chromoting_jingle_glue', 'base/protocol/chromotocol.gyp:chromotocol_proto_lib', 'base/protocol/chromotocol.gyp:trace_proto_lib', - 'chromoting_jingle_glue', + # TODO(hclam): Enable VP8 in the build. + #'third_party/on2/on2.gyp:vp8', ], 'export_dependent_settings': [ '../third_party/protobuf/protobuf.gyp:protobuf_lite', @@ -154,13 +156,11 @@ 'base/encoder_zlib.h', 'base/multiple_array_input_stream.cc', 'base/multiple_array_input_stream.h', - 'base/protocol_decoder.cc', - 'base/protocol_decoder.h', - 'base/protocol_util.cc', - 'base/protocol_util.h', 'base/tracer.cc', 'base/tracer.h', 'base/types.h', + 'base/util.cc', + 'base/util.h', ], }, # end of target 'chromoting_base' @@ -170,6 +170,7 @@ 'dependencies': [ 'chromoting_base', 'chromoting_jingle_glue', + 'chromoting_protocol', ], 'sources': [ 'host/access_verifier.cc', @@ -241,6 +242,7 @@ 'dependencies': [ 'chromoting_base', 'chromoting_jingle_glue', + 'chromoting_protocol', ], 'sources': [ 'client/chromoting_client.cc', @@ -358,12 +360,22 @@ 'chromoting_jingle_glue', ], 'sources': [ + 'protocol/messages_decoder.cc', + 'protocol/messages_decoder.h', + 'protocol/buffered_socket_writer.cc', + 'protocol/buffered_socket_writer.h', 'protocol/chromoting_connection.h', 'protocol/chromoting_server.h', 'protocol/jingle_chromoting_connection.cc', 'protocol/jingle_chromoting_connection.h', 'protocol/jingle_chromoting_server.cc', 'protocol/jingle_chromoting_server.h', + 'protocol/stream_reader.cc', + 'protocol/stream_reader.h', + 'protocol/stream_writer.cc', + 'protocol/stream_writer.h', + 'protocol/util.cc', + 'protocol/util.h', ], }, # end of target 'chromoting_protocol' @@ -411,7 +423,6 @@ # BUG57351 'base/encoder_zlib_unittest.cc', 'base/mock_objects.h', 'base/multiple_array_input_stream_unittest.cc', -# BUG57351 'base/protocol_decoder_unittest.cc', # BUG57351 'client/chromoting_view_unittest.cc', 'client/mock_objects.h', 'host/access_verifier_unittest.cc', @@ -433,6 +444,9 @@ 'jingle_glue/mock_objects.h', 'jingle_glue/stream_socket_adapter_unittest.cc', 'protocol/jingle_chromoting_connection_unittest.cc', + 'protocol/messages_decoder_unittest.cc', + 'protocol/fake_connection.cc', + 'protocol/fake_connection.h', 'protocol/session_manager_pair.cc', 'protocol/session_manager_pair.h', 'run_all_unittests.cc', |