diff options
author | yzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-27 18:13:25 +0000 |
---|---|---|
committer | yzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-27 18:13:25 +0000 |
commit | 4976265023cd8aa03dcc65a0958b9e4ad778ab75 (patch) | |
tree | f1d88f9b4d09c39b419b612e40b218bbaad3a59b /mojo | |
parent | 06bd7ea902f66a54a410184b8f78bc30b4b306b6 (diff) | |
download | chromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.zip chromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.tar.gz chromium_src-4976265023cd8aa03dcc65a0958b9e4ad778ab75.tar.bz2 |
Restructure RawChannelPosix and RawChannel, so that the platform-independent logic can be shared between POSIX and Windows.
BUG=None
TEST=mojo_system_unittests
R=viettrungluu@chromium.org
Review URL: https://codereview.chromium.org/169723004
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253865 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/mojo.gyp | 1 | ||||
-rw-r--r-- | mojo/system/raw_channel.cc | 344 | ||||
-rw-r--r-- | mojo/system/raw_channel.h | 150 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 478 |
4 files changed, 657 insertions, 316 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp index 82cd8ab..8074028 100644 --- a/mojo/mojo.gyp +++ b/mojo/mojo.gyp @@ -138,6 +138,7 @@ 'system/message_pipe_endpoint.h', 'system/proxy_message_pipe_endpoint.cc', 'system/proxy_message_pipe_endpoint.h', + 'system/raw_channel.cc', 'system/raw_channel.h', 'system/raw_channel_posix.cc', 'system/raw_channel_win.cc', diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc new file mode 100644 index 0000000..5d8da3b --- /dev/null +++ b/mojo/system/raw_channel.cc @@ -0,0 +1,344 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/raw_channel.h" + +#include <string.h> + +#include <algorithm> + +#include "base/bind.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "base/stl_util.h" +#include "mojo/system/message_in_transit.h" + +namespace mojo { +namespace system { + +const size_t kReadSize = 4096; + +RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { +} + +RawChannel::ReadBuffer::~ReadBuffer() {} + +void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { + DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); + *addr = &buffer_[0] + num_valid_bytes_; + *size = kReadSize; +} + +RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} + +RawChannel::WriteBuffer::~WriteBuffer() { + STLDeleteElements(&message_queue_); +} + +void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { + buffers->clear(); + + size_t bytes_to_write = GetTotalBytesToWrite(); + if (bytes_to_write == 0) + return; + + MessageInTransit* message = message_queue_.front(); + if (!message->secondary_buffer_size()) { + // Only write from the main buffer. + DCHECK_LT(offset_, message->main_buffer_size()); + DCHECK_LE(bytes_to_write, message->main_buffer_size()); + Buffer buffer = { + static_cast<const char*>(message->main_buffer()) + offset_, + bytes_to_write}; + buffers->push_back(buffer); + return; + } + + if (offset_ >= message->main_buffer_size()) { + // Only write from the secondary buffer. + DCHECK_LT(offset_ - message->main_buffer_size(), + message->secondary_buffer_size()); + DCHECK_LE(bytes_to_write, message->secondary_buffer_size()); + Buffer buffer = { + static_cast<const char*>(message->secondary_buffer()) + + (offset_ - message->main_buffer_size()), + bytes_to_write}; + buffers->push_back(buffer); + return; + } + + // Write from both buffers. + DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ + + message->secondary_buffer_size()); + Buffer buffer1 = { + static_cast<const char*>(message->main_buffer()) + offset_, + message->main_buffer_size() - offset_}; + buffers->push_back(buffer1); + Buffer buffer2 = { + static_cast<const char*>(message->secondary_buffer()), + message->secondary_buffer_size()}; + buffers->push_back(buffer2); +} + +size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const { + if (message_queue_.empty()) + return 0; + + MessageInTransit* message = message_queue_.front(); + DCHECK_LT(offset_, message->total_size()); + return message->total_size() - offset_; +} + +RawChannel::RawChannel(Delegate* delegate, + base::MessageLoopForIO* message_loop_for_io) + : delegate_(delegate), + message_loop_for_io_(message_loop_for_io), + read_stopped_(false), + write_stopped_(false), + weak_ptr_factory_(this) { +} + +RawChannel::~RawChannel() { + DCHECK(!read_buffer_); + DCHECK(!write_buffer_); + + // No need to take the |write_lock_| here -- if there are still weak pointers + // outstanding, then we're hosed anyway (since we wouldn't be able to + // invalidate them cleanly, since we might not be on the I/O thread). + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); +} + +bool RawChannel::Init() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + + // No need to take the lock. No one should be using us yet. + DCHECK(!read_buffer_); + read_buffer_.reset(new ReadBuffer); + DCHECK(!write_buffer_); + write_buffer_.reset(new WriteBuffer); + + if (!OnInit()) + return false; + + return ScheduleRead() == IO_PENDING; +} + +void RawChannel::Shutdown() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + + base::AutoLock locker(write_lock_); + + weak_ptr_factory_.InvalidateWeakPtrs(); + + read_stopped_ = true; + write_stopped_ = true; + + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); +} + +// Reminder: This must be thread-safe. +bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { + base::AutoLock locker(write_lock_); + if (write_stopped_) + return false; + + if (!write_buffer_->message_queue_.empty()) { + write_buffer_->message_queue_.push_back(message.release()); + return true; + } + + write_buffer_->message_queue_.push_front(message.release()); + DCHECK_EQ(write_buffer_->offset_, 0u); + + size_t bytes_written = 0; + IOResult io_result = WriteNoLock(&bytes_written); + if (io_result == IO_PENDING) + return true; + + bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, + bytes_written); + if (!result) { + // Even if we're on the I/O thread, don't call |OnFatalError()| in the + // nested context. + message_loop_for_io_->PostTask( + FROM_HERE, + base::Bind(&RawChannel::CallOnFatalError, + weak_ptr_factory_.GetWeakPtr(), + Delegate::FATAL_ERROR_FAILED_WRITE)); + } + + return result; +} + +RawChannel::ReadBuffer* RawChannel::read_buffer() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + return read_buffer_.get(); +} + +RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() { + write_lock_.AssertAcquired(); + return write_buffer_.get(); +} + +void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + + if (read_stopped_) { + NOTREACHED(); + return; + } + + IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; + + // Keep reading data in a loop, and dispatches messages if enough data is + // received. Exit the loop if any of the following happens: + // - one or more messages were dispatched; + // - the last read failed, was a partial read or would block; + // - |Shutdown()| was called. + do { + if (io_result != IO_SUCCEEDED) { + read_stopped_ = true; + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); + return; + } + + read_buffer_->num_valid_bytes_ += bytes_read; + + // Dispatch all the messages that we can. + bool did_dispatch_message = false; + // Tracks the offset of the first undispatched message in |read_buffer_|. + // Currently, we copy data to ensure that this is zero at the beginning. + size_t read_buffer_start = 0; + size_t remaining_bytes = read_buffer_->num_valid_bytes_; + size_t message_size; + // Note that we rely on short-circuit evaluation here: + // - |read_buffer_start| may be an invalid index into + // |read_buffer_->buffer_| if |remaining_bytes| is zero. + // - |message_size| is only valid if |GetNextMessageSize()| returns true. + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the + // next read). + while (remaining_bytes > 0 && + MessageInTransit::GetNextMessageSize( + &read_buffer_->buffer_[read_buffer_start], remaining_bytes, + &message_size) && + remaining_bytes >= message_size) { + // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with + // some sort of "view" abstraction. + MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, + &read_buffer_->buffer_[read_buffer_start]); + DCHECK_EQ(message.total_size(), message_size); + + // Dispatch the message. + delegate_->OnReadMessage(message); + if (read_stopped_) { + // |Shutdown()| was called in |OnReadMessage()|. + // TODO(vtl): Add test for this case. + return; + } + did_dispatch_message = true; + + // Update our state. + read_buffer_start += message_size; + remaining_bytes -= message_size; + } + + if (read_buffer_start > 0) { + // Move data back to start. + read_buffer_->num_valid_bytes_ = remaining_bytes; + if (read_buffer_->num_valid_bytes_ > 0) { + memmove(&read_buffer_->buffer_[0], + &read_buffer_->buffer_[read_buffer_start], remaining_bytes); + } + read_buffer_start = 0; + } + + if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < + kReadSize) { + // Use power-of-2 buffer sizes. + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the + // maximum message size to whatever extent necessary). + // TODO(vtl): We may often be able to peek at the header and get the real + // required extra space (which may be much bigger than |kReadSize|). + size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); + while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) + new_size *= 2; + + // TODO(vtl): It's suboptimal to zero out the fresh memory. + read_buffer_->buffer_.resize(new_size, 0); + } + + // (1) If we dispatched any messages, stop reading for now (and let the + // message loop do its thing for another round). + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only + // a single message. Risks: slower, more complex if we want to avoid lots of + // copying. ii. Keep reading until there's no more data and dispatch all the + // messages we can. Risks: starvation of other users of the message loop.) + // (2) If we didn't max out |kReadSize|, stop reading for now. + bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; + bytes_read = 0; + io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); + } while (io_result != IO_PENDING); +} + +void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + + bool did_fail = false; + { + base::AutoLock locker(write_lock_); + DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); + + if (write_stopped_) { + NOTREACHED(); + return; + } + + did_fail = !OnWriteCompletedNoLock(result, bytes_written); + } + + if (did_fail) + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); +} + +void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? + delegate_->OnFatalError(fatal_error); +} + +bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { + write_lock_.AssertAcquired(); + + DCHECK(!write_stopped_); + DCHECK(!write_buffer_->message_queue_.empty()); + + if (result) { + if (bytes_written < write_buffer_->GetTotalBytesToWrite()) { + // Partial (or no) write. + write_buffer_->offset_ += bytes_written; + } else { + // Complete write. + DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite()); + delete write_buffer_->message_queue_.front(); + write_buffer_->message_queue_.pop_front(); + write_buffer_->offset_ = 0; + } + + if (write_buffer_->message_queue_.empty()) + return true; + + // Schedule the next write. + if (ScheduleWriteNoLock() == IO_PENDING) + return true; + } + + write_stopped_ = true; + STLDeleteElements(&write_buffer_->message_queue_); + write_buffer_->offset_ = 0; + return false; +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h index 4b14edd..4b0f0f4 100644 --- a/mojo/system/raw_channel.h +++ b/mojo/system/raw_channel.h @@ -5,10 +5,13 @@ #ifndef MOJO_SYSTEM_RAW_CHANNEL_H_ #define MOJO_SYSTEM_RAW_CHANNEL_H_ +#include <deque> #include <vector> #include "base/macros.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/synchronization/lock.h" #include "mojo/system/constants.h" #include "mojo/system/embedder/scoped_platform_handle.h" #include "mojo/system/system_impl_export.h" @@ -39,7 +42,7 @@ class MessageInTransit; // on which |Init()| is called). class MOJO_SYSTEM_IMPL_EXPORT RawChannel { public: - virtual ~RawChannel() {} + virtual ~RawChannel(); // The |Delegate| is only accessed on the same thread as the message loop // (passed in on creation). @@ -78,26 +81,159 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // This must be called (on an I/O thread) before this object is used. Returns // true on success. On failure, |Shutdown()| should *not* be called. - virtual bool Init() = 0; + bool Init(); // This must be called (on the I/O thread) before this object is destroyed. - virtual void Shutdown() = 0; + void Shutdown(); // This is thread-safe. It takes ownership of |message| (always, even on // failure). Returns true on success. - virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) = 0; + bool WriteMessage(scoped_ptr<MessageInTransit> message); protected: - RawChannel(Delegate* delegate, base::MessageLoopForIO* message_loop_for_io) - : delegate_(delegate), message_loop_for_io_(message_loop_for_io) {} + // Return values of |[Schedule]Read()| and |[Schedule]WriteNoLock()|. + enum IOResult { + IO_SUCCEEDED, + IO_FAILED, + IO_PENDING + }; + + class ReadBuffer { + public: + ReadBuffer(); + ~ReadBuffer(); + + void GetBuffer(char** addr, size_t* size); + + private: + friend class RawChannel; + + // We store data from |[Schedule]Read()|s in |buffer_|. The start of + // |buffer_| is always aligned with a message boundary (we will copy memory + // to ensure this), but |buffer_| may be larger than the actual number of + // bytes we have. + std::vector<char> buffer_; + size_t num_valid_bytes_; + + DISALLOW_COPY_AND_ASSIGN(ReadBuffer); + }; + + class WriteBuffer { + public: + struct Buffer { + const char* addr; + size_t size; + }; + + WriteBuffer(); + ~WriteBuffer(); + + void GetBuffers(std::vector<Buffer>* buffers) const; + // Returns the total size of all buffers returned by |GetBuffers()|. + size_t GetTotalBytesToWrite() const; + + private: + friend class RawChannel; + + // TODO(vtl): When C++11 is available, switch this to a deque of + // |scoped_ptr|/|unique_ptr|s. + std::deque<MessageInTransit*> message_queue_; + // The first message may have been partially sent. |offset_| indicates the + // position in the first message where to start the next write. + size_t offset_; + + DISALLOW_COPY_AND_ASSIGN(WriteBuffer); + }; + + RawChannel(Delegate* delegate, base::MessageLoopForIO* message_loop_for_io); - Delegate* delegate() { return delegate_; } base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; } + base::Lock& write_lock() { return write_lock_; } + + // Only accessed on the I/O thread. + ReadBuffer* read_buffer(); + + // Only accessed under |write_lock_|. + WriteBuffer* write_buffer_no_lock(); + + // Reads into |read_buffer()|. + // This class guarantees that: + // - the area indicated by |GetBuffer()| will stay valid until read completion + // (but please also see the comments for |OnShutdownNoLock()|); + // - a second read is not started if there is a pending read; + // - the method is called on the I/O thread WITHOUT |write_lock_| held. + // + // The implementing subclass must guarantee that: + // - |bytes_read| is untouched if the method returns values other than + // IO_SUCCEEDED; + // - if the method returns IO_PENDING, |OnReadCompleted()| will be called on + // the I/O thread to report the result, unless |Shutdown()| is called. + virtual IOResult Read(size_t* bytes_read) = 0; + // Similar to |Read()|, except that the implementing subclass must also + // guarantee that the method doesn't succeed synchronously, i.e., it only + // returns IO_FAILED or IO_PENDING. + virtual IOResult ScheduleRead() = 0; + + // Writes contents in |write_buffer_no_lock()|. + // This class guarantees that: + // - the area indicated by |GetBuffers()| will stay valid until write + // completion (but please also see the comments for |OnShutdownNoLock()|); + // - a second write is not started if there is a pending write; + // - the method is called under |write_lock_|. + // + // The implementing subclass must guarantee that: + // - |bytes_written| is untouched if the method returns values other than + // IO_SUCCEEDED; + // - if the method returns IO_PENDING, |OnWriteCompleted()| will be called on + // the I/O thread to report the result, unless |Shutdown()| is called. + virtual IOResult WriteNoLock(size_t* bytes_written) = 0; + // Similar to |WriteNoLock()|, except that the implementing subclass must also + // guarantee that the method doesn't succeed synchronously, i.e., it only + // returns IO_FAILED or IO_PENDING. + virtual IOResult ScheduleWriteNoLock() = 0; + + // Must be called on the I/O thread WITHOUT |write_lock_| held. + virtual bool OnInit() = 0; + // On shutdown, passes the ownership of the buffers to subclasses, who may + // want to preserve them if there are pending read/write. + // Must be called on the I/O thread under |write_lock_|. + virtual void OnShutdownNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) = 0; + + // Must be called on the I/O thread WITHOUT |write_lock_| held. + void OnReadCompleted(bool result, size_t bytes_read); + // Must be called on the I/O thread WITHOUT |write_lock_| held. + void OnWriteCompleted(bool result, size_t bytes_written); private: + // Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O + // thread WITHOUT |write_lock_| held. + void CallOnFatalError(Delegate::FatalError fatal_error); + + // If |result| is true, updates the write buffer and schedules a write + // operation to run later if there are more contents to write. If |result| is + // false or any error occurs during the method execution, cancels pending + // writes and returns false. + // Must be called only if |write_stopped_| is false and under |write_lock_|. + bool OnWriteCompletedNoLock(bool result, size_t bytes_written); + Delegate* const delegate_; base::MessageLoopForIO* const message_loop_for_io_; + // Only used on the I/O thread: + bool read_stopped_; + scoped_ptr<ReadBuffer> read_buffer_; + + base::Lock write_lock_; // Protects the following members. + bool write_stopped_; + scoped_ptr<WriteBuffer> write_buffer_; + + // This is used for posting tasks from write threads to the I/O thread. It + // must only be accessed under |write_lock_|. The weak pointers it produces + // are only used/invalidated on the I/O thread. + base::WeakPtrFactory<RawChannel> weak_ptr_factory_; + DISALLOW_COPY_AND_ASSIGN(RawChannel); }; diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index fa1a5ce..00a5448 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -5,13 +5,10 @@ #include "mojo/system/raw_channel.h" #include <errno.h> -#include <string.h> #include <sys/uio.h> #include <unistd.h> #include <algorithm> -#include <deque> -#include <vector> #include "base/basictypes.h" #include "base/bind.h" @@ -22,18 +19,14 @@ #include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/posix/eintr_wrapper.h" -#include "base/stl_util.h" #include "base/synchronization/lock.h" #include "mojo/system/embedder/platform_handle.h" -#include "mojo/system/message_in_transit.h" namespace mojo { namespace system { namespace { -const size_t kReadSize = 4096; - class RawChannelPosix : public RawChannel, public base::MessageLoopForIO::Watcher { public: @@ -42,12 +35,17 @@ class RawChannelPosix : public RawChannel, base::MessageLoopForIO* message_loop_for_io); virtual ~RawChannelPosix(); + private: // |RawChannel| implementation: - virtual bool Init() OVERRIDE; - virtual void Shutdown() OVERRIDE; - virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; + virtual IOResult Read(size_t* bytes_read) OVERRIDE; + virtual IOResult ScheduleRead() OVERRIDE; + virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; + virtual IOResult ScheduleWriteNoLock() OVERRIDE; + virtual bool OnInit() OVERRIDE; + virtual void OnShutdownNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; - private: // |base::MessageLoopForIO::Watcher| implementation: virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; @@ -55,40 +53,18 @@ class RawChannelPosix : public RawChannel, // Watches for |fd_| to become writable. Must be called on the I/O thread. void WaitToWrite(); - // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O - // thread WITHOUT |write_lock_| held. - void CallOnFatalError(Delegate::FatalError fatal_error); - - // Writes the message at the front of |write_message_queue_|, starting at - // |write_message_offset_|. It removes and destroys if the write completes and - // otherwise updates |write_message_offset_|. Returns true on success. Must be - // called under |write_lock_|. - bool WriteFrontMessageNoLock(); - - // Cancels all pending writes and destroys the contents of - // |write_message_queue_|. Should only be called if |write_stopped_| is false; - // sets |write_stopped_| to true. Must be called under |write_lock_|. - void CancelPendingWritesNoLock(); - embedder::ScopedPlatformHandle fd_; - // Only used on the I/O thread: + // The following members are only used on the I/O thread: scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; - // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| - // is always aligned with a message boundary (we will copy memory to ensure - // this), but |read_buffer_| may be larger than the actual number of bytes we - // have. - std::vector<char> read_buffer_; - size_t read_buffer_num_valid_bytes_; - - base::Lock write_lock_; // Protects the following members. - bool write_stopped_; - // TODO(vtl): When C++11 is available, switch this to a deque of - // |scoped_ptr|/|unique_ptr|s. - std::deque<MessageInTransit*> write_message_queue_; - size_t write_message_offset_; + bool pending_read_; + + // The following members are used on multiple threads and protected by + // |write_lock()|: + bool pending_write_; + // This is used for posting tasks from write threads to the I/O thread. It // must only be accessed under |write_lock_|. The weak pointers it produces // are only used/invalidated on the I/O thread. @@ -102,20 +78,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, base::MessageLoopForIO* message_loop_for_io) : RawChannel(delegate, message_loop_for_io), fd_(handle.Pass()), - read_buffer_num_valid_bytes_(0), - write_stopped_(false), - write_message_offset_(0), + pending_read_(false), + pending_write_(false), weak_ptr_factory_(this) { - CHECK_EQ(RawChannel::message_loop_for_io()->type(), - base::MessageLoop::TYPE_IO); DCHECK(fd_.is_valid()); } RawChannelPosix::~RawChannelPosix() { - DCHECK(write_stopped_); - DCHECK(!fd_.is_valid()); + DCHECK(!pending_read_); + DCHECK(!pending_write_); - // No need to take the |write_lock_| here -- if there are still weak pointers + // No need to take the |write_lock()| here -- if there are still weak pointers // outstanding, then we're hosed anyway (since we wouldn't be able to // invalidate them cleanly, since we might not be on the I/O thread). DCHECK(!weak_ptr_factory_.HasWeakPtrs()); @@ -125,324 +98,211 @@ RawChannelPosix::~RawChannelPosix() { DCHECK(!write_watcher_.get()); } -bool RawChannelPosix::Init() { +RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); - DCHECK(!read_watcher_.get()); - read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); - DCHECK(!write_watcher_.get()); - write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + char* buffer = NULL; + size_t bytes_to_read = 0; + read_buffer()->GetBuffer(&buffer, &bytes_to_read); - // No need to take the lock. No one should be using us yet. - DCHECK(write_message_queue_.empty()); + ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read)); - if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, - base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { - // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly - // (in the sense of returning the message loop's state to what it was before - // it was called). + if (read_result >= 0) { + *bytes_read = static_cast<size_t>(read_result); + return IO_SUCCEEDED; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "read"; + + // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. read_watcher_.reset(); - write_watcher_.reset(); - return false; + + return IO_FAILED; } - return true; + return ScheduleRead(); } -void RawChannelPosix::Shutdown() { +RawChannel::IOResult RawChannelPosix::ScheduleRead() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); - base::AutoLock locker(write_lock_); - if (!write_stopped_) - CancelPendingWritesNoLock(); + pending_read_ = true; - read_watcher_.reset(); // This will stop watching (if necessary). - write_watcher_.reset(); // This will stop watching (if necessary). + return IO_PENDING; +} - DCHECK(fd_.is_valid()); - fd_.reset(); +RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { + write_lock().AssertAcquired(); - weak_ptr_factory_.InvalidateWeakPtrs(); -} + DCHECK(!pending_write_); -// Reminder: This must be thread-safe, and takes ownership of |message|. -bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { - base::AutoLock locker(write_lock_); - if (write_stopped_) - return false; + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); - if (!write_message_queue_.empty()) { - write_message_queue_.push_back(message.release()); - return true; + ssize_t write_result = -1; + if (buffers.size() == 1) { + write_result = HANDLE_EINTR( + write(fd_.get().fd, buffers[0].addr, buffers[0].size)); + } else { + // Note that using |writev()| is measurably slower than using |write()| -- + // at least in a microbenchmark -- but much faster than using multiple + // |write()|s. + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; + } + + write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count)); } - write_message_queue_.push_front(message.release()); - DCHECK_EQ(write_message_offset_, 0u); - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + if (write_result >= 0) { + *bytes_written = static_cast<size_t>(write_result); + return IO_SUCCEEDED; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "write of size " + << write_buffer_no_lock()->GetTotalBytesToWrite(); + return IO_FAILED; + } - if (!result) { - // Even if we're on the I/O thread, don't call |OnFatalError()| in the - // nested context. + return ScheduleWriteNoLock(); +} + +RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { + write_lock().AssertAcquired(); + + DCHECK(!pending_write_); + + // Set up to wait for the FD to become writable. + // If we're not on the I/O thread, we have to post a task to do this. + if (base::MessageLoop::current() != message_loop_for_io()) { message_loop_for_io()->PostTask( FROM_HERE, - base::Bind(&RawChannelPosix::CallOnFatalError, - weak_ptr_factory_.GetWeakPtr(), - Delegate::FATAL_ERROR_FAILED_WRITE)); - } else if (!write_message_queue_.empty()) { - // Set up to wait for the FD to become writable. If we're not on the I/O - // thread, we have to post a task to do this. - if (base::MessageLoop::current() == message_loop_for_io()) { - WaitToWrite(); - } else { - message_loop_for_io()->PostTask( - FROM_HERE, - base::Bind(&RawChannelPosix::WaitToWrite, - weak_ptr_factory_.GetWeakPtr())); - } + base::Bind(&RawChannelPosix::WaitToWrite, + weak_ptr_factory_.GetWeakPtr())); + pending_write_ = true; + return IO_PENDING; + } + + if (message_loop_for_io()->WatchFileDescriptor( + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, + write_watcher_.get(), this)) { + pending_write_ = true; + return IO_PENDING; } - return result; + return IO_FAILED; } -void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { - DCHECK_EQ(fd, fd_.get().fd); +bool RawChannelPosix::OnInit() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - bool did_dispatch_message = false; - // Tracks the offset of the first undispatched message in |read_buffer_|. - // Currently, we copy data to ensure that this is zero at the beginning. - size_t read_buffer_start = 0; - for (;;) { - if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) - < kReadSize) { - // Use power-of-2 buffer sizes. - // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the - // maximum message size to whatever extent necessary). - // TODO(vtl): We may often be able to peek at the header and get the real - // required extra space (which may be much bigger than |kReadSize|). - size_t new_size = std::max(read_buffer_.size(), kReadSize); - while (new_size < - read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) - new_size *= 2; - - // TODO(vtl): It's suboptimal to zero out the fresh memory. - read_buffer_.resize(new_size, 0); - } + DCHECK(!read_watcher_.get()); + read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + DCHECK(!write_watcher_.get()); + write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); - ssize_t bytes_read = HANDLE_EINTR( - read(fd_.get().fd, - &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], - kReadSize)); - if (bytes_read < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - PLOG(ERROR) << "read"; + if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, + base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { + // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly + // (in the sense of returning the message loop's state to what it was before + // it was called). + read_watcher_.reset(); + write_watcher_.reset(); + return false; + } - // Make sure that |OnFileCanReadWithoutBlocking()| won't be called - // again. - read_watcher_.reset(); + return true; +} - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); - return; - } +void RawChannelPosix::OnShutdownNoLock( + scoped_ptr<ReadBuffer> /*read_buffer*/, + scoped_ptr<WriteBuffer> /*write_buffer*/) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + write_lock().AssertAcquired(); - break; - } + read_watcher_.reset(); // This will stop watching (if necessary). + write_watcher_.reset(); // This will stop watching (if necessary). - read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); - - // Dispatch all the messages that we can. - size_t message_size; - // Note that we rely on short-circuit evaluation here: - // - |read_buffer_start| may be an invalid index into |read_buffer_| if - // |read_buffer_num_valid_bytes_| is zero. - // - |message_size| is only valid if |GetNextMessageSize()| returns true. - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the - // next read). - while (read_buffer_num_valid_bytes_ > 0 && - MessageInTransit::GetNextMessageSize( - &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, - &message_size) && - read_buffer_num_valid_bytes_ >= message_size) { - // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with - // some sort of "view" abstraction. - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, - &read_buffer_[read_buffer_start]); - DCHECK_EQ(message.total_size(), message_size); - - // Dispatch the message. - delegate()->OnReadMessage(message); - if (!read_watcher_.get()) { - // |Shutdown()| was called in |OnReadMessage()|. - // TODO(vtl): Add test for this case. - return; - } - did_dispatch_message = true; - - // Update our state. - read_buffer_start += message_size; - read_buffer_num_valid_bytes_ -= message_size; - } + pending_read_ = false; + pending_write_ = false; - // If we dispatched any messages, stop reading for now (and let the message - // loop do its thing for another round). - // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only - // a single message. Risks: slower, more complex if we want to avoid lots of - // copying. ii. Keep reading until there's no more data and dispatch all the - // messages we can. Risks: starvation of other users of the message loop.) - if (did_dispatch_message) - break; + DCHECK(fd_.is_valid()); + fd_.reset(); - // If we didn't max out |kReadSize|, stop reading for now. - if (static_cast<size_t>(bytes_read) < kReadSize) - break; + weak_ptr_factory_.InvalidateWeakPtrs(); +} - // Else try to read some more.... - } +void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_.get().fd); + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - // Move data back to start. - if (read_buffer_start > 0) { - if (read_buffer_num_valid_bytes_ > 0) { - memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], - read_buffer_num_valid_bytes_); - } - read_buffer_start = 0; + if (!pending_read_) { + NOTREACHED(); + return; } + + pending_read_ = false; + size_t bytes_read = 0; + IOResult result = Read(&bytes_read); + if (result != IO_PENDING) + OnReadCompleted(result == IO_SUCCEEDED, bytes_read); + + // On failure, |read_watcher_| must have been reset; on success, + // we assume that |OnReadCompleted()| always schedules another read. + // Otherwise, we could end up spinning -- getting + // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual + // read. + // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't + // schedule a new read. But that code won't be reached under the current + // RawChannel implementation. + DCHECK(!read_watcher_.get() || pending_read_); } void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK_EQ(fd, fd_.get().fd); DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - bool did_fail = false; + IOResult result = IO_FAILED; + size_t bytes_written = 0; { - base::AutoLock locker(write_lock_); - DCHECK_EQ(write_stopped_, write_message_queue_.empty()); + base::AutoLock locker(write_lock()); - if (write_stopped_) { - write_watcher_.reset(); - return; - } - - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + DCHECK(pending_write_); - if (!result) { - did_fail = true; - write_watcher_.reset(); - } else if (!write_message_queue_.empty()) { - WaitToWrite(); - } + pending_write_ = false; + result = WriteNoLock(&bytes_written); } - if (did_fail) - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); + + if (result != IO_PENDING) + OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); } void RawChannelPosix::WaitToWrite() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); DCHECK(write_watcher_.get()); - bool result = message_loop_for_io()->WatchFileDescriptor( - fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, - write_watcher_.get(), this); - DCHECK(result); -} -void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? - delegate()->OnFatalError(fatal_error); -} + if (!message_loop_for_io()->WatchFileDescriptor( + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, + write_watcher_.get(), this)) { + { + base::AutoLock locker(write_lock()); -// TODO(vtl): This will collide with yzshen's work. This is just a hacky, -// minimally-invasive function that does what I want (write/resume writing a -// |MessageInTransit| that may consist of more than one buffer). -ssize_t WriteMessageInTransit(int fd, - MessageInTransit* message, - size_t offset, - size_t* bytes_to_write) { - *bytes_to_write = message->total_size() - offset; - if (!message->secondary_buffer_size()) { - // Only write from the main buffer. - DCHECK_LT(offset, message->main_buffer_size()); - DCHECK_LE(*bytes_to_write, message->main_buffer_size()); - return HANDLE_EINTR( - write(fd, - static_cast<const char*>(message->main_buffer()) + offset, - *bytes_to_write)); - } - if (offset >= message->main_buffer_size()) { - // Only write from the secondary buffer. - DCHECK_LT(offset - message->main_buffer_size(), - message->secondary_buffer_size()); - DCHECK_LE(*bytes_to_write, message->secondary_buffer_size()); - return HANDLE_EINTR( - write(fd, - static_cast<const char*>(message->secondary_buffer()) + - (offset - message->main_buffer_size()), - *bytes_to_write)); - } - // Write from both buffers. (Note that using |writev()| is measurably slower - // than using |write()| -- at least in a microbenchmark -- but much faster - // than using two |write()|s.) - DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset + - message->secondary_buffer_size()); - struct iovec iov[2] = { - { const_cast<char*>( - static_cast<const char*>(message->main_buffer()) + offset), - message->main_buffer_size() - offset }, - { const_cast<void*>(message->secondary_buffer()), - message->secondary_buffer_size() } - }; - return HANDLE_EINTR(writev(fd, iov, 2)); -} - -bool RawChannelPosix::WriteFrontMessageNoLock() { - write_lock_.AssertAcquired(); - - DCHECK(!write_stopped_); - DCHECK(!write_message_queue_.empty()); - - MessageInTransit* message = write_message_queue_.front(); - DCHECK_LT(write_message_offset_, message->total_size()); - size_t bytes_to_write; - ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd, - message, - write_message_offset_, - &bytes_to_write); - if (bytes_written < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - PLOG(ERROR) << "write of size " << bytes_to_write; - CancelPendingWritesNoLock(); - return false; + DCHECK(pending_write_); + pending_write_ = false; } - - // We simply failed to write since we'd block. The logic is the same as if - // we got a partial write. - bytes_written = 0; - } - - DCHECK_GE(bytes_written, 0); - if (static_cast<size_t>(bytes_written) < bytes_to_write) { - // Partial (or no) write. - write_message_offset_ += static_cast<size_t>(bytes_written); - } else { - // Complete write. - DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); - write_message_queue_.pop_front(); - delete message; - write_message_offset_ = 0; + OnWriteCompleted(false, 0); } - - return true; -} - -void RawChannelPosix::CancelPendingWritesNoLock() { - write_lock_.AssertAcquired(); - DCHECK(!write_stopped_); - - write_stopped_ = true; - STLDeleteElements(&write_message_queue_); } } // namespace |