diff options
author | jam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-04 02:00:56 +0000 |
---|---|---|
committer | jam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-04 02:00:56 +0000 |
commit | a5da6d613f41ecf35c027e8744dd6d3a41e4010c (patch) | |
tree | dad6a3e159fcc21df62180f481a0024db97d5b0d /chrome/browser/worker_host | |
parent | e23ed5427d0892d07480781f45b4367adebc0c00 (diff) | |
download | chromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.zip chromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.tar.gz chromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.tar.bz2 |
Cross-process Message Port implementation.
I'm sending this first, then I'll add support to workers in another changelist to avoid making this change larger.
TEST=running message port related layout tests in ui_tests
Review URL: http://codereview.chromium.org/159372
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@22356 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/worker_host')
-rw-r--r-- | chrome/browser/worker_host/message_port_dispatcher.cc | 225 | ||||
-rw-r--r-- | chrome/browser/worker_host/message_port_dispatcher.h | 90 | ||||
-rw-r--r-- | chrome/browser/worker_host/worker_process_host.cc | 44 | ||||
-rw-r--r-- | chrome/browser/worker_host/worker_process_host.h | 4 | ||||
-rw-r--r-- | chrome/browser/worker_host/worker_service.cc | 24 | ||||
-rw-r--r-- | chrome/browser/worker_host/worker_service.h | 28 |
6 files changed, 382 insertions, 33 deletions
diff --git a/chrome/browser/worker_host/message_port_dispatcher.cc b/chrome/browser/worker_host/message_port_dispatcher.cc new file mode 100644 index 0000000..9804e86 --- /dev/null +++ b/chrome/browser/worker_host/message_port_dispatcher.cc @@ -0,0 +1,225 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/worker_host/message_port_dispatcher.h" + +#include "base/singleton.h" +#include "chrome/browser/renderer_host/resource_message_filter.h" +#include "chrome/browser/worker_host/worker_process_host.h" +#include "chrome/common/notification_service.h" +#include "chrome/common/worker_messages.h" + + +MessagePortDispatcher* MessagePortDispatcher::GetInstance() { + return Singleton<MessagePortDispatcher>::get(); +} + +MessagePortDispatcher::MessagePortDispatcher() + : next_message_port_id_(0), + sender_(NULL), + next_routing_id_(NULL) { + // Receive a notification if a message filter or WorkerProcessHost is deleted. + registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, + NotificationService::AllSources()); + + registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, + NotificationService::AllSources()); +} + +MessagePortDispatcher::~MessagePortDispatcher() { +} + +bool MessagePortDispatcher::OnMessageReceived( + const IPC::Message& message, + IPC::Message::Sender* sender, + CallbackWithReturnValue<int>::Type* next_routing_id, + bool* message_was_ok) { + sender_ = sender; + next_routing_id_ = next_routing_id; + + bool handled = true; + *message_was_ok = true; + + IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages) + IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages, + OnSendQueuedMessages) + IPC_MESSAGE_UNHANDLED(handled = false) + IPC_END_MESSAGE_MAP_EX() + + sender_ = NULL; + next_routing_id_ = NULL; + + return handled; +} + +bool MessagePortDispatcher::Send(IPC::Message* message) { + return sender_->Send(message); +} + +void MessagePortDispatcher::OnCreate(int *route_id, + int* message_port_id) { + *message_port_id = ++next_message_port_id_; + *route_id = next_routing_id_->Run(); + + MessagePort port; + port.sender = sender_; + port.route_id = *route_id; + port.next_routing_id = next_routing_id_; + port.message_port_id = *message_port_id; + port.entangled_message_port_id = MSG_ROUTING_NONE; + port.queue_messages = false; + message_ports_[*message_port_id] = port; +} + +void MessagePortDispatcher::OnDestroy(int message_port_id) { + if (!message_ports_.count(message_port_id)) { + NOTREACHED(); + return; + } + + DCHECK(message_ports_[message_port_id].queued_messages.empty()); + delete message_ports_[message_port_id].next_routing_id; + message_ports_.erase(message_port_id); +} + +void MessagePortDispatcher::OnEntangle(int local_message_port_id, + int remote_message_port_id) { + if (!message_ports_.count(local_message_port_id) || + !message_ports_.count(remote_message_port_id)) { + NOTREACHED(); + return; + } + + DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == + MSG_ROUTING_NONE); + message_ports_[remote_message_port_id].entangled_message_port_id = + local_message_port_id; +} + +void MessagePortDispatcher::OnPostMessage(int sender_message_port_id, + const string16& message, + int sent_message_port_id) { + if (!message_ports_.count(sender_message_port_id)) { + NOTREACHED(); + return; + } + + int entangled_message_port_id = + message_ports_[sender_message_port_id].entangled_message_port_id; + if (entangled_message_port_id == MSG_ROUTING_NONE) + return; // Process could have crashed. + + if (!message_ports_.count(entangled_message_port_id)) { + NOTREACHED(); + return; + } + + PostMessageTo(entangled_message_port_id, message, sent_message_port_id); +} + +void MessagePortDispatcher::PostMessageTo(int message_port_id, + const string16& message, + int sent_message_port_id) { + if (!message_ports_.count(message_port_id) || + (sent_message_port_id != MSG_ROUTING_NONE && + !message_ports_.count(sent_message_port_id))) { + NOTREACHED(); + return; + } + + MessagePort& entangled_port = message_ports_[message_port_id]; + + MessagePort* sent_port = NULL; + if (sent_message_port_id != MSG_ROUTING_NONE) { + sent_port = &message_ports_[sent_message_port_id]; + sent_port->queue_messages = true; + } + + if (entangled_port.queue_messages) { + entangled_port.queued_messages.push_back( + std::make_pair(message, sent_message_port_id)); + } else { + // If a message port was sent around, the new location will need a routing + // id. Instead of having the created port send us a sync message to get it, + // send along with the message. + int new_routing_id = MSG_ROUTING_NONE; + if (sent_message_port_id != MSG_ROUTING_NONE) { + new_routing_id = entangled_port.next_routing_id->Run(); + sent_port->sender = entangled_port.sender; + + // Update the entry for the sent port as it can be in a different process. + sent_port->route_id = new_routing_id; + } + + // Now send the message to the entangled port. + IPC::Message* ipc_msg = new WorkerProcessMsg_Message( + entangled_port.route_id, message, sent_message_port_id, + new_routing_id); + entangled_port.sender->Send(ipc_msg); + } +} + +void MessagePortDispatcher::OnQueueMessages(int message_port_id) { + if (!message_ports_.count(message_port_id)) { + NOTREACHED(); + return; + } + + MessagePort& port = message_ports_[message_port_id]; + port.queue_messages = true; + port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); +} + +void MessagePortDispatcher::OnSendQueuedMessages( + int message_port_id, + const QueuedMessages& queued_messages) { + if (!message_ports_.count(message_port_id)) { + NOTREACHED(); + return; + } + + // Send the queued messages to the port again. This time they'll reach the + // new location. + MessagePort& port = message_ports_[message_port_id]; + port.queue_messages = false; + port.queued_messages.insert(port.queued_messages.begin(), + queued_messages.begin(), + queued_messages.end()); + for (QueuedMessages::iterator iter = port.queued_messages.begin(); + iter != port.queued_messages.end(); ++iter) { + PostMessageTo(message_port_id, iter->first, iter->second); + } + port.queued_messages.clear(); +} + +void MessagePortDispatcher::Observe(NotificationType type, + const NotificationSource& source, + const NotificationDetails& details) { + IPC::Message::Sender* sender = NULL; + if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { + sender = Source<ResourceMessageFilter>(source).ptr(); + } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { + sender = Source<WorkerProcessHost>(source).ptr(); + } else { + NOTREACHED(); + } + + // Check if the (possibly) crashed process had any message ports. + for (MessagePorts::iterator iter = message_ports_.begin(); + iter != message_ports_.end();) { + MessagePorts::iterator cur_item = iter++; + if (cur_item->second.sender == sender) { + if (cur_item->second.entangled_message_port_id != MSG_ROUTING_NONE) { + message_ports_[cur_item->second.entangled_message_port_id]. + entangled_message_port_id = MSG_ROUTING_NONE; + } + message_ports_.erase(cur_item); + } + } +} diff --git a/chrome/browser/worker_host/message_port_dispatcher.h b/chrome/browser/worker_host/message_port_dispatcher.h new file mode 100644 index 0000000..d443b07 --- /dev/null +++ b/chrome/browser/worker_host/message_port_dispatcher.h @@ -0,0 +1,90 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_ +#define CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_ + +#include <map> +#include <utility> +#include <vector> + +#include "base/basictypes.h" +#include "base/singleton.h" +#include "base/string16.h" +#include "base/task.h" +#include "chrome/common/notification_registrar.h" +#include "ipc/ipc_message.h" + +class MessagePortDispatcher : public NotificationObserver { + public: + typedef std::vector<std::pair<string16, int> > QueuedMessages; + + // Returns the MessagePortDispatcher singleton. + static MessagePortDispatcher* GetInstance(); + + bool OnMessageReceived(const IPC::Message& message, + IPC::Message::Sender* sender, + CallbackWithReturnValue<int>::Type* next_routing_id, + bool* message_was_ok); + + bool Send(IPC::Message* message); + + private: + friend struct DefaultSingletonTraits<MessagePortDispatcher>; + + MessagePortDispatcher(); + ~MessagePortDispatcher(); + + // Message handlers. + void OnCreate(int* route_id, int* message_port_id); + void OnDestroy(int message_port_id); + void OnEntangle(int local_message_port_id, int remote_message_port_id); + void OnPostMessage(int sender_message_port_id, + const string16& message, + int sent_message_port_id); + void OnQueueMessages(int message_port_id); + void OnSendQueuedMessages(int message_port_id, + const QueuedMessages& queued_messages); + + void PostMessageTo(int message_port_id, + const string16& message, + int sent_message_port_id); + + // NotificationObserver interface. + void Observe(NotificationType type, + const NotificationSource& source, + const NotificationDetails& details); + + struct MessagePort { + // sender and route_id are what we need to send messages to the port. + IPC::Message::Sender* sender; + int route_id; + // A function pointer to generate a new route id for the sender above. + // Owned by "sender" above, so don't delete. + CallbackWithReturnValue<int>::Type* next_routing_id; + // A globally unique id for this message port. + int message_port_id; + // The globally unique id of the entangled message port. + int entangled_message_port_id; + // If true, all messages to this message port are queued and not delivered. + bool queue_messages; + QueuedMessages queued_messages; + }; + + typedef std::map<int, MessagePort> MessagePorts; + MessagePorts message_ports_; + + // We need globally unique identifiers for each message port. + int next_message_port_id_; + + // Valid only during IPC message dispatching. + IPC::Message::Sender* sender_; + CallbackWithReturnValue<int>::Type* next_routing_id_; + + NotificationRegistrar registrar_; + + DISALLOW_COPY_AND_ASSIGN(MessagePortDispatcher); +}; + +#endif // CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_ diff --git a/chrome/browser/worker_host/worker_process_host.cc b/chrome/browser/worker_host/worker_process_host.cc index bcb3da2..ed3fd9f 100644 --- a/chrome/browser/worker_host/worker_process_host.cc +++ b/chrome/browser/worker_host/worker_process_host.cc @@ -18,11 +18,14 @@ #include "chrome/browser/browser_process.h" #include "chrome/browser/child_process_security_policy.h" #include "chrome/browser/renderer_host/render_view_host.h" +#include "chrome/browser/worker_host/message_port_dispatcher.h" #include "chrome/browser/worker_host/worker_service.h" #include "chrome/common/chrome_switches.h" #include "chrome/common/debug_flags.h" +#include "chrome/common/notification_service.h" #include "chrome/common/process_watcher.h" #include "chrome/common/render_messages.h" +#include "chrome/common/result_codes.h" #include "chrome/common/worker_messages.h" #include "ipc/ipc_descriptors.h" #include "ipc/ipc_switches.h" @@ -59,11 +62,16 @@ class WorkerCrashTask : public Task { WorkerProcessHost::WorkerProcessHost( ResourceDispatcherHost* resource_dispatcher_host_) : ChildProcessHost(WORKER_PROCESS, resource_dispatcher_host_) { + next_route_id_ = NewCallbackWithReturnValue( + WorkerService::GetInstance(), &WorkerService::next_worker_route_id); } WorkerProcessHost::~WorkerProcessHost() { - WorkerService::GetInstance()->OnSenderShutdown(this); - WorkerService::GetInstance()->OnWorkerProcessDestroyed(this); + // Let interested observers know we are being deleted. + NotificationService::current()->Notify( + NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, + Source<WorkerProcessHost>(this), + NotificationService::NoDetails()); // If we crashed, tell the RenderViewHost. MessageLoop* ui_loop = WorkerService::GetInstance()->ui_loop(); @@ -157,16 +165,28 @@ URLRequestContext* WorkerProcessHost::GetRequestContext( } void WorkerProcessHost::OnMessageReceived(const IPC::Message& message) { - bool handled = true; - IPC_BEGIN_MESSAGE_MAP(WorkerProcessHost, message) - IPC_MESSAGE_HANDLER(ViewHostMsg_CreateDedicatedWorker, - OnCreateDedicatedWorker) - IPC_MESSAGE_HANDLER(ViewHostMsg_CancelCreateDedicatedWorker, - OnCancelCreateDedicatedWorker) - IPC_MESSAGE_HANDLER(ViewHostMsg_ForwardToWorker, - OnForwardToWorker) - IPC_MESSAGE_UNHANDLED(handled = false) - IPC_END_MESSAGE_MAP_EX() + bool msg_is_ok = true; + bool handled = MessagePortDispatcher::GetInstance()->OnMessageReceived( + message, this, next_route_id_, &msg_is_ok); + + if (!handled) { + handled = true; + IPC_BEGIN_MESSAGE_MAP_EX(WorkerProcessHost, message, msg_is_ok) + IPC_MESSAGE_HANDLER(ViewHostMsg_CreateDedicatedWorker, + OnCreateDedicatedWorker) + IPC_MESSAGE_HANDLER(ViewHostMsg_CancelCreateDedicatedWorker, + OnCancelCreateDedicatedWorker) + IPC_MESSAGE_HANDLER(ViewHostMsg_ForwardToWorker, + OnForwardToWorker) + IPC_MESSAGE_UNHANDLED(handled = false) + IPC_END_MESSAGE_MAP_EX() + } + + if (!msg_is_ok) { + NOTREACHED(); + base::KillProcess(handle(), ResultCodes::KILLED_BAD_MESSAGE, false); + } + if (handled) return; diff --git a/chrome/browser/worker_host/worker_process_host.h b/chrome/browser/worker_host/worker_process_host.h index 00a6642..44bc676 100644 --- a/chrome/browser/worker_host/worker_process_host.h +++ b/chrome/browser/worker_host/worker_process_host.h @@ -8,6 +8,7 @@ #include <list> #include "base/basictypes.h" +#include "base/task.h" #include "chrome/common/child_process_host.h" #include "googleurl/src/gurl.h" #include "ipc/ipc_channel.h" @@ -69,6 +70,9 @@ class WorkerProcessHost : public ChildProcessHost { Instances instances_; + // A callback to create a routing id for the associated worker process. + CallbackWithReturnValue<int>::Type* next_route_id_; + DISALLOW_COPY_AND_ASSIGN(WorkerProcessHost); }; diff --git a/chrome/browser/worker_host/worker_service.cc b/chrome/browser/worker_host/worker_service.cc index e67ae80..df8ba51 100644 --- a/chrome/browser/worker_host/worker_service.cc +++ b/chrome/browser/worker_host/worker_service.cc @@ -10,9 +10,9 @@ #include "base/thread.h" #include "chrome/browser/browser_process.h" #include "chrome/browser/plugin_service.h" -#include "chrome/browser/worker_host/worker_process_host.h" #include "chrome/browser/renderer_host/render_process_host.h" #include "chrome/browser/renderer_host/resource_message_filter.h" +#include "chrome/browser/worker_host/worker_process_host.h" #include "chrome/common/chrome_switches.h" #include "chrome/common/notification_service.h" #include "chrome/common/worker_messages.h" @@ -30,9 +30,12 @@ WorkerService::WorkerService() : next_worker_route_id_(0), resource_dispatcher_host_(NULL), ui_loop_(NULL) { - // Receive a notification if the message filter is deleted. + // Receive a notification if a message filter or WorkerProcessHost is deleted. registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN, NotificationService::AllSources()); + + registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN, + NotificationService::AllSources()); } void WorkerService::Initialize(ResourceDispatcherHost* rdh, @@ -210,12 +213,19 @@ bool WorkerService::CanCreateWorkerProcess( void WorkerService::Observe(NotificationType type, const NotificationSource& source, const NotificationDetails& details) { - DCHECK(type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN); - ResourceMessageFilter* filter = Source<ResourceMessageFilter>(source).ptr(); - OnSenderShutdown(filter); + if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) { + ResourceMessageFilter* sender = Source<ResourceMessageFilter>(source).ptr(); + SenderShutdown(sender); + } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) { + WorkerProcessHost* sender = Source<WorkerProcessHost>(source).ptr(); + SenderShutdown(sender); + WorkerProcessDestroyed(sender); + } else { + NOTREACHED(); + } } -void WorkerService::OnSenderShutdown(IPC::Message::Sender* sender) { +void WorkerService::SenderShutdown(IPC::Message::Sender* sender) { for (ChildProcessHost::Iterator iter(ChildProcessInfo::WORKER_PROCESS); !iter.Done(); ++iter) { WorkerProcessHost* worker = static_cast<WorkerProcessHost*>(*iter); @@ -233,7 +243,7 @@ void WorkerService::OnSenderShutdown(IPC::Message::Sender* sender) { } } -void WorkerService::OnWorkerProcessDestroyed(WorkerProcessHost* process) { +void WorkerService::WorkerProcessDestroyed(WorkerProcessHost* process) { if (queued_workers_.empty()) return; diff --git a/chrome/browser/worker_host/worker_service.h b/chrome/browser/worker_host/worker_service.h index 45e8248..704a7da 100644 --- a/chrome/browser/worker_host/worker_service.h +++ b/chrome/browser/worker_host/worker_service.h @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_ -#define CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_ +#ifndef CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_ +#define CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_ #include <list> @@ -42,17 +42,6 @@ class WorkerService : public NotificationObserver { // forwarded to the worker process. void ForwardMessage(const IPC::Message& message, int sender_pid); - // NotificationObserver interface. - void Observe(NotificationType type, - const NotificationSource& source, - const NotificationDetails& details); - - // Notifies us that a process that's talking to a worker has shut down. - void OnSenderShutdown(IPC::Message::Sender* sender); - - // Notifies us that a worker process has closed. - void OnWorkerProcessDestroyed(WorkerProcessHost* process); - MessageLoop* ui_loop() { return ui_loop_; } int next_worker_route_id() { return ++next_worker_route_id_; } @@ -86,6 +75,17 @@ class WorkerService : public NotificationObserver { // we're using a strategy of one process per core. bool CanCreateWorkerProcess(const WorkerProcessHost::WorkerInstance& instance); + // NotificationObserver interface. + void Observe(NotificationType type, + const NotificationSource& source, + const NotificationDetails& details); + + // Notifies us that a process that's talking to a worker has shut down. + void SenderShutdown(IPC::Message::Sender* sender); + + // Notifies us that a worker process has closed. + void WorkerProcessDestroyed(WorkerProcessHost* process); + NotificationRegistrar registrar_; int next_worker_route_id_; ResourceDispatcherHost* resource_dispatcher_host_; @@ -96,4 +96,4 @@ class WorkerService : public NotificationObserver { DISALLOW_COPY_AND_ASSIGN(WorkerService); }; -#endif // CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_ +#endif // CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_ |