summaryrefslogtreecommitdiffstats
path: root/remoting/protocol/buffered_socket_writer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'remoting/protocol/buffered_socket_writer.cc')
-rw-r--r--remoting/protocol/buffered_socket_writer.cc116
1 files changed, 116 insertions, 0 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
new file mode 100644
index 0000000..6c60a4a
--- /dev/null
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -0,0 +1,116 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/buffered_socket_writer.h"
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+
+namespace remoting {
+
+BufferedSocketWriter::BufferedSocketWriter()
+ : socket_(NULL),
+ message_loop_(NULL),
+ buffer_size_(0),
+ write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ written_callback_(this, &BufferedSocketWriter::OnWritten)),
+ closed_(false) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() { }
+
+void BufferedSocketWriter::Init(net::Socket* socket,
+ WriteFailedCallback* callback) {
+ AutoLock auto_lock(lock_);
+ message_loop_ = MessageLoop::current();
+ socket_ = socket;
+}
+
+bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) {
+ AutoLock auto_lock(lock_);
+ if (!socket_)
+ return false;
+ queue_.push_back(data);
+ buffer_size_ += data->size();
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ return true;
+}
+
+void BufferedSocketWriter::DoWrite() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ DCHECK(socket_);
+
+ // Don't try to write if the writer not initialized or closed, or
+ // there is already a write pending.
+ if (write_pending_ || closed_)
+ return;
+
+ while (true) {
+ while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ AutoLock auto_lock(lock_);
+ if (queue_.empty())
+ return; // Nothing to write.
+ current_buf_ =
+ new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
+ queue_.pop_front();
+ }
+
+ int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(),
+ &written_callback_);
+ if (result >= 0) {
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ } else {
+ if (result == net::ERR_IO_PENDING) {
+ write_pending_ = true;
+ } else {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ return;
+ }
+ }
+}
+
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ write_pending_ = false;
+
+ if (result < 0) {
+ if (write_failed_callback_.get())
+ write_failed_callback_->Run(result);
+ }
+
+ {
+ AutoLock auto_lock(lock_);
+ buffer_size_ -= result;
+ }
+ current_buf_->DidConsume(result);
+ // Schedule next write.
+ message_loop_->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+}
+
+int BufferedSocketWriter::GetBufferSize() {
+ AutoLock auto_lock(lock_);
+ return buffer_size_;
+}
+
+int BufferedSocketWriter::GetBufferChunks() {
+ AutoLock auto_lock(lock_);
+ return queue_.size();
+}
+
+void BufferedSocketWriter::Close() {
+ AutoLock auto_lock(lock_);
+ closed_ = true;
+}
+
+} // namespace remoting