diff options
Diffstat (limited to 'mojo/edk/system/node_controller.cc')
-rw-r--r-- | mojo/edk/system/node_controller.cc | 204 |
1 files changed, 174 insertions, 30 deletions
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc index 38c53df..6410cae 100644 --- a/mojo/edk/system/node_controller.cc +++ b/mojo/edk/system/node_controller.cc @@ -99,6 +99,18 @@ class ThreadDestructionObserver : } // namespace +NodeController::PendingPortRequest::PendingPortRequest() {} + +NodeController::PendingPortRequest::~PendingPortRequest() {} + +NodeController::ReservedPort::ReservedPort() {} + +NodeController::ReservedPort::~ReservedPort() {} + +NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {} + +NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {} + NodeController::~NodeController() {} NodeController::NodeController(Core* core) @@ -170,25 +182,18 @@ int NodeController::SendMessage(const ports::PortRef& port, } void NodeController::ReservePort(const std::string& token, - const ports::PortRef& port) { + const ReservePortCallback& callback) { + ports::PortRef port; + node_->CreateUninitializedPort(&port); + DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " << token; base::AutoLock lock(reserved_ports_lock_); - auto result = reserved_ports_.insert(std::make_pair(token, port)); - DCHECK(result.second); -} - -void NodeController::MergePortIntoParent(const std::string& token, - const ports::PortRef& port) { - scoped_refptr<NodeChannel> parent = GetParentChannel(); - if (parent) { - parent->RequestPortMerge(port.name(), token); - return; - } - - base::AutoLock lock(pending_port_merges_lock_); - pending_port_merges_.push_back(std::make_pair(token, port)); + ReservedPort reservation; + reservation.local_port = port; + reservation.callback = callback; + reserved_ports_.insert(std::make_pair(token, reservation)); } scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( @@ -205,6 +210,42 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( return buffer; } +void NodeController::ConnectToParentPort(const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback) { + io_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NodeController::RequestParentPortConnectionOnIOThread, + base::Unretained(this), local_port, token, callback)); +} + +void NodeController::ConnectToRemotePort( + const ports::PortRef& local_port, + const ports::NodeName& remote_node_name, + const ports::PortName& remote_port_name, + const base::Closure& callback) { + if (remote_node_name == name_) { + // It's possible that two different code paths on the node are trying to + // bootstrap ports to each other (e.g. in Chrome single-process mode) + // without being aware of the fact. In this case we can initialize the port + // immediately (which can fail silently if it's already been initialized by + // the request on the other side), and invoke |callback|. + node_->InitializePort(local_port, name_, remote_port_name); + callback.Run(); + return; + } + + PendingRemotePortConnection connection; + connection.local_port = local_port; + connection.remote_node_name = remote_node_name; + connection.remote_port_name = remote_port_name; + connection.callback = callback; + io_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NodeController::ConnectToRemotePortOnIOThread, + base::Unretained(this), connection)); +} + void NodeController::RequestShutdown(const base::Closure& callback) { { base::AutoLock lock(shutdown_lock_); @@ -263,6 +304,46 @@ void NodeController::ConnectToParentOnIOThread( bootstrap_parent_channel_->Start(); } +void NodeController::RequestParentPortConnectionOnIOThread( + const ports::PortRef& local_port, + const std::string& token, + const base::Closure& callback) { + DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); + + scoped_refptr<NodeChannel> parent = GetParentChannel(); + if (!parent) { + PendingPortRequest request; + request.token = token; + request.local_port = local_port; + request.callback = callback; + pending_port_requests_.push_back(request); + return; + } + + pending_parent_port_connections_.insert( + std::make_pair(local_port.name(), callback)); + parent->RequestPortConnection(local_port.name(), token); +} + +void NodeController::ConnectToRemotePortOnIOThread( + const PendingRemotePortConnection& connection) { + scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name); + if (peer) { + // It's safe to initialize the port since we already have a channel to its + // peer. No need to actually send them a message. + int rv = node_->InitializePort(connection.local_port, + connection.remote_node_name, + connection.remote_port_name); + DCHECK_EQ(rv, ports::OK); + connection.callback.Run(); + return; + } + + // Save this for later. We'll initialize the port once this peer is added. + pending_remote_port_connections_[connection.remote_node_name].push_back( + connection); +} + scoped_refptr<NodeChannel> NodeController::GetPeerChannel( const ports::NodeName& name) { base::AutoLock lock(peers_lock_); @@ -333,6 +414,19 @@ void NodeController::AddPeer(const ports::NodeName& name, channel->PortsMessage(std::move(pending_messages.front())); pending_messages.pop(); } + + // Complete any pending port connections to this peer. + auto connections_it = pending_remote_port_connections_.find(name); + if (connections_it != pending_remote_port_connections_.end()) { + for (const auto& connection : connections_it->second) { + int rv = node_->InitializePort(connection.local_port, + connection.remote_node_name, + connection.remote_port_name); + DCHECK_EQ(rv, ports::OK); + connection.callback.Run(); + } + pending_remote_port_connections_.erase(connections_it); + } } void NodeController::DropPeer(const ports::NodeName& name) { @@ -522,8 +616,6 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node, // NOTE: The child does not actually add its parent as a peer until // receiving an AcceptBrokerClient message from the broker. The parent // will request that said message be sent upon receiving AcceptParent. - - DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; } void NodeController::OnAcceptParent(const ports::NodeName& from_node, @@ -671,16 +763,15 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, io_task_runner_); AddPeer(broker_name, broker, true /* start_channel */); } - AddPeer(parent_name, parent, false /* start_channel */); - { - // Complete any port merge requests we have waiting for the parent. - base::AutoLock lock(pending_port_merges_lock_); - for (const auto& request : pending_port_merges_) - parent->RequestPortMerge(request.second.name(), request.first); - pending_port_merges_.clear(); + // Resolve any pending port connections to the parent. + for (const auto& request : pending_port_requests_) { + pending_parent_port_connections_.insert( + std::make_pair(request.local_port.name(), request.callback)); + parent->RequestPortConnection(request.local_port.name(), request.token); } + pending_port_requests_.clear(); // Feed the broker any pending children of our own. while (!pending_broker_clients.empty()) { @@ -733,15 +824,16 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) { AttemptShutdownIfRequested(); } -void NodeController::OnRequestPortMerge( +void NodeController::OnRequestPortConnection( const ports::NodeName& from_node, const ports::PortName& connector_port_name, const std::string& token) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " + DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " << token << " and port " << connector_port_name << "@" << from_node; + ReservePortCallback callback; ports::PortRef local_port; { base::AutoLock lock(reserved_ports_lock_); @@ -751,12 +843,64 @@ void NodeController::OnRequestPortMerge( << token; return; } - local_port = it->second; + local_port = it->second.local_port; + callback = it->second.callback; + reserved_ports_.erase(it); + } + + DCHECK(!callback.is_null()); + + scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); + if (!peer) { + DVLOG(1) << "Ignoring request to connect to port from unknown node " + << from_node; + return; } - int rv = node_->MergePorts(local_port, from_node, connector_port_name); - if (rv != ports::OK) - DLOG(ERROR) << "MergePorts failed: " << rv; + // This reserved port should not have been initialized yet. + CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node, + connector_port_name)); + + peer->ConnectToPort(local_port.name(), connector_port_name); + callback.Run(local_port); +} + +void NodeController::OnConnectToPort( + const ports::NodeName& from_node, + const ports::PortName& connector_port_name, + const ports::PortName& connectee_port_name) { + DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); + + DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " + << connectee_port_name << " to port " << connector_port_name << "@" + << from_node; + + ports::PortRef connectee_port; + int rv = node_->GetPort(connectee_port_name, &connectee_port); + if (rv != ports::OK) { + DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " + << connectee_port_name; + return; + } + + // It's OK if this port has already been initialized. This message is only + // sent by the remote peer to ensure the port is ready before it starts + // us sending messages to it. + ports::PortStatus port_status; + rv = node_->GetStatus(connectee_port, &port_status); + if (rv == ports::OK) { + DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " + << connectee_port_name; + return; + } + + CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, + connector_port_name)); + + auto it = pending_parent_port_connections_.find(connectee_port_name); + DCHECK(it != pending_parent_port_connections_.end()); + it->second.Run(); + pending_parent_port_connections_.erase(it); } void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |