diff options
author | blundell <blundell@chromium.org> | 2016-02-09 02:26:54 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-02-09 10:27:56 +0000 |
commit | c8588be061339ced9a34d3d7e3319daf7eee241d (patch) | |
tree | c51d3e41c3d4029ba97d47588867e1e9a2f085ae | |
parent | 1f071893d22e2260b5ebc6c97a02626a9ca945fe (diff) | |
download | chromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.zip chromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.tar.gz chromium_src-c8588be061339ced9a34d3d7e3319daf7eee241d.tar.bz2 |
Revert of [mojo-edk] Simplify multiprocess pipe bootstrap (patchset #7 id:140001 of https://codereview.chromium.org/1675603002/ )
Reason for revert:
This patch broke several browsertests on Linux: PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning, PlatformAppBrowserTest.LoadAndLaunchAppWithFile, PolicyMakeDefaultBrowserTest.MakeDefaultDisabled (e.g., https://build.chromium.org/p/chromium.linux/buildstatus?builder=Linux%20Tests%20%28dbg%29%281%29%2832%29&number=25685). I could repro the failures locally, and reverting this patch fixed them.
Example failure:
PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning (run #1):
[ RUN ] PlatformAppBrowserTest.LoadAndLaunchAppChromeRunning
<snip>
[26865:26901:0209/004101:FATAL:thread.cc(270)] Check failed: GetThreadWasQuitProperly().
#0 0x0000f7142d94 base::debug::StackTrace::StackTrace()
#1 0x0000f71a998f logging::LogMessage::~LogMessage()
#2 0x0000f72cc94f base::Thread::ThreadMain()
#3 0x0000f72b487a base::(anonymous namespace)::ThreadFunc()
#4 0x0000e749dd4c start_thread
#5 0x0000e6a7cb8e clone
Original issue's description:
> [mojo-edk] Simplify multiprocess pipe bootstrap
>
> This introduces a new MergePort message at the Ports layer
> for joining two independent port cycles which each have
> an unused (i.e. unwritten, unread, unsent) receiving port.
>
> MergePort allows us to create a MessagePipeDispatcher which
> is immediately usable but which will eventually be linked to
> a MessagePipeDispatcher on another port cycle, potentially in
> another process.
>
> The basic idea is to create a fully functional port pair but
> only bind one port to an MPD. Do this on each end and
> merge the dangling ports asynchronously.
>
> The simplification here allows a lot of code to be deleted
> from NodeController, some of which is deleted in this CL.
>
> Future work will convert existing bootstrap sites back to
> using synchronous bootstrap, including the token-based APIs.
>
> BUG=584764
> TBR=ben@chromium.org for null check in mash shell
>
> Committed: https://crrev.com/b3ea203171e07f5c7e476e94d210ec4ad53ce5b0
> Cr-Commit-Position: refs/heads/master@{#374322}
TBR=amistry@chromium.org,darin@chormium.org,rockot@chromium.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=584764
Review URL: https://codereview.chromium.org/1678333003
Cr-Commit-Position: refs/heads/master@{#374346}
22 files changed, 661 insertions, 745 deletions
diff --git a/mash/shell/shell_application_delegate.cc b/mash/shell/shell_application_delegate.cc index a88c6d6..032ad13 100644 --- a/mash/shell/shell_application_delegate.cc +++ b/mash/shell/shell_application_delegate.cc @@ -117,12 +117,8 @@ void ShellApplicationDelegate::StartRestartableService( // TODO(beng): This would be the place to insert logic that counted restarts // to avoid infinite crash-restart loops. scoped_ptr<mojo::Connection> connection = shell_->Connect(url); - // Note: |connection| may be null if we've lost our connection to the shell. - if (connection) { - connection->SetRemoteServiceProviderConnectionErrorHandler( - restart_callback); - connections_[url] = std::move(connection); - } + connection->SetRemoteServiceProviderConnectionErrorHandler(restart_callback); + connections_[url] = std::move(connection); } } // namespace shell diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc index 926ee18..d1ab6c6 100644 --- a/mojo/edk/embedder/embedder.cc +++ b/mojo/edk/embedder/embedder.cc @@ -107,37 +107,29 @@ void ShutdownIPCSupport() { ScopedMessagePipeHandle CreateMessagePipe( ScopedPlatformHandle platform_handle) { - DCHECK(internal::g_core); - return internal::g_core->CreateMessagePipe(std::move(platform_handle)); + NOTREACHED(); + return ScopedMessagePipeHandle(); } void CreateMessagePipe( ScopedPlatformHandle platform_handle, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { DCHECK(internal::g_core); - callback.Run(CreateMessagePipe(std::move(platform_handle))); -} - -ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token) { - DCHECK(internal::g_core); - return internal::g_core->CreateParentMessagePipe(token); + internal::g_core->CreateMessagePipe(std::move(platform_handle), callback); } void CreateParentMessagePipe( const std::string& token, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - callback.Run(CreateParentMessagePipe(token)); -} - -ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token) { DCHECK(internal::g_core); - return internal::g_core->CreateChildMessagePipe(token); + internal::g_core->CreateParentMessagePipe(token, callback); } void CreateChildMessagePipe( const std::string& token, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - callback.Run(CreateChildMessagePipe(token)); + DCHECK(internal::g_core); + internal::g_core->CreateChildMessagePipe(token, callback); } std::string GenerateRandomToken() { diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h index 49b64c4..b43748c 100644 --- a/mojo/edk/embedder/embedder.h +++ b/mojo/edk/embedder/embedder.h @@ -118,12 +118,7 @@ MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupportOnIOThread(); // |OnShutdownComplete()|. MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupport(); -// Creates a message pipe over an arbitrary platform channel. The other end of -// the channel must also be passed to this function. Either endpoint can be in -// any process. -// -// Note that the channel is only used to negotiate pipe connection, not as the -// transport for messages on the pipe. +// Unused. Crashes. Only here for linking. MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle CreateMessagePipe(ScopedPlatformHandle platform_handle); @@ -133,10 +128,13 @@ CreateMessagePipe(ScopedPlatformHandle platform_handle); // either PreInitializeChildProcess() or SetParentPipe() must have been been // called at least once already. // -// |callback| must be safe to call from any thread. +// Note: This only exists for backwards compatibility with embedders that rely +// on mojo::embedder::CreateChannel() behavior. If you have a means of passing +// platform handles around, you can probably also pass strings around. If you +// can pass strings around, use CreateParentMessagePipe() and +// CreateChlidMessagePipe() instead (see below.) // -// DEPRECATED: Please don't use this. Use the synchronous version above. This -// is now merely an inconvenient wrapper for that. +// |callback| must be safe to call from any thread. MOJO_SYSTEM_IMPL_EXPORT void CreateMessagePipe( ScopedPlatformHandle platform_handle, @@ -145,17 +143,8 @@ CreateMessagePipe( // Creates a message pipe from a token. A child embedder must also have this // token and call CreateChildMessagePipe() with it in order for the pipe to get // connected. -MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle -CreateParentMessagePipe(const std::string& token); - -// Creates a message pipe from a token. A child embedder must also have this -// token and call CreateChildMessagePipe() with it in order for the pipe to get -// connected. // // |callback| must be safe to call from any thread. -// -// DEPRECATED: Please don't use this. Use the synchronous version above. This -// is now merely an inconvenient wrapper for that. MOJO_SYSTEM_IMPL_EXPORT void CreateParentMessagePipe( const std::string& token, @@ -164,17 +153,8 @@ CreateParentMessagePipe( // Creates a message pipe from a token in a child process. The parent must also // have this token and call CreateParentMessagePipe() with it in order for the // pipe to get connected. -MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle -CreateChildMessagePipe(const std::string& token); - -// Creates a message pipe from a token in a child process. The parent must also -// have this token and call CreateParentMessagePipe() with it in order for the -// pipe to get connected. // // |callback| must be safe to call from any thread. -// -// DEPRECATED: Please don't use this. Use the synchronous version above. This -// is now merely an inconvenient wrapper for that. MOJO_SYSTEM_IMPL_EXPORT void CreateChildMessagePipe( const std::string& token, diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc index d0b7b67..aad9541 100644 --- a/mojo/edk/system/core.cc +++ b/mojo/edk/system/core.cc @@ -44,9 +44,19 @@ namespace { // This is an unnecessarily large limit that is relatively easy to enforce. const uint32_t kMaxHandlesPerMessage = 1024 * 1024; -// TODO: Maybe we could negotiate a debugging pipe ID for cross-process pipes -// too; for now we just use a constant. This only affects bootstrap pipes. -const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL; +void OnPortConnected( + Core* core, + int endpoint, + const base::Callback<void(ScopedMessagePipeHandle)>& callback, + const ports::PortRef& port) { + // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too; + // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for + // bootstrap and aren't passed around, so tracking them is less important. + MojoHandle handle = core->AddDispatcher( + new MessagePipeDispatcher(core->GetNodeController(), port, + 0x7f7f7f7f7f7f7f7fUL, endpoint)); + callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle))); +} } // namespace @@ -150,37 +160,32 @@ void Core::RequestShutdown(const base::Closure& callback) { GetNodeController()->RequestShutdown(on_shutdown); } -ScopedMessagePipeHandle Core::CreateMessagePipe( - ScopedPlatformHandle platform_handle) { - ports::PortRef port0, port1; - GetNodeController()->node()->CreatePortPair(&port0, &port1); - MojoHandle handle = AddDispatcher( - new MessagePipeDispatcher(GetNodeController(), port0, - kUnknownPipeIdForDebug, 0)); +void Core::CreateMessagePipe( + ScopedPlatformHandle platform_handle, + const base::Callback<void(ScopedMessagePipeHandle)>& callback) { + ports::PortRef port; + GetNodeController()->node()->CreateUninitializedPort(&port); RemoteMessagePipeBootstrap::Create( - GetNodeController(), std::move(platform_handle), port1); - return ScopedMessagePipeHandle(MessagePipeHandle(handle)); -} - -ScopedMessagePipeHandle Core::CreateParentMessagePipe( - const std::string& token) { - ports::PortRef port0, port1; - GetNodeController()->node()->CreatePortPair(&port0, &port1); - MojoHandle handle = AddDispatcher( - new MessagePipeDispatcher(GetNodeController(), port0, - kUnknownPipeIdForDebug, 0)); - GetNodeController()->ReservePort(token, port1); - return ScopedMessagePipeHandle(MessagePipeHandle(handle)); -} - -ScopedMessagePipeHandle Core::CreateChildMessagePipe(const std::string& token) { - ports::PortRef port0, port1; - GetNodeController()->node()->CreatePortPair(&port0, &port1); - MojoHandle handle = AddDispatcher( - new MessagePipeDispatcher(GetNodeController(), port0, - kUnknownPipeIdForDebug, 1)); - GetNodeController()->MergePortIntoParent(token, port1); - return ScopedMessagePipeHandle(MessagePipeHandle(handle)); + GetNodeController(), std::move(platform_handle), port, + base::Bind(&OnPortConnected, base::Unretained(this), 0, callback, port)); +} + +void Core::CreateParentMessagePipe( + const std::string& token, + const base::Callback<void(ScopedMessagePipeHandle)>& callback) { + GetNodeController()->ReservePort( + token, + base::Bind(&OnPortConnected, base::Unretained(this), 0, callback)); +} + +void Core::CreateChildMessagePipe( + const std::string& token, + const base::Callback<void(ScopedMessagePipeHandle)>& callback) { + ports::PortRef port; + GetNodeController()->node()->CreateUninitializedPort(&port); + GetNodeController()->ConnectToParentPort( + port, token, + base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); } MojoResult Core::AsyncWait(MojoHandle handle, diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h index 603cba9..8e9af8e 100644 --- a/mojo/edk/system/core.h +++ b/mojo/edk/system/core.h @@ -52,19 +52,26 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { // Called in a child process exactly once during early initialization. void InitChild(ScopedPlatformHandle platform_handle); - // Creates a message pipe endpoint connected to an endpoint in a remote + // This creates a message pipe endpoint connected to an endpoint in a remote // embedder. |platform_handle| is used as a channel to negotiate the - // connection. - ScopedMessagePipeHandle CreateMessagePipe( - ScopedPlatformHandle platform_handle); + // connection. This is only here to facilitate legacy embedder code. See + // mojo::edk::CreateMessagePipe in mojo/edk/embedder/embedder.h. + void CreateMessagePipe( + ScopedPlatformHandle platform_handle, + const base::Callback<void(ScopedMessagePipeHandle)>& callback); // Creates a message pipe endpoint associated with |token|, which a child // holding the token can later locate and connect to. - ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token); - - // Creates a message pipe endpoint and connects it to a pipe the parent has - // associated with |token|. - ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token); + void CreateParentMessagePipe( + const std::string& token, + const base::Callback<void(ScopedMessagePipeHandle)>& callback); + + // Creates a message pipe endpoint associated with |token|, which will be + // passed to the parent in order to find an associated remote port and connect + // to it. + void CreateChildMessagePipe( + const std::string& token, + const base::Callback<void(ScopedMessagePipeHandle)>& callback); MojoHandle AddDispatcher(scoped_refptr<Dispatcher> dispatcher); diff --git a/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/mojo/edk/system/multiprocess_message_pipe_unittest.cc index ece85a1..00c2ab5 100644 --- a/mojo/edk/system/multiprocess_message_pipe_unittest.cc +++ b/mojo/edk/system/multiprocess_message_pipe_unittest.cc @@ -19,7 +19,6 @@ #include "base/logging.h" #include "base/strings/string_split.h" #include "build/build_config.h" -#include "mojo/edk/embedder/platform_channel_pair.h" #include "mojo/edk/embedder/scoped_platform_handle.h" #include "mojo/edk/system/handle_signals_state.h" #include "mojo/edk/system/test_utils.h" @@ -1272,44 +1271,6 @@ TEST_F(MultiprocessMessagePipeTest, WriteCloseSendPeer) { END_CHILD() } -DEFINE_TEST_CLIENT_TEST_WITH_PIPE(BootstrapMessagePipeAsyncClient, - MultiprocessMessagePipeTest, parent) { - // Receive one end of a platform channel from the parent. - MojoHandle channel_handle; - EXPECT_EQ("hi", ReadMessageWithHandles(parent, &channel_handle, 1)); - ScopedPlatformHandle channel; - EXPECT_EQ(MOJO_RESULT_OK, - edk::PassWrappedPlatformHandle(channel_handle, &channel)); - ASSERT_TRUE(channel.is_valid()); - - // Create a new pipe using our end of the channel. - ScopedMessagePipeHandle pipe = edk::CreateMessagePipe(std::move(channel)); - - // Ensure that we can read and write on the new pipe. - VerifyEcho(pipe.get().value(), "goodbye"); -} - -TEST_F(MultiprocessMessagePipeTest, BootstrapMessagePipeAsync) { - // Tests that new cross-process message pipes can be created synchronously - // using asynchronous negotiation over an arbitrary platform channel. - RUN_CHILD_ON_PIPE(BootstrapMessagePipeAsyncClient, child) - // Pass one end of a platform channel to the child. - PlatformChannelPair platform_channel; - MojoHandle client_channel_handle; - EXPECT_EQ(MOJO_RESULT_OK, - CreatePlatformHandleWrapper(platform_channel.PassClientHandle(), - &client_channel_handle)); - WriteMessageWithHandles(child, "hi", &client_channel_handle, 1); - - // Create a new pipe using our end of the channel. - ScopedMessagePipeHandle pipe = - edk::CreateMessagePipe(platform_channel.PassServerHandle()); - - // Ensure that we can read and write on the new pipe. - VerifyEcho(pipe.get().value(), "goodbye"); - END_CHILD() -} - } // namespace } // namespace edk } // namespace mojo diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc index 0277d46..4fb0ac4 100644 --- a/mojo/edk/system/node_channel.cc +++ b/mojo/edk/system/node_channel.cc @@ -29,7 +29,8 @@ enum class MessageType : uint32_t { BROKER_CLIENT_ADDED, ACCEPT_BROKER_CLIENT, PORTS_MESSAGE, - REQUEST_PORT_MERGE, + REQUEST_PORT_CONNECTION, + CONNECT_TO_PORT, REQUEST_INTRODUCTION, INTRODUCE, #if defined(OS_WIN) @@ -73,10 +74,15 @@ struct AcceptBrokerClientData { // This is followed by arbitrary payload data which is interpreted as a token // string for port location. -struct RequestPortMergeData { +struct RequestPortConnectionData { ports::PortName connector_port_name; }; +struct ConnectToPortData { + ports::PortName connector_port_name; + ports::PortName connectee_port_name; +}; + // Used for both REQUEST_INTRODUCTION and INTRODUCE. // // For INTRODUCE the message also includes a valid platform handle for a channel @@ -261,17 +267,28 @@ void NodeChannel::PortsMessage(Channel::MessagePtr message) { WriteChannelMessage(std::move(message)); } -void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name, - const std::string& token) { - RequestPortMergeData* data; +void NodeChannel::RequestPortConnection( + const ports::PortName& connector_port_name, + const std::string& token) { + RequestPortConnectionData* data; Channel::MessagePtr message = CreateMessage( - MessageType::REQUEST_PORT_MERGE, - sizeof(RequestPortMergeData) + token.size(), 0, &data); + MessageType::REQUEST_PORT_CONNECTION, + sizeof(RequestPortConnectionData) + token.size(), 0, &data); data->connector_port_name = connector_port_name; memcpy(data + 1, token.data(), token.size()); WriteChannelMessage(std::move(message)); } +void NodeChannel::ConnectToPort(const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name) { + ConnectToPortData* data; + Channel::MessagePtr message = CreateMessage( + MessageType::CONNECT_TO_PORT, sizeof(ConnectToPortData), 0, &data); + data->connector_port_name = connector_port_name; + data->connectee_port_name = connectee_port_name; + WriteChannelMessage(std::move(message)); +} + void NodeChannel::RequestIntroduction(const ports::NodeName& name) { IntroductionData* data; Channel::MessagePtr message = CreateMessage( @@ -435,16 +452,24 @@ void NodeChannel::OnChannelMessage(const void* payload, break; } - case MessageType::REQUEST_PORT_MERGE: { - const RequestPortMergeData* data; + case MessageType::REQUEST_PORT_CONNECTION: { + const RequestPortConnectionData* data; GetMessagePayload(payload, &data); const char* token_data = reinterpret_cast<const char*>(data + 1); const size_t token_size = payload_size - sizeof(*data) - sizeof(Header); std::string token(token_data, token_size); - delegate_->OnRequestPortMerge(remote_node_name_, - data->connector_port_name, token); + delegate_->OnRequestPortConnection(remote_node_name_, + data->connector_port_name, token); + break; + } + + case MessageType::CONNECT_TO_PORT: { + const ConnectToPortData* data; + GetMessagePayload(payload, &data); + delegate_->OnConnectToPort(remote_node_name_, data->connector_port_name, + data->connectee_port_name); break; } diff --git a/mojo/edk/system/node_channel.h b/mojo/edk/system/node_channel.h index 2bc86bd..a9bc520 100644 --- a/mojo/edk/system/node_channel.h +++ b/mojo/edk/system/node_channel.h @@ -44,9 +44,14 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>, const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel) = 0; virtual void OnPortsMessage(Channel::MessagePtr message) = 0; - virtual void OnRequestPortMerge(const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const std::string& token) = 0; + virtual void OnRequestPortConnection( + const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const std::string& token) = 0; + virtual void OnConnectToPort( + const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name) = 0; virtual void OnRequestIntroduction(const ports::NodeName& from_node, const ports::NodeName& name) = 0; virtual void OnIntroduce(const ports::NodeName& from_node, @@ -98,8 +103,10 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>, void AcceptBrokerClient(const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel); void PortsMessage(Channel::MessagePtr message); - void RequestPortMerge(const ports::PortName& connector_port_name, - const std::string& token); + void RequestPortConnection(const ports::PortName& connector_port_name, + const std::string& token); + void ConnectToPort(const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name); void RequestIntroduction(const ports::NodeName& name); void Introduce(const ports::NodeName& name, ScopedPlatformHandle channel_handle); diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc index 38c53df..6410cae 100644 --- a/mojo/edk/system/node_controller.cc +++ b/mojo/edk/system/node_controller.cc @@ -99,6 +99,18 @@ class ThreadDestructionObserver : } // namespace +NodeController::PendingPortRequest::PendingPortRequest() {} + +NodeController::PendingPortRequest::~PendingPortRequest() {} + +NodeController::ReservedPort::ReservedPort() {} + +NodeController::ReservedPort::~ReservedPort() {} + +NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {} + +NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {} + NodeController::~NodeController() {} NodeController::NodeController(Core* core) @@ -170,25 +182,18 @@ int NodeController::SendMessage(const ports::PortRef& port, } void NodeController::ReservePort(const std::string& token, - const ports::PortRef& port) { + const ReservePortCallback& callback) { + ports::PortRef port; + node_->CreateUninitializedPort(&port); + DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " << token; base::AutoLock lock(reserved_ports_lock_); - auto result = reserved_ports_.insert(std::make_pair(token, port)); - DCHECK(result.second); -} - -void NodeController::MergePortIntoParent(const std::string& token, - const ports::PortRef& port) { - scoped_refptr<NodeChannel> parent = GetParentChannel(); - if (parent) { - parent->RequestPortMerge(port.name(), token); - return; - } - - base::AutoLock lock(pending_port_merges_lock_); - pending_port_merges_.push_back(std::make_pair(token, port)); + ReservedPort reservation; + reservation.local_port = port; + reservation.callback = callback; + reserved_ports_.insert(std::make_pair(token, reservation)); } scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( @@ -205,6 +210,42 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( return buffer; } +void NodeController::ConnectToParentPort(const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback) { + io_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NodeController::RequestParentPortConnectionOnIOThread, + base::Unretained(this), local_port, token, callback)); +} + +void NodeController::ConnectToRemotePort( + const ports::PortRef& local_port, + const ports::NodeName& remote_node_name, + const ports::PortName& remote_port_name, + const base::Closure& callback) { + if (remote_node_name == name_) { + // It's possible that two different code paths on the node are trying to + // bootstrap ports to each other (e.g. in Chrome single-process mode) + // without being aware of the fact. In this case we can initialize the port + // immediately (which can fail silently if it's already been initialized by + // the request on the other side), and invoke |callback|. + node_->InitializePort(local_port, name_, remote_port_name); + callback.Run(); + return; + } + + PendingRemotePortConnection connection; + connection.local_port = local_port; + connection.remote_node_name = remote_node_name; + connection.remote_port_name = remote_port_name; + connection.callback = callback; + io_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NodeController::ConnectToRemotePortOnIOThread, + base::Unretained(this), connection)); +} + void NodeController::RequestShutdown(const base::Closure& callback) { { base::AutoLock lock(shutdown_lock_); @@ -263,6 +304,46 @@ void NodeController::ConnectToParentOnIOThread( bootstrap_parent_channel_->Start(); } +void NodeController::RequestParentPortConnectionOnIOThread( + const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback) { + DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); + + scoped_refptr<NodeChannel> parent = GetParentChannel(); + if (!parent) { + PendingPortRequest request; + request.token = token; + request.local_port = local_port; + request.callback = callback; + pending_port_requests_.push_back(request); + return; + } + + pending_parent_port_connections_.insert( + std::make_pair(local_port.name(), callback)); + parent->RequestPortConnection(local_port.name(), token); +} + +void NodeController::ConnectToRemotePortOnIOThread( + const PendingRemotePortConnection& connection) { + scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name); + if (peer) { + // It's safe to initialize the port since we already have a channel to its + // peer. No need to actually send them a message. + int rv = node_->InitializePort(connection.local_port, + connection.remote_node_name, + connection.remote_port_name); + DCHECK_EQ(rv, ports::OK); + connection.callback.Run(); + return; + } + + // Save this for later. We'll initialize the port once this peer is added. + pending_remote_port_connections_[connection.remote_node_name].push_back( + connection); +} + scoped_refptr<NodeChannel> NodeController::GetPeerChannel( const ports::NodeName& name) { base::AutoLock lock(peers_lock_); @@ -333,6 +414,19 @@ void NodeController::AddPeer(const ports::NodeName& name, channel->PortsMessage(std::move(pending_messages.front())); pending_messages.pop(); } + + // Complete any pending port connections to this peer. + auto connections_it = pending_remote_port_connections_.find(name); + if (connections_it != pending_remote_port_connections_.end()) { + for (const auto& connection : connections_it->second) { + int rv = node_->InitializePort(connection.local_port, + connection.remote_node_name, + connection.remote_port_name); + DCHECK_EQ(rv, ports::OK); + connection.callback.Run(); + } + pending_remote_port_connections_.erase(connections_it); + } } void NodeController::DropPeer(const ports::NodeName& name) { @@ -522,8 +616,6 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node, // NOTE: The child does not actually add its parent as a peer until // receiving an AcceptBrokerClient message from the broker. The parent // will request that said message be sent upon receiving AcceptParent. - - DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; } void NodeController::OnAcceptParent(const ports::NodeName& from_node, @@ -671,16 +763,15 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, io_task_runner_); AddPeer(broker_name, broker, true /* start_channel */); } - AddPeer(parent_name, parent, false /* start_channel */); - { - // Complete any port merge requests we have waiting for the parent. - base::AutoLock lock(pending_port_merges_lock_); - for (const auto& request : pending_port_merges_) - parent->RequestPortMerge(request.second.name(), request.first); - pending_port_merges_.clear(); + // Resolve any pending port connections to the parent. + for (const auto& request : pending_port_requests_) { + pending_parent_port_connections_.insert( + std::make_pair(request.local_port.name(), request.callback)); + parent->RequestPortConnection(request.local_port.name(), request.token); } + pending_port_requests_.clear(); // Feed the broker any pending children of our own. while (!pending_broker_clients.empty()) { @@ -733,15 +824,16 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) { AttemptShutdownIfRequested(); } -void NodeController::OnRequestPortMerge( +void NodeController::OnRequestPortConnection( const ports::NodeName& from_node, const ports::PortName& connector_port_name, const std::string& token) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " + DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " << token << " and port " << connector_port_name << "@" << from_node; + ReservePortCallback callback; ports::PortRef local_port; { base::AutoLock lock(reserved_ports_lock_); @@ -751,12 +843,64 @@ void NodeController::OnRequestPortMerge( << token; return; } - local_port = it->second; + local_port = it->second.local_port; + callback = it->second.callback; + reserved_ports_.erase(it); + } + + DCHECK(!callback.is_null()); + + scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); + if (!peer) { + DVLOG(1) << "Ignoring request to connect to port from unknown node " + << from_node; + return; } - int rv = node_->MergePorts(local_port, from_node, connector_port_name); - if (rv != ports::OK) - DLOG(ERROR) << "MergePorts failed: " << rv; + // This reserved port should not have been initialized yet. + CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node, + connector_port_name)); + + peer->ConnectToPort(local_port.name(), connector_port_name); + callback.Run(local_port); +} + +void NodeController::OnConnectToPort( + const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name) { + DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); + + DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " + << connectee_port_name << " to port " << connector_port_name << "@" + << from_node; + + ports::PortRef connectee_port; + int rv = node_->GetPort(connectee_port_name, &connectee_port); + if (rv != ports::OK) { + DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " + << connectee_port_name; + return; + } + + // It's OK if this port has already been initialized. This message is only + // sent by the remote peer to ensure the port is ready before it starts + // us sending messages to it. + ports::PortStatus port_status; + rv = node_->GetStatus(connectee_port, &port_status); + if (rv == ports::OK) { + DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " + << connectee_port_name; + return; + } + + CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, + connector_port_name)); + + auto it = pending_parent_port_connections_.find(connectee_port_name); + DCHECK(it != pending_parent_port_connections_.end()); + it->second.Run(); + pending_parent_port_connections_.erase(it); } void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h index 0675e97..35148a0 100644 --- a/mojo/edk/system/node_controller.h +++ b/mojo/edk/system/node_controller.h @@ -82,13 +82,42 @@ class NodeController : public ports::NodeDelegate, int SendMessage(const ports::PortRef& port_ref, scoped_ptr<PortsMessage>* message); - // Reserves a local port |port| associated with |token|. A peer holding a copy - // of |token| can merge one of its own ports into this one. - void ReservePort(const std::string& token, const ports::PortRef& port); + using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>; - // Merges a local port |port| into a port reserved by |token| in the parent. - void MergePortIntoParent(const std::string& token, - const ports::PortRef& port); + // Reserves a port associated with |token|. A peer may associate one of their + // own ports with this one by sending us a RequestPortConnection message with + // the same token value. + // + // Note that the reservation is made synchronously. In order to avoid races, + // reservations should be acquired before |token| is communicated to any + // potential peer. + // + // |callback| must be runnable on any thread and will be run with a reference + // to the new local port once connected. + void ReservePort(const std::string& token, + const ReservePortCallback& callback); + + // Eventually initializes a local port with a parent port peer identified by + // |token|. The parent should also have |token| and should alrady have + // reserved a port for it. |callback| must be runnable on any thread and will + // be run if and when the local port is connected. + void ConnectToParentPort(const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback); + + // Connects two reserved ports to each other. Useful when two independent + // systems in the same (parent) process need to establish a port pair without + // any direct knowledge of each other. + void ConnectReservedPorts(const std::string& token1, + const std::string& token2); + + // Connects a local port to a port on a remote node. Note that a connection to + // the remote node need not be established yet. The port will be connected + // ASAP, at which point |callback| will be run. + void ConnectToRemotePort(const ports::PortRef& local_port, + const ports::NodeName& remote_node_name, + const ports::PortName& remote_port_name, + const base::Closure& callback); // Creates a new shared buffer for use in the current process. scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); @@ -109,9 +138,44 @@ class NodeController : public ports::NodeDelegate, scoped_refptr<NodeChannel>>; using OutgoingMessageQueue = std::queue<Channel::MessagePtr>; + // Tracks a pending token-based connection to a parent port. + struct PendingPortRequest { + PendingPortRequest(); + ~PendingPortRequest(); + + std::string token; + ports::PortRef local_port; + base::Closure callback; + }; + + // Tracks a reserved port. + struct ReservedPort { + ReservedPort(); + ~ReservedPort(); + + ports::PortRef local_port; + ReservePortCallback callback; + }; + + // Tracks a pending connection to a remote port on any peer. + struct PendingRemotePortConnection { + PendingRemotePortConnection(); + ~PendingRemotePortConnection(); + + ports::PortRef local_port; + ports::NodeName remote_node_name; + ports::PortName remote_port_name; + base::Closure callback; + }; + void ConnectToChildOnIOThread(base::ProcessHandle process_handle, ScopedPlatformHandle platform_handle); void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle); + void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback); + void ConnectToRemotePortOnIOThread( + const PendingRemotePortConnection& connection); scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); scoped_refptr<NodeChannel> GetParentChannel(); @@ -151,9 +215,12 @@ class NodeController : public ports::NodeDelegate, const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel) override; void OnPortsMessage(Channel::MessagePtr message) override; - void OnRequestPortMerge(const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const std::string& token) override; + void OnRequestPortConnection(const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const std::string& token) override; + void OnConnectToPort(const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name) override; void OnRequestIntroduction(const ports::NodeName& from_node, const ports::NodeName& name) override; void OnIntroduce(const ports::NodeName& from_node, @@ -197,13 +264,7 @@ class NodeController : public ports::NodeDelegate, base::Lock reserved_ports_lock_; // Ports reserved by token. - base::hash_map<std::string, ports::PortRef> reserved_ports_; - - // Guards |pending_port_merges_|. - base::Lock pending_port_merges_lock_; - - // A set of port merge requests awaiting parent connection. - std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; + base::hash_map<std::string, ReservedPort> reserved_ports_; // Guards |parent_name_| and |bootstrap_parent_channel_|. base::Lock parent_lock_; @@ -246,6 +307,17 @@ class NodeController : public ports::NodeDelegate, // Channels to children during handshake. NodeMap pending_children_; + // Port connection requests which have been deferred until we have a parent. + std::vector<PendingPortRequest> pending_port_requests_; + + // Port connection requests awaiting a response from the parent. + std::unordered_map<ports::PortName, base::Closure> + pending_parent_port_connections_; + + // Port connections pending the availability of a remote peer node. + std::unordered_map<ports::NodeName, std::vector<PendingRemotePortConnection>> + pending_remote_port_connections_; + // Indicates whether this object should delete itself on IO thread shutdown. // Must only be accessed from the IO thread. bool destroy_on_io_thread_shutdown_ = false; diff --git a/mojo/edk/system/ports/event.h b/mojo/edk/system/ports/event.h index 1dc8eab..aec9f6b 100644 --- a/mojo/edk/system/ports/event.h +++ b/mojo/edk/system/ports/event.h @@ -34,7 +34,6 @@ enum struct EventType : uint32_t { kObserveProxy, kObserveProxyAck, kObserveClosure, - kMergePort, }; struct EventHeader { @@ -64,11 +63,6 @@ struct ObserveClosureEventData { uint64_t last_sequence_num; }; -struct MergePortEventData { - PortName new_port_name; - PortDescriptor new_port_descriptor; -}; - inline const EventHeader* GetEventHeader(const Message& message) { return static_cast<const EventHeader*>(message.header_bytes()); } diff --git a/mojo/edk/system/ports/message.cc b/mojo/edk/system/ports/message.cc index 2106c15..20f6d88 100644 --- a/mojo/edk/system/ports/message.cc +++ b/mojo/edk/system/ports/message.cc @@ -37,9 +37,6 @@ void Message::Parse(const void* bytes, case EventType::kObserveClosure: *num_header_bytes = sizeof(EventHeader) + sizeof(ObserveClosureEventData); break; - case EventType::kMergePort: - *num_header_bytes = sizeof(EventHeader) + sizeof(MergePortEventData); - break; default: CHECK(false) << "Bad event type"; return; diff --git a/mojo/edk/system/ports/name.h b/mojo/edk/system/ports/name.h index 8a9307d..b0ad0b7 100644 --- a/mojo/edk/system/ports/name.h +++ b/mojo/edk/system/ports/name.h @@ -8,7 +8,6 @@ #include <stdint.h> #include <ostream> -#include <tuple> namespace mojo { namespace edk { @@ -22,15 +21,9 @@ struct Name { inline bool operator==(const Name& a, const Name& b) { return a.v1 == b.v1 && a.v2 == b.v2; } - inline bool operator!=(const Name& a, const Name& b) { return !(a == b); } - -inline bool operator<(const Name& a, const Name& b) { - return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2); -} - std::ostream& operator<<(std::ostream& stream, const Name& name); struct PortName : Name { diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc index 9cfc511..1718306 100644 --- a/mojo/edk/system/ports/node.cc +++ b/mojo/edk/system/ports/node.cc @@ -354,39 +354,10 @@ int Node::AcceptMessage(ScopedMessage message) { return OnObserveClosure( header->port_name, GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); - case EventType::kMergePort: - return OnMergePort(header->port_name, - *GetEventData<MergePortEventData>(*message)); } return OOPS(ERROR_NOT_IMPLEMENTED); } -int Node::MergePorts(const PortRef& port_ref, - const NodeName& destination_node_name, - const PortName& destination_port_name) { - Port* port = port_ref.port(); - { - // |ports_lock_| must be held for WillSendPort_Locked below. - base::AutoLock ports_lock(ports_lock_); - base::AutoLock lock(port->lock); - - DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ - << " to " << destination_port_name << "@" << destination_node_name; - - // Send the port-to-merge over to the destination node so it can be merged - // into the port cycle atomically there. - MergePortEventData data; - data.new_port_name = port_ref.name(); - WillSendPort_Locked(port, destination_node_name, &data.new_port_name, - &data.new_port_descriptor); - delegate_->ForwardMessage( - destination_node_name, - NewInternalMessage(destination_port_name, - EventType::kMergePort, data)); - } - return OK; -} - int Node::LostConnectionToNode(const NodeName& node_name) { // We can no longer send events to the given node. We also can't expect any // PortAccepted events. @@ -538,8 +509,34 @@ int Node::OnPortAccepted(const PortName& port_name) { << " pointing to " << port->peer_port_name << "@" << port->peer_node_name; - return BeginProxying_Locked(port.get(), port_name); + if (port->state != Port::kBuffering) + return OOPS(ERROR_PORT_STATE_UNEXPECTED); + + port->state = Port::kProxying; + + int rv = ForwardMessages_Locked(port.get(), port_name); + if (rv != OK) + return rv; + + // We may have observed closure before receiving PortAccepted. In that + // case, we can advance to removing the proxy without sending out an + // ObserveProxy message. We already know the last expected message, etc. + + if (port->remove_proxy_on_last_message) { + MaybeRemoveProxy_Locked(port.get(), port_name); + + // Make sure we propagate closure to our current peer. + ObserveClosureEventData data; + data.last_sequence_num = port->last_sequence_num_to_receive; + delegate_->ForwardMessage( + port->peer_node_name, + NewInternalMessage(port->peer_port_name, + EventType::kObserveClosure, data)); + } else { + InitiateProxyRemoval_Locked(port.get(), port_name); + } } + return OK; } int Node::OnObserveProxy(const PortName& port_name, @@ -724,94 +721,6 @@ int Node::OnObserveClosure(const PortName& port_name, return OK; } -int Node::OnMergePort(const PortName& port_name, - const MergePortEventData& event) { - scoped_refptr<Port> port = GetPort(port_name); - if (!port) - return ERROR_PORT_UNKNOWN; - - DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" - << port->state << ") merging with proxy " << event.new_port_name - << "@" << name_ << " pointing to " - << event.new_port_descriptor.peer_port_name << "@" - << event.new_port_descriptor.peer_node_name << " referred by " - << event.new_port_descriptor.referring_port_name << "@" - << event.new_port_descriptor.referring_node_name; - - bool close_target_port = false; - bool close_new_port = false; - - // Accept the new port. This is now the receiving end of the other port cycle - // to be merged with ours. - int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); - if (rv != OK) { - close_target_port = true; - } else { - // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn - // needs to hold |ports_lock_|. We also acquire multiple port locks within. - base::AutoLock ports_lock(ports_lock_); - base::AutoLock lock(port->lock); - - if (port->state != Port::kReceiving) { - close_new_port = true; - } else { - scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); - base::AutoLock new_port_lock(new_port->lock); - DCHECK(new_port->state == Port::kReceiving); - - // Both ports are locked. Now all we have to do is swap their peer - // information and set them up as proxies. - - std::swap(port->peer_node_name, new_port->peer_node_name); - std::swap(port->peer_port_name, new_port->peer_port_name); - std::swap(port->peer_closed, new_port->peer_closed); - - port->state = Port::kBuffering; - if (port->peer_closed) - port->remove_proxy_on_last_message = true; - - new_port->state = Port::kBuffering; - if (new_port->peer_closed) - new_port->remove_proxy_on_last_message = true; - - int rv1 = BeginProxying_Locked(port.get(), port_name); - int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); - - if (rv1 == OK && rv2 == OK) - return OK; - - // If either proxy failed to initialize (e.g. had undeliverable messages - // or ended up in a bad state somehow), we keep the system in a consistent - // state by undoing the peer swap and closing both merge ports. - - std::swap(port->peer_node_name, new_port->peer_node_name); - std::swap(port->peer_port_name, new_port->peer_port_name); - port->state = Port::kReceiving; - new_port->state = Port::kReceiving; - close_new_port = true; - close_target_port = true; - } - } - - if (close_target_port) { - PortRef target_port; - rv = GetPort(port_name, &target_port); - DCHECK(rv == OK); - - ClosePort(target_port); - } - - if (close_new_port) { - PortRef new_port; - rv = GetPort(event.new_port_name, &new_port); - DCHECK(rv == OK); - - ClosePort(new_port); - } - - return ERROR_PORT_STATE_UNEXPECTED; -} - int Node::AddPortWithName(const PortName& port_name, const scoped_refptr<Port>& port) { base::AutoLock lock(ports_lock_); @@ -999,40 +908,6 @@ int Node::WillSendMessage_Locked(Port* port, return OK; } -int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { - ports_lock_.AssertAcquired(); - port->lock.AssertAcquired(); - - if (port->state != Port::kBuffering) - return OOPS(ERROR_PORT_STATE_UNEXPECTED); - - port->state = Port::kProxying; - - int rv = ForwardMessages_Locked(port, port_name); - if (rv != OK) - return rv; - - // We may have observed closure while buffering. In that case, we can advance - // to removing the proxy without sending out an ObserveProxy message. We - // already know the last expected message, etc. - - if (port->remove_proxy_on_last_message) { - MaybeRemoveProxy_Locked(port, port_name); - - // Make sure we propagate closure to our current peer. - ObserveClosureEventData data; - data.last_sequence_num = port->last_sequence_num_to_receive; - delegate_->ForwardMessage( - port->peer_node_name, - NewInternalMessage(port->peer_port_name, - EventType::kObserveClosure, data)); - } else { - InitiateProxyRemoval_Locked(port, port_name); - } - - return OK; -} - int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { ports_lock_.AssertAcquired(); port->lock.AssertAcquired(); diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h index cb1d5fd..410d53b 100644 --- a/mojo/edk/system/ports/node.h +++ b/mojo/edk/system/ports/node.h @@ -122,21 +122,6 @@ class Node { // Corresponding to NodeDelegate::ForwardMessage. int AcceptMessage(ScopedMessage message); - // Called to merge two ports with each other. If you have two independent - // port pairs A <=> B and C <=> D, the net result of merging B and C is a - // single connected port pair A <=> D. - // - // Note that the behavior of this operation is undefined if either port to be - // merged (B or C above) has ever been read from or written to directly, and - // this must ONLY be called on one side of the merge, though it doesn't matter - // which side. - // - // It is safe for the non-merged peers (A and D above) to be transferred, - // closed, and/or written to before, during, or after the merge. - int MergePorts(const PortRef& port_ref, - const NodeName& destination_node_name, - const PortName& destination_port_name); - // Called to inform this node that communication with another node is lost // indefinitely. This triggers cleanup of ports bound to this node. int LostConnectionToNode(const NodeName& node_name); @@ -148,7 +133,6 @@ class Node { const ObserveProxyEventData& event); int OnObserveProxyAck(const PortName& port_name, uint64_t last_sequence_num); int OnObserveClosure(const PortName& port_name, uint64_t last_sequence_num); - int OnMergePort(const PortName& port_name, const MergePortEventData& event); int AddPortWithName(const PortName& port_name, const scoped_refptr<Port>& port); @@ -167,7 +151,6 @@ class Node { int WillSendMessage_Locked(Port* port, const PortName& port_name, Message* message); - int BeginProxying_Locked(Port* port, const PortName& port_name); int ForwardMessages_Locked(Port* port, const PortName& port_name); void InitiateProxyRemoval_Locked(Port* port, const PortName& port_name); void MaybeRemoveProxy_Locked(Port* port, const PortName& port_name); diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc index 1bdca3f..95e2a63 100644 --- a/mojo/edk/system/ports/ports_unittest.cc +++ b/mojo/edk/system/ports/ports_unittest.cc @@ -690,12 +690,32 @@ TEST_F(PortsTest, SendUninitialized) { Node node0(node0_name, &node0_delegate); node_map[0] = &node0; - PortRef x0; + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Begin to setup a pipe between node0 and node1, but don't initialize either + // endpoint. + PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); + EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); + + node0_delegate.set_save_messages(true); + node1_delegate.set_save_messages(true); + + // Send a message on each port and expect neither to arrive yet. + EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, SendStringMessage(&node0, x0, "oops")); + EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, + SendStringMessage(&node1, x1, "oh well")); + EXPECT_EQ(OK, node0.ClosePort(x0)); + EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendFailure) { @@ -1048,322 +1068,6 @@ TEST_F(PortsTest, SendWithClosedPeerSent) { EXPECT_TRUE(node0.CanShutdownCleanly(false)); } -TEST_F(PortsTest, MergePorts) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_read_messages(false); - node1_delegate.set_save_messages(true); - - // Write a message on A. - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); - - PumpTasks(); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - PumpTasks(); - - // Expect only two receiving ports to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(true)); - EXPECT_TRUE(node1.CanShutdownCleanly(true)); - - // Expect D to have received the message sent on A. - ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); - - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); - - // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - -TEST_F(PortsTest, MergePortWithClosedPeer1) { - // This tests that the right thing happens when initiating a merge on a port - // whose peer has already been closed. - - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_read_messages(false); - node1_delegate.set_save_messages(true); - - // Write a message on A. - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); - - PumpTasks(); - - // Close A. - EXPECT_EQ(OK, node0.ClosePort(A)); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - PumpTasks(); - - // Expect only one receiving port to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(true)); - - // Expect D to have received the message sent on A. - ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); - - EXPECT_EQ(OK, node1.ClosePort(D)); - - // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - -TEST_F(PortsTest, MergePortWithClosedPeer2) { - // This tests that the right thing happens when merging into a port whose peer - // has already been closed. - - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); - - // Write a message on D. - EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); - - PumpTasks(); - - // Close D. - EXPECT_EQ(OK, node1.ClosePort(D)); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - PumpTasks(); - - // Expect only one receiving port to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(true)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); - - // Expect A to have received the message sent on D. - ScopedMessage message; - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); - - EXPECT_EQ(OK, node0.ClosePort(A)); - - // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - -TEST_F(PortsTest, MergePortsWithClosedPeers) { - // This tests that no residual ports are left behind if two ports are merged - // when both of their peers have been closed. - - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); - - // Close A and D. - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); - - PumpTasks(); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - PumpTasks(); - - // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - -TEST_F(PortsTest, MergePortsWithMovedPeers) { - // This tests that no ports can be merged successfully even if their peers - // are moved around. - - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - // Set up another pair X-Y for moving ports on node0. - PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); - - ScopedMessage message; - - // Move A to new port E. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - ASSERT_EQ(1u, message->num_ports()); - PortRef E; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); - - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); - - node0_delegate.set_read_messages(false); - - // Write messages on E and D. - EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey")); - EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi")); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - node0_delegate.set_read_messages(true); - node1_delegate.set_read_messages(true); - node1_delegate.set_save_messages(true); - - PumpTasks(); - - // Expect to receive D's message on E and E's message on D. - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hi", ToString(message))); - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); - - // Close E and D. - EXPECT_EQ(OK, node0.ClosePort(E)); - EXPECT_EQ(OK, node1.ClosePort(D)); - - PumpTasks(); - - // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - - -TEST_F(PortsTest, MergePortsFailsGracefully) { - // This tests that the system remains in a well-defined state if something - // goes wrong during port merge. - - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Setup two independent port pairs, A-B on node0 and C-D on node1. - PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - PumpTasks(); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - // Move C to a new port E. This is dumb and nobody should do it, but it's - // possible. MergePorts will fail as a result because C won't be in a - // receiving state when the event arrives at node1, so B should be closed. - ScopedMessage message; - PortRef X, Y; - EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y)); - node1_delegate.set_save_messages(true); - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C)); - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - ASSERT_EQ(1u, message->num_ports()); - PortRef E; - ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E)); - EXPECT_EQ(OK, node1.ClosePort(X)); - EXPECT_EQ(OK, node1.ClosePort(Y)); - - // C goes away as a result of normal proxy removal. - PumpTasks(); - - EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C)); - - // B should have been closed cleanly. - EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); - - // Close A, D, and E. - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); - EXPECT_EQ(OK, node1.ClosePort(E)); - - PumpTasks(); - - // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); -} - } // namespace test } // namespace ports } // namespace edk diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.cc b/mojo/edk/system/remote_message_pipe_bootstrap.cc index 2342166..671542f3 100644 --- a/mojo/edk/system/remote_message_pipe_bootstrap.cc +++ b/mojo/edk/system/remote_message_pipe_bootstrap.cc @@ -31,17 +31,18 @@ struct BootstrapData { void RemoteMessagePipeBootstrap::Create( NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port) { + const ports::PortRef& port, + const base::Closure& callback) { if (node_controller->io_task_runner()->RunsTasksOnCurrentThread()) { // Owns itself. - new RemoteMessagePipeBootstrap( - node_controller, std::move(platform_handle), port); + new RemoteMessagePipeBootstrap(node_controller, std::move(platform_handle), + port, callback); } else { node_controller->io_task_runner()->PostTask( FROM_HERE, base::Bind(&RemoteMessagePipeBootstrap::Create, base::Unretained(node_controller), - base::Passed(&platform_handle), port)); + base::Passed(&platform_handle), port, callback)); } } @@ -55,9 +56,11 @@ RemoteMessagePipeBootstrap::~RemoteMessagePipeBootstrap() { RemoteMessagePipeBootstrap::RemoteMessagePipeBootstrap( NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port) + const ports::PortRef& port, + const base::Closure& callback) : node_controller_(node_controller), local_port_(port), + callback_(callback), io_task_runner_(base::ThreadTaskRunnerHandle::Get()), channel_(Channel::Create(this, std::move(platform_handle), io_task_runner_)) { @@ -116,14 +119,8 @@ void RemoteMessagePipeBootstrap::OnChannelMessage( } peer_info_received_ = true; - - // We need to choose one side to initiate the port merge. It doesn't matter - // who does it as long as they don't both try. Simple solution: pick the one - // with the "smaller" port name. - if (local_port_.name() < data->port_name) { - node_controller_->node()->MergePorts(local_port_, data->node_name, - data->port_name); - } + node_controller_->ConnectToRemotePort( + local_port_, data->node_name, data->port_name, callback_); // Send another ping to the other end to trigger shutdown. This may race with // the other end sending its own ping, but it doesn't matter. Whoever wins diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.h b/mojo/edk/system/remote_message_pipe_bootstrap.h index 2a24680..911d518 100644 --- a/mojo/edk/system/remote_message_pipe_bootstrap.h +++ b/mojo/edk/system/remote_message_pipe_bootstrap.h @@ -7,6 +7,7 @@ #include <string> +#include "base/callback.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/message_loop/message_loop.h" @@ -27,20 +28,17 @@ class NodeController; // // The bootstrapping procedure the same on either end: // -// 1. Select a local port P to be merged with a remote port. +// 1. Create a local port P. // 2. Write the local node name and P's name to the bootstrap pipe. // 3. When a message is read from the pipe: // - If it's the first message read, extract the remote node+port name and -// and send an empty ACK message on the pipe. +// initialize the local port. Send an empty ACK message on the pipe. // - If it's the second message read, close the channel, and delete |this|. // 4. When an error occus on the pipe, delete |this|. // // Excluding irrecoverable error conditions such as either process dying, // armageddon, etc., this ensures neither end closes the channel until both ends -// are aware of each other's port-to-merge. -// -// At step 3, one side of the channel is chosen to issue a message at the Ports -// layer which eventually merges the two ports. +// have intiailized their corresponding local port. class RemoteMessagePipeBootstrap : public Channel::Delegate, public base::MessageLoop::DestructionObserver { @@ -50,18 +48,22 @@ class RemoteMessagePipeBootstrap // |port| must be a reference to an uninitialized local port. static void Create(NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port); + const ports::PortRef& port, + const base::Closure& callback); protected: - explicit RemoteMessagePipeBootstrap(NodeController* node_controller, - ScopedPlatformHandle platform_handle, - const ports::PortRef& port); + explicit RemoteMessagePipeBootstrap( + NodeController* node_controller, + ScopedPlatformHandle platform_handle, + const ports::PortRef& port, + const base::Closure& callback); void ShutDown(); bool shutting_down_ = false; NodeController* node_controller_; const ports::PortRef local_port_; + base::Closure callback_; scoped_refptr<base::TaskRunner> io_task_runner_; scoped_refptr<Channel> channel_; diff --git a/mojo/shell/runner/child/runner_connection.cc b/mojo/shell/runner/child/runner_connection.cc index 313c00a..75b4648 100644 --- a/mojo/shell/runner/child/runner_connection.cc +++ b/mojo/shell/runner/child/runner_connection.cc @@ -74,6 +74,13 @@ class Blocker { using GotApplicationRequestCallback = base::Callback<void(InterfaceRequest<mojom::ShellClient>)>; +void OnCreateMessagePipe(ScopedMessagePipeHandle* result, + Blocker::Unblocker unblocker, + ScopedMessagePipeHandle pipe) { + *result = std::move(pipe); + unblocker.Unblock(base::Bind(&base::DoNothing)); +} + void OnGotApplicationRequest(InterfaceRequest<mojom::ShellClient>* out_request, InterfaceRequest<mojom::ShellClient> request) { *out_request = std::move(request); @@ -222,11 +229,14 @@ bool RunnerConnectionImpl::WaitForApplicationRequest( std::string primordial_pipe_token = base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( switches::kPrimordialPipeToken); - handle = edk::CreateChildMessagePipe(primordial_pipe_token); + Blocker blocker; + edk::CreateChildMessagePipe( + primordial_pipe_token, + base::Bind(&OnCreateMessagePipe, base::Unretained(&handle), + blocker.GetUnblocker())); + blocker.Block(); } - DCHECK(handle.is_valid()); - Blocker blocker; controller_runner_->PostTask( FROM_HERE, diff --git a/mojo/shell/runner/host/child_process.cc b/mojo/shell/runner/host/child_process.cc index 75230f0..0208b84 100644 --- a/mojo/shell/runner/host/child_process.cc +++ b/mojo/shell/runner/host/child_process.cc @@ -51,6 +51,10 @@ namespace shell { namespace { +void DidCreateChannel(embedder::ChannelInfo* channel_info) { + DVLOG(2) << "ChildControllerImpl::DidCreateChannel()"; +} + // Blocker --------------------------------------------------------------------- // Blocks a thread until another thread unblocks it, at which point it unblocks @@ -296,14 +300,34 @@ scoped_ptr<mojo::shell::LinuxSandbox> InitializeSandbox() { } #endif -ScopedMessagePipeHandle InitializeHostMessagePipe( - edk::ScopedPlatformHandle platform_channel, - scoped_refptr<base::TaskRunner> io_task_runner) { - edk::SetParentPipeHandle(std::move(platform_channel)); - std::string primordial_pipe_token = - base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( - switches::kPrimordialPipeToken); - return edk::CreateChildMessagePipe(primordial_pipe_token); +void InitializeHostMessagePipe( + embedder::ScopedPlatformHandle platform_channel, + scoped_refptr<base::TaskRunner> io_task_runner, + const base::Callback<void(ScopedMessagePipeHandle)>& callback) { + if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { + embedder::SetParentPipeHandle(std::move(platform_channel)); + std::string primordial_pipe_token = + base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( + switches::kPrimordialPipeToken); + edk::CreateChildMessagePipe(primordial_pipe_token, callback); + } else { + ScopedMessagePipeHandle host_message_pipe; + host_message_pipe = + embedder::CreateChannel(std::move(platform_channel), + base::Bind(&DidCreateChannel), io_task_runner); + callback.Run(std::move(host_message_pipe)); + } +} + +void OnHostMessagePipeCreated(AppContext* app_context, + base::NativeLibrary app_library, + const Blocker::Unblocker& unblocker, + ScopedMessagePipeHandle pipe) { + app_context->controller_runner()->PostTask( + FROM_HERE, + base::Bind(&ChildControllerImpl::Init, base::Unretained(app_context), + base::Unretained(app_library), base::Passed(&pipe), + unblocker)); } } // namespace @@ -336,8 +360,8 @@ int ChildProcessMain() { sandbox = InitializeSandbox(); #endif - edk::ScopedPlatformHandle platform_channel = - edk::PlatformChannelPair::PassClientHandleFromParentProcess( + embedder::ScopedPlatformHandle platform_channel = + embedder::PlatformChannelPair::PassClientHandleFromParentProcess( command_line); CHECK(platform_channel.is_valid()); @@ -348,12 +372,13 @@ int ChildProcessMain() { app_context.Init(); app_context.StartControllerThread(); - ScopedMessagePipeHandle host_pipe = InitializeHostMessagePipe( - std::move(platform_channel), app_context.io_runner()); app_context.controller_runner()->PostTask( FROM_HERE, - base::Bind(&ChildControllerImpl::Init, &app_context, app_library, - base::Passed(&host_pipe), blocker.GetUnblocker())); + base::Bind( + &InitializeHostMessagePipe, base::Passed(&platform_channel), + make_scoped_refptr(app_context.io_runner()), + base::Bind(&OnHostMessagePipeCreated, base::Unretained(&app_context), + base::Unretained(app_library), blocker.GetUnblocker()))); // This will block, then run whatever the controller wants. blocker.Block(); diff --git a/mojo/shell/runner/host/child_process_host.cc b/mojo/shell/runner/host/child_process_host.cc index 2b3cfc9..9a5b439 100644 --- a/mojo/shell/runner/host/child_process_host.cc +++ b/mojo/shell/runner/host/child_process_host.cc @@ -35,24 +35,55 @@ namespace mojo { namespace shell { +ChildProcessHost::PipeHolder::PipeHolder() {} + +void ChildProcessHost::PipeHolder::Reject() { + base::AutoLock lock(lock_); + reject_pipe_ = true; + pipe_.reset(); +} + +void ChildProcessHost::PipeHolder::SetPipe(ScopedMessagePipeHandle pipe) { + base::AutoLock lock(lock_); + DCHECK(!pipe_.is_valid()); + if (!reject_pipe_) + pipe_ = std::move(pipe); +} + +ScopedMessagePipeHandle ChildProcessHost::PipeHolder::PassPipe() { + base::AutoLock lock(lock_); + DCHECK(pipe_.is_valid()); + return std::move(pipe_); +} + +ChildProcessHost::PipeHolder::~PipeHolder() {} + ChildProcessHost::ChildProcessHost(base::TaskRunner* launch_process_runner, bool start_sandboxed, const base::FilePath& app_path) : launch_process_runner_(launch_process_runner), start_sandboxed_(start_sandboxed), app_path_(app_path), + channel_info_(nullptr), start_child_process_event_(false, false), weak_factory_(this) { - node_channel_.reset(new edk::PlatformChannelPair); - primordial_pipe_token_ = edk::GenerateRandomToken(); - controller_.Bind( - InterfacePtrInfo<mojom::ChildController>( - edk::CreateParentMessagePipe(primordial_pipe_token_), 0u)); + pipe_holder_ = new PipeHolder(); + if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { + node_channel_.reset(new edk::PlatformChannelPair); + primordial_pipe_token_ = edk::GenerateRandomToken(); + } else { + pipe_holder_->SetPipe(embedder::CreateChannel( + platform_channel_pair_.PassServerHandle(), + base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)), + base::ThreadTaskRunnerHandle::Get())); + OnMessagePipeCreated(); + } } ChildProcessHost::ChildProcessHost(ScopedHandle channel) : launch_process_runner_(nullptr), start_sandboxed_(false), + channel_info_(nullptr), start_child_process_event_(false, false), weak_factory_(this) { CHECK(channel.is_valid()); @@ -68,11 +99,40 @@ ChildProcessHost::~ChildProcessHost() { void ChildProcessHost::Start(const ProcessReadyCallback& callback) { DCHECK(!child_process_.IsValid()); + DCHECK(process_ready_callback_.is_null()); + + process_ready_callback_ = callback; + if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { + // With the new EDK, bootstrap message pipes are created asynchronously. + // We recieve the bound pipe (if successful) on an arbitrary thread, + // stash it in the thread-safe |pipe_holder_|, and then try to call + // OnMessagePipeCreated() on the host's main thread. + // + // Because of the way the launcher process shuts down, it's possible for + // the main thread's MessageLoop to stop running (but not yet be destroyed!) + // while this boostrap is pending, resulting in OnMessagePipeCreated() never + // being called. + // + // A typical child process (i.e. one using ShellConnection to bind the other + // end of this pipe) may hang forever waiting for an Initialize() message + // unless the pipe is closed. This in turn means that Join() could hang + // waiting for the process to exit. Deadlock! + // + // |pipe_holder_| exists for this reason. If it's still holding onto the + // pipe when Join() is called, the pipe will be closed. + DCHECK(!primordial_pipe_token_.empty()); + edk::CreateParentMessagePipe( + primordial_pipe_token_, + base::Bind(&OnParentMessagePipeCreated, pipe_holder_, + base::ThreadTaskRunnerHandle::Get(), + base::Bind(&ChildProcessHost::OnMessagePipeCreated, + weak_factory_.GetWeakPtr()))); + } + launch_process_runner_->PostTaskAndReply( FROM_HERE, base::Bind(&ChildProcessHost::DoLaunch, base::Unretained(this)), - base::Bind(&ChildProcessHost::DidStart, - weak_factory_.GetWeakPtr(), callback)); + base::Bind(&ChildProcessHost::DidStart, weak_factory_.GetWeakPtr())); } int ChildProcessHost::Join() { @@ -82,6 +142,10 @@ int ChildProcessHost::Join() { controller_ = mojom::ChildControllerPtr(); DCHECK(child_process_.IsValid()); + // Ensure the child pipe is closed even if it wasn't yet connected to the + // controller. + pipe_holder_->Reject(); + int rv = -1; LOG_IF(ERROR, !child_process_.WaitForExit(&rv)) << "Failed to wait for child process"; @@ -108,11 +172,11 @@ void ChildProcessHost::ExitNow(int32_t exit_code) { controller_->ExitNow(exit_code); } -void ChildProcessHost::DidStart(const ProcessReadyCallback& callback) { +void ChildProcessHost::DidStart() { DVLOG(2) << "ChildProcessHost::DidStart()"; if (child_process_.IsValid()) { - callback.Run(child_process_.Pid()); + MaybeNotifyProcessReady(); } else { LOG(ERROR) << "Failed to start child process"; AppCompleted(MOJO_RESULT_UNKNOWN); @@ -214,5 +278,34 @@ void ChildProcessHost::AppCompleted(int32_t result) { } } +void ChildProcessHost::DidCreateChannel(embedder::ChannelInfo* channel_info) { + DVLOG(2) << "AppChildProcessHost::DidCreateChannel()"; + + DCHECK(channel_info || + base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")); + channel_info_ = channel_info; +} + +void ChildProcessHost::OnMessagePipeCreated() { + controller_.Bind( + InterfacePtrInfo<mojom::ChildController>(pipe_holder_->PassPipe(), 0u)); + MaybeNotifyProcessReady(); +} + +void ChildProcessHost::MaybeNotifyProcessReady() { + if (controller_.is_bound() && child_process_.IsValid()) + process_ready_callback_.Run(child_process_.Pid()); +} + +// static +void ChildProcessHost::OnParentMessagePipeCreated( + scoped_refptr<PipeHolder> holder, + scoped_refptr<base::TaskRunner> callback_task_runner, + const base::Closure& callback, + ScopedMessagePipeHandle pipe) { + holder->SetPipe(std::move(pipe)); + callback_task_runner->PostTask(FROM_HERE, callback); +} + } // namespace shell } // namespace mojo diff --git a/mojo/shell/runner/host/child_process_host.h b/mojo/shell/runner/host/child_process_host.h index 2664d30..906a1a2 100644 --- a/mojo/shell/runner/host/child_process_host.h +++ b/mojo/shell/runner/host/child_process_host.h @@ -71,24 +71,70 @@ class ChildProcessHost { void ExitNow(int32_t exit_code); protected: - void DidStart(const ProcessReadyCallback& callback); + void DidStart(); private: + // A thread-safe holder for the bootstrap message pipe to this child process. + // The pipe is established on an arbitrary thread and may not be connected + // until the host's message loop has stopped running. + class PipeHolder : public base::RefCountedThreadSafe<PipeHolder> { + public: + PipeHolder(); + + void Reject(); + void SetPipe(ScopedMessagePipeHandle pipe); + ScopedMessagePipeHandle PassPipe(); + + private: + friend class base::RefCountedThreadSafe<PipeHolder>; + + ~PipeHolder(); + + base::Lock lock_; + bool reject_pipe_ = false; + ScopedMessagePipeHandle pipe_; + + DISALLOW_COPY_AND_ASSIGN(PipeHolder); + }; + void DoLaunch(); void AppCompleted(int32_t result); + // Callback for |embedder::CreateChannel()|. + void DidCreateChannel(embedder::ChannelInfo* channel_info); + + // Called once |pipe_holder_| is bound to a pipe. + void OnMessagePipeCreated(); + + // Called when the child process is launched and when the bootstrap + // message pipe is created. Once both things have happened (which may happen + // in either order), |process_ready_callback_| is invoked. + void MaybeNotifyProcessReady(); + + // Callback used to receive the child message pipe from the ports EDK. + // This may be called on any thread. It will always stash the pipe in + // |holder|, and it will then attempt to call |callback| on + // |callback_task_runner| (which may or may not still be running tasks.) + static void OnParentMessagePipeCreated( + scoped_refptr<PipeHolder> holder, + scoped_refptr<base::TaskRunner> callback_task_runner, + const base::Closure& callback, + ScopedMessagePipeHandle pipe); + scoped_refptr<base::TaskRunner> launch_process_runner_; bool start_sandboxed_; const base::FilePath app_path_; base::Process child_process_; // Used for the ChildController binding. - edk::PlatformChannelPair platform_channel_pair_; + embedder::PlatformChannelPair platform_channel_pair_; mojom::ChildControllerPtr controller_; + embedder::ChannelInfo* channel_info_; mojom::ChildController::StartAppCallback on_app_complete_; - edk::HandlePassingInformation handle_passing_info_; + embedder::HandlePassingInformation handle_passing_info_; - // Used to back the NodeChannel between the parent and child node. + // Used only when --use-new-edk is specified. Used to back the NodeChannel + // between the parent and child node. scoped_ptr<edk::PlatformChannelPair> node_channel_; // Since Start() calls a method on another thread, we use an event to block @@ -98,6 +144,14 @@ class ChildProcessHost { // A token the child can use to connect a primordial pipe to the host. std::string primordial_pipe_token_; + // Holds the message pipe to the child process until it is either closed or + // bound to the controller interface. + scoped_refptr<PipeHolder> pipe_holder_; + + // Invoked exactly once, as soon as the child process's ID is known and + // a pipe to the child has been established. + ProcessReadyCallback process_ready_callback_; + base::WeakPtrFactory<ChildProcessHost> weak_factory_; DISALLOW_COPY_AND_ASSIGN(ChildProcessHost); |