summaryrefslogtreecommitdiffstats
path: root/mojo/system/raw_channel_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/system/raw_channel_posix.cc')
-rw-r--r--mojo/system/raw_channel_posix.cc478
1 files changed, 169 insertions, 309 deletions
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index fa1a5ce..00a5448 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -5,13 +5,10 @@
#include "mojo/system/raw_channel.h"
#include <errno.h>
-#include <string.h>
#include <sys/uio.h>
#include <unistd.h>
#include <algorithm>
-#include <deque>
-#include <vector>
#include "base/basictypes.h"
#include "base/bind.h"
@@ -22,18 +19,14 @@
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
-#include "base/stl_util.h"
#include "base/synchronization/lock.h"
#include "mojo/system/embedder/platform_handle.h"
-#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
namespace {
-const size_t kReadSize = 4096;
-
class RawChannelPosix : public RawChannel,
public base::MessageLoopForIO::Watcher {
public:
@@ -42,12 +35,17 @@ class RawChannelPosix : public RawChannel,
base::MessageLoopForIO* message_loop_for_io);
virtual ~RawChannelPosix();
+ private:
// |RawChannel| implementation:
- virtual bool Init() OVERRIDE;
- virtual void Shutdown() OVERRIDE;
- virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
+ virtual IOResult Read(size_t* bytes_read) OVERRIDE;
+ virtual IOResult ScheduleRead() OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult ScheduleWriteNoLock() OVERRIDE;
+ virtual bool OnInit() OVERRIDE;
+ virtual void OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
- private:
// |base::MessageLoopForIO::Watcher| implementation:
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
@@ -55,40 +53,18 @@ class RawChannelPosix : public RawChannel,
// Watches for |fd_| to become writable. Must be called on the I/O thread.
void WaitToWrite();
- // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
- // thread WITHOUT |write_lock_| held.
- void CallOnFatalError(Delegate::FatalError fatal_error);
-
- // Writes the message at the front of |write_message_queue_|, starting at
- // |write_message_offset_|. It removes and destroys if the write completes and
- // otherwise updates |write_message_offset_|. Returns true on success. Must be
- // called under |write_lock_|.
- bool WriteFrontMessageNoLock();
-
- // Cancels all pending writes and destroys the contents of
- // |write_message_queue_|. Should only be called if |write_stopped_| is false;
- // sets |write_stopped_| to true. Must be called under |write_lock_|.
- void CancelPendingWritesNoLock();
-
embedder::ScopedPlatformHandle fd_;
- // Only used on the I/O thread:
+ // The following members are only used on the I/O thread:
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
- // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
- // is always aligned with a message boundary (we will copy memory to ensure
- // this), but |read_buffer_| may be larger than the actual number of bytes we
- // have.
- std::vector<char> read_buffer_;
- size_t read_buffer_num_valid_bytes_;
-
- base::Lock write_lock_; // Protects the following members.
- bool write_stopped_;
- // TODO(vtl): When C++11 is available, switch this to a deque of
- // |scoped_ptr|/|unique_ptr|s.
- std::deque<MessageInTransit*> write_message_queue_;
- size_t write_message_offset_;
+ bool pending_read_;
+
+ // The following members are used on multiple threads and protected by
+ // |write_lock()|:
+ bool pending_write_;
+
// This is used for posting tasks from write threads to the I/O thread. It
// must only be accessed under |write_lock_|. The weak pointers it produces
// are only used/invalidated on the I/O thread.
@@ -102,20 +78,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
base::MessageLoopForIO* message_loop_for_io)
: RawChannel(delegate, message_loop_for_io),
fd_(handle.Pass()),
- read_buffer_num_valid_bytes_(0),
- write_stopped_(false),
- write_message_offset_(0),
+ pending_read_(false),
+ pending_write_(false),
weak_ptr_factory_(this) {
- CHECK_EQ(RawChannel::message_loop_for_io()->type(),
- base::MessageLoop::TYPE_IO);
DCHECK(fd_.is_valid());
}
RawChannelPosix::~RawChannelPosix() {
- DCHECK(write_stopped_);
- DCHECK(!fd_.is_valid());
+ DCHECK(!pending_read_);
+ DCHECK(!pending_write_);
- // No need to take the |write_lock_| here -- if there are still weak pointers
+ // No need to take the |write_lock()| here -- if there are still weak pointers
// outstanding, then we're hosed anyway (since we wouldn't be able to
// invalidate them cleanly, since we might not be on the I/O thread).
DCHECK(!weak_ptr_factory_.HasWeakPtrs());
@@ -125,324 +98,211 @@ RawChannelPosix::~RawChannelPosix() {
DCHECK(!write_watcher_.get());
}
-bool RawChannelPosix::Init() {
+RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
- DCHECK(!read_watcher_.get());
- read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
- DCHECK(!write_watcher_.get());
- write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ char* buffer = NULL;
+ size_t bytes_to_read = 0;
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read);
- // No need to take the lock. No one should be using us yet.
- DCHECK(write_message_queue_.empty());
+ ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
- if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
- base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
- // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
- // (in the sense of returning the message loop's state to what it was before
- // it was called).
+ if (read_result >= 0) {
+ *bytes_read = static_cast<size_t>(read_result);
+ return IO_SUCCEEDED;
+ }
+
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "read";
+
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
read_watcher_.reset();
- write_watcher_.reset();
- return false;
+
+ return IO_FAILED;
}
- return true;
+ return ScheduleRead();
}
-void RawChannelPosix::Shutdown() {
+RawChannel::IOResult RawChannelPosix::ScheduleRead() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(!pending_read_);
- base::AutoLock locker(write_lock_);
- if (!write_stopped_)
- CancelPendingWritesNoLock();
+ pending_read_ = true;
- read_watcher_.reset(); // This will stop watching (if necessary).
- write_watcher_.reset(); // This will stop watching (if necessary).
+ return IO_PENDING;
+}
- DCHECK(fd_.is_valid());
- fd_.reset();
+RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
+ write_lock().AssertAcquired();
- weak_ptr_factory_.InvalidateWeakPtrs();
-}
+ DCHECK(!pending_write_);
-// Reminder: This must be thread-safe, and takes ownership of |message|.
-bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) {
- base::AutoLock locker(write_lock_);
- if (write_stopped_)
- return false;
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
- if (!write_message_queue_.empty()) {
- write_message_queue_.push_back(message.release());
- return true;
+ ssize_t write_result = -1;
+ if (buffers.size() == 1) {
+ write_result = HANDLE_EINTR(
+ write(fd_.get().fd, buffers[0].addr, buffers[0].size));
+ } else {
+ // Note that using |writev()| is measurably slower than using |write()| --
+ // at least in a microbenchmark -- but much faster than using multiple
+ // |write()|s.
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count));
}
- write_message_queue_.push_front(message.release());
- DCHECK_EQ(write_message_offset_, 0u);
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ if (write_result >= 0) {
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
+ }
+
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "write of size "
+ << write_buffer_no_lock()->GetTotalBytesToWrite();
+ return IO_FAILED;
+ }
- if (!result) {
- // Even if we're on the I/O thread, don't call |OnFatalError()| in the
- // nested context.
+ return ScheduleWriteNoLock();
+}
+
+RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
+ write_lock().AssertAcquired();
+
+ DCHECK(!pending_write_);
+
+ // Set up to wait for the FD to become writable.
+ // If we're not on the I/O thread, we have to post a task to do this.
+ if (base::MessageLoop::current() != message_loop_for_io()) {
message_loop_for_io()->PostTask(
FROM_HERE,
- base::Bind(&RawChannelPosix::CallOnFatalError,
- weak_ptr_factory_.GetWeakPtr(),
- Delegate::FATAL_ERROR_FAILED_WRITE));
- } else if (!write_message_queue_.empty()) {
- // Set up to wait for the FD to become writable. If we're not on the I/O
- // thread, we have to post a task to do this.
- if (base::MessageLoop::current() == message_loop_for_io()) {
- WaitToWrite();
- } else {
- message_loop_for_io()->PostTask(
- FROM_HERE,
- base::Bind(&RawChannelPosix::WaitToWrite,
- weak_ptr_factory_.GetWeakPtr()));
- }
+ base::Bind(&RawChannelPosix::WaitToWrite,
+ weak_ptr_factory_.GetWeakPtr()));
+ pending_write_ = true;
+ return IO_PENDING;
+ }
+
+ if (message_loop_for_io()->WatchFileDescriptor(
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
+ write_watcher_.get(), this)) {
+ pending_write_ = true;
+ return IO_PENDING;
}
- return result;
+ return IO_FAILED;
}
-void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
- DCHECK_EQ(fd, fd_.get().fd);
+bool RawChannelPosix::OnInit() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- bool did_dispatch_message = false;
- // Tracks the offset of the first undispatched message in |read_buffer_|.
- // Currently, we copy data to ensure that this is zero at the beginning.
- size_t read_buffer_start = 0;
- for (;;) {
- if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
- < kReadSize) {
- // Use power-of-2 buffer sizes.
- // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
- // maximum message size to whatever extent necessary).
- // TODO(vtl): We may often be able to peek at the header and get the real
- // required extra space (which may be much bigger than |kReadSize|).
- size_t new_size = std::max(read_buffer_.size(), kReadSize);
- while (new_size <
- read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
- new_size *= 2;
-
- // TODO(vtl): It's suboptimal to zero out the fresh memory.
- read_buffer_.resize(new_size, 0);
- }
+ DCHECK(!read_watcher_.get());
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ DCHECK(!write_watcher_.get());
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
- ssize_t bytes_read = HANDLE_EINTR(
- read(fd_.get().fd,
- &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
- kReadSize));
- if (bytes_read < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "read";
+ if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
+ // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
+ // (in the sense of returning the message loop's state to what it was before
+ // it was called).
+ read_watcher_.reset();
+ write_watcher_.reset();
+ return false;
+ }
- // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
- // again.
- read_watcher_.reset();
+ return true;
+}
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
- return;
- }
+void RawChannelPosix::OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> /*read_buffer*/,
+ scoped_ptr<WriteBuffer> /*write_buffer*/) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ write_lock().AssertAcquired();
- break;
- }
+ read_watcher_.reset(); // This will stop watching (if necessary).
+ write_watcher_.reset(); // This will stop watching (if necessary).
- read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
-
- // Dispatch all the messages that we can.
- size_t message_size;
- // Note that we rely on short-circuit evaluation here:
- // - |read_buffer_start| may be an invalid index into |read_buffer_| if
- // |read_buffer_num_valid_bytes_| is zero.
- // - |message_size| is only valid if |GetNextMessageSize()| returns true.
- // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
- // next read).
- while (read_buffer_num_valid_bytes_ > 0 &&
- MessageInTransit::GetNextMessageSize(
- &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_,
- &message_size) &&
- read_buffer_num_valid_bytes_ >= message_size) {
- // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
- // some sort of "view" abstraction.
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
- &read_buffer_[read_buffer_start]);
- DCHECK_EQ(message.total_size(), message_size);
-
- // Dispatch the message.
- delegate()->OnReadMessage(message);
- if (!read_watcher_.get()) {
- // |Shutdown()| was called in |OnReadMessage()|.
- // TODO(vtl): Add test for this case.
- return;
- }
- did_dispatch_message = true;
-
- // Update our state.
- read_buffer_start += message_size;
- read_buffer_num_valid_bytes_ -= message_size;
- }
+ pending_read_ = false;
+ pending_write_ = false;
- // If we dispatched any messages, stop reading for now (and let the message
- // loop do its thing for another round).
- // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
- // a single message. Risks: slower, more complex if we want to avoid lots of
- // copying. ii. Keep reading until there's no more data and dispatch all the
- // messages we can. Risks: starvation of other users of the message loop.)
- if (did_dispatch_message)
- break;
+ DCHECK(fd_.is_valid());
+ fd_.reset();
- // If we didn't max out |kReadSize|, stop reading for now.
- if (static_cast<size_t>(bytes_read) < kReadSize)
- break;
+ weak_ptr_factory_.InvalidateWeakPtrs();
+}
- // Else try to read some more....
- }
+void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_.get().fd);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- // Move data back to start.
- if (read_buffer_start > 0) {
- if (read_buffer_num_valid_bytes_ > 0) {
- memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
- read_buffer_num_valid_bytes_);
- }
- read_buffer_start = 0;
+ if (!pending_read_) {
+ NOTREACHED();
+ return;
}
+
+ pending_read_ = false;
+ size_t bytes_read = 0;
+ IOResult result = Read(&bytes_read);
+ if (result != IO_PENDING)
+ OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
+
+ // On failure, |read_watcher_| must have been reset; on success,
+ // we assume that |OnReadCompleted()| always schedules another read.
+ // Otherwise, we could end up spinning -- getting
+ // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
+ // read.
+ // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
+ // schedule a new read. But that code won't be reached under the current
+ // RawChannel implementation.
+ DCHECK(!read_watcher_.get() || pending_read_);
}
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_.get().fd);
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- bool did_fail = false;
+ IOResult result = IO_FAILED;
+ size_t bytes_written = 0;
{
- base::AutoLock locker(write_lock_);
- DCHECK_EQ(write_stopped_, write_message_queue_.empty());
+ base::AutoLock locker(write_lock());
- if (write_stopped_) {
- write_watcher_.reset();
- return;
- }
-
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ DCHECK(pending_write_);
- if (!result) {
- did_fail = true;
- write_watcher_.reset();
- } else if (!write_message_queue_.empty()) {
- WaitToWrite();
- }
+ pending_write_ = false;
+ result = WriteNoLock(&bytes_written);
}
- if (did_fail)
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+
+ if (result != IO_PENDING)
+ OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
}
void RawChannelPosix::WaitToWrite() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(write_watcher_.get());
- bool result = message_loop_for_io()->WatchFileDescriptor(
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
- write_watcher_.get(), this);
- DCHECK(result);
-}
-void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
- delegate()->OnFatalError(fatal_error);
-}
+ if (!message_loop_for_io()->WatchFileDescriptor(
+ fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
+ write_watcher_.get(), this)) {
+ {
+ base::AutoLock locker(write_lock());
-// TODO(vtl): This will collide with yzshen's work. This is just a hacky,
-// minimally-invasive function that does what I want (write/resume writing a
-// |MessageInTransit| that may consist of more than one buffer).
-ssize_t WriteMessageInTransit(int fd,
- MessageInTransit* message,
- size_t offset,
- size_t* bytes_to_write) {
- *bytes_to_write = message->total_size() - offset;
- if (!message->secondary_buffer_size()) {
- // Only write from the main buffer.
- DCHECK_LT(offset, message->main_buffer_size());
- DCHECK_LE(*bytes_to_write, message->main_buffer_size());
- return HANDLE_EINTR(
- write(fd,
- static_cast<const char*>(message->main_buffer()) + offset,
- *bytes_to_write));
- }
- if (offset >= message->main_buffer_size()) {
- // Only write from the secondary buffer.
- DCHECK_LT(offset - message->main_buffer_size(),
- message->secondary_buffer_size());
- DCHECK_LE(*bytes_to_write, message->secondary_buffer_size());
- return HANDLE_EINTR(
- write(fd,
- static_cast<const char*>(message->secondary_buffer()) +
- (offset - message->main_buffer_size()),
- *bytes_to_write));
- }
- // Write from both buffers. (Note that using |writev()| is measurably slower
- // than using |write()| -- at least in a microbenchmark -- but much faster
- // than using two |write()|s.)
- DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset +
- message->secondary_buffer_size());
- struct iovec iov[2] = {
- { const_cast<char*>(
- static_cast<const char*>(message->main_buffer()) + offset),
- message->main_buffer_size() - offset },
- { const_cast<void*>(message->secondary_buffer()),
- message->secondary_buffer_size() }
- };
- return HANDLE_EINTR(writev(fd, iov, 2));
-}
-
-bool RawChannelPosix::WriteFrontMessageNoLock() {
- write_lock_.AssertAcquired();
-
- DCHECK(!write_stopped_);
- DCHECK(!write_message_queue_.empty());
-
- MessageInTransit* message = write_message_queue_.front();
- DCHECK_LT(write_message_offset_, message->total_size());
- size_t bytes_to_write;
- ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd,
- message,
- write_message_offset_,
- &bytes_to_write);
- if (bytes_written < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "write of size " << bytes_to_write;
- CancelPendingWritesNoLock();
- return false;
+ DCHECK(pending_write_);
+ pending_write_ = false;
}
-
- // We simply failed to write since we'd block. The logic is the same as if
- // we got a partial write.
- bytes_written = 0;
- }
-
- DCHECK_GE(bytes_written, 0);
- if (static_cast<size_t>(bytes_written) < bytes_to_write) {
- // Partial (or no) write.
- write_message_offset_ += static_cast<size_t>(bytes_written);
- } else {
- // Complete write.
- DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
- write_message_queue_.pop_front();
- delete message;
- write_message_offset_ = 0;
+ OnWriteCompleted(false, 0);
}
-
- return true;
-}
-
-void RawChannelPosix::CancelPendingWritesNoLock() {
- write_lock_.AssertAcquired();
- DCHECK(!write_stopped_);
-
- write_stopped_ = true;
- STLDeleteElements(&write_message_queue_);
}
} // namespace