diff options
author | sky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-07-24 14:57:28 +0000 |
---|---|---|
committer | sky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-07-24 14:57:28 +0000 |
commit | c902c5745c3a69afd98cc856b79aea7251de19ea (patch) | |
tree | 344e074720c35286ced2a439eee27bb1cd61678d | |
parent | df1d26053df86738d3c4d54901b6eb28901a2e5e (diff) | |
download | chromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.zip chromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.tar.gz chromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.tar.bz2 |
Makes HandleWatcher block until no longer waiting on pipe
BUG=394886
TEST=covered as best as can from tests
R=darin@chromium.org, jam@chromium.org
Review URL: https://codereview.chromium.org/409943003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@285266 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/threading/thread_restrictions.h | 6 | ||||
-rw-r--r-- | mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java | 56 | ||||
-rw-r--r-- | mojo/common/handle_watcher.cc | 36 | ||||
-rw-r--r-- | mojo/common/handle_watcher.h | 5 | ||||
-rw-r--r-- | mojo/common/handle_watcher_unittest.cc | 100 | ||||
-rw-r--r-- | mojo/common/message_pump_mojo.cc | 11 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/lib/connector.cc | 18 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/lib/connector.h | 3 |
8 files changed, 159 insertions, 76 deletions
diff --git a/base/threading/thread_restrictions.h b/base/threading/thread_restrictions.h index 6e543a1..2288049 100644 --- a/base/threading/thread_restrictions.h +++ b/base/threading/thread_restrictions.h @@ -61,6 +61,11 @@ class InFlightIO; namespace media { class AudioOutputController; } +namespace mojo { +namespace common { +class WatcherThreadManager; +} +} namespace net { class FileStreamPosix; class FileStreamWin; @@ -186,6 +191,7 @@ class BASE_EXPORT ThreadRestrictions { friend class ::ScopedAllowWaitForLegacyWebViewApi; friend class ::TestingAutomationProvider; friend class cc::CompletionEvent; + friend class mojo::common::WatcherThreadManager; friend class remoting::AutoThread; friend class MessagePumpDefault; friend class SequencedWorkerPool; diff --git a/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java b/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java index abe7fa3..97bab76 100644 --- a/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java +++ b/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java @@ -648,8 +648,8 @@ public class CoreImplTest extends MojoTestCase { assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); assertEquals(null, asyncWaiterResult.getException()); - core.getDefaultAsyncWaiter().asyncWait(handles.first, Core.HandleSignals.READABLE, - Core.DEADLINE_INFINITE, asyncWaiterResult); + Cancellable cancellable = core.getDefaultAsyncWaiter().asyncWait(handles.first, + Core.HandleSignals.READABLE, Core.DEADLINE_INFINITE, asyncWaiterResult); assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); assertEquals(null, asyncWaiterResult.getException()); @@ -657,7 +657,7 @@ public class CoreImplTest extends MojoTestCase { assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); assertEquals(null, asyncWaiterResult.getException()); - handles.first.close(); + cancellable.cancel(); nativeRunLoop(RUN_LOOP_TIMEOUT_MS); // TODO(qsr) Re-enable when MojoWaitMany handles it correctly. // assertNull(asyncWaiterResult.getException()); @@ -668,56 +668,6 @@ public class CoreImplTest extends MojoTestCase { * Testing core {@link AsyncWaiter} implementation. */ @SmallTest - public void testAsyncWaiterWaitingOnInvalidHandle() { - Core core = CoreImpl.getInstance(); - - // Closing the peer handle. - Pair<MessagePipeHandle, MessagePipeHandle> handles = core.createMessagePipe(null); - addHandlePairToClose(handles); - - final AsyncWaiterResult asyncWaiterResult = new AsyncWaiterResult(); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - assertEquals(null, asyncWaiterResult.getException()); - - handles.first.close(); - core.getDefaultAsyncWaiter().asyncWait(handles.first, Core.HandleSignals.READABLE, - Core.DEADLINE_INFINITE, asyncWaiterResult); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - assertEquals(null, asyncWaiterResult.getException()); - - nativeRunLoop(RUN_LOOP_TIMEOUT_MS); - assertNotNull(asyncWaiterResult.getException()); - assertEquals(MojoResult.INVALID_ARGUMENT, - asyncWaiterResult.getException().getMojoResult()); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - } - - /** - * Testing core {@link AsyncWaiter} implementation. - */ - @SmallTest - public void testAsyncWaiterWaitingOnDefaultInvalidHandle() { - Core core = CoreImpl.getInstance(); - - final AsyncWaiterResult asyncWaiterResult = new AsyncWaiterResult(); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - assertEquals(null, asyncWaiterResult.getException()); - - core.getDefaultAsyncWaiter().asyncWait(InvalidHandle.INSTANCE, Core.HandleSignals.READABLE, - Core.DEADLINE_INFINITE, asyncWaiterResult); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - assertEquals(null, asyncWaiterResult.getException()); - - nativeRunLoop(RUN_LOOP_TIMEOUT_MS); - assertNotNull(asyncWaiterResult.getException()); - assertEquals(MojoResult.INVALID_ARGUMENT, asyncWaiterResult.getException().getMojoResult()); - assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult()); - } - - /** - * Testing core {@link AsyncWaiter} implementation. - */ - @SmallTest public void testAsyncWaiterWaitingWithTimeout() { Core core = CoreImpl.getInstance(); diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc index 84e8b64..0c7cdef 100644 --- a/mojo/common/handle_watcher.cc +++ b/mojo/common/handle_watcher.cc @@ -14,7 +14,9 @@ #include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop_proxy.h" #include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" #include "base/threading/thread.h" +#include "base/threading/thread_restrictions.h" #include "base/time/time.h" #include "mojo/common/message_pump_mojo.h" #include "mojo/common/message_pump_mojo_handler.h" @@ -68,7 +70,10 @@ class WatcherBackend : public MessagePumpMojoHandler { virtual ~WatcherBackend(); void StartWatching(const WatchData& data); - void StopWatching(WatcherID watcher_id); + + // Cancels a previously schedule request to start a watch. When done signals + // |event|. + void StopWatching(WatcherID watcher_id, base::WaitableEvent* event); private: typedef std::map<Handle, WatchData> HandleToWatchDataMap; @@ -107,15 +112,16 @@ void WatcherBackend::StartWatching(const WatchData& data) { data.deadline); } -void WatcherBackend::StopWatching(WatcherID watcher_id) { +void WatcherBackend::StopWatching(WatcherID watcher_id, + base::WaitableEvent* event) { // Because of the thread hop it is entirely possible to get here and not // have a valid handle registered for |watcher_id|. Handle handle; - if (!GetMojoHandleByWatcherID(watcher_id, &handle)) - return; - - handle_to_data_.erase(handle); - message_pump_mojo->RemoveHandler(handle); + if (GetMojoHandleByWatcherID(watcher_id, &handle)) { + handle_to_data_.erase(handle); + message_pump_mojo->RemoveHandler(handle); + } + event->Signal(); } void WatcherBackend::RemoveAndNotify(const Handle& handle, @@ -153,6 +159,8 @@ void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { // WatcherThreadManager manages the background thread that listens for handles // to be ready. All requests are handled by WatcherBackend. +} // namespace + class WatcherThreadManager { public: ~WatcherThreadManager(); @@ -208,7 +216,7 @@ WatcherID WatcherThreadManager::StartWatching( data.message_loop = base::MessageLoopProxy::current(); DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), data.message_loop.get()); - // We outlive |thread_|, so it's safe to use Unretained() here. + // We own |thread_|, so it's safe to use Unretained() here. thread_.message_loop()->PostTask( FROM_HERE, base::Bind(&WatcherBackend::StartWatching, @@ -218,12 +226,18 @@ WatcherID WatcherThreadManager::StartWatching( } void WatcherThreadManager::StopWatching(WatcherID watcher_id) { - // We outlive |thread_|, so it's safe to use Unretained() here. + base::ThreadRestrictions::ScopedAllowWait allow_wait; + base::WaitableEvent event(true, false); + // We own |thread_|, so it's safe to use Unretained() here. thread_.message_loop()->PostTask( FROM_HERE, base::Bind(&WatcherBackend::StopWatching, base::Unretained(&backend_), - watcher_id)); + watcher_id, + &event)); + + // We need to block until the handle is actually removed. + event.Wait(); } WatcherThreadManager::WatcherThreadManager() @@ -233,8 +247,6 @@ WatcherThreadManager::WatcherThreadManager() thread_.StartWithOptions(thread_options); } -} // namespace - // HandleWatcher::State -------------------------------------------------------- // Represents the state of the HandleWatcher. Owns the user's callback and diff --git a/mojo/common/handle_watcher.h b/mojo/common/handle_watcher.h index 9fac3f5..f40ae88 100644 --- a/mojo/common/handle_watcher.h +++ b/mojo/common/handle_watcher.h @@ -27,6 +27,8 @@ class HandleWatcherTest; class MOJO_COMMON_EXPORT HandleWatcher { public: HandleWatcher(); + + // The destructor implicitly stops listening. See Stop() for details. ~HandleWatcher(); // Starts listening for |handle|. This implicitly invokes Stop(). In other @@ -41,7 +43,8 @@ class MOJO_COMMON_EXPORT HandleWatcher { MojoDeadline deadline, const base::Callback<void(MojoResult)>& callback); - // Stops listening. Does nothing if not in the process of listening. + // Stops listening. Does nothing if not in the process of listening. Blocks + // until no longer listening on the handle. void Stop(); private: diff --git a/mojo/common/handle_watcher_unittest.cc b/mojo/common/handle_watcher_unittest.cc index 02cc610..b734e99 100644 --- a/mojo/common/handle_watcher_unittest.cc +++ b/mojo/common/handle_watcher_unittest.cc @@ -9,8 +9,10 @@ #include "base/at_exit.h" #include "base/auto_reset.h" #include "base/bind.h" +#include "base/memory/scoped_vector.h" #include "base/run_loop.h" #include "base/test/simple_test_tick_clock.h" +#include "base/threading/thread.h" #include "mojo/common/time_helper.h" #include "mojo/public/cpp/system/core.h" #include "mojo/public/cpp/test_support/test_utils.h" @@ -336,6 +338,104 @@ TEST(HandleWatcherCleanEnvironmentTest, AbortedOnMessageLoopDestruction) { EXPECT_EQ(MOJO_RESULT_ABORTED, result); } +void NeverReached(MojoResult result) { + FAIL() << "Callback should never be invoked " << result; +} + +// Called on the main thread when a thread is done. Decrements |active_count| +// and if |active_count| is zero quits |run_loop|. +void StressThreadDone(base::RunLoop* run_loop, int* active_count) { + (*active_count)--; + EXPECT_GE(*active_count, 0); + if (*active_count == 0) + run_loop->Quit(); +} + +// See description of StressTest. This is called on the background thread. +// |count| is the number of HandleWatchers to create. |active_count| is the +// number of outstanding threads, |task_runner| the task runner for the main +// thread and |run_loop| the run loop that should be quit when there are no more +// threads running. When done StressThreadDone() is invoked on the main thread. +// |active_count| and |run_loop| should only be used on the main thread. +void RunStressTest(int count, + scoped_refptr<base::TaskRunner> task_runner, + base::RunLoop* run_loop, + int* active_count) { + struct TestData { + MessagePipe pipe; + HandleWatcher watcher; + }; + ScopedVector<TestData> data_vector; + for (int i = 0; i < count; ++i) { + if (i % 20 == 0) { + // Every so often we wait. This results in some level of thread balancing + // as well as making sure HandleWatcher has time to actually start some + // watches. + MessagePipe test_pipe; + ASSERT_TRUE(test_pipe.handle0.is_valid()); + CallbackHelper callback_helper; + HandleWatcher watcher; + callback_helper.Start(&watcher, test_pipe.handle0.get()); + RunUntilIdle(); + EXPECT_FALSE(callback_helper.got_callback()); + EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(), + std::string())); + base::MessageLoop::ScopedNestableTaskAllower scoper( + base::MessageLoop::current()); + callback_helper.RunUntilGotCallback(); + EXPECT_TRUE(callback_helper.got_callback()); + } else { + scoped_ptr<TestData> test_data(new TestData); + ASSERT_TRUE(test_data->pipe.handle0.is_valid()); + test_data->watcher.Start(test_data->pipe.handle0.get(), + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + base::Bind(&NeverReached)); + data_vector.push_back(test_data.release()); + } + if (i % 15 == 0) + data_vector.clear(); + } + task_runner->PostTask(FROM_HERE, + base::Bind(&StressThreadDone, run_loop, + active_count)); +} + +// This test is meant to stress HandleWatcher. It uses from various threads +// repeatedly starting and stopping watches. It spins up kThreadCount +// threads. Each thread creates kWatchCount watches. Every so often each thread +// writes to a pipe and waits for the response. +TEST(HandleWatcherCleanEnvironmentTest, StressTest) { +#if defined(NDEBUG) + const int kThreadCount = 15; + const int kWatchCount = 400; +#else + const int kThreadCount = 10; + const int kWatchCount = 250; +#endif + + base::ShadowingAtExitManager at_exit; + base::MessageLoop message_loop; + base::RunLoop run_loop; + ScopedVector<base::Thread> threads; + int threads_active_counter = kThreadCount; + // Starts the threads first and then post the task in hopes of having more + // threads running at once. + for (int i = 0; i < kThreadCount; ++i) { + scoped_ptr<base::Thread> thread(new base::Thread("test thread")); + thread->Start(); + threads.push_back(thread.release()); + } + for (int i = 0; i < kThreadCount; ++i) { + threads[i]->task_runner()->PostTask( + FROM_HERE, base::Bind(&RunStressTest, kWatchCount, + message_loop.task_runner(), + &run_loop, &threads_active_counter)); + } + run_loop.Run(); + ASSERT_EQ(0, threads_active_counter); +} + } // namespace test } // namespace common } // namespace mojo diff --git a/mojo/common/message_pump_mojo.cc b/mojo/common/message_pump_mojo.cc index 37d636b..c7903f2 100644 --- a/mojo/common/message_pump_mojo.cc +++ b/mojo/common/message_pump_mojo.cc @@ -157,7 +157,6 @@ void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { switch (result) { case MOJO_RESULT_CANCELLED: case MOJO_RESULT_FAILED_PRECONDITION: - case MOJO_RESULT_INVALID_ARGUMENT: RemoveFirstInvalidHandle(wait_state); break; case MOJO_RESULT_DEADLINE_EXCEEDED: @@ -190,9 +189,13 @@ void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) { for (size_t i = 1; i < wait_state.handles.size(); ++i) { const MojoResult result = Wait(wait_state.handles[i], wait_state.wait_signals[i], 0); - if (result == MOJO_RESULT_INVALID_ARGUMENT || - result == MOJO_RESULT_FAILED_PRECONDITION || - result == MOJO_RESULT_CANCELLED) { + if (result == MOJO_RESULT_INVALID_ARGUMENT) { + // We should never have an invalid argument. If we do it indicates + // RemoveHandler() was not invoked and is likely to cause problems else + // where in the stack if we ignore it. + CHECK(false); + } else if (result == MOJO_RESULT_FAILED_PRECONDITION || + result == MOJO_RESULT_CANCELLED) { // Remove the handle first, this way if OnHandleError() tries to remove // the handle our iterator isn't invalidated. DCHECK(handlers_.find(wait_state.handles[i]) != handlers_.end()); diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc index 170c4d7..6665ff9 100644 --- a/mojo/public/cpp/bindings/lib/connector.cc +++ b/mojo/public/cpp/bindings/lib/connector.cc @@ -34,19 +34,16 @@ Connector::~Connector() { if (destroyed_flag_) *destroyed_flag_ = true; - if (async_wait_id_) - waiter_->CancelWait(async_wait_id_); + CancelWait(); } void Connector::CloseMessagePipe() { + CancelWait(); Close(message_pipe_.Pass()); } ScopedMessagePipeHandle Connector::PassMessagePipe() { - if (async_wait_id_) { - waiter_->CancelWait(async_wait_id_); - async_wait_id_ = 0; - } + CancelWait(); return message_pipe_.Pass(); } @@ -136,6 +133,7 @@ void Connector::OnHandleReady(MojoResult result) { } void Connector::WaitToReadMore() { + MOJO_DCHECK(!async_wait_id_); async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, @@ -190,6 +188,14 @@ void Connector::ReadAllAvailableMessages() { } } +void Connector::CancelWait() { + if (!async_wait_id_) + return; + + waiter_->CancelWait(async_wait_id_); + async_wait_id_ = 0; +} + void Connector::NotifyError() { error_ = true; // The error handler might destroyed |this|. Also, after an error, all method diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h index dc0a646..e92a4e7 100644 --- a/mojo/public/cpp/bindings/lib/connector.h +++ b/mojo/public/cpp/bindings/lib/connector.h @@ -85,6 +85,9 @@ class Connector : public MessageReceiver { void NotifyError(); + // Cancels any calls made to |waiter_|. + void CancelWait(); + ErrorHandler* error_handler_; const MojoAsyncWaiter* waiter_; |