diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-11 19:07:18 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-11 19:07:18 +0000 |
commit | 5a0d0066f349bb4e52421cce97460477b137154f (patch) | |
tree | 3d20d6ab84563aa7e710b7f81ff0e8e965921fbe | |
parent | 7d687af81a2683b07ee999968a4de0a3e1a4b989 (diff) | |
download | chromium_src-5a0d0066f349bb4e52421cce97460477b137154f.zip chromium_src-5a0d0066f349bb4e52421cce97460477b137154f.tar.gz chromium_src-5a0d0066f349bb4e52421cce97460477b137154f.tar.bz2 |
Add a "RawChannel" abstraction (with an implementation for Chromium POSIX).
This abstraction is meant to provide for reading/writing |MessageInTransit|s to
an OS-level pipe. Writes (|WriteMessage()|) can be done from any thread. Reads
are handled on the I/O message loop's thread.
(We need a MessageLoopForIO since we need its FD-watching capabilities, at least
on POSIX. This is really inessential -- we really only need/want a thread that's
capable of watching FDs. Possibly, things should be abstracted further.)
R=darin@chromium.org
Review URL: https://codereview.chromium.org/26615002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228210 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | mojo/mojo.gyp | 4 | ||||
-rw-r--r-- | mojo/system/message_in_transit.cc | 29 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 18 | ||||
-rw-r--r-- | mojo/system/message_pipe.cc | 6 | ||||
-rw-r--r-- | mojo/system/platform_channel_handle.h | 26 | ||||
-rw-r--r-- | mojo/system/raw_channel.h | 93 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 351 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix_unittest.cc | 467 |
8 files changed, 984 insertions, 10 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp index 43c9961..d953fb8 100644 --- a/mojo/mojo.gyp +++ b/mojo/mojo.gyp @@ -86,6 +86,9 @@ 'system/message_pipe.h', 'system/message_pipe_dispatcher.cc', 'system/message_pipe_dispatcher.h', + 'system/platform_channel_handle.h', + 'system/raw_channel.h', + 'system/raw_channel_posix.cc', 'system/simple_dispatcher.cc', 'system/simple_dispatcher.h', 'system/waiter.cc', @@ -114,6 +117,7 @@ 'system/dispatcher_unittest.cc', 'system/message_pipe_dispatcher_unittest.cc', 'system/message_pipe_unittest.cc', + 'system/raw_channel_posix_unittest.cc', 'system/simple_dispatcher_unittest.cc', 'system/test_utils.h', 'system/waiter_list_unittest.cc', diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index 5a39ded..ba05957 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -10,6 +10,8 @@ #include <new> #include "base/basictypes.h" +#include "base/logging.h" +#include "build/build_config.h" #include "mojo/system/limits.h" namespace mojo { @@ -21,16 +23,33 @@ COMPILE_ASSERT(static_cast<uint64_t>(sizeof(MessageInTransit)) + kMaxMessageNumBytes <= 0x7fffffff, kMaxMessageNumBytes_too_big); +COMPILE_ASSERT(sizeof(MessageInTransit) % + MessageInTransit::kMessageAlignment == 0, + sizeof_MessageInTransit_not_a_multiple_of_alignment); + +// C++ requires that storage be declared (in a single compilation unit), but +// MSVS isn't standards-conformant and doesn't handle this correctly. +#if !defined(COMPILER_MSVC) +const size_t MessageInTransit::kMessageAlignment; +#endif + // static MessageInTransit* MessageInTransit::Create(const void* bytes, uint32_t num_bytes) { - // Store the data immediately after the "header", so allocate the needed - // space. - char* buffer = static_cast<char*>( - malloc(sizeof(MessageInTransit) + static_cast<size_t>(num_bytes))); - // And do a placement new. + const size_t size_with_header = sizeof(MessageInTransit) + num_bytes; + const size_t size_with_header_and_padding = + RoundUpMessageAlignment(size_with_header); + + char* buffer = static_cast<char*>(malloc(size_with_header_and_padding)); + DCHECK_EQ(reinterpret_cast<size_t>(buffer) % + MessageInTransit::kMessageAlignment, 0u); + + // The buffer consists of the header (a |MessageInTransit|, constructed using + // a placement new), followed by the data, followed by padding (of zeros). MessageInTransit* rv = new (buffer) MessageInTransit(num_bytes); memcpy(buffer + sizeof(MessageInTransit), bytes, num_bytes); + memset(buffer + size_with_header, 0, + size_with_header_and_padding - size_with_header); return rv; } diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index be37b98..693e876 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -9,13 +9,14 @@ #include <stdlib.h> // For |free()|. #include "base/basictypes.h" +#include "mojo/public/system/system_export.h" namespace mojo { namespace system { // This class is used to represent data in transit. It is thread-unsafe. // Note: This class is POD. -class MessageInTransit { +class MOJO_SYSTEM_EXPORT MessageInTransit { public: // Creates a |MessageInTransit| with the data given by |bytes|/|num_bytes|. static MessageInTransit* Create(const void* bytes, uint32_t num_bytes); @@ -27,7 +28,7 @@ class MessageInTransit { } // Gets the size of the data (in number of bytes). - uint32_t size() const { + uint32_t data_size() const { return size_; } @@ -36,8 +37,21 @@ class MessageInTransit { return reinterpret_cast<const char*>(this) + sizeof(*this); } + size_t size_with_header_and_padding() const { + return RoundUpMessageAlignment(sizeof(*this) + size_); + } + // TODO(vtl): Add whatever's necessary to transport handles. + // Messages (the header and data) must always be aligned to a multiple of this + // quantity (which must be a power of 2). + static const size_t kMessageAlignment = 8; + + // Rounds |n| up to a multiple of |kMessageAlignment|. + static inline size_t RoundUpMessageAlignment(size_t n) { + return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1); + } + private: explicit MessageInTransit(uint32_t size) : size_(size), reserved_(0), user_1_(0), user_2_(0) {} diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc index c84959c..8cd94cb 100644 --- a/mojo/system/message_pipe.cc +++ b/mojo/system/message_pipe.cc @@ -126,9 +126,9 @@ MojoResult MessagePipe::ReadMessage(unsigned port, bool not_enough_space = false; MessageInTransit* const message = message_queues_[port].front(); if (num_bytes) - *num_bytes = message->size(); - if (message->size() <= max_bytes) - memcpy(bytes, message->data(), message->size()); + *num_bytes = message->data_size(); + if (message->data_size() <= max_bytes) + memcpy(bytes, message->data(), message->data_size()); else not_enough_space = true; diff --git a/mojo/system/platform_channel_handle.h b/mojo/system/platform_channel_handle.h new file mode 100644 index 0000000..b848aef --- /dev/null +++ b/mojo/system/platform_channel_handle.h @@ -0,0 +1,26 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_ +#define MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_ + +#include "build/build_config.h" + +namespace mojo { +namespace system { + +#if defined(OS_POSIX) +struct PlatformChannelHandle { + explicit PlatformChannelHandle(int fd) : fd(fd) {} + + int fd; +}; +#else +#error "Platform not yet supported." +#endif + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_ diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h new file mode 100644 index 0000000..c56dbb5 --- /dev/null +++ b/mojo/system/raw_channel.h @@ -0,0 +1,93 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_RAW_CHANNEL_H_ +#define MOJO_SYSTEM_RAW_CHANNEL_H_ + +#include <vector> + +#include "base/basictypes.h" +#include "mojo/public/system/system_export.h" +#include "mojo/system/limits.h" + +namespace base { +class MessageLoop; +} + +namespace mojo { +namespace system { + +class MessageInTransit; + +// This simply wraps an |int| file descriptor on POSIX and a |HANDLE| on +// Windows, but we don't want to impose, e.g., the inclusion of windows.h on +// everyone. +struct PlatformChannelHandle; + +// |RawChannel| is an interface to objects that wrap an OS "pipe". It presents +// the following interface to users: +// - Receives and dispatches messages on a thread (running a |MessageLoop|; it +// must be a |MessageLoopForIO| in the case of the POSIX libevent +// implementation). +// - Provides a thread-safe way of writing messages (|WriteMessage()|); +// writing/queueing messages will not block and is atomic from the point of +// view of the caller. If necessary, messages are queued (to be written on +// the aforementioned thread). +// +// OS-specific implementation subclasses are to be instantiated using the +// |Create()| static factory method. +// +// With the exception of |WriteMessage()|, this class is thread-unsafe (and in +// general its methods should only be used on the I/O thread). +class MOJO_SYSTEM_EXPORT RawChannel { + public: + virtual ~RawChannel() {} + + // The |Delegate| is only accessed on the same thread as the message loop + // (passed in on creation). + class Delegate { + public: + // Called when a message is read. This may call |Shutdown()| on the + // |RawChannel|, but must not destroy it. + virtual void OnReadMessage(const MessageInTransit& message) = 0; + + protected: + virtual ~Delegate() {} + }; + + // Static factory method. Takes ownership of |handle| (i.e., will close it). + // Does *not* take ownership of |delegate| and |message_loop|, which must + // remain alive while this object does. + static RawChannel* Create(const PlatformChannelHandle& handle, + Delegate* delegate, + base::MessageLoop* message_loop); + + // This must be called (on the I/O thread) before this object is used. + virtual void Init() = 0; + + // This must be called (on the I/O thread) before this object is destroyed. + virtual void Shutdown() = 0; + + // This is thread-safe. It takes ownership of |message| (always, even on + // failure). Returns true on success. + virtual bool WriteMessage(MessageInTransit* message) = 0; + + protected: + RawChannel(Delegate* delegate, base::MessageLoop* message_loop) + : delegate_(delegate), message_loop_(message_loop) {} + + Delegate* delegate() { return delegate_; } + base::MessageLoop* message_loop() { return message_loop_; } + + private: + Delegate* const delegate_; + base::MessageLoop* const message_loop_; + + DISALLOW_COPY_AND_ASSIGN(RawChannel); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_RAW_CHANNEL_H_ diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc new file mode 100644 index 0000000..d81d1bf --- /dev/null +++ b/mojo/system/raw_channel_posix.cc @@ -0,0 +1,351 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/raw_channel.h" + +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include <algorithm> +#include <deque> +#include <vector> + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/posix/eintr_wrapper.h" +#include "base/synchronization/lock.h" +#include "mojo/system/message_in_transit.h" +#include "mojo/system/platform_channel_handle.h" + +namespace mojo { +namespace system { + +const size_t kReadSize = 4096; + +class RawChannelPosix : public RawChannel, + public base::MessageLoopForIO::Watcher { + public: + RawChannelPosix(const PlatformChannelHandle& handle, + Delegate* delegate, + base::MessageLoop* message_loop); + virtual ~RawChannelPosix(); + + // |RawChannel| implementation: + virtual void Init() OVERRIDE; + virtual void Shutdown() OVERRIDE; + virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; + + private: + // |base::MessageLoopForIO::Watcher| implementation: + virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; + virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; + + // Watches for |fd_| to become writable. Must be called on the I/O thread. + void WaitToWrite(); + + // Writes the message at the front of |write_message_queue_|, starting at + // |write_message_offset_|. It removes and destroys if the write completes and + // otherwise updates |write_message_offset_|. Returns true on success. Must be + // called under |write_lock_|. + bool WriteFrontMessageNoLock(); + + // Cancels all pending writes and destroys the contents of + // |write_message_queue_|. Should only be called if |is_writable_| is true; + // sets |is_writable_| to false. Must be called under |write_lock_|. + void CancelPendingWritesNoLock(); + + base::MessageLoopForIO* message_loop_for_io() { + return static_cast<base::MessageLoopForIO*>(message_loop()); + } + + const int fd_; + + // Only used on the I/O thread: + scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; + scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; + + // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| + // is always aligned with a message boundary (we will copy memory to ensure + // this), but |read_buffer_| may be larger than the actual number of bytes we + // have. + std::vector<char> read_buffer_; + size_t read_buffer_num_valid_bytes_; + + base::Lock write_lock_; // Protects the following members. + bool is_writable_; + std::deque<MessageInTransit*> write_message_queue_; + size_t write_message_offset_; + + DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); +}; + +RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, + Delegate* delegate, + base::MessageLoop* message_loop) + : RawChannel(delegate, message_loop), + fd_(handle.fd), + read_buffer_num_valid_bytes_(0), + is_writable_(true), + write_message_offset_(0) { + CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); +} + +RawChannelPosix::~RawChannelPosix() { + close(fd_); + + // These must have been shut down/destroyed on the I/O thread. + DCHECK(!read_watcher_.get()); + DCHECK(!write_watcher_.get()); +} + +void RawChannelPosix::Init() { + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + + DCHECK(!read_watcher_.get()); + read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + DCHECK(!write_watcher_.get()); + write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); + + // No need to take the lock. No one should be using us yet. + DCHECK(write_message_queue_.empty()); + + bool result = message_loop_for_io()->WatchFileDescriptor( + fd_, true, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); + DCHECK(result); +} + +void RawChannelPosix::Shutdown() { + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + + base::AutoLock locker(write_lock_); + if (is_writable_) + CancelPendingWritesNoLock(); + + read_watcher_.reset(); // This will stop watching (if necessary). + write_watcher_.reset(); // This will stop watching (if necessary). +} + +// Reminder: This must be thread-safe, and takes ownership of |message| on +// success. +bool RawChannelPosix::WriteMessage(MessageInTransit* message) { + base::AutoLock locker(write_lock_); + if (!is_writable_) { + message->Destroy(); + return false; + } + + if (!write_message_queue_.empty()) { + write_message_queue_.push_back(message); + return true; + } + + write_message_queue_.push_front(message); + DCHECK_EQ(write_message_offset_, 0u); + bool result = WriteFrontMessageNoLock(); + DCHECK(result || write_message_queue_.empty()); + + if (!write_message_queue_.empty()) { + // Set up to wait for the FD to become writable. If we're not on the I/O + // thread, we have to post a task to do this. + if (base::MessageLoop::current() == message_loop()) { + WaitToWrite(); + } else { + message_loop()->PostTask(FROM_HERE, + base::Bind(&RawChannelPosix::WaitToWrite, + base::Unretained(this))); + } + } + + return result; +} + +void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_); + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + + bool did_dispatch_message = false; + // Tracks the offset of the first undispatched message in |read_buffer_|. + // Currently, we copy data to ensure that this is zero at the beginning. + size_t read_buffer_start = 0; + for (;;) { + if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) + < kReadSize) { + // Use power-of-2 buffer sizes. + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the + // maximum message size to whatever extent necessary). + // TODO(vtl): We may often be able to peek at the header and get the real + // required extra space (which may be much bigger than |kReadSize|). + size_t new_size = std::max(read_buffer_.size(), kReadSize); + while (new_size < + read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) + new_size *= 2; + + // TODO(vtl): It's suboptimal to zero out the fresh memory. + read_buffer_.resize(new_size, 0); + } + + ssize_t bytes_read = HANDLE_EINTR( + read(fd_, + &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], + kReadSize)); + if (bytes_read < 0) { + base::AutoLock locker(write_lock_); + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "read"; + CancelPendingWritesNoLock(); + return; + } + + break; + } + + read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); + + // Dispatch all the messages that we can. + while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { + const MessageInTransit* message = + reinterpret_cast<const MessageInTransit*>( + &read_buffer_[read_buffer_start]); + DCHECK_EQ(reinterpret_cast<size_t>(message) % + MessageInTransit::kMessageAlignment, 0u); + // If we have the header, not the whole message.... + if (read_buffer_num_valid_bytes_ < + message->size_with_header_and_padding()) + break; + + // Dispatch the message. + delegate()->OnReadMessage(*message); + if (!read_watcher_.get()) { + // |Shutdown()| was called in |OnReadMessage()|. + // TODO(vtl): Add test for this case. + return; + } + did_dispatch_message = true; + + // Update our state. + read_buffer_start += message->size_with_header_and_padding(); + read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); + } + + // If we dispatched any messages, stop reading for now (and let the message + // loop do its thing for another round). + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only + // a single message. Risks: slower, more complex if we want to avoid lots of + // copying. ii. Keep reading until there's no more data and dispatch all the + // messages we can. Risks: starvation of other users of the message loop.) + if (did_dispatch_message) + break; + + // If we didn't max out |kReadSize|, stop reading for now. + if (static_cast<size_t>(bytes_read) < kReadSize) + break; + + // Else try to read some more.... + } + + // Move data back to start. + if (read_buffer_start > 0) { + memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], + read_buffer_num_valid_bytes_); + read_buffer_start = 0; + } +} + +void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { + DCHECK_EQ(fd, fd_); + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + + base::AutoLock locker(write_lock_); + DCHECK(is_writable_); + DCHECK(!write_message_queue_.empty()); + + bool result = WriteFrontMessageNoLock(); + DCHECK(result || write_message_queue_.empty()); + + if (!write_message_queue_.empty()) + WaitToWrite(); +} + +void RawChannelPosix::WaitToWrite() { + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + + DCHECK(write_watcher_.get()); + bool result = message_loop_for_io()->WatchFileDescriptor( + fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), + this); + DCHECK(result); +} + +bool RawChannelPosix::WriteFrontMessageNoLock() { + write_lock_.AssertAcquired(); + + DCHECK(is_writable_); + DCHECK(!write_message_queue_.empty()); + + MessageInTransit* message = write_message_queue_.front(); + DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); + size_t bytes_to_write = + message->size_with_header_and_padding() - write_message_offset_; + ssize_t bytes_written = HANDLE_EINTR( + write(fd_, + reinterpret_cast<char*>(message) + write_message_offset_, + bytes_to_write)); + if (bytes_written < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + PLOG(ERROR) << "write of size " << bytes_to_write; + CancelPendingWritesNoLock(); + return false; + } + + // We simply failed to write since we'd block. The logic is the same as if + // we got a partial write. + bytes_written = 0; + } + + DCHECK_GE(bytes_written, 0); + if (static_cast<size_t>(bytes_written) < bytes_to_write) { + // Partial (or no) write. + write_message_offset_ += static_cast<size_t>(bytes_written); + } else { + // Complete write. + DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); + write_message_queue_.pop_front(); + write_message_offset_ = 0; + message->Destroy(); + } + + return true; +} + +void RawChannelPosix::CancelPendingWritesNoLock() { + write_lock_.AssertAcquired(); + DCHECK(is_writable_); + + is_writable_ = false; + for (std::deque<MessageInTransit*>::iterator it = + write_message_queue_.begin(); it != write_message_queue_.end(); + ++it) { + (*it)->Destroy(); + } + write_message_queue_.clear(); +} + +// ----------------------------------------------------------------------------- + +// Static factory method declared in raw_channel.h. +// static +RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, + Delegate* delegate, + base::MessageLoop* message_loop) { + return new RawChannelPosix(handle, delegate, message_loop); +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc new file mode 100644 index 0000000..8599996 --- /dev/null +++ b/mojo/system/raw_channel_posix_unittest.cc @@ -0,0 +1,467 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// TODO(vtl): Factor out the POSIX-specific bits of this test (once we have a +// non-POSIX implementation). + +#include "mojo/system/raw_channel.h" + +#include <fcntl.h> +#include <stdint.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <vector> + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/scoped_vector.h" +#include "base/message_loop/message_loop.h" +#include "base/posix/eintr_wrapper.h" +#include "base/rand_util.h" +#include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/threading/simple_thread.h" +#include "base/threading/thread.h" +#include "base/time/time.h" +#include "mojo/system/message_in_transit.h" +#include "mojo/system/platform_channel_handle.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +void PostTaskAndWaitHelper(base::WaitableEvent* event, + const base::Closure& task) { + task.Run(); + event->Signal(); +} + +void PostTaskAndWait(base::MessageLoop* message_loop, + const tracked_objects::Location& from_here, + const base::Closure& task) { + base::WaitableEvent event(false, false); + message_loop->PostTask(from_here, + base::Bind(&PostTaskAndWaitHelper, &event, task)); + event.Wait(); +} + +// ----------------------------------------------------------------------------- + +MessageInTransit* MakeTestMessage(uint32_t num_bytes) { + std::vector<unsigned char> bytes(num_bytes, 0); + for (size_t i = 0; i < num_bytes; i++) + bytes[i] = static_cast<unsigned char>(i + num_bytes); + return MessageInTransit::Create(bytes.data(), num_bytes); +} + +bool CheckMessageData(const void* bytes, uint32_t num_bytes) { + const unsigned char* b = static_cast<const unsigned char*>(bytes); + for (uint32_t i = 0; i < num_bytes; i++) { + if (b[i] != static_cast<unsigned char>(i + num_bytes)) + return false; + } + return true; +} + +// ----------------------------------------------------------------------------- + +class RawChannelPosixTest : public testing::Test { + public: + RawChannelPosixTest() : io_thread_("io_thread") { + fds_[0] = -1; + fds_[1] = -1; + } + + virtual ~RawChannelPosixTest() { + } + + virtual void SetUp() OVERRIDE { + io_thread_.StartWithOptions( + base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); + + // Create the socket. + PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_) == 0); + + // Set the ends to non-blocking. + PCHECK(fcntl(fds_[0], F_SETFL, O_NONBLOCK) == 0); + PCHECK(fcntl(fds_[1], F_SETFL, O_NONBLOCK) == 0); + } + + virtual void TearDown() OVERRIDE { + if (fds_[0] != -1) + close(fds_[0]); + if (fds_[1] != -1) + close(fds_[1]); + + io_thread_.Stop(); + } + + protected: + int fd(size_t i) { return fds_[i]; } + void clear_fd(size_t i) { fds_[i] = -1; } + + base::MessageLoop* message_loop() { return io_thread_.message_loop(); } + + private: + base::Thread io_thread_; + int fds_[2]; + + DISALLOW_COPY_AND_ASSIGN(RawChannelPosixTest); +}; + +// RawChannelPosixTest.WriteMessage -------------------------------------------- + +class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { + public: + WriteOnlyRawChannelDelegate() {} + virtual ~WriteOnlyRawChannelDelegate() {} + + // |RawChannel::Delegate| implementation: + virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE { + NOTREACHED(); + } + + private: + DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate); +}; + +static const int64_t kMessageReaderSleepMs = 1; +static const size_t kMessageReaderMaxPollIterations = 3000; + +class TestMessageReaderAndChecker { + public: + explicit TestMessageReaderAndChecker(int fd) : fd_(fd) {} + ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); } + + bool ReadAndCheckNextMessage(uint32_t expected_size) { + unsigned char buffer[4096]; + + for (size_t i = 0; i < kMessageReaderMaxPollIterations;) { + ssize_t read_size = HANDLE_EINTR(read(fd_, buffer, sizeof(buffer))); + if (read_size < 0) { + PCHECK(errno == EAGAIN || errno == EWOULDBLOCK); + read_size = 0; + } + + // Append newly-read data to |bytes_|. + bytes_.insert(bytes_.end(), buffer, buffer + read_size); + + // If we have the header.... + if (bytes_.size() >= sizeof(MessageInTransit)) { + const MessageInTransit* message = + reinterpret_cast<const MessageInTransit*>(bytes_.data()); + CHECK_EQ(reinterpret_cast<size_t>(message) % + MessageInTransit::kMessageAlignment, 0u); + + if (message->data_size() != expected_size) { + LOG(ERROR) << "Wrong size: " << message->data_size() << " instead of " + << expected_size << " bytes."; + return false; + } + + // If we've read the whole message.... + if (bytes_.size() >= message->size_with_header_and_padding()) { + if (!CheckMessageData(message->data(), message->data_size())) { + LOG(ERROR) << "Incorrect message data."; + return false; + } + + // Erase message data. + bytes_.erase(bytes_.begin(), + bytes_.begin() + + message->size_with_header_and_padding()); + return true; + } + } + + if (static_cast<size_t>(read_size) < sizeof(buffer)) { + i++; + base::PlatformThread::Sleep( + base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs)); + } + } + + LOG(ERROR) << "Too many iterations."; + return false; + } + + private: + const int fd_; + + // The start of the received data should always be on a message boundary. + std::vector<unsigned char> bytes_; + + DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker); +}; + +// Tests writing (and verifies reading using our own custom reader). +TEST_F(RawChannelPosixTest, WriteMessage) { + WriteOnlyRawChannelDelegate delegate; + scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)), + &delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(0); + + TestMessageReaderAndChecker checker(fd(1)); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, base::Unretained(rc.get()))); + + // Write and read, for a variety of sizes. + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { + rc->WriteMessage(MakeTestMessage(size)); + EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; + } + + // Write/queue and read afterwards, for a variety of sizes. + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) + rc->WriteMessage(MakeTestMessage(size)); + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) + EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(rc.get()))); +} + +// RawChannelPosixTest.OnReadMessage ------------------------------------------- + +class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { + public: + ReadCheckerRawChannelDelegate() + : done_event_(false, false), + position_(0) {} + virtual ~ReadCheckerRawChannelDelegate() {} + + // |RawChannel::Delegate| implementation: + // Called on the I/O thread. + virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { + size_t position; + size_t expected_size; + bool should_signal = false; + { + base::AutoLock locker(lock_); + CHECK_LT(position_, expected_sizes_.size()); + position = position_; + expected_size = expected_sizes_[position]; + position_++; + if (position_ >= expected_sizes_.size()) + should_signal = true; + } + + EXPECT_EQ(expected_size, message.data_size()) << position; + if (message.data_size() == expected_size) { + EXPECT_TRUE(CheckMessageData(message.data(), message.data_size())) + << position; + } + + if (should_signal) + done_event_.Signal(); + } + + // Wait for all the messages (of sizes |expected_sizes_|) to be seen. + void Wait() { + done_event_.Wait(); + } + + void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) { + base::AutoLock locker(lock_); + CHECK_EQ(position_, expected_sizes_.size()); + expected_sizes_ = expected_sizes; + position_ = 0; + } + + private: + base::WaitableEvent done_event_; + + base::Lock lock_; // Protects the following members. + std::vector<uint32_t> expected_sizes_; + size_t position_; + + DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate); +}; + +// Tests reading (writing using our own custom writer). +TEST_F(RawChannelPosixTest, OnReadMessage) { + // We're going to write to |fd(1)|. We'll do so in a blocking manner. + PCHECK(fcntl(fd(1), F_SETFL, 0) == 0); + + ReadCheckerRawChannelDelegate delegate; + scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)), + &delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(0); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, base::Unretained(rc.get()))); + + // Write and read, for a variety of sizes. + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { + delegate.SetExpectedSizes(std::vector<uint32_t>(1, size)); + MessageInTransit* message = MakeTestMessage(size); + EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()), + write(fd(1), message, message->size_with_header_and_padding())); + message->Destroy(); + delegate.Wait(); + } + + // Set up reader and write as fast as we can. + // Write/queue and read afterwards, for a variety of sizes. + std::vector<uint32_t> expected_sizes; + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) + expected_sizes.push_back(size); + delegate.SetExpectedSizes(expected_sizes); + for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { + MessageInTransit* message = MakeTestMessage(size); + EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()), + write(fd(1), message, message->size_with_header_and_padding())); + message->Destroy(); + } + delegate.Wait(); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(rc.get()))); +} + +// RawChannelPosixTest.WriteMessageAndOnReadMessage ---------------------------- + +class RawChannelWriterThread : public base::SimpleThread { + public: + RawChannelWriterThread(RawChannel* raw_channel, size_t write_count) + : base::SimpleThread("raw_channel_writer_thread"), + raw_channel_(raw_channel), + left_to_write_(write_count) { + } + + virtual ~RawChannelWriterThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + static const int kMaxRandomMessageSize = 25000; + + while (left_to_write_-- > 0) { + raw_channel_->WriteMessage(MakeTestMessage( + static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize)))); + } + } + + RawChannel* const raw_channel_; + size_t left_to_write_; + + DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread); +}; + +class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { + public: + explicit ReadCountdownRawChannelDelegate(size_t expected_count) + : done_event_(false, false), + expected_count_(expected_count), + count_(0) {} + virtual ~ReadCountdownRawChannelDelegate() {} + + // |RawChannel::Delegate| implementation: + // Called on the I/O thread. + virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { + EXPECT_LT(count_, expected_count_); + count_++; + + EXPECT_TRUE(CheckMessageData(message.data(), message.data_size())); + + if (count_ >= expected_count_) + done_event_.Signal(); + } + + // Wait for all the messages to have been seen. + void Wait() { + done_event_.Wait(); + } + + private: + base::WaitableEvent done_event_; + size_t expected_count_; + size_t count_; + + DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate); +}; + +TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) { + static const size_t kNumWriterThreads = 10; + static const size_t kNumWriteMessagesPerThread = 4000; + + WriteOnlyRawChannelDelegate writer_delegate; + scoped_ptr<RawChannel> writer_rc( + RawChannel::Create(PlatformChannelHandle(fd(0)), + &writer_delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(0); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, + base::Unretained(writer_rc.get()))); + + ReadCountdownRawChannelDelegate reader_delegate( + kNumWriterThreads * kNumWriteMessagesPerThread); + scoped_ptr<RawChannel> reader_rc( + RawChannel::Create(PlatformChannelHandle(fd(1)), + &reader_delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(1); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, + base::Unretained(reader_rc.get()))); + + { + ScopedVector<RawChannelWriterThread> writer_threads; + for (size_t i = 0; i < kNumWriterThreads; i++) { + writer_threads.push_back(new RawChannelWriterThread( + writer_rc.get(), kNumWriteMessagesPerThread)); + } + for (size_t i = 0; i < writer_threads.size(); i++) + writer_threads[i]->Start(); + } // Joins all the writer threads. + + // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be + // any, but we want to know about them.) + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); + + // Wait for reading to finish. + reader_delegate.Wait(); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(reader_rc.get()))); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(writer_rc.get()))); +} + +} // namespace +} // namespace system +} // namespace mojo |