From d06373e7cd8b4ad725ed5c64c958f2de13585add Mon Sep 17 00:00:00 2001 From: rockot Date: Fri, 4 Mar 2016 15:59:50 -0800 Subject: [mojo-bindings] Use Watch API instead of MessagePumpMojo This removes the C++ bindings dependency on MessagePumpMojo, consuming the new Watch API instead. For convenience a new mojo::Watcher is added to the public Mojo C++ API library, and this is used by Connector. BUG=590495 R=yzshen@chromium.org TBR=blundell@chromium.org for rename affecting components/message_port.gypi TBR=jam@chromium.org - added a missing header to new url tests Review URL: https://codereview.chromium.org/1759783003 Cr-Commit-Position: refs/heads/master@{#379402} --- mojo/public/cpp/bindings/BUILD.gn | 1 - mojo/public/cpp/bindings/lib/connector.cc | 43 +++--- mojo/public/cpp/bindings/lib/connector.h | 8 +- mojo/public/cpp/system/BUILD.gn | 2 + mojo/public/cpp/system/tests/BUILD.gn | 2 + mojo/public/cpp/system/tests/watcher_unittest.cc | 186 +++++++++++++++++++++++ mojo/public/cpp/system/watcher.cc | 134 ++++++++++++++++ mojo/public/cpp/system/watcher.h | 117 ++++++++++++++ 8 files changed, 463 insertions(+), 30 deletions(-) create mode 100644 mojo/public/cpp/system/tests/watcher_unittest.cc create mode 100644 mojo/public/cpp/system/watcher.cc create mode 100644 mojo/public/cpp/system/watcher.h (limited to 'mojo/public') diff --git a/mojo/public/cpp/bindings/BUILD.gn b/mojo/public/cpp/bindings/BUILD.gn index b172621..137d8d3 100644 --- a/mojo/public/cpp/bindings/BUILD.gn +++ b/mojo/public/cpp/bindings/BUILD.gn @@ -101,7 +101,6 @@ source_set("bindings") { deps = [ "//base", - "//mojo/message_pump", "//mojo/public/interfaces/bindings:bindings_cpp_sources", ] } diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc index 7f9a9c3..812b4c11 100644 --- a/mojo/public/cpp/bindings/lib/connector.cc +++ b/mojo/public/cpp/bindings/lib/connector.cc @@ -11,6 +11,7 @@ #include "base/logging.h" #include "base/macros.h" #include "base/synchronization/lock.h" +#include "base/thread_task_runner_handle.h" #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" namespace mojo { @@ -247,7 +248,7 @@ bool Connector::RunSyncHandleWatch(const bool* should_stop) { return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); } -void Connector::OnHandleWatcherHandleReady(MojoResult result) { +void Connector::OnWatcherHandleReady(MojoResult result) { OnHandleReadyInternal(result); } @@ -273,12 +274,21 @@ void Connector::OnHandleReadyInternal(MojoResult result) { } void Connector::WaitToReadMore() { - CHECK(!handle_watcher_.is_watching()); CHECK(!paused_); - handle_watcher_.Start(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, - MOJO_DEADLINE_INDEFINITE, - base::Bind(&Connector::OnHandleWatcherHandleReady, - base::Unretained(this))); + DCHECK(!handle_watcher_.IsWatching()); + + MojoResult rv = handle_watcher_.Start( + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&Connector::OnWatcherHandleReady, + base::Unretained(this))); + + if (rv != MOJO_RESULT_OK) { + // If the watch failed because the handle is invalid or its conditions can + // no longer be met, we signal the error asynchronously to avoid reentry. + base::ThreadTaskRunnerHandle::Get()->PostTask( + FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady, + weak_factory_.GetWeakPtr(), rv)); + } if (register_sync_handle_watch_count_ > 0 && !registered_with_sync_handle_watcher_) { @@ -304,13 +314,6 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { *read_result = rv; if (rv == MOJO_RESULT_OK) { - // Dispatching the message may spin in a nested message loop. To ensure we - // continue dispatching messages when this happens start listening for - // messagse now. - if (!handle_watcher_.is_watching()) { - // TODO: Need to evaluate the perf impact of this. - WaitToReadMore(); - } receiver_result = incoming_receiver_ && incoming_receiver_->Accept(&message); } @@ -344,21 +347,13 @@ void Connector::ReadAllAvailableMessages() { if (paused_) return; - if (rv == MOJO_RESULT_SHOULD_WAIT) { - // ReadSingleMessage could end up calling HandleError which resets - // message_pipe_ to a dummy one that is closed. The old EDK will see the - // that the peer is closed immediately, while the new one is asynchronous - // because of thread hops. In that case, there'll still be an async - // waiter. - if (!handle_watcher_.is_watching()) - WaitToReadMore(); + if (rv == MOJO_RESULT_SHOULD_WAIT) break; - } } } void Connector::CancelWait() { - handle_watcher_.Stop(); + handle_watcher_.Cancel(); if (registered_with_sync_handle_watcher_) { SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); @@ -394,8 +389,6 @@ void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { } if (force_async_handler) { - // |dummy_pipe.handle1| has been destructed. Reading the pipe will - // eventually cause a read error on |message_pipe_| and set error state. if (!paused_) WaitToReadMore(); } else { diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h index 2d2a5cc..2989d70 100644 --- a/mojo/public/cpp/bindings/lib/connector.h +++ b/mojo/public/cpp/bindings/lib/connector.h @@ -9,10 +9,10 @@ #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/threading/thread_checker.h" -#include "mojo/message_pump/handle_watcher.h" #include "mojo/public/cpp/bindings/callback.h" #include "mojo/public/cpp/bindings/message.h" #include "mojo/public/cpp/system/core.h" +#include "mojo/public/cpp/system/watcher.h" namespace base { class Lock; @@ -141,8 +141,8 @@ class Connector : public MessageReceiver { } private: - // Callback of mojo::common::HandleWatcher. - void OnHandleWatcherHandleReady(MojoResult result); + // Callback of mojo::Watcher. + void OnWatcherHandleReady(MojoResult result); // Callback of SyncHandleWatcher. void OnSyncHandleWatcherHandleReady(MojoResult result); void OnHandleReadyInternal(MojoResult result); @@ -169,7 +169,7 @@ class Connector : public MessageReceiver { ScopedMessagePipeHandle message_pipe_; MessageReceiver* incoming_receiver_; - common::HandleWatcher handle_watcher_; + Watcher handle_watcher_; bool error_; bool drop_writes_; diff --git a/mojo/public/cpp/system/BUILD.gn b/mojo/public/cpp/system/BUILD.gn index d5298f2..dc2d28c 100644 --- a/mojo/public/cpp/system/BUILD.gn +++ b/mojo/public/cpp/system/BUILD.gn @@ -11,6 +11,8 @@ source_set("system") { "handle.h", "macros.h", "message_pipe.h", + "watcher.cc", + "watcher.h", ] public_deps = [ diff --git a/mojo/public/cpp/system/tests/BUILD.gn b/mojo/public/cpp/system/tests/BUILD.gn index 068f009..e7c8f29 100644 --- a/mojo/public/cpp/system/tests/BUILD.gn +++ b/mojo/public/cpp/system/tests/BUILD.gn @@ -8,9 +8,11 @@ source_set("tests") { sources = [ "core_unittest.cc", "macros_unittest.cc", + "watcher_unittest.cc", ] deps = [ + "//base", "//mojo/public/c/system/tests", "//mojo/public/cpp/environment:standalone", "//mojo/public/cpp/system", diff --git a/mojo/public/cpp/system/tests/watcher_unittest.cc b/mojo/public/cpp/system/tests/watcher_unittest.cc new file mode 100644 index 0000000..ea78a56 --- /dev/null +++ b/mojo/public/cpp/system/tests/watcher_unittest.cc @@ -0,0 +1,186 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/bind.h" +#include "base/callback.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/thread_task_runner_handle.h" +#include "mojo/public/c/system/types.h" +#include "mojo/public/cpp/system/message_pipe.h" +#include "mojo/public/cpp/system/watcher.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace { + +template +void RunResultHandler(Handler f, MojoResult result) { f(result); } + +template +Watcher::ReadyCallback OnReady(Handler f) { + return base::Bind(&RunResultHandler, f); +} + +Watcher::ReadyCallback NotReached() { + return OnReady([] (MojoResult) { NOTREACHED(); }); +} + +class WatcherTest : public testing::Test { + public: + WatcherTest() {} + ~WatcherTest() override {} + + void SetUp() override { + message_loop_.reset(new base::MessageLoop); + } + + void TearDown() override { + message_loop_.reset(); + } + + protected: + scoped_ptr message_loop_; +}; + +TEST_F(WatcherTest, WatchBasic) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + + bool notified = false; + base::RunLoop run_loop; + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_OK, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + OnReady([&] (MojoResult result) { + EXPECT_EQ(MOJO_RESULT_OK, result); + notified = true; + run_loop.Quit(); + }))); + EXPECT_TRUE(b_watcher.IsWatching()); + + EXPECT_EQ(MOJO_RESULT_OK, WriteMessageRaw(a.get(), "hello", 5, nullptr, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + run_loop.Run(); + EXPECT_TRUE(notified); + + b_watcher.Cancel(); +} + +TEST_F(WatcherTest, WatchUnsatisfiable) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + a.reset(); + + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + NotReached())); + EXPECT_FALSE(b_watcher.IsWatching()); +} + +TEST_F(WatcherTest, WatchInvalidHandle) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + a.reset(); + b.reset(); + + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + NotReached())); + EXPECT_FALSE(b_watcher.IsWatching()); +} + +TEST_F(WatcherTest, Cancel) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + + base::RunLoop run_loop; + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_OK, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + NotReached())); + EXPECT_TRUE(b_watcher.IsWatching()); + b_watcher.Cancel(); + EXPECT_FALSE(b_watcher.IsWatching()); + + // This should never trigger the watcher. + EXPECT_EQ(MOJO_RESULT_OK, WriteMessageRaw(a.get(), "hello", 5, nullptr, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + base::ThreadTaskRunnerHandle::Get()->PostTask( + FROM_HERE, run_loop.QuitClosure()); + run_loop.Run(); +} + +TEST_F(WatcherTest, CancelOnClose) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + + base::RunLoop run_loop; + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_OK, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + OnReady([&] (MojoResult result) { + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + run_loop.Quit(); + }))); + EXPECT_TRUE(b_watcher.IsWatching()); + + // This should trigger the watcher above. + b.reset(); + + run_loop.Run(); + + EXPECT_FALSE(b_watcher.IsWatching()); +} + +TEST_F(WatcherTest, CancelOnDestruction) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + base::RunLoop run_loop; + { + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_OK, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + NotReached())); + EXPECT_TRUE(b_watcher.IsWatching()); + + // |b_watcher| should be cancelled when it goes out of scope. + } + + // This should never trigger the watcher above. + EXPECT_EQ(MOJO_RESULT_OK, WriteMessageRaw(a.get(), "hello", 5, nullptr, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + base::ThreadTaskRunnerHandle::Get()->PostTask( + FROM_HERE, run_loop.QuitClosure()); + run_loop.Run(); +} + +TEST_F(WatcherTest, NotifyOnMessageLoopDestruction) { + ScopedMessagePipeHandle a, b; + CreateMessagePipe(nullptr, &a, &b); + + bool notified = false; + Watcher b_watcher; + EXPECT_EQ(MOJO_RESULT_OK, + b_watcher.Start(b.get(), MOJO_HANDLE_SIGNAL_READABLE, + OnReady([&] (MojoResult result) { + EXPECT_EQ(MOJO_RESULT_ABORTED, result); + notified = true; + }))); + EXPECT_TRUE(b_watcher.IsWatching()); + + message_loop_.reset(); + + EXPECT_TRUE(notified); + + EXPECT_TRUE(b_watcher.IsWatching()); + b_watcher.Cancel(); +} + +} // namespace +} // namespace mojo diff --git a/mojo/public/cpp/system/watcher.cc b/mojo/public/cpp/system/watcher.cc new file mode 100644 index 0000000..ad965ff --- /dev/null +++ b/mojo/public/cpp/system/watcher.cc @@ -0,0 +1,134 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/cpp/system/watcher.h" + +#include "base/bind.h" +#include "base/location.h" +#include "base/macros.h" +#include "base/message_loop/message_loop.h" +#include "base/thread_task_runner_handle.h" +#include "mojo/public/c/system/functions.h" + +namespace mojo { + +class Watcher::MessageLoopObserver + : public base::MessageLoop::DestructionObserver { + public: + explicit MessageLoopObserver(Watcher* watcher) : watcher_(watcher) { + base::MessageLoop::current()->AddDestructionObserver(this); + } + + ~MessageLoopObserver() override { + StopObservingIfNecessary(); + } + + private: + // base::MessageLoop::DestructionObserver: + void WillDestroyCurrentMessageLoop() override { + StopObservingIfNecessary(); + if (watcher_->IsWatching()) + watcher_->OnHandleReady(MOJO_RESULT_ABORTED); + } + + void StopObservingIfNecessary() { + if (is_observing_) { + is_observing_ = false; + base::MessageLoop::current()->RemoveDestructionObserver(this); + } + } + + bool is_observing_ = true; + Watcher* watcher_; + + DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver); +}; + +Watcher::Watcher() + : task_runner_(base::ThreadTaskRunnerHandle::Get()), + weak_factory_(this) { + weak_self_ = weak_factory_.GetWeakPtr(); +} + +Watcher::~Watcher() { + if(IsWatching()) + Cancel(); +} + +bool Watcher::IsWatching() const { + DCHECK(thread_checker_.CalledOnValidThread()); + return handle_.is_valid(); +} + +MojoResult Watcher::Start(Handle handle, + MojoHandleSignals signals, + const ReadyCallback& callback) { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(!IsWatching()); + DCHECK(!callback.is_null()); + + message_loop_observer_.reset(new MessageLoopObserver(this)); + callback_ = callback; + handle_ = handle; + MojoResult result = MojoWatch(handle_.value(), signals, + &Watcher::CallOnHandleReady, + reinterpret_cast(this)); + if (result != MOJO_RESULT_OK) { + handle_.set_value(kInvalidHandleValue); + callback_.Reset(); + message_loop_observer_.reset(); + DCHECK(result == MOJO_RESULT_FAILED_PRECONDITION || + result == MOJO_RESULT_INVALID_ARGUMENT); + return result; + } + + return MOJO_RESULT_OK; +} + +void Watcher::Cancel() { + DCHECK(thread_checker_.CalledOnValidThread()); + + // The watch may have already been cancelled if the handle was closed. + if (!handle_.is_valid()) + return; + + MojoResult result = + MojoCancelWatch(handle_.value(), reinterpret_cast(this)); + message_loop_observer_.reset(); + DCHECK_EQ(result, MOJO_RESULT_OK); + handle_.set_value(kInvalidHandleValue); + callback_.Reset(); +} + +void Watcher::OnHandleReady(MojoResult result) { + DCHECK(thread_checker_.CalledOnValidThread()); + + ReadyCallback callback = callback_; + if (result == MOJO_RESULT_CANCELLED) { + message_loop_observer_.reset(); + handle_.set_value(kInvalidHandleValue); + callback_.Reset(); + } + + // NOTE: It's legal for |callback| to delete |this|. + if (!callback.is_null()) + callback.Run(result); +} + +// static +void Watcher::CallOnHandleReady(uintptr_t context, + MojoResult result, + MojoHandleSignalsState signals_state) { + // NOTE: It is safe to assume the Watcher still exists because this callback + // will never be run after the Watcher's destructor. + // + // TODO: Maybe we should also expose |signals_state| throught he Watcher API. + // Current HandleWatcher users have no need for it, so it's omitted here. + Watcher* watcher = reinterpret_cast(context); + watcher->task_runner_->PostTask( + FROM_HERE, + base::Bind(&Watcher::OnHandleReady, watcher->weak_self_, result)); +} + +} // namespace mojo diff --git a/mojo/public/cpp/system/watcher.h b/mojo/public/cpp/system/watcher.h new file mode 100644 index 0000000..cd068aa --- /dev/null +++ b/mojo/public/cpp/system/watcher.h @@ -0,0 +1,117 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_PUBLIC_CPP_SYSTEM_WATCHER_H_ +#define MOJO_PUBLIC_CPP_SYSTEM_WATCHER_H_ + +#include "base/callback.h" +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/single_thread_task_runner.h" +#include "base/threading/thread_checker.h" +#include "mojo/public/c/system/types.h" +#include "mojo/public/cpp/system/handle.h" + +namespace mojo { + +// A Watcher watches a single Mojo handle for signal state changes. +// +// NOTE: Watchers may only be used on threads which have a running MessageLoop. +class Watcher { + public: + // A callback to be called any time a watched handle changes state in some + // interesting way. The |result| argument indicates one of the following + // conditions depending on its value: + // + // |MOJO_RESULT_OK|: One or more of the signals being watched is satisfied. + // + // |MOJO_RESULT_FAILED_PRECONDITION|: None of the signals being watched can + // ever be satisfied again. + // + // |MOJO_RESULT_CANCELLED|: The handle has been closed and the watch has + // been cancelled implicitly. + // + // |MOJO_RESULT_ABORTED|: Notifications can no longer be delivered for this + // watcher for some unspecified reason, e.g., the watching thread may + // be shutting down soon. Note that it is still necessary to explicitly + // Cancel() the watch in this case. + using ReadyCallback = base::Callback; + + // TODO(rockot/yzshen): Support giving Watcher an explicit TaskRunner for + // more fine-grained control over dispatch behavior. + Watcher(); + + // NOTE: This destructor automatically calls |Cancel()| if the Watcher is + // still active. + ~Watcher(); + + // Indicates if the Watcher is currently watching a handle. + bool IsWatching() const; + + // Starts watching |handle|. A Watcher may only watch one handle at a time, + // but it is safe to call this more than once as long as the previous watch + // has been cancelled (i.e. |is_watching()| returns |false|.) + // + // If no signals in |signals| can ever be satisfied for |handle|, this returns + // |MOJO_RESULT_FAILED_PRECONDITION|. + // + // If |handle| is not a valid watchable (message or data pipe) handle, this + // returns |MOJO_RESULT_INVALID_ARGUMENT|. + // + // Otherwise |MOJO_RESULT_OK| is returned and the handle will be watched until + // closure or cancellation. + // + // Once the watch is started, |callback| may be called at any time on the + // current thread until |Cancel()| is called or the handle is closed. + // + // Destroying the Watcher implicitly calls |Cancel()|. + MojoResult Start(Handle handle, + MojoHandleSignals signals, + const ReadyCallback& callback); + + // Cancels the current watch. Once this returns, the callback previously + // passed to |Start()| will never be called again for this Watcher. + void Cancel(); + + private: + class MessageLoopObserver; + friend class MessageLoopObserver; + + void OnHandleReady(MojoResult result); + + static void CallOnHandleReady(uintptr_t context, + MojoResult result, + MojoHandleSignalsState signals_state); + + base::ThreadChecker thread_checker_; + + // The TaskRunner of this Watcher's owning thread. This field is safe to + // access from any thread. + const scoped_refptr task_runner_; + + scoped_ptr message_loop_observer_; + + // A persistent weak reference to this Watcher which can be passed to the + // Dispatcher any time this object should be signalled. Safe to access (but + // not to dereference!) from any thread. + base::WeakPtr weak_self_; + + // Fields below must only be accessed on the Watcher's owning thread. + + // The handle currently under watch. Not owned. + Handle handle_; + + // The callback to call when the handle is signaled. + ReadyCallback callback_; + + base::WeakPtrFactory weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(Watcher); +}; + +} // namespace mojo + +#endif // MOJO_PUBLIC_CPP_SYSTEM_WATCHER_H_ -- cgit v1.1