summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--content/common/mojo/channel_init.cc30
-rw-r--r--content/common/mojo/channel_init.h6
-rw-r--r--mash/shell/shell_application_delegate.cc8
-rw-r--r--mojo/edk/embedder/embedder.cc20
-rw-r--r--mojo/edk/embedder/embedder.h34
-rw-r--r--mojo/edk/system/core.cc71
-rw-r--r--mojo/edk/system/core.h25
-rw-r--r--mojo/edk/system/multiprocess_message_pipe_unittest.cc39
-rw-r--r--mojo/edk/system/node_channel.cc47
-rw-r--r--mojo/edk/system/node_channel.h17
-rw-r--r--mojo/edk/system/node_controller.cc204
-rw-r--r--mojo/edk/system/node_controller.h104
-rw-r--r--mojo/edk/system/ports/event.h6
-rw-r--r--mojo/edk/system/ports/message.cc3
-rw-r--r--mojo/edk/system/ports/name.h7
-rw-r--r--mojo/edk/system/ports/node.cc179
-rw-r--r--mojo/edk/system/ports/node.h17
-rw-r--r--mojo/edk/system/ports/ports_unittest.cc338
-rw-r--r--mojo/edk/system/remote_message_pipe_bootstrap.cc23
-rw-r--r--mojo/edk/system/remote_message_pipe_bootstrap.h22
-rw-r--r--mojo/shell/runner/child/runner_connection.cc16
-rw-r--r--mojo/shell/runner/host/child_process.cc53
-rw-r--r--mojo/shell/runner/host/child_process_host.cc111
-rw-r--r--mojo/shell/runner/host/child_process_host.h62
24 files changed, 748 insertions, 694 deletions
diff --git a/content/common/mojo/channel_init.cc b/content/common/mojo/channel_init.cc
index 8eb156a..3b7fe21 100644
--- a/content/common/mojo/channel_init.cc
+++ b/content/common/mojo/channel_init.cc
@@ -19,17 +19,6 @@
namespace content {
-namespace {
-
-void CallMessagePipeCallbackOnThread(
- const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback,
- scoped_refptr<base::TaskRunner> task_runner,
- mojo::ScopedMessagePipeHandle pipe) {
- task_runner->PostTask(FROM_HERE, base::Bind(callback, base::Passed(&pipe)));
-}
-
-} // namespace
-
ChannelInit::ChannelInit() : channel_info_(nullptr), weak_factory_(this) {}
ChannelInit::~ChannelInit() {
@@ -45,14 +34,9 @@ void ChannelInit::Init(
scoped_ptr<IPC::ScopedIPCSupport> ipc_support(
new IPC::ScopedIPCSupport(io_thread_task_runner));
if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
- mojo::edk::CreateMessagePipe(
- mojo::edk::ScopedPlatformHandle(mojo::edk::PlatformHandle(file)),
- base::Bind(&CallMessagePipeCallbackOnThread,
- base::Bind(&ChannelInit::OnCreateMessagePipe,
- weak_factory_.GetWeakPtr(),
- base::Passed(&ipc_support),
- callback),
- base::ThreadTaskRunnerHandle::Get()));
+ ipc_support_ = std::move(ipc_support);
+ callback.Run(mojo::edk::CreateMessagePipe(
+ mojo::edk::ScopedPlatformHandle(mojo::edk::PlatformHandle(file))));
} else {
mojo::ScopedMessagePipeHandle message_pipe = mojo::embedder::CreateChannel(
mojo::embedder::ScopedPlatformHandle(
@@ -86,12 +70,4 @@ void ChannelInit::OnCreatedChannel(
self->ipc_support_ = std::move(ipc_support);
}
-void ChannelInit::OnCreateMessagePipe(
- scoped_ptr<IPC::ScopedIPCSupport> ipc_support,
- const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback,
- mojo::ScopedMessagePipeHandle pipe) {
- ipc_support_ = std::move(ipc_support);
- callback.Run(std::move(pipe));
-}
-
} // namespace content
diff --git a/content/common/mojo/channel_init.h b/content/common/mojo/channel_init.h
index 4c19053..34d0c95 100644
--- a/content/common/mojo/channel_init.h
+++ b/content/common/mojo/channel_init.h
@@ -47,12 +47,6 @@ class CONTENT_EXPORT ChannelInit {
scoped_ptr<IPC::ScopedIPCSupport> ipc_support,
mojo::embedder::ChannelInfo* channel);
- // Callback used with the new ports EDK on pipe creation.
- void OnCreateMessagePipe(
- scoped_ptr<IPC::ScopedIPCSupport> ipc_support,
- const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback,
- mojo::ScopedMessagePipeHandle pipe);
-
// If non-null the channel has been established.
mojo::embedder::ChannelInfo* channel_info_;
diff --git a/mash/shell/shell_application_delegate.cc b/mash/shell/shell_application_delegate.cc
index 032ad13..a88c6d6 100644
--- a/mash/shell/shell_application_delegate.cc
+++ b/mash/shell/shell_application_delegate.cc
@@ -117,8 +117,12 @@ void ShellApplicationDelegate::StartRestartableService(
// TODO(beng): This would be the place to insert logic that counted restarts
// to avoid infinite crash-restart loops.
scoped_ptr<mojo::Connection> connection = shell_->Connect(url);
- connection->SetRemoteServiceProviderConnectionErrorHandler(restart_callback);
- connections_[url] = std::move(connection);
+ // Note: |connection| may be null if we've lost our connection to the shell.
+ if (connection) {
+ connection->SetRemoteServiceProviderConnectionErrorHandler(
+ restart_callback);
+ connections_[url] = std::move(connection);
+ }
}
} // namespace shell
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index d1ab6c6..926ee18 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -107,29 +107,37 @@ void ShutdownIPCSupport() {
ScopedMessagePipeHandle CreateMessagePipe(
ScopedPlatformHandle platform_handle) {
- NOTREACHED();
- return ScopedMessagePipeHandle();
+ DCHECK(internal::g_core);
+ return internal::g_core->CreateMessagePipe(std::move(platform_handle));
}
void CreateMessagePipe(
ScopedPlatformHandle platform_handle,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
DCHECK(internal::g_core);
- internal::g_core->CreateMessagePipe(std::move(platform_handle), callback);
+ callback.Run(CreateMessagePipe(std::move(platform_handle)));
+}
+
+ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token) {
+ DCHECK(internal::g_core);
+ return internal::g_core->CreateParentMessagePipe(token);
}
void CreateParentMessagePipe(
const std::string& token,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
+ callback.Run(CreateParentMessagePipe(token));
+}
+
+ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token) {
DCHECK(internal::g_core);
- internal::g_core->CreateParentMessagePipe(token, callback);
+ return internal::g_core->CreateChildMessagePipe(token);
}
void CreateChildMessagePipe(
const std::string& token,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- DCHECK(internal::g_core);
- internal::g_core->CreateChildMessagePipe(token, callback);
+ callback.Run(CreateChildMessagePipe(token));
}
std::string GenerateRandomToken() {
diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h
index b43748c..49b64c4 100644
--- a/mojo/edk/embedder/embedder.h
+++ b/mojo/edk/embedder/embedder.h
@@ -118,7 +118,12 @@ MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupportOnIOThread();
// |OnShutdownComplete()|.
MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupport();
-// Unused. Crashes. Only here for linking.
+// Creates a message pipe over an arbitrary platform channel. The other end of
+// the channel must also be passed to this function. Either endpoint can be in
+// any process.
+//
+// Note that the channel is only used to negotiate pipe connection, not as the
+// transport for messages on the pipe.
MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
CreateMessagePipe(ScopedPlatformHandle platform_handle);
@@ -128,13 +133,10 @@ CreateMessagePipe(ScopedPlatformHandle platform_handle);
// either PreInitializeChildProcess() or SetParentPipe() must have been been
// called at least once already.
//
-// Note: This only exists for backwards compatibility with embedders that rely
-// on mojo::embedder::CreateChannel() behavior. If you have a means of passing
-// platform handles around, you can probably also pass strings around. If you
-// can pass strings around, use CreateParentMessagePipe() and
-// CreateChlidMessagePipe() instead (see below.)
-//
// |callback| must be safe to call from any thread.
+//
+// DEPRECATED: Please don't use this. Use the synchronous version above. This
+// is now merely an inconvenient wrapper for that.
MOJO_SYSTEM_IMPL_EXPORT void
CreateMessagePipe(
ScopedPlatformHandle platform_handle,
@@ -143,8 +145,17 @@ CreateMessagePipe(
// Creates a message pipe from a token. A child embedder must also have this
// token and call CreateChildMessagePipe() with it in order for the pipe to get
// connected.
+MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
+CreateParentMessagePipe(const std::string& token);
+
+// Creates a message pipe from a token. A child embedder must also have this
+// token and call CreateChildMessagePipe() with it in order for the pipe to get
+// connected.
//
// |callback| must be safe to call from any thread.
+//
+// DEPRECATED: Please don't use this. Use the synchronous version above. This
+// is now merely an inconvenient wrapper for that.
MOJO_SYSTEM_IMPL_EXPORT void
CreateParentMessagePipe(
const std::string& token,
@@ -153,8 +164,17 @@ CreateParentMessagePipe(
// Creates a message pipe from a token in a child process. The parent must also
// have this token and call CreateParentMessagePipe() with it in order for the
// pipe to get connected.
+MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
+CreateChildMessagePipe(const std::string& token);
+
+// Creates a message pipe from a token in a child process. The parent must also
+// have this token and call CreateParentMessagePipe() with it in order for the
+// pipe to get connected.
//
// |callback| must be safe to call from any thread.
+//
+// DEPRECATED: Please don't use this. Use the synchronous version above. This
+// is now merely an inconvenient wrapper for that.
MOJO_SYSTEM_IMPL_EXPORT void
CreateChildMessagePipe(
const std::string& token,
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index aad9541..d0b7b67 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -44,19 +44,9 @@ namespace {
// This is an unnecessarily large limit that is relatively easy to enforce.
const uint32_t kMaxHandlesPerMessage = 1024 * 1024;
-void OnPortConnected(
- Core* core,
- int endpoint,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback,
- const ports::PortRef& port) {
- // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too;
- // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for
- // bootstrap and aren't passed around, so tracking them is less important.
- MojoHandle handle = core->AddDispatcher(
- new MessagePipeDispatcher(core->GetNodeController(), port,
- 0x7f7f7f7f7f7f7f7fUL, endpoint));
- callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle)));
-}
+// TODO: Maybe we could negotiate a debugging pipe ID for cross-process pipes
+// too; for now we just use a constant. This only affects bootstrap pipes.
+const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL;
} // namespace
@@ -160,32 +150,37 @@ void Core::RequestShutdown(const base::Closure& callback) {
GetNodeController()->RequestShutdown(on_shutdown);
}
-void Core::CreateMessagePipe(
- ScopedPlatformHandle platform_handle,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- ports::PortRef port;
- GetNodeController()->node()->CreateUninitializedPort(&port);
+ScopedMessagePipeHandle Core::CreateMessagePipe(
+ ScopedPlatformHandle platform_handle) {
+ ports::PortRef port0, port1;
+ GetNodeController()->node()->CreatePortPair(&port0, &port1);
+ MojoHandle handle = AddDispatcher(
+ new MessagePipeDispatcher(GetNodeController(), port0,
+ kUnknownPipeIdForDebug, 0));
RemoteMessagePipeBootstrap::Create(
- GetNodeController(), std::move(platform_handle), port,
- base::Bind(&OnPortConnected, base::Unretained(this), 0, callback, port));
-}
-
-void Core::CreateParentMessagePipe(
- const std::string& token,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- GetNodeController()->ReservePort(
- token,
- base::Bind(&OnPortConnected, base::Unretained(this), 0, callback));
-}
-
-void Core::CreateChildMessagePipe(
- const std::string& token,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- ports::PortRef port;
- GetNodeController()->node()->CreateUninitializedPort(&port);
- GetNodeController()->ConnectToParentPort(
- port, token,
- base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port));
+ GetNodeController(), std::move(platform_handle), port1);
+ return ScopedMessagePipeHandle(MessagePipeHandle(handle));
+}
+
+ScopedMessagePipeHandle Core::CreateParentMessagePipe(
+ const std::string& token) {
+ ports::PortRef port0, port1;
+ GetNodeController()->node()->CreatePortPair(&port0, &port1);
+ MojoHandle handle = AddDispatcher(
+ new MessagePipeDispatcher(GetNodeController(), port0,
+ kUnknownPipeIdForDebug, 0));
+ GetNodeController()->ReservePort(token, port1);
+ return ScopedMessagePipeHandle(MessagePipeHandle(handle));
+}
+
+ScopedMessagePipeHandle Core::CreateChildMessagePipe(const std::string& token) {
+ ports::PortRef port0, port1;
+ GetNodeController()->node()->CreatePortPair(&port0, &port1);
+ MojoHandle handle = AddDispatcher(
+ new MessagePipeDispatcher(GetNodeController(), port0,
+ kUnknownPipeIdForDebug, 1));
+ GetNodeController()->MergePortIntoParent(token, port1);
+ return ScopedMessagePipeHandle(MessagePipeHandle(handle));
}
MojoResult Core::AsyncWait(MojoHandle handle,
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index 8e9af8e..603cba9 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -52,26 +52,19 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
// Called in a child process exactly once during early initialization.
void InitChild(ScopedPlatformHandle platform_handle);
- // This creates a message pipe endpoint connected to an endpoint in a remote
+ // Creates a message pipe endpoint connected to an endpoint in a remote
// embedder. |platform_handle| is used as a channel to negotiate the
- // connection. This is only here to facilitate legacy embedder code. See
- // mojo::edk::CreateMessagePipe in mojo/edk/embedder/embedder.h.
- void CreateMessagePipe(
- ScopedPlatformHandle platform_handle,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback);
+ // connection.
+ ScopedMessagePipeHandle CreateMessagePipe(
+ ScopedPlatformHandle platform_handle);
// Creates a message pipe endpoint associated with |token|, which a child
// holding the token can later locate and connect to.
- void CreateParentMessagePipe(
- const std::string& token,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback);
-
- // Creates a message pipe endpoint associated with |token|, which will be
- // passed to the parent in order to find an associated remote port and connect
- // to it.
- void CreateChildMessagePipe(
- const std::string& token,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback);
+ ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token);
+
+ // Creates a message pipe endpoint and connects it to a pipe the parent has
+ // associated with |token|.
+ ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token);
MojoHandle AddDispatcher(scoped_refptr<Dispatcher> dispatcher);
diff --git a/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
index 00c2ab5..ece85a1 100644
--- a/mojo/edk/system/multiprocess_message_pipe_unittest.cc
+++ b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
@@ -19,6 +19,7 @@
#include "base/logging.h"
#include "base/strings/string_split.h"
#include "build/build_config.h"
+#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/handle_signals_state.h"
#include "mojo/edk/system/test_utils.h"
@@ -1271,6 +1272,44 @@ TEST_F(MultiprocessMessagePipeTest, WriteCloseSendPeer) {
END_CHILD()
}
+DEFINE_TEST_CLIENT_TEST_WITH_PIPE(BootstrapMessagePipeAsyncClient,
+ MultiprocessMessagePipeTest, parent) {
+ // Receive one end of a platform channel from the parent.
+ MojoHandle channel_handle;
+ EXPECT_EQ("hi", ReadMessageWithHandles(parent, &channel_handle, 1));
+ ScopedPlatformHandle channel;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ edk::PassWrappedPlatformHandle(channel_handle, &channel));
+ ASSERT_TRUE(channel.is_valid());
+
+ // Create a new pipe using our end of the channel.
+ ScopedMessagePipeHandle pipe = edk::CreateMessagePipe(std::move(channel));
+
+ // Ensure that we can read and write on the new pipe.
+ VerifyEcho(pipe.get().value(), "goodbye");
+}
+
+TEST_F(MultiprocessMessagePipeTest, BootstrapMessagePipeAsync) {
+ // Tests that new cross-process message pipes can be created synchronously
+ // using asynchronous negotiation over an arbitrary platform channel.
+ RUN_CHILD_ON_PIPE(BootstrapMessagePipeAsyncClient, child)
+ // Pass one end of a platform channel to the child.
+ PlatformChannelPair platform_channel;
+ MojoHandle client_channel_handle;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ CreatePlatformHandleWrapper(platform_channel.PassClientHandle(),
+ &client_channel_handle));
+ WriteMessageWithHandles(child, "hi", &client_channel_handle, 1);
+
+ // Create a new pipe using our end of the channel.
+ ScopedMessagePipeHandle pipe =
+ edk::CreateMessagePipe(platform_channel.PassServerHandle());
+
+ // Ensure that we can read and write on the new pipe.
+ VerifyEcho(pipe.get().value(), "goodbye");
+ END_CHILD()
+}
+
} // namespace
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc
index 4fb0ac4..0277d46 100644
--- a/mojo/edk/system/node_channel.cc
+++ b/mojo/edk/system/node_channel.cc
@@ -29,8 +29,7 @@ enum class MessageType : uint32_t {
BROKER_CLIENT_ADDED,
ACCEPT_BROKER_CLIENT,
PORTS_MESSAGE,
- REQUEST_PORT_CONNECTION,
- CONNECT_TO_PORT,
+ REQUEST_PORT_MERGE,
REQUEST_INTRODUCTION,
INTRODUCE,
#if defined(OS_WIN)
@@ -74,15 +73,10 @@ struct AcceptBrokerClientData {
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location.
-struct RequestPortConnectionData {
+struct RequestPortMergeData {
ports::PortName connector_port_name;
};
-struct ConnectToPortData {
- ports::PortName connector_port_name;
- ports::PortName connectee_port_name;
-};
-
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
@@ -267,28 +261,17 @@ void NodeChannel::PortsMessage(Channel::MessagePtr message) {
WriteChannelMessage(std::move(message));
}
-void NodeChannel::RequestPortConnection(
- const ports::PortName& connector_port_name,
- const std::string& token) {
- RequestPortConnectionData* data;
+void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
+ const std::string& token) {
+ RequestPortMergeData* data;
Channel::MessagePtr message = CreateMessage(
- MessageType::REQUEST_PORT_CONNECTION,
- sizeof(RequestPortConnectionData) + token.size(), 0, &data);
+ MessageType::REQUEST_PORT_MERGE,
+ sizeof(RequestPortMergeData) + token.size(), 0, &data);
data->connector_port_name = connector_port_name;
memcpy(data + 1, token.data(), token.size());
WriteChannelMessage(std::move(message));
}
-void NodeChannel::ConnectToPort(const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name) {
- ConnectToPortData* data;
- Channel::MessagePtr message = CreateMessage(
- MessageType::CONNECT_TO_PORT, sizeof(ConnectToPortData), 0, &data);
- data->connector_port_name = connector_port_name;
- data->connectee_port_name = connectee_port_name;
- WriteChannelMessage(std::move(message));
-}
-
void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
IntroductionData* data;
Channel::MessagePtr message = CreateMessage(
@@ -452,24 +435,16 @@ void NodeChannel::OnChannelMessage(const void* payload,
break;
}
- case MessageType::REQUEST_PORT_CONNECTION: {
- const RequestPortConnectionData* data;
+ case MessageType::REQUEST_PORT_MERGE: {
+ const RequestPortMergeData* data;
GetMessagePayload(payload, &data);
const char* token_data = reinterpret_cast<const char*>(data + 1);
const size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
std::string token(token_data, token_size);
- delegate_->OnRequestPortConnection(remote_node_name_,
- data->connector_port_name, token);
- break;
- }
-
- case MessageType::CONNECT_TO_PORT: {
- const ConnectToPortData* data;
- GetMessagePayload(payload, &data);
- delegate_->OnConnectToPort(remote_node_name_, data->connector_port_name,
- data->connectee_port_name);
+ delegate_->OnRequestPortMerge(remote_node_name_,
+ data->connector_port_name, token);
break;
}
diff --git a/mojo/edk/system/node_channel.h b/mojo/edk/system/node_channel.h
index a9bc520..2bc86bd 100644
--- a/mojo/edk/system/node_channel.h
+++ b/mojo/edk/system/node_channel.h
@@ -44,14 +44,9 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) = 0;
virtual void OnPortsMessage(Channel::MessagePtr message) = 0;
- virtual void OnRequestPortConnection(
- const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const std::string& token) = 0;
- virtual void OnConnectToPort(
- const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name) = 0;
+ virtual void OnRequestPortMerge(const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const std::string& token) = 0;
virtual void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) = 0;
virtual void OnIntroduce(const ports::NodeName& from_node,
@@ -103,10 +98,8 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
void AcceptBrokerClient(const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel);
void PortsMessage(Channel::MessagePtr message);
- void RequestPortConnection(const ports::PortName& connector_port_name,
- const std::string& token);
- void ConnectToPort(const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name);
+ void RequestPortMerge(const ports::PortName& connector_port_name,
+ const std::string& token);
void RequestIntroduction(const ports::NodeName& name);
void Introduce(const ports::NodeName& name,
ScopedPlatformHandle channel_handle);
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 6410cae..38c53df 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -99,18 +99,6 @@ class ThreadDestructionObserver :
} // namespace
-NodeController::PendingPortRequest::PendingPortRequest() {}
-
-NodeController::PendingPortRequest::~PendingPortRequest() {}
-
-NodeController::ReservedPort::ReservedPort() {}
-
-NodeController::ReservedPort::~ReservedPort() {}
-
-NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
-
-NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
-
NodeController::~NodeController() {}
NodeController::NodeController(Core* core)
@@ -182,18 +170,25 @@ int NodeController::SendMessage(const ports::PortRef& port,
}
void NodeController::ReservePort(const std::string& token,
- const ReservePortCallback& callback) {
- ports::PortRef port;
- node_->CreateUninitializedPort(&port);
-
+ const ports::PortRef& port) {
DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
<< token;
base::AutoLock lock(reserved_ports_lock_);
- ReservedPort reservation;
- reservation.local_port = port;
- reservation.callback = callback;
- reserved_ports_.insert(std::make_pair(token, reservation));
+ auto result = reserved_ports_.insert(std::make_pair(token, port));
+ DCHECK(result.second);
+}
+
+void NodeController::MergePortIntoParent(const std::string& token,
+ const ports::PortRef& port) {
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (parent) {
+ parent->RequestPortMerge(port.name(), token);
+ return;
+ }
+
+ base::AutoLock lock(pending_port_merges_lock_);
+ pending_port_merges_.push_back(std::make_pair(token, port));
}
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
@@ -210,42 +205,6 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
return buffer;
}
-void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback) {
- io_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
- base::Unretained(this), local_port, token, callback));
-}
-
-void NodeController::ConnectToRemotePort(
- const ports::PortRef& local_port,
- const ports::NodeName& remote_node_name,
- const ports::PortName& remote_port_name,
- const base::Closure& callback) {
- if (remote_node_name == name_) {
- // It's possible that two different code paths on the node are trying to
- // bootstrap ports to each other (e.g. in Chrome single-process mode)
- // without being aware of the fact. In this case we can initialize the port
- // immediately (which can fail silently if it's already been initialized by
- // the request on the other side), and invoke |callback|.
- node_->InitializePort(local_port, name_, remote_port_name);
- callback.Run();
- return;
- }
-
- PendingRemotePortConnection connection;
- connection.local_port = local_port;
- connection.remote_node_name = remote_node_name;
- connection.remote_port_name = remote_port_name;
- connection.callback = callback;
- io_task_runner_->PostTask(
- FROM_HERE,
- base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
- base::Unretained(this), connection));
-}
-
void NodeController::RequestShutdown(const base::Closure& callback) {
{
base::AutoLock lock(shutdown_lock_);
@@ -304,46 +263,6 @@ void NodeController::ConnectToParentOnIOThread(
bootstrap_parent_channel_->Start();
}
-void NodeController::RequestParentPortConnectionOnIOThread(
- const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback) {
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
-
- scoped_refptr<NodeChannel> parent = GetParentChannel();
- if (!parent) {
- PendingPortRequest request;
- request.token = token;
- request.local_port = local_port;
- request.callback = callback;
- pending_port_requests_.push_back(request);
- return;
- }
-
- pending_parent_port_connections_.insert(
- std::make_pair(local_port.name(), callback));
- parent->RequestPortConnection(local_port.name(), token);
-}
-
-void NodeController::ConnectToRemotePortOnIOThread(
- const PendingRemotePortConnection& connection) {
- scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
- if (peer) {
- // It's safe to initialize the port since we already have a channel to its
- // peer. No need to actually send them a message.
- int rv = node_->InitializePort(connection.local_port,
- connection.remote_node_name,
- connection.remote_port_name);
- DCHECK_EQ(rv, ports::OK);
- connection.callback.Run();
- return;
- }
-
- // Save this for later. We'll initialize the port once this peer is added.
- pending_remote_port_connections_[connection.remote_node_name].push_back(
- connection);
-}
-
scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
const ports::NodeName& name) {
base::AutoLock lock(peers_lock_);
@@ -414,19 +333,6 @@ void NodeController::AddPeer(const ports::NodeName& name,
channel->PortsMessage(std::move(pending_messages.front()));
pending_messages.pop();
}
-
- // Complete any pending port connections to this peer.
- auto connections_it = pending_remote_port_connections_.find(name);
- if (connections_it != pending_remote_port_connections_.end()) {
- for (const auto& connection : connections_it->second) {
- int rv = node_->InitializePort(connection.local_port,
- connection.remote_node_name,
- connection.remote_port_name);
- DCHECK_EQ(rv, ports::OK);
- connection.callback.Run();
- }
- pending_remote_port_connections_.erase(connections_it);
- }
}
void NodeController::DropPeer(const ports::NodeName& name) {
@@ -616,6 +522,8 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node,
// NOTE: The child does not actually add its parent as a peer until
// receiving an AcceptBrokerClient message from the broker. The parent
// will request that said message be sent upon receiving AcceptParent.
+
+ DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
}
void NodeController::OnAcceptParent(const ports::NodeName& from_node,
@@ -763,15 +671,16 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
io_task_runner_);
AddPeer(broker_name, broker, true /* start_channel */);
}
+
AddPeer(parent_name, parent, false /* start_channel */);
- // Resolve any pending port connections to the parent.
- for (const auto& request : pending_port_requests_) {
- pending_parent_port_connections_.insert(
- std::make_pair(request.local_port.name(), request.callback));
- parent->RequestPortConnection(request.local_port.name(), request.token);
+ {
+ // Complete any port merge requests we have waiting for the parent.
+ base::AutoLock lock(pending_port_merges_lock_);
+ for (const auto& request : pending_port_merges_)
+ parent->RequestPortMerge(request.second.name(), request.first);
+ pending_port_merges_.clear();
}
- pending_port_requests_.clear();
// Feed the broker any pending children of our own.
while (!pending_broker_clients.empty()) {
@@ -824,16 +733,15 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
AttemptShutdownIfRequested();
}
-void NodeController::OnRequestPortConnection(
+void NodeController::OnRequestPortMerge(
const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
const std::string& token) {
DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
- DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
+ DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
<< token << " and port " << connector_port_name << "@" << from_node;
- ReservePortCallback callback;
ports::PortRef local_port;
{
base::AutoLock lock(reserved_ports_lock_);
@@ -843,64 +751,12 @@ void NodeController::OnRequestPortConnection(
<< token;
return;
}
- local_port = it->second.local_port;
- callback = it->second.callback;
- reserved_ports_.erase(it);
- }
-
- DCHECK(!callback.is_null());
-
- scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
- if (!peer) {
- DVLOG(1) << "Ignoring request to connect to port from unknown node "
- << from_node;
- return;
+ local_port = it->second;
}
- // This reserved port should not have been initialized yet.
- CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
- connector_port_name));
-
- peer->ConnectToPort(local_port.name(), connector_port_name);
- callback.Run(local_port);
-}
-
-void NodeController::OnConnectToPort(
- const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name) {
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
-
- DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
- << connectee_port_name << " to port " << connector_port_name << "@"
- << from_node;
-
- ports::PortRef connectee_port;
- int rv = node_->GetPort(connectee_port_name, &connectee_port);
- if (rv != ports::OK) {
- DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
- << connectee_port_name;
- return;
- }
-
- // It's OK if this port has already been initialized. This message is only
- // sent by the remote peer to ensure the port is ready before it starts
- // us sending messages to it.
- ports::PortStatus port_status;
- rv = node_->GetStatus(connectee_port, &port_status);
- if (rv == ports::OK) {
- DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
- << connectee_port_name;
- return;
- }
-
- CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
- connector_port_name));
-
- auto it = pending_parent_port_connections_.find(connectee_port_name);
- DCHECK(it != pending_parent_port_connections_.end());
- it->second.Run();
- pending_parent_port_connections_.erase(it);
+ int rv = node_->MergePorts(local_port, from_node, connector_port_name);
+ if (rv != ports::OK)
+ DLOG(ERROR) << "MergePorts failed: " << rv;
}
void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h
index 35148a0..0675e97 100644
--- a/mojo/edk/system/node_controller.h
+++ b/mojo/edk/system/node_controller.h
@@ -82,42 +82,13 @@ class NodeController : public ports::NodeDelegate,
int SendMessage(const ports::PortRef& port_ref,
scoped_ptr<PortsMessage>* message);
- using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>;
+ // Reserves a local port |port| associated with |token|. A peer holding a copy
+ // of |token| can merge one of its own ports into this one.
+ void ReservePort(const std::string& token, const ports::PortRef& port);
- // Reserves a port associated with |token|. A peer may associate one of their
- // own ports with this one by sending us a RequestPortConnection message with
- // the same token value.
- //
- // Note that the reservation is made synchronously. In order to avoid races,
- // reservations should be acquired before |token| is communicated to any
- // potential peer.
- //
- // |callback| must be runnable on any thread and will be run with a reference
- // to the new local port once connected.
- void ReservePort(const std::string& token,
- const ReservePortCallback& callback);
-
- // Eventually initializes a local port with a parent port peer identified by
- // |token|. The parent should also have |token| and should alrady have
- // reserved a port for it. |callback| must be runnable on any thread and will
- // be run if and when the local port is connected.
- void ConnectToParentPort(const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback);
-
- // Connects two reserved ports to each other. Useful when two independent
- // systems in the same (parent) process need to establish a port pair without
- // any direct knowledge of each other.
- void ConnectReservedPorts(const std::string& token1,
- const std::string& token2);
-
- // Connects a local port to a port on a remote node. Note that a connection to
- // the remote node need not be established yet. The port will be connected
- // ASAP, at which point |callback| will be run.
- void ConnectToRemotePort(const ports::PortRef& local_port,
- const ports::NodeName& remote_node_name,
- const ports::PortName& remote_port_name,
- const base::Closure& callback);
+ // Merges a local port |port| into a port reserved by |token| in the parent.
+ void MergePortIntoParent(const std::string& token,
+ const ports::PortRef& port);
// Creates a new shared buffer for use in the current process.
scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
@@ -138,44 +109,9 @@ class NodeController : public ports::NodeDelegate,
scoped_refptr<NodeChannel>>;
using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
- // Tracks a pending token-based connection to a parent port.
- struct PendingPortRequest {
- PendingPortRequest();
- ~PendingPortRequest();
-
- std::string token;
- ports::PortRef local_port;
- base::Closure callback;
- };
-
- // Tracks a reserved port.
- struct ReservedPort {
- ReservedPort();
- ~ReservedPort();
-
- ports::PortRef local_port;
- ReservePortCallback callback;
- };
-
- // Tracks a pending connection to a remote port on any peer.
- struct PendingRemotePortConnection {
- PendingRemotePortConnection();
- ~PendingRemotePortConnection();
-
- ports::PortRef local_port;
- ports::NodeName remote_node_name;
- ports::PortName remote_port_name;
- base::Closure callback;
- };
-
void ConnectToChildOnIOThread(base::ProcessHandle process_handle,
ScopedPlatformHandle platform_handle);
void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle);
- void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port,
- const std::string& token,
- const base::Closure& callback);
- void ConnectToRemotePortOnIOThread(
- const PendingRemotePortConnection& connection);
scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
scoped_refptr<NodeChannel> GetParentChannel();
@@ -215,12 +151,9 @@ class NodeController : public ports::NodeDelegate,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) override;
void OnPortsMessage(Channel::MessagePtr message) override;
- void OnRequestPortConnection(const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const std::string& token) override;
- void OnConnectToPort(const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const ports::PortName& connectee_port_name) override;
+ void OnRequestPortMerge(const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const std::string& token) override;
void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) override;
void OnIntroduce(const ports::NodeName& from_node,
@@ -264,7 +197,13 @@ class NodeController : public ports::NodeDelegate,
base::Lock reserved_ports_lock_;
// Ports reserved by token.
- base::hash_map<std::string, ReservedPort> reserved_ports_;
+ base::hash_map<std::string, ports::PortRef> reserved_ports_;
+
+ // Guards |pending_port_merges_|.
+ base::Lock pending_port_merges_lock_;
+
+ // A set of port merge requests awaiting parent connection.
+ std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
// Guards |parent_name_| and |bootstrap_parent_channel_|.
base::Lock parent_lock_;
@@ -307,17 +246,6 @@ class NodeController : public ports::NodeDelegate,
// Channels to children during handshake.
NodeMap pending_children_;
- // Port connection requests which have been deferred until we have a parent.
- std::vector<PendingPortRequest> pending_port_requests_;
-
- // Port connection requests awaiting a response from the parent.
- std::unordered_map<ports::PortName, base::Closure>
- pending_parent_port_connections_;
-
- // Port connections pending the availability of a remote peer node.
- std::unordered_map<ports::NodeName, std::vector<PendingRemotePortConnection>>
- pending_remote_port_connections_;
-
// Indicates whether this object should delete itself on IO thread shutdown.
// Must only be accessed from the IO thread.
bool destroy_on_io_thread_shutdown_ = false;
diff --git a/mojo/edk/system/ports/event.h b/mojo/edk/system/ports/event.h
index aec9f6b..1dc8eab 100644
--- a/mojo/edk/system/ports/event.h
+++ b/mojo/edk/system/ports/event.h
@@ -34,6 +34,7 @@ enum struct EventType : uint32_t {
kObserveProxy,
kObserveProxyAck,
kObserveClosure,
+ kMergePort,
};
struct EventHeader {
@@ -63,6 +64,11 @@ struct ObserveClosureEventData {
uint64_t last_sequence_num;
};
+struct MergePortEventData {
+ PortName new_port_name;
+ PortDescriptor new_port_descriptor;
+};
+
inline const EventHeader* GetEventHeader(const Message& message) {
return static_cast<const EventHeader*>(message.header_bytes());
}
diff --git a/mojo/edk/system/ports/message.cc b/mojo/edk/system/ports/message.cc
index 20f6d88..2106c15 100644
--- a/mojo/edk/system/ports/message.cc
+++ b/mojo/edk/system/ports/message.cc
@@ -37,6 +37,9 @@ void Message::Parse(const void* bytes,
case EventType::kObserveClosure:
*num_header_bytes = sizeof(EventHeader) + sizeof(ObserveClosureEventData);
break;
+ case EventType::kMergePort:
+ *num_header_bytes = sizeof(EventHeader) + sizeof(MergePortEventData);
+ break;
default:
CHECK(false) << "Bad event type";
return;
diff --git a/mojo/edk/system/ports/name.h b/mojo/edk/system/ports/name.h
index b0ad0b7..8a9307d 100644
--- a/mojo/edk/system/ports/name.h
+++ b/mojo/edk/system/ports/name.h
@@ -8,6 +8,7 @@
#include <stdint.h>
#include <ostream>
+#include <tuple>
namespace mojo {
namespace edk {
@@ -21,9 +22,15 @@ struct Name {
inline bool operator==(const Name& a, const Name& b) {
return a.v1 == b.v1 && a.v2 == b.v2;
}
+
inline bool operator!=(const Name& a, const Name& b) {
return !(a == b);
}
+
+inline bool operator<(const Name& a, const Name& b) {
+ return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2);
+}
+
std::ostream& operator<<(std::ostream& stream, const Name& name);
struct PortName : Name {
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index 1718306..9cfc511 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) {
return OnObserveClosure(
header->port_name,
GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
+ case EventType::kMergePort:
+ return OnMergePort(header->port_name,
+ *GetEventData<MergePortEventData>(*message));
}
return OOPS(ERROR_NOT_IMPLEMENTED);
}
+int Node::MergePorts(const PortRef& port_ref,
+ const NodeName& destination_node_name,
+ const PortName& destination_port_name) {
+ Port* port = port_ref.port();
+ {
+ // |ports_lock_| must be held for WillSendPort_Locked below.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
+ << " to " << destination_port_name << "@" << destination_node_name;
+
+ // Send the port-to-merge over to the destination node so it can be merged
+ // into the port cycle atomically there.
+ MergePortEventData data;
+ data.new_port_name = port_ref.name();
+ WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
+ &data.new_port_descriptor);
+ delegate_->ForwardMessage(
+ destination_node_name,
+ NewInternalMessage(destination_port_name,
+ EventType::kMergePort, data));
+ }
+ return OK;
+}
+
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
@@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) {
<< " pointing to "
<< port->peer_port_name << "@" << port->peer_node_name;
- if (port->state != Port::kBuffering)
- return OOPS(ERROR_PORT_STATE_UNEXPECTED);
-
- port->state = Port::kProxying;
-
- int rv = ForwardMessages_Locked(port.get(), port_name);
- if (rv != OK)
- return rv;
-
- // We may have observed closure before receiving PortAccepted. In that
- // case, we can advance to removing the proxy without sending out an
- // ObserveProxy message. We already know the last expected message, etc.
-
- if (port->remove_proxy_on_last_message) {
- MaybeRemoveProxy_Locked(port.get(), port_name);
-
- // Make sure we propagate closure to our current peer.
- ObserveClosureEventData data;
- data.last_sequence_num = port->last_sequence_num_to_receive;
- delegate_->ForwardMessage(
- port->peer_node_name,
- NewInternalMessage(port->peer_port_name,
- EventType::kObserveClosure, data));
- } else {
- InitiateProxyRemoval_Locked(port.get(), port_name);
- }
+ return BeginProxying_Locked(port.get(), port_name);
}
- return OK;
}
int Node::OnObserveProxy(const PortName& port_name,
@@ -721,6 +724,94 @@ int Node::OnObserveClosure(const PortName& port_name,
return OK;
}
+int Node::OnMergePort(const PortName& port_name,
+ const MergePortEventData& event) {
+ scoped_refptr<Port> port = GetPort(port_name);
+ if (!port)
+ return ERROR_PORT_UNKNOWN;
+
+ DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
+ << port->state << ") merging with proxy " << event.new_port_name
+ << "@" << name_ << " pointing to "
+ << event.new_port_descriptor.peer_port_name << "@"
+ << event.new_port_descriptor.peer_node_name << " referred by "
+ << event.new_port_descriptor.referring_port_name << "@"
+ << event.new_port_descriptor.referring_node_name;
+
+ bool close_target_port = false;
+ bool close_new_port = false;
+
+ // Accept the new port. This is now the receiving end of the other port cycle
+ // to be merged with ours.
+ int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
+ if (rv != OK) {
+ close_target_port = true;
+ } else {
+ // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
+ // needs to hold |ports_lock_|. We also acquire multiple port locks within.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock lock(port->lock);
+
+ if (port->state != Port::kReceiving) {
+ close_new_port = true;
+ } else {
+ scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
+ base::AutoLock new_port_lock(new_port->lock);
+ DCHECK(new_port->state == Port::kReceiving);
+
+ // Both ports are locked. Now all we have to do is swap their peer
+ // information and set them up as proxies.
+
+ std::swap(port->peer_node_name, new_port->peer_node_name);
+ std::swap(port->peer_port_name, new_port->peer_port_name);
+ std::swap(port->peer_closed, new_port->peer_closed);
+
+ port->state = Port::kBuffering;
+ if (port->peer_closed)
+ port->remove_proxy_on_last_message = true;
+
+ new_port->state = Port::kBuffering;
+ if (new_port->peer_closed)
+ new_port->remove_proxy_on_last_message = true;
+
+ int rv1 = BeginProxying_Locked(port.get(), port_name);
+ int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
+
+ if (rv1 == OK && rv2 == OK)
+ return OK;
+
+ // If either proxy failed to initialize (e.g. had undeliverable messages
+ // or ended up in a bad state somehow), we keep the system in a consistent
+ // state by undoing the peer swap and closing both merge ports.
+
+ std::swap(port->peer_node_name, new_port->peer_node_name);
+ std::swap(port->peer_port_name, new_port->peer_port_name);
+ port->state = Port::kReceiving;
+ new_port->state = Port::kReceiving;
+ close_new_port = true;
+ close_target_port = true;
+ }
+ }
+
+ if (close_target_port) {
+ PortRef target_port;
+ rv = GetPort(port_name, &target_port);
+ DCHECK(rv == OK);
+
+ ClosePort(target_port);
+ }
+
+ if (close_new_port) {
+ PortRef new_port;
+ rv = GetPort(event.new_port_name, &new_port);
+ DCHECK(rv == OK);
+
+ ClosePort(new_port);
+ }
+
+ return ERROR_PORT_STATE_UNEXPECTED;
+}
+
int Node::AddPortWithName(const PortName& port_name,
const scoped_refptr<Port>& port) {
base::AutoLock lock(ports_lock_);
@@ -908,6 +999,40 @@ int Node::WillSendMessage_Locked(Port* port,
return OK;
}
+int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
+ ports_lock_.AssertAcquired();
+ port->lock.AssertAcquired();
+
+ if (port->state != Port::kBuffering)
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+ port->state = Port::kProxying;
+
+ int rv = ForwardMessages_Locked(port, port_name);
+ if (rv != OK)
+ return rv;
+
+ // We may have observed closure while buffering. In that case, we can advance
+ // to removing the proxy without sending out an ObserveProxy message. We
+ // already know the last expected message, etc.
+
+ if (port->remove_proxy_on_last_message) {
+ MaybeRemoveProxy_Locked(port, port_name);
+
+ // Make sure we propagate closure to our current peer.
+ ObserveClosureEventData data;
+ data.last_sequence_num = port->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveClosure, data));
+ } else {
+ InitiateProxyRemoval_Locked(port, port_name);
+ }
+
+ return OK;
+}
+
int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
ports_lock_.AssertAcquired();
port->lock.AssertAcquired();
diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h
index 410d53b..cb1d5fd 100644
--- a/mojo/edk/system/ports/node.h
+++ b/mojo/edk/system/ports/node.h
@@ -122,6 +122,21 @@ class Node {
// Corresponding to NodeDelegate::ForwardMessage.
int AcceptMessage(ScopedMessage message);
+ // Called to merge two ports with each other. If you have two independent
+ // port pairs A <=> B and C <=> D, the net result of merging B and C is a
+ // single connected port pair A <=> D.
+ //
+ // Note that the behavior of this operation is undefined if either port to be
+ // merged (B or C above) has ever been read from or written to directly, and
+ // this must ONLY be called on one side of the merge, though it doesn't matter
+ // which side.
+ //
+ // It is safe for the non-merged peers (A and D above) to be transferred,
+ // closed, and/or written to before, during, or after the merge.
+ int MergePorts(const PortRef& port_ref,
+ const NodeName& destination_node_name,
+ const PortName& destination_port_name);
+
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
@@ -133,6 +148,7 @@ class Node {
const ObserveProxyEventData& event);
int OnObserveProxyAck(const PortName& port_name, uint64_t last_sequence_num);
int OnObserveClosure(const PortName& port_name, uint64_t last_sequence_num);
+ int OnMergePort(const PortName& port_name, const MergePortEventData& event);
int AddPortWithName(const PortName& port_name,
const scoped_refptr<Port>& port);
@@ -151,6 +167,7 @@ class Node {
int WillSendMessage_Locked(Port* port,
const PortName& port_name,
Message* message);
+ int BeginProxying_Locked(Port* port, const PortName& port_name);
int ForwardMessages_Locked(Port* port, const PortName& port_name);
void InitiateProxyRemoval_Locked(Port* port, const PortName& port_name);
void MaybeRemoveProxy_Locked(Port* port, const PortName& port_name);
diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc
index 95e2a63..1bdca3f 100644
--- a/mojo/edk/system/ports/ports_unittest.cc
+++ b/mojo/edk/system/ports/ports_unittest.cc
@@ -690,32 +690,12 @@ TEST_F(PortsTest, SendUninitialized) {
Node node0(node0_name, &node0_delegate);
node_map[0] = &node0;
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Begin to setup a pipe between node0 and node1, but don't initialize either
- // endpoint.
- PortRef x0, x1;
+ PortRef x0;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_save_messages(true);
-
- // Send a message on each port and expect neither to arrive yet.
-
EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
SendStringMessage(&node0, x0, "oops"));
- EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
- SendStringMessage(&node1, x1, "oh well"));
-
EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
-
EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
TEST_F(PortsTest, SendFailure) {
@@ -1068,6 +1048,322 @@ TEST_F(PortsTest, SendWithClosedPeerSent) {
EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
+TEST_F(PortsTest, MergePorts) {
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ node0_delegate.set_read_messages(false);
+ node1_delegate.set_save_messages(true);
+
+ // Write a message on A.
+ EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
+
+ PumpTasks();
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ PumpTasks();
+
+ // Expect only two receiving ports to be left after pumping tasks.
+ EXPECT_TRUE(node0.CanShutdownCleanly(true));
+ EXPECT_TRUE(node1.CanShutdownCleanly(true));
+
+ // Expect D to have received the message sent on A.
+ ScopedMessage message;
+ ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(0, strcmp("hey", ToString(message)));
+
+ EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node1.ClosePort(D));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
+TEST_F(PortsTest, MergePortWithClosedPeer1) {
+ // This tests that the right thing happens when initiating a merge on a port
+ // whose peer has already been closed.
+
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ node0_delegate.set_read_messages(false);
+ node1_delegate.set_save_messages(true);
+
+ // Write a message on A.
+ EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
+
+ PumpTasks();
+
+ // Close A.
+ EXPECT_EQ(OK, node0.ClosePort(A));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ PumpTasks();
+
+ // Expect only one receiving port to be left after pumping tasks.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(true));
+
+ // Expect D to have received the message sent on A.
+ ScopedMessage message;
+ ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(0, strcmp("hey", ToString(message)));
+
+ EXPECT_EQ(OK, node1.ClosePort(D));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
+TEST_F(PortsTest, MergePortWithClosedPeer2) {
+ // This tests that the right thing happens when merging into a port whose peer
+ // has already been closed.
+
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ node0_delegate.set_save_messages(true);
+ node1_delegate.set_read_messages(false);
+
+ // Write a message on D.
+ EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
+
+ PumpTasks();
+
+ // Close D.
+ EXPECT_EQ(OK, node1.ClosePort(D));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ PumpTasks();
+
+ // Expect only one receiving port to be left after pumping tasks.
+ EXPECT_TRUE(node0.CanShutdownCleanly(true));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+
+ // Expect A to have received the message sent on D.
+ ScopedMessage message;
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(0, strcmp("hey", ToString(message)));
+
+ EXPECT_EQ(OK, node0.ClosePort(A));
+
+ // No more ports should be open.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
+TEST_F(PortsTest, MergePortsWithClosedPeers) {
+ // This tests that no residual ports are left behind if two ports are merged
+ // when both of their peers have been closed.
+
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ node0_delegate.set_save_messages(true);
+ node1_delegate.set_read_messages(false);
+
+ // Close A and D.
+ EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node1.ClosePort(D));
+
+ PumpTasks();
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ PumpTasks();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
+TEST_F(PortsTest, MergePortsWithMovedPeers) {
+ // This tests that no ports can be merged successfully even if their peers
+ // are moved around.
+
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ node0_delegate.set_save_messages(true);
+ node1_delegate.set_read_messages(false);
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ // Set up another pair X-Y for moving ports on node0.
+ PortRef X, Y;
+ EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
+
+ ScopedMessage message;
+
+ // Move A to new port E.
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node0.ClosePort(X));
+ EXPECT_EQ(OK, node0.ClosePort(Y));
+
+ node0_delegate.set_read_messages(false);
+
+ // Write messages on E and D.
+ EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
+ EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ node0_delegate.set_read_messages(true);
+ node1_delegate.set_read_messages(true);
+ node1_delegate.set_save_messages(true);
+
+ PumpTasks();
+
+ // Expect to receive D's message on E and E's message on D.
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(0, strcmp("hi", ToString(message)));
+ ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(0, strcmp("hey", ToString(message)));
+
+ // Close E and D.
+ EXPECT_EQ(OK, node0.ClosePort(E));
+ EXPECT_EQ(OK, node1.ClosePort(D));
+
+ PumpTasks();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
+
+TEST_F(PortsTest, MergePortsFailsGracefully) {
+ // This tests that the system remains in a well-defined state if something
+ // goes wrong during port merge.
+
+ NodeName node0_name(0, 1);
+ TestNodeDelegate node0_delegate(node0_name);
+ Node node0(node0_name, &node0_delegate);
+ node_map[0] = &node0;
+
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Setup two independent port pairs, A-B on node0 and C-D on node1.
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+
+ PumpTasks();
+
+ // Initiate a merge between B and C.
+ EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+
+ // Move C to a new port E. This is dumb and nobody should do it, but it's
+ // possible. MergePorts will fail as a result because C won't be in a
+ // receiving state when the event arrives at node1, so B should be closed.
+ ScopedMessage message;
+ PortRef X, Y;
+ EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
+ node1_delegate.set_save_messages(true);
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
+ ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ ASSERT_EQ(1u, message->num_ports());
+ PortRef E;
+ ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
+ EXPECT_EQ(OK, node1.ClosePort(X));
+ EXPECT_EQ(OK, node1.ClosePort(Y));
+
+ // C goes away as a result of normal proxy removal.
+ PumpTasks();
+
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
+
+ // B should have been closed cleanly.
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
+
+ // Close A, D, and E.
+ EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node1.ClosePort(D));
+ EXPECT_EQ(OK, node1.ClosePort(E));
+
+ PumpTasks();
+
+ // Expect everything to have gone away.
+ EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
+}
+
} // namespace test
} // namespace ports
} // namespace edk
diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.cc b/mojo/edk/system/remote_message_pipe_bootstrap.cc
index 671542f3..2342166 100644
--- a/mojo/edk/system/remote_message_pipe_bootstrap.cc
+++ b/mojo/edk/system/remote_message_pipe_bootstrap.cc
@@ -31,18 +31,17 @@ struct BootstrapData {
void RemoteMessagePipeBootstrap::Create(
NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port,
- const base::Closure& callback) {
+ const ports::PortRef& port) {
if (node_controller->io_task_runner()->RunsTasksOnCurrentThread()) {
// Owns itself.
- new RemoteMessagePipeBootstrap(node_controller, std::move(platform_handle),
- port, callback);
+ new RemoteMessagePipeBootstrap(
+ node_controller, std::move(platform_handle), port);
} else {
node_controller->io_task_runner()->PostTask(
FROM_HERE,
base::Bind(&RemoteMessagePipeBootstrap::Create,
base::Unretained(node_controller),
- base::Passed(&platform_handle), port, callback));
+ base::Passed(&platform_handle), port));
}
}
@@ -56,11 +55,9 @@ RemoteMessagePipeBootstrap::~RemoteMessagePipeBootstrap() {
RemoteMessagePipeBootstrap::RemoteMessagePipeBootstrap(
NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port,
- const base::Closure& callback)
+ const ports::PortRef& port)
: node_controller_(node_controller),
local_port_(port),
- callback_(callback),
io_task_runner_(base::ThreadTaskRunnerHandle::Get()),
channel_(Channel::Create(this, std::move(platform_handle),
io_task_runner_)) {
@@ -119,8 +116,14 @@ void RemoteMessagePipeBootstrap::OnChannelMessage(
}
peer_info_received_ = true;
- node_controller_->ConnectToRemotePort(
- local_port_, data->node_name, data->port_name, callback_);
+
+ // We need to choose one side to initiate the port merge. It doesn't matter
+ // who does it as long as they don't both try. Simple solution: pick the one
+ // with the "smaller" port name.
+ if (local_port_.name() < data->port_name) {
+ node_controller_->node()->MergePorts(local_port_, data->node_name,
+ data->port_name);
+ }
// Send another ping to the other end to trigger shutdown. This may race with
// the other end sending its own ping, but it doesn't matter. Whoever wins
diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.h b/mojo/edk/system/remote_message_pipe_bootstrap.h
index 911d518..2a24680 100644
--- a/mojo/edk/system/remote_message_pipe_bootstrap.h
+++ b/mojo/edk/system/remote_message_pipe_bootstrap.h
@@ -7,7 +7,6 @@
#include <string>
-#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
@@ -28,17 +27,20 @@ class NodeController;
//
// The bootstrapping procedure the same on either end:
//
-// 1. Create a local port P.
+// 1. Select a local port P to be merged with a remote port.
// 2. Write the local node name and P's name to the bootstrap pipe.
// 3. When a message is read from the pipe:
// - If it's the first message read, extract the remote node+port name and
-// initialize the local port. Send an empty ACK message on the pipe.
+// and send an empty ACK message on the pipe.
// - If it's the second message read, close the channel, and delete |this|.
// 4. When an error occus on the pipe, delete |this|.
//
// Excluding irrecoverable error conditions such as either process dying,
// armageddon, etc., this ensures neither end closes the channel until both ends
-// have intiailized their corresponding local port.
+// are aware of each other's port-to-merge.
+//
+// At step 3, one side of the channel is chosen to issue a message at the Ports
+// layer which eventually merges the two ports.
class RemoteMessagePipeBootstrap
: public Channel::Delegate,
public base::MessageLoop::DestructionObserver {
@@ -48,22 +50,18 @@ class RemoteMessagePipeBootstrap
// |port| must be a reference to an uninitialized local port.
static void Create(NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port,
- const base::Closure& callback);
+ const ports::PortRef& port);
protected:
- explicit RemoteMessagePipeBootstrap(
- NodeController* node_controller,
- ScopedPlatformHandle platform_handle,
- const ports::PortRef& port,
- const base::Closure& callback);
+ explicit RemoteMessagePipeBootstrap(NodeController* node_controller,
+ ScopedPlatformHandle platform_handle,
+ const ports::PortRef& port);
void ShutDown();
bool shutting_down_ = false;
NodeController* node_controller_;
const ports::PortRef local_port_;
- base::Closure callback_;
scoped_refptr<base::TaskRunner> io_task_runner_;
scoped_refptr<Channel> channel_;
diff --git a/mojo/shell/runner/child/runner_connection.cc b/mojo/shell/runner/child/runner_connection.cc
index 75b4648..313c00a 100644
--- a/mojo/shell/runner/child/runner_connection.cc
+++ b/mojo/shell/runner/child/runner_connection.cc
@@ -74,13 +74,6 @@ class Blocker {
using GotApplicationRequestCallback =
base::Callback<void(InterfaceRequest<mojom::ShellClient>)>;
-void OnCreateMessagePipe(ScopedMessagePipeHandle* result,
- Blocker::Unblocker unblocker,
- ScopedMessagePipeHandle pipe) {
- *result = std::move(pipe);
- unblocker.Unblock(base::Bind(&base::DoNothing));
-}
-
void OnGotApplicationRequest(InterfaceRequest<mojom::ShellClient>* out_request,
InterfaceRequest<mojom::ShellClient> request) {
*out_request = std::move(request);
@@ -229,14 +222,11 @@ bool RunnerConnectionImpl::WaitForApplicationRequest(
std::string primordial_pipe_token =
base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
switches::kPrimordialPipeToken);
- Blocker blocker;
- edk::CreateChildMessagePipe(
- primordial_pipe_token,
- base::Bind(&OnCreateMessagePipe, base::Unretained(&handle),
- blocker.GetUnblocker()));
- blocker.Block();
+ handle = edk::CreateChildMessagePipe(primordial_pipe_token);
}
+ DCHECK(handle.is_valid());
+
Blocker blocker;
controller_runner_->PostTask(
FROM_HERE,
diff --git a/mojo/shell/runner/host/child_process.cc b/mojo/shell/runner/host/child_process.cc
index 0208b84..75230f0 100644
--- a/mojo/shell/runner/host/child_process.cc
+++ b/mojo/shell/runner/host/child_process.cc
@@ -51,10 +51,6 @@ namespace shell {
namespace {
-void DidCreateChannel(embedder::ChannelInfo* channel_info) {
- DVLOG(2) << "ChildControllerImpl::DidCreateChannel()";
-}
-
// Blocker ---------------------------------------------------------------------
// Blocks a thread until another thread unblocks it, at which point it unblocks
@@ -300,34 +296,14 @@ scoped_ptr<mojo::shell::LinuxSandbox> InitializeSandbox() {
}
#endif
-void InitializeHostMessagePipe(
- embedder::ScopedPlatformHandle platform_channel,
- scoped_refptr<base::TaskRunner> io_task_runner,
- const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
- embedder::SetParentPipeHandle(std::move(platform_channel));
- std::string primordial_pipe_token =
- base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
- switches::kPrimordialPipeToken);
- edk::CreateChildMessagePipe(primordial_pipe_token, callback);
- } else {
- ScopedMessagePipeHandle host_message_pipe;
- host_message_pipe =
- embedder::CreateChannel(std::move(platform_channel),
- base::Bind(&DidCreateChannel), io_task_runner);
- callback.Run(std::move(host_message_pipe));
- }
-}
-
-void OnHostMessagePipeCreated(AppContext* app_context,
- base::NativeLibrary app_library,
- const Blocker::Unblocker& unblocker,
- ScopedMessagePipeHandle pipe) {
- app_context->controller_runner()->PostTask(
- FROM_HERE,
- base::Bind(&ChildControllerImpl::Init, base::Unretained(app_context),
- base::Unretained(app_library), base::Passed(&pipe),
- unblocker));
+ScopedMessagePipeHandle InitializeHostMessagePipe(
+ edk::ScopedPlatformHandle platform_channel,
+ scoped_refptr<base::TaskRunner> io_task_runner) {
+ edk::SetParentPipeHandle(std::move(platform_channel));
+ std::string primordial_pipe_token =
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kPrimordialPipeToken);
+ return edk::CreateChildMessagePipe(primordial_pipe_token);
}
} // namespace
@@ -360,8 +336,8 @@ int ChildProcessMain() {
sandbox = InitializeSandbox();
#endif
- embedder::ScopedPlatformHandle platform_channel =
- embedder::PlatformChannelPair::PassClientHandleFromParentProcess(
+ edk::ScopedPlatformHandle platform_channel =
+ edk::PlatformChannelPair::PassClientHandleFromParentProcess(
command_line);
CHECK(platform_channel.is_valid());
@@ -372,13 +348,12 @@ int ChildProcessMain() {
app_context.Init();
app_context.StartControllerThread();
+ ScopedMessagePipeHandle host_pipe = InitializeHostMessagePipe(
+ std::move(platform_channel), app_context.io_runner());
app_context.controller_runner()->PostTask(
FROM_HERE,
- base::Bind(
- &InitializeHostMessagePipe, base::Passed(&platform_channel),
- make_scoped_refptr(app_context.io_runner()),
- base::Bind(&OnHostMessagePipeCreated, base::Unretained(&app_context),
- base::Unretained(app_library), blocker.GetUnblocker())));
+ base::Bind(&ChildControllerImpl::Init, &app_context, app_library,
+ base::Passed(&host_pipe), blocker.GetUnblocker()));
// This will block, then run whatever the controller wants.
blocker.Block();
diff --git a/mojo/shell/runner/host/child_process_host.cc b/mojo/shell/runner/host/child_process_host.cc
index 9a5b439..2b3cfc9 100644
--- a/mojo/shell/runner/host/child_process_host.cc
+++ b/mojo/shell/runner/host/child_process_host.cc
@@ -35,55 +35,24 @@
namespace mojo {
namespace shell {
-ChildProcessHost::PipeHolder::PipeHolder() {}
-
-void ChildProcessHost::PipeHolder::Reject() {
- base::AutoLock lock(lock_);
- reject_pipe_ = true;
- pipe_.reset();
-}
-
-void ChildProcessHost::PipeHolder::SetPipe(ScopedMessagePipeHandle pipe) {
- base::AutoLock lock(lock_);
- DCHECK(!pipe_.is_valid());
- if (!reject_pipe_)
- pipe_ = std::move(pipe);
-}
-
-ScopedMessagePipeHandle ChildProcessHost::PipeHolder::PassPipe() {
- base::AutoLock lock(lock_);
- DCHECK(pipe_.is_valid());
- return std::move(pipe_);
-}
-
-ChildProcessHost::PipeHolder::~PipeHolder() {}
-
ChildProcessHost::ChildProcessHost(base::TaskRunner* launch_process_runner,
bool start_sandboxed,
const base::FilePath& app_path)
: launch_process_runner_(launch_process_runner),
start_sandboxed_(start_sandboxed),
app_path_(app_path),
- channel_info_(nullptr),
start_child_process_event_(false, false),
weak_factory_(this) {
- pipe_holder_ = new PipeHolder();
- if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
- node_channel_.reset(new edk::PlatformChannelPair);
- primordial_pipe_token_ = edk::GenerateRandomToken();
- } else {
- pipe_holder_->SetPipe(embedder::CreateChannel(
- platform_channel_pair_.PassServerHandle(),
- base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)),
- base::ThreadTaskRunnerHandle::Get()));
- OnMessagePipeCreated();
- }
+ node_channel_.reset(new edk::PlatformChannelPair);
+ primordial_pipe_token_ = edk::GenerateRandomToken();
+ controller_.Bind(
+ InterfacePtrInfo<mojom::ChildController>(
+ edk::CreateParentMessagePipe(primordial_pipe_token_), 0u));
}
ChildProcessHost::ChildProcessHost(ScopedHandle channel)
: launch_process_runner_(nullptr),
start_sandboxed_(false),
- channel_info_(nullptr),
start_child_process_event_(false, false),
weak_factory_(this) {
CHECK(channel.is_valid());
@@ -99,40 +68,11 @@ ChildProcessHost::~ChildProcessHost() {
void ChildProcessHost::Start(const ProcessReadyCallback& callback) {
DCHECK(!child_process_.IsValid());
- DCHECK(process_ready_callback_.is_null());
-
- process_ready_callback_ = callback;
- if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
- // With the new EDK, bootstrap message pipes are created asynchronously.
- // We recieve the bound pipe (if successful) on an arbitrary thread,
- // stash it in the thread-safe |pipe_holder_|, and then try to call
- // OnMessagePipeCreated() on the host's main thread.
- //
- // Because of the way the launcher process shuts down, it's possible for
- // the main thread's MessageLoop to stop running (but not yet be destroyed!)
- // while this boostrap is pending, resulting in OnMessagePipeCreated() never
- // being called.
- //
- // A typical child process (i.e. one using ShellConnection to bind the other
- // end of this pipe) may hang forever waiting for an Initialize() message
- // unless the pipe is closed. This in turn means that Join() could hang
- // waiting for the process to exit. Deadlock!
- //
- // |pipe_holder_| exists for this reason. If it's still holding onto the
- // pipe when Join() is called, the pipe will be closed.
- DCHECK(!primordial_pipe_token_.empty());
- edk::CreateParentMessagePipe(
- primordial_pipe_token_,
- base::Bind(&OnParentMessagePipeCreated, pipe_holder_,
- base::ThreadTaskRunnerHandle::Get(),
- base::Bind(&ChildProcessHost::OnMessagePipeCreated,
- weak_factory_.GetWeakPtr())));
- }
-
launch_process_runner_->PostTaskAndReply(
FROM_HERE,
base::Bind(&ChildProcessHost::DoLaunch, base::Unretained(this)),
- base::Bind(&ChildProcessHost::DidStart, weak_factory_.GetWeakPtr()));
+ base::Bind(&ChildProcessHost::DidStart,
+ weak_factory_.GetWeakPtr(), callback));
}
int ChildProcessHost::Join() {
@@ -142,10 +82,6 @@ int ChildProcessHost::Join() {
controller_ = mojom::ChildControllerPtr();
DCHECK(child_process_.IsValid());
- // Ensure the child pipe is closed even if it wasn't yet connected to the
- // controller.
- pipe_holder_->Reject();
-
int rv = -1;
LOG_IF(ERROR, !child_process_.WaitForExit(&rv))
<< "Failed to wait for child process";
@@ -172,11 +108,11 @@ void ChildProcessHost::ExitNow(int32_t exit_code) {
controller_->ExitNow(exit_code);
}
-void ChildProcessHost::DidStart() {
+void ChildProcessHost::DidStart(const ProcessReadyCallback& callback) {
DVLOG(2) << "ChildProcessHost::DidStart()";
if (child_process_.IsValid()) {
- MaybeNotifyProcessReady();
+ callback.Run(child_process_.Pid());
} else {
LOG(ERROR) << "Failed to start child process";
AppCompleted(MOJO_RESULT_UNKNOWN);
@@ -278,34 +214,5 @@ void ChildProcessHost::AppCompleted(int32_t result) {
}
}
-void ChildProcessHost::DidCreateChannel(embedder::ChannelInfo* channel_info) {
- DVLOG(2) << "AppChildProcessHost::DidCreateChannel()";
-
- DCHECK(channel_info ||
- base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk"));
- channel_info_ = channel_info;
-}
-
-void ChildProcessHost::OnMessagePipeCreated() {
- controller_.Bind(
- InterfacePtrInfo<mojom::ChildController>(pipe_holder_->PassPipe(), 0u));
- MaybeNotifyProcessReady();
-}
-
-void ChildProcessHost::MaybeNotifyProcessReady() {
- if (controller_.is_bound() && child_process_.IsValid())
- process_ready_callback_.Run(child_process_.Pid());
-}
-
-// static
-void ChildProcessHost::OnParentMessagePipeCreated(
- scoped_refptr<PipeHolder> holder,
- scoped_refptr<base::TaskRunner> callback_task_runner,
- const base::Closure& callback,
- ScopedMessagePipeHandle pipe) {
- holder->SetPipe(std::move(pipe));
- callback_task_runner->PostTask(FROM_HERE, callback);
-}
-
} // namespace shell
} // namespace mojo
diff --git a/mojo/shell/runner/host/child_process_host.h b/mojo/shell/runner/host/child_process_host.h
index 906a1a2..2664d30 100644
--- a/mojo/shell/runner/host/child_process_host.h
+++ b/mojo/shell/runner/host/child_process_host.h
@@ -71,70 +71,24 @@ class ChildProcessHost {
void ExitNow(int32_t exit_code);
protected:
- void DidStart();
+ void DidStart(const ProcessReadyCallback& callback);
private:
- // A thread-safe holder for the bootstrap message pipe to this child process.
- // The pipe is established on an arbitrary thread and may not be connected
- // until the host's message loop has stopped running.
- class PipeHolder : public base::RefCountedThreadSafe<PipeHolder> {
- public:
- PipeHolder();
-
- void Reject();
- void SetPipe(ScopedMessagePipeHandle pipe);
- ScopedMessagePipeHandle PassPipe();
-
- private:
- friend class base::RefCountedThreadSafe<PipeHolder>;
-
- ~PipeHolder();
-
- base::Lock lock_;
- bool reject_pipe_ = false;
- ScopedMessagePipeHandle pipe_;
-
- DISALLOW_COPY_AND_ASSIGN(PipeHolder);
- };
-
void DoLaunch();
void AppCompleted(int32_t result);
- // Callback for |embedder::CreateChannel()|.
- void DidCreateChannel(embedder::ChannelInfo* channel_info);
-
- // Called once |pipe_holder_| is bound to a pipe.
- void OnMessagePipeCreated();
-
- // Called when the child process is launched and when the bootstrap
- // message pipe is created. Once both things have happened (which may happen
- // in either order), |process_ready_callback_| is invoked.
- void MaybeNotifyProcessReady();
-
- // Callback used to receive the child message pipe from the ports EDK.
- // This may be called on any thread. It will always stash the pipe in
- // |holder|, and it will then attempt to call |callback| on
- // |callback_task_runner| (which may or may not still be running tasks.)
- static void OnParentMessagePipeCreated(
- scoped_refptr<PipeHolder> holder,
- scoped_refptr<base::TaskRunner> callback_task_runner,
- const base::Closure& callback,
- ScopedMessagePipeHandle pipe);
-
scoped_refptr<base::TaskRunner> launch_process_runner_;
bool start_sandboxed_;
const base::FilePath app_path_;
base::Process child_process_;
// Used for the ChildController binding.
- embedder::PlatformChannelPair platform_channel_pair_;
+ edk::PlatformChannelPair platform_channel_pair_;
mojom::ChildControllerPtr controller_;
- embedder::ChannelInfo* channel_info_;
mojom::ChildController::StartAppCallback on_app_complete_;
- embedder::HandlePassingInformation handle_passing_info_;
+ edk::HandlePassingInformation handle_passing_info_;
- // Used only when --use-new-edk is specified. Used to back the NodeChannel
- // between the parent and child node.
+ // Used to back the NodeChannel between the parent and child node.
scoped_ptr<edk::PlatformChannelPair> node_channel_;
// Since Start() calls a method on another thread, we use an event to block
@@ -144,14 +98,6 @@ class ChildProcessHost {
// A token the child can use to connect a primordial pipe to the host.
std::string primordial_pipe_token_;
- // Holds the message pipe to the child process until it is either closed or
- // bound to the controller interface.
- scoped_refptr<PipeHolder> pipe_holder_;
-
- // Invoked exactly once, as soon as the child process's ID is known and
- // a pipe to the child has been established.
- ProcessReadyCallback process_ready_callback_;
-
base::WeakPtrFactory<ChildProcessHost> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ChildProcessHost);