diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-16 00:24:37 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-16 00:24:37 +0000 |
commit | 989f8bc65fc880165287075bcc92bc97dfe5c971 (patch) | |
tree | d1391735da22493131be1ae6943f53600e30dd2f /mojo | |
parent | 3b04d1f2b6efef7f6f1c9efb4b011645d5893490 (diff) | |
download | chromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.zip chromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.tar.gz chromium_src-989f8bc65fc880165287075bcc92bc97dfe5c971.tar.bz2 |
Mojo: Abstract out the endpoints of MessagePipes.
This will allow us to plug in different kinds of endpoints, in particular,
non-local endpoints (so that MessagePipes can be cross-process, with, e.g., a
socket as transport).
R=darin
Review URL: https://codereview.chromium.org/27060003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228815 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/mojo.gyp | 4 | ||||
-rw-r--r-- | mojo/system/local_message_pipe_endpoint.cc | 160 | ||||
-rw-r--r-- | mojo/system/local_message_pipe_endpoint.h | 61 | ||||
-rw-r--r-- | mojo/system/message_pipe.cc | 173 | ||||
-rw-r--r-- | mojo/system/message_pipe.h | 26 | ||||
-rw-r--r-- | mojo/system/message_pipe_endpoint.cc | 41 | ||||
-rw-r--r-- | mojo/system/message_pipe_endpoint.h | 63 |
7 files changed, 375 insertions, 153 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp index 74d7f8b..c69fc1a 100644 --- a/mojo/mojo.gyp +++ b/mojo/mojo.gyp @@ -78,6 +78,8 @@ 'system/dispatcher.cc', 'system/dispatcher.h', 'system/limits.h', + 'system/local_message_pipe_endpoint.cc', + 'system/local_message_pipe_endpoint.h', 'system/memory.cc', 'system/memory.h', 'system/message_in_transit.cc', @@ -86,6 +88,8 @@ 'system/message_pipe.h', 'system/message_pipe_dispatcher.cc', 'system/message_pipe_dispatcher.h', + 'system/message_pipe_endpoint.cc', + 'system/message_pipe_endpoint.h', 'system/platform_channel_handle.h', 'system/raw_channel.h', 'system/raw_channel_posix.cc', diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc new file mode 100644 index 0000000..2dafe0a --- /dev/null +++ b/mojo/system/local_message_pipe_endpoint.cc @@ -0,0 +1,160 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/local_message_pipe_endpoint.h" + +#include <string.h> + +#include "base/logging.h" +#include "base/stl_util.h" +#include "mojo/system/message_in_transit.h" + +namespace mojo { +namespace system { + +LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() + : is_open_(true), + is_peer_open_(true) { +} + +LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { + DCHECK(!is_open_); +} + +void LocalMessagePipeEndpoint::OnPeerClose() { + DCHECK(is_open_); + DCHECK(is_peer_open_); + + MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); + MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); + is_peer_open_ = false; + MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); + MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); + + if (new_satisfied_flags != old_satisfied_flags || + new_satisfiable_flags != old_satisfiable_flags) { + waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, + new_satisfiable_flags); + } +} + +MojoResult LocalMessagePipeEndpoint::EnqueueMessage( + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags /*flags*/) { + DCHECK(is_open_); + DCHECK(is_peer_open_); + + bool was_empty = message_queue_.empty(); + + // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. + message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes)); + // TODO(vtl): Support sending handles. + + if (was_empty) { + waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), + SatisfiableFlags()); + } + + return MOJO_RESULT_OK; +} + +void LocalMessagePipeEndpoint::CancelAllWaiters() { + DCHECK(is_open_); + waiter_list_.CancelAllWaiters(); +} + +void LocalMessagePipeEndpoint::Close() { + DCHECK(is_open_); + is_open_ = false; + STLDeleteElements(&message_queue_); +} + +MojoResult LocalMessagePipeEndpoint::ReadMessage( + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + DCHECK(is_open_); + + const uint32_t max_bytes = num_bytes ? *num_bytes : 0; + // TODO(vtl): We'll need this later: + // const uint32_t max_handles = num_handles ? *num_handles : 0; + + if (message_queue_.empty()) + return MOJO_RESULT_NOT_FOUND; + + // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop + // and release the lock immediately. + bool not_enough_space = false; + MessageInTransit* const message = message_queue_.front(); + if (num_bytes) + *num_bytes = message->data_size(); + if (message->data_size() <= max_bytes) + memcpy(bytes, message->data(), message->data_size()); + else + not_enough_space = true; + + // TODO(vtl): Support receiving handles. + if (num_handles) + *num_handles = 0; + + if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { + message_queue_.pop_front(); + message->Destroy(); + + // Now it's empty, thus no longer readable. + if (message_queue_.empty()) { + // It's currently not possible to wait for non-readability, but we should + // do the state change anyway. + waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), + SatisfiableFlags()); + } + } + + if (not_enough_space) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + return MOJO_RESULT_OK; +} + +MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + DCHECK(is_open_); + + if ((flags & SatisfiedFlags())) + return MOJO_RESULT_ALREADY_EXISTS; + if (!(flags & SatisfiableFlags())) + return MOJO_RESULT_FAILED_PRECONDITION; + + waiter_list_.AddWaiter(waiter, flags, wake_result); + return MOJO_RESULT_OK; +} + +void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { + DCHECK(is_open_); + waiter_list_.RemoveWaiter(waiter); +} + +MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() { + MojoWaitFlags satisfied_flags = 0; + if (!message_queue_.empty()) + satisfied_flags |= MOJO_WAIT_FLAG_READABLE; + if (is_peer_open_) + satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; + return satisfied_flags; +} + +MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() { + MojoWaitFlags satisfiable_flags = 0; + if (!message_queue_.empty() || is_peer_open_) + satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; + if (is_peer_open_) + satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; + return satisfiable_flags; +} + +} // namespace system +} // namespace mojo + diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h new file mode 100644 index 0000000..caed81d --- /dev/null +++ b/mojo/system/local_message_pipe_endpoint.h @@ -0,0 +1,61 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_ +#define MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_ + +#include <deque> + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "mojo/public/system/core.h" +#include "mojo/system/message_pipe_endpoint.h" +#include "mojo/system/waiter_list.h" + +namespace mojo { +namespace system { + +class MessageInTransit; + +class LocalMessagePipeEndpoint : public MessagePipeEndpoint { + public: + LocalMessagePipeEndpoint(); + virtual ~LocalMessagePipeEndpoint(); + + // |MessagePipeEndpoint| implementation: + virtual void OnPeerClose() OVERRIDE; + virtual MojoResult EnqueueMessage( + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) OVERRIDE; + + // There's a dispatcher for |LocalMessagePipeEndpoint|s, so we have to + // implement/override these: + virtual void CancelAllWaiters() OVERRIDE; + virtual void Close() OVERRIDE; + virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) OVERRIDE; + virtual MojoResult AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) OVERRIDE; + virtual void RemoveWaiter(Waiter* waiter) OVERRIDE; + + private: + MojoWaitFlags SatisfiedFlags(); + MojoWaitFlags SatisfiableFlags(); + + bool is_open_; + bool is_peer_open_; + + std::deque<MessageInTransit*> message_queue_; + WaiterList waiter_list_; + + DISALLOW_COPY_AND_ASSIGN(LocalMessagePipeEndpoint); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_LOCAL_MESSAGE_PIPE_ENDPOINT_H_ diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc index 8cd94cb..bfb0646 100644 --- a/mojo/system/message_pipe.cc +++ b/mojo/system/message_pipe.cc @@ -6,7 +6,9 @@ #include "base/logging.h" #include "base/stl_util.h" +#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/message_in_transit.h" +#include "mojo/system/message_pipe_endpoint.h" namespace mojo { namespace system { @@ -20,17 +22,23 @@ unsigned DestinationPortFromSourcePort(unsigned port) { } // namespace +MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, + scoped_ptr<MessagePipeEndpoint> endpoint_1) { + endpoints_[0].reset(endpoint_0.release()); + endpoints_[1].reset(endpoint_1.release()); +} + MessagePipe::MessagePipe() { - is_open_[0] = is_open_[1] = true; + endpoints_[0].reset(new LocalMessagePipeEndpoint()); + endpoints_[1].reset(new LocalMessagePipeEndpoint()); } void MessagePipe::CancelAllWaiters(unsigned port) { DCHECK(port == 0 || port == 1); base::AutoLock locker(lock_); - DCHECK(is_open_[port]); - - waiter_lists_[port].CancelAllWaiters(); + DCHECK(endpoints_[port].get()); + endpoints_[port]->CancelAllWaiters(); } void MessagePipe::Close(unsigned port) { @@ -39,35 +47,13 @@ void MessagePipe::Close(unsigned port) { unsigned destination_port = DestinationPortFromSourcePort(port); base::AutoLock locker(lock_); - DCHECK(is_open_[port]); - - // Record the old state of the other (destination) port, so we can tell if it - // changes. - // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we - // don't have to do this. - MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; - MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; - bool dest_is_open = is_open_[destination_port]; - if (dest_is_open) { - old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); - old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); - } - - is_open_[port] = false; - STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. - - // Notify the other (destination) port if its state has changed. - if (dest_is_open) { - MojoWaitFlags new_dest_satisfied_flags = - SatisfiedFlagsNoLock(destination_port); - MojoWaitFlags new_dest_satisfiable_flags = - SatisfiableFlagsNoLock(destination_port); - if (new_dest_satisfied_flags != old_dest_satisfied_flags || - new_dest_satisfiable_flags != old_dest_satisfiable_flags) { - waiter_lists_[destination_port].AwakeWaitersForStateChange( - new_dest_satisfied_flags, new_dest_satisfiable_flags); - } - } + DCHECK(endpoints_[port].get()); + + endpoints_[port]->Close(); + if (endpoints_[destination_port].get()) + endpoints_[destination_port]->OnPeerClose(); + + endpoints_[port].reset(); } // TODO(vtl): Handle flags. @@ -75,34 +61,21 @@ MojoResult MessagePipe::WriteMessage( unsigned port, const void* bytes, uint32_t num_bytes, const MojoHandle* handles, uint32_t num_handles, - MojoWriteMessageFlags /*flags*/) { + MojoWriteMessageFlags flags) { DCHECK(port == 0 || port == 1); unsigned destination_port = DestinationPortFromSourcePort(port); base::AutoLock locker(lock_); - DCHECK(is_open_[port]); + DCHECK(endpoints_[port].get()); // The destination port need not be open, unlike the source port. - if (!is_open_[destination_port]) + if (!endpoints_[destination_port].get()) return MOJO_RESULT_FAILED_PRECONDITION; - bool dest_was_empty = message_queues_[destination_port].empty(); - - // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. - message_queues_[destination_port].push_back( - MessageInTransit::Create(bytes, num_bytes)); - // TODO(vtl): Support sending handles. - - // The other (destination) port was empty and now isn't, so it should now be - // readable. Wake up anyone waiting on this. - if (dest_was_empty) { - waiter_lists_[destination_port].AwakeWaitersForStateChange( - SatisfiedFlagsNoLock(destination_port), - SatisfiableFlagsNoLock(destination_port)); - } - - return MOJO_RESULT_OK; + return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, + handles, num_handles, + flags); } MojoResult MessagePipe::ReadMessage(unsigned port, @@ -111,48 +84,12 @@ MojoResult MessagePipe::ReadMessage(unsigned port, MojoReadMessageFlags flags) { DCHECK(port == 0 || port == 1); - const uint32_t max_bytes = num_bytes ? *num_bytes : 0; - // TODO(vtl): We'll need this later: - // const uint32_t max_handles = num_handles ? *num_handles : 0; - base::AutoLock locker(lock_); - DCHECK(is_open_[port]); - - if (message_queues_[port].empty()) - return MOJO_RESULT_NOT_FOUND; - - // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop - // and release the lock immediately. - bool not_enough_space = false; - MessageInTransit* const message = message_queues_[port].front(); - if (num_bytes) - *num_bytes = message->data_size(); - if (message->data_size() <= max_bytes) - memcpy(bytes, message->data(), message->data_size()); - else - not_enough_space = true; - - // TODO(vtl): Support receiving handles. - if (num_handles) - *num_handles = 0; - - if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { - message_queues_[port].pop_front(); - message->Destroy(); - - // Now it's empty, thus no longer readable. - if (message_queues_[port].empty()) { - // It's currently not possible to wait for non-readability, but we should - // do the state change anyway. - waiter_lists_[port].AwakeWaitersForStateChange( - SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); - } - } - - if (not_enough_space) - return MOJO_RESULT_RESOURCE_EXHAUSTED; - - return MOJO_RESULT_OK; + DCHECK(endpoints_[port].get()); + + return endpoints_[port]->ReadMessage(bytes, num_bytes, + handles, num_handles, + flags); } MojoResult MessagePipe::AddWaiter(unsigned port, @@ -162,64 +99,26 @@ MojoResult MessagePipe::AddWaiter(unsigned port, DCHECK(port == 0 || port == 1); base::AutoLock locker(lock_); - DCHECK(is_open_[port]); + DCHECK(endpoints_[port].get()); - if ((flags & SatisfiedFlagsNoLock(port))) - return MOJO_RESULT_ALREADY_EXISTS; - if (!(flags & SatisfiableFlagsNoLock(port))) - return MOJO_RESULT_FAILED_PRECONDITION; - - waiter_lists_[port].AddWaiter(waiter, flags, wake_result); - return MOJO_RESULT_OK; + return endpoints_[port]->AddWaiter(waiter, flags, wake_result); } void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { DCHECK(port == 0 || port == 1); base::AutoLock locker(lock_); - DCHECK(is_open_[port]); + DCHECK(endpoints_[port].get()); - waiter_lists_[port].RemoveWaiter(waiter); + endpoints_[port]->RemoveWaiter(waiter); } MessagePipe::~MessagePipe() { // Owned by the dispatchers. The owning dispatchers should only release us via // their |Close()| method, which should inform us of being closed via our // |Close()|. Thus these should already be null. - DCHECK(!is_open_[0]); - DCHECK(!is_open_[1]); -} - -MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { - DCHECK(port == 0 || port == 1); - - unsigned destination_port = DestinationPortFromSourcePort(port); - - lock_.AssertAcquired(); - - MojoWaitFlags satisfied_flags = 0; - if (!message_queues_[port].empty()) - satisfied_flags |= MOJO_WAIT_FLAG_READABLE; - if (is_open_[destination_port]) - satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; - - return satisfied_flags; -} - -MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { - DCHECK(port == 0 || port == 1); - - unsigned destination_port = DestinationPortFromSourcePort(port); - - lock_.AssertAcquired(); - - MojoWaitFlags satisfiable_flags = 0; - if (!message_queues_[port].empty() || is_open_[destination_port]) - satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; - if (is_open_[destination_port]) - satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; - - return satisfiable_flags; + DCHECK(!endpoints_[0].get()); + DCHECK(!endpoints_[1].get()); } } // namespace system diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h index e4c0c40..bb2f4ff 100644 --- a/mojo/system/message_pipe.h +++ b/mojo/system/message_pipe.h @@ -5,28 +5,31 @@ #ifndef MOJO_SYSTEM_MESSAGE_PIPE_H_ #define MOJO_SYSTEM_MESSAGE_PIPE_H_ -#include <deque> - #include "base/basictypes.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/public/system/core.h" #include "mojo/public/system/system_export.h" -#include "mojo/system/waiter_list.h" namespace mojo { namespace system { -class MessageInTransit; +class MessagePipeEndpoint; class Waiter; // |MessagePipe| is the secondary object implementing a message pipe (see the -// explanatory comment in core_impl.cc), and is jointly owned by the two -// dispatchers passed in to the constructor. This class is thread-safe. +// explanatory comment in core_impl.cc). It is typically owned by the +// dispatcher(s) corresponding to the local endpoints. This class is +// thread-safe. class MOJO_SYSTEM_EXPORT MessagePipe : public base::RefCountedThreadSafe<MessagePipe> { public: + MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, + scoped_ptr<MessagePipeEndpoint> endpoint_1); + + // Convenience constructor that constructs a |MessagePipe| with two new + // |LocalMessagePipeEndpoint|s. MessagePipe(); // These are called by the dispatcher to implement its methods of @@ -55,17 +58,8 @@ class MOJO_SYSTEM_EXPORT MessagePipe : friend class base::RefCountedThreadSafe<MessagePipe>; virtual ~MessagePipe(); - MojoWaitFlags SatisfiedFlagsNoLock(unsigned port); - MojoWaitFlags SatisfiableFlagsNoLock(unsigned port); - base::Lock lock_; // Protects the following members. - bool is_open_[2]; - // These are *incoming* queues for their corresponding ports. It owns its - // contents. - // TODO(vtl): When possible (with C++11), convert the plain pointers to - // scoped_ptr/unique_ptr. - std::deque<MessageInTransit*> message_queues_[2]; - WaiterList waiter_lists_[2]; + scoped_ptr<MessagePipeEndpoint> endpoints_[2]; DISALLOW_COPY_AND_ASSIGN(MessagePipe); }; diff --git a/mojo/system/message_pipe_endpoint.cc b/mojo/system/message_pipe_endpoint.cc new file mode 100644 index 0000000..7f91ba2 --- /dev/null +++ b/mojo/system/message_pipe_endpoint.cc @@ -0,0 +1,41 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/message_pipe_endpoint.h" + +#include "base/logging.h" + +namespace mojo { +namespace system { + +void MessagePipeEndpoint::CancelAllWaiters() { + NOTREACHED(); +} + +void MessagePipeEndpoint::Close() { + NOTREACHED(); +} + +MojoResult MessagePipeEndpoint::ReadMessage( + void* /*bytes*/, uint32_t* /*num_bytes*/, + MojoHandle* /*handles*/, uint32_t* /*num_handles*/, + MojoReadMessageFlags /*flags*/) { + NOTREACHED(); + return MOJO_RESULT_INTERNAL; +} + +MojoResult MessagePipeEndpoint::AddWaiter(Waiter* /*waiter*/, + MojoWaitFlags /*flags*/, + MojoResult /*wake_result*/) { + NOTREACHED(); + return MOJO_RESULT_INTERNAL; +} + +void MessagePipeEndpoint::RemoveWaiter(Waiter* /*waiter*/) { + NOTREACHED(); +} + +} // namespace system +} // namespace mojo + diff --git a/mojo/system/message_pipe_endpoint.h b/mojo/system/message_pipe_endpoint.h new file mode 100644 index 0000000..03e6158 --- /dev/null +++ b/mojo/system/message_pipe_endpoint.h @@ -0,0 +1,63 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_ +#define MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_ + +#include "base/basictypes.h" +#include "mojo/public/system/core.h" + +namespace mojo { +namespace system { + +class Waiter; + +// This is an interface to one of the ends of a message pipe, and is used by +// |MessagePipe|. Its most important role is to provide a sink for messages +// (i.e., a place where messages can be sent). It has a secondary role: When the +// endpoint is local (i.e., in the current process), there'll be a dispatcher +// corresponding to the endpoint. In that case, the implementation of +// |MessagePipeEndpoint| also implements the functionality required by the +// dispatcher, e.g., to read messages and to wait. Implementations of this class +// are not thread-safe; instances are protected by |MesssagePipe|'s lock. +class MessagePipeEndpoint { + public: + virtual ~MessagePipeEndpoint() {} + + // All implementations must implement these. + virtual void OnPeerClose() = 0; + virtual MojoResult EnqueueMessage( + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) = 0; + + // Implementations must override these if they represent a local endpoint, + // i.e., one for which there's a |MessagePipeDispatcher| (and thus a handle). + // An implementation for a remote endpoint (for which there's no dispatcher) + // needs not override these methods, since they should never be called. + // + // These methods implement the methods of the same name in |MessagePipe|, + // though |MessagePipe|'s implementation may have to do a little more if the + // operation involves both endpoints. + virtual void CancelAllWaiters(); + virtual void Close(); + virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags); + virtual MojoResult AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result); + virtual void RemoveWaiter(Waiter* waiter); + + protected: + MessagePipeEndpoint() {} + + private: + DISALLOW_COPY_AND_ASSIGN(MessagePipeEndpoint); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_ |