diff options
author | yzshen <yzshen@chromium.org> | 2016-02-22 17:22:10 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-02-23 01:23:00 +0000 |
commit | 8ef9022a0d352066587791b3e225ca264c688ade (patch) | |
tree | 099161b2828cb2e83dfb77037930a6ae68c78b8e | |
parent | 9166c9c8a3fbbd0dc74f837660ca6952a2203402 (diff) | |
download | chromium_src-8ef9022a0d352066587791b3e225ca264c688ade.zip chromium_src-8ef9022a0d352066587791b3e225ca264c688ade.tar.gz chromium_src-8ef9022a0d352066587791b3e225ca264c688ade.tar.bz2 |
Reland "Mojo C++ bindings: support sync methods - part 2"
This CL introduces the correct re-entrancy behavior: when a sync call is waiting
for response, allow incoming sync requests on the same thread to re-enter, async
messages are queued until the sync call completes.
The following will be in future CLs:
- Support sync calls with associated interfaces.
The original CL is at https://codereview.chromium.org/1713203002/
BUG=577699
Review URL: https://codereview.chromium.org/1723673002
Cr-Commit-Position: refs/heads/master@{#376892}
17 files changed, 737 insertions, 98 deletions
diff --git a/mojo/mojo_public.gyp b/mojo/mojo_public.gyp index 82cb600..18f3b75f 100644 --- a/mojo/mojo_public.gyp +++ b/mojo/mojo_public.gyp @@ -162,6 +162,8 @@ 'public/cpp/bindings/lib/shared_ptr.h', 'public/cpp/bindings/lib/string_serialization.cc', 'public/cpp/bindings/lib/string_serialization.h', + 'public/cpp/bindings/lib/sync_handle_watcher.cc', + 'public/cpp/bindings/lib/sync_handle_watcher.h', 'public/cpp/bindings/lib/validate_params.h', 'public/cpp/bindings/lib/validation_errors.cc', 'public/cpp/bindings/lib/validation_errors.h', diff --git a/mojo/public/cpp/bindings/BUILD.gn b/mojo/public/cpp/bindings/BUILD.gn index d650a43a0..72079fd 100644 --- a/mojo/public/cpp/bindings/BUILD.gn +++ b/mojo/public/cpp/bindings/BUILD.gn @@ -66,6 +66,8 @@ source_set("bindings") { "lib/scoped_interface_endpoint_handle.h", "lib/string_serialization.cc", "lib/string_serialization.h", + "lib/sync_handle_watcher.cc", + "lib/sync_handle_watcher.h", "lib/union_accessor.h", "lib/validate_params.h", "lib/validation_errors.cc", diff --git a/mojo/public/cpp/bindings/lib/binding_state.h b/mojo/public/cpp/bindings/lib/binding_state.h index 3d025fb..6483d94 100644 --- a/mojo/public/cpp/bindings/lib/binding_state.h +++ b/mojo/public/cpp/bindings/lib/binding_state.h @@ -56,8 +56,8 @@ class BindingState<Interface, false> { filters.Append<internal::MessageHeaderValidator>(); filters.Append<typename Interface::RequestValidator_>(); - router_ = - new internal::Router(std::move(handle), std::move(filters), waiter); + router_ = new internal::Router(std::move(handle), std::move(filters), + Interface::HasSyncMethods_, waiter); router_->set_incoming_receiver(&stub_); router_->set_connection_error_handler( [this]() { connection_error_handler_.Run(); }); diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc index ceccaa9..07c1094 100644 --- a/mojo/public/cpp/bindings/lib/connector.cc +++ b/mojo/public/cpp/bindings/lib/connector.cc @@ -7,9 +7,11 @@ #include <stdint.h> #include <utility> +#include "base/bind.h" #include "base/logging.h" #include "base/macros.h" #include "base/synchronization/lock.h" +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" namespace mojo { namespace internal { @@ -52,8 +54,11 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, drop_writes_(false), enforce_errors_from_incoming_receiver_(true), paused_(false), - destroyed_flag_(nullptr), - lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { + lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), + register_sync_handle_watch_count_(0), + registered_with_sync_handle_watcher_(false), + sync_handle_watcher_callback_count_(0), + weak_factory_(this) { // Even though we don't have an incoming receiver, we still want to monitor // the message pipe to know if is closed or encounters an error. WaitToReadMore(); @@ -62,9 +67,6 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, Connector::~Connector() { DCHECK(thread_checker_.CalledOnValidThread()); - if (destroyed_flag_) - *destroyed_flag_ = true; - CancelWait(); } @@ -191,17 +193,84 @@ bool Connector::Accept(Message* message) { return true; } +bool Connector::RegisterSyncHandleWatch() { + DCHECK(thread_checker_.CalledOnValidThread()); + + if (error_) + return false; + + register_sync_handle_watch_count_++; + + if (!registered_with_sync_handle_watcher_ && !paused_) { + registered_with_sync_handle_watcher_ = + SyncHandleWatcher::current()->RegisterHandle( + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&Connector::OnSyncHandleWatcherHandleReady, + base::Unretained(this))); + } + return true; +} + +void Connector::UnregisterSyncHandleWatch() { + DCHECK(thread_checker_.CalledOnValidThread()); + + if (register_sync_handle_watch_count_ == 0) { + NOTREACHED(); + return; + } + + register_sync_handle_watch_count_--; + if (register_sync_handle_watch_count_ > 0) + return; + + if (registered_with_sync_handle_watcher_) { + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); + registered_with_sync_handle_watcher_ = false; + } +} + +bool Connector::RunSyncHandleWatch(const bool* should_stop) { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK_GT(register_sync_handle_watch_count_, 0u); + + if (error_) + return false; + + ResumeIncomingMethodCallProcessing(); + + if (!should_stop_sync_handle_watch_) + should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false); + + // This object may be destroyed during the WatchAllHandles() call. So we have + // to preserve the boolean that WatchAllHandles uses. + scoped_refptr<base::RefCountedData<bool>> preserver = + should_stop_sync_handle_watch_; + const bool* should_stop_array[] = {should_stop, + &should_stop_sync_handle_watch_->data}; + return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); +} + // static void Connector::CallOnHandleReady(void* closure, MojoResult result) { Connector* self = static_cast<Connector*>(closure); - self->OnHandleReady(result); + CHECK(self->async_wait_id_ != 0); + self->async_wait_id_ = 0; + self->OnHandleReadyInternal(result); } -void Connector::OnHandleReady(MojoResult result) { +void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { + base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); + + sync_handle_watcher_callback_count_++; + OnHandleReadyInternal(result); + // At this point, this object might have been deleted. + if (weak_self) + sync_handle_watcher_callback_count_--; +} + +void Connector::OnHandleReadyInternal(MojoResult result) { DCHECK(thread_checker_.CalledOnValidThread()); - CHECK(async_wait_id_ != 0); - async_wait_id_ = 0; if (result != MOJO_RESULT_OK) { HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); return; @@ -218,6 +287,15 @@ void Connector::WaitToReadMore() { MOJO_DEADLINE_INDEFINITE, &Connector::CallOnHandleReady, this); + + if (register_sync_handle_watch_count_ > 0 && + !registered_with_sync_handle_watcher_) { + registered_with_sync_handle_watcher_ = + SyncHandleWatcher::current()->RegisterHandle( + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&Connector::OnSyncHandleWatcherHandleReady, + base::Unretained(this))); + } } bool Connector::ReadSingleMessage(MojoResult* read_result) { @@ -227,9 +305,7 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { // Detect if |this| was destroyed during message dispatch. Allow for the // possibility of re-entering ReadMore() through message dispatch. - bool was_destroyed_during_dispatch = false; - bool* previous_destroyed_flag = destroyed_flag_; - destroyed_flag_ = &was_destroyed_during_dispatch; + base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); Message message; const MojoResult rv = ReadMessage(message_pipe_.get(), &message); @@ -247,13 +323,8 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { incoming_receiver_ && incoming_receiver_->Accept(&message); } - if (was_destroyed_during_dispatch) { - if (previous_destroyed_flag) - *previous_destroyed_flag = true; // Propagate flag. + if (!weak_self) return false; - } - - destroyed_flag_ = previous_destroyed_flag; if (rv == MOJO_RESULT_SHOULD_WAIT) return true; @@ -295,11 +366,18 @@ void Connector::ReadAllAvailableMessages() { } void Connector::CancelWait() { - if (!async_wait_id_) - return; + if (async_wait_id_) { + waiter_->CancelWait(async_wait_id_); + async_wait_id_ = 0; + } + + if (registered_with_sync_handle_watcher_) { + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); + registered_with_sync_handle_watcher_ = false; + } - waiter_->CancelWait(async_wait_id_); - async_wait_id_ = 0; + if (should_stop_sync_handle_watch_) + should_stop_sync_handle_watch_->data = true; } void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h index 3978171..b0af520 100644 --- a/mojo/public/cpp/bindings/lib/connector.h +++ b/mojo/public/cpp/bindings/lib/connector.h @@ -5,7 +5,9 @@ #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_CONNECTOR_H_ #define MOJO_PUBLIC_CPP_BINDINGS_LIB_CONNECTOR_H_ +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "base/threading/thread_checker.h" #include "mojo/public/c/environment/async_waiter.h" #include "mojo/public/cpp/bindings/callback.h" @@ -121,9 +123,31 @@ class Connector : public MessageReceiver { return message_pipe_.get(); } + // Requests to register |message_pipe_| with SyncHandleWatcher whenever this + // instance is expecting incoming messages. + // + // Please note that UnregisterSyncHandleWatch() needs to be called as many + // times as successful RegisterSyncHandleWatch() calls in order to cancel the + // effect. + bool RegisterSyncHandleWatch(); + void UnregisterSyncHandleWatch(); + + // Watches all handles registered with SyncHandleWatcher on the same thread. + // The method returns true when |*should_stop| is set to true; returns false + // when any failure occurs during the watch, including |message_pipe_| is + // closed. + bool RunSyncHandleWatch(const bool* should_stop); + + // Whether currently the control flow is inside the sync handle watcher + // callback. + bool during_sync_handle_watcher_callback() const { + return sync_handle_watcher_callback_count_ > 0; + } + private: static void CallOnHandleReady(void* closure, MojoResult result); - void OnHandleReady(MojoResult result); + void OnSyncHandleWatcherHandleReady(MojoResult result); + void OnHandleReadyInternal(MojoResult result); void WaitToReadMore(); @@ -155,17 +179,23 @@ class Connector : public MessageReceiver { bool paused_; - // If non-null, this will be set to true when the Connector is destroyed. We - // use this flag to allow for the Connector to be destroyed as a side-effect - // of dispatching an incoming message. - bool* destroyed_flag_; - // If sending messages is allowed from multiple threads, |lock_| is used to // protect modifications to |message_pipe_| and |drop_writes_|. scoped_ptr<base::Lock> lock_; + // If non-zero, |message_pipe_| should be registered with SyncHandleWatcher. + size_t register_sync_handle_watch_count_; + // Whether |message_pipe_| has been registered with SyncHandleWatcher. + bool registered_with_sync_handle_watcher_; + // If non-zero, currently the control flow is inside the sync handle watcher + // callback. + size_t sync_handle_watcher_callback_count_; + scoped_refptr<base::RefCountedData<bool>> should_stop_sync_handle_watch_; + base::ThreadChecker thread_checker_; + base::WeakPtrFactory<Connector> weak_factory_; + MOJO_DISALLOW_COPY_AND_ASSIGN(Connector); }; diff --git a/mojo/public/cpp/bindings/lib/interface_ptr_state.h b/mojo/public/cpp/bindings/lib/interface_ptr_state.h index a175edd..b2093bb 100644 --- a/mojo/public/cpp/bindings/lib/interface_ptr_state.h +++ b/mojo/public/cpp/bindings/lib/interface_ptr_state.h @@ -172,7 +172,8 @@ class InterfacePtrState<Interface, false> { filters.Append<MessageHeaderValidator>(); filters.Append<typename Interface::ResponseValidator_>(); - router_ = new Router(std::move(handle_), std::move(filters), waiter_); + router_ = + new Router(std::move(handle_), std::move(filters), false, waiter_); waiter_ = nullptr; proxy_ = new Proxy(router_); diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc index 9bc9aa1..bb83477 100644 --- a/mojo/public/cpp/bindings/lib/router.cc +++ b/mojo/public/cpp/bindings/lib/router.cc @@ -7,7 +7,10 @@ #include <stdint.h> #include <utility> +#include "base/bind.h" #include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "base/stl_util.h" namespace mojo { namespace internal { @@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus { // ---------------------------------------------------------------------------- +Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) + : response_received(in_response_received) {} + +Router::SyncResponseInfo::~SyncResponseInfo() {} + +// ---------------------------------------------------------------------------- + Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) : router_(router) { } @@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) { Router::Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, + bool expects_sync_requests, const MojoAsyncWaiter* waiter) : thunk_(this), filters_(std::move(filters)), @@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe, incoming_receiver_(nullptr), next_request_id_(0), testing_mode_(false), + pending_task_for_messages_(false), weak_factory_(this) { filters_.SetSink(&thunk_); + if (expects_sync_requests) + connector_.RegisterSyncHandleWatch(); connector_.set_incoming_receiver(filters_.GetHead()); } -Router::~Router() { - weak_factory_.InvalidateWeakPtrs(); - - for (auto& pair : async_responders_) - delete pair.second; - for (auto& pair : sync_responders_) - delete pair.second; -} +Router::~Router() {} bool Router::Accept(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); @@ -119,27 +126,32 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { if (!message->has_flag(kMessageIsSync)) { // We assume ownership of |responder|. - async_responders_[request_id] = responder; + async_responders_[request_id] = make_scoped_ptr(responder); return true; } - sync_responders_[request_id] = responder; - base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - for (;;) { - // TODO(yzshen): Here we should allow incoming sync requests to re-enter and - // block async messages. - bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); - // The message pipe has disconnected. - if (!result) - break; + if (!connector_.RegisterSyncHandleWatch()) + return false; - // This instance has been destroyed. - if (!weak_self) - break; + bool response_received = false; + scoped_ptr<MessageReceiver> sync_responder(responder); + sync_responses_.insert(std::make_pair( + request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); - // The corresponding response message has arrived. - if (sync_responders_.find(request_id) == sync_responders_.end()) - break; + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); + bool result = connector_.RunSyncHandleWatch(&response_received); + // Make sure that this instance hasn't been destroyed. + if (weak_self) { + DCHECK(ContainsKey(sync_responses_, request_id)); + auto iter = sync_responses_.find(request_id); + DCHECK_EQ(&response_received, iter->second->response_received); + if (result && response_received) { + scoped_ptr<Message> response = std::move(iter->second->response); + ignore_result(sync_responder->Accept(response.get())); + } + sync_responses_.erase(iter); + + connector_.UnregisterSyncHandleWatch(); } // Return true means that we take ownership of |responder|. @@ -154,6 +166,51 @@ void Router::EnableTestingMode() { bool Router::HandleIncomingMessage(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); + + const bool during_sync_call = + connector_.during_sync_handle_watcher_callback(); + if (!message->has_flag(kMessageIsSync) && + (during_sync_call || !pending_messages_.empty())) { + scoped_ptr<Message> pending_message(new Message); + message->MoveTo(pending_message.get()); + pending_messages_.push(std::move(pending_message)); + + if (!pending_task_for_messages_) { + pending_task_for_messages_ = true; + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(&Router::HandleQueuedMessages, + weak_factory_.GetWeakPtr())); + } + + return true; + } + + return HandleMessageInternal(message); +} + +void Router::HandleQueuedMessages() { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(pending_task_for_messages_); + + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); + while (!pending_messages_.empty()) { + scoped_ptr<Message> message(std::move(pending_messages_.front())); + pending_messages_.pop(); + + bool result = HandleMessageInternal(message.get()); + if (!weak_self) + return; + + if (!result && !testing_mode_) { + connector_.RaiseError(); + break; + } + } + + pending_task_for_messages_ = false; +} + +bool Router::HandleMessageInternal(Message* message) { if (message->has_flag(kMessageExpectsResponse)) { if (!incoming_receiver_) return false; @@ -166,20 +223,28 @@ bool Router::HandleIncomingMessage(Message* message) { return ok; } else if (message->has_flag(kMessageIsResponse)) { - ResponderMap& responder_map = message->has_flag(kMessageIsSync) - ? sync_responders_ - : async_responders_; uint64_t request_id = message->request_id(); - ResponderMap::iterator it = responder_map.find(request_id); - if (it == responder_map.end()) { + + if (message->has_flag(kMessageIsSync)) { + auto it = sync_responses_.find(request_id); + if (it == sync_responses_.end()) { + DCHECK(testing_mode_); + return false; + } + it->second->response.reset(new Message()); + message->MoveTo(it->second->response.get()); + *it->second->response_received = true; + return true; + } + + auto it = async_responders_.find(request_id); + if (it == async_responders_.end()) { DCHECK(testing_mode_); return false; } - MessageReceiver* responder = it->second; - responder_map.erase(it); - bool ok = responder->Accept(message); - delete responder; - return ok; + scoped_ptr<MessageReceiver> responder = std::move(it->second); + async_responders_.erase(it); + return responder->Accept(message); } else { if (!incoming_receiver_) return false; diff --git a/mojo/public/cpp/bindings/lib/router.h b/mojo/public/cpp/bindings/lib/router.h index f7ce513..12aa7e72 100644 --- a/mojo/public/cpp/bindings/lib/router.h +++ b/mojo/public/cpp/bindings/lib/router.h @@ -8,8 +8,10 @@ #include <stdint.h> #include <map> +#include <queue> -#include "base/logging.h" +#include "base/macros.h" +#include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/threading/thread_checker.h" #include "mojo/public/cpp/bindings/callback.h" @@ -24,6 +26,7 @@ class Router : public MessageReceiverWithResponder { public: Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, + bool expects_sync_requests, const MojoAsyncWaiter* waiter = Environment::GetDefaultAsyncWaiter()); ~Router() override; @@ -104,13 +107,29 @@ class Router : public MessageReceiverWithResponder { // Returns true if this Router has any pending callbacks. bool has_pending_responders() const { DCHECK(thread_checker_.CalledOnValidThread()); - return !async_responders_.empty() || !sync_responders_.empty(); + return !async_responders_.empty() || !sync_responses_.empty(); } private: // Maps from the id of a response to the MessageReceiver that handles the // response. - typedef std::map<uint64_t, MessageReceiver*> ResponderMap; + using AsyncResponderMap = std::map<uint64_t, scoped_ptr<MessageReceiver>>; + + struct SyncResponseInfo { + public: + explicit SyncResponseInfo(bool* in_response_received); + ~SyncResponseInfo(); + + scoped_ptr<Message> response; + + // Points to a stack-allocated variable. + bool* response_received; + + private: + DISALLOW_COPY_AND_ASSIGN(SyncResponseInfo); + }; + + using SyncResponseMap = std::map<uint64_t, scoped_ptr<SyncResponseInfo>>; class HandleIncomingMessageThunk : public MessageReceiver { public: @@ -125,15 +144,22 @@ class Router : public MessageReceiverWithResponder { }; bool HandleIncomingMessage(Message* message); + void HandleQueuedMessages(); + + bool HandleMessageInternal(Message* message); HandleIncomingMessageThunk thunk_; FilterChain filters_; Connector connector_; MessageReceiverWithResponderStatus* incoming_receiver_; - ResponderMap async_responders_; - ResponderMap sync_responders_; + AsyncResponderMap async_responders_; + SyncResponseMap sync_responses_; uint64_t next_request_id_; bool testing_mode_; + std::queue<scoped_ptr<Message>> pending_messages_; + // Whether a task has been posted to trigger processing of + // |pending_messages_|. + bool pending_task_for_messages_; base::ThreadChecker thread_checker_; base::WeakPtrFactory<Router> weak_factory_; }; diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc b/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc new file mode 100644 index 0000000..13ab491 --- /dev/null +++ b/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc @@ -0,0 +1,125 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/stl_util.h" +#include "base/threading/thread_local.h" +#include "mojo/public/c/system/core.h" + +namespace mojo { +namespace internal { +namespace { + +base::LazyInstance<base::ThreadLocalPointer<SyncHandleWatcher>> + g_current_sync_handle_watcher = LAZY_INSTANCE_INITIALIZER; + +} // namespace + +// static +SyncHandleWatcher* SyncHandleWatcher::current() { + SyncHandleWatcher* result = g_current_sync_handle_watcher.Pointer()->Get(); + if (!result) { + // This object will be destroyed when the current message loop goes away. + result = new SyncHandleWatcher(); + DCHECK_EQ(result, g_current_sync_handle_watcher.Pointer()->Get()); + } + return result; +} + +bool SyncHandleWatcher::RegisterHandle(const Handle& handle, + MojoHandleSignals handle_signals, + const HandleCallback& callback) { + DCHECK(thread_checker_.CalledOnValidThread()); + + if (ContainsKey(handles_, handle)) + return false; + + MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), + handle.value(), handle_signals); + if (result != MOJO_RESULT_OK) + return false; + + handles_[handle] = callback; + return true; +} + +void SyncHandleWatcher::UnregisterHandle(const Handle& handle) { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(ContainsKey(handles_, handle)); + + MojoResult result = + MojoRemoveHandle(wait_set_handle_.get().value(), handle.value()); + DCHECK_EQ(MOJO_RESULT_OK, result); + + handles_.erase(handle); +} + +bool SyncHandleWatcher::WatchAllHandles(const bool* should_stop[], + size_t count) { + DCHECK(thread_checker_.CalledOnValidThread()); + + MojoResult result; + uint32_t num_ready_handles; + MojoHandle ready_handle; + MojoResult ready_handle_result; + + while (true) { + for (size_t i = 0; i < count; ++i) + if (*should_stop[i]) + return true; + do { + result = Wait(wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, nullptr); + if (result != MOJO_RESULT_OK) + return false; + + // TODO(yzshen): Theoretically it can reduce sync call re-entrancy if we + // give priority to the handle that is waiting for sync response. + num_ready_handles = 1; + result = MojoGetReadyHandles(wait_set_handle_.get().value(), + &num_ready_handles, &ready_handle, + &ready_handle_result, nullptr); + if (result != MOJO_RESULT_OK && result != MOJO_RESULT_SHOULD_WAIT) + return false; + } while (result == MOJO_RESULT_SHOULD_WAIT); + + const auto iter = handles_.find(Handle(ready_handle)); + iter->second.Run(ready_handle_result); + }; + + return true; +} + +SyncHandleWatcher::SyncHandleWatcher() { + MojoHandle handle; + MojoResult result = MojoCreateWaitSet(&handle); + CHECK_EQ(MOJO_RESULT_OK, result); + wait_set_handle_.reset(Handle(handle)); + CHECK(wait_set_handle_.is_valid()); + + DCHECK(!g_current_sync_handle_watcher.Pointer()->Get()); + g_current_sync_handle_watcher.Pointer()->Set(this); + + base::MessageLoop::current()->AddDestructionObserver(this); +} + +SyncHandleWatcher::~SyncHandleWatcher() { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(handles_.empty()); + g_current_sync_handle_watcher.Pointer()->Set(nullptr); +} + +void SyncHandleWatcher::WillDestroyCurrentMessageLoop() { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK_EQ(this, g_current_sync_handle_watcher.Pointer()->Get()); + + base::MessageLoop::current()->RemoveDestructionObserver(this); + delete this; +} + +} // namespace internal +} // namespace mojo diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.h b/mojo/public/cpp/bindings/lib/sync_handle_watcher.h new file mode 100644 index 0000000..65657fe --- /dev/null +++ b/mojo/public/cpp/bindings/lib/sync_handle_watcher.h @@ -0,0 +1,72 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ +#define MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ + +#include <unordered_map> + +#include "base/callback.h" +#include "base/macros.h" +#include "base/message_loop/message_loop.h" +#include "base/threading/thread_checker.h" +#include "mojo/public/cpp/system/core.h" + +namespace mojo { +namespace internal { + +// SyncHandleWatcher is used to support sync methods. While a sync call is +// waiting for response, we would like incoming sync method requests on the same +// thread to be able to reenter. We also would like master endpoints to continue +// dispatching messages for associated endpoints on different threads. +// Therefore, SyncHandleWatcher is used as thread-local storage to register all +// handles that need to be watched while waiting for sync call responses. +// +// This class is not thread safe. +class SyncHandleWatcher : public base::MessageLoop::DestructionObserver { + public: + // Returns a thread-local object. + static SyncHandleWatcher* current(); + + using HandleCallback = base::Callback<void(MojoResult)>; + bool RegisterHandle(const Handle& handle, + MojoHandleSignals handle_signals, + const HandleCallback& callback); + + void UnregisterHandle(const Handle& handle); + + // Waits on all the registered handles and runs callbacks synchronously for + // those ready handles. + // The method: + // - returns true when any element of |should_stop| is set to true; + // - returns false when any error occurs. + bool WatchAllHandles(const bool* should_stop[], size_t count); + + private: + struct HandleHasher { + size_t operator()(const Handle& handle) const { + return std::hash<uint32_t>()(static_cast<uint32_t>(handle.value())); + } + }; + using HandleMap = std::unordered_map<Handle, HandleCallback, HandleHasher>; + + SyncHandleWatcher(); + ~SyncHandleWatcher() override; + + // base::MessageLoop::DestructionObserver implementation: + void WillDestroyCurrentMessageLoop() override; + + HandleMap handles_; + + ScopedHandle wait_set_handle_; + + base::ThreadChecker thread_checker_; + + DISALLOW_COPY_AND_ASSIGN(SyncHandleWatcher); +}; + +} // namespace internal +} // namespace mojo + +#endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ diff --git a/mojo/public/cpp/bindings/message.h b/mojo/public/cpp/bindings/message.h index f58b2e0..9781a13 100644 --- a/mojo/public/cpp/bindings/message.h +++ b/mojo/public/cpp/bindings/message.h @@ -122,6 +122,7 @@ class MessageReceiverWithResponder : public MessageReceiver { // |responder| and will delete it after calling |responder->Accept| or upon // its own destruction. // + // TODO(yzshen): consider changing |responder| to scoped_ptr<MessageReceiver>. virtual bool AcceptWithResponder(Message* message, MessageReceiver* responder) MOJO_WARN_UNUSED_RESULT = 0; }; @@ -154,6 +155,7 @@ class MessageReceiverWithResponderStatus : public MessageReceiver { // |responder| and will delete it after calling |responder->Accept| or upon // its own destruction. // + // TODO(yzshen): consider changing |responder| to scoped_ptr<MessageReceiver>. virtual bool AcceptWithResponder(Message* message, MessageReceiverWithStatus* responder) MOJO_WARN_UNUSED_RESULT = 0; diff --git a/mojo/public/cpp/bindings/tests/router_unittest.cc b/mojo/public/cpp/bindings/tests/router_unittest.cc index e88f0e8..1f7160e 100644 --- a/mojo/public/cpp/bindings/tests/router_unittest.cc +++ b/mojo/public/cpp/bindings/tests/router_unittest.cc @@ -39,8 +39,8 @@ class RouterTest : public testing::Test { }; TEST_F(RouterTest, BasicRequestResponse) { - internal::Router router0(std::move(handle0_), internal::FilterChain()); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), false); + internal::Router router1(std::move(handle1_), internal::FilterChain(), false); ResponseGenerator generator; router1.set_incoming_receiver(&generator); @@ -83,8 +83,8 @@ TEST_F(RouterTest, BasicRequestResponse) { } TEST_F(RouterTest, BasicRequestResponse_Synchronous) { - internal::Router router0(std::move(handle0_), internal::FilterChain()); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), false); + internal::Router router1(std::move(handle1_), internal::FilterChain(), false); ResponseGenerator generator; router1.set_incoming_receiver(&generator); @@ -125,8 +125,8 @@ TEST_F(RouterTest, BasicRequestResponse_Synchronous) { } TEST_F(RouterTest, RequestWithNoReceiver) { - internal::Router router0(std::move(handle0_), internal::FilterChain()); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), false); + internal::Router router1(std::move(handle1_), internal::FilterChain(), false); // Without an incoming receiver set on router1, we expect router0 to observe // an error as a result of sending a message. @@ -151,8 +151,8 @@ TEST_F(RouterTest, RequestWithNoReceiver) { // Tests Router using the LazyResponseGenerator. The responses will not be // sent until after the requests have been accepted. TEST_F(RouterTest, LazyResponses) { - internal::Router router0(std::move(handle0_), internal::FilterChain()); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), false); + internal::Router router1(std::move(handle1_), internal::FilterChain(), false); base::RunLoop run_loop; LazyResponseGenerator generator(run_loop.QuitClosure()); @@ -217,7 +217,7 @@ TEST_F(RouterTest, LazyResponses) { // both sides still appear to have a valid message pipe handle bound. TEST_F(RouterTest, MissingResponses) { base::RunLoop run_loop0, run_loop1; - internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), false); bool error_handler_called0 = false; router0.set_connection_error_handler( [&error_handler_called0, &run_loop0]() { @@ -225,7 +225,7 @@ TEST_F(RouterTest, MissingResponses) { run_loop0.Quit(); }); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain(), false); bool error_handler_called1 = false; router1.set_connection_error_handler( [&error_handler_called1, &run_loop1]() { @@ -277,8 +277,10 @@ TEST_F(RouterTest, LateResponse) { base::RunLoop run_loop; LazyResponseGenerator generator(run_loop.QuitClosure()); { - internal::Router router0(std::move(handle0_), internal::FilterChain()); - internal::Router router1(std::move(handle1_), internal::FilterChain()); + internal::Router router0(std::move(handle0_), internal::FilterChain(), + false); + internal::Router router1(std::move(handle1_), internal::FilterChain(), + false); router1.set_incoming_receiver(&generator); diff --git a/mojo/public/cpp/bindings/tests/sync_method_unittest.cc b/mojo/public/cpp/bindings/tests/sync_method_unittest.cc index fd519b0..207a6e2 100644 --- a/mojo/public/cpp/bindings/tests/sync_method_unittest.cc +++ b/mojo/public/cpp/bindings/tests/sync_method_unittest.cc @@ -30,26 +30,47 @@ class TestSyncImpl : public TestSync { public: TestSyncImpl(TestSyncRequest request) : binding_(this, std::move(request)) {} - void set_ping_notification(const Closure& closure) { - ping_notification_ = closure; + using PingHandler = Callback<void(const PingCallback&)>; + void set_ping_handler(const PingHandler& handler) { ping_handler_ = handler; } + + using EchoHandler = Callback<void(int32_t, const EchoCallback&)>; + void set_echo_handler(const EchoHandler& handler) { echo_handler_ = handler; } + + using AsyncEchoHandler = Callback<void(int32_t, const AsyncEchoCallback&)>; + void set_async_echo_handler(const AsyncEchoHandler& handler) { + async_echo_handler_ = handler; } // TestSync implementation: - void Get(const GetCallback& callback) override { callback.Run(42); } - void Set(int32_t value, const SetCallback& callback) override { - callback.Run(); - } void Ping(const PingCallback& callback) override { - ping_notification_.Run(); - callback.Run(); + if (ping_handler_.is_null()) { + callback.Run(); + return; + } + ping_handler_.Run(callback); } void Echo(int32_t value, const EchoCallback& callback) override { - callback.Run(value); + if (echo_handler_.is_null()) { + callback.Run(value); + return; + } + echo_handler_.Run(value, callback); + } + void AsyncEcho(int32_t value, const AsyncEchoCallback& callback) override { + if (async_echo_handler_.is_null()) { + callback.Run(value); + return; + } + async_echo_handler_.Run(value, callback); } + Binding<TestSync>* binding() { return &binding_; } + private: Binding<TestSync> binding_; - Closure ping_notification_; + PingHandler ping_handler_; + EchoHandler echo_handler_; + AsyncEchoHandler async_echo_handler_; DISALLOW_COPY_AND_ASSIGN(TestSyncImpl); }; @@ -67,9 +88,12 @@ class TestSyncServiceThread { void SetUp(TestSyncRequest request) { CHECK(thread_.task_runner()->BelongsToCurrentThread()); impl_.reset(new TestSyncImpl(std::move(request))); - impl_->set_ping_notification([this]() { - base::AutoLock locker(lock_); - ping_called_ = true; + impl_->set_ping_handler([this](const TestSync::PingCallback& callback) { + { + base::AutoLock locker(lock_); + ping_called_ = true; + } + callback.Run(); }); } @@ -130,6 +154,197 @@ TEST_F(SyncMethodTest, BasicSyncCalls) { run_loop.Run(); } +TEST_F(SyncMethodTest, ReenteredBySyncMethodBinding) { + // Test that an interface pointer waiting for a sync call response can be + // reentered by a binding serving sync methods on the same thread. + + TestSyncPtr ptr; + // The binding lives on the same thread as the interface pointer. + TestSyncImpl impl(GetProxy(&ptr)); + int32_t output_value = -1; + ASSERT_TRUE(ptr->Echo(42, &output_value)); + EXPECT_EQ(42, output_value); +} + +TEST_F(SyncMethodTest, InterefacePtrDestroyedDuringSyncCall) { + // Test that it won't result in crash or hang if an interface pointer is + // destroyed while it is waiting for a sync call response. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + impl.set_ping_handler([&ptr](const TestSync::PingCallback& callback) { + ptr.reset(); + callback.Run(); + }); + ASSERT_FALSE(ptr->Ping()); +} + +TEST_F(SyncMethodTest, BindingDestroyedDuringSyncCall) { + // Test that it won't result in crash or hang if a binding is + // closed (and therefore the message pipe handle is closed) while the + // corresponding interface pointer is waiting for a sync call response. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + impl.set_ping_handler([&impl](const TestSync::PingCallback& callback) { + impl.binding()->Close(); + callback.Run(); + }); + ASSERT_FALSE(ptr->Ping()); +} + +TEST_F(SyncMethodTest, NestedSyncCallsWithInOrderResponses) { + // Test that we can call a sync method on an interface ptr, while there is + // already a sync call ongoing. The responses arrive in order. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + + // The same variable is used to store the output of the two sync calls, in + // order to test that responses are handled in the correct order. + int32_t result_value = -1; + + bool first_call = true; + impl.set_echo_handler([&first_call, &ptr, &result_value]( + int32_t value, const TestSync::EchoCallback& callback) { + if (first_call) { + first_call = false; + ASSERT_TRUE(ptr->Echo(456, &result_value)); + EXPECT_EQ(456, result_value); + } + callback.Run(value); + }); + + ASSERT_TRUE(ptr->Echo(123, &result_value)); + EXPECT_EQ(123, result_value); +} + +TEST_F(SyncMethodTest, NestedSyncCallsWithOutOfOrderResponses) { + // Test that we can call a sync method on an interface ptr, while there is + // already a sync call ongoing. The responses arrive out of order. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + + // The same variable is used to store the output of the two sync calls, in + // order to test that responses are handled in the correct order. + int32_t result_value = -1; + + bool first_call = true; + impl.set_echo_handler([&first_call, &ptr, &result_value]( + int32_t value, const TestSync::EchoCallback& callback) { + callback.Run(value); + if (first_call) { + first_call = false; + ASSERT_TRUE(ptr->Echo(456, &result_value)); + EXPECT_EQ(456, result_value); + } + }); + + ASSERT_TRUE(ptr->Echo(123, &result_value)); + EXPECT_EQ(123, result_value); +} + +TEST_F(SyncMethodTest, AsyncResponseQueuedDuringSyncCall) { + // Test that while an interface pointer is waiting for the response to a sync + // call, async responses are queued until the sync call completes. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + + int32_t async_echo_request_value = -1; + TestSync::AsyncEchoCallback async_echo_request_callback; + base::RunLoop run_loop1; + impl.set_async_echo_handler( + [&async_echo_request_value, &async_echo_request_callback, &run_loop1]( + int32_t value, const TestSync::AsyncEchoCallback& callback) { + async_echo_request_value = value; + async_echo_request_callback = callback; + run_loop1.Quit(); + }); + + bool async_echo_response_dispatched = false; + base::RunLoop run_loop2; + ptr->AsyncEcho(123, + [&async_echo_response_dispatched, &run_loop2](int32_t result) { + async_echo_response_dispatched = true; + EXPECT_EQ(123, result); + run_loop2.Quit(); + }); + // Run until the AsyncEcho request reaches the service side. + run_loop1.Run(); + + impl.set_echo_handler( + [&async_echo_request_value, &async_echo_request_callback]( + int32_t value, const TestSync::EchoCallback& callback) { + // Send back the async response first. + EXPECT_FALSE(async_echo_request_callback.is_null()); + async_echo_request_callback.Run(async_echo_request_value); + + callback.Run(value); + }); + + int32_t result_value = -1; + ASSERT_TRUE(ptr->Echo(456, &result_value)); + EXPECT_EQ(456, result_value); + + // Although the AsyncEcho response arrives before the Echo response, it should + // be queued and not yet dispatched. + EXPECT_FALSE(async_echo_response_dispatched); + + // Run until the AsyncEcho response is dispatched. + run_loop2.Run(); + + EXPECT_TRUE(async_echo_response_dispatched); +} + +TEST_F(SyncMethodTest, AsyncRequestQueuedDuringSyncCall) { + // Test that while an interface pointer is waiting for the response to a sync + // call, async requests for a binding running on the same thread are queued + // until the sync call completes. + + TestSyncPtr ptr; + TestSyncImpl impl(GetProxy(&ptr)); + + bool async_echo_request_dispatched = false; + impl.set_async_echo_handler([&async_echo_request_dispatched]( + int32_t value, const TestSync::AsyncEchoCallback& callback) { + async_echo_request_dispatched = true; + callback.Run(value); + }); + + bool async_echo_response_dispatched = false; + base::RunLoop run_loop; + ptr->AsyncEcho(123, + [&async_echo_response_dispatched, &run_loop](int32_t result) { + async_echo_response_dispatched = true; + EXPECT_EQ(123, result); + run_loop.Quit(); + }); + + impl.set_echo_handler([&async_echo_request_dispatched]( + int32_t value, const TestSync::EchoCallback& callback) { + // Although the AsyncEcho request is sent before the Echo request, it + // shouldn't be dispatched yet at this point, because there is an ongoing + // sync call on the same thread. + EXPECT_FALSE(async_echo_request_dispatched); + callback.Run(value); + }); + + int32_t result_value = -1; + ASSERT_TRUE(ptr->Echo(456, &result_value)); + EXPECT_EQ(456, result_value); + + // Although the AsyncEcho request is sent before the Echo request, it + // shouldn't be dispatched yet. + EXPECT_FALSE(async_echo_request_dispatched); + + // Run until the AsyncEcho response is dispatched. + run_loop.Run(); + + EXPECT_TRUE(async_echo_response_dispatched); +} + } // namespace } // namespace test } // namespace mojo diff --git a/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom b/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom index 2df8a1c..1a55a0f 100644 --- a/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom +++ b/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom @@ -4,16 +4,26 @@ module mojo.test; -interface TestSync { +interface TestCodeGeneration { + [Sync] + NoInput() => (int32 result); + + [Sync] + NoOutput(int32 value) => (); + [Sync] - Get() => (int32 result); + NoInOut() => (); [Sync] - Set(int32 value) => (); + HaveInOut(int32 value1, int32 value2) => (int32 result1, int32 result2); +}; +interface TestSync { [Sync] Ping() => (); [Sync] Echo(int32 value) => (int32 result); + + AsyncEcho(int32 value) => (int32 result); }; diff --git a/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl b/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl index 7bc6d6c..2f0c60e 100644 --- a/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl +++ b/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl @@ -12,6 +12,7 @@ class {{interface.name}} { static const char Name_[]; static const uint32_t Version_ = {{interface.version}}; static const bool PassesAssociatedKinds_ = {% if interface|passes_associated_kinds %}true{% else %}false{% endif %}; + static const bool HasSyncMethods_ = {% if interface|has_sync_methods %}true{% else %}false{% endif %}; using GenericInterface = {{interface|get_qualified_name_for_kind}}; diff --git a/mojo/public/tools/bindings/generators/mojom_cpp_generator.py b/mojo/public/tools/bindings/generators/mojom_cpp_generator.py index 09b8bad..22e089c 100644 --- a/mojo/public/tools/bindings/generators/mojom_cpp_generator.py +++ b/mojo/public/tools/bindings/generators/mojom_cpp_generator.py @@ -473,6 +473,7 @@ class Generator(generator.Generator): "get_pad": pack.GetPad, "get_qualified_name_for_kind": GetQualifiedNameForKind, "has_callbacks": mojom.HasCallbacks, + "has_sync_methods": mojom.HasSyncMethods, "should_inline": ShouldInlineStruct, "should_inline_union": ShouldInlineUnion, "is_array_kind": mojom.IsArrayKind, diff --git a/mojo/public/tools/bindings/pylib/mojom/generate/module.py b/mojo/public/tools/bindings/pylib/mojom/generate/module.py index 860438d..6a86f16 100644 --- a/mojo/public/tools/bindings/pylib/mojom/generate/module.py +++ b/mojo/public/tools/bindings/pylib/mojom/generate/module.py @@ -671,3 +671,10 @@ def PassesAssociatedKinds(interface): if _ContainsAssociatedKinds(param.kind, visited_kinds): return True return False + + +def HasSyncMethods(interface): + for method in interface.methods: + if method.sync: + return True + return False |