diff options
Diffstat (limited to 'remoting/protocol/message_reader.cc')
-rw-r--r-- | remoting/protocol/message_reader.cc | 57 |
1 files changed, 50 insertions, 7 deletions
diff --git a/remoting/protocol/message_reader.cc b/remoting/protocol/message_reader.cc index 7b818ee..3bbfd59 100644 --- a/remoting/protocol/message_reader.cc +++ b/remoting/protocol/message_reader.cc @@ -18,12 +18,16 @@ static const int kReadBufferSize = 4096; MessageReader::MessageReader() : socket_(NULL), + message_loop_(NULL), + read_pending_(false), + pending_messages_(0), closed_(false), ALLOW_THIS_IN_INITIALIZER_LIST( read_callback_(this, &MessageReader::OnRead)) { } MessageReader::~MessageReader() { + CHECK_EQ(pending_messages_, 0); } void MessageReader::Init(net::Socket* socket, @@ -31,21 +35,27 @@ void MessageReader::Init(net::Socket* socket, message_received_callback_.reset(callback); DCHECK(socket); socket_ = socket; + message_loop_ = MessageLoop::current(); DoRead(); } void MessageReader::DoRead() { - while (!closed_) { + DCHECK(!read_pending_); + + // Don't try to read again if there is another read pending or we + // have messages that we haven't finished processing yet. + while (!closed_ && !read_pending_ && pending_messages_ == 0) { read_buffer_ = new net::IOBuffer(kReadBufferSize); int result = socket_->Read( read_buffer_, kReadBufferSize, &read_callback_); HandleReadResult(result); - if (result < 0) - break; } } void MessageReader::OnRead(int result) { + DCHECK(read_pending_); + read_pending_ = false; + if (!closed_) { HandleReadResult(result); DoRead(); @@ -53,12 +63,17 @@ void MessageReader::OnRead(int result) { } void MessageReader::HandleReadResult(int result) { + if (closed_) + return; + if (result > 0) { OnDataReceived(read_buffer_, result); } else { if (result == net::ERR_CONNECTION_CLOSED) { closed_ = true; - } else if (result != net::ERR_IO_PENDING) { + } else if (result == net::ERR_IO_PENDING) { + read_pending_ = true; + } else { LOG(ERROR) << "Read() returned error " << result; } } @@ -67,14 +82,42 @@ void MessageReader::HandleReadResult(int result) { void MessageReader::OnDataReceived(net::IOBuffer* data, int data_size) { message_decoder_.AddData(data, data_size); + // Get list of all new messages first, and then call the callback + // for all of them. + std::vector<CompoundBuffer*> new_messages; while (true) { - CompoundBuffer buffer; - if (!message_decoder_.GetNextMessage(&buffer)) + CompoundBuffer* buffer = message_decoder_.GetNextMessage(); + if (!buffer) break; + new_messages.push_back(buffer); + } + + pending_messages_ += new_messages.size(); - message_received_callback_->Run(&buffer); + for (std::vector<CompoundBuffer*>::iterator it = new_messages.begin(); + it != new_messages.end(); ++it) { + message_received_callback_->Run(*it, NewRunnableMethod( + this, &MessageReader::OnMessageDone, *it)); } } +void MessageReader::OnMessageDone(CompoundBuffer* message) { + delete message; + ProcessDoneEvent(); +} + +void MessageReader::ProcessDoneEvent() { + if (MessageLoop::current() != message_loop_) { + message_loop_->PostTask(FROM_HERE, NewRunnableMethod( + this, &MessageReader::ProcessDoneEvent)); + return; + } + + pending_messages_--; + DCHECK_GE(pending_messages_, 0); + + DoRead(); // Start next read if neccessary. +} + } // namespace protocol } // namespace remoting |