diff options
author | reillyg <reillyg@chromium.org> | 2016-02-22 12:20:15 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-02-22 20:21:08 +0000 |
commit | 8017c320b4215892c4fdf339e75fea5d5a17ce16 (patch) | |
tree | 3f9a75fd00a68fe94925d94abde6396ca614fe12 | |
parent | 6e6d14f5c4e968fce5ce0f874005be4ca127d291 (diff) | |
download | chromium_src-8017c320b4215892c4fdf339e75fea5d5a17ce16.zip chromium_src-8017c320b4215892c4fdf339e75fea5d5a17ce16.tar.gz chromium_src-8017c320b4215892c4fdf339e75fea5d5a17ce16.tar.bz2 |
Revert of Mojo C++ bindings: support sync methods - part 2 (patchset #6 id:100001 of https://codereview.chromium.org/1713203002/ )
Reason for revert:
This change is triggering a use-of-uninitialized-value error on the Linux and Chrome OS MSan bots:
https://build.chromium.org/p/chromium.memory.fyi/builders/Linux%20ChromeOS%20MSan%20Tests/builds/7464
Original issue's description:
> Mojo C++ bindings: support sync methods - part 2
>
> This CL introduces the correct re-entrancy behavior: when a sync call is waiting
> for response, allow incoming sync requests on the same thread to re-enter, async
> messages are queued until the sync call completes.
>
> The following will be in future CLs:
> - Support sync calls with associated interfaces.
>
> BUG=577699
>
> Committed: https://crrev.com/083f46638041ba7d7620e47a7f9edf06982a9340
> Cr-Commit-Position: refs/heads/master@{#376751}
TBR=jam@chromium.org,yzshen@chromium.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=577699
Review URL: https://codereview.chromium.org/1720093002
Cr-Commit-Position: refs/heads/master@{#376793}
17 files changed, 92 insertions, 725 deletions
diff --git a/mojo/mojo_public.gyp b/mojo/mojo_public.gyp index 18f3b75f..82cb600 100644 --- a/mojo/mojo_public.gyp +++ b/mojo/mojo_public.gyp @@ -162,8 +162,6 @@ 'public/cpp/bindings/lib/shared_ptr.h', 'public/cpp/bindings/lib/string_serialization.cc', 'public/cpp/bindings/lib/string_serialization.h', - 'public/cpp/bindings/lib/sync_handle_watcher.cc', - 'public/cpp/bindings/lib/sync_handle_watcher.h', 'public/cpp/bindings/lib/validate_params.h', 'public/cpp/bindings/lib/validation_errors.cc', 'public/cpp/bindings/lib/validation_errors.h', diff --git a/mojo/public/cpp/bindings/BUILD.gn b/mojo/public/cpp/bindings/BUILD.gn index 72079fd..d650a43a0 100644 --- a/mojo/public/cpp/bindings/BUILD.gn +++ b/mojo/public/cpp/bindings/BUILD.gn @@ -66,8 +66,6 @@ source_set("bindings") { "lib/scoped_interface_endpoint_handle.h", "lib/string_serialization.cc", "lib/string_serialization.h", - "lib/sync_handle_watcher.cc", - "lib/sync_handle_watcher.h", "lib/union_accessor.h", "lib/validate_params.h", "lib/validation_errors.cc", diff --git a/mojo/public/cpp/bindings/lib/binding_state.h b/mojo/public/cpp/bindings/lib/binding_state.h index 6483d94..3d025fb 100644 --- a/mojo/public/cpp/bindings/lib/binding_state.h +++ b/mojo/public/cpp/bindings/lib/binding_state.h @@ -56,8 +56,8 @@ class BindingState<Interface, false> { filters.Append<internal::MessageHeaderValidator>(); filters.Append<typename Interface::RequestValidator_>(); - router_ = new internal::Router(std::move(handle), std::move(filters), - Interface::HasSyncMethods_, waiter); + router_ = + new internal::Router(std::move(handle), std::move(filters), waiter); router_->set_incoming_receiver(&stub_); router_->set_connection_error_handler( [this]() { connection_error_handler_.Run(); }); diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc index 4eff31d..ceccaa9 100644 --- a/mojo/public/cpp/bindings/lib/connector.cc +++ b/mojo/public/cpp/bindings/lib/connector.cc @@ -7,11 +7,9 @@ #include <stdint.h> #include <utility> -#include "base/bind.h" #include "base/logging.h" #include "base/macros.h" #include "base/synchronization/lock.h" -#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" namespace mojo { namespace internal { @@ -54,11 +52,8 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, drop_writes_(false), enforce_errors_from_incoming_receiver_(true), paused_(false), - lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), - register_sync_handle_watch_count_(0), - registered_with_sync_handle_watcher_(false), - sync_handle_watcher_callback_count_(0), - weak_factory_(this) { + destroyed_flag_(nullptr), + lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { // Even though we don't have an incoming receiver, we still want to monitor // the message pipe to know if is closed or encounters an error. WaitToReadMore(); @@ -67,6 +62,9 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, Connector::~Connector() { DCHECK(thread_checker_.CalledOnValidThread()); + if (destroyed_flag_) + *destroyed_flag_ = true; + CancelWait(); } @@ -193,76 +191,17 @@ bool Connector::Accept(Message* message) { return true; } -bool Connector::RegisterSyncHandleWatch() { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (error_) - return false; - - register_sync_handle_watch_count_++; - - if (!registered_with_sync_handle_watcher_ && !paused_) { - registered_with_sync_handle_watcher_ = - SyncHandleWatcher::current()->RegisterHandle( - message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, - base::Bind(&Connector::OnSyncHandleWatcherHandleReady, - base::Unretained(this))); - } - return true; -} - -void Connector::UnregisterSyncHandleWatch() { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (register_sync_handle_watch_count_ == 0) { - NOTREACHED(); - return; - } - - register_sync_handle_watch_count_--; - if (register_sync_handle_watch_count_ > 0) - return; - - if (registered_with_sync_handle_watcher_) { - SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); - registered_with_sync_handle_watcher_ = false; - } -} - -bool Connector::RunSyncHandleWatch(const bool* should_stop) { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK_GT(register_sync_handle_watch_count_, 0u); - - if (error_) - return false; - - ResumeIncomingMethodCallProcessing(); - - return SyncHandleWatcher::current()->WatchAllHandles(message_pipe_.get(), - should_stop); -} - // static void Connector::CallOnHandleReady(void* closure, MojoResult result) { Connector* self = static_cast<Connector*>(closure); - CHECK(self->async_wait_id_ != 0); - self->async_wait_id_ = 0; - self->OnHandleReadyInternal(result); + self->OnHandleReady(result); } -void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { - base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); - - sync_handle_watcher_callback_count_++; - OnHandleReadyInternal(result); - // At this point, this object might have been deleted. - if (weak_self) - sync_handle_watcher_callback_count_--; -} - -void Connector::OnHandleReadyInternal(MojoResult result) { +void Connector::OnHandleReady(MojoResult result) { DCHECK(thread_checker_.CalledOnValidThread()); + CHECK(async_wait_id_ != 0); + async_wait_id_ = 0; if (result != MOJO_RESULT_OK) { HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); return; @@ -279,15 +218,6 @@ void Connector::WaitToReadMore() { MOJO_DEADLINE_INDEFINITE, &Connector::CallOnHandleReady, this); - - if (register_sync_handle_watch_count_ > 0 && - !registered_with_sync_handle_watcher_) { - registered_with_sync_handle_watcher_ = - SyncHandleWatcher::current()->RegisterHandle( - message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, - base::Bind(&Connector::OnSyncHandleWatcherHandleReady, - base::Unretained(this))); - } } bool Connector::ReadSingleMessage(MojoResult* read_result) { @@ -297,7 +227,9 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { // Detect if |this| was destroyed during message dispatch. Allow for the // possibility of re-entering ReadMore() through message dispatch. - base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); + bool was_destroyed_during_dispatch = false; + bool* previous_destroyed_flag = destroyed_flag_; + destroyed_flag_ = &was_destroyed_during_dispatch; Message message; const MojoResult rv = ReadMessage(message_pipe_.get(), &message); @@ -315,8 +247,13 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { incoming_receiver_ && incoming_receiver_->Accept(&message); } - if (!weak_self) + if (was_destroyed_during_dispatch) { + if (previous_destroyed_flag) + *previous_destroyed_flag = true; // Propagate flag. return false; + } + + destroyed_flag_ = previous_destroyed_flag; if (rv == MOJO_RESULT_SHOULD_WAIT) return true; @@ -358,15 +295,11 @@ void Connector::ReadAllAvailableMessages() { } void Connector::CancelWait() { - if (async_wait_id_) { - waiter_->CancelWait(async_wait_id_); - async_wait_id_ = 0; - } + if (!async_wait_id_) + return; - if (registered_with_sync_handle_watcher_) { - SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); - registered_with_sync_handle_watcher_ = false; - } + waiter_->CancelWait(async_wait_id_); + async_wait_id_ = 0; } void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h index 8e664a1..3978171 100644 --- a/mojo/public/cpp/bindings/lib/connector.h +++ b/mojo/public/cpp/bindings/lib/connector.h @@ -6,7 +6,6 @@ #define MOJO_PUBLIC_CPP_BINDINGS_LIB_CONNECTOR_H_ #include "base/memory/scoped_ptr.h" -#include "base/memory/weak_ptr.h" #include "base/threading/thread_checker.h" #include "mojo/public/c/environment/async_waiter.h" #include "mojo/public/cpp/bindings/callback.h" @@ -122,31 +121,9 @@ class Connector : public MessageReceiver { return message_pipe_.get(); } - // Requests to register |message_pipe_| with SyncHandleWatcher whenever this - // instance is expecting incoming messages. - // - // Please note that UnregisterSyncHandleWatch() needs to be called as many - // times as successful RegisterSyncHandleWatch() calls in order to cancel the - // effect. - bool RegisterSyncHandleWatch(); - void UnregisterSyncHandleWatch(); - - // Watches all handles registered with SyncHandleWatcher on the same thread. - // The method returns true when |*should_stop| is set to true; returns false - // when any failure occurs during the watch, including |message_pipe_| is - // closed. - bool RunSyncHandleWatch(const bool* should_stop); - - // Whether currently the control flow is inside the sync handle watcher - // callback. - bool during_sync_handle_watcher_callback() const { - return sync_handle_watcher_callback_count_ > 0; - } - private: static void CallOnHandleReady(void* closure, MojoResult result); - void OnSyncHandleWatcherHandleReady(MojoResult result); - void OnHandleReadyInternal(MojoResult result); + void OnHandleReady(MojoResult result); void WaitToReadMore(); @@ -178,22 +155,17 @@ class Connector : public MessageReceiver { bool paused_; + // If non-null, this will be set to true when the Connector is destroyed. We + // use this flag to allow for the Connector to be destroyed as a side-effect + // of dispatching an incoming message. + bool* destroyed_flag_; + // If sending messages is allowed from multiple threads, |lock_| is used to // protect modifications to |message_pipe_| and |drop_writes_|. scoped_ptr<base::Lock> lock_; - // If non-zero, |message_pipe_| should be registered with SyncHandleWatcher. - size_t register_sync_handle_watch_count_; - // Whether |message_pipe_| has been registered with SyncHandleWatcher. - bool registered_with_sync_handle_watcher_; - // If non-zero, currently the control flow is inside the sync handle watcher - // callback. - size_t sync_handle_watcher_callback_count_; - base::ThreadChecker thread_checker_; - base::WeakPtrFactory<Connector> weak_factory_; - MOJO_DISALLOW_COPY_AND_ASSIGN(Connector); }; diff --git a/mojo/public/cpp/bindings/lib/interface_ptr_state.h b/mojo/public/cpp/bindings/lib/interface_ptr_state.h index b2093bb..a175edd 100644 --- a/mojo/public/cpp/bindings/lib/interface_ptr_state.h +++ b/mojo/public/cpp/bindings/lib/interface_ptr_state.h @@ -172,8 +172,7 @@ class InterfacePtrState<Interface, false> { filters.Append<MessageHeaderValidator>(); filters.Append<typename Interface::ResponseValidator_>(); - router_ = - new Router(std::move(handle_), std::move(filters), false, waiter_); + router_ = new Router(std::move(handle_), std::move(filters), waiter_); waiter_ = nullptr; proxy_ = new Proxy(router_); diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc index a78cdc7..9bc9aa1 100644 --- a/mojo/public/cpp/bindings/lib/router.cc +++ b/mojo/public/cpp/bindings/lib/router.cc @@ -7,10 +7,7 @@ #include <stdint.h> #include <utility> -#include "base/bind.h" #include "base/logging.h" -#include "base/message_loop/message_loop.h" -#include "base/stl_util.h" namespace mojo { namespace internal { @@ -63,13 +60,6 @@ class ResponderThunk : public MessageReceiverWithStatus { // ---------------------------------------------------------------------------- -Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) - : response_received(in_response_received) {} - -Router::SyncResponseInfo::~SyncResponseInfo() {} - -// ---------------------------------------------------------------------------- - Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) : router_(router) { } @@ -85,7 +75,6 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) { Router::Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, - bool expects_sync_requests, const MojoAsyncWaiter* waiter) : thunk_(this), filters_(std::move(filters)), @@ -95,15 +84,19 @@ Router::Router(ScopedMessagePipeHandle message_pipe, incoming_receiver_(nullptr), next_request_id_(0), testing_mode_(false), - pending_task_for_messages_(false), weak_factory_(this) { filters_.SetSink(&thunk_); - if (expects_sync_requests) - connector_.RegisterSyncHandleWatch(); connector_.set_incoming_receiver(filters_.GetHead()); } -Router::~Router() {} +Router::~Router() { + weak_factory_.InvalidateWeakPtrs(); + + for (auto& pair : async_responders_) + delete pair.second; + for (auto& pair : sync_responders_) + delete pair.second; +} bool Router::Accept(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); @@ -126,21 +119,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { if (!message->has_flag(kMessageIsSync)) { // We assume ownership of |responder|. - async_responders_[request_id] = make_scoped_ptr(responder); + async_responders_[request_id] = responder; return true; } - if (!connector_.RegisterSyncHandleWatch()) - return false; - - bool response_received = false; - scoped_ptr<MessageReceiver> sync_responder(responder); - sync_responses_.insert(std::make_pair( - request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); - + sync_responders_[request_id] = responder; base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - do { - bool result = connector_.RunSyncHandleWatch(&response_received); + for (;;) { + // TODO(yzshen): Here we should allow incoming sync requests to re-enter and + // block async messages. + bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); + // The message pipe has disconnected. if (!result) break; @@ -149,17 +138,9 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { break; // The corresponding response message has arrived. - DCHECK(response_received); - DCHECK(ContainsKey(sync_responses_, request_id)); - auto iter = sync_responses_.find(request_id); - DCHECK_EQ(&response_received, iter->second->response_received); - scoped_ptr<Message> response = std::move(iter->second->response); - sync_responses_.erase(iter); - ignore_result(sync_responder->Accept(response.get())); - } while (false); - - if (weak_self) - connector_.UnregisterSyncHandleWatch(); + if (sync_responders_.find(request_id) == sync_responders_.end()) + break; + } // Return true means that we take ownership of |responder|. return true; @@ -173,51 +154,6 @@ void Router::EnableTestingMode() { bool Router::HandleIncomingMessage(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); - - const bool during_sync_call = - connector_.during_sync_handle_watcher_callback(); - if (!message->has_flag(kMessageIsSync) && - (during_sync_call || !pending_messages_.empty())) { - scoped_ptr<Message> pending_message(new Message); - message->MoveTo(pending_message.get()); - pending_messages_.push(std::move(pending_message)); - - if (!pending_task_for_messages_) { - pending_task_for_messages_ = true; - base::MessageLoop::current()->PostTask( - FROM_HERE, base::Bind(&Router::HandleQueuedMessages, - weak_factory_.GetWeakPtr())); - } - - return true; - } - - return HandleMessageInternal(message); -} - -void Router::HandleQueuedMessages() { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(pending_task_for_messages_); - - base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - while (!pending_messages_.empty()) { - scoped_ptr<Message> message(std::move(pending_messages_.front())); - pending_messages_.pop(); - - bool result = HandleMessageInternal(message.get()); - if (!weak_self) - return; - - if (!result && !testing_mode_) { - connector_.RaiseError(); - break; - } - } - - pending_task_for_messages_ = false; -} - -bool Router::HandleMessageInternal(Message* message) { if (message->has_flag(kMessageExpectsResponse)) { if (!incoming_receiver_) return false; @@ -230,28 +166,20 @@ bool Router::HandleMessageInternal(Message* message) { return ok; } else if (message->has_flag(kMessageIsResponse)) { + ResponderMap& responder_map = message->has_flag(kMessageIsSync) + ? sync_responders_ + : async_responders_; uint64_t request_id = message->request_id(); - - if (message->has_flag(kMessageIsSync)) { - auto it = sync_responses_.find(request_id); - if (it == sync_responses_.end()) { - DCHECK(testing_mode_); - return false; - } - it->second->response.reset(new Message()); - message->MoveTo(it->second->response.get()); - *it->second->response_received = true; - return true; - } - - auto it = async_responders_.find(request_id); - if (it == async_responders_.end()) { + ResponderMap::iterator it = responder_map.find(request_id); + if (it == responder_map.end()) { DCHECK(testing_mode_); return false; } - scoped_ptr<MessageReceiver> responder = std::move(it->second); - async_responders_.erase(it); - return responder->Accept(message); + MessageReceiver* responder = it->second; + responder_map.erase(it); + bool ok = responder->Accept(message); + delete responder; + return ok; } else { if (!incoming_receiver_) return false; diff --git a/mojo/public/cpp/bindings/lib/router.h b/mojo/public/cpp/bindings/lib/router.h index 12aa7e72..f7ce513 100644 --- a/mojo/public/cpp/bindings/lib/router.h +++ b/mojo/public/cpp/bindings/lib/router.h @@ -8,10 +8,8 @@ #include <stdint.h> #include <map> -#include <queue> -#include "base/macros.h" -#include "base/memory/scoped_ptr.h" +#include "base/logging.h" #include "base/memory/weak_ptr.h" #include "base/threading/thread_checker.h" #include "mojo/public/cpp/bindings/callback.h" @@ -26,7 +24,6 @@ class Router : public MessageReceiverWithResponder { public: Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, - bool expects_sync_requests, const MojoAsyncWaiter* waiter = Environment::GetDefaultAsyncWaiter()); ~Router() override; @@ -107,29 +104,13 @@ class Router : public MessageReceiverWithResponder { // Returns true if this Router has any pending callbacks. bool has_pending_responders() const { DCHECK(thread_checker_.CalledOnValidThread()); - return !async_responders_.empty() || !sync_responses_.empty(); + return !async_responders_.empty() || !sync_responders_.empty(); } private: // Maps from the id of a response to the MessageReceiver that handles the // response. - using AsyncResponderMap = std::map<uint64_t, scoped_ptr<MessageReceiver>>; - - struct SyncResponseInfo { - public: - explicit SyncResponseInfo(bool* in_response_received); - ~SyncResponseInfo(); - - scoped_ptr<Message> response; - - // Points to a stack-allocated variable. - bool* response_received; - - private: - DISALLOW_COPY_AND_ASSIGN(SyncResponseInfo); - }; - - using SyncResponseMap = std::map<uint64_t, scoped_ptr<SyncResponseInfo>>; + typedef std::map<uint64_t, MessageReceiver*> ResponderMap; class HandleIncomingMessageThunk : public MessageReceiver { public: @@ -144,22 +125,15 @@ class Router : public MessageReceiverWithResponder { }; bool HandleIncomingMessage(Message* message); - void HandleQueuedMessages(); - - bool HandleMessageInternal(Message* message); HandleIncomingMessageThunk thunk_; FilterChain filters_; Connector connector_; MessageReceiverWithResponderStatus* incoming_receiver_; - AsyncResponderMap async_responders_; - SyncResponseMap sync_responses_; + ResponderMap async_responders_; + ResponderMap sync_responders_; uint64_t next_request_id_; bool testing_mode_; - std::queue<scoped_ptr<Message>> pending_messages_; - // Whether a task has been posted to trigger processing of - // |pending_messages_|. - bool pending_task_for_messages_; base::ThreadChecker thread_checker_; base::WeakPtrFactory<Router> weak_factory_; }; diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc b/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc deleted file mode 100644 index 197447f..0000000 --- a/mojo/public/cpp/bindings/lib/sync_handle_watcher.cc +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2016 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" - -#include "base/lazy_instance.h" -#include "base/logging.h" -#include "base/stl_util.h" -#include "base/threading/thread_local.h" -#include "mojo/public/c/system/core.h" - -namespace mojo { -namespace internal { -namespace { - -base::LazyInstance<base::ThreadLocalPointer<SyncHandleWatcher>> - g_current_sync_handle_watcher = LAZY_INSTANCE_INITIALIZER; - -} // namespace - -// static -SyncHandleWatcher* SyncHandleWatcher::current() { - SyncHandleWatcher* result = g_current_sync_handle_watcher.Pointer()->Get(); - if (!result) { - // This object will be destroyed when the current message loop goes away. - result = new SyncHandleWatcher(); - DCHECK_EQ(result, g_current_sync_handle_watcher.Pointer()->Get()); - } - return result; -} - -bool SyncHandleWatcher::RegisterHandle(const Handle& handle, - MojoHandleSignals handle_signals, - const HandleCallback& callback) { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (ContainsKey(handles_, handle)) - return false; - - MojoResult result = MojoAddHandle(wait_set_handle_.get().value(), - handle.value(), handle_signals); - if (result != MOJO_RESULT_OK) - return false; - - handles_[handle] = callback; - return true; -} - -void SyncHandleWatcher::UnregisterHandle(const Handle& handle) { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(ContainsKey(handles_, handle)); - - MojoResult result = - MojoRemoveHandle(wait_set_handle_.get().value(), handle.value()); - DCHECK_EQ(MOJO_RESULT_OK, result); - - handles_.erase(handle); -} - -bool SyncHandleWatcher::WatchAllHandles(const Handle& caller_handle, - const bool* should_stop) { - DCHECK(thread_checker_.CalledOnValidThread()); - - MojoResult result; - uint32_t num_ready_handles; - MojoHandle ready_handle; - MojoResult ready_handle_result; - - while (!*should_stop) { - if (!ContainsKey(handles_, caller_handle)) - return false; - do { - result = Wait(wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, - MOJO_DEADLINE_INDEFINITE, nullptr); - if (result != MOJO_RESULT_OK) - return false; - - // TODO(yzshen): Theoretically it can reduce sync call re-entrancy if we - // give priority to the handle that is waiting for sync response. - num_ready_handles = 1; - result = MojoGetReadyHandles(wait_set_handle_.get().value(), - &num_ready_handles, &ready_handle, - &ready_handle_result, nullptr); - if (result != MOJO_RESULT_OK && result != MOJO_RESULT_SHOULD_WAIT) - return false; - } while (result == MOJO_RESULT_SHOULD_WAIT); - - const auto iter = handles_.find(Handle(ready_handle)); - iter->second.Run(ready_handle_result); - }; - - return true; -} - -SyncHandleWatcher::SyncHandleWatcher() { - MojoHandle handle; - MojoResult result = MojoCreateWaitSet(&handle); - CHECK_EQ(MOJO_RESULT_OK, result); - wait_set_handle_.reset(Handle(handle)); - CHECK(wait_set_handle_.is_valid()); - - DCHECK(!g_current_sync_handle_watcher.Pointer()->Get()); - g_current_sync_handle_watcher.Pointer()->Set(this); - - base::MessageLoop::current()->AddDestructionObserver(this); -} - -SyncHandleWatcher::~SyncHandleWatcher() { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(handles_.empty()); - g_current_sync_handle_watcher.Pointer()->Set(nullptr); -} - -void SyncHandleWatcher::WillDestroyCurrentMessageLoop() { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK_EQ(this, g_current_sync_handle_watcher.Pointer()->Get()); - - base::MessageLoop::current()->RemoveDestructionObserver(this); - delete this; -} - -} // namespace internal -} // namespace mojo diff --git a/mojo/public/cpp/bindings/lib/sync_handle_watcher.h b/mojo/public/cpp/bindings/lib/sync_handle_watcher.h deleted file mode 100644 index 734a212..0000000 --- a/mojo/public/cpp/bindings/lib/sync_handle_watcher.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2016 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ -#define MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ - -#include <unordered_map> - -#include "base/callback.h" -#include "base/macros.h" -#include "base/message_loop/message_loop.h" -#include "base/threading/thread_checker.h" -#include "mojo/public/cpp/system/core.h" - -namespace mojo { -namespace internal { - -// SyncHandleWatcher is used to support sync methods. While a sync call is -// waiting for response, we would like incoming sync method requests on the same -// thread to be able to reenter. We also would like master endpoints to continue -// dispatching messages for associated endpoints on different threads. -// Therefore, SyncHandleWatcher is used as thread-local storage to register all -// handles that need to be watched while waiting for sync call responses. -// -// This class is not thread safe. -class SyncHandleWatcher : public base::MessageLoop::DestructionObserver { - public: - // Returns a thread-local object. - static SyncHandleWatcher* current(); - - using HandleCallback = base::Callback<void(MojoResult)>; - bool RegisterHandle(const Handle& handle, - MojoHandleSignals handle_signals, - const HandleCallback& callback); - - void UnregisterHandle(const Handle& handle); - - // Waits on all the registered handles and runs callbacks synchronously for - // those ready handles. - // The method: - // - returns true when |*should_stop| is set to true; - // - returns false when either |caller_handle| is unregistered or any error - // occurs. - bool WatchAllHandles(const Handle& caller_handle, const bool* should_stop); - - private: - struct HandleHasher { - size_t operator()(const Handle& handle) const { - return std::hash<uint32_t>()(static_cast<uint32_t>(handle.value())); - } - }; - using HandleMap = std::unordered_map<Handle, HandleCallback, HandleHasher>; - - SyncHandleWatcher(); - ~SyncHandleWatcher() override; - - // base::MessageLoop::DestructionObserver implementation: - void WillDestroyCurrentMessageLoop() override; - - HandleMap handles_; - - ScopedHandle wait_set_handle_; - - base::ThreadChecker thread_checker_; - - DISALLOW_COPY_AND_ASSIGN(SyncHandleWatcher); -}; - -} // namespace internal -} // namespace mojo - -#endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_SYNC_HANDLE_WATCHER_H_ diff --git a/mojo/public/cpp/bindings/message.h b/mojo/public/cpp/bindings/message.h index 9781a13..f58b2e0 100644 --- a/mojo/public/cpp/bindings/message.h +++ b/mojo/public/cpp/bindings/message.h @@ -122,7 +122,6 @@ class MessageReceiverWithResponder : public MessageReceiver { // |responder| and will delete it after calling |responder->Accept| or upon // its own destruction. // - // TODO(yzshen): consider changing |responder| to scoped_ptr<MessageReceiver>. virtual bool AcceptWithResponder(Message* message, MessageReceiver* responder) MOJO_WARN_UNUSED_RESULT = 0; }; @@ -155,7 +154,6 @@ class MessageReceiverWithResponderStatus : public MessageReceiver { // |responder| and will delete it after calling |responder->Accept| or upon // its own destruction. // - // TODO(yzshen): consider changing |responder| to scoped_ptr<MessageReceiver>. virtual bool AcceptWithResponder(Message* message, MessageReceiverWithStatus* responder) MOJO_WARN_UNUSED_RESULT = 0; diff --git a/mojo/public/cpp/bindings/tests/router_unittest.cc b/mojo/public/cpp/bindings/tests/router_unittest.cc index 1f7160e..e88f0e8 100644 --- a/mojo/public/cpp/bindings/tests/router_unittest.cc +++ b/mojo/public/cpp/bindings/tests/router_unittest.cc @@ -39,8 +39,8 @@ class RouterTest : public testing::Test { }; TEST_F(RouterTest, BasicRequestResponse) { - internal::Router router0(std::move(handle0_), internal::FilterChain(), false); - internal::Router router1(std::move(handle1_), internal::FilterChain(), false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain()); ResponseGenerator generator; router1.set_incoming_receiver(&generator); @@ -83,8 +83,8 @@ TEST_F(RouterTest, BasicRequestResponse) { } TEST_F(RouterTest, BasicRequestResponse_Synchronous) { - internal::Router router0(std::move(handle0_), internal::FilterChain(), false); - internal::Router router1(std::move(handle1_), internal::FilterChain(), false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain()); ResponseGenerator generator; router1.set_incoming_receiver(&generator); @@ -125,8 +125,8 @@ TEST_F(RouterTest, BasicRequestResponse_Synchronous) { } TEST_F(RouterTest, RequestWithNoReceiver) { - internal::Router router0(std::move(handle0_), internal::FilterChain(), false); - internal::Router router1(std::move(handle1_), internal::FilterChain(), false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain()); // Without an incoming receiver set on router1, we expect router0 to observe // an error as a result of sending a message. @@ -151,8 +151,8 @@ TEST_F(RouterTest, RequestWithNoReceiver) { // Tests Router using the LazyResponseGenerator. The responses will not be // sent until after the requests have been accepted. TEST_F(RouterTest, LazyResponses) { - internal::Router router0(std::move(handle0_), internal::FilterChain(), false); - internal::Router router1(std::move(handle1_), internal::FilterChain(), false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain()); base::RunLoop run_loop; LazyResponseGenerator generator(run_loop.QuitClosure()); @@ -217,7 +217,7 @@ TEST_F(RouterTest, LazyResponses) { // both sides still appear to have a valid message pipe handle bound. TEST_F(RouterTest, MissingResponses) { base::RunLoop run_loop0, run_loop1; - internal::Router router0(std::move(handle0_), internal::FilterChain(), false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); bool error_handler_called0 = false; router0.set_connection_error_handler( [&error_handler_called0, &run_loop0]() { @@ -225,7 +225,7 @@ TEST_F(RouterTest, MissingResponses) { run_loop0.Quit(); }); - internal::Router router1(std::move(handle1_), internal::FilterChain(), false); + internal::Router router1(std::move(handle1_), internal::FilterChain()); bool error_handler_called1 = false; router1.set_connection_error_handler( [&error_handler_called1, &run_loop1]() { @@ -277,10 +277,8 @@ TEST_F(RouterTest, LateResponse) { base::RunLoop run_loop; LazyResponseGenerator generator(run_loop.QuitClosure()); { - internal::Router router0(std::move(handle0_), internal::FilterChain(), - false); - internal::Router router1(std::move(handle1_), internal::FilterChain(), - false); + internal::Router router0(std::move(handle0_), internal::FilterChain()); + internal::Router router1(std::move(handle1_), internal::FilterChain()); router1.set_incoming_receiver(&generator); diff --git a/mojo/public/cpp/bindings/tests/sync_method_unittest.cc b/mojo/public/cpp/bindings/tests/sync_method_unittest.cc index 207a6e2..fd519b0 100644 --- a/mojo/public/cpp/bindings/tests/sync_method_unittest.cc +++ b/mojo/public/cpp/bindings/tests/sync_method_unittest.cc @@ -30,47 +30,26 @@ class TestSyncImpl : public TestSync { public: TestSyncImpl(TestSyncRequest request) : binding_(this, std::move(request)) {} - using PingHandler = Callback<void(const PingCallback&)>; - void set_ping_handler(const PingHandler& handler) { ping_handler_ = handler; } - - using EchoHandler = Callback<void(int32_t, const EchoCallback&)>; - void set_echo_handler(const EchoHandler& handler) { echo_handler_ = handler; } - - using AsyncEchoHandler = Callback<void(int32_t, const AsyncEchoCallback&)>; - void set_async_echo_handler(const AsyncEchoHandler& handler) { - async_echo_handler_ = handler; + void set_ping_notification(const Closure& closure) { + ping_notification_ = closure; } // TestSync implementation: + void Get(const GetCallback& callback) override { callback.Run(42); } + void Set(int32_t value, const SetCallback& callback) override { + callback.Run(); + } void Ping(const PingCallback& callback) override { - if (ping_handler_.is_null()) { - callback.Run(); - return; - } - ping_handler_.Run(callback); + ping_notification_.Run(); + callback.Run(); } void Echo(int32_t value, const EchoCallback& callback) override { - if (echo_handler_.is_null()) { - callback.Run(value); - return; - } - echo_handler_.Run(value, callback); - } - void AsyncEcho(int32_t value, const AsyncEchoCallback& callback) override { - if (async_echo_handler_.is_null()) { - callback.Run(value); - return; - } - async_echo_handler_.Run(value, callback); + callback.Run(value); } - Binding<TestSync>* binding() { return &binding_; } - private: Binding<TestSync> binding_; - PingHandler ping_handler_; - EchoHandler echo_handler_; - AsyncEchoHandler async_echo_handler_; + Closure ping_notification_; DISALLOW_COPY_AND_ASSIGN(TestSyncImpl); }; @@ -88,12 +67,9 @@ class TestSyncServiceThread { void SetUp(TestSyncRequest request) { CHECK(thread_.task_runner()->BelongsToCurrentThread()); impl_.reset(new TestSyncImpl(std::move(request))); - impl_->set_ping_handler([this](const TestSync::PingCallback& callback) { - { - base::AutoLock locker(lock_); - ping_called_ = true; - } - callback.Run(); + impl_->set_ping_notification([this]() { + base::AutoLock locker(lock_); + ping_called_ = true; }); } @@ -154,197 +130,6 @@ TEST_F(SyncMethodTest, BasicSyncCalls) { run_loop.Run(); } -TEST_F(SyncMethodTest, ReenteredBySyncMethodBinding) { - // Test that an interface pointer waiting for a sync call response can be - // reentered by a binding serving sync methods on the same thread. - - TestSyncPtr ptr; - // The binding lives on the same thread as the interface pointer. - TestSyncImpl impl(GetProxy(&ptr)); - int32_t output_value = -1; - ASSERT_TRUE(ptr->Echo(42, &output_value)); - EXPECT_EQ(42, output_value); -} - -TEST_F(SyncMethodTest, InterefacePtrDestroyedDuringSyncCall) { - // Test that it won't result in crash or hang if an interface pointer is - // destroyed while it is waiting for a sync call response. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - impl.set_ping_handler([&ptr](const TestSync::PingCallback& callback) { - ptr.reset(); - callback.Run(); - }); - ASSERT_FALSE(ptr->Ping()); -} - -TEST_F(SyncMethodTest, BindingDestroyedDuringSyncCall) { - // Test that it won't result in crash or hang if a binding is - // closed (and therefore the message pipe handle is closed) while the - // corresponding interface pointer is waiting for a sync call response. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - impl.set_ping_handler([&impl](const TestSync::PingCallback& callback) { - impl.binding()->Close(); - callback.Run(); - }); - ASSERT_FALSE(ptr->Ping()); -} - -TEST_F(SyncMethodTest, NestedSyncCallsWithInOrderResponses) { - // Test that we can call a sync method on an interface ptr, while there is - // already a sync call ongoing. The responses arrive in order. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - - // The same variable is used to store the output of the two sync calls, in - // order to test that responses are handled in the correct order. - int32_t result_value = -1; - - bool first_call = true; - impl.set_echo_handler([&first_call, &ptr, &result_value]( - int32_t value, const TestSync::EchoCallback& callback) { - if (first_call) { - first_call = false; - ASSERT_TRUE(ptr->Echo(456, &result_value)); - EXPECT_EQ(456, result_value); - } - callback.Run(value); - }); - - ASSERT_TRUE(ptr->Echo(123, &result_value)); - EXPECT_EQ(123, result_value); -} - -TEST_F(SyncMethodTest, NestedSyncCallsWithOutOfOrderResponses) { - // Test that we can call a sync method on an interface ptr, while there is - // already a sync call ongoing. The responses arrive out of order. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - - // The same variable is used to store the output of the two sync calls, in - // order to test that responses are handled in the correct order. - int32_t result_value = -1; - - bool first_call = true; - impl.set_echo_handler([&first_call, &ptr, &result_value]( - int32_t value, const TestSync::EchoCallback& callback) { - callback.Run(value); - if (first_call) { - first_call = false; - ASSERT_TRUE(ptr->Echo(456, &result_value)); - EXPECT_EQ(456, result_value); - } - }); - - ASSERT_TRUE(ptr->Echo(123, &result_value)); - EXPECT_EQ(123, result_value); -} - -TEST_F(SyncMethodTest, AsyncResponseQueuedDuringSyncCall) { - // Test that while an interface pointer is waiting for the response to a sync - // call, async responses are queued until the sync call completes. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - - int32_t async_echo_request_value = -1; - TestSync::AsyncEchoCallback async_echo_request_callback; - base::RunLoop run_loop1; - impl.set_async_echo_handler( - [&async_echo_request_value, &async_echo_request_callback, &run_loop1]( - int32_t value, const TestSync::AsyncEchoCallback& callback) { - async_echo_request_value = value; - async_echo_request_callback = callback; - run_loop1.Quit(); - }); - - bool async_echo_response_dispatched = false; - base::RunLoop run_loop2; - ptr->AsyncEcho(123, - [&async_echo_response_dispatched, &run_loop2](int32_t result) { - async_echo_response_dispatched = true; - EXPECT_EQ(123, result); - run_loop2.Quit(); - }); - // Run until the AsyncEcho request reaches the service side. - run_loop1.Run(); - - impl.set_echo_handler( - [&async_echo_request_value, &async_echo_request_callback]( - int32_t value, const TestSync::EchoCallback& callback) { - // Send back the async response first. - EXPECT_FALSE(async_echo_request_callback.is_null()); - async_echo_request_callback.Run(async_echo_request_value); - - callback.Run(value); - }); - - int32_t result_value = -1; - ASSERT_TRUE(ptr->Echo(456, &result_value)); - EXPECT_EQ(456, result_value); - - // Although the AsyncEcho response arrives before the Echo response, it should - // be queued and not yet dispatched. - EXPECT_FALSE(async_echo_response_dispatched); - - // Run until the AsyncEcho response is dispatched. - run_loop2.Run(); - - EXPECT_TRUE(async_echo_response_dispatched); -} - -TEST_F(SyncMethodTest, AsyncRequestQueuedDuringSyncCall) { - // Test that while an interface pointer is waiting for the response to a sync - // call, async requests for a binding running on the same thread are queued - // until the sync call completes. - - TestSyncPtr ptr; - TestSyncImpl impl(GetProxy(&ptr)); - - bool async_echo_request_dispatched = false; - impl.set_async_echo_handler([&async_echo_request_dispatched]( - int32_t value, const TestSync::AsyncEchoCallback& callback) { - async_echo_request_dispatched = true; - callback.Run(value); - }); - - bool async_echo_response_dispatched = false; - base::RunLoop run_loop; - ptr->AsyncEcho(123, - [&async_echo_response_dispatched, &run_loop](int32_t result) { - async_echo_response_dispatched = true; - EXPECT_EQ(123, result); - run_loop.Quit(); - }); - - impl.set_echo_handler([&async_echo_request_dispatched]( - int32_t value, const TestSync::EchoCallback& callback) { - // Although the AsyncEcho request is sent before the Echo request, it - // shouldn't be dispatched yet at this point, because there is an ongoing - // sync call on the same thread. - EXPECT_FALSE(async_echo_request_dispatched); - callback.Run(value); - }); - - int32_t result_value = -1; - ASSERT_TRUE(ptr->Echo(456, &result_value)); - EXPECT_EQ(456, result_value); - - // Although the AsyncEcho request is sent before the Echo request, it - // shouldn't be dispatched yet. - EXPECT_FALSE(async_echo_request_dispatched); - - // Run until the AsyncEcho response is dispatched. - run_loop.Run(); - - EXPECT_TRUE(async_echo_response_dispatched); -} - } // namespace } // namespace test } // namespace mojo diff --git a/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom b/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom index 1a55a0f..2df8a1c 100644 --- a/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom +++ b/mojo/public/interfaces/bindings/tests/test_sync_methods.mojom @@ -4,26 +4,16 @@ module mojo.test; -interface TestCodeGeneration { - [Sync] - NoInput() => (int32 result); - - [Sync] - NoOutput(int32 value) => (); - +interface TestSync { [Sync] - NoInOut() => (); + Get() => (int32 result); [Sync] - HaveInOut(int32 value1, int32 value2) => (int32 result1, int32 result2); -}; + Set(int32 value) => (); -interface TestSync { [Sync] Ping() => (); [Sync] Echo(int32 value) => (int32 result); - - AsyncEcho(int32 value) => (int32 result); }; diff --git a/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl b/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl index 2f0c60e..7bc6d6c 100644 --- a/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl +++ b/mojo/public/tools/bindings/generators/cpp_templates/interface_declaration.tmpl @@ -12,7 +12,6 @@ class {{interface.name}} { static const char Name_[]; static const uint32_t Version_ = {{interface.version}}; static const bool PassesAssociatedKinds_ = {% if interface|passes_associated_kinds %}true{% else %}false{% endif %}; - static const bool HasSyncMethods_ = {% if interface|has_sync_methods %}true{% else %}false{% endif %}; using GenericInterface = {{interface|get_qualified_name_for_kind}}; diff --git a/mojo/public/tools/bindings/generators/mojom_cpp_generator.py b/mojo/public/tools/bindings/generators/mojom_cpp_generator.py index 22e089c..09b8bad 100644 --- a/mojo/public/tools/bindings/generators/mojom_cpp_generator.py +++ b/mojo/public/tools/bindings/generators/mojom_cpp_generator.py @@ -473,7 +473,6 @@ class Generator(generator.Generator): "get_pad": pack.GetPad, "get_qualified_name_for_kind": GetQualifiedNameForKind, "has_callbacks": mojom.HasCallbacks, - "has_sync_methods": mojom.HasSyncMethods, "should_inline": ShouldInlineStruct, "should_inline_union": ShouldInlineUnion, "is_array_kind": mojom.IsArrayKind, diff --git a/mojo/public/tools/bindings/pylib/mojom/generate/module.py b/mojo/public/tools/bindings/pylib/mojom/generate/module.py index 6a86f16..860438d 100644 --- a/mojo/public/tools/bindings/pylib/mojom/generate/module.py +++ b/mojo/public/tools/bindings/pylib/mojom/generate/module.py @@ -671,10 +671,3 @@ def PassesAssociatedKinds(interface): if _ContainsAssociatedKinds(param.kind, visited_kinds): return True return False - - -def HasSyncMethods(interface): - for method in interface.methods: - if method.sync: - return True - return False |