diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-05 05:58:10 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-05 05:58:10 +0000 |
commit | 23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d (patch) | |
tree | e60007ea90c0ad1af1653f772f689054b9c7bb4d /mojo/system | |
parent | a0c57c178a064a6a28daa15356876c4298948258 (diff) | |
download | chromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.zip chromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.tar.gz chromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.tar.bz2 |
Mojo: First pass at cross-process passing of MessagePipe handles.
There's still stuff left to do:
- Some high-priority TODOs are marked with an additional FIXME (mostly
for my benefit).
- Notably, we need better error handling (in particular, if the other
process spews garbage, we should correctly detect this and not die).
- There's a crasher bug (see RemoteMessagePipeTest.HandlePassing).
- Also, we need more testing, and more end-to-end testing.
R=darin@chromium.org
Review URL: https://codereview.chromium.org/180953003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254946 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r-- | mojo/system/channel.cc | 61 | ||||
-rw-r--r-- | mojo/system/channel.h | 9 | ||||
-rw-r--r-- | mojo/system/dispatcher.cc | 30 | ||||
-rw-r--r-- | mojo/system/dispatcher.h | 10 | ||||
-rw-r--r-- | mojo/system/embedder/embedder.cc | 17 | ||||
-rw-r--r-- | mojo/system/local_message_pipe_endpoint.h | 3 | ||||
-rw-r--r-- | mojo/system/message_in_transit.cc | 51 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 15 | ||||
-rw-r--r-- | mojo/system/message_pipe.cc | 44 | ||||
-rw-r--r-- | mojo/system/message_pipe.h | 9 | ||||
-rw-r--r-- | mojo/system/message_pipe_dispatcher.cc | 85 | ||||
-rw-r--r-- | mojo/system/message_pipe_dispatcher.h | 19 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.cc | 20 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.h | 8 | ||||
-rw-r--r-- | mojo/system/remote_message_pipe_posix_unittest.cc | 67 |
15 files changed, 384 insertions, 64 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc index 804e4d7..02dc0da 100644 --- a/mojo/system/channel.cc +++ b/mojo/system/channel.cc @@ -105,13 +105,38 @@ void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, IdToEndpointInfoMap::const_iterator it = local_id_to_endpoint_info_map_.find(local_id); + // TODO(vtl): FIXME -- This check is wrong if this is in response to a + // |kSubtypeChannelRunMessagePipeEndpoint| message. We should report error. CHECK(it != local_id_to_endpoint_info_map_.end()); endpoint_info = it->second; } + // TODO(vtl): FIXME -- We need to handle the case that message pipe is already + // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|). endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); } +void Channel::RunRemoteMessagePipeEndpoint( + MessageInTransit::EndpointId local_id, + MessageInTransit::EndpointId remote_id) { + base::AutoLock locker(lock_); + + DCHECK(local_id_to_endpoint_info_map_.find(local_id) != + local_id_to_endpoint_info_map_.end()); + + scoped_ptr<MessageInTransit> message(new MessageInTransit( + MessageInTransit::kTypeChannel, + MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint, + 0, 0, NULL)); + message->set_source_id(local_id); + message->set_destination_id(remote_id); + if (!raw_channel_->WriteMessage(message.Pass())) { + // TODO(vtl): FIXME -- I guess we should report the error back somehow so + // that the dispatcher can be closed? + CHECK(false) << "Not yet handled"; + } +} + bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { base::AutoLock locker(lock_); if (!raw_channel_.get()) { @@ -202,26 +227,36 @@ void Channel::OnReadMessageForDownstream( // ownership of it. // TODO(vtl): Need to enforce limits on message size and handle count. scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); - std::vector<DispatcherTransport> transports(message->num_handles()); - // TODO(vtl): Create dispatchers for handles. - // TODO(vtl): It's bad that the current API will create equivalent dispatchers - // for the freshly-created ones, which is totally redundant. Make a version of - // |EnqueueMessage()| that passes ownership. - if (endpoint_info.message_pipe->EnqueueMessage( - MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), - transports.empty() ? NULL : &transports) != MOJO_RESULT_OK) { + message->DeserializeDispatchers(this); + MojoResult result = endpoint_info.message_pipe->EnqueueMessage( + MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), NULL); + if (result != MOJO_RESULT_OK) { HandleLocalError(base::StringPrintf( - "Failed to enqueue message to local destination ID %u", - static_cast<unsigned>(local_id))); + "Failed to enqueue message to local destination ID %u (result %d)", + static_cast<unsigned>(local_id), static_cast<int>(result))); return; } } void Channel::OnReadMessageForChannel( const MessageInTransit::View& message_view) { - // TODO(vtl): Currently no channel-only messages yet. - HandleRemoteError("Received invalid channel message"); - NOTREACHED(); + DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel); + + switch (message_view.subtype()) { + case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint: + // TODO(vtl): FIXME -- Error handling (also validation of + // source/destination IDs). + DVLOG(2) << "Handling channel message to run message pipe (local ID = " + << message_view.destination_id() << ", remote ID = " + << message_view.source_id() << ")"; + RunMessagePipeEndpoint(message_view.destination_id(), + message_view.source_id()); + break; + default: + HandleRemoteError("Received invalid channel message"); + NOTREACHED(); + break; + } } void Channel::HandleRemoteError(const base::StringPiece& error_message) { diff --git a/mojo/system/channel.h b/mojo/system/channel.h index 336aae9..c50bf49 100644 --- a/mojo/system/channel.h +++ b/mojo/system/channel.h @@ -80,6 +80,15 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel void RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id); + // Tells the other side of the channel to run a message pipe endpoint (which + // must already be attached); |local_id| and |remote_id| are relative to this + // channel (i.e., |local_id| is the other side's remote ID and |remote_id| is + // its local ID). + // TODO(vtl): Maybe we should just have a flag argument to + // |RunMessagePipeEndpoint()| that tells it to do this. + void RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id, + MessageInTransit::EndpointId remote_id); + // This forwards |message| verbatim to |raw_channel_|. bool WriteMessage(scoped_ptr<MessageInTransit> message); diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc index 896d044..c22a73b 100644 --- a/mojo/system/dispatcher.cc +++ b/mojo/system/dispatcher.cc @@ -6,6 +6,7 @@ #include "base/logging.h" #include "mojo/system/constants.h" +#include "mojo/system/message_pipe_dispatcher.h" namespace mojo { namespace system { @@ -39,11 +40,11 @@ size_t Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize( // static bool Dispatcher::MessageInTransitAccess::SerializeAndClose( Dispatcher* dispatcher, - void* destination, Channel* channel, + void* destination, size_t* actual_size) { DCHECK(dispatcher); - return dispatcher->SerializeAndClose(destination, channel, actual_size); + return dispatcher->SerializeAndClose(channel, destination, actual_size); } // static @@ -52,7 +53,20 @@ scoped_refptr<Dispatcher> Dispatcher::MessageInTransitAccess::Deserialize( int32_t type, const void* source, size_t size) { - // TODO(vtl) + switch (static_cast<int32_t>(type)) { + case kTypeUnknown: + DVLOG(2) << "Deserializing invalid handle"; + return scoped_refptr<Dispatcher>(); + case kTypeMessagePipe: + return scoped_refptr<Dispatcher>( + MessagePipeDispatcher::Deserialize(channel, source, size)); + case kTypeDataPipeProducer: + case kTypeDataPipeConsumer: + LOG(WARNING) << "Deserialization of dispatcher type " << type + << " not supported"; + return scoped_refptr<Dispatcher>(); + } + LOG(WARNING) << "Unknown dispatcher type " << type; return scoped_refptr<Dispatcher>(); } @@ -333,8 +347,8 @@ size_t Dispatcher::GetMaximumSerializedSizeImplNoLock( return 0; } -bool Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/, - Channel* /*channel*/, +bool Dispatcher::SerializeAndCloseImplNoLock(Channel* /*channel*/, + void* /*destination*/, size_t* /*actual_size*/) { lock_.AssertAcquired(); DCHECK(is_closed_); @@ -379,8 +393,8 @@ size_t Dispatcher::GetMaximumSerializedSize(const Channel* channel) const { return GetMaximumSerializedSizeImplNoLock(channel); } -bool Dispatcher::SerializeAndClose(void* destination, - Channel* channel, +bool Dispatcher::SerializeAndClose(Channel* channel, + void* destination, size_t* actual_size) { DCHECK(destination); DCHECK(channel); @@ -401,7 +415,7 @@ bool Dispatcher::SerializeAndClose(void* destination, // No need to cancel waiters: we shouldn't have any (and shouldn't be in // |Core|'s handle table. - if (!SerializeAndCloseImplNoLock(destination, channel, actual_size)) + if (!SerializeAndCloseImplNoLock(channel, destination, actual_size)) return false; DCHECK_LE(*actual_size, max_size); diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h index 7b2bdb5..b4cd526 100644 --- a/mojo/system/dispatcher.h +++ b/mojo/system/dispatcher.h @@ -161,8 +161,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : static size_t GetMaximumSerializedSize(const Dispatcher* dispatcher, const Channel* channel); static bool SerializeAndClose(Dispatcher* dispatcher, - void* destination, Channel* channel, + void* destination, size_t* actual_size); // Deserialization API. @@ -236,8 +236,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : // being passed over a message pipe. virtual size_t GetMaximumSerializedSizeImplNoLock( const Channel* channel) const; - virtual bool SerializeAndCloseImplNoLock(void* destination, - Channel* channel, + virtual bool SerializeAndCloseImplNoLock(Channel* channel, + void* destination, size_t* actual_size); // Available to subclasses. (Note: Returns a non-const reference, just like @@ -280,8 +280,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : // success, in which case |*actual_size| is set to the amount it actually // wrote to |destination|. On failure, |*actual_size| should not be modified; // however, the dispatcher will still be closed. - bool SerializeAndClose(void* destination, - Channel* channel, + bool SerializeAndClose(Channel* channel, + void* destination, size_t* actual_size); // This protects the following members as well as any state added by diff --git a/mojo/system/embedder/embedder.cc b/mojo/system/embedder/embedder.cc index 84949c7..03b5440 100644 --- a/mojo/system/embedder/embedder.cc +++ b/mojo/system/embedder/embedder.cc @@ -10,10 +10,8 @@ #include "base/memory/scoped_ptr.h" #include "mojo/system/channel.h" #include "mojo/system/core_impl.h" -#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/message_pipe.h" #include "mojo/system/message_pipe_dispatcher.h" -#include "mojo/system/proxy_message_pipe_endpoint.h" namespace mojo { namespace embedder { @@ -57,24 +55,19 @@ MojoHandle CreateChannel( DidCreateChannelOnIOThreadCallback callback) { DCHECK(platform_handle.is_valid()); - scoped_refptr<system::MessagePipe> message_pipe( - new system::MessagePipe(scoped_ptr<system::MessagePipeEndpoint>( - new system::LocalMessagePipeEndpoint()), - scoped_ptr<system::MessagePipeEndpoint>( - new system::ProxyMessagePipeEndpoint()))); - scoped_refptr<system::MessagePipeDispatcher> dispatcher( - new system::MessagePipeDispatcher()); - dispatcher->Init(message_pipe, 0); + std::pair<scoped_refptr<system::MessagePipeDispatcher>, + scoped_refptr<system::MessagePipe> > remote_message_pipe = + system::MessagePipeDispatcher::CreateRemoteMessagePipe(); system::CoreImpl* core_impl = static_cast<system::CoreImpl*>(Core::Get()); DCHECK(core_impl); - MojoHandle rv = core_impl->AddDispatcher(dispatcher); + MojoHandle rv = core_impl->AddDispatcher(remote_message_pipe.first); // TODO(vtl): Do we properly handle the failure case here? if (rv != MOJO_HANDLE_INVALID) { io_thread_task_runner->PostTask(FROM_HERE, base::Bind(&CreateChannelOnIOThread, base::Passed(&platform_handle), - message_pipe, + remote_message_pipe.second, callback)); } return rv; diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h index d923321..bae0786 100644 --- a/mojo/system/local_message_pipe_endpoint.h +++ b/mojo/system/local_message_pipe_endpoint.h @@ -41,6 +41,9 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint MojoResult wake_result) OVERRIDE; virtual void RemoveWaiter(Waiter* waiter) OVERRIDE; + // This is only to be used by |ProxyMessagePipeEndpoint|: + MessageInTransitQueue* message_queue() { return &message_queue_; } + private: MojoWaitFlags SatisfiedFlags(); MojoWaitFlags SatisfiableFlags(); diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index a444a8e..9ac129e 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -89,6 +89,8 @@ MessageInTransit::MessageInTransit(Type type, } } +// TODO(vtl): Do I really want/need to copy the secondary buffer here, or should +// I just create (deserialize) the dispatchers right away? MessageInTransit::MessageInTransit(const View& message_view) : main_buffer_size_(message_view.main_buffer_size()), main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)), @@ -200,16 +202,19 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { continue; } + void* destination = static_cast<char*>(secondary_buffer_) + current_offset; size_t actual_size = 0; if (Dispatcher::MessageInTransitAccess::SerializeAndClose( - dispatcher, static_cast<char*>(secondary_buffer_) + current_offset, - channel, &actual_size)) { + dispatcher, channel, destination, &actual_size)) { handle_table[i].type = static_cast<int32_t>(dispatcher->GetType()); handle_table[i].offset = static_cast<uint32_t>(current_offset); handle_table[i].size = static_cast<uint32_t>(actual_size); + } else { + // (Nothing to do on failure, since |secondary_buffer_| was cleared, and + // |kTypeUnknown| is zero.) + // The handle will simply be closed. + LOG(ERROR) << "Failed to serialize handle to remote message pipe"; } - // (Nothing to do on failure, since |secondary_buffer_| was cleared, and - // |kTypeUnknown| is zero.) current_offset += RoundUpMessageAlignment(actual_size); DCHECK_LE(current_offset, size); @@ -218,6 +223,44 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { UpdateTotalSize(); } +void MessageInTransit::DeserializeDispatchers(Channel* channel) { + DCHECK(!dispatchers_.get()); + + if (!num_handles()) + return; + + // TODO(vtl): Restrict |num_handles()| to a sane range. (Maybe this should be + // done earlier?) + + dispatchers_.reset( + new std::vector<scoped_refptr<Dispatcher> >(num_handles())); + + size_t handle_table_size = num_handles() * sizeof(HandleTableEntry); + if (secondary_buffer_size_ < handle_table_size) { + LOG(ERROR) << "Serialized handle table too small"; + return; + } + + const HandleTableEntry* handle_table = + static_cast<const HandleTableEntry*>(secondary_buffer_); + for (size_t i = 0; i < num_handles(); i++) { + size_t offset = handle_table[i].offset; + size_t size = handle_table[i].size; + // TODO(vtl): Sanity-check the size. + if (offset % kMessageAlignment != 0 || offset > secondary_buffer_size_ || + offset + size > secondary_buffer_size_) { + // TODO(vtl): Maybe should report error (and make it possible to kill the + // connection with extreme prejudice). + LOG(ERROR) << "Invalid serialized handle table entry"; + continue; + } + + const void* source = static_cast<const char*>(secondary_buffer_) + offset; + (*dispatchers_)[i] = Dispatcher::MessageInTransitAccess::Deserialize( + channel, handle_table[i].type, source, size); + } +} + void MessageInTransit::UpdateTotalSize() { DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); DCHECK_EQ(secondary_buffer_size_ % kMessageAlignment, 0u); diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index cbb7271..7a8475d 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -54,6 +54,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { static const Subtype kSubtypeMessagePipeEndpointData = 0; // Subtypes for type |kTypeMessagePipe|: static const Subtype kSubtypeMessagePipePeerClosed = 0; + // Subtypes for type |kTypeChannel|: + static const Subtype kSubtypeChannelRunMessagePipeEndpoint = 0; typedef uint32_t EndpointId; // Never a valid endpoint ID. @@ -146,6 +148,12 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // stays alive through the call. void SerializeAndCloseDispatchers(Channel* channel); + // Deserializes any dispatchers from the secondary buffer. This message must + // not have any dispatchers attached. + // TODO(vtl): Having to copy the secondary buffer (in the constructor from a + // |View|) is suboptimal. Maybe this should just be done in the constructor? + void DeserializeDispatchers(Channel* channel); + // Gets the main buffer and its size (in number of bytes), respectively. const void* main_buffer() const { return main_buffer_; } size_t main_buffer_size() const { return main_buffer_size_; } @@ -185,6 +193,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { return dispatchers_.get(); } + // Returns true if this message has dispatchers attached. + bool has_dispatchers() const { + return dispatchers_.get() && !dispatchers_->empty(); + } + // Rounds |n| up to a multiple of |kMessageAlignment|. static inline size_t RoundUpMessageAlignment(size_t n) { return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1); @@ -215,7 +228,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { struct HandleTableEntry { int32_t type; // From |Dispatcher::Type| (|kTypeUnknown| for "invalid"). - uint32_t offset; + uint32_t offset; // Relative to the start of the secondary buffer. uint32_t size; // (Not including any padding.) uint32_t unused; }; diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc index b18fb75..e450569 100644 --- a/mojo/system/message_pipe.cc +++ b/mojo/system/message_pipe.cc @@ -35,6 +35,8 @@ unsigned MessagePipe::GetPeerPort(unsigned port) { MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { DCHECK(port == 0 || port == 1); base::AutoLock locker(lock_); + DCHECK(endpoints_[port].get()); + return endpoints_[port]->GetType(); } @@ -116,15 +118,49 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { endpoints_[port]->RemoveWaiter(waiter); } +void MessagePipe::ConvertLocalToProxy(unsigned port) { + DCHECK(port == 0 || port == 1); + + base::AutoLock locker(lock_); + DCHECK(endpoints_[port].get()); + DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); + + bool is_peer_open = !!endpoints_[GetPeerPort(port)].get(); + + // TODO(vtl): Hopefully this will work if the peer has been closed and when + // the peer is local. If the peer is remote, we should do something more + // sophisticated. + DCHECK(!is_peer_open || + endpoints_[GetPeerPort(port)]->GetType() == + MessagePipeEndpoint::kTypeLocal); + + scoped_ptr<MessagePipeEndpoint> replacement_endpoint( + new ProxyMessagePipeEndpoint( + static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()), + is_peer_open)); + endpoints_[port].swap(replacement_endpoint); +} + MojoResult MessagePipe::EnqueueMessage( unsigned port, scoped_ptr<MessageInTransit> message, std::vector<DispatcherTransport>* transports) { DCHECK(port == 0 || port == 1); DCHECK(message.get()); - DCHECK((!transports && message->num_handles() == 0) || - (transports && transports->size() > 0 && - message->num_handles() == transports->size())); + if (message->num_handles() == 0) { + // If |message->num_handles()| is 0, then |transports| should be null and + // |message| should not have dispatchers. + DCHECK(!transports); + DCHECK(!message->has_dispatchers()); + } else { + // Otherwise either |transports| must be (non-null and) of the right size + // and the message shouldn't have dispatchers, or |transports| must be null + // and the message should have the right number of dispatchers. + DCHECK((transports && transports->size() == message->num_handles() && + !message->has_dispatchers()) || + (!transports && message->has_dispatchers() && + message->dispatchers()->size() == message->num_handles())); + } if (message->type() == MessageInTransit::kTypeMessagePipe) { DCHECK(!transports); @@ -141,6 +177,8 @@ MojoResult MessagePipe::EnqueueMessage( return MOJO_RESULT_FAILED_PRECONDITION; if (transports) { + DCHECK(!message->dispatchers()); + // You're not allowed to send either handle to a message pipe over the // message pipe, so check for this. (The case of trying to write a handle to // itself is taken care of by |CoreImpl|. That case kind of makes sense, but diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h index 33e49a1..6815dcb 100644 --- a/mojo/system/message_pipe.h +++ b/mojo/system/message_pipe.h @@ -68,10 +68,15 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe : MojoResult wake_result); void RemoveWaiter(unsigned port, Waiter* waiter); + // This is called by the dispatcher to convert a local endpoint to a proxy + // endpoint. + void ConvertLocalToProxy(unsigned port); + // This is used internally by |WriteMessage()| and by |Channel| to enqueue // messages (typically to a |LocalMessagePipeEndpoint|). Unlike - // |WriteMessage()|, |port| is the *destination* port. |dispatchers| should be - // non-null only if it's nonempty. + // |WriteMessage()|, |port| is the *destination* port. |transports| should be + // non-null only if it's nonempty, and only if |message| has no dispatchers + // attached. MojoResult EnqueueMessage(unsigned port, scoped_ptr<MessageInTransit> message, std::vector<DispatcherTransport>* transports); diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc index 2390acb..bf9e5b0 100644 --- a/mojo/system/message_pipe_dispatcher.cc +++ b/mojo/system/message_pipe_dispatcher.cc @@ -5,15 +5,27 @@ #include "mojo/system/message_pipe_dispatcher.h" #include "base/logging.h" +#include "mojo/system/channel.h" #include "mojo/system/constants.h" +#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/memory.h" +#include "mojo/system/message_in_transit.h" #include "mojo/system/message_pipe.h" +#include "mojo/system/proxy_message_pipe_endpoint.h" namespace mojo { namespace system { +namespace { + const unsigned kInvalidPort = static_cast<unsigned>(-1); +struct SerializedMessagePipeDispatcher { + MessageInTransit::EndpointId endpoint_id; +}; + +} // namespace + // MessagePipeDispatcher ------------------------------------------------------- MessagePipeDispatcher::MessagePipeDispatcher() @@ -33,6 +45,45 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { return kTypeMessagePipe; } +// static +std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> > +MessagePipeDispatcher::CreateRemoteMessagePipe() { + scoped_refptr<MessagePipe> message_pipe( + new MessagePipe( + scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), + scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); + scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher()); + dispatcher->Init(message_pipe, 0); + + return std::make_pair(dispatcher, message_pipe); +} + +// static +scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( + Channel* channel, + const void* source, + size_t size) { + if (size != sizeof(SerializedMessagePipeDispatcher)) { + LOG(ERROR) << "Invalid serialized message pipe dispatcher"; + return scoped_refptr<MessagePipeDispatcher>(); + } + + std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> > + remote_message_pipe = CreateRemoteMessagePipe(); + + MessageInTransit::EndpointId remote_id = + static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; + MessageInTransit::EndpointId local_id = + channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1); + DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " + << remote_id << ", new local ID = " << local_id << ")"; + + channel->RunMessagePipeEndpoint(local_id, remote_id); + // TODO(vtl): FIXME -- Need some error handling here. + channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); + return remote_message_pipe.first; +} + MessagePipeDispatcher::~MessagePipeDispatcher() { // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. DCHECK(!message_pipe_.get()); @@ -121,6 +172,40 @@ void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) { message_pipe_->RemoveWaiter(port_, waiter); } +size_t MessagePipeDispatcher::GetMaximumSerializedSizeImplNoLock( + const Channel* /*channel*/) const { + lock().AssertAcquired(); + return sizeof(SerializedMessagePipeDispatcher); +} + +bool MessagePipeDispatcher::SerializeAndCloseImplNoLock(Channel* channel, + void* destination, + size_t* actual_size) { + lock().AssertAcquired(); + + // Convert the local endpoint to a proxy endpoint (moving the message queue). + message_pipe_->ConvertLocalToProxy(port_); + + // Attach the new proxy endpoint to the channel. + MessageInTransit::EndpointId endpoint_id = + channel->AttachMessagePipeEndpoint(message_pipe_, port_); + DCHECK_NE(endpoint_id, MessageInTransit::kInvalidEndpointId); + + DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id + << ")"; + + // We now have a local ID. Before we can run the proxy endpoint, we need to + // get an ack back from the other side with the remote ID. + static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id = + endpoint_id; + + message_pipe_ = NULL; + port_ = kInvalidPort; + + *actual_size = sizeof(SerializedMessagePipeDispatcher); + return true; +} + // MessagePipeDispatcherTransport ---------------------------------------------- MessagePipeDispatcherTransport::MessagePipeDispatcherTransport( diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h index f9ce641..34e4b96 100644 --- a/mojo/system/message_pipe_dispatcher.h +++ b/mojo/system/message_pipe_dispatcher.h @@ -5,6 +5,8 @@ #ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ #define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ +#include <utility> + #include "base/basictypes.h" #include "base/compiler_specific.h" #include "base/memory/ref_counted.h" @@ -28,6 +30,18 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher { virtual Type GetType() const OVERRIDE; + // Creates a |MessagePipe| with a local endpoint (at port 0) and a proxy + // endpoint, and creates/initializes a |MessagePipeDispatcher| (attached to + // the message pipe, port 0). + static std::pair<scoped_refptr<MessagePipeDispatcher>, + scoped_refptr<MessagePipe> > CreateRemoteMessagePipe(); + + // The "opposite" of |SerializeAndClose()|. (Typically this is called by + // |Dispatcher::Deserialize()|.) + static scoped_refptr<MessagePipeDispatcher> Deserialize(Channel* channel, + const void* source, + size_t size); + private: friend class MessagePipeDispatcherTransport; @@ -62,6 +76,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher { MojoWaitFlags flags, MojoResult wake_result) OVERRIDE; virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE; + virtual size_t GetMaximumSerializedSizeImplNoLock( + const Channel* channel) const OVERRIDE; + virtual bool SerializeAndCloseImplNoLock(Channel* channel, + void* destination, + size_t* actual_size) OVERRIDE; // Protected by |lock()|: scoped_refptr<MessagePipe> message_pipe_; // This will be null if closed. diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc index 95fe180..d5ef656 100644 --- a/mojo/system/proxy_message_pipe_endpoint.cc +++ b/mojo/system/proxy_message_pipe_endpoint.cc @@ -8,6 +8,7 @@ #include "base/logging.h" #include "mojo/system/channel.h" +#include "mojo/system/local_message_pipe_endpoint.h" #include "mojo/system/message_pipe_dispatcher.h" namespace mojo { @@ -20,6 +21,18 @@ ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() is_peer_open_(true) { } +ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( + LocalMessagePipeEndpoint* local_message_pipe_endpoint, + bool is_peer_open) + : local_id_(MessageInTransit::kInvalidEndpointId), + remote_id_(MessageInTransit::kInvalidEndpointId), + is_open_(true), + is_peer_open_(is_peer_open), + paused_message_queue_(MessageInTransitQueue::PassContents(), + local_message_pipe_endpoint->message_queue()) { + local_message_pipe_endpoint->Close(); +} + ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { DCHECK(!is_running()); DCHECK(!is_attached()); @@ -61,13 +74,6 @@ void ProxyMessagePipeEndpoint::EnqueueMessage( scoped_ptr<MessageInTransit> message) { DCHECK(is_open_); - if (message->dispatchers() && !message->dispatchers()->empty()) { - // Since the dispatchers are attached to the message, they'll be closed on - // message destruction. - LOG(ERROR) << "Sending handles over remote message pipes not yet supported " - "(sent handles will simply be closed)"; - } - if (is_running()) { message->SerializeAndCloseDispatchers(channel_.get()); diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h index f3cc221..c1ed3c0 100644 --- a/mojo/system/proxy_message_pipe_endpoint.h +++ b/mojo/system/proxy_message_pipe_endpoint.h @@ -20,6 +20,7 @@ namespace mojo { namespace system { class Channel; +class LocalMessagePipeEndpoint; class MessagePipe; // A |ProxyMessagePipeEndpoint| connects an end of a |MessagePipe| to a @@ -41,6 +42,13 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint : public MessagePipeEndpoint { public: ProxyMessagePipeEndpoint(); + // Constructs a |ProxyMessagePipeEndpoint| that replaces the given + // |LocalMessagePipeEndpoint| (which this constructor will close), taking its + // message queue's contents. This is done when transferring a message pipe + // handle over a remote message pipe. + ProxyMessagePipeEndpoint( + LocalMessagePipeEndpoint* local_message_pipe_endpoint, + bool is_peer_open); virtual ~ProxyMessagePipeEndpoint(); // |MessagePipeEndpoint| implementation: diff --git a/mojo/system/remote_message_pipe_posix_unittest.cc b/mojo/system/remote_message_pipe_posix_unittest.cc index c76d5b6..bd279e8 100644 --- a/mojo/system/remote_message_pipe_posix_unittest.cc +++ b/mojo/system/remote_message_pipe_posix_unittest.cc @@ -439,7 +439,7 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) { scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); ConnectMessagePipes(mp0, mp1); - // We'll try to pass one of these dispatc + // We'll try to pass this dispatcher. scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher()); scoped_refptr<MessagePipe> local_mp(new MessagePipe()); dispatcher->Init(local_mp, 0); @@ -459,9 +459,7 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) { std::vector<DispatcherTransport> transports; transports.push_back(transport); EXPECT_EQ(MOJO_RESULT_OK, - mp0->WriteMessage(0, - hello, sizeof(hello), - &transports, + mp0->WriteMessage(0, hello, sizeof(hello), &transports, MOJO_WRITE_MESSAGE_FLAG_NONE)); transport.End(); @@ -481,22 +479,73 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) { std::vector<scoped_refptr<Dispatcher> > read_dispatchers; uint32_t read_num_dispatchers = 10; // Maximum to get. EXPECT_EQ(MOJO_RESULT_OK, - mp1->ReadMessage(1, - read_buffer, &read_buffer_size, + mp1->ReadMessage(1, read_buffer, &read_buffer_size, &read_dispatchers, &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size)); EXPECT_STREQ(hello, read_buffer); EXPECT_EQ(1u, read_dispatchers.size()); EXPECT_EQ(1u, read_num_dispatchers); + ASSERT_TRUE(read_dispatchers[0].get()); + EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); + + EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType()); + dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get()); + + // Write to "local_mp", port 1. + EXPECT_EQ(MOJO_RESULT_OK, + local_mp->WriteMessage(1, hello, sizeof(hello), NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately + // here. (We don't crash if I sleep and then close.) + + // Wait for the dispatcher to become readable. + waiter.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + dispatcher->AddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 456)); + EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); + dispatcher->RemoveWaiter(&waiter); + + // Read from the dispatcher. + memset(read_buffer, 0, sizeof(read_buffer)); + read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); + EXPECT_EQ(MOJO_RESULT_OK, + dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size)); + EXPECT_STREQ(hello, read_buffer); + + // Prepare to wait on "local_mp", port 1. + waiter.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + local_mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789)); + + // Write to the dispatcher. + EXPECT_EQ(MOJO_RESULT_OK, + dispatcher->WriteMessage(hello, sizeof(hello), NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Wait. + EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); + local_mp->RemoveWaiter(1, &waiter); + + // Read from "local_mp", port 1. + memset(read_buffer, 0, sizeof(read_buffer)); + read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); + EXPECT_EQ(MOJO_RESULT_OK, + local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size)); + EXPECT_STREQ(hello, read_buffer); - // TODO(vtl): Once we can pass the local message pipe handle (over a remote - // message pipe), this will fail. - EXPECT_FALSE(read_dispatchers[0].get()); + // TODO(vtl): Also test that messages queued up before the handle was sent are + // delivered properly. // Close everything that belongs to us. mp0->Close(0); mp1->Close(1); + EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close()); // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed. local_mp->Close(1); } |