summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-16 00:24:37 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-16 00:24:37 +0000
commit989f8bc65fc880165287075bcc92bc97dfe5c971 (patch)
treed1391735da22493131be1ae6943f53600e30dd2f /mojo
parent3b04d1f2b6efef7f6f1c9efb4b011645d5893490 (diff)
downloadchromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.zip
chromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.tar.gz
chromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.tar.bz2
Mojo: Abstract out the endpoints of MessagePipes.
This will allow us to plug in different kinds of endpoints, in particular, non-local endpoints (so that MessagePipes can be cross-process, with, e.g., a socket as transport). R=darin Review URL: https://codereview.chromium.org/27060003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228815 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp4
-rw-r--r--mojo/system/local_message_pipe_endpoint.cc160
-rw-r--r--mojo/system/local_message_pipe_endpoint.h61
-rw-r--r--mojo/system/message_pipe.cc173
-rw-r--r--mojo/system/message_pipe.h26
-rw-r--r--mojo/system/message_pipe_endpoint.cc41
-rw-r--r--mojo/system/message_pipe_endpoint.h63
7 files changed, 375 insertions, 153 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 74d7f8b..c69fc1a 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -78,6 +78,8 @@
'system/dispatcher.cc',
'system/dispatcher.h',
'system/limits.h',
+ 'system/local_message_pipe_endpoint.cc',
+ 'system/local_message_pipe_endpoint.h',
'system/memory.cc',
'system/memory.h',
'system/message_in_transit.cc',
@@ -86,6 +88,8 @@
'system/message_pipe.h',
'system/message_pipe_dispatcher.cc',
'system/message_pipe_dispatcher.h',
+ 'system/message_pipe_endpoint.cc',
+ 'system/message_pipe_endpoint.h',
'system/platform_channel_handle.h',
'system/raw_channel.h',
'system/raw_channel_posix.cc',
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc
new file mode 100644
index 0000000..2dafe0a
--- /dev/null
+++ b/mojo/system/local_message_pipe_endpoint.cc
@@ -0,0 +1,160 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/local_message_pipe_endpoint.h"
+
+#include <string.h>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "mojo/system/message_in_transit.h"
+
+namespace mojo {
+namespace system {
+
+LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
+ : is_open_(true),
+ is_peer_open_(true) {
+}
+
+LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
+ DCHECK(!is_open_);
+}
+
+void LocalMessagePipeEndpoint::OnPeerClose() {
+ DCHECK(is_open_);
+ DCHECK(is_peer_open_);
+
+ MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
+ MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
+ is_peer_open_ = false;
+ MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
+ MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
+
+ if (new_satisfied_flags != old_satisfied_flags ||
+ new_satisfiable_flags != old_satisfiable_flags) {
+ waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
+ new_satisfiable_flags);
+ }
+}
+
+MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags /*flags*/) {
+ DCHECK(is_open_);
+ DCHECK(is_peer_open_);
+
+ bool was_empty = message_queue_.empty();
+
+ // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
+ message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes));
+ // TODO(vtl): Support sending handles.
+
+ if (was_empty) {
+ waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
+ SatisfiableFlags());
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+void LocalMessagePipeEndpoint::CancelAllWaiters() {
+ DCHECK(is_open_);
+ waiter_list_.CancelAllWaiters();
+}
+
+void LocalMessagePipeEndpoint::Close() {
+ DCHECK(is_open_);
+ is_open_ = false;
+ STLDeleteElements(&message_queue_);
+}
+
+MojoResult LocalMessagePipeEndpoint::ReadMessage(
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ DCHECK(is_open_);
+
+ const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
+ // TODO(vtl): We'll need this later:
+ // const uint32_t max_handles = num_handles ? *num_handles : 0;
+
+ if (message_queue_.empty())
+ return MOJO_RESULT_NOT_FOUND;
+
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
+ // and release the lock immediately.
+ bool not_enough_space = false;
+ MessageInTransit* const message = message_queue_.front();
+ if (num_bytes)
+ *num_bytes = message->data_size();
+ if (message->data_size() <= max_bytes)
+ memcpy(bytes, message->data(), message->data_size());
+ else
+ not_enough_space = true;
+
+ // TODO(vtl): Support receiving handles.
+ if (num_handles)
+ *num_handles = 0;
+
+ if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
+ message_queue_.pop_front();
+ message->Destroy();
+
+ // Now it's empty, thus no longer readable.
+ if (message_queue_.empty()) {
+ // It's currently not possible to wait for non-readability, but we should
+ // do the state change anyway.
+ waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
+ SatisfiableFlags());
+ }
+ }
+
+ if (not_enough_space)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ DCHECK(is_open_);
+
+ if ((flags & SatisfiedFlags()))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ if (!(flags & SatisfiableFlags()))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ waiter_list_.AddWaiter(waiter, flags, wake_result);
+ return MOJO_RESULT_OK;
+}
+
+void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
+ DCHECK(is_open_);
+ waiter_list_.RemoveWaiter(waiter);
+}
+
+MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
+ MojoWaitFlags satisfied_flags = 0;
+ if (!message_queue_.empty())
+ satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_peer_open_)
+ satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
+ return satisfied_flags;
+}
+
+MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
+ MojoWaitFlags satisfiable_flags = 0;
+ if (!message_queue_.empty() || is_peer_open_)
+ satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_peer_open_)
+ satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
+ return satisfiable_flags;
+}
+
+} // namespace system
+} // namespace mojo
+
diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h
new file mode 100644
index 0000000..caed81d
--- /dev/null
+++ b/mojo/system/local_message_pipe_endpoint.h
@@ -0,0 +1,61 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_
+#define MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_
+
+#include <deque>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "mojo/public/system/core.h"
+#include "mojo/system/message_pipe_endpoint.h"
+#include "mojo/system/waiter_list.h"
+
+namespace mojo {
+namespace system {
+
+class MessageInTransit;
+
+class LocalMessagePipeEndpoint : public MessagePipeEndpoint {
+ public:
+ LocalMessagePipeEndpoint();
+ virtual ~LocalMessagePipeEndpoint();
+
+ // |MessagePipeEndpoint| implementation:
+ virtual void OnPeerClose() OVERRIDE;
+ virtual MojoResult EnqueueMessage(
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) OVERRIDE;
+
+ // There's a dispatcher for |LocalMessagePipeEndpoint|s, so we have to
+ // implement/override these:
+ virtual void CancelAllWaiters() OVERRIDE;
+ virtual void Close() OVERRIDE;
+ virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) OVERRIDE;
+ virtual MojoResult AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) OVERRIDE;
+ virtual void RemoveWaiter(Waiter* waiter) OVERRIDE;
+
+ private:
+ MojoWaitFlags SatisfiedFlags();
+ MojoWaitFlags SatisfiableFlags();
+
+ bool is_open_;
+ bool is_peer_open_;
+
+ std::deque<MessageInTransit*> message_queue_;
+ WaiterList waiter_list_;
+
+ DISALLOW_COPY_AND_ASSIGN(LocalMessagePipeEndpoint);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index 8cd94cb..bfb0646 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -6,7 +6,9 @@
#include "base/logging.h"
#include "base/stl_util.h"
+#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h"
+#include "mojo/system/message_pipe_endpoint.h"
namespace mojo {
namespace system {
@@ -20,17 +22,23 @@ unsigned DestinationPortFromSourcePort(unsigned port) {
} // namespace
+MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
+ scoped_ptr<MessagePipeEndpoint> endpoint_1) {
+ endpoints_[0].reset(endpoint_0.release());
+ endpoints_[1].reset(endpoint_1.release());
+}
+
MessagePipe::MessagePipe() {
- is_open_[0] = is_open_[1] = true;
+ endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ endpoints_[1].reset(new LocalMessagePipeEndpoint());
}
void MessagePipe::CancelAllWaiters(unsigned port) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- waiter_lists_[port].CancelAllWaiters();
+ DCHECK(endpoints_[port].get());
+ endpoints_[port]->CancelAllWaiters();
}
void MessagePipe::Close(unsigned port) {
@@ -39,35 +47,13 @@ void MessagePipe::Close(unsigned port) {
unsigned destination_port = DestinationPortFromSourcePort(port);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- // Record the old state of the other (destination) port, so we can tell if it
- // changes.
- // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
- // don't have to do this.
- MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
- MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
- bool dest_is_open = is_open_[destination_port];
- if (dest_is_open) {
- old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
- old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
- }
-
- is_open_[port] = false;
- STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
-
- // Notify the other (destination) port if its state has changed.
- if (dest_is_open) {
- MojoWaitFlags new_dest_satisfied_flags =
- SatisfiedFlagsNoLock(destination_port);
- MojoWaitFlags new_dest_satisfiable_flags =
- SatisfiableFlagsNoLock(destination_port);
- if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
- new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
- waiter_lists_[destination_port].AwakeWaitersForStateChange(
- new_dest_satisfied_flags, new_dest_satisfiable_flags);
- }
- }
+ DCHECK(endpoints_[port].get());
+
+ endpoints_[port]->Close();
+ if (endpoints_[destination_port].get())
+ endpoints_[destination_port]->OnPeerClose();
+
+ endpoints_[port].reset();
}
// TODO(vtl): Handle flags.
@@ -75,34 +61,21 @@ MojoResult MessagePipe::WriteMessage(
unsigned port,
const void* bytes, uint32_t num_bytes,
const MojoHandle* handles, uint32_t num_handles,
- MojoWriteMessageFlags /*flags*/) {
+ MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
unsigned destination_port = DestinationPortFromSourcePort(port);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
// The destination port need not be open, unlike the source port.
- if (!is_open_[destination_port])
+ if (!endpoints_[destination_port].get())
return MOJO_RESULT_FAILED_PRECONDITION;
- bool dest_was_empty = message_queues_[destination_port].empty();
-
- // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
- message_queues_[destination_port].push_back(
- MessageInTransit::Create(bytes, num_bytes));
- // TODO(vtl): Support sending handles.
-
- // The other (destination) port was empty and now isn't, so it should now be
- // readable. Wake up anyone waiting on this.
- if (dest_was_empty) {
- waiter_lists_[destination_port].AwakeWaitersForStateChange(
- SatisfiedFlagsNoLock(destination_port),
- SatisfiableFlagsNoLock(destination_port));
- }
-
- return MOJO_RESULT_OK;
+ return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
}
MojoResult MessagePipe::ReadMessage(unsigned port,
@@ -111,48 +84,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port,
MojoReadMessageFlags flags) {
DCHECK(port == 0 || port == 1);
- const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
- // TODO(vtl): We'll need this later:
- // const uint32_t max_handles = num_handles ? *num_handles : 0;
-
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
-
- if (message_queues_[port].empty())
- return MOJO_RESULT_NOT_FOUND;
-
- // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
- // and release the lock immediately.
- bool not_enough_space = false;
- MessageInTransit* const message = message_queues_[port].front();
- if (num_bytes)
- *num_bytes = message->data_size();
- if (message->data_size() <= max_bytes)
- memcpy(bytes, message->data(), message->data_size());
- else
- not_enough_space = true;
-
- // TODO(vtl): Support receiving handles.
- if (num_handles)
- *num_handles = 0;
-
- if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
- message_queues_[port].pop_front();
- message->Destroy();
-
- // Now it's empty, thus no longer readable.
- if (message_queues_[port].empty()) {
- // It's currently not possible to wait for non-readability, but we should
- // do the state change anyway.
- waiter_lists_[port].AwakeWaitersForStateChange(
- SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
- }
- }
-
- if (not_enough_space)
- return MOJO_RESULT_RESOURCE_EXHAUSTED;
-
- return MOJO_RESULT_OK;
+ DCHECK(endpoints_[port].get());
+
+ return endpoints_[port]->ReadMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
}
MojoResult MessagePipe::AddWaiter(unsigned port,
@@ -162,64 +99,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port,
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
- if ((flags & SatisfiedFlagsNoLock(port)))
- return MOJO_RESULT_ALREADY_EXISTS;
- if (!(flags & SatisfiableFlagsNoLock(port)))
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
- return MOJO_RESULT_OK;
+ return endpoints_[port]->AddWaiter(waiter, flags, wake_result);
}
void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
- DCHECK(is_open_[port]);
+ DCHECK(endpoints_[port].get());
- waiter_lists_[port].RemoveWaiter(waiter);
+ endpoints_[port]->RemoveWaiter(waiter);
}
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
// |Close()|. Thus these should already be null.
- DCHECK(!is_open_[0]);
- DCHECK(!is_open_[1]);
-}
-
-MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- lock_.AssertAcquired();
-
- MojoWaitFlags satisfied_flags = 0;
- if (!message_queues_[port].empty())
- satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
- if (is_open_[destination_port])
- satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
-
- return satisfied_flags;
-}
-
-MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- lock_.AssertAcquired();
-
- MojoWaitFlags satisfiable_flags = 0;
- if (!message_queues_[port].empty() || is_open_[destination_port])
- satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
- if (is_open_[destination_port])
- satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
-
- return satisfiable_flags;
+ DCHECK(!endpoints_[0].get());
+ DCHECK(!endpoints_[1].get());
}
} // namespace system
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index e4c0c40..bb2f4ff 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -5,28 +5,31 @@
#ifndef MOJO_SYSTEM_MESSAGE_PIPE_H_
#define MOJO_SYSTEM_MESSAGE_PIPE_H_
-#include <deque>
-
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/lock.h"
#include "mojo/public/system/core.h"
#include "mojo/public/system/system_export.h"
-#include "mojo/system/waiter_list.h"
namespace mojo {
namespace system {
-class MessageInTransit;
+class MessagePipeEndpoint;
class Waiter;
// |MessagePipe| is the secondary object implementing a message pipe (see the
-// explanatory comment in core_impl.cc), and is jointly owned by the two
-// dispatchers passed in to the constructor. This class is thread-safe.
+// explanatory comment in core_impl.cc). It is typically owned by the
+// dispatcher(s) corresponding to the local endpoints. This class is
+// thread-safe.
class MOJO_SYSTEM_EXPORT MessagePipe :
public base::RefCountedThreadSafe<MessagePipe> {
public:
+ MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
+ scoped_ptr<MessagePipeEndpoint> endpoint_1);
+
+ // Convenience constructor that constructs a |MessagePipe| with two new
+ // |LocalMessagePipeEndpoint|s.
MessagePipe();
// These are called by the dispatcher to implement its methods of
@@ -55,17 +58,8 @@ class MOJO_SYSTEM_EXPORT MessagePipe :
friend class base::RefCountedThreadSafe<MessagePipe>;
virtual ~MessagePipe();
- MojoWaitFlags SatisfiedFlagsNoLock(unsigned port);
- MojoWaitFlags SatisfiableFlagsNoLock(unsigned port);
-
base::Lock lock_; // Protects the following members.
- bool is_open_[2];
- // These are *incoming* queues for their corresponding ports. It owns its
- // contents.
- // TODO(vtl): When possible (with C++11), convert the plain pointers to
- // scoped_ptr/unique_ptr.
- std::deque<MessageInTransit*> message_queues_[2];
- WaiterList waiter_lists_[2];
+ scoped_ptr<MessagePipeEndpoint> endpoints_[2];
DISALLOW_COPY_AND_ASSIGN(MessagePipe);
};
diff --git a/mojo/system/message_pipe_endpoint.cc b/mojo/system/message_pipe_endpoint.cc
new file mode 100644
index 0000000..7f91ba2
--- /dev/null
+++ b/mojo/system/message_pipe_endpoint.cc
@@ -0,0 +1,41 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/message_pipe_endpoint.h"
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace system {
+
+void MessagePipeEndpoint::CancelAllWaiters() {
+ NOTREACHED();
+}
+
+void MessagePipeEndpoint::Close() {
+ NOTREACHED();
+}
+
+MojoResult MessagePipeEndpoint::ReadMessage(
+ void* /*bytes*/, uint32_t* /*num_bytes*/,
+ MojoHandle* /*handles*/, uint32_t* /*num_handles*/,
+ MojoReadMessageFlags /*flags*/) {
+ NOTREACHED();
+ return MOJO_RESULT_INTERNAL;
+}
+
+MojoResult MessagePipeEndpoint::AddWaiter(Waiter* /*waiter*/,
+ MojoWaitFlags /*flags*/,
+ MojoResult /*wake_result*/) {
+ NOTREACHED();
+ return MOJO_RESULT_INTERNAL;
+}
+
+void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/) {
+ NOTREACHED();
+}
+
+} // namespace system
+} // namespace mojo
+
diff --git a/mojo/system/message_pipe_endpoint.h b/mojo/system/message_pipe_endpoint.h
new file mode 100644
index 0000000..03e6158
--- /dev/null
+++ b/mojo/system/message_pipe_endpoint.h
@@ -0,0 +1,63 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_
+#define MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_
+
+#include "base/basictypes.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace system {
+
+class Waiter;
+
+// This is an interface to one of the ends of a message pipe, and is used by
+// |MessagePipe|. Its most important role is to provide a sink for messages
+// (i.e., a place where messages can be sent). It has a secondary role: When the
+// endpoint is local (i.e., in the current process), there'll be a dispatcher
+// corresponding to the endpoint. In that case, the implementation of
+// |MessagePipeEndpoint| also implements the functionality required by the
+// dispatcher, e.g., to read messages and to wait. Implementations of this class
+// are not thread-safe; instances are protected by |MesssagePipe|'s lock.
+class MessagePipeEndpoint {
+ public:
+ virtual ~MessagePipeEndpoint() {}
+
+ // All implementations must implement these.
+ virtual void OnPeerClose() = 0;
+ virtual MojoResult EnqueueMessage(
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) = 0;
+
+ // Implementations must override these if they represent a local endpoint,
+ // i.e., one for which there's a |MessagePipeDispatcher| (and thus a handle).
+ // An implementation for a remote endpoint (for which there's no dispatcher)
+ // needs not override these methods, since they should never be called.
+ //
+ // These methods implement the methods of the same name in |MessagePipe|,
+ // though |MessagePipe|'s implementation may have to do a little more if the
+ // operation involves both endpoints.
+ virtual void CancelAllWaiters();
+ virtual void Close();
+ virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+ virtual MojoResult AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result);
+ virtual void RemoveWaiter(Waiter* waiter);
+
+ protected:
+ MessagePipeEndpoint() {}
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MessagePipeEndpoint);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_