diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-14 21:58:01 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-14 21:58:01 +0000 |
commit | 051916e611af748469e61ac32a4ae65eb126deaf (patch) | |
tree | c9b52630f0c5b1cf5ab5497d69404a1b351604f9 | |
parent | 0a5fb17a569ccc83ecc1d340af9c90279e9e9817 (diff) | |
download | chromium_src-051916e611af748469e61ac32a4ae65eb126deaf.zip chromium_src-051916e611af748469e61ac32a4ae65eb126deaf.tar.gz chromium_src-051916e611af748469e61ac32a4ae65eb126deaf.tar.bz2 |
Simplified MessageReader and MessageDecoder classes.
Now these two classes are not templates, and just handle raw data.
New ProtobufMessageReader is used to parse messages.
BUG=None
TEST=Unittests.
Review URL: http://codereview.chromium.org/6277003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@71497 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | remoting/protocol/client_message_dispatcher.cc | 4 | ||||
-rw-r--r-- | remoting/protocol/client_message_dispatcher.h | 5 | ||||
-rw-r--r-- | remoting/protocol/host_message_dispatcher.cc | 8 | ||||
-rw-r--r-- | remoting/protocol/host_message_dispatcher.h | 6 | ||||
-rw-r--r-- | remoting/protocol/jingle_connection_to_host.cc | 3 | ||||
-rw-r--r-- | remoting/protocol/message_decoder.cc | 6 | ||||
-rw-r--r-- | remoting/protocol/message_decoder.h | 65 | ||||
-rw-r--r-- | remoting/protocol/message_decoder_unittest.cc | 12 | ||||
-rw-r--r-- | remoting/protocol/message_reader.cc | 32 | ||||
-rw-r--r-- | remoting/protocol/message_reader.h | 104 | ||||
-rw-r--r-- | remoting/protocol/protobuf_video_reader.cc | 6 | ||||
-rw-r--r-- | remoting/protocol/protobuf_video_reader.h | 3 | ||||
-rw-r--r-- | remoting/protocol/rtp_video_reader.cc | 4 | ||||
-rw-r--r-- | remoting/protocol/rtp_video_reader.h | 1 | ||||
-rw-r--r-- | remoting/protocol/socket_reader_base.cc | 9 | ||||
-rw-r--r-- | remoting/protocol/socket_reader_base.h | 3 | ||||
-rw-r--r-- | remoting/protocol/video_reader.h | 3 |
17 files changed, 110 insertions, 164 deletions
diff --git a/remoting/protocol/client_message_dispatcher.cc b/remoting/protocol/client_message_dispatcher.cc index c70588a..9568685 100644 --- a/remoting/protocol/client_message_dispatcher.cc +++ b/remoting/protocol/client_message_dispatcher.cc @@ -29,10 +29,10 @@ void ClientMessageDispatcher::Initialize( return; } - control_message_reader_.reset(new MessageReader()); + control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>()); client_stub_ = client_stub; - control_message_reader_->Init<ControlMessage>( + control_message_reader_->Init( session->control_channel(), NewCallback(this, &ClientMessageDispatcher::OnControlMessageReceived)); return; diff --git a/remoting/protocol/client_message_dispatcher.h b/remoting/protocol/client_message_dispatcher.h index cad48e4..8f0f5a6 100644 --- a/remoting/protocol/client_message_dispatcher.h +++ b/remoting/protocol/client_message_dispatcher.h @@ -8,6 +8,7 @@ #include "base/basictypes.h" #include "base/scoped_ptr.h" #include "base/task.h" +#include "remoting/protocol/message_reader.h" namespace remoting { @@ -17,8 +18,6 @@ namespace protocol { class ClientStub; class ControlMessage; -class InputStub; -class MessageReader; class Session; // A message dispatcher used to listen for messages received in @@ -46,7 +45,7 @@ class ClientMessageDispatcher { // MessageReader that runs on the control channel. It runs a loop // that parses data on the channel and then calls the corresponding handler // in this class. - scoped_ptr<MessageReader> control_message_reader_; + scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_; // Stubs for client and input. These objects are not owned. // They are called on the thread there data is received, i.e. jingle thread. diff --git a/remoting/protocol/host_message_dispatcher.cc b/remoting/protocol/host_message_dispatcher.cc index b557171..2554c9b 100644 --- a/remoting/protocol/host_message_dispatcher.cc +++ b/remoting/protocol/host_message_dispatcher.cc @@ -33,16 +33,16 @@ void HostMessageDispatcher::Initialize( return; } - control_message_reader_.reset(new MessageReader()); - event_message_reader_.reset(new MessageReader()); + control_message_reader_.reset(new ProtobufMessageReader<ControlMessage>()); + event_message_reader_.reset(new ProtobufMessageReader<EventMessage>()); host_stub_ = host_stub; input_stub_ = input_stub; // Initialize the readers on the sockets provided by channels. - event_message_reader_->Init<EventMessage>( + event_message_reader_->Init( session->event_channel(), NewCallback(this, &HostMessageDispatcher::OnEventMessageReceived)); - control_message_reader_->Init<ControlMessage>( + control_message_reader_->Init( session->control_channel(), NewCallback(this, &HostMessageDispatcher::OnControlMessageReceived)); } diff --git a/remoting/protocol/host_message_dispatcher.h b/remoting/protocol/host_message_dispatcher.h index 6274168..8ebed01 100644 --- a/remoting/protocol/host_message_dispatcher.h +++ b/remoting/protocol/host_message_dispatcher.h @@ -8,6 +8,7 @@ #include "base/basictypes.h" #include "base/scoped_ptr.h" #include "base/task.h" +#include "remoting/protocol/message_reader.h" namespace remoting { @@ -17,7 +18,6 @@ namespace protocol { class ControlMessage; class HostStub; -class MessageReader; class InputStub; class Session; @@ -54,10 +54,10 @@ class HostMessageDispatcher { // MessageReader that runs on the control channel. It runs a loop // that parses data on the channel and then delegates the message to this // class. - scoped_ptr<MessageReader> control_message_reader_; + scoped_ptr<ProtobufMessageReader<ControlMessage> > control_message_reader_; // MessageReader that runs on the event channel. - scoped_ptr<MessageReader> event_message_reader_; + scoped_ptr<ProtobufMessageReader<EventMessage> > event_message_reader_; // Stubs for host and input. These objects are not owned. // They are called on the thread there data is received, i.e. jingle thread. diff --git a/remoting/protocol/jingle_connection_to_host.cc b/remoting/protocol/jingle_connection_to_host.cc index f786ef8..f680012 100644 --- a/remoting/protocol/jingle_connection_to_host.cc +++ b/remoting/protocol/jingle_connection_to_host.cc @@ -57,9 +57,6 @@ void JingleConnectionToHost::Disconnect() { return; } - control_reader_.Close(); - video_reader_->Close(); - if (session_) { session_->Close( NewRunnableMethod(this, &JingleConnectionToHost::OnDisconnected)); diff --git a/remoting/protocol/message_decoder.cc b/remoting/protocol/message_decoder.cc index 09f331a..b460b4d 100644 --- a/remoting/protocol/message_decoder.cc +++ b/remoting/protocol/message_decoder.cc @@ -20,12 +20,12 @@ MessageDecoder::MessageDecoder() MessageDecoder::~MessageDecoder() {} -void MessageDecoder::AddBuffer(scoped_refptr<net::IOBuffer> data, - int data_size) { +void MessageDecoder::AddData(scoped_refptr<net::IOBuffer> data, + int data_size) { buffer_.Append(data, data_size); } -bool MessageDecoder::GetNextMessageData(CompoundBuffer* message_buffer) { +bool MessageDecoder::GetNextMessage(CompoundBuffer* message_buffer) { // Determine the payload size. If we already know it then skip this part. // We may not have enough data to determine the payload size so use a // utility function to find out. diff --git a/remoting/protocol/message_decoder.h b/remoting/protocol/message_decoder.h index 8b521fe..3e0745f7 100644 --- a/remoting/protocol/message_decoder.h +++ b/remoting/protocol/message_decoder.h @@ -17,63 +17,30 @@ namespace remoting { namespace protocol { -// MessageDecoder uses CompoundBuffer to decode bytes into protocol -// buffer messages. This can be used to decode bytes received from the -// network. +// MessageDecoder uses CompoundBuffer to split the data received from +// the network into separate messages. Each message is expected to be +// decoded in the stream as follows: +// +--------------+--------------+ +// | message_size | message_data | +// +--------------+--------------+ // -// It provides ParseMessages() which accepts an IOBuffer. If enough bytes -// are collected to produce protocol buffer messages then the bytes will be -// consumed and the generated protocol buffer messages are added to the output -// list. -// -// It retains ownership of IOBuffer given to this object and keeps it alive -// until it is full consumed. +// Here, message_size is 4-byte integer that represents size of +// message_data in bytes. message_data - content of the message. class MessageDecoder { public: MessageDecoder(); virtual ~MessageDecoder(); - // Parses the bytes in |data| into a protobuf of type MessageType. The bytes - // in |data| are conceptually a stream of bytes to be parsed into a series of - // protobufs. Each parsed protouf is appended into the |messages|. All calls - // to ParseMessages should use same MessageType for any single instace of - // MessageDecoder. - - // This function retains |data| until all its bytes are consumed. - // Ownership of the produced protobufs is passed to the caller via the - // |messages| list. - template <class MessageType> - void ParseMessages(scoped_refptr<net::IOBuffer> data, - int data_size, std::list<MessageType*>* messages) { - AddBuffer(data, data_size); - - // Then try to parse the next message until we can't parse anymore. - MessageType* message; - while (ParseOneMessage<MessageType>(&message)) { - messages->push_back(message); - } - } - - private: - // Parse one message from |buffer_list_|. Return true if sucessful. - template <class MessageType> - bool ParseOneMessage(MessageType** message) { - CompoundBuffer buffer; - if (!GetNextMessageData(&buffer)) - return false; - - CompoundBufferInputStream stream(&buffer); - *message = new MessageType(); - bool ret = (*message)->ParseFromZeroCopyStream(&stream); - if (!ret) - delete *message; - return ret; - } - - void AddBuffer(scoped_refptr<net::IOBuffer> data, int data_size); + // Add next chunk of data. MessageDecoder retains |data| until all + // its bytes are consumed. + void AddData(scoped_refptr<net::IOBuffer> data, int data_size); - bool GetNextMessageData(CompoundBuffer* message_buffer); + // Get next message from the stream and puts it in + // |message_buffer|. Returns false if there are no complete messages + // yet. + bool GetNextMessage(CompoundBuffer* message_buffer); + private: // Retrieves the read payload size of the current protocol buffer via |size|. // Returns false and leaves |size| unmodified, if we do not have enough data // to retrieve the current size. diff --git a/remoting/protocol/message_decoder_unittest.cc b/remoting/protocol/message_decoder_unittest.cc index f2ecfcc..81bb699 100644 --- a/remoting/protocol/message_decoder_unittest.cc +++ b/remoting/protocol/message_decoder_unittest.cc @@ -69,7 +69,17 @@ void SimulateReadSequence(const int read_sequence[], int sequence_size) { // And then prepare an IOBuffer for feeding it. scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(read)); memcpy(buffer->data(), test_data + i, read); - decoder.ParseMessages(buffer, read, &message_list); + decoder.AddData(buffer, read); + while (true) { + CompoundBuffer message; + if (!decoder.GetNextMessage(&message)) + break; + + EventMessage* event = new EventMessage(); + CompoundBufferInputStream stream(&message); + ASSERT_TRUE(event->ParseFromZeroCopyStream(&stream)); + message_list.push_back(event); + } i += read; } diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc index 75b14f2..7b818ee 100644 --- a/remoting/protocol/message_reader.cc +++ b/remoting/protocol/message_reader.cc @@ -24,23 +24,18 @@ MessageReader::MessageReader() } MessageReader::~MessageReader() { - // Destroy MessageReaderPrivate if it was created. - if (destruction_callback_.get()) - destruction_callback_->Run(); } -void MessageReader::Close() { - closed_ = true; -} - -void MessageReader::Init(net::Socket* socket) { +void MessageReader::Init(net::Socket* socket, + MessageReceivedCallback* callback) { + message_received_callback_.reset(callback); DCHECK(socket); socket_ = socket; DoRead(); } void MessageReader::DoRead() { - while (true) { + while (!closed_) { read_buffer_ = new net::IOBuffer(kReadBufferSize); int result = socket_->Read( read_buffer_, kReadBufferSize, &read_callback_); @@ -59,10 +54,25 @@ void MessageReader::OnRead(int result) { void MessageReader::HandleReadResult(int result) { if (result > 0) { - data_received_callback_->Run(read_buffer_, result); + OnDataReceived(read_buffer_, result); } else { - if (result != net::ERR_IO_PENDING) + if (result == net::ERR_CONNECTION_CLOSED) { + closed_ = true; + } else if (result != net::ERR_IO_PENDING) { LOG(ERROR) << "Read() returned error " << result; + } + } +} + +void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { + message_decoder_.AddData(data, data_size); + + while (true) { + CompoundBuffer buffer; + if (!message_decoder_.GetNextMessage(&buffer)) + break; + + message_received_callback_->Run(&buffer); } } diff --git a/remoting/protocol/message_reader.h b/remoting/protocol/message_reader.h index 8522630..d493778 100644 --- a/remoting/protocol/message_reader.h +++ b/remoting/protocol/message_reader.h @@ -10,87 +10,35 @@ #include "base/scoped_ptr.h" #include "base/task.h" #include "net/base/completion_callback.h" -#include "net/base/io_buffer.h" +#include "remoting/base/compound_buffer.h" #include "remoting/protocol/message_decoder.h" namespace net { +class IOBuffer; class Socket; } // namespace net namespace remoting { namespace protocol { -class MessageReader; - -namespace internal { - -template <class T> -class MessageReaderPrivate { - private: - friend class remoting::protocol::MessageReader; - - typedef typename Callback1<T*>::Type MessageReceivedCallback; - - MessageReaderPrivate(MessageReceivedCallback* callback) - : message_received_callback_(callback) { - } - - ~MessageReaderPrivate() { } - - void OnDataReceived(net::IOBuffer* buffer, int data_size) { - typedef typename std::list<T*>::iterator MessageListIterator; - - std::list<T*> message_list; - message_decoder_.ParseMessages(buffer, data_size, &message_list); - for (MessageListIterator it = message_list.begin(); - it != message_list.end(); ++it) { - message_received_callback_->Run(*it); - } - } - - void Destroy() { - delete this; - } - - // Message decoder is used to decode bytes into protobuf message. - MessageDecoder message_decoder_; - - // Callback is called when a message is received. - scoped_ptr<MessageReceivedCallback> message_received_callback_; -}; - -} // namespace internal - -// MessageReader reads data from the socket asynchronously and uses -// MessageReaderPrivate to decode the data received. +// MessageReader reads data from the socket asynchronously and calls +// callback for each message it receives class MessageReader { public: + typedef Callback1<CompoundBuffer*>::Type MessageReceivedCallback; + MessageReader(); virtual ~MessageReader(); - // Stops reading. Must be called on the same thread as Init(). - void Close(); - // Initialize the MessageReader with a socket. If a message is received // |callback| is called. - template <class T> - void Init(net::Socket* socket, typename Callback1<T*>::Type* callback) { - internal::MessageReaderPrivate<T>* reader = - new internal::MessageReaderPrivate<T>(callback); - data_received_callback_.reset( - ::NewCallback( - reader, &internal::MessageReaderPrivate<T>::OnDataReceived)); - destruction_callback_.reset( - ::NewCallback(reader, &internal::MessageReaderPrivate<T>::Destroy)); - Init(socket); - } + void Init(net::Socket* socket, MessageReceivedCallback* callback); private: - void Init(net::Socket* socket); - void DoRead(); void OnRead(int result); void HandleReadResult(int result); + void OnDataReceived(net::IOBuffer* data, int data_size); net::Socket* socket_; @@ -98,8 +46,40 @@ class MessageReader { scoped_refptr<net::IOBuffer> read_buffer_; net::CompletionCallbackImpl<MessageReader> read_callback_; - scoped_ptr<Callback2<net::IOBuffer*, int>::Type> data_received_callback_; - scoped_ptr<Callback0::Type> destruction_callback_; + MessageDecoder message_decoder_; + + // Callback is called when a message is received. + scoped_ptr<MessageReceivedCallback> message_received_callback_; +}; + +template <class T> +class ProtobufMessageReader { + public: + typedef typename Callback1<T*>::Type MessageReceivedCallback; + + ProtobufMessageReader() { }; + ~ProtobufMessageReader() { }; + + void Init(net::Socket* socket, MessageReceivedCallback* callback) { + message_received_callback_.reset(callback); + message_reader_.Init( + socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData)); + } + + private: + void OnNewData(CompoundBuffer* buffer) { + T* message = new T(); + CompoundBufferInputStream stream(buffer); + bool ret = message->ParseFromZeroCopyStream(&stream); + if (!ret) { + delete message; + } else { + message_received_callback_->Run(message); + } + } + + MessageReader message_reader_; + scoped_ptr<MessageReceivedCallback> message_received_callback_; }; } // namespace protocol diff --git a/remoting/protocol/protobuf_video_reader.cc b/remoting/protocol/protobuf_video_reader.cc index 31f4aae..02e06f8 100644 --- a/remoting/protocol/protobuf_video_reader.cc +++ b/remoting/protocol/protobuf_video_reader.cc @@ -19,16 +19,12 @@ ProtobufVideoReader::~ProtobufVideoReader() { } void ProtobufVideoReader::Init(protocol::Session* session, VideoStub* video_stub) { - reader_.Init<VideoPacket>( + reader_.Init( session->video_channel(), NewCallback(this, &ProtobufVideoReader::OnNewData)); video_stub_ = video_stub; } -void ProtobufVideoReader::Close() { - reader_.Close(); -} - void ProtobufVideoReader::OnNewData(VideoPacket* packet) { video_stub_->ProcessVideoPacket(packet, new DeleteTask<VideoPacket>(packet)); } diff --git a/remoting/protocol/protobuf_video_reader.h b/remoting/protocol/protobuf_video_reader.h index 42e1c97..7305dd7 100644 --- a/remoting/protocol/protobuf_video_reader.h +++ b/remoting/protocol/protobuf_video_reader.h @@ -21,14 +21,13 @@ class ProtobufVideoReader : public VideoReader { // VideoReader interface. virtual void Init(protocol::Session* session, VideoStub* video_stub); - virtual void Close(); private: void OnNewData(VideoPacket* packet); VideoPacketFormat::Encoding encoding_; - MessageReader reader_; + ProtobufMessageReader<VideoPacket> reader_; // The stub that processes all received packets. VideoStub* video_stub_; diff --git a/remoting/protocol/rtp_video_reader.cc b/remoting/protocol/rtp_video_reader.cc index d7e9e52..ad00a4a 100644 --- a/remoting/protocol/rtp_video_reader.cc +++ b/remoting/protocol/rtp_video_reader.cc @@ -36,10 +36,6 @@ void RtpVideoReader::Init(protocol::Session* session, VideoStub* video_stub) { video_stub_ = video_stub; } -void RtpVideoReader::Close() { - rtp_reader_.Close(); -} - void RtpVideoReader::ResetQueue() { for (PacketsQueue::iterator it = packets_queue_.begin(); it != packets_queue_.end(); ++it) { diff --git a/remoting/protocol/rtp_video_reader.h b/remoting/protocol/rtp_video_reader.h index d8d89f7..a99bf7c 100644 --- a/remoting/protocol/rtp_video_reader.h +++ b/remoting/protocol/rtp_video_reader.h @@ -22,7 +22,6 @@ class RtpVideoReader : public VideoReader { // VideoReader interface. virtual void Init(protocol::Session* session, VideoStub* video_stub); - virtual void Close(); private: friend class RtpVideoReaderTest; diff --git a/remoting/protocol/socket_reader_base.cc b/remoting/protocol/socket_reader_base.cc index 41648bc..743120d 100644 --- a/remoting/protocol/socket_reader_base.cc +++ b/remoting/protocol/socket_reader_base.cc @@ -24,10 +24,6 @@ SocketReaderBase::SocketReaderBase() SocketReaderBase::~SocketReaderBase() { } -void SocketReaderBase::Close() { - closed_ = true; -} - void SocketReaderBase::Init(net::Socket* socket) { DCHECK(socket); socket_ = socket; @@ -56,8 +52,11 @@ void SocketReaderBase::HandleReadResult(int result) { if (result > 0) { OnDataReceived(read_buffer_, result); } else { - if (result != net::ERR_IO_PENDING) + if (result == net::ERR_CONNECTION_CLOSED) { + closed_ = true; + } else if (result != net::ERR_IO_PENDING) { LOG(ERROR) << "Read() returned error " << result; + } } } diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h index 7dd3dee..5e366f8 100644 --- a/remoting/protocol/socket_reader_base.h +++ b/remoting/protocol/socket_reader_base.h @@ -20,9 +20,6 @@ class SocketReaderBase { SocketReaderBase(); virtual ~SocketReaderBase(); - // Stops reading. Must be called on the same thread as Init(). - void Close(); - protected: void Init(net::Socket* socket); virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0; diff --git a/remoting/protocol/video_reader.h b/remoting/protocol/video_reader.h index ab94913..ba39be3 100644 --- a/remoting/protocol/video_reader.h +++ b/remoting/protocol/video_reader.h @@ -32,9 +32,6 @@ class VideoReader { virtual void Init(Session* session, VideoStub* video_stub) = 0; - // Closes the reader. The stub should not be called after Close(). - virtual void Close() = 0; - protected: VideoReader() { } |