summaryrefslogtreecommitdiffstats
path: root/chrome/browser/worker_host
diff options
context:
space:
mode:
authorjam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-04 02:00:56 +0000
committerjam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-04 02:00:56 +0000
commita5da6d613f41ecf35c027e8744dd6d3a41e4010c (patch)
treedad6a3e159fcc21df62180f481a0024db97d5b0d /chrome/browser/worker_host
parente23ed5427d0892d07480781f45b4367adebc0c00 (diff)
downloadchromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.zip
chromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.tar.gz
chromium_src-a5da6d613f41ecf35c027e8744dd6d3a41e4010c.tar.bz2
Cross-process Message Port implementation.
I'm sending this first, then I'll add support to workers in another changelist to avoid making this change larger. TEST=running message port related layout tests in ui_tests Review URL: http://codereview.chromium.org/159372 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@22356 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/worker_host')
-rw-r--r--chrome/browser/worker_host/message_port_dispatcher.cc225
-rw-r--r--chrome/browser/worker_host/message_port_dispatcher.h90
-rw-r--r--chrome/browser/worker_host/worker_process_host.cc44
-rw-r--r--chrome/browser/worker_host/worker_process_host.h4
-rw-r--r--chrome/browser/worker_host/worker_service.cc24
-rw-r--r--chrome/browser/worker_host/worker_service.h28
6 files changed, 382 insertions, 33 deletions
diff --git a/chrome/browser/worker_host/message_port_dispatcher.cc b/chrome/browser/worker_host/message_port_dispatcher.cc
new file mode 100644
index 0000000..9804e86
--- /dev/null
+++ b/chrome/browser/worker_host/message_port_dispatcher.cc
@@ -0,0 +1,225 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/worker_host/message_port_dispatcher.h"
+
+#include "base/singleton.h"
+#include "chrome/browser/renderer_host/resource_message_filter.h"
+#include "chrome/browser/worker_host/worker_process_host.h"
+#include "chrome/common/notification_service.h"
+#include "chrome/common/worker_messages.h"
+
+
+MessagePortDispatcher* MessagePortDispatcher::GetInstance() {
+ return Singleton<MessagePortDispatcher>::get();
+}
+
+MessagePortDispatcher::MessagePortDispatcher()
+ : next_message_port_id_(0),
+ sender_(NULL),
+ next_routing_id_(NULL) {
+ // Receive a notification if a message filter or WorkerProcessHost is deleted.
+ registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN,
+ NotificationService::AllSources());
+
+ registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
+ NotificationService::AllSources());
+}
+
+MessagePortDispatcher::~MessagePortDispatcher() {
+}
+
+bool MessagePortDispatcher::OnMessageReceived(
+ const IPC::Message& message,
+ IPC::Message::Sender* sender,
+ CallbackWithReturnValue<int>::Type* next_routing_id,
+ bool* message_was_ok) {
+ sender_ = sender;
+ next_routing_id_ = next_routing_id;
+
+ bool handled = true;
+ *message_was_ok = true;
+
+ IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages)
+ IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages,
+ OnSendQueuedMessages)
+ IPC_MESSAGE_UNHANDLED(handled = false)
+ IPC_END_MESSAGE_MAP_EX()
+
+ sender_ = NULL;
+ next_routing_id_ = NULL;
+
+ return handled;
+}
+
+bool MessagePortDispatcher::Send(IPC::Message* message) {
+ return sender_->Send(message);
+}
+
+void MessagePortDispatcher::OnCreate(int *route_id,
+ int* message_port_id) {
+ *message_port_id = ++next_message_port_id_;
+ *route_id = next_routing_id_->Run();
+
+ MessagePort port;
+ port.sender = sender_;
+ port.route_id = *route_id;
+ port.next_routing_id = next_routing_id_;
+ port.message_port_id = *message_port_id;
+ port.entangled_message_port_id = MSG_ROUTING_NONE;
+ port.queue_messages = false;
+ message_ports_[*message_port_id] = port;
+}
+
+void MessagePortDispatcher::OnDestroy(int message_port_id) {
+ if (!message_ports_.count(message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ DCHECK(message_ports_[message_port_id].queued_messages.empty());
+ delete message_ports_[message_port_id].next_routing_id;
+ message_ports_.erase(message_port_id);
+}
+
+void MessagePortDispatcher::OnEntangle(int local_message_port_id,
+ int remote_message_port_id) {
+ if (!message_ports_.count(local_message_port_id) ||
+ !message_ports_.count(remote_message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
+ MSG_ROUTING_NONE);
+ message_ports_[remote_message_port_id].entangled_message_port_id =
+ local_message_port_id;
+}
+
+void MessagePortDispatcher::OnPostMessage(int sender_message_port_id,
+ const string16& message,
+ int sent_message_port_id) {
+ if (!message_ports_.count(sender_message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ int entangled_message_port_id =
+ message_ports_[sender_message_port_id].entangled_message_port_id;
+ if (entangled_message_port_id == MSG_ROUTING_NONE)
+ return; // Process could have crashed.
+
+ if (!message_ports_.count(entangled_message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ PostMessageTo(entangled_message_port_id, message, sent_message_port_id);
+}
+
+void MessagePortDispatcher::PostMessageTo(int message_port_id,
+ const string16& message,
+ int sent_message_port_id) {
+ if (!message_ports_.count(message_port_id) ||
+ (sent_message_port_id != MSG_ROUTING_NONE &&
+ !message_ports_.count(sent_message_port_id))) {
+ NOTREACHED();
+ return;
+ }
+
+ MessagePort& entangled_port = message_ports_[message_port_id];
+
+ MessagePort* sent_port = NULL;
+ if (sent_message_port_id != MSG_ROUTING_NONE) {
+ sent_port = &message_ports_[sent_message_port_id];
+ sent_port->queue_messages = true;
+ }
+
+ if (entangled_port.queue_messages) {
+ entangled_port.queued_messages.push_back(
+ std::make_pair(message, sent_message_port_id));
+ } else {
+ // If a message port was sent around, the new location will need a routing
+ // id. Instead of having the created port send us a sync message to get it,
+ // send along with the message.
+ int new_routing_id = MSG_ROUTING_NONE;
+ if (sent_message_port_id != MSG_ROUTING_NONE) {
+ new_routing_id = entangled_port.next_routing_id->Run();
+ sent_port->sender = entangled_port.sender;
+
+ // Update the entry for the sent port as it can be in a different process.
+ sent_port->route_id = new_routing_id;
+ }
+
+ // Now send the message to the entangled port.
+ IPC::Message* ipc_msg = new WorkerProcessMsg_Message(
+ entangled_port.route_id, message, sent_message_port_id,
+ new_routing_id);
+ entangled_port.sender->Send(ipc_msg);
+ }
+}
+
+void MessagePortDispatcher::OnQueueMessages(int message_port_id) {
+ if (!message_ports_.count(message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ MessagePort& port = message_ports_[message_port_id];
+ port.queue_messages = true;
+ port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
+}
+
+void MessagePortDispatcher::OnSendQueuedMessages(
+ int message_port_id,
+ const QueuedMessages& queued_messages) {
+ if (!message_ports_.count(message_port_id)) {
+ NOTREACHED();
+ return;
+ }
+
+ // Send the queued messages to the port again. This time they'll reach the
+ // new location.
+ MessagePort& port = message_ports_[message_port_id];
+ port.queue_messages = false;
+ port.queued_messages.insert(port.queued_messages.begin(),
+ queued_messages.begin(),
+ queued_messages.end());
+ for (QueuedMessages::iterator iter = port.queued_messages.begin();
+ iter != port.queued_messages.end(); ++iter) {
+ PostMessageTo(message_port_id, iter->first, iter->second);
+ }
+ port.queued_messages.clear();
+}
+
+void MessagePortDispatcher::Observe(NotificationType type,
+ const NotificationSource& source,
+ const NotificationDetails& details) {
+ IPC::Message::Sender* sender = NULL;
+ if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) {
+ sender = Source<ResourceMessageFilter>(source).ptr();
+ } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) {
+ sender = Source<WorkerProcessHost>(source).ptr();
+ } else {
+ NOTREACHED();
+ }
+
+ // Check if the (possibly) crashed process had any message ports.
+ for (MessagePorts::iterator iter = message_ports_.begin();
+ iter != message_ports_.end();) {
+ MessagePorts::iterator cur_item = iter++;
+ if (cur_item->second.sender == sender) {
+ if (cur_item->second.entangled_message_port_id != MSG_ROUTING_NONE) {
+ message_ports_[cur_item->second.entangled_message_port_id].
+ entangled_message_port_id = MSG_ROUTING_NONE;
+ }
+ message_ports_.erase(cur_item);
+ }
+ }
+}
diff --git a/chrome/browser/worker_host/message_port_dispatcher.h b/chrome/browser/worker_host/message_port_dispatcher.h
new file mode 100644
index 0000000..d443b07
--- /dev/null
+++ b/chrome/browser/worker_host/message_port_dispatcher.h
@@ -0,0 +1,90 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_
+#define CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/singleton.h"
+#include "base/string16.h"
+#include "base/task.h"
+#include "chrome/common/notification_registrar.h"
+#include "ipc/ipc_message.h"
+
+class MessagePortDispatcher : public NotificationObserver {
+ public:
+ typedef std::vector<std::pair<string16, int> > QueuedMessages;
+
+ // Returns the MessagePortDispatcher singleton.
+ static MessagePortDispatcher* GetInstance();
+
+ bool OnMessageReceived(const IPC::Message& message,
+ IPC::Message::Sender* sender,
+ CallbackWithReturnValue<int>::Type* next_routing_id,
+ bool* message_was_ok);
+
+ bool Send(IPC::Message* message);
+
+ private:
+ friend struct DefaultSingletonTraits<MessagePortDispatcher>;
+
+ MessagePortDispatcher();
+ ~MessagePortDispatcher();
+
+ // Message handlers.
+ void OnCreate(int* route_id, int* message_port_id);
+ void OnDestroy(int message_port_id);
+ void OnEntangle(int local_message_port_id, int remote_message_port_id);
+ void OnPostMessage(int sender_message_port_id,
+ const string16& message,
+ int sent_message_port_id);
+ void OnQueueMessages(int message_port_id);
+ void OnSendQueuedMessages(int message_port_id,
+ const QueuedMessages& queued_messages);
+
+ void PostMessageTo(int message_port_id,
+ const string16& message,
+ int sent_message_port_id);
+
+ // NotificationObserver interface.
+ void Observe(NotificationType type,
+ const NotificationSource& source,
+ const NotificationDetails& details);
+
+ struct MessagePort {
+ // sender and route_id are what we need to send messages to the port.
+ IPC::Message::Sender* sender;
+ int route_id;
+ // A function pointer to generate a new route id for the sender above.
+ // Owned by "sender" above, so don't delete.
+ CallbackWithReturnValue<int>::Type* next_routing_id;
+ // A globally unique id for this message port.
+ int message_port_id;
+ // The globally unique id of the entangled message port.
+ int entangled_message_port_id;
+ // If true, all messages to this message port are queued and not delivered.
+ bool queue_messages;
+ QueuedMessages queued_messages;
+ };
+
+ typedef std::map<int, MessagePort> MessagePorts;
+ MessagePorts message_ports_;
+
+ // We need globally unique identifiers for each message port.
+ int next_message_port_id_;
+
+ // Valid only during IPC message dispatching.
+ IPC::Message::Sender* sender_;
+ CallbackWithReturnValue<int>::Type* next_routing_id_;
+
+ NotificationRegistrar registrar_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePortDispatcher);
+};
+
+#endif // CHROME_BROWSER_WORKER_HOST_MESSAGE_PORT_DISPATCHER_H_
diff --git a/chrome/browser/worker_host/worker_process_host.cc b/chrome/browser/worker_host/worker_process_host.cc
index bcb3da2..ed3fd9f 100644
--- a/chrome/browser/worker_host/worker_process_host.cc
+++ b/chrome/browser/worker_host/worker_process_host.cc
@@ -18,11 +18,14 @@
#include "chrome/browser/browser_process.h"
#include "chrome/browser/child_process_security_policy.h"
#include "chrome/browser/renderer_host/render_view_host.h"
+#include "chrome/browser/worker_host/message_port_dispatcher.h"
#include "chrome/browser/worker_host/worker_service.h"
#include "chrome/common/chrome_switches.h"
#include "chrome/common/debug_flags.h"
+#include "chrome/common/notification_service.h"
#include "chrome/common/process_watcher.h"
#include "chrome/common/render_messages.h"
+#include "chrome/common/result_codes.h"
#include "chrome/common/worker_messages.h"
#include "ipc/ipc_descriptors.h"
#include "ipc/ipc_switches.h"
@@ -59,11 +62,16 @@ class WorkerCrashTask : public Task {
WorkerProcessHost::WorkerProcessHost(
ResourceDispatcherHost* resource_dispatcher_host_)
: ChildProcessHost(WORKER_PROCESS, resource_dispatcher_host_) {
+ next_route_id_ = NewCallbackWithReturnValue(
+ WorkerService::GetInstance(), &WorkerService::next_worker_route_id);
}
WorkerProcessHost::~WorkerProcessHost() {
- WorkerService::GetInstance()->OnSenderShutdown(this);
- WorkerService::GetInstance()->OnWorkerProcessDestroyed(this);
+ // Let interested observers know we are being deleted.
+ NotificationService::current()->Notify(
+ NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
+ Source<WorkerProcessHost>(this),
+ NotificationService::NoDetails());
// If we crashed, tell the RenderViewHost.
MessageLoop* ui_loop = WorkerService::GetInstance()->ui_loop();
@@ -157,16 +165,28 @@ URLRequestContext* WorkerProcessHost::GetRequestContext(
}
void WorkerProcessHost::OnMessageReceived(const IPC::Message& message) {
- bool handled = true;
- IPC_BEGIN_MESSAGE_MAP(WorkerProcessHost, message)
- IPC_MESSAGE_HANDLER(ViewHostMsg_CreateDedicatedWorker,
- OnCreateDedicatedWorker)
- IPC_MESSAGE_HANDLER(ViewHostMsg_CancelCreateDedicatedWorker,
- OnCancelCreateDedicatedWorker)
- IPC_MESSAGE_HANDLER(ViewHostMsg_ForwardToWorker,
- OnForwardToWorker)
- IPC_MESSAGE_UNHANDLED(handled = false)
- IPC_END_MESSAGE_MAP_EX()
+ bool msg_is_ok = true;
+ bool handled = MessagePortDispatcher::GetInstance()->OnMessageReceived(
+ message, this, next_route_id_, &msg_is_ok);
+
+ if (!handled) {
+ handled = true;
+ IPC_BEGIN_MESSAGE_MAP_EX(WorkerProcessHost, message, msg_is_ok)
+ IPC_MESSAGE_HANDLER(ViewHostMsg_CreateDedicatedWorker,
+ OnCreateDedicatedWorker)
+ IPC_MESSAGE_HANDLER(ViewHostMsg_CancelCreateDedicatedWorker,
+ OnCancelCreateDedicatedWorker)
+ IPC_MESSAGE_HANDLER(ViewHostMsg_ForwardToWorker,
+ OnForwardToWorker)
+ IPC_MESSAGE_UNHANDLED(handled = false)
+ IPC_END_MESSAGE_MAP_EX()
+ }
+
+ if (!msg_is_ok) {
+ NOTREACHED();
+ base::KillProcess(handle(), ResultCodes::KILLED_BAD_MESSAGE, false);
+ }
+
if (handled)
return;
diff --git a/chrome/browser/worker_host/worker_process_host.h b/chrome/browser/worker_host/worker_process_host.h
index 00a6642..44bc676 100644
--- a/chrome/browser/worker_host/worker_process_host.h
+++ b/chrome/browser/worker_host/worker_process_host.h
@@ -8,6 +8,7 @@
#include <list>
#include "base/basictypes.h"
+#include "base/task.h"
#include "chrome/common/child_process_host.h"
#include "googleurl/src/gurl.h"
#include "ipc/ipc_channel.h"
@@ -69,6 +70,9 @@ class WorkerProcessHost : public ChildProcessHost {
Instances instances_;
+ // A callback to create a routing id for the associated worker process.
+ CallbackWithReturnValue<int>::Type* next_route_id_;
+
DISALLOW_COPY_AND_ASSIGN(WorkerProcessHost);
};
diff --git a/chrome/browser/worker_host/worker_service.cc b/chrome/browser/worker_host/worker_service.cc
index e67ae80..df8ba51 100644
--- a/chrome/browser/worker_host/worker_service.cc
+++ b/chrome/browser/worker_host/worker_service.cc
@@ -10,9 +10,9 @@
#include "base/thread.h"
#include "chrome/browser/browser_process.h"
#include "chrome/browser/plugin_service.h"
-#include "chrome/browser/worker_host/worker_process_host.h"
#include "chrome/browser/renderer_host/render_process_host.h"
#include "chrome/browser/renderer_host/resource_message_filter.h"
+#include "chrome/browser/worker_host/worker_process_host.h"
#include "chrome/common/chrome_switches.h"
#include "chrome/common/notification_service.h"
#include "chrome/common/worker_messages.h"
@@ -30,9 +30,12 @@ WorkerService::WorkerService()
: next_worker_route_id_(0),
resource_dispatcher_host_(NULL),
ui_loop_(NULL) {
- // Receive a notification if the message filter is deleted.
+ // Receive a notification if a message filter or WorkerProcessHost is deleted.
registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN,
NotificationService::AllSources());
+
+ registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
+ NotificationService::AllSources());
}
void WorkerService::Initialize(ResourceDispatcherHost* rdh,
@@ -210,12 +213,19 @@ bool WorkerService::CanCreateWorkerProcess(
void WorkerService::Observe(NotificationType type,
const NotificationSource& source,
const NotificationDetails& details) {
- DCHECK(type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN);
- ResourceMessageFilter* filter = Source<ResourceMessageFilter>(source).ptr();
- OnSenderShutdown(filter);
+ if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) {
+ ResourceMessageFilter* sender = Source<ResourceMessageFilter>(source).ptr();
+ SenderShutdown(sender);
+ } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) {
+ WorkerProcessHost* sender = Source<WorkerProcessHost>(source).ptr();
+ SenderShutdown(sender);
+ WorkerProcessDestroyed(sender);
+ } else {
+ NOTREACHED();
+ }
}
-void WorkerService::OnSenderShutdown(IPC::Message::Sender* sender) {
+void WorkerService::SenderShutdown(IPC::Message::Sender* sender) {
for (ChildProcessHost::Iterator iter(ChildProcessInfo::WORKER_PROCESS);
!iter.Done(); ++iter) {
WorkerProcessHost* worker = static_cast<WorkerProcessHost*>(*iter);
@@ -233,7 +243,7 @@ void WorkerService::OnSenderShutdown(IPC::Message::Sender* sender) {
}
}
-void WorkerService::OnWorkerProcessDestroyed(WorkerProcessHost* process) {
+void WorkerService::WorkerProcessDestroyed(WorkerProcessHost* process) {
if (queued_workers_.empty())
return;
diff --git a/chrome/browser/worker_host/worker_service.h b/chrome/browser/worker_host/worker_service.h
index 45e8248..704a7da 100644
--- a/chrome/browser/worker_host/worker_service.h
+++ b/chrome/browser/worker_host/worker_service.h
@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_
-#define CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_
+#ifndef CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_
+#define CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_
#include <list>
@@ -42,17 +42,6 @@ class WorkerService : public NotificationObserver {
// forwarded to the worker process.
void ForwardMessage(const IPC::Message& message, int sender_pid);
- // NotificationObserver interface.
- void Observe(NotificationType type,
- const NotificationSource& source,
- const NotificationDetails& details);
-
- // Notifies us that a process that's talking to a worker has shut down.
- void OnSenderShutdown(IPC::Message::Sender* sender);
-
- // Notifies us that a worker process has closed.
- void OnWorkerProcessDestroyed(WorkerProcessHost* process);
-
MessageLoop* ui_loop() { return ui_loop_; }
int next_worker_route_id() { return ++next_worker_route_id_; }
@@ -86,6 +75,17 @@ class WorkerService : public NotificationObserver {
// we're using a strategy of one process per core.
bool CanCreateWorkerProcess(const WorkerProcessHost::WorkerInstance& instance);
+ // NotificationObserver interface.
+ void Observe(NotificationType type,
+ const NotificationSource& source,
+ const NotificationDetails& details);
+
+ // Notifies us that a process that's talking to a worker has shut down.
+ void SenderShutdown(IPC::Message::Sender* sender);
+
+ // Notifies us that a worker process has closed.
+ void WorkerProcessDestroyed(WorkerProcessHost* process);
+
NotificationRegistrar registrar_;
int next_worker_route_id_;
ResourceDispatcherHost* resource_dispatcher_host_;
@@ -96,4 +96,4 @@ class WorkerService : public NotificationObserver {
DISALLOW_COPY_AND_ASSIGN(WorkerService);
};
-#endif // CHROME_BROWSER_WORKER_HOST__WORKER_SERVICE_H_
+#endif // CHROME_BROWSER_WORKER_HOST_WORKER_SERVICE_H_