summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp27
-rw-r--r--mojo/shell/app_container.cc4
-rw-r--r--mojo/shell/handle_watcher.cc400
-rw-r--r--mojo/shell/handle_watcher.h70
-rw-r--r--mojo/shell/handle_watcher_unittest.cc278
-rw-r--r--mojo/shell/scoped_message_pipe.cc25
-rw-r--r--mojo/shell/scoped_message_pipe.h34
7 files changed, 836 insertions, 2 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 6943354..3cf650f 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -142,12 +142,16 @@
'shell/app_container.h',
'shell/context.cc',
'shell/context.h',
+ 'shell/handle_watcher.cc',
+ 'shell/handle_watcher.h',
'shell/loader.cc',
'shell/loader.h',
'shell/network_delegate.cc',
'shell/network_delegate.h',
'shell/run.cc',
'shell/run.h',
+ 'shell/scoped_message_pipe.cc',
+ 'shell/scoped_message_pipe.h',
'shell/storage.cc',
'shell/storage.h',
'shell/switches.cc',
@@ -188,6 +192,29 @@
],
},
{
+ 'target_name': 'mojo_shell_unittests',
+ 'type': 'executable',
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../base/base.gyp:run_all_unittests',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_shell_lib',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'shell/handle_watcher_unittest.cc',
+ 'shell/test/run_all_unittests.cc',
+ ],
+ 'conditions': [
+ ['OS == "win"', {
+ # TODO(jschuh): crbug.com/167187 fix size_t to int truncations.
+ 'msvs_disabled_warnings': [
+ 4267,
+ ],
+ }],
+ ],
+ },
+ {
'target_name': 'sample_app',
'type': 'shared_library',
'dependencies': [
diff --git a/mojo/shell/app_container.cc b/mojo/shell/app_container.cc
index de0fe2a..234f06d 100644
--- a/mojo/shell/app_container.cc
+++ b/mojo/shell/app_container.cc
@@ -55,8 +55,8 @@ completed:
}
AppContainer::AppContainer(Context* context)
- : context_(context)
- , weak_factory_(this) {
+ : context_(context),
+ weak_factory_(this) {
}
AppContainer::~AppContainer() {
diff --git a/mojo/shell/handle_watcher.cc b/mojo/shell/handle_watcher.cc
new file mode 100644
index 0000000..5352db0
--- /dev/null
+++ b/mojo/shell/handle_watcher.cc
@@ -0,0 +1,400 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/handle_watcher.h"
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/lazy_instance.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread.h"
+#include "base/time/tick_clock.h"
+#include "base/time/time.h"
+#include "mojo/shell/scoped_message_pipe.h"
+
+namespace mojo {
+namespace shell {
+
+typedef int WatcherID;
+
+namespace {
+
+const char kWatcherThreadName[] = "handle-watcher-thread";
+
+// WatcherThreadManager --------------------------------------------------------
+
+// WatcherThreadManager manages listening for the handles. It is a singleton. It
+// spawns a thread that waits for any handle passed to StartWatching() to be
+// ready. Additionally it creates a message pipe for communication between the
+// two threads. The message pipe is used solely to wake up the background
+// thread. This happens when the set of handles changes, or during shutdown.
+class WatcherThreadManager {
+ public:
+ // Returns the shared instance.
+ static WatcherThreadManager* GetInstance();
+
+ // Starts watching the requested handle. Returns a unique ID that is used to
+ // stop watching the handle. When the handle is ready |callback| is notified
+ // on the thread StartWatching() was invoked on.
+ // This may be invoked on any thread.
+ WatcherID StartWatching(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ base::TimeTicks deadline,
+ const base::Closure& callback);
+
+ // Stops watching a handle.
+ void StopWatching(WatcherID watcher_id);
+
+ private:
+ friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
+
+ // Tracks a single request.
+ struct HandleAndCallback {
+ HandleAndCallback()
+ : handle(MOJO_HANDLE_INVALID),
+ wait_flags(MOJO_WAIT_FLAG_NONE),
+ message_loop(NULL) {}
+
+ MojoHandle handle;
+ MojoWaitFlags wait_flags;
+ base::TimeTicks deadline;
+ base::Closure callback;
+ scoped_refptr<base::MessageLoopProxy> message_loop;
+ };
+
+ // Contains the state needed for MojoWaitMany.
+ // NOTE: |handles| and |wait_flags| are separate vectors to make it easy to
+ // pass to MojoWaitMany.
+ struct WaitState {
+ // List of ids.
+ std::vector<WatcherID> ids;
+
+ // List of handles.
+ std::vector<MojoHandle> handles;
+
+ // List of flags each handle is waiting on.
+ std::vector<MojoWaitFlags> wait_flags;
+
+ // First deadline.
+ MojoDeadline deadline;
+
+ // Set of ids whose deadline has been reached.
+ std::set<WatcherID> deadline_exceeded;
+ };
+
+ typedef std::map<WatcherID, HandleAndCallback> IDToCallbackMap;
+
+ WatcherThreadManager();
+ ~WatcherThreadManager();
+
+ // Invoked on the background thread. Runs a loop waiting on current set of
+ // handles.
+ void RunOnBackgroundThread();
+
+ // Writes to the communication pipe to wake up the background thread.
+ void SignalBackgroundThread();
+
+ // Invoked when a handle associated with |id| should be removed and notified.
+ // |result| gives the reason for removing.
+ void RemoveAndNotify(WatcherID id, MojoResult result);
+
+ // Removes all callbacks schedule for |handle|. This is used when a handle
+ // is identified as invalid.
+ void RemoveHandle(MojoHandle handle);
+
+ const MojoHandle read_handle() const { return control_pipe_.handle_0(); }
+ const MojoHandle write_handle() const { return control_pipe_.handle_1(); }
+
+ // Returns state needed for MojoWaitMany.
+ WaitState GetWaitState();
+
+ // Guards members accessed on both threads.
+ base::Lock lock_;
+
+ // Used for communicating with the background thread.
+ ScopedMessagePipe control_pipe_;
+
+ base::Thread thread_;
+
+ // Maps from assigned id to the callback.
+ IDToCallbackMap id_to_callback_;
+
+ // If true the background loop should exit.
+ bool quit_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
+};
+
+WatcherThreadManager* WatcherThreadManager::GetInstance() {
+ static base::LazyInstance<WatcherThreadManager> instance =
+ LAZY_INSTANCE_INITIALIZER;
+ return &instance.Get();
+}
+
+WatcherID WatcherThreadManager::StartWatching(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ base::TimeTicks deadline,
+ const base::Closure& callback) {
+ WatcherID id = 0;
+ {
+ static int next_id = 0;
+ base::AutoLock lock(lock_);
+ // TODO(sky): worry about overflow?
+ id = ++next_id;
+ id_to_callback_[id].handle = handle;
+ id_to_callback_[id].callback = callback;
+ id_to_callback_[id].wait_flags = wait_flags;
+ id_to_callback_[id].deadline = deadline;
+ id_to_callback_[id].message_loop = base::MessageLoopProxy::current();
+ }
+ SignalBackgroundThread();
+ return id;
+}
+
+
+void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
+ {
+ base::AutoLock lock(lock_);
+ // It's possible we've already serviced the handle but HandleWatcher hasn't
+ // processed it yet.
+ IDToCallbackMap::iterator i = id_to_callback_.find(watcher_id);
+ if (i == id_to_callback_.end())
+ return;
+ id_to_callback_.erase(i);
+ }
+ SignalBackgroundThread();
+}
+
+WatcherThreadManager::WatcherThreadManager()
+ : thread_(kWatcherThreadName),
+ quit_(false) {
+ // TODO(sky): deal with error condition?
+ CHECK_NE(MOJO_HANDLE_INVALID, read_handle());
+ thread_.Start();
+ thread_.message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&WatcherThreadManager::RunOnBackgroundThread,
+ base::Unretained(this)));
+}
+
+WatcherThreadManager::~WatcherThreadManager() {
+ {
+ base::AutoLock lock(lock_);
+ quit_ = true;
+ }
+ SignalBackgroundThread();
+
+ thread_.Stop();
+}
+
+void WatcherThreadManager::RunOnBackgroundThread() {
+ while (true) {
+ const WaitState state = GetWaitState();
+ for (std::set<WatcherID>::const_iterator i =
+ state.deadline_exceeded.begin();
+ i != state.deadline_exceeded.end(); ++i) {
+ RemoveAndNotify(*i, MOJO_RESULT_DEADLINE_EXCEEDED);
+ }
+ const MojoResult result = MojoWaitMany(&state.handles.front(),
+ &state.wait_flags.front(),
+ state.handles.size(),
+ state.deadline);
+
+ if (result >= 0) {
+ DCHECK_LT(result, static_cast<int>(state.handles.size()));
+ // Last handle is used to wake us up.
+ if (result == static_cast<int>(state.handles.size()) - 1) {
+ uint32_t num_bytes = 0;
+ MojoReadMessage(read_handle(), NULL, &num_bytes, NULL, 0,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+ {
+ base::AutoLock lock(lock_);
+ if (quit_)
+ return;
+ }
+ } else {
+ RemoveAndNotify(state.ids[result], MOJO_RESULT_OK);
+ }
+ } else if (result == MOJO_RESULT_INVALID_ARGUMENT ||
+ result == MOJO_RESULT_FAILED_PRECONDITION) {
+ // One of the handles is invalid or the flags supplied is invalid, remove
+ // it.
+ // Use -1 as last handle is used for communication and should never be
+ // invalid.
+ for (size_t i = 0; i < state.handles.size() - 1; ++i) {
+ const MojoResult result =
+ MojoWait(state.handles[i], state.wait_flags[i], 0);
+ switch (result) {
+ // TODO: do we really want to notify for all these conditions?
+ case MOJO_RESULT_OK:
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ case MOJO_RESULT_INVALID_ARGUMENT:
+ RemoveAndNotify(state.ids[i], result);
+ break;
+ case MOJO_RESULT_DEADLINE_EXCEEDED:
+ break;
+ default:
+ NOTREACHED();
+ }
+ }
+ }
+ }
+}
+
+void WatcherThreadManager::SignalBackgroundThread() {
+ // TODO(sky): deal with error?
+ MojoWriteMessage(write_handle(), NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+}
+
+void WatcherThreadManager::RemoveAndNotify(WatcherID id, MojoResult result) {
+ HandleAndCallback to_notify;
+ {
+ base::AutoLock lock(lock_);
+ IDToCallbackMap::iterator i = id_to_callback_.find(id);
+ if (i == id_to_callback_.end())
+ return;
+ to_notify = i->second;
+ id_to_callback_.erase(i);
+ }
+ to_notify.message_loop->PostTask(FROM_HERE, to_notify.callback);
+}
+
+void WatcherThreadManager::RemoveHandle(MojoHandle handle) {
+ {
+ base::AutoLock lock(lock_);
+ for (IDToCallbackMap::iterator i = id_to_callback_.begin();
+ i != id_to_callback_.end(); ) {
+ if (i->second.handle == handle) {
+ id_to_callback_.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+ }
+}
+
+WatcherThreadManager::WaitState WatcherThreadManager::GetWaitState() {
+ WaitState state;
+ const base::TimeTicks now(HandleWatcher::NowTicks());
+ base::TimeDelta deadline;
+ {
+ base::AutoLock lock(lock_);
+ for (IDToCallbackMap::const_iterator i = id_to_callback_.begin();
+ i != id_to_callback_.end(); ++i) {
+ if (!i->second.deadline.is_null()) {
+ if (i->second.deadline <= now) {
+ state.deadline_exceeded.insert(i->first);
+ continue;
+ } else {
+ const base::TimeDelta delta = i->second.deadline - now;
+ if (deadline == base::TimeDelta() || delta < deadline)
+ deadline = delta;
+ }
+ }
+ state.ids.push_back(i->first);
+ state.handles.push_back(i->second.handle);
+ state.wait_flags.push_back(i->second.wait_flags);
+ }
+ }
+ state.ids.push_back(0);
+ state.handles.push_back(read_handle());
+ state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE);
+ state.deadline = (deadline == base::TimeDelta()) ?
+ MOJO_DEADLINE_INDEFINITE : deadline.InMicroseconds();
+ return state;
+}
+
+} // namespace
+
+// HandleWatcher::StartState ---------------------------------------------------
+
+// Contains the information passed to Start().
+struct HandleWatcher::StartState {
+ explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
+ }
+
+ ~StartState() {
+ }
+
+ // ID assigned by WatcherThreadManager.
+ WatcherID watcher_id;
+
+ // Callback to notify when done.
+ base::Closure callback;
+
+ // When Start() is invoked a callback is passed to WatcherThreadManager
+ // using a WeakRef from |weak_refactory_|. The callback invokes
+ // OnHandleReady() (on the thread Start() is invoked from) which in turn
+ // notifies |callback_|. Doing this allows us to reset state when the handle
+ // is ready, and then notify the callback. Doing this also means Stop()
+ // cancels any pending callbacks that may be inflight.
+ base::WeakPtrFactory<HandleWatcher> weak_factory;
+};
+
+// HandleWatcher ---------------------------------------------------------------
+
+// static
+base::TickClock* HandleWatcher::tick_clock_ = NULL;
+
+HandleWatcher::HandleWatcher() {
+}
+
+HandleWatcher::~HandleWatcher() {
+ Stop();
+}
+
+void HandleWatcher::Start(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ const base::Closure& callback) {
+ DCHECK_NE(MOJO_HANDLE_INVALID, handle);
+ DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
+
+ Stop();
+
+ start_state_.reset(new StartState(this));
+ start_state_->callback = callback;
+ start_state_->watcher_id =
+ WatcherThreadManager::GetInstance()->StartWatching(
+ handle,
+ wait_flags,
+ MojoDeadlineToTimeTicks(deadline),
+ base::Bind(&HandleWatcher::OnHandleReady,
+ start_state_->weak_factory.GetWeakPtr()));
+}
+
+void HandleWatcher::Stop() {
+ if (!start_state_.get())
+ return;
+
+ scoped_ptr<StartState> old_state(start_state_.Pass());
+ WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
+}
+
+void HandleWatcher::OnHandleReady() {
+ DCHECK(start_state_.get());
+ scoped_ptr<StartState> old_state(start_state_.Pass());
+ old_state->callback.Run();
+}
+
+// static
+base::TimeTicks HandleWatcher::NowTicks() {
+ return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
+}
+
+// static
+base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
+ return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
+ NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
+}
+
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/handle_watcher.h b/mojo/shell/handle_watcher.h
new file mode 100644
index 0000000..950a145
--- /dev/null
+++ b/mojo/shell/handle_watcher.h
@@ -0,0 +1,70 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SHELL_HANDLE_WATCHER_H_
+#define MOJO_SHELL_HANDLE_WATCHER_H_
+
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/system/core.h"
+
+namespace base {
+class Thread;
+class TickClock;
+class TimeTicks;
+}
+
+namespace mojo {
+namespace shell {
+namespace test {
+class HandleWatcherTest;
+}
+
+// HandleWatcher is used to asynchronously wait on a handle and notify a Closure
+// when the handle is ready, or the deadline has expired.
+class HandleWatcher {
+ public:
+ HandleWatcher();
+ ~HandleWatcher();
+
+ // Starts listening for |handle|. This implicitly invokes Stop(). In other
+ // words, Start() performs one asynchronous watch at a time. It is ok to call
+ // Start() multiple times, but it cancels any existing watches. |callback| is
+ // notified when the handle is ready, invalid or deadline has passed and is
+ // notified on the thread Start() was invoked on.
+ void Start(MojoHandle handle,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ const base::Closure& callback);
+
+ // Stops listening. Does nothing if not in the process of listening.
+ void Stop();
+
+ // Returns now. Used internally; generally not useful.
+ static base::TimeTicks NowTicks();
+
+ // Converts a MojoDeadline into a TimeTicks.
+ static base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline);
+
+ private:
+ friend class test::HandleWatcherTest;
+ struct StartState;
+
+ // See description of |StartState::weak_factory| for details.
+ void OnHandleReady();
+
+ // If non-NULL Start() has been invoked.
+ scoped_ptr<StartState> start_state_;
+
+ // Used for getting the time. Only set by tests.
+ static base::TickClock* tick_clock_;
+
+ DISALLOW_COPY_AND_ASSIGN(HandleWatcher);
+};
+
+} // namespace shell
+} // namespace mojo
+
+#endif // MOJO_SHELL_HANDLE_WATCHER_H_
diff --git a/mojo/shell/handle_watcher_unittest.cc b/mojo/shell/handle_watcher_unittest.cc
new file mode 100644
index 0000000..7f70b14
--- /dev/null
+++ b/mojo/shell/handle_watcher_unittest.cc
@@ -0,0 +1,278 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/handle_watcher.h"
+
+#include "base/auto_reset.h"
+#include "base/bind.h"
+#include "base/run_loop.h"
+#include "base/test/simple_test_tick_clock.h"
+#include "mojo/shell/scoped_message_pipe.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace shell {
+namespace test {
+
+MojoResult WriteToHandle(MojoHandle handle) {
+ return MojoWriteMessage(handle, NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+}
+
+MojoResult ReadFromHandle(MojoHandle handle) {
+ uint32_t num_bytes = 0;
+ uint32_t num_handles = 0;
+ return MojoReadMessage(handle, NULL, &num_bytes, NULL, &num_handles,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
+}
+
+void RunUntilIdle() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+}
+
+// Helper class to manage the callback and running the message loop waiting for
+// message to be received. Typical usage is something like:
+// Schedule callback returned from GetCallback().
+// RunUntilGotCallback();
+// EXPECT_TRUE(got_callback());
+// clear_callback();
+class CallbackHelper {
+ public:
+ CallbackHelper()
+ : got_callback_(false),
+ run_loop_(NULL),
+ weak_factory_(this) {}
+ ~CallbackHelper() {}
+
+ // See description above |got_callback_|.
+ bool got_callback() const { return got_callback_; }
+ void clear_callback() { got_callback_ = false; }
+
+ // Runs the current MessageLoop until the callback returned from GetCallback()
+ // is notified.
+ void RunUntilGotCallback() {
+ ASSERT_TRUE(run_loop_ == NULL);
+ base::RunLoop run_loop;
+ base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
+ run_loop.Run();
+ }
+
+ base::Closure GetCallback() {
+ return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
+ }
+
+ void Start(HandleWatcher* watcher, MojoHandle handle) {
+ watcher->Start(handle, MOJO_WAIT_FLAG_READABLE, MOJO_DEADLINE_INDEFINITE,
+ GetCallback());
+ }
+
+ private:
+ void OnCallback() {
+ got_callback_ = true;
+ if (run_loop_)
+ run_loop_->Quit();
+ }
+
+ // Set to true when the callback is called.
+ bool got_callback_;
+
+ // If non-NULL we're in RunUntilGotCallback().
+ base::RunLoop* run_loop_;
+
+ base::WeakPtrFactory<CallbackHelper> weak_factory_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
+};
+
+class HandleWatcherTest : public testing::Test {
+ public:
+ HandleWatcherTest() {}
+ virtual ~HandleWatcherTest() {
+ HandleWatcher::tick_clock_ = NULL;
+ }
+
+ protected:
+ void InstallTickClock() {
+ HandleWatcher::tick_clock_ = &tick_clock_;
+ }
+
+ base::SimpleTestTickClock tick_clock_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
+};
+
+// Trivial test case with a single handle to watch.
+TEST_F(HandleWatcherTest, SingleHandler) {
+ ScopedMessagePipe test_pipe;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe.handle_0());
+ CallbackHelper callback_helper;
+ HandleWatcher watcher;
+ callback_helper.Start(&watcher, test_pipe.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper.got_callback());
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe.handle_1()));
+ callback_helper.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper.got_callback());
+}
+
+// Creates three handles and notfies them in reverse order ensuring each one is
+// notified appropriately.
+TEST_F(HandleWatcherTest, ThreeHandles) {
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ ScopedMessagePipe test_pipe3;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ CallbackHelper callback_helper3;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe3.handle_0());
+
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ HandleWatcher watcher2;
+ callback_helper2.Start(&watcher2, test_pipe2.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ HandleWatcher watcher3;
+ callback_helper3.Start(&watcher3, test_pipe3.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Write to 3 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe3.handle_1()));
+ callback_helper3.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_TRUE(callback_helper3.got_callback());
+ callback_helper3.clear_callback();
+
+ // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
+ // running.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe3.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+ callback_helper1.clear_callback();
+
+ // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe2.handle_1()));
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+}
+
+// Verifies Start() invoked a second time works.
+TEST_F(HandleWatcherTest, Restart) {
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ HandleWatcher watcher2;
+ callback_helper2.Start(&watcher2, test_pipe2.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ // Write to 1 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ callback_helper1.clear_callback();
+ EXPECT_EQ(MOJO_RESULT_OK, ReadFromHandle(test_pipe1.handle_0()));
+
+ // Write to 2 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe2.handle_1()));
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ callback_helper2.clear_callback();
+
+ // Listen on 1 again.
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+
+ // Write to 1 and make sure it's notified.
+ EXPECT_EQ(MOJO_RESULT_OK, WriteToHandle(test_pipe1.handle_1()));
+ callback_helper1.RunUntilGotCallback();
+ EXPECT_TRUE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+}
+
+// Verifies deadline is honored.
+TEST_F(HandleWatcherTest, Deadline) {
+ InstallTickClock();
+
+ ScopedMessagePipe test_pipe1;
+ ScopedMessagePipe test_pipe2;
+ ScopedMessagePipe test_pipe3;
+ CallbackHelper callback_helper1;
+ CallbackHelper callback_helper2;
+ CallbackHelper callback_helper3;
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe1.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe2.handle_0());
+ ASSERT_NE(MOJO_HANDLE_INVALID, test_pipe3.handle_0());
+
+ // Add a watcher with an infinite timeout.
+ HandleWatcher watcher1;
+ callback_helper1.Start(&watcher1, test_pipe1.handle_0());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Add another watcher wth a timeout of 500 microseconds.
+ HandleWatcher watcher2;
+ watcher2.Start(test_pipe2.handle_0(), MOJO_WAIT_FLAG_READABLE, 500,
+ callback_helper2.GetCallback());
+ RunUntilIdle();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_FALSE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+
+ // Advance the clock passed the deadline. We also have to start another
+ // watcher to wake up the background thread.
+ tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
+
+ HandleWatcher watcher3;
+ callback_helper3.Start(&watcher3, test_pipe3.handle_0());
+
+ callback_helper2.RunUntilGotCallback();
+ EXPECT_FALSE(callback_helper1.got_callback());
+ EXPECT_TRUE(callback_helper2.got_callback());
+ EXPECT_FALSE(callback_helper3.got_callback());
+}
+
+} // namespace test
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/scoped_message_pipe.cc b/mojo/shell/scoped_message_pipe.cc
new file mode 100644
index 0000000..863e140b
--- /dev/null
+++ b/mojo/shell/scoped_message_pipe.cc
@@ -0,0 +1,25 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/shell/scoped_message_pipe.h"
+
+namespace mojo {
+namespace shell {
+
+ScopedMessagePipe::ScopedMessagePipe()
+ : handle_0_(MOJO_HANDLE_INVALID),
+ handle_1_(MOJO_HANDLE_INVALID) {
+ MojoCreateMessagePipe(&handle_0_, &handle_1_);
+}
+
+ScopedMessagePipe::~ScopedMessagePipe() {
+ if (handle_0_ != MOJO_HANDLE_INVALID)
+ MojoClose(handle_0_);
+
+ if (handle_1_ != MOJO_HANDLE_INVALID)
+ MojoClose(handle_1_);
+}
+
+} // namespace shell
+} // namespace mojo
diff --git a/mojo/shell/scoped_message_pipe.h b/mojo/shell/scoped_message_pipe.h
new file mode 100644
index 0000000..7b0d84b
--- /dev/null
+++ b/mojo/shell/scoped_message_pipe.h
@@ -0,0 +1,34 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_
+#define MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_
+
+#include "base/basictypes.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace shell {
+
+// Simple scoper that creates a message pipe in constructor and closes it in
+// destructor. Test for success by comparing handles with MOJO_HANDLE_INVALID.
+class ScopedMessagePipe {
+ public:
+ ScopedMessagePipe();
+ ~ScopedMessagePipe();
+
+ MojoHandle handle_0() const { return handle_0_; }
+ MojoHandle handle_1() const { return handle_1_; }
+
+ private:
+ MojoHandle handle_0_;
+ MojoHandle handle_1_;
+
+ DISALLOW_COPY_AND_ASSIGN(ScopedMessagePipe);
+};
+
+} // namespace shell
+} // namespace mojo
+
+#endif // MOJO_SHELL_SCOPED_MESSAGE_PIPE_H_