summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-01-14 21:58:01 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-01-14 21:58:01 +0000
commit051916e611af748469e61ac32a4ae65eb126deaf (patch)
treec9b52630f0c5b1cf5ab5497d69404a1b351604f9
parent0a5fb17a569ccc83ecc1d340af9c90279e9e9817 (diff)
downloadchromium_src-051916e611af748469e61ac32a4ae65eb126deaf.zip
chromium_src-051916e611af748469e61ac32a4ae65eb126deaf.tar.gz
chromium_src-051916e611af748469e61ac32a4ae65eb126deaf.tar.bz2
Simplified MessageReader and MessageDecoder classes.
Now these two classes are not templates, and just handle raw data. New ProtobufMessageReader is used to parse messages. BUG=None TEST=Unittests. Review URL: http://codereview.chromium.org/6277003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@71497 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--remoting/protocol/client_message_dispatcher.cc4
-rw-r--r--remoting/protocol/client_message_dispatcher.h5
-rw-r--r--remoting/protocol/host_message_dispatcher.cc8
-rw-r--r--remoting/protocol/host_message_dispatcher.h6
-rw-r--r--remoting/protocol/jingle_connection_to_host.cc3
-rw-r--r--remoting/protocol/message_decoder.cc6
-rw-r--r--remoting/protocol/message_decoder.h65
-rw-r--r--remoting/protocol/message_decoder_unittest.cc12
-rw-r--r--remoting/protocol/message_reader.cc32
-rw-r--r--remoting/protocol/message_reader.h104
-rw-r--r--remoting/protocol/protobuf_video_reader.cc6
-rw-r--r--remoting/protocol/protobuf_video_reader.h3
-rw-r--r--remoting/protocol/rtp_video_reader.cc4
-rw-r--r--remoting/protocol/rtp_video_reader.h1
-rw-r--r--remoting/protocol/socket_reader_base.cc9
-rw-r--r--remoting/protocol/socket_reader_base.h3
-rw-r--r--remoting/protocol/video_reader.h3
17 files changed, 110 insertions, 164 deletions
diff --git a/remoting/protocol/client_message_dispatcher.cc b/remoting/protocol/client_message_dispatcher.cc
index c70588a..9568685 100644
--- a/remoting/protocol/client_message_dispatcher.cc
+++ b/remoting/protocol/client_message_dispatcher.cc
@@ -29,10 +29,10 @@ void ClientMessageDispatcher::Initialize(
return;
}
- control_message_reader_.reset(new MessageReader());
+ control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>());
client_stub_ = client_stub;
- control_message_reader_->Init<ControlMessage>(
+ control_message_reader_->Init(
session->control_channel(),
NewCallback(this, &ClientMessageDispatcher::OnControlMessageReceived));
return;
diff --git a/remoting/protocol/client_message_dispatcher.h b/remoting/protocol/client_message_dispatcher.h
index cad48e4..8f0f5a6 100644
--- a/remoting/protocol/client_message_dispatcher.h
+++ b/remoting/protocol/client_message_dispatcher.h
@@ -8,6 +8,7 @@
#include "base/basictypes.h"
#include "base/scoped_ptr.h"
#include "base/task.h"
+#include "remoting/protocol/message_reader.h"
namespace remoting {
@@ -17,8 +18,6 @@ namespace protocol {
class ClientStub;
class ControlMessage;
-class InputStub;
-class MessageReader;
class Session;
// A message dispatcher used to listen for messages received in
@@ -46,7 +45,7 @@ class ClientMessageDispatcher {
// MessageReader that runs on the control channel. It runs a loop
// that parses data on the channel and then calls the corresponding handler
// in this class.
- scoped_ptr<MessageReader> control_message_reader_;
+ scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_;
// Stubs for client and input. These objects are not owned.
// They are called on the thread there data is received, i.e. jingle thread.
diff --git a/remoting/protocol/host_message_dispatcher.cc b/remoting/protocol/host_message_dispatcher.cc
index b557171..2554c9b 100644
--- a/remoting/protocol/host_message_dispatcher.cc
+++ b/remoting/protocol/host_message_dispatcher.cc
@@ -33,16 +33,16 @@ void HostMessageDispatcher::Initialize(
return;
}
- control_message_reader_.reset(new MessageReader());
- event_message_reader_.reset(new MessageReader());
+ control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>());
+ event_message_reader_.reset(new ProtobufMessageReader<EventMessage>());
host_stub_ = host_stub;
input_stub_ = input_stub;
// Initialize the readers on the sockets provided by channels.
- event_message_reader_->Init<EventMessage>(
+ event_message_reader_->Init(
session->event_channel(),
NewCallback(this, &HostMessageDispatcher::OnEventMessageReceived));
- control_message_reader_->Init<ControlMessage>(
+ control_message_reader_->Init(
session->control_channel(),
NewCallback(this, &HostMessageDispatcher::OnControlMessageReceived));
}
diff --git a/remoting/protocol/host_message_dispatcher.h b/remoting/protocol/host_message_dispatcher.h
index 6274168..8ebed01 100644
--- a/remoting/protocol/host_message_dispatcher.h
+++ b/remoting/protocol/host_message_dispatcher.h
@@ -8,6 +8,7 @@
#include "base/basictypes.h"
#include "base/scoped_ptr.h"
#include "base/task.h"
+#include "remoting/protocol/message_reader.h"
namespace remoting {
@@ -17,7 +18,6 @@ namespace protocol {
class ControlMessage;
class HostStub;
-class MessageReader;
class InputStub;
class Session;
@@ -54,10 +54,10 @@ class HostMessageDispatcher {
// MessageReader that runs on the control channel. It runs a loop
// that parses data on the channel and then delegates the message to this
// class.
- scoped_ptr<MessageReader> control_message_reader_;
+ scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_;
// MessageReader that runs on the event channel.
- scoped_ptr<MessageReader> event_message_reader_;
+ scoped_ptr<ProtobufMessageReader<EventMessage> > event_message_reader_;
// Stubs for host and input. These objects are not owned.
// They are called on the thread there data is received, i.e. jingle thread.
diff --git a/remoting/protocol/jingle_connection_to_host.cc b/remoting/protocol/jingle_connection_to_host.cc
index f786ef8..f680012 100644
--- a/remoting/protocol/jingle_connection_to_host.cc
+++ b/remoting/protocol/jingle_connection_to_host.cc
@@ -57,9 +57,6 @@ void JingleConnectionToHost::Disconnect() {
return;
}
- control_reader_.Close();
- video_reader_->Close();
-
if (session_) {
session_->Close(
NewRunnableMethod(this, &JingleConnectionToHost::OnDisconnected));
diff --git a/remoting/protocol/message_decoder.cc b/remoting/protocol/message_decoder.cc
index 09f331a..b460b4d 100644
--- a/remoting/protocol/message_decoder.cc
+++ b/remoting/protocol/message_decoder.cc
@@ -20,12 +20,12 @@ MessageDecoder::MessageDecoder()
MessageDecoder::~MessageDecoder() {}
-void MessageDecoder::AddBuffer(scoped_refptr<net::IOBuffer> data,
- int data_size) {
+void MessageDecoder::AddData(scoped_refptr<net::IOBuffer> data,
+ int data_size) {
buffer_.Append(data, data_size);
}
-bool MessageDecoder::GetNextMessageData(CompoundBuffer* message_buffer) {
+bool MessageDecoder::GetNextMessage(CompoundBuffer* message_buffer) {
// Determine the payload size. If we already know it then skip this part.
// We may not have enough data to determine the payload size so use a
// utility function to find out.
diff --git a/remoting/protocol/message_decoder.h b/remoting/protocol/message_decoder.h
index 8b521fe..3e0745f7 100644
--- a/remoting/protocol/message_decoder.h
+++ b/remoting/protocol/message_decoder.h
@@ -17,63 +17,30 @@
namespace remoting {
namespace protocol {
-// MessageDecoder uses CompoundBuffer to decode bytes into protocol
-// buffer messages. This can be used to decode bytes received from the
-// network.
+// MessageDecoder uses CompoundBuffer to split the data received from
+// the network into separate messages. Each message is expected to be
+// decoded in the stream as follows:
+// +--------------+--------------+
+// | message_size | message_data |
+// +--------------+--------------+
//
-// It provides ParseMessages() which accepts an IOBuffer. If enough bytes
-// are collected to produce protocol buffer messages then the bytes will be
-// consumed and the generated protocol buffer messages are added to the output
-// list.
-//
-// It retains ownership of IOBuffer given to this object and keeps it alive
-// until it is full consumed.
+// Here, message_size is 4-byte integer that represents size of
+// message_data in bytes. message_data - content of the message.
class MessageDecoder {
public:
MessageDecoder();
virtual ~MessageDecoder();
- // Parses the bytes in |data| into a protobuf of type MessageType. The bytes
- // in |data| are conceptually a stream of bytes to be parsed into a series of
- // protobufs. Each parsed protouf is appended into the |messages|. All calls
- // to ParseMessages should use same MessageType for any single instace of
- // MessageDecoder.
-
- // This function retains |data| until all its bytes are consumed.
- // Ownership of the produced protobufs is passed to the caller via the
- // |messages| list.
- template <class MessageType>
- void ParseMessages(scoped_refptr<net::IOBuffer> data,
- int data_size, std::list<MessageType*>* messages) {
- AddBuffer(data, data_size);
-
- // Then try to parse the next message until we can't parse anymore.
- MessageType* message;
- while (ParseOneMessage<MessageType>(&message)) {
- messages->push_back(message);
- }
- }
-
- private:
- // Parse one message from |buffer_list_|. Return true if sucessful.
- template <class MessageType>
- bool ParseOneMessage(MessageType** message) {
- CompoundBuffer buffer;
- if (!GetNextMessageData(&buffer))
- return false;
-
- CompoundBufferInputStream stream(&buffer);
- *message = new MessageType();
- bool ret = (*message)->ParseFromZeroCopyStream(&stream);
- if (!ret)
- delete *message;
- return ret;
- }
-
- void AddBuffer(scoped_refptr<net::IOBuffer> data, int data_size);
+ // Add next chunk of data. MessageDecoder retains |data| until all
+ // its bytes are consumed.
+ void AddData(scoped_refptr<net::IOBuffer> data, int data_size);
- bool GetNextMessageData(CompoundBuffer* message_buffer);
+ // Get next message from the stream and puts it in
+ // |message_buffer|. Returns false if there are no complete messages
+ // yet.
+ bool GetNextMessage(CompoundBuffer* message_buffer);
+ private:
// Retrieves the read payload size of the current protocol buffer via |size|.
// Returns false and leaves |size| unmodified, if we do not have enough data
// to retrieve the current size.
diff --git a/remoting/protocol/message_decoder_unittest.cc b/remoting/protocol/message_decoder_unittest.cc
index f2ecfcc..81bb699 100644
--- a/remoting/protocol/message_decoder_unittest.cc
+++ b/remoting/protocol/message_decoder_unittest.cc
@@ -69,7 +69,17 @@ void SimulateReadSequence(const int read_sequence[], int sequence_size) {
// And then prepare an IOBuffer for feeding it.
scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(read));
memcpy(buffer->data(), test_data + i, read);
- decoder.ParseMessages(buffer, read, &message_list);
+ decoder.AddData(buffer, read);
+ while (true) {
+ CompoundBuffer message;
+ if (!decoder.GetNextMessage(&message))
+ break;
+
+ EventMessage* event = new EventMessage();
+ CompoundBufferInputStream stream(&message);
+ ASSERT_TRUE(event->ParseFromZeroCopyStream(&stream));
+ message_list.push_back(event);
+ }
i += read;
}
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc
index 75b14f2..7b818ee 100644
--- a/remoting/protocol/message_reader.cc
+++ b/remoting/protocol/message_reader.cc
@@ -24,23 +24,18 @@ MessageReader::MessageReader()
}
MessageReader::~MessageReader() {
- // Destroy MessageReaderPrivate if it was created.
- if (destruction_callback_.get())
- destruction_callback_->Run();
}
-void MessageReader::Close() {
- closed_ = true;
-}
-
-void MessageReader::Init(net::Socket* socket) {
+void MessageReader::Init(net::Socket* socket,
+ MessageReceivedCallback* callback) {
+ message_received_callback_.reset(callback);
DCHECK(socket);
socket_ = socket;
DoRead();
}
void MessageReader::DoRead() {
- while (true) {
+ while (!closed_) {
read_buffer_ = new net::IOBuffer(kReadBufferSize);
int result = socket_->Read(
read_buffer_, kReadBufferSize, &read_callback_);
@@ -59,10 +54,25 @@ void MessageReader::OnRead(int result) {
void MessageReader::HandleReadResult(int result) {
if (result > 0) {
- data_received_callback_->Run(read_buffer_, result);
+ OnDataReceived(read_buffer_, result);
} else {
- if (result != net::ERR_IO_PENDING)
+ if (result == net::ERR_CONNECTION_CLOSED) {
+ closed_ = true;
+ } else if (result != net::ERR_IO_PENDING) {
LOG(ERROR) << "Read() returned error " << result;
+ }
+ }
+}
+
+void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) {
+ message_decoder_.AddData(data, data_size);
+
+ while (true) {
+ CompoundBuffer buffer;
+ if (!message_decoder_.GetNextMessage(&buffer))
+ break;
+
+ message_received_callback_->Run(&buffer);
}
}
diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h
index 8522630..d493778 100644
--- a/remoting/protocol/message_reader.h
+++ b/remoting/protocol/message_reader.h
@@ -10,87 +10,35 @@
#include "base/scoped_ptr.h"
#include "base/task.h"
#include "net/base/completion_callback.h"
-#include "net/base/io_buffer.h"
+#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_decoder.h"
namespace net {
+class IOBuffer;
class Socket;
} // namespace net
namespace remoting {
namespace protocol {
-class MessageReader;
-
-namespace internal {
-
-template <class T>
-class MessageReaderPrivate {
- private:
- friend class remoting::protocol::MessageReader;
-
- typedef typename Callback1<T*>::Type MessageReceivedCallback;
-
- MessageReaderPrivate(MessageReceivedCallback* callback)
- : message_received_callback_(callback) {
- }
-
- ~MessageReaderPrivate() { }
-
- void OnDataReceived(net::IOBuffer* buffer, int data_size) {
- typedef typename std::list<T*>::iterator MessageListIterator;
-
- std::list<T*> message_list;
- message_decoder_.ParseMessages(buffer, data_size, &message_list);
- for (MessageListIterator it = message_list.begin();
- it != message_list.end(); ++it) {
- message_received_callback_->Run(*it);
- }
- }
-
- void Destroy() {
- delete this;
- }
-
- // Message decoder is used to decode bytes into protobuf message.
- MessageDecoder message_decoder_;
-
- // Callback is called when a message is received.
- scoped_ptr<MessageReceivedCallback> message_received_callback_;
-};
-
-} // namespace internal
-
-// MessageReader reads data from the socket asynchronously and uses
-// MessageReaderPrivate to decode the data received.
+// MessageReader reads data from the socket asynchronously and calls
+// callback for each message it receives
class MessageReader {
public:
+ typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback;
+
MessageReader();
virtual ~MessageReader();
- // Stops reading. Must be called on the same thread as Init().
- void Close();
-
// Initialize the MessageReader with a socket. If a message is received
// |callback| is called.
- template <class T>
- void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) {
- internal::MessageReaderPrivate<T>* reader =
- new internal::MessageReaderPrivate<T>(callback);
- data_received_callback_.reset(
- ::NewCallback(
- reader, &internal::MessageReaderPrivate<T>::OnDataReceived));
- destruction_callback_.reset(
- ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy));
- Init(socket);
- }
+ void Init(net::Socket* socket, MessageReceivedCallback* callback);
private:
- void Init(net::Socket* socket);
-
void DoRead();
void OnRead(int result);
void HandleReadResult(int result);
+ void OnDataReceived(net::IOBuffer* data, int data_size);
net::Socket* socket_;
@@ -98,8 +46,40 @@ class MessageReader {
scoped_refptr<net::IOBuffer> read_buffer_;
net::CompletionCallbackImpl<MessageReader> read_callback_;
- scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_;
- scoped_ptr<Callback0::Type> destruction_callback_;
+ MessageDecoder message_decoder_;
+
+ // Callback is called when a message is received.
+ scoped_ptr<MessageReceivedCallback> message_received_callback_;
+};
+
+template <class T>
+class ProtobufMessageReader {
+ public:
+ typedef typename Callback1<T*>::Type MessageReceivedCallback;
+
+ ProtobufMessageReader() { };
+ ~ProtobufMessageReader() { };
+
+ void Init(net::Socket* socket, MessageReceivedCallback* callback) {
+ message_received_callback_.reset(callback);
+ message_reader_.Init(
+ socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData));
+ }
+
+ private:
+ void OnNewData(CompoundBuffer* buffer) {
+ T* message = new T();
+ CompoundBufferInputStream stream(buffer);
+ bool ret = message->ParseFromZeroCopyStream(&stream);
+ if (!ret) {
+ delete message;
+ } else {
+ message_received_callback_->Run(message);
+ }
+ }
+
+ MessageReader message_reader_;
+ scoped_ptr<MessageReceivedCallback> message_received_callback_;
};
} // namespace protocol
diff --git a/remoting/protocol/protobuf_video_reader.cc b/remoting/protocol/protobuf_video_reader.cc
index 31f4aae..02e06f8 100644
--- a/remoting/protocol/protobuf_video_reader.cc
+++ b/remoting/protocol/protobuf_video_reader.cc
@@ -19,16 +19,12 @@ ProtobufVideoReader::~ProtobufVideoReader() { }
void ProtobufVideoReader::Init(protocol::Session* session,
VideoStub* video_stub) {
- reader_.Init<VideoPacket>(
+ reader_.Init(
session->video_channel(),
NewCallback(this, &ProtobufVideoReader::OnNewData));
video_stub_ = video_stub;
}
-void ProtobufVideoReader::Close() {
- reader_.Close();
-}
-
void ProtobufVideoReader::OnNewData(VideoPacket* packet) {
video_stub_->ProcessVideoPacket(packet, new DeleteTask<VideoPacket>(packet));
}
diff --git a/remoting/protocol/protobuf_video_reader.h b/remoting/protocol/protobuf_video_reader.h
index 42e1c97..7305dd7 100644
--- a/remoting/protocol/protobuf_video_reader.h
+++ b/remoting/protocol/protobuf_video_reader.h
@@ -21,14 +21,13 @@ class ProtobufVideoReader : public VideoReader {
// VideoReader interface.
virtual void Init(protocol::Session* session, VideoStub* video_stub);
- virtual void Close();
private:
void OnNewData(VideoPacket* packet);
VideoPacketFormat::Encoding encoding_;
- MessageReader reader_;
+ ProtobufMessageReader<VideoPacket> reader_;
// The stub that processes all received packets.
VideoStub* video_stub_;
diff --git a/remoting/protocol/rtp_video_reader.cc b/remoting/protocol/rtp_video_reader.cc
index d7e9e52..ad00a4a 100644
--- a/remoting/protocol/rtp_video_reader.cc
+++ b/remoting/protocol/rtp_video_reader.cc
@@ -36,10 +36,6 @@ void RtpVideoReader::Init(protocol::Session* session, VideoStub* video_stub) {
video_stub_ = video_stub;
}
-void RtpVideoReader::Close() {
- rtp_reader_.Close();
-}
-
void RtpVideoReader::ResetQueue() {
for (PacketsQueue::iterator it = packets_queue_.begin();
it != packets_queue_.end(); ++it) {
diff --git a/remoting/protocol/rtp_video_reader.h b/remoting/protocol/rtp_video_reader.h
index d8d89f7..a99bf7c 100644
--- a/remoting/protocol/rtp_video_reader.h
+++ b/remoting/protocol/rtp_video_reader.h
@@ -22,7 +22,6 @@ class RtpVideoReader : public VideoReader {
// VideoReader interface.
virtual void Init(protocol::Session* session, VideoStub* video_stub);
- virtual void Close();
private:
friend class RtpVideoReaderTest;
diff --git a/remoting/protocol/socket_reader_base.cc b/remoting/protocol/socket_reader_base.cc
index 41648bc..743120d 100644
--- a/remoting/protocol/socket_reader_base.cc
+++ b/remoting/protocol/socket_reader_base.cc
@@ -24,10 +24,6 @@ SocketReaderBase::SocketReaderBase()
SocketReaderBase::~SocketReaderBase() { }
-void SocketReaderBase::Close() {
- closed_ = true;
-}
-
void SocketReaderBase::Init(net::Socket* socket) {
DCHECK(socket);
socket_ = socket;
@@ -56,8 +52,11 @@ void SocketReaderBase::HandleReadResult(int result) {
if (result > 0) {
OnDataReceived(read_buffer_, result);
} else {
- if (result != net::ERR_IO_PENDING)
+ if (result == net::ERR_CONNECTION_CLOSED) {
+ closed_ = true;
+ } else if (result != net::ERR_IO_PENDING) {
LOG(ERROR) << "Read() returned error " << result;
+ }
}
}
diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h
index 7dd3dee..5e366f8 100644
--- a/remoting/protocol/socket_reader_base.h
+++ b/remoting/protocol/socket_reader_base.h
@@ -20,9 +20,6 @@ class SocketReaderBase {
SocketReaderBase();
virtual ~SocketReaderBase();
- // Stops reading. Must be called on the same thread as Init().
- void Close();
-
protected:
void Init(net::Socket* socket);
virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0;
diff --git a/remoting/protocol/video_reader.h b/remoting/protocol/video_reader.h
index ab94913..ba39be3 100644
--- a/remoting/protocol/video_reader.h
+++ b/remoting/protocol/video_reader.h
@@ -32,9 +32,6 @@ class VideoReader {
virtual void Init(Session* session,
VideoStub* video_stub) = 0;
- // Closes the reader. The stub should not be called after Close().
- virtual void Close() = 0;
-
protected:
VideoReader() { }