summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorsky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-04 16:06:29 +0000
committersky@chromium.org <sky@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-04 16:06:29 +0000
commit03be76c122dee5b086208950c216824b80e5f37f (patch)
tree7fd3d017f377aa12378b497b88106d833a490652 /mojo
parent727c72757ac627a8daa197e3019693207d5ec6b1 (diff)
downloadchromium_src-03be76c122dee5b086208950c216824b80e5f37f.zip
chromium_src-03be76c122dee5b086208950c216824b80e5f37f.tar.gz
chromium_src-03be76c122dee5b086208950c216824b80e5f37f.tar.bz2
Initial cut at object to listen for handle to be ready
It creates a singleton background thread that uses MojoWaitMany() to wait for a handle to be ready. An additional communication pipe is used to wake up the thread as necessary (when set of handles changes, or to quit). Currently it only waits for ready to be read from. If we need the ability to listen for writes I can add that. BUG=none TEST=none R=darin@chromium.org Review URL: https://codereview.chromium.org/56333002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@232716 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp27
-rw-r--r--mojo/shell/app_container.cc4
-rw-r--r--mojo/shell/handle_watcher.cc400
-rw-r--r--mojo/shell/handle_watcher.h70
-rw-r--r--mojo/shell/handle_watcher_unittest.cc278
-rw-r--r--mojo/shell/scoped_message_pipe.cc25
-rw-r--r--mojo/shell/scoped_message_pipe.h34
7 files changed, 836 insertions, 2 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 6943354..3cf650f 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -142,12 +142,16 @@
'shell/app_container.h',
'shell/context.cc',
'shell/context.h',
+ 'shell/handle_watcher.cc',
+ 'shell/handle_watcher.h',
'shell/loader.cc',
'shell/loader.h',
'shell/network_delegate.cc',
'shell/network_delegate.h',
'shell/run.cc',
'shell/run.h',
+ 'shell/scoped_message_pipe.cc',
+ 'shell/scoped_message_pipe.h',
'shell/storage.cc',
'shell/storage.h',
'shell/switches.cc',
@@ -188,6 +192,29 @@
],
},
{
+ 'target_name': 'mojo_shell_unittests',
+ 'type': 'executable',
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../base/base.gyp:run_all_unittests',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_shell_lib',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'shell/handle_watcher_unittest.cc',
+ 'shell/test/run_all_unittests.cc',
+ ],
+ 'conditions': [
+ ['OS == "win"', {
+ # TODO(jschuh): crbug.com/167187 fix size_t to int truncations.
+ 'msvs_disabled_warnings': [
+ 4267,
+ ],
+ }],
+ ],
+ },
+ {
'target_name': 'sample_app',
'type': 'shared_library',
'dependencies': [
diff --git a/mojo/shell/app_container.cc b/mojo/shell/app_container.cc
index de0fe2a..234f06d 100644
--- a/mojo/shell/app_container.cc
+++ b/mojo/shell/app_container.cc
@@ -55,8 +55,8 @@ completed:
}
AppContainer::AppContainer(Context* context)
- : context_(context)
- , weak_factory_(this) {
+ : context_(context),
+ weak_factory_(this) {
}
AppContainer::~AppContainer() {
diff --git a/mojo/shell/handle_watcher.cc b/mojo/shell/handle_watcher.cc
new file mode 100644
index 0000000..5352db0
--- /dev/null
+++ b/mojo/shell/handle_watcher.cc
@@ -0,0 +1,400 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/handle_watcher.h"
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/lazy_instance.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread.h"
+#include "base/time/tick_clock.h"
+#include "base/time/time.h"
+#include "mojo/shell/scoped_message_pipe.h"
+
+namespace mojo {
+namespace shell {
+
+typedef int WatcherID;
+
+namespace {
+
+const char kWatcherThreadName[] = "handle-watcher-thread";
+
+// WatcherThreadManager --------------------------------------------------------
+
+// WatcherThreadManager manages listening for the handles. It is a singleton. It
+// spawns a thread that waits for any handle passed to StartWatching() to be
+// ready. Additionally it creates a message pipe for communication between the
+// two threads. The message pipe is used solely to wake up the background
+// thread. This happens when the set of handles changes, or during shutdown.
+class WatcherThreadManager {
+ public:
+ // Returns the shared instance.
+ static WatcherThreadManager* GetInstance();
+
+ // Starts watching the requested handle. Returns a unique ID that is used to
+ // stop watching the handle. When the handle is ready |callback| is notified
+ // on the thread StartWatching() was invoked on.
+ // This may be invoked on any thread.
+ WatcherID StartWatching(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ base::TimeTicks deadline,
+ const base::Closure& callback);
+
+ // Stops watching a handle.
+ void StopWatching(WatcherID watcher_id);
+
+ private:
+ friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
+
+ // Tracks a single request.
+ struct HandleAndCallback {
+ HandleAndCallback()
+ : handle(MOJO_HANDLE_INVALID),
+ wait_flags(MOJO_WAIT_FLAG_NONE),
+ message_loop(NULL) {}
+
+ MojoHandle handle;
+ MojoWaitFlags wait_flags;
+ base::TimeTicks deadline;
+ base::Closure callback;
+ scoped_refptr<base::MessageLoopProxy> message_loop;
+ };
+
+ // Contains the state needed for MojoWaitMany.
+ // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
+ // pass to MojoWaitMany.
+ struct WaitState {
+ // List of ids.
+ std::vector<WatcherID> ids;
+
+ // List of handles.
+ std::vector<MojoHandle> handles;
+
+ // List of flags each handle is waiting on.
+ std::vector<MojoWaitFlags> wait_flags;
+
+ // First deadline.
+ MojoDeadline deadline;
+
+ // Set of ids whose deadline has been reached.
+ std::set<WatcherID> deadline_exceeded;
+ };
+
+ typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
+
+ WatcherThreadManager();
+ ~WatcherThreadManager();
+
+ // Invoked on the background thread. Runs a loop waiting on current set of
+ // handles.
+ void RunOnBackgroundThread();
+
+ // Writes to the communication pipe to wake up the background thread.
+ void SignalBackgroundThread();
+
+ // Invoked when a handle associated with |id| should be removed and notified.
+ // |result| gives the reason for removing.
+ void RemoveAndNotify(WatcherID id, MojoResult result);
+
+ // Removes all callbacks schedule for |handle|. This is used when a handle
+ // is identified as invalid.
+ void RemoveHandle(MojoHandle handle);
+
+ const MojoHandle read_handle() const { return control_pipe_.handle_0(); }
+ const MojoHandle write_handle() const { return control_pipe_.handle_1(); }
+
+ // Returns state needed for MojoWaitMany.
+ WaitState GetWaitState();
+
+ // Guards members accessed on both threads.
+ base::Lock lock_;
+
+ // Used for communicating with the background thread.
+ ScopedMessagePipe control_pipe_;
+
+ base::Thread thread_;
+
+ // Maps from assigned id to the callback.
+ IDToCallbackMap id_to_callback_;
+
+ // If true the background loop should exit.
+ bool quit_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
+};
+
+WatcherThreadManager* WatcherThreadManager::GetInstance() {
+ static base::LazyInstance<WatcherThreadManager> instance =
+ LAZY_INSTANCE_INITIALIZER;
+ return &instance.Get();
+}
+
+WatcherID WatcherThreadManager::StartWatching(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ base::TimeTicks deadline,
+ const base::Closure& callback) {
+ WatcherID id = 0;
+ {
+ static int next_id = 0;
+ base::AutoLock lock(lock_);
+ // TODO(sky): worry about overflow?
+ id = ++next_id;
+ id_to_callback_[id].handle = handle;
+ id_to_callback_[id].callback = callback;
+ id_to_callback_[id].wait_flags = wait_flags;
+ id_to_callback_[id].deadline = deadline;
+ id_to_callback_[id].message_loop = base::MessageLoopProxy::current();
+ }
+ SignalBackgroundThread();
+ return id;
+}
+
+
+void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
+ {
+ base::AutoLock lock(lock_);
+ // It's possible we've already serviced the handle but HandleWatcher hasn't
+ // processed it yet.
+ IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id);
+ if (i == id_to_callback_.end())
+ return;
+ id_to_callback_.erase(i);
+ }
+ SignalBackgroundThread();
+}
+
+WatcherThreadManager::WatcherThreadManager()
+ : thread_(kWatcherThreadName),
+ quit_(false) {
+ // TODO(sky): deal with error condition?
+ CHECK_NE(MOJO_HANDLE_INVALID, read_handle());
+ thread_.Start();
+ thread_.message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
+ base::Unretained(this)));
+}
+
+WatcherThreadManager::~WatcherThreadManager() {
+ {
+ base::AutoLock lock(lock_);
+ quit_ = true;
+ }
+ SignalBackgroundThread();
+
+ thread_.Stop();
+}
+
+void WatcherThreadManager::RunOnBackgroundThread() {
+ while (true) {
+ const WaitState state = GetWaitState();
+ for (std::set<WatcherID>::const_iterator i =
+ state.deadline_exceeded.begin();
+ i != state.deadline_exceeded.end(); ++i) {
+ RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
+ }
+ const MojoResult result = MojoWaitMany(&state.handles.front(),
+ &state.wait_flags.front(),
+ state.handles.size(),
+ state.deadline);
+
+ if (result >= 0) {
+ DCHECK_LT(result, static_cast<int>(state.handles.size()));
+ // Last handle is used to wake us up.
+ if (result == static_cast<int>(state.handles.size()) - 1) {
+ uint32_t num_bytes = 0;
+ MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+ {
+ base::AutoLock lock(lock_);
+ if (quit_)
+ return;
+ }
+ } else {
+ RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
+ }
+ } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
+ result == MOJO_RESULT_FAILED_PRECONDITION) {
+ // One of the handles is invalid or the flags supplied is invalid, remove
+ // it.
+ // Use -1 as last handle is used for communication and should never be
+ // invalid.
+ for (size_t i = 0; i < state.handles.size() - 1; ++i) {
+ const MojoResult result =
+ MojoWait(state.handles[i], state.wait_flags[i], 0);
+ switch (result) {
+ // TODO: do we really want to notify for all these conditions?
+ case MOJO_RESULT_OK:
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ case MOJO_RESULT_INVALID_ARGUMENT:
+ RemoveAndNotify(state.ids[i], result);
+ break;
+ case MOJO_RESULT_DEADLINE_EXCEEDED:
+ break;
+ default:
+ NOTREACHED();
+ }
+ }
+ }
+ }
+}
+
+void WatcherThreadManager::SignalBackgroundThread() {
+ // TODO(sky): deal with error?
+ MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+}
+
+void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
+ HandleAndCallback to_notify;
+ {
+ base::AutoLock lock(lock_);
+ IDToCallbackMap::iterator i = id_to_callback_.find(id);
+ if (i == id_to_callback_.end())
+ return;
+ to_notify = i->second;
+ id_to_callback_.erase(i);
+ }
+ to_notify.message_loop->PostTask(FROM_HERE, to_notify.callback);
+}
+
+void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
+ {
+ base::AutoLock lock(lock_);
+ for (IDToCallbackMap::iterator i = id_to_callback_.begin();
+ i != id_to_callback_.end(); ) {
+ if (i->second.handle == handle) {
+ id_to_callback_.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+ }
+}
+
+WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
+ WaitState state;
+ const base::TimeTicks now(HandleWatcher::NowTicks());
+ base::TimeDelta deadline;
+ {
+ base::AutoLock lock(lock_);
+ for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
+ i != id_to_callback_.end(); ++i) {
+ if (!i->second.deadline.is_null()) {
+ if (i->second.deadline <= now) {
+ state.deadline_exceeded.insert(i->first);
+ continue;
+ } else {
+ const base::TimeDelta delta = i->second.deadline - now;
+ if (deadline == base::TimeDelta() || delta < deadline)
+ deadline = delta;
+ }
+ }
+ state.ids.push_back(i->first);
+ state.handles.push_back(i->second.handle);
+ state.wait_flags.push_back(i->second.wait_flags);
+ }
+ }
+ state.ids.push_back(0);
+ state.handles.push_back(read_handle());
+ state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
+ state.deadline = (deadline == base::TimeDelta()) ?
+ MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
+ return state;
+}
+
+} // namespace
+
+// HandleWatcher::StartState ---------------------------------------------------
+
+// Contains the information passed to Start().
+struct HandleWatcher::StartState {
+ explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
+ }
+
+ ~StartState() {
+ }
+
+ // ID assigned by WatcherThreadManager.
+ WatcherID watcher_id;
+
+ // Callback to notify when done.
+ base::Closure callback;
+
+ // When Start() is invoked a callback is passed to WatcherThreadManager
+ // using a WeakRef from |weak_refactory_|. The callback invokes
+ // OnHandleReady() (on the thread Start() is invoked from) which in turn
+ // notifies |callback_|. Doing this allows us to reset state when the handle
+ // is ready, and then notify the callback. Doing this also means Stop()
+ // cancels any pending callbacks that may be inflight.
+ base::WeakPtrFactory<HandleWatcher> weak_factory;
+};
+
+// HandleWatcher ---------------------------------------------------------------
+
+// static
+base::TickClock* HandleWatcher::tick_clock_ = NULL;
+
+HandleWatcher::HandleWatcher() {
+}
+
+HandleWatcher::~HandleWatcher() {
+ Stop();
+}
+
+void HandleWatcher::Start(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ const base::Closure& callback) {
+ DCHECK_NE(MOJO_HANDLE_INVALID, handle);
+ DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
+
+ Stop();
+
+ start_state_.reset(new StartState(this));
+ start_state_->callback = callback;
+ start_state_->watcher_id =
+ WatcherThreadManager::GetInstance()->StartWatching(
+ handle,
+ wait_flags,
+ MojoDeadlineToTimeTicks(deadline),
+ base::Bind(&HandleWatcher::OnHandleReady,
+ start_state_->weak_factory.GetWeakPtr()));
+}
+
+void HandleWatcher::Stop() {
+ if (!start_state_.get())
+ return;
+
+ scoped_ptr<StartState> old_state(start_state_.Pass());
+ WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
+}
+
+void HandleWatcher::OnHandleReady() {
+ DCHECK(start_state_.get());
+ scoped_ptr<StartState> old_state(start_state_.Pass());
+ old_state->callback.Run();
+}
+
+// static
+base::TimeTicks HandleWatcher::NowTicks() {
+ return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
+}
+
+// static
+base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
+ return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
+ NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
+}
+
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/handle_watcher.h b/mojo/shell/handle_watcher.h
new file mode 100644
index 0000000..950a145
--- /dev/null
+++ b/mojo/shell/handle_watcher.h
@@ -0,0 +1,70 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SHELL_HANDLE_WATCHER_H_
+#define MOJO_SHELL_HANDLE_WATCHER_H_
+
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/system/core.h"
+
+namespace base {
+class Thread;
+class TickClock;
+class TimeTicks;
+}
+
+namespace mojo {
+namespace shell {
+namespace test {
+class HandleWatcherTest;
+}
+
+// HandleWatcher is used to asynchronously wait on a handle and notify a Closure
+// when the handle is ready, or the deadline has expired.
+class HandleWatcher {
+ public:
+ HandleWatcher();
+ ~HandleWatcher();
+
+ // Starts listening for |handle|. This implicitly invokes Stop(). In other
+ // words, Start() performs one asynchronous watch at a time. It is ok to call
+ // Start() multiple times, but it cancels any existing watches. |callback| is
+ // notified when the handle is ready, invalid or deadline has passed and is
+ // notified on the thread Start() was invoked on.
+ void Start(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ const base::Closure& callback);
+
+ // Stops listening. Does nothing if not in the process of listening.
+ void Stop();
+
+ // Returns now. Used internally; generally not useful.
+ static base::TimeTicks NowTicks();
+
+ // Converts a MojoDeadline into a TimeTicks.
+ static base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline);
+
+ private:
+ friend class test::HandleWatcherTest;
+ struct StartState;
+
+ // See description of |StartState::weak_factory| for details.
+ void OnHandleReady();
+
+ // If non-NULL Start() has been invoked.
+ scoped_ptr<StartState> start_state_;
+
+ // Used for getting the time. Only set by tests.
+ static base::TickClock* tick_clock_;
+
+ DISALLOW_COPY_AND_ASSIGN(HandleWatcher);
+};
+
+} // namespace shell
+} // namespace mojo
+
+#endif // MOJO_SHELL_HANDLE_WATCHER_H_
diff --git a/mojo/shell/handle_watcher_unittest.cc b/mojo/shell/handle_watcher_unittest.cc
new file mode 100644
index 0000000..7f70b14
--- /dev/null
+++ b/mojo/shell/handle_watcher_unittest.cc
@@ -0,0 +1,278 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/handle_watcher.h"
+
+#include "base/auto_reset.h"
+#include "base/bind.h"
+#include "base/run_loop.h"
+#include "base/test/simple_test_tick_clock.h"
+#include "mojo/shell/scoped_message_pipe.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace shell {
+namespace test {
+
+MojoResult WriteToHandle(MojoHandle handle) {
+ return MojoWriteMessage(handle, NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+}
+
+MojoResult ReadFromHandle(MojoHandle handle) {
+ uint32_t num_bytes = 0;
+ uint32_t num_handles = 0;
+ return MojoReadMessage(handle, NULL, &num_bytes, NULL, &num_handles,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+}
+
+void RunUntilIdle() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+}
+
+// Helper class to manage the callback and running the message loop waiting for
+// message to be received. Typical usage is something like:
+// Schedule callback returned from GetCallback().
+// RunUntilGotCallback();
+// EXPECT_TRUE(got_callback());
+// clear_callback();
+class CallbackHelper {
+ public:
+ CallbackHelper()
+ : got_callback_(false),
+ run_loop_(NULL),
+ weak_factory_(this) {}
+ ~CallbackHelper() {}
+
+ // See description above |got_callback_|.
+ bool got_callback() const { return got_callback_; }
+ void clear_callback() { got_callback_ = false; }
+
+ // Runs the current MessageLoop until the callback returned from GetCallback()
+ // is notified.
+ void RunUntilGotCallback() {
+ ASSERT_TRUE(run_loop_ == NULL);
+ base::RunLoop run_loop;
+ base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
+ run_loop.Run();
+ }
+
+ base::Closure GetCallback() {
+ return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
+ }
+
+ void Start(HandleWatcher* watcher, MojoHandle handle) {
+ watcher->Start(handle, MOJO_WAIT_FLAG_READABLE, MOJO_DEADLINE_INDEFINITE,
+ GetCallback());
+ }
+
+ private:
+ void OnCallback() {
+ got_callback_ = true;
+ if (run_loop_)
+ run_loop_->Quit();
+ }
+
+ // Set to true when the callback is called.
+ bool got_callback_;
+
+ // If non-NULL we're in RunUntilGotCallback().
+ base::RunLoop* run_loop_;
+
+ base::WeakPtrFactory<CallbackHelper> weak_factory_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
+};
+
+class HandleWatcherTest : public testing::Test {
+ public:
+ HandleWatcherTest() {}
+ virtual ~HandleWatcherTest() {
+ HandleWatcher::tick_clock_ = NULL;
+ }
+
+ protected:
+ void InstallTickClock() {
+ HandleWatcher::tick_clock_ = &tick_clock_;
+ }
+
+ base::SimpleTestTickClock tick_clock_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
+};
+
+// Trivial test case with a single handle to watch.
+TEST_F(HandleWatcherTest, SingleHandler) {
+ ScopedMessagePipe test_pipe;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe.handle_0());
+ CallbackHelper callback_helper;
+ HandleWatcher watcher;
+ callback_helper.Start(&watcher, test_pipe.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper.got_callback());
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe.handle_1()));
+ callback_helper.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper.got_callback());
+}
+
+// Creates three handles and notfies them in reverse order ensuring each one is
+// notified appropriately.
+TEST_F(HandleWatcherTest, ThreeHandles) {
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ ScopedMessagePipe test_pipe3;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ CallbackHelper callback_helper3;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe3.handle_0());
+
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ HandleWatcher watcher2;
+ callback_helper2.Start(&watcher2, test_pipe2.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ HandleWatcher watcher3;
+ callback_helper3.Start(&watcher3, test_pipe3.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Write to 3 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe3.handle_1()));
+ callback_helper3.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_TRUE(callback_helper3.got_callback());
+ callback_helper3.clear_callback();
+
+ // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
+ // running.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe3.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+ callback_helper1.clear_callback();
+
+ // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe2.handle_1()));
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+}
+
+// Verifies Start() invoked a second time works.
+TEST_F(HandleWatcherTest, Restart) {
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ HandleWatcher watcher2;
+ callback_helper2.Start(&watcher2, test_pipe2.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ // Write to 1 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ callback_helper1.clear_callback();
+ EXPECT_EQ(MOJO_RESULT_OK, ReadFromHandle(test_pipe1.handle_0()));
+
+ // Write to 2 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe2.handle_1()));
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ callback_helper2.clear_callback();
+
+ // Listen on 1 again.
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ // Write to 1 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+}
+
+// Verifies deadline is honored.
+TEST_F(HandleWatcherTest, Deadline) {
+ InstallTickClock();
+
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ ScopedMessagePipe test_pipe3;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ CallbackHelper callback_helper3;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe3.handle_0());
+
+ // Add a watcher with an infinite timeout.
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Add another watcher wth a timeout of 500 microseconds.
+ HandleWatcher watcher2;
+ watcher2.Start(test_pipe2.handle_0(), MOJO_WAIT_FLAG_READABLE, 500,
+ callback_helper2.GetCallback());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Advance the clock passed the deadline. We also have to start another
+ // watcher to wake up the background thread.
+ tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
+
+ HandleWatcher watcher3;
+ callback_helper3.Start(&watcher3, test_pipe3.handle_0());
+
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+}
+
+} // namespace test
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/scoped_message_pipe.cc b/mojo/shell/scoped_message_pipe.cc
new file mode 100644
index 0000000..863e140b
--- /dev/null
+++ b/mojo/shell/scoped_message_pipe.cc
@@ -0,0 +1,25 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/scoped_message_pipe.h"
+
+namespace mojo {
+namespace shell {
+
+ScopedMessagePipe::ScopedMessagePipe()
+ : handle_0_(MOJO_HANDLE_INVALID),
+ handle_1_(MOJO_HANDLE_INVALID) {
+ MojoCreateMessagePipe(&handle_0_, &handle_1_);
+}
+
+ScopedMessagePipe::~ScopedMessagePipe() {
+ if (handle_0_ != MOJO_HANDLE_INVALID)
+ MojoClose(handle_0_);
+
+ if (handle_1_ != MOJO_HANDLE_INVALID)
+ MojoClose(handle_1_);
+}
+
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/scoped_message_pipe.h b/mojo/shell/scoped_message_pipe.h
new file mode 100644
index 0000000..7b0d84b
--- /dev/null
+++ b/mojo/shell/scoped_message_pipe.h
@@ -0,0 +1,34 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_
+#define MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_
+
+#include "base/basictypes.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace shell {
+
+// Simple scoper that creates a message pipe in constructor and closes it in
+// destructor. Test for success by comparing handles with MOJO_HANDLE_INVALID.
+class ScopedMessagePipe {
+ public:
+ ScopedMessagePipe();
+ ~ScopedMessagePipe();
+
+ MojoHandle handle_0() const { return handle_0_; }
+ MojoHandle handle_1() const { return handle_1_; }
+
+ private:
+ MojoHandle handle_0_;
+ MojoHandle handle_1_;
+
+ DISALLOW_COPY_AND_ASSIGN(ScopedMessagePipe);
+};
+
+} // namespace shell
+} // namespace mojo
+
+#endif // MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_