summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--remoting/base/codec_test.cc2
-rw-r--r--remoting/base/decoder_row_based.cc2
-rw-r--r--remoting/base/decoder_vp8.cc2
-rw-r--r--remoting/base/encoder_verbatim.cc2
-rw-r--r--remoting/base/encoder_zlib.cc2
-rw-r--r--remoting/base/mock_objects.h16
-rw-r--r--remoting/base/multiple_array_input_stream.cc29
-rw-r--r--remoting/base/multiple_array_input_stream.h20
-rw-r--r--remoting/base/multiple_array_input_stream_unittest.cc7
-rw-r--r--remoting/base/protocol_util.cc43
-rw-r--r--remoting/base/util.cc26
-rw-r--r--remoting/base/util.h16
-rw-r--r--remoting/client/chromoting_client.cc16
-rw-r--r--remoting/client/chromoting_client.h4
-rw-r--r--remoting/client/host_connection.h12
-rw-r--r--remoting/client/jingle_host_connection.cc156
-rw-r--r--remoting/client/jingle_host_connection.h49
-rw-r--r--remoting/client/rectangle_update_decoder.cc2
-rw-r--r--remoting/host/chromoting_host.cc99
-rw-r--r--remoting/host/chromoting_host.h17
-rw-r--r--remoting/host/client_connection.cc154
-rw-r--r--remoting/host/client_connection.h81
-rw-r--r--remoting/host/client_connection_unittest.cc79
-rw-r--r--remoting/host/event_executor.h7
-rw-r--r--remoting/host/event_executor_linux.cc5
-rw-r--r--remoting/host/event_executor_linux.h2
-rw-r--r--remoting/host/event_executor_mac.cc3
-rw-r--r--remoting/host/event_executor_mac.h2
-rw-r--r--remoting/host/event_executor_win.cc35
-rw-r--r--remoting/host/event_executor_win.h2
-rw-r--r--remoting/host/mock_objects.h18
-rw-r--r--remoting/host/session_manager.cc19
-rw-r--r--remoting/jingle_glue/stream_socket_adapter.cc1
-rw-r--r--remoting/protocol/buffered_socket_writer.cc116
-rw-r--r--remoting/protocol/buffered_socket_writer.h74
-rw-r--r--remoting/protocol/chromoting_connection.h13
-rw-r--r--remoting/protocol/fake_connection.cc112
-rw-r--r--remoting/protocol/fake_connection.h94
-rw-r--r--remoting/protocol/jingle_chromoting_connection.cc34
-rw-r--r--remoting/protocol/jingle_chromoting_connection.h7
-rw-r--r--remoting/protocol/messages_decoder.cc (renamed from remoting/base/protocol_decoder.cc)65
-rw-r--r--remoting/protocol/messages_decoder.h (renamed from remoting/base/protocol_decoder.h)49
-rw-r--r--remoting/protocol/messages_decoder_unittest.cc (renamed from remoting/base/protocol_decoder_unittest.cc)28
-rw-r--r--remoting/protocol/stream_reader.cc102
-rw-r--r--remoting/protocol/stream_reader.h97
-rw-r--r--remoting/protocol/stream_writer.cc47
-rw-r--r--remoting/protocol/stream_writer.h53
-rw-r--r--remoting/protocol/util.cc28
-rw-r--r--remoting/protocol/util.h (renamed from remoting/base/protocol_util.h)12
-rw-r--r--remoting/remoting.gyp26
50 files changed, 1286 insertions, 601 deletions
diff --git a/remoting/base/codec_test.cc b/remoting/base/codec_test.cc
index 2736c3e..62b98607 100644
--- a/remoting/base/codec_test.cc
+++ b/remoting/base/codec_test.cc
@@ -11,7 +11,7 @@
#include "remoting/base/decoder.h"
#include "remoting/base/encoder.h"
#include "remoting/base/mock_objects.h"
-#include "remoting/base/protocol_util.h"
+#include "remoting/base/util.h"
#include "testing/gtest/include/gtest/gtest.h"
static const int kWidth = 320;
diff --git a/remoting/base/decoder_row_based.cc b/remoting/base/decoder_row_based.cc
index 2f4290a..65c74c5 100644
--- a/remoting/base/decoder_row_based.cc
+++ b/remoting/base/decoder_row_based.cc
@@ -7,7 +7,7 @@
#include "remoting/base/decompressor.h"
#include "remoting/base/decompressor_zlib.h"
#include "remoting/base/decompressor_verbatim.h"
-#include "remoting/base/protocol_util.h"
+#include "remoting/base/util.h"
namespace remoting {
diff --git a/remoting/base/decoder_vp8.cc b/remoting/base/decoder_vp8.cc
index 0ede4ab..452cd1f 100644
--- a/remoting/base/decoder_vp8.cc
+++ b/remoting/base/decoder_vp8.cc
@@ -6,7 +6,7 @@
#include "media/base/media.h"
#include "media/base/yuv_convert.h"
-#include "remoting/base/protocol_util.h"
+#include "remoting/base/util.h"
extern "C" {
#define VPX_CODEC_DISABLE_COMPAT 1
diff --git a/remoting/base/encoder_verbatim.cc b/remoting/base/encoder_verbatim.cc
index d831006..b636670 100644
--- a/remoting/base/encoder_verbatim.cc
+++ b/remoting/base/encoder_verbatim.cc
@@ -8,7 +8,7 @@
#include "gfx/rect.h"
#include "media/base/data_buffer.h"
#include "remoting/base/capture_data.h"
-#include "remoting/base/protocol_util.h"
+#include "remoting/base/util.h"
#include "remoting/base/protocol/chromotocol.pb.h"
namespace remoting {
diff --git a/remoting/base/encoder_zlib.cc b/remoting/base/encoder_zlib.cc
index 390a46ad..f807e85 100644
--- a/remoting/base/encoder_zlib.cc
+++ b/remoting/base/encoder_zlib.cc
@@ -9,7 +9,7 @@
#include "media/base/data_buffer.h"
#include "remoting/base/capture_data.h"
#include "remoting/base/compressor_zlib.h"
-#include "remoting/base/protocol_util.h"
+#include "remoting/base/util.h"
#include "remoting/base/protocol/chromotocol.pb.h"
namespace remoting {
diff --git a/remoting/base/mock_objects.h b/remoting/base/mock_objects.h
index 7565f22..12143ad 100644
--- a/remoting/base/mock_objects.h
+++ b/remoting/base/mock_objects.h
@@ -8,26 +8,10 @@
#include "remoting/base/capture_data.h"
#include "remoting/base/decoder.h"
#include "remoting/base/encoder.h"
-#include "remoting/base/protocol_decoder.h"
#include "testing/gmock/include/gmock/gmock.h"
namespace remoting {
-class MockProtocolDecoder : public ProtocolDecoder {
- public:
- MockProtocolDecoder() {}
-
- MOCK_METHOD2(ParseClientMessages,
- void(scoped_refptr<media::DataBuffer> data,
- ClientMessageList* messages));
- MOCK_METHOD2(ParseHostMessages,
- void(scoped_refptr<media::DataBuffer> data,
- HostMessageList* messages));
-
- private:
- DISALLOW_COPY_AND_ASSIGN(MockProtocolDecoder);
-};
-
class MockEncoder : public Encoder {
public:
MockEncoder() {}
diff --git a/remoting/base/multiple_array_input_stream.cc b/remoting/base/multiple_array_input_stream.cc
index e5b8f94..ab0ba45 100644
--- a/remoting/base/multiple_array_input_stream.cc
+++ b/remoting/base/multiple_array_input_stream.cc
@@ -9,22 +9,26 @@
namespace remoting {
-MultipleArrayInputStream::MultipleArrayInputStream(int count)
- : buffer_count_(count),
- current_buffer_(0),
+MultipleArrayInputStream::MultipleArrayInputStream()
+ : current_buffer_(0),
current_buffer_offset_(0),
position_(0),
last_returned_size_(0) {
- DCHECK_GT(buffer_count_, 0);
- buffers_.reset(new const uint8*[buffer_count_]);
- buffer_sizes_.reset(new int[buffer_count_]);
}
MultipleArrayInputStream::~MultipleArrayInputStream() {
}
+void MultipleArrayInputStream::AddBuffer(
+ const char* buffer, int size) {
+ DCHECK_EQ(position_, 0); // Haven't started reading.
+ buffers_.push_back(buffer);
+ buffer_sizes_.push_back(size);
+ DCHECK_EQ(buffers_.size(), buffer_sizes_.size());
+}
+
bool MultipleArrayInputStream::Next(const void** data, int* size) {
- if (current_buffer_ < buffer_count_) {
+ if (current_buffer_ < buffers_.size()) {
// Also reply with that is remaining in the current buffer.
last_returned_size_ =
buffer_sizes_[current_buffer_] - current_buffer_offset_;
@@ -49,7 +53,7 @@ bool MultipleArrayInputStream::Next(const void** data, int* size) {
void MultipleArrayInputStream::BackUp(int count) {
DCHECK_LE(count, last_returned_size_);
DCHECK_EQ(0, current_buffer_offset_);
- DCHECK_GT(current_buffer_, 0);
+ DCHECK_GT(current_buffer_, 0u);
// Rewind one buffer.
--current_buffer_;
@@ -63,7 +67,7 @@ bool MultipleArrayInputStream::Skip(int count) {
DCHECK_GE(count, 0);
last_returned_size_ = 0;
- while (count && current_buffer_ < buffer_count_) {
+ while (count && current_buffer_ < buffers_.size()) {
int read = std::min(
count,
buffer_sizes_[current_buffer_] - current_buffer_offset_);
@@ -86,11 +90,4 @@ int64 MultipleArrayInputStream::ByteCount() const {
return position_;
}
-void MultipleArrayInputStream::SetBuffer(
- int n, const uint8* buffer, int size) {
- CHECK(n < buffer_count_);
- buffers_[n] = buffer;
- buffer_sizes_[n] = size;
-}
-
} // namespace remoting
diff --git a/remoting/base/multiple_array_input_stream.h b/remoting/base/multiple_array_input_stream.h
index a1b5f01..7747da5 100644
--- a/remoting/base/multiple_array_input_stream.h
+++ b/remoting/base/multiple_array_input_stream.h
@@ -5,8 +5,9 @@
#ifndef REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_
#define REMOTING_BASE_MULTIPLE_ARRAY_INPUT_STREAM_H_
+#include <vector>
+
#include "base/basictypes.h"
-#include "base/scoped_ptr.h"
#include "google/protobuf/io/zero_copy_stream.h"
namespace remoting {
@@ -19,24 +20,23 @@ class MultipleArrayInputStream :
// Construct a MultipleArrayInputStream with |count| backing arrays.
// TODO(hclam): Consider adding block size to see if it has a performance
// gain.
- explicit MultipleArrayInputStream(int count);
+ MultipleArrayInputStream();
virtual ~MultipleArrayInputStream();
+ // Add a new buffer to the list.
+ void AddBuffer(const char* buffer, int size);
+
+ // google::protobuf::io::ZeroCopyInputStream interface.
virtual bool Next(const void** data, int* size);
virtual void BackUp(int count);
virtual bool Skip(int count);
virtual int64 ByteCount() const;
- // Set the n-th buffer to be |buffer|.
- void SetBuffer(int n, const uint8* buffer, int size);
-
private:
- scoped_array<const uint8*> buffers_;
- scoped_array<int> buffer_sizes_;
-
- const int buffer_count_;
+ std::vector<const char*> buffers_;
+ std::vector<int> buffer_sizes_;
- int current_buffer_;
+ size_t current_buffer_;
int current_buffer_offset_;
int position_;
int last_returned_size_;
diff --git a/remoting/base/multiple_array_input_stream_unittest.cc b/remoting/base/multiple_array_input_stream_unittest.cc
index d8397c7..1d705fe 100644
--- a/remoting/base/multiple_array_input_stream_unittest.cc
+++ b/remoting/base/multiple_array_input_stream_unittest.cc
@@ -4,11 +4,14 @@
#include <string>
+#include "base/scoped_ptr.h"
#include "remoting/base/multiple_array_input_stream.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace remoting {
+// TODO(sergeyu): Add SCOPED_TRACE() for ReadFromInput() and ReadString().
+
static size_t ReadFromInput(MultipleArrayInputStream* input,
void* data, size_t size) {
uint8* out = reinterpret_cast<uint8*>(data);
@@ -64,11 +67,11 @@ static void PrepareData(scoped_ptr<MultipleArrayInputStream>* stream) {
segments += 2;
}
- MultipleArrayInputStream* mstream = new MultipleArrayInputStream(segments);
+ MultipleArrayInputStream* mstream = new MultipleArrayInputStream();
const char* data = kTestData.c_str();
for (int i = 0; i < segments; ++i) {
int size = i % 2 == 0 ? 1 : 2;
- mstream->SetBuffer(i, reinterpret_cast<const uint8*>(data), size);
+ mstream->AddBuffer(data, size);
data += size;
}
stream->reset(mstream);
diff --git a/remoting/base/protocol_util.cc b/remoting/base/protocol_util.cc
deleted file mode 100644
index f009eb3..0000000
--- a/remoting/base/protocol_util.cc
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "remoting/base/protocol_util.h"
-
-#include "base/basictypes.h"
-#include "base/hash_tables.h"
-#include "base/logging.h"
-#include "talk/base/byteorder.h"
-
-namespace remoting {
-
-scoped_refptr<media::DataBuffer> SerializeAndFrameMessage(
- const google::protobuf::MessageLite& msg) {
-
- // Create a buffer with 4 extra bytes. This is used as prefix to write an
- // int32 of the serialized message size for framing.
- const int kExtraBytes = sizeof(int32);
- int size = msg.ByteSize() + kExtraBytes;
- scoped_refptr<media::DataBuffer> buffer = new media::DataBuffer(size);
- talk_base::SetBE32(buffer->GetWritableData(), msg.GetCachedSize());
- msg.SerializeWithCachedSizesToArray(buffer->GetWritableData() + kExtraBytes);
- buffer->SetDataSize(size);
- return buffer;
-}
-
-int GetBytesPerPixel(PixelFormat format) {
- // Note: The order is important here for performance. This is sorted from the
- // most common to the less common (PixelFormatAscii is mostly used
- // just for testing).
- switch (format) {
- case PixelFormatRgb24: return 3;
- case PixelFormatRgb565: return 2;
- case PixelFormatRgb32: return 4;
- case PixelFormatAscii: return 1;
- default:
- NOTREACHED() << "Pixel format not supported";
- return 0;
- }
-}
-
-} // namespace remoting
diff --git a/remoting/base/util.cc b/remoting/base/util.cc
new file mode 100644
index 0000000..8ccbd23
--- /dev/null
+++ b/remoting/base/util.cc
@@ -0,0 +1,26 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/base/util.h"
+
+#include "base/logging.h"
+
+namespace remoting {
+
+int GetBytesPerPixel(PixelFormat format) {
+ // Note: The order is important here for performance. This is sorted from the
+ // most common to the less common (PixelFormatAscii is mostly used
+ // just for testing).
+ switch (format) {
+ case PixelFormatRgb24: return 3;
+ case PixelFormatRgb565: return 2;
+ case PixelFormatRgb32: return 4;
+ case PixelFormatAscii: return 1;
+ default:
+ NOTREACHED() << "Pixel format not supported";
+ return 0;
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/base/util.h b/remoting/base/util.h
new file mode 100644
index 0000000..adfbc89
--- /dev/null
+++ b/remoting/base/util.h
@@ -0,0 +1,16 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_BASE_UTIL_H_
+#define REMOTING_BASE_UTIL_H_
+
+#include "remoting/base/protocol/chromotocol.pb.h"
+
+namespace remoting {
+
+int GetBytesPerPixel(PixelFormat format);
+
+} // namespace remoting
+
+#endif // REMOTING_BASE_UTIL_H_
diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc
index e4f93ab..7669e3f 100644
--- a/remoting/client/chromoting_client.cc
+++ b/remoting/client/chromoting_client.cc
@@ -96,18 +96,18 @@ void ChromotingClient::SetViewport(int x, int y, int width, int height) {
view_->SetViewport(x, y, width, height);
}
-void ChromotingClient::HandleMessages(HostConnection* conn,
- HostMessageList* messages) {
+void ChromotingClient::HandleMessage(HostConnection* conn,
+ ChromotingHostMessage* msg) {
if (message_loop() != MessageLoop::current()) {
message_loop()->PostTask(
FROM_HERE,
- NewRunnableMethod(this, &ChromotingClient::HandleMessages,
- conn, messages));
+ NewRunnableMethod(this, &ChromotingClient::HandleMessage,
+ conn, msg));
return;
}
// Put all messages in the queue.
- received_messages_.splice(received_messages_.end(), *messages);
+ received_messages_.push_back(msg);
if (!message_being_processed_) {
DispatchMessage();
@@ -204,18 +204,18 @@ void ChromotingClient::SetState(State s) {
Repaint();
}
-void ChromotingClient::OnMessageDone(ChromotingHostMessage* msg) {
+void ChromotingClient::OnMessageDone(ChromotingHostMessage* message) {
if (message_loop() != MessageLoop::current()) {
message_loop()->PostTask(
FROM_HERE,
- NewTracedMethod(this, &ChromotingClient::OnMessageDone, msg));
+ NewTracedMethod(this, &ChromotingClient::OnMessageDone, message));
return;
}
TraceContext::tracer()->PrintString("Message done");
message_being_processed_ = false;
- delete msg;
+ delete message;
DispatchMessage();
}
diff --git a/remoting/client/chromoting_client.h b/remoting/client/chromoting_client.h
index 31870ed..f55ec78 100644
--- a/remoting/client/chromoting_client.h
+++ b/remoting/client/chromoting_client.h
@@ -10,6 +10,7 @@
#include "base/task.h"
#include "remoting/client/host_connection.h"
#include "remoting/client/client_config.h"
+#include "remoting/protocol/messages_decoder.h"
class MessageLoop;
@@ -50,7 +51,8 @@ class ChromotingClient : public HostConnection::HostEventCallback {
virtual void SetViewport(int x, int y, int width, int height);
// HostConnection::HostEventCallback implementation.
- virtual void HandleMessages(HostConnection* conn, HostMessageList* messages);
+ virtual void HandleMessage(HostConnection* conn,
+ ChromotingHostMessage* messages);
virtual void OnConnectionOpened(HostConnection* conn);
virtual void OnConnectionClosed(HostConnection* conn);
virtual void OnConnectionFailed(HostConnection* conn);
diff --git a/remoting/client/host_connection.h b/remoting/client/host_connection.h
index 7af8de3..74c2a6d 100644
--- a/remoting/client/host_connection.h
+++ b/remoting/client/host_connection.h
@@ -7,7 +7,7 @@
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
-#include "remoting/base/protocol_decoder.h"
+#include "remoting/protocol/messages_decoder.h"
namespace remoting {
@@ -19,12 +19,10 @@ class HostConnection {
public:
virtual ~HostEventCallback() {}
- // Handles an event received by the HostConnection. Receiver will own the
- // HostMessages in HostMessageList and needs to delete them.
- // Note that the sender of messages will not reference messages
- // again so it is okay to clear |messages| in this method.
- virtual void HandleMessages(HostConnection* conn,
- HostMessageList* messages) = 0;
+ // Handles an event received by the HostConnection. Ownership of
+ // the message is passed to the callee.
+ virtual void HandleMessage(HostConnection* conn,
+ ChromotingHostMessage* message) = 0;
// Called when the network connection is opened.
virtual void OnConnectionOpened(HostConnection* conn) = 0;
diff --git a/remoting/client/jingle_host_connection.cc b/remoting/client/jingle_host_connection.cc
index 4892883..a4519df 100644
--- a/remoting/client/jingle_host_connection.cc
+++ b/remoting/client/jingle_host_connection.cc
@@ -2,12 +2,14 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/callback.h"
#include "base/message_loop.h"
#include "remoting/base/constants.h"
-#include "remoting/base/protocol_util.h"
#include "remoting/client/client_config.h"
#include "remoting/client/jingle_host_connection.h"
#include "remoting/jingle_glue/jingle_thread.h"
+#include "remoting/protocol/jingle_chromoting_server.h"
+#include "remoting/protocol/util.h"
namespace remoting {
@@ -21,67 +23,82 @@ JingleHostConnection::~JingleHostConnection() {
void JingleHostConnection::Connect(const ClientConfig& config,
HostEventCallback* event_callback) {
- message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &JingleHostConnection::DoConnect,
- config, event_callback));
-}
+ event_callback_ = event_callback;
-void JingleHostConnection::Disconnect() {
- message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &JingleHostConnection::DoDisconnect));
+ // Initialize |jingle_client_|.
+ jingle_client_ = new JingleClient(context_->jingle_thread());
+ jingle_client_->Init(config.username, config.auth_token,
+ kChromotingTokenServiceName, this);
+
+ // Save jid of the host. The actual connection is created later after
+ // |jingle_client_| is connected.
+ host_jid_ = config.host_jid;
}
-void JingleHostConnection::SendEvent(const ChromotingClientMessage& msg) {
- if (message_loop() != MessageLoop::current()) {
+void JingleHostConnection::Disconnect() {
+ if (MessageLoop::current() != message_loop()) {
message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &JingleHostConnection::SendEvent, msg));
+ FROM_HERE, NewRunnableMethod(this,
+ &JingleHostConnection::Disconnect));
return;
}
- // Don't send messages if we're disconnected.
- if (jingle_channel_ == NULL) {
- return;
+ events_writer_.Close();
+ video_reader_.Close();
+
+ if (connection_) {
+ connection_->Close(
+ NewRunnableMethod(this, &JingleHostConnection::OnDisconnected));
+ } else {
+ OnDisconnected();
}
+}
- jingle_channel_->Write(SerializeAndFrameMessage(msg));
+void JingleHostConnection::OnVideoMessage(
+ ChromotingHostMessage* msg) {
+ event_callback_->HandleMessage(this, msg);
}
-void JingleHostConnection::OnStateChange(JingleChannel* channel,
- JingleChannel::State state) {
+void JingleHostConnection::InitConnection() {
DCHECK_EQ(message_loop(), MessageLoop::current());
- DCHECK(event_callback_);
-
- switch (state) {
- case JingleChannel::FAILED:
- event_callback_->OnConnectionFailed(this);
- break;
- case JingleChannel::CLOSED:
- event_callback_->OnConnectionClosed(this);
- break;
+ // Initialize |chromotocol_server_|.
+ JingleChromotingServer* chromotocol_server =
+ new JingleChromotingServer(message_loop());
+ chromotocol_server->Init(
+ jingle_client_->GetFullJid(),
+ jingle_client_->session_manager(),
+ NewCallback(this, &JingleHostConnection::OnNewChromotocolConnection));
+ chromotocol_server_ = chromotocol_server;
+
+ // Initialize |connection_|.
+ connection_ = chromotocol_server_->Connect(
+ host_jid_,
+ NewCallback(this, &JingleHostConnection::OnConnectionStateChange));
+}
- case JingleChannel::OPEN:
- event_callback_->OnConnectionOpened(this);
- break;
+void JingleHostConnection::OnDisconnected() {
+ connection_ = NULL;
- default:
- // Ignore the other states by default.
- break;
+ if (chromotocol_server_) {
+ chromotocol_server_->Close(
+ NewRunnableMethod(this, &JingleHostConnection::OnServerClosed));
+ } else {
+ OnServerClosed();
}
}
-void JingleHostConnection::OnPacketReceived(
- JingleChannel* channel,
- scoped_refptr<media::DataBuffer> buffer) {
- DCHECK_EQ(message_loop(), MessageLoop::current());
- DCHECK(event_callback_);
+void JingleHostConnection::OnServerClosed() {
+ chromotocol_server_ = NULL;
+ if (jingle_client_) {
+ jingle_client_->Close();
+ jingle_client_ = NULL;
+ }
+}
- HostMessageList list;
- decoder_.ParseHostMessages(buffer, &list);
- event_callback_->HandleMessages(this, &list);
+void JingleHostConnection::SendEvent(const ChromotingClientMessage& msg) {
+ // This drops the message if we are not connected yet.
+ events_writer_.SendMessage(msg);
}
// JingleClient::Callback interface.
@@ -93,6 +110,7 @@ void JingleHostConnection::OnStateChange(JingleClient* client,
if (state == JingleClient::CONNECTED) {
LOG(INFO) << "Connected as: " << client->GetFullJid();
+ InitConnection();
} else if (state == JingleClient::CLOSED) {
LOG(INFO) << "Connection closed.";
event_callback_->OnConnectionClosed(this);
@@ -110,8 +128,8 @@ bool JingleHostConnection::OnAcceptConnection(
}
void JingleHostConnection::OnNewConnection(
- JingleClient* client,
- scoped_refptr<JingleChannel> channel) {
+ JingleClient* client,
+ scoped_refptr<JingleChannel> channel) {
DCHECK_EQ(message_loop(), MessageLoop::current());
// TODO(ajwong): Should we log more aggressively on this and above? We
@@ -119,34 +137,44 @@ void JingleHostConnection::OnNewConnection(
NOTREACHED() << "Clients can't accept inbound connections.";
}
-MessageLoop* JingleHostConnection::message_loop() {
- return context_->jingle_thread()->message_loop();
+void JingleHostConnection::OnNewChromotocolConnection(
+ ChromotingConnection* connection, bool* accept) {
+ DCHECK_EQ(message_loop(), MessageLoop::current());
+ // Client always rejects incoming connections.
+ *accept = false;
}
-void JingleHostConnection::DoConnect(const ClientConfig& config,
- HostEventCallback* event_callback) {
+void JingleHostConnection::OnConnectionStateChange(
+ ChromotingConnection::State state) {
DCHECK_EQ(message_loop(), MessageLoop::current());
+ DCHECK(event_callback_);
- event_callback_ = event_callback;
+ switch (state) {
+ case ChromotingConnection::FAILED:
+ event_callback_->OnConnectionFailed(this);
+ break;
- jingle_client_ = new JingleClient(context_->jingle_thread());
- jingle_client_->Init(config.username, config.auth_token,
- kChromotingTokenServiceName, this);
- jingle_channel_ = jingle_client_->Connect(config.host_jid, this);
-}
+ case ChromotingConnection::CLOSED:
+ event_callback_->OnConnectionClosed(this);
+ break;
-void JingleHostConnection::DoDisconnect() {
- DCHECK_EQ(message_loop(), MessageLoop::current());
+ case ChromotingConnection::CONNECTED:
+ // Initialize reader and writer.
+ events_writer_.Init(connection_->GetEventsChannel());
+ video_reader_.Init(
+ connection_->GetVideoChannel(),
+ NewCallback(this, &JingleHostConnection::OnVideoMessage));
+ event_callback_->OnConnectionOpened(this);
+ break;
- if (jingle_channel_.get()) {
- jingle_channel_->Close();
- jingle_channel_ = NULL;
+ default:
+ // Ignore the other states by default.
+ break;
}
+}
- if (jingle_client_.get()) {
- jingle_client_->Close();
- jingle_client_ = NULL;
- }
+MessageLoop* JingleHostConnection::message_loop() {
+ return context_->jingle_thread()->message_loop();
}
} // namespace remoting
diff --git a/remoting/client/jingle_host_connection.h b/remoting/client/jingle_host_connection.h
index 885c63c..bbbbcab 100644
--- a/remoting/client/jingle_host_connection.h
+++ b/remoting/client/jingle_host_connection.h
@@ -21,11 +21,13 @@
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
#include "base/task.h"
-#include "remoting/base/protocol_decoder.h"
#include "remoting/client/client_context.h"
#include "remoting/client/host_connection.h"
-#include "remoting/jingle_glue/jingle_channel.h"
#include "remoting/jingle_glue/jingle_client.h"
+#include "remoting/protocol/chromoting_connection.h"
+#include "remoting/protocol/chromoting_server.h"
+#include "remoting/protocol/stream_reader.h"
+#include "remoting/protocol/stream_writer.h"
class MessageLoop;
@@ -36,7 +38,6 @@ class JingleThread;
struct ClientConfig;
class JingleHostConnection : public HostConnection,
- public JingleChannel::Callback,
public JingleClient::Callback {
public:
explicit JingleHostConnection(ClientContext* context);
@@ -48,34 +49,50 @@ class JingleHostConnection : public HostConnection,
virtual void SendEvent(const ChromotingClientMessage& msg);
- // JingleChannel::Callback interface.
- virtual void OnStateChange(JingleChannel* channel,
- JingleChannel::State state);
- virtual void OnPacketReceived(JingleChannel* channel,
- scoped_refptr<media::DataBuffer> buffer);
-
// JingleClient::Callback interface.
virtual void OnStateChange(JingleClient* client, JingleClient::State state);
virtual bool OnAcceptConnection(JingleClient* client, const std::string& jid,
JingleChannel::Callback** callback);
- virtual void OnNewConnection(JingleClient* client,
- scoped_refptr<JingleChannel> channel);
+ void OnNewConnection(JingleClient* client,
+ scoped_refptr<JingleChannel> channel);
+
+ // Callback for ChromotingServer.
+ void OnNewChromotocolConnection(
+ ChromotingConnection* connection, bool* accept);
+
+ // Callback for ChromotingConnection.
+ void OnConnectionStateChange(ChromotingConnection::State state);
private:
+ // The message loop for the jingle thread this object works on.
MessageLoop* message_loop();
- void DoConnect(const ClientConfig& config,
- HostEventCallback* event_callback);
- void DoDisconnect();
+ // Called on the jingle thread after we've successfully to XMPP server. Starts
+ // P2P connection to the host.
+ void InitConnection();
+
+ // Callback for |video_reader_|.
+ // TODO(sergeyu): This should be replaced with RTP/RTCP handler.
+ void OnVideoMessage(ChromotingHostMessage* msg);
+
+ // Used by Disconnect() to disconnect chromoting connection, stop chromoting
+ // server, and then disconnect XMPP connection.
+ void OnDisconnected();
+ void OnServerClosed();
ClientContext* context_;
scoped_refptr<JingleClient> jingle_client_;
- scoped_refptr<JingleChannel> jingle_channel_;
+ scoped_refptr<ChromotingServer> chromotocol_server_;
+ scoped_refptr<ChromotingConnection> connection_;
+
+ EventsStreamWriter events_writer_;
+ VideoStreamReader video_reader_;
- ProtocolDecoder decoder_;
HostEventCallback* event_callback_;
+ std::string host_jid_;
+
DISALLOW_COPY_AND_ASSIGN(JingleHostConnection);
};
diff --git a/remoting/client/rectangle_update_decoder.cc b/remoting/client/rectangle_update_decoder.cc
index 44bac08..aa87c4a 100644
--- a/remoting/client/rectangle_update_decoder.cc
+++ b/remoting/client/rectangle_update_decoder.cc
@@ -10,8 +10,8 @@
#include "remoting/base/decoder.h"
#include "remoting/base/decoder_row_based.h"
#include "remoting/base/protocol/chromotocol.pb.h"
-#include "remoting/base/protocol_util.h"
#include "remoting/base/tracer.h"
+#include "remoting/base/util.h"
#include "remoting/client/frame_consumer.h"
using media::AutoTaskRunner;
diff --git a/remoting/host/chromoting_host.cc b/remoting/host/chromoting_host.cc
index 656327fb..d0b7bb7 100644
--- a/remoting/host/chromoting_host.cc
+++ b/remoting/host/chromoting_host.cc
@@ -9,13 +9,14 @@
#include "build/build_config.h"
#include "remoting/base/constants.h"
#include "remoting/base/encoder.h"
-#include "remoting/base/protocol_decoder.h"
#include "remoting/host/chromoting_host_context.h"
#include "remoting/host/capturer.h"
#include "remoting/host/event_executor.h"
#include "remoting/host/host_config.h"
#include "remoting/host/session_manager.h"
#include "remoting/jingle_glue/jingle_channel.h"
+#include "remoting/protocol/messages_decoder.h"
+#include "remoting/protocol/jingle_chromoting_server.h"
namespace remoting {
@@ -54,9 +55,6 @@ void ChromotingHost::Start(Task* shutdown_task) {
state_ = kStarted;
}
- // Get capturer to set up it's initial configuration.
- capturer_->ScreenConfigurationChanged();
-
// Save the shutdown task.
shutdown_task_.reset(shutdown_task);
@@ -64,11 +62,12 @@ void ChromotingHost::Start(Task* shutdown_task) {
std::string xmpp_auth_token;
if (!config_->GetString(kXmppLoginConfigPath, &xmpp_login) ||
!config_->GetString(kXmppAuthTokenConfigPath, &xmpp_auth_token)) {
- LOG(ERROR) << "XMPP credentials are not defined in config.";
+ LOG(ERROR) << "XMPP credentials are not defined in the config.";
return;
}
- access_verifier_.Init(config_);
+ if (!access_verifier_.Init(config_))
+ return;
// Connect to the talk network with a JingleClient.
jingle_client_ = new JingleClient(context_->jingle_thread());
@@ -94,8 +93,10 @@ void ChromotingHost::Shutdown() {
// No-op if this object is not started yet.
{
AutoLock auto_lock(lock_);
- if (state_ != kStarted)
+ if (state_ != kStarted) {
+ state_ = kStopped;
return;
+ }
state_ = kStopped;
}
@@ -110,16 +111,22 @@ void ChromotingHost::Shutdown() {
client_->Disconnect();
}
- // Disconnect from the talk network.
- if (jingle_client_) {
- jingle_client_->Close();
- }
-
// Stop the heartbeat sender.
if (heartbeat_sender_) {
heartbeat_sender_->Stop();
}
+ // Stop chromotocol server.
+ if (chromotocol_server_) {
+ chromotocol_server_->Close(
+ NewRunnableMethod(this, &ChromotingHost::OnServerClosed));
+ }
+
+ // Disconnect from the talk network.
+ if (jingle_client_) {
+ jingle_client_->Close();
+ }
+
// Lastly call the shutdown task.
if (shutdown_task_.get()) {
shutdown_task_->Run();
@@ -168,15 +175,14 @@ void ChromotingHost::OnClientDisconnected(ClientConnection* client) {
////////////////////////////////////////////////////////////////////////////
// ClientConnection::EventHandler implementations
-void ChromotingHost::HandleMessages(ClientConnection* client,
- ClientMessageList* messages) {
+void ChromotingHost::HandleMessage(ClientConnection* client,
+ ChromotingClientMessage* message) {
DCHECK_EQ(context_->main_message_loop(), MessageLoop::current());
// Delegate the messages to EventExecutor and delete the unhandled
// messages.
DCHECK(executor_.get());
- executor_->HandleInputEvents(messages);
- STLDeleteElements<ClientMessageList>(messages);
+ executor_->HandleInputEvent(message);
}
void ChromotingHost::OnConnectionOpened(ClientConnection* client) {
@@ -210,7 +216,15 @@ void ChromotingHost::OnStateChange(JingleClient* jingle_client,
LOG(INFO) << "Host connected as "
<< jingle_client->GetFullJid() << "." << std::endl;
- // Start heartbeating after we've connected.
+ // Create and start |chromotocol_server_|.
+ chromotocol_server_ =
+ new JingleChromotingServer(context_->jingle_thread()->message_loop());
+ chromotocol_server_->Init(
+ jingle_client->GetFullJid(),
+ jingle_client->session_manager(),
+ NewCallback(this, &ChromotingHost::OnNewClientConnection));
+
+ // Start heartbeating.
heartbeat_sender_->Start();
} else if (state == JingleClient::CLOSED) {
LOG(INFO) << "Host disconnected from talk network." << std::endl;
@@ -224,45 +238,44 @@ void ChromotingHost::OnStateChange(JingleClient* jingle_client,
}
}
-bool ChromotingHost::OnAcceptConnection(
- JingleClient* jingle_client, const std::string& jid,
- JingleChannel::Callback** channel_callback) {
+void ChromotingHost::OnNewClientConnection(
+ ChromotingConnection* connection, bool* accept) {
AutoLock auto_lock(lock_);
- if (state_ != kStarted)
- return false;
-
- DCHECK_EQ(jingle_client_.get(), jingle_client);
-
// TODO(hclam): Allow multiple clients to connect to the host.
- if (client_.get())
- return false;
+ if (client_.get() || state_ != kStarted) {
+ *accept = false;
+ return;
+ }
// Check that the user has access to the host.
- if (!access_verifier_.VerifyPermissions(jid))
- return false;
+ if (!access_verifier_.VerifyPermissions(connection->jid())) {
+ *accept = false;
+ return;
+ }
- LOG(INFO) << "Client connected: " << jid << std::endl;
+ *accept = true;
+
+ LOG(INFO) << "Client connected: " << connection->jid() << std::endl;
// If we accept the connected then create a client object and set the
// callback.
- client_ = new ClientConnection(context_->main_message_loop(),
- new ProtocolDecoder(), this);
- *channel_callback = client_.get();
- return true;
+ client_ = new ClientConnection(context_->main_message_loop(), this);
+ client_->Init(connection);
+}
+
+bool ChromotingHost::OnAcceptConnection(
+ JingleClient* jingle_client, const std::string& jid,
+ JingleChannel::Callback** channel_callback) {
+ return false;
}
void ChromotingHost::OnNewConnection(JingleClient* jingle_client,
scoped_refptr<JingleChannel> channel) {
- AutoLock auto_lock(lock_);
- if (state_ != kStarted)
- return;
-
- DCHECK_EQ(jingle_client_.get(), jingle_client);
+ NOTREACHED();
+}
- // Since the session manager has not started, it is still safe to access
- // the client directly. Note that we give the ownership of the channel
- // to the client.
- client_->set_jingle_channel(channel);
+void ChromotingHost::OnServerClosed() {
+ // Don't need to do anything here.
}
} // namespace remoting
diff --git a/remoting/host/chromoting_host.h b/remoting/host/chromoting_host.h
index ce3bdda..c1773cb 100644
--- a/remoting/host/chromoting_host.h
+++ b/remoting/host/chromoting_host.h
@@ -27,6 +27,7 @@ class Encoder;
class EventExecutor;
class MutableHostConfig;
class SessionManager;
+class JingleChromotingServer;
// A class to implement the functionality of a host process.
//
@@ -83,8 +84,8 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
////////////////////////////////////////////////////////////////////////////
// ClientConnection::EventHandler implementations
- virtual void HandleMessages(ClientConnection* client,
- ClientMessageList* messages);
+ virtual void HandleMessage(ClientConnection* client,
+ ChromotingClientMessage* message);
virtual void OnConnectionOpened(ClientConnection* client);
virtual void OnConnectionClosed(ClientConnection* client);
virtual void OnConnectionFailed(ClientConnection* client);
@@ -99,6 +100,9 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
JingleClient* jingle,
scoped_refptr<JingleChannel> channel);
+ // Callback for ChromotingServer.
+ void OnNewClientConnection(ChromotingConnection* connection, bool* accept);
+
private:
enum State {
kInitial,
@@ -106,6 +110,13 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
kStopped,
};
+ // This method connects to the talk network and start listening for incoming
+ // connections.
+ void DoStart(Task* shutdown_task);
+
+ // Callback for ChromotingServer::Close().
+ void OnServerClosed();
+
// The context that the chromoting host runs on.
ChromotingHostContext* context_;
@@ -126,6 +137,8 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>,
// receive connection requests from chromoting client.
scoped_refptr<JingleClient> jingle_client_;
+ scoped_refptr<JingleChromotingServer> chromotocol_server_;
+
// Objects that takes care of sending heartbeats to the chromoting bot.
scoped_refptr<HeartbeatSender> heartbeat_sender_;
diff --git a/remoting/host/client_connection.cc b/remoting/host/client_connection.cc
index 2cbbed6..8c8bc94 100644
--- a/remoting/host/client_connection.cc
+++ b/remoting/host/client_connection.cc
@@ -5,11 +5,9 @@
#include "remoting/host/client_connection.h"
#include "google/protobuf/message.h"
-#include "media/base/data_buffer.h"
-#include "remoting/base/protocol_decoder.h"
-#include "remoting/base/protocol_util.h"
-
-using media::DataBuffer;
+#include "net/base/io_buffer.h"
+#include "remoting/protocol/messages_decoder.h"
+#include "remoting/protocol/util.h"
namespace remoting {
@@ -18,165 +16,100 @@ namespace remoting {
static const size_t kAverageUpdateStream = 10;
ClientConnection::ClientConnection(MessageLoop* message_loop,
- ProtocolDecoder* decoder,
EventHandler* handler)
: loop_(message_loop),
- decoder_(decoder),
- size_in_queue_(0),
- update_stream_size_(0),
handler_(handler) {
DCHECK(loop_);
- DCHECK(decoder_.get());
DCHECK(handler_);
}
ClientConnection::~ClientConnection() {
// TODO(hclam): When we shut down the viewer we may have to close the
- // jingle channel.
+ // connection.
}
-// static
-scoped_refptr<media::DataBuffer>
- ClientConnection::CreateWireFormatDataBuffer(
- const ChromotingHostMessage* msg) {
- // TODO(hclam): Instead of serializing |msg| create an DataBuffer
- // object that wraps around it.
- scoped_ptr<const ChromotingHostMessage> message_deleter(msg);
- return SerializeAndFrameMessage(*msg);
+void ClientConnection::Init(ChromotingConnection* connection) {
+ DCHECK_EQ(connection->message_loop(), MessageLoop::current());
+
+ connection_ = connection;
+ connection_->SetStateChangeCallback(
+ NewCallback(this, &ClientConnection::OnConnectionStateChange));
}
void ClientConnection::SendInitClientMessage(int width, int height) {
DCHECK_EQ(loop_, MessageLoop::current());
- DCHECK(!update_stream_size_);
// If we are disconnected then return.
- if (!channel_)
+ if (!connection_)
return;
ChromotingHostMessage msg;
msg.mutable_init_client()->set_width(width);
msg.mutable_init_client()->set_height(height);
DCHECK(msg.IsInitialized());
- channel_->Write(SerializeAndFrameMessage(msg));
-}
-
-void ClientConnection::SendBeginUpdateStreamMessage() {
- DCHECK_EQ(loop_, MessageLoop::current());
-
- // If we are disconnected then return.
- if (!channel_)
- return;
-
- ChromotingHostMessage msg;
- msg.mutable_begin_update_stream();
- DCHECK(msg.IsInitialized());
-
- scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg);
- DCHECK(!update_stream_size_);
- update_stream_size_ += data->GetDataSize();
- channel_->Write(data);
+ video_writer_.SendMessage(msg);
}
void ClientConnection::SendUpdateStreamPacketMessage(
- scoped_refptr<DataBuffer> data) {
+ const ChromotingHostMessage& message) {
DCHECK_EQ(loop_, MessageLoop::current());
// If we are disconnected then return.
- if (!channel_)
+ if (!connection_)
return;
- update_stream_size_ += data->GetDataSize();
- channel_->Write(data);
-}
-
-void ClientConnection::SendEndUpdateStreamMessage() {
- DCHECK_EQ(loop_, MessageLoop::current());
-
- // If we are disconnected then return.
- if (!channel_)
- return;
-
- ChromotingHostMessage msg;
- msg.mutable_end_update_stream();
- DCHECK(msg.IsInitialized());
-
- scoped_refptr<DataBuffer> data = SerializeAndFrameMessage(msg);
- update_stream_size_ += data->GetDataSize();
- channel_->Write(data);
-
- // Here's some logic to help finding the average update stream size.
- size_in_queue_ += update_stream_size_;
- size_queue_.push_back(update_stream_size_);
- if (size_queue_.size() > kAverageUpdateStream) {
- size_in_queue_ -= size_queue_.front();
- size_queue_.pop_front();
- DCHECK_GE(size_in_queue_, 0);
- }
- update_stream_size_ = 0;
-}
-
-void ClientConnection::MarkEndOfUpdate() {
- // This is some logic to help calculate the average update stream size.
- size_in_queue_ += update_stream_size_;
- size_queue_.push_back(update_stream_size_);
- if (size_queue_.size() > kAverageUpdateStream) {
- size_in_queue_ -= size_queue_.front();
- size_queue_.pop_front();
- DCHECK_GE(size_in_queue_, 0);
- }
- update_stream_size_ = 0;
+ video_writer_.SendMessage(message);
}
int ClientConnection::GetPendingUpdateStreamMessages() {
DCHECK_EQ(loop_, MessageLoop::current());
-
- if (!size_queue_.size())
- return 0;
- int average_size = size_in_queue_ / size_queue_.size();
- if (!average_size)
- return 0;
- return channel_->write_buffer_size() / average_size;
+ return video_writer_.GetPendingMessages();
}
void ClientConnection::Disconnect() {
DCHECK_EQ(loop_, MessageLoop::current());
// If there is a channel then close it and release the reference.
- if (channel_) {
- channel_->Close();
- channel_ = NULL;
+ if (connection_) {
+ connection_->Close(NewRunnableMethod(this, &ClientConnection::OnClosed));
+ connection_ = NULL;
}
}
-void ClientConnection::OnStateChange(JingleChannel* channel,
- JingleChannel::State state) {
- DCHECK(channel);
+void ClientConnection::OnConnectionStateChange(
+ ChromotingConnection::State state) {
+ if (state == ChromotingConnection::CONNECTED) {
+ events_reader_.Init(
+ connection_->GetEventsChannel(),
+ NewCallback(this, &ClientConnection::OnMessageReceived));
+ video_writer_.Init(connection_->GetVideoChannel());
+ }
+
loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &ClientConnection::StateChangeTask, state));
}
-void ClientConnection::OnPacketReceived(JingleChannel* channel,
- scoped_refptr<DataBuffer> data) {
- DCHECK_EQ(channel_.get(), channel);
+void ClientConnection::OnMessageReceived(ChromotingClientMessage* message) {
loop_->PostTask(FROM_HERE,
- NewRunnableMethod(this, &ClientConnection::PacketReceivedTask, data));
+ NewRunnableMethod(this, &ClientConnection::MessageReceivedTask,
+ message));
}
-void ClientConnection::StateChangeTask(JingleChannel::State state) {
+void ClientConnection::StateChangeTask(ChromotingConnection::State state) {
DCHECK_EQ(loop_, MessageLoop::current());
DCHECK(handler_);
switch(state) {
- case JingleChannel::CONNECTING:
+ case ChromotingConnection::CONNECTING:
break;
// Don't care about this message.
- case JingleChannel::OPEN:
+ case ChromotingConnection::CONNECTED:
handler_->OnConnectionOpened(this);
break;
- case JingleChannel::CLOSED:
+ case ChromotingConnection::CLOSED:
handler_->OnConnectionClosed(this);
break;
- case JingleChannel::FAILED:
+ case ChromotingConnection::FAILED:
handler_->OnConnectionFailed(this);
break;
default:
@@ -185,17 +118,14 @@ void ClientConnection::StateChangeTask(JingleChannel::State state) {
}
}
-void ClientConnection::PacketReceivedTask(scoped_refptr<DataBuffer> data) {
+void ClientConnection::MessageReceivedTask(ChromotingClientMessage* message) {
DCHECK_EQ(loop_, MessageLoop::current());
-
- // Use the decoder to parse incoming data.
- DCHECK(decoder_.get());
- ClientMessageList list;
- decoder_->ParseClientMessages(data, &list);
-
- // Then submit the messages to the handler.
DCHECK(handler_);
- handler_->HandleMessages(this, &list);
+ handler_->HandleMessage(this, message);
+}
+
+// OnClosed() is used as a callback for ChromotingConnection::Close().
+void ClientConnection::OnClosed() {
}
} // namespace remoting
diff --git a/remoting/host/client_connection.h b/remoting/host/client_connection.h
index 0ea2b26..44bb761 100644
--- a/remoting/host/client_connection.h
+++ b/remoting/host/client_connection.h
@@ -11,9 +11,10 @@
#include "base/message_loop.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
-#include "remoting/base/protocol_decoder.h"
#include "remoting/base/protocol/chromotocol.pb.h"
-#include "remoting/jingle_glue/jingle_channel.h"
+#include "remoting/protocol/chromoting_connection.h"
+#include "remoting/protocol/stream_reader.h"
+#include "remoting/protocol/stream_writer.h"
namespace media {
@@ -28,8 +29,7 @@ namespace remoting {
// screen updates and other messages to the remote viewer. It is also
// responsible for receiving and parsing data from the remote viewer and
// delegating events to the event handler.
-class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>,
- public JingleChannel::Callback {
+class ClientConnection : public base::RefCountedThreadSafe<ClientConnection> {
public:
class EventHandler {
public:
@@ -39,8 +39,8 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>,
// ClientMessages in ClientMessageList and needs to delete them.
// Note that the sender of messages will not reference messages
// again so it is okay to clear |messages| in this method.
- virtual void HandleMessages(ClientConnection* viewer,
- ClientMessageList* messages) = 0;
+ virtual void HandleMessage(ClientConnection* viewer,
+ ChromotingClientMessage* message) = 0;
// Called when the network connection is opened.
virtual void OnConnectionOpened(ClientConnection* viewer) = 0;
@@ -57,45 +57,21 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>,
// a libjingle channel, these events are delegated to |handler|.
// It is guranteed that |handler| is called only on the |message_loop|.
ClientConnection(MessageLoop* message_loop,
- ProtocolDecoder* decoder,
EventHandler* handler);
virtual ~ClientConnection();
- // Creates a DataBuffer object that wraps around ChromotingHostMessage. The
- // DataBuffer object will be responsible for serializing and framing the
- // message. DataBuffer will also own |msg| after this call.
- static scoped_refptr<media::DataBuffer> CreateWireFormatDataBuffer(
- const ChromotingHostMessage* msg);
+ virtual void Init(ChromotingConnection* connection);
- virtual void set_jingle_channel(JingleChannel* channel) {
- channel_ = channel;
- }
-
- // Returns the channel in use.
- virtual JingleChannel* jingle_channel() { return channel_; }
+ // Returns the connection in use.
+ virtual ChromotingConnection* connection() { return connection_; }
// Send information to the client for initialization.
virtual void SendInitClientMessage(int width, int height);
- // Notifies the viewer the start of an update stream.
- virtual void SendBeginUpdateStreamMessage();
-
// Send encoded update stream data to the viewer.
- //
- // |data| is the actual bytes in wire format. That means it is fully framed
- // and serialized from a ChromotingHostMessage. This is a special case only
- // for UpdateStreamPacket to reduce the amount of memory copies.
- //
- // |data| should be created by calling to
- // CreateWireFormatDataBuffer(ChromotingHostMessage).
virtual void SendUpdateStreamPacketMessage(
- scoped_refptr<media::DataBuffer> data);
-
- // Notifies the viewer the update stream has ended.
- virtual void SendEndUpdateStreamMessage();
-
- virtual void MarkEndOfUpdate();
+ const ChromotingHostMessage& message);
// Gets the number of update stream messages not yet transmitted.
// Note that the value returned is an estimate using average size of the
@@ -109,43 +85,34 @@ class ClientConnection : public base::RefCountedThreadSafe<ClientConnection>,
// After this method is called all the send method calls will be ignored.
virtual void Disconnect();
- /////////////////////////////////////////////////////////////////////////////
- // JingleChannel::Callback implmentations
- virtual void OnStateChange(JingleChannel* channel,
- JingleChannel::State state);
- virtual void OnPacketReceived(JingleChannel* channel,
- scoped_refptr<media::DataBuffer> data);
-
protected:
// Protected constructor used by unit test.
ClientConnection() {}
private:
+ // Callback for ChromotingConnection.
+ void OnConnectionStateChange(ChromotingConnection::State state);
+
+ // Callback for EventsStreamReader.
+ void OnMessageReceived(ChromotingClientMessage* message);
+
// Process a libjingle state change event on the |loop_|.
- void StateChangeTask(JingleChannel::State state);
+ void StateChangeTask(ChromotingConnection::State state);
// Process a data buffer received from libjingle.
- void PacketReceivedTask(scoped_refptr<media::DataBuffer> data);
+ void MessageReceivedTask(ChromotingClientMessage* message);
+
+ void OnClosed();
// The libjingle channel used to send and receive data from the remote client.
- scoped_refptr<JingleChannel> channel_;
+ scoped_refptr<ChromotingConnection> connection_;
+
+ EventsStreamReader events_reader_;
+ VideoStreamWriter video_writer_;
// The message loop that this object runs on.
MessageLoop* loop_;
- // An object used by the ClientConnection to decode data received from the
- // network.
- scoped_ptr<ProtocolDecoder> decoder_;
-
- // A queue to count the sizes of the last 10 update streams.
- std::deque<int> size_queue_;
-
- // Count the sum of sizes in the queue.
- int size_in_queue_;
-
- // Measure the number of bytes of the current upstream stream.
- int update_stream_size_;
-
// Event handler for handling events sent from this object.
EventHandler* handler_;
diff --git a/remoting/host/client_connection_unittest.cc b/remoting/host/client_connection_unittest.cc
index 9c3bcb3..317d7fa 100644
--- a/remoting/host/client_connection_unittest.cc
+++ b/remoting/host/client_connection_unittest.cc
@@ -7,7 +7,7 @@
#include "remoting/base/mock_objects.h"
#include "remoting/host/client_connection.h"
#include "remoting/host/mock_objects.h"
-#include "remoting/jingle_glue/mock_objects.h"
+#include "remoting/protocol/fake_connection.h"
#include "testing/gmock/include/gmock/gmock.h"
using ::testing::_;
@@ -23,86 +23,67 @@ class ClientConnectionTest : public testing::Test {
protected:
virtual void SetUp() {
- decoder_ = new MockProtocolDecoder();
- channel_ = new StrictMock<MockJingleChannel>();
-
- // Allocate a ClientConnection object with the mock objects. we give the
- // ownership of decoder to the viewer.
- viewer_ = new ClientConnection(&message_loop_,
- decoder_,
- &handler_);
-
- viewer_->set_jingle_channel(channel_.get());
+ connection_ = new FakeChromotingConnection();
+ connection_->set_message_loop(&message_loop_);
+
+ // Allocate a ClientConnection object with the mock objects.
+ viewer_ = new ClientConnection(&message_loop_, &handler_);
+ viewer_->Init(connection_);
+ EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get()));
+ connection_->get_state_change_callback()->Run(
+ ChromotingConnection::CONNECTED);
+ message_loop_.RunAllPending();
}
MessageLoop message_loop_;
- MockProtocolDecoder* decoder_;
MockClientConnectionEventHandler handler_;
scoped_refptr<ClientConnection> viewer_;
- // |channel_| is wrapped with StrictMock because we limit strictly the calls
- // to it.
- scoped_refptr<StrictMock<MockJingleChannel> > channel_;
+ scoped_refptr<FakeChromotingConnection> connection_;
private:
DISALLOW_COPY_AND_ASSIGN(ClientConnectionTest);
};
TEST_F(ClientConnectionTest, SendUpdateStream) {
- // Tell the viewer we are starting an update stream.
- EXPECT_CALL(*channel_, Write(_));
- viewer_->SendBeginUpdateStreamMessage();
-
// Then send the actual data.
- EXPECT_CALL(*channel_, Write(_));
- scoped_refptr<media::DataBuffer> data = new media::DataBuffer(10);
- viewer_->SendUpdateStreamPacketMessage(data);
-
- // Send the end of update message.
- EXPECT_CALL(*channel_, Write(_));
- viewer_->SendEndUpdateStreamMessage();
+ ChromotingHostMessage message;
+ viewer_->SendUpdateStreamPacketMessage(message);
// And then close the connection to ClientConnection.
- EXPECT_CALL(*channel_, Close());
viewer_->Disconnect();
-}
-TEST_F(ClientConnectionTest, StateChange) {
- EXPECT_CALL(handler_, OnConnectionOpened(viewer_.get()));
- viewer_->OnStateChange(channel_.get(), JingleChannel::OPEN);
message_loop_.RunAllPending();
+ // Verify that something has been written.
+ // TODO(sergeyu): Verify that the correct data has been written.
+ EXPECT_GT(connection_->GetVideoChannel()->written_data().size(), 0u);
+}
+
+TEST_F(ClientConnectionTest, StateChange) {
EXPECT_CALL(handler_, OnConnectionClosed(viewer_.get()));
- viewer_->OnStateChange(channel_.get(), JingleChannel::CLOSED);
+ connection_->get_state_change_callback()->Run(ChromotingConnection::CLOSED);
message_loop_.RunAllPending();
EXPECT_CALL(handler_, OnConnectionFailed(viewer_.get()));
- viewer_->OnStateChange(channel_.get(), JingleChannel::FAILED);
- message_loop_.RunAllPending();
-}
-
-TEST_F(ClientConnectionTest, ParseMessages) {
- scoped_refptr<media::DataBuffer> data;
-
- // Give the data to the ClientConnection, it will use ProtocolDecoder to
- // decode the messages.
- EXPECT_CALL(*decoder_, ParseClientMessages(data, NotNull()));
- EXPECT_CALL(handler_, HandleMessages(viewer_.get(), NotNull()));
-
- viewer_->OnPacketReceived(channel_.get(), data);
+ connection_->get_state_change_callback()->Run(ChromotingConnection::FAILED);
message_loop_.RunAllPending();
}
// Test that we can close client connection more than once and operations
// after the connection is closed has no effect.
TEST_F(ClientConnectionTest, Close) {
- EXPECT_CALL(*channel_, Close());
viewer_->Disconnect();
+ message_loop_.RunAllPending();
+ EXPECT_TRUE(connection_->is_closed());
- scoped_refptr<media::DataBuffer> data = new media::DataBuffer(10);
- viewer_->SendUpdateStreamPacketMessage(data);
- viewer_->SendEndUpdateStreamMessage();
+ ChromotingHostMessage message;
+ viewer_->SendUpdateStreamPacketMessage(message);
viewer_->Disconnect();
+ message_loop_.RunAllPending();
+
+ // Verify that nothing has been written.
+ EXPECT_EQ(connection_->GetVideoChannel()->written_data().size(), 0u);
}
} // namespace remoting
diff --git a/remoting/host/event_executor.h b/remoting/host/event_executor.h
index bfdf38f..3f87ea7 100644
--- a/remoting/host/event_executor.h
+++ b/remoting/host/event_executor.h
@@ -7,11 +7,12 @@
#include <vector>
-#include "remoting/base/protocol_decoder.h"
+#include "base/basictypes.h"
namespace remoting {
class Capturer;
+class ChromotingClientMessage;
// An interface that defines the behavior of an event executor object.
// An event executor is to perform actions on the host machine. For example
@@ -19,14 +20,14 @@ class Capturer;
// clipboards.
class EventExecutor {
public:
- EventExecutor(Capturer* capturer)
+ explicit EventExecutor(Capturer* capturer)
: capturer_(capturer) {
}
virtual ~EventExecutor() {}
// Handles input events from ClientMessageList and removes them from the
// list.
- virtual void HandleInputEvents(ClientMessageList* messages) = 0;
+ virtual void HandleInputEvent(ChromotingClientMessage* message) = 0;
// TODO(hclam): Define actions for clipboards.
protected:
diff --git a/remoting/host/event_executor_linux.cc b/remoting/host/event_executor_linux.cc
index d2e4a13..44c4c55 100644
--- a/remoting/host/event_executor_linux.cc
+++ b/remoting/host/event_executor_linux.cc
@@ -4,6 +4,8 @@
#include "remoting/host/event_executor_linux.h"
+#include "remoting/protocol/messages_decoder.h"
+
namespace remoting {
EventExecutorLinux::EventExecutorLinux(Capturer* capturer)
@@ -13,7 +15,8 @@ EventExecutorLinux::EventExecutorLinux(Capturer* capturer)
EventExecutorLinux::~EventExecutorLinux() {
}
-void EventExecutorLinux::HandleInputEvents(ClientMessageList* messages) {
+void EventExecutorLinux::HandleInputEvent(ChromotingClientMessage* message) {
+ delete message;
}
} // namespace remoting
diff --git a/remoting/host/event_executor_linux.h b/remoting/host/event_executor_linux.h
index 44b8209..bdb3e73 100644
--- a/remoting/host/event_executor_linux.h
+++ b/remoting/host/event_executor_linux.h
@@ -17,7 +17,7 @@ class EventExecutorLinux : public EventExecutor {
EventExecutorLinux(Capturer* capturer);
virtual ~EventExecutorLinux();
- virtual void HandleInputEvents(ClientMessageList* messages);
+ virtual void HandleInputEvent(ChromotingClientMessage* message);
private:
DISALLOW_COPY_AND_ASSIGN(EventExecutorLinux);
diff --git a/remoting/host/event_executor_mac.cc b/remoting/host/event_executor_mac.cc
index 3a1256b..89acece 100644
--- a/remoting/host/event_executor_mac.cc
+++ b/remoting/host/event_executor_mac.cc
@@ -13,7 +13,8 @@ EventExecutorMac::EventExecutorMac(Capturer* capturer)
EventExecutorMac::~EventExecutorMac() {
}
-void EventExecutorMac::HandleInputEvents(ClientMessageList* messages) {
+void EventExecutorMac::HandleInputEvent(ChromotingClientMessage* message) {
+ delete message;
}
} // namespace remoting
diff --git a/remoting/host/event_executor_mac.h b/remoting/host/event_executor_mac.h
index 854d274..f8934b1 100644
--- a/remoting/host/event_executor_mac.h
+++ b/remoting/host/event_executor_mac.h
@@ -17,7 +17,7 @@ class EventExecutorMac : public EventExecutor {
EventExecutorMac(Capturer* capturer);
virtual ~EventExecutorMac();
- virtual void HandleInputEvents(ClientMessageList* messages);
+ virtual void HandleInputEvent(ChromotingClientMessage* message);
private:
DISALLOW_COPY_AND_ASSIGN(EventExecutorMac);
diff --git a/remoting/host/event_executor_win.cc b/remoting/host/event_executor_win.cc
index 24aa841..762cd0c 100644
--- a/remoting/host/event_executor_win.cc
+++ b/remoting/host/event_executor_win.cc
@@ -355,28 +355,21 @@ EventExecutorWin::EventExecutorWin(Capturer* capturer)
EventExecutorWin::~EventExecutorWin() {
}
-void EventExecutorWin::HandleInputEvents(ClientMessageList* messages) {
- for (ClientMessageList::iterator it = messages->begin();
- it != messages->end();
- ++it) {
- ChromotingClientMessage* msg = *it;
- if (msg->has_mouse_set_position_event()) {
- HandleMouseSetPosition(msg);
- } else if (msg->has_mouse_move_event()) {
- HandleMouseMove(msg);
- } else if (msg->has_mouse_wheel_event()) {
- HandleMouseWheel(msg);
- } else if (msg->has_mouse_down_event()) {
- HandleMouseButtonDown(msg);
- } else if (msg->has_mouse_up_event()) {
- HandleMouseButtonUp(msg);
- } else if (msg->has_key_event()) {
- HandleKey(msg);
- }
+void EventExecutorWin::HandleInputEvent(ChromotingClientMessage* msg) {
+ if (msg->has_mouse_set_position_event()) {
+ HandleMouseSetPosition(msg);
+ } else if (msg->has_mouse_move_event()) {
+ HandleMouseMove(msg);
+ } else if (msg->has_mouse_wheel_event()) {
+ HandleMouseWheel(msg);
+ } else if (msg->has_mouse_down_event()) {
+ HandleMouseButtonDown(msg);
+ } else if (msg->has_mouse_up_event()) {
+ HandleMouseButtonUp(msg);
+ } else if (msg->has_key_event()) {
+ HandleKey(msg);
}
- // We simply delete all messages.
- // TODO(hclam): Delete messages processed.
- STLDeleteElements<ClientMessageList>(messages);
+ delete msg;
}
void EventExecutorWin::HandleMouseSetPosition(ChromotingClientMessage* msg) {
diff --git a/remoting/host/event_executor_win.h b/remoting/host/event_executor_win.h
index 4a354e6..e9a0269 100644
--- a/remoting/host/event_executor_win.h
+++ b/remoting/host/event_executor_win.h
@@ -17,7 +17,7 @@ class EventExecutorWin : public EventExecutor {
EventExecutorWin(Capturer* capturer);
virtual ~EventExecutorWin();
- virtual void HandleInputEvents(ClientMessageList* messages);
+ virtual void HandleInputEvent(ChromotingClientMessage* message);
private:
void HandleMouseSetPosition(ChromotingClientMessage* msg);
diff --git a/remoting/host/mock_objects.h b/remoting/host/mock_objects.h
index 9e6b631..64dcc55 100644
--- a/remoting/host/mock_objects.h
+++ b/remoting/host/mock_objects.h
@@ -6,10 +6,10 @@
#define REMOTING_HOST_MOCK_OBJECTS_H_
#include "media/base/data_buffer.h"
-#include "remoting/base/protocol_decoder.h"
#include "remoting/host/capturer.h"
#include "remoting/host/client_connection.h"
#include "remoting/host/event_executor.h"
+#include "remoting/protocol/messages_decoder.h"
#include "testing/gmock/include/gmock/gmock.h"
namespace remoting {
@@ -36,7 +36,7 @@ class MockEventExecutor : public EventExecutor {
public:
MockEventExecutor(Capturer* capturer) : EventExecutor(capturer) {}
- MOCK_METHOD1(HandleInputEvents, void(ClientMessageList* messages));
+ MOCK_METHOD1(HandleInputEvent, void(ChromotingClientMessage* messages));
private:
DISALLOW_COPY_AND_ASSIGN(MockEventExecutor);
@@ -46,17 +46,14 @@ class MockClientConnection : public ClientConnection {
public:
MockClientConnection(){}
+ MOCK_METHOD1(Init, void(ChromotingConnection* connection));
MOCK_METHOD2(SendInitClientMessage, void(int width, int height));
MOCK_METHOD0(SendBeginUpdateStreamMessage, void());
MOCK_METHOD1(SendUpdateStreamPacketMessage,
- void(scoped_refptr<media::DataBuffer> data));
+ void(const ChromotingHostMessage& message));
MOCK_METHOD0(SendEndUpdateStreamMessage, void());
MOCK_METHOD0(GetPendingUpdateStreamMessages, int());
-
- MOCK_METHOD2(OnStateChange, void(JingleChannel* channel,
- JingleChannel::State state));
- MOCK_METHOD2(OnPacketReceived, void(JingleChannel* channel,
- scoped_refptr<media::DataBuffer> data));
+ MOCK_METHOD0(Disconnect, void());
private:
DISALLOW_COPY_AND_ASSIGN(MockClientConnection);
@@ -66,8 +63,9 @@ class MockClientConnectionEventHandler : public ClientConnection::EventHandler {
public:
MockClientConnectionEventHandler() {}
- MOCK_METHOD2(HandleMessages,
- void(ClientConnection* viewer, ClientMessageList* messages));
+ MOCK_METHOD2(HandleMessage,
+ void(ClientConnection* viewer,
+ ChromotingClientMessage* message));
MOCK_METHOD1(OnConnectionOpened, void(ClientConnection* viewer));
MOCK_METHOD1(OnConnectionClosed, void(ClientConnection* viewer));
MOCK_METHOD1(OnConnectionFailed, void(ClientConnection* viewer));
diff --git a/remoting/host/session_manager.cc b/remoting/host/session_manager.cc
index 7ab3bc5..4aacfc8 100644
--- a/remoting/host/session_manager.cc
+++ b/remoting/host/session_manager.cc
@@ -11,9 +11,9 @@
#include "base/stl_util-inl.h"
#include "media/base/data_buffer.h"
#include "remoting/base/capture_data.h"
-#include "remoting/base/protocol_decoder.h"
#include "remoting/base/tracer.h"
#include "remoting/host/client_connection.h"
+#include "remoting/protocol/messages_decoder.h"
namespace remoting {
@@ -333,24 +333,15 @@ void SessionManager::DoSendUpdate(ChromotingHostMessage* message,
Encoder::EncodingState state) {
DCHECK_EQ(network_loop_, MessageLoop::current());
- // TODO(ajwong): We shouldn't need EncodingState. Just inspect message.
- bool is_end_of_update = (message->rectangle_update().flags() |
- RectangleUpdatePacket::LAST_PACKET) != 0;
-
TraceContext::tracer()->PrintString("DoSendUpdate");
- // Create a data buffer in wire format from |message|.
- // Note that this takes ownership of |message|.
- scoped_refptr<media::DataBuffer> data =
- ClientConnection::CreateWireFormatDataBuffer(message);
-
for (ClientConnectionList::const_iterator i = clients_.begin();
i < clients_.end(); ++i) {
- (*i)->SendUpdateStreamPacketMessage(data);
-
- if (is_end_of_update)
- (*i)->MarkEndOfUpdate();
+ (*i)->SendUpdateStreamPacketMessage(*message);
}
+
+ delete message;
+
TraceContext::tracer()->PrintString("DoSendUpdate done");
}
diff --git a/remoting/jingle_glue/stream_socket_adapter.cc b/remoting/jingle_glue/stream_socket_adapter.cc
index cb84941..3155328 100644
--- a/remoting/jingle_glue/stream_socket_adapter.cc
+++ b/remoting/jingle_glue/stream_socket_adapter.cc
@@ -65,6 +65,7 @@ int StreamSocketAdapter::Write(
if (result == net::ERR_CONNECTION_CLOSED &&
stream_->GetState() == talk_base::SS_OPENING)
result = net::ERR_IO_PENDING;
+
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
write_callback_ = callback;
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
new file mode 100644
index 0000000..6c60a4a
--- /dev/null
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -0,0 +1,116 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/buffered_socket_writer.h"
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+
+namespace remoting {
+
+BufferedSocketWriter::BufferedSocketWriter()
+ : socket_(NULL),
+ message_loop_(NULL),
+ buffer_size_(0),
+ write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ written_callback_(this, &BufferedSocketWriter::OnWritten)),
+ closed_(false) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() { }
+
+void BufferedSocketWriter::Init(net::Socket* socket,
+ WriteFailedCallback* callback) {
+ AutoLock auto_lock(lock_);
+ message_loop_ = MessageLoop::current();
+ socket_ = socket;
+}
+
+bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) {
+ AutoLock auto_lock(lock_);
+ if (!socket_)
+ return false;
+ queue_.push_back(data);
+ buffer_size_ += data->size();
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ return true;
+}
+
+void BufferedSocketWriter::DoWrite() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ DCHECK(socket_);
+
+ // Don't try to write if the writer not initialized or closed, or
+ // there is already a write pending.
+ if (write_pending_ || closed_)
+ return;
+
+ while (true) {
+ while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ AutoLock auto_lock(lock_);
+ if (queue_.empty())
+ return; // Nothing to write.
+ current_buf_ =
+ new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
+ queue_.pop_front();
+ }
+
+ int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(),
+ &written_callback_);
+ if (result >= 0) {
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ } else {
+ if (result == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ } else {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ return;
+ }
+ }
+}
+
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ write_pending_ = false;
+
+ if (result < 0) {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ // Schedule next write.
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+}
+
+int BufferedSocketWriter::GetBufferSize() {
+ AutoLock auto_lock(lock_);
+ return buffer_size_;
+}
+
+int BufferedSocketWriter::GetBufferChunks() {
+ AutoLock auto_lock(lock_);
+ return queue_.size();
+}
+
+void BufferedSocketWriter::Close() {
+ AutoLock auto_lock(lock_);
+ closed_ = true;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h
new file mode 100644
index 0000000..3d4709d
--- /dev/null
+++ b/remoting/protocol/buffered_socket_writer.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
+#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
+
+#include <deque>
+
+#include "base/lock.h"
+#include "base/ref_counted.h"
+#include "net/base/io_buffer.h"
+#include "net/socket/socket.h"
+
+class MessageLoop;
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class BufferedSocketWriter
+ : public base::RefCountedThreadSafe<BufferedSocketWriter> {
+ public:
+ typedef Callback1<int>::Type WriteFailedCallback;
+
+ BufferedSocketWriter();
+ virtual ~BufferedSocketWriter();
+
+ // Initializes the writer. Must be called on the thread that will be used
+ // to access the socket in the future. |callback| will be called after each
+ // failed write.
+ void Init(net::Socket* socket, WriteFailedCallback* callback);
+
+ // Puts a new data chunk in the buffer. Returns false and doesn't enqueue
+ // the data if called before Init(). Can be called on any thread.
+ bool Write(scoped_refptr<net::IOBufferWithSize> buffer);
+
+ // Returns current size of the buffer. Can be called on any thread.
+ int GetBufferSize();
+
+ // Returns number of chunks that are currently in the buffer waiting
+ // to be written. Can be called on any thread.
+ int GetBufferChunks();
+
+ // Stops writing and drops current buffers.
+ void Close();
+
+ private:
+ typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue;
+
+ void DoWrite();
+ void OnWritten(int result);
+
+ net::Socket* socket_;
+ MessageLoop* message_loop_;
+ scoped_ptr<WriteFailedCallback> write_failed_callback_;
+
+ Lock lock_;
+
+ DataQueue queue_;
+ int buffer_size_;
+ scoped_refptr<net::DrainableIOBuffer> current_buf_;
+ bool write_pending_;
+
+ net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_;
+
+ bool closed_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
diff --git a/remoting/protocol/chromoting_connection.h b/remoting/protocol/chromoting_connection.h
index e9c0f42d..397b633 100644
--- a/remoting/protocol/chromoting_connection.h
+++ b/remoting/protocol/chromoting_connection.h
@@ -9,6 +9,7 @@
#include "base/callback.h"
+class MessageLoop;
class Task;
namespace net {
@@ -17,7 +18,10 @@ class Socket;
namespace remoting {
-// Generic interface for Chromoting connection.
+// Generic interface for Chromoting connection used by both client and host.
+// Provides access to the connection channels, but doesn't depend on the
+// protocol used for each channel.
+// TODO(sergeyu): Remove refcounting?
class ChromotingConnection
: public base::RefCountedThreadSafe<ChromotingConnection> {
public:
@@ -37,6 +41,7 @@ class ChromotingConnection
// Reliable PseudoTCP channels for this connection.
// TODO(sergeyu): Remove VideoChannel, and use RTP channels instead.
+ // TODO(sergeyu): Make it possible to create/destroy new channels on-fly?
virtual net::Socket* GetVideoChannel() = 0;
virtual net::Socket* GetEventsChannel() = 0;
@@ -47,7 +52,11 @@ class ChromotingConnection
// JID of the other side.
virtual const std::string& jid() = 0;
- // Closed connection. Callbacks are guaranteed not to be called after
+ // Message loop that must be used for to access the channels of this
+ // connection.
+ virtual MessageLoop* message_loop() = 0;
+
+ // Closes connection. Callbacks are guaranteed not to be called after
// |closed_task| is executed.
virtual void Close(Task* closed_task) = 0;
diff --git a/remoting/protocol/fake_connection.cc b/remoting/protocol/fake_connection.cc
new file mode 100644
index 0000000..cffacb5
--- /dev/null
+++ b/remoting/protocol/fake_connection.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/fake_connection.h"
+
+#include "base/message_loop.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+
+namespace remoting {
+
+const char kTestJid[] = "host1@gmail.com/chromoting123";
+
+FakeSocket::FakeSocket()
+ : read_pending_(false),
+ input_pos_(0) {
+}
+
+FakeSocket::~FakeSocket() {
+}
+
+void FakeSocket::AppendInputData(char* data, int data_size) {
+ input_data_.insert(input_data_.end(), data, data + data_size);
+ // Complete pending read if any.
+ if (read_pending_) {
+ read_pending_ = false;
+ int result = std::min(read_buffer_size_,
+ static_cast<int>(input_data_.size() - input_pos_));
+ CHECK(result > 0);
+ memcpy(read_buffer_->data(),
+ &(*input_data_.begin()) + input_pos_, result);
+ input_pos_ += result;
+ read_callback_->Run(result);
+ read_buffer_ = NULL;
+ }
+}
+
+int FakeSocket::Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) {
+ if (input_pos_ < static_cast<int>(input_data_.size())) {
+ int result = std::min(buf_len,
+ static_cast<int>(input_data_.size()) - input_pos_);
+ memcpy(buf->data(), &(*input_data_.begin()) + input_pos_, result);
+ input_pos_ += result;
+ return result;
+ } else {
+ read_pending_ = true;
+ read_buffer_ = buf;
+ read_buffer_size_ = buf_len;
+ read_callback_ = callback;
+ return net::ERR_IO_PENDING;
+ }
+}
+
+int FakeSocket::Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback) {
+ written_data_.insert(written_data_.end(),
+ buf->data(), buf->data() + buf_len);
+ return buf_len;
+}
+
+bool FakeSocket::SetReceiveBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+bool FakeSocket::SetSendBufferSize(int32 size) {
+ NOTIMPLEMENTED();
+ return false;
+}
+
+FakeChromotingConnection::FakeChromotingConnection()
+ : message_loop_(NULL),
+ jid_(kTestJid) {
+}
+
+FakeChromotingConnection::~FakeChromotingConnection() { }
+
+void FakeChromotingConnection::SetStateChangeCallback(
+ StateChangeCallback* callback) {
+ callback_.reset(callback);
+}
+
+FakeSocket* FakeChromotingConnection::GetVideoChannel() {
+ return &video_channel_;
+}
+FakeSocket* FakeChromotingConnection::GetEventsChannel() {
+ return &events_channel_;
+}
+
+FakeSocket* FakeChromotingConnection::GetVideoRtpChannel() {
+ return &video_rtp_channel_;
+}
+FakeSocket* FakeChromotingConnection::GetVideoRtcpChannel() {
+ return &video_rtcp_channel_;
+}
+
+const std::string& FakeChromotingConnection::jid() {
+ return jid_;
+}
+
+MessageLoop* FakeChromotingConnection::message_loop() {
+ return message_loop_;
+}
+
+void FakeChromotingConnection::Close(Task* closed_task) {
+ closed_ = true;
+ closed_task->Run();
+ delete closed_task;
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/fake_connection.h b/remoting/protocol/fake_connection.h
new file mode 100644
index 0000000..cabce4a
--- /dev/null
+++ b/remoting/protocol/fake_connection.h
@@ -0,0 +1,94 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_FAKE_CONNECTION_H_
+#define REMOTING_PROTOCOL_FAKE_CONNECTION_H_
+
+#include <vector>
+
+#include "base/scoped_ptr.h"
+#include "net/socket/socket.h"
+#include "remoting/protocol/chromoting_connection.h"
+
+namespace remoting {
+
+extern const char kTestJid[];
+
+// FakeSocket implement net::Socket interface for FakeConnection. All data
+// written to FakeSocket is stored in a buffer returned by written_data().
+// Read() reads data from another buffer that can be set with AppendInputData().
+// Pending reads are supported, so if there is a pending read AppendInputData()
+// calls the read callback.
+class FakeSocket : public net::Socket {
+ public:
+ FakeSocket();
+ virtual ~FakeSocket();
+
+ const std::vector<char>& written_data() { return written_data_; }
+
+ void AppendInputData(char* data, int data_size);
+ int input_pos() { return input_pos_; }
+
+ // net::Socket interface.
+ virtual int Read(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+ virtual int Write(net::IOBuffer* buf, int buf_len,
+ net::CompletionCallback* callback);
+
+ virtual bool SetReceiveBufferSize(int32 size);
+ virtual bool SetSendBufferSize(int32 size);
+
+ private:
+ bool read_pending_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ int read_buffer_size_;
+ net::CompletionCallback* read_callback_;
+
+ std::vector<char> written_data_;
+ std::vector<char> input_data_;
+ int input_pos_;
+};
+
+// FakeChromotingConnection is a dummy ChromotingConnection that uses
+// FakeSocket for all channels.
+class FakeChromotingConnection : public ChromotingConnection {
+ public:
+ FakeChromotingConnection();
+ virtual ~FakeChromotingConnection();
+
+ StateChangeCallback* get_state_change_callback() { return callback_.get(); }
+
+ void set_message_loop(MessageLoop* message_loop) {
+ message_loop_ = message_loop;
+ }
+
+ bool is_closed() { return closed_; }
+
+ virtual void SetStateChangeCallback(StateChangeCallback* callback);
+
+ virtual FakeSocket* GetVideoChannel();
+ virtual FakeSocket* GetEventsChannel();
+
+ virtual FakeSocket* GetVideoRtpChannel();
+ virtual FakeSocket* GetVideoRtcpChannel();
+
+ virtual const std::string& jid();
+
+ virtual MessageLoop* message_loop();
+ virtual void Close(Task* closed_task);
+
+ public:
+ scoped_ptr<StateChangeCallback> callback_;
+ MessageLoop* message_loop_;
+ FakeSocket video_channel_;
+ FakeSocket events_channel_;
+ FakeSocket video_rtp_channel_;
+ FakeSocket video_rtcp_channel_;
+ std::string jid_;
+ bool closed_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_FAKE_CONNECTION_H_
diff --git a/remoting/protocol/jingle_chromoting_connection.cc b/remoting/protocol/jingle_chromoting_connection.cc
index 5c2e246..2412627 100644
--- a/remoting/protocol/jingle_chromoting_connection.cc
+++ b/remoting/protocol/jingle_chromoting_connection.cc
@@ -28,8 +28,8 @@ const char kEventsChannelName[] = "events";
} // namespace
JingleChromotingConnection::JingleChromotingConnection(
- JingleChromotingServer* session_client)
- : session_client_(session_client),
+ JingleChromotingServer* server)
+ : server_(server),
state_(INITIALIZING),
closed_(false),
session_(NULL),
@@ -42,7 +42,7 @@ JingleChromotingConnection::~JingleChromotingConnection() {
}
void JingleChromotingConnection::Init(Session* session) {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
session_ = session;
jid_ = session_->remote_name();
@@ -55,7 +55,7 @@ bool JingleChromotingConnection::HasSession(cricket::Session* session) {
}
Session* JingleChromotingConnection::ReleaseSession() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
SetState(CLOSED);
Session* session = session_;
@@ -68,35 +68,45 @@ Session* JingleChromotingConnection::ReleaseSession() {
void JingleChromotingConnection::SetStateChangeCallback(
StateChangeCallback* callback) {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
DCHECK(callback);
state_change_callback_.reset(callback);
}
// TODO(sergeyu): Remove this method after we switch to RTP.
net::Socket* JingleChromotingConnection::GetVideoChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_channel_adapter_.get();
}
net::Socket* JingleChromotingConnection::GetEventsChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return events_channel_adapter_.get();
}
net::Socket* JingleChromotingConnection::GetVideoRtpChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_rtp_channel_.get();
}
net::Socket* JingleChromotingConnection::GetVideoRtcpChannel() {
- DCHECK_EQ(session_client_->message_loop(), MessageLoop::current());
+ DCHECK_EQ(server_->message_loop(), MessageLoop::current());
return video_rtcp_channel_.get();
}
+const std::string& JingleChromotingConnection::jid() {
+ // No synchronization is needed because jid_ is not changed
+ // after new connection is passed to JingleChromotingServer callback.
+ return jid_;
+}
+
+MessageLoop* JingleChromotingConnection::message_loop() {
+ return server_->message_loop();
+}
+
void JingleChromotingConnection::Close(Task* closed_task) {
- if (MessageLoop::current() != session_client_->message_loop()) {
- session_client_->message_loop()->PostTask(
+ if (MessageLoop::current() != server_->message_loop()) {
+ server_->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &JingleChromotingConnection::Close,
closed_task));
return;
@@ -171,7 +181,7 @@ void JingleChromotingConnection::OnSessionState(
void JingleChromotingConnection::OnInitiate(bool incoming) {
jid_ = session_->remote_name();
if (incoming)
- session_client_->AcceptConnection(this, session_);
+ server_->AcceptConnection(this, session_);
SetState(CONNECTING);
}
diff --git a/remoting/protocol/jingle_chromoting_connection.h b/remoting/protocol/jingle_chromoting_connection.h
index ada4ead..cc54182 100644
--- a/remoting/protocol/jingle_chromoting_connection.h
+++ b/remoting/protocol/jingle_chromoting_connection.h
@@ -41,9 +41,8 @@ class JingleChromotingConnection : public ChromotingConnection,
virtual net::Socket* GetVideoRtpChannel();
virtual net::Socket* GetVideoRtcpChannel();
- // No synchronization is needed because jid_ is not changed
- // after new connection is passed to JingleChromotingServer callback.
- virtual const std::string& jid() { return jid_; };
+ virtual const std::string& jid();
+ virtual MessageLoop* message_loop();
virtual void Close(Task* closed_task);
@@ -68,7 +67,7 @@ class JingleChromotingConnection : public ChromotingConnection,
void SetState(State new_state);
// JingleChromotingServer that created this connection.
- scoped_refptr<JingleChromotingServer> session_client_;
+ scoped_refptr<JingleChromotingServer> server_;
State state_;
scoped_ptr<StateChangeCallback> state_change_callback_;
diff --git a/remoting/base/protocol_decoder.cc b/remoting/protocol/messages_decoder.cc
index 10d2413..f7c7439 100644
--- a/remoting/base/protocol_decoder.cc
+++ b/remoting/protocol/messages_decoder.cc
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "remoting/base/protocol_decoder.h"
+#include "remoting/protocol/messages_decoder.h"
#include "base/logging.h"
#include "remoting/base/multiple_array_input_stream.h"
@@ -10,27 +10,30 @@
namespace remoting {
-ProtocolDecoder::ProtocolDecoder()
+MessagesDecoder::MessagesDecoder()
: last_read_position_(0),
available_bytes_(0),
next_payload_(0),
next_payload_known_(false) {
}
-ProtocolDecoder::~ProtocolDecoder() {}
+MessagesDecoder::~MessagesDecoder() {}
-void ProtocolDecoder::ParseClientMessages(scoped_refptr<media::DataBuffer> data,
+void MessagesDecoder::ParseClientMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
ClientMessageList* messages) {
- ParseMessages<ChromotingClientMessage>(data, messages);
+ ParseMessages<ChromotingClientMessage>(data, data_size, messages);
}
-void ProtocolDecoder::ParseHostMessages(scoped_refptr<media::DataBuffer> data,
+void MessagesDecoder::ParseHostMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
HostMessageList* messages) {
- ParseMessages<ChromotingHostMessage>(data, messages);
+ ParseMessages<ChromotingHostMessage>(data, data_size, messages);
}
template <typename T>
-void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data,
+void MessagesDecoder::ParseMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
std::list<T*>* messages) {
// If this is the first data in the processing queue, then set the
// last read position to 0.
@@ -38,8 +41,8 @@ void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data,
last_read_position_ = 0;
// First enqueue the data received.
- data_list_.push_back(data);
- available_bytes_ += data->GetDataSize();
+ data_list_.push_back(DataChunk(data, data_size));
+ available_bytes_ += data_size;
// Then try to parse one message until we can't parse anymore.
T* message;
@@ -49,7 +52,7 @@ void ProtocolDecoder::ParseMessages(scoped_refptr<media::DataBuffer> data,
}
template <typename T>
-bool ProtocolDecoder::ParseOneMessage(T** message) {
+bool MessagesDecoder::ParseOneMessage(T** message) {
// Determine the payload size. If we already know it, then skip this
// part.
// We have the value set to -1 for checking later.
@@ -66,18 +69,16 @@ bool ProtocolDecoder::ParseOneMessage(T** message) {
return false;
next_payload_known_ = false;
- // Extract data from |data_list_| used to form a full protocol buffer.
- DataList buffers;
- std::deque<const uint8*> buffer_pointers;
- std::deque<int> buffer_sizes;
+ // Create a MultipleArrayInputStream for parsing.
+ MultipleArrayInputStream stream;
+ std::vector<scoped_refptr<net::IOBuffer> > buffers;
while (next_payload_ > 0 && !data_list_.empty()) {
- scoped_refptr<media::DataBuffer> buffer = data_list_.front();
- size_t read_bytes = std::min(buffer->GetDataSize() - last_read_position_,
+ DataChunk* buffer = &(data_list_.front());
+ size_t read_bytes = std::min(buffer->data_size - last_read_position_,
next_payload_);
- buffers.push_back(buffer);
- buffer_pointers.push_back(buffer->GetData() + last_read_position_);
- buffer_sizes.push_back(read_bytes);
+ buffers.push_back(buffer->data);
+ stream.AddBuffer(buffer->data->data() + last_read_position_, read_bytes);
// Adjust counters.
last_read_position_ += read_bytes;
@@ -85,30 +86,24 @@ bool ProtocolDecoder::ParseOneMessage(T** message) {
available_bytes_ -= read_bytes;
// If the front buffer is fully read, remove it from the queue.
- if (buffer->GetDataSize() == last_read_position_) {
+ if (buffer->data_size == last_read_position_) {
data_list_.pop_front();
last_read_position_ = 0;
}
}
DCHECK_EQ(0UL, next_payload_);
- DCHECK_EQ(buffers.size(), buffer_pointers.size());
- DCHECK_EQ(buffers.size(), buffer_sizes.size());
-
- // Create a MultipleArrayInputStream for parsing.
- MultipleArrayInputStream stream(buffers.size());
- for (size_t i = 0; i < buffers.size(); ++i) {
- stream.SetBuffer(i, buffer_pointers[i], buffer_sizes[i]);
- }
// And finally it is parsing.
*message = new T();
bool ret = (*message)->ParseFromZeroCopyStream(&stream);
- if (!ret)
+ if (!ret) {
+ LOG(ERROR) << "Received invalid message.";
delete *message;
+ }
return ret;
}
-bool ProtocolDecoder::GetPayloadSize(int* size) {
+bool MessagesDecoder::GetPayloadSize(int* size) {
// The header has a size of 4 bytes.
const size_t kHeaderSize = sizeof(int32);
@@ -117,24 +112,24 @@ bool ProtocolDecoder::GetPayloadSize(int* size) {
std::string header;
while (header.length() < kHeaderSize && !data_list_.empty()) {
- scoped_refptr<media::DataBuffer> buffer = data_list_.front();
+ DataChunk* buffer = &(data_list_.front());
// Find out how many bytes we need and how many bytes are available in this
// buffer.
int needed_bytes = kHeaderSize - header.length();
- int available_bytes = buffer->GetDataSize() - last_read_position_;
+ int available_bytes = buffer->data_size - last_read_position_;
// Then append the required bytes into the header and advance the last
// read position.
int read_bytes = std::min(needed_bytes, available_bytes);
header.append(
- reinterpret_cast<const char*>(buffer->GetData()) + last_read_position_,
+ reinterpret_cast<char*>(buffer->data->data()) + last_read_position_,
read_bytes);
last_read_position_ += read_bytes;
available_bytes_ -= read_bytes;
// If the buffer is depleted then remove it from the queue.
- if (last_read_position_ == buffer->GetDataSize()) {
+ if (last_read_position_ == buffer->data_size) {
last_read_position_ = 0;
data_list_.pop_front();
}
diff --git a/remoting/base/protocol_decoder.h b/remoting/protocol/messages_decoder.h
index fed196f..66014f6 100644
--- a/remoting/base/protocol_decoder.h
+++ b/remoting/protocol/messages_decoder.h
@@ -2,15 +2,15 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef REMOTING_BASE_PROTOCOL_DECODER_H_
-#define REMOTING_BASE_PROTOCOL_DECODER_H_
+#ifndef REMOTING_PROTOCOL_MESSAGES_DECODER_H_
+#define REMOTING_PROTOCOL_MESSAGES_DECODER_H_
#include <deque>
#include <list>
#include "base/ref_counted.h"
#include "google/protobuf/message_lite.h"
-#include "media/base/data_buffer.h"
+#include "net/base/io_buffer.h"
#include "remoting/base/protocol/chromotocol.pb.h"
namespace remoting {
@@ -21,27 +21,45 @@ typedef std::list<ChromotingClientMessage*> ClientMessageList;
// A protocol decoder is used to decode data transmitted in the chromoting
// network.
// TODO(hclam): Defines the interface and implement methods.
-class ProtocolDecoder {
+class MessagesDecoder {
public:
- ProtocolDecoder();
+ MessagesDecoder();
+ virtual ~MessagesDecoder();
- virtual ~ProtocolDecoder();
-
- // Parse data received from network into ClientMessages. Ownership of |data|
- // is passed to this object and output is written to |messages|.
- virtual void ParseClientMessages(scoped_refptr<media::DataBuffer> data,
+ // Parse data received from network into ClientMessages. Output is written
+ // to |messages|.
+ virtual void ParseClientMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
ClientMessageList* messages);
- // Parse data received from network into HostMessages. Ownership of |data|
- // is passed to this object and output is written to |messages|.
- virtual void ParseHostMessages(scoped_refptr<media::DataBuffer> data,
+ // Parse data received from network into HostMessages. Output is
+ // written to |messages|.
+ virtual void ParseHostMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
HostMessageList* messages);
private:
+ // DataChunk stores reference to a net::IOBuffer and size of the data
+ // stored in that buffer.
+ struct DataChunk {
+ DataChunk(net::IOBuffer* data, size_t data_size)
+ : data(data),
+ data_size(data_size) {
+ }
+
+ scoped_refptr<net::IOBuffer> data;
+ size_t data_size;
+ };
+
+ // TODO(sergeyu): It might be more efficient to memcopy data to one big buffer
+ // instead of storing chunks in dqueue.
+ typedef std::deque<DataChunk> DataList;
+
// A private method used to parse data received from network into protocol
// buffers.
template <typename T>
- void ParseMessages(scoped_refptr<media::DataBuffer> data,
+ void ParseMessages(scoped_refptr<net::IOBuffer> data,
+ int data_size,
std::list<T*>* messages);
// Parse one message from |data_list_|. Return true if sucessful.
@@ -52,7 +70,6 @@ class ProtocolDecoder {
// data list. Return false if we don't have enough data.
bool GetPayloadSize(int* size);
- typedef std::deque<scoped_refptr<media::DataBuffer> > DataList;
DataList data_list_;
size_t last_read_position_;
@@ -69,4 +86,4 @@ class ProtocolDecoder {
} // namespace remoting
-#endif // REMOTING_BASE_PROTOCOL_DECODER_H_
+#endif // REMOTING_PROTOCOL_MESSAGES_DECODER_H_
diff --git a/remoting/base/protocol_decoder_unittest.cc b/remoting/protocol/messages_decoder_unittest.cc
index ae4ba85..7c5616d 100644
--- a/remoting/base/protocol_decoder_unittest.cc
+++ b/remoting/protocol/messages_decoder_unittest.cc
@@ -6,9 +6,9 @@
#include "base/scoped_ptr.h"
#include "base/stl_util-inl.h"
-#include "media/base/data_buffer.h"
-#include "remoting/base/protocol_decoder.h"
-#include "remoting/base/protocol_util.h"
+#include "net/base/io_buffer.h"
+#include "remoting/protocol/messages_decoder.h"
+#include "remoting/protocol/util.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace remoting {
@@ -20,10 +20,9 @@ static const std::string kTestData = "Chromoting rockz";
static void AppendMessage(const ChromotingHostMessage& msg,
std::string* buffer) {
// Contains one encoded message.
- scoped_refptr<media::DataBuffer> encoded_msg;
+ scoped_refptr<net::IOBufferWithSize> encoded_msg;
encoded_msg = SerializeAndFrameMessage(msg);
- buffer->append(reinterpret_cast<const char*>(encoded_msg->GetData()),
- encoded_msg->GetDataSize());
+ buffer->append(encoded_msg->data(), encoded_msg->size());
}
// Construct and prepare data in the |output_stream|.
@@ -61,20 +60,20 @@ static void PrepareData(uint8** buffer, int* size) {
memcpy(*buffer, encoded_data.c_str(), *size);
}
-TEST(ProtocolDecoderTest, BasicOperations) {
+TEST(MessagesDecoderTest, BasicOperations) {
// Prepare encoded data for testing.
int size;
uint8* test_data;
PrepareData(&test_data, &size);
scoped_array<uint8> memory_deleter(test_data);
- // Then simulate using ProtocolDecoder to decode variable
+ // Then simulate using MessagesDecoder to decode variable
// size of encoded data.
// The first thing to do is to generate a variable size of data. This is done
// by iterating the following array for read sizes.
const int kReadSizes[] = {1, 2, 3, 1};
- ProtocolDecoder decoder;
+ MessagesDecoder decoder;
// Then feed the protocol decoder using the above generated data and the
// read pattern.
@@ -84,10 +83,9 @@ TEST(ProtocolDecoderTest, BasicOperations) {
int read = std::min(size - i, kReadSizes[i % arraysize(kReadSizes)]);
// And then prepare a DataBuffer for feeding it.
- scoped_refptr<media::DataBuffer> buffer = new media::DataBuffer(read);
- memcpy(buffer->GetWritableData(), test_data + i, read);
- buffer->SetDataSize(read);
- decoder.ParseHostMessages(buffer, &message_list);
+ scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(read);
+ memcpy(buffer->data(), test_data + i, read);
+ decoder.ParseHostMessages(buffer, read, &message_list);
i += read;
}
@@ -98,10 +96,12 @@ TEST(ProtocolDecoderTest, BasicOperations) {
delete message_list.front();
message_list.pop_front();
+ int index = 0;
for (HostMessageList::iterator it = message_list.begin();
it != message_list.end(); ++it) {
ChromotingHostMessage* message = *it;
- int type = (i - 1) % 3;
+ int type = index % 3;
+ ++index;
if (type == 0) {
// Begin update stream.
EXPECT_TRUE(message->has_begin_update_stream());
diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc
new file mode 100644
index 0000000..bfceb3e
--- /dev/null
+++ b/remoting/protocol/stream_reader.cc
@@ -0,0 +1,102 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/stream_reader.h"
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+#include "remoting/protocol/chromoting_connection.h"
+
+namespace remoting {
+
+namespace {
+int kReadBufferSize = 4096;
+} // namespace
+
+StreamReaderBase::StreamReaderBase()
+ : socket_(NULL),
+ closed_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &StreamReaderBase::OnRead)) {
+}
+
+StreamReaderBase::~StreamReaderBase() { }
+
+void StreamReaderBase::Close() {
+ closed_ = true;
+}
+
+void StreamReaderBase::Init(net::Socket* socket) {
+ DCHECK(socket);
+ socket_ = socket;
+ DoRead();
+}
+
+void StreamReaderBase::DoRead() {
+ while (true) {
+ read_buffer_ = new net::IOBuffer(kReadBufferSize);
+ int result = socket_->Read(
+ read_buffer_, kReadBufferSize, &read_callback_);
+ HandleReadResult(result);
+ if (result < 0)
+ break;
+ }
+}
+
+void StreamReaderBase::OnRead(int result) {
+ if (!closed_) {
+ HandleReadResult(result);
+ DoRead();
+ }
+}
+
+void StreamReaderBase::HandleReadResult(int result) {
+ if (result > 0) {
+ OnDataReceived(read_buffer_, result);
+ } else {
+ if (result != net::ERR_IO_PENDING)
+ LOG(ERROR) << "Read() returned error " << result;
+ }
+}
+
+// EventsStreamReader class.
+EventsStreamReader::EventsStreamReader() { }
+EventsStreamReader::~EventsStreamReader() { }
+
+void EventsStreamReader::Init(net::Socket* socket,
+ OnMessageCallback* on_message_callback) {
+ on_message_callback_.reset(on_message_callback);
+ StreamReaderBase::Init(socket);
+}
+
+void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ ClientMessageList messages_list;
+ messages_decoder_.ParseClientMessages(buffer, data_size, &messages_list);
+ for (ClientMessageList::iterator it = messages_list.begin();
+ it != messages_list.end(); ++it) {
+ on_message_callback_->Run(*it);
+ }
+}
+
+// VideoStreamReader class.
+VideoStreamReader::VideoStreamReader() { }
+VideoStreamReader::~VideoStreamReader() { }
+
+void VideoStreamReader::Init(net::Socket* socket,
+ OnMessageCallback* on_message_callback) {
+ on_message_callback_.reset(on_message_callback);
+ StreamReaderBase::Init(socket);
+}
+
+void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ HostMessageList messages_list;
+ messages_decoder_.ParseHostMessages(buffer, data_size, &messages_list);
+ for (HostMessageList::iterator it = messages_list.begin();
+ it != messages_list.end(); ++it) {
+ on_message_callback_->Run(*it);
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h
new file mode 100644
index 0000000..b71f4b0
--- /dev/null
+++ b/remoting/protocol/stream_reader.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+
+#include "base/callback.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+#include "net/base/completion_callback.h"
+#include "remoting/protocol/messages_decoder.h"
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class ChromotingConnection;
+
+class StreamReaderBase {
+ public:
+ StreamReaderBase();
+ virtual ~StreamReaderBase();
+
+ // Stops reading. Must be called on the same thread as Init().
+ void Close();
+
+ protected:
+ void Init(net::Socket* socket);
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0;
+
+ private:
+ void DoRead();
+ void OnRead(int result);
+ void HandleReadResult(int result);
+
+ net::Socket* socket_;
+ bool closed_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ net::CompletionCallbackImpl<StreamReaderBase> read_callback_;
+};
+
+class EventsStreamReader : public StreamReaderBase {
+ public:
+ EventsStreamReader();
+ ~EventsStreamReader();
+
+ // The OnMessageCallback is called whenever a new message is received.
+ // Ownership of the message is passed the callback.
+ typedef Callback1<ChromotingClientMessage*>::Type OnMessageCallback;
+
+ // Initialize the reader and start reading. Must be called on the thread
+ // |socket| belongs to. The callback will be called when a new message is
+ // received. EventsStreamReader owns |on_message_callback|, doesn't own
+ // |socket|.
+ void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
+
+ protected:
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
+
+ private:
+ MessagesDecoder messages_decoder_;
+ scoped_ptr<OnMessageCallback> on_message_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(EventsStreamReader);
+};
+
+class VideoStreamReader : public StreamReaderBase {
+ public:
+ VideoStreamReader();
+ ~VideoStreamReader();
+
+ // The OnMessageCallback is called whenever a new message is received.
+ // Ownership of the message is passed the callback.
+ typedef Callback1<ChromotingHostMessage*>::Type OnMessageCallback;
+
+ // Initialize the reader and start reading. Must be called on the thread
+ // |socket| belongs to. The callback will be called when a new message is
+ // received. VideoStreamReader owns |on_message_callback|, doesn't own
+ // |socket|.
+ void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
+
+ protected:
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
+
+ private:
+ MessagesDecoder messages_decoder_;
+ scoped_ptr<OnMessageCallback> on_message_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(VideoStreamReader);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
diff --git a/remoting/protocol/stream_writer.cc b/remoting/protocol/stream_writer.cc
new file mode 100644
index 0000000..3269a1b
--- /dev/null
+++ b/remoting/protocol/stream_writer.cc
@@ -0,0 +1,47 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/stream_writer.h"
+
+#include "base/message_loop.h"
+#include "remoting/protocol/chromoting_connection.h"
+#include "remoting/protocol/util.h"
+
+namespace remoting {
+
+StreamWriterBase::StreamWriterBase()
+ : socket_(NULL),
+ buffered_writer_(new BufferedSocketWriter()) {
+}
+
+StreamWriterBase::~StreamWriterBase() { }
+
+void StreamWriterBase::Init(net::Socket* socket) {
+ socket_ = socket;
+ buffered_writer_->Init(socket, NULL);
+}
+
+int StreamWriterBase::GetBufferSize() {
+ return buffered_writer_->GetBufferSize();
+}
+
+int StreamWriterBase::GetPendingMessages() {
+ return buffered_writer_->GetBufferChunks();
+}
+
+void StreamWriterBase::Close() {
+ buffered_writer_->Close();
+}
+
+bool EventsStreamWriter::SendMessage(
+ const ChromotingClientMessage& message) {
+ return buffered_writer_->Write(SerializeAndFrameMessage(message));
+}
+
+bool VideoStreamWriter::SendMessage(
+ const ChromotingHostMessage& message) {
+ return buffered_writer_->Write(SerializeAndFrameMessage(message));
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/stream_writer.h b/remoting/protocol/stream_writer.h
new file mode 100644
index 0000000..4bc2b76
--- /dev/null
+++ b/remoting/protocol/stream_writer.h
@@ -0,0 +1,53 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_STREAM_WRITER_H_
+#define REMOTING_PROTOCOL_STREAM_WRITER_H_
+
+#include "remoting/protocol/buffered_socket_writer.h"
+#include "remoting/protocol/messages_decoder.h"
+
+namespace remoting {
+
+class ChromotingConnection;
+
+class StreamWriterBase {
+ public:
+ StreamWriterBase();
+ virtual ~StreamWriterBase();
+
+ // Initializes the writer. Must be called on the thread the |socket| belongs
+ // to.
+ void Init(net::Socket* socket);
+
+ // Return current buffer state. Can be called from any thread.
+ int GetBufferSize();
+ int GetPendingMessages();
+
+ // Stop writing and drop pending data. Must be called from the same thread as
+ // Init().
+ void Close();
+
+ protected:
+ net::Socket* socket_;
+ scoped_refptr<BufferedSocketWriter> buffered_writer_;
+};
+
+class EventsStreamWriter : public StreamWriterBase {
+ public:
+ // Sends the |message| or returns false if called before Init().
+ // Can be called on any thread.
+ bool SendMessage(const ChromotingClientMessage& message);
+};
+
+class VideoStreamWriter : public StreamWriterBase {
+ public:
+ // Sends the |message| or returns false if called before Init().
+ // Can be called on any thread.
+ bool SendMessage(const ChromotingHostMessage& message);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_STREAM_WRITER_H_
diff --git a/remoting/protocol/util.cc b/remoting/protocol/util.cc
new file mode 100644
index 0000000..5a23c93
--- /dev/null
+++ b/remoting/protocol/util.cc
@@ -0,0 +1,28 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/util.h"
+
+#include "base/basictypes.h"
+#include "base/hash_tables.h"
+#include "base/logging.h"
+#include "net/base/io_buffer.h"
+#include "third_party/libjingle/source/talk/base/byteorder.h"
+
+namespace remoting {
+
+scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage(
+ const google::protobuf::MessageLite& msg) {
+ // Create a buffer with 4 extra bytes. This is used as prefix to write an
+ // int32 of the serialized message size for framing.
+ const int kExtraBytes = sizeof(int32);
+ int size = msg.ByteSize() + kExtraBytes;
+ scoped_refptr<net::IOBufferWithSize> buffer = new net::IOBufferWithSize(size);
+ talk_base::SetBE32(buffer->data(), msg.GetCachedSize());
+ msg.SerializeWithCachedSizesToArray(
+ reinterpret_cast<uint8*>(buffer->data()) + kExtraBytes);
+ return buffer;
+}
+
+} // namespace remoting
diff --git a/remoting/base/protocol_util.h b/remoting/protocol/util.h
index 94cdc2f..ef15e45 100644
--- a/remoting/base/protocol_util.h
+++ b/remoting/protocol/util.h
@@ -2,11 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef REMOTING_BASE_PROTOCOL_UTIL_H_
-#define REMOTING_BASE_PROTOCOL_UTIL_H_
+#ifndef REMOTING_PROTOCOL_UTIL_H_
+#define REMOTING_PROTOCOL_UTIL_H_
#include "google/protobuf/message_lite.h"
-#include "media/base/data_buffer.h"
+#include "net/base/io_buffer.h"
#include "remoting/base/protocol/chromotocol.pb.h"
// This file defines utility methods used for encoding and decoding the protocol
@@ -17,11 +17,9 @@ namespace remoting {
// sending it over the wire.
// This will provide sufficient prefix and suffix for the receiver side to
// decode the message.
-scoped_refptr<media::DataBuffer> SerializeAndFrameMessage(
+scoped_refptr<net::IOBufferWithSize> SerializeAndFrameMessage(
const google::protobuf::MessageLite& msg);
-int GetBytesPerPixel(PixelFormat format);
-
} // namespace remoting
-#endif // REMOTING_BASE_PROTOCOL_UTIL_H_
+#endif // REMOTING_PROTOCOL_UTIL_H_
diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp
index 716b3e2..bcde3f5 100644
--- a/remoting/remoting.gyp
+++ b/remoting/remoting.gyp
@@ -116,9 +116,11 @@
'../third_party/protobuf/protobuf.gyp:protobuf_lite',
'../third_party/libvpx/libvpx.gyp:libvpx_include',
'../third_party/zlib/zlib.gyp:zlib',
+ 'chromoting_jingle_glue',
'base/protocol/chromotocol.gyp:chromotocol_proto_lib',
'base/protocol/chromotocol.gyp:trace_proto_lib',
- 'chromoting_jingle_glue',
+ # TODO(hclam): Enable VP8 in the build.
+ #'third_party/on2/on2.gyp:vp8',
],
'export_dependent_settings': [
'../third_party/protobuf/protobuf.gyp:protobuf_lite',
@@ -154,13 +156,11 @@
'base/encoder_zlib.h',
'base/multiple_array_input_stream.cc',
'base/multiple_array_input_stream.h',
- 'base/protocol_decoder.cc',
- 'base/protocol_decoder.h',
- 'base/protocol_util.cc',
- 'base/protocol_util.h',
'base/tracer.cc',
'base/tracer.h',
'base/types.h',
+ 'base/util.cc',
+ 'base/util.h',
],
}, # end of target 'chromoting_base'
@@ -170,6 +170,7 @@
'dependencies': [
'chromoting_base',
'chromoting_jingle_glue',
+ 'chromoting_protocol',
],
'sources': [
'host/access_verifier.cc',
@@ -241,6 +242,7 @@
'dependencies': [
'chromoting_base',
'chromoting_jingle_glue',
+ 'chromoting_protocol',
],
'sources': [
'client/chromoting_client.cc',
@@ -358,12 +360,22 @@
'chromoting_jingle_glue',
],
'sources': [
+ 'protocol/messages_decoder.cc',
+ 'protocol/messages_decoder.h',
+ 'protocol/buffered_socket_writer.cc',
+ 'protocol/buffered_socket_writer.h',
'protocol/chromoting_connection.h',
'protocol/chromoting_server.h',
'protocol/jingle_chromoting_connection.cc',
'protocol/jingle_chromoting_connection.h',
'protocol/jingle_chromoting_server.cc',
'protocol/jingle_chromoting_server.h',
+ 'protocol/stream_reader.cc',
+ 'protocol/stream_reader.h',
+ 'protocol/stream_writer.cc',
+ 'protocol/stream_writer.h',
+ 'protocol/util.cc',
+ 'protocol/util.h',
],
}, # end of target 'chromoting_protocol'
@@ -411,7 +423,6 @@
# BUG57351 'base/encoder_zlib_unittest.cc',
'base/mock_objects.h',
'base/multiple_array_input_stream_unittest.cc',
-# BUG57351 'base/protocol_decoder_unittest.cc',
# BUG57351 'client/chromoting_view_unittest.cc',
'client/mock_objects.h',
'host/access_verifier_unittest.cc',
@@ -433,6 +444,9 @@
'jingle_glue/mock_objects.h',
'jingle_glue/stream_socket_adapter_unittest.cc',
'protocol/jingle_chromoting_connection_unittest.cc',
+ 'protocol/messages_decoder_unittest.cc',
+ 'protocol/fake_connection.cc',
+ 'protocol/fake_connection.h',
'protocol/session_manager_pair.cc',
'protocol/session_manager_pair.h',
'run_all_unittests.cc',