diff options
author | yzshen <yzshen@chromium.org> | 2014-08-29 15:51:42 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-08-29 23:04:16 +0000 |
commit | d928bdd26e4800d3ffc76a56877dda8dafae26b4 (patch) | |
tree | 89b96a0142f7029de92490d67d1ceddd89eaf6ea /mojo/common | |
parent | 5156fbffe13e2581984850b5654286bee306ac38 (diff) | |
download | chromium_src-d928bdd26e4800d3ffc76a56877dda8dafae26b4.zip chromium_src-d928bdd26e4800d3ffc76a56877dda8dafae26b4.tar.gz chromium_src-d928bdd26e4800d3ffc76a56877dda8dafae26b4.tar.bz2 |
Make HandleWatcher watch on the same thread if the thread is running a MessagePumpMojo.
This CL also makes ApplicationRunnerChromium use MessagePumpMojo by default
BUG=None
TEST=None
Review URL: https://codereview.chromium.org/506353002
Cr-Commit-Position: refs/heads/master@{#292702}
Diffstat (limited to 'mojo/common')
-rw-r--r-- | mojo/common/handle_watcher.cc | 159 | ||||
-rw-r--r-- | mojo/common/handle_watcher.h | 6 | ||||
-rw-r--r-- | mojo/common/handle_watcher_unittest.cc | 66 | ||||
-rw-r--r-- | mojo/common/message_pump_mojo.cc | 15 | ||||
-rw-r--r-- | mojo/common/message_pump_mojo.h | 5 |
5 files changed, 183 insertions, 68 deletions
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc index 33bb0bd..eeb3e043 100644 --- a/mojo/common/handle_watcher.cc +++ b/mojo/common/handle_watcher.cc @@ -9,6 +9,8 @@ #include "base/atomic_sequence_num.h" #include "base/bind.h" #include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/macros.h" #include "base/memory/singleton.h" #include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" @@ -31,14 +33,6 @@ namespace { const char kWatcherThreadName[] = "handle-watcher-thread"; -// TODO(sky): this should be unnecessary once MessageLoop has been refactored. -MessagePumpMojo* message_pump_mojo = NULL; - -scoped_ptr<base::MessagePump> CreateMessagePumpMojo() { - message_pump_mojo = new MessagePumpMojo; - return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass(); -} - base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); @@ -106,9 +100,9 @@ void WatcherBackend::StartWatching(const WatchData& data) { DCHECK_EQ(0u, handle_to_data_.count(data.handle)); handle_to_data_[data.handle] = data; - message_pump_mojo->AddHandler(this, data.handle, - data.handle_signals, - data.deadline); + MessagePumpMojo::current()->AddHandler(this, data.handle, + data.handle_signals, + data.deadline); } void WatcherBackend::StopWatching(WatcherID watcher_id) { @@ -117,7 +111,7 @@ void WatcherBackend::StopWatching(WatcherID watcher_id) { Handle handle; if (GetMojoHandleByWatcherID(watcher_id, &handle)) { handle_to_data_.erase(handle); - message_pump_mojo->RemoveHandler(handle); + MessagePumpMojo::current()->RemoveHandler(handle); } } @@ -128,7 +122,7 @@ void WatcherBackend::RemoveAndNotify(const Handle& handle, const WatchData data(handle_to_data_[handle]); handle_to_data_.erase(handle); - message_pump_mojo->RemoveHandler(handle); + MessagePumpMojo::current()->RemoveHandler(handle); data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result)); } @@ -312,58 +306,43 @@ void WatcherThreadManager::ProcessRequestsOnBackendThread() { WatcherThreadManager::WatcherThreadManager() : thread_(kWatcherThreadName) { base::Thread::Options thread_options; - thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo); + thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); thread_.StartWithOptions(thread_options); } -// HandleWatcher::State -------------------------------------------------------- +// HandleWatcher::StateBase and subclasses ------------------------------------- -// Represents the state of the HandleWatcher. Owns the user's callback and +// The base class of HandleWatcher's state. Owns the user's callback and // monitors the current thread's MessageLoop to know when to force the callback // to run (with an error) even though the pipe hasn't been signaled yet. -class HandleWatcher::State : public base::MessageLoop::DestructionObserver { +class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { public: - State(HandleWatcher* watcher, - const Handle& handle, - MojoHandleSignals handle_signals, - MojoDeadline deadline, - const base::Callback<void(MojoResult)>& callback) + StateBase(HandleWatcher* watcher, + const base::Callback<void(MojoResult)>& callback) : watcher_(watcher), callback_(callback), - got_ready_(false), - weak_factory_(this) { + got_ready_(false) { base::MessageLoop::current()->AddDestructionObserver(this); - - watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( - handle, - handle_signals, - MojoDeadlineToTimeTicks(deadline), - base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr())); } - virtual ~State() { + virtual ~StateBase() { base::MessageLoop::current()->RemoveDestructionObserver(this); + } - // If we've been notified the handle is ready (|got_ready_| is true) then - // the watch has been implicitly removed by - // WatcherThreadManager/MessagePumpMojo and we don't have to call - // StopWatching(). To do so would needlessly entail posting a task and - // blocking until the background thread services it. - if (!got_ready_) - WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); + protected: + void NotifyHandleReady(MojoResult result) { + got_ready_ = true; + NotifyAndDestroy(result); } + bool got_ready() const { return got_ready_; } + private: virtual void WillDestroyCurrentMessageLoop() OVERRIDE { // The current thread is exiting. Simulate a watch error. NotifyAndDestroy(MOJO_RESULT_ABORTED); } - void OnHandleReady(MojoResult result) { - got_ready_ = true; - NotifyAndDestroy(result); - } - void NotifyAndDestroy(MojoResult result) { base::Callback<void(MojoResult)> callback = callback_; watcher_->Stop(); // Destroys |this|. @@ -372,14 +351,96 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver { } HandleWatcher* watcher_; - WatcherID watcher_id_; base::Callback<void(MojoResult)> callback_; // Have we been notified that the handle is ready? bool got_ready_; + DISALLOW_COPY_AND_ASSIGN(StateBase); +}; + +// If the thread on which HandleWatcher is used runs MessagePumpMojo, +// SameThreadWatchingState is used to directly watch the handle on the same +// thread. +class HandleWatcher::SameThreadWatchingState : public StateBase, + public MessagePumpMojoHandler { + public: + SameThreadWatchingState(HandleWatcher* watcher, + const Handle& handle, + MojoHandleSignals handle_signals, + MojoDeadline deadline, + const base::Callback<void(MojoResult)>& callback) + : StateBase(watcher, callback), + handle_(handle) { + DCHECK(MessagePumpMojo::IsCurrent()); + + MessagePumpMojo::current()->AddHandler( + this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); + } + + virtual ~SameThreadWatchingState() { + if (!got_ready()) + MessagePumpMojo::current()->RemoveHandler(handle_); + } + + private: + // MessagePumpMojoHandler overrides: + virtual void OnHandleReady(const Handle& handle) OVERRIDE { + StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); + } + + virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE { + StopWatchingAndNotifyReady(handle, result); + } + + void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { + DCHECK_EQ(handle.value(), handle_.value()); + MessagePumpMojo::current()->RemoveHandler(handle_); + NotifyHandleReady(result); + } + + Handle handle_; + + DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); +}; + +// If the thread on which HandleWatcher is used runs a message pump different +// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the +// handle on the handle watcher thread. +class HandleWatcher::SecondaryThreadWatchingState : public StateBase { + public: + SecondaryThreadWatchingState(HandleWatcher* watcher, + const Handle& handle, + MojoHandleSignals handle_signals, + MojoDeadline deadline, + const base::Callback<void(MojoResult)>& callback) + : StateBase(watcher, callback), + weak_factory_(this) { + watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( + handle, + handle_signals, + MojoDeadlineToTimeTicks(deadline), + base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, + weak_factory_.GetWeakPtr())); + } + + virtual ~SecondaryThreadWatchingState() { + // If we've been notified the handle is ready (|got_ready()| is true) then + // the watch has been implicitly removed by + // WatcherThreadManager/MessagePumpMojo and we don't have to call + // StopWatching(). To do so would needlessly entail posting a task and + // blocking until the background thread services it. + if (!got_ready()) + WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); + } + + private: + WatcherID watcher_id_; + // Used to weakly bind |this| to the WatcherThreadManager. - base::WeakPtrFactory<State> weak_factory_; + base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); }; // HandleWatcher --------------------------------------------------------------- @@ -397,7 +458,13 @@ void HandleWatcher::Start(const Handle& handle, DCHECK(handle.is_valid()); DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); - state_.reset(new State(this, handle, handle_signals, deadline, callback)); + if (MessagePumpMojo::IsCurrent()) { + state_.reset(new SameThreadWatchingState( + this, handle, handle_signals, deadline, callback)); + } else { + state_.reset(new SecondaryThreadWatchingState( + this, handle, handle_signals, deadline, callback)); + } } void HandleWatcher::Stop() { diff --git a/mojo/common/handle_watcher.h b/mojo/common/handle_watcher.h index f40ae88..dfcb145 100644 --- a/mojo/common/handle_watcher.h +++ b/mojo/common/handle_watcher.h @@ -48,10 +48,12 @@ class MOJO_COMMON_EXPORT HandleWatcher { void Stop(); private: - class State; + class StateBase; + class SameThreadWatchingState; + class SecondaryThreadWatchingState; // If non-NULL Start() has been invoked. - scoped_ptr<State> state_; + scoped_ptr<StateBase> state_; DISALLOW_COPY_AND_ASSIGN(HandleWatcher); }; diff --git a/mojo/common/handle_watcher_unittest.cc b/mojo/common/handle_watcher_unittest.cc index b734e99..61913cd 100644 --- a/mojo/common/handle_watcher_unittest.cc +++ b/mojo/common/handle_watcher_unittest.cc @@ -13,6 +13,7 @@ #include "base/run_loop.h" #include "base/test/simple_test_tick_clock.h" #include "base/threading/thread.h" +#include "mojo/common/message_pump_mojo.h" #include "mojo/common/time_helper.h" #include "mojo/public/cpp/system/core.h" #include "mojo/public/cpp/test_support/test_utils.h" @@ -22,6 +23,11 @@ namespace mojo { namespace common { namespace test { +enum MessageLoopConfig { + MESSAGE_LOOP_CONFIG_DEFAULT = 0, + MESSAGE_LOOP_CONFIG_MOJO = 1 +}; + void ObserveCallback(bool* was_signaled, MojoResult* result_observed, MojoResult result) { @@ -42,6 +48,15 @@ void DeleteWatcherAndForwardResult( next_callback.Run(result); } +scoped_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) { + scoped_ptr<base::MessageLoop> loop; + if (config == MESSAGE_LOOP_CONFIG_DEFAULT) + loop.reset(new base::MessageLoop()); + else + loop.reset(new base::MessageLoop(MessagePumpMojo::Create())); + return loop.Pass(); +} + // Helper class to manage the callback and running the message loop waiting for // message to be received. Typical usage is something like: // Schedule callback returned from GetCallback(). @@ -103,14 +118,18 @@ class CallbackHelper { DISALLOW_COPY_AND_ASSIGN(CallbackHelper); }; -class HandleWatcherTest : public testing::Test { +class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> { public: - HandleWatcherTest() {} + HandleWatcherTest() : message_loop_(CreateMessageLoop(GetParam())) {} virtual ~HandleWatcherTest() { test::SetTickClockForTest(NULL); } protected: + void TearDownMessageLoop() { + message_loop_.reset(); + } + void InstallTickClock() { test::SetTickClockForTest(&tick_clock_); } @@ -119,13 +138,17 @@ class HandleWatcherTest : public testing::Test { private: base::ShadowingAtExitManager at_exit_; - base::MessageLoop message_loop_; + scoped_ptr<base::MessageLoop> message_loop_; DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest); }; +INSTANTIATE_TEST_CASE_P( + MultipleMessageLoopConfigs, HandleWatcherTest, + testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO)); + // Trivial test case with a single handle to watch. -TEST_F(HandleWatcherTest, SingleHandler) { +TEST_P(HandleWatcherTest, SingleHandler) { MessagePipe test_pipe; ASSERT_TRUE(test_pipe.handle0.is_valid()); CallbackHelper callback_helper; @@ -141,7 +164,7 @@ TEST_F(HandleWatcherTest, SingleHandler) { // Creates three handles and notfies them in reverse order ensuring each one is // notified appropriately. -TEST_F(HandleWatcherTest, ThreeHandles) { +TEST_P(HandleWatcherTest, ThreeHandles) { MessagePipe test_pipe1; MessagePipe test_pipe2; MessagePipe test_pipe3; @@ -206,7 +229,7 @@ TEST_F(HandleWatcherTest, ThreeHandles) { } // Verifies Start() invoked a second time works. -TEST_F(HandleWatcherTest, Restart) { +TEST_P(HandleWatcherTest, Restart) { MessagePipe test_pipe1; MessagePipe test_pipe2; CallbackHelper callback_helper1; @@ -258,7 +281,7 @@ TEST_F(HandleWatcherTest, Restart) { } // Verifies deadline is honored. -TEST_F(HandleWatcherTest, Deadline) { +TEST_P(HandleWatcherTest, Deadline) { InstallTickClock(); MessagePipe test_pipe1; @@ -301,7 +324,7 @@ TEST_F(HandleWatcherTest, Deadline) { EXPECT_FALSE(callback_helper3.got_callback()); } -TEST_F(HandleWatcherTest, DeleteInCallback) { +TEST_P(HandleWatcherTest, DeleteInCallback) { MessagePipe test_pipe; CallbackHelper callback_helper; @@ -316,23 +339,19 @@ TEST_F(HandleWatcherTest, DeleteInCallback) { EXPECT_TRUE(callback_helper.got_callback()); } -TEST(HandleWatcherCleanEnvironmentTest, AbortedOnMessageLoopDestruction) { +TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) { bool was_signaled = false; MojoResult result = MOJO_RESULT_OK; - base::ShadowingAtExitManager at_exit; MessagePipe pipe; HandleWatcher watcher; - { - base::MessageLoop loop; - - watcher.Start(pipe.handle0.get(), - MOJO_HANDLE_SIGNAL_READABLE, - MOJO_DEADLINE_INDEFINITE, - base::Bind(&ObserveCallback, &was_signaled, &result)); + watcher.Start(pipe.handle0.get(), + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + base::Bind(&ObserveCallback, &was_signaled, &result)); - // Now, let the MessageLoop get torn down. We expect our callback to run. - } + // Now, let the MessageLoop get torn down. We expect our callback to run. + TearDownMessageLoop(); EXPECT_TRUE(was_signaled); EXPECT_EQ(MOJO_RESULT_ABORTED, result); @@ -423,7 +442,14 @@ TEST(HandleWatcherCleanEnvironmentTest, StressTest) { // threads running at once. for (int i = 0; i < kThreadCount; ++i) { scoped_ptr<base::Thread> thread(new base::Thread("test thread")); - thread->Start(); + if (i % 2) { + base::Thread::Options thread_options; + thread_options.message_pump_factory = + base::Bind(&MessagePumpMojo::Create); + thread->StartWithOptions(thread_options); + } else { + thread->Start(); + } threads.push_back(thread.release()); } for (int i = 0; i < kThreadCount; ++i) { diff --git a/mojo/common/message_pump_mojo.cc b/mojo/common/message_pump_mojo.cc index a227592..91b78d1 100644 --- a/mojo/common/message_pump_mojo.cc +++ b/mojo/common/message_pump_mojo.cc @@ -8,7 +8,9 @@ #include <vector> #include "base/debug/alias.h" +#include "base/lazy_instance.h" #include "base/logging.h" +#include "base/threading/thread_local.h" #include "base/time/time.h" #include "mojo/common/message_pump_mojo_handler.h" #include "mojo/common/time_helper.h" @@ -17,6 +19,9 @@ namespace mojo { namespace common { namespace { +base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky + g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; + MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, base::TimeTicks now) { // The is_null() check matches that of HandleWatcher as well as how @@ -52,9 +57,14 @@ struct MessagePumpMojo::RunState { }; MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { + DCHECK(!current()) + << "There is already a MessagePumpMojo instance on this thread."; + g_tls_current_pump.Pointer()->Set(this); } MessagePumpMojo::~MessagePumpMojo() { + DCHECK_EQ(this, current()); + g_tls_current_pump.Pointer()->Set(NULL); } // static @@ -62,6 +72,11 @@ scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { return scoped_ptr<MessagePump>(new MessagePumpMojo()); } +// static +MessagePumpMojo* MessagePumpMojo::current() { + return g_tls_current_pump.Pointer()->Get(); +} + void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, const Handle& handle, MojoHandleSignals wait_signals, diff --git a/mojo/common/message_pump_mojo.h b/mojo/common/message_pump_mojo.h index 7b2c170..8b71316 100644 --- a/mojo/common/message_pump_mojo.h +++ b/mojo/common/message_pump_mojo.h @@ -29,6 +29,11 @@ class MOJO_COMMON_EXPORT MessagePumpMojo : public base::MessagePump { // using |base::Bind()|). static scoped_ptr<base::MessagePump> Create(); + // Returns the MessagePumpMojo instance of the current thread, if it exists. + static MessagePumpMojo* current(); + + static bool IsCurrent() { return !!current(); } + // Registers a MessagePumpMojoHandler for the specified handle. Only one // handler can be registered for a specified handle. // NOTE: a value of 0 for |deadline| indicates an indefinite timeout. |