summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-07 17:49:50 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-07 17:49:50 +0000
commitccf8453db576012646427004578a156a8830c4bf (patch)
tree3b8950f0029c1c3f1d3bcf10993d46660115dc9a
parent2ccb47aa7dd2d32a488743a01dcaa1983cc8ae4c (diff)
downloadchromium_src-ccf8453db576012646427004578a156a8830c4bf.zip
chromium_src-ccf8453db576012646427004578a156a8830c4bf.tar.gz
chromium_src-ccf8453db576012646427004578a156a8830c4bf.tar.bz2
Mojo: First stab at making MessagePipes work across OS pipes.
Given a running RawChannel, one can set up MessagePipes that have one endpoint available locally (in the usual way) and the other endpoint proxied to the other side of the OS-level "pipe" (which presumably has a symmetrical setup -- i.e., another RawChannel, etc.). Currently, this has only been tested in-process, but apart from possible synchronization/bootstrapping issues there's no reason it shouldn't work across processes. (Whatever launches the process will have to begin the bootstrapping by getting an OS pipe between processes and making sure things are appropriately synchronized.) Still to do: - Properly handle errors (e.g., due to the pipe/process dying). - Figure out how to start processes and bootstrap in that situation (and test this). R=darin@chromium.org, darin Review URL: https://codereview.chromium.org/60103005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@233638 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/compiler_specific.h22
-rw-r--r--mojo/mojo.gyp6
-rw-r--r--mojo/system/channel.cc215
-rw-r--r--mojo/system/channel.h142
-rw-r--r--mojo/system/local_message_pipe_endpoint.cc44
-rw-r--r--mojo/system/local_message_pipe_endpoint.h14
-rw-r--r--mojo/system/message_in_transit.cc27
-rw-r--r--mojo/system/message_in_transit.h57
-rw-r--r--mojo/system/message_pipe.cc129
-rw-r--r--mojo/system/message_pipe.h22
-rw-r--r--mojo/system/message_pipe_dispatcher_unittest.cc13
-rw-r--r--mojo/system/message_pipe_endpoint.cc15
-rw-r--r--mojo/system/message_pipe_endpoint.h27
-rw-r--r--mojo/system/message_pipe_unittest.cc4
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.cc144
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.h94
-rw-r--r--mojo/system/raw_channel.h7
-rw-r--r--mojo/system/raw_channel_posix.cc18
-rw-r--r--mojo/system/raw_channel_posix_unittest.cc27
-rw-r--r--mojo/system/raw_channel_win.cc27
-rw-r--r--mojo/system/remote_message_pipe_posix_unittest.cc342
21 files changed, 1281 insertions, 115 deletions
diff --git a/base/compiler_specific.h b/base/compiler_specific.h
index 07b680b..409a613 100644
--- a/base/compiler_specific.h
+++ b/base/compiler_specific.h
@@ -68,6 +68,28 @@
#endif // COMPILER_MSVC
+// The C++ standard requires that static const members have an out-of-class
+// definition (in a single compilation unit), but MSVC chokes on this (when
+// language extensions, which are required, are enabled). (You're only likely to
+// notice the need for a definition if you take the address of the member or,
+// more commonly, pass it to a function that takes it as a reference argument --
+// probably an STL function.) This macro makes MSVC do the right thing. See
+// http://msdn.microsoft.com/en-us/library/34h23df8(v=vs.100).aspx for more
+// information. Use like:
+//
+// In .h file:
+// struct Foo {
+// static const int kBar = 5;
+// };
+//
+// In .cc file:
+// STATIC_CONST_MEMBER_DEFINITION const int Foo::kBar;
+#if defined(COMPILER_MSVC)
+#define STATIC_CONST_MEMBER_DEFINITION __declspec(selectany)
+#else
+#define STATIC_CONST_MEMBER_DEFINITION
+#endif
+
// Annotate a variable indicating it's ok if the variable is not used.
// (Typically used to silence a compiler warning when the assignment
// is important for some other reason.)
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 9374104..dbe1a61 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -80,6 +80,8 @@
'MOJO_SYSTEM_IMPLEMENTATION',
],
'sources': [
+ 'system/channel.cc',
+ 'system/channel.h',
'system/core.cc',
'system/core_impl.cc',
'system/core_impl.h',
@@ -99,8 +101,11 @@
'system/message_pipe_endpoint.cc',
'system/message_pipe_endpoint.h',
'system/platform_channel_handle.h',
+ 'system/proxy_message_pipe_endpoint.cc',
+ 'system/proxy_message_pipe_endpoint.h',
'system/raw_channel.h',
'system/raw_channel_posix.cc',
+ 'system/raw_channel_win.cc',
'system/simple_dispatcher.cc',
'system/simple_dispatcher.h',
'system/waiter.cc',
@@ -130,6 +135,7 @@
'system/message_pipe_dispatcher_unittest.cc',
'system/message_pipe_unittest.cc',
'system/raw_channel_posix_unittest.cc',
+ 'system/remote_message_pipe_posix_unittest.cc',
'system/simple_dispatcher_unittest.cc',
'system/test_utils.cc',
'system/test_utils.h',
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
new file mode 100644
index 0000000..095e44c
--- /dev/null
+++ b/mojo/system/channel.cc
@@ -0,0 +1,215 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/channel.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/strings/stringprintf.h"
+
+namespace mojo {
+namespace system {
+
+COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
+ MessageInTransit::kInvalidEndpointId,
+ kBootstrapEndpointId_is_invalid);
+
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
+ Channel::kBootstrapEndpointId;
+
+Channel::EndpointInfo::EndpointInfo() {
+}
+
+Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
+ unsigned port)
+ : message_pipe(message_pipe),
+ port(port) {
+}
+
+Channel::EndpointInfo::~EndpointInfo() {
+}
+
+Channel::Channel()
+ : next_local_id_(kBootstrapEndpointId) {
+}
+
+bool Channel::Init(const PlatformChannelHandle& handle) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
+ // No need to take |lock_|, since this must be called before this object
+ // becomes thread-safe.
+ DCHECK(!raw_channel_.get());
+
+ raw_channel_.reset(
+ RawChannel::Create(handle, this, base::MessageLoop::current()));
+ if (!raw_channel_->Init()) {
+ raw_channel_.reset();
+ return false;
+ }
+
+ return true;
+}
+
+void Channel::Shutdown() {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
+ base::AutoLock locker(lock_);
+ DCHECK(raw_channel_.get());
+ raw_channel_->Shutdown();
+ raw_channel_.reset();
+
+ // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
+ // it's empty?
+}
+
+MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
+ scoped_refptr<MessagePipe> message_pipe, unsigned port) {
+ MessageInTransit::EndpointId local_id;
+ {
+ base::AutoLock locker(lock_);
+
+ while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
+ local_id_to_endpoint_info_map_.find(next_local_id_) !=
+ local_id_to_endpoint_info_map_.end())
+ next_local_id_++;
+
+ local_id = next_local_id_;
+ next_local_id_++;
+
+ // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
+ // some expensive reference count increment/decrements.) Once this is done,
+ // we should be able to delete |EndpointInfo|'s default constructor.
+ local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
+ }
+
+ message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
+ return local_id;
+}
+
+void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
+ EndpointInfo endpoint_info;
+ {
+ base::AutoLock locker(lock_);
+
+ IdToEndpointInfoMap::const_iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ CHECK(it != local_id_to_endpoint_info_map_.end());
+ endpoint_info = it->second;
+ }
+
+ endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
+}
+
+bool Channel::WriteMessage(MessageInTransit* message) {
+ base::AutoLock locker(lock_);
+ if (!raw_channel_.get()) {
+ // TODO(vtl): I think this is probably not an error condition, but I should
+ // think about it (and the shutdown sequence) more carefully.
+ LOG(INFO) << "WriteMessage() after shutdown";
+ return false;
+ }
+
+ return raw_channel_->WriteMessage(message);
+}
+
+void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker_(lock_);
+ local_id_to_endpoint_info_map_.erase(local_id);
+}
+
+Channel::~Channel() {
+ // The channel should have been shut down first.
+ DCHECK(!raw_channel_.get());
+}
+
+void Channel::OnReadMessage(const MessageInTransit& message) {
+ switch (message.type()) {
+ case MessageInTransit::kTypeMessagePipeEndpoint:
+ case MessageInTransit::kTypeMessagePipe:
+ OnReadMessageForDownstream(message);
+ break;
+ case MessageInTransit::TYPE_CHANNEL:
+ OnReadMessageForChannel(message);
+ break;
+ default:
+ HandleRemoteError(base::StringPrintf(
+ "Received message of invalid type %u",
+ static_cast<unsigned>(message.type())));
+ break;
+ }
+}
+
+void Channel::OnFatalError(FatalError fatal_error) {
+ // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
+ NOTIMPLEMENTED();
+}
+
+void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
+ DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
+ message.type() == MessageInTransit::kTypeMessagePipe);
+
+ MessageInTransit::EndpointId local_id = message.destination_id();
+ if (local_id == MessageInTransit::kInvalidEndpointId) {
+ HandleRemoteError("Received message with no destination ID");
+ return;
+ }
+
+ EndpointInfo endpoint_info;
+ {
+ base::AutoLock locker(lock_);
+
+ // Since we own |raw_channel_|, and this method and |Shutdown()| should only
+ // be called from the creation thread, |raw_channel_| should never be null
+ // here.
+ DCHECK(raw_channel_.get());
+
+ IdToEndpointInfoMap::const_iterator it =
+ local_id_to_endpoint_info_map_.find(local_id);
+ if (it == local_id_to_endpoint_info_map_.end()) {
+ HandleRemoteError(base::StringPrintf(
+ "Received a message for nonexistent local destination ID %u",
+ static_cast<unsigned>(local_id)));
+ return;
+ }
+ endpoint_info = it->second;
+ }
+
+ // We need to duplicate the message, because |EnqueueMessage()| will take
+ // ownership of it.
+ MessageInTransit* own_message = MessageInTransit::Create(
+ message.type(), message.subtype(), message.data(), message.data_size());
+ if (endpoint_info.message_pipe->EnqueueMessage(
+ MessagePipe::GetPeerPort(endpoint_info.port),
+ own_message) != MOJO_RESULT_OK) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to enqueue message to local destination ID %u",
+ static_cast<unsigned>(local_id)));
+ return;
+ }
+}
+
+void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
+ // TODO(vtl): Currently no channel-only messages yet.
+ HandleRemoteError("Received invalid channel message");
+ NOTREACHED();
+}
+
+void Channel::HandleRemoteError(const base::StringPiece& error_message) {
+ // TODO(vtl): Is this how we really want to handle this?
+ LOG(INFO) << error_message;
+}
+
+void Channel::HandleLocalError(const base::StringPiece& error_message) {
+ // TODO(vtl): Is this how we really want to handle this?
+ LOG(FATAL) << error_message;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/channel.h b/mojo/system/channel.h
new file mode 100644
index 0000000..5e8e2de
--- /dev/null
+++ b/mojo/system/channel.h
@@ -0,0 +1,142 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_CHANNEL_H_
+#define MOJO_SYSTEM_CHANNEL_H_
+
+#include <stdint.h>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/containers/hash_tables.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/strings/string_piece.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread_checker.h"
+#include "mojo/public/system/core.h"
+#include "mojo/public/system/system_export.h"
+#include "mojo/system/message_in_transit.h"
+#include "mojo/system/message_pipe.h"
+#include "mojo/system/raw_channel.h"
+
+namespace base {
+class MessageLoop;
+}
+
+namespace mojo {
+namespace system {
+
+// This class is mostly thread-safe. It must be created on an "I/O thread" (see
+// raw_channel.h). |Init()| must be called on that same thread before it becomes
+// thread-safe (in particular, before references are given to any other thread)
+// and |Shutdown()| must be called on that same thread before destruction. Its
+// public methods are otherwise thread-safe. It may be destroyed on any thread,
+// in the sense that the last reference to it may be released on any thread,
+// with the proviso that |Shutdown()| must have been called first (so the
+// pattern is that a "main" reference is kept on its creation thread and is
+// released after |Shutdown()| is called, but other threads may have temporarily
+// "dangling" references).
+//
+// Note that |MessagePipe| calls into |Channel| and the former's |lock_| must be
+// acquired before the latter's. When |Channel| wants to call into a
+// |MessagePipe|, it must obtain a reference to the |MessagePipe| (from
+// |local_id_to_endpoint_info_map_|) under |Channel::lock_| and then release the
+// lock.
+//
+// Also, care must be taken with respect to references: While a |Channel| has
+// references to |MessagePipe|s, |MessagePipe|s (via |ProxyMessagePipeEndpoint|)
+// may also have references to |Channel|s. These references are set up by
+// calling |AttachMessagePipeEndpoint()|. The reference to |MessagePipe| owned
+// by |Channel| must be removed by calling |DetachMessagePipeEndpoint()| (which
+// is done by |MessagePipe|/|ProxyMessagePipeEndpoint|, which simultaneously
+// removes its reference to |Channel|).
+class MOJO_SYSTEM_EXPORT Channel : public base::RefCountedThreadSafe<Channel>,
+ public RawChannel::Delegate {
+ public:
+ // The first message pipe endpoint attached will have this as its local ID.
+ static const MessageInTransit::EndpointId kBootstrapEndpointId = 1;
+
+ Channel();
+
+ // This must be called on the creation thread before any other methods are
+ // called, and before references to this object are given to any other
+ // threads. Takes ownership of |handle|. Returns true on success. On failure,
+ // no other methods should be called (including |Shutdown()|).
+ bool Init(const PlatformChannelHandle& handle);
+
+ // This must be called on the creation thread before destruction (which can
+ // happen on any thread).
+ void Shutdown();
+
+ // Attaches the given message pipe/port's endpoint (which must be a
+ // |ProxyMessagePipeEndpoint|) to this channel. This assigns it a local ID,
+ // which it returns. The first message pipe endpoint attached will always have
+ // |kBootstrapEndpointId| as its local ID. (For bootstrapping, this occurs on
+ // both sides, so one should use |kBootstrapEndpointId| for the remote ID for
+ // the first message pipe across a channel.)
+ MessageInTransit::EndpointId AttachMessagePipeEndpoint(
+ scoped_refptr<MessagePipe> message_pipe, unsigned port);
+ void RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id);
+
+ // This forwards |message| verbatim to |raw_channel_|.
+ bool WriteMessage(MessageInTransit* message);
+
+ // This removes the message pipe/port's endpoint (with the given local ID,
+ // returned by |AttachMessagePipeEndpoint()| from this channel. After this is
+ // called, |local_id| may be reused for another message pipe.
+ void DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id);
+
+ private:
+ friend class base::RefCountedThreadSafe<Channel>;
+ virtual ~Channel();
+
+ // |RawChannel::Delegate| implementation:
+ virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE;
+ virtual void OnFatalError(FatalError fatal_error) OVERRIDE;
+
+ // Helpers for |OnReadMessage|:
+ void OnReadMessageForDownstream(const MessageInTransit& message);
+ void OnReadMessageForChannel(const MessageInTransit& message);
+
+ // Handles errors (e.g., invalid messages) from the remote side.
+ void HandleRemoteError(const base::StringPiece& error_message);
+ // Handles internal errors/failures from the local side.
+ void HandleLocalError(const base::StringPiece& error_message);
+
+ struct EndpointInfo {
+ EndpointInfo();
+ EndpointInfo(scoped_refptr<MessagePipe> message_pipe, unsigned port);
+ ~EndpointInfo();
+
+ scoped_refptr<MessagePipe> message_pipe;
+ unsigned port;
+ };
+
+ base::ThreadChecker creation_thread_checker_;
+
+ // Note: |MessagePipe|s MUST NOT be used under |lock_|. I.e., |lock_| can only
+ // be acquired after |MessagePipe::lock_|, never before. Thus to call into a
+ // |MessagePipe|, a reference should be acquired from
+ // |local_id_to_endpoint_info_map_| under |lock_| (e.g., by copying the
+ // |EndpointInfo|) and then the lock released.
+ base::Lock lock_; // Protects the members below.
+
+ scoped_ptr<RawChannel> raw_channel_;
+
+ typedef base::hash_map<MessageInTransit::EndpointId, EndpointInfo>
+ IdToEndpointInfoMap;
+ IdToEndpointInfoMap local_id_to_endpoint_info_map_;
+ // The next local ID to try (when allocating new local IDs). Note: It should
+ // be checked for existence before use.
+ MessageInTransit::EndpointId next_local_id_;
+
+ DISALLOW_COPY_AND_ASSIGN(Channel);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_CHANNEL_H_
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc
index 261d44a..8f1cdf3 100644
--- a/mojo/system/local_message_pipe_endpoint.cc
+++ b/mojo/system/local_message_pipe_endpoint.cc
@@ -21,7 +21,18 @@ LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
DCHECK(!is_open_);
}
-void LocalMessagePipeEndpoint::OnPeerClose() {
+void LocalMessagePipeEndpoint::Close() {
+ DCHECK(is_open_);
+ is_open_ = false;
+ for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin();
+ it != message_queue_.end();
+ ++it) {
+ (*it)->Destroy();
+ }
+ message_queue_.clear();
+}
+
+bool LocalMessagePipeEndpoint::OnPeerClose() {
DCHECK(is_open_);
DCHECK(is_peer_open_);
@@ -36,21 +47,16 @@ void LocalMessagePipeEndpoint::OnPeerClose() {
waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
new_satisfiable_flags);
}
+
+ return true;
}
-MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
- const void* bytes, uint32_t num_bytes,
- const MojoHandle* handles, uint32_t num_handles,
- MojoWriteMessageFlags /*flags*/) {
+MojoResult LocalMessagePipeEndpoint::EnqueueMessage(MessageInTransit* message) {
DCHECK(is_open_);
DCHECK(is_peer_open_);
bool was_empty = message_queue_.empty();
-
- // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
- message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes));
- // TODO(vtl): Support sending handles.
-
+ message_queue_.push_back(message);
if (was_empty) {
waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
SatisfiableFlags());
@@ -64,17 +70,6 @@ void LocalMessagePipeEndpoint::CancelAllWaiters() {
waiter_list_.CancelAllWaiters();
}
-void LocalMessagePipeEndpoint::Close() {
- DCHECK(is_open_);
- is_open_ = false;
- for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin();
- it != message_queue_.end();
- ++it) {
- (*it)->Destroy();
- }
- message_queue_.clear();
-}
-
MojoResult LocalMessagePipeEndpoint::ReadMessage(
void* bytes, uint32_t* num_bytes,
MojoHandle* handles, uint32_t* num_handles,
@@ -85,8 +80,10 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage(
// TODO(vtl): We'll need this later:
// const uint32_t max_handles = num_handles ? *num_handles : 0;
- if (message_queue_.empty())
- return MOJO_RESULT_NOT_FOUND;
+ if (message_queue_.empty()) {
+ return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
+ MOJO_RESULT_FAILED_PRECONDITION;
+ }
// TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
// and release the lock immediately.
@@ -161,4 +158,3 @@ MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
} // namespace system
} // namespace mojo
-
diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h
index caed81d..7e8703d 100644
--- a/mojo/system/local_message_pipe_endpoint.h
+++ b/mojo/system/local_message_pipe_endpoint.h
@@ -10,30 +10,26 @@
#include "base/basictypes.h"
#include "base/compiler_specific.h"
#include "mojo/public/system/core.h"
+#include "mojo/public/system/system_export.h"
#include "mojo/system/message_pipe_endpoint.h"
#include "mojo/system/waiter_list.h"
namespace mojo {
namespace system {
-class MessageInTransit;
-
-class LocalMessagePipeEndpoint : public MessagePipeEndpoint {
+class MOJO_SYSTEM_EXPORT LocalMessagePipeEndpoint : public MessagePipeEndpoint {
public:
LocalMessagePipeEndpoint();
virtual ~LocalMessagePipeEndpoint();
// |MessagePipeEndpoint| implementation:
- virtual void OnPeerClose() OVERRIDE;
- virtual MojoResult EnqueueMessage(
- const void* bytes, uint32_t num_bytes,
- const MojoHandle* handles, uint32_t num_handles,
- MojoWriteMessageFlags flags) OVERRIDE;
+ virtual void Close() OVERRIDE;
+ virtual bool OnPeerClose() OVERRIDE;
+ virtual MojoResult EnqueueMessage(MessageInTransit* message) OVERRIDE;
// There's a dispatcher for |LocalMessagePipeEndpoint|s, so we have to
// implement/override these:
virtual void CancelAllWaiters() OVERRIDE;
- virtual void Close() OVERRIDE;
virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
MojoHandle* handles, uint32_t* num_handles,
MojoReadMessageFlags flags) OVERRIDE;
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index ba05957..591f68a 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -10,8 +10,8 @@
#include <new>
#include "base/basictypes.h"
+#include "base/compiler_specific.h"
#include "base/logging.h"
-#include "build/build_config.h"
#include "mojo/system/limits.h"
namespace mojo {
@@ -27,14 +27,24 @@ COMPILE_ASSERT(sizeof(MessageInTransit) %
MessageInTransit::kMessageAlignment == 0,
sizeof_MessageInTransit_not_a_multiple_of_alignment);
-// C++ requires that storage be declared (in a single compilation unit), but
-// MSVS isn't standards-conformant and doesn't handle this correctly.
-#if !defined(COMPILER_MSVC)
-const size_t MessageInTransit::kMessageAlignment;
-#endif
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeMessagePipeEndpoint;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeMessagePipe;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::TYPE_CHANNEL;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeMessagePipeEndpointData;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeMessagePipePeerClosed;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
+ MessageInTransit::kInvalidEndpointId;
+STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
// static
-MessageInTransit* MessageInTransit::Create(const void* bytes,
+MessageInTransit* MessageInTransit::Create(Type type,
+ Subtype subtype,
+ const void* bytes,
uint32_t num_bytes) {
const size_t size_with_header = sizeof(MessageInTransit) + num_bytes;
const size_t size_with_header_and_padding =
@@ -46,7 +56,8 @@ MessageInTransit* MessageInTransit::Create(const void* bytes,
// The buffer consists of the header (a |MessageInTransit|, constructed using
// a placement new), followed by the data, followed by padding (of zeros).
- MessageInTransit* rv = new (buffer) MessageInTransit(num_bytes);
+ MessageInTransit* rv =
+ new (buffer) MessageInTransit(num_bytes, type, subtype);
memcpy(buffer + sizeof(MessageInTransit), bytes, num_bytes);
memset(buffer + size_with_header, 0,
size_with_header_and_padding - size_with_header);
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 77b658f..b072e23 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -18,8 +18,32 @@ namespace system {
// Note: This class is POD.
class MOJO_SYSTEM_EXPORT MessageInTransit {
public:
- // Creates a |MessageInTransit| with the data given by |bytes|/|num_bytes|.
- static MessageInTransit* Create(const void* bytes, uint32_t num_bytes);
+ typedef uint16_t Type;
+ // Messages that are forwarded to |MessagePipeEndpoint|s.
+ static const Type kTypeMessagePipeEndpoint = 0;
+ // Messages that are forwarded to |MessagePipe|s.
+ static const Type kTypeMessagePipe = 1;
+ // Messages that are consumed by the channel.
+ static const Type TYPE_CHANNEL = 2;
+
+ typedef uint16_t Subtype;
+ // Subtypes for type |kTypeMessagePipeEndpoint|:
+ static const Subtype kSubtypeMessagePipeEndpointData = 0;
+ // Subtypes for type |kTypeMessagePipe|:
+ static const Subtype kSubtypeMessagePipePeerClosed = 0;
+
+ typedef uint32_t EndpointId;
+ // Never a valid endpoint ID.
+ static const EndpointId kInvalidEndpointId = 0;
+
+ // Messages (the header and data) must always be aligned to a multiple of this
+ // quantity (which must be a power of 2).
+ static const size_t kMessageAlignment = 8;
+
+ // Creates a |MessageInTransit| of the given |type| and |subtype|, with the
+ // data given by |bytes|/|num_bytes|.
+ static MessageInTransit* Create(Type type, Subtype subtype,
+ const void* bytes, uint32_t num_bytes);
// Destroys a |MessageInTransit| created using |Create()|.
inline void Destroy() {
@@ -41,11 +65,17 @@ class MOJO_SYSTEM_EXPORT MessageInTransit {
return RoundUpMessageAlignment(sizeof(*this) + size_);
}
- // TODO(vtl): Add whatever's necessary to transport handles.
+ Type type() const { return type_; }
+ Subtype subtype() const { return subtype_; }
+ EndpointId source_id() const { return source_id_; }
+ EndpointId destination_id() const { return destination_id_; }
- // Messages (the header and data) must always be aligned to a multiple of this
- // quantity (which must be a power of 2).
- static const size_t kMessageAlignment = 8;
+ void set_source_id(EndpointId source_id) { source_id_ = source_id; }
+ void set_destination_id(EndpointId destination_id) {
+ destination_id_ = destination_id;
+ }
+
+ // TODO(vtl): Add whatever's necessary to transport handles.
// Rounds |n| up to a multiple of |kMessageAlignment|.
static inline size_t RoundUpMessageAlignment(size_t n) {
@@ -53,14 +83,19 @@ class MOJO_SYSTEM_EXPORT MessageInTransit {
}
private:
- explicit MessageInTransit(uint32_t size)
- : size_(size), reserved_(0), user_1_(0), user_2_(0) {}
+ explicit MessageInTransit(uint32_t size, Type type, Subtype subtype)
+ : size_(size),
+ type_(type),
+ subtype_(subtype),
+ source_id_(kInvalidEndpointId),
+ destination_id_(kInvalidEndpointId) {}
// "Header" for the data.
uint32_t size_;
- uint32_t reserved_;
- uint32_t user_1_;
- uint32_t user_2_;
+ Type type_;
+ Subtype subtype_;
+ EndpointId source_id_;
+ EndpointId destination_id_;
// Intentionally unimplemented (and private): Use |Destroy()| instead (which
// simply frees the memory).
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index bfb0646..857cdbc 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -6,22 +6,15 @@
#include "base/logging.h"
#include "base/stl_util.h"
+#include "mojo/system/channel.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe_endpoint.h"
+#include "mojo/system/proxy_message_pipe_endpoint.h"
namespace mojo {
namespace system {
-namespace {
-
-unsigned DestinationPortFromSourcePort(unsigned port) {
- DCHECK(port == 0 || port == 1);
- return port ^ 1;
-}
-
-} // namespace
-
MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
scoped_ptr<MessagePipeEndpoint> endpoint_1) {
endpoints_[0].reset(endpoint_0.release());
@@ -33,6 +26,12 @@ MessagePipe::MessagePipe() {
endpoints_[1].reset(new LocalMessagePipeEndpoint());
}
+// static
+unsigned MessagePipe::GetPeerPort(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+ return port ^ 1;
+}
+
void MessagePipe::CancelAllWaiters(unsigned port) {
DCHECK(port == 0 || port == 1);
@@ -44,38 +43,36 @@ void MessagePipe::CancelAllWaiters(unsigned port) {
void MessagePipe::Close(unsigned port) {
DCHECK(port == 0 || port == 1);
- unsigned destination_port = DestinationPortFromSourcePort(port);
+ unsigned destination_port = GetPeerPort(port);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port].get());
endpoints_[port]->Close();
- if (endpoints_[destination_port].get())
- endpoints_[destination_port]->OnPeerClose();
+ bool should_destroy_destination = endpoints_[destination_port].get() ?
+ !endpoints_[destination_port]->OnPeerClose() : false;
endpoints_[port].reset();
+ if (should_destroy_destination) {
+ endpoints_[destination_port]->Close();
+ endpoints_[destination_port].reset();
+ }
}
+// TODO(vtl): Support sending handles.
// TODO(vtl): Handle flags.
MojoResult MessagePipe::WriteMessage(
unsigned port,
const void* bytes, uint32_t num_bytes,
- const MojoHandle* handles, uint32_t num_handles,
+ const MojoHandle* /*handles*/, uint32_t /*num_handles*/,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port].get());
-
- // The destination port need not be open, unlike the source port.
- if (!endpoints_[destination_port].get())
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
- handles, num_handles,
- flags);
+ return EnqueueMessage(
+ GetPeerPort(port),
+ MessageInTransit::Create(
+ MessageInTransit::kTypeMessagePipeEndpoint,
+ MessageInTransit::kSubtypeMessagePipeEndpointData,
+ bytes, num_bytes));
}
MojoResult MessagePipe::ReadMessage(unsigned port,
@@ -113,6 +110,54 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
endpoints_[port]->RemoveWaiter(waiter);
}
+MojoResult MessagePipe::EnqueueMessage(unsigned port,
+ MessageInTransit* message) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(message);
+
+ if (message->type() == MessageInTransit::kTypeMessagePipe)
+ return HandleControlMessage(port, message);
+
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[GetPeerPort(port)].get());
+
+ // The destination port need not be open, unlike the source port.
+ if (!endpoints_[port].get()) {
+ message->Destroy();
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ return endpoints_[port]->EnqueueMessage(message);
+}
+
+void MessagePipe::Attach(unsigned port,
+ scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(channel.get());
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+
+ endpoints_[port]->Attach(channel, local_id);
+}
+
+void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+
+ if (!endpoints_[port]->Run(remote_id)) {
+ endpoints_[port]->Close();
+ endpoints_[port].reset();
+ }
+}
+
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
@@ -121,5 +166,37 @@ MessagePipe::~MessagePipe() {
DCHECK(!endpoints_[1].get());
}
+MojoResult MessagePipe::HandleControlMessage(unsigned port,
+ MessageInTransit* message) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(message);
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
+
+ MojoResult rv = MOJO_RESULT_OK;
+ switch (message->subtype()) {
+ case MessageInTransit::kSubtypeMessagePipePeerClosed: {
+ unsigned source_port = GetPeerPort(port);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[source_port].get());
+
+ endpoints_[source_port]->Close();
+ if (endpoints_[port].get())
+ endpoints_[port]->OnPeerClose();
+
+ endpoints_[source_port].reset();
+ break;
+ }
+ default:
+ LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
+ << message->subtype();
+ rv = MOJO_RESULT_UNKNOWN;
+ break;
+ }
+
+ message->Destroy();
+ return rv;
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index bb2f4ff..67f99d2 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -6,15 +6,18 @@
#define MOJO_SYSTEM_MESSAGE_PIPE_H_
#include "base/basictypes.h"
+#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/lock.h"
#include "mojo/public/system/core.h"
#include "mojo/public/system/system_export.h"
+#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
+class Channel;
class MessagePipeEndpoint;
class Waiter;
@@ -32,6 +35,9 @@ class MOJO_SYSTEM_EXPORT MessagePipe :
// |LocalMessagePipeEndpoint|s.
MessagePipe();
+ // Gets the other port number (i.e., 0 -> 1, 1 -> 0).
+ static unsigned GetPeerPort(unsigned port);
+
// These are called by the dispatcher to implement its methods of
// corresponding names. In all cases, the port |port| must be open.
void CancelAllWaiters(unsigned port);
@@ -54,10 +60,26 @@ class MOJO_SYSTEM_EXPORT MessagePipe :
MojoResult wake_result);
void RemoveWaiter(unsigned port, Waiter* waiter);
+ // This is used internally by |WriteMessage()| and by |Channel| to enqueue
+ // messages (typically to a |LocalMessagePipeEndpoint|). Unlike
+ // |WriteMessage()|, |port| is the *destination* port. Takes ownership of
+ // |message|.
+ MojoResult EnqueueMessage(unsigned port, MessageInTransit* message);
+
+ // These are used by |Channel|.
+ void Attach(unsigned port,
+ scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id);
+ void Run(unsigned port, MessageInTransit::EndpointId remote_id);
+
private:
friend class base::RefCountedThreadSafe<MessagePipe>;
virtual ~MessagePipe();
+ // Used by |EnqueueMessage()| to handle control messages that are actually
+ // meant for us.
+ MojoResult HandleControlMessage(unsigned port, MessageInTransit* message);
+
base::Lock lock_; // Protects the following members.
scoped_ptr<MessagePipeEndpoint> endpoints_[2];
diff --git a/mojo/system/message_pipe_dispatcher_unittest.cc b/mojo/system/message_pipe_dispatcher_unittest.cc
index 6e7bfcf..32c978b 100644
--- a/mojo/system/message_pipe_dispatcher_unittest.cc
+++ b/mojo/system/message_pipe_dispatcher_unittest.cc
@@ -211,6 +211,14 @@ TEST(MessagePipeDispatcherTest, BasicClosed) {
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
+ // Try reading from |d_1|; should fail (nothing to read).
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ d_1->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
// Close |d_1|.
EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
@@ -254,10 +262,11 @@ TEST(MessagePipeDispatcherTest, BasicClosed) {
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
- // Try reading from |d_0|; should fail (nothing to read).
+ // Try reading from |d_0|; should fail (nothing to read and other end
+ // closed).
buffer[0] = 0;
buffer_size = kBufferSize;
- EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
d_0->ReadMessage(buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
diff --git a/mojo/system/message_pipe_endpoint.cc b/mojo/system/message_pipe_endpoint.cc
index 7f91ba2..977ee2b 100644
--- a/mojo/system/message_pipe_endpoint.cc
+++ b/mojo/system/message_pipe_endpoint.cc
@@ -5,6 +5,7 @@
#include "mojo/system/message_pipe_endpoint.h"
#include "base/logging.h"
+#include "mojo/system/channel.h"
namespace mojo {
namespace system {
@@ -13,10 +14,6 @@ void MessagePipeEndpoint::CancelAllWaiters() {
NOTREACHED();
}
-void MessagePipeEndpoint::Close() {
- NOTREACHED();
-}
-
MojoResult MessagePipeEndpoint::ReadMessage(
void* /*bytes*/, uint32_t* /*num_bytes*/,
MojoHandle* /*handles*/, uint32_t* /*num_handles*/,
@@ -36,6 +33,16 @@ void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/) {
NOTREACHED();
}
+void MessagePipeEndpoint::Attach(scoped_refptr<Channel> /*channel*/,
+ MessageInTransit::EndpointId /*local_id*/) {
+ NOTREACHED();
+}
+
+bool MessagePipeEndpoint::Run(MessageInTransit::EndpointId /*remote_id*/) {
+ NOTREACHED();
+ return true;
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/system/message_pipe_endpoint.h b/mojo/system/message_pipe_endpoint.h
index 03e6158..14e9897 100644
--- a/mojo/system/message_pipe_endpoint.h
+++ b/mojo/system/message_pipe_endpoint.h
@@ -6,11 +6,15 @@
#define MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_
#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
#include "mojo/public/system/core.h"
+#include "mojo/public/system/system_export.h"
+#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
+class Channel;
class Waiter;
// This is an interface to one of the ends of a message pipe, and is used by
@@ -21,27 +25,26 @@ class Waiter;
// |MessagePipeEndpoint| also implements the functionality required by the
// dispatcher, e.g., to read messages and to wait. Implementations of this class
// are not thread-safe; instances are protected by |MesssagePipe|'s lock.
-class MessagePipeEndpoint {
+class MOJO_SYSTEM_EXPORT MessagePipeEndpoint {
public:
virtual ~MessagePipeEndpoint() {}
// All implementations must implement these.
- virtual void OnPeerClose() = 0;
- virtual MojoResult EnqueueMessage(
- const void* bytes, uint32_t num_bytes,
- const MojoHandle* handles, uint32_t num_handles,
- MojoWriteMessageFlags flags) = 0;
+ virtual void Close() = 0;
+ // Returns false if the endpoint should be closed and destroyed, else true.
+ virtual bool OnPeerClose() = 0;
+ // Takes ownership of |message|.
+ virtual MojoResult EnqueueMessage(MessageInTransit* message) = 0;
// Implementations must override these if they represent a local endpoint,
// i.e., one for which there's a |MessagePipeDispatcher| (and thus a handle).
- // An implementation for a remote endpoint (for which there's no dispatcher)
+ // An implementation for a proxy endpoint (for which there's no dispatcher)
// needs not override these methods, since they should never be called.
//
// These methods implement the methods of the same name in |MessagePipe|,
// though |MessagePipe|'s implementation may have to do a little more if the
// operation involves both endpoints.
virtual void CancelAllWaiters();
- virtual void Close();
virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
MojoHandle* handles, uint32_t* num_handles,
MojoReadMessageFlags flags);
@@ -50,6 +53,14 @@ class MessagePipeEndpoint {
MojoResult wake_result);
virtual void RemoveWaiter(Waiter* waiter);
+ // Implementations must override these if they represent a proxy endpoint. An
+ // implementation for a local endpoint needs not override these methods, since
+ // they should never be called.
+ virtual void Attach(scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id);
+ // Returns false if the endpoint should be closed and destroyed, else true.
+ virtual bool Run(MessageInTransit::EndpointId remote_id);
+
protected:
MessagePipeEndpoint() {}
diff --git a/mojo/system/message_pipe_unittest.cc b/mojo/system/message_pipe_unittest.cc
index d26db74..cf0d6a2 100644
--- a/mojo/system/message_pipe_unittest.cc
+++ b/mojo/system/message_pipe_unittest.cc
@@ -195,9 +195,9 @@ TEST(MessagePipeTest, Basic) {
EXPECT_EQ(345678901, buffer[0]);
EXPECT_EQ(456, buffer[1]);
- // Read again from port 1 -- it should be empty.
+ // Read again from port 1 -- it should be empty (and port 0 is closed).
buffer_size = kBufferSize;
- EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
mp->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
new file mode 100644
index 0000000..fce2e43
--- /dev/null
+++ b/mojo/system/proxy_message_pipe_endpoint.cc
@@ -0,0 +1,144 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/proxy_message_pipe_endpoint.h"
+
+#include <string.h>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "mojo/system/channel.h"
+
+namespace mojo {
+namespace system {
+
+ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
+ : local_id_(MessageInTransit::kInvalidEndpointId),
+ remote_id_(MessageInTransit::kInvalidEndpointId),
+ is_open_(true),
+ is_peer_open_(true) {
+}
+
+ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
+ DCHECK(!is_running());
+ DCHECK(!is_attached());
+ AssertConsistentState();
+ DCHECK(paused_message_queue_.empty());
+}
+
+void ProxyMessagePipeEndpoint::Close() {
+ DCHECK(is_open_);
+ is_open_ = false;
+
+ DCHECK(is_attached());
+ channel_->DetachMessagePipeEndpoint(local_id_);
+ channel_ = NULL;
+ local_id_ = MessageInTransit::kInvalidEndpointId;
+ remote_id_ = MessageInTransit::kInvalidEndpointId;
+
+ for (std::deque<MessageInTransit*>::iterator it =
+ paused_message_queue_.begin();
+ it != paused_message_queue_.end();
+ ++it) {
+ (*it)->Destroy();
+ }
+ paused_message_queue_.clear();
+}
+
+bool ProxyMessagePipeEndpoint::OnPeerClose() {
+ DCHECK(is_open_);
+ DCHECK(is_peer_open_);
+
+ is_peer_open_ = false;
+ if (EnqueueMessage(MessageInTransit::Create(
+ MessageInTransit::kTypeMessagePipe,
+ MessageInTransit::kSubtypeMessagePipePeerClosed,
+ NULL, 0)) != MOJO_RESULT_OK) {
+ // TODO(vtl): Do something more sensible on error here?
+ LOG(WARNING) << "Failed to send peer closed control message";
+ }
+
+ // Return false -- to indicate that we should be destroyed -- if no messages
+ // are still enqueued. (Messages may still be enqueued if we're not running
+ // yet, but our peer was closed.)
+ return !paused_message_queue_.empty();
+}
+
+MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(MessageInTransit* message) {
+ DCHECK(is_open_);
+ // If our (local) peer isn't open, we should only be enqueueing our own
+ // control messages.
+ DCHECK(is_peer_open_ ||
+ (message->type() == MessageInTransit::kTypeMessagePipe));
+
+ MojoResult rv = MOJO_RESULT_OK;
+
+ if (is_running()) {
+ message->set_source_id(local_id_);
+ message->set_destination_id(remote_id_);
+ if (!channel_->WriteMessage(message))
+ rv = MOJO_RESULT_FAILED_PRECONDITION;
+ } else {
+ paused_message_queue_.push_back(message);
+ }
+
+ return rv;
+}
+
+void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id) {
+ DCHECK(channel.get());
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
+
+ DCHECK(!is_attached());
+
+ AssertConsistentState();
+ channel_ = channel;
+ local_id_ = local_id;
+ AssertConsistentState();
+}
+
+bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
+ // Assertions about arguments:
+ DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
+
+ // Assertions about current state:
+ DCHECK(is_attached());
+ DCHECK(!is_running());
+
+ AssertConsistentState();
+ remote_id_ = remote_id;
+ AssertConsistentState();
+
+ MojoResult result = MOJO_RESULT_OK;
+ for (std::deque<MessageInTransit*>::iterator it =
+ paused_message_queue_.begin();
+ it != paused_message_queue_.end();
+ ++it) {
+ result = EnqueueMessage(*it);
+ if (result != MOJO_RESULT_OK) {
+ // TODO(vtl): Do something more sensible on error here?
+ LOG(WARNING) << "Failed to send message";
+ }
+ }
+ paused_message_queue_.clear();
+
+ // If the peer is not open, we should return false since we should be
+ // destroyed.
+ return is_peer_open_;
+}
+
+#ifndef NDEBUG
+void ProxyMessagePipeEndpoint::AssertConsistentState() const {
+ if (is_attached()) {
+ DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
+ } else { // Not attached.
+ DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
+ DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
+ }
+}
+#endif
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h
new file mode 100644
index 0000000..43b59b4
--- /dev/null
+++ b/mojo/system/proxy_message_pipe_endpoint.h
@@ -0,0 +1,94 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_PROXY_MESSAGE_PIPE_ENDPOINT_H_
+#define MOJO_SYSTEM_PROXY_MESSAGE_PIPE_ENDPOINT_H_
+
+#include <stdint.h>
+
+#include <deque>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "mojo/public/system/core.h"
+#include "mojo/public/system/system_export.h"
+#include "mojo/system/message_in_transit.h"
+#include "mojo/system/message_pipe_endpoint.h"
+
+namespace mojo {
+namespace system {
+
+class Channel;
+
+// A |ProxyMessagePipeEndpoint| connects an end of a |MessagePipe| to a
+// |Channel|, over which it transmits and receives data (to/from another
+// |ProxyMessagePipeEndpoint|). So a |MessagePipe| with one endpoint local and
+// the other endpoint remote consists of a |LocalMessagePipeEndpoint| and a
+// |ProxyMessagePipeEndpoint|, with only the local endpoint being accessible via
+// a |MessagePipeDispatcher|.
+//
+// Like any |MessagePipeEndpoint|, a |ProxyMessagePipeEndpoint| is owned by a
+// |MessagePipe|.
+// - A |ProxyMessagePipeEndpoint| starts out *detached*, i.e., not associated
+// to any |Channel|. When *attached*, it gets a reference to a |Channel| and
+// is assigned a local ID. A |ProxyMessagePipeEndpoint| must be detached
+// before destruction; this is done inside |Close()|.
+// - When attached, a |ProxyMessagePipeEndpoint| starts out not running. When
+// run, it gets a remote ID.
+class MOJO_SYSTEM_EXPORT ProxyMessagePipeEndpoint : public MessagePipeEndpoint {
+ public:
+ ProxyMessagePipeEndpoint();
+ virtual ~ProxyMessagePipeEndpoint();
+
+ // |MessagePipeEndpoint| implementation:
+ virtual void Close() OVERRIDE;
+ virtual bool OnPeerClose() OVERRIDE;
+ virtual MojoResult EnqueueMessage(MessageInTransit* message) OVERRIDE;
+ virtual void Attach(scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id) OVERRIDE;
+ virtual bool Run(MessageInTransit::EndpointId remote_id) OVERRIDE;
+
+ private:
+ bool is_attached() const {
+ return !!channel_.get();
+ }
+
+ bool is_running() const {
+ return remote_id_ != MessageInTransit::kInvalidEndpointId;
+ }
+
+#ifdef NDEBUG
+ void AssertConsistentState() const {}
+#else
+ void AssertConsistentState() const;
+#endif
+
+ // This should only be set if we're attached.
+ scoped_refptr<Channel> channel_;
+
+ // |local_id_| should be set to something other than
+ // |MessageInTransit::kInvalidEndpointId| when we're attached.
+ MessageInTransit::EndpointId local_id_;
+
+ // |remote_id_| being set to anything other than
+ // |MessageInTransit::kInvalidEndpointId| indicates that we're "running",
+ // i.e., actively able to send messages. We should only ever be running if
+ // we're attached.
+ MessageInTransit::EndpointId remote_id_;
+
+ bool is_open_;
+ bool is_peer_open_;
+
+ // This queue is only used while we're detached, to store messages while we're
+ // not ready to send them yet.
+ std::deque<MessageInTransit*> paused_message_queue_;
+
+ DISALLOW_COPY_AND_ASSIGN(ProxyMessagePipeEndpoint);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_PROXY_MESSAGE_PIPE_ENDPOINT_H_
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index 0c19ad0..f6b1b4a 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -46,7 +46,7 @@ class MOJO_SYSTEM_EXPORT RawChannel {
// The |Delegate| is only accessed on the same thread as the message loop
// (passed in on creation).
- class Delegate {
+ class MOJO_SYSTEM_EXPORT Delegate {
public:
enum FatalError {
FATAL_ERROR_UNKNOWN = 0,
@@ -73,8 +73,9 @@ class MOJO_SYSTEM_EXPORT RawChannel {
Delegate* delegate,
base::MessageLoop* message_loop);
- // This must be called (on the I/O thread) before this object is used.
- virtual void Init() = 0;
+ // This must be called (on the I/O thread) before this object is used. Returns
+ // true on success. On failure, |Shutdown()| should *not* be called.
+ virtual bool Init() = 0;
// This must be called (on the I/O thread) before this object is destroyed.
virtual void Shutdown() = 0;
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index e218d84..f063424 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -39,7 +39,7 @@ class RawChannelPosix : public RawChannel,
virtual ~RawChannelPosix();
// |RawChannel| implementation:
- virtual void Init() OVERRIDE;
+ virtual bool Init() OVERRIDE;
virtual void Shutdown() OVERRIDE;
virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
@@ -122,7 +122,7 @@ RawChannelPosix::~RawChannelPosix() {
DCHECK(!write_watcher_.get());
}
-void RawChannelPosix::Init() {
+bool RawChannelPosix::Init() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
DCHECK(!read_watcher_.get());
@@ -133,9 +133,17 @@ void RawChannelPosix::Init() {
// No need to take the lock. No one should be using us yet.
DCHECK(write_message_queue_.empty());
- bool result = message_loop_for_io()->WatchFileDescriptor(
- fd_, true, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
- DCHECK(result);
+ if (!message_loop_for_io()->WatchFileDescriptor(fd_, true,
+ base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
+ // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
+ // (in the sense of returning the message loop's state to what it was before
+ // it was called).
+ read_watcher_.reset();
+ write_watcher_.reset();
+ return false;
+ }
+
+ return true;
}
void RawChannelPosix::Shutdown() {
diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc
index 16ebcf0..21898e9 100644
--- a/mojo/system/raw_channel_posix_unittest.cc
+++ b/mojo/system/raw_channel_posix_unittest.cc
@@ -45,7 +45,10 @@ MessageInTransit* MakeTestMessage(uint32_t num_bytes) {
std::vector<unsigned char> bytes(num_bytes, 0);
for (size_t i = 0; i < num_bytes; i++)
bytes[i] = static_cast<unsigned char>(i + num_bytes);
- return MessageInTransit::Create(bytes.data(), num_bytes);
+ return MessageInTransit::Create(
+ MessageInTransit::kTypeMessagePipeEndpoint,
+ MessageInTransit::kSubtypeMessagePipeEndpointData,
+ bytes.data(), num_bytes);
}
bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
@@ -57,6 +60,10 @@ bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
return true;
}
+void InitOnIOThread(RawChannel* raw_channel) {
+ CHECK(raw_channel->Init());
+}
+
// -----------------------------------------------------------------------------
class RawChannelPosixTest : public testing::Test {
@@ -210,8 +217,7 @@ TEST_F(RawChannelPosixTest, WriteMessage) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(rc.get())));
+ base::Bind(&InitOnIOThread, rc.get()));
// Write and read, for a variety of sizes.
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
@@ -304,8 +310,7 @@ TEST_F(RawChannelPosixTest, OnReadMessage) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(rc.get())));
+ base::Bind(&InitOnIOThread, rc.get()));
// Write and read, for a variety of sizes.
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
@@ -416,8 +421,7 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(writer_rc.get())));
+ base::Bind(&InitOnIOThread, writer_rc.get()));
ReadCountdownRawChannelDelegate reader_delegate(
kNumWriterThreads * kNumWriteMessagesPerThread);
@@ -430,8 +434,7 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(reader_rc.get())));
+ base::Bind(&InitOnIOThread, reader_rc.get()));
{
ScopedVector<RawChannelWriterThread> writer_threads;
@@ -510,8 +513,7 @@ TEST_F(RawChannelPosixTest, OnFatalError) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(rc.get())));
+ base::Bind(&InitOnIOThread, rc.get()));
// Close the other end, which should make writing fail.
CHECK_EQ(close(fd(1)), 0);
@@ -545,8 +547,7 @@ TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) {
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
- base::Bind(&RawChannel::Init,
- base::Unretained(rc.get())));
+ base::Bind(&InitOnIOThread, rc.get()));
test::PostTaskAndWait(io_thread_task_runner(),
FROM_HERE,
base::Bind(&RawChannel::Shutdown,
diff --git a/mojo/system/raw_channel_win.cc b/mojo/system/raw_channel_win.cc
new file mode 100644
index 0000000..bb5eb10
--- /dev/null
+++ b/mojo/system/raw_channel_win.cc
@@ -0,0 +1,27 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/raw_channel.h"
+
+#include <stddef.h>
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace system {
+
+// -----------------------------------------------------------------------------
+
+// Static factory method declared in raw_channel.h.
+// static
+RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
+ Delegate* delegate,
+ base::MessageLoop* message_loop) {
+ // TODO(vtl)
+ NOTIMPLEMENTED();
+ return NULL;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/remote_message_pipe_posix_unittest.cc b/mojo/system/remote_message_pipe_posix_unittest.cc
new file mode 100644
index 0000000..5f16166
--- /dev/null
+++ b/mojo/system/remote_message_pipe_posix_unittest.cc
@@ -0,0 +1,342 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// TODO(vtl): Factor out the POSIX-specific bits of this test (once we have a
+// non-POSIX implementation).
+
+#include "mojo/system/message_pipe.h"
+
+#include <fcntl.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/thread.h"
+#include "mojo/system/channel.h"
+#include "mojo/system/local_message_pipe_endpoint.h"
+#include "mojo/system/platform_channel_handle.h"
+#include "mojo/system/proxy_message_pipe_endpoint.h"
+#include "mojo/system/test_utils.h"
+#include "mojo/system/waiter.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+class RemoteMessagePipeTest : public testing::Test {
+ public:
+ RemoteMessagePipeTest() : io_thread_("io_thread") {
+ }
+
+ virtual ~RemoteMessagePipeTest() {
+ }
+
+ virtual void SetUp() OVERRIDE {
+ io_thread_.StartWithOptions(
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
+
+ test::PostTaskAndWait(io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
+ base::Unretained(this)));
+ }
+
+ virtual void TearDown() OVERRIDE {
+ test::PostTaskAndWait(io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
+ base::Unretained(this)));
+ io_thread_.Stop();
+ }
+
+ // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
+ // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
+ // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
+ void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0,
+ scoped_refptr<MessagePipe> mp_1) {
+ CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
+
+ MessageInTransit::EndpointId local_id_0 =
+ channels_[0]->AttachMessagePipeEndpoint(mp_0, 1);
+ MessageInTransit::EndpointId local_id_1 =
+ channels_[1]->AttachMessagePipeEndpoint(mp_1, 0);
+
+ channels_[0]->RunMessagePipeEndpoint(local_id_0, local_id_1);
+ channels_[1]->RunMessagePipeEndpoint(local_id_1, local_id_0);
+ }
+
+ protected:
+ base::MessageLoop* io_thread_message_loop() {
+ return io_thread_.message_loop();
+ }
+
+ scoped_refptr<base::TaskRunner> io_thread_task_runner() {
+ return io_thread_message_loop()->message_loop_proxy();
+ }
+
+ private:
+ void SetUpOnIOThread() {
+ CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop());
+
+ // Create the socket.
+ int fds[2];
+ PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
+
+ // Set the ends to non-blocking.
+ PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
+ PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
+
+ // Create and initialize |Channel|s.
+ channels_[0] = new Channel();
+ CHECK(channels_[0]->Init(PlatformChannelHandle(fds[0])));
+ channels_[1] = new Channel();
+ CHECK(channels_[1]->Init(PlatformChannelHandle(fds[1])));
+ }
+
+ void TearDownOnIOThread() {
+ channels_[1]->Shutdown();
+ channels_[1] = NULL;
+ channels_[0]->Shutdown();
+ channels_[0] = NULL;
+ }
+
+ base::Thread io_thread_;
+ scoped_refptr<Channel> channels_[2];
+
+ DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
+};
+
+TEST_F(RemoteMessagePipeTest, Basic) {
+ const char hello[] = "hello";
+ const char world[] = "world!!!1!!!1!";
+ char buffer[100] = { 0 };
+ uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ Waiter waiter;
+
+ // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
+ // connected to MP 1, port 0, which will be attached to channel 1. This leaves
+ // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
+
+ scoped_refptr<MessagePipe> mp_0(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
+ scoped_refptr<MessagePipe> mp_1(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
+ test::PostTaskAndWait(
+ io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
+ base::Unretained(this), mp_0, mp_1));
+
+ // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
+
+ // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
+ // it later, it might already be readable.)
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
+
+ // Write to MP 0, port 0.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_0->WriteMessage(0,
+ hello, sizeof(hello),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Wait.
+ EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ mp_1->RemoveWaiter(1, &waiter);
+
+ // Read from MP 1, port 1.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_1->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
+ EXPECT_EQ(0, strcmp(buffer, hello));
+
+ // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
+
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_1->WriteMessage(1,
+ world, sizeof(world),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ mp_0->RemoveWaiter(0, &waiter);
+
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_0->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
+ EXPECT_EQ(0, strcmp(buffer, world));
+
+ // Close MP 0, port 0.
+ mp_0->Close(0);
+
+ // Try to wait for MP 1, port 1 to become readable. This will eventually fail
+ // when it realizes that MP 0, port 0 has been closed. (It may also fail
+ // immediately.)
+ waiter.Init();
+ MojoResult result = mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
+ if (result == MOJO_RESULT_OK) {
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ mp_1->RemoveWaiter(1, &waiter);
+ } else {
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+ }
+
+ // And MP 1, port 1.
+ mp_1->Close(1);
+}
+
+TEST_F(RemoteMessagePipeTest, Multiplex) {
+ const char hello[] = "hello";
+ const char world[] = "world!!!1!!!1!";
+ char buffer[100] = { 0 };
+ uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ Waiter waiter;
+
+ // Connect message pipes as in the |Basic| test.
+
+ scoped_refptr<MessagePipe> mp_0(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
+ scoped_refptr<MessagePipe> mp_1(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
+ test::PostTaskAndWait(
+ io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
+ base::Unretained(this), mp_0, mp_1));
+
+ // Now put another message pipe on the channel.
+
+ scoped_refptr<MessagePipe> mp_2(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
+ scoped_refptr<MessagePipe> mp_3(new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
+ test::PostTaskAndWait(
+ io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
+ base::Unretained(this), mp_2, mp_3));
+
+ // Write: MP 2, port 0 -> MP 3, port 1.
+
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_2->WriteMessage(0,
+ hello, sizeof(hello),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ mp_3->RemoveWaiter(1, &waiter);
+
+ // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_0->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_1->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_2->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Read from MP 3, port 1.
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_3->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
+ EXPECT_EQ(0, strcmp(buffer, hello));
+
+ // Write: MP 0, port 0 -> MP 1, port 1 again.
+
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_0->WriteMessage(0,
+ world, sizeof(world),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ mp_1->RemoveWaiter(1, &waiter);
+
+ // Make sure there's nothing on the other ports.
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_0->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_2->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp_3->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp_1->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
+ EXPECT_EQ(0, strcmp(buffer, world));
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo