summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-07-24 14:57:28 +0000
committersky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-07-24 14:57:28 +0000
commitc902c5745c3a69afd98cc856b79aea7251de19ea (patch)
tree344e074720c35286ced2a439eee27bb1cd61678d
parentdf1d26053df86738d3c4d54901b6eb28901a2e5e (diff)
downloadchromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.zip
chromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.tar.gz
chromium_src-c902c5745c3a69afd98cc856b79aea7251de19ea.tar.bz2
Makes HandleWatcher block until no longer waiting on pipe
BUG=394886 TEST=covered as best as can from tests R=darin@chromium.org, jam@chromium.org Review URL: https://codereview.chromium.org/409943003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@285266 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/threading/thread_restrictions.h6
-rw-r--r--mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java56
-rw-r--r--mojo/common/handle_watcher.cc36
-rw-r--r--mojo/common/handle_watcher.h5
-rw-r--r--mojo/common/handle_watcher_unittest.cc100
-rw-r--r--mojo/common/message_pump_mojo.cc11
-rw-r--r--mojo/public/cpp/bindings/lib/connector.cc18
-rw-r--r--mojo/public/cpp/bindings/lib/connector.h3
8 files changed, 159 insertions, 76 deletions
diff --git a/base/threading/thread_restrictions.h b/base/threading/thread_restrictions.h
index 6e543a1..2288049 100644
--- a/base/threading/thread_restrictions.h
+++ b/base/threading/thread_restrictions.h
@@ -61,6 +61,11 @@ class InFlightIO;
namespace media {
class AudioOutputController;
}
+namespace mojo {
+namespace common {
+class WatcherThreadManager;
+}
+}
namespace net {
class FileStreamPosix;
class FileStreamWin;
@@ -186,6 +191,7 @@ class BASE_EXPORT ThreadRestrictions {
friend class ::ScopedAllowWaitForLegacyWebViewApi;
friend class ::TestingAutomationProvider;
friend class cc::CompletionEvent;
+ friend class mojo::common::WatcherThreadManager;
friend class remoting::AutoThread;
friend class MessagePumpDefault;
friend class SequencedWorkerPool;
diff --git a/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java b/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java
index abe7fa3..97bab76 100644
--- a/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java
+++ b/mojo/android/javatests/src/org/chromium/mojo/system/impl/CoreImplTest.java
@@ -648,8 +648,8 @@ public class CoreImplTest extends MojoTestCase {
assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
assertEquals(null, asyncWaiterResult.getException());
- core.getDefaultAsyncWaiter().asyncWait(handles.first, Core.HandleSignals.READABLE,
- Core.DEADLINE_INFINITE, asyncWaiterResult);
+ Cancellable cancellable = core.getDefaultAsyncWaiter().asyncWait(handles.first,
+ Core.HandleSignals.READABLE, Core.DEADLINE_INFINITE, asyncWaiterResult);
assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
assertEquals(null, asyncWaiterResult.getException());
@@ -657,7 +657,7 @@ public class CoreImplTest extends MojoTestCase {
assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
assertEquals(null, asyncWaiterResult.getException());
- handles.first.close();
+ cancellable.cancel();
nativeRunLoop(RUN_LOOP_TIMEOUT_MS);
// TODO(qsr) Re-enable when MojoWaitMany handles it correctly.
// assertNull(asyncWaiterResult.getException());
@@ -668,56 +668,6 @@ public class CoreImplTest extends MojoTestCase {
* Testing core {@link AsyncWaiter} implementation.
*/
@SmallTest
- public void testAsyncWaiterWaitingOnInvalidHandle() {
- Core core = CoreImpl.getInstance();
-
- // Closing the peer handle.
- Pair<MessagePipeHandle, MessagePipeHandle> handles = core.createMessagePipe(null);
- addHandlePairToClose(handles);
-
- final AsyncWaiterResult asyncWaiterResult = new AsyncWaiterResult();
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- assertEquals(null, asyncWaiterResult.getException());
-
- handles.first.close();
- core.getDefaultAsyncWaiter().asyncWait(handles.first, Core.HandleSignals.READABLE,
- Core.DEADLINE_INFINITE, asyncWaiterResult);
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- assertEquals(null, asyncWaiterResult.getException());
-
- nativeRunLoop(RUN_LOOP_TIMEOUT_MS);
- assertNotNull(asyncWaiterResult.getException());
- assertEquals(MojoResult.INVALID_ARGUMENT,
- asyncWaiterResult.getException().getMojoResult());
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- }
-
- /**
- * Testing core {@link AsyncWaiter} implementation.
- */
- @SmallTest
- public void testAsyncWaiterWaitingOnDefaultInvalidHandle() {
- Core core = CoreImpl.getInstance();
-
- final AsyncWaiterResult asyncWaiterResult = new AsyncWaiterResult();
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- assertEquals(null, asyncWaiterResult.getException());
-
- core.getDefaultAsyncWaiter().asyncWait(InvalidHandle.INSTANCE, Core.HandleSignals.READABLE,
- Core.DEADLINE_INFINITE, asyncWaiterResult);
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- assertEquals(null, asyncWaiterResult.getException());
-
- nativeRunLoop(RUN_LOOP_TIMEOUT_MS);
- assertNotNull(asyncWaiterResult.getException());
- assertEquals(MojoResult.INVALID_ARGUMENT, asyncWaiterResult.getException().getMojoResult());
- assertEquals(Integer.MIN_VALUE, asyncWaiterResult.getResult());
- }
-
- /**
- * Testing core {@link AsyncWaiter} implementation.
- */
- @SmallTest
public void testAsyncWaiterWaitingWithTimeout() {
Core core = CoreImpl.getInstance();
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
index 84e8b64..0c7cdef 100644
--- a/mojo/common/handle_watcher.cc
+++ b/mojo/common/handle_watcher.cc
@@ -14,7 +14,9 @@
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "mojo/common/message_pump_mojo.h"
#include "mojo/common/message_pump_mojo_handler.h"
@@ -68,7 +70,10 @@ class WatcherBackend : public MessagePumpMojoHandler {
virtual ~WatcherBackend();
void StartWatching(const WatchData& data);
- void StopWatching(WatcherID watcher_id);
+
+ // Cancels a previously schedule request to start a watch. When done signals
+ // |event|.
+ void StopWatching(WatcherID watcher_id, base::WaitableEvent* event);
private:
typedef std::map<Handle, WatchData> HandleToWatchDataMap;
@@ -107,15 +112,16 @@ void WatcherBackend::StartWatching(const WatchData& data) {
data.deadline);
}
-void WatcherBackend::StopWatching(WatcherID watcher_id) {
+void WatcherBackend::StopWatching(WatcherID watcher_id,
+ base::WaitableEvent* event) {
// Because of the thread hop it is entirely possible to get here and not
// have a valid handle registered for |watcher_id|.
Handle handle;
- if (!GetMojoHandleByWatcherID(watcher_id, &handle))
- return;
-
- handle_to_data_.erase(handle);
- message_pump_mojo->RemoveHandler(handle);
+ if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
+ handle_to_data_.erase(handle);
+ message_pump_mojo->RemoveHandler(handle);
+ }
+ event->Signal();
}
void WatcherBackend::RemoveAndNotify(const Handle& handle,
@@ -153,6 +159,8 @@ void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
// WatcherThreadManager manages the background thread that listens for handles
// to be ready. All requests are handled by WatcherBackend.
+} // namespace
+
class WatcherThreadManager {
public:
~WatcherThreadManager();
@@ -208,7 +216,7 @@ WatcherID WatcherThreadManager::StartWatching(
data.message_loop = base::MessageLoopProxy::current();
DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
data.message_loop.get());
- // We outlive |thread_|, so it's safe to use Unretained() here.
+ // We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StartWatching,
@@ -218,12 +226,18 @@ WatcherID WatcherThreadManager::StartWatching(
}
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
- // We outlive |thread_|, so it's safe to use Unretained() here.
+ base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ base::WaitableEvent event(true, false);
+ // We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StopWatching,
base::Unretained(&backend_),
- watcher_id));
+ watcher_id,
+ &event));
+
+ // We need to block until the handle is actually removed.
+ event.Wait();
}
WatcherThreadManager::WatcherThreadManager()
@@ -233,8 +247,6 @@ WatcherThreadManager::WatcherThreadManager()
thread_.StartWithOptions(thread_options);
}
-} // namespace
-
// HandleWatcher::State --------------------------------------------------------
// Represents the state of the HandleWatcher. Owns the user's callback and
diff --git a/mojo/common/handle_watcher.h b/mojo/common/handle_watcher.h
index 9fac3f5..f40ae88 100644
--- a/mojo/common/handle_watcher.h
+++ b/mojo/common/handle_watcher.h
@@ -27,6 +27,8 @@ class HandleWatcherTest;
class MOJO_COMMON_EXPORT HandleWatcher {
public:
HandleWatcher();
+
+ // The destructor implicitly stops listening. See Stop() for details.
~HandleWatcher();
// Starts listening for |handle|. This implicitly invokes Stop(). In other
@@ -41,7 +43,8 @@ class MOJO_COMMON_EXPORT HandleWatcher {
MojoDeadline deadline,
const base::Callback<void(MojoResult)>& callback);
- // Stops listening. Does nothing if not in the process of listening.
+ // Stops listening. Does nothing if not in the process of listening. Blocks
+ // until no longer listening on the handle.
void Stop();
private:
diff --git a/mojo/common/handle_watcher_unittest.cc b/mojo/common/handle_watcher_unittest.cc
index 02cc610..b734e99 100644
--- a/mojo/common/handle_watcher_unittest.cc
+++ b/mojo/common/handle_watcher_unittest.cc
@@ -9,8 +9,10 @@
#include "base/at_exit.h"
#include "base/auto_reset.h"
#include "base/bind.h"
+#include "base/memory/scoped_vector.h"
#include "base/run_loop.h"
#include "base/test/simple_test_tick_clock.h"
+#include "base/threading/thread.h"
#include "mojo/common/time_helper.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/public/cpp/test_support/test_utils.h"
@@ -336,6 +338,104 @@ TEST(HandleWatcherCleanEnvironmentTest, AbortedOnMessageLoopDestruction) {
EXPECT_EQ(MOJO_RESULT_ABORTED, result);
}
+void NeverReached(MojoResult result) {
+ FAIL() << "Callback should never be invoked " << result;
+}
+
+// Called on the main thread when a thread is done. Decrements |active_count|
+// and if |active_count| is zero quits |run_loop|.
+void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
+ (*active_count)--;
+ EXPECT_GE(*active_count, 0);
+ if (*active_count == 0)
+ run_loop->Quit();
+}
+
+// See description of StressTest. This is called on the background thread.
+// |count| is the number of HandleWatchers to create. |active_count| is the
+// number of outstanding threads, |task_runner| the task runner for the main
+// thread and |run_loop| the run loop that should be quit when there are no more
+// threads running. When done StressThreadDone() is invoked on the main thread.
+// |active_count| and |run_loop| should only be used on the main thread.
+void RunStressTest(int count,
+ scoped_refptr<base::TaskRunner> task_runner,
+ base::RunLoop* run_loop,
+ int* active_count) {
+ struct TestData {
+ MessagePipe pipe;
+ HandleWatcher watcher;
+ };
+ ScopedVector<TestData> data_vector;
+ for (int i = 0; i < count; ++i) {
+ if (i % 20 == 0) {
+ // Every so often we wait. This results in some level of thread balancing
+ // as well as making sure HandleWatcher has time to actually start some
+ // watches.
+ MessagePipe test_pipe;
+ ASSERT_TRUE(test_pipe.handle0.is_valid());
+ CallbackHelper callback_helper;
+ HandleWatcher watcher;
+ callback_helper.Start(&watcher, test_pipe.handle0.get());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper.got_callback());
+ EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
+ std::string()));
+ base::MessageLoop::ScopedNestableTaskAllower scoper(
+ base::MessageLoop::current());
+ callback_helper.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper.got_callback());
+ } else {
+ scoped_ptr<TestData> test_data(new TestData);
+ ASSERT_TRUE(test_data->pipe.handle0.is_valid());
+ test_data->watcher.Start(test_data->pipe.handle0.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ base::Bind(&NeverReached));
+ data_vector.push_back(test_data.release());
+ }
+ if (i % 15 == 0)
+ data_vector.clear();
+ }
+ task_runner->PostTask(FROM_HERE,
+ base::Bind(&StressThreadDone, run_loop,
+ active_count));
+}
+
+// This test is meant to stress HandleWatcher. It uses from various threads
+// repeatedly starting and stopping watches. It spins up kThreadCount
+// threads. Each thread creates kWatchCount watches. Every so often each thread
+// writes to a pipe and waits for the response.
+TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
+#if defined(NDEBUG)
+ const int kThreadCount = 15;
+ const int kWatchCount = 400;
+#else
+ const int kThreadCount = 10;
+ const int kWatchCount = 250;
+#endif
+
+ base::ShadowingAtExitManager at_exit;
+ base::MessageLoop message_loop;
+ base::RunLoop run_loop;
+ ScopedVector<base::Thread> threads;
+ int threads_active_counter = kThreadCount;
+ // Starts the threads first and then post the task in hopes of having more
+ // threads running at once.
+ for (int i = 0; i < kThreadCount; ++i) {
+ scoped_ptr<base::Thread> thread(new base::Thread("test thread"));
+ thread->Start();
+ threads.push_back(thread.release());
+ }
+ for (int i = 0; i < kThreadCount; ++i) {
+ threads[i]->task_runner()->PostTask(
+ FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
+ message_loop.task_runner(),
+ &run_loop, &threads_active_counter));
+ }
+ run_loop.Run();
+ ASSERT_EQ(0, threads_active_counter);
+}
+
} // namespace test
} // namespace common
} // namespace mojo
diff --git a/mojo/common/message_pump_mojo.cc b/mojo/common/message_pump_mojo.cc
index 37d636b..c7903f2 100644
--- a/mojo/common/message_pump_mojo.cc
+++ b/mojo/common/message_pump_mojo.cc
@@ -157,7 +157,6 @@ void MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
switch (result) {
case MOJO_RESULT_CANCELLED:
case MOJO_RESULT_FAILED_PRECONDITION:
- case MOJO_RESULT_INVALID_ARGUMENT:
RemoveFirstInvalidHandle(wait_state);
break;
case MOJO_RESULT_DEADLINE_EXCEEDED:
@@ -190,9 +189,13 @@ void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
for (size_t i = 1; i < wait_state.handles.size(); ++i) {
const MojoResult result =
Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
- if (result == MOJO_RESULT_INVALID_ARGUMENT ||
- result == MOJO_RESULT_FAILED_PRECONDITION ||
- result == MOJO_RESULT_CANCELLED) {
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) {
+ // We should never have an invalid argument. If we do it indicates
+ // RemoveHandler() was not invoked and is likely to cause problems else
+ // where in the stack if we ignore it.
+ CHECK(false);
+ } else if (result == MOJO_RESULT_FAILED_PRECONDITION ||
+ result == MOJO_RESULT_CANCELLED) {
// Remove the handle first, this way if OnHandleError() tries to remove
// the handle our iterator isn't invalidated.
DCHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc
index 170c4d7..6665ff9 100644
--- a/mojo/public/cpp/bindings/lib/connector.cc
+++ b/mojo/public/cpp/bindings/lib/connector.cc
@@ -34,19 +34,16 @@ Connector::~Connector() {
if (destroyed_flag_)
*destroyed_flag_ = true;
- if (async_wait_id_)
- waiter_->CancelWait(async_wait_id_);
+ CancelWait();
}
void Connector::CloseMessagePipe() {
+ CancelWait();
Close(message_pipe_.Pass());
}
ScopedMessagePipeHandle Connector::PassMessagePipe() {
- if (async_wait_id_) {
- waiter_->CancelWait(async_wait_id_);
- async_wait_id_ = 0;
- }
+ CancelWait();
return message_pipe_.Pass();
}
@@ -136,6 +133,7 @@ void Connector::OnHandleReady(MojoResult result) {
}
void Connector::WaitToReadMore() {
+ MOJO_DCHECK(!async_wait_id_);
async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
MOJO_HANDLE_SIGNAL_READABLE,
MOJO_DEADLINE_INDEFINITE,
@@ -190,6 +188,14 @@ void Connector::ReadAllAvailableMessages() {
}
}
+void Connector::CancelWait() {
+ if (!async_wait_id_)
+ return;
+
+ waiter_->CancelWait(async_wait_id_);
+ async_wait_id_ = 0;
+}
+
void Connector::NotifyError() {
error_ = true;
// The error handler might destroyed |this|. Also, after an error, all method
diff --git a/mojo/public/cpp/bindings/lib/connector.h b/mojo/public/cpp/bindings/lib/connector.h
index dc0a646..e92a4e7 100644
--- a/mojo/public/cpp/bindings/lib/connector.h
+++ b/mojo/public/cpp/bindings/lib/connector.h
@@ -85,6 +85,9 @@ class Connector : public MessageReceiver {
void NotifyError();
+ // Cancels any calls made to |waiter_|.
+ void CancelWait();
+
ErrorHandler* error_handler_;
const MojoAsyncWaiter* waiter_;