diff options
Diffstat (limited to 'remoting/protocol/buffered_socket_writer.cc')
-rw-r--r-- | remoting/protocol/buffered_socket_writer.cc | 123 |
1 files changed, 87 insertions, 36 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc index 6c60a4a..0650cbe 100644 --- a/remoting/protocol/buffered_socket_writer.cc +++ b/remoting/protocol/buffered_socket_writer.cc @@ -9,63 +9,69 @@ namespace remoting { -BufferedSocketWriter::BufferedSocketWriter() - : socket_(NULL), - message_loop_(NULL), - buffer_size_(0), - write_pending_(false), - ALLOW_THIS_IN_INITIALIZER_LIST( - written_callback_(this, &BufferedSocketWriter::OnWritten)), - closed_(false) { +BufferedSocketWriterBase::BufferedSocketWriterBase() + : buffer_size_(0), + socket_(NULL), + message_loop_(NULL), + write_pending_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + written_callback_(this, &BufferedSocketWriterBase::OnWritten)), + closed_(false) { } -BufferedSocketWriter::~BufferedSocketWriter() { } +BufferedSocketWriterBase::~BufferedSocketWriterBase() { } -void BufferedSocketWriter::Init(net::Socket* socket, +void BufferedSocketWriterBase::Init(net::Socket* socket, WriteFailedCallback* callback) { AutoLock auto_lock(lock_); message_loop_ = MessageLoop::current(); socket_ = socket; } -bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) { +bool BufferedSocketWriterBase::Write( + scoped_refptr<net::IOBufferWithSize> data) { AutoLock auto_lock(lock_); if (!socket_) return false; - queue_.push_back(data); + queue_.push(data); buffer_size_ += data->size(); message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); return true; } -void BufferedSocketWriter::DoWrite() { +void BufferedSocketWriterBase::DoWrite() { DCHECK_EQ(message_loop_, MessageLoop::current()); DCHECK(socket_); - // Don't try to write if the writer not initialized or closed, or - // there is already a write pending. - if (write_pending_ || closed_) + // Don't try to write if there is another write pending. + if (write_pending_) return; + // Don't write after Close(). + { + AutoLock auto_lock(lock_); + if (closed_) + return; + } + while (true) { - while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + net::IOBuffer* current_packet; + int current_packet_size; + { AutoLock auto_lock(lock_); - if (queue_.empty()) - return; // Nothing to write. - current_buf_ = - new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); - queue_.pop_front(); + GetNextPacket_Locked(¤t_packet, ¤t_packet_size); } - int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(), + // Return if the queue is empty. + if (!current_packet) + return; + + int result = socket_->Write(current_packet, current_packet_size, &written_callback_); if (result >= 0) { - { - AutoLock auto_lock(lock_); - buffer_size_ -= result; - } - current_buf_->DidConsume(result); + AutoLock auto_lock(lock_); + AdvanceBufferPosition_Locked(result); } else { if (result == net::ERR_IO_PENDING) { write_pending_ = true; @@ -79,38 +85,83 @@ void BufferedSocketWriter::DoWrite() { } } -void BufferedSocketWriter::OnWritten(int result) { +void BufferedSocketWriterBase::OnWritten(int result) { DCHECK_EQ(message_loop_, MessageLoop::current()); write_pending_ = false; if (result < 0) { if (write_failed_callback_.get()) write_failed_callback_->Run(result); + return; } { AutoLock auto_lock(lock_); - buffer_size_ -= result; + AdvanceBufferPosition_Locked(result); } - current_buf_->DidConsume(result); + // Schedule next write. message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); } -int BufferedSocketWriter::GetBufferSize() { +int BufferedSocketWriterBase::GetBufferSize() { AutoLock auto_lock(lock_); return buffer_size_; } -int BufferedSocketWriter::GetBufferChunks() { +int BufferedSocketWriterBase::GetBufferChunks() { AutoLock auto_lock(lock_); return queue_.size(); } -void BufferedSocketWriter::Close() { +void BufferedSocketWriterBase::Close() { AutoLock auto_lock(lock_); closed_ = true; } +BufferedSocketWriter::BufferedSocketWriter() { } +BufferedSocketWriter::~BufferedSocketWriter() { } + +void BufferedSocketWriter::GetNextPacket_Locked( + net::IOBuffer** buffer, int* size) { + while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + if (queue_.empty()) { + *buffer = NULL; + return; // Nothing to write. + } + current_buf_ = + new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); + queue_.pop(); + } + + *buffer = current_buf_; + *size = current_buf_->BytesRemaining(); +} + +void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) { + buffer_size_ -= written; + current_buf_->DidConsume(written); +} + +BufferedDatagramWriter::BufferedDatagramWriter() { } +BufferedDatagramWriter::~BufferedDatagramWriter() { } + +void BufferedDatagramWriter::GetNextPacket_Locked( + net::IOBuffer** buffer, int* size) { + if (queue_.empty()) { + *buffer = NULL; + return; // Nothing to write. + } + *buffer = queue_.front(); + *size = queue_.front()->size(); +} + +void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) { + DCHECK_EQ(written, queue_.front()->size()); + buffer_size_ -= queue_.front()->size(); + queue_.pop(); +} + + } // namespace remoting |