From 314a150a7a17639873137ba079a4a26785242273 Mon Sep 17 00:00:00 2001 From: amistry Date: Mon, 21 Mar 2016 12:16:41 -0700 Subject: Make mojo::Callback safe to copy across threads and destroy on any thread. This brings the behaviour of mojo::Callback close to that of base::Callback. BUG=595939 Review URL: https://codereview.chromium.org/1819463002 Cr-Commit-Position: refs/heads/master@{#382349} --- mojo/public/cpp/bindings/callback.h | 27 ++++++++++---- .../cpp/bindings/lib/interface_endpoint_client.cc | 43 ++++++++++++++++------ mojo/public/cpp/bindings/lib/router.cc | 39 +++++++++++++++++--- mojo/public/cpp/bindings/message.h | 5 +++ .../bindings/tests/binding_callback_unittest.cc | 2 +- .../cpp_templates/interface_definition.tmpl | 15 +++++--- 6 files changed, 99 insertions(+), 32 deletions(-) (limited to 'mojo') diff --git a/mojo/public/cpp/bindings/callback.h b/mojo/public/cpp/bindings/callback.h index 8b4c158..60dea1c 100644 --- a/mojo/public/cpp/bindings/callback.h +++ b/mojo/public/cpp/bindings/callback.h @@ -7,8 +7,9 @@ #include +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" #include "mojo/public/cpp/bindings/lib/callback_internal.h" -#include "mojo/public/cpp/bindings/lib/shared_ptr.h" #include "mojo/public/cpp/bindings/lib/template_util.h" namespace mojo { @@ -40,7 +41,7 @@ class Callback { // Constructs a callback that will run |runnable|. The callback takes // ownership of |runnable|. - explicit Callback(Runnable* runnable) : sink_(runnable) {} + explicit Callback(Runnable* runnable) : sink_(new RunnableHolder(runnable)) {} // As above, but can take an object that isn't derived from Runnable, so long // as it has a compatible operator() or Run() method. operator() will be @@ -50,19 +51,19 @@ class Callback { using sink_type = typename internal::Conditional< internal::HasCompatibleCallOperator::value, FunctorAdapter, RunnableAdapter>::type; - sink_ = internal::SharedPtr(new sink_type(sink)); + sink_ = new RunnableHolder(new sink_type(sink)); } // As above, but can take a compatible function pointer. Callback(void (*function_ptr)( typename internal::Callback_ParamTraits::ForwardType...)) - : sink_(new FunctionPtrAdapter(function_ptr)) {} + : sink_(new RunnableHolder(new FunctionPtrAdapter(function_ptr))) {} // Executes the callback function. void Run(typename internal::Callback_ParamTraits::ForwardType... args) const { - if (sink_.get()) - sink_->Run(std::forward< + if (sink_) + sink_->runnable->Run(std::forward< typename internal::Callback_ParamTraits::ForwardType>( args)...); } @@ -70,7 +71,7 @@ class Callback { bool is_null() const { return !sink_.get(); } // Resets the callback to the "null" state. - void reset() { sink_.reset(); } + void reset() { sink_ = nullptr; } private: // Adapts a class that has a Run() method but is not derived from Runnable to @@ -123,7 +124,17 @@ class Callback { FunctionPtr function_ptr; }; - internal::SharedPtr sink_; + struct RunnableHolder : public base::RefCountedThreadSafe { + explicit RunnableHolder(Runnable* runnable) : runnable(runnable) {} + + scoped_ptr runnable; + + private: + friend class base::RefCountedThreadSafe; + ~RunnableHolder() {} + }; + + scoped_refptr sink_; }; // A specialization of Callback which takes no parameters. diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc index 0063bff..58bc885 100644 --- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc +++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc @@ -8,9 +8,12 @@ #include #include "base/bind.h" +#include "base/location.h" #include "base/macros.h" #include "base/message_loop/message_loop.h" +#include "base/single_thread_task_runner.h" #include "base/stl_util.h" +#include "base/thread_task_runner_handle.h" #include "mojo/public/cpp/bindings/associated_group.h" #include "mojo/public/cpp/bindings/lib/multiplex_router.h" @@ -21,6 +24,12 @@ namespace internal { namespace { +void DCheckIfInvalid(const base::WeakPtr& client, + const std::string& message) { + bool is_valid = client && !client->encountered_error(); + DCHECK(!is_valid) << message; +} + // When receiving an incoming message which expects a repsonse, // InterfaceEndpointClient creates a ResponderThunk object and passes it to the // incoming message receiver. When the receiver finishes processing the message, @@ -29,21 +38,21 @@ class ResponderThunk : public MessageReceiverWithStatus { public: explicit ResponderThunk( const base::WeakPtr& endpoint_client) - : endpoint_client_(endpoint_client), accept_was_invoked_(false) {} + : endpoint_client_(endpoint_client), accept_was_invoked_(false), + task_runner_(base::ThreadTaskRunnerHandle::Get()) {} ~ResponderThunk() override { if (!accept_was_invoked_) { // The Mojo application handled a message that was expecting a response // but did not send a response. - if (endpoint_client_) { - // We raise an error to signal the calling application that an error - // condition occurred. Without this the calling application would have - // no way of knowing it should stop waiting for a response. - // - // We raise the error asynchronously and only if |endpoint_client_| is - // still alive when the task is run. That way it won't break the case - // where the user abandons the interface endpoint client soon after - // he/she abandons the callback. - base::MessageLoop::current()->PostTask( + if (task_runner_->RunsTasksOnCurrentThread()) { + if (endpoint_client_) { + // We raise an error to signal the calling application that an error + // condition occurred. Without this the calling application would have + // no way of knowing it should stop waiting for a response. + endpoint_client_->RaiseError(); + } + } else { + task_runner_->PostTask( FROM_HERE, base::Bind(&InterfaceEndpointClient::RaiseError, endpoint_client_)); } @@ -52,6 +61,7 @@ class ResponderThunk : public MessageReceiverWithStatus { // MessageReceiver implementation: bool Accept(Message* message) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); accept_was_invoked_ = true; DCHECK(message->has_flag(kMessageIsResponse)); @@ -65,12 +75,23 @@ class ResponderThunk : public MessageReceiverWithStatus { // MessageReceiverWithStatus implementation: bool IsValid() override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); return endpoint_client_ && !endpoint_client_->encountered_error(); } + void DCheckInvalid(const std::string& message) override { + if (task_runner_->RunsTasksOnCurrentThread()) { + DCheckIfInvalid(endpoint_client_, message); + } else { + task_runner_->PostTask( + FROM_HERE, base::Bind(&DCheckIfInvalid, endpoint_client_, message)); + } + } + private: base::WeakPtr endpoint_client_; bool accept_was_invoked_; + scoped_refptr task_runner_; DISALLOW_COPY_AND_ASSIGN(ResponderThunk); }; diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc index d929e80..045fc62 100644 --- a/mojo/public/cpp/bindings/lib/router.cc +++ b/mojo/public/cpp/bindings/lib/router.cc @@ -8,9 +8,12 @@ #include #include "base/bind.h" +#include "base/location.h" #include "base/logging.h" #include "base/message_loop/message_loop.h" +#include "base/single_thread_task_runner.h" #include "base/stl_util.h" +#include "base/thread_task_runner_handle.h" namespace mojo { namespace internal { @@ -19,25 +22,38 @@ namespace internal { namespace { +void DCheckIfInvalid(const base::WeakPtr& router, + const std::string& message) { + bool is_valid = router && !router->encountered_error() && router->is_valid(); + DCHECK(!is_valid) << message; +} + class ResponderThunk : public MessageReceiverWithStatus { public: explicit ResponderThunk(const base::WeakPtr& router) - : router_(router), accept_was_invoked_(false) {} + : router_(router), accept_was_invoked_(false), + task_runner_(base::ThreadTaskRunnerHandle::Get()) {} ~ResponderThunk() override { if (!accept_was_invoked_) { // The Mojo application handled a message that was expecting a response // but did not send a response. - if (router_) { - // We raise an error to signal the calling application that an error - // condition occurred. Without this the calling application would have - // no way of knowing it should stop waiting for a response. - router_->RaiseError(); + if (task_runner_->RunsTasksOnCurrentThread()) { + if (router_) { + // We raise an error to signal the calling application that an error + // condition occurred. Without this the calling application would have + // no way of knowing it should stop waiting for a response. + router_->RaiseError(); + } + } else { + task_runner_->PostTask(FROM_HERE, + base::Bind(&Router::RaiseError, router_)); } } } // MessageReceiver implementation: bool Accept(Message* message) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); accept_was_invoked_ = true; DCHECK(message->has_flag(kMessageIsResponse)); @@ -51,12 +67,23 @@ class ResponderThunk : public MessageReceiverWithStatus { // MessageReceiverWithStatus implementation: bool IsValid() override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); return router_ && !router_->encountered_error() && router_->is_valid(); } + void DCheckInvalid(const std::string& message) override { + if (task_runner_->RunsTasksOnCurrentThread()) { + DCheckIfInvalid(router_, message); + } else { + task_runner_->PostTask(FROM_HERE, + base::Bind(&DCheckIfInvalid, router_, message)); + } + } + private: base::WeakPtr router_; bool accept_was_invoked_; + scoped_refptr task_runner_; }; } // namespace diff --git a/mojo/public/cpp/bindings/message.h b/mojo/public/cpp/bindings/message.h index 4b8014a..5163669 100644 --- a/mojo/public/cpp/bindings/message.h +++ b/mojo/public/cpp/bindings/message.h @@ -137,6 +137,11 @@ class MessageReceiverWithStatus : public MessageReceiver { // Returns |true| if this MessageReceiver is currently bound to a MessagePipe, // the pipe has not been closed, and the pipe has not encountered an error. virtual bool IsValid() = 0; + + // DCHECKs if this MessageReceiver is currently bound to a MessagePipe, the + // pipe has not been closed, and the pipe has not encountered an error. + // This function may be called on any thread. + virtual void DCheckInvalid(const std::string& message) = 0; }; // An alternative to MessageReceiverWithResponder for cases in which it diff --git a/mojo/public/cpp/bindings/tests/binding_callback_unittest.cc b/mojo/public/cpp/bindings/tests/binding_callback_unittest.cc index 8dbdb8c..68aedde 100644 --- a/mojo/public/cpp/bindings/tests/binding_callback_unittest.cc +++ b/mojo/public/cpp/bindings/tests/binding_callback_unittest.cc @@ -338,7 +338,7 @@ TEST_F(BindingCallbackTest, DeleteCallbackBeforeBindingDeathTest) { #if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) // Delete the callback without running it. This should cause a crash in debug // builds due to a DCHECK. - std::string regex("Check failed: !callback_was_dropped."); + std::string regex("Check failed: !is_valid"); #if defined(OS_WIN) // TODO(msw): Fix MOJO_DCHECK logs and EXPECT_DEATH* on Win: crbug.com/535014 regex.clear(); diff --git a/mojo/public/tools/bindings/generators/cpp_templates/interface_definition.tmpl b/mojo/public/tools/bindings/generators/cpp_templates/interface_definition.tmpl index de7469c..c437738 100644 --- a/mojo/public/tools/bindings/generators/cpp_templates/interface_definition.tmpl +++ b/mojo/public/tools/bindings/generators/cpp_templates/interface_definition.tmpl @@ -200,15 +200,18 @@ class {{class_name}}_{{method.name}}_ProxyToResponder : public {{class_name}}::{{method.name}}Callback::Runnable { public: ~{{class_name}}_{{method.name}}_ProxyToResponder() override { - // Is the Mojo application destroying the callback without running it - // and without first closing the pipe? - bool callback_was_dropped = responder_ && responder_->IsValid(); +#if DCHECK_IS_ON() + if (responder_) { + // Is the Mojo application destroying the callback without running it + // and without first closing the pipe? + responder_->DCheckInvalid("The callback passed to " + "{{class_name}}::{{method.name}}({%- if method.parameters -%}{{pass_params(method.parameters)}}, {% endif -%}callback) " + "was never run."); + } +#endif // If the Callback was dropped then deleting the responder will close // the pipe so the calling application knows to stop waiting for a reply. delete responder_; - DCHECK(!callback_was_dropped) << "The callback passed to " - "{{class_name}}::{{method.name}}({%- if method.parameters -%}{{pass_params(method.parameters)}}, {% endif -%}callback) " - "was never run."; } {{class_name}}_{{method.name}}_ProxyToResponder( -- cgit v1.1