summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorblundell <blundell@chromium.org>2016-02-09 02:26:54 -0800
committerCommit bot <commit-bot@chromium.org>2016-02-09 10:27:56 +0000
commitc8588be061339ced9a34d3d7e3319daf7eee241d (patch)
treec51d3e41c3d4029ba97d47588867e1e9a2f085ae
parent1f071893d22e2260b5ebc6c97a02626a9ca945fe (diff)
downloadchromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.zip
chromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.tar.gz
chromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.tar.bz2
Revert of [mojo-edk] Simplify multiprocess pipe bootstrap (patchset #7 id:140001 of https://codereview.chromium.org/1675603002/ )
Reason for revert: This patch broke several browsertests on Linux: PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning, PlatformAppBrowserTest.LoadAndLaunchAppWithFile, PolicyMakeDefaultBrowserTest.MakeDefaultDisabled (e.g., https://build.chromium.org/p/chromium.linux/buildstatus?builder=Linux%20Tests%20%28dbg%29%281%29%2832%29&number=25685). I could repro the failures locally, and reverting this patch fixed them. Example failure: PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning (run #1): [ RUN ] PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning <snip> [26865:26901:0209/004101:FATAL:thread.cc(270)] Check failed: GetThreadWasQuitProperly(). #0 0x0000f7142d94 base::debug::StackTrace::StackTrace() #1 0x0000f71a998f logging::LogMessage::~LogMessage() #2 0x0000f72cc94f base::Thread::ThreadMain() #3 0x0000f72b487a base::(anonymous namespace)::ThreadFunc() #4 0x0000e749dd4c start_thread #5 0x0000e6a7cb8e clone Original issue's description: > [mojo-edk] Simplify multiprocess pipe bootstrap > > This introduces a new MergePort message at the Ports layer > for joining two independent port cycles which each have > an unused (i.e. unwritten, unread, unsent) receiving port. > > MergePort allows us to create a MessagePipeDispatcher which > is immediately usable but which will eventually be linked to > a MessagePipeDispatcher on another port cycle, potentially in > another process. > > The basic idea is to create a fully functional port pair but > only bind one port to an MPD. Do this on each end and > merge the dangling ports asynchronously. > > The simplification here allows a lot of code to be deleted > from NodeController, some of which is deleted in this CL. > > Future work will convert existing bootstrap sites back to > using synchronous bootstrap, including the token-based APIs. > > BUG=584764 > TBR=ben@chromium.org for null check in mash shell > > Committed: https://crrev.com/b3ea203171e07f5c7e476e94d210ec4ad53ce5b0 > Cr-Commit-Position: refs/heads/master@{#374322} TBR=amistry@chromium.org,darin@chormium.org,rockot@chromium.org # Skipping CQ checks because original CL landed less than 1 days ago. NOPRESUBMIT=true NOTREECHECKS=true NOTRY=true BUG=584764 Review URL: https://codereview.chromium.org/1678333003 Cr-Commit-Position: refs/heads/master@{#374346}
-rw-r--r--mash/shell/shell_application_delegate.cc8
-rw-r--r--mojo/edk/embedder/embedder.cc20
-rw-r--r--mojo/edk/embedder/embedder.h34
-rw-r--r--mojo/edk/system/core.cc71
-rw-r--r--mojo/edk/system/core.h25
-rw-r--r--mojo/edk/system/multiprocess_message_pipe_unittest.cc39
-rw-r--r--mojo/edk/system/node_channel.cc47
-rw-r--r--mojo/edk/system/node_channel.h17
-rw-r--r--mojo/edk/system/node_controller.cc204
-rw-r--r--mojo/edk/system/node_controller.h104
-rw-r--r--mojo/edk/system/ports/event.h6
-rw-r--r--mojo/edk/system/ports/message.cc3
-rw-r--r--mojo/edk/system/ports/name.h7
-rw-r--r--mojo/edk/system/ports/node.cc179
-rw-r--r--mojo/edk/system/ports/node.h17
-rw-r--r--mojo/edk/system/ports/ports_unittest.cc338
-rw-r--r--mojo/edk/system/remote_message_pipe_bootstrap.cc23
-rw-r--r--mojo/edk/system/remote_message_pipe_bootstrap.h22
-rw-r--r--mojo/shell/runner/child/runner_connection.cc16
-rw-r--r--mojo/shell/runner/host/child_process.cc53
-rw-r--r--mojo/shell/runner/host/child_process_host.cc111
-rw-r--r--mojo/shell/runner/host/child_process_host.h62
22 files changed, 661 insertions, 745 deletions
diff --git a/mash/shell/shell_application_delegate.cc b/mash/shell/shell_application_delegate.cc
index a88c6d6..032ad13 100644
--- a/mash/shell/shell_application_delegate.cc
+++ b/mash/shell/shell_application_delegate.cc
@@ -117,12 +117,8 @@ void ShellApplicationDelegate::StartRestartableService(
// TODO(beng): This would be the place to insert logic that counted restarts
// to avoid infinite crash-restart loops.
scoped_ptr<mojo::Connection> connection = shell_->Connect(url);
- // Note: |connection| may be null if we've lost our connection to the shell.
- if (connection) {
- connection->SetRemoteServiceProviderConnectionErrorHandler(
- restart_callback);
- connections_[url] = std::move(connection);
- }
+ connection->SetRemoteServiceProviderConnectionErrorHandler(restart_callback);
+ connections_[url] = std::move(connection);
}
} // namespace shell
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index 926ee18..d1ab6c6 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -107,37 +107,29 @@ void ShutdownIPCSupport() {
ScopedMessagePipeHandle CreateMessagePipe(
ScopedPlatformHandle platform_handle) {
- DCHECK(internal::g_core);
- return internal::g_core->CreateMessagePipe(std::move(platform_handle));
+ NOTREACHED();
+ return ScopedMessagePipeHandle();
}
void CreateMessagePipe(
ScopedPlatformHandle platform_handle,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
DCHECK(internal::g_core);
- callback.Run(CreateMessagePipe(std::move(platform_handle)));
-}
-
-ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token) {
- DCHECK(internal::g_core);
- return internal::g_core->CreateParentMessagePipe(token);
+ internal::g_core->CreateMessagePipe(std::move(platform_handle), callback);
}
void CreateParentMessagePipe(
const std::string& token,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- callback.Run(CreateParentMessagePipe(token));
-}
-
-ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token) {
DCHECK(internal::g_core);
- return internal::g_core->CreateChildMessagePipe(token);
+ internal::g_core->CreateParentMessagePipe(token, callback);
}
void CreateChildMessagePipe(
const std::string& token,
const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
- callback.Run(CreateChildMessagePipe(token));
+ DCHECK(internal::g_core);
+ internal::g_core->CreateChildMessagePipe(token, callback);
}
std::string GenerateRandomToken() {
diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h
index 49b64c4..b43748c 100644
--- a/mojo/edk/embedder/embedder.h
+++ b/mojo/edk/embedder/embedder.h
@@ -118,12 +118,7 @@ MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupportOnIOThread();
// |OnShutdownComplete()|.
MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupport();
-// Creates a message pipe over an arbitrary platform channel. The other end of
-// the channel must also be passed to this function. Either endpoint can be in
-// any process.
-//
-// Note that the channel is only used to negotiate pipe connection, not as the
-// transport for messages on the pipe.
+// Unused. Crashes. Only here for linking.
MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
CreateMessagePipe(ScopedPlatformHandle platform_handle);
@@ -133,10 +128,13 @@ CreateMessagePipe(ScopedPlatformHandle platform_handle);
// either PreInitializeChildProcess() or SetParentPipe() must have been been
// called at least once already.
//
-// |callback| must be safe to call from any thread.
+// Note: This only exists for backwards compatibility with embedders that rely
+// on mojo::embedder::CreateChannel() behavior. If you have a means of passing
+// platform handles around, you can probably also pass strings around. If you
+// can pass strings around, use CreateParentMessagePipe() and
+// CreateChlidMessagePipe() instead (see below.)
//
-// DEPRECATED: Please don't use this. Use the synchronous version above. This
-// is now merely an inconvenient wrapper for that.
+// |callback| must be safe to call from any thread.
MOJO_SYSTEM_IMPL_EXPORT void
CreateMessagePipe(
ScopedPlatformHandle platform_handle,
@@ -145,17 +143,8 @@ CreateMessagePipe(
// Creates a message pipe from a token. A child embedder must also have this
// token and call CreateChildMessagePipe() with it in order for the pipe to get
// connected.
-MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
-CreateParentMessagePipe(const std::string& token);
-
-// Creates a message pipe from a token. A child embedder must also have this
-// token and call CreateChildMessagePipe() with it in order for the pipe to get
-// connected.
//
// |callback| must be safe to call from any thread.
-//
-// DEPRECATED: Please don't use this. Use the synchronous version above. This
-// is now merely an inconvenient wrapper for that.
MOJO_SYSTEM_IMPL_EXPORT void
CreateParentMessagePipe(
const std::string& token,
@@ -164,17 +153,8 @@ CreateParentMessagePipe(
// Creates a message pipe from a token in a child process. The parent must also
// have this token and call CreateParentMessagePipe() with it in order for the
// pipe to get connected.
-MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
-CreateChildMessagePipe(const std::string& token);
-
-// Creates a message pipe from a token in a child process. The parent must also
-// have this token and call CreateParentMessagePipe() with it in order for the
-// pipe to get connected.
//
// |callback| must be safe to call from any thread.
-//
-// DEPRECATED: Please don't use this. Use the synchronous version above. This
-// is now merely an inconvenient wrapper for that.
MOJO_SYSTEM_IMPL_EXPORT void
CreateChildMessagePipe(
const std::string& token,
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index d0b7b67..aad9541 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -44,9 +44,19 @@ namespace {
// This is an unnecessarily large limit that is relatively easy to enforce.
const uint32_t kMaxHandlesPerMessage = 1024 * 1024;
-// TODO: Maybe we could negotiate a debugging pipe ID for cross-process pipes
-// too; for now we just use a constant. This only affects bootstrap pipes.
-const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL;
+void OnPortConnected(
+ Core* core,
+ int endpoint,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback,
+ const ports::PortRef& port) {
+ // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too;
+ // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for
+ // bootstrap and aren't passed around, so tracking them is less important.
+ MojoHandle handle = core->AddDispatcher(
+ new MessagePipeDispatcher(core->GetNodeController(), port,
+ 0x7f7f7f7f7f7f7f7fUL, endpoint));
+ callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle)));
+}
} // namespace
@@ -150,37 +160,32 @@ void Core::RequestShutdown(const base::Closure& callback) {
GetNodeController()->RequestShutdown(on_shutdown);
}
-ScopedMessagePipeHandle Core::CreateMessagePipe(
- ScopedPlatformHandle platform_handle) {
- ports::PortRef port0, port1;
- GetNodeController()->node()->CreatePortPair(&port0, &port1);
- MojoHandle handle = AddDispatcher(
- new MessagePipeDispatcher(GetNodeController(), port0,
- kUnknownPipeIdForDebug, 0));
+void Core::CreateMessagePipe(
+ ScopedPlatformHandle platform_handle,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
+ ports::PortRef port;
+ GetNodeController()->node()->CreateUninitializedPort(&port);
RemoteMessagePipeBootstrap::Create(
- GetNodeController(), std::move(platform_handle), port1);
- return ScopedMessagePipeHandle(MessagePipeHandle(handle));
-}
-
-ScopedMessagePipeHandle Core::CreateParentMessagePipe(
- const std::string& token) {
- ports::PortRef port0, port1;
- GetNodeController()->node()->CreatePortPair(&port0, &port1);
- MojoHandle handle = AddDispatcher(
- new MessagePipeDispatcher(GetNodeController(), port0,
- kUnknownPipeIdForDebug, 0));
- GetNodeController()->ReservePort(token, port1);
- return ScopedMessagePipeHandle(MessagePipeHandle(handle));
-}
-
-ScopedMessagePipeHandle Core::CreateChildMessagePipe(const std::string& token) {
- ports::PortRef port0, port1;
- GetNodeController()->node()->CreatePortPair(&port0, &port1);
- MojoHandle handle = AddDispatcher(
- new MessagePipeDispatcher(GetNodeController(), port0,
- kUnknownPipeIdForDebug, 1));
- GetNodeController()->MergePortIntoParent(token, port1);
- return ScopedMessagePipeHandle(MessagePipeHandle(handle));
+ GetNodeController(), std::move(platform_handle), port,
+ base::Bind(&OnPortConnected, base::Unretained(this), 0, callback, port));
+}
+
+void Core::CreateParentMessagePipe(
+ const std::string& token,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
+ GetNodeController()->ReservePort(
+ token,
+ base::Bind(&OnPortConnected, base::Unretained(this), 0, callback));
+}
+
+void Core::CreateChildMessagePipe(
+ const std::string& token,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
+ ports::PortRef port;
+ GetNodeController()->node()->CreateUninitializedPort(&port);
+ GetNodeController()->ConnectToParentPort(
+ port, token,
+ base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port));
}
MojoResult Core::AsyncWait(MojoHandle handle,
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index 603cba9..8e9af8e 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -52,19 +52,26 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
// Called in a child process exactly once during early initialization.
void InitChild(ScopedPlatformHandle platform_handle);
- // Creates a message pipe endpoint connected to an endpoint in a remote
+ // This creates a message pipe endpoint connected to an endpoint in a remote
// embedder. |platform_handle| is used as a channel to negotiate the
- // connection.
- ScopedMessagePipeHandle CreateMessagePipe(
- ScopedPlatformHandle platform_handle);
+ // connection. This is only here to facilitate legacy embedder code. See
+ // mojo::edk::CreateMessagePipe in mojo/edk/embedder/embedder.h.
+ void CreateMessagePipe(
+ ScopedPlatformHandle platform_handle,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback);
// Creates a message pipe endpoint associated with |token|, which a child
// holding the token can later locate and connect to.
- ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token);
-
- // Creates a message pipe endpoint and connects it to a pipe the parent has
- // associated with |token|.
- ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token);
+ void CreateParentMessagePipe(
+ const std::string& token,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback);
+
+ // Creates a message pipe endpoint associated with |token|, which will be
+ // passed to the parent in order to find an associated remote port and connect
+ // to it.
+ void CreateChildMessagePipe(
+ const std::string& token,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback);
MojoHandle AddDispatcher(scoped_refptr<Dispatcher> dispatcher);
diff --git a/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
index ece85a1..00c2ab5 100644
--- a/mojo/edk/system/multiprocess_message_pipe_unittest.cc
+++ b/mojo/edk/system/multiprocess_message_pipe_unittest.cc
@@ -19,7 +19,6 @@
#include "base/logging.h"
#include "base/strings/string_split.h"
#include "build/build_config.h"
-#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/handle_signals_state.h"
#include "mojo/edk/system/test_utils.h"
@@ -1272,44 +1271,6 @@ TEST_F(MultiprocessMessagePipeTest, WriteCloseSendPeer) {
END_CHILD()
}
-DEFINE_TEST_CLIENT_TEST_WITH_PIPE(BootstrapMessagePipeAsyncClient,
- MultiprocessMessagePipeTest, parent) {
- // Receive one end of a platform channel from the parent.
- MojoHandle channel_handle;
- EXPECT_EQ("hi", ReadMessageWithHandles(parent, &channel_handle, 1));
- ScopedPlatformHandle channel;
- EXPECT_EQ(MOJO_RESULT_OK,
- edk::PassWrappedPlatformHandle(channel_handle, &channel));
- ASSERT_TRUE(channel.is_valid());
-
- // Create a new pipe using our end of the channel.
- ScopedMessagePipeHandle pipe = edk::CreateMessagePipe(std::move(channel));
-
- // Ensure that we can read and write on the new pipe.
- VerifyEcho(pipe.get().value(), "goodbye");
-}
-
-TEST_F(MultiprocessMessagePipeTest, BootstrapMessagePipeAsync) {
- // Tests that new cross-process message pipes can be created synchronously
- // using asynchronous negotiation over an arbitrary platform channel.
- RUN_CHILD_ON_PIPE(BootstrapMessagePipeAsyncClient, child)
- // Pass one end of a platform channel to the child.
- PlatformChannelPair platform_channel;
- MojoHandle client_channel_handle;
- EXPECT_EQ(MOJO_RESULT_OK,
- CreatePlatformHandleWrapper(platform_channel.PassClientHandle(),
- &client_channel_handle));
- WriteMessageWithHandles(child, "hi", &client_channel_handle, 1);
-
- // Create a new pipe using our end of the channel.
- ScopedMessagePipeHandle pipe =
- edk::CreateMessagePipe(platform_channel.PassServerHandle());
-
- // Ensure that we can read and write on the new pipe.
- VerifyEcho(pipe.get().value(), "goodbye");
- END_CHILD()
-}
-
} // namespace
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc
index 0277d46..4fb0ac4 100644
--- a/mojo/edk/system/node_channel.cc
+++ b/mojo/edk/system/node_channel.cc
@@ -29,7 +29,8 @@ enum class MessageType : uint32_t {
BROKER_CLIENT_ADDED,
ACCEPT_BROKER_CLIENT,
PORTS_MESSAGE,
- REQUEST_PORT_MERGE,
+ REQUEST_PORT_CONNECTION,
+ CONNECT_TO_PORT,
REQUEST_INTRODUCTION,
INTRODUCE,
#if defined(OS_WIN)
@@ -73,10 +74,15 @@ struct AcceptBrokerClientData {
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location.
-struct RequestPortMergeData {
+struct RequestPortConnectionData {
ports::PortName connector_port_name;
};
+struct ConnectToPortData {
+ ports::PortName connector_port_name;
+ ports::PortName connectee_port_name;
+};
+
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
@@ -261,17 +267,28 @@ void NodeChannel::PortsMessage(Channel::MessagePtr message) {
WriteChannelMessage(std::move(message));
}
-void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
- const std::string& token) {
- RequestPortMergeData* data;
+void NodeChannel::RequestPortConnection(
+ const ports::PortName& connector_port_name,
+ const std::string& token) {
+ RequestPortConnectionData* data;
Channel::MessagePtr message = CreateMessage(
- MessageType::REQUEST_PORT_MERGE,
- sizeof(RequestPortMergeData) + token.size(), 0, &data);
+ MessageType::REQUEST_PORT_CONNECTION,
+ sizeof(RequestPortConnectionData) + token.size(), 0, &data);
data->connector_port_name = connector_port_name;
memcpy(data + 1, token.data(), token.size());
WriteChannelMessage(std::move(message));
}
+void NodeChannel::ConnectToPort(const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name) {
+ ConnectToPortData* data;
+ Channel::MessagePtr message = CreateMessage(
+ MessageType::CONNECT_TO_PORT, sizeof(ConnectToPortData), 0, &data);
+ data->connector_port_name = connector_port_name;
+ data->connectee_port_name = connectee_port_name;
+ WriteChannelMessage(std::move(message));
+}
+
void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
IntroductionData* data;
Channel::MessagePtr message = CreateMessage(
@@ -435,16 +452,24 @@ void NodeChannel::OnChannelMessage(const void* payload,
break;
}
- case MessageType::REQUEST_PORT_MERGE: {
- const RequestPortMergeData* data;
+ case MessageType::REQUEST_PORT_CONNECTION: {
+ const RequestPortConnectionData* data;
GetMessagePayload(payload, &data);
const char* token_data = reinterpret_cast<const char*>(data + 1);
const size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
std::string token(token_data, token_size);
- delegate_->OnRequestPortMerge(remote_node_name_,
- data->connector_port_name, token);
+ delegate_->OnRequestPortConnection(remote_node_name_,
+ data->connector_port_name, token);
+ break;
+ }
+
+ case MessageType::CONNECT_TO_PORT: {
+ const ConnectToPortData* data;
+ GetMessagePayload(payload, &data);
+ delegate_->OnConnectToPort(remote_node_name_, data->connector_port_name,
+ data->connectee_port_name);
break;
}
diff --git a/mojo/edk/system/node_channel.h b/mojo/edk/system/node_channel.h
index 2bc86bd..a9bc520 100644
--- a/mojo/edk/system/node_channel.h
+++ b/mojo/edk/system/node_channel.h
@@ -44,9 +44,14 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) = 0;
virtual void OnPortsMessage(Channel::MessagePtr message) = 0;
- virtual void OnRequestPortMerge(const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const std::string& token) = 0;
+ virtual void OnRequestPortConnection(
+ const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const std::string& token) = 0;
+ virtual void OnConnectToPort(
+ const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name) = 0;
virtual void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) = 0;
virtual void OnIntroduce(const ports::NodeName& from_node,
@@ -98,8 +103,10 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
void AcceptBrokerClient(const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel);
void PortsMessage(Channel::MessagePtr message);
- void RequestPortMerge(const ports::PortName& connector_port_name,
- const std::string& token);
+ void RequestPortConnection(const ports::PortName& connector_port_name,
+ const std::string& token);
+ void ConnectToPort(const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name);
void RequestIntroduction(const ports::NodeName& name);
void Introduce(const ports::NodeName& name,
ScopedPlatformHandle channel_handle);
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 38c53df..6410cae 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -99,6 +99,18 @@ class ThreadDestructionObserver :
} // namespace
+NodeController::PendingPortRequest::PendingPortRequest() {}
+
+NodeController::PendingPortRequest::~PendingPortRequest() {}
+
+NodeController::ReservedPort::ReservedPort() {}
+
+NodeController::ReservedPort::~ReservedPort() {}
+
+NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
+
+NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
+
NodeController::~NodeController() {}
NodeController::NodeController(Core* core)
@@ -170,25 +182,18 @@ int NodeController::SendMessage(const ports::PortRef& port,
}
void NodeController::ReservePort(const std::string& token,
- const ports::PortRef& port) {
+ const ReservePortCallback& callback) {
+ ports::PortRef port;
+ node_->CreateUninitializedPort(&port);
+
DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
<< token;
base::AutoLock lock(reserved_ports_lock_);
- auto result = reserved_ports_.insert(std::make_pair(token, port));
- DCHECK(result.second);
-}
-
-void NodeController::MergePortIntoParent(const std::string& token,
- const ports::PortRef& port) {
- scoped_refptr<NodeChannel> parent = GetParentChannel();
- if (parent) {
- parent->RequestPortMerge(port.name(), token);
- return;
- }
-
- base::AutoLock lock(pending_port_merges_lock_);
- pending_port_merges_.push_back(std::make_pair(token, port));
+ ReservedPort reservation;
+ reservation.local_port = port;
+ reservation.callback = callback;
+ reserved_ports_.insert(std::make_pair(token, reservation));
}
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
@@ -205,6 +210,42 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
return buffer;
}
+void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback) {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
+ base::Unretained(this), local_port, token, callback));
+}
+
+void NodeController::ConnectToRemotePort(
+ const ports::PortRef& local_port,
+ const ports::NodeName& remote_node_name,
+ const ports::PortName& remote_port_name,
+ const base::Closure& callback) {
+ if (remote_node_name == name_) {
+ // It's possible that two different code paths on the node are trying to
+ // bootstrap ports to each other (e.g. in Chrome single-process mode)
+ // without being aware of the fact. In this case we can initialize the port
+ // immediately (which can fail silently if it's already been initialized by
+ // the request on the other side), and invoke |callback|.
+ node_->InitializePort(local_port, name_, remote_port_name);
+ callback.Run();
+ return;
+ }
+
+ PendingRemotePortConnection connection;
+ connection.local_port = local_port;
+ connection.remote_node_name = remote_node_name;
+ connection.remote_port_name = remote_port_name;
+ connection.callback = callback;
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
+ base::Unretained(this), connection));
+}
+
void NodeController::RequestShutdown(const base::Closure& callback) {
{
base::AutoLock lock(shutdown_lock_);
@@ -263,6 +304,46 @@ void NodeController::ConnectToParentOnIOThread(
bootstrap_parent_channel_->Start();
}
+void NodeController::RequestParentPortConnectionOnIOThread(
+ const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (!parent) {
+ PendingPortRequest request;
+ request.token = token;
+ request.local_port = local_port;
+ request.callback = callback;
+ pending_port_requests_.push_back(request);
+ return;
+ }
+
+ pending_parent_port_connections_.insert(
+ std::make_pair(local_port.name(), callback));
+ parent->RequestPortConnection(local_port.name(), token);
+}
+
+void NodeController::ConnectToRemotePortOnIOThread(
+ const PendingRemotePortConnection& connection) {
+ scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
+ if (peer) {
+ // It's safe to initialize the port since we already have a channel to its
+ // peer. No need to actually send them a message.
+ int rv = node_->InitializePort(connection.local_port,
+ connection.remote_node_name,
+ connection.remote_port_name);
+ DCHECK_EQ(rv, ports::OK);
+ connection.callback.Run();
+ return;
+ }
+
+ // Save this for later. We'll initialize the port once this peer is added.
+ pending_remote_port_connections_[connection.remote_node_name].push_back(
+ connection);
+}
+
scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
const ports::NodeName& name) {
base::AutoLock lock(peers_lock_);
@@ -333,6 +414,19 @@ void NodeController::AddPeer(const ports::NodeName& name,
channel->PortsMessage(std::move(pending_messages.front()));
pending_messages.pop();
}
+
+ // Complete any pending port connections to this peer.
+ auto connections_it = pending_remote_port_connections_.find(name);
+ if (connections_it != pending_remote_port_connections_.end()) {
+ for (const auto& connection : connections_it->second) {
+ int rv = node_->InitializePort(connection.local_port,
+ connection.remote_node_name,
+ connection.remote_port_name);
+ DCHECK_EQ(rv, ports::OK);
+ connection.callback.Run();
+ }
+ pending_remote_port_connections_.erase(connections_it);
+ }
}
void NodeController::DropPeer(const ports::NodeName& name) {
@@ -522,8 +616,6 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node,
// NOTE: The child does not actually add its parent as a peer until
// receiving an AcceptBrokerClient message from the broker. The parent
// will request that said message be sent upon receiving AcceptParent.
-
- DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
}
void NodeController::OnAcceptParent(const ports::NodeName& from_node,
@@ -671,16 +763,15 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
io_task_runner_);
AddPeer(broker_name, broker, true /* start_channel */);
}
-
AddPeer(parent_name, parent, false /* start_channel */);
- {
- // Complete any port merge requests we have waiting for the parent.
- base::AutoLock lock(pending_port_merges_lock_);
- for (const auto& request : pending_port_merges_)
- parent->RequestPortMerge(request.second.name(), request.first);
- pending_port_merges_.clear();
+ // Resolve any pending port connections to the parent.
+ for (const auto& request : pending_port_requests_) {
+ pending_parent_port_connections_.insert(
+ std::make_pair(request.local_port.name(), request.callback));
+ parent->RequestPortConnection(request.local_port.name(), request.token);
}
+ pending_port_requests_.clear();
// Feed the broker any pending children of our own.
while (!pending_broker_clients.empty()) {
@@ -733,15 +824,16 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
AttemptShutdownIfRequested();
}
-void NodeController::OnRequestPortMerge(
+void NodeController::OnRequestPortConnection(
const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
const std::string& token) {
DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
- DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
+ DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
<< token << " and port " << connector_port_name << "@" << from_node;
+ ReservePortCallback callback;
ports::PortRef local_port;
{
base::AutoLock lock(reserved_ports_lock_);
@@ -751,12 +843,64 @@ void NodeController::OnRequestPortMerge(
<< token;
return;
}
- local_port = it->second;
+ local_port = it->second.local_port;
+ callback = it->second.callback;
+ reserved_ports_.erase(it);
+ }
+
+ DCHECK(!callback.is_null());
+
+ scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
+ if (!peer) {
+ DVLOG(1) << "Ignoring request to connect to port from unknown node "
+ << from_node;
+ return;
}
- int rv = node_->MergePorts(local_port, from_node, connector_port_name);
- if (rv != ports::OK)
- DLOG(ERROR) << "MergePorts failed: " << rv;
+ // This reserved port should not have been initialized yet.
+ CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
+ connector_port_name));
+
+ peer->ConnectToPort(local_port.name(), connector_port_name);
+ callback.Run(local_port);
+}
+
+void NodeController::OnConnectToPort(
+ const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
+ << connectee_port_name << " to port " << connector_port_name << "@"
+ << from_node;
+
+ ports::PortRef connectee_port;
+ int rv = node_->GetPort(connectee_port_name, &connectee_port);
+ if (rv != ports::OK) {
+ DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
+ << connectee_port_name;
+ return;
+ }
+
+ // It's OK if this port has already been initialized. This message is only
+ // sent by the remote peer to ensure the port is ready before it starts
+ // us sending messages to it.
+ ports::PortStatus port_status;
+ rv = node_->GetStatus(connectee_port, &port_status);
+ if (rv == ports::OK) {
+ DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
+ << connectee_port_name;
+ return;
+ }
+
+ CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
+ connector_port_name));
+
+ auto it = pending_parent_port_connections_.find(connectee_port_name);
+ DCHECK(it != pending_parent_port_connections_.end());
+ it->second.Run();
+ pending_parent_port_connections_.erase(it);
}
void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h
index 0675e97..35148a0 100644
--- a/mojo/edk/system/node_controller.h
+++ b/mojo/edk/system/node_controller.h
@@ -82,13 +82,42 @@ class NodeController : public ports::NodeDelegate,
int SendMessage(const ports::PortRef& port_ref,
scoped_ptr<PortsMessage>* message);
- // Reserves a local port |port| associated with |token|. A peer holding a copy
- // of |token| can merge one of its own ports into this one.
- void ReservePort(const std::string& token, const ports::PortRef& port);
+ using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>;
- // Merges a local port |port| into a port reserved by |token| in the parent.
- void MergePortIntoParent(const std::string& token,
- const ports::PortRef& port);
+ // Reserves a port associated with |token|. A peer may associate one of their
+ // own ports with this one by sending us a RequestPortConnection message with
+ // the same token value.
+ //
+ // Note that the reservation is made synchronously. In order to avoid races,
+ // reservations should be acquired before |token| is communicated to any
+ // potential peer.
+ //
+ // |callback| must be runnable on any thread and will be run with a reference
+ // to the new local port once connected.
+ void ReservePort(const std::string& token,
+ const ReservePortCallback& callback);
+
+ // Eventually initializes a local port with a parent port peer identified by
+ // |token|. The parent should also have |token| and should alrady have
+ // reserved a port for it. |callback| must be runnable on any thread and will
+ // be run if and when the local port is connected.
+ void ConnectToParentPort(const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback);
+
+ // Connects two reserved ports to each other. Useful when two independent
+ // systems in the same (parent) process need to establish a port pair without
+ // any direct knowledge of each other.
+ void ConnectReservedPorts(const std::string& token1,
+ const std::string& token2);
+
+ // Connects a local port to a port on a remote node. Note that a connection to
+ // the remote node need not be established yet. The port will be connected
+ // ASAP, at which point |callback| will be run.
+ void ConnectToRemotePort(const ports::PortRef& local_port,
+ const ports::NodeName& remote_node_name,
+ const ports::PortName& remote_port_name,
+ const base::Closure& callback);
// Creates a new shared buffer for use in the current process.
scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
@@ -109,9 +138,44 @@ class NodeController : public ports::NodeDelegate,
scoped_refptr<NodeChannel>>;
using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
+ // Tracks a pending token-based connection to a parent port.
+ struct PendingPortRequest {
+ PendingPortRequest();
+ ~PendingPortRequest();
+
+ std::string token;
+ ports::PortRef local_port;
+ base::Closure callback;
+ };
+
+ // Tracks a reserved port.
+ struct ReservedPort {
+ ReservedPort();
+ ~ReservedPort();
+
+ ports::PortRef local_port;
+ ReservePortCallback callback;
+ };
+
+ // Tracks a pending connection to a remote port on any peer.
+ struct PendingRemotePortConnection {
+ PendingRemotePortConnection();
+ ~PendingRemotePortConnection();
+
+ ports::PortRef local_port;
+ ports::NodeName remote_node_name;
+ ports::PortName remote_port_name;
+ base::Closure callback;
+ };
+
void ConnectToChildOnIOThread(base::ProcessHandle process_handle,
ScopedPlatformHandle platform_handle);
void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle);
+ void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback);
+ void ConnectToRemotePortOnIOThread(
+ const PendingRemotePortConnection& connection);
scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
scoped_refptr<NodeChannel> GetParentChannel();
@@ -151,9 +215,12 @@ class NodeController : public ports::NodeDelegate,
const ports::NodeName& broker_name,
ScopedPlatformHandle broker_channel) override;
void OnPortsMessage(Channel::MessagePtr message) override;
- void OnRequestPortMerge(const ports::NodeName& from_node,
- const ports::PortName& connector_port_name,
- const std::string& token) override;
+ void OnRequestPortConnection(const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const std::string& token) override;
+ void OnConnectToPort(const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name) override;
void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) override;
void OnIntroduce(const ports::NodeName& from_node,
@@ -197,13 +264,7 @@ class NodeController : public ports::NodeDelegate,
base::Lock reserved_ports_lock_;
// Ports reserved by token.
- base::hash_map<std::string, ports::PortRef> reserved_ports_;
-
- // Guards |pending_port_merges_|.
- base::Lock pending_port_merges_lock_;
-
- // A set of port merge requests awaiting parent connection.
- std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
+ base::hash_map<std::string, ReservedPort> reserved_ports_;
// Guards |parent_name_| and |bootstrap_parent_channel_|.
base::Lock parent_lock_;
@@ -246,6 +307,17 @@ class NodeController : public ports::NodeDelegate,
// Channels to children during handshake.
NodeMap pending_children_;
+ // Port connection requests which have been deferred until we have a parent.
+ std::vector<PendingPortRequest> pending_port_requests_;
+
+ // Port connection requests awaiting a response from the parent.
+ std::unordered_map<ports::PortName, base::Closure>
+ pending_parent_port_connections_;
+
+ // Port connections pending the availability of a remote peer node.
+ std::unordered_map<ports::NodeName, std::vector<PendingRemotePortConnection>>
+ pending_remote_port_connections_;
+
// Indicates whether this object should delete itself on IO thread shutdown.
// Must only be accessed from the IO thread.
bool destroy_on_io_thread_shutdown_ = false;
diff --git a/mojo/edk/system/ports/event.h b/mojo/edk/system/ports/event.h
index 1dc8eab..aec9f6b 100644
--- a/mojo/edk/system/ports/event.h
+++ b/mojo/edk/system/ports/event.h
@@ -34,7 +34,6 @@ enum struct EventType : uint32_t {
kObserveProxy,
kObserveProxyAck,
kObserveClosure,
- kMergePort,
};
struct EventHeader {
@@ -64,11 +63,6 @@ struct ObserveClosureEventData {
uint64_t last_sequence_num;
};
-struct MergePortEventData {
- PortName new_port_name;
- PortDescriptor new_port_descriptor;
-};
-
inline const EventHeader* GetEventHeader(const Message& message) {
return static_cast<const EventHeader*>(message.header_bytes());
}
diff --git a/mojo/edk/system/ports/message.cc b/mojo/edk/system/ports/message.cc
index 2106c15..20f6d88 100644
--- a/mojo/edk/system/ports/message.cc
+++ b/mojo/edk/system/ports/message.cc
@@ -37,9 +37,6 @@ void Message::Parse(const void* bytes,
case EventType::kObserveClosure:
*num_header_bytes = sizeof(EventHeader) + sizeof(ObserveClosureEventData);
break;
- case EventType::kMergePort:
- *num_header_bytes = sizeof(EventHeader) + sizeof(MergePortEventData);
- break;
default:
CHECK(false) << "Bad event type";
return;
diff --git a/mojo/edk/system/ports/name.h b/mojo/edk/system/ports/name.h
index 8a9307d..b0ad0b7 100644
--- a/mojo/edk/system/ports/name.h
+++ b/mojo/edk/system/ports/name.h
@@ -8,7 +8,6 @@
#include <stdint.h>
#include <ostream>
-#include <tuple>
namespace mojo {
namespace edk {
@@ -22,15 +21,9 @@ struct Name {
inline bool operator==(const Name& a, const Name& b) {
return a.v1 == b.v1 && a.v2 == b.v2;
}
-
inline bool operator!=(const Name& a, const Name& b) {
return !(a == b);
}
-
-inline bool operator<(const Name& a, const Name& b) {
- return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2);
-}
-
std::ostream& operator<<(std::ostream& stream, const Name& name);
struct PortName : Name {
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index 9cfc511..1718306 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -354,39 +354,10 @@ int Node::AcceptMessage(ScopedMessage message) {
return OnObserveClosure(
header->port_name,
GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
- case EventType::kMergePort:
- return OnMergePort(header->port_name,
- *GetEventData<MergePortEventData>(*message));
}
return OOPS(ERROR_NOT_IMPLEMENTED);
}
-int Node::MergePorts(const PortRef& port_ref,
- const NodeName& destination_node_name,
- const PortName& destination_port_name) {
- Port* port = port_ref.port();
- {
- // |ports_lock_| must be held for WillSendPort_Locked below.
- base::AutoLock ports_lock(ports_lock_);
- base::AutoLock lock(port->lock);
-
- DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
- << " to " << destination_port_name << "@" << destination_node_name;
-
- // Send the port-to-merge over to the destination node so it can be merged
- // into the port cycle atomically there.
- MergePortEventData data;
- data.new_port_name = port_ref.name();
- WillSendPort_Locked(port, destination_node_name, &data.new_port_name,
- &data.new_port_descriptor);
- delegate_->ForwardMessage(
- destination_node_name,
- NewInternalMessage(destination_port_name,
- EventType::kMergePort, data));
- }
- return OK;
-}
-
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
@@ -538,8 +509,34 @@ int Node::OnPortAccepted(const PortName& port_name) {
<< " pointing to "
<< port->peer_port_name << "@" << port->peer_node_name;
- return BeginProxying_Locked(port.get(), port_name);
+ if (port->state != Port::kBuffering)
+ return OOPS(ERROR_PORT_STATE_UNEXPECTED);
+
+ port->state = Port::kProxying;
+
+ int rv = ForwardMessages_Locked(port.get(), port_name);
+ if (rv != OK)
+ return rv;
+
+ // We may have observed closure before receiving PortAccepted. In that
+ // case, we can advance to removing the proxy without sending out an
+ // ObserveProxy message. We already know the last expected message, etc.
+
+ if (port->remove_proxy_on_last_message) {
+ MaybeRemoveProxy_Locked(port.get(), port_name);
+
+ // Make sure we propagate closure to our current peer.
+ ObserveClosureEventData data;
+ data.last_sequence_num = port->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port->peer_node_name,
+ NewInternalMessage(port->peer_port_name,
+ EventType::kObserveClosure, data));
+ } else {
+ InitiateProxyRemoval_Locked(port.get(), port_name);
+ }
}
+ return OK;
}
int Node::OnObserveProxy(const PortName& port_name,
@@ -724,94 +721,6 @@ int Node::OnObserveClosure(const PortName& port_name,
return OK;
}
-int Node::OnMergePort(const PortName& port_name,
- const MergePortEventData& event) {
- scoped_refptr<Port> port = GetPort(port_name);
- if (!port)
- return ERROR_PORT_UNKNOWN;
-
- DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
- << port->state << ") merging with proxy " << event.new_port_name
- << "@" << name_ << " pointing to "
- << event.new_port_descriptor.peer_port_name << "@"
- << event.new_port_descriptor.peer_node_name << " referred by "
- << event.new_port_descriptor.referring_port_name << "@"
- << event.new_port_descriptor.referring_node_name;
-
- bool close_target_port = false;
- bool close_new_port = false;
-
- // Accept the new port. This is now the receiving end of the other port cycle
- // to be merged with ours.
- int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
- if (rv != OK) {
- close_target_port = true;
- } else {
- // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
- // needs to hold |ports_lock_|. We also acquire multiple port locks within.
- base::AutoLock ports_lock(ports_lock_);
- base::AutoLock lock(port->lock);
-
- if (port->state != Port::kReceiving) {
- close_new_port = true;
- } else {
- scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
- base::AutoLock new_port_lock(new_port->lock);
- DCHECK(new_port->state == Port::kReceiving);
-
- // Both ports are locked. Now all we have to do is swap their peer
- // information and set them up as proxies.
-
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- std::swap(port->peer_closed, new_port->peer_closed);
-
- port->state = Port::kBuffering;
- if (port->peer_closed)
- port->remove_proxy_on_last_message = true;
-
- new_port->state = Port::kBuffering;
- if (new_port->peer_closed)
- new_port->remove_proxy_on_last_message = true;
-
- int rv1 = BeginProxying_Locked(port.get(), port_name);
- int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
-
- if (rv1 == OK && rv2 == OK)
- return OK;
-
- // If either proxy failed to initialize (e.g. had undeliverable messages
- // or ended up in a bad state somehow), we keep the system in a consistent
- // state by undoing the peer swap and closing both merge ports.
-
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- port->state = Port::kReceiving;
- new_port->state = Port::kReceiving;
- close_new_port = true;
- close_target_port = true;
- }
- }
-
- if (close_target_port) {
- PortRef target_port;
- rv = GetPort(port_name, &target_port);
- DCHECK(rv == OK);
-
- ClosePort(target_port);
- }
-
- if (close_new_port) {
- PortRef new_port;
- rv = GetPort(event.new_port_name, &new_port);
- DCHECK(rv == OK);
-
- ClosePort(new_port);
- }
-
- return ERROR_PORT_STATE_UNEXPECTED;
-}
-
int Node::AddPortWithName(const PortName& port_name,
const scoped_refptr<Port>& port) {
base::AutoLock lock(ports_lock_);
@@ -999,40 +908,6 @@ int Node::WillSendMessage_Locked(Port* port,
return OK;
}
-int Node::BeginProxying_Locked(Port* port, const PortName& port_name) {
- ports_lock_.AssertAcquired();
- port->lock.AssertAcquired();
-
- if (port->state != Port::kBuffering)
- return OOPS(ERROR_PORT_STATE_UNEXPECTED);
-
- port->state = Port::kProxying;
-
- int rv = ForwardMessages_Locked(port, port_name);
- if (rv != OK)
- return rv;
-
- // We may have observed closure while buffering. In that case, we can advance
- // to removing the proxy without sending out an ObserveProxy message. We
- // already know the last expected message, etc.
-
- if (port->remove_proxy_on_last_message) {
- MaybeRemoveProxy_Locked(port, port_name);
-
- // Make sure we propagate closure to our current peer.
- ObserveClosureEventData data;
- data.last_sequence_num = port->last_sequence_num_to_receive;
- delegate_->ForwardMessage(
- port->peer_node_name,
- NewInternalMessage(port->peer_port_name,
- EventType::kObserveClosure, data));
- } else {
- InitiateProxyRemoval_Locked(port, port_name);
- }
-
- return OK;
-}
-
int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) {
ports_lock_.AssertAcquired();
port->lock.AssertAcquired();
diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h
index cb1d5fd..410d53b 100644
--- a/mojo/edk/system/ports/node.h
+++ b/mojo/edk/system/ports/node.h
@@ -122,21 +122,6 @@ class Node {
// Corresponding to NodeDelegate::ForwardMessage.
int AcceptMessage(ScopedMessage message);
- // Called to merge two ports with each other. If you have two independent
- // port pairs A <=> B and C <=> D, the net result of merging B and C is a
- // single connected port pair A <=> D.
- //
- // Note that the behavior of this operation is undefined if either port to be
- // merged (B or C above) has ever been read from or written to directly, and
- // this must ONLY be called on one side of the merge, though it doesn't matter
- // which side.
- //
- // It is safe for the non-merged peers (A and D above) to be transferred,
- // closed, and/or written to before, during, or after the merge.
- int MergePorts(const PortRef& port_ref,
- const NodeName& destination_node_name,
- const PortName& destination_port_name);
-
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
@@ -148,7 +133,6 @@ class Node {
const ObserveProxyEventData& event);
int OnObserveProxyAck(const PortName& port_name, uint64_t last_sequence_num);
int OnObserveClosure(const PortName& port_name, uint64_t last_sequence_num);
- int OnMergePort(const PortName& port_name, const MergePortEventData& event);
int AddPortWithName(const PortName& port_name,
const scoped_refptr<Port>& port);
@@ -167,7 +151,6 @@ class Node {
int WillSendMessage_Locked(Port* port,
const PortName& port_name,
Message* message);
- int BeginProxying_Locked(Port* port, const PortName& port_name);
int ForwardMessages_Locked(Port* port, const PortName& port_name);
void InitiateProxyRemoval_Locked(Port* port, const PortName& port_name);
void MaybeRemoveProxy_Locked(Port* port, const PortName& port_name);
diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc
index 1bdca3f..95e2a63 100644
--- a/mojo/edk/system/ports/ports_unittest.cc
+++ b/mojo/edk/system/ports/ports_unittest.cc
@@ -690,12 +690,32 @@ TEST_F(PortsTest, SendUninitialized) {
Node node0(node0_name, &node0_delegate);
node_map[0] = &node0;
- PortRef x0;
+ NodeName node1_name(1, 1);
+ TestNodeDelegate node1_delegate(node1_name);
+ Node node1(node1_name, &node1_delegate);
+ node_map[1] = &node1;
+
+ // Begin to setup a pipe between node0 and node1, but don't initialize either
+ // endpoint.
+ PortRef x0, x1;
EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
+
+ node0_delegate.set_save_messages(true);
+ node1_delegate.set_save_messages(true);
+
+ // Send a message on each port and expect neither to arrive yet.
+
EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
SendStringMessage(&node0, x0, "oops"));
+ EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
+ SendStringMessage(&node1, x1, "oh well"));
+
EXPECT_EQ(OK, node0.ClosePort(x0));
+ EXPECT_EQ(OK, node1.ClosePort(x1));
+
EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
TEST_F(PortsTest, SendFailure) {
@@ -1048,322 +1068,6 @@ TEST_F(PortsTest, SendWithClosedPeerSent) {
EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
-TEST_F(PortsTest, MergePorts) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_read_messages(false);
- node1_delegate.set_save_messages(true);
-
- // Write a message on A.
- EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
-
- PumpTasks();
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- PumpTasks();
-
- // Expect only two receiving ports to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
- EXPECT_TRUE(node1.CanShutdownCleanly(true));
-
- // Expect D to have received the message sent on A.
- ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
-
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
-
- // No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
-TEST_F(PortsTest, MergePortWithClosedPeer1) {
- // This tests that the right thing happens when initiating a merge on a port
- // whose peer has already been closed.
-
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_read_messages(false);
- node1_delegate.set_save_messages(true);
-
- // Write a message on A.
- EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
-
- PumpTasks();
-
- // Close A.
- EXPECT_EQ(OK, node0.ClosePort(A));
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- PumpTasks();
-
- // Expect only one receiving port to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(true));
-
- // Expect D to have received the message sent on A.
- ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
-
- EXPECT_EQ(OK, node1.ClosePort(D));
-
- // No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
-TEST_F(PortsTest, MergePortWithClosedPeer2) {
- // This tests that the right thing happens when merging into a port whose peer
- // has already been closed.
-
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
-
- // Write a message on D.
- EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
-
- PumpTasks();
-
- // Close D.
- EXPECT_EQ(OK, node1.ClosePort(D));
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- PumpTasks();
-
- // Expect only one receiving port to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-
- // Expect A to have received the message sent on D.
- ScopedMessage message;
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
-
- EXPECT_EQ(OK, node0.ClosePort(A));
-
- // No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
-TEST_F(PortsTest, MergePortsWithClosedPeers) {
- // This tests that no residual ports are left behind if two ports are merged
- // when both of their peers have been closed.
-
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
-
- // Close A and D.
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
-
- PumpTasks();
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- PumpTasks();
-
- // Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
-TEST_F(PortsTest, MergePortsWithMovedPeers) {
- // This tests that no ports can be merged successfully even if their peers
- // are moved around.
-
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- // Set up another pair X-Y for moving ports on node0.
- PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
-
- ScopedMessage message;
-
- // Move A to new port E.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- ASSERT_EQ(1u, message->num_ports());
- PortRef E;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
-
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
-
- node0_delegate.set_read_messages(false);
-
- // Write messages on E and D.
- EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
- EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- node0_delegate.set_read_messages(true);
- node1_delegate.set_read_messages(true);
- node1_delegate.set_save_messages(true);
-
- PumpTasks();
-
- // Expect to receive D's message on E and E's message on D.
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hi", ToString(message)));
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
-
- // Close E and D.
- EXPECT_EQ(OK, node0.ClosePort(E));
- EXPECT_EQ(OK, node1.ClosePort(D));
-
- PumpTasks();
-
- // Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
-
-TEST_F(PortsTest, MergePortsFailsGracefully) {
- // This tests that the system remains in a well-defined state if something
- // goes wrong during port merge.
-
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- // Setup two independent port pairs, A-B on node0 and C-D on node1.
- PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- PumpTasks();
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- // Move C to a new port E. This is dumb and nobody should do it, but it's
- // possible. MergePorts will fail as a result because C won't be in a
- // receiving state when the event arrives at node1, so B should be closed.
- ScopedMessage message;
- PortRef X, Y;
- EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
- node1_delegate.set_save_messages(true);
- EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- ASSERT_EQ(1u, message->num_ports());
- PortRef E;
- ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
- EXPECT_EQ(OK, node1.ClosePort(X));
- EXPECT_EQ(OK, node1.ClosePort(Y));
-
- // C goes away as a result of normal proxy removal.
- PumpTasks();
-
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
-
- // B should have been closed cleanly.
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
-
- // Close A, D, and E.
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
- EXPECT_EQ(OK, node1.ClosePort(E));
-
- PumpTasks();
-
- // Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
-}
-
} // namespace test
} // namespace ports
} // namespace edk
diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.cc b/mojo/edk/system/remote_message_pipe_bootstrap.cc
index 2342166..671542f3 100644
--- a/mojo/edk/system/remote_message_pipe_bootstrap.cc
+++ b/mojo/edk/system/remote_message_pipe_bootstrap.cc
@@ -31,17 +31,18 @@ struct BootstrapData {
void RemoteMessagePipeBootstrap::Create(
NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port) {
+ const ports::PortRef& port,
+ const base::Closure& callback) {
if (node_controller->io_task_runner()->RunsTasksOnCurrentThread()) {
// Owns itself.
- new RemoteMessagePipeBootstrap(
- node_controller, std::move(platform_handle), port);
+ new RemoteMessagePipeBootstrap(node_controller, std::move(platform_handle),
+ port, callback);
} else {
node_controller->io_task_runner()->PostTask(
FROM_HERE,
base::Bind(&RemoteMessagePipeBootstrap::Create,
base::Unretained(node_controller),
- base::Passed(&platform_handle), port));
+ base::Passed(&platform_handle), port, callback));
}
}
@@ -55,9 +56,11 @@ RemoteMessagePipeBootstrap::~RemoteMessagePipeBootstrap() {
RemoteMessagePipeBootstrap::RemoteMessagePipeBootstrap(
NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port)
+ const ports::PortRef& port,
+ const base::Closure& callback)
: node_controller_(node_controller),
local_port_(port),
+ callback_(callback),
io_task_runner_(base::ThreadTaskRunnerHandle::Get()),
channel_(Channel::Create(this, std::move(platform_handle),
io_task_runner_)) {
@@ -116,14 +119,8 @@ void RemoteMessagePipeBootstrap::OnChannelMessage(
}
peer_info_received_ = true;
-
- // We need to choose one side to initiate the port merge. It doesn't matter
- // who does it as long as they don't both try. Simple solution: pick the one
- // with the "smaller" port name.
- if (local_port_.name() < data->port_name) {
- node_controller_->node()->MergePorts(local_port_, data->node_name,
- data->port_name);
- }
+ node_controller_->ConnectToRemotePort(
+ local_port_, data->node_name, data->port_name, callback_);
// Send another ping to the other end to trigger shutdown. This may race with
// the other end sending its own ping, but it doesn't matter. Whoever wins
diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.h b/mojo/edk/system/remote_message_pipe_bootstrap.h
index 2a24680..911d518 100644
--- a/mojo/edk/system/remote_message_pipe_bootstrap.h
+++ b/mojo/edk/system/remote_message_pipe_bootstrap.h
@@ -7,6 +7,7 @@
#include <string>
+#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
@@ -27,20 +28,17 @@ class NodeController;
//
// The bootstrapping procedure the same on either end:
//
-// 1. Select a local port P to be merged with a remote port.
+// 1. Create a local port P.
// 2. Write the local node name and P's name to the bootstrap pipe.
// 3. When a message is read from the pipe:
// - If it's the first message read, extract the remote node+port name and
-// and send an empty ACK message on the pipe.
+// initialize the local port. Send an empty ACK message on the pipe.
// - If it's the second message read, close the channel, and delete |this|.
// 4. When an error occus on the pipe, delete |this|.
//
// Excluding irrecoverable error conditions such as either process dying,
// armageddon, etc., this ensures neither end closes the channel until both ends
-// are aware of each other's port-to-merge.
-//
-// At step 3, one side of the channel is chosen to issue a message at the Ports
-// layer which eventually merges the two ports.
+// have intiailized their corresponding local port.
class RemoteMessagePipeBootstrap
: public Channel::Delegate,
public base::MessageLoop::DestructionObserver {
@@ -50,18 +48,22 @@ class RemoteMessagePipeBootstrap
// |port| must be a reference to an uninitialized local port.
static void Create(NodeController* node_controller,
ScopedPlatformHandle platform_handle,
- const ports::PortRef& port);
+ const ports::PortRef& port,
+ const base::Closure& callback);
protected:
- explicit RemoteMessagePipeBootstrap(NodeController* node_controller,
- ScopedPlatformHandle platform_handle,
- const ports::PortRef& port);
+ explicit RemoteMessagePipeBootstrap(
+ NodeController* node_controller,
+ ScopedPlatformHandle platform_handle,
+ const ports::PortRef& port,
+ const base::Closure& callback);
void ShutDown();
bool shutting_down_ = false;
NodeController* node_controller_;
const ports::PortRef local_port_;
+ base::Closure callback_;
scoped_refptr<base::TaskRunner> io_task_runner_;
scoped_refptr<Channel> channel_;
diff --git a/mojo/shell/runner/child/runner_connection.cc b/mojo/shell/runner/child/runner_connection.cc
index 313c00a..75b4648 100644
--- a/mojo/shell/runner/child/runner_connection.cc
+++ b/mojo/shell/runner/child/runner_connection.cc
@@ -74,6 +74,13 @@ class Blocker {
using GotApplicationRequestCallback =
base::Callback<void(InterfaceRequest<mojom::ShellClient>)>;
+void OnCreateMessagePipe(ScopedMessagePipeHandle* result,
+ Blocker::Unblocker unblocker,
+ ScopedMessagePipeHandle pipe) {
+ *result = std::move(pipe);
+ unblocker.Unblock(base::Bind(&base::DoNothing));
+}
+
void OnGotApplicationRequest(InterfaceRequest<mojom::ShellClient>* out_request,
InterfaceRequest<mojom::ShellClient> request) {
*out_request = std::move(request);
@@ -222,11 +229,14 @@ bool RunnerConnectionImpl::WaitForApplicationRequest(
std::string primordial_pipe_token =
base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
switches::kPrimordialPipeToken);
- handle = edk::CreateChildMessagePipe(primordial_pipe_token);
+ Blocker blocker;
+ edk::CreateChildMessagePipe(
+ primordial_pipe_token,
+ base::Bind(&OnCreateMessagePipe, base::Unretained(&handle),
+ blocker.GetUnblocker()));
+ blocker.Block();
}
- DCHECK(handle.is_valid());
-
Blocker blocker;
controller_runner_->PostTask(
FROM_HERE,
diff --git a/mojo/shell/runner/host/child_process.cc b/mojo/shell/runner/host/child_process.cc
index 75230f0..0208b84 100644
--- a/mojo/shell/runner/host/child_process.cc
+++ b/mojo/shell/runner/host/child_process.cc
@@ -51,6 +51,10 @@ namespace shell {
namespace {
+void DidCreateChannel(embedder::ChannelInfo* channel_info) {
+ DVLOG(2) << "ChildControllerImpl::DidCreateChannel()";
+}
+
// Blocker ---------------------------------------------------------------------
// Blocks a thread until another thread unblocks it, at which point it unblocks
@@ -296,14 +300,34 @@ scoped_ptr<mojo::shell::LinuxSandbox> InitializeSandbox() {
}
#endif
-ScopedMessagePipeHandle InitializeHostMessagePipe(
- edk::ScopedPlatformHandle platform_channel,
- scoped_refptr<base::TaskRunner> io_task_runner) {
- edk::SetParentPipeHandle(std::move(platform_channel));
- std::string primordial_pipe_token =
- base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
- switches::kPrimordialPipeToken);
- return edk::CreateChildMessagePipe(primordial_pipe_token);
+void InitializeHostMessagePipe(
+ embedder::ScopedPlatformHandle platform_channel,
+ scoped_refptr<base::TaskRunner> io_task_runner,
+ const base::Callback<void(ScopedMessagePipeHandle)>& callback) {
+ if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
+ embedder::SetParentPipeHandle(std::move(platform_channel));
+ std::string primordial_pipe_token =
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kPrimordialPipeToken);
+ edk::CreateChildMessagePipe(primordial_pipe_token, callback);
+ } else {
+ ScopedMessagePipeHandle host_message_pipe;
+ host_message_pipe =
+ embedder::CreateChannel(std::move(platform_channel),
+ base::Bind(&DidCreateChannel), io_task_runner);
+ callback.Run(std::move(host_message_pipe));
+ }
+}
+
+void OnHostMessagePipeCreated(AppContext* app_context,
+ base::NativeLibrary app_library,
+ const Blocker::Unblocker& unblocker,
+ ScopedMessagePipeHandle pipe) {
+ app_context->controller_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&ChildControllerImpl::Init, base::Unretained(app_context),
+ base::Unretained(app_library), base::Passed(&pipe),
+ unblocker));
}
} // namespace
@@ -336,8 +360,8 @@ int ChildProcessMain() {
sandbox = InitializeSandbox();
#endif
- edk::ScopedPlatformHandle platform_channel =
- edk::PlatformChannelPair::PassClientHandleFromParentProcess(
+ embedder::ScopedPlatformHandle platform_channel =
+ embedder::PlatformChannelPair::PassClientHandleFromParentProcess(
command_line);
CHECK(platform_channel.is_valid());
@@ -348,12 +372,13 @@ int ChildProcessMain() {
app_context.Init();
app_context.StartControllerThread();
- ScopedMessagePipeHandle host_pipe = InitializeHostMessagePipe(
- std::move(platform_channel), app_context.io_runner());
app_context.controller_runner()->PostTask(
FROM_HERE,
- base::Bind(&ChildControllerImpl::Init, &app_context, app_library,
- base::Passed(&host_pipe), blocker.GetUnblocker()));
+ base::Bind(
+ &InitializeHostMessagePipe, base::Passed(&platform_channel),
+ make_scoped_refptr(app_context.io_runner()),
+ base::Bind(&OnHostMessagePipeCreated, base::Unretained(&app_context),
+ base::Unretained(app_library), blocker.GetUnblocker())));
// This will block, then run whatever the controller wants.
blocker.Block();
diff --git a/mojo/shell/runner/host/child_process_host.cc b/mojo/shell/runner/host/child_process_host.cc
index 2b3cfc9..9a5b439 100644
--- a/mojo/shell/runner/host/child_process_host.cc
+++ b/mojo/shell/runner/host/child_process_host.cc
@@ -35,24 +35,55 @@
namespace mojo {
namespace shell {
+ChildProcessHost::PipeHolder::PipeHolder() {}
+
+void ChildProcessHost::PipeHolder::Reject() {
+ base::AutoLock lock(lock_);
+ reject_pipe_ = true;
+ pipe_.reset();
+}
+
+void ChildProcessHost::PipeHolder::SetPipe(ScopedMessagePipeHandle pipe) {
+ base::AutoLock lock(lock_);
+ DCHECK(!pipe_.is_valid());
+ if (!reject_pipe_)
+ pipe_ = std::move(pipe);
+}
+
+ScopedMessagePipeHandle ChildProcessHost::PipeHolder::PassPipe() {
+ base::AutoLock lock(lock_);
+ DCHECK(pipe_.is_valid());
+ return std::move(pipe_);
+}
+
+ChildProcessHost::PipeHolder::~PipeHolder() {}
+
ChildProcessHost::ChildProcessHost(base::TaskRunner* launch_process_runner,
bool start_sandboxed,
const base::FilePath& app_path)
: launch_process_runner_(launch_process_runner),
start_sandboxed_(start_sandboxed),
app_path_(app_path),
+ channel_info_(nullptr),
start_child_process_event_(false, false),
weak_factory_(this) {
- node_channel_.reset(new edk::PlatformChannelPair);
- primordial_pipe_token_ = edk::GenerateRandomToken();
- controller_.Bind(
- InterfacePtrInfo<mojom::ChildController>(
- edk::CreateParentMessagePipe(primordial_pipe_token_), 0u));
+ pipe_holder_ = new PipeHolder();
+ if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
+ node_channel_.reset(new edk::PlatformChannelPair);
+ primordial_pipe_token_ = edk::GenerateRandomToken();
+ } else {
+ pipe_holder_->SetPipe(embedder::CreateChannel(
+ platform_channel_pair_.PassServerHandle(),
+ base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)),
+ base::ThreadTaskRunnerHandle::Get()));
+ OnMessagePipeCreated();
+ }
}
ChildProcessHost::ChildProcessHost(ScopedHandle channel)
: launch_process_runner_(nullptr),
start_sandboxed_(false),
+ channel_info_(nullptr),
start_child_process_event_(false, false),
weak_factory_(this) {
CHECK(channel.is_valid());
@@ -68,11 +99,40 @@ ChildProcessHost::~ChildProcessHost() {
void ChildProcessHost::Start(const ProcessReadyCallback& callback) {
DCHECK(!child_process_.IsValid());
+ DCHECK(process_ready_callback_.is_null());
+
+ process_ready_callback_ = callback;
+ if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) {
+ // With the new EDK, bootstrap message pipes are created asynchronously.
+ // We recieve the bound pipe (if successful) on an arbitrary thread,
+ // stash it in the thread-safe |pipe_holder_|, and then try to call
+ // OnMessagePipeCreated() on the host's main thread.
+ //
+ // Because of the way the launcher process shuts down, it's possible for
+ // the main thread's MessageLoop to stop running (but not yet be destroyed!)
+ // while this boostrap is pending, resulting in OnMessagePipeCreated() never
+ // being called.
+ //
+ // A typical child process (i.e. one using ShellConnection to bind the other
+ // end of this pipe) may hang forever waiting for an Initialize() message
+ // unless the pipe is closed. This in turn means that Join() could hang
+ // waiting for the process to exit. Deadlock!
+ //
+ // |pipe_holder_| exists for this reason. If it's still holding onto the
+ // pipe when Join() is called, the pipe will be closed.
+ DCHECK(!primordial_pipe_token_.empty());
+ edk::CreateParentMessagePipe(
+ primordial_pipe_token_,
+ base::Bind(&OnParentMessagePipeCreated, pipe_holder_,
+ base::ThreadTaskRunnerHandle::Get(),
+ base::Bind(&ChildProcessHost::OnMessagePipeCreated,
+ weak_factory_.GetWeakPtr())));
+ }
+
launch_process_runner_->PostTaskAndReply(
FROM_HERE,
base::Bind(&ChildProcessHost::DoLaunch, base::Unretained(this)),
- base::Bind(&ChildProcessHost::DidStart,
- weak_factory_.GetWeakPtr(), callback));
+ base::Bind(&ChildProcessHost::DidStart, weak_factory_.GetWeakPtr()));
}
int ChildProcessHost::Join() {
@@ -82,6 +142,10 @@ int ChildProcessHost::Join() {
controller_ = mojom::ChildControllerPtr();
DCHECK(child_process_.IsValid());
+ // Ensure the child pipe is closed even if it wasn't yet connected to the
+ // controller.
+ pipe_holder_->Reject();
+
int rv = -1;
LOG_IF(ERROR, !child_process_.WaitForExit(&rv))
<< "Failed to wait for child process";
@@ -108,11 +172,11 @@ void ChildProcessHost::ExitNow(int32_t exit_code) {
controller_->ExitNow(exit_code);
}
-void ChildProcessHost::DidStart(const ProcessReadyCallback& callback) {
+void ChildProcessHost::DidStart() {
DVLOG(2) << "ChildProcessHost::DidStart()";
if (child_process_.IsValid()) {
- callback.Run(child_process_.Pid());
+ MaybeNotifyProcessReady();
} else {
LOG(ERROR) << "Failed to start child process";
AppCompleted(MOJO_RESULT_UNKNOWN);
@@ -214,5 +278,34 @@ void ChildProcessHost::AppCompleted(int32_t result) {
}
}
+void ChildProcessHost::DidCreateChannel(embedder::ChannelInfo* channel_info) {
+ DVLOG(2) << "AppChildProcessHost::DidCreateChannel()";
+
+ DCHECK(channel_info ||
+ base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk"));
+ channel_info_ = channel_info;
+}
+
+void ChildProcessHost::OnMessagePipeCreated() {
+ controller_.Bind(
+ InterfacePtrInfo<mojom::ChildController>(pipe_holder_->PassPipe(), 0u));
+ MaybeNotifyProcessReady();
+}
+
+void ChildProcessHost::MaybeNotifyProcessReady() {
+ if (controller_.is_bound() && child_process_.IsValid())
+ process_ready_callback_.Run(child_process_.Pid());
+}
+
+// static
+void ChildProcessHost::OnParentMessagePipeCreated(
+ scoped_refptr<PipeHolder> holder,
+ scoped_refptr<base::TaskRunner> callback_task_runner,
+ const base::Closure& callback,
+ ScopedMessagePipeHandle pipe) {
+ holder->SetPipe(std::move(pipe));
+ callback_task_runner->PostTask(FROM_HERE, callback);
+}
+
} // namespace shell
} // namespace mojo
diff --git a/mojo/shell/runner/host/child_process_host.h b/mojo/shell/runner/host/child_process_host.h
index 2664d30..906a1a2 100644
--- a/mojo/shell/runner/host/child_process_host.h
+++ b/mojo/shell/runner/host/child_process_host.h
@@ -71,24 +71,70 @@ class ChildProcessHost {
void ExitNow(int32_t exit_code);
protected:
- void DidStart(const ProcessReadyCallback& callback);
+ void DidStart();
private:
+ // A thread-safe holder for the bootstrap message pipe to this child process.
+ // The pipe is established on an arbitrary thread and may not be connected
+ // until the host's message loop has stopped running.
+ class PipeHolder : public base::RefCountedThreadSafe<PipeHolder> {
+ public:
+ PipeHolder();
+
+ void Reject();
+ void SetPipe(ScopedMessagePipeHandle pipe);
+ ScopedMessagePipeHandle PassPipe();
+
+ private:
+ friend class base::RefCountedThreadSafe<PipeHolder>;
+
+ ~PipeHolder();
+
+ base::Lock lock_;
+ bool reject_pipe_ = false;
+ ScopedMessagePipeHandle pipe_;
+
+ DISALLOW_COPY_AND_ASSIGN(PipeHolder);
+ };
+
void DoLaunch();
void AppCompleted(int32_t result);
+ // Callback for |embedder::CreateChannel()|.
+ void DidCreateChannel(embedder::ChannelInfo* channel_info);
+
+ // Called once |pipe_holder_| is bound to a pipe.
+ void OnMessagePipeCreated();
+
+ // Called when the child process is launched and when the bootstrap
+ // message pipe is created. Once both things have happened (which may happen
+ // in either order), |process_ready_callback_| is invoked.
+ void MaybeNotifyProcessReady();
+
+ // Callback used to receive the child message pipe from the ports EDK.
+ // This may be called on any thread. It will always stash the pipe in
+ // |holder|, and it will then attempt to call |callback| on
+ // |callback_task_runner| (which may or may not still be running tasks.)
+ static void OnParentMessagePipeCreated(
+ scoped_refptr<PipeHolder> holder,
+ scoped_refptr<base::TaskRunner> callback_task_runner,
+ const base::Closure& callback,
+ ScopedMessagePipeHandle pipe);
+
scoped_refptr<base::TaskRunner> launch_process_runner_;
bool start_sandboxed_;
const base::FilePath app_path_;
base::Process child_process_;
// Used for the ChildController binding.
- edk::PlatformChannelPair platform_channel_pair_;
+ embedder::PlatformChannelPair platform_channel_pair_;
mojom::ChildControllerPtr controller_;
+ embedder::ChannelInfo* channel_info_;
mojom::ChildController::StartAppCallback on_app_complete_;
- edk::HandlePassingInformation handle_passing_info_;
+ embedder::HandlePassingInformation handle_passing_info_;
- // Used to back the NodeChannel between the parent and child node.
+ // Used only when --use-new-edk is specified. Used to back the NodeChannel
+ // between the parent and child node.
scoped_ptr<edk::PlatformChannelPair> node_channel_;
// Since Start() calls a method on another thread, we use an event to block
@@ -98,6 +144,14 @@ class ChildProcessHost {
// A token the child can use to connect a primordial pipe to the host.
std::string primordial_pipe_token_;
+ // Holds the message pipe to the child process until it is either closed or
+ // bound to the controller interface.
+ scoped_refptr<PipeHolder> pipe_holder_;
+
+ // Invoked exactly once, as soon as the child process's ID is known and
+ // a pipe to the child has been established.
+ ProcessReadyCallback process_ready_callback_;
+
base::WeakPtrFactory<ChildProcessHost> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ChildProcessHost);