// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "content/browser/message_port_service.h" #include "content/common/message_port_messages.h" #include "content/public/browser/browser_thread.h" #include "content/public/browser/message_port_delegate.h" namespace content { struct MessagePortService::MessagePort { // |delegate| and |route_id| are what we need to send messages to the port. // |delegate| is just a raw pointer since it notifies us by calling // OnMessagePortDelegateClosing before it gets destroyed. MessagePortDelegate* delegate; int route_id; // A globally unique id for this message port. int message_port_id; // The globally unique id of the entangled message port. int entangled_message_port_id; // If true, all messages to this message port are queued and not delivered. // This is needed so that when a message port is sent between processes all // pending message get transferred. There are two possibilities for pending // messages: either they are already received by the child process, or they're // in-flight. This flag ensures that the latter type get flushed through the // system. // This flag should only be set to true in response to // MessagePortHostMsg_QueueMessages. bool queue_for_inflight_messages; // If true, all messages to this message port are queued and not delivered. // This is needed so that when a message port is sent to a new process all // messages are held in the browser process until the destination process is // ready to receive messages. This flag is set true when a message port is // transferred to a different process but there isn't immediately a // MessagePortDelegate available for that new process. Once the // destination process is ready to receive messages it sends // MessagePortHostMsg_ReleaseMessages to set this flag to false. bool hold_messages_for_destination; // Returns true if messages should be queued for either reason. bool queue_messages() const { return queue_for_inflight_messages || hold_messages_for_destination; } // If true, the message port should be destroyed but was currently still // waiting for a SendQueuedMessages message from a renderer. As soon as that // message is received the port will actually be destroyed. bool should_be_destroyed; QueuedMessages queued_messages; }; MessagePortService* MessagePortService::GetInstance() { return Singleton::get(); } MessagePortService::MessagePortService() : next_message_port_id_(0) { } MessagePortService::~MessagePortService() { } void MessagePortService::UpdateMessagePort(int message_port_id, MessagePortDelegate* delegate, int routing_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } MessagePort& port = message_ports_[message_port_id]; port.delegate = delegate; port.route_id = routing_id; } void MessagePortService::OnMessagePortDelegateClosing( MessagePortDelegate* delegate) { DCHECK_CURRENTLY_ON(BrowserThread::IO); // Check if the (possibly) crashed process had any message ports. for (MessagePorts::iterator iter = message_ports_.begin(); iter != message_ports_.end();) { MessagePorts::iterator cur_item = iter++; if (cur_item->second.delegate == delegate) { Erase(cur_item->first); } } } void MessagePortService::Create(int route_id, MessagePortDelegate* delegate, int* message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); *message_port_id = ++next_message_port_id_; MessagePort port; port.delegate = delegate; port.route_id = route_id; port.message_port_id = *message_port_id; port.entangled_message_port_id = MSG_ROUTING_NONE; port.queue_for_inflight_messages = false; port.hold_messages_for_destination = false; port.should_be_destroyed = false; message_ports_[*message_port_id] = port; } void MessagePortService::Destroy(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } DCHECK(message_ports_[message_port_id].queued_messages.empty()); Erase(message_port_id); } void MessagePortService::Entangle(int local_message_port_id, int remote_message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(local_message_port_id) || !message_ports_.count(remote_message_port_id)) { NOTREACHED(); return; } DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == MSG_ROUTING_NONE); message_ports_[remote_message_port_id].entangled_message_port_id = local_message_port_id; } void MessagePortService::PostMessage( int sender_message_port_id, const MessagePortMessage& message, const std::vector& sent_message_ports) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(sender_message_port_id)) { NOTREACHED(); return; } int entangled_message_port_id = message_ports_[sender_message_port_id].entangled_message_port_id; if (entangled_message_port_id == MSG_ROUTING_NONE) return; // Process could have crashed. if (!message_ports_.count(entangled_message_port_id)) { NOTREACHED(); return; } PostMessageTo(entangled_message_port_id, message, sent_message_ports); } void MessagePortService::PostMessageTo( int message_port_id, const MessagePortMessage& message, const std::vector& sent_message_ports) { if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } for (size_t i = 0; i < sent_message_ports.size(); ++i) { if (!message_ports_.count(sent_message_ports[i].id)) { NOTREACHED(); return; } } MessagePort& entangled_port = message_ports_[message_port_id]; if (entangled_port.queue_messages()) { // If the target port is currently holding messages because the destination // renderer isn't available yet, all message ports being sent should also be // put in this state. if (entangled_port.hold_messages_for_destination) { for (const auto& port : sent_message_ports) HoldMessages(port.id); } entangled_port.queued_messages.push_back( std::make_pair(message, sent_message_ports)); return; } if (!entangled_port.delegate) { NOTREACHED(); return; } // Now send the message to the entangled port. entangled_port.delegate->SendMessage(entangled_port.route_id, message, sent_message_ports); } void MessagePortService::QueueMessages(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } MessagePort& port = message_ports_[message_port_id]; if (port.delegate) { port.delegate->SendMessagesAreQueued(port.route_id); port.queue_for_inflight_messages = true; port.delegate = NULL; } } void MessagePortService::SendQueuedMessages( int message_port_id, const QueuedMessages& queued_messages) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } // Send the queued messages to the port again. This time they'll reach the // new location. MessagePort& port = message_ports_[message_port_id]; port.queue_for_inflight_messages = false; // If the port is currently holding messages waiting for the target renderer, // all ports in messages being sent to the port should also be put on hold. if (port.hold_messages_for_destination) { for (const auto& message : queued_messages) for (const TransferredMessagePort& sent_port : message.second) HoldMessages(sent_port.id); } port.queued_messages.insert(port.queued_messages.begin(), queued_messages.begin(), queued_messages.end()); if (port.should_be_destroyed) ClosePort(message_port_id); else SendQueuedMessagesIfPossible(message_port_id); } void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } MessagePort& port = message_ports_[message_port_id]; if (port.queue_messages() || !port.delegate) return; for (QueuedMessages::iterator iter = port.queued_messages.begin(); iter != port.queued_messages.end(); ++iter) { PostMessageTo(message_port_id, iter->first, iter->second); } port.queued_messages.clear(); } void MessagePortService::HoldMessages(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } // Any ports in messages currently in the queue should also be put on hold. for (const auto& message : message_ports_[message_port_id].queued_messages) for (const TransferredMessagePort& sent_port : message.second) HoldMessages(sent_port.id); message_ports_[message_port_id].hold_messages_for_destination = true; } void MessagePortService::ClosePort(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } if (message_ports_[message_port_id].queue_for_inflight_messages) { message_ports_[message_port_id].should_be_destroyed = true; return; } // First close any message ports in the queue for this message port. for (const auto& message : message_ports_[message_port_id].queued_messages) for (const TransferredMessagePort& sent_port : message.second) ClosePort(sent_port.id); Erase(message_port_id); } void MessagePortService::ReleaseMessages(int message_port_id) { DCHECK_CURRENTLY_ON(BrowserThread::IO); if (!message_ports_.count(message_port_id)) { NOTREACHED(); return; } message_ports_[message_port_id].hold_messages_for_destination = false; SendQueuedMessagesIfPossible(message_port_id); } void MessagePortService::Erase(int message_port_id) { MessagePorts::iterator erase_item = message_ports_.find(message_port_id); DCHECK(erase_item != message_ports_.end()); int entangled_id = erase_item->second.entangled_message_port_id; if (entangled_id != MSG_ROUTING_NONE) { // Do the disentanglement (and be paranoid about the other side existing // just in case something unusual happened during entanglement). if (message_ports_.count(entangled_id)) { message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; } } message_ports_.erase(erase_item); } } // namespace content