summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authoryzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-27 18:13:25 +0000
committeryzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-27 18:13:25 +0000
commit4976265023cd8aa03dcc65a0958b9e4ad778ab75 (patch)
treef1d88f9b4d09c39b419b612e40b218bbaad3a59b /mojo
parent06bd7ea902f66a54a410184b8f78bc30b4b306b6 (diff)
downloadchromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.zip
chromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.tar.gz
chromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.tar.bz2
Restructure RawChannelPosix and RawChannel, so that the platform-independent logic can be shared between POSIX and Windows.
BUG=None TEST=mojo_system_unittests R=viettrungluu@chromium.org Review URL: https://codereview.chromium.org/169723004 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253865 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp1
-rw-r--r--mojo/system/raw_channel.cc344
-rw-r--r--mojo/system/raw_channel.h150
-rw-r--r--mojo/system/raw_channel_posix.cc478
4 files changed, 657 insertions, 316 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 82cd8ab..8074028 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -138,6 +138,7 @@
'system/message_pipe_endpoint.h',
'system/proxy_message_pipe_endpoint.cc',
'system/proxy_message_pipe_endpoint.h',
+ 'system/raw_channel.cc',
'system/raw_channel.h',
'system/raw_channel_posix.cc',
'system/raw_channel_win.cc',
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
new file mode 100644
index 0000000..5d8da3b
--- /dev/null
+++ b/mojo/system/raw_channel.cc
@@ -0,0 +1,344 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <string.h>
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
+#include "mojo/system/message_in_transit.h"
+
+namespace mojo {
+namespace system {
+
+const size_t kReadSize = 4096;
+
+RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
+}
+
+RawChannel::ReadBuffer::~ReadBuffer() {}
+
+void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
+ DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
+ *addr = &buffer_[0] + num_valid_bytes_;
+ *size = kReadSize;
+}
+
+RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
+
+RawChannel::WriteBuffer::~WriteBuffer() {
+ STLDeleteElements(&message_queue_);
+}
+
+void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
+ buffers->clear();
+
+ size_t bytes_to_write = GetTotalBytesToWrite();
+ if (bytes_to_write == 0)
+ return;
+
+ MessageInTransit* message = message_queue_.front();
+ if (!message->secondary_buffer_size()) {
+ // Only write from the main buffer.
+ DCHECK_LT(offset_, message->main_buffer_size());
+ DCHECK_LE(bytes_to_write, message->main_buffer_size());
+ Buffer buffer = {
+ static_cast<const char*>(message->main_buffer()) + offset_,
+ bytes_to_write};
+ buffers->push_back(buffer);
+ return;
+ }
+
+ if (offset_ >= message->main_buffer_size()) {
+ // Only write from the secondary buffer.
+ DCHECK_LT(offset_ - message->main_buffer_size(),
+ message->secondary_buffer_size());
+ DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
+ Buffer buffer = {
+ static_cast<const char*>(message->secondary_buffer()) +
+ (offset_ - message->main_buffer_size()),
+ bytes_to_write};
+ buffers->push_back(buffer);
+ return;
+ }
+
+ // Write from both buffers.
+ DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
+ message->secondary_buffer_size());
+ Buffer buffer1 = {
+ static_cast<const char*>(message->main_buffer()) + offset_,
+ message->main_buffer_size() - offset_};
+ buffers->push_back(buffer1);
+ Buffer buffer2 = {
+ static_cast<const char*>(message->secondary_buffer()),
+ message->secondary_buffer_size()};
+ buffers->push_back(buffer2);
+}
+
+size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
+ if (message_queue_.empty())
+ return 0;
+
+ MessageInTransit* message = message_queue_.front();
+ DCHECK_LT(offset_, message->total_size());
+ return message->total_size() - offset_;
+}
+
+RawChannel::RawChannel(Delegate* delegate,
+ base::MessageLoopForIO* message_loop_for_io)
+ : delegate_(delegate),
+ message_loop_for_io_(message_loop_for_io),
+ read_stopped_(false),
+ write_stopped_(false),
+ weak_ptr_factory_(this) {
+}
+
+RawChannel::~RawChannel() {
+ DCHECK(!read_buffer_);
+ DCHECK(!write_buffer_);
+
+ // No need to take the |write_lock_| here -- if there are still weak pointers
+ // outstanding, then we're hosed anyway (since we wouldn't be able to
+ // invalidate them cleanly, since we might not be on the I/O thread).
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs());
+}
+
+bool RawChannel::Init() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+ // No need to take the lock. No one should be using us yet.
+ DCHECK(!read_buffer_);
+ read_buffer_.reset(new ReadBuffer);
+ DCHECK(!write_buffer_);
+ write_buffer_.reset(new WriteBuffer);
+
+ if (!OnInit())
+ return false;
+
+ return ScheduleRead() == IO_PENDING;
+}
+
+void RawChannel::Shutdown() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+ base::AutoLock locker(write_lock_);
+
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
+ read_stopped_ = true;
+ write_stopped_ = true;
+
+ OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
+}
+
+// Reminder: This must be thread-safe.
+bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
+ base::AutoLock locker(write_lock_);
+ if (write_stopped_)
+ return false;
+
+ if (!write_buffer_->message_queue_.empty()) {
+ write_buffer_->message_queue_.push_back(message.release());
+ return true;
+ }
+
+ write_buffer_->message_queue_.push_front(message.release());
+ DCHECK_EQ(write_buffer_->offset_, 0u);
+
+ size_t bytes_written = 0;
+ IOResult io_result = WriteNoLock(&bytes_written);
+ if (io_result == IO_PENDING)
+ return true;
+
+ bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
+ bytes_written);
+ if (!result) {
+ // Even if we're on the I/O thread, don't call |OnFatalError()| in the
+ // nested context.
+ message_loop_for_io_->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::CallOnFatalError,
+ weak_ptr_factory_.GetWeakPtr(),
+ Delegate::FATAL_ERROR_FAILED_WRITE));
+ }
+
+ return result;
+}
+
+RawChannel::ReadBuffer* RawChannel::read_buffer() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ return read_buffer_.get();
+}
+
+RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
+ write_lock_.AssertAcquired();
+ return write_buffer_.get();
+}
+
+void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+ if (read_stopped_) {
+ NOTREACHED();
+ return;
+ }
+
+ IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
+
+ // Keep reading data in a loop, and dispatches messages if enough data is
+ // received. Exit the loop if any of the following happens:
+ // - one or more messages were dispatched;
+ // - the last read failed, was a partial read or would block;
+ // - |Shutdown()| was called.
+ do {
+ if (io_result != IO_SUCCEEDED) {
+ read_stopped_ = true;
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
+ return;
+ }
+
+ read_buffer_->num_valid_bytes_ += bytes_read;
+
+ // Dispatch all the messages that we can.
+ bool did_dispatch_message = false;
+ // Tracks the offset of the first undispatched message in |read_buffer_|.
+ // Currently, we copy data to ensure that this is zero at the beginning.
+ size_t read_buffer_start = 0;
+ size_t remaining_bytes = read_buffer_->num_valid_bytes_;
+ size_t message_size;
+ // Note that we rely on short-circuit evaluation here:
+ // - |read_buffer_start| may be an invalid index into
+ // |read_buffer_->buffer_| if |remaining_bytes| is zero.
+ // - |message_size| is only valid if |GetNextMessageSize()| returns true.
+ // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
+ // next read).
+ while (remaining_bytes > 0 &&
+ MessageInTransit::GetNextMessageSize(
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
+ &message_size) &&
+ remaining_bytes >= message_size) {
+ // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
+ // some sort of "view" abstraction.
+ MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
+ &read_buffer_->buffer_[read_buffer_start]);
+ DCHECK_EQ(message.total_size(), message_size);
+
+ // Dispatch the message.
+ delegate_->OnReadMessage(message);
+ if (read_stopped_) {
+ // |Shutdown()| was called in |OnReadMessage()|.
+ // TODO(vtl): Add test for this case.
+ return;
+ }
+ did_dispatch_message = true;
+
+ // Update our state.
+ read_buffer_start += message_size;
+ remaining_bytes -= message_size;
+ }
+
+ if (read_buffer_start > 0) {
+ // Move data back to start.
+ read_buffer_->num_valid_bytes_ = remaining_bytes;
+ if (read_buffer_->num_valid_bytes_ > 0) {
+ memmove(&read_buffer_->buffer_[0],
+ &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
+ }
+ read_buffer_start = 0;
+ }
+
+ if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
+ kReadSize) {
+ // Use power-of-2 buffer sizes.
+ // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
+ // maximum message size to whatever extent necessary).
+ // TODO(vtl): We may often be able to peek at the header and get the real
+ // required extra space (which may be much bigger than |kReadSize|).
+ size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
+ while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
+ new_size *= 2;
+
+ // TODO(vtl): It's suboptimal to zero out the fresh memory.
+ read_buffer_->buffer_.resize(new_size, 0);
+ }
+
+ // (1) If we dispatched any messages, stop reading for now (and let the
+ // message loop do its thing for another round).
+ // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
+ // a single message. Risks: slower, more complex if we want to avoid lots of
+ // copying. ii. Keep reading until there's no more data and dispatch all the
+ // messages we can. Risks: starvation of other users of the message loop.)
+ // (2) If we didn't max out |kReadSize|, stop reading for now.
+ bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
+ bytes_read = 0;
+ io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
+ } while (io_result != IO_PENDING);
+}
+
+void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+
+ bool did_fail = false;
+ {
+ base::AutoLock locker(write_lock_);
+ DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
+
+ if (write_stopped_) {
+ NOTREACHED();
+ return;
+ }
+
+ did_fail = !OnWriteCompletedNoLock(result, bytes_written);
+ }
+
+ if (did_fail)
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+}
+
+void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+ delegate_->OnFatalError(fatal_error);
+}
+
+bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
+ write_lock_.AssertAcquired();
+
+ DCHECK(!write_stopped_);
+ DCHECK(!write_buffer_->message_queue_.empty());
+
+ if (result) {
+ if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
+ // Partial (or no) write.
+ write_buffer_->offset_ += bytes_written;
+ } else {
+ // Complete write.
+ DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
+ delete write_buffer_->message_queue_.front();
+ write_buffer_->message_queue_.pop_front();
+ write_buffer_->offset_ = 0;
+ }
+
+ if (write_buffer_->message_queue_.empty())
+ return true;
+
+ // Schedule the next write.
+ if (ScheduleWriteNoLock() == IO_PENDING)
+ return true;
+ }
+
+ write_stopped_ = true;
+ STLDeleteElements(&write_buffer_->message_queue_);
+ write_buffer_->offset_ = 0;
+ return false;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index 4b14edd..4b0f0f4 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -5,10 +5,13 @@
#ifndef MOJO_SYSTEM_RAW_CHANNEL_H_
#define MOJO_SYSTEM_RAW_CHANNEL_H_
+#include <deque>
#include <vector>
#include "base/macros.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/synchronization/lock.h"
#include "mojo/system/constants.h"
#include "mojo/system/embedder/scoped_platform_handle.h"
#include "mojo/system/system_impl_export.h"
@@ -39,7 +42,7 @@ class MessageInTransit;
// on which |Init()| is called).
class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
public:
- virtual ~RawChannel() {}
+ virtual ~RawChannel();
// The |Delegate| is only accessed on the same thread as the message loop
// (passed in on creation).
@@ -78,26 +81,159 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// This must be called (on an I/O thread) before this object is used. Returns
// true on success. On failure, |Shutdown()| should *not* be called.
- virtual bool Init() = 0;
+ bool Init();
// This must be called (on the I/O thread) before this object is destroyed.
- virtual void Shutdown() = 0;
+ void Shutdown();
// This is thread-safe. It takes ownership of |message| (always, even on
// failure). Returns true on success.
- virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) = 0;
+ bool WriteMessage(scoped_ptr<MessageInTransit> message);
protected:
- RawChannel(Delegate* delegate, base::MessageLoopForIO* message_loop_for_io)
- : delegate_(delegate), message_loop_for_io_(message_loop_for_io) {}
+ // Return values of |[Schedule]Read()| and |[Schedule]WriteNoLock()|.
+ enum IOResult {
+ IO_SUCCEEDED,
+ IO_FAILED,
+ IO_PENDING
+ };
+
+ class ReadBuffer {
+ public:
+ ReadBuffer();
+ ~ReadBuffer();
+
+ void GetBuffer(char** addr, size_t* size);
+
+ private:
+ friend class RawChannel;
+
+ // We store data from |[Schedule]Read()|s in |buffer_|. The start of
+ // |buffer_| is always aligned with a message boundary (we will copy memory
+ // to ensure this), but |buffer_| may be larger than the actual number of
+ // bytes we have.
+ std::vector<char> buffer_;
+ size_t num_valid_bytes_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReadBuffer);
+ };
+
+ class WriteBuffer {
+ public:
+ struct Buffer {
+ const char* addr;
+ size_t size;
+ };
+
+ WriteBuffer();
+ ~WriteBuffer();
+
+ void GetBuffers(std::vector<Buffer>* buffers) const;
+ // Returns the total size of all buffers returned by |GetBuffers()|.
+ size_t GetTotalBytesToWrite() const;
+
+ private:
+ friend class RawChannel;
+
+ // TODO(vtl): When C++11 is available, switch this to a deque of
+ // |scoped_ptr|/|unique_ptr|s.
+ std::deque<MessageInTransit*> message_queue_;
+ // The first message may have been partially sent. |offset_| indicates the
+ // position in the first message where to start the next write.
+ size_t offset_;
+
+ DISALLOW_COPY_AND_ASSIGN(WriteBuffer);
+ };
+
+ RawChannel(Delegate* delegate, base::MessageLoopForIO* message_loop_for_io);
- Delegate* delegate() { return delegate_; }
base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; }
+ base::Lock& write_lock() { return write_lock_; }
+
+ // Only accessed on the I/O thread.
+ ReadBuffer* read_buffer();
+
+ // Only accessed under |write_lock_|.
+ WriteBuffer* write_buffer_no_lock();
+
+ // Reads into |read_buffer()|.
+ // This class guarantees that:
+ // - the area indicated by |GetBuffer()| will stay valid until read completion
+ // (but please also see the comments for |OnShutdownNoLock()|);
+ // - a second read is not started if there is a pending read;
+ // - the method is called on the I/O thread WITHOUT |write_lock_| held.
+ //
+ // The implementing subclass must guarantee that:
+ // - |bytes_read| is untouched if the method returns values other than
+ // IO_SUCCEEDED;
+ // - if the method returns IO_PENDING, |OnReadCompleted()| will be called on
+ // the I/O thread to report the result, unless |Shutdown()| is called.
+ virtual IOResult Read(size_t* bytes_read) = 0;
+ // Similar to |Read()|, except that the implementing subclass must also
+ // guarantee that the method doesn't succeed synchronously, i.e., it only
+ // returns IO_FAILED or IO_PENDING.
+ virtual IOResult ScheduleRead() = 0;
+
+ // Writes contents in |write_buffer_no_lock()|.
+ // This class guarantees that:
+ // - the area indicated by |GetBuffers()| will stay valid until write
+ // completion (but please also see the comments for |OnShutdownNoLock()|);
+ // - a second write is not started if there is a pending write;
+ // - the method is called under |write_lock_|.
+ //
+ // The implementing subclass must guarantee that:
+ // - |bytes_written| is untouched if the method returns values other than
+ // IO_SUCCEEDED;
+ // - if the method returns IO_PENDING, |OnWriteCompleted()| will be called on
+ // the I/O thread to report the result, unless |Shutdown()| is called.
+ virtual IOResult WriteNoLock(size_t* bytes_written) = 0;
+ // Similar to |WriteNoLock()|, except that the implementing subclass must also
+ // guarantee that the method doesn't succeed synchronously, i.e., it only
+ // returns IO_FAILED or IO_PENDING.
+ virtual IOResult ScheduleWriteNoLock() = 0;
+
+ // Must be called on the I/O thread WITHOUT |write_lock_| held.
+ virtual bool OnInit() = 0;
+ // On shutdown, passes the ownership of the buffers to subclasses, who may
+ // want to preserve them if there are pending read/write.
+ // Must be called on the I/O thread under |write_lock_|.
+ virtual void OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) = 0;
+
+ // Must be called on the I/O thread WITHOUT |write_lock_| held.
+ void OnReadCompleted(bool result, size_t bytes_read);
+ // Must be called on the I/O thread WITHOUT |write_lock_| held.
+ void OnWriteCompleted(bool result, size_t bytes_written);
private:
+ // Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O
+ // thread WITHOUT |write_lock_| held.
+ void CallOnFatalError(Delegate::FatalError fatal_error);
+
+ // If |result| is true, updates the write buffer and schedules a write
+ // operation to run later if there are more contents to write. If |result| is
+ // false or any error occurs during the method execution, cancels pending
+ // writes and returns false.
+ // Must be called only if |write_stopped_| is false and under |write_lock_|.
+ bool OnWriteCompletedNoLock(bool result, size_t bytes_written);
+
Delegate* const delegate_;
base::MessageLoopForIO* const message_loop_for_io_;
+ // Only used on the I/O thread:
+ bool read_stopped_;
+ scoped_ptr<ReadBuffer> read_buffer_;
+
+ base::Lock write_lock_; // Protects the following members.
+ bool write_stopped_;
+ scoped_ptr<WriteBuffer> write_buffer_;
+
+ // This is used for posting tasks from write threads to the I/O thread. It
+ // must only be accessed under |write_lock_|. The weak pointers it produces
+ // are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannel> weak_ptr_factory_;
+
DISALLOW_COPY_AND_ASSIGN(RawChannel);
};
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index fa1a5ce..00a5448 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -5,13 +5,10 @@
#include "mojo/system/raw_channel.h"
#include <errno.h>
-#include <string.h>
#include <sys/uio.h>
#include <unistd.h>
#include <algorithm>
-#include <deque>
-#include <vector>
#include "base/basictypes.h"
#include "base/bind.h"
@@ -22,18 +19,14 @@
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
-#include "base/stl_util.h"
#include "base/synchronization/lock.h"
#include "mojo/system/embedder/platform_handle.h"
-#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
namespace {
-const size_t kReadSize = 4096;
-
class RawChannelPosix : public RawChannel,
public base::MessageLoopForIO::Watcher {
public:
@@ -42,12 +35,17 @@ class RawChannelPosix : public RawChannel,
base::MessageLoopForIO* message_loop_for_io);
virtual ~RawChannelPosix();
+ private:
// |RawChannel| implementation:
- virtual bool Init() OVERRIDE;
- virtual void Shutdown() OVERRIDE;
- virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
+ virtual IOResult Read(size_t* bytes_read) OVERRIDE;
+ virtual IOResult ScheduleRead() OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult ScheduleWriteNoLock() OVERRIDE;
+ virtual bool OnInit() OVERRIDE;
+ virtual void OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
- private:
// |base::MessageLoopForIO::Watcher| implementation:
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
@@ -55,40 +53,18 @@ class RawChannelPosix : public RawChannel,
// Watches for |fd_| to become writable. Must be called on the I/O thread.
void WaitToWrite();
- // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
- // thread WITHOUT |write_lock_| held.
- void CallOnFatalError(Delegate::FatalError fatal_error);
-
- // Writes the message at the front of |write_message_queue_|, starting at
- // |write_message_offset_|. It removes and destroys if the write completes and
- // otherwise updates |write_message_offset_|. Returns true on success. Must be
- // called under |write_lock_|.
- bool WriteFrontMessageNoLock();
-
- // Cancels all pending writes and destroys the contents of
- // |write_message_queue_|. Should only be called if |write_stopped_| is false;
- // sets |write_stopped_| to true. Must be called under |write_lock_|.
- void CancelPendingWritesNoLock();
-
embedder::ScopedPlatformHandle fd_;
- // Only used on the I/O thread:
+ // The following members are only used on the I/O thread:
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
- // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
- // is always aligned with a message boundary (we will copy memory to ensure
- // this), but |read_buffer_| may be larger than the actual number of bytes we
- // have.
- std::vector<char> read_buffer_;
- size_t read_buffer_num_valid_bytes_;
-
- base::Lock write_lock_; // Protects the following members.
- bool write_stopped_;
- // TODO(vtl): When C++11 is available, switch this to a deque of
- // |scoped_ptr|/|unique_ptr|s.
- std::deque<MessageInTransit*> write_message_queue_;
- size_t write_message_offset_;
+ bool pending_read_;
+
+ // The following members are used on multiple threads and protected by
+ // |write_lock()|:
+ bool pending_write_;
+
// This is used for posting tasks from write threads to the I/O thread. It
// must only be accessed under |write_lock_|. The weak pointers it produces
// are only used/invalidated on the I/O thread.
@@ -102,20 +78,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
base::MessageLoopForIO* message_loop_for_io)
: RawChannel(delegate, message_loop_for_io),
fd_(handle.Pass()),
- read_buffer_num_valid_bytes_(0),
- write_stopped_(false),
- write_message_offset_(0),
+ pending_read_(false),
+ pending_write_(false),
weak_ptr_factory_(this) {
- CHECK_EQ(RawChannel::message_loop_for_io()->type(),
- base::MessageLoop::TYPE_IO);
DCHECK(fd_.is_valid());
}
RawChannelPosix::~RawChannelPosix() {
- DCHECK(write_stopped_);
- DCHECK(!fd_.is_valid());
+ DCHECK(!pending_read_);
+ DCHECK(!pending_write_);
- // No need to take the |write_lock_| here -- if there are still weak pointers
+ // No need to take the |write_lock()| here -- if there are still weak pointers
// outstanding, then we're hosed anyway (since we wouldn't be able to
// invalidate them cleanly, since we might not be on the I/O thread).
DCHECK(!weak_ptr_factory_.HasWeakPtrs());
@@ -125,324 +98,211 @@ RawChannelPosix::~RawChannelPosix() {
DCHECK(!write_watcher_.get());
}
-bool RawChannelPosix::Init() {
+RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
- DCHECK(!read_watcher_.get());
- read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
- DCHECK(!write_watcher_.get());
- write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ char* buffer = NULL;
+ size_t bytes_to_read = 0;
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read);
- // No need to take the lock. No one should be using us yet.
- DCHECK(write_message_queue_.empty());
+ ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
- if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
- base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
- // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
- // (in the sense of returning the message loop's state to what it was before
- // it was called).
+ if (read_result >= 0) {
+ *bytes_read = static_cast<size_t>(read_result);
+ return IO_SUCCEEDED;
+ }
+
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "read";
+
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
read_watcher_.reset();
- write_watcher_.reset();
- return false;
+
+ return IO_FAILED;
}
- return true;
+ return ScheduleRead();
}
-void RawChannelPosix::Shutdown() {
+RawChannel::IOResult RawChannelPosix::ScheduleRead() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
- base::AutoLock locker(write_lock_);
- if (!write_stopped_)
- CancelPendingWritesNoLock();
+ pending_read_ = true;
- read_watcher_.reset(); // This will stop watching (if necessary).
- write_watcher_.reset(); // This will stop watching (if necessary).
+ return IO_PENDING;
+}
- DCHECK(fd_.is_valid());
- fd_.reset();
+RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
+ write_lock().AssertAcquired();
- weak_ptr_factory_.InvalidateWeakPtrs();
-}
+ DCHECK(!pending_write_);
-// Reminder: This must be thread-safe, and takes ownership of |message|.
-bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
- base::AutoLock locker(write_lock_);
- if (write_stopped_)
- return false;
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
- if (!write_message_queue_.empty()) {
- write_message_queue_.push_back(message.release());
- return true;
+ ssize_t write_result = -1;
+ if (buffers.size() == 1) {
+ write_result = HANDLE_EINTR(
+ write(fd_.get().fd, buffers[0].addr, buffers[0].size));
+ } else {
+ // Note that using |writev()| is measurably slower than using |write()| --
+ // at least in a microbenchmark -- but much faster than using multiple
+ // |write()|s.
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count));
}
- write_message_queue_.push_front(message.release());
- DCHECK_EQ(write_message_offset_, 0u);
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ if (write_result >= 0) {
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
+ }
+
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "write of size "
+ << write_buffer_no_lock()->GetTotalBytesToWrite();
+ return IO_FAILED;
+ }
- if (!result) {
- // Even if we're on the I/O thread, don't call |OnFatalError()| in the
- // nested context.
+ return ScheduleWriteNoLock();
+}
+
+RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
+ write_lock().AssertAcquired();
+
+ DCHECK(!pending_write_);
+
+ // Set up to wait for the FD to become writable.
+ // If we're not on the I/O thread, we have to post a task to do this.
+ if (base::MessageLoop::current() != message_loop_for_io()) {
message_loop_for_io()->PostTask(
FROM_HERE,
- base::Bind(&RawChannelPosix::CallOnFatalError,
- weak_ptr_factory_.GetWeakPtr(),
- Delegate::FATAL_ERROR_FAILED_WRITE));
- } else if (!write_message_queue_.empty()) {
- // Set up to wait for the FD to become writable. If we're not on the I/O
- // thread, we have to post a task to do this.
- if (base::MessageLoop::current() == message_loop_for_io()) {
- WaitToWrite();
- } else {
- message_loop_for_io()->PostTask(
- FROM_HERE,
- base::Bind(&RawChannelPosix::WaitToWrite,
- weak_ptr_factory_.GetWeakPtr()));
- }
+ base::Bind(&RawChannelPosix::WaitToWrite,
+ weak_ptr_factory_.GetWeakPtr()));
+ pending_write_ = true;
+ return IO_PENDING;
+ }
+
+ if (message_loop_for_io()->WatchFileDescriptor(
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
+ write_watcher_.get(), this)) {
+ pending_write_ = true;
+ return IO_PENDING;
}
- return result;
+ return IO_FAILED;
}
-void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
- DCHECK_EQ(fd, fd_.get().fd);
+bool RawChannelPosix::OnInit() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- bool did_dispatch_message = false;
- // Tracks the offset of the first undispatched message in |read_buffer_|.
- // Currently, we copy data to ensure that this is zero at the beginning.
- size_t read_buffer_start = 0;
- for (;;) {
- if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
- < kReadSize) {
- // Use power-of-2 buffer sizes.
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
- // maximum message size to whatever extent necessary).
- // TODO(vtl): We may often be able to peek at the header and get the real
- // required extra space (which may be much bigger than |kReadSize|).
- size_t new_size = std::max(read_buffer_.size(), kReadSize);
- while (new_size <
- read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
- new_size *= 2;
-
- // TODO(vtl): It's suboptimal to zero out the fresh memory.
- read_buffer_.resize(new_size, 0);
- }
+ DCHECK(!read_watcher_.get());
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ DCHECK(!write_watcher_.get());
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
- ssize_t bytes_read = HANDLE_EINTR(
- read(fd_.get().fd,
- &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
- kReadSize));
- if (bytes_read < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "read";
+ if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
+ // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
+ // (in the sense of returning the message loop's state to what it was before
+ // it was called).
+ read_watcher_.reset();
+ write_watcher_.reset();
+ return false;
+ }
- // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
- // again.
- read_watcher_.reset();
+ return true;
+}
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
- return;
- }
+void RawChannelPosix::OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> /*read_buffer*/,
+ scoped_ptr<WriteBuffer> /*write_buffer*/) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ write_lock().AssertAcquired();
- break;
- }
+ read_watcher_.reset(); // This will stop watching (if necessary).
+ write_watcher_.reset(); // This will stop watching (if necessary).
- read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
-
- // Dispatch all the messages that we can.
- size_t message_size;
- // Note that we rely on short-circuit evaluation here:
- // - |read_buffer_start| may be an invalid index into |read_buffer_| if
- // |read_buffer_num_valid_bytes_| is zero.
- // - |message_size| is only valid if |GetNextMessageSize()| returns true.
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
- // next read).
- while (read_buffer_num_valid_bytes_ > 0 &&
- MessageInTransit::GetNextMessageSize(
- &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
- &message_size) &&
- read_buffer_num_valid_bytes_ >= message_size) {
- // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
- // some sort of "view" abstraction.
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
- &read_buffer_[read_buffer_start]);
- DCHECK_EQ(message.total_size(), message_size);
-
- // Dispatch the message.
- delegate()->OnReadMessage(message);
- if (!read_watcher_.get()) {
- // |Shutdown()| was called in |OnReadMessage()|.
- // TODO(vtl): Add test for this case.
- return;
- }
- did_dispatch_message = true;
-
- // Update our state.
- read_buffer_start += message_size;
- read_buffer_num_valid_bytes_ -= message_size;
- }
+ pending_read_ = false;
+ pending_write_ = false;
- // If we dispatched any messages, stop reading for now (and let the message
- // loop do its thing for another round).
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
- // a single message. Risks: slower, more complex if we want to avoid lots of
- // copying. ii. Keep reading until there's no more data and dispatch all the
- // messages we can. Risks: starvation of other users of the message loop.)
- if (did_dispatch_message)
- break;
+ DCHECK(fd_.is_valid());
+ fd_.reset();
- // If we didn't max out |kReadSize|, stop reading for now.
- if (static_cast<size_t>(bytes_read) < kReadSize)
- break;
+ weak_ptr_factory_.InvalidateWeakPtrs();
+}
- // Else try to read some more....
- }
+void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_.get().fd);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- // Move data back to start.
- if (read_buffer_start > 0) {
- if (read_buffer_num_valid_bytes_ > 0) {
- memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
- read_buffer_num_valid_bytes_);
- }
- read_buffer_start = 0;
+ if (!pending_read_) {
+ NOTREACHED();
+ return;
}
+
+ pending_read_ = false;
+ size_t bytes_read = 0;
+ IOResult result = Read(&bytes_read);
+ if (result != IO_PENDING)
+ OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
+
+ // On failure, |read_watcher_| must have been reset; on success,
+ // we assume that |OnReadCompleted()| always schedules another read.
+ // Otherwise, we could end up spinning -- getting
+ // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
+ // read.
+ // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
+ // schedule a new read. But that code won't be reached under the current
+ // RawChannel implementation.
+ DCHECK(!read_watcher_.get() || pending_read_);
}
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_.get().fd);
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- bool did_fail = false;
+ IOResult result = IO_FAILED;
+ size_t bytes_written = 0;
{
- base::AutoLock locker(write_lock_);
- DCHECK_EQ(write_stopped_, write_message_queue_.empty());
+ base::AutoLock locker(write_lock());
- if (write_stopped_) {
- write_watcher_.reset();
- return;
- }
-
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ DCHECK(pending_write_);
- if (!result) {
- did_fail = true;
- write_watcher_.reset();
- } else if (!write_message_queue_.empty()) {
- WaitToWrite();
- }
+ pending_write_ = false;
+ result = WriteNoLock(&bytes_written);
}
- if (did_fail)
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+
+ if (result != IO_PENDING)
+ OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
}
void RawChannelPosix::WaitToWrite() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(write_watcher_.get());
- bool result = message_loop_for_io()->WatchFileDescriptor(
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
- write_watcher_.get(), this);
- DCHECK(result);
-}
-void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
- delegate()->OnFatalError(fatal_error);
-}
+ if (!message_loop_for_io()->WatchFileDescriptor(
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
+ write_watcher_.get(), this)) {
+ {
+ base::AutoLock locker(write_lock());
-// TODO(vtl): This will collide with yzshen's work. This is just a hacky,
-// minimally-invasive function that does what I want (write/resume writing a
-// |MessageInTransit| that may consist of more than one buffer).
-ssize_t WriteMessageInTransit(int fd,
- MessageInTransit* message,
- size_t offset,
- size_t* bytes_to_write) {
- *bytes_to_write = message->total_size() - offset;
- if (!message->secondary_buffer_size()) {
- // Only write from the main buffer.
- DCHECK_LT(offset, message->main_buffer_size());
- DCHECK_LE(*bytes_to_write, message->main_buffer_size());
- return HANDLE_EINTR(
- write(fd,
- static_cast<const char*>(message->main_buffer()) + offset,
- *bytes_to_write));
- }
- if (offset >= message->main_buffer_size()) {
- // Only write from the secondary buffer.
- DCHECK_LT(offset - message->main_buffer_size(),
- message->secondary_buffer_size());
- DCHECK_LE(*bytes_to_write, message->secondary_buffer_size());
- return HANDLE_EINTR(
- write(fd,
- static_cast<const char*>(message->secondary_buffer()) +
- (offset - message->main_buffer_size()),
- *bytes_to_write));
- }
- // Write from both buffers. (Note that using |writev()| is measurably slower
- // than using |write()| -- at least in a microbenchmark -- but much faster
- // than using two |write()|s.)
- DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset +
- message->secondary_buffer_size());
- struct iovec iov[2] = {
- { const_cast<char*>(
- static_cast<const char*>(message->main_buffer()) + offset),
- message->main_buffer_size() - offset },
- { const_cast<void*>(message->secondary_buffer()),
- message->secondary_buffer_size() }
- };
- return HANDLE_EINTR(writev(fd, iov, 2));
-}
-
-bool RawChannelPosix::WriteFrontMessageNoLock() {
- write_lock_.AssertAcquired();
-
- DCHECK(!write_stopped_);
- DCHECK(!write_message_queue_.empty());
-
- MessageInTransit* message = write_message_queue_.front();
- DCHECK_LT(write_message_offset_, message->total_size());
- size_t bytes_to_write;
- ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd,
- message,
- write_message_offset_,
- &bytes_to_write);
- if (bytes_written < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "write of size " << bytes_to_write;
- CancelPendingWritesNoLock();
- return false;
+ DCHECK(pending_write_);
+ pending_write_ = false;
}
-
- // We simply failed to write since we'd block. The logic is the same as if
- // we got a partial write.
- bytes_written = 0;
- }
-
- DCHECK_GE(bytes_written, 0);
- if (static_cast<size_t>(bytes_written) < bytes_to_write) {
- // Partial (or no) write.
- write_message_offset_ += static_cast<size_t>(bytes_written);
- } else {
- // Complete write.
- DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
- write_message_queue_.pop_front();
- delete message;
- write_message_offset_ = 0;
+ OnWriteCompleted(false, 0);
}
-
- return true;
-}
-
-void RawChannelPosix::CancelPendingWritesNoLock() {
- write_lock_.AssertAcquired();
- DCHECK(!write_stopped_);
-
- write_stopped_ = true;
- STLDeleteElements(&write_message_queue_);
}
} // namespace