diff options
24 files changed, 748 insertions, 694 deletions
diff --git a/content/common/mojo/channel_init.cc b/content/common/mojo/channel_init.cc index 8eb156a..3b7fe21 100644 --- a/content/common/mojo/channel_init.cc +++ b/content/common/mojo/channel_init.cc @@ -19,17 +19,6 @@ namespace content { -namespace { - -void CallMessagePipeCallbackOnThread( - const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback, - scoped_refptr<base::TaskRunner> task_runner, - mojo::ScopedMessagePipeHandle pipe) { - task_runner->PostTask(FROM_HERE, base::Bind(callback, base::Passed(&pipe))); -} - -} // namespace - ChannelInit::ChannelInit() : channel_info_(nullptr), weak_factory_(this) {} ChannelInit::~ChannelInit() { @@ -45,14 +34,9 @@ void ChannelInit::Init( scoped_ptr<IPC::ScopedIPCSupport> ipc_support( new IPC::ScopedIPCSupport(io_thread_task_runner)); if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { - mojo::edk::CreateMessagePipe( - mojo::edk::ScopedPlatformHandle(mojo::edk::PlatformHandle(file)), - base::Bind(&CallMessagePipeCallbackOnThread, - base::Bind(&ChannelInit::OnCreateMessagePipe, - weak_factory_.GetWeakPtr(), - base::Passed(&ipc_support), - callback), - base::ThreadTaskRunnerHandle::Get())); + ipc_support_ = std::move(ipc_support); + callback.Run(mojo::edk::CreateMessagePipe( + mojo::edk::ScopedPlatformHandle(mojo::edk::PlatformHandle(file)))); } else { mojo::ScopedMessagePipeHandle message_pipe = mojo::embedder::CreateChannel( mojo::embedder::ScopedPlatformHandle( @@ -86,12 +70,4 @@ void ChannelInit::OnCreatedChannel( self->ipc_support_ = std::move(ipc_support); } -void ChannelInit::OnCreateMessagePipe( - scoped_ptr<IPC::ScopedIPCSupport> ipc_support, - const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback, - mojo::ScopedMessagePipeHandle pipe) { - ipc_support_ = std::move(ipc_support); - callback.Run(std::move(pipe)); -} - } // namespace content diff --git a/content/common/mojo/channel_init.h b/content/common/mojo/channel_init.h index 4c19053..34d0c95 100644 --- a/content/common/mojo/channel_init.h +++ b/content/common/mojo/channel_init.h @@ -47,12 +47,6 @@ class CONTENT_EXPORT ChannelInit { scoped_ptr<IPC::ScopedIPCSupport> ipc_support, mojo::embedder::ChannelInfo* channel); - // Callback used with the new ports EDK on pipe creation. - void OnCreateMessagePipe( - scoped_ptr<IPC::ScopedIPCSupport> ipc_support, - const base::Callback<void(mojo::ScopedMessagePipeHandle)>& callback, - mojo::ScopedMessagePipeHandle pipe); - // If non-null the channel has been established. mojo::embedder::ChannelInfo* channel_info_; diff --git a/mash/shell/shell_application_delegate.cc b/mash/shell/shell_application_delegate.cc index 032ad13..a88c6d6 100644 --- a/mash/shell/shell_application_delegate.cc +++ b/mash/shell/shell_application_delegate.cc @@ -117,8 +117,12 @@ void ShellApplicationDelegate::StartRestartableService( // TODO(beng): This would be the place to insert logic that counted restarts // to avoid infinite crash-restart loops. scoped_ptr<mojo::Connection> connection = shell_->Connect(url); - connection->SetRemoteServiceProviderConnectionErrorHandler(restart_callback); - connections_[url] = std::move(connection); + // Note: |connection| may be null if we've lost our connection to the shell. + if (connection) { + connection->SetRemoteServiceProviderConnectionErrorHandler( + restart_callback); + connections_[url] = std::move(connection); + } } } // namespace shell diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc index d1ab6c6..926ee18 100644 --- a/mojo/edk/embedder/embedder.cc +++ b/mojo/edk/embedder/embedder.cc @@ -107,29 +107,37 @@ void ShutdownIPCSupport() { ScopedMessagePipeHandle CreateMessagePipe( ScopedPlatformHandle platform_handle) { - NOTREACHED(); - return ScopedMessagePipeHandle(); + DCHECK(internal::g_core); + return internal::g_core->CreateMessagePipe(std::move(platform_handle)); } void CreateMessagePipe( ScopedPlatformHandle platform_handle, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { DCHECK(internal::g_core); - internal::g_core->CreateMessagePipe(std::move(platform_handle), callback); + callback.Run(CreateMessagePipe(std::move(platform_handle))); +} + +ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token) { + DCHECK(internal::g_core); + return internal::g_core->CreateParentMessagePipe(token); } void CreateParentMessagePipe( const std::string& token, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { + callback.Run(CreateParentMessagePipe(token)); +} + +ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token) { DCHECK(internal::g_core); - internal::g_core->CreateParentMessagePipe(token, callback); + return internal::g_core->CreateChildMessagePipe(token); } void CreateChildMessagePipe( const std::string& token, const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - DCHECK(internal::g_core); - internal::g_core->CreateChildMessagePipe(token, callback); + callback.Run(CreateChildMessagePipe(token)); } std::string GenerateRandomToken() { diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h index b43748c..49b64c4 100644 --- a/mojo/edk/embedder/embedder.h +++ b/mojo/edk/embedder/embedder.h @@ -118,7 +118,12 @@ MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupportOnIOThread(); // |OnShutdownComplete()|. MOJO_SYSTEM_IMPL_EXPORT void ShutdownIPCSupport(); -// Unused. Crashes. Only here for linking. +// Creates a message pipe over an arbitrary platform channel. The other end of +// the channel must also be passed to this function. Either endpoint can be in +// any process. +// +// Note that the channel is only used to negotiate pipe connection, not as the +// transport for messages on the pipe. MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle CreateMessagePipe(ScopedPlatformHandle platform_handle); @@ -128,13 +133,10 @@ CreateMessagePipe(ScopedPlatformHandle platform_handle); // either PreInitializeChildProcess() or SetParentPipe() must have been been // called at least once already. // -// Note: This only exists for backwards compatibility with embedders that rely -// on mojo::embedder::CreateChannel() behavior. If you have a means of passing -// platform handles around, you can probably also pass strings around. If you -// can pass strings around, use CreateParentMessagePipe() and -// CreateChlidMessagePipe() instead (see below.) -// // |callback| must be safe to call from any thread. +// +// DEPRECATED: Please don't use this. Use the synchronous version above. This +// is now merely an inconvenient wrapper for that. MOJO_SYSTEM_IMPL_EXPORT void CreateMessagePipe( ScopedPlatformHandle platform_handle, @@ -143,8 +145,17 @@ CreateMessagePipe( // Creates a message pipe from a token. A child embedder must also have this // token and call CreateChildMessagePipe() with it in order for the pipe to get // connected. +MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle +CreateParentMessagePipe(const std::string& token); + +// Creates a message pipe from a token. A child embedder must also have this +// token and call CreateChildMessagePipe() with it in order for the pipe to get +// connected. // // |callback| must be safe to call from any thread. +// +// DEPRECATED: Please don't use this. Use the synchronous version above. This +// is now merely an inconvenient wrapper for that. MOJO_SYSTEM_IMPL_EXPORT void CreateParentMessagePipe( const std::string& token, @@ -153,8 +164,17 @@ CreateParentMessagePipe( // Creates a message pipe from a token in a child process. The parent must also // have this token and call CreateParentMessagePipe() with it in order for the // pipe to get connected. +MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle +CreateChildMessagePipe(const std::string& token); + +// Creates a message pipe from a token in a child process. The parent must also +// have this token and call CreateParentMessagePipe() with it in order for the +// pipe to get connected. // // |callback| must be safe to call from any thread. +// +// DEPRECATED: Please don't use this. Use the synchronous version above. This +// is now merely an inconvenient wrapper for that. MOJO_SYSTEM_IMPL_EXPORT void CreateChildMessagePipe( const std::string& token, diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc index aad9541..d0b7b67 100644 --- a/mojo/edk/system/core.cc +++ b/mojo/edk/system/core.cc @@ -44,19 +44,9 @@ namespace { // This is an unnecessarily large limit that is relatively easy to enforce. const uint32_t kMaxHandlesPerMessage = 1024 * 1024; -void OnPortConnected( - Core* core, - int endpoint, - const base::Callback<void(ScopedMessagePipeHandle)>& callback, - const ports::PortRef& port) { - // TODO: Maybe we could negotiate a pipe ID for cross-process pipes too; - // for now we just use 0x7F7F7F7F7F7F7F7F. In practice these are used for - // bootstrap and aren't passed around, so tracking them is less important. - MojoHandle handle = core->AddDispatcher( - new MessagePipeDispatcher(core->GetNodeController(), port, - 0x7f7f7f7f7f7f7f7fUL, endpoint)); - callback.Run(ScopedMessagePipeHandle(MessagePipeHandle(handle))); -} +// TODO: Maybe we could negotiate a debugging pipe ID for cross-process pipes +// too; for now we just use a constant. This only affects bootstrap pipes. +const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL; } // namespace @@ -160,32 +150,37 @@ void Core::RequestShutdown(const base::Closure& callback) { GetNodeController()->RequestShutdown(on_shutdown); } -void Core::CreateMessagePipe( - ScopedPlatformHandle platform_handle, - const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - ports::PortRef port; - GetNodeController()->node()->CreateUninitializedPort(&port); +ScopedMessagePipeHandle Core::CreateMessagePipe( + ScopedPlatformHandle platform_handle) { + ports::PortRef port0, port1; + GetNodeController()->node()->CreatePortPair(&port0, &port1); + MojoHandle handle = AddDispatcher( + new MessagePipeDispatcher(GetNodeController(), port0, + kUnknownPipeIdForDebug, 0)); RemoteMessagePipeBootstrap::Create( - GetNodeController(), std::move(platform_handle), port, - base::Bind(&OnPortConnected, base::Unretained(this), 0, callback, port)); -} - -void Core::CreateParentMessagePipe( - const std::string& token, - const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - GetNodeController()->ReservePort( - token, - base::Bind(&OnPortConnected, base::Unretained(this), 0, callback)); -} - -void Core::CreateChildMessagePipe( - const std::string& token, - const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - ports::PortRef port; - GetNodeController()->node()->CreateUninitializedPort(&port); - GetNodeController()->ConnectToParentPort( - port, token, - base::Bind(&OnPortConnected, base::Unretained(this), 1, callback, port)); + GetNodeController(), std::move(platform_handle), port1); + return ScopedMessagePipeHandle(MessagePipeHandle(handle)); +} + +ScopedMessagePipeHandle Core::CreateParentMessagePipe( + const std::string& token) { + ports::PortRef port0, port1; + GetNodeController()->node()->CreatePortPair(&port0, &port1); + MojoHandle handle = AddDispatcher( + new MessagePipeDispatcher(GetNodeController(), port0, + kUnknownPipeIdForDebug, 0)); + GetNodeController()->ReservePort(token, port1); + return ScopedMessagePipeHandle(MessagePipeHandle(handle)); +} + +ScopedMessagePipeHandle Core::CreateChildMessagePipe(const std::string& token) { + ports::PortRef port0, port1; + GetNodeController()->node()->CreatePortPair(&port0, &port1); + MojoHandle handle = AddDispatcher( + new MessagePipeDispatcher(GetNodeController(), port0, + kUnknownPipeIdForDebug, 1)); + GetNodeController()->MergePortIntoParent(token, port1); + return ScopedMessagePipeHandle(MessagePipeHandle(handle)); } MojoResult Core::AsyncWait(MojoHandle handle, diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h index 8e9af8e..603cba9 100644 --- a/mojo/edk/system/core.h +++ b/mojo/edk/system/core.h @@ -52,26 +52,19 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { // Called in a child process exactly once during early initialization. void InitChild(ScopedPlatformHandle platform_handle); - // This creates a message pipe endpoint connected to an endpoint in a remote + // Creates a message pipe endpoint connected to an endpoint in a remote // embedder. |platform_handle| is used as a channel to negotiate the - // connection. This is only here to facilitate legacy embedder code. See - // mojo::edk::CreateMessagePipe in mojo/edk/embedder/embedder.h. - void CreateMessagePipe( - ScopedPlatformHandle platform_handle, - const base::Callback<void(ScopedMessagePipeHandle)>& callback); + // connection. + ScopedMessagePipeHandle CreateMessagePipe( + ScopedPlatformHandle platform_handle); // Creates a message pipe endpoint associated with |token|, which a child // holding the token can later locate and connect to. - void CreateParentMessagePipe( - const std::string& token, - const base::Callback<void(ScopedMessagePipeHandle)>& callback); - - // Creates a message pipe endpoint associated with |token|, which will be - // passed to the parent in order to find an associated remote port and connect - // to it. - void CreateChildMessagePipe( - const std::string& token, - const base::Callback<void(ScopedMessagePipeHandle)>& callback); + ScopedMessagePipeHandle CreateParentMessagePipe(const std::string& token); + + // Creates a message pipe endpoint and connects it to a pipe the parent has + // associated with |token|. + ScopedMessagePipeHandle CreateChildMessagePipe(const std::string& token); MojoHandle AddDispatcher(scoped_refptr<Dispatcher> dispatcher); diff --git a/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/mojo/edk/system/multiprocess_message_pipe_unittest.cc index 00c2ab5..ece85a1 100644 --- a/mojo/edk/system/multiprocess_message_pipe_unittest.cc +++ b/mojo/edk/system/multiprocess_message_pipe_unittest.cc @@ -19,6 +19,7 @@ #include "base/logging.h" #include "base/strings/string_split.h" #include "build/build_config.h" +#include "mojo/edk/embedder/platform_channel_pair.h" #include "mojo/edk/embedder/scoped_platform_handle.h" #include "mojo/edk/system/handle_signals_state.h" #include "mojo/edk/system/test_utils.h" @@ -1271,6 +1272,44 @@ TEST_F(MultiprocessMessagePipeTest, WriteCloseSendPeer) { END_CHILD() } +DEFINE_TEST_CLIENT_TEST_WITH_PIPE(BootstrapMessagePipeAsyncClient, + MultiprocessMessagePipeTest, parent) { + // Receive one end of a platform channel from the parent. + MojoHandle channel_handle; + EXPECT_EQ("hi", ReadMessageWithHandles(parent, &channel_handle, 1)); + ScopedPlatformHandle channel; + EXPECT_EQ(MOJO_RESULT_OK, + edk::PassWrappedPlatformHandle(channel_handle, &channel)); + ASSERT_TRUE(channel.is_valid()); + + // Create a new pipe using our end of the channel. + ScopedMessagePipeHandle pipe = edk::CreateMessagePipe(std::move(channel)); + + // Ensure that we can read and write on the new pipe. + VerifyEcho(pipe.get().value(), "goodbye"); +} + +TEST_F(MultiprocessMessagePipeTest, BootstrapMessagePipeAsync) { + // Tests that new cross-process message pipes can be created synchronously + // using asynchronous negotiation over an arbitrary platform channel. + RUN_CHILD_ON_PIPE(BootstrapMessagePipeAsyncClient, child) + // Pass one end of a platform channel to the child. + PlatformChannelPair platform_channel; + MojoHandle client_channel_handle; + EXPECT_EQ(MOJO_RESULT_OK, + CreatePlatformHandleWrapper(platform_channel.PassClientHandle(), + &client_channel_handle)); + WriteMessageWithHandles(child, "hi", &client_channel_handle, 1); + + // Create a new pipe using our end of the channel. + ScopedMessagePipeHandle pipe = + edk::CreateMessagePipe(platform_channel.PassServerHandle()); + + // Ensure that we can read and write on the new pipe. + VerifyEcho(pipe.get().value(), "goodbye"); + END_CHILD() +} + } // namespace } // namespace edk } // namespace mojo diff --git a/mojo/edk/system/node_channel.cc b/mojo/edk/system/node_channel.cc index 4fb0ac4..0277d46 100644 --- a/mojo/edk/system/node_channel.cc +++ b/mojo/edk/system/node_channel.cc @@ -29,8 +29,7 @@ enum class MessageType : uint32_t { BROKER_CLIENT_ADDED, ACCEPT_BROKER_CLIENT, PORTS_MESSAGE, - REQUEST_PORT_CONNECTION, - CONNECT_TO_PORT, + REQUEST_PORT_MERGE, REQUEST_INTRODUCTION, INTRODUCE, #if defined(OS_WIN) @@ -74,15 +73,10 @@ struct AcceptBrokerClientData { // This is followed by arbitrary payload data which is interpreted as a token // string for port location. -struct RequestPortConnectionData { +struct RequestPortMergeData { ports::PortName connector_port_name; }; -struct ConnectToPortData { - ports::PortName connector_port_name; - ports::PortName connectee_port_name; -}; - // Used for both REQUEST_INTRODUCTION and INTRODUCE. // // For INTRODUCE the message also includes a valid platform handle for a channel @@ -267,28 +261,17 @@ void NodeChannel::PortsMessage(Channel::MessagePtr message) { WriteChannelMessage(std::move(message)); } -void NodeChannel::RequestPortConnection( - const ports::PortName& connector_port_name, - const std::string& token) { - RequestPortConnectionData* data; +void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name, + const std::string& token) { + RequestPortMergeData* data; Channel::MessagePtr message = CreateMessage( - MessageType::REQUEST_PORT_CONNECTION, - sizeof(RequestPortConnectionData) + token.size(), 0, &data); + MessageType::REQUEST_PORT_MERGE, + sizeof(RequestPortMergeData) + token.size(), 0, &data); data->connector_port_name = connector_port_name; memcpy(data + 1, token.data(), token.size()); WriteChannelMessage(std::move(message)); } -void NodeChannel::ConnectToPort(const ports::PortName& connector_port_name, - const ports::PortName& connectee_port_name) { - ConnectToPortData* data; - Channel::MessagePtr message = CreateMessage( - MessageType::CONNECT_TO_PORT, sizeof(ConnectToPortData), 0, &data); - data->connector_port_name = connector_port_name; - data->connectee_port_name = connectee_port_name; - WriteChannelMessage(std::move(message)); -} - void NodeChannel::RequestIntroduction(const ports::NodeName& name) { IntroductionData* data; Channel::MessagePtr message = CreateMessage( @@ -452,24 +435,16 @@ void NodeChannel::OnChannelMessage(const void* payload, break; } - case MessageType::REQUEST_PORT_CONNECTION: { - const RequestPortConnectionData* data; + case MessageType::REQUEST_PORT_MERGE: { + const RequestPortMergeData* data; GetMessagePayload(payload, &data); const char* token_data = reinterpret_cast<const char*>(data + 1); const size_t token_size = payload_size - sizeof(*data) - sizeof(Header); std::string token(token_data, token_size); - delegate_->OnRequestPortConnection(remote_node_name_, - data->connector_port_name, token); - break; - } - - case MessageType::CONNECT_TO_PORT: { - const ConnectToPortData* data; - GetMessagePayload(payload, &data); - delegate_->OnConnectToPort(remote_node_name_, data->connector_port_name, - data->connectee_port_name); + delegate_->OnRequestPortMerge(remote_node_name_, + data->connector_port_name, token); break; } diff --git a/mojo/edk/system/node_channel.h b/mojo/edk/system/node_channel.h index a9bc520..2bc86bd 100644 --- a/mojo/edk/system/node_channel.h +++ b/mojo/edk/system/node_channel.h @@ -44,14 +44,9 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>, const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel) = 0; virtual void OnPortsMessage(Channel::MessagePtr message) = 0; - virtual void OnRequestPortConnection( - const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const std::string& token) = 0; - virtual void OnConnectToPort( - const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const ports::PortName& connectee_port_name) = 0; + virtual void OnRequestPortMerge(const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const std::string& token) = 0; virtual void OnRequestIntroduction(const ports::NodeName& from_node, const ports::NodeName& name) = 0; virtual void OnIntroduce(const ports::NodeName& from_node, @@ -103,10 +98,8 @@ class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>, void AcceptBrokerClient(const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel); void PortsMessage(Channel::MessagePtr message); - void RequestPortConnection(const ports::PortName& connector_port_name, - const std::string& token); - void ConnectToPort(const ports::PortName& connector_port_name, - const ports::PortName& connectee_port_name); + void RequestPortMerge(const ports::PortName& connector_port_name, + const std::string& token); void RequestIntroduction(const ports::NodeName& name); void Introduce(const ports::NodeName& name, ScopedPlatformHandle channel_handle); diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc index 6410cae..38c53df 100644 --- a/mojo/edk/system/node_controller.cc +++ b/mojo/edk/system/node_controller.cc @@ -99,18 +99,6 @@ class ThreadDestructionObserver : } // namespace -NodeController::PendingPortRequest::PendingPortRequest() {} - -NodeController::PendingPortRequest::~PendingPortRequest() {} - -NodeController::ReservedPort::ReservedPort() {} - -NodeController::ReservedPort::~ReservedPort() {} - -NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {} - -NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {} - NodeController::~NodeController() {} NodeController::NodeController(Core* core) @@ -182,18 +170,25 @@ int NodeController::SendMessage(const ports::PortRef& port, } void NodeController::ReservePort(const std::string& token, - const ReservePortCallback& callback) { - ports::PortRef port; - node_->CreateUninitializedPort(&port); - + const ports::PortRef& port) { DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " << token; base::AutoLock lock(reserved_ports_lock_); - ReservedPort reservation; - reservation.local_port = port; - reservation.callback = callback; - reserved_ports_.insert(std::make_pair(token, reservation)); + auto result = reserved_ports_.insert(std::make_pair(token, port)); + DCHECK(result.second); +} + +void NodeController::MergePortIntoParent(const std::string& token, + const ports::PortRef& port) { + scoped_refptr<NodeChannel> parent = GetParentChannel(); + if (parent) { + parent->RequestPortMerge(port.name(), token); + return; + } + + base::AutoLock lock(pending_port_merges_lock_); + pending_port_merges_.push_back(std::make_pair(token, port)); } scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( @@ -210,42 +205,6 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( return buffer; } -void NodeController::ConnectToParentPort(const ports::PortRef& local_port, - const std::string& token, - const base::Closure& callback) { - io_task_runner_->PostTask( - FROM_HERE, - base::Bind(&NodeController::RequestParentPortConnectionOnIOThread, - base::Unretained(this), local_port, token, callback)); -} - -void NodeController::ConnectToRemotePort( - const ports::PortRef& local_port, - const ports::NodeName& remote_node_name, - const ports::PortName& remote_port_name, - const base::Closure& callback) { - if (remote_node_name == name_) { - // It's possible that two different code paths on the node are trying to - // bootstrap ports to each other (e.g. in Chrome single-process mode) - // without being aware of the fact. In this case we can initialize the port - // immediately (which can fail silently if it's already been initialized by - // the request on the other side), and invoke |callback|. - node_->InitializePort(local_port, name_, remote_port_name); - callback.Run(); - return; - } - - PendingRemotePortConnection connection; - connection.local_port = local_port; - connection.remote_node_name = remote_node_name; - connection.remote_port_name = remote_port_name; - connection.callback = callback; - io_task_runner_->PostTask( - FROM_HERE, - base::Bind(&NodeController::ConnectToRemotePortOnIOThread, - base::Unretained(this), connection)); -} - void NodeController::RequestShutdown(const base::Closure& callback) { { base::AutoLock lock(shutdown_lock_); @@ -304,46 +263,6 @@ void NodeController::ConnectToParentOnIOThread( bootstrap_parent_channel_->Start(); } -void NodeController::RequestParentPortConnectionOnIOThread( - const ports::PortRef& local_port, - const std::string& token, - const base::Closure& callback) { - DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - - scoped_refptr<NodeChannel> parent = GetParentChannel(); - if (!parent) { - PendingPortRequest request; - request.token = token; - request.local_port = local_port; - request.callback = callback; - pending_port_requests_.push_back(request); - return; - } - - pending_parent_port_connections_.insert( - std::make_pair(local_port.name(), callback)); - parent->RequestPortConnection(local_port.name(), token); -} - -void NodeController::ConnectToRemotePortOnIOThread( - const PendingRemotePortConnection& connection) { - scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name); - if (peer) { - // It's safe to initialize the port since we already have a channel to its - // peer. No need to actually send them a message. - int rv = node_->InitializePort(connection.local_port, - connection.remote_node_name, - connection.remote_port_name); - DCHECK_EQ(rv, ports::OK); - connection.callback.Run(); - return; - } - - // Save this for later. We'll initialize the port once this peer is added. - pending_remote_port_connections_[connection.remote_node_name].push_back( - connection); -} - scoped_refptr<NodeChannel> NodeController::GetPeerChannel( const ports::NodeName& name) { base::AutoLock lock(peers_lock_); @@ -414,19 +333,6 @@ void NodeController::AddPeer(const ports::NodeName& name, channel->PortsMessage(std::move(pending_messages.front())); pending_messages.pop(); } - - // Complete any pending port connections to this peer. - auto connections_it = pending_remote_port_connections_.find(name); - if (connections_it != pending_remote_port_connections_.end()) { - for (const auto& connection : connections_it->second) { - int rv = node_->InitializePort(connection.local_port, - connection.remote_node_name, - connection.remote_port_name); - DCHECK_EQ(rv, ports::OK); - connection.callback.Run(); - } - pending_remote_port_connections_.erase(connections_it); - } } void NodeController::DropPeer(const ports::NodeName& name) { @@ -616,6 +522,8 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node, // NOTE: The child does not actually add its parent as a peer until // receiving an AcceptBrokerClient message from the broker. The parent // will request that said message be sent upon receiving AcceptParent. + + DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; } void NodeController::OnAcceptParent(const ports::NodeName& from_node, @@ -763,15 +671,16 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, io_task_runner_); AddPeer(broker_name, broker, true /* start_channel */); } + AddPeer(parent_name, parent, false /* start_channel */); - // Resolve any pending port connections to the parent. - for (const auto& request : pending_port_requests_) { - pending_parent_port_connections_.insert( - std::make_pair(request.local_port.name(), request.callback)); - parent->RequestPortConnection(request.local_port.name(), request.token); + { + // Complete any port merge requests we have waiting for the parent. + base::AutoLock lock(pending_port_merges_lock_); + for (const auto& request : pending_port_merges_) + parent->RequestPortMerge(request.second.name(), request.first); + pending_port_merges_.clear(); } - pending_port_requests_.clear(); // Feed the broker any pending children of our own. while (!pending_broker_clients.empty()) { @@ -824,16 +733,15 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) { AttemptShutdownIfRequested(); } -void NodeController::OnRequestPortConnection( +void NodeController::OnRequestPortMerge( const ports::NodeName& from_node, const ports::PortName& connector_port_name, const std::string& token) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " + DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " << token << " and port " << connector_port_name << "@" << from_node; - ReservePortCallback callback; ports::PortRef local_port; { base::AutoLock lock(reserved_ports_lock_); @@ -843,64 +751,12 @@ void NodeController::OnRequestPortConnection( << token; return; } - local_port = it->second.local_port; - callback = it->second.callback; - reserved_ports_.erase(it); - } - - DCHECK(!callback.is_null()); - - scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); - if (!peer) { - DVLOG(1) << "Ignoring request to connect to port from unknown node " - << from_node; - return; + local_port = it->second; } - // This reserved port should not have been initialized yet. - CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node, - connector_port_name)); - - peer->ConnectToPort(local_port.name(), connector_port_name); - callback.Run(local_port); -} - -void NodeController::OnConnectToPort( - const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const ports::PortName& connectee_port_name) { - DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - - DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " - << connectee_port_name << " to port " << connector_port_name << "@" - << from_node; - - ports::PortRef connectee_port; - int rv = node_->GetPort(connectee_port_name, &connectee_port); - if (rv != ports::OK) { - DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " - << connectee_port_name; - return; - } - - // It's OK if this port has already been initialized. This message is only - // sent by the remote peer to ensure the port is ready before it starts - // us sending messages to it. - ports::PortStatus port_status; - rv = node_->GetStatus(connectee_port, &port_status); - if (rv == ports::OK) { - DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " - << connectee_port_name; - return; - } - - CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, - connector_port_name)); - - auto it = pending_parent_port_connections_.find(connectee_port_name); - DCHECK(it != pending_parent_port_connections_.end()); - it->second.Run(); - pending_parent_port_connections_.erase(it); + int rv = node_->MergePorts(local_port, from_node, connector_port_name); + if (rv != ports::OK) + DLOG(ERROR) << "MergePorts failed: " << rv; } void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h index 35148a0..0675e97 100644 --- a/mojo/edk/system/node_controller.h +++ b/mojo/edk/system/node_controller.h @@ -82,42 +82,13 @@ class NodeController : public ports::NodeDelegate, int SendMessage(const ports::PortRef& port_ref, scoped_ptr<PortsMessage>* message); - using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>; + // Reserves a local port |port| associated with |token|. A peer holding a copy + // of |token| can merge one of its own ports into this one. + void ReservePort(const std::string& token, const ports::PortRef& port); - // Reserves a port associated with |token|. A peer may associate one of their - // own ports with this one by sending us a RequestPortConnection message with - // the same token value. - // - // Note that the reservation is made synchronously. In order to avoid races, - // reservations should be acquired before |token| is communicated to any - // potential peer. - // - // |callback| must be runnable on any thread and will be run with a reference - // to the new local port once connected. - void ReservePort(const std::string& token, - const ReservePortCallback& callback); - - // Eventually initializes a local port with a parent port peer identified by - // |token|. The parent should also have |token| and should alrady have - // reserved a port for it. |callback| must be runnable on any thread and will - // be run if and when the local port is connected. - void ConnectToParentPort(const ports::PortRef& local_port, - const std::string& token, - const base::Closure& callback); - - // Connects two reserved ports to each other. Useful when two independent - // systems in the same (parent) process need to establish a port pair without - // any direct knowledge of each other. - void ConnectReservedPorts(const std::string& token1, - const std::string& token2); - - // Connects a local port to a port on a remote node. Note that a connection to - // the remote node need not be established yet. The port will be connected - // ASAP, at which point |callback| will be run. - void ConnectToRemotePort(const ports::PortRef& local_port, - const ports::NodeName& remote_node_name, - const ports::PortName& remote_port_name, - const base::Closure& callback); + // Merges a local port |port| into a port reserved by |token| in the parent. + void MergePortIntoParent(const std::string& token, + const ports::PortRef& port); // Creates a new shared buffer for use in the current process. scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); @@ -138,44 +109,9 @@ class NodeController : public ports::NodeDelegate, scoped_refptr<NodeChannel>>; using OutgoingMessageQueue = std::queue<Channel::MessagePtr>; - // Tracks a pending token-based connection to a parent port. - struct PendingPortRequest { - PendingPortRequest(); - ~PendingPortRequest(); - - std::string token; - ports::PortRef local_port; - base::Closure callback; - }; - - // Tracks a reserved port. - struct ReservedPort { - ReservedPort(); - ~ReservedPort(); - - ports::PortRef local_port; - ReservePortCallback callback; - }; - - // Tracks a pending connection to a remote port on any peer. - struct PendingRemotePortConnection { - PendingRemotePortConnection(); - ~PendingRemotePortConnection(); - - ports::PortRef local_port; - ports::NodeName remote_node_name; - ports::PortName remote_port_name; - base::Closure callback; - }; - void ConnectToChildOnIOThread(base::ProcessHandle process_handle, ScopedPlatformHandle platform_handle); void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle); - void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port, - const std::string& token, - const base::Closure& callback); - void ConnectToRemotePortOnIOThread( - const PendingRemotePortConnection& connection); scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); scoped_refptr<NodeChannel> GetParentChannel(); @@ -215,12 +151,9 @@ class NodeController : public ports::NodeDelegate, const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel) override; void OnPortsMessage(Channel::MessagePtr message) override; - void OnRequestPortConnection(const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const std::string& token) override; - void OnConnectToPort(const ports::NodeName& from_node, - const ports::PortName& connector_port_name, - const ports::PortName& connectee_port_name) override; + void OnRequestPortMerge(const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const std::string& token) override; void OnRequestIntroduction(const ports::NodeName& from_node, const ports::NodeName& name) override; void OnIntroduce(const ports::NodeName& from_node, @@ -264,7 +197,13 @@ class NodeController : public ports::NodeDelegate, base::Lock reserved_ports_lock_; // Ports reserved by token. - base::hash_map<std::string, ReservedPort> reserved_ports_; + base::hash_map<std::string, ports::PortRef> reserved_ports_; + + // Guards |pending_port_merges_|. + base::Lock pending_port_merges_lock_; + + // A set of port merge requests awaiting parent connection. + std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; // Guards |parent_name_| and |bootstrap_parent_channel_|. base::Lock parent_lock_; @@ -307,17 +246,6 @@ class NodeController : public ports::NodeDelegate, // Channels to children during handshake. NodeMap pending_children_; - // Port connection requests which have been deferred until we have a parent. - std::vector<PendingPortRequest> pending_port_requests_; - - // Port connection requests awaiting a response from the parent. - std::unordered_map<ports::PortName, base::Closure> - pending_parent_port_connections_; - - // Port connections pending the availability of a remote peer node. - std::unordered_map<ports::NodeName, std::vector<PendingRemotePortConnection>> - pending_remote_port_connections_; - // Indicates whether this object should delete itself on IO thread shutdown. // Must only be accessed from the IO thread. bool destroy_on_io_thread_shutdown_ = false; diff --git a/mojo/edk/system/ports/event.h b/mojo/edk/system/ports/event.h index aec9f6b..1dc8eab 100644 --- a/mojo/edk/system/ports/event.h +++ b/mojo/edk/system/ports/event.h @@ -34,6 +34,7 @@ enum struct EventType : uint32_t { kObserveProxy, kObserveProxyAck, kObserveClosure, + kMergePort, }; struct EventHeader { @@ -63,6 +64,11 @@ struct ObserveClosureEventData { uint64_t last_sequence_num; }; +struct MergePortEventData { + PortName new_port_name; + PortDescriptor new_port_descriptor; +}; + inline const EventHeader* GetEventHeader(const Message& message) { return static_cast<const EventHeader*>(message.header_bytes()); } diff --git a/mojo/edk/system/ports/message.cc b/mojo/edk/system/ports/message.cc index 20f6d88..2106c15 100644 --- a/mojo/edk/system/ports/message.cc +++ b/mojo/edk/system/ports/message.cc @@ -37,6 +37,9 @@ void Message::Parse(const void* bytes, case EventType::kObserveClosure: *num_header_bytes = sizeof(EventHeader) + sizeof(ObserveClosureEventData); break; + case EventType::kMergePort: + *num_header_bytes = sizeof(EventHeader) + sizeof(MergePortEventData); + break; default: CHECK(false) << "Bad event type"; return; diff --git a/mojo/edk/system/ports/name.h b/mojo/edk/system/ports/name.h index b0ad0b7..8a9307d 100644 --- a/mojo/edk/system/ports/name.h +++ b/mojo/edk/system/ports/name.h @@ -8,6 +8,7 @@ #include <stdint.h> #include <ostream> +#include <tuple> namespace mojo { namespace edk { @@ -21,9 +22,15 @@ struct Name { inline bool operator==(const Name& a, const Name& b) { return a.v1 == b.v1 && a.v2 == b.v2; } + inline bool operator!=(const Name& a, const Name& b) { return !(a == b); } + +inline bool operator<(const Name& a, const Name& b) { + return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2); +} + std::ostream& operator<<(std::ostream& stream, const Name& name); struct PortName : Name { diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc index 1718306..9cfc511 100644 --- a/mojo/edk/system/ports/node.cc +++ b/mojo/edk/system/ports/node.cc @@ -354,10 +354,39 @@ int Node::AcceptMessage(ScopedMessage message) { return OnObserveClosure( header->port_name, GetEventData<ObserveClosureEventData>(*message)->last_sequence_num); + case EventType::kMergePort: + return OnMergePort(header->port_name, + *GetEventData<MergePortEventData>(*message)); } return OOPS(ERROR_NOT_IMPLEMENTED); } +int Node::MergePorts(const PortRef& port_ref, + const NodeName& destination_node_name, + const PortName& destination_port_name) { + Port* port = port_ref.port(); + { + // |ports_lock_| must be held for WillSendPort_Locked below. + base::AutoLock ports_lock(ports_lock_); + base::AutoLock lock(port->lock); + + DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_ + << " to " << destination_port_name << "@" << destination_node_name; + + // Send the port-to-merge over to the destination node so it can be merged + // into the port cycle atomically there. + MergePortEventData data; + data.new_port_name = port_ref.name(); + WillSendPort_Locked(port, destination_node_name, &data.new_port_name, + &data.new_port_descriptor); + delegate_->ForwardMessage( + destination_node_name, + NewInternalMessage(destination_port_name, + EventType::kMergePort, data)); + } + return OK; +} + int Node::LostConnectionToNode(const NodeName& node_name) { // We can no longer send events to the given node. We also can't expect any // PortAccepted events. @@ -509,34 +538,8 @@ int Node::OnPortAccepted(const PortName& port_name) { << " pointing to " << port->peer_port_name << "@" << port->peer_node_name; - if (port->state != Port::kBuffering) - return OOPS(ERROR_PORT_STATE_UNEXPECTED); - - port->state = Port::kProxying; - - int rv = ForwardMessages_Locked(port.get(), port_name); - if (rv != OK) - return rv; - - // We may have observed closure before receiving PortAccepted. In that - // case, we can advance to removing the proxy without sending out an - // ObserveProxy message. We already know the last expected message, etc. - - if (port->remove_proxy_on_last_message) { - MaybeRemoveProxy_Locked(port.get(), port_name); - - // Make sure we propagate closure to our current peer. - ObserveClosureEventData data; - data.last_sequence_num = port->last_sequence_num_to_receive; - delegate_->ForwardMessage( - port->peer_node_name, - NewInternalMessage(port->peer_port_name, - EventType::kObserveClosure, data)); - } else { - InitiateProxyRemoval_Locked(port.get(), port_name); - } + return BeginProxying_Locked(port.get(), port_name); } - return OK; } int Node::OnObserveProxy(const PortName& port_name, @@ -721,6 +724,94 @@ int Node::OnObserveClosure(const PortName& port_name, return OK; } +int Node::OnMergePort(const PortName& port_name, + const MergePortEventData& event) { + scoped_refptr<Port> port = GetPort(port_name); + if (!port) + return ERROR_PORT_UNKNOWN; + + DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" + << port->state << ") merging with proxy " << event.new_port_name + << "@" << name_ << " pointing to " + << event.new_port_descriptor.peer_port_name << "@" + << event.new_port_descriptor.peer_node_name << " referred by " + << event.new_port_descriptor.referring_port_name << "@" + << event.new_port_descriptor.referring_node_name; + + bool close_target_port = false; + bool close_new_port = false; + + // Accept the new port. This is now the receiving end of the other port cycle + // to be merged with ours. + int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); + if (rv != OK) { + close_target_port = true; + } else { + // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn + // needs to hold |ports_lock_|. We also acquire multiple port locks within. + base::AutoLock ports_lock(ports_lock_); + base::AutoLock lock(port->lock); + + if (port->state != Port::kReceiving) { + close_new_port = true; + } else { + scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name); + base::AutoLock new_port_lock(new_port->lock); + DCHECK(new_port->state == Port::kReceiving); + + // Both ports are locked. Now all we have to do is swap their peer + // information and set them up as proxies. + + std::swap(port->peer_node_name, new_port->peer_node_name); + std::swap(port->peer_port_name, new_port->peer_port_name); + std::swap(port->peer_closed, new_port->peer_closed); + + port->state = Port::kBuffering; + if (port->peer_closed) + port->remove_proxy_on_last_message = true; + + new_port->state = Port::kBuffering; + if (new_port->peer_closed) + new_port->remove_proxy_on_last_message = true; + + int rv1 = BeginProxying_Locked(port.get(), port_name); + int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); + + if (rv1 == OK && rv2 == OK) + return OK; + + // If either proxy failed to initialize (e.g. had undeliverable messages + // or ended up in a bad state somehow), we keep the system in a consistent + // state by undoing the peer swap and closing both merge ports. + + std::swap(port->peer_node_name, new_port->peer_node_name); + std::swap(port->peer_port_name, new_port->peer_port_name); + port->state = Port::kReceiving; + new_port->state = Port::kReceiving; + close_new_port = true; + close_target_port = true; + } + } + + if (close_target_port) { + PortRef target_port; + rv = GetPort(port_name, &target_port); + DCHECK(rv == OK); + + ClosePort(target_port); + } + + if (close_new_port) { + PortRef new_port; + rv = GetPort(event.new_port_name, &new_port); + DCHECK(rv == OK); + + ClosePort(new_port); + } + + return ERROR_PORT_STATE_UNEXPECTED; +} + int Node::AddPortWithName(const PortName& port_name, const scoped_refptr<Port>& port) { base::AutoLock lock(ports_lock_); @@ -908,6 +999,40 @@ int Node::WillSendMessage_Locked(Port* port, return OK; } +int Node::BeginProxying_Locked(Port* port, const PortName& port_name) { + ports_lock_.AssertAcquired(); + port->lock.AssertAcquired(); + + if (port->state != Port::kBuffering) + return OOPS(ERROR_PORT_STATE_UNEXPECTED); + + port->state = Port::kProxying; + + int rv = ForwardMessages_Locked(port, port_name); + if (rv != OK) + return rv; + + // We may have observed closure while buffering. In that case, we can advance + // to removing the proxy without sending out an ObserveProxy message. We + // already know the last expected message, etc. + + if (port->remove_proxy_on_last_message) { + MaybeRemoveProxy_Locked(port, port_name); + + // Make sure we propagate closure to our current peer. + ObserveClosureEventData data; + data.last_sequence_num = port->last_sequence_num_to_receive; + delegate_->ForwardMessage( + port->peer_node_name, + NewInternalMessage(port->peer_port_name, + EventType::kObserveClosure, data)); + } else { + InitiateProxyRemoval_Locked(port, port_name); + } + + return OK; +} + int Node::ForwardMessages_Locked(Port* port, const PortName &port_name) { ports_lock_.AssertAcquired(); port->lock.AssertAcquired(); diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h index 410d53b..cb1d5fd 100644 --- a/mojo/edk/system/ports/node.h +++ b/mojo/edk/system/ports/node.h @@ -122,6 +122,21 @@ class Node { // Corresponding to NodeDelegate::ForwardMessage. int AcceptMessage(ScopedMessage message); + // Called to merge two ports with each other. If you have two independent + // port pairs A <=> B and C <=> D, the net result of merging B and C is a + // single connected port pair A <=> D. + // + // Note that the behavior of this operation is undefined if either port to be + // merged (B or C above) has ever been read from or written to directly, and + // this must ONLY be called on one side of the merge, though it doesn't matter + // which side. + // + // It is safe for the non-merged peers (A and D above) to be transferred, + // closed, and/or written to before, during, or after the merge. + int MergePorts(const PortRef& port_ref, + const NodeName& destination_node_name, + const PortName& destination_port_name); + // Called to inform this node that communication with another node is lost // indefinitely. This triggers cleanup of ports bound to this node. int LostConnectionToNode(const NodeName& node_name); @@ -133,6 +148,7 @@ class Node { const ObserveProxyEventData& event); int OnObserveProxyAck(const PortName& port_name, uint64_t last_sequence_num); int OnObserveClosure(const PortName& port_name, uint64_t last_sequence_num); + int OnMergePort(const PortName& port_name, const MergePortEventData& event); int AddPortWithName(const PortName& port_name, const scoped_refptr<Port>& port); @@ -151,6 +167,7 @@ class Node { int WillSendMessage_Locked(Port* port, const PortName& port_name, Message* message); + int BeginProxying_Locked(Port* port, const PortName& port_name); int ForwardMessages_Locked(Port* port, const PortName& port_name); void InitiateProxyRemoval_Locked(Port* port, const PortName& port_name); void MaybeRemoveProxy_Locked(Port* port, const PortName& port_name); diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc index 95e2a63..1bdca3f 100644 --- a/mojo/edk/system/ports/ports_unittest.cc +++ b/mojo/edk/system/ports/ports_unittest.cc @@ -690,32 +690,12 @@ TEST_F(PortsTest, SendUninitialized) { Node node0(node0_name, &node0_delegate); node_map[0] = &node0; - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - // Begin to setup a pipe between node0 and node1, but don't initialize either - // endpoint. - PortRef x0, x1; + PortRef x0; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - - node0_delegate.set_save_messages(true); - node1_delegate.set_save_messages(true); - - // Send a message on each port and expect neither to arrive yet. - EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, SendStringMessage(&node0, x0, "oops")); - EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, - SendStringMessage(&node1, x1, "oh well")); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendFailure) { @@ -1068,6 +1048,322 @@ TEST_F(PortsTest, SendWithClosedPeerSent) { EXPECT_TRUE(node0.CanShutdownCleanly(false)); } +TEST_F(PortsTest, MergePorts) { + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + node0_delegate.set_read_messages(false); + node1_delegate.set_save_messages(true); + + // Write a message on A. + EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); + + PumpTasks(); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + PumpTasks(); + + // Expect only two receiving ports to be left after pumping tasks. + EXPECT_TRUE(node0.CanShutdownCleanly(true)); + EXPECT_TRUE(node1.CanShutdownCleanly(true)); + + // Expect D to have received the message sent on A. + ScopedMessage message; + ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + EXPECT_EQ(0, strcmp("hey", ToString(message))); + + EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node1.ClosePort(D)); + + // No more ports should be open. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + +TEST_F(PortsTest, MergePortWithClosedPeer1) { + // This tests that the right thing happens when initiating a merge on a port + // whose peer has already been closed. + + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + node0_delegate.set_read_messages(false); + node1_delegate.set_save_messages(true); + + // Write a message on A. + EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); + + PumpTasks(); + + // Close A. + EXPECT_EQ(OK, node0.ClosePort(A)); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + PumpTasks(); + + // Expect only one receiving port to be left after pumping tasks. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(true)); + + // Expect D to have received the message sent on A. + ScopedMessage message; + ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + EXPECT_EQ(0, strcmp("hey", ToString(message))); + + EXPECT_EQ(OK, node1.ClosePort(D)); + + // No more ports should be open. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + +TEST_F(PortsTest, MergePortWithClosedPeer2) { + // This tests that the right thing happens when merging into a port whose peer + // has already been closed. + + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + node0_delegate.set_save_messages(true); + node1_delegate.set_read_messages(false); + + // Write a message on D. + EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); + + PumpTasks(); + + // Close D. + EXPECT_EQ(OK, node1.ClosePort(D)); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + PumpTasks(); + + // Expect only one receiving port to be left after pumping tasks. + EXPECT_TRUE(node0.CanShutdownCleanly(true)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); + + // Expect A to have received the message sent on D. + ScopedMessage message; + ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(0, strcmp("hey", ToString(message))); + + EXPECT_EQ(OK, node0.ClosePort(A)); + + // No more ports should be open. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + +TEST_F(PortsTest, MergePortsWithClosedPeers) { + // This tests that no residual ports are left behind if two ports are merged + // when both of their peers have been closed. + + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + node0_delegate.set_save_messages(true); + node1_delegate.set_read_messages(false); + + // Close A and D. + EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node1.ClosePort(D)); + + PumpTasks(); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + PumpTasks(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + +TEST_F(PortsTest, MergePortsWithMovedPeers) { + // This tests that no ports can be merged successfully even if their peers + // are moved around. + + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + node0_delegate.set_save_messages(true); + node1_delegate.set_read_messages(false); + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + // Set up another pair X-Y for moving ports on node0. + PortRef X, Y; + EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); + + ScopedMessage message; + + // Move A to new port E. + EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); + ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node0.ClosePort(X)); + EXPECT_EQ(OK, node0.ClosePort(Y)); + + node0_delegate.set_read_messages(false); + + // Write messages on E and D. + EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey")); + EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi")); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + node0_delegate.set_read_messages(true); + node1_delegate.set_read_messages(true); + node1_delegate.set_save_messages(true); + + PumpTasks(); + + // Expect to receive D's message on E and E's message on D. + ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(0, strcmp("hi", ToString(message))); + ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + EXPECT_EQ(0, strcmp("hey", ToString(message))); + + // Close E and D. + EXPECT_EQ(OK, node0.ClosePort(E)); + EXPECT_EQ(OK, node1.ClosePort(D)); + + PumpTasks(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + + +TEST_F(PortsTest, MergePortsFailsGracefully) { + // This tests that the system remains in a well-defined state if something + // goes wrong during port merge. + + NodeName node0_name(0, 1); + TestNodeDelegate node0_delegate(node0_name); + Node node0(node0_name, &node0_delegate); + node_map[0] = &node0; + + NodeName node1_name(1, 1); + TestNodeDelegate node1_delegate(node1_name); + Node node1(node1_name, &node1_delegate); + node_map[1] = &node1; + + // Setup two independent port pairs, A-B on node0 and C-D on node1. + PortRef A, B, C, D; + EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + + PumpTasks(); + + // Initiate a merge between B and C. + EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + + // Move C to a new port E. This is dumb and nobody should do it, but it's + // possible. MergePorts will fail as a result because C won't be in a + // receiving state when the event arrives at node1, so B should be closed. + ScopedMessage message; + PortRef X, Y; + EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y)); + node1_delegate.set_save_messages(true); + EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C)); + ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + ASSERT_EQ(1u, message->num_ports()); + PortRef E; + ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E)); + EXPECT_EQ(OK, node1.ClosePort(X)); + EXPECT_EQ(OK, node1.ClosePort(Y)); + + // C goes away as a result of normal proxy removal. + PumpTasks(); + + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C)); + + // B should have been closed cleanly. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); + + // Close A, D, and E. + EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node1.ClosePort(D)); + EXPECT_EQ(OK, node1.ClosePort(E)); + + PumpTasks(); + + // Expect everything to have gone away. + EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node1.CanShutdownCleanly(false)); +} + } // namespace test } // namespace ports } // namespace edk diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.cc b/mojo/edk/system/remote_message_pipe_bootstrap.cc index 671542f3..2342166 100644 --- a/mojo/edk/system/remote_message_pipe_bootstrap.cc +++ b/mojo/edk/system/remote_message_pipe_bootstrap.cc @@ -31,18 +31,17 @@ struct BootstrapData { void RemoteMessagePipeBootstrap::Create( NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port, - const base::Closure& callback) { + const ports::PortRef& port) { if (node_controller->io_task_runner()->RunsTasksOnCurrentThread()) { // Owns itself. - new RemoteMessagePipeBootstrap(node_controller, std::move(platform_handle), - port, callback); + new RemoteMessagePipeBootstrap( + node_controller, std::move(platform_handle), port); } else { node_controller->io_task_runner()->PostTask( FROM_HERE, base::Bind(&RemoteMessagePipeBootstrap::Create, base::Unretained(node_controller), - base::Passed(&platform_handle), port, callback)); + base::Passed(&platform_handle), port)); } } @@ -56,11 +55,9 @@ RemoteMessagePipeBootstrap::~RemoteMessagePipeBootstrap() { RemoteMessagePipeBootstrap::RemoteMessagePipeBootstrap( NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port, - const base::Closure& callback) + const ports::PortRef& port) : node_controller_(node_controller), local_port_(port), - callback_(callback), io_task_runner_(base::ThreadTaskRunnerHandle::Get()), channel_(Channel::Create(this, std::move(platform_handle), io_task_runner_)) { @@ -119,8 +116,14 @@ void RemoteMessagePipeBootstrap::OnChannelMessage( } peer_info_received_ = true; - node_controller_->ConnectToRemotePort( - local_port_, data->node_name, data->port_name, callback_); + + // We need to choose one side to initiate the port merge. It doesn't matter + // who does it as long as they don't both try. Simple solution: pick the one + // with the "smaller" port name. + if (local_port_.name() < data->port_name) { + node_controller_->node()->MergePorts(local_port_, data->node_name, + data->port_name); + } // Send another ping to the other end to trigger shutdown. This may race with // the other end sending its own ping, but it doesn't matter. Whoever wins diff --git a/mojo/edk/system/remote_message_pipe_bootstrap.h b/mojo/edk/system/remote_message_pipe_bootstrap.h index 911d518..2a24680 100644 --- a/mojo/edk/system/remote_message_pipe_bootstrap.h +++ b/mojo/edk/system/remote_message_pipe_bootstrap.h @@ -7,7 +7,6 @@ #include <string> -#include "base/callback.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/message_loop/message_loop.h" @@ -28,17 +27,20 @@ class NodeController; // // The bootstrapping procedure the same on either end: // -// 1. Create a local port P. +// 1. Select a local port P to be merged with a remote port. // 2. Write the local node name and P's name to the bootstrap pipe. // 3. When a message is read from the pipe: // - If it's the first message read, extract the remote node+port name and -// initialize the local port. Send an empty ACK message on the pipe. +// and send an empty ACK message on the pipe. // - If it's the second message read, close the channel, and delete |this|. // 4. When an error occus on the pipe, delete |this|. // // Excluding irrecoverable error conditions such as either process dying, // armageddon, etc., this ensures neither end closes the channel until both ends -// have intiailized their corresponding local port. +// are aware of each other's port-to-merge. +// +// At step 3, one side of the channel is chosen to issue a message at the Ports +// layer which eventually merges the two ports. class RemoteMessagePipeBootstrap : public Channel::Delegate, public base::MessageLoop::DestructionObserver { @@ -48,22 +50,18 @@ class RemoteMessagePipeBootstrap // |port| must be a reference to an uninitialized local port. static void Create(NodeController* node_controller, ScopedPlatformHandle platform_handle, - const ports::PortRef& port, - const base::Closure& callback); + const ports::PortRef& port); protected: - explicit RemoteMessagePipeBootstrap( - NodeController* node_controller, - ScopedPlatformHandle platform_handle, - const ports::PortRef& port, - const base::Closure& callback); + explicit RemoteMessagePipeBootstrap(NodeController* node_controller, + ScopedPlatformHandle platform_handle, + const ports::PortRef& port); void ShutDown(); bool shutting_down_ = false; NodeController* node_controller_; const ports::PortRef local_port_; - base::Closure callback_; scoped_refptr<base::TaskRunner> io_task_runner_; scoped_refptr<Channel> channel_; diff --git a/mojo/shell/runner/child/runner_connection.cc b/mojo/shell/runner/child/runner_connection.cc index 75b4648..313c00a 100644 --- a/mojo/shell/runner/child/runner_connection.cc +++ b/mojo/shell/runner/child/runner_connection.cc @@ -74,13 +74,6 @@ class Blocker { using GotApplicationRequestCallback = base::Callback<void(InterfaceRequest<mojom::ShellClient>)>; -void OnCreateMessagePipe(ScopedMessagePipeHandle* result, - Blocker::Unblocker unblocker, - ScopedMessagePipeHandle pipe) { - *result = std::move(pipe); - unblocker.Unblock(base::Bind(&base::DoNothing)); -} - void OnGotApplicationRequest(InterfaceRequest<mojom::ShellClient>* out_request, InterfaceRequest<mojom::ShellClient> request) { *out_request = std::move(request); @@ -229,14 +222,11 @@ bool RunnerConnectionImpl::WaitForApplicationRequest( std::string primordial_pipe_token = base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( switches::kPrimordialPipeToken); - Blocker blocker; - edk::CreateChildMessagePipe( - primordial_pipe_token, - base::Bind(&OnCreateMessagePipe, base::Unretained(&handle), - blocker.GetUnblocker())); - blocker.Block(); + handle = edk::CreateChildMessagePipe(primordial_pipe_token); } + DCHECK(handle.is_valid()); + Blocker blocker; controller_runner_->PostTask( FROM_HERE, diff --git a/mojo/shell/runner/host/child_process.cc b/mojo/shell/runner/host/child_process.cc index 0208b84..75230f0 100644 --- a/mojo/shell/runner/host/child_process.cc +++ b/mojo/shell/runner/host/child_process.cc @@ -51,10 +51,6 @@ namespace shell { namespace { -void DidCreateChannel(embedder::ChannelInfo* channel_info) { - DVLOG(2) << "ChildControllerImpl::DidCreateChannel()"; -} - // Blocker --------------------------------------------------------------------- // Blocks a thread until another thread unblocks it, at which point it unblocks @@ -300,34 +296,14 @@ scoped_ptr<mojo::shell::LinuxSandbox> InitializeSandbox() { } #endif -void InitializeHostMessagePipe( - embedder::ScopedPlatformHandle platform_channel, - scoped_refptr<base::TaskRunner> io_task_runner, - const base::Callback<void(ScopedMessagePipeHandle)>& callback) { - if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { - embedder::SetParentPipeHandle(std::move(platform_channel)); - std::string primordial_pipe_token = - base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( - switches::kPrimordialPipeToken); - edk::CreateChildMessagePipe(primordial_pipe_token, callback); - } else { - ScopedMessagePipeHandle host_message_pipe; - host_message_pipe = - embedder::CreateChannel(std::move(platform_channel), - base::Bind(&DidCreateChannel), io_task_runner); - callback.Run(std::move(host_message_pipe)); - } -} - -void OnHostMessagePipeCreated(AppContext* app_context, - base::NativeLibrary app_library, - const Blocker::Unblocker& unblocker, - ScopedMessagePipeHandle pipe) { - app_context->controller_runner()->PostTask( - FROM_HERE, - base::Bind(&ChildControllerImpl::Init, base::Unretained(app_context), - base::Unretained(app_library), base::Passed(&pipe), - unblocker)); +ScopedMessagePipeHandle InitializeHostMessagePipe( + edk::ScopedPlatformHandle platform_channel, + scoped_refptr<base::TaskRunner> io_task_runner) { + edk::SetParentPipeHandle(std::move(platform_channel)); + std::string primordial_pipe_token = + base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( + switches::kPrimordialPipeToken); + return edk::CreateChildMessagePipe(primordial_pipe_token); } } // namespace @@ -360,8 +336,8 @@ int ChildProcessMain() { sandbox = InitializeSandbox(); #endif - embedder::ScopedPlatformHandle platform_channel = - embedder::PlatformChannelPair::PassClientHandleFromParentProcess( + edk::ScopedPlatformHandle platform_channel = + edk::PlatformChannelPair::PassClientHandleFromParentProcess( command_line); CHECK(platform_channel.is_valid()); @@ -372,13 +348,12 @@ int ChildProcessMain() { app_context.Init(); app_context.StartControllerThread(); + ScopedMessagePipeHandle host_pipe = InitializeHostMessagePipe( + std::move(platform_channel), app_context.io_runner()); app_context.controller_runner()->PostTask( FROM_HERE, - base::Bind( - &InitializeHostMessagePipe, base::Passed(&platform_channel), - make_scoped_refptr(app_context.io_runner()), - base::Bind(&OnHostMessagePipeCreated, base::Unretained(&app_context), - base::Unretained(app_library), blocker.GetUnblocker()))); + base::Bind(&ChildControllerImpl::Init, &app_context, app_library, + base::Passed(&host_pipe), blocker.GetUnblocker())); // This will block, then run whatever the controller wants. blocker.Block(); diff --git a/mojo/shell/runner/host/child_process_host.cc b/mojo/shell/runner/host/child_process_host.cc index 9a5b439..2b3cfc9 100644 --- a/mojo/shell/runner/host/child_process_host.cc +++ b/mojo/shell/runner/host/child_process_host.cc @@ -35,55 +35,24 @@ namespace mojo { namespace shell { -ChildProcessHost::PipeHolder::PipeHolder() {} - -void ChildProcessHost::PipeHolder::Reject() { - base::AutoLock lock(lock_); - reject_pipe_ = true; - pipe_.reset(); -} - -void ChildProcessHost::PipeHolder::SetPipe(ScopedMessagePipeHandle pipe) { - base::AutoLock lock(lock_); - DCHECK(!pipe_.is_valid()); - if (!reject_pipe_) - pipe_ = std::move(pipe); -} - -ScopedMessagePipeHandle ChildProcessHost::PipeHolder::PassPipe() { - base::AutoLock lock(lock_); - DCHECK(pipe_.is_valid()); - return std::move(pipe_); -} - -ChildProcessHost::PipeHolder::~PipeHolder() {} - ChildProcessHost::ChildProcessHost(base::TaskRunner* launch_process_runner, bool start_sandboxed, const base::FilePath& app_path) : launch_process_runner_(launch_process_runner), start_sandboxed_(start_sandboxed), app_path_(app_path), - channel_info_(nullptr), start_child_process_event_(false, false), weak_factory_(this) { - pipe_holder_ = new PipeHolder(); - if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { - node_channel_.reset(new edk::PlatformChannelPair); - primordial_pipe_token_ = edk::GenerateRandomToken(); - } else { - pipe_holder_->SetPipe(embedder::CreateChannel( - platform_channel_pair_.PassServerHandle(), - base::Bind(&ChildProcessHost::DidCreateChannel, base::Unretained(this)), - base::ThreadTaskRunnerHandle::Get())); - OnMessagePipeCreated(); - } + node_channel_.reset(new edk::PlatformChannelPair); + primordial_pipe_token_ = edk::GenerateRandomToken(); + controller_.Bind( + InterfacePtrInfo<mojom::ChildController>( + edk::CreateParentMessagePipe(primordial_pipe_token_), 0u)); } ChildProcessHost::ChildProcessHost(ScopedHandle channel) : launch_process_runner_(nullptr), start_sandboxed_(false), - channel_info_(nullptr), start_child_process_event_(false, false), weak_factory_(this) { CHECK(channel.is_valid()); @@ -99,40 +68,11 @@ ChildProcessHost::~ChildProcessHost() { void ChildProcessHost::Start(const ProcessReadyCallback& callback) { DCHECK(!child_process_.IsValid()); - DCHECK(process_ready_callback_.is_null()); - - process_ready_callback_ = callback; - if (base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")) { - // With the new EDK, bootstrap message pipes are created asynchronously. - // We recieve the bound pipe (if successful) on an arbitrary thread, - // stash it in the thread-safe |pipe_holder_|, and then try to call - // OnMessagePipeCreated() on the host's main thread. - // - // Because of the way the launcher process shuts down, it's possible for - // the main thread's MessageLoop to stop running (but not yet be destroyed!) - // while this boostrap is pending, resulting in OnMessagePipeCreated() never - // being called. - // - // A typical child process (i.e. one using ShellConnection to bind the other - // end of this pipe) may hang forever waiting for an Initialize() message - // unless the pipe is closed. This in turn means that Join() could hang - // waiting for the process to exit. Deadlock! - // - // |pipe_holder_| exists for this reason. If it's still holding onto the - // pipe when Join() is called, the pipe will be closed. - DCHECK(!primordial_pipe_token_.empty()); - edk::CreateParentMessagePipe( - primordial_pipe_token_, - base::Bind(&OnParentMessagePipeCreated, pipe_holder_, - base::ThreadTaskRunnerHandle::Get(), - base::Bind(&ChildProcessHost::OnMessagePipeCreated, - weak_factory_.GetWeakPtr()))); - } - launch_process_runner_->PostTaskAndReply( FROM_HERE, base::Bind(&ChildProcessHost::DoLaunch, base::Unretained(this)), - base::Bind(&ChildProcessHost::DidStart, weak_factory_.GetWeakPtr())); + base::Bind(&ChildProcessHost::DidStart, + weak_factory_.GetWeakPtr(), callback)); } int ChildProcessHost::Join() { @@ -142,10 +82,6 @@ int ChildProcessHost::Join() { controller_ = mojom::ChildControllerPtr(); DCHECK(child_process_.IsValid()); - // Ensure the child pipe is closed even if it wasn't yet connected to the - // controller. - pipe_holder_->Reject(); - int rv = -1; LOG_IF(ERROR, !child_process_.WaitForExit(&rv)) << "Failed to wait for child process"; @@ -172,11 +108,11 @@ void ChildProcessHost::ExitNow(int32_t exit_code) { controller_->ExitNow(exit_code); } -void ChildProcessHost::DidStart() { +void ChildProcessHost::DidStart(const ProcessReadyCallback& callback) { DVLOG(2) << "ChildProcessHost::DidStart()"; if (child_process_.IsValid()) { - MaybeNotifyProcessReady(); + callback.Run(child_process_.Pid()); } else { LOG(ERROR) << "Failed to start child process"; AppCompleted(MOJO_RESULT_UNKNOWN); @@ -278,34 +214,5 @@ void ChildProcessHost::AppCompleted(int32_t result) { } } -void ChildProcessHost::DidCreateChannel(embedder::ChannelInfo* channel_info) { - DVLOG(2) << "AppChildProcessHost::DidCreateChannel()"; - - DCHECK(channel_info || - base::CommandLine::ForCurrentProcess()->HasSwitch("use-new-edk")); - channel_info_ = channel_info; -} - -void ChildProcessHost::OnMessagePipeCreated() { - controller_.Bind( - InterfacePtrInfo<mojom::ChildController>(pipe_holder_->PassPipe(), 0u)); - MaybeNotifyProcessReady(); -} - -void ChildProcessHost::MaybeNotifyProcessReady() { - if (controller_.is_bound() && child_process_.IsValid()) - process_ready_callback_.Run(child_process_.Pid()); -} - -// static -void ChildProcessHost::OnParentMessagePipeCreated( - scoped_refptr<PipeHolder> holder, - scoped_refptr<base::TaskRunner> callback_task_runner, - const base::Closure& callback, - ScopedMessagePipeHandle pipe) { - holder->SetPipe(std::move(pipe)); - callback_task_runner->PostTask(FROM_HERE, callback); -} - } // namespace shell } // namespace mojo diff --git a/mojo/shell/runner/host/child_process_host.h b/mojo/shell/runner/host/child_process_host.h index 906a1a2..2664d30 100644 --- a/mojo/shell/runner/host/child_process_host.h +++ b/mojo/shell/runner/host/child_process_host.h @@ -71,70 +71,24 @@ class ChildProcessHost { void ExitNow(int32_t exit_code); protected: - void DidStart(); + void DidStart(const ProcessReadyCallback& callback); private: - // A thread-safe holder for the bootstrap message pipe to this child process. - // The pipe is established on an arbitrary thread and may not be connected - // until the host's message loop has stopped running. - class PipeHolder : public base::RefCountedThreadSafe<PipeHolder> { - public: - PipeHolder(); - - void Reject(); - void SetPipe(ScopedMessagePipeHandle pipe); - ScopedMessagePipeHandle PassPipe(); - - private: - friend class base::RefCountedThreadSafe<PipeHolder>; - - ~PipeHolder(); - - base::Lock lock_; - bool reject_pipe_ = false; - ScopedMessagePipeHandle pipe_; - - DISALLOW_COPY_AND_ASSIGN(PipeHolder); - }; - void DoLaunch(); void AppCompleted(int32_t result); - // Callback for |embedder::CreateChannel()|. - void DidCreateChannel(embedder::ChannelInfo* channel_info); - - // Called once |pipe_holder_| is bound to a pipe. - void OnMessagePipeCreated(); - - // Called when the child process is launched and when the bootstrap - // message pipe is created. Once both things have happened (which may happen - // in either order), |process_ready_callback_| is invoked. - void MaybeNotifyProcessReady(); - - // Callback used to receive the child message pipe from the ports EDK. - // This may be called on any thread. It will always stash the pipe in - // |holder|, and it will then attempt to call |callback| on - // |callback_task_runner| (which may or may not still be running tasks.) - static void OnParentMessagePipeCreated( - scoped_refptr<PipeHolder> holder, - scoped_refptr<base::TaskRunner> callback_task_runner, - const base::Closure& callback, - ScopedMessagePipeHandle pipe); - scoped_refptr<base::TaskRunner> launch_process_runner_; bool start_sandboxed_; const base::FilePath app_path_; base::Process child_process_; // Used for the ChildController binding. - embedder::PlatformChannelPair platform_channel_pair_; + edk::PlatformChannelPair platform_channel_pair_; mojom::ChildControllerPtr controller_; - embedder::ChannelInfo* channel_info_; mojom::ChildController::StartAppCallback on_app_complete_; - embedder::HandlePassingInformation handle_passing_info_; + edk::HandlePassingInformation handle_passing_info_; - // Used only when --use-new-edk is specified. Used to back the NodeChannel - // between the parent and child node. + // Used to back the NodeChannel between the parent and child node. scoped_ptr<edk::PlatformChannelPair> node_channel_; // Since Start() calls a method on another thread, we use an event to block @@ -144,14 +98,6 @@ class ChildProcessHost { // A token the child can use to connect a primordial pipe to the host. std::string primordial_pipe_token_; - // Holds the message pipe to the child process until it is either closed or - // bound to the controller interface. - scoped_refptr<PipeHolder> pipe_holder_; - - // Invoked exactly once, as soon as the child process's ID is known and - // a pipe to the child has been established. - ProcessReadyCallback process_ready_callback_; - base::WeakPtrFactory<ChildProcessHost> weak_factory_; DISALLOW_COPY_AND_ASSIGN(ChildProcessHost); |