diff options
Diffstat (limited to 'mojo/system/raw_channel_posix.cc')
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 478 |
1 files changed, 169 insertions, 309 deletions
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index fa1a5ce..00a5448 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -5,13 +5,10 @@ #include "mojo/system/raw_channel.h" #include <errno.h> -#include <string.h> #include <sys/uio.h> #include <unistd.h> #include <algorithm> -#include <deque> -#include <vector> #include "base/basictypes.h" #include "base/bind.h" @@ -22,18 +19,14 @@ #include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/posix/eintr_wrapper.h" -#include "base/stl_util.h" #include "base/synchronization/lock.h" #include "mojo/system/embedder/platform_handle.h" -#include "mojo/system/message_in_transit.h" namespace mojo { namespace system { namespace { -const size_t kReadSize = 4096; - class RawChannelPosix : public RawChannel, public base::MessageLoopForIO::Watcher { public: @@ -42,12 +35,17 @@ class RawChannelPosix : public RawChannel, base::MessageLoopForIO* message_loop_for_io); virtual ~RawChannelPosix(); + private: // |RawChannel| implementation: - virtual bool Init() OVERRIDE; - virtual void Shutdown() OVERRIDE; - virtual bool WriteMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; + virtual IOResult Read(size_t* bytes_read) OVERRIDE; + virtual IOResult ScheduleRead() OVERRIDE; + virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; + virtual IOResult ScheduleWriteNoLock() OVERRIDE; + virtual bool OnInit() OVERRIDE; + virtual void OnShutdownNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; - private: // |base::MessageLoopForIO::Watcher| implementation: virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; @@ -55,40 +53,18 @@ class RawChannelPosix : public RawChannel, // Watches for |fd_| to become writable. Must be called on the I/O thread. void WaitToWrite(); - // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O - // thread WITHOUT |write_lock_| held. - void CallOnFatalError(Delegate::FatalError fatal_error); - - // Writes the message at the front of |write_message_queue_|, starting at - // |write_message_offset_|. It removes and destroys if the write completes and - // otherwise updates |write_message_offset_|. Returns true on success. Must be - // called under |write_lock_|. - bool WriteFrontMessageNoLock(); - - // Cancels all pending writes and destroys the contents of - // |write_message_queue_|. Should only be called if |write_stopped_| is false; - // sets |write_stopped_| to true. Must be called under |write_lock_|. - void CancelPendingWritesNoLock(); - embedder::ScopedPlatformHandle fd_; - // Only used on the I/O thread: + // The following members are only used on the I/O thread: scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; - // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| - // is always aligned with a message boundary (we will copy memory to ensure - // this), but |read_buffer_| may be larger than the actual number of bytes we - // have. - std::vector<char> read_buffer_; - size_t read_buffer_num_valid_bytes_; - - base::Lock write_lock_; // Protects the following members. - bool write_stopped_; - // TODO(vtl): When C++11 is available, switch this to a deque of - // |scoped_ptr|/|unique_ptr|s. - std::deque<MessageInTransit*> write_message_queue_; - size_t write_message_offset_; + bool pending_read_; + + // The following members are used on multiple threads and protected by + // |write_lock()|: + bool pending_write_; + // This is used for posting tasks from write threads to the I/O thread. It // must only be accessed under |write_lock_|. The weak pointers it produces // are only used/invalidated on the I/O thread. @@ -102,20 +78,17 @@ RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, base::MessageLoopForIO* message_loop_for_io) : RawChannel(delegate, message_loop_for_io), fd_(handle.Pass()), - read_buffer_num_valid_bytes_(0), - write_stopped_(false), - write_message_offset_(0), + pending_read_(false), + pending_write_(false), weak_ptr_factory_(this) { - CHECK_EQ(RawChannel::message_loop_for_io()->type(), - base::MessageLoop::TYPE_IO); DCHECK(fd_.is_valid()); } RawChannelPosix::~RawChannelPosix() { - DCHECK(write_stopped_); - DCHECK(!fd_.is_valid()); + DCHECK(!pending_read_); + DCHECK(!pending_write_); - // No need to take the |write_lock_| here -- if there are still weak pointers + // No need to take the |write_lock()| here -- if there are still weak pointers // outstanding, then we're hosed anyway (since we wouldn't be able to // invalidate them cleanly, since we might not be on the I/O thread). DCHECK(!weak_ptr_factory_.HasWeakPtrs()); @@ -125,324 +98,211 @@ RawChannelPosix::~RawChannelPosix() { DCHECK(!write_watcher_.get()); } -bool RawChannelPosix::Init() { +RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); - DCHECK(!read_watcher_.get()); - read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); - DCHECK(!write_watcher_.get()); - write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + char* buffer = NULL; + size_t bytes_to_read = 0; + read_buffer()->GetBuffer(&buffer, &bytes_to_read); - // No need to take the lock. No one should be using us yet. - DCHECK(write_message_queue_.empty()); + ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read)); - if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, - base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { - // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly - // (in the sense of returning the message loop's state to what it was before - // it was called). + if (read_result >= 0) { + *bytes_read = static_cast<size_t>(read_result); + return IO_SUCCEEDED; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "read"; + + // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. read_watcher_.reset(); - write_watcher_.reset(); - return false; + + return IO_FAILED; } - return true; + return ScheduleRead(); } -void RawChannelPosix::Shutdown() { +RawChannel::IOResult RawChannelPosix::ScheduleRead() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(!pending_read_); - base::AutoLock locker(write_lock_); - if (!write_stopped_) - CancelPendingWritesNoLock(); + pending_read_ = true; - read_watcher_.reset(); // This will stop watching (if necessary). - write_watcher_.reset(); // This will stop watching (if necessary). + return IO_PENDING; +} - DCHECK(fd_.is_valid()); - fd_.reset(); +RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { + write_lock().AssertAcquired(); - weak_ptr_factory_.InvalidateWeakPtrs(); -} + DCHECK(!pending_write_); -// Reminder: This must be thread-safe, and takes ownership of |message|. -bool RawChannelPosix::WriteMessage(scoped_ptr<MessageInTransit> message) { - base::AutoLock locker(write_lock_); - if (write_stopped_) - return false; + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); - if (!write_message_queue_.empty()) { - write_message_queue_.push_back(message.release()); - return true; + ssize_t write_result = -1; + if (buffers.size() == 1) { + write_result = HANDLE_EINTR( + write(fd_.get().fd, buffers[0].addr, buffers[0].size)); + } else { + // Note that using |writev()| is measurably slower than using |write()| -- + // at least in a microbenchmark -- but much faster than using multiple + // |write()|s. + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; + } + + write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count)); } - write_message_queue_.push_front(message.release()); - DCHECK_EQ(write_message_offset_, 0u); - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + if (write_result >= 0) { + *bytes_written = static_cast<size_t>(write_result); + return IO_SUCCEEDED; + } + + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "write of size " + << write_buffer_no_lock()->GetTotalBytesToWrite(); + return IO_FAILED; + } - if (!result) { - // Even if we're on the I/O thread, don't call |OnFatalError()| in the - // nested context. + return ScheduleWriteNoLock(); +} + +RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { + write_lock().AssertAcquired(); + + DCHECK(!pending_write_); + + // Set up to wait for the FD to become writable. + // If we're not on the I/O thread, we have to post a task to do this. + if (base::MessageLoop::current() != message_loop_for_io()) { message_loop_for_io()->PostTask( FROM_HERE, - base::Bind(&RawChannelPosix::CallOnFatalError, - weak_ptr_factory_.GetWeakPtr(), - Delegate::FATAL_ERROR_FAILED_WRITE)); - } else if (!write_message_queue_.empty()) { - // Set up to wait for the FD to become writable. If we're not on the I/O - // thread, we have to post a task to do this. - if (base::MessageLoop::current() == message_loop_for_io()) { - WaitToWrite(); - } else { - message_loop_for_io()->PostTask( - FROM_HERE, - base::Bind(&RawChannelPosix::WaitToWrite, - weak_ptr_factory_.GetWeakPtr())); - } + base::Bind(&RawChannelPosix::WaitToWrite, + weak_ptr_factory_.GetWeakPtr())); + pending_write_ = true; + return IO_PENDING; + } + + if (message_loop_for_io()->WatchFileDescriptor( + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, + write_watcher_.get(), this)) { + pending_write_ = true; + return IO_PENDING; } - return result; + return IO_FAILED; } -void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { - DCHECK_EQ(fd, fd_.get().fd); +bool RawChannelPosix::OnInit() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - bool did_dispatch_message = false; - // Tracks the offset of the first undispatched message in |read_buffer_|. - // Currently, we copy data to ensure that this is zero at the beginning. - size_t read_buffer_start = 0; - for (;;) { - if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) - < kReadSize) { - // Use power-of-2 buffer sizes. - // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the - // maximum message size to whatever extent necessary). - // TODO(vtl): We may often be able to peek at the header and get the real - // required extra space (which may be much bigger than |kReadSize|). - size_t new_size = std::max(read_buffer_.size(), kReadSize); - while (new_size < - read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) - new_size *= 2; - - // TODO(vtl): It's suboptimal to zero out the fresh memory. - read_buffer_.resize(new_size, 0); - } + DCHECK(!read_watcher_.get()); + read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + DCHECK(!write_watcher_.get()); + write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); - ssize_t bytes_read = HANDLE_EINTR( - read(fd_.get().fd, - &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], - kReadSize)); - if (bytes_read < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - PLOG(ERROR) << "read"; + if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, + base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { + // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly + // (in the sense of returning the message loop's state to what it was before + // it was called). + read_watcher_.reset(); + write_watcher_.reset(); + return false; + } - // Make sure that |OnFileCanReadWithoutBlocking()| won't be called - // again. - read_watcher_.reset(); + return true; +} - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); - return; - } +void RawChannelPosix::OnShutdownNoLock( + scoped_ptr<ReadBuffer> /*read_buffer*/, + scoped_ptr<WriteBuffer> /*write_buffer*/) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + write_lock().AssertAcquired(); - break; - } + read_watcher_.reset(); // This will stop watching (if necessary). + write_watcher_.reset(); // This will stop watching (if necessary). - read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); - - // Dispatch all the messages that we can. - size_t message_size; - // Note that we rely on short-circuit evaluation here: - // - |read_buffer_start| may be an invalid index into |read_buffer_| if - // |read_buffer_num_valid_bytes_| is zero. - // - |message_size| is only valid if |GetNextMessageSize()| returns true. - // TODO(vtl): Use |message_size| more intelligently (e.g., to request the - // next read). - while (read_buffer_num_valid_bytes_ > 0 && - MessageInTransit::GetNextMessageSize( - &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, - &message_size) && - read_buffer_num_valid_bytes_ >= message_size) { - // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with - // some sort of "view" abstraction. - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, - &read_buffer_[read_buffer_start]); - DCHECK_EQ(message.total_size(), message_size); - - // Dispatch the message. - delegate()->OnReadMessage(message); - if (!read_watcher_.get()) { - // |Shutdown()| was called in |OnReadMessage()|. - // TODO(vtl): Add test for this case. - return; - } - did_dispatch_message = true; - - // Update our state. - read_buffer_start += message_size; - read_buffer_num_valid_bytes_ -= message_size; - } + pending_read_ = false; + pending_write_ = false; - // If we dispatched any messages, stop reading for now (and let the message - // loop do its thing for another round). - // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only - // a single message. Risks: slower, more complex if we want to avoid lots of - // copying. ii. Keep reading until there's no more data and dispatch all the - // messages we can. Risks: starvation of other users of the message loop.) - if (did_dispatch_message) - break; + DCHECK(fd_.is_valid()); + fd_.reset(); - // If we didn't max out |kReadSize|, stop reading for now. - if (static_cast<size_t>(bytes_read) < kReadSize) - break; + weak_ptr_factory_.InvalidateWeakPtrs(); +} - // Else try to read some more.... - } +void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_.get().fd); + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - // Move data back to start. - if (read_buffer_start > 0) { - if (read_buffer_num_valid_bytes_ > 0) { - memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], - read_buffer_num_valid_bytes_); - } - read_buffer_start = 0; + if (!pending_read_) { + NOTREACHED(); + return; } + + pending_read_ = false; + size_t bytes_read = 0; + IOResult result = Read(&bytes_read); + if (result != IO_PENDING) + OnReadCompleted(result == IO_SUCCEEDED, bytes_read); + + // On failure, |read_watcher_| must have been reset; on success, + // we assume that |OnReadCompleted()| always schedules another read. + // Otherwise, we could end up spinning -- getting + // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual + // read. + // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't + // schedule a new read. But that code won't be reached under the current + // RawChannel implementation. + DCHECK(!read_watcher_.get() || pending_read_); } void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK_EQ(fd, fd_.get().fd); DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - bool did_fail = false; + IOResult result = IO_FAILED; + size_t bytes_written = 0; { - base::AutoLock locker(write_lock_); - DCHECK_EQ(write_stopped_, write_message_queue_.empty()); + base::AutoLock locker(write_lock()); - if (write_stopped_) { - write_watcher_.reset(); - return; - } - - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + DCHECK(pending_write_); - if (!result) { - did_fail = true; - write_watcher_.reset(); - } else if (!write_message_queue_.empty()) { - WaitToWrite(); - } + pending_write_ = false; + result = WriteNoLock(&bytes_written); } - if (did_fail) - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); + + if (result != IO_PENDING) + OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); } void RawChannelPosix::WaitToWrite() { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); DCHECK(write_watcher_.get()); - bool result = message_loop_for_io()->WatchFileDescriptor( - fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, - write_watcher_.get(), this); - DCHECK(result); -} -void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { - DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); - // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? - delegate()->OnFatalError(fatal_error); -} + if (!message_loop_for_io()->WatchFileDescriptor( + fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, + write_watcher_.get(), this)) { + { + base::AutoLock locker(write_lock()); -// TODO(vtl): This will collide with yzshen's work. This is just a hacky, -// minimally-invasive function that does what I want (write/resume writing a -// |MessageInTransit| that may consist of more than one buffer). -ssize_t WriteMessageInTransit(int fd, - MessageInTransit* message, - size_t offset, - size_t* bytes_to_write) { - *bytes_to_write = message->total_size() - offset; - if (!message->secondary_buffer_size()) { - // Only write from the main buffer. - DCHECK_LT(offset, message->main_buffer_size()); - DCHECK_LE(*bytes_to_write, message->main_buffer_size()); - return HANDLE_EINTR( - write(fd, - static_cast<const char*>(message->main_buffer()) + offset, - *bytes_to_write)); - } - if (offset >= message->main_buffer_size()) { - // Only write from the secondary buffer. - DCHECK_LT(offset - message->main_buffer_size(), - message->secondary_buffer_size()); - DCHECK_LE(*bytes_to_write, message->secondary_buffer_size()); - return HANDLE_EINTR( - write(fd, - static_cast<const char*>(message->secondary_buffer()) + - (offset - message->main_buffer_size()), - *bytes_to_write)); - } - // Write from both buffers. (Note that using |writev()| is measurably slower - // than using |write()| -- at least in a microbenchmark -- but much faster - // than using two |write()|s.) - DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset + - message->secondary_buffer_size()); - struct iovec iov[2] = { - { const_cast<char*>( - static_cast<const char*>(message->main_buffer()) + offset), - message->main_buffer_size() - offset }, - { const_cast<void*>(message->secondary_buffer()), - message->secondary_buffer_size() } - }; - return HANDLE_EINTR(writev(fd, iov, 2)); -} - -bool RawChannelPosix::WriteFrontMessageNoLock() { - write_lock_.AssertAcquired(); - - DCHECK(!write_stopped_); - DCHECK(!write_message_queue_.empty()); - - MessageInTransit* message = write_message_queue_.front(); - DCHECK_LT(write_message_offset_, message->total_size()); - size_t bytes_to_write; - ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd, - message, - write_message_offset_, - &bytes_to_write); - if (bytes_written < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - PLOG(ERROR) << "write of size " << bytes_to_write; - CancelPendingWritesNoLock(); - return false; + DCHECK(pending_write_); + pending_write_ = false; } - - // We simply failed to write since we'd block. The logic is the same as if - // we got a partial write. - bytes_written = 0; - } - - DCHECK_GE(bytes_written, 0); - if (static_cast<size_t>(bytes_written) < bytes_to_write) { - // Partial (or no) write. - write_message_offset_ += static_cast<size_t>(bytes_written); - } else { - // Complete write. - DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); - write_message_queue_.pop_front(); - delete message; - write_message_offset_ = 0; + OnWriteCompleted(false, 0); } - - return true; -} - -void RawChannelPosix::CancelPendingWritesNoLock() { - write_lock_.AssertAcquired(); - DCHECK(!write_stopped_); - - write_stopped_ = true; - STLDeleteElements(&write_message_queue_); } } // namespace |