summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-28 00:30:04 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-28 00:30:04 +0000
commit3d58663b00b219889b298b233e0c0e0f80596ed7 (patch)
tree9a4bdf0e0f48f1af7a083924c2711af9cc7310d1 /mojo
parent82ce871c06a72ffb38f3ddcac1090a2ceed266fb (diff)
downloadchromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.zip
chromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.tar.gz
chromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.tar.bz2
Initial in-process implementation of some Mojo primitives.
This has an initial in-process implementation of the most basic Mojo primitives: - MojoClose() - MojoWait() - MojoWaitMany() - MojoCreateMessagePipe() - MojoWriteMessage() - MojoReadMessage() R=darin@chromium.org Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=225801 Review URL: https://codereview.chromium.org/23621056 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@225821 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp106
-rw-r--r--mojo/public/system/core.h295
-rw-r--r--mojo/public/tests/system_core_perftest.cc99
-rw-r--r--mojo/public/tests/system_core_unittest.cc105
-rw-r--r--mojo/public/tests/test_support.cc48
-rw-r--r--mojo/public/tests/test_support.h36
-rw-r--r--mojo/system/core.cc53
-rw-r--r--mojo/system/core_impl.cc275
-rw-r--r--mojo/system/core_impl.h104
-rw-r--r--mojo/system/core_impl_unittest.cc245
-rw-r--r--mojo/system/core_test_base.cc227
-rw-r--r--mojo/system/core_test_base.h87
-rw-r--r--mojo/system/dispatcher.cc131
-rw-r--r--mojo/system/dispatcher.h106
-rw-r--r--mojo/system/dispatcher_unittest.cc189
-rw-r--r--mojo/system/limits.h25
-rw-r--r--mojo/system/memory.cc26
-rw-r--r--mojo/system/memory.h27
-rw-r--r--mojo/system/message_pipe.cc248
-rw-r--r--mojo/system/message_pipe.h79
-rw-r--r--mojo/system/message_pipe_dispatcher.cc77
-rw-r--r--mojo/system/message_pipe_dispatcher.h61
-rw-r--r--mojo/system/message_pipe_dispatcher_unittest.cc539
-rw-r--r--mojo/system/message_pipe_unittest.cc540
-rw-r--r--mojo/system/simple_dispatcher.cc49
-rw-r--r--mojo/system/simple_dispatcher.h58
-rw-r--r--mojo/system/simple_dispatcher_unittest.cc514
-rw-r--r--mojo/system/test_utils.h38
-rw-r--r--mojo/system/waiter.cc80
-rw-r--r--mojo/system/waiter.h57
-rw-r--r--mojo/system/waiter_list.cc57
-rw-r--r--mojo/system/waiter_list.h55
-rw-r--r--mojo/system/waiter_list_unittest.cc266
-rw-r--r--mojo/system/waiter_test_utils.cc63
-rw-r--r--mojo/system/waiter_test_utils.h95
-rw-r--r--mojo/system/waiter_unittest.cc285
36 files changed, 5344 insertions, 1 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 2cd3d5c..567452f 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -4,7 +4,7 @@
{
'variables': {
- 'chromium_code': 1, # Use higher warning level.
+ 'chromium_code': 1,
},
'target_defaults': {
'defines': ['MOJO_IMPLEMENTATION'],
@@ -13,6 +13,110 @@
{
'target_name': 'mojo',
'type': 'none',
+ 'dependencies': [
+ 'mojo_public_test_support',
+ 'mojo_public_unittests',
+ 'mojo_public_perftests',
+ 'mojo_system',
+ 'mojo_system_unittests',
+ ],
+ },
+ {
+ 'target_name': 'mojo_public_test_support',
+ 'type': 'static_library',
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'public/tests/test_support.cc',
+ 'public/tests/test_support.h',
+ ],
+ },
+ {
+ 'target_name': 'mojo_public_unittests',
+ 'type': 'executable',
+ 'dependencies': [
+ '../base/base.gyp:run_all_unittests',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_public_test_support',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'public/tests/system_core_unittest.cc',
+ ],
+ },
+ {
+ 'target_name': 'mojo_public_perftests',
+ 'type': 'executable',
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../base/base.gyp:test_support_perf',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_public_test_support',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'public/tests/system_core_perftest.cc',
+ ],
+ },
+ {
+ 'target_name': 'mojo_system',
+ # TODO(vtl): This should probably be '<(component)'; make it work.
+ 'type': 'static_library',
+ 'dependencies': [
+ '../base/base.gyp:base',
+ ],
+ 'sources': [
+ 'public/system/core.h',
+ 'system/core.cc',
+ 'system/core_impl.cc',
+ 'system/core_impl.h',
+ 'system/dispatcher.cc',
+ 'system/dispatcher.h',
+ 'system/limits.h',
+ 'system/memory.cc',
+ 'system/memory.h',
+ 'system/message_pipe.cc',
+ 'system/message_pipe.h',
+ 'system/message_pipe_dispatcher.cc',
+ 'system/message_pipe_dispatcher.h',
+ 'system/simple_dispatcher.cc',
+ 'system/simple_dispatcher.h',
+ 'system/waiter.cc',
+ 'system/waiter.h',
+ 'system/waiter_list.cc',
+ 'system/waiter_list.h',
+ ],
+ 'direct_dependent_settings': {
+ 'include_dirs': [
+ '..',
+ ],
+ },
+ },
+ {
+ 'target_name': 'mojo_system_unittests',
+ 'type': 'executable',
+ 'dependencies': [
+ '../base/base.gyp:run_all_unittests',
+ '../testing/gtest.gyp:gtest',
+ 'mojo_system',
+ ],
+ 'sources': [
+ 'system/core_impl_unittest.cc',
+ 'system/core_test_base.cc',
+ 'system/core_test_base.h',
+ 'system/dispatcher_unittest.cc',
+ 'system/message_pipe_dispatcher_unittest.cc',
+ 'system/message_pipe_unittest.cc',
+ 'system/simple_dispatcher_unittest.cc',
+ 'system/test_utils.h',
+ 'system/waiter_list_unittest.cc',
+ 'system/waiter_test_utils.cc',
+ 'system/waiter_test_utils.h',
+ 'system/waiter_unittest.cc',
+ ],
},
],
}
diff --git a/mojo/public/system/core.h b/mojo/public/system/core.h
index 9a75c41..cb405ae 100644
--- a/mojo/public/system/core.h
+++ b/mojo/public/system/core.h
@@ -5,15 +5,310 @@
#ifndef MOJO_PUBLIC_SYSTEM_CORE_H_
#define MOJO_PUBLIC_SYSTEM_CORE_H_
+// Note: This header should be compilable as C.
+
#include <stdint.h>
+// Types -----------------------------------------------------------------------
+
+// TODO(vtl): Notes: Use of undefined flags will lead to undefined behavior
+// (typically they'll be ignored), not necessarily an error.
+
+// Handles to Mojo objects.
typedef uint32_t MojoHandle;
+// Result codes for Mojo operations. Non-negative values are success codes;
+// negative values are error/failure codes.
+typedef int32_t MojoResult;
+
+// Used to specify deadlines (timeouts), in microseconds. Note that
+// |MOJO_DEADLINE_INDEFINITE| (-1) means "forever".
+typedef uint64_t MojoDeadline;
+
+// Used to specify the state of a handle to wait on (e.g., the ability to read
+// or write to it).
+typedef uint32_t MojoWaitFlags;
+
+// Used to specify different modes to |MojoWriteMessage()|.
+typedef uint32_t MojoWriteMessageFlags;
+
+// Used to specify different modes to |MojoReadMessage()|.
+typedef uint32_t MojoReadMessageFlags;
+
+// Constants -------------------------------------------------------------------
+
+// |MojoHandle|:
+// |MOJO_HANDLE_INVALID| - A value that is never a valid handle.
+#ifdef __cplusplus
+const MojoHandle MOJO_HANDLE_INVALID = 0;
+#else
+#define MOJO_HANDLE_INVALID ((MojoHandle) 0)
+#endif
+
+// |MojoResult|:
+// |MOJO_RESULT_OK| - Not an error; returned on success. Note that positive
+// |MojoResult|s may also be used to indicate success.
+// |MOJO_RESULT_CANCELLED| - Operation was cancelled, typically by the
+// caller.
+// |MOJO_RESULT_UNKNOWN| - Unknown error (e.g., if not enough information is
+// available for a more specific error).
+// |MOJO_RESULT_INVALID_ARGUMENT| - Caller specified an invalid argument.
+// This differs from |MOJO_RESULT_FAILED_PRECONDITION| in that the former
+// indicates arguments that are invalid regardless of the state of the
+// system.
+// |MOJO_RESULT_DEADLINE_EXCEEDED| - Deadline expired before the operation
+// could complete.
+// |MOJO_RESULT_NOT_FOUND| - Some requested entity was not found (i.e., does
+// not exist).
+// |MOJO_RESULT_ALREADY_EXISTS| - Some entity or condition that we attempted
+// to create already exists.
+// |MOJO_RESULT_PERMISSION_DENIED| - The caller does not have permission to
+// for the operation (use |MOJO_RESULT_RESOURCE_EXHAUSTED| for rejections
+// caused by exhausting some resource instead).
+// |MOJO_RESULT_RESOURCE_EXHAUSTED| - Some resource required for the call
+// (possibly some quota) has been exhausted.
+// |MOJO_RESULT_FAILED_PRECONDITION| - The system is not in a state required
+// for the operation (use this if the caller must do something to rectify
+// the state before retrying).
+// |MOJO_RESULT_ABORTED| - The operation was aborted by the system, possibly
+// due to a concurrency issue (use this if the caller may retry at a
+// higher level).
+// |MOJO_RESULT_OUT_OF_RANGE| - The operation was attempted past the valid
+// range. Unlike |MOJO_RESULT_INVALID_ARGUMENT|, this indicates that the
+// operation may be/become valid depending on the system state. (This
+// error is similar to |MOJO_RESULT_FAILED_PRECONDITION|, but is more
+// specific.)
+// |MOJO_RESULT_UNIMPLEMENTED| - The operation is not implemented, supported,
+// or enabled.
+// |MOJO_RESULT_INTERNAL| - Internal error: this should never happen and
+// indicates that some invariant expected by the system has been broken.
+// |MOJO_RESULT_UNAVAILABLE| - The operation is (temporarily) currently
+// unavailable. The caller may simply retry the operation (possibly with
+// a backoff).
+// |MOJO_RESULT_DATA_LOSS| - Unrecoverable data loss or corruption.
+//
+// Note that positive values are also available as success codes.
+//
+// The codes from |MOJO_RESULT_OK| to |MOJO_RESULT_DATA_LOSS| come from
+// Google3's canonical error codes.
+#ifdef __cplusplus
+const MojoResult MOJO_RESULT_OK = 0;
+const MojoResult MOJO_RESULT_CANCELLED = -1;
+const MojoResult MOJO_RESULT_UNKNOWN = -2;
+const MojoResult MOJO_RESULT_INVALID_ARGUMENT = -3;
+const MojoResult MOJO_RESULT_DEADLINE_EXCEEDED = -4;
+const MojoResult MOJO_RESULT_NOT_FOUND = -5;
+const MojoResult MOJO_RESULT_ALREADY_EXISTS = -6;
+const MojoResult MOJO_RESULT_PERMISSION_DENIED = -7;
+const MojoResult MOJO_RESULT_RESOURCE_EXHAUSTED = -8;
+const MojoResult MOJO_RESULT_FAILED_PRECONDITION = -9;
+const MojoResult MOJO_RESULT_ABORTED = -10;
+const MojoResult MOJO_RESULT_OUT_OF_RANGE = -11;
+const MojoResult MOJO_RESULT_UNIMPLEMENTED = -12;
+const MojoResult MOJO_RESULT_INTERNAL = -13;
+const MojoResult MOJO_RESULT_UNAVAILABLE = -14;
+const MojoResult MOJO_RESULT_DATA_LOSS = -15;
+#else
+#define MOJO_RESULT_OK ((MojoResult) 0)
+#define MOJO_RESULT_CANCELLED ((MojoResult) -1)
+#define MOJO_RESULT_UNKNOWN ((MojoResult) -2)
+#define MOJO_RESULT_INVALID_ARGUMENT ((MojoResult) -3)
+#define MOJO_RESULT_DEADLINE_EXCEEDED ((MojoResult) -4)
+#define MOJO_RESULT_NOT_FOUND ((MojoResult) -5)
+#define MOJO_RESULT_ALREADY_EXISTS ((MojoResult) -6)
+#define MOJO_RESULT_PERMISSION_DENIED ((MojoResult) -7)
+#define MOJO_RESULT_RESOURCE_EXHAUSTED ((MojoResult) -8)
+#define MOJO_RESULT_FAILED_PRECONDITION ((MojoResult) -9)
+#define MOJO_RESULT_ABORTED ((MojoResult) -10)
+#define MOJO_RESULT_OUT_OF_RANGE ((MojoResult) -11)
+#define MOJO_RESULT_UNIMPLEMENTED ((MojoResult) -12)
+#define MOJO_RESULT_INTERNAL ((MojoResult) -13)
+#define MOJO_RESULT_UNAVAILABLE ((MojoResult) -14)
+#define MOJO_RESULT_DATA_LOSS ((MojoResult) -15)
+#endif
+
+// |MojoDeadline|:
+// |MOJO_DEADLINE_INDEFINITE|
#ifdef __cplusplus
+const MojoDeadline MOJO_DEADLINE_INDEFINITE = static_cast<MojoDeadline>(-1);
+#else
+#define MOJO_DEADLINE_INDEFINITE = ((MojoDeadline) -1);
+#endif
+
+// |MojoWaitFlags|:
+// |MOJO_WAIT_FLAG_NONE| - No flags. |MojoWait()|, etc. will return
+// |MOJO_RESULT_FAILED_PRECONDITION| if you attempt to wait on this.
+// |MOJO_WAIT_FLAG_READABLE| - Can read (e.g., a message) from the handle.
+// |MOJO_WAIT_FLAG_WRITABLE| - Can write (e.g., a message) to the handle.
+// |MOJO_WAIT_FLAG_EVERYTHING| - All flags.
+#ifdef __cplusplus
+const MojoWaitFlags MOJO_WAIT_FLAG_NONE = 0;
+const MojoWaitFlags MOJO_WAIT_FLAG_READABLE = 1 << 0;
+const MojoWaitFlags MOJO_WAIT_FLAG_WRITABLE = 1 << 1;
+const MojoWaitFlags MOJO_WAIT_FLAG_EVERYTHING = ~0;
+#else
+#define MOJO_WAIT_FLAG_NONE ((MojoWaitFlags) 0)
+#define MOJO_WAIT_FLAG_READABLE ((MojoWaitFlags) 1 << 0)
+#define MOJO_WAIT_FLAG_WRITABLE ((MojoWaitFlags) 1 << 1)
+#define MOJO_WAIT_FLAG_EVERYTHING (~((MojoWaitFlags) 0))
+#endif
+
+// |MojoWriteMessageFlags|:
+// |MOJO_WRITE_MESSAGE_FLAG_NONE| - No flags; default mode.
+#ifdef __cplusplus
+const MojoWriteMessageFlags MOJO_WRITE_MESSAGE_FLAG_NONE = 0;
+#else
+#define MOJO_WRITE_MESSAGE_FLAG_NONE ((MojoWriteMessageFlags) 0)
+#endif
+
+// |MojoReadMessageFlags|:
+// |MOJO_READ_MESSAGE_FLAG_NONE| - No flags; default mode.
+// |MOJO_READ_MESSAGE_FLAG_MAY_DISCARD| - If the message is unable to be read
+// for whatever reason (e.g., the caller-supplied buffer is too small),
+// discard the message (i.e., simply dequeue it).
+#ifdef __cplusplus
+const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_NONE = 0;
+const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_MAY_DISCARD = 1 << 0;
+#else
+#define MOJO_READ_MESSAGE_FLAG_NONE ((MojoReadMessageFlags) 0)
+#define MOJO_READ_MESSAGE_FLAG_MAY_DISCARD ((MojoReadMessageFlags) 1 << 0)
+#endif
+
+// Functions -------------------------------------------------------------------
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Closes the given |handle|.
+//
+// Returns:
+// |MOJO_RESULT_OK| on success.
+// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not a valid handle.
+//
+// Concurrent operations on |handle| may succeed (or fail as usual) if they
+// happen before the close, be cancelled with result |MOJO_RESULT_CANCELLED| if
+// they properly overlap (this is likely the case with |MojoWait()|, etc.), or
+// fail with |MOJO_RESULT_INVALID_ARGUMENT| if they happen after.
+MojoResult MojoClose(MojoHandle handle);
+
+// Waits on the given handle until the state indicated by |flags| is satisfied
+// or until |deadline| has passed.
+//
+// Returns:
+// |MOJO_RESULT_OK| if some flag in |flags| was satisfied (or is already
+// satisfied).
+// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not a valid handle (e.g., if
+// it has already been closed).
+// |MOJO_RESULT_DEADLINE_EXCEEDED| if the deadline has passed without any of
+// the flags being satisfied.
+// |MOJO_RESULT_FAILED_PRECONDITION| if it is or becomes impossible that any
+// flag in |flags| will ever be satisfied.
+//
+// If there are multiple waiters (on different threads, obviously) waiting on
+// the same handle and flag and that flag becomes set, all waiters will be
+// awoken.
+MojoResult MojoWait(MojoHandle handle,
+ MojoWaitFlags flags,
+ MojoDeadline deadline);
+
+// Waits on |handles[0]|, ..., |handles[num_handles-1]| for at least one of them
+// to satisfy the state indicated by |flags[0]|, ..., |flags[num_handles-1]|,
+// respectively, or until |deadline| has passed.
+//
+// Returns:
+// The index |i| (from 0 to |num_handles-1|) if |handle[i]| satisfies
+// |flags[i]|.
+// |MOJO_RESULT_INVALID_ARGUMENT| if some |handle[i]| is not a valid handle
+// (e.g., if it has already been closed).
+// |MOJO_RESULT_DEADLINE_EXCEEDED| if the deadline has passed without any of
+// handles satisfying any of its flags.
+// |MOJO_RESULT_FAILED_PRECONDITION| if it is or becomes impossible that SOME
+// |handle[i]| will ever satisfy any of its flags |flags[i]|.
+MojoResult MojoWaitMany(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline);
+
+// TODO(vtl): flags? other params (e.g., queue sizes, max message sizes?)
+MojoResult MojoCreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1);
+
+MojoResult MojoWriteMessage(MojoHandle handle,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags);
+
+MojoResult MojoReadMessage(MojoHandle handle,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+// C++ wrapper functions -------------------------------------------------------
+
+#ifdef __cplusplus
+
namespace mojo {
+// Used to assert things at compile time. (Use our own copy instead of
+// Chromium's, since we can't depend on Chromium.)
+template <bool> struct CompileAssert {};
+#define MOJO_COMPILE_ASSERT(expr, msg) \
+ typedef CompileAssert<(bool(expr))> msg[bool(expr) ? 1 : -1]
+
struct Handle { MojoHandle value; };
+const Handle kInvalidHandle = { MOJO_HANDLE_INVALID };
+
+// A |mojo::Handle| must take no extra space, since we'll treat arrays of them
+// as if they were arrays of |MojoHandle|s.
+MOJO_COMPILE_ASSERT(sizeof(Handle) == sizeof(MojoHandle),
+ bad_size_for_cplusplus_handle);
+
+inline MojoResult Close(Handle handle) {
+ return MojoClose(handle.value);
+}
+
+inline MojoResult Wait(Handle handle,
+ MojoWaitFlags flags,
+ MojoDeadline deadline) {
+ return MojoWait(handle.value, flags, deadline);
+}
+
+inline MojoResult WaitMany(const Handle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline) {
+ return MojoWaitMany(&handles[0].value, flags, num_handles, deadline);
+}
+
+inline MojoResult CreateMessagePipe(Handle* handle_0, Handle* handle_1) {
+ return MojoCreateMessagePipe(&handle_0->value, &handle_1->value);
+}
+
+inline MojoResult WriteMessage(Handle handle,
+ const void* bytes, uint32_t num_bytes,
+ const Handle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ return MojoWriteMessage(handle.value,
+ bytes, num_bytes,
+ &handles[0].value, num_handles,
+ flags);
+}
+
+inline MojoResult ReadMessage(Handle handle,
+ void* bytes, uint32_t* num_bytes,
+ Handle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ return MojoReadMessage(handle.value,
+ bytes, num_bytes,
+ &handles[0].value, num_handles,
+ flags);
+}
+
} // namespace mojo
#endif
diff --git a/mojo/public/tests/system_core_perftest.cc b/mojo/public/tests/system_core_perftest.cc
new file mode 100644
index 0000000..2e9fef2
--- /dev/null
+++ b/mojo/public/tests/system_core_perftest.cc
@@ -0,0 +1,99 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/system/core.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/logging.h"
+#include "mojo/public/tests/test_support.h"
+
+namespace mojo {
+namespace {
+
+class SystemPerftest : public test::TestBase {
+ public:
+ SystemPerftest() {}
+ virtual ~SystemPerftest() {}
+
+ void NoOp() {
+ }
+
+ void MessagePipe_CreateAndClose() {
+ MojoResult result;
+ result = CreateMessagePipe(&h_0_, &h_1_);
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+ result = Close(h_0_);
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+ result = Close(h_1_);
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+ }
+
+ void MessagePipe_WriteAndRead(void* buffer, uint32_t bytes) {
+ MojoResult result;
+ result = WriteMessage(h_0_,
+ buffer, bytes,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+ uint32_t read_bytes = bytes;
+ result = ReadMessage(h_1_,
+ buffer, &read_bytes,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+ }
+
+ protected:
+ Handle h_0_;
+ Handle h_1_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SystemPerftest);
+};
+
+// A no-op test so we can compare performance.
+TEST_F(SystemPerftest, NoOp) {
+ test::IterateAndReportPerf(
+ "NoOp",
+ base::Bind(&SystemPerftest::NoOp,
+ base::Unretained(this)));
+}
+
+TEST_F(SystemPerftest, MessagePipe_CreateAndClose) {
+ test::IterateAndReportPerf(
+ "MessagePipe_CreateAndClose",
+ base::Bind(&SystemPerftest::MessagePipe_CreateAndClose,
+ base::Unretained(this)));
+}
+
+TEST_F(SystemPerftest, MessagePipe_WriteAndRead) {
+ CHECK_EQ(CreateMessagePipe(&h_0_, &h_1_), MOJO_RESULT_OK);
+ char buffer[10000] = { 0 };
+ test::IterateAndReportPerf(
+ "MessagePipe_WriteAndRead_10bytes",
+ base::Bind(&SystemPerftest::MessagePipe_WriteAndRead,
+ base::Unretained(this),
+ static_cast<void*>(buffer), static_cast<uint32_t>(10)));
+ test::IterateAndReportPerf(
+ "MessagePipe_WriteAndRead_100bytes",
+ base::Bind(&SystemPerftest::MessagePipe_WriteAndRead,
+ base::Unretained(this),
+ static_cast<void*>(buffer), static_cast<uint32_t>(100)));
+ test::IterateAndReportPerf(
+ "MessagePipe_WriteAndRead_1000bytes",
+ base::Bind(&SystemPerftest::MessagePipe_WriteAndRead,
+ base::Unretained(this),
+ static_cast<void*>(buffer), static_cast<uint32_t>(1000)));
+ test::IterateAndReportPerf(
+ "MessagePipe_WriteAndRead_10000bytes",
+ base::Bind(&SystemPerftest::MessagePipe_WriteAndRead,
+ base::Unretained(this),
+ static_cast<void*>(buffer), static_cast<uint32_t>(10000)));
+ CHECK_EQ(Close(h_0_), MOJO_RESULT_OK);
+ CHECK_EQ(Close(h_1_), MOJO_RESULT_OK);
+}
+
+} // namespace
+} // namespace mojo
diff --git a/mojo/public/tests/system_core_unittest.cc b/mojo/public/tests/system_core_unittest.cc
new file mode 100644
index 0000000..382a800
--- /dev/null
+++ b/mojo/public/tests/system_core_unittest.cc
@@ -0,0 +1,105 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/system/core.h"
+
+#include <string.h>
+
+#include "mojo/public/tests/test_support.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace {
+
+class SystemTest : public test::TestBase {
+};
+
+TEST_F(SystemTest, Basic) {
+ Handle h_0;
+ MojoWaitFlags wf;
+ char buffer[10] = { 0 };
+ uint32_t buffer_size;
+
+ // The only handle that's guaranteed to be invalid is |kInvalidHandle|.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, Close(kInvalidHandle));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ Wait(kInvalidHandle, MOJO_WAIT_FLAG_EVERYTHING, 1000000));
+ h_0 = kInvalidHandle;
+ wf = MOJO_WAIT_FLAG_EVERYTHING;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ WaitMany(&h_0, &wf, 1, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ WriteMessage(h_0,
+ buffer, 3,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ ReadMessage(h_0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ Handle h_1;
+ EXPECT_EQ(MOJO_RESULT_OK, CreateMessagePipe(&h_0, &h_1));
+
+ // Shouldn't be readable.
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ Wait(h_0, MOJO_WAIT_FLAG_READABLE, 0));
+
+ // Should be writable.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ Wait(h_0, MOJO_WAIT_FLAG_WRITABLE, 0));
+
+ // Try to read.
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ ReadMessage(h_0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Write to |h_1|.
+ static const char hello[] = "hello";
+ memcpy(buffer, hello, sizeof(hello));
+ buffer_size = static_cast<uint32_t>(sizeof(hello));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ WriteMessage(h_1,
+ hello, buffer_size,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // |h_0| should be readable.
+ wf = MOJO_WAIT_FLAG_READABLE;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ WaitMany(&h_0, &wf, 1, MOJO_DEADLINE_INDEFINITE));
+
+ // Read from |h_0|.
+ memset(buffer, 0, sizeof(buffer));
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ ReadMessage(h_0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(hello)), buffer_size);
+ EXPECT_EQ(0, memcmp(hello, buffer, sizeof(hello)));
+
+ // |h_0| should no longer be readable.
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ Wait(h_0, MOJO_WAIT_FLAG_READABLE, 10));
+
+ // Close |h_0|.
+ EXPECT_EQ(MOJO_RESULT_OK, Close(h_0));
+
+ // |h_1| should no longer be readable or writable.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ Wait(h_1, MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE, 1000));
+
+ EXPECT_EQ(MOJO_RESULT_OK, Close(h_1));
+}
+
+// TODO(vtl): Add multi-threaded tests.
+
+} // namespace
+} // namespace mojo
diff --git a/mojo/public/tests/test_support.cc b/mojo/public/tests/test_support.cc
new file mode 100644
index 0000000..0140c9f
--- /dev/null
+++ b/mojo/public/tests/test_support.cc
@@ -0,0 +1,48 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/tests/test_support.h"
+
+#include "base/test/perf_log.h"
+#include "base/time/time.h"
+#include "mojo/system/core_impl.h"
+
+namespace mojo {
+namespace test {
+
+TestBase::TestBase() {
+}
+
+TestBase::~TestBase() {
+}
+
+void TestBase::SetUp() {
+ if (!system::CoreImpl::Get())
+ system::CoreImpl::Init();
+}
+
+void IterateAndReportPerf(const char* test_name,
+ base::Callback<void()> single_iteration) {
+ // TODO(vtl): These should be specifiable using command-line flags.
+ static const size_t kGranularity = 100;
+ static const double kPerftestTimeSeconds = 3.0;
+
+ const base::TimeTicks start_time = base::TimeTicks::HighResNow();
+ base::TimeTicks end_time;
+ size_t iterations = 0;
+ do {
+ for (size_t i = 0; i < kGranularity; i++)
+ single_iteration.Run();
+ iterations += kGranularity;
+
+ end_time = base::TimeTicks::HighResNow();
+ } while ((end_time - start_time).InSecondsF() < kPerftestTimeSeconds);
+
+ base::LogPerfResult(test_name,
+ iterations / (end_time - start_time).InSecondsF(),
+ "iterations/second");
+}
+
+} // namespace test
+} // namespace mojo
diff --git a/mojo/public/tests/test_support.h b/mojo/public/tests/test_support.h
new file mode 100644
index 0000000..34ec0bf
--- /dev/null
+++ b/mojo/public/tests/test_support.h
@@ -0,0 +1,36 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_
+#define MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_
+
+#include "base/basictypes.h"
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace test {
+
+class TestBase : public testing::Test {
+ public:
+ TestBase();
+ virtual ~TestBase();
+
+ virtual void SetUp() OVERRIDE;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(TestBase);
+};
+
+// Run |single_iteration| an appropriate number of times and report its
+// performance appropriately. (This actually runs |single_iteration| for a fixed
+// amount of time and reports the number of iterations per unit time.)
+void IterateAndReportPerf(const char* test_name,
+ base::Callback<void()> single_iteration);
+
+} // namespace test
+} // namespace mojo
+
+#endif // MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_
diff --git a/mojo/system/core.cc b/mojo/system/core.cc
new file mode 100644
index 0000000..5367fd31
--- /dev/null
+++ b/mojo/system/core.cc
@@ -0,0 +1,53 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/system/core.h"
+
+#include "mojo/system/core_impl.h"
+
+extern "C" {
+
+MojoResult MojoClose(MojoHandle handle) {
+ return mojo::system::CoreImpl::Get()->Close(handle);
+}
+
+MojoResult MojoWait(MojoHandle handle,
+ MojoWaitFlags flags,
+ MojoDeadline deadline) {
+ return mojo::system::CoreImpl::Get()->Wait(handle, flags, deadline);
+}
+
+MojoResult MojoWaitMany(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline) {
+ return mojo::system::CoreImpl::Get()->WaitMany(handles, flags, num_handles,
+ deadline);
+}
+
+MojoResult MojoCreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1) {
+ return mojo::system::CoreImpl::Get()->CreateMessagePipe(handle_0, handle_1);
+}
+
+MojoResult MojoWriteMessage(MojoHandle handle,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ return mojo::system::CoreImpl::Get()->WriteMessage(handle,
+ bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+MojoResult MojoReadMessage(MojoHandle handle,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ return mojo::system::CoreImpl::Get()->ReadMessage(handle,
+ bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+} // extern "C"
diff --git a/mojo/system/core_impl.cc b/mojo/system/core_impl.cc
new file mode 100644
index 0000000..00d1107
--- /dev/null
+++ b/mojo/system/core_impl.cc
@@ -0,0 +1,275 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/core_impl.h"
+
+#include <vector>
+
+#include "base/logging.h"
+#include "mojo/system/dispatcher.h"
+#include "mojo/system/limits.h"
+#include "mojo/system/memory.h"
+#include "mojo/system/message_pipe.h"
+#include "mojo/system/message_pipe_dispatcher.h"
+#include "mojo/system/waiter.h"
+
+namespace mojo {
+namespace system {
+
+// Implementation notes
+//
+// Mojo primitives are implemented by the singleton |CoreImpl| object. Most
+// calls are for a "primary" handle (the first argument).
+// |CoreImpl::GetDispatcher()| is used to look up a |Dispatcher| object for a
+// given handle. That object implements most primitives for that object. The
+// wait primitives are not attached to objects and are implemented by |CoreImpl|
+// itself.
+//
+// Some objects have multiple handles associated to them, e.g., message pipes
+// (which have two). In such a case, there is still a |Dispatcher| (e.g.,
+// |MessagePipeDispatcher|) for each handle, with each handle having a strong
+// reference to the common "secondary" object (e.g., |MessagePipe|). This
+// secondary object does NOT have any references to the |Dispatcher|s (even if
+// it did, it wouldn't be able to do anything with them due to lock order
+// requirements -- see below).
+//
+// Waiting is implemented by having the thread that wants to wait call the
+// |Dispatcher|s for the handles that it wants to wait on with a |Waiter|
+// object; this |Waiter| object may be created on the stack of that thread or be
+// kept in thread local storage for that thread (TODO(vtl): future improvement).
+// The |Dispatcher| then adds the |Waiter| to a |WaiterList| that's either owned
+// by that |Dispatcher| (see |SimpleDispatcher|) or by a secondary object (e.g.,
+// |MessagePipe|). To signal/wake a |Waiter|, the object in question -- either a
+// |SimpleDispatcher| or a secondary object -- talks to its |WaiterList|.
+
+// Thread-safety notes
+//
+// Mojo primitives calls are thread-safe. We achieve this with relatively
+// fine-grained locking. There is a global handle table lock. This lock should
+// be held as briefly as possible (TODO(vtl): a future improvement would be to
+// switch it to a reader-writer lock). Each |Dispatcher| object then has a lock
+// (which subclasses can use to protect their data).
+//
+// The lock ordering is as follows:
+// 1. global handle table lock
+// 2. |Dispatcher| locks
+// 3. secondary object locks
+// ...
+// INF. |Waiter| locks
+//
+// Notes:
+// - While holding a |Dispatcher| lock, you may not unconditionally attempt
+// to take another |Dispatcher| lock. (This has consequences on the
+// concurrency semantics of |MojoWriteMessage()| when passing handles.)
+// Doing so would lead to deadlock.
+// - Locks at the "INF" level may not have any locks taken while they are
+// held.
+
+// static
+CoreImpl* CoreImpl::singleton_ = NULL;
+
+// static
+void CoreImpl::Init() {
+ CHECK(!singleton_);
+ singleton_ = new CoreImpl();
+}
+
+MojoResult CoreImpl::Close(MojoHandle handle) {
+ if (handle == MOJO_HANDLE_INVALID)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ scoped_refptr<Dispatcher> dispatcher;
+ {
+ base::AutoLock locker(handle_table_lock_);
+ HandleTableMap::iterator it = handle_table_.find(handle);
+ if (it == handle_table_.end())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ dispatcher = it->second;
+ handle_table_.erase(it);
+ }
+
+ // The dispatcher doesn't have a say in being closed, but gets notified of it.
+ // Note: This is done outside of |handle_table_lock_|. As a result, there's a
+ // race condition that the dispatcher must handle; see the comment in
+ // |Dispatcher| in dispatcher.h.
+ return dispatcher->Close();
+}
+
+MojoResult CoreImpl::Wait(MojoHandle handle,
+ MojoWaitFlags flags,
+ MojoDeadline deadline) {
+ return WaitManyInternal(&handle, &flags, 1, deadline);
+}
+
+MojoResult CoreImpl::WaitMany(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline) {
+ if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (!VerifyUserPointer(flags, num_handles, sizeof(flags[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_handles < 1)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_handles > kMaxWaitManyNumHandles)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ return WaitManyInternal(handles, flags, num_handles, deadline);
+}
+
+MojoResult CoreImpl::CreateMessagePipe(MojoHandle* handle_0,
+ MojoHandle* handle_1) {
+ scoped_refptr<MessagePipeDispatcher> dispatcher_0(
+ new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> dispatcher_1(
+ new MessagePipeDispatcher());
+
+ MojoHandle h0, h1;
+ {
+ base::AutoLock locker(handle_table_lock_);
+
+ h0 = AddDispatcherNoLock(dispatcher_0);
+ if (h0 == MOJO_HANDLE_INVALID)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ h1 = AddDispatcherNoLock(dispatcher_1);
+ if (h1 == MOJO_HANDLE_INVALID) {
+ handle_table_.erase(h0);
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ }
+ }
+
+ scoped_refptr<MessagePipe> message_pipe(new MessagePipe());
+ dispatcher_0->Init(message_pipe, 0);
+ dispatcher_1->Init(message_pipe, 1);
+
+ *handle_0 = h0;
+ *handle_1 = h1;
+ return MOJO_RESULT_OK;
+}
+
+MojoResult CoreImpl::WriteMessage(
+ MojoHandle handle,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ scoped_refptr<Dispatcher> dispatcher(GetDispatcher(handle));
+ if (!dispatcher.get())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return dispatcher->WriteMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+MojoResult CoreImpl::ReadMessage(
+ MojoHandle handle,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ scoped_refptr<Dispatcher> dispatcher(GetDispatcher(handle));
+ if (!dispatcher.get())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return dispatcher->ReadMessage(bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+CoreImpl::CoreImpl()
+ : next_handle_(MOJO_HANDLE_INVALID + 1) {
+}
+
+CoreImpl::~CoreImpl() {
+ // This should usually not be reached (the singleton lives forever), except
+ // in tests.
+}
+
+scoped_refptr<Dispatcher> CoreImpl::GetDispatcher(MojoHandle handle) {
+ if (handle == MOJO_HANDLE_INVALID)
+ return NULL;
+
+ base::AutoLock locker(handle_table_lock_);
+ HandleTableMap::iterator it = handle_table_.find(handle);
+ if (it == handle_table_.end())
+ return NULL;
+
+ return it->second;
+}
+
+MojoHandle CoreImpl::AddDispatcherNoLock(scoped_refptr<Dispatcher> dispatcher) {
+ DCHECK(dispatcher.get());
+ handle_table_lock_.AssertAcquired();
+ DCHECK_NE(next_handle_, MOJO_HANDLE_INVALID);
+
+ if (handle_table_.size() >= kMaxHandleTableSize)
+ return MOJO_HANDLE_INVALID;
+
+ // TODO(vtl): Maybe we want to do something different/smarter. (Or maybe try
+ // assigning randomly?)
+ while (handle_table_.find(next_handle_) != handle_table_.end()) {
+ next_handle_++;
+ if (next_handle_ == MOJO_HANDLE_INVALID)
+ next_handle_++;
+ }
+
+ MojoHandle new_handle = next_handle_;
+ handle_table_[new_handle] = dispatcher;
+
+ next_handle_++;
+ if (next_handle_ == MOJO_HANDLE_INVALID)
+ next_handle_++;
+
+ return new_handle;
+}
+
+// Note: We allow |handles| to repeat the same handle multiple times, since
+// different flags may be specified.
+// TODO(vtl): This incurs a performance cost in |RemoveWaiter()|. Analyze this
+// more carefully and address it if necessary.
+MojoResult CoreImpl::WaitManyInternal(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline) {
+ DCHECK_GT(num_handles, 0u);
+
+ std::vector<scoped_refptr<Dispatcher> > dispatchers;
+ dispatchers.reserve(num_handles);
+ for (uint32_t i = 0; i < num_handles; i++) {
+ scoped_refptr<Dispatcher> d = GetDispatcher(handles[i]);
+ if (!d.get())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ dispatchers.push_back(d);
+ }
+
+ // TODO(vtl): Should make the waiter live (permanently) in TLS.
+ Waiter waiter;
+ waiter.Init();
+
+ uint32_t i;
+ MojoResult rv = MOJO_RESULT_OK;
+ for (i = 0; i < num_handles; i++) {
+ rv = dispatchers[i]->AddWaiter(&waiter,
+ flags[i],
+ static_cast<MojoResult>(i));
+ if (rv != MOJO_RESULT_OK)
+ break;
+ }
+ uint32_t num_added = i;
+
+ if (rv == MOJO_RESULT_ALREADY_EXISTS)
+ rv = static_cast<MojoResult>(i); // The i-th one is already "triggered".
+ else if (rv == MOJO_RESULT_OK)
+ rv = waiter.Wait(deadline);
+
+ // Make sure no other dispatchers try to wake |waiter| for the current
+ // |Wait()|/|WaitMany()| call. (Only after doing this can |waiter| be
+ // destroyed, but this would still be required if the waiter were in TLS.)
+ for (i = 0; i < num_added; i++)
+ dispatchers[i]->RemoveWaiter(&waiter);
+
+ return rv;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/core_impl.h b/mojo/system/core_impl.h
new file mode 100644
index 0000000..1417a06
--- /dev/null
+++ b/mojo/system/core_impl.h
@@ -0,0 +1,104 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_CORE_IMPL_H_
+#define MOJO_SYSTEM_CORE_IMPL_H_
+
+#include "base/basictypes.h"
+#include "base/containers/hash_tables.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace system {
+
+class CoreImpl;
+class Dispatcher;
+
+namespace test {
+class CoreTestBase;
+}
+
+// |CoreImpl| is a singleton object that implements the Mojo system calls. With
+// the (obvious) exception of |Init()|, which must be called first (and the call
+// completed) before making any other calls, all the public methods are
+// thread-safe.
+class CoreImpl {
+ public:
+ static void Init();
+
+ static CoreImpl* Get() {
+ return singleton_;
+ }
+
+ MojoResult Close(MojoHandle handle);
+
+ MojoResult Wait(MojoHandle handle,
+ MojoWaitFlags flags,
+ MojoDeadline deadline);
+
+ MojoResult WaitMany(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline);
+
+ MojoResult CreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1);
+
+ MojoResult WriteMessage(MojoHandle handle,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags);
+
+ MojoResult ReadMessage(MojoHandle handle,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+
+ private:
+ friend class test::CoreTestBase;
+
+ typedef base::hash_map<MojoHandle, scoped_refptr<Dispatcher> >
+ HandleTableMap;
+
+ CoreImpl();
+ ~CoreImpl();
+
+ // Looks up the dispatcher for the given handle. Returns null if the handle is
+ // invalid.
+ scoped_refptr<Dispatcher> GetDispatcher(MojoHandle handle);
+
+ // Assigns a new handle for the given dispatcher (which must be valid);
+ // returns |MOJO_HANDLE_INVALID| on failure (due to hitting resource limits).
+ // Must be called under |handle_table_lock_|.
+ MojoHandle AddDispatcherNoLock(scoped_refptr<Dispatcher> dispatcher);
+
+ // Internal implementation of |Wait()| and |WaitMany()|; doesn't do basic
+ // validation of arguments.
+ MojoResult WaitManyInternal(const MojoHandle* handles,
+ const MojoWaitFlags* flags,
+ uint32_t num_handles,
+ MojoDeadline deadline);
+
+ // ---------------------------------------------------------------------------
+
+ static CoreImpl* singleton_;
+
+ // ---------------------------------------------------------------------------
+
+ // TODO(vtl): |handle_table_lock_| should be a reader-writer lock (if only we
+ // had them).
+ base::Lock handle_table_lock_; // Protects the immediately-following members.
+ HandleTableMap handle_table_;
+ MojoHandle next_handle_; // Invariant: never |MOJO_HANDLE_INVALID|.
+
+ // ---------------------------------------------------------------------------
+
+ DISALLOW_COPY_AND_ASSIGN(CoreImpl);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_CORE_IMPL_H_
diff --git a/mojo/system/core_impl_unittest.cc b/mojo/system/core_impl_unittest.cc
new file mode 100644
index 0000000..1ace04d
--- /dev/null
+++ b/mojo/system/core_impl_unittest.cc
@@ -0,0 +1,245 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/core_impl.h"
+
+#include "mojo/system/core_test_base.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+class CoreImplTest : public test::CoreTestBase {
+};
+
+TEST_F(CoreImplTest, Basic) {
+ MockHandleInfo info;
+
+ EXPECT_EQ(0u, info.GetCtorCallCount());
+ MojoHandle h = CreateMockHandle(&info);
+ EXPECT_EQ(1u, info.GetCtorCallCount());
+ EXPECT_NE(h, MOJO_HANDLE_INVALID);
+
+ EXPECT_EQ(0u, info.GetWriteMessageCallCount());
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->WriteMessage(h, NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(1u, info.GetWriteMessageCallCount());
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WriteMessage(h, NULL, 1, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(2u, info.GetWriteMessageCallCount());
+
+ EXPECT_EQ(0u, info.GetReadMessageCallCount());
+ uint32_t num_bytes = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->ReadMessage(h, NULL, &num_bytes, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(1u, info.GetReadMessageCallCount());
+ num_bytes = 1;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->ReadMessage(h, NULL, &num_bytes, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(2u, info.GetReadMessageCallCount());
+
+ EXPECT_EQ(0u, info.GetAddWaiterCallCount());
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING,
+ MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(1u, info.GetAddWaiterCallCount());
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING, 0));
+ EXPECT_EQ(2u, info.GetAddWaiterCallCount());
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING, 10 * 1000));
+ EXPECT_EQ(3u, info.GetAddWaiterCallCount());
+ MojoWaitFlags wait_flags = MOJO_WAIT_FLAG_EVERYTHING;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->WaitMany(&h, &wait_flags, 1, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(4u, info.GetAddWaiterCallCount());
+
+ EXPECT_EQ(0u, info.GetDtorCallCount());
+ EXPECT_EQ(0u, info.GetCloseCallCount());
+ EXPECT_EQ(0u, info.GetCancelAllWaitersCallCount());
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h));
+ EXPECT_EQ(1u, info.GetCancelAllWaitersCallCount());
+ EXPECT_EQ(1u, info.GetCloseCallCount());
+ EXPECT_EQ(1u, info.GetDtorCallCount());
+
+ // No waiters should ever have ever been added.
+ EXPECT_EQ(0u, info.GetRemoveWaiterCallCount());
+}
+
+TEST_F(CoreImplTest, InvalidArguments) {
+ // |Close()|:
+ {
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(MOJO_HANDLE_INVALID));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(10));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(1000000000));
+
+ // Test a double-close.
+ MockHandleInfo info;
+ MojoHandle h = CreateMockHandle(&info);
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h));
+ EXPECT_EQ(1u, info.GetCloseCallCount());
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(h));
+ EXPECT_EQ(1u, info.GetCloseCallCount());
+ }
+
+ // |Wait()|:
+ {
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->Wait(MOJO_HANDLE_INVALID, MOJO_WAIT_FLAG_EVERYTHING,
+ MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->Wait(10, MOJO_WAIT_FLAG_EVERYTHING,
+ MOJO_DEADLINE_INDEFINITE));
+ }
+
+ // |WaitMany()|:
+ {
+ MojoHandle handles[2] = { MOJO_HANDLE_INVALID, MOJO_HANDLE_INVALID };
+ MojoWaitFlags flags[2] = { MOJO_WAIT_FLAG_EVERYTHING,
+ MOJO_WAIT_FLAG_EVERYTHING };
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, flags, 0, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(NULL, flags, 0, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, NULL, 0, MOJO_DEADLINE_INDEFINITE));
+
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(NULL, flags, 1, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, NULL, 1, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, flags, 1, MOJO_DEADLINE_INDEFINITE));
+
+ MockHandleInfo info[2];
+ handles[0] = CreateMockHandle(&info[0]);
+
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->WaitMany(handles, flags, 1, MOJO_DEADLINE_INDEFINITE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE));
+ handles[1] = handles[0] + 1; // Invalid handle.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE));
+ handles[1] = CreateMockHandle(&info[1]);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE));
+
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(handles[0]));
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(handles[1]));
+ }
+}
+
+// TODO(vtl): test |Wait()| and |WaitMany()| properly
+// - including |WaitMany()| with the same handle more than once (with
+// same/different flags)
+
+TEST_F(CoreImplTest, MessagePipe) {
+ MojoHandle h[2];
+
+ EXPECT_EQ(MOJO_RESULT_OK, core()->CreateMessagePipe(&h[0], &h[1]));
+ // Should get two distinct, valid handles.
+ EXPECT_NE(h[0], MOJO_HANDLE_INVALID);
+ EXPECT_NE(h[1], MOJO_HANDLE_INVALID);
+ EXPECT_NE(h[0], h[1]);
+
+ // Neither should be readable.
+ MojoWaitFlags flags[2] = { MOJO_WAIT_FLAG_READABLE, MOJO_WAIT_FLAG_READABLE };
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ core()->WaitMany(h, flags, 2, 0));
+
+ // Try to read anyway.
+ char buffer[1] = { 'a' };
+ uint32_t buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ core()->ReadMessage(h[0], buffer, &buffer_size, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ // Check that it left its inputs alone.
+ EXPECT_EQ('a', buffer[0]);
+ EXPECT_EQ(1u, buffer_size);
+
+ // Both should be writable.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->Wait(h[0], MOJO_WAIT_FLAG_WRITABLE, 1000000000));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->Wait(h[1], MOJO_WAIT_FLAG_WRITABLE, 1000000000));
+
+ // Also check that |h[1]| is writable using |WaitMany()|.
+ flags[0] = MOJO_WAIT_FLAG_READABLE;
+ flags[1] = MOJO_WAIT_FLAG_WRITABLE;
+ EXPECT_EQ(1, core()->WaitMany(h, flags, 2, MOJO_DEADLINE_INDEFINITE));
+
+ // Write to |h[1]|.
+ buffer[0] = 'b';
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->WriteMessage(h[1], buffer, 1, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Check that |h[0]| is now readable.
+ flags[0] = MOJO_WAIT_FLAG_READABLE;
+ flags[1] = MOJO_WAIT_FLAG_READABLE;
+ EXPECT_EQ(0, core()->WaitMany(h, flags, 2, MOJO_DEADLINE_INDEFINITE));
+
+ // Read from |h[0]|.
+ // First, get only the size.
+ buffer_size = 0;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ core()->ReadMessage(h[0], NULL, &buffer_size, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(1u, buffer_size);
+ // Then actually read it.
+ buffer[0] = 'c';
+ buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->ReadMessage(h[0], buffer, &buffer_size, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ('b', buffer[0]);
+ EXPECT_EQ(1u, buffer_size);
+
+ // |h[0]| should no longer be readable.
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ core()->Wait(h[0], MOJO_WAIT_FLAG_READABLE, 0));
+
+ // Write to |h[0]|.
+ buffer[0] = 'd';
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->WriteMessage(h[0], buffer, 1, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Close |h[0]|.
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h[0]));
+
+ // Check that |h[1]| is no longer writable (and will never be).
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->Wait(h[1], MOJO_WAIT_FLAG_WRITABLE, 1000000000));
+
+ // Check that |h[1]| is still readable (for the moment).
+ EXPECT_EQ(MOJO_RESULT_OK,
+ core()->Wait(h[1], MOJO_WAIT_FLAG_READABLE, 1000000000));
+
+ // Discard a message from |h[1]|.
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ core()->ReadMessage(h[1], NULL, NULL, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ // |h[1]| is no longer readable (and will never be).
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->Wait(h[1], MOJO_WAIT_FLAG_READABLE, 1000000000));
+
+ // Try writing to |h[1]|.
+ buffer[0] = 'e';
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ core()->WriteMessage(h[1], buffer, 1, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h[1]));
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/core_test_base.cc b/mojo/system/core_test_base.cc
new file mode 100644
index 0000000..36241f8
--- /dev/null
+++ b/mojo/system/core_test_base.cc
@@ -0,0 +1,227 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/core_test_base.h"
+
+#include <vector>
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "mojo/system/core_impl.h"
+#include "mojo/system/dispatcher.h"
+#include "mojo/system/memory.h"
+
+namespace mojo {
+namespace system {
+namespace test {
+
+namespace {
+
+// MockDispatcher --------------------------------------------------------------
+
+class MockDispatcher : public Dispatcher {
+ public:
+ explicit MockDispatcher(CoreTestBase::MockHandleInfo* info)
+ : info_(info) {
+ CHECK(info_);
+ info_->IncrementCtorCallCount();
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<MockDispatcher>;
+ virtual ~MockDispatcher() {
+ info_->IncrementDtorCallCount();
+ }
+
+ // |Dispatcher| implementation/overrides:
+ virtual MojoResult CloseImplNoLock() OVERRIDE {
+ info_->IncrementCloseCallCount();
+ lock().AssertAcquired();
+ return MOJO_RESULT_OK;
+ }
+
+ virtual MojoResult WriteMessageImplNoLock(
+ const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags /*flags*/) OVERRIDE {
+ info_->IncrementWriteMessageCallCount();
+ lock().AssertAcquired();
+
+ if (!VerifyUserPointer(bytes, num_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return MOJO_RESULT_OK;
+ }
+
+ virtual MojoResult ReadMessageImplNoLock(
+ void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags /*flags*/) OVERRIDE {
+ info_->IncrementReadMessageCallCount();
+ lock().AssertAcquired();
+
+ if (num_bytes && !VerifyUserPointer(bytes, *num_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_handles &&
+ !VerifyUserPointer(handles, *num_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return MOJO_RESULT_OK;
+ }
+
+ virtual MojoResult AddWaiterImplNoLock(Waiter* /*waiter*/,
+ MojoWaitFlags /*flags*/,
+ MojoResult /*wake_result*/) OVERRIDE {
+ info_->IncrementAddWaiterCallCount();
+ lock().AssertAcquired();
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ virtual void RemoveWaiterImplNoLock(Waiter* /*waiter*/) OVERRIDE {
+ info_->IncrementRemoveWaiterCallCount();
+ lock().AssertAcquired();
+ }
+
+ virtual void CancelAllWaitersNoLock() OVERRIDE {
+ info_->IncrementCancelAllWaitersCallCount();
+ lock().AssertAcquired();
+ }
+
+ CoreTestBase::MockHandleInfo* const info_;
+
+ DISALLOW_COPY_AND_ASSIGN(MockDispatcher);
+};
+
+} // namespace
+
+// CoreTestBase ----------------------------------------------------------------
+
+CoreTestBase::CoreTestBase() {
+}
+
+CoreTestBase::~CoreTestBase() {
+}
+
+void CoreTestBase::SetUp() {
+ core_ = new CoreImpl();
+}
+
+void CoreTestBase::TearDown() {
+ delete core_;
+ core_ = NULL;
+}
+
+MojoHandle CoreTestBase::CreateMockHandle(CoreTestBase::MockHandleInfo* info) {
+ CHECK(core_);
+ scoped_refptr<MockDispatcher> dispatcher(new MockDispatcher(info));
+ base::AutoLock locker(core_->handle_table_lock_);
+ return core_->AddDispatcherNoLock(dispatcher);
+}
+
+// CoreTestBase_MockHandleInfo -------------------------------------------------
+
+CoreTestBase_MockHandleInfo::CoreTestBase_MockHandleInfo()
+ : ctor_call_count_(0),
+ dtor_call_count_(0),
+ close_call_count_(0),
+ write_message_call_count_(0),
+ read_message_call_count_(0),
+ add_waiter_call_count_(0),
+ remove_waiter_call_count_(0),
+ cancel_all_waiters_call_count_(0) {
+}
+
+CoreTestBase_MockHandleInfo::~CoreTestBase_MockHandleInfo() {
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetCtorCallCount() const {
+ base::AutoLock locker(lock_);
+ return ctor_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetDtorCallCount() const {
+ base::AutoLock locker(lock_);
+ return dtor_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetCloseCallCount() const {
+ base::AutoLock locker(lock_);
+ return close_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetWriteMessageCallCount() const {
+ base::AutoLock locker(lock_);
+ return write_message_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetReadMessageCallCount() const {
+ base::AutoLock locker(lock_);
+ return read_message_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetAddWaiterCallCount() const {
+ base::AutoLock locker(lock_);
+ return add_waiter_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetRemoveWaiterCallCount() const {
+ base::AutoLock locker(lock_);
+ return remove_waiter_call_count_;
+}
+
+unsigned CoreTestBase_MockHandleInfo::GetCancelAllWaitersCallCount() const {
+ base::AutoLock locker(lock_);
+ return cancel_all_waiters_call_count_;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementCtorCallCount() {
+ base::AutoLock locker(lock_);
+ ctor_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementDtorCallCount() {
+ base::AutoLock locker(lock_);
+ dtor_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementCloseCallCount() {
+ base::AutoLock locker(lock_);
+ close_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementWriteMessageCallCount() {
+ base::AutoLock locker(lock_);
+ write_message_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementReadMessageCallCount() {
+ base::AutoLock locker(lock_);
+ read_message_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementAddWaiterCallCount() {
+ base::AutoLock locker(lock_);
+ add_waiter_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementRemoveWaiterCallCount() {
+ base::AutoLock locker(lock_);
+ remove_waiter_call_count_++;
+}
+
+void CoreTestBase_MockHandleInfo::IncrementCancelAllWaitersCallCount() {
+ base::AutoLock locker(lock_);
+ cancel_all_waiters_call_count_++;
+}
+
+} // namespace test
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/core_test_base.h b/mojo/system/core_test_base.h
new file mode 100644
index 0000000..114e62e
--- /dev/null
+++ b/mojo/system/core_test_base.h
@@ -0,0 +1,87 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_CORE_TEST_BASE_H_
+#define MOJO_SYSTEM_CORE_TEST_BASE_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/system/core.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+
+class CoreImpl;
+
+namespace test {
+
+class CoreTestBase_MockHandleInfo;
+
+class CoreTestBase : public testing::Test {
+ public:
+ typedef CoreTestBase_MockHandleInfo MockHandleInfo;
+
+ CoreTestBase();
+ virtual ~CoreTestBase();
+
+ virtual void SetUp() OVERRIDE;
+ virtual void TearDown() OVERRIDE;
+
+ protected:
+ // |info| must remain alive until the returned handle is closed.
+ MojoHandle CreateMockHandle(MockHandleInfo* info);
+
+ CoreImpl* core() { return core_; }
+
+ private:
+ CoreImpl* core_;
+
+ DISALLOW_COPY_AND_ASSIGN(CoreTestBase);
+};
+
+class CoreTestBase_MockHandleInfo {
+ public:
+ CoreTestBase_MockHandleInfo();
+ ~CoreTestBase_MockHandleInfo();
+
+ unsigned GetCtorCallCount() const;
+ unsigned GetDtorCallCount() const;
+ unsigned GetCloseCallCount() const;
+ unsigned GetWriteMessageCallCount() const;
+ unsigned GetReadMessageCallCount() const;
+ unsigned GetAddWaiterCallCount() const;
+ unsigned GetRemoveWaiterCallCount() const;
+ unsigned GetCancelAllWaitersCallCount() const;
+
+ // For use by |MockDispatcher|:
+ void IncrementCtorCallCount();
+ void IncrementDtorCallCount();
+ void IncrementCloseCallCount();
+ void IncrementWriteMessageCallCount();
+ void IncrementReadMessageCallCount();
+ void IncrementAddWaiterCallCount();
+ void IncrementRemoveWaiterCallCount();
+ void IncrementCancelAllWaitersCallCount();
+
+ private:
+ mutable base::Lock lock_; // Protects the following members.
+ unsigned ctor_call_count_;
+ unsigned dtor_call_count_;
+ unsigned close_call_count_;
+ unsigned write_message_call_count_;
+ unsigned read_message_call_count_;
+ unsigned add_waiter_call_count_;
+ unsigned remove_waiter_call_count_;
+ unsigned cancel_all_waiters_call_count_;
+
+ DISALLOW_COPY_AND_ASSIGN(CoreTestBase_MockHandleInfo);
+};
+
+} // namespace test
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_CORE_TEST_BASE_H_
diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc
new file mode 100644
index 0000000..04f3550
--- /dev/null
+++ b/mojo/system/dispatcher.cc
@@ -0,0 +1,131 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/dispatcher.h"
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace system {
+
+MojoResult Dispatcher::Close() {
+ base::AutoLock locker(lock_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ is_closed_ = true;
+ CancelAllWaitersNoLock();
+ return CloseImplNoLock();
+}
+
+MojoResult Dispatcher::WriteMessage(const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ base::AutoLock locker(lock_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return WriteMessageImplNoLock(bytes, num_bytes, handles, num_handles, flags);
+}
+
+MojoResult Dispatcher::ReadMessage(void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ base::AutoLock locker(lock_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return ReadMessageImplNoLock(bytes, num_bytes, handles, num_handles, flags);
+}
+
+MojoResult Dispatcher::AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ DCHECK_GE(wake_result, 0);
+
+ base::AutoLock locker(lock_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return AddWaiterImplNoLock(waiter, flags, wake_result);
+}
+
+void Dispatcher::RemoveWaiter(Waiter* waiter) {
+ base::AutoLock locker(lock_);
+ if (is_closed_)
+ return;
+ RemoveWaiterImplNoLock(waiter);
+}
+
+Dispatcher::Dispatcher()
+ : is_closed_(false) {
+}
+
+Dispatcher::~Dispatcher() {
+ // Make sure that |Close()| was called.
+ DCHECK(is_closed_);
+}
+
+void Dispatcher::CancelAllWaitersNoLock() {
+ lock_.AssertAcquired();
+ DCHECK(is_closed_);
+ // By default, waiting isn't supported. Only dispatchers that can be waited on
+ // will do something nontrivial.
+}
+
+MojoResult Dispatcher::CloseImplNoLock() {
+ lock_.AssertAcquired();
+ DCHECK(is_closed_);
+ // This may not need to do anything. Dispatchers should override this to do
+ // any actual close-time cleanup necessary.
+ return MOJO_RESULT_OK;
+}
+
+MojoResult Dispatcher::WriteMessageImplNoLock(const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ lock_.AssertAcquired();
+ DCHECK(!is_closed_);
+ // By default, this isn't supported. Only dispatchers for message pipes (with
+ // whatever implementation, possibly a proxy) will do something nontrivial.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::ReadMessageImplNoLock(void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ lock_.AssertAcquired();
+ DCHECK(!is_closed_);
+ // By default, this isn't supported. Only dispatchers for message pipes (with
+ // whatever implementation, possibly a proxy) will do something nontrivial.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ lock_.AssertAcquired();
+ DCHECK(!is_closed_);
+ // By default, waiting isn't supported. Only dispatchers that can be waited on
+ // will do something nontrivial.
+ return MOJO_RESULT_FAILED_PRECONDITION;
+}
+
+void Dispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
+ lock_.AssertAcquired();
+ DCHECK(!is_closed_);
+ // By default, waiting isn't supported. Only dispatchers that can be waited on
+ // will do something nontrivial.
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h
new file mode 100644
index 0000000..6b44f6a
--- /dev/null
+++ b/mojo/system/dispatcher.h
@@ -0,0 +1,106 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_DISPATCHER_H_
+#define MOJO_SYSTEM_DISPATCHER_H_
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace system {
+
+class Waiter;
+
+// A |Dispatcher| implements Mojo primitives that are "attached" to a particular
+// handle. This includes most (all?) primitives except for |MojoWait...()|. This
+// object is thread-safe, with its state being protected by a single lock
+// |lock_|, which is also made available to implementation subclasses (via the
+// |lock()| method).
+class Dispatcher : public base::RefCountedThreadSafe<Dispatcher> {
+ public:
+ // These methods implement the various primitives named |Mojo...()|. These
+ // take |lock_| and handle races with |Close()|. Then they call out to
+ // subclasses' |...ImplNoLock()| methods (still under |lock_|), which actually
+ // implement the primitives.
+ // NOTE(vtl): This puts a big lock around each dispatcher (i.e., handle), and
+ // prevents the various |...ImplNoLock()|s from releasing the lock as soon as
+ // possible. If this becomes an issue, we can rethink this.
+ MojoResult Close();
+ MojoResult WriteMessage(const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags flags);
+ MojoResult ReadMessage(void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+
+ // Adds a waiter to this dispatcher. The waiter will be woken up when this
+ // object changes state to satisfy |flags| with result |wake_result| (which
+ // must be >= 0, i.e., a success status). It will also be woken up when it
+ // becomes impossible for the object to ever satisfy |flags| with a suitable
+ // error status.
+ //
+ // Returns:
+ // - |MOJO_RESULT_OK| if the waiter was added;
+ // - |MOJO_RESULT_ALREADY_EXISTS| if |flags| is already satisfied;
+ // - |MOJO_RESULT_INVALID_ARGUMENT| if the dispatcher has been closed; and
+ // - |MOJO_RESULT_FAILED_PRECONDITION| if it is not (or no longer) possible
+ // that |flags| will ever be satisfied.
+ MojoResult AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result);
+ void RemoveWaiter(Waiter* waiter);
+
+ protected:
+ Dispatcher();
+
+ friend class base::RefCountedThreadSafe<Dispatcher>;
+ virtual ~Dispatcher();
+
+ // These are to be overridden by subclasses (if necessary). They are called
+ // exactly once -- first |CancelAllWaitersNoLock()|, then |CloseImplNoLock()|,
+ // when the dispatcher is being closed. They are called under |lock_|.
+ virtual void CancelAllWaitersNoLock();
+ virtual MojoResult CloseImplNoLock();
+
+ // These are to be overridden by subclasses (if necessary). They are never
+ // called after the dispatcher has been closed. They are called under |lock_|.
+ virtual MojoResult WriteMessageImplNoLock(const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags flags);
+ virtual MojoResult ReadMessageImplNoLock(void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+ virtual MojoResult AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result);
+ virtual void RemoveWaiterImplNoLock(Waiter* waiter);
+
+ // Available to subclasses. (Note: Returns a non-const reference, just like
+ // |base::AutoLock|'s constructor takes a non-const reference.
+ base::Lock& lock() const { return lock_; }
+
+ private:
+ // This protects the following members as well as any state added by
+ // subclasses.
+ mutable base::Lock lock_;
+ bool is_closed_;
+
+ DISALLOW_COPY_AND_ASSIGN(Dispatcher);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_DISPATCHER_H_
diff --git a/mojo/system/dispatcher_unittest.cc b/mojo/system/dispatcher_unittest.cc
new file mode 100644
index 0000000..a479fce
--- /dev/null
+++ b/mojo/system/dispatcher_unittest.cc
@@ -0,0 +1,189 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/dispatcher.h"
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_vector.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/simple_thread.h"
+#include "mojo/system/waiter.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+// Trivial subclass that makes the constructor public.
+class TrivialDispatcher : public Dispatcher {
+ public:
+ TrivialDispatcher() {}
+
+ private:
+ friend class base::RefCountedThreadSafe<TrivialDispatcher>;
+ virtual ~TrivialDispatcher() {}
+
+ DISALLOW_COPY_AND_ASSIGN(TrivialDispatcher);
+};
+
+TEST(DispatcherTest, Basic) {
+ scoped_refptr<Dispatcher> d(new TrivialDispatcher());
+
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->WriteMessage(NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->ReadMessage(NULL, NULL, NULL, NULL,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ Waiter w;
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ d->AddWaiter(&w, MOJO_WAIT_FLAG_EVERYTHING, 0));
+ // Okay to remove even if it wasn't added (or was already removed).
+ d->RemoveWaiter(&w);
+ d->RemoveWaiter(&w);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->WriteMessage(NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->ReadMessage(NULL, NULL, NULL, NULL,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->AddWaiter(&w, MOJO_WAIT_FLAG_EVERYTHING, 0));
+ d->RemoveWaiter(&w);
+}
+
+class ThreadSafetyStressThread : public base::SimpleThread {
+ public:
+ enum DispatcherOp {
+ CLOSE = 0,
+ WRITE_MESSAGE,
+ READ_MESSAGE,
+ ADD_WAITER,
+ REMOVE_WAITER,
+
+ DISPATCHER_OP_COUNT
+ };
+
+ ThreadSafetyStressThread(base::WaitableEvent* event,
+ scoped_refptr<Dispatcher> dispatcher,
+ DispatcherOp op)
+ : base::SimpleThread("thread_safety_stress_thread"),
+ event_(event),
+ dispatcher_(dispatcher),
+ op_(op) {
+ CHECK_LE(0, op_);
+ CHECK_LT(op_, DISPATCHER_OP_COUNT);
+ }
+
+ virtual ~ThreadSafetyStressThread() {
+ Join();
+ }
+
+ private:
+ virtual void Run() OVERRIDE {
+ event_->Wait();
+
+ waiter_.Init();
+ switch(op_) {
+ case CLOSE: {
+ MojoResult r = dispatcher_->Close();
+ EXPECT_TRUE(r == MOJO_RESULT_OK || r == MOJO_RESULT_INVALID_ARGUMENT)
+ << "Result: " << r;
+ break;
+ }
+ case WRITE_MESSAGE:
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ dispatcher_->WriteMessage(NULL, 0, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ break;
+ case READ_MESSAGE:
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ dispatcher_->ReadMessage(NULL, NULL, NULL, NULL,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ break;
+ case ADD_WAITER: {
+ MojoResult r = dispatcher_->AddWaiter(&waiter_,
+ MOJO_WAIT_FLAG_EVERYTHING, 0);
+ EXPECT_TRUE(r == MOJO_RESULT_FAILED_PRECONDITION ||
+ r == MOJO_RESULT_INVALID_ARGUMENT);
+ break;
+ }
+ case REMOVE_WAITER:
+ dispatcher_->RemoveWaiter(&waiter_);
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+
+ // Always try to remove the waiter, in case we added it.
+ dispatcher_->RemoveWaiter(&waiter_);
+ }
+
+ base::WaitableEvent* const event_;
+ const scoped_refptr<Dispatcher> dispatcher_;
+ const DispatcherOp op_;
+
+ Waiter waiter_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadSafetyStressThread);
+};
+
+TEST(DispatcherTest, ThreadSafetyStress) {
+ static const size_t kRepeatCount = 20;
+ static const size_t kNumThreads = 100;
+
+ for (size_t i = 0; i < kRepeatCount; i++) {
+ // Manual reset, not initially signalled.
+ base::WaitableEvent event(true, false);
+ scoped_refptr<Dispatcher> d(new TrivialDispatcher());
+
+ {
+ ScopedVector<ThreadSafetyStressThread> threads;
+ for (size_t j = 0; j < kNumThreads; j++) {
+ ThreadSafetyStressThread::DispatcherOp op =
+ static_cast<ThreadSafetyStressThread::DispatcherOp>(
+ (i+j) % ThreadSafetyStressThread::DISPATCHER_OP_COUNT);
+ threads.push_back(new ThreadSafetyStressThread(&event, d, op));
+ threads.back()->Start();
+ }
+ event.Signal(); // Kicks off real work on the threads.
+ } // Joins all the threads.
+
+ // One of the threads should already have closed the dispatcher.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, d->Close());
+ }
+}
+
+TEST(DispatcherTest, ThreadSafetyStressNoClose) {
+ static const size_t kRepeatCount = 20;
+ static const size_t kNumThreads = 100;
+
+ for (size_t i = 0; i < kRepeatCount; i++) {
+ // Manual reset, not initially signalled.
+ base::WaitableEvent event(true, false);
+ scoped_refptr<Dispatcher> d(new TrivialDispatcher());
+
+ {
+ ScopedVector<ThreadSafetyStressThread> threads;
+ for (size_t j = 0; j < kNumThreads; j++) {
+ ThreadSafetyStressThread::DispatcherOp op =
+ static_cast<ThreadSafetyStressThread::DispatcherOp>(
+ (i+j) % (ThreadSafetyStressThread::DISPATCHER_OP_COUNT-1) + 1);
+ threads.push_back(new ThreadSafetyStressThread(&event, d, op));
+ threads.back()->Start();
+ }
+ event.Signal(); // Kicks off real work on the threads.
+ } // Joins all the threads.
+
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ }
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/limits.h b/mojo/system/limits.h
new file mode 100644
index 0000000..277c23a
--- /dev/null
+++ b/mojo/system/limits.h
@@ -0,0 +1,25 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_LIMITS_H_
+#define MOJO_SYSTEM_LIMITS_H_
+
+namespace mojo {
+namespace system {
+
+// Maximum of open (Mojo) handles.
+// TODO(vtl): This doesn't count "live" handles, some of which may live in
+// messages.
+const size_t kMaxHandleTableSize = 1000000;
+
+const size_t kMaxWaitManyNumHandles = kMaxHandleTableSize;
+
+const size_t kMaxMessageNumBytes = 4 * 1024 * 1024;
+
+const size_t kMaxMessageNumHandles = 10000;
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_LIMITS_H_
diff --git a/mojo/system/memory.cc b/mojo/system/memory.cc
new file mode 100644
index 0000000..df658ae
--- /dev/null
+++ b/mojo/system/memory.cc
@@ -0,0 +1,26 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/memory.h"
+
+#include <limits>
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace system {
+
+bool VerifyUserPointer(const void* pointer, size_t count, size_t size_each) {
+ DCHECK_GT(size_each, 0u);
+ if (count > std::numeric_limits<size_t>::max() / size_each)
+ return false;
+
+ // TODO(vtl): If running in kernel mode, do a full verification. For now, just
+ // check that it's non-null if |size| is nonzero. (A faster user mode
+ // implementation is also possible if this check is skipped.)
+ return count == 0 || !!pointer;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/memory.h b/mojo/system/memory.h
new file mode 100644
index 0000000..6a1dafc
--- /dev/null
+++ b/mojo/system/memory.h
@@ -0,0 +1,27 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_MEMORY_H_
+#define MOJO_SYSTEM_MEMORY_H_
+
+#include <stddef.h>
+
+namespace mojo {
+namespace system {
+
+// Verify that |count * size_each| bytes can be read from the user |pointer|
+// insofar as possible/necessary. |count| and |size_each| are specified
+// separately instead of a single size, since |count * size_each| may overflow a
+// |size_t|. |count| may be zero but |size_each| must be nonzero.
+//
+// For example, if running in kernel mode, this should be a full verification
+// that the given memory is owned and readable by the user process. In user
+// mode, if crashes are acceptable, this may do nothing at all (and always
+// return true).
+bool VerifyUserPointer(const void* pointer, size_t count, size_t size_each);
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_MEMORY_H_
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
new file mode 100644
index 0000000..8461705
--- /dev/null
+++ b/mojo/system/message_pipe.cc
@@ -0,0 +1,248 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/message_pipe.h"
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "mojo/system/limits.h"
+#include "mojo/system/memory.h"
+
+namespace mojo {
+namespace system {
+
+namespace {
+
+unsigned DestinationPortFromSourcePort(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+ return port ^ 1;
+}
+
+} // namespace
+
+MessagePipe::MessagePipe() {
+ is_open_[0] = is_open_[1] = true;
+}
+
+void MessagePipe::CancelAllWaiters(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ waiter_lists_[port].CancelAllWaiters();
+}
+
+void MessagePipe::Close(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ // Record the old state of the other (destination) port, so we can tell if it
+ // changes.
+ // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we
+ // don't have to do this.
+ MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE;
+ MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE;
+ bool dest_is_open = is_open_[destination_port];
+ if (dest_is_open) {
+ old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port);
+ old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port);
+ }
+
+ is_open_[port] = false;
+ STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port.
+
+ // Notify the other (destination) port if its state has changed.
+ if (dest_is_open) {
+ MojoWaitFlags new_dest_satisfied_flags =
+ SatisfiedFlagsNoLock(destination_port);
+ MojoWaitFlags new_dest_satisfiable_flags =
+ SatisfiableFlagsNoLock(destination_port);
+ if (new_dest_satisfied_flags != old_dest_satisfied_flags ||
+ new_dest_satisfiable_flags != old_dest_satisfiable_flags) {
+ waiter_lists_[destination_port].AwakeWaitersForStateChange(
+ new_dest_satisfied_flags, new_dest_satisfiable_flags);
+ }
+ }
+}
+
+MojoResult MessagePipe::WriteMessage(
+ unsigned port,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags /*flags*/) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ if (!VerifyUserPointer(bytes, num_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_bytes > kMaxMessageNumBytes)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (num_handles > kMaxMessageNumHandles)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ if (num_handles > 0) {
+ // TODO(vtl): Verify each handle.
+ NOTIMPLEMENTED();
+ return MOJO_RESULT_UNIMPLEMENTED;
+ }
+
+ // TODO(vtl): Handle flags.
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ // The destination port need not be open, unlike the source port.
+ if (!is_open_[destination_port])
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ bool dest_was_empty = message_queues_[destination_port].empty();
+
+ // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
+ message_queues_[destination_port].push_back(
+ new MessageInTransit(bytes, num_bytes));
+ // TODO(vtl): Support sending handles.
+
+ // The other (destination) port was empty and now isn't, so it should now be
+ // readable. Wake up anyone waiting on this.
+ if (dest_was_empty) {
+ waiter_lists_[destination_port].AwakeWaitersForStateChange(
+ SatisfiedFlagsNoLock(destination_port),
+ SatisfiableFlagsNoLock(destination_port));
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipe::ReadMessage(unsigned port,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ DCHECK(port == 0 || port == 1);
+
+ const size_t max_bytes = num_bytes ? *num_bytes : 0;
+ if (!VerifyUserPointer(bytes, max_bytes, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ const size_t max_handles = num_handles ? *num_handles : 0;
+ if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0])))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ if (message_queues_[port].empty())
+ return MOJO_RESULT_NOT_FOUND;
+
+ // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
+ // and release the lock immediately.
+ bool not_enough_space = false;
+ MessageInTransit* const message = message_queues_[port].front();
+ const size_t message_size = message->data.size();
+ if (num_bytes)
+ *num_bytes = static_cast<uint32_t>(message_size);
+ if (message_size <= max_bytes)
+ memcpy(bytes, message->data.data(), message_size);
+ else
+ not_enough_space = true;
+
+ // TODO(vtl): Support receiving handles.
+ if (num_handles)
+ *num_handles = 0;
+
+ if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
+ message_queues_[port].pop_front();
+ delete message;
+
+ // Now it's empty, thus no longer readable.
+ if (message_queues_[port].empty()) {
+ // It's currently not possible to wait for non-readability, but we should
+ // do the state change anyway.
+ waiter_lists_[port].AwakeWaitersForStateChange(
+ SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port));
+ }
+ }
+
+ if (not_enough_space)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipe::AddWaiter(unsigned port,
+ Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ if ((flags & SatisfiedFlagsNoLock(port)))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ if (!(flags & SatisfiableFlagsNoLock(port)))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ waiter_lists_[port].AddWaiter(waiter, flags, wake_result);
+ return MOJO_RESULT_OK;
+}
+
+void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(is_open_[port]);
+
+ waiter_lists_[port].RemoveWaiter(waiter);
+}
+
+MessagePipe::~MessagePipe() {
+ // Owned by the dispatchers. The owning dispatchers should only release us via
+ // their |Close()| method, which should inform us of being closed via our
+ // |Close()|. Thus these should already be null.
+ DCHECK(!is_open_[0]);
+ DCHECK(!is_open_[1]);
+}
+
+MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ lock_.AssertAcquired();
+
+ MojoWaitFlags satisfied_flags = 0;
+ if (!message_queues_[port].empty())
+ satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_open_[destination_port])
+ satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
+
+ return satisfied_flags;
+}
+
+MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ unsigned destination_port = DestinationPortFromSourcePort(port);
+
+ lock_.AssertAcquired();
+
+ MojoWaitFlags satisfiable_flags = 0;
+ if (!message_queues_[port].empty() || is_open_[destination_port])
+ satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
+ if (is_open_[destination_port])
+ satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
+
+ return satisfiable_flags;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
new file mode 100644
index 0000000..c02f29e
--- /dev/null
+++ b/mojo/system/message_pipe.h
@@ -0,0 +1,79 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_MESSAGE_PIPE_H_
+#define MOJO_SYSTEM_MESSAGE_PIPE_H_
+
+#include <list>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/system/core.h"
+#include "mojo/system/waiter_list.h"
+
+namespace mojo {
+namespace system {
+
+class Waiter;
+
+// |MessagePipe| is the secondary object implementing a message pipe (see the
+// explanatory comment in core_impl.cc), and is jointly owned by the two
+// dispatchers passed in to the constructor. This class is thread-safe.
+class MessagePipe : public base::RefCountedThreadSafe<MessagePipe> {
+ public:
+ MessagePipe();
+
+ // These are called by the dispatcher to implement its methods of
+ // corresponding names. In all cases, the port |port| must be open.
+ void CancelAllWaiters(unsigned port);
+ void Close(unsigned port);
+ MojoResult WriteMessage(unsigned port,
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags);
+ MojoResult ReadMessage(unsigned port,
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags);
+ MojoResult AddWaiter(unsigned port,
+ Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result);
+ void RemoveWaiter(unsigned port, Waiter* waiter);
+
+ private:
+ struct MessageInTransit {
+ MessageInTransit(const void* bytes, uint32_t num_bytes)
+ : data(static_cast<const char*>(bytes), num_bytes) {}
+
+ // TODO(vtl): Replace with something more efficient.
+ std::string data;
+ };
+
+ friend class base::RefCountedThreadSafe<MessagePipe>;
+ virtual ~MessagePipe();
+
+ MojoWaitFlags SatisfiedFlagsNoLock(unsigned port);
+ MojoWaitFlags SatisfiableFlagsNoLock(unsigned port);
+
+ base::Lock lock_; // Protects the following members.
+ bool is_open_[2];
+ // These are *incoming* queues for their corresponding ports. It owns its
+ // contents.
+ // TODO(vtl): When possible (with C++11), convert the plain pointers to
+ // scoped_ptr/unique_ptr. (Then we'll be able to use an |std::queue| instead
+ // of an |std::list|.)
+ std::list<MessageInTransit*> message_queues_[2];
+ WaiterList waiter_lists_[2];
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePipe);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_MESSAGE_PIPE_H_
diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc
new file mode 100644
index 0000000..ec5c80b
--- /dev/null
+++ b/mojo/system/message_pipe_dispatcher.cc
@@ -0,0 +1,77 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/message_pipe_dispatcher.h"
+
+#include "base/logging.h"
+#include "mojo/system/message_pipe.h"
+
+namespace mojo {
+namespace system {
+
+MessagePipeDispatcher::MessagePipeDispatcher() {
+}
+
+void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
+ unsigned port) {
+ DCHECK(message_pipe.get());
+ DCHECK(port == 0 || port == 1);
+
+ message_pipe_ = message_pipe;
+ port_ = port;
+}
+
+MessagePipeDispatcher::~MessagePipeDispatcher() {
+ // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
+ DCHECK(!message_pipe_.get());
+}
+
+void MessagePipeDispatcher::CancelAllWaitersNoLock() {
+ lock().AssertAcquired();
+ message_pipe_->CancelAllWaiters(port_);
+}
+
+MojoResult MessagePipeDispatcher::CloseImplNoLock() {
+ lock().AssertAcquired();
+ message_pipe_->Close(port_);
+ message_pipe_ = NULL;
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
+ const void* bytes, uint32_t num_bytes,
+ const MojoHandle* handles, uint32_t num_handles,
+ MojoWriteMessageFlags flags) {
+ lock().AssertAcquired();
+ return message_pipe_->WriteMessage(port_,
+ bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
+ void* bytes, uint32_t* num_bytes,
+ MojoHandle* handles, uint32_t* num_handles,
+ MojoReadMessageFlags flags) {
+ lock().AssertAcquired();
+ return message_pipe_->ReadMessage(port_,
+ bytes, num_bytes,
+ handles, num_handles,
+ flags);
+}
+
+MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ lock().AssertAcquired();
+ return message_pipe_->AddWaiter(port_, waiter, flags, wake_result);
+}
+
+void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
+ lock().AssertAcquired();
+ message_pipe_->RemoveWaiter(port_, waiter);
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h
new file mode 100644
index 0000000..b52ebb8
--- /dev/null
+++ b/mojo/system/message_pipe_dispatcher.h
@@ -0,0 +1,61 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
+#define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "mojo/system/dispatcher.h"
+
+namespace mojo {
+namespace system {
+
+class MessagePipe;
+
+// This is the |Dispatcher| implementation for message pipes (created by the
+// Mojo primitive |MojoCreateMessagePipe()|). This class is thread-safe.
+class MessagePipeDispatcher : public Dispatcher {
+ public:
+ MessagePipeDispatcher();
+
+ // Must be called before any other methods. (This method is not thread-safe.)
+ void Init(scoped_refptr<MessagePipe> message_pipe, unsigned port);
+
+ private:
+ friend class base::RefCountedThreadSafe<MessagePipeDispatcher>;
+ virtual ~MessagePipeDispatcher();
+
+ // |Dispatcher| implementation/overrides:
+ virtual void CancelAllWaitersNoLock() OVERRIDE;
+ virtual MojoResult CloseImplNoLock() OVERRIDE;
+ virtual MojoResult WriteMessageImplNoLock(
+ const void* bytes,
+ uint32_t num_bytes,
+ const MojoHandle* handles,
+ uint32_t num_handles,
+ MojoWriteMessageFlags flags) OVERRIDE;
+ virtual MojoResult ReadMessageImplNoLock(
+ void* bytes,
+ uint32_t* num_bytes,
+ MojoHandle* handles,
+ uint32_t* num_handles,
+ MojoReadMessageFlags flags) OVERRIDE;
+ virtual MojoResult AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) OVERRIDE;
+ virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE;
+
+ // Protected by |lock()|:
+ scoped_refptr<MessagePipe> message_pipe_; // This will be null if closed.
+ unsigned port_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
diff --git a/mojo/system/message_pipe_dispatcher_unittest.cc b/mojo/system/message_pipe_dispatcher_unittest.cc
new file mode 100644
index 0000000..b96d316
--- /dev/null
+++ b/mojo/system/message_pipe_dispatcher_unittest.cc
@@ -0,0 +1,539 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
+// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
+// tolerance and reduce observed flakiness.
+
+#include "mojo/system/message_pipe_dispatcher.h"
+
+#include <string.h>
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_vector.h"
+#include "base/rand_util.h"
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/threading/simple_thread.h"
+#include "base/time/time.h"
+#include "mojo/system/message_pipe.h"
+#include "mojo/system/test_utils.h"
+#include "mojo/system/waiter.h"
+#include "mojo/system/waiter_test_utils.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+const int64_t kMicrosPerMs = 1000;
+const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
+
+TEST(MessagePipeDispatcherTest, Basic) {
+ test::Stopwatch stopwatch;
+ int32_t buffer[1];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+ int64_t elapsed_micros;
+
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
+ for (unsigned i = 0; i < 2; i++) {
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ d_0->Init(mp, i); // 0, 1.
+ d_1->Init(mp, i ^ 1); // 1, 0.
+ }
+ Waiter w;
+
+ // Try adding a writable waiter when already writable.
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0));
+ // Shouldn't need to remove the waiter (it was not added).
+
+ // Add a readable waiter to |d_0|, then make it readable (by writing to
+ // |d_1|), then wait.
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
+ buffer[0] = 123456789;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_1->WriteMessage(buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ stopwatch.Start();
+ EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d_0->RemoveWaiter(&w);
+
+ // Try adding a readable waiter when already readable (from above).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
+ // Shouldn't need to remove the waiter (it was not added).
+
+ // Make |d_0| no longer readable (by reading from it).
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(kBufferSize, buffer_size);
+ EXPECT_EQ(123456789, buffer[0]);
+
+ // Wait for zero time for readability on |d_0| (will time out).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d_0->RemoveWaiter(&w);
+
+ // Wait for non-zero, finite time for readability on |d_0| (will time out).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+ d_0->RemoveWaiter(&w);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
+ }
+}
+
+// Test what happens when one end is closed (single-threaded test).
+TEST(MessagePipeDispatcherTest, BasicClosed) {
+ int32_t buffer[1];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
+ for (unsigned i = 0; i < 2; i++) {
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ d_0->Init(mp, i); // 0, 1.
+ d_1->Init(mp, i ^ 1); // 1, 0.
+ }
+ Waiter w;
+
+ // Write (twice) to |d_1|.
+ buffer[0] = 123456789;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_1->WriteMessage(buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ buffer[0] = 234567890;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_1->WriteMessage(buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Try waiting for readable on |d_0|; should fail (already satisfied).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
+
+ // Close |d_1|.
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
+
+ // Try waiting for readable on |d_0|; should fail (already satisfied).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
+
+ // Read from |d_0|.
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(kBufferSize, buffer_size);
+ EXPECT_EQ(123456789, buffer[0]);
+
+ // Try waiting for readable on |d_0|; should fail (already satisfied).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
+
+ // Read again from |d_0|.
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(kBufferSize, buffer_size);
+ EXPECT_EQ(234567890, buffer[0]);
+
+ // Try waiting for readable on |d_0|; should fail (unsatisfiable).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
+
+ // Try waiting for writable on |d_0|; should fail (unsatisfiable).
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
+
+ // Try reading from |d_0|; should fail (nothing to read).
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ d_0->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Try writing to |d_0|; should fail (other end closed).
+ buffer[0] = 345678901;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ d_0->WriteMessage(buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
+ }
+}
+
+TEST(MessagePipeDispatcherTest, BasicThreaded) {
+ test::Stopwatch stopwatch;
+ int32_t buffer[1];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+ bool did_wait;
+ MojoResult result;
+ int64_t elapsed_micros;
+
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
+ for (unsigned i = 0; i < 2; i++) {
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ d_0->Init(mp, i); // 0, 1.
+ d_1->Init(mp, i ^ 1); // 1, 0.
+ }
+
+ // Wait for readable on |d_1|, which will become readable after some time.
+ {
+ test::WaiterThread thread(d_1,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 0,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ // Wake it up by writing to |d_0|.
+ buffer[0] = 123456789;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_0->WriteMessage(buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(0, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ // Now |d_1| is already readable. Try waiting for it again.
+ {
+ test::WaiterThread thread(d_1,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 1,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_FALSE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+
+ // Consume what we wrote to |d_0|.
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ d_1->ReadMessage(buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(kBufferSize, buffer_size);
+ EXPECT_EQ(123456789, buffer[0]);
+
+ // Wait for readable on |d_1| and close |d_0| after some time, which should
+ // cancel that wait.
+ {
+ test::WaiterThread thread(d_1,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 0,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
+ }
+
+ for (unsigned i = 0; i < 2; i++) {
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ d_0->Init(mp, i); // 0, 1.
+ d_1->Init(mp, i ^ 1); // 1, 0.
+ }
+
+ // Wait for readable on |d_1| and close |d_1| after some time, which should
+ // cancel that wait.
+ {
+ test::WaiterThread thread(d_1,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 0,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
+ }
+}
+
+// Stress test -----------------------------------------------------------------
+
+const size_t kMaxMessageSize = 2000;
+
+class WriterThread : public base::SimpleThread {
+ public:
+ // |*messages_written| and |*bytes_written| belong to the thread while it's
+ // alive.
+ WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
+ size_t* messages_written, size_t* bytes_written)
+ : base::SimpleThread("writer_thread"),
+ write_dispatcher_(write_dispatcher),
+ messages_written_(messages_written),
+ bytes_written_(bytes_written) {
+ *messages_written_ = 0;
+ *bytes_written_ = 0;
+ }
+
+ virtual ~WriterThread() {
+ Join();
+ }
+
+ private:
+ virtual void Run() OVERRIDE {
+ // Make some data to write.
+ unsigned char buffer[kMaxMessageSize];
+ for (size_t i = 0; i < kMaxMessageSize; i++)
+ buffer[i] = static_cast<unsigned char>(i);
+
+ // Number of messages to write.
+ *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
+
+ // Write messages.
+ for (size_t i = 0; i < *messages_written_; i++) {
+ uint32_t bytes_to_write = static_cast<uint32_t>(
+ base::RandInt(1, static_cast<int>(kMaxMessageSize)));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ write_dispatcher_->WriteMessage(buffer, bytes_to_write,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ *bytes_written_ += bytes_to_write;
+ }
+
+ // Write one last "quit" message.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ write_dispatcher_->WriteMessage("quit", 4, NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ }
+
+ const scoped_refptr<Dispatcher> write_dispatcher_;
+ size_t* const messages_written_;
+ size_t* const bytes_written_;
+
+ DISALLOW_COPY_AND_ASSIGN(WriterThread);
+};
+
+class ReaderThread : public base::SimpleThread {
+ public:
+ // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
+ ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
+ size_t* messages_read, size_t* bytes_read)
+ : base::SimpleThread("reader_thread"),
+ read_dispatcher_(read_dispatcher),
+ messages_read_(messages_read),
+ bytes_read_(bytes_read) {
+ *messages_read_ = 0;
+ *bytes_read_ = 0;
+ }
+
+ virtual ~ReaderThread() {
+ Join();
+ }
+
+ private:
+ virtual void Run() OVERRIDE {
+ unsigned char buffer[kMaxMessageSize];
+ MojoResult result;
+ Waiter w;
+
+ // Read messages.
+ for (;;) {
+ // Wait for it to be readable.
+ w.Init();
+ result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0);
+ EXPECT_TRUE(result == MOJO_RESULT_OK ||
+ result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
+ if (result == MOJO_RESULT_OK) {
+ // Actually need to wait.
+ EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE));
+ read_dispatcher_->RemoveWaiter(&w);
+ }
+
+ // Now, try to do the read.
+ // Clear the buffer so that we can check the result.
+ memset(buffer, 0, sizeof(buffer));
+ uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ EXPECT_TRUE(result == MOJO_RESULT_OK ||
+ result == MOJO_RESULT_NOT_FOUND) << "result: " << result;
+ // We're racing with others to read, so maybe we failed.
+ if (result == MOJO_RESULT_NOT_FOUND)
+ continue; // In which case, try again.
+ // Check for quit.
+ if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
+ return;
+ EXPECT_GE(buffer_size, 1u);
+ EXPECT_LE(buffer_size, kMaxMessageSize);
+ EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
+
+ (*messages_read_)++;
+ *bytes_read_ += buffer_size;
+ }
+ }
+
+ static bool IsValidMessage(const unsigned char* buffer,
+ uint32_t message_size) {
+ size_t i;
+ for (i = 0; i < message_size; i++) {
+ if (buffer[i] != static_cast<unsigned char>(i))
+ return false;
+ }
+ // Check that the remaining bytes weren't stomped on.
+ for (; i < kMaxMessageSize; i++) {
+ if (buffer[i] != 0)
+ return false;
+ }
+ return true;
+ }
+
+ const scoped_refptr<Dispatcher> read_dispatcher_;
+ size_t* const messages_read_;
+ size_t* const bytes_read_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReaderThread);
+};
+
+TEST(MessagePipeDispatcherTest, Stress) {
+ static const size_t kNumWriters = 30;
+ static const size_t kNumReaders = kNumWriters;
+
+ scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher());
+ scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher());
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ d_write->Init(mp, 0);
+ d_read->Init(mp, 1);
+ }
+
+ size_t messages_written[kNumWriters];
+ size_t bytes_written[kNumWriters];
+ size_t messages_read[kNumReaders];
+ size_t bytes_read[kNumReaders];
+ {
+ // Make writers.
+ ScopedVector<WriterThread> writers;
+ for (size_t i = 0; i < kNumWriters; i++) {
+ writers.push_back(
+ new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
+ }
+
+ // Make readers.
+ ScopedVector<ReaderThread> readers;
+ for (size_t i = 0; i < kNumReaders; i++) {
+ readers.push_back(
+ new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
+ }
+
+ // Start writers.
+ for (size_t i = 0; i < kNumWriters; i++)
+ writers[i]->Start();
+
+ // Start readers.
+ for (size_t i = 0; i < kNumReaders; i++)
+ readers[i]->Start();
+
+ // TODO(vtl): Maybe I should have an event that triggers all the threads to
+ // start doing stuff for real (so that the first ones created/started aren't
+ // advantaged).
+ } // Joins all the threads.
+
+ size_t total_messages_written = 0;
+ size_t total_bytes_written = 0;
+ for (size_t i = 0; i < kNumWriters; i++) {
+ total_messages_written += messages_written[i];
+ total_bytes_written += bytes_written[i];
+ }
+ size_t total_messages_read = 0;
+ size_t total_bytes_read = 0;
+ for (size_t i = 0; i < kNumReaders; i++) {
+ total_messages_read += messages_read[i];
+ total_bytes_read += bytes_read[i];
+ // We'd have to be really unlucky to have read no messages on a thread.
+ EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
+ EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
+ }
+ EXPECT_EQ(total_messages_written, total_messages_read);
+ EXPECT_EQ(total_bytes_written, total_bytes_read);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
+ EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/message_pipe_unittest.cc b/mojo/system/message_pipe_unittest.cc
new file mode 100644
index 0000000..4ecd4ce
--- /dev/null
+++ b/mojo/system/message_pipe_unittest.cc
@@ -0,0 +1,540 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/message_pipe.h"
+
+#include <limits>
+
+#include "base/memory/ref_counted.h"
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/time/time.h"
+#include "mojo/system/waiter.h"
+#include "mojo/system/waiter_test_utils.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+// Tests:
+// - only default flags
+// - reading messages from a port
+// - when there are no/one/two messages available for that port
+// - with buffer size 0 (and null buffer) -- should get size
+// - with too-small buffer -- should get size
+// - also verify that buffers aren't modified when/where they shouldn't be
+// - writing messages to a port
+// - in the obvious scenarios (as above)
+// - to a port that's been closed
+// - writing a message to a port, closing the other (would be the source) port,
+// and reading it
+TEST(MessagePipeTest, Basic) {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+
+ int32_t buffer[2];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+
+ // Nothing to read yet on port 0.
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(kBufferSize, buffer_size);
+ EXPECT_EQ(123, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Ditto for port 1.
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Write from port 1 (to port 0).
+ buffer[0] = 789012345;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read from port 0.
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(789012345, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read again from port 0 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Write two messages from port 0 (to port 1).
+ buffer[0] = 123456789;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ buffer[0] = 234567890;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read from port 1 with buffer size 0 (should get the size of next message).
+ // Also test that giving a null buffer is okay when the buffer size is 0.
+ buffer_size = 0;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->ReadMessage(1,
+ NULL, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+
+ // Read from port 1 with buffer size 1 (too small; should get the size of next
+ // message).
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(123, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read from port 1.
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(123456789, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read again from port 1.
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(234567890, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read again from port 1 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ // Write from port 0 (to port 1).
+ buffer[0] = 345678901;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Close port 0.
+ mp->Close(0);
+
+ // Try to write from port 1 (to port 0).
+ buffer[0] = 456789012;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read from port 1; should still get message (even though port 0 was closed).
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(345678901, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read again from port 1 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ mp->Close(1);
+}
+
+TEST(MessagePipeTest, DiscardMode) {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+
+ int32_t buffer[2];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+
+ // Write from port 1 (to port 0).
+ buffer[0] = 789012345;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read/discard from port 0 (no buffer); get size.
+ buffer_size = 0;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->ReadMessage(0,
+ NULL, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+
+ // Read again from port 0 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ // Write from port 1 (to port 0).
+ buffer[0] = 890123456;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read from port 0 (buffer big enough).
+ buffer[0] = 123;
+ buffer[1] = 456;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+ EXPECT_EQ(890123456, buffer[0]);
+ EXPECT_EQ(456, buffer[1]);
+
+ // Read again from port 0 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ // Write from port 1 (to port 0).
+ buffer[0] = 901234567;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Read/discard from port 0 (buffer too small); get size.
+ buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+ EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size);
+
+ // Read again from port 0 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ // Write from port 1 (to port 0).
+ buffer[0] = 123456789;
+ buffer[1] = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(1,
+ buffer, static_cast<uint32_t>(sizeof(buffer[0])),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Discard from port 0.
+ buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->ReadMessage(0,
+ NULL, NULL,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ // Read again from port 0 -- it should be empty.
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_MAY_DISCARD));
+
+ mp->Close(0);
+ mp->Close(1);
+}
+
+TEST(MessagePipeTest, InvalidParams) {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+
+ char buffer[1];
+ MojoHandle handles[1];
+
+ // |WriteMessage|:
+ // Null buffer with nonzero buffer size.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ mp->WriteMessage(0,
+ NULL, 1,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ // Huge buffer size.
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->WriteMessage(0,
+ buffer, std::numeric_limits<uint32_t>::max(),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Null handles with nonzero handle count.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ mp->WriteMessage(0,
+ buffer, sizeof(buffer),
+ NULL, 1,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ // Huge handle count (implausibly big on some systems -- more than can be
+ // stored in a 32-bit address space).
+ // Note: This may return either |MOJO_RESULT_INVALID_ARGUMENT| or
+ // |MOJO_RESULT_RESOURCE_EXHAUSTED|, depending on whether it's plausible or
+ // not.
+ EXPECT_NE(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, sizeof(buffer),
+ handles, std::numeric_limits<uint32_t>::max(),
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ // Huge handle count (plausibly big).
+ EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
+ mp->WriteMessage(0,
+ buffer, sizeof(buffer),
+ handles, std::numeric_limits<uint32_t>::max() /
+ sizeof(handles[0]),
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // |ReadMessage|:
+ // Null buffer with nonzero buffer size.
+ uint32_t buffer_size = 1;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ mp->ReadMessage(0,
+ NULL, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ // Null handles with nonzero handle count.
+ buffer_size = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t handle_count = 1;
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ mp->ReadMessage(0,
+ buffer, &buffer_size,
+ NULL, &handle_count,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+
+ mp->Close(0);
+ mp->Close(1);
+}
+
+TEST(MessagePipeTest, BasicWaiting) {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ Waiter waiter;
+
+ int32_t buffer[1];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+ uint32_t buffer_size;
+
+ // Always writable (until the other port is closed).
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_WRITABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(0,
+ &waiter,
+ MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE,
+ 0));
+
+ // Not yet readable.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 1));
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0));
+ mp->RemoveWaiter(0, &waiter);
+
+ // Write from port 0 (to port 1), to make port 1 readable.
+ buffer[0] = 123456789;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Port 1 should already be readable now.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 2));
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(1,
+ &waiter,
+ MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE,
+ 0));
+ // ... and still writable.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_WRITABLE, 3));
+
+ // Close port 0.
+ mp->Close(0);
+
+ // Now port 1 should not be writable.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_WRITABLE, 4));
+
+ // But it should still be readable.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 5));
+
+ // Read from port 1.
+ buffer[0] = 0;
+ buffer_size = kBufferSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->ReadMessage(1,
+ buffer, &buffer_size,
+ NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(123456789, buffer[0]);
+
+ // Now port 1 should no longer be readable.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 6));
+
+ mp->Close(1);
+}
+
+TEST(MessagePipeTest, ThreadedWaiting) {
+ int32_t buffer[1];
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
+
+ MojoResult result;
+
+ // Write to wake up waiter waiting for read.
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ test::SimpleWaiterThread thread(&result);
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0));
+ thread.Start();
+
+ buffer[0] = 123456789;
+ // Write from port 0 (to port 1), which should wake up the waiter.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->WriteMessage(0,
+ buffer, kBufferSize,
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ mp->RemoveWaiter(1, thread.waiter());
+
+ mp->Close(0);
+ mp->Close(1);
+ } // Joins |thread|.
+ // The waiter should have woken up successfully.
+ EXPECT_EQ(0, result);
+
+ // Close to cancel waiter.
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ test::SimpleWaiterThread thread(&result);
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0));
+ thread.Start();
+
+ // Close port 1 first -- this should result in the waiter being cancelled.
+ mp->CancelAllWaiters(1);
+ mp->Close(1);
+
+ // Port 1 is closed, so |Dispatcher::RemoveWaiter()| wouldn't call into the
+ // |MessagePipe| to remove any waiter.
+
+ mp->Close(0);
+ } // Joins |thread|.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+
+ // Close to make waiter un-wake-up-able.
+ {
+ scoped_refptr<MessagePipe> mp(new MessagePipe());
+ test::SimpleWaiterThread thread(&result);
+
+ EXPECT_EQ(MOJO_RESULT_OK,
+ mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0));
+ thread.Start();
+
+ // Close port 0 first -- this should wake the waiter up, since port 1 will
+ // never be readable.
+ mp->CancelAllWaiters(0);
+ mp->Close(0);
+
+ mp->RemoveWaiter(1, thread.waiter());
+
+ mp->CancelAllWaiters(1);
+ mp->Close(1);
+ } // Joins |thread|.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/simple_dispatcher.cc b/mojo/system/simple_dispatcher.cc
new file mode 100644
index 0000000..ce2baef
--- /dev/null
+++ b/mojo/system/simple_dispatcher.cc
@@ -0,0 +1,49 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/simple_dispatcher.h"
+
+#include "base/logging.h"
+
+namespace mojo {
+namespace system {
+
+SimpleDispatcher::SimpleDispatcher() {
+}
+
+SimpleDispatcher::~SimpleDispatcher() {
+}
+
+void SimpleDispatcher::StateChangedNoLock() {
+ lock().AssertAcquired();
+ waiter_list_.AwakeWaitersForStateChange(SatisfiedFlagsNoLock(),
+ SatisfiableFlagsNoLock());
+}
+
+void SimpleDispatcher::CancelAllWaitersNoLock() {
+ lock().AssertAcquired();
+ waiter_list_.CancelAllWaiters();
+}
+
+MojoResult SimpleDispatcher::AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ lock().AssertAcquired();
+
+ if ((flags & SatisfiedFlagsNoLock()))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ if (!(flags & SatisfiableFlagsNoLock()))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ waiter_list_.AddWaiter(waiter, flags, wake_result);
+ return MOJO_RESULT_OK;
+}
+
+void SimpleDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
+ lock().AssertAcquired();
+ waiter_list_.RemoveWaiter(waiter);
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/simple_dispatcher.h b/mojo/system/simple_dispatcher.h
new file mode 100644
index 0000000..f306aba
--- /dev/null
+++ b/mojo/system/simple_dispatcher.h
@@ -0,0 +1,58 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_SIMPLE_DISPATCHER_H_
+#define MOJO_SYSTEM_SIMPLE_DISPATCHER_H_
+
+#include <list>
+
+#include "base/basictypes.h"
+#include "mojo/system/dispatcher.h"
+#include "mojo/system/waiter_list.h"
+
+namespace mojo {
+namespace system {
+
+// A base class for simple dispatchers. "Simple" means that there's a one-to-one
+// correspondence between handles and dispatchers (see the explanatory comment
+// in core_impl.cc). This class implements the standard waiter-signalling
+// mechanism in that case.
+class SimpleDispatcher : public Dispatcher {
+ protected:
+ SimpleDispatcher();
+
+ friend class base::RefCountedThreadSafe<SimpleDispatcher>;
+ virtual ~SimpleDispatcher();
+
+ // To be called by subclasses when the state changes (so
+ // |SatisfiedFlagsNoLock()| and |SatisfiableFlagsNoLock()| should be checked
+ // again). Must be called under lock.
+ void StateChangedNoLock();
+
+ // These should return the wait flags that are satisfied by the object's
+ // current state and those that may eventually be satisfied by this object's
+ // state, respectively. They should be overridden by subclasses to reflect
+ // their notion of state. They are never called after the dispatcher has been
+ // closed. They are called under |lock_|.
+ virtual MojoWaitFlags SatisfiedFlagsNoLock() const = 0;
+ virtual MojoWaitFlags SatisfiableFlagsNoLock() const = 0;
+
+ // |Dispatcher| implementation/overrides:
+ virtual void CancelAllWaitersNoLock() OVERRIDE;
+ virtual MojoResult AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) OVERRIDE;
+ virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE;
+
+ private:
+ // Protected by |lock()|:
+ WaiterList waiter_list_;
+
+ DISALLOW_COPY_AND_ASSIGN(SimpleDispatcher);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_SIMPLE_DISPATCHER_H_
diff --git a/mojo/system/simple_dispatcher_unittest.cc b/mojo/system/simple_dispatcher_unittest.cc
new file mode 100644
index 0000000..9822f95
--- /dev/null
+++ b/mojo/system/simple_dispatcher_unittest.cc
@@ -0,0 +1,514 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
+// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
+// tolerance and reduce observed flakiness.
+
+#include "mojo/system/simple_dispatcher.h"
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_vector.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/time/time.h"
+#include "mojo/system/test_utils.h"
+#include "mojo/system/waiter.h"
+#include "mojo/system/waiter_test_utils.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+const int64_t kMicrosPerMs = 1000;
+const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
+
+class MockSimpleDispatcher : public SimpleDispatcher {
+ public:
+ MockSimpleDispatcher()
+ : satisfied_flags_(MOJO_WAIT_FLAG_NONE),
+ satisfiable_flags_(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE) {}
+
+ void SetSatisfiedFlags(MojoWaitFlags new_satisfied_flags) {
+ base::AutoLock locker(lock());
+
+ // Any new flags that are set should be satisfiable.
+ CHECK_EQ(new_satisfied_flags & ~satisfied_flags_,
+ new_satisfied_flags & ~satisfied_flags_ & satisfiable_flags_);
+
+ if (new_satisfied_flags == satisfied_flags_)
+ return;
+
+ satisfied_flags_ = new_satisfied_flags;
+ StateChangedNoLock();
+ }
+
+ void SetSatisfiableFlags(MojoWaitFlags new_satisfiable_flags) {
+ base::AutoLock locker(lock());
+
+ if (new_satisfiable_flags == satisfiable_flags_)
+ return;
+
+ satisfiable_flags_ = new_satisfiable_flags;
+ StateChangedNoLock();
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<MockSimpleDispatcher>;
+ virtual ~MockSimpleDispatcher() {}
+
+ // |SimpleDispatcher| implementation:
+ virtual MojoWaitFlags SatisfiedFlagsNoLock() const OVERRIDE {
+ lock().AssertAcquired();
+ return satisfied_flags_;
+ }
+
+ virtual MojoWaitFlags SatisfiableFlagsNoLock() const OVERRIDE {
+ lock().AssertAcquired();
+ return satisfiable_flags_;
+ }
+
+ // Protected by |lock()|:
+ MojoWaitFlags satisfied_flags_;
+ MojoWaitFlags satisfiable_flags_;
+
+ DISALLOW_COPY_AND_ASSIGN(MockSimpleDispatcher);
+};
+
+TEST(SimpleDispatcherTest, Basic) {
+ test::Stopwatch stopwatch;
+ int64_t elapsed_micros;
+
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ Waiter w;
+
+ // Try adding a readable waiter when already readable.
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ d->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
+ // Shouldn't need to remove the waiter (it was not added).
+
+ // Wait (forever) for writable when already writable.
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 1));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE);
+ stopwatch.Start();
+ EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for zero time for writable when already writable.
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 2));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE);
+ stopwatch.Start();
+ EXPECT_EQ(2, w.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for non-zero, finite time for writable when already writable.
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 3));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE);
+ stopwatch.Start();
+ EXPECT_EQ(3, w.Wait(2 * kEpsilonMicros));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for zero time for writable when not writable (will time out).
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for non-zero, finite time for writable when not writable (will time
+ // out).
+ w.Init();
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+}
+
+TEST(SimpleDispatcherTest, BasicUnsatisfiable) {
+ test::Stopwatch stopwatch;
+ int64_t elapsed_micros;
+
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ Waiter w;
+
+ // Try adding a writable waiter when it can never be writable.
+ w.Init();
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE);
+ d->SetSatisfiedFlags(0);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 5));
+ // Shouldn't need to remove the waiter (it was not added).
+
+ // Wait (forever) for writable and then it becomes never writable.
+ w.Init();
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 6));
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE);
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(MOJO_DEADLINE_INDEFINITE));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for zero time for writable and then it becomes never writable.
+ w.Init();
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 6));
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE);
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ // Wait for non-zero, finite time for writable and then it becomes never
+ // writable.
+ w.Init();
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 7));
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE);
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(2 * kEpsilonMicros));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ d->RemoveWaiter(&w);
+
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+}
+
+TEST(SimpleDispatcherTest, BasicClosed) {
+ test::Stopwatch stopwatch;
+ int64_t elapsed_micros;
+
+ scoped_refptr<MockSimpleDispatcher> d;
+ Waiter w;
+
+ // Try adding a writable waiter when the dispatcher has been closed.
+ d = new MockSimpleDispatcher();
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 8));
+ // Shouldn't need to remove the waiter (it was not added).
+
+ // Wait (forever) for writable and then the dispatcher is closed.
+ d = new MockSimpleDispatcher();
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 9));
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(MOJO_DEADLINE_INDEFINITE));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ // Don't need to remove waiters from closed dispatchers.
+
+ // Wait for zero time for writable and then the dispatcher is closed.
+ d = new MockSimpleDispatcher();
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 10));
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ // Don't need to remove waiters from closed dispatchers.
+
+ // Wait for non-zero, finite time for writable and then the dispatcher is
+ // closed.
+ d = new MockSimpleDispatcher();
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 11));
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(2 * kEpsilonMicros));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ // Don't need to remove waiters from closed dispatchers.
+}
+
+TEST(SimpleDispatcherTest, BasicThreaded) {
+ test::Stopwatch stopwatch;
+ bool did_wait;
+ MojoResult result;
+ int64_t elapsed_micros;
+
+ // Wait for readable (already readable).
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ {
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ test::WaiterThread thread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 0,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ } // Joins the thread.
+ // If we closed earlier, then probably we'd get a |MOJO_RESULT_CANCELLED|.
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ }
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_FALSE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+
+ // Wait for readable and becomes readable after some time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ test::WaiterThread thread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 1,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(1, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ // Wait for readable and becomes never-readable after some time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ test::WaiterThread thread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 2,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_NONE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ // Wait for readable and dispatcher gets closed.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ test::WaiterThread thread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ 3,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the thread.
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ // Wait for readable and times out.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ {
+ test::WaiterThread thread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ 2 * kEpsilonMicros,
+ 4,
+ &did_wait, &result);
+ stopwatch.Start();
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+ // Not what we're waiting for.
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE);
+ } // Joins the thread (after its wait times out).
+ // If we closed earlier, then probably we'd get a |MOJO_RESULT_CANCELLED|.
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ }
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_TRUE(did_wait);
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+}
+
+TEST(SimpleDispatcherTest, MultipleWaiters) {
+ static const size_t kNumWaiters = 20;
+
+ bool did_wait[kNumWaiters];
+ MojoResult result[kNumWaiters];
+
+ // All wait for readable and becomes readable after some time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ ScopedVector<test::WaiterThread> threads;
+ for (size_t i = 0; i < kNumWaiters; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the threads.
+ for (size_t i = 0; i < kNumWaiters; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(static_cast<MojoResult>(i), result[i]);
+ }
+
+ // Some wait for readable, some for writable, and becomes readable after some
+ // time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ ScopedVector<test::WaiterThread> threads;
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_WRITABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ // This will wake up the ones waiting to write.
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the threads.
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(static_cast<MojoResult>(i), result[i]);
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result[i]);
+ }
+
+ // Some wait for readable, some for writable, and becomes readable and
+ // never-writable after some time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ ScopedVector<test::WaiterThread> threads;
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_WRITABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+ d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE);
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the threads.
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(static_cast<MojoResult>(i), result[i]);
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result[i]);
+ }
+
+ // Some wait for readable, some for writable, and becomes readable after some
+ // time.
+ {
+ scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher());
+ ScopedVector<test::WaiterThread> threads;
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_READABLE,
+ 3 * kEpsilonMicros,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ threads.push_back(new test::WaiterThread(d,
+ MOJO_WAIT_FLAG_WRITABLE,
+ 1 * kEpsilonMicros,
+ static_cast<MojoResult>(i),
+ &did_wait[i], &result[i]));
+ threads.back()->Start();
+ }
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE);
+ // All those waiting for writable should have timed out.
+ EXPECT_EQ(MOJO_RESULT_OK, d->Close());
+ } // Joins the threads.
+ for (size_t i = 0; i < kNumWaiters / 2; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(static_cast<MojoResult>(i), result[i]);
+ }
+ for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) {
+ EXPECT_TRUE(did_wait[i]);
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result[i]);
+ }
+}
+
+// TODO(vtl): Stress test?
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/test_utils.h b/mojo/system/test_utils.h
new file mode 100644
index 0000000..2f1e956
--- /dev/null
+++ b/mojo/system/test_utils.h
@@ -0,0 +1,38 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_TEST_UTILS_H_
+#define MOJO_SYSTEM_TEST_UTILS_H_
+
+#include "base/basictypes.h"
+#include "base/time/time.h"
+
+namespace mojo {
+namespace system {
+namespace test {
+
+class Stopwatch {
+ public:
+ Stopwatch() {}
+ ~Stopwatch() {}
+
+ void Start() {
+ start_time_ = base::TimeTicks::HighResNow();
+ }
+
+ int64_t Elapsed() {
+ return (base::TimeTicks::HighResNow() - start_time_).InMicroseconds();
+ }
+
+ private:
+ base::TimeTicks start_time_;
+
+ DISALLOW_COPY_AND_ASSIGN(Stopwatch);
+};
+
+} // namespace test
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_TEST_UTILS_H_
diff --git a/mojo/system/waiter.cc b/mojo/system/waiter.cc
new file mode 100644
index 0000000..6dda21f
--- /dev/null
+++ b/mojo/system/waiter.cc
@@ -0,0 +1,80 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/waiter.h"
+
+#include <limits>
+
+#include "base/logging.h"
+#include "base/time/time.h"
+
+namespace mojo {
+namespace system {
+
+Waiter::Waiter()
+ : cv_(&lock_),
+ awoken_(false),
+ wait_result_(MOJO_RESULT_INTERNAL) {
+}
+
+Waiter::~Waiter() {
+}
+
+void Waiter::Init() {
+ awoken_ = false;
+ // NOTE(vtl): If performance ever becomes an issue, we can disable the setting
+ // of |wait_result_| (except the first one in |Awake()|) in Release builds.
+ wait_result_ = MOJO_RESULT_INTERNAL;
+}
+
+// TODO(vtl): Fast-path the |deadline == 0| case?
+MojoResult Waiter::Wait(MojoDeadline deadline) {
+ base::AutoLock locker(lock_);
+
+ // Fast-path the already-awoken case:
+ if (awoken_) {
+ DCHECK_NE(wait_result_, MOJO_RESULT_INTERNAL);
+ return wait_result_;
+ }
+
+ // |MojoDeadline| is actually a |uint64_t|, but we need a signed quantity.
+ // Treat any out-of-range deadline as "forever" (which is wrong, but okay
+ // since 2^63 microseconds is ~300000 years). Note that this also takes care
+ // of the |MOJO_DEADLINE_INDEFINITE| (= 2^64 - 1) case.
+ if (deadline > static_cast<uint64_t>(std::numeric_limits<int64_t>::max())) {
+ do {
+ cv_.Wait();
+ } while (!awoken_);
+ } else {
+ // NOTE(vtl): This is very inefficient on POSIX, since pthreads condition
+ // variables take an absolute deadline.
+ const base::TimeTicks end_time = base::TimeTicks::HighResNow() +
+ base::TimeDelta::FromMicroseconds(static_cast<int64_t>(deadline));
+ do {
+ base::TimeTicks now_time = base::TimeTicks::HighResNow();
+ if (now_time >= end_time)
+ return MOJO_RESULT_DEADLINE_EXCEEDED;
+
+ cv_.TimedWait(end_time - now_time);
+ } while (!awoken_);
+ }
+
+ DCHECK_NE(wait_result_, MOJO_RESULT_INTERNAL);
+ return wait_result_;
+}
+
+void Waiter::Awake(MojoResult wait_result) {
+ base::AutoLock locker(lock_);
+
+ if (awoken_)
+ return;
+
+ awoken_ = true;
+ wait_result_ = wait_result;
+ cv_.Signal();
+ // |cv_.Wait()|/|cv_.TimedWait()| will return after |lock_| is released.
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/waiter.h b/mojo/system/waiter.h
new file mode 100644
index 0000000..bdd33fc
--- /dev/null
+++ b/mojo/system/waiter.h
@@ -0,0 +1,57 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_WAITER_H_
+#define MOJO_SYSTEM_WAITER_H_
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace system {
+
+// IMPORTANT (all-caps gets your attention, right?): |Waiter| methods are called
+// under other locks, in particular, |Dispatcher::lock_|s, so |Waiter| methods
+// must never call out to other objects (in particular, |Dispatcher|s). This
+// class is thread-safe.
+class Waiter {
+ public:
+ Waiter();
+ ~Waiter();
+
+ // A |Waiter| can be used multiple times; |Init()| should be called before
+ // each time it's used.
+ void Init();
+
+ // Waits until a suitable |Awake()| is called.
+ // Returns:
+ // - The |wake_result| passed to |Dispatcher::AddWaiter()| if it was woken up
+ // by that dispatcher for the reason specified by |flags| (in the call to
+ // |AddWaiter()|).
+ // - |MOJO_RESULT_CANCELLED| if a handle (on which |MojoWait()| was called)
+ // was closed; and
+ // - |MOJO_RESULT_FAILED_PRECONDITION| if the reasons for being awoken given
+ // by |flags| cannot (or can no longer) be satisfied (e.g., if the other
+ // end of a pipe is closed).
+ MojoResult Wait(MojoDeadline deadline);
+
+ // Wake the waiter up with the given result (or no-op if it's been woken up
+ // already).
+ void Awake(MojoResult wait_result);
+
+ private:
+ base::ConditionVariable cv_; // Associated to |lock_|.
+ base::Lock lock_; // Protects the following members.
+ bool awoken_;
+ MojoResult wait_result_;
+
+ DISALLOW_COPY_AND_ASSIGN(Waiter);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_WAITER_H_
diff --git a/mojo/system/waiter_list.cc b/mojo/system/waiter_list.cc
new file mode 100644
index 0000000..6f2c444
--- /dev/null
+++ b/mojo/system/waiter_list.cc
@@ -0,0 +1,57 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/waiter_list.h"
+
+#include "base/logging.h"
+#include "mojo/system/waiter.h"
+
+namespace mojo {
+namespace system {
+
+WaiterList::WaiterList() {
+}
+
+WaiterList::~WaiterList() {
+ DCHECK(waiters_.empty());
+}
+
+void WaiterList::AwakeWaitersForStateChange(MojoWaitFlags satisfied_flags,
+ MojoWaitFlags satisfiable_flags) {
+ for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end();
+ ++it) {
+ if (it->flags & satisfied_flags)
+ it->waiter->Awake(it->wake_result);
+ else if (!(it->flags & satisfiable_flags))
+ it->waiter->Awake(MOJO_RESULT_FAILED_PRECONDITION);
+ }
+}
+
+void WaiterList::CancelAllWaiters() {
+ for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end();
+ ++it) {
+ it->waiter->Awake(MOJO_RESULT_CANCELLED);
+ }
+ waiters_.clear();
+}
+
+void WaiterList::AddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ waiters_.push_back(WaiterInfo(waiter, flags, wake_result));
+}
+
+void WaiterList::RemoveWaiter(Waiter* waiter) {
+ // We allow a thread to wait on the same handle multiple times simultaneously,
+ // so we need to scan the entire list and remove all occurrences of |waiter|.
+ for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end();) {
+ WaiterInfoList::iterator maybe_delete = it;
+ ++it;
+ if (maybe_delete->waiter == waiter)
+ waiters_.erase(maybe_delete);
+ }
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/waiter_list.h b/mojo/system/waiter_list.h
new file mode 100644
index 0000000..23933e3
--- /dev/null
+++ b/mojo/system/waiter_list.h
@@ -0,0 +1,55 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_WAITER_LIST_H_
+#define MOJO_SYSTEM_WAITER_LIST_H_
+
+#include <list>
+
+#include "base/basictypes.h"
+#include "mojo/public/system/core.h"
+
+namespace mojo {
+namespace system {
+
+class Waiter;
+
+// |WaiterList| tracks all the |Waiter|s that are waiting on a given
+// handle/|Dispatcher|. There should be a |WaiterList| for each handle that can
+// be waited on (in any way). In the simple case, the |WaiterList| is owned by
+// the |Dispatcher|, whereas in more complex cases it is owned by the secondary
+// object (see simple_dispatcher.* and the explanatory comment in core_impl.cc).
+// This class is thread-unsafe (all concurrent access must be protected by some
+// lock).
+class WaiterList {
+ public:
+ WaiterList();
+ ~WaiterList();
+
+ void AwakeWaitersForStateChange(MojoWaitFlags satisfied_flags,
+ MojoWaitFlags satisfiable_flags);
+ void CancelAllWaiters();
+ void AddWaiter(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result);
+ void RemoveWaiter(Waiter* waiter);
+
+ private:
+ struct WaiterInfo {
+ WaiterInfo(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result)
+ : waiter(waiter), flags(flags), wake_result(wake_result) {}
+
+ Waiter* waiter;
+ MojoWaitFlags flags;
+ MojoResult wake_result;
+ };
+ typedef std::list<WaiterInfo> WaiterInfoList;
+
+ WaiterInfoList waiters_;
+
+ DISALLOW_COPY_AND_ASSIGN(WaiterList);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_WAITER_LIST_H_
diff --git a/mojo/system/waiter_list_unittest.cc b/mojo/system/waiter_list_unittest.cc
new file mode 100644
index 0000000..fa6564c
--- /dev/null
+++ b/mojo/system/waiter_list_unittest.cc
@@ -0,0 +1,266 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// NOTE(vtl): These tests are inherently flaky (e.g., if run on a heavily-loaded
+// system). Sorry. |kEpsilonMicros| may be increased to increase tolerance and
+// reduce observed flakiness.
+
+#include "mojo/system/waiter_list.h"
+
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/time/time.h"
+#include "mojo/system/waiter.h"
+#include "mojo/system/waiter_test_utils.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+const int64_t kMicrosPerMs = 1000;
+const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
+
+TEST(WaiterListTest, BasicCancel) {
+ MojoResult result;
+
+ // Cancel immediately after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0);
+ thread.Start();
+ waiter_list.CancelAllWaiters();
+ waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay.
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+
+ // Cancel before after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1);
+ waiter_list.CancelAllWaiters();
+ thread.Start();
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+
+ // Cancel some time after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2);
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.CancelAllWaiters();
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+}
+
+TEST(WaiterListTest, BasicAwakeSatisfied) {
+ MojoResult result;
+
+ // Awake immediately after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0);
+ thread.Start();
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ } // Join |thread|.
+ EXPECT_EQ(0, result);
+
+ // Awake before after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1);
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_WRITABLE,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay.
+ thread.Start();
+ } // Join |thread|.
+ EXPECT_EQ(1, result);
+
+ // Awake some time after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2);
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ } // Join |thread|.
+ EXPECT_EQ(2, result);
+}
+
+TEST(WaiterListTest, BasicAwakeUnsatisfiable) {
+ MojoResult result;
+
+ // Awake (for unsatisfiability) immediately after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0);
+ thread.Start();
+ waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+
+ // Awake (for unsatisfiability) before after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1);
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE,
+ MOJO_WAIT_FLAG_READABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ thread.Start();
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+
+ // Awake (for unsatisfiability) some time after thread start.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread(&result);
+ waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2);
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread.waiter());
+ waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay.
+ } // Join |thread|.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+}
+
+TEST(WaiterListTest, MultipleWaiters) {
+ MojoResult result_1;
+ MojoResult result_2;
+ MojoResult result_3;
+ MojoResult result_4;
+
+ // Cancel two waiters.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread_1(&result_1);
+ waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 0);
+ thread_1.Start();
+ test::SimpleWaiterThread thread_2(&result_2);
+ waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1);
+ thread_2.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.CancelAllWaiters();
+ } // Join threads.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result_1);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result_2);
+
+ // Awake one waiter, cancel other.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread_1(&result_1);
+ waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 2);
+ thread_1.Start();
+ test::SimpleWaiterThread thread_2(&result_2);
+ waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 3);
+ thread_2.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread_1.waiter());
+ waiter_list.CancelAllWaiters();
+ } // Join threads.
+ EXPECT_EQ(2, result_1);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result_2);
+
+ // Cancel one waiter, awake other for unsatisfiability.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread_1(&result_1);
+ waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 4);
+ thread_1.Start();
+ test::SimpleWaiterThread thread_2(&result_2);
+ waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 5);
+ thread_2.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_READABLE);
+ waiter_list.RemoveWaiter(thread_2.waiter());
+ waiter_list.CancelAllWaiters();
+ } // Join threads.
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result_1);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_2);
+
+ // Cancel one waiter, awake other for unsatisfiability.
+ {
+ WaiterList waiter_list;
+ test::SimpleWaiterThread thread_1(&result_1);
+ waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 6);
+ thread_1.Start();
+
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+
+ // Should do nothing.
+ waiter_list.AwakeWaitersForStateChange(0,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+
+ test::SimpleWaiterThread thread_2(&result_2);
+ waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 7);
+ thread_2.Start();
+
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+
+ // Awake #1.
+ waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE,
+ MOJO_WAIT_FLAG_READABLE |
+ MOJO_WAIT_FLAG_WRITABLE);
+ waiter_list.RemoveWaiter(thread_1.waiter());
+
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+
+ test::SimpleWaiterThread thread_3(&result_3);
+ waiter_list.AddWaiter(thread_3.waiter(), MOJO_WAIT_FLAG_WRITABLE, 8);
+ thread_3.Start();
+
+ test::SimpleWaiterThread thread_4(&result_4);
+ waiter_list.AddWaiter(thread_4.waiter(), MOJO_WAIT_FLAG_READABLE, 9);
+ thread_4.Start();
+
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+
+ // Awake #2 and #3 for unsatisfiability.
+ waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_READABLE);
+ waiter_list.RemoveWaiter(thread_2.waiter());
+ waiter_list.RemoveWaiter(thread_3.waiter());
+
+ // Cancel #4.
+ waiter_list.CancelAllWaiters();
+ } // Join threads.
+ EXPECT_EQ(6, result_1);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_2);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_3);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result_4);
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/waiter_test_utils.cc b/mojo/system/waiter_test_utils.cc
new file mode 100644
index 0000000..37df616
--- /dev/null
+++ b/mojo/system/waiter_test_utils.cc
@@ -0,0 +1,63 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/waiter_test_utils.h"
+
+namespace mojo {
+namespace system {
+namespace test {
+
+SimpleWaiterThread::SimpleWaiterThread(MojoResult* result)
+ : base::SimpleThread("waiter_thread"),
+ result_(result) {
+ waiter_.Init();
+ *result_ = -5420734; // Totally invalid result.
+}
+
+SimpleWaiterThread::~SimpleWaiterThread() {
+ Join();
+}
+
+void SimpleWaiterThread::Run() {
+ *result_ = waiter_.Wait(MOJO_DEADLINE_INDEFINITE);
+}
+
+WaiterThread::WaiterThread(scoped_refptr<Dispatcher> dispatcher,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ MojoResult success_result,
+ bool* did_wait_out,
+ MojoResult* result_out)
+ : base::SimpleThread("waiter_thread"),
+ dispatcher_(dispatcher),
+ wait_flags_(wait_flags),
+ deadline_(deadline),
+ success_result_(success_result),
+ did_wait_out_(did_wait_out),
+ result_out_(result_out) {
+ *did_wait_out_ = false;
+ *result_out_ = -8542346; // Totally invalid result.
+}
+
+WaiterThread::~WaiterThread() {
+ Join();
+}
+
+void WaiterThread::Run() {
+ waiter_.Init();
+
+ *result_out_ = dispatcher_->AddWaiter(&waiter_,
+ wait_flags_,
+ success_result_);
+ if (*result_out_ != MOJO_RESULT_OK)
+ return;
+
+ *did_wait_out_ = true;
+ *result_out_ = waiter_.Wait(deadline_);
+ dispatcher_->RemoveWaiter(&waiter_);
+}
+
+} // namespace test
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/waiter_test_utils.h b/mojo/system/waiter_test_utils.h
new file mode 100644
index 0000000..b1a7a33
--- /dev/null
+++ b/mojo/system/waiter_test_utils.h
@@ -0,0 +1,95 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_WAITER_TEST_UTILS_H_
+#define MOJO_SYSTEM_WAITER_TEST_UTILS_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/threading/simple_thread.h"
+#include "mojo/public/system/core.h"
+#include "mojo/system/dispatcher.h"
+#include "mojo/system/waiter.h"
+
+namespace mojo {
+namespace system {
+namespace test {
+
+// This is a very simple thread that has a |Waiter|, on which it waits
+// indefinitely (and records the result). It will create and initialize the
+// |Waiter| on creation, but the caller must start the thread with |Start()|. It
+// will join the thread on destruction.
+//
+// One usually uses it like:
+//
+// MojoResult result;
+// {
+// WaiterList waiter_list;
+// test::SimpleWaiterThread thread(&result);
+// waiter_list.AddWaiter(thread.waiter(), ...);
+// thread.Start();
+// ... some stuff to wake the waiter ...
+// waiter_list.RemoveWaiter(thread.waiter());
+// } // Join |thread|.
+// EXPECT_EQ(..., result);
+//
+// There's a bit of unrealism in its use: In this sort of usage, calls such as
+// |Waiter::Init()|, |AddWaiter()|, and |RemoveWaiter()| are done in the main
+// (test) thread, not the waiter thread (as would actually happen in real code).
+// (We accept this unrealism for simplicity, since |WaiterList| is
+// thread-unsafe so making it more realistic would require adding nontrivial
+// synchronization machinery.)
+class SimpleWaiterThread : public base::SimpleThread {
+ public:
+ // For the duration of the lifetime of this object, |*result| belongs to it
+ // (in the sense that it will write to it whenever it wants).
+ explicit SimpleWaiterThread(MojoResult* result);
+ virtual ~SimpleWaiterThread(); // Joins the thread.
+
+ Waiter* waiter() { return &waiter_; }
+
+ private:
+ virtual void Run() OVERRIDE;
+
+ MojoResult* const result_;
+ Waiter waiter_;
+
+ DISALLOW_COPY_AND_ASSIGN(SimpleWaiterThread);
+};
+
+// This is a more complex and realistic thread that has a |Waiter|, on which it
+// waits for the given deadline (with the given flags). Unlike
+// |SimpleWaiterThread|, it requires the machinery of |Dispatcher|.
+class WaiterThread : public base::SimpleThread {
+ public:
+ // Note: |*did_wait_out| and |*result| belong to this object while it's alive.
+ WaiterThread(scoped_refptr<Dispatcher> dispatcher,
+ MojoWaitFlags wait_flags,
+ MojoDeadline deadline,
+ MojoResult success_result,
+ bool* did_wait_out,
+ MojoResult* result_out);
+ virtual ~WaiterThread();
+
+ private:
+ virtual void Run() OVERRIDE;
+
+ const scoped_refptr<Dispatcher> dispatcher_;
+ const MojoWaitFlags wait_flags_;
+ const MojoDeadline deadline_;
+ const MojoResult success_result_;
+ bool* const did_wait_out_;
+ MojoResult* const result_out_;
+
+ Waiter waiter_;
+
+ DISALLOW_COPY_AND_ASSIGN(WaiterThread);
+};
+
+} // namespace test
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_WAITER_TEST_UTILS_H_
diff --git a/mojo/system/waiter_unittest.cc b/mojo/system/waiter_unittest.cc
new file mode 100644
index 0000000..4a69a89
--- /dev/null
+++ b/mojo/system/waiter_unittest.cc
@@ -0,0 +1,285 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// NOTE(vtl): These tests are inherently flaky (e.g., if run on a heavily-loaded
+// system). Sorry. |kEpsilonMicros| may be increased to increase tolerance and
+// reduce observed flakiness.
+
+#include "mojo/system/waiter.h"
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "base/threading/simple_thread.h"
+#include "base/time/time.h"
+#include "mojo/system/test_utils.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+const int64_t kMicrosPerMs = 1000;
+const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
+const int64_t kPollTimeMicros = 10 * kMicrosPerMs; // 10 ms.
+
+class WaitingThread : public base::SimpleThread {
+ public:
+ explicit WaitingThread(MojoDeadline deadline)
+ : base::SimpleThread("waiting_thread"),
+ deadline_(deadline),
+ done_(false),
+ result_(MOJO_RESULT_UNKNOWN),
+ elapsed_micros_(-1) {
+ waiter_.Init();
+ }
+
+ virtual ~WaitingThread() {
+ Join();
+ }
+
+ void WaitUntilDone(MojoResult* result, int64_t* elapsed_micros) {
+ for (;;) {
+ {
+ base::AutoLock locker(lock_);
+ if (done_) {
+ *result = result_;
+ *elapsed_micros = elapsed_micros_;
+ break;
+ }
+ }
+
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(kPollTimeMicros));
+ }
+ }
+
+ Waiter* waiter() { return &waiter_; }
+
+ private:
+ virtual void Run() OVERRIDE {
+ test::Stopwatch stopwatch;
+ MojoResult result;
+ int64_t elapsed_micros;
+
+ stopwatch.Start();
+ result = waiter_.Wait(deadline_);
+ elapsed_micros = stopwatch.Elapsed();
+
+ {
+ base::AutoLock locker(lock_);
+ done_ = true;
+ result_ = result;
+ elapsed_micros_ = elapsed_micros;
+ }
+ }
+
+ const MojoDeadline deadline_;
+ Waiter waiter_; // Thread-safe.
+
+ base::Lock lock_; // Protects the following members.
+ bool done_;
+ MojoResult result_;
+ int64_t elapsed_micros_;
+
+ DISALLOW_COPY_AND_ASSIGN(WaitingThread);
+};
+
+TEST(WaiterTest, Basic) {
+ MojoResult result;
+ int64_t elapsed_micros;
+
+ // Finite deadline.
+
+ // Awake immediately after thread start.
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros));
+ thread.Start();
+ thread.waiter()->Awake(0);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(0, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ // Awake before after thread start.
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros));
+ thread.waiter()->Awake(MOJO_RESULT_CANCELLED);
+ thread.Start();
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ // Awake some time after thread start.
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros));
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ thread.waiter()->Awake(1);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(1, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+ }
+
+ // Awake some longer time after thread start.
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros));
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(5 * kEpsilonMicros));
+ thread.waiter()->Awake(1);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(1, result);
+ EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros);
+ }
+
+ // Don't awake -- time out (on another thread).
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(2 * kEpsilonMicros));
+ thread.Start();
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+ }
+
+ // No (indefinite) deadline.
+
+ // Awake immediately after thread start.
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.Start();
+ thread.waiter()->Awake(0);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(0, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ // Awake before after thread start.
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.waiter()->Awake(MOJO_RESULT_CANCELLED);
+ thread.Start();
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ // Awake some time after thread start.
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ thread.waiter()->Awake(1);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(1, result);
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+ }
+
+ // Awake some longer time after thread start.
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(5 * kEpsilonMicros));
+ thread.waiter()->Awake(1);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(1, result);
+ EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros);
+ }
+}
+
+TEST(WaiterTest, TimeOut) {
+ test::Stopwatch stopwatch;
+ int64_t elapsed_micros;
+
+ Waiter waiter;
+
+ waiter.Init();
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+
+ waiter.Init();
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ waiter.Wait(static_cast<MojoDeadline>(2 * kEpsilonMicros)));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
+
+ waiter.Init();
+ stopwatch.Start();
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
+ waiter.Wait(static_cast<MojoDeadline>(5 * kEpsilonMicros)));
+ elapsed_micros = stopwatch.Elapsed();
+ EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros);
+}
+
+// The first |Awake()| should always win.
+TEST(WaiterTest, MultipleAwakes) {
+ MojoResult result;
+ int64_t elapsed_micros;
+
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.Start();
+ thread.waiter()->Awake(0);
+ thread.waiter()->Awake(1);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(0, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.waiter()->Awake(1);
+ thread.Start();
+ thread.waiter()->Awake(0);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(1, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ {
+ WaitingThread thread(MOJO_DEADLINE_INDEFINITE);
+ thread.Start();
+ thread.waiter()->Awake(10);
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ thread.waiter()->Awake(20);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(10, result);
+ EXPECT_LT(elapsed_micros, kEpsilonMicros);
+ }
+
+ {
+ WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros));
+ thread.Start();
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros));
+ thread.waiter()->Awake(MOJO_RESULT_FAILED_PRECONDITION);
+ base::PlatformThread::Sleep(
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
+ thread.waiter()->Awake(0);
+ thread.WaitUntilDone(&result, &elapsed_micros);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+ EXPECT_GT(elapsed_micros, (1-1) * kEpsilonMicros);
+ EXPECT_LT(elapsed_micros, (1+1) * kEpsilonMicros);
+ }
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo