summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-11 19:07:18 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-11 19:07:18 +0000
commit5a0d0066f349bb4e52421cce97460477b137154f (patch)
tree3d20d6ab84563aa7e710b7f81ff0e8e965921fbe
parent7d687af81a2683b07ee999968a4de0a3e1a4b989 (diff)
downloadchromium_src-5a0d0066f349bb4e52421cce97460477b137154f.zip
chromium_src-5a0d0066f349bb4e52421cce97460477b137154f.tar.gz
chromium_src-5a0d0066f349bb4e52421cce97460477b137154f.tar.bz2
Add a "RawChannel" abstraction (with an implementation for Chromium POSIX).
This abstraction is meant to provide for reading/writing |MessageInTransit|s to an OS-level pipe. Writes (|WriteMessage()|) can be done from any thread. Reads are handled on the I/O message loop's thread. (We need a MessageLoopForIO since we need its FD-watching capabilities, at least on POSIX. This is really inessential -- we really only need/want a thread that's capable of watching FDs. Possibly, things should be abstracted further.) R=darin@chromium.org Review URL: https://codereview.chromium.org/26615002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228210 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--mojo/mojo.gyp4
-rw-r--r--mojo/system/message_in_transit.cc29
-rw-r--r--mojo/system/message_in_transit.h18
-rw-r--r--mojo/system/message_pipe.cc6
-rw-r--r--mojo/system/platform_channel_handle.h26
-rw-r--r--mojo/system/raw_channel.h93
-rw-r--r--mojo/system/raw_channel_posix.cc351
-rw-r--r--mojo/system/raw_channel_posix_unittest.cc467
8 files changed, 984 insertions, 10 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 43c9961..d953fb8 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -86,6 +86,9 @@
'system/message_pipe.h',
'system/message_pipe_dispatcher.cc',
'system/message_pipe_dispatcher.h',
+ 'system/platform_channel_handle.h',
+ 'system/raw_channel.h',
+ 'system/raw_channel_posix.cc',
'system/simple_dispatcher.cc',
'system/simple_dispatcher.h',
'system/waiter.cc',
@@ -114,6 +117,7 @@
'system/dispatcher_unittest.cc',
'system/message_pipe_dispatcher_unittest.cc',
'system/message_pipe_unittest.cc',
+ 'system/raw_channel_posix_unittest.cc',
'system/simple_dispatcher_unittest.cc',
'system/test_utils.h',
'system/waiter_list_unittest.cc',
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index 5a39ded..ba05957 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -10,6 +10,8 @@
#include <new>
#include "base/basictypes.h"
+#include "base/logging.h"
+#include "build/build_config.h"
#include "mojo/system/limits.h"
namespace mojo {
@@ -21,16 +23,33 @@ COMPILE_ASSERT(static_cast<uint64_t>(sizeof(MessageInTransit)) +
kMaxMessageNumBytes <= 0x7fffffff,
kMaxMessageNumBytes_too_big);
+COMPILE_ASSERT(sizeof(MessageInTransit) %
+ MessageInTransit::kMessageAlignment == 0,
+ sizeof_MessageInTransit_not_a_multiple_of_alignment);
+
+// C++ requires that storage be declared (in a single compilation unit), but
+// MSVS isn't standards-conformant and doesn't handle this correctly.
+#if !defined(COMPILER_MSVC)
+const size_t MessageInTransit::kMessageAlignment;
+#endif
+
// static
MessageInTransit* MessageInTransit::Create(const void* bytes,
uint32_t num_bytes) {
- // Store the data immediately after the "header", so allocate the needed
- // space.
- char* buffer = static_cast<char*>(
- malloc(sizeof(MessageInTransit) + static_cast<size_t>(num_bytes)));
- // And do a placement new.
+ const size_t size_with_header = sizeof(MessageInTransit) + num_bytes;
+ const size_t size_with_header_and_padding =
+ RoundUpMessageAlignment(size_with_header);
+
+ char* buffer = static_cast<char*>(malloc(size_with_header_and_padding));
+ DCHECK_EQ(reinterpret_cast<size_t>(buffer) %
+ MessageInTransit::kMessageAlignment, 0u);
+
+ // The buffer consists of the header (a |MessageInTransit|, constructed using
+ // a placement new), followed by the data, followed by padding (of zeros).
MessageInTransit* rv = new (buffer) MessageInTransit(num_bytes);
memcpy(buffer + sizeof(MessageInTransit), bytes, num_bytes);
+ memset(buffer + size_with_header, 0,
+ size_with_header_and_padding - size_with_header);
return rv;
}
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index be37b98..693e876 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -9,13 +9,14 @@
#include <stdlib.h> // For |free()|.
#include "base/basictypes.h"
+#include "mojo/public/system/system_export.h"
namespace mojo {
namespace system {
// This class is used to represent data in transit. It is thread-unsafe.
// Note: This class is POD.
-class MessageInTransit {
+class MOJO_SYSTEM_EXPORT MessageInTransit {
public:
// Creates a |MessageInTransit| with the data given by |bytes|/|num_bytes|.
static MessageInTransit* Create(const void* bytes, uint32_t num_bytes);
@@ -27,7 +28,7 @@ class MessageInTransit {
}
// Gets the size of the data (in number of bytes).
- uint32_t size() const {
+ uint32_t data_size() const {
return size_;
}
@@ -36,8 +37,21 @@ class MessageInTransit {
return reinterpret_cast<const char*>(this) + sizeof(*this);
}
+ size_t size_with_header_and_padding() const {
+ return RoundUpMessageAlignment(sizeof(*this) + size_);
+ }
+
// TODO(vtl): Add whatever's necessary to transport handles.
+ // Messages (the header and data) must always be aligned to a multiple of this
+ // quantity (which must be a power of 2).
+ static const size_t kMessageAlignment = 8;
+
+ // Rounds |n| up to a multiple of |kMessageAlignment|.
+ static inline size_t RoundUpMessageAlignment(size_t n) {
+ return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1);
+ }
+
private:
explicit MessageInTransit(uint32_t size)
: size_(size), reserved_(0), user_1_(0), user_2_(0) {}
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index c84959c..8cd94cb 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -126,9 +126,9 @@ MojoResult MessagePipe::ReadMessage(unsigned port,
bool not_enough_space = false;
MessageInTransit* const message = message_queues_[port].front();
if (num_bytes)
- *num_bytes = message->size();
- if (message->size() <= max_bytes)
- memcpy(bytes, message->data(), message->size());
+ *num_bytes = message->data_size();
+ if (message->data_size() <= max_bytes)
+ memcpy(bytes, message->data(), message->data_size());
else
not_enough_space = true;
diff --git a/mojo/system/platform_channel_handle.h b/mojo/system/platform_channel_handle.h
new file mode 100644
index 0000000..b848aef
--- /dev/null
+++ b/mojo/system/platform_channel_handle.h
@@ -0,0 +1,26 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_
+#define MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_
+
+#include "build/build_config.h"
+
+namespace mojo {
+namespace system {
+
+#if defined(OS_POSIX)
+struct PlatformChannelHandle {
+ explicit PlatformChannelHandle(int fd) : fd(fd) {}
+
+ int fd;
+};
+#else
+#error "Platform not yet supported."
+#endif
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_PLATFORM_CHANNEL_HANDLE_H_
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
new file mode 100644
index 0000000..c56dbb5
--- /dev/null
+++ b/mojo/system/raw_channel.h
@@ -0,0 +1,93 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_RAW_CHANNEL_H_
+#define MOJO_SYSTEM_RAW_CHANNEL_H_
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "mojo/public/system/system_export.h"
+#include "mojo/system/limits.h"
+
+namespace base {
+class MessageLoop;
+}
+
+namespace mojo {
+namespace system {
+
+class MessageInTransit;
+
+// This simply wraps an |int| file descriptor on POSIX and a |HANDLE| on
+// Windows, but we don't want to impose, e.g., the inclusion of windows.h on
+// everyone.
+struct PlatformChannelHandle;
+
+// |RawChannel| is an interface to objects that wrap an OS "pipe". It presents
+// the following interface to users:
+// - Receives and dispatches messages on a thread (running a |MessageLoop|; it
+// must be a |MessageLoopForIO| in the case of the POSIX libevent
+// implementation).
+// - Provides a thread-safe way of writing messages (|WriteMessage()|);
+// writing/queueing messages will not block and is atomic from the point of
+// view of the caller. If necessary, messages are queued (to be written on
+// the aforementioned thread).
+//
+// OS-specific implementation subclasses are to be instantiated using the
+// |Create()| static factory method.
+//
+// With the exception of |WriteMessage()|, this class is thread-unsafe (and in
+// general its methods should only be used on the I/O thread).
+class MOJO_SYSTEM_EXPORT RawChannel {
+ public:
+ virtual ~RawChannel() {}
+
+ // The |Delegate| is only accessed on the same thread as the message loop
+ // (passed in on creation).
+ class Delegate {
+ public:
+ // Called when a message is read. This may call |Shutdown()| on the
+ // |RawChannel|, but must not destroy it.
+ virtual void OnReadMessage(const MessageInTransit& message) = 0;
+
+ protected:
+ virtual ~Delegate() {}
+ };
+
+ // Static factory method. Takes ownership of |handle| (i.e., will close it).
+ // Does *not* take ownership of |delegate| and |message_loop|, which must
+ // remain alive while this object does.
+ static RawChannel* Create(const PlatformChannelHandle& handle,
+ Delegate* delegate,
+ base::MessageLoop* message_loop);
+
+ // This must be called (on the I/O thread) before this object is used.
+ virtual void Init() = 0;
+
+ // This must be called (on the I/O thread) before this object is destroyed.
+ virtual void Shutdown() = 0;
+
+ // This is thread-safe. It takes ownership of |message| (always, even on
+ // failure). Returns true on success.
+ virtual bool WriteMessage(MessageInTransit* message) = 0;
+
+ protected:
+ RawChannel(Delegate* delegate, base::MessageLoop* message_loop)
+ : delegate_(delegate), message_loop_(message_loop) {}
+
+ Delegate* delegate() { return delegate_; }
+ base::MessageLoop* message_loop() { return message_loop_; }
+
+ private:
+ Delegate* const delegate_;
+ base::MessageLoop* const message_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannel);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_RAW_CHANNEL_H_
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
new file mode 100644
index 0000000..d81d1bf
--- /dev/null
+++ b/mojo/system/raw_channel_posix.cc
@@ -0,0 +1,351 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <deque>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/posix/eintr_wrapper.h"
+#include "base/synchronization/lock.h"
+#include "mojo/system/message_in_transit.h"
+#include "mojo/system/platform_channel_handle.h"
+
+namespace mojo {
+namespace system {
+
+const size_t kReadSize = 4096;
+
+class RawChannelPosix : public RawChannel,
+ public base::MessageLoopForIO::Watcher {
+ public:
+ RawChannelPosix(const PlatformChannelHandle& handle,
+ Delegate* delegate,
+ base::MessageLoop* message_loop);
+ virtual ~RawChannelPosix();
+
+ // |RawChannel| implementation:
+ virtual void Init() OVERRIDE;
+ virtual void Shutdown() OVERRIDE;
+ virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
+
+ private:
+ // |base::MessageLoopForIO::Watcher| implementation:
+ virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
+ virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
+
+ // Watches for |fd_| to become writable. Must be called on the I/O thread.
+ void WaitToWrite();
+
+ // Writes the message at the front of |write_message_queue_|, starting at
+ // |write_message_offset_|. It removes and destroys if the write completes and
+ // otherwise updates |write_message_offset_|. Returns true on success. Must be
+ // called under |write_lock_|.
+ bool WriteFrontMessageNoLock();
+
+ // Cancels all pending writes and destroys the contents of
+ // |write_message_queue_|. Should only be called if |is_writable_| is true;
+ // sets |is_writable_| to false. Must be called under |write_lock_|.
+ void CancelPendingWritesNoLock();
+
+ base::MessageLoopForIO* message_loop_for_io() {
+ return static_cast<base::MessageLoopForIO*>(message_loop());
+ }
+
+ const int fd_;
+
+ // Only used on the I/O thread:
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
+ scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
+
+ // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
+ // is always aligned with a message boundary (we will copy memory to ensure
+ // this), but |read_buffer_| may be larger than the actual number of bytes we
+ // have.
+ std::vector<char> read_buffer_;
+ size_t read_buffer_num_valid_bytes_;
+
+ base::Lock write_lock_; // Protects the following members.
+ bool is_writable_;
+ std::deque<MessageInTransit*> write_message_queue_;
+ size_t write_message_offset_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
+};
+
+RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
+ Delegate* delegate,
+ base::MessageLoop* message_loop)
+ : RawChannel(delegate, message_loop),
+ fd_(handle.fd),
+ read_buffer_num_valid_bytes_(0),
+ is_writable_(true),
+ write_message_offset_(0) {
+ CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
+}
+
+RawChannelPosix::~RawChannelPosix() {
+ close(fd_);
+
+ // These must have been shut down/destroyed on the I/O thread.
+ DCHECK(!read_watcher_.get());
+ DCHECK(!write_watcher_.get());
+}
+
+void RawChannelPosix::Init() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+
+ DCHECK(!read_watcher_.get());
+ read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+ DCHECK(!write_watcher_.get());
+ write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
+
+ // No need to take the lock. No one should be using us yet.
+ DCHECK(write_message_queue_.empty());
+
+ bool result = message_loop_for_io()->WatchFileDescriptor(
+ fd_, true, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
+ DCHECK(result);
+}
+
+void RawChannelPosix::Shutdown() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+
+ base::AutoLock locker(write_lock_);
+ if (is_writable_)
+ CancelPendingWritesNoLock();
+
+ read_watcher_.reset(); // This will stop watching (if necessary).
+ write_watcher_.reset(); // This will stop watching (if necessary).
+}
+
+// Reminder: This must be thread-safe, and takes ownership of |message| on
+// success.
+bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
+ base::AutoLock locker(write_lock_);
+ if (!is_writable_) {
+ message->Destroy();
+ return false;
+ }
+
+ if (!write_message_queue_.empty()) {
+ write_message_queue_.push_back(message);
+ return true;
+ }
+
+ write_message_queue_.push_front(message);
+ DCHECK_EQ(write_message_offset_, 0u);
+ bool result = WriteFrontMessageNoLock();
+ DCHECK(result || write_message_queue_.empty());
+
+ if (!write_message_queue_.empty()) {
+ // Set up to wait for the FD to become writable. If we're not on the I/O
+ // thread, we have to post a task to do this.
+ if (base::MessageLoop::current() == message_loop()) {
+ WaitToWrite();
+ } else {
+ message_loop()->PostTask(FROM_HERE,
+ base::Bind(&RawChannelPosix::WaitToWrite,
+ base::Unretained(this)));
+ }
+ }
+
+ return result;
+}
+
+void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+
+ bool did_dispatch_message = false;
+ // Tracks the offset of the first undispatched message in |read_buffer_|.
+ // Currently, we copy data to ensure that this is zero at the beginning.
+ size_t read_buffer_start = 0;
+ for (;;) {
+ if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
+ < kReadSize) {
+ // Use power-of-2 buffer sizes.
+ // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
+ // maximum message size to whatever extent necessary).
+ // TODO(vtl): We may often be able to peek at the header and get the real
+ // required extra space (which may be much bigger than |kReadSize|).
+ size_t new_size = std::max(read_buffer_.size(), kReadSize);
+ while (new_size <
+ read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
+ new_size *= 2;
+
+ // TODO(vtl): It's suboptimal to zero out the fresh memory.
+ read_buffer_.resize(new_size, 0);
+ }
+
+ ssize_t bytes_read = HANDLE_EINTR(
+ read(fd_,
+ &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
+ kReadSize));
+ if (bytes_read < 0) {
+ base::AutoLock locker(write_lock_);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "read";
+ CancelPendingWritesNoLock();
+ return;
+ }
+
+ break;
+ }
+
+ read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
+
+ // Dispatch all the messages that we can.
+ while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
+ const MessageInTransit* message =
+ reinterpret_cast<const MessageInTransit*>(
+ &read_buffer_[read_buffer_start]);
+ DCHECK_EQ(reinterpret_cast<size_t>(message) %
+ MessageInTransit::kMessageAlignment, 0u);
+ // If we have the header, not the whole message....
+ if (read_buffer_num_valid_bytes_ <
+ message->size_with_header_and_padding())
+ break;
+
+ // Dispatch the message.
+ delegate()->OnReadMessage(*message);
+ if (!read_watcher_.get()) {
+ // |Shutdown()| was called in |OnReadMessage()|.
+ // TODO(vtl): Add test for this case.
+ return;
+ }
+ did_dispatch_message = true;
+
+ // Update our state.
+ read_buffer_start += message->size_with_header_and_padding();
+ read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
+ }
+
+ // If we dispatched any messages, stop reading for now (and let the message
+ // loop do its thing for another round).
+ // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
+ // a single message. Risks: slower, more complex if we want to avoid lots of
+ // copying. ii. Keep reading until there's no more data and dispatch all the
+ // messages we can. Risks: starvation of other users of the message loop.)
+ if (did_dispatch_message)
+ break;
+
+ // If we didn't max out |kReadSize|, stop reading for now.
+ if (static_cast<size_t>(bytes_read) < kReadSize)
+ break;
+
+ // Else try to read some more....
+ }
+
+ // Move data back to start.
+ if (read_buffer_start > 0) {
+ memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
+ read_buffer_num_valid_bytes_);
+ read_buffer_start = 0;
+ }
+}
+
+void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
+ DCHECK_EQ(fd, fd_);
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+
+ base::AutoLock locker(write_lock_);
+ DCHECK(is_writable_);
+ DCHECK(!write_message_queue_.empty());
+
+ bool result = WriteFrontMessageNoLock();
+ DCHECK(result || write_message_queue_.empty());
+
+ if (!write_message_queue_.empty())
+ WaitToWrite();
+}
+
+void RawChannelPosix::WaitToWrite() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+
+ DCHECK(write_watcher_.get());
+ bool result = message_loop_for_io()->WatchFileDescriptor(
+ fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
+ this);
+ DCHECK(result);
+}
+
+bool RawChannelPosix::WriteFrontMessageNoLock() {
+ write_lock_.AssertAcquired();
+
+ DCHECK(is_writable_);
+ DCHECK(!write_message_queue_.empty());
+
+ MessageInTransit* message = write_message_queue_.front();
+ DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
+ size_t bytes_to_write =
+ message->size_with_header_and_padding() - write_message_offset_;
+ ssize_t bytes_written = HANDLE_EINTR(
+ write(fd_,
+ reinterpret_cast<char*>(message) + write_message_offset_,
+ bytes_to_write));
+ if (bytes_written < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PLOG(ERROR) << "write of size " << bytes_to_write;
+ CancelPendingWritesNoLock();
+ return false;
+ }
+
+ // We simply failed to write since we'd block. The logic is the same as if
+ // we got a partial write.
+ bytes_written = 0;
+ }
+
+ DCHECK_GE(bytes_written, 0);
+ if (static_cast<size_t>(bytes_written) < bytes_to_write) {
+ // Partial (or no) write.
+ write_message_offset_ += static_cast<size_t>(bytes_written);
+ } else {
+ // Complete write.
+ DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
+ write_message_queue_.pop_front();
+ write_message_offset_ = 0;
+ message->Destroy();
+ }
+
+ return true;
+}
+
+void RawChannelPosix::CancelPendingWritesNoLock() {
+ write_lock_.AssertAcquired();
+ DCHECK(is_writable_);
+
+ is_writable_ = false;
+ for (std::deque<MessageInTransit*>::iterator it =
+ write_message_queue_.begin(); it != write_message_queue_.end();
+ ++it) {
+ (*it)->Destroy();
+ }
+ write_message_queue_.clear();
+}
+
+// -----------------------------------------------------------------------------
+
+// Static factory method declared in raw_channel.h.
+// static
+RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
+ Delegate* delegate,
+ base::MessageLoop* message_loop) {
+ return new RawChannelPosix(handle, delegate, message_loop);
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc
new file mode 100644
index 0000000..8599996
--- /dev/null
+++ b/mojo/system/raw_channel_posix_unittest.cc
@@ -0,0 +1,467 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// TODO(vtl): Factor out the POSIX-specific bits of this test (once we have a
+// non-POSIX implementation).
+
+#include "mojo/system/raw_channel.h"
+
+#include <fcntl.h>
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
+#include "base/message_loop/message_loop.h"
+#include "base/posix/eintr_wrapper.h"
+#include "base/rand_util.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/threading/simple_thread.h"
+#include "base/threading/thread.h"
+#include "base/time/time.h"
+#include "mojo/system/message_in_transit.h"
+#include "mojo/system/platform_channel_handle.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+void PostTaskAndWaitHelper(base::WaitableEvent* event,
+ const base::Closure& task) {
+ task.Run();
+ event->Signal();
+}
+
+void PostTaskAndWait(base::MessageLoop* message_loop,
+ const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ base::WaitableEvent event(false, false);
+ message_loop->PostTask(from_here,
+ base::Bind(&PostTaskAndWaitHelper, &event, task));
+ event.Wait();
+}
+
+// -----------------------------------------------------------------------------
+
+MessageInTransit* MakeTestMessage(uint32_t num_bytes) {
+ std::vector<unsigned char> bytes(num_bytes, 0);
+ for (size_t i = 0; i < num_bytes; i++)
+ bytes[i] = static_cast<unsigned char>(i + num_bytes);
+ return MessageInTransit::Create(bytes.data(), num_bytes);
+}
+
+bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
+ const unsigned char* b = static_cast<const unsigned char*>(bytes);
+ for (uint32_t i = 0; i < num_bytes; i++) {
+ if (b[i] != static_cast<unsigned char>(i + num_bytes))
+ return false;
+ }
+ return true;
+}
+
+// -----------------------------------------------------------------------------
+
+class RawChannelPosixTest : public testing::Test {
+ public:
+ RawChannelPosixTest() : io_thread_("io_thread") {
+ fds_[0] = -1;
+ fds_[1] = -1;
+ }
+
+ virtual ~RawChannelPosixTest() {
+ }
+
+ virtual void SetUp() OVERRIDE {
+ io_thread_.StartWithOptions(
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
+
+ // Create the socket.
+ PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_) == 0);
+
+ // Set the ends to non-blocking.
+ PCHECK(fcntl(fds_[0], F_SETFL, O_NONBLOCK) == 0);
+ PCHECK(fcntl(fds_[1], F_SETFL, O_NONBLOCK) == 0);
+ }
+
+ virtual void TearDown() OVERRIDE {
+ if (fds_[0] != -1)
+ close(fds_[0]);
+ if (fds_[1] != -1)
+ close(fds_[1]);
+
+ io_thread_.Stop();
+ }
+
+ protected:
+ int fd(size_t i) { return fds_[i]; }
+ void clear_fd(size_t i) { fds_[i] = -1; }
+
+ base::MessageLoop* message_loop() { return io_thread_.message_loop(); }
+
+ private:
+ base::Thread io_thread_;
+ int fds_[2];
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelPosixTest);
+};
+
+// RawChannelPosixTest.WriteMessage --------------------------------------------
+
+class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
+ public:
+ WriteOnlyRawChannelDelegate() {}
+ virtual ~WriteOnlyRawChannelDelegate() {}
+
+ // |RawChannel::Delegate| implementation:
+ virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
+ NOTREACHED();
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate);
+};
+
+static const int64_t kMessageReaderSleepMs = 1;
+static const size_t kMessageReaderMaxPollIterations = 3000;
+
+class TestMessageReaderAndChecker {
+ public:
+ explicit TestMessageReaderAndChecker(int fd) : fd_(fd) {}
+ ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); }
+
+ bool ReadAndCheckNextMessage(uint32_t expected_size) {
+ unsigned char buffer[4096];
+
+ for (size_t i = 0; i < kMessageReaderMaxPollIterations;) {
+ ssize_t read_size = HANDLE_EINTR(read(fd_, buffer, sizeof(buffer)));
+ if (read_size < 0) {
+ PCHECK(errno == EAGAIN || errno == EWOULDBLOCK);
+ read_size = 0;
+ }
+
+ // Append newly-read data to |bytes_|.
+ bytes_.insert(bytes_.end(), buffer, buffer + read_size);
+
+ // If we have the header....
+ if (bytes_.size() >= sizeof(MessageInTransit)) {
+ const MessageInTransit* message =
+ reinterpret_cast<const MessageInTransit*>(bytes_.data());
+ CHECK_EQ(reinterpret_cast<size_t>(message) %
+ MessageInTransit::kMessageAlignment, 0u);
+
+ if (message->data_size() != expected_size) {
+ LOG(ERROR) << "Wrong size: " << message->data_size() << " instead of "
+ << expected_size << " bytes.";
+ return false;
+ }
+
+ // If we've read the whole message....
+ if (bytes_.size() >= message->size_with_header_and_padding()) {
+ if (!CheckMessageData(message->data(), message->data_size())) {
+ LOG(ERROR) << "Incorrect message data.";
+ return false;
+ }
+
+ // Erase message data.
+ bytes_.erase(bytes_.begin(),
+ bytes_.begin() +
+ message->size_with_header_and_padding());
+ return true;
+ }
+ }
+
+ if (static_cast<size_t>(read_size) < sizeof(buffer)) {
+ i++;
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs));
+ }
+ }
+
+ LOG(ERROR) << "Too many iterations.";
+ return false;
+ }
+
+ private:
+ const int fd_;
+
+ // The start of the received data should always be on a message boundary.
+ std::vector<unsigned char> bytes_;
+
+ DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker);
+};
+
+// Tests writing (and verifies reading using our own custom reader).
+TEST_F(RawChannelPosixTest, WriteMessage) {
+ WriteOnlyRawChannelDelegate delegate;
+ scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
+ &delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(0);
+
+ TestMessageReaderAndChecker checker(fd(1));
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(rc.get())));
+
+ // Write and read, for a variety of sizes.
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
+ rc->WriteMessage(MakeTestMessage(size));
+ EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
+ }
+
+ // Write/queue and read afterwards, for a variety of sizes.
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
+ rc->WriteMessage(MakeTestMessage(size));
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
+ EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(rc.get())));
+}
+
+// RawChannelPosixTest.OnReadMessage -------------------------------------------
+
+class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
+ public:
+ ReadCheckerRawChannelDelegate()
+ : done_event_(false, false),
+ position_(0) {}
+ virtual ~ReadCheckerRawChannelDelegate() {}
+
+ // |RawChannel::Delegate| implementation:
+ // Called on the I/O thread.
+ virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
+ size_t position;
+ size_t expected_size;
+ bool should_signal = false;
+ {
+ base::AutoLock locker(lock_);
+ CHECK_LT(position_, expected_sizes_.size());
+ position = position_;
+ expected_size = expected_sizes_[position];
+ position_++;
+ if (position_ >= expected_sizes_.size())
+ should_signal = true;
+ }
+
+ EXPECT_EQ(expected_size, message.data_size()) << position;
+ if (message.data_size() == expected_size) {
+ EXPECT_TRUE(CheckMessageData(message.data(), message.data_size()))
+ << position;
+ }
+
+ if (should_signal)
+ done_event_.Signal();
+ }
+
+ // Wait for all the messages (of sizes |expected_sizes_|) to be seen.
+ void Wait() {
+ done_event_.Wait();
+ }
+
+ void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) {
+ base::AutoLock locker(lock_);
+ CHECK_EQ(position_, expected_sizes_.size());
+ expected_sizes_ = expected_sizes;
+ position_ = 0;
+ }
+
+ private:
+ base::WaitableEvent done_event_;
+
+ base::Lock lock_; // Protects the following members.
+ std::vector<uint32_t> expected_sizes_;
+ size_t position_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate);
+};
+
+// Tests reading (writing using our own custom writer).
+TEST_F(RawChannelPosixTest, OnReadMessage) {
+ // We're going to write to |fd(1)|. We'll do so in a blocking manner.
+ PCHECK(fcntl(fd(1), F_SETFL, 0) == 0);
+
+ ReadCheckerRawChannelDelegate delegate;
+ scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
+ &delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(0);
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(rc.get())));
+
+ // Write and read, for a variety of sizes.
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
+ delegate.SetExpectedSizes(std::vector<uint32_t>(1, size));
+ MessageInTransit* message = MakeTestMessage(size);
+ EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()),
+ write(fd(1), message, message->size_with_header_and_padding()));
+ message->Destroy();
+ delegate.Wait();
+ }
+
+ // Set up reader and write as fast as we can.
+ // Write/queue and read afterwards, for a variety of sizes.
+ std::vector<uint32_t> expected_sizes;
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
+ expected_sizes.push_back(size);
+ delegate.SetExpectedSizes(expected_sizes);
+ for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
+ MessageInTransit* message = MakeTestMessage(size);
+ EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()),
+ write(fd(1), message, message->size_with_header_and_padding()));
+ message->Destroy();
+ }
+ delegate.Wait();
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(rc.get())));
+}
+
+// RawChannelPosixTest.WriteMessageAndOnReadMessage ----------------------------
+
+class RawChannelWriterThread : public base::SimpleThread {
+ public:
+ RawChannelWriterThread(RawChannel* raw_channel, size_t write_count)
+ : base::SimpleThread("raw_channel_writer_thread"),
+ raw_channel_(raw_channel),
+ left_to_write_(write_count) {
+ }
+
+ virtual ~RawChannelWriterThread() {
+ Join();
+ }
+
+ private:
+ virtual void Run() OVERRIDE {
+ static const int kMaxRandomMessageSize = 25000;
+
+ while (left_to_write_-- > 0) {
+ raw_channel_->WriteMessage(MakeTestMessage(
+ static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))));
+ }
+ }
+
+ RawChannel* const raw_channel_;
+ size_t left_to_write_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread);
+};
+
+class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
+ public:
+ explicit ReadCountdownRawChannelDelegate(size_t expected_count)
+ : done_event_(false, false),
+ expected_count_(expected_count),
+ count_(0) {}
+ virtual ~ReadCountdownRawChannelDelegate() {}
+
+ // |RawChannel::Delegate| implementation:
+ // Called on the I/O thread.
+ virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
+ EXPECT_LT(count_, expected_count_);
+ count_++;
+
+ EXPECT_TRUE(CheckMessageData(message.data(), message.data_size()));
+
+ if (count_ >= expected_count_)
+ done_event_.Signal();
+ }
+
+ // Wait for all the messages to have been seen.
+ void Wait() {
+ done_event_.Wait();
+ }
+
+ private:
+ base::WaitableEvent done_event_;
+ size_t expected_count_;
+ size_t count_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate);
+};
+
+TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
+ static const size_t kNumWriterThreads = 10;
+ static const size_t kNumWriteMessagesPerThread = 4000;
+
+ WriteOnlyRawChannelDelegate writer_delegate;
+ scoped_ptr<RawChannel> writer_rc(
+ RawChannel::Create(PlatformChannelHandle(fd(0)),
+ &writer_delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(0);
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init,
+ base::Unretained(writer_rc.get())));
+
+ ReadCountdownRawChannelDelegate reader_delegate(
+ kNumWriterThreads * kNumWriteMessagesPerThread);
+ scoped_ptr<RawChannel> reader_rc(
+ RawChannel::Create(PlatformChannelHandle(fd(1)),
+ &reader_delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(1);
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init,
+ base::Unretained(reader_rc.get())));
+
+ {
+ ScopedVector<RawChannelWriterThread> writer_threads;
+ for (size_t i = 0; i < kNumWriterThreads; i++) {
+ writer_threads.push_back(new RawChannelWriterThread(
+ writer_rc.get(), kNumWriteMessagesPerThread));
+ }
+ for (size_t i = 0; i < writer_threads.size(); i++)
+ writer_threads[i]->Start();
+ } // Joins all the writer threads.
+
+ // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be
+ // any, but we want to know about them.)
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
+
+ // Wait for reading to finish.
+ reader_delegate.Wait();
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(reader_rc.get())));
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(writer_rc.get())));
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo