summaryrefslogtreecommitdiffstats
path: root/mojo/edk/system/message_pipe.cc
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/edk/system/message_pipe.cc')
-rw-r--r--mojo/edk/system/message_pipe.cc119
1 files changed, 79 insertions, 40 deletions
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
index 731fd90..8e1be57 100644
--- a/mojo/edk/system/message_pipe.cc
+++ b/mojo/edk/system/message_pipe.cc
@@ -139,7 +139,7 @@ MojoResult MessagePipe::WriteMessage(
std::vector<DispatcherTransport>* transports,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
- return EnqueueMessageInternal(
+ return EnqueueMessage(
GetPeerPort(port),
make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeMessagePipeEndpoint,
@@ -209,57 +209,96 @@ bool MessagePipe::EndSerialize(
void* destination,
size_t* actual_size,
embedder::PlatformHandleVector* /*platform_handles*/) {
+ DCHECK(port == 0 || port == 1);
+
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ {
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port]);
+
+ // The port being serialized must be local.
+ DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
+
+ // There are three possibilities for the peer port (below). In all cases, we
+ // pass the contents of |port|'s message queue to the channel, and it'll
+ // (presumably) make a |ChannelEndpoint| from it.
+ //
+ // 1. The peer port is (known to be) closed.
+ //
+ // There's no reason for us to continue to exist and no need for the
+ // channel to give us the |ChannelEndpoint|. It only remains for us to
+ // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for
+ // destruction.
+ //
+ // 2. The peer port is local (the typical case).
+ //
+ // The channel gives us back a |ChannelEndpoint|, which we hook up to a
+ // |ProxyMessagePipeEndpoint| to replace |port|'s
+ // |LocalMessagePipeEndpoint|. We continue to exist, since the peer
+ // port's message pipe dispatcher will continue to hold a reference to
+ // us.
+ //
+ // 3. The peer port is remote.
+ //
+ // We also pass its |ChannelEndpoint| to the channel, which then decides
+ // what to do. We have no reason to continue to exist.
+ //
+ // TODO(vtl): Factor some of this out to |ChannelEndpoint|.
+
+ if (!endpoints_[GetPeerPort(port)]) {
+ // Case 1.
+ channel_endpoint = new ChannelEndpoint(
+ nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
+ endpoints_[port].get())->message_queue());
+ endpoints_[port]->Close();
+ endpoints_[port].reset();
+ } else if (endpoints_[GetPeerPort(port)]->GetType() ==
+ MessagePipeEndpoint::kTypeLocal) {
+ // Case 2.
+ channel_endpoint = new ChannelEndpoint(
+ this, port, static_cast<LocalMessagePipeEndpoint*>(
+ endpoints_[port].get())->message_queue());
+ endpoints_[port]->Close();
+ endpoints_[port].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint.get()));
+ } else {
+ // Case 3.
+ // TODO(vtl): Temporarily the same as case 2.
+ DLOG(WARNING) << "Direct message pipe passing across multiple channels "
+ "not yet implemented; will proxy";
+ channel_endpoint = new ChannelEndpoint(
+ this, port, static_cast<LocalMessagePipeEndpoint*>(
+ endpoints_[port].get())->message_queue());
+ endpoints_[port]->Close();
+ endpoints_[port].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint.get()));
+ }
+ }
+
SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
// Convert the local endpoint to a proxy endpoint (moving the message queue)
// and attach it to the channel.
s->receiver_endpoint_id =
- channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false);
+ channel->AttachAndRunEndpoint(channel_endpoint, false);
DVLOG(2) << "Serializing message pipe (remote ID = "
<< s->receiver_endpoint_id << ")";
*actual_size = sizeof(SerializedMessagePipe);
return true;
}
-scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
- DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
-
- // The local peer is already closed, so just make a |ChannelEndpoint| that'll
- // send the already-queued messages.
- if (!endpoints_[GetPeerPort(port)]) {
- scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
- nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
- endpoints_[port].get())->message_queue()));
- endpoints_[port]->Close();
- endpoints_[port].reset();
- return channel_endpoint;
- }
-
- // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
- // |MessagePipe| with two proxy endpoints, which will then act as a proxy
- // (rather than trying to connect the two ends directly).
- DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() !=
- MessagePipeEndpoint::kTypeLocal)
- << "Direct message pipe passing across multiple channels not yet "
- "implemented; will proxy";
-
- scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
- scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
- this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
- ->message_queue()));
- endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
- old_endpoint->Close();
-
- return channel_endpoint;
+bool MessagePipe::OnReadMessage(unsigned port,
+ scoped_ptr<MessageInTransit> message) {
+ // This is called when the |ChannelEndpoint| for the
+ // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
+ // We need to pass this message on to its peer port (typically a
+ // |LocalMessagePipeEndpoint|).
+ return EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr) ==
+ MOJO_RESULT_OK;
}
-MojoResult MessagePipe::EnqueueMessage(unsigned port,
- scoped_ptr<MessageInTransit> message) {
- return EnqueueMessageInternal(port, message.Pass(), nullptr);
+void MessagePipe::OnDetachFromChannel(unsigned port) {
+ Close(port);
}
MessagePipe::MessagePipe() {
@@ -273,7 +312,7 @@ MessagePipe::~MessagePipe() {
DCHECK(!endpoints_[1]);
}
-MojoResult MessagePipe::EnqueueMessageInternal(
+MojoResult MessagePipe::EnqueueMessage(
unsigned port,
scoped_ptr<MessageInTransit> message,
std::vector<DispatcherTransport>* transports) {