summaryrefslogtreecommitdiffstats
path: root/remoting/protocol/buffered_socket_writer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'remoting/protocol/buffered_socket_writer.cc')
-rw-r--r--remoting/protocol/buffered_socket_writer.cc123
1 files changed, 87 insertions, 36 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
index 6c60a4a..0650cbe 100644
--- a/remoting/protocol/buffered_socket_writer.cc
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -9,63 +9,69 @@
namespace remoting {
-BufferedSocketWriter::BufferedSocketWriter()
- : socket_(NULL),
- message_loop_(NULL),
- buffer_size_(0),
- write_pending_(false),
- ALLOW_THIS_IN_INITIALIZER_LIST(
- written_callback_(this, &BufferedSocketWriter::OnWritten)),
- closed_(false) {
+BufferedSocketWriterBase::BufferedSocketWriterBase()
+ : buffer_size_(0),
+ socket_(NULL),
+ message_loop_(NULL),
+ write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ written_callback_(this, &BufferedSocketWriterBase::OnWritten)),
+ closed_(false) {
}
-BufferedSocketWriter::~BufferedSocketWriter() { }
+BufferedSocketWriterBase::~BufferedSocketWriterBase() { }
-void BufferedSocketWriter::Init(net::Socket* socket,
+void BufferedSocketWriterBase::Init(net::Socket* socket,
WriteFailedCallback* callback) {
AutoLock auto_lock(lock_);
message_loop_ = MessageLoop::current();
socket_ = socket;
}
-bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) {
+bool BufferedSocketWriterBase::Write(
+ scoped_refptr<net::IOBufferWithSize> data) {
AutoLock auto_lock(lock_);
if (!socket_)
return false;
- queue_.push_back(data);
+ queue_.push(data);
buffer_size_ += data->size();
message_loop_->PostTask(
- FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
return true;
}
-void BufferedSocketWriter::DoWrite() {
+void BufferedSocketWriterBase::DoWrite() {
DCHECK_EQ(message_loop_, MessageLoop::current());
DCHECK(socket_);
- // Don't try to write if the writer not initialized or closed, or
- // there is already a write pending.
- if (write_pending_ || closed_)
+ // Don't try to write if there is another write pending.
+ if (write_pending_)
return;
+ // Don't write after Close().
+ {
+ AutoLock auto_lock(lock_);
+ if (closed_)
+ return;
+ }
+
while (true) {
- while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ net::IOBuffer* current_packet;
+ int current_packet_size;
+ {
AutoLock auto_lock(lock_);
- if (queue_.empty())
- return; // Nothing to write.
- current_buf_ =
- new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
- queue_.pop_front();
+ GetNextPacket_Locked(&current_packet, &current_packet_size);
}
- int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(),
+ // Return if the queue is empty.
+ if (!current_packet)
+ return;
+
+ int result = socket_->Write(current_packet, current_packet_size,
&written_callback_);
if (result >= 0) {
- {
- AutoLock auto_lock(lock_);
- buffer_size_ -= result;
- }
- current_buf_->DidConsume(result);
+ AutoLock auto_lock(lock_);
+ AdvanceBufferPosition_Locked(result);
} else {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
@@ -79,38 +85,83 @@ void BufferedSocketWriter::DoWrite() {
}
}
-void BufferedSocketWriter::OnWritten(int result) {
+void BufferedSocketWriterBase::OnWritten(int result) {
DCHECK_EQ(message_loop_, MessageLoop::current());
write_pending_ = false;
if (result < 0) {
if (write_failed_callback_.get())
write_failed_callback_->Run(result);
+ return;
}
{
AutoLock auto_lock(lock_);
- buffer_size_ -= result;
+ AdvanceBufferPosition_Locked(result);
}
- current_buf_->DidConsume(result);
+
// Schedule next write.
message_loop_->PostTask(
- FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
}
-int BufferedSocketWriter::GetBufferSize() {
+int BufferedSocketWriterBase::GetBufferSize() {
AutoLock auto_lock(lock_);
return buffer_size_;
}
-int BufferedSocketWriter::GetBufferChunks() {
+int BufferedSocketWriterBase::GetBufferChunks() {
AutoLock auto_lock(lock_);
return queue_.size();
}
-void BufferedSocketWriter::Close() {
+void BufferedSocketWriterBase::Close() {
AutoLock auto_lock(lock_);
closed_ = true;
}
+BufferedSocketWriter::BufferedSocketWriter() { }
+BufferedSocketWriter::~BufferedSocketWriter() { }
+
+void BufferedSocketWriter::GetNextPacket_Locked(
+ net::IOBuffer** buffer, int* size) {
+ while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ if (queue_.empty()) {
+ *buffer = NULL;
+ return; // Nothing to write.
+ }
+ current_buf_ =
+ new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
+ queue_.pop();
+ }
+
+ *buffer = current_buf_;
+ *size = current_buf_->BytesRemaining();
+}
+
+void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) {
+ buffer_size_ -= written;
+ current_buf_->DidConsume(written);
+}
+
+BufferedDatagramWriter::BufferedDatagramWriter() { }
+BufferedDatagramWriter::~BufferedDatagramWriter() { }
+
+void BufferedDatagramWriter::GetNextPacket_Locked(
+ net::IOBuffer** buffer, int* size) {
+ if (queue_.empty()) {
+ *buffer = NULL;
+ return; // Nothing to write.
+ }
+ *buffer = queue_.front();
+ *size = queue_.front()->size();
+}
+
+void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) {
+ DCHECK_EQ(written, queue_.front()->size());
+ buffer_size_ -= queue_.front()->size();
+ queue_.pop();
+}
+
+
} // namespace remoting