summaryrefslogtreecommitdiffstats
path: root/mojo/public/cpp/bindings/lib
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/public/cpp/bindings/lib')
-rw-r--r--mojo/public/cpp/bindings/lib/binding_state.h4
-rw-r--r--mojo/public/cpp/bindings/lib/connector.cc122
-rw-r--r--mojo/public/cpp/bindings/lib/connector.h42
-rw-r--r--mojo/public/cpp/bindings/lib/interface_ptr_state.h3
-rw-r--r--mojo/public/cpp/bindings/lib/router.cc133
-rw-r--r--mojo/public/cpp/bindings/lib/router.h36
-rw-r--r--mojo/public/cpp/bindings/lib/sync_handle_watcher.cc125
-rw-r--r--mojo/public/cpp/bindings/lib/sync_handle_watcher.h72
8 files changed, 467 insertions, 70 deletions
diff --git a/mojo/public/cpp/bindings/lib/binding_state.h b/mojo/public/cpp/bindings/lib/binding_state.h
index 3d025fb..6483d94 100644
--- a/mojo/public/cpp/bindings/lib/binding_state.h
+++ b/mojo/public/cpp/bindings/lib/binding_state.h
@@ -56,8 +56,8 @@ class BindingState<Interface, false> {
filters.Append<internal::MessageHeaderValidator>();
filters.Append<typename Interface::RequestValidator_>();
- router_ =
- new internal::Router(std::move(handle), std::move(filters), waiter);
+ router_ = new internal::Router(std::move(handle), std::move(filters),
+ Interface::HasSyncMethods_, waiter);
router_->set_incoming_receiver(&stub_);
router_->set_connection_error_handler(
[this]() { connection_error_handler_.Run(); });
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc
index ceccaa9..07c1094 100644
--- a/mojo/public/cpp/bindings/lib/connector.cc
+++ b/mojo/public/cpp/bindings/lib/connector.cc
@@ -7,9 +7,11 @@
#include <stdint.h>
#include <utility>
+#include "base/bind.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
+#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
namespace mojo {
namespace internal {
@@ -52,8 +54,11 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe,
drop_writes_(false),
enforce_errors_from_incoming_receiver_(true),
paused_(false),
- destroyed_flag_(nullptr),
- lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) {
+ lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
+ register_sync_handle_watch_count_(0),
+ registered_with_sync_handle_watcher_(false),
+ sync_handle_watcher_callback_count_(0),
+ weak_factory_(this) {
// Even though we don't have an incoming receiver, we still want to monitor
// the message pipe to know if is closed or encounters an error.
WaitToReadMore();
@@ -62,9 +67,6 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe,
Connector::~Connector() {
DCHECK(thread_checker_.CalledOnValidThread());
- if (destroyed_flag_)
- *destroyed_flag_ = true;
-
CancelWait();
}
@@ -191,17 +193,84 @@ bool Connector::Accept(Message* message) {
return true;
}
+bool Connector::RegisterSyncHandleWatch() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (error_)
+ return false;
+
+ register_sync_handle_watch_count_++;
+
+ if (!registered_with_sync_handle_watcher_ && !paused_) {
+ registered_with_sync_handle_watcher_ =
+ SyncHandleWatcher::current()->RegisterHandle(
+ message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
+ base::Unretained(this)));
+ }
+ return true;
+}
+
+void Connector::UnregisterSyncHandleWatch() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (register_sync_handle_watch_count_ == 0) {
+ NOTREACHED();
+ return;
+ }
+
+ register_sync_handle_watch_count_--;
+ if (register_sync_handle_watch_count_ > 0)
+ return;
+
+ if (registered_with_sync_handle_watcher_) {
+ SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
+ registered_with_sync_handle_watcher_ = false;
+ }
+}
+
+bool Connector::RunSyncHandleWatch(const bool* should_stop) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_GT(register_sync_handle_watch_count_, 0u);
+
+ if (error_)
+ return false;
+
+ ResumeIncomingMethodCallProcessing();
+
+ if (!should_stop_sync_handle_watch_)
+ should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false);
+
+ // This object may be destroyed during the WatchAllHandles() call. So we have
+ // to preserve the boolean that WatchAllHandles uses.
+ scoped_refptr<base::RefCountedData<bool>> preserver =
+ should_stop_sync_handle_watch_;
+ const bool* should_stop_array[] = {should_stop,
+ &should_stop_sync_handle_watch_->data};
+ return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2);
+}
+
// static
void Connector::CallOnHandleReady(void* closure, MojoResult result) {
Connector* self = static_cast<Connector*>(closure);
- self->OnHandleReady(result);
+ CHECK(self->async_wait_id_ != 0);
+ self->async_wait_id_ = 0;
+ self->OnHandleReadyInternal(result);
}
-void Connector::OnHandleReady(MojoResult result) {
+void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
+ base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
+
+ sync_handle_watcher_callback_count_++;
+ OnHandleReadyInternal(result);
+ // At this point, this object might have been deleted.
+ if (weak_self)
+ sync_handle_watcher_callback_count_--;
+}
+
+void Connector::OnHandleReadyInternal(MojoResult result) {
DCHECK(thread_checker_.CalledOnValidThread());
- CHECK(async_wait_id_ != 0);
- async_wait_id_ = 0;
if (result != MOJO_RESULT_OK) {
HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
return;
@@ -218,6 +287,15 @@ void Connector::WaitToReadMore() {
MOJO_DEADLINE_INDEFINITE,
&Connector::CallOnHandleReady,
this);
+
+ if (register_sync_handle_watch_count_ > 0 &&
+ !registered_with_sync_handle_watcher_) {
+ registered_with_sync_handle_watcher_ =
+ SyncHandleWatcher::current()->RegisterHandle(
+ message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
+ base::Unretained(this)));
+ }
}
bool Connector::ReadSingleMessage(MojoResult* read_result) {
@@ -227,9 +305,7 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
// Detect if |this| was destroyed during message dispatch. Allow for the
// possibility of re-entering ReadMore() through message dispatch.
- bool was_destroyed_during_dispatch = false;
- bool* previous_destroyed_flag = destroyed_flag_;
- destroyed_flag_ = &was_destroyed_during_dispatch;
+ base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr();
Message message;
const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
@@ -247,13 +323,8 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) {
incoming_receiver_ && incoming_receiver_->Accept(&message);
}
- if (was_destroyed_during_dispatch) {
- if (previous_destroyed_flag)
- *previous_destroyed_flag = true; // Propagate flag.
+ if (!weak_self)
return false;
- }
-
- destroyed_flag_ = previous_destroyed_flag;
if (rv == MOJO_RESULT_SHOULD_WAIT)
return true;
@@ -295,11 +366,18 @@ void Connector::ReadAllAvailableMessages() {
}
void Connector::CancelWait() {
- if (!async_wait_id_)
- return;
+ if (async_wait_id_) {
+ waiter_->CancelWait(async_wait_id_);
+ async_wait_id_ = 0;
+ }
+
+ if (registered_with_sync_handle_watcher_) {
+ SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
+ registered_with_sync_handle_watcher_ = false;
+ }
- waiter_->CancelWait(async_wait_id_);
- async_wait_id_ = 0;
+ if (should_stop_sync_handle_watch_)
+ should_stop_sync_handle_watch_->data = true;
}
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h
index 3978171..b0af520 100644
--- a/mojo/public/cpp/bindings/lib/connector.h
+++ b/mojo/public/cpp/bindings/lib/connector.h
@@ -5,7 +5,9 @@
#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_CONNECTOR_H_
#define MOJO_PUBLIC_CPP_BINDINGS_LIB_CONNECTOR_H_
+#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
#include "base/threading/thread_checker.h"
#include "mojo/public/c/environment/async_waiter.h"
#include "mojo/public/cpp/bindings/callback.h"
@@ -121,9 +123,31 @@ class Connector : public MessageReceiver {
return message_pipe_.get();
}
+ // Requests to register |message_pipe_| with SyncHandleWatcher whenever this
+ // instance is expecting incoming messages.
+ //
+ // Please note that UnregisterSyncHandleWatch() needs to be called as many
+ // times as successful RegisterSyncHandleWatch() calls in order to cancel the
+ // effect.
+ bool RegisterSyncHandleWatch();
+ void UnregisterSyncHandleWatch();
+
+ // Watches all handles registered with SyncHandleWatcher on the same thread.
+ // The method returns true when |*should_stop| is set to true; returns false
+ // when any failure occurs during the watch, including |message_pipe_| is
+ // closed.
+ bool RunSyncHandleWatch(const bool* should_stop);
+
+ // Whether currently the control flow is inside the sync handle watcher
+ // callback.
+ bool during_sync_handle_watcher_callback() const {
+ return sync_handle_watcher_callback_count_ > 0;
+ }
+
private:
static void CallOnHandleReady(void* closure, MojoResult result);
- void OnHandleReady(MojoResult result);
+ void OnSyncHandleWatcherHandleReady(MojoResult result);
+ void OnHandleReadyInternal(MojoResult result);
void WaitToReadMore();
@@ -155,17 +179,23 @@ class Connector : public MessageReceiver {
bool paused_;
- // If non-null, this will be set to true when the Connector is destroyed. We
- // use this flag to allow for the Connector to be destroyed as a side-effect
- // of dispatching an incoming message.
- bool* destroyed_flag_;
-
// If sending messages is allowed from multiple threads, |lock_| is used to
// protect modifications to |message_pipe_| and |drop_writes_|.
scoped_ptr<base::Lock> lock_;
+ // If non-zero, |message_pipe_| should be registered with SyncHandleWatcher.
+ size_t register_sync_handle_watch_count_;
+ // Whether |message_pipe_| has been registered with SyncHandleWatcher.
+ bool registered_with_sync_handle_watcher_;
+ // If non-zero, currently the control flow is inside the sync handle watcher
+ // callback.
+ size_t sync_handle_watcher_callback_count_;
+ scoped_refptr<base::RefCountedData<bool>> should_stop_sync_handle_watch_;
+
base::ThreadChecker thread_checker_;
+ base::WeakPtrFactory<Connector> weak_factory_;
+
MOJO_DISALLOW_COPY_AND_ASSIGN(Connector);
};
diff --git a/mojo/public/cpp/bindings/lib/interface_ptr_state.h b/mojo/public/cpp/bindings/lib/interface_ptr_state.h
index a175edd..b2093bb 100644
--- a/mojo/public/cpp/bindings/lib/interface_ptr_state.h
+++ b/mojo/public/cpp/bindings/lib/interface_ptr_state.h
@@ -172,7 +172,8 @@ class InterfacePtrState<Interface, false> {
filters.Append<MessageHeaderValidator>();
filters.Append<typename Interface::ResponseValidator_>();
- router_ = new Router(std::move(handle_), std::move(filters), waiter_);
+ router_ =
+ new Router(std::move(handle_), std::move(filters), false, waiter_);
waiter_ = nullptr;
proxy_ = new Proxy(router_);
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
index 9bc9aa1..bb83477 100644
--- a/mojo/public/cpp/bindings/lib/router.cc
+++ b/mojo/public/cpp/bindings/lib/router.cc
@@ -7,7 +7,10 @@
#include <stdint.h>
#include <utility>
+#include "base/bind.h"
#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
namespace mojo {
namespace internal {
@@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus {
// ----------------------------------------------------------------------------
+Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
+ : response_received(in_response_received) {}
+
+Router::SyncResponseInfo::~SyncResponseInfo() {}
+
+// ----------------------------------------------------------------------------
+
Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
: router_(router) {
}
@@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
Router::Router(ScopedMessagePipeHandle message_pipe,
FilterChain filters,
+ bool expects_sync_requests,
const MojoAsyncWaiter* waiter)
: thunk_(this),
filters_(std::move(filters)),
@@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe,
incoming_receiver_(nullptr),
next_request_id_(0),
testing_mode_(false),
+ pending_task_for_messages_(false),
weak_factory_(this) {
filters_.SetSink(&thunk_);
+ if (expects_sync_requests)
+ connector_.RegisterSyncHandleWatch();
connector_.set_incoming_receiver(filters_.GetHead());
}
-Router::~Router() {
- weak_factory_.InvalidateWeakPtrs();
-
- for (auto& pair : async_responders_)
- delete pair.second;
- for (auto& pair : sync_responders_)
- delete pair.second;
-}
+Router::~Router() {}
bool Router::Accept(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
@@ -119,27 +126,32 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
if (!message->has_flag(kMessageIsSync)) {
// We assume ownership of |responder|.
- async_responders_[request_id] = responder;
+ async_responders_[request_id] = make_scoped_ptr(responder);
return true;
}
- sync_responders_[request_id] = responder;
- base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
- for (;;) {
- // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
- // block async messages.
- bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
- // The message pipe has disconnected.
- if (!result)
- break;
+ if (!connector_.RegisterSyncHandleWatch())
+ return false;
- // This instance has been destroyed.
- if (!weak_self)
- break;
+ bool response_received = false;
+ scoped_ptr<MessageReceiver> sync_responder(responder);
+ sync_responses_.insert(std::make_pair(
+ request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
- // The corresponding response message has arrived.
- if (sync_responders_.find(request_id) == sync_responders_.end())
- break;
+ base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
+ bool result = connector_.RunSyncHandleWatch(&response_received);
+ // Make sure that this instance hasn't been destroyed.
+ if (weak_self) {
+ DCHECK(ContainsKey(sync_responses_, request_id));
+ auto iter = sync_responses_.find(request_id);
+ DCHECK_EQ(&response_received, iter->second->response_received);
+ if (result && response_received) {
+ scoped_ptr<Message> response = std::move(iter->second->response);
+ ignore_result(sync_responder->Accept(response.get()));
+ }
+ sync_responses_.erase(iter);
+
+ connector_.UnregisterSyncHandleWatch();
}
// Return true means that we take ownership of |responder|.
@@ -154,6 +166,51 @@ void Router::EnableTestingMode() {
bool Router::HandleIncomingMessage(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
+
+ const bool during_sync_call =
+ connector_.during_sync_handle_watcher_callback();
+ if (!message->has_flag(kMessageIsSync) &&
+ (during_sync_call || !pending_messages_.empty())) {
+ scoped_ptr<Message> pending_message(new Message);
+ message->MoveTo(pending_message.get());
+ pending_messages_.push(std::move(pending_message));
+
+ if (!pending_task_for_messages_) {
+ pending_task_for_messages_ = true;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ return true;
+ }
+
+ return HandleMessageInternal(message);
+}
+
+void Router::HandleQueuedMessages() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(pending_task_for_messages_);
+
+ base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
+ while (!pending_messages_.empty()) {
+ scoped_ptr<Message> message(std::move(pending_messages_.front()));
+ pending_messages_.pop();
+
+ bool result = HandleMessageInternal(message.get());
+ if (!weak_self)
+ return;
+
+ if (!result && !testing_mode_) {
+ connector_.RaiseError();
+ break;
+ }
+ }
+
+ pending_task_for_messages_ = false;
+}
+
+bool Router::HandleMessageInternal(Message* message) {
if (message->has_flag(kMessageExpectsResponse)) {
if (!incoming_receiver_)
return false;
@@ -166,20 +223,28 @@ bool Router::HandleIncomingMessage(Message* message) {
return ok;
} else if (message->has_flag(kMessageIsResponse)) {
- ResponderMap& responder_map = message->has_flag(kMessageIsSync)
- ? sync_responders_
- : async_responders_;
uint64_t request_id = message->request_id();
- ResponderMap::iterator it = responder_map.find(request_id);
- if (it == responder_map.end()) {
+
+ if (message->has_flag(kMessageIsSync)) {
+ auto it = sync_responses_.find(request_id);
+ if (it == sync_responses_.end()) {
+ DCHECK(testing_mode_);
+ return false;
+ }
+ it->second->response.reset(new Message());
+ message->MoveTo(it->second->response.get());
+ *it->second->response_received = true;
+ return true;
+ }
+
+ auto it = async_responders_.find(request_id);
+ if (it == async_responders_.end()) {
DCHECK(testing_mode_);
return false;
}
- MessageReceiver* responder = it->second;
- responder_map.erase(it);
- bool ok = responder->Accept(message);
- delete responder;
- return ok;
+ scoped_ptr<MessageReceiver> responder = std::move(it->second);
+ async_responders_.erase(it);
+ return responder->Accept(message);
} else {
if (!incoming_receiver_)
return false;
diff --git a/mojo/public/cpp/bindings/lib/router.h b/mojo/public/cpp/bindings/lib/router.h
index f7ce513..12aa7e72 100644
--- a/mojo/public/cpp/bindings/lib/router.h
+++ b/mojo/public/cpp/bindings/lib/router.h
@@ -8,8 +8,10 @@
#include <stdint.h>
#include <map>
+#include <queue>
-#include "base/logging.h"
+#include "base/macros.h"
+#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/thread_checker.h"
#include "mojo/public/cpp/bindings/callback.h"
@@ -24,6 +26,7 @@ class Router : public MessageReceiverWithResponder {
public:
Router(ScopedMessagePipeHandle message_pipe,
FilterChain filters,
+ bool expects_sync_requests,
const MojoAsyncWaiter* waiter = Environment::GetDefaultAsyncWaiter());
~Router() override;
@@ -104,13 +107,29 @@ class Router : public MessageReceiverWithResponder {
// Returns true if this Router has any pending callbacks.
bool has_pending_responders() const {
DCHECK(thread_checker_.CalledOnValidThread());
- return !async_responders_.empty() || !sync_responders_.empty();
+ return !async_responders_.empty() || !sync_responses_.empty();
}
private:
// Maps from the id of a response to the MessageReceiver that handles the
// response.
- typedef std::map<uint64_t, MessageReceiver*> ResponderMap;
+ using AsyncResponderMap = std::map<uint64_t, scoped_ptr<MessageReceiver>>;
+
+ struct SyncResponseInfo {
+ public:
+ explicit SyncResponseInfo(bool* in_response_received);
+ ~SyncResponseInfo();
+
+ scoped_ptr<Message> response;
+
+ // Points to a stack-allocated variable.
+ bool* response_received;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SyncResponseInfo);
+ };
+
+ using SyncResponseMap = std::map<uint64_t, scoped_ptr<SyncResponseInfo>>;
class HandleIncomingMessageThunk : public MessageReceiver {
public:
@@ -125,15 +144,22 @@ class Router : public MessageReceiverWithResponder {
};
bool HandleIncomingMessage(Message* message);
+ void HandleQueuedMessages();
+
+ bool HandleMessageInternal(Message* message);
HandleIncomingMessageThunk thunk_;
FilterChain filters_;
Connector connector_;
MessageReceiverWithResponderStatus* incoming_receiver_;
- ResponderMap async_responders_;
- ResponderMap sync_responders_;
+ AsyncResponderMap async_responders_;
+ SyncResponseMap sync_responses_;
uint64_t next_request_id_;
bool testing_mode_;
+ std::queue<scoped_ptr<Message>> pending_messages_;
+ // Whether a task has been posted to trigger processing of
+ // |pending_messages_|.
+ bool pending_task_for_messages_;
base::ThreadChecker thread_checker_;
base::WeakPtrFactory<Router> weak_factory_;
};
diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc b/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc
new file mode 100644
index 0000000..13ab491
--- /dev/null
+++ b/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc
@@ -0,0 +1,125 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
+
+#include "base/lazy_instance.h"
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "base/threading/thread_local.h"
+#include "mojo/public/c/system/core.h"
+
+namespace mojo {
+namespace internal {
+namespace {
+
+base::LazyInstance<base::ThreadLocalPointer<SyncHandleWatcher>>
+ g_current_sync_handle_watcher = LAZY_INSTANCE_INITIALIZER;
+
+} // namespace
+
+// static
+SyncHandleWatcher* SyncHandleWatcher::current() {
+ SyncHandleWatcher* result = g_current_sync_handle_watcher.Pointer()->Get();
+ if (!result) {
+ // This object will be destroyed when the current message loop goes away.
+ result = new SyncHandleWatcher();
+ DCHECK_EQ(result, g_current_sync_handle_watcher.Pointer()->Get());
+ }
+ return result;
+}
+
+bool SyncHandleWatcher::RegisterHandle(const Handle& handle,
+ MojoHandleSignals handle_signals,
+ const HandleCallback& callback) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ if (ContainsKey(handles_, handle))
+ return false;
+
+ MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
+ handle.value(), handle_signals);
+ if (result != MOJO_RESULT_OK)
+ return false;
+
+ handles_[handle] = callback;
+ return true;
+}
+
+void SyncHandleWatcher::UnregisterHandle(const Handle& handle) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(ContainsKey(handles_, handle));
+
+ MojoResult result =
+ MojoRemoveHandle(wait_set_handle_.get().value(), handle.value());
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+
+ handles_.erase(handle);
+}
+
+bool SyncHandleWatcher::WatchAllHandles(const bool* should_stop[],
+ size_t count) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ MojoResult result;
+ uint32_t num_ready_handles;
+ MojoHandle ready_handle;
+ MojoResult ready_handle_result;
+
+ while (true) {
+ for (size_t i = 0; i < count; ++i)
+ if (*should_stop[i])
+ return true;
+ do {
+ result = Wait(wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, nullptr);
+ if (result != MOJO_RESULT_OK)
+ return false;
+
+ // TODO(yzshen): Theoretically it can reduce sync call re-entrancy if we
+ // give priority to the handle that is waiting for sync response.
+ num_ready_handles = 1;
+ result = MojoGetReadyHandles(wait_set_handle_.get().value(),
+ &num_ready_handles, &ready_handle,
+ &ready_handle_result, nullptr);
+ if (result != MOJO_RESULT_OK && result != MOJO_RESULT_SHOULD_WAIT)
+ return false;
+ } while (result == MOJO_RESULT_SHOULD_WAIT);
+
+ const auto iter = handles_.find(Handle(ready_handle));
+ iter->second.Run(ready_handle_result);
+ };
+
+ return true;
+}
+
+SyncHandleWatcher::SyncHandleWatcher() {
+ MojoHandle handle;
+ MojoResult result = MojoCreateWaitSet(&handle);
+ CHECK_EQ(MOJO_RESULT_OK, result);
+ wait_set_handle_.reset(Handle(handle));
+ CHECK(wait_set_handle_.is_valid());
+
+ DCHECK(!g_current_sync_handle_watcher.Pointer()->Get());
+ g_current_sync_handle_watcher.Pointer()->Set(this);
+
+ base::MessageLoop::current()->AddDestructionObserver(this);
+}
+
+SyncHandleWatcher::~SyncHandleWatcher() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(handles_.empty());
+ g_current_sync_handle_watcher.Pointer()->Set(nullptr);
+}
+
+void SyncHandleWatcher::WillDestroyCurrentMessageLoop() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK_EQ(this, g_current_sync_handle_watcher.Pointer()->Get());
+
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
+ delete this;
+}
+
+} // namespace internal
+} // namespace mojo
diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.h b/mojo/public/cpp/bindings/lib/sync_handle_watcher.h
new file mode 100644
index 0000000..65657fe
--- /dev/null
+++ b/mojo/public/cpp/bindings/lib/sync_handle_watcher.h
@@ -0,0 +1,72 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_
+#define MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_
+
+#include <unordered_map>
+
+#include "base/callback.h"
+#include "base/macros.h"
+#include "base/message_loop/message_loop.h"
+#include "base/threading/thread_checker.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace mojo {
+namespace internal {
+
+// SyncHandleWatcher is used to support sync methods. While a sync call is
+// waiting for response, we would like incoming sync method requests on the same
+// thread to be able to reenter. We also would like master endpoints to continue
+// dispatching messages for associated endpoints on different threads.
+// Therefore, SyncHandleWatcher is used as thread-local storage to register all
+// handles that need to be watched while waiting for sync call responses.
+//
+// This class is not thread safe.
+class SyncHandleWatcher : public base::MessageLoop::DestructionObserver {
+ public:
+ // Returns a thread-local object.
+ static SyncHandleWatcher* current();
+
+ using HandleCallback = base::Callback<void(MojoResult)>;
+ bool RegisterHandle(const Handle& handle,
+ MojoHandleSignals handle_signals,
+ const HandleCallback& callback);
+
+ void UnregisterHandle(const Handle& handle);
+
+ // Waits on all the registered handles and runs callbacks synchronously for
+ // those ready handles.
+ // The method:
+ // - returns true when any element of |should_stop| is set to true;
+ // - returns false when any error occurs.
+ bool WatchAllHandles(const bool* should_stop[], size_t count);
+
+ private:
+ struct HandleHasher {
+ size_t operator()(const Handle& handle) const {
+ return std::hash<uint32_t>()(static_cast<uint32_t>(handle.value()));
+ }
+ };
+ using HandleMap = std::unordered_map<Handle, HandleCallback, HandleHasher>;
+
+ SyncHandleWatcher();
+ ~SyncHandleWatcher() override;
+
+ // base::MessageLoop::DestructionObserver implementation:
+ void WillDestroyCurrentMessageLoop() override;
+
+ HandleMap handles_;
+
+ ScopedHandle wait_set_handle_;
+
+ base::ThreadChecker thread_checker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncHandleWatcher);
+};
+
+} // namespace internal
+} // namespace mojo
+
+#endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_