diff options
Diffstat (limited to 'mojo/edk/system/message_pipe.cc')
-rw-r--r-- | mojo/edk/system/message_pipe.cc | 119 |
1 files changed, 79 insertions, 40 deletions
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc index 731fd90..8e1be57 100644 --- a/mojo/edk/system/message_pipe.cc +++ b/mojo/edk/system/message_pipe.cc @@ -139,7 +139,7 @@ MojoResult MessagePipe::WriteMessage( std::vector<DispatcherTransport>* transports, MojoWriteMessageFlags flags) { DCHECK(port == 0 || port == 1); - return EnqueueMessageInternal( + return EnqueueMessage( GetPeerPort(port), make_scoped_ptr(new MessageInTransit( MessageInTransit::kTypeMessagePipeEndpoint, @@ -209,57 +209,96 @@ bool MessagePipe::EndSerialize( void* destination, size_t* actual_size, embedder::PlatformHandleVector* /*platform_handles*/) { + DCHECK(port == 0 || port == 1); + + scoped_refptr<ChannelEndpoint> channel_endpoint; + { + base::AutoLock locker(lock_); + DCHECK(endpoints_[port]); + + // The port being serialized must be local. + DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); + + // There are three possibilities for the peer port (below). In all cases, we + // pass the contents of |port|'s message queue to the channel, and it'll + // (presumably) make a |ChannelEndpoint| from it. + // + // 1. The peer port is (known to be) closed. + // + // There's no reason for us to continue to exist and no need for the + // channel to give us the |ChannelEndpoint|. It only remains for us to + // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for + // destruction. + // + // 2. The peer port is local (the typical case). + // + // The channel gives us back a |ChannelEndpoint|, which we hook up to a + // |ProxyMessagePipeEndpoint| to replace |port|'s + // |LocalMessagePipeEndpoint|. We continue to exist, since the peer + // port's message pipe dispatcher will continue to hold a reference to + // us. + // + // 3. The peer port is remote. + // + // We also pass its |ChannelEndpoint| to the channel, which then decides + // what to do. We have no reason to continue to exist. + // + // TODO(vtl): Factor some of this out to |ChannelEndpoint|. + + if (!endpoints_[GetPeerPort(port)]) { + // Case 1. + channel_endpoint = new ChannelEndpoint( + nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( + endpoints_[port].get())->message_queue()); + endpoints_[port]->Close(); + endpoints_[port].reset(); + } else if (endpoints_[GetPeerPort(port)]->GetType() == + MessagePipeEndpoint::kTypeLocal) { + // Case 2. + channel_endpoint = new ChannelEndpoint( + this, port, static_cast<LocalMessagePipeEndpoint*>( + endpoints_[port].get())->message_queue()); + endpoints_[port]->Close(); + endpoints_[port].reset( + new ProxyMessagePipeEndpoint(channel_endpoint.get())); + } else { + // Case 3. + // TODO(vtl): Temporarily the same as case 2. + DLOG(WARNING) << "Direct message pipe passing across multiple channels " + "not yet implemented; will proxy"; + channel_endpoint = new ChannelEndpoint( + this, port, static_cast<LocalMessagePipeEndpoint*>( + endpoints_[port].get())->message_queue()); + endpoints_[port]->Close(); + endpoints_[port].reset( + new ProxyMessagePipeEndpoint(channel_endpoint.get())); + } + } + SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); // Convert the local endpoint to a proxy endpoint (moving the message queue) // and attach it to the channel. s->receiver_endpoint_id = - channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false); + channel->AttachAndRunEndpoint(channel_endpoint, false); DVLOG(2) << "Serializing message pipe (remote ID = " << s->receiver_endpoint_id << ")"; *actual_size = sizeof(SerializedMessagePipe); return true; } -scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); - - // The local peer is already closed, so just make a |ChannelEndpoint| that'll - // send the already-queued messages. - if (!endpoints_[GetPeerPort(port)]) { - scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( - nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( - endpoints_[port].get())->message_queue())); - endpoints_[port]->Close(); - endpoints_[port].reset(); - return channel_endpoint; - } - - // TODO(vtl): Allowing this case is a temporary hack. It'll set up a - // |MessagePipe| with two proxy endpoints, which will then act as a proxy - // (rather than trying to connect the two ends directly). - DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() != - MessagePipeEndpoint::kTypeLocal) - << "Direct message pipe passing across multiple channels not yet " - "implemented; will proxy"; - - scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); - scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( - this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) - ->message_queue())); - endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); - old_endpoint->Close(); - - return channel_endpoint; +bool MessagePipe::OnReadMessage(unsigned port, + scoped_ptr<MessageInTransit> message) { + // This is called when the |ChannelEndpoint| for the + // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). + // We need to pass this message on to its peer port (typically a + // |LocalMessagePipeEndpoint|). + return EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr) == + MOJO_RESULT_OK; } -MojoResult MessagePipe::EnqueueMessage(unsigned port, - scoped_ptr<MessageInTransit> message) { - return EnqueueMessageInternal(port, message.Pass(), nullptr); +void MessagePipe::OnDetachFromChannel(unsigned port) { + Close(port); } MessagePipe::MessagePipe() { @@ -273,7 +312,7 @@ MessagePipe::~MessagePipe() { DCHECK(!endpoints_[1]); } -MojoResult MessagePipe::EnqueueMessageInternal( +MojoResult MessagePipe::EnqueueMessage( unsigned port, scoped_ptr<MessageInTransit> message, std::vector<DispatcherTransport>* transports) { |