diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-09-28 00:30:04 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-09-28 00:30:04 +0000 |
commit | 3d58663b00b219889b298b233e0c0e0f80596ed7 (patch) | |
tree | 9a4bdf0e0f48f1af7a083924c2711af9cc7310d1 /mojo | |
parent | 82ce871c06a72ffb38f3ddcac1090a2ceed266fb (diff) | |
download | chromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.zip chromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.tar.gz chromium_src-3d58663b00b219889b298b233e0c0e0f80596ed7.tar.bz2 |
Initial in-process implementation of some Mojo primitives.
This has an initial in-process implementation of the most basic Mojo primitives:
- MojoClose()
- MojoWait()
- MojoWaitMany()
- MojoCreateMessagePipe()
- MojoWriteMessage()
- MojoReadMessage()
R=darin@chromium.org
Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=225801
Review URL: https://codereview.chromium.org/23621056
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@225821 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
36 files changed, 5344 insertions, 1 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp index 2cd3d5c..567452f 100644 --- a/mojo/mojo.gyp +++ b/mojo/mojo.gyp @@ -4,7 +4,7 @@ { 'variables': { - 'chromium_code': 1, # Use higher warning level. + 'chromium_code': 1, }, 'target_defaults': { 'defines': ['MOJO_IMPLEMENTATION'], @@ -13,6 +13,110 @@ { 'target_name': 'mojo', 'type': 'none', + 'dependencies': [ + 'mojo_public_test_support', + 'mojo_public_unittests', + 'mojo_public_perftests', + 'mojo_system', + 'mojo_system_unittests', + ], + }, + { + 'target_name': 'mojo_public_test_support', + 'type': 'static_library', + 'dependencies': [ + '../base/base.gyp:base', + '../testing/gtest.gyp:gtest', + 'mojo_system', + ], + 'sources': [ + 'public/tests/test_support.cc', + 'public/tests/test_support.h', + ], + }, + { + 'target_name': 'mojo_public_unittests', + 'type': 'executable', + 'dependencies': [ + '../base/base.gyp:run_all_unittests', + '../testing/gtest.gyp:gtest', + 'mojo_public_test_support', + 'mojo_system', + ], + 'sources': [ + 'public/tests/system_core_unittest.cc', + ], + }, + { + 'target_name': 'mojo_public_perftests', + 'type': 'executable', + 'dependencies': [ + '../base/base.gyp:base', + '../base/base.gyp:test_support_perf', + '../testing/gtest.gyp:gtest', + 'mojo_public_test_support', + 'mojo_system', + ], + 'sources': [ + 'public/tests/system_core_perftest.cc', + ], + }, + { + 'target_name': 'mojo_system', + # TODO(vtl): This should probably be '<(component)'; make it work. + 'type': 'static_library', + 'dependencies': [ + '../base/base.gyp:base', + ], + 'sources': [ + 'public/system/core.h', + 'system/core.cc', + 'system/core_impl.cc', + 'system/core_impl.h', + 'system/dispatcher.cc', + 'system/dispatcher.h', + 'system/limits.h', + 'system/memory.cc', + 'system/memory.h', + 'system/message_pipe.cc', + 'system/message_pipe.h', + 'system/message_pipe_dispatcher.cc', + 'system/message_pipe_dispatcher.h', + 'system/simple_dispatcher.cc', + 'system/simple_dispatcher.h', + 'system/waiter.cc', + 'system/waiter.h', + 'system/waiter_list.cc', + 'system/waiter_list.h', + ], + 'direct_dependent_settings': { + 'include_dirs': [ + '..', + ], + }, + }, + { + 'target_name': 'mojo_system_unittests', + 'type': 'executable', + 'dependencies': [ + '../base/base.gyp:run_all_unittests', + '../testing/gtest.gyp:gtest', + 'mojo_system', + ], + 'sources': [ + 'system/core_impl_unittest.cc', + 'system/core_test_base.cc', + 'system/core_test_base.h', + 'system/dispatcher_unittest.cc', + 'system/message_pipe_dispatcher_unittest.cc', + 'system/message_pipe_unittest.cc', + 'system/simple_dispatcher_unittest.cc', + 'system/test_utils.h', + 'system/waiter_list_unittest.cc', + 'system/waiter_test_utils.cc', + 'system/waiter_test_utils.h', + 'system/waiter_unittest.cc', + ], }, ], } diff --git a/mojo/public/system/core.h b/mojo/public/system/core.h index 9a75c41..cb405ae 100644 --- a/mojo/public/system/core.h +++ b/mojo/public/system/core.h @@ -5,15 +5,310 @@ #ifndef MOJO_PUBLIC_SYSTEM_CORE_H_ #define MOJO_PUBLIC_SYSTEM_CORE_H_ +// Note: This header should be compilable as C. + #include <stdint.h> +// Types ----------------------------------------------------------------------- + +// TODO(vtl): Notes: Use of undefined flags will lead to undefined behavior +// (typically they'll be ignored), not necessarily an error. + +// Handles to Mojo objects. typedef uint32_t MojoHandle; +// Result codes for Mojo operations. Non-negative values are success codes; +// negative values are error/failure codes. +typedef int32_t MojoResult; + +// Used to specify deadlines (timeouts), in microseconds. Note that +// |MOJO_DEADLINE_INDEFINITE| (-1) means "forever". +typedef uint64_t MojoDeadline; + +// Used to specify the state of a handle to wait on (e.g., the ability to read +// or write to it). +typedef uint32_t MojoWaitFlags; + +// Used to specify different modes to |MojoWriteMessage()|. +typedef uint32_t MojoWriteMessageFlags; + +// Used to specify different modes to |MojoReadMessage()|. +typedef uint32_t MojoReadMessageFlags; + +// Constants ------------------------------------------------------------------- + +// |MojoHandle|: +// |MOJO_HANDLE_INVALID| - A value that is never a valid handle. +#ifdef __cplusplus +const MojoHandle MOJO_HANDLE_INVALID = 0; +#else +#define MOJO_HANDLE_INVALID ((MojoHandle) 0) +#endif + +// |MojoResult|: +// |MOJO_RESULT_OK| - Not an error; returned on success. Note that positive +// |MojoResult|s may also be used to indicate success. +// |MOJO_RESULT_CANCELLED| - Operation was cancelled, typically by the +// caller. +// |MOJO_RESULT_UNKNOWN| - Unknown error (e.g., if not enough information is +// available for a more specific error). +// |MOJO_RESULT_INVALID_ARGUMENT| - Caller specified an invalid argument. +// This differs from |MOJO_RESULT_FAILED_PRECONDITION| in that the former +// indicates arguments that are invalid regardless of the state of the +// system. +// |MOJO_RESULT_DEADLINE_EXCEEDED| - Deadline expired before the operation +// could complete. +// |MOJO_RESULT_NOT_FOUND| - Some requested entity was not found (i.e., does +// not exist). +// |MOJO_RESULT_ALREADY_EXISTS| - Some entity or condition that we attempted +// to create already exists. +// |MOJO_RESULT_PERMISSION_DENIED| - The caller does not have permission to +// for the operation (use |MOJO_RESULT_RESOURCE_EXHAUSTED| for rejections +// caused by exhausting some resource instead). +// |MOJO_RESULT_RESOURCE_EXHAUSTED| - Some resource required for the call +// (possibly some quota) has been exhausted. +// |MOJO_RESULT_FAILED_PRECONDITION| - The system is not in a state required +// for the operation (use this if the caller must do something to rectify +// the state before retrying). +// |MOJO_RESULT_ABORTED| - The operation was aborted by the system, possibly +// due to a concurrency issue (use this if the caller may retry at a +// higher level). +// |MOJO_RESULT_OUT_OF_RANGE| - The operation was attempted past the valid +// range. Unlike |MOJO_RESULT_INVALID_ARGUMENT|, this indicates that the +// operation may be/become valid depending on the system state. (This +// error is similar to |MOJO_RESULT_FAILED_PRECONDITION|, but is more +// specific.) +// |MOJO_RESULT_UNIMPLEMENTED| - The operation is not implemented, supported, +// or enabled. +// |MOJO_RESULT_INTERNAL| - Internal error: this should never happen and +// indicates that some invariant expected by the system has been broken. +// |MOJO_RESULT_UNAVAILABLE| - The operation is (temporarily) currently +// unavailable. The caller may simply retry the operation (possibly with +// a backoff). +// |MOJO_RESULT_DATA_LOSS| - Unrecoverable data loss or corruption. +// +// Note that positive values are also available as success codes. +// +// The codes from |MOJO_RESULT_OK| to |MOJO_RESULT_DATA_LOSS| come from +// Google3's canonical error codes. +#ifdef __cplusplus +const MojoResult MOJO_RESULT_OK = 0; +const MojoResult MOJO_RESULT_CANCELLED = -1; +const MojoResult MOJO_RESULT_UNKNOWN = -2; +const MojoResult MOJO_RESULT_INVALID_ARGUMENT = -3; +const MojoResult MOJO_RESULT_DEADLINE_EXCEEDED = -4; +const MojoResult MOJO_RESULT_NOT_FOUND = -5; +const MojoResult MOJO_RESULT_ALREADY_EXISTS = -6; +const MojoResult MOJO_RESULT_PERMISSION_DENIED = -7; +const MojoResult MOJO_RESULT_RESOURCE_EXHAUSTED = -8; +const MojoResult MOJO_RESULT_FAILED_PRECONDITION = -9; +const MojoResult MOJO_RESULT_ABORTED = -10; +const MojoResult MOJO_RESULT_OUT_OF_RANGE = -11; +const MojoResult MOJO_RESULT_UNIMPLEMENTED = -12; +const MojoResult MOJO_RESULT_INTERNAL = -13; +const MojoResult MOJO_RESULT_UNAVAILABLE = -14; +const MojoResult MOJO_RESULT_DATA_LOSS = -15; +#else +#define MOJO_RESULT_OK ((MojoResult) 0) +#define MOJO_RESULT_CANCELLED ((MojoResult) -1) +#define MOJO_RESULT_UNKNOWN ((MojoResult) -2) +#define MOJO_RESULT_INVALID_ARGUMENT ((MojoResult) -3) +#define MOJO_RESULT_DEADLINE_EXCEEDED ((MojoResult) -4) +#define MOJO_RESULT_NOT_FOUND ((MojoResult) -5) +#define MOJO_RESULT_ALREADY_EXISTS ((MojoResult) -6) +#define MOJO_RESULT_PERMISSION_DENIED ((MojoResult) -7) +#define MOJO_RESULT_RESOURCE_EXHAUSTED ((MojoResult) -8) +#define MOJO_RESULT_FAILED_PRECONDITION ((MojoResult) -9) +#define MOJO_RESULT_ABORTED ((MojoResult) -10) +#define MOJO_RESULT_OUT_OF_RANGE ((MojoResult) -11) +#define MOJO_RESULT_UNIMPLEMENTED ((MojoResult) -12) +#define MOJO_RESULT_INTERNAL ((MojoResult) -13) +#define MOJO_RESULT_UNAVAILABLE ((MojoResult) -14) +#define MOJO_RESULT_DATA_LOSS ((MojoResult) -15) +#endif + +// |MojoDeadline|: +// |MOJO_DEADLINE_INDEFINITE| #ifdef __cplusplus +const MojoDeadline MOJO_DEADLINE_INDEFINITE = static_cast<MojoDeadline>(-1); +#else +#define MOJO_DEADLINE_INDEFINITE = ((MojoDeadline) -1); +#endif + +// |MojoWaitFlags|: +// |MOJO_WAIT_FLAG_NONE| - No flags. |MojoWait()|, etc. will return +// |MOJO_RESULT_FAILED_PRECONDITION| if you attempt to wait on this. +// |MOJO_WAIT_FLAG_READABLE| - Can read (e.g., a message) from the handle. +// |MOJO_WAIT_FLAG_WRITABLE| - Can write (e.g., a message) to the handle. +// |MOJO_WAIT_FLAG_EVERYTHING| - All flags. +#ifdef __cplusplus +const MojoWaitFlags MOJO_WAIT_FLAG_NONE = 0; +const MojoWaitFlags MOJO_WAIT_FLAG_READABLE = 1 << 0; +const MojoWaitFlags MOJO_WAIT_FLAG_WRITABLE = 1 << 1; +const MojoWaitFlags MOJO_WAIT_FLAG_EVERYTHING = ~0; +#else +#define MOJO_WAIT_FLAG_NONE ((MojoWaitFlags) 0) +#define MOJO_WAIT_FLAG_READABLE ((MojoWaitFlags) 1 << 0) +#define MOJO_WAIT_FLAG_WRITABLE ((MojoWaitFlags) 1 << 1) +#define MOJO_WAIT_FLAG_EVERYTHING (~((MojoWaitFlags) 0)) +#endif + +// |MojoWriteMessageFlags|: +// |MOJO_WRITE_MESSAGE_FLAG_NONE| - No flags; default mode. +#ifdef __cplusplus +const MojoWriteMessageFlags MOJO_WRITE_MESSAGE_FLAG_NONE = 0; +#else +#define MOJO_WRITE_MESSAGE_FLAG_NONE ((MojoWriteMessageFlags) 0) +#endif + +// |MojoReadMessageFlags|: +// |MOJO_READ_MESSAGE_FLAG_NONE| - No flags; default mode. +// |MOJO_READ_MESSAGE_FLAG_MAY_DISCARD| - If the message is unable to be read +// for whatever reason (e.g., the caller-supplied buffer is too small), +// discard the message (i.e., simply dequeue it). +#ifdef __cplusplus +const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_NONE = 0; +const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_MAY_DISCARD = 1 << 0; +#else +#define MOJO_READ_MESSAGE_FLAG_NONE ((MojoReadMessageFlags) 0) +#define MOJO_READ_MESSAGE_FLAG_MAY_DISCARD ((MojoReadMessageFlags) 1 << 0) +#endif + +// Functions ------------------------------------------------------------------- + +#ifdef __cplusplus +extern "C" { +#endif + +// Closes the given |handle|. +// +// Returns: +// |MOJO_RESULT_OK| on success. +// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not a valid handle. +// +// Concurrent operations on |handle| may succeed (or fail as usual) if they +// happen before the close, be cancelled with result |MOJO_RESULT_CANCELLED| if +// they properly overlap (this is likely the case with |MojoWait()|, etc.), or +// fail with |MOJO_RESULT_INVALID_ARGUMENT| if they happen after. +MojoResult MojoClose(MojoHandle handle); + +// Waits on the given handle until the state indicated by |flags| is satisfied +// or until |deadline| has passed. +// +// Returns: +// |MOJO_RESULT_OK| if some flag in |flags| was satisfied (or is already +// satisfied). +// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not a valid handle (e.g., if +// it has already been closed). +// |MOJO_RESULT_DEADLINE_EXCEEDED| if the deadline has passed without any of +// the flags being satisfied. +// |MOJO_RESULT_FAILED_PRECONDITION| if it is or becomes impossible that any +// flag in |flags| will ever be satisfied. +// +// If there are multiple waiters (on different threads, obviously) waiting on +// the same handle and flag and that flag becomes set, all waiters will be +// awoken. +MojoResult MojoWait(MojoHandle handle, + MojoWaitFlags flags, + MojoDeadline deadline); + +// Waits on |handles[0]|, ..., |handles[num_handles-1]| for at least one of them +// to satisfy the state indicated by |flags[0]|, ..., |flags[num_handles-1]|, +// respectively, or until |deadline| has passed. +// +// Returns: +// The index |i| (from 0 to |num_handles-1|) if |handle[i]| satisfies +// |flags[i]|. +// |MOJO_RESULT_INVALID_ARGUMENT| if some |handle[i]| is not a valid handle +// (e.g., if it has already been closed). +// |MOJO_RESULT_DEADLINE_EXCEEDED| if the deadline has passed without any of +// handles satisfying any of its flags. +// |MOJO_RESULT_FAILED_PRECONDITION| if it is or becomes impossible that SOME +// |handle[i]| will ever satisfy any of its flags |flags[i]|. +MojoResult MojoWaitMany(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline); + +// TODO(vtl): flags? other params (e.g., queue sizes, max message sizes?) +MojoResult MojoCreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1); + +MojoResult MojoWriteMessage(MojoHandle handle, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags); + +MojoResult MojoReadMessage(MojoHandle handle, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags); + +#ifdef __cplusplus +} // extern "C" +#endif + +// C++ wrapper functions ------------------------------------------------------- + +#ifdef __cplusplus + namespace mojo { +// Used to assert things at compile time. (Use our own copy instead of +// Chromium's, since we can't depend on Chromium.) +template <bool> struct CompileAssert {}; +#define MOJO_COMPILE_ASSERT(expr, msg) \ + typedef CompileAssert<(bool(expr))> msg[bool(expr) ? 1 : -1] + struct Handle { MojoHandle value; }; +const Handle kInvalidHandle = { MOJO_HANDLE_INVALID }; + +// A |mojo::Handle| must take no extra space, since we'll treat arrays of them +// as if they were arrays of |MojoHandle|s. +MOJO_COMPILE_ASSERT(sizeof(Handle) == sizeof(MojoHandle), + bad_size_for_cplusplus_handle); + +inline MojoResult Close(Handle handle) { + return MojoClose(handle.value); +} + +inline MojoResult Wait(Handle handle, + MojoWaitFlags flags, + MojoDeadline deadline) { + return MojoWait(handle.value, flags, deadline); +} + +inline MojoResult WaitMany(const Handle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline) { + return MojoWaitMany(&handles[0].value, flags, num_handles, deadline); +} + +inline MojoResult CreateMessagePipe(Handle* handle_0, Handle* handle_1) { + return MojoCreateMessagePipe(&handle_0->value, &handle_1->value); +} + +inline MojoResult WriteMessage(Handle handle, + const void* bytes, uint32_t num_bytes, + const Handle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) { + return MojoWriteMessage(handle.value, + bytes, num_bytes, + &handles[0].value, num_handles, + flags); +} + +inline MojoResult ReadMessage(Handle handle, + void* bytes, uint32_t* num_bytes, + Handle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + return MojoReadMessage(handle.value, + bytes, num_bytes, + &handles[0].value, num_handles, + flags); +} + } // namespace mojo #endif diff --git a/mojo/public/tests/system_core_perftest.cc b/mojo/public/tests/system_core_perftest.cc new file mode 100644 index 0000000..2e9fef2 --- /dev/null +++ b/mojo/public/tests/system_core_perftest.cc @@ -0,0 +1,99 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/system/core.h" + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/logging.h" +#include "mojo/public/tests/test_support.h" + +namespace mojo { +namespace { + +class SystemPerftest : public test::TestBase { + public: + SystemPerftest() {} + virtual ~SystemPerftest() {} + + void NoOp() { + } + + void MessagePipe_CreateAndClose() { + MojoResult result; + result = CreateMessagePipe(&h_0_, &h_1_); + DCHECK_EQ(result, MOJO_RESULT_OK); + result = Close(h_0_); + DCHECK_EQ(result, MOJO_RESULT_OK); + result = Close(h_1_); + DCHECK_EQ(result, MOJO_RESULT_OK); + } + + void MessagePipe_WriteAndRead(void* buffer, uint32_t bytes) { + MojoResult result; + result = WriteMessage(h_0_, + buffer, bytes, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE); + DCHECK_EQ(result, MOJO_RESULT_OK); + uint32_t read_bytes = bytes; + result = ReadMessage(h_1_, + buffer, &read_bytes, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE); + DCHECK_EQ(result, MOJO_RESULT_OK); + } + + protected: + Handle h_0_; + Handle h_1_; + + private: + DISALLOW_COPY_AND_ASSIGN(SystemPerftest); +}; + +// A no-op test so we can compare performance. +TEST_F(SystemPerftest, NoOp) { + test::IterateAndReportPerf( + "NoOp", + base::Bind(&SystemPerftest::NoOp, + base::Unretained(this))); +} + +TEST_F(SystemPerftest, MessagePipe_CreateAndClose) { + test::IterateAndReportPerf( + "MessagePipe_CreateAndClose", + base::Bind(&SystemPerftest::MessagePipe_CreateAndClose, + base::Unretained(this))); +} + +TEST_F(SystemPerftest, MessagePipe_WriteAndRead) { + CHECK_EQ(CreateMessagePipe(&h_0_, &h_1_), MOJO_RESULT_OK); + char buffer[10000] = { 0 }; + test::IterateAndReportPerf( + "MessagePipe_WriteAndRead_10bytes", + base::Bind(&SystemPerftest::MessagePipe_WriteAndRead, + base::Unretained(this), + static_cast<void*>(buffer), static_cast<uint32_t>(10))); + test::IterateAndReportPerf( + "MessagePipe_WriteAndRead_100bytes", + base::Bind(&SystemPerftest::MessagePipe_WriteAndRead, + base::Unretained(this), + static_cast<void*>(buffer), static_cast<uint32_t>(100))); + test::IterateAndReportPerf( + "MessagePipe_WriteAndRead_1000bytes", + base::Bind(&SystemPerftest::MessagePipe_WriteAndRead, + base::Unretained(this), + static_cast<void*>(buffer), static_cast<uint32_t>(1000))); + test::IterateAndReportPerf( + "MessagePipe_WriteAndRead_10000bytes", + base::Bind(&SystemPerftest::MessagePipe_WriteAndRead, + base::Unretained(this), + static_cast<void*>(buffer), static_cast<uint32_t>(10000))); + CHECK_EQ(Close(h_0_), MOJO_RESULT_OK); + CHECK_EQ(Close(h_1_), MOJO_RESULT_OK); +} + +} // namespace +} // namespace mojo diff --git a/mojo/public/tests/system_core_unittest.cc b/mojo/public/tests/system_core_unittest.cc new file mode 100644 index 0000000..382a800 --- /dev/null +++ b/mojo/public/tests/system_core_unittest.cc @@ -0,0 +1,105 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/system/core.h" + +#include <string.h> + +#include "mojo/public/tests/test_support.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace { + +class SystemTest : public test::TestBase { +}; + +TEST_F(SystemTest, Basic) { + Handle h_0; + MojoWaitFlags wf; + char buffer[10] = { 0 }; + uint32_t buffer_size; + + // The only handle that's guaranteed to be invalid is |kInvalidHandle|. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, Close(kInvalidHandle)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + Wait(kInvalidHandle, MOJO_WAIT_FLAG_EVERYTHING, 1000000)); + h_0 = kInvalidHandle; + wf = MOJO_WAIT_FLAG_EVERYTHING; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + WaitMany(&h_0, &wf, 1, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + WriteMessage(h_0, + buffer, 3, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + buffer_size = static_cast<uint32_t>(sizeof(buffer)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + ReadMessage(h_0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + Handle h_1; + EXPECT_EQ(MOJO_RESULT_OK, CreateMessagePipe(&h_0, &h_1)); + + // Shouldn't be readable. + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + Wait(h_0, MOJO_WAIT_FLAG_READABLE, 0)); + + // Should be writable. + EXPECT_EQ(MOJO_RESULT_OK, + Wait(h_0, MOJO_WAIT_FLAG_WRITABLE, 0)); + + // Try to read. + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + ReadMessage(h_0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Write to |h_1|. + static const char hello[] = "hello"; + memcpy(buffer, hello, sizeof(hello)); + buffer_size = static_cast<uint32_t>(sizeof(hello)); + EXPECT_EQ(MOJO_RESULT_OK, + WriteMessage(h_1, + hello, buffer_size, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // |h_0| should be readable. + wf = MOJO_WAIT_FLAG_READABLE; + EXPECT_EQ(MOJO_RESULT_OK, + WaitMany(&h_0, &wf, 1, MOJO_DEADLINE_INDEFINITE)); + + // Read from |h_0|. + memset(buffer, 0, sizeof(buffer)); + buffer_size = static_cast<uint32_t>(sizeof(buffer)); + EXPECT_EQ(MOJO_RESULT_OK, + ReadMessage(h_0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(hello)), buffer_size); + EXPECT_EQ(0, memcmp(hello, buffer, sizeof(hello))); + + // |h_0| should no longer be readable. + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + Wait(h_0, MOJO_WAIT_FLAG_READABLE, 10)); + + // Close |h_0|. + EXPECT_EQ(MOJO_RESULT_OK, Close(h_0)); + + // |h_1| should no longer be readable or writable. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + Wait(h_1, MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE, 1000)); + + EXPECT_EQ(MOJO_RESULT_OK, Close(h_1)); +} + +// TODO(vtl): Add multi-threaded tests. + +} // namespace +} // namespace mojo diff --git a/mojo/public/tests/test_support.cc b/mojo/public/tests/test_support.cc new file mode 100644 index 0000000..0140c9f --- /dev/null +++ b/mojo/public/tests/test_support.cc @@ -0,0 +1,48 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/tests/test_support.h" + +#include "base/test/perf_log.h" +#include "base/time/time.h" +#include "mojo/system/core_impl.h" + +namespace mojo { +namespace test { + +TestBase::TestBase() { +} + +TestBase::~TestBase() { +} + +void TestBase::SetUp() { + if (!system::CoreImpl::Get()) + system::CoreImpl::Init(); +} + +void IterateAndReportPerf(const char* test_name, + base::Callback<void()> single_iteration) { + // TODO(vtl): These should be specifiable using command-line flags. + static const size_t kGranularity = 100; + static const double kPerftestTimeSeconds = 3.0; + + const base::TimeTicks start_time = base::TimeTicks::HighResNow(); + base::TimeTicks end_time; + size_t iterations = 0; + do { + for (size_t i = 0; i < kGranularity; i++) + single_iteration.Run(); + iterations += kGranularity; + + end_time = base::TimeTicks::HighResNow(); + } while ((end_time - start_time).InSecondsF() < kPerftestTimeSeconds); + + base::LogPerfResult(test_name, + iterations / (end_time - start_time).InSecondsF(), + "iterations/second"); +} + +} // namespace test +} // namespace mojo diff --git a/mojo/public/tests/test_support.h b/mojo/public/tests/test_support.h new file mode 100644 index 0000000..34ec0bf --- /dev/null +++ b/mojo/public/tests/test_support.h @@ -0,0 +1,36 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_ +#define MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_ + +#include "base/basictypes.h" +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace test { + +class TestBase : public testing::Test { + public: + TestBase(); + virtual ~TestBase(); + + virtual void SetUp() OVERRIDE; + + private: + DISALLOW_COPY_AND_ASSIGN(TestBase); +}; + +// Run |single_iteration| an appropriate number of times and report its +// performance appropriately. (This actually runs |single_iteration| for a fixed +// amount of time and reports the number of iterations per unit time.) +void IterateAndReportPerf(const char* test_name, + base::Callback<void()> single_iteration); + +} // namespace test +} // namespace mojo + +#endif // MOJO_PUBLIC_TESTS_TEST_SUPPORT_H_ diff --git a/mojo/system/core.cc b/mojo/system/core.cc new file mode 100644 index 0000000..5367fd31 --- /dev/null +++ b/mojo/system/core.cc @@ -0,0 +1,53 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/public/system/core.h" + +#include "mojo/system/core_impl.h" + +extern "C" { + +MojoResult MojoClose(MojoHandle handle) { + return mojo::system::CoreImpl::Get()->Close(handle); +} + +MojoResult MojoWait(MojoHandle handle, + MojoWaitFlags flags, + MojoDeadline deadline) { + return mojo::system::CoreImpl::Get()->Wait(handle, flags, deadline); +} + +MojoResult MojoWaitMany(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline) { + return mojo::system::CoreImpl::Get()->WaitMany(handles, flags, num_handles, + deadline); +} + +MojoResult MojoCreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1) { + return mojo::system::CoreImpl::Get()->CreateMessagePipe(handle_0, handle_1); +} + +MojoResult MojoWriteMessage(MojoHandle handle, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) { + return mojo::system::CoreImpl::Get()->WriteMessage(handle, + bytes, num_bytes, + handles, num_handles, + flags); +} + +MojoResult MojoReadMessage(MojoHandle handle, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + return mojo::system::CoreImpl::Get()->ReadMessage(handle, + bytes, num_bytes, + handles, num_handles, + flags); +} + +} // extern "C" diff --git a/mojo/system/core_impl.cc b/mojo/system/core_impl.cc new file mode 100644 index 0000000..00d1107 --- /dev/null +++ b/mojo/system/core_impl.cc @@ -0,0 +1,275 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/core_impl.h" + +#include <vector> + +#include "base/logging.h" +#include "mojo/system/dispatcher.h" +#include "mojo/system/limits.h" +#include "mojo/system/memory.h" +#include "mojo/system/message_pipe.h" +#include "mojo/system/message_pipe_dispatcher.h" +#include "mojo/system/waiter.h" + +namespace mojo { +namespace system { + +// Implementation notes +// +// Mojo primitives are implemented by the singleton |CoreImpl| object. Most +// calls are for a "primary" handle (the first argument). +// |CoreImpl::GetDispatcher()| is used to look up a |Dispatcher| object for a +// given handle. That object implements most primitives for that object. The +// wait primitives are not attached to objects and are implemented by |CoreImpl| +// itself. +// +// Some objects have multiple handles associated to them, e.g., message pipes +// (which have two). In such a case, there is still a |Dispatcher| (e.g., +// |MessagePipeDispatcher|) for each handle, with each handle having a strong +// reference to the common "secondary" object (e.g., |MessagePipe|). This +// secondary object does NOT have any references to the |Dispatcher|s (even if +// it did, it wouldn't be able to do anything with them due to lock order +// requirements -- see below). +// +// Waiting is implemented by having the thread that wants to wait call the +// |Dispatcher|s for the handles that it wants to wait on with a |Waiter| +// object; this |Waiter| object may be created on the stack of that thread or be +// kept in thread local storage for that thread (TODO(vtl): future improvement). +// The |Dispatcher| then adds the |Waiter| to a |WaiterList| that's either owned +// by that |Dispatcher| (see |SimpleDispatcher|) or by a secondary object (e.g., +// |MessagePipe|). To signal/wake a |Waiter|, the object in question -- either a +// |SimpleDispatcher| or a secondary object -- talks to its |WaiterList|. + +// Thread-safety notes +// +// Mojo primitives calls are thread-safe. We achieve this with relatively +// fine-grained locking. There is a global handle table lock. This lock should +// be held as briefly as possible (TODO(vtl): a future improvement would be to +// switch it to a reader-writer lock). Each |Dispatcher| object then has a lock +// (which subclasses can use to protect their data). +// +// The lock ordering is as follows: +// 1. global handle table lock +// 2. |Dispatcher| locks +// 3. secondary object locks +// ... +// INF. |Waiter| locks +// +// Notes: +// - While holding a |Dispatcher| lock, you may not unconditionally attempt +// to take another |Dispatcher| lock. (This has consequences on the +// concurrency semantics of |MojoWriteMessage()| when passing handles.) +// Doing so would lead to deadlock. +// - Locks at the "INF" level may not have any locks taken while they are +// held. + +// static +CoreImpl* CoreImpl::singleton_ = NULL; + +// static +void CoreImpl::Init() { + CHECK(!singleton_); + singleton_ = new CoreImpl(); +} + +MojoResult CoreImpl::Close(MojoHandle handle) { + if (handle == MOJO_HANDLE_INVALID) + return MOJO_RESULT_INVALID_ARGUMENT; + + scoped_refptr<Dispatcher> dispatcher; + { + base::AutoLock locker(handle_table_lock_); + HandleTableMap::iterator it = handle_table_.find(handle); + if (it == handle_table_.end()) + return MOJO_RESULT_INVALID_ARGUMENT; + dispatcher = it->second; + handle_table_.erase(it); + } + + // The dispatcher doesn't have a say in being closed, but gets notified of it. + // Note: This is done outside of |handle_table_lock_|. As a result, there's a + // race condition that the dispatcher must handle; see the comment in + // |Dispatcher| in dispatcher.h. + return dispatcher->Close(); +} + +MojoResult CoreImpl::Wait(MojoHandle handle, + MojoWaitFlags flags, + MojoDeadline deadline) { + return WaitManyInternal(&handle, &flags, 1, deadline); +} + +MojoResult CoreImpl::WaitMany(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline) { + if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + if (!VerifyUserPointer(flags, num_handles, sizeof(flags[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + if (num_handles < 1) + return MOJO_RESULT_INVALID_ARGUMENT; + if (num_handles > kMaxWaitManyNumHandles) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + return WaitManyInternal(handles, flags, num_handles, deadline); +} + +MojoResult CoreImpl::CreateMessagePipe(MojoHandle* handle_0, + MojoHandle* handle_1) { + scoped_refptr<MessagePipeDispatcher> dispatcher_0( + new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> dispatcher_1( + new MessagePipeDispatcher()); + + MojoHandle h0, h1; + { + base::AutoLock locker(handle_table_lock_); + + h0 = AddDispatcherNoLock(dispatcher_0); + if (h0 == MOJO_HANDLE_INVALID) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + h1 = AddDispatcherNoLock(dispatcher_1); + if (h1 == MOJO_HANDLE_INVALID) { + handle_table_.erase(h0); + return MOJO_RESULT_RESOURCE_EXHAUSTED; + } + } + + scoped_refptr<MessagePipe> message_pipe(new MessagePipe()); + dispatcher_0->Init(message_pipe, 0); + dispatcher_1->Init(message_pipe, 1); + + *handle_0 = h0; + *handle_1 = h1; + return MOJO_RESULT_OK; +} + +MojoResult CoreImpl::WriteMessage( + MojoHandle handle, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) { + scoped_refptr<Dispatcher> dispatcher(GetDispatcher(handle)); + if (!dispatcher.get()) + return MOJO_RESULT_INVALID_ARGUMENT; + + return dispatcher->WriteMessage(bytes, num_bytes, + handles, num_handles, + flags); +} + +MojoResult CoreImpl::ReadMessage( + MojoHandle handle, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + scoped_refptr<Dispatcher> dispatcher(GetDispatcher(handle)); + if (!dispatcher.get()) + return MOJO_RESULT_INVALID_ARGUMENT; + + return dispatcher->ReadMessage(bytes, num_bytes, + handles, num_handles, + flags); +} + +CoreImpl::CoreImpl() + : next_handle_(MOJO_HANDLE_INVALID + 1) { +} + +CoreImpl::~CoreImpl() { + // This should usually not be reached (the singleton lives forever), except + // in tests. +} + +scoped_refptr<Dispatcher> CoreImpl::GetDispatcher(MojoHandle handle) { + if (handle == MOJO_HANDLE_INVALID) + return NULL; + + base::AutoLock locker(handle_table_lock_); + HandleTableMap::iterator it = handle_table_.find(handle); + if (it == handle_table_.end()) + return NULL; + + return it->second; +} + +MojoHandle CoreImpl::AddDispatcherNoLock(scoped_refptr<Dispatcher> dispatcher) { + DCHECK(dispatcher.get()); + handle_table_lock_.AssertAcquired(); + DCHECK_NE(next_handle_, MOJO_HANDLE_INVALID); + + if (handle_table_.size() >= kMaxHandleTableSize) + return MOJO_HANDLE_INVALID; + + // TODO(vtl): Maybe we want to do something different/smarter. (Or maybe try + // assigning randomly?) + while (handle_table_.find(next_handle_) != handle_table_.end()) { + next_handle_++; + if (next_handle_ == MOJO_HANDLE_INVALID) + next_handle_++; + } + + MojoHandle new_handle = next_handle_; + handle_table_[new_handle] = dispatcher; + + next_handle_++; + if (next_handle_ == MOJO_HANDLE_INVALID) + next_handle_++; + + return new_handle; +} + +// Note: We allow |handles| to repeat the same handle multiple times, since +// different flags may be specified. +// TODO(vtl): This incurs a performance cost in |RemoveWaiter()|. Analyze this +// more carefully and address it if necessary. +MojoResult CoreImpl::WaitManyInternal(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline) { + DCHECK_GT(num_handles, 0u); + + std::vector<scoped_refptr<Dispatcher> > dispatchers; + dispatchers.reserve(num_handles); + for (uint32_t i = 0; i < num_handles; i++) { + scoped_refptr<Dispatcher> d = GetDispatcher(handles[i]); + if (!d.get()) + return MOJO_RESULT_INVALID_ARGUMENT; + dispatchers.push_back(d); + } + + // TODO(vtl): Should make the waiter live (permanently) in TLS. + Waiter waiter; + waiter.Init(); + + uint32_t i; + MojoResult rv = MOJO_RESULT_OK; + for (i = 0; i < num_handles; i++) { + rv = dispatchers[i]->AddWaiter(&waiter, + flags[i], + static_cast<MojoResult>(i)); + if (rv != MOJO_RESULT_OK) + break; + } + uint32_t num_added = i; + + if (rv == MOJO_RESULT_ALREADY_EXISTS) + rv = static_cast<MojoResult>(i); // The i-th one is already "triggered". + else if (rv == MOJO_RESULT_OK) + rv = waiter.Wait(deadline); + + // Make sure no other dispatchers try to wake |waiter| for the current + // |Wait()|/|WaitMany()| call. (Only after doing this can |waiter| be + // destroyed, but this would still be required if the waiter were in TLS.) + for (i = 0; i < num_added; i++) + dispatchers[i]->RemoveWaiter(&waiter); + + return rv; +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/core_impl.h b/mojo/system/core_impl.h new file mode 100644 index 0000000..1417a06 --- /dev/null +++ b/mojo/system/core_impl.h @@ -0,0 +1,104 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_CORE_IMPL_H_ +#define MOJO_SYSTEM_CORE_IMPL_H_ + +#include "base/basictypes.h" +#include "base/containers/hash_tables.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/lock.h" +#include "mojo/public/system/core.h" + +namespace mojo { +namespace system { + +class CoreImpl; +class Dispatcher; + +namespace test { +class CoreTestBase; +} + +// |CoreImpl| is a singleton object that implements the Mojo system calls. With +// the (obvious) exception of |Init()|, which must be called first (and the call +// completed) before making any other calls, all the public methods are +// thread-safe. +class CoreImpl { + public: + static void Init(); + + static CoreImpl* Get() { + return singleton_; + } + + MojoResult Close(MojoHandle handle); + + MojoResult Wait(MojoHandle handle, + MojoWaitFlags flags, + MojoDeadline deadline); + + MojoResult WaitMany(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline); + + MojoResult CreateMessagePipe(MojoHandle* handle_0, MojoHandle* handle_1); + + MojoResult WriteMessage(MojoHandle handle, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags); + + MojoResult ReadMessage(MojoHandle handle, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags); + + private: + friend class test::CoreTestBase; + + typedef base::hash_map<MojoHandle, scoped_refptr<Dispatcher> > + HandleTableMap; + + CoreImpl(); + ~CoreImpl(); + + // Looks up the dispatcher for the given handle. Returns null if the handle is + // invalid. + scoped_refptr<Dispatcher> GetDispatcher(MojoHandle handle); + + // Assigns a new handle for the given dispatcher (which must be valid); + // returns |MOJO_HANDLE_INVALID| on failure (due to hitting resource limits). + // Must be called under |handle_table_lock_|. + MojoHandle AddDispatcherNoLock(scoped_refptr<Dispatcher> dispatcher); + + // Internal implementation of |Wait()| and |WaitMany()|; doesn't do basic + // validation of arguments. + MojoResult WaitManyInternal(const MojoHandle* handles, + const MojoWaitFlags* flags, + uint32_t num_handles, + MojoDeadline deadline); + + // --------------------------------------------------------------------------- + + static CoreImpl* singleton_; + + // --------------------------------------------------------------------------- + + // TODO(vtl): |handle_table_lock_| should be a reader-writer lock (if only we + // had them). + base::Lock handle_table_lock_; // Protects the immediately-following members. + HandleTableMap handle_table_; + MojoHandle next_handle_; // Invariant: never |MOJO_HANDLE_INVALID|. + + // --------------------------------------------------------------------------- + + DISALLOW_COPY_AND_ASSIGN(CoreImpl); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_CORE_IMPL_H_ diff --git a/mojo/system/core_impl_unittest.cc b/mojo/system/core_impl_unittest.cc new file mode 100644 index 0000000..1ace04d --- /dev/null +++ b/mojo/system/core_impl_unittest.cc @@ -0,0 +1,245 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/core_impl.h" + +#include "mojo/system/core_test_base.h" + +namespace mojo { +namespace system { +namespace { + +class CoreImplTest : public test::CoreTestBase { +}; + +TEST_F(CoreImplTest, Basic) { + MockHandleInfo info; + + EXPECT_EQ(0u, info.GetCtorCallCount()); + MojoHandle h = CreateMockHandle(&info); + EXPECT_EQ(1u, info.GetCtorCallCount()); + EXPECT_NE(h, MOJO_HANDLE_INVALID); + + EXPECT_EQ(0u, info.GetWriteMessageCallCount()); + EXPECT_EQ(MOJO_RESULT_OK, + core()->WriteMessage(h, NULL, 0, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(1u, info.GetWriteMessageCallCount()); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WriteMessage(h, NULL, 1, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(2u, info.GetWriteMessageCallCount()); + + EXPECT_EQ(0u, info.GetReadMessageCallCount()); + uint32_t num_bytes = 0; + EXPECT_EQ(MOJO_RESULT_OK, + core()->ReadMessage(h, NULL, &num_bytes, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(1u, info.GetReadMessageCallCount()); + num_bytes = 1; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->ReadMessage(h, NULL, &num_bytes, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(2u, info.GetReadMessageCallCount()); + + EXPECT_EQ(0u, info.GetAddWaiterCallCount()); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING, + MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(1u, info.GetAddWaiterCallCount()); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING, 0)); + EXPECT_EQ(2u, info.GetAddWaiterCallCount()); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->Wait(h, MOJO_WAIT_FLAG_EVERYTHING, 10 * 1000)); + EXPECT_EQ(3u, info.GetAddWaiterCallCount()); + MojoWaitFlags wait_flags = MOJO_WAIT_FLAG_EVERYTHING; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->WaitMany(&h, &wait_flags, 1, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(4u, info.GetAddWaiterCallCount()); + + EXPECT_EQ(0u, info.GetDtorCallCount()); + EXPECT_EQ(0u, info.GetCloseCallCount()); + EXPECT_EQ(0u, info.GetCancelAllWaitersCallCount()); + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h)); + EXPECT_EQ(1u, info.GetCancelAllWaitersCallCount()); + EXPECT_EQ(1u, info.GetCloseCallCount()); + EXPECT_EQ(1u, info.GetDtorCallCount()); + + // No waiters should ever have ever been added. + EXPECT_EQ(0u, info.GetRemoveWaiterCallCount()); +} + +TEST_F(CoreImplTest, InvalidArguments) { + // |Close()|: + { + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(MOJO_HANDLE_INVALID)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(10)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(1000000000)); + + // Test a double-close. + MockHandleInfo info; + MojoHandle h = CreateMockHandle(&info); + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h)); + EXPECT_EQ(1u, info.GetCloseCallCount()); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->Close(h)); + EXPECT_EQ(1u, info.GetCloseCallCount()); + } + + // |Wait()|: + { + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->Wait(MOJO_HANDLE_INVALID, MOJO_WAIT_FLAG_EVERYTHING, + MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->Wait(10, MOJO_WAIT_FLAG_EVERYTHING, + MOJO_DEADLINE_INDEFINITE)); + } + + // |WaitMany()|: + { + MojoHandle handles[2] = { MOJO_HANDLE_INVALID, MOJO_HANDLE_INVALID }; + MojoWaitFlags flags[2] = { MOJO_WAIT_FLAG_EVERYTHING, + MOJO_WAIT_FLAG_EVERYTHING }; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, flags, 0, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(NULL, flags, 0, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, NULL, 0, MOJO_DEADLINE_INDEFINITE)); + + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(NULL, flags, 1, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, NULL, 1, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, flags, 1, MOJO_DEADLINE_INDEFINITE)); + + MockHandleInfo info[2]; + handles[0] = CreateMockHandle(&info[0]); + + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->WaitMany(handles, flags, 1, MOJO_DEADLINE_INDEFINITE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE)); + handles[1] = handles[0] + 1; // Invalid handle. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE)); + handles[1] = CreateMockHandle(&info[1]); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->WaitMany(handles, flags, 2, MOJO_DEADLINE_INDEFINITE)); + + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(handles[0])); + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(handles[1])); + } +} + +// TODO(vtl): test |Wait()| and |WaitMany()| properly +// - including |WaitMany()| with the same handle more than once (with +// same/different flags) + +TEST_F(CoreImplTest, MessagePipe) { + MojoHandle h[2]; + + EXPECT_EQ(MOJO_RESULT_OK, core()->CreateMessagePipe(&h[0], &h[1])); + // Should get two distinct, valid handles. + EXPECT_NE(h[0], MOJO_HANDLE_INVALID); + EXPECT_NE(h[1], MOJO_HANDLE_INVALID); + EXPECT_NE(h[0], h[1]); + + // Neither should be readable. + MojoWaitFlags flags[2] = { MOJO_WAIT_FLAG_READABLE, MOJO_WAIT_FLAG_READABLE }; + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + core()->WaitMany(h, flags, 2, 0)); + + // Try to read anyway. + char buffer[1] = { 'a' }; + uint32_t buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + core()->ReadMessage(h[0], buffer, &buffer_size, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + // Check that it left its inputs alone. + EXPECT_EQ('a', buffer[0]); + EXPECT_EQ(1u, buffer_size); + + // Both should be writable. + EXPECT_EQ(MOJO_RESULT_OK, + core()->Wait(h[0], MOJO_WAIT_FLAG_WRITABLE, 1000000000)); + EXPECT_EQ(MOJO_RESULT_OK, + core()->Wait(h[1], MOJO_WAIT_FLAG_WRITABLE, 1000000000)); + + // Also check that |h[1]| is writable using |WaitMany()|. + flags[0] = MOJO_WAIT_FLAG_READABLE; + flags[1] = MOJO_WAIT_FLAG_WRITABLE; + EXPECT_EQ(1, core()->WaitMany(h, flags, 2, MOJO_DEADLINE_INDEFINITE)); + + // Write to |h[1]|. + buffer[0] = 'b'; + EXPECT_EQ(MOJO_RESULT_OK, + core()->WriteMessage(h[1], buffer, 1, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Check that |h[0]| is now readable. + flags[0] = MOJO_WAIT_FLAG_READABLE; + flags[1] = MOJO_WAIT_FLAG_READABLE; + EXPECT_EQ(0, core()->WaitMany(h, flags, 2, MOJO_DEADLINE_INDEFINITE)); + + // Read from |h[0]|. + // First, get only the size. + buffer_size = 0; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + core()->ReadMessage(h[0], NULL, &buffer_size, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(1u, buffer_size); + // Then actually read it. + buffer[0] = 'c'; + buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_OK, + core()->ReadMessage(h[0], buffer, &buffer_size, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ('b', buffer[0]); + EXPECT_EQ(1u, buffer_size); + + // |h[0]| should no longer be readable. + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + core()->Wait(h[0], MOJO_WAIT_FLAG_READABLE, 0)); + + // Write to |h[0]|. + buffer[0] = 'd'; + EXPECT_EQ(MOJO_RESULT_OK, + core()->WriteMessage(h[0], buffer, 1, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Close |h[0]|. + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h[0])); + + // Check that |h[1]| is no longer writable (and will never be). + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->Wait(h[1], MOJO_WAIT_FLAG_WRITABLE, 1000000000)); + + // Check that |h[1]| is still readable (for the moment). + EXPECT_EQ(MOJO_RESULT_OK, + core()->Wait(h[1], MOJO_WAIT_FLAG_READABLE, 1000000000)); + + // Discard a message from |h[1]|. + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + core()->ReadMessage(h[1], NULL, NULL, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + // |h[1]| is no longer readable (and will never be). + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->Wait(h[1], MOJO_WAIT_FLAG_READABLE, 1000000000)); + + // Try writing to |h[1]|. + buffer[0] = 'e'; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + core()->WriteMessage(h[1], buffer, 1, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + EXPECT_EQ(MOJO_RESULT_OK, core()->Close(h[1])); +} + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/core_test_base.cc b/mojo/system/core_test_base.cc new file mode 100644 index 0000000..36241f8 --- /dev/null +++ b/mojo/system/core_test_base.cc @@ -0,0 +1,227 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/core_test_base.h" + +#include <vector> + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "mojo/system/core_impl.h" +#include "mojo/system/dispatcher.h" +#include "mojo/system/memory.h" + +namespace mojo { +namespace system { +namespace test { + +namespace { + +// MockDispatcher -------------------------------------------------------------- + +class MockDispatcher : public Dispatcher { + public: + explicit MockDispatcher(CoreTestBase::MockHandleInfo* info) + : info_(info) { + CHECK(info_); + info_->IncrementCtorCallCount(); + } + + private: + friend class base::RefCountedThreadSafe<MockDispatcher>; + virtual ~MockDispatcher() { + info_->IncrementDtorCallCount(); + } + + // |Dispatcher| implementation/overrides: + virtual MojoResult CloseImplNoLock() OVERRIDE { + info_->IncrementCloseCallCount(); + lock().AssertAcquired(); + return MOJO_RESULT_OK; + } + + virtual MojoResult WriteMessageImplNoLock( + const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags /*flags*/) OVERRIDE { + info_->IncrementWriteMessageCallCount(); + lock().AssertAcquired(); + + if (!VerifyUserPointer(bytes, num_bytes, 1)) + return MOJO_RESULT_INVALID_ARGUMENT; + if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + + return MOJO_RESULT_OK; + } + + virtual MojoResult ReadMessageImplNoLock( + void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags /*flags*/) OVERRIDE { + info_->IncrementReadMessageCallCount(); + lock().AssertAcquired(); + + if (num_bytes && !VerifyUserPointer(bytes, *num_bytes, 1)) + return MOJO_RESULT_INVALID_ARGUMENT; + if (num_handles && + !VerifyUserPointer(handles, *num_handles, sizeof(handles[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + + return MOJO_RESULT_OK; + } + + virtual MojoResult AddWaiterImplNoLock(Waiter* /*waiter*/, + MojoWaitFlags /*flags*/, + MojoResult /*wake_result*/) OVERRIDE { + info_->IncrementAddWaiterCallCount(); + lock().AssertAcquired(); + return MOJO_RESULT_FAILED_PRECONDITION; + } + + virtual void RemoveWaiterImplNoLock(Waiter* /*waiter*/) OVERRIDE { + info_->IncrementRemoveWaiterCallCount(); + lock().AssertAcquired(); + } + + virtual void CancelAllWaitersNoLock() OVERRIDE { + info_->IncrementCancelAllWaitersCallCount(); + lock().AssertAcquired(); + } + + CoreTestBase::MockHandleInfo* const info_; + + DISALLOW_COPY_AND_ASSIGN(MockDispatcher); +}; + +} // namespace + +// CoreTestBase ---------------------------------------------------------------- + +CoreTestBase::CoreTestBase() { +} + +CoreTestBase::~CoreTestBase() { +} + +void CoreTestBase::SetUp() { + core_ = new CoreImpl(); +} + +void CoreTestBase::TearDown() { + delete core_; + core_ = NULL; +} + +MojoHandle CoreTestBase::CreateMockHandle(CoreTestBase::MockHandleInfo* info) { + CHECK(core_); + scoped_refptr<MockDispatcher> dispatcher(new MockDispatcher(info)); + base::AutoLock locker(core_->handle_table_lock_); + return core_->AddDispatcherNoLock(dispatcher); +} + +// CoreTestBase_MockHandleInfo ------------------------------------------------- + +CoreTestBase_MockHandleInfo::CoreTestBase_MockHandleInfo() + : ctor_call_count_(0), + dtor_call_count_(0), + close_call_count_(0), + write_message_call_count_(0), + read_message_call_count_(0), + add_waiter_call_count_(0), + remove_waiter_call_count_(0), + cancel_all_waiters_call_count_(0) { +} + +CoreTestBase_MockHandleInfo::~CoreTestBase_MockHandleInfo() { +} + +unsigned CoreTestBase_MockHandleInfo::GetCtorCallCount() const { + base::AutoLock locker(lock_); + return ctor_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetDtorCallCount() const { + base::AutoLock locker(lock_); + return dtor_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetCloseCallCount() const { + base::AutoLock locker(lock_); + return close_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetWriteMessageCallCount() const { + base::AutoLock locker(lock_); + return write_message_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetReadMessageCallCount() const { + base::AutoLock locker(lock_); + return read_message_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetAddWaiterCallCount() const { + base::AutoLock locker(lock_); + return add_waiter_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetRemoveWaiterCallCount() const { + base::AutoLock locker(lock_); + return remove_waiter_call_count_; +} + +unsigned CoreTestBase_MockHandleInfo::GetCancelAllWaitersCallCount() const { + base::AutoLock locker(lock_); + return cancel_all_waiters_call_count_; +} + +void CoreTestBase_MockHandleInfo::IncrementCtorCallCount() { + base::AutoLock locker(lock_); + ctor_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementDtorCallCount() { + base::AutoLock locker(lock_); + dtor_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementCloseCallCount() { + base::AutoLock locker(lock_); + close_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementWriteMessageCallCount() { + base::AutoLock locker(lock_); + write_message_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementReadMessageCallCount() { + base::AutoLock locker(lock_); + read_message_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementAddWaiterCallCount() { + base::AutoLock locker(lock_); + add_waiter_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementRemoveWaiterCallCount() { + base::AutoLock locker(lock_); + remove_waiter_call_count_++; +} + +void CoreTestBase_MockHandleInfo::IncrementCancelAllWaitersCallCount() { + base::AutoLock locker(lock_); + cancel_all_waiters_call_count_++; +} + +} // namespace test +} // namespace system +} // namespace mojo diff --git a/mojo/system/core_test_base.h b/mojo/system/core_test_base.h new file mode 100644 index 0000000..114e62e --- /dev/null +++ b/mojo/system/core_test_base.h @@ -0,0 +1,87 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_CORE_TEST_BASE_H_ +#define MOJO_SYSTEM_CORE_TEST_BASE_H_ + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/synchronization/lock.h" +#include "mojo/public/system/core.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { + +class CoreImpl; + +namespace test { + +class CoreTestBase_MockHandleInfo; + +class CoreTestBase : public testing::Test { + public: + typedef CoreTestBase_MockHandleInfo MockHandleInfo; + + CoreTestBase(); + virtual ~CoreTestBase(); + + virtual void SetUp() OVERRIDE; + virtual void TearDown() OVERRIDE; + + protected: + // |info| must remain alive until the returned handle is closed. + MojoHandle CreateMockHandle(MockHandleInfo* info); + + CoreImpl* core() { return core_; } + + private: + CoreImpl* core_; + + DISALLOW_COPY_AND_ASSIGN(CoreTestBase); +}; + +class CoreTestBase_MockHandleInfo { + public: + CoreTestBase_MockHandleInfo(); + ~CoreTestBase_MockHandleInfo(); + + unsigned GetCtorCallCount() const; + unsigned GetDtorCallCount() const; + unsigned GetCloseCallCount() const; + unsigned GetWriteMessageCallCount() const; + unsigned GetReadMessageCallCount() const; + unsigned GetAddWaiterCallCount() const; + unsigned GetRemoveWaiterCallCount() const; + unsigned GetCancelAllWaitersCallCount() const; + + // For use by |MockDispatcher|: + void IncrementCtorCallCount(); + void IncrementDtorCallCount(); + void IncrementCloseCallCount(); + void IncrementWriteMessageCallCount(); + void IncrementReadMessageCallCount(); + void IncrementAddWaiterCallCount(); + void IncrementRemoveWaiterCallCount(); + void IncrementCancelAllWaitersCallCount(); + + private: + mutable base::Lock lock_; // Protects the following members. + unsigned ctor_call_count_; + unsigned dtor_call_count_; + unsigned close_call_count_; + unsigned write_message_call_count_; + unsigned read_message_call_count_; + unsigned add_waiter_call_count_; + unsigned remove_waiter_call_count_; + unsigned cancel_all_waiters_call_count_; + + DISALLOW_COPY_AND_ASSIGN(CoreTestBase_MockHandleInfo); +}; + +} // namespace test +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_CORE_TEST_BASE_H_ diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc new file mode 100644 index 0000000..04f3550 --- /dev/null +++ b/mojo/system/dispatcher.cc @@ -0,0 +1,131 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/dispatcher.h" + +#include "base/logging.h" + +namespace mojo { +namespace system { + +MojoResult Dispatcher::Close() { + base::AutoLock locker(lock_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + is_closed_ = true; + CancelAllWaitersNoLock(); + return CloseImplNoLock(); +} + +MojoResult Dispatcher::WriteMessage(const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags flags) { + base::AutoLock locker(lock_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return WriteMessageImplNoLock(bytes, num_bytes, handles, num_handles, flags); +} + +MojoResult Dispatcher::ReadMessage(void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags) { + base::AutoLock locker(lock_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return ReadMessageImplNoLock(bytes, num_bytes, handles, num_handles, flags); +} + +MojoResult Dispatcher::AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + DCHECK_GE(wake_result, 0); + + base::AutoLock locker(lock_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return AddWaiterImplNoLock(waiter, flags, wake_result); +} + +void Dispatcher::RemoveWaiter(Waiter* waiter) { + base::AutoLock locker(lock_); + if (is_closed_) + return; + RemoveWaiterImplNoLock(waiter); +} + +Dispatcher::Dispatcher() + : is_closed_(false) { +} + +Dispatcher::~Dispatcher() { + // Make sure that |Close()| was called. + DCHECK(is_closed_); +} + +void Dispatcher::CancelAllWaitersNoLock() { + lock_.AssertAcquired(); + DCHECK(is_closed_); + // By default, waiting isn't supported. Only dispatchers that can be waited on + // will do something nontrivial. +} + +MojoResult Dispatcher::CloseImplNoLock() { + lock_.AssertAcquired(); + DCHECK(is_closed_); + // This may not need to do anything. Dispatchers should override this to do + // any actual close-time cleanup necessary. + return MOJO_RESULT_OK; +} + +MojoResult Dispatcher::WriteMessageImplNoLock(const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags flags) { + lock_.AssertAcquired(); + DCHECK(!is_closed_); + // By default, this isn't supported. Only dispatchers for message pipes (with + // whatever implementation, possibly a proxy) will do something nontrivial. + return MOJO_RESULT_INVALID_ARGUMENT; +} + +MojoResult Dispatcher::ReadMessageImplNoLock(void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags) { + lock_.AssertAcquired(); + DCHECK(!is_closed_); + // By default, this isn't supported. Only dispatchers for message pipes (with + // whatever implementation, possibly a proxy) will do something nontrivial. + return MOJO_RESULT_INVALID_ARGUMENT; +} + +MojoResult Dispatcher::AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + lock_.AssertAcquired(); + DCHECK(!is_closed_); + // By default, waiting isn't supported. Only dispatchers that can be waited on + // will do something nontrivial. + return MOJO_RESULT_FAILED_PRECONDITION; +} + +void Dispatcher::RemoveWaiterImplNoLock(Waiter* waiter) { + lock_.AssertAcquired(); + DCHECK(!is_closed_); + // By default, waiting isn't supported. Only dispatchers that can be waited on + // will do something nontrivial. +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h new file mode 100644 index 0000000..6b44f6a --- /dev/null +++ b/mojo/system/dispatcher.h @@ -0,0 +1,106 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_DISPATCHER_H_ +#define MOJO_SYSTEM_DISPATCHER_H_ + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/lock.h" +#include "mojo/public/system/core.h" + +namespace mojo { +namespace system { + +class Waiter; + +// A |Dispatcher| implements Mojo primitives that are "attached" to a particular +// handle. This includes most (all?) primitives except for |MojoWait...()|. This +// object is thread-safe, with its state being protected by a single lock +// |lock_|, which is also made available to implementation subclasses (via the +// |lock()| method). +class Dispatcher : public base::RefCountedThreadSafe<Dispatcher> { + public: + // These methods implement the various primitives named |Mojo...()|. These + // take |lock_| and handle races with |Close()|. Then they call out to + // subclasses' |...ImplNoLock()| methods (still under |lock_|), which actually + // implement the primitives. + // NOTE(vtl): This puts a big lock around each dispatcher (i.e., handle), and + // prevents the various |...ImplNoLock()|s from releasing the lock as soon as + // possible. If this becomes an issue, we can rethink this. + MojoResult Close(); + MojoResult WriteMessage(const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags flags); + MojoResult ReadMessage(void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags); + + // Adds a waiter to this dispatcher. The waiter will be woken up when this + // object changes state to satisfy |flags| with result |wake_result| (which + // must be >= 0, i.e., a success status). It will also be woken up when it + // becomes impossible for the object to ever satisfy |flags| with a suitable + // error status. + // + // Returns: + // - |MOJO_RESULT_OK| if the waiter was added; + // - |MOJO_RESULT_ALREADY_EXISTS| if |flags| is already satisfied; + // - |MOJO_RESULT_INVALID_ARGUMENT| if the dispatcher has been closed; and + // - |MOJO_RESULT_FAILED_PRECONDITION| if it is not (or no longer) possible + // that |flags| will ever be satisfied. + MojoResult AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result); + void RemoveWaiter(Waiter* waiter); + + protected: + Dispatcher(); + + friend class base::RefCountedThreadSafe<Dispatcher>; + virtual ~Dispatcher(); + + // These are to be overridden by subclasses (if necessary). They are called + // exactly once -- first |CancelAllWaitersNoLock()|, then |CloseImplNoLock()|, + // when the dispatcher is being closed. They are called under |lock_|. + virtual void CancelAllWaitersNoLock(); + virtual MojoResult CloseImplNoLock(); + + // These are to be overridden by subclasses (if necessary). They are never + // called after the dispatcher has been closed. They are called under |lock_|. + virtual MojoResult WriteMessageImplNoLock(const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags flags); + virtual MojoResult ReadMessageImplNoLock(void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags); + virtual MojoResult AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result); + virtual void RemoveWaiterImplNoLock(Waiter* waiter); + + // Available to subclasses. (Note: Returns a non-const reference, just like + // |base::AutoLock|'s constructor takes a non-const reference. + base::Lock& lock() const { return lock_; } + + private: + // This protects the following members as well as any state added by + // subclasses. + mutable base::Lock lock_; + bool is_closed_; + + DISALLOW_COPY_AND_ASSIGN(Dispatcher); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_DISPATCHER_H_ diff --git a/mojo/system/dispatcher_unittest.cc b/mojo/system/dispatcher_unittest.cc new file mode 100644 index 0000000..a479fce --- /dev/null +++ b/mojo/system/dispatcher_unittest.cc @@ -0,0 +1,189 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/dispatcher.h" + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_vector.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/simple_thread.h" +#include "mojo/system/waiter.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +// Trivial subclass that makes the constructor public. +class TrivialDispatcher : public Dispatcher { + public: + TrivialDispatcher() {} + + private: + friend class base::RefCountedThreadSafe<TrivialDispatcher>; + virtual ~TrivialDispatcher() {} + + DISALLOW_COPY_AND_ASSIGN(TrivialDispatcher); +}; + +TEST(DispatcherTest, Basic) { + scoped_refptr<Dispatcher> d(new TrivialDispatcher()); + + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->WriteMessage(NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->ReadMessage(NULL, NULL, NULL, NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + Waiter w; + w.Init(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d->AddWaiter(&w, MOJO_WAIT_FLAG_EVERYTHING, 0)); + // Okay to remove even if it wasn't added (or was already removed). + d->RemoveWaiter(&w); + d->RemoveWaiter(&w); + + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->WriteMessage(NULL, 0, NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->ReadMessage(NULL, NULL, NULL, NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->AddWaiter(&w, MOJO_WAIT_FLAG_EVERYTHING, 0)); + d->RemoveWaiter(&w); +} + +class ThreadSafetyStressThread : public base::SimpleThread { + public: + enum DispatcherOp { + CLOSE = 0, + WRITE_MESSAGE, + READ_MESSAGE, + ADD_WAITER, + REMOVE_WAITER, + + DISPATCHER_OP_COUNT + }; + + ThreadSafetyStressThread(base::WaitableEvent* event, + scoped_refptr<Dispatcher> dispatcher, + DispatcherOp op) + : base::SimpleThread("thread_safety_stress_thread"), + event_(event), + dispatcher_(dispatcher), + op_(op) { + CHECK_LE(0, op_); + CHECK_LT(op_, DISPATCHER_OP_COUNT); + } + + virtual ~ThreadSafetyStressThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + event_->Wait(); + + waiter_.Init(); + switch(op_) { + case CLOSE: { + MojoResult r = dispatcher_->Close(); + EXPECT_TRUE(r == MOJO_RESULT_OK || r == MOJO_RESULT_INVALID_ARGUMENT) + << "Result: " << r; + break; + } + case WRITE_MESSAGE: + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + dispatcher_->WriteMessage(NULL, 0, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + break; + case READ_MESSAGE: + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + dispatcher_->ReadMessage(NULL, NULL, NULL, NULL, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + break; + case ADD_WAITER: { + MojoResult r = dispatcher_->AddWaiter(&waiter_, + MOJO_WAIT_FLAG_EVERYTHING, 0); + EXPECT_TRUE(r == MOJO_RESULT_FAILED_PRECONDITION || + r == MOJO_RESULT_INVALID_ARGUMENT); + break; + } + case REMOVE_WAITER: + dispatcher_->RemoveWaiter(&waiter_); + break; + default: + NOTREACHED(); + break; + } + + // Always try to remove the waiter, in case we added it. + dispatcher_->RemoveWaiter(&waiter_); + } + + base::WaitableEvent* const event_; + const scoped_refptr<Dispatcher> dispatcher_; + const DispatcherOp op_; + + Waiter waiter_; + + DISALLOW_COPY_AND_ASSIGN(ThreadSafetyStressThread); +}; + +TEST(DispatcherTest, ThreadSafetyStress) { + static const size_t kRepeatCount = 20; + static const size_t kNumThreads = 100; + + for (size_t i = 0; i < kRepeatCount; i++) { + // Manual reset, not initially signalled. + base::WaitableEvent event(true, false); + scoped_refptr<Dispatcher> d(new TrivialDispatcher()); + + { + ScopedVector<ThreadSafetyStressThread> threads; + for (size_t j = 0; j < kNumThreads; j++) { + ThreadSafetyStressThread::DispatcherOp op = + static_cast<ThreadSafetyStressThread::DispatcherOp>( + (i+j) % ThreadSafetyStressThread::DISPATCHER_OP_COUNT); + threads.push_back(new ThreadSafetyStressThread(&event, d, op)); + threads.back()->Start(); + } + event.Signal(); // Kicks off real work on the threads. + } // Joins all the threads. + + // One of the threads should already have closed the dispatcher. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, d->Close()); + } +} + +TEST(DispatcherTest, ThreadSafetyStressNoClose) { + static const size_t kRepeatCount = 20; + static const size_t kNumThreads = 100; + + for (size_t i = 0; i < kRepeatCount; i++) { + // Manual reset, not initially signalled. + base::WaitableEvent event(true, false); + scoped_refptr<Dispatcher> d(new TrivialDispatcher()); + + { + ScopedVector<ThreadSafetyStressThread> threads; + for (size_t j = 0; j < kNumThreads; j++) { + ThreadSafetyStressThread::DispatcherOp op = + static_cast<ThreadSafetyStressThread::DispatcherOp>( + (i+j) % (ThreadSafetyStressThread::DISPATCHER_OP_COUNT-1) + 1); + threads.push_back(new ThreadSafetyStressThread(&event, d, op)); + threads.back()->Start(); + } + event.Signal(); // Kicks off real work on the threads. + } // Joins all the threads. + + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } +} + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/limits.h b/mojo/system/limits.h new file mode 100644 index 0000000..277c23a --- /dev/null +++ b/mojo/system/limits.h @@ -0,0 +1,25 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_LIMITS_H_ +#define MOJO_SYSTEM_LIMITS_H_ + +namespace mojo { +namespace system { + +// Maximum of open (Mojo) handles. +// TODO(vtl): This doesn't count "live" handles, some of which may live in +// messages. +const size_t kMaxHandleTableSize = 1000000; + +const size_t kMaxWaitManyNumHandles = kMaxHandleTableSize; + +const size_t kMaxMessageNumBytes = 4 * 1024 * 1024; + +const size_t kMaxMessageNumHandles = 10000; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_LIMITS_H_ diff --git a/mojo/system/memory.cc b/mojo/system/memory.cc new file mode 100644 index 0000000..df658ae --- /dev/null +++ b/mojo/system/memory.cc @@ -0,0 +1,26 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/memory.h" + +#include <limits> + +#include "base/logging.h" + +namespace mojo { +namespace system { + +bool VerifyUserPointer(const void* pointer, size_t count, size_t size_each) { + DCHECK_GT(size_each, 0u); + if (count > std::numeric_limits<size_t>::max() / size_each) + return false; + + // TODO(vtl): If running in kernel mode, do a full verification. For now, just + // check that it's non-null if |size| is nonzero. (A faster user mode + // implementation is also possible if this check is skipped.) + return count == 0 || !!pointer; +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/memory.h b/mojo/system/memory.h new file mode 100644 index 0000000..6a1dafc --- /dev/null +++ b/mojo/system/memory.h @@ -0,0 +1,27 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_MEMORY_H_ +#define MOJO_SYSTEM_MEMORY_H_ + +#include <stddef.h> + +namespace mojo { +namespace system { + +// Verify that |count * size_each| bytes can be read from the user |pointer| +// insofar as possible/necessary. |count| and |size_each| are specified +// separately instead of a single size, since |count * size_each| may overflow a +// |size_t|. |count| may be zero but |size_each| must be nonzero. +// +// For example, if running in kernel mode, this should be a full verification +// that the given memory is owned and readable by the user process. In user +// mode, if crashes are acceptable, this may do nothing at all (and always +// return true). +bool VerifyUserPointer(const void* pointer, size_t count, size_t size_each); + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_MEMORY_H_ diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc new file mode 100644 index 0000000..8461705 --- /dev/null +++ b/mojo/system/message_pipe.cc @@ -0,0 +1,248 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/message_pipe.h" + +#include "base/logging.h" +#include "base/stl_util.h" +#include "mojo/system/limits.h" +#include "mojo/system/memory.h" + +namespace mojo { +namespace system { + +namespace { + +unsigned DestinationPortFromSourcePort(unsigned port) { + DCHECK(port == 0 || port == 1); + return port ^ 1; +} + +} // namespace + +MessagePipe::MessagePipe() { + is_open_[0] = is_open_[1] = true; +} + +void MessagePipe::CancelAllWaiters(unsigned port) { + DCHECK(port == 0 || port == 1); + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + waiter_lists_[port].CancelAllWaiters(); +} + +void MessagePipe::Close(unsigned port) { + DCHECK(port == 0 || port == 1); + + unsigned destination_port = DestinationPortFromSourcePort(port); + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + // Record the old state of the other (destination) port, so we can tell if it + // changes. + // TODO(vtl): Maybe the |WaiterList| should track the old state, so that we + // don't have to do this. + MojoWaitFlags old_dest_satisfied_flags = MOJO_WAIT_FLAG_NONE; + MojoWaitFlags old_dest_satisfiable_flags = MOJO_WAIT_FLAG_NONE; + bool dest_is_open = is_open_[destination_port]; + if (dest_is_open) { + old_dest_satisfied_flags = SatisfiedFlagsNoLock(destination_port); + old_dest_satisfiable_flags = SatisfiableFlagsNoLock(destination_port); + } + + is_open_[port] = false; + STLDeleteElements(&message_queues_[port]); // Clear incoming queue for port. + + // Notify the other (destination) port if its state has changed. + if (dest_is_open) { + MojoWaitFlags new_dest_satisfied_flags = + SatisfiedFlagsNoLock(destination_port); + MojoWaitFlags new_dest_satisfiable_flags = + SatisfiableFlagsNoLock(destination_port); + if (new_dest_satisfied_flags != old_dest_satisfied_flags || + new_dest_satisfiable_flags != old_dest_satisfiable_flags) { + waiter_lists_[destination_port].AwakeWaitersForStateChange( + new_dest_satisfied_flags, new_dest_satisfiable_flags); + } + } +} + +MojoResult MessagePipe::WriteMessage( + unsigned port, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags /*flags*/) { + DCHECK(port == 0 || port == 1); + + unsigned destination_port = DestinationPortFromSourcePort(port); + + if (!VerifyUserPointer(bytes, num_bytes, 1)) + return MOJO_RESULT_INVALID_ARGUMENT; + if (num_bytes > kMaxMessageNumBytes) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + if (!VerifyUserPointer(handles, num_handles, sizeof(handles[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + if (num_handles > kMaxMessageNumHandles) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + if (num_handles > 0) { + // TODO(vtl): Verify each handle. + NOTIMPLEMENTED(); + return MOJO_RESULT_UNIMPLEMENTED; + } + + // TODO(vtl): Handle flags. + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + // The destination port need not be open, unlike the source port. + if (!is_open_[destination_port]) + return MOJO_RESULT_FAILED_PRECONDITION; + + bool dest_was_empty = message_queues_[destination_port].empty(); + + // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|. + message_queues_[destination_port].push_back( + new MessageInTransit(bytes, num_bytes)); + // TODO(vtl): Support sending handles. + + // The other (destination) port was empty and now isn't, so it should now be + // readable. Wake up anyone waiting on this. + if (dest_was_empty) { + waiter_lists_[destination_port].AwakeWaitersForStateChange( + SatisfiedFlagsNoLock(destination_port), + SatisfiableFlagsNoLock(destination_port)); + } + + return MOJO_RESULT_OK; +} + +MojoResult MessagePipe::ReadMessage(unsigned port, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + DCHECK(port == 0 || port == 1); + + const size_t max_bytes = num_bytes ? *num_bytes : 0; + if (!VerifyUserPointer(bytes, max_bytes, 1)) + return MOJO_RESULT_INVALID_ARGUMENT; + + const size_t max_handles = num_handles ? *num_handles : 0; + if (!VerifyUserPointer(handles, max_handles, sizeof(handles[0]))) + return MOJO_RESULT_INVALID_ARGUMENT; + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + if (message_queues_[port].empty()) + return MOJO_RESULT_NOT_FOUND; + + // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop + // and release the lock immediately. + bool not_enough_space = false; + MessageInTransit* const message = message_queues_[port].front(); + const size_t message_size = message->data.size(); + if (num_bytes) + *num_bytes = static_cast<uint32_t>(message_size); + if (message_size <= max_bytes) + memcpy(bytes, message->data.data(), message_size); + else + not_enough_space = true; + + // TODO(vtl): Support receiving handles. + if (num_handles) + *num_handles = 0; + + if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { + message_queues_[port].pop_front(); + delete message; + + // Now it's empty, thus no longer readable. + if (message_queues_[port].empty()) { + // It's currently not possible to wait for non-readability, but we should + // do the state change anyway. + waiter_lists_[port].AwakeWaitersForStateChange( + SatisfiedFlagsNoLock(port), SatisfiableFlagsNoLock(port)); + } + } + + if (not_enough_space) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + return MOJO_RESULT_OK; +} + +MojoResult MessagePipe::AddWaiter(unsigned port, + Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + DCHECK(port == 0 || port == 1); + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + if ((flags & SatisfiedFlagsNoLock(port))) + return MOJO_RESULT_ALREADY_EXISTS; + if (!(flags & SatisfiableFlagsNoLock(port))) + return MOJO_RESULT_FAILED_PRECONDITION; + + waiter_lists_[port].AddWaiter(waiter, flags, wake_result); + return MOJO_RESULT_OK; +} + +void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { + DCHECK(port == 0 || port == 1); + + base::AutoLock locker(lock_); + DCHECK(is_open_[port]); + + waiter_lists_[port].RemoveWaiter(waiter); +} + +MessagePipe::~MessagePipe() { + // Owned by the dispatchers. The owning dispatchers should only release us via + // their |Close()| method, which should inform us of being closed via our + // |Close()|. Thus these should already be null. + DCHECK(!is_open_[0]); + DCHECK(!is_open_[1]); +} + +MojoWaitFlags MessagePipe::SatisfiedFlagsNoLock(unsigned port) { + DCHECK(port == 0 || port == 1); + + unsigned destination_port = DestinationPortFromSourcePort(port); + + lock_.AssertAcquired(); + + MojoWaitFlags satisfied_flags = 0; + if (!message_queues_[port].empty()) + satisfied_flags |= MOJO_WAIT_FLAG_READABLE; + if (is_open_[destination_port]) + satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; + + return satisfied_flags; +} + +MojoWaitFlags MessagePipe::SatisfiableFlagsNoLock(unsigned port) { + DCHECK(port == 0 || port == 1); + + unsigned destination_port = DestinationPortFromSourcePort(port); + + lock_.AssertAcquired(); + + MojoWaitFlags satisfiable_flags = 0; + if (!message_queues_[port].empty() || is_open_[destination_port]) + satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; + if (is_open_[destination_port]) + satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; + + return satisfiable_flags; +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h new file mode 100644 index 0000000..c02f29e --- /dev/null +++ b/mojo/system/message_pipe.h @@ -0,0 +1,79 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_MESSAGE_PIPE_H_ +#define MOJO_SYSTEM_MESSAGE_PIPE_H_ + +#include <list> +#include <string> + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/synchronization/lock.h" +#include "mojo/public/system/core.h" +#include "mojo/system/waiter_list.h" + +namespace mojo { +namespace system { + +class Waiter; + +// |MessagePipe| is the secondary object implementing a message pipe (see the +// explanatory comment in core_impl.cc), and is jointly owned by the two +// dispatchers passed in to the constructor. This class is thread-safe. +class MessagePipe : public base::RefCountedThreadSafe<MessagePipe> { + public: + MessagePipe(); + + // These are called by the dispatcher to implement its methods of + // corresponding names. In all cases, the port |port| must be open. + void CancelAllWaiters(unsigned port); + void Close(unsigned port); + MojoResult WriteMessage(unsigned port, + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags); + MojoResult ReadMessage(unsigned port, + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags); + MojoResult AddWaiter(unsigned port, + Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result); + void RemoveWaiter(unsigned port, Waiter* waiter); + + private: + struct MessageInTransit { + MessageInTransit(const void* bytes, uint32_t num_bytes) + : data(static_cast<const char*>(bytes), num_bytes) {} + + // TODO(vtl): Replace with something more efficient. + std::string data; + }; + + friend class base::RefCountedThreadSafe<MessagePipe>; + virtual ~MessagePipe(); + + MojoWaitFlags SatisfiedFlagsNoLock(unsigned port); + MojoWaitFlags SatisfiableFlagsNoLock(unsigned port); + + base::Lock lock_; // Protects the following members. + bool is_open_[2]; + // These are *incoming* queues for their corresponding ports. It owns its + // contents. + // TODO(vtl): When possible (with C++11), convert the plain pointers to + // scoped_ptr/unique_ptr. (Then we'll be able to use an |std::queue| instead + // of an |std::list|.) + std::list<MessageInTransit*> message_queues_[2]; + WaiterList waiter_lists_[2]; + + DISALLOW_COPY_AND_ASSIGN(MessagePipe); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_MESSAGE_PIPE_H_ diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc new file mode 100644 index 0000000..ec5c80b --- /dev/null +++ b/mojo/system/message_pipe_dispatcher.cc @@ -0,0 +1,77 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/message_pipe_dispatcher.h" + +#include "base/logging.h" +#include "mojo/system/message_pipe.h" + +namespace mojo { +namespace system { + +MessagePipeDispatcher::MessagePipeDispatcher() { +} + +void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe, + unsigned port) { + DCHECK(message_pipe.get()); + DCHECK(port == 0 || port == 1); + + message_pipe_ = message_pipe; + port_ = port; +} + +MessagePipeDispatcher::~MessagePipeDispatcher() { + // |Close()|/|CloseImplNoLock()| should have taken care of the pipe. + DCHECK(!message_pipe_.get()); +} + +void MessagePipeDispatcher::CancelAllWaitersNoLock() { + lock().AssertAcquired(); + message_pipe_->CancelAllWaiters(port_); +} + +MojoResult MessagePipeDispatcher::CloseImplNoLock() { + lock().AssertAcquired(); + message_pipe_->Close(port_); + message_pipe_ = NULL; + return MOJO_RESULT_OK; +} + +MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( + const void* bytes, uint32_t num_bytes, + const MojoHandle* handles, uint32_t num_handles, + MojoWriteMessageFlags flags) { + lock().AssertAcquired(); + return message_pipe_->WriteMessage(port_, + bytes, num_bytes, + handles, num_handles, + flags); +} + +MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( + void* bytes, uint32_t* num_bytes, + MojoHandle* handles, uint32_t* num_handles, + MojoReadMessageFlags flags) { + lock().AssertAcquired(); + return message_pipe_->ReadMessage(port_, + bytes, num_bytes, + handles, num_handles, + flags); +} + +MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + lock().AssertAcquired(); + return message_pipe_->AddWaiter(port_, waiter, flags, wake_result); +} + +void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) { + lock().AssertAcquired(); + message_pipe_->RemoveWaiter(port_, waiter); +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h new file mode 100644 index 0000000..b52ebb8 --- /dev/null +++ b/mojo/system/message_pipe_dispatcher.h @@ -0,0 +1,61 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ +#define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "mojo/system/dispatcher.h" + +namespace mojo { +namespace system { + +class MessagePipe; + +// This is the |Dispatcher| implementation for message pipes (created by the +// Mojo primitive |MojoCreateMessagePipe()|). This class is thread-safe. +class MessagePipeDispatcher : public Dispatcher { + public: + MessagePipeDispatcher(); + + // Must be called before any other methods. (This method is not thread-safe.) + void Init(scoped_refptr<MessagePipe> message_pipe, unsigned port); + + private: + friend class base::RefCountedThreadSafe<MessagePipeDispatcher>; + virtual ~MessagePipeDispatcher(); + + // |Dispatcher| implementation/overrides: + virtual void CancelAllWaitersNoLock() OVERRIDE; + virtual MojoResult CloseImplNoLock() OVERRIDE; + virtual MojoResult WriteMessageImplNoLock( + const void* bytes, + uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoWriteMessageFlags flags) OVERRIDE; + virtual MojoResult ReadMessageImplNoLock( + void* bytes, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags) OVERRIDE; + virtual MojoResult AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) OVERRIDE; + virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE; + + // Protected by |lock()|: + scoped_refptr<MessagePipe> message_pipe_; // This will be null if closed. + unsigned port_; + + DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ diff --git a/mojo/system/message_pipe_dispatcher_unittest.cc b/mojo/system/message_pipe_dispatcher_unittest.cc new file mode 100644 index 0000000..b96d316 --- /dev/null +++ b/mojo/system/message_pipe_dispatcher_unittest.cc @@ -0,0 +1,539 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a +// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase +// tolerance and reduce observed flakiness. + +#include "mojo/system/message_pipe_dispatcher.h" + +#include <string.h> + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_vector.h" +#include "base/rand_util.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/threading/simple_thread.h" +#include "base/time/time.h" +#include "mojo/system/message_pipe.h" +#include "mojo/system/test_utils.h" +#include "mojo/system/waiter.h" +#include "mojo/system/waiter_test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +const int64_t kMicrosPerMs = 1000; +const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms. + +TEST(MessagePipeDispatcherTest, Basic) { + test::Stopwatch stopwatch; + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + int64_t elapsed_micros; + + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_0->Init(mp, i); // 0, 1. + d_1->Init(mp, i ^ 1); // 1, 0. + } + Waiter w; + + // Try adding a writable waiter when already writable. + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0)); + // Shouldn't need to remove the waiter (it was not added). + + // Add a readable waiter to |d_0|, then make it readable (by writing to + // |d_1|), then wait. + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1)); + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d_1->WriteMessage(buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + stopwatch.Start(); + EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d_0->RemoveWaiter(&w); + + // Try adding a readable waiter when already readable (from above). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2)); + // Shouldn't need to remove the waiter (it was not added). + + // Make |d_0| no longer readable (by reading from it). + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d_0->ReadMessage(buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Wait for zero time for readability on |d_0| (will time out). + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d_0->RemoveWaiter(&w); + + // Wait for non-zero, finite time for readability on |d_0| (will time out). + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + d_0->RemoveWaiter(&w); + + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); + } +} + +// Test what happens when one end is closed (single-threaded test). +TEST(MessagePipeDispatcherTest, BasicClosed) { + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_0->Init(mp, i); // 0, 1. + d_1->Init(mp, i ^ 1); // 1, 0. + } + Waiter w; + + // Write (twice) to |d_1|. + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d_1->WriteMessage(buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + buffer[0] = 234567890; + EXPECT_EQ(MOJO_RESULT_OK, + d_1->WriteMessage(buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Try waiting for readable on |d_0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0)); + + // Close |d_1|. + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); + + // Try waiting for readable on |d_0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1)); + + // Read from |d_0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d_0->ReadMessage(buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Try waiting for readable on |d_0|; should fail (already satisfied). + w.Init(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2)); + + // Read again from |d_0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d_0->ReadMessage(buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(234567890, buffer[0]); + + // Try waiting for readable on |d_0|; should fail (unsatisfiable). + w.Init(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); + + // Try waiting for writable on |d_0|; should fail (unsatisfiable). + w.Init(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4)); + + // Try reading from |d_0|; should fail (nothing to read). + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + d_0->ReadMessage(buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Try writing to |d_0|; should fail (other end closed). + buffer[0] = 345678901; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d_0->WriteMessage(buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); + } +} + +TEST(MessagePipeDispatcherTest, BasicThreaded) { + test::Stopwatch stopwatch; + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + bool did_wait; + MojoResult result; + int64_t elapsed_micros; + + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_0->Init(mp, i); // 0, 1. + d_1->Init(mp, i ^ 1); // 1, 0. + } + + // Wait for readable on |d_1|, which will become readable after some time. + { + test::WaiterThread thread(d_1, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 0, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + // Wake it up by writing to |d_0|. + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + d_0->WriteMessage(buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(0, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + // Now |d_1| is already readable. Try waiting for it again. + { + test::WaiterThread thread(d_1, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 1, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_FALSE(did_wait); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + + // Consume what we wrote to |d_0|. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + d_1->ReadMessage(buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123456789, buffer[0]); + + // Wait for readable on |d_1| and close |d_0| after some time, which should + // cancel that wait. + { + test::WaiterThread thread(d_1, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 0, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); + } + + for (unsigned i = 0; i < 2; i++) { + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_0->Init(mp, i); // 0, 1. + d_1->Init(mp, i ^ 1); // 1, 0. + } + + // Wait for readable on |d_1| and close |d_1| after some time, which should + // cancel that wait. + { + test::WaiterThread thread(d_1, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 0, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); + } +} + +// Stress test ----------------------------------------------------------------- + +const size_t kMaxMessageSize = 2000; + +class WriterThread : public base::SimpleThread { + public: + // |*messages_written| and |*bytes_written| belong to the thread while it's + // alive. + WriterThread(scoped_refptr<Dispatcher> write_dispatcher, + size_t* messages_written, size_t* bytes_written) + : base::SimpleThread("writer_thread"), + write_dispatcher_(write_dispatcher), + messages_written_(messages_written), + bytes_written_(bytes_written) { + *messages_written_ = 0; + *bytes_written_ = 0; + } + + virtual ~WriterThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + // Make some data to write. + unsigned char buffer[kMaxMessageSize]; + for (size_t i = 0; i < kMaxMessageSize; i++) + buffer[i] = static_cast<unsigned char>(i); + + // Number of messages to write. + *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); + + // Write messages. + for (size_t i = 0; i < *messages_written_; i++) { + uint32_t bytes_to_write = static_cast<uint32_t>( + base::RandInt(1, static_cast<int>(kMaxMessageSize))); + EXPECT_EQ(MOJO_RESULT_OK, + write_dispatcher_->WriteMessage(buffer, bytes_to_write, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + *bytes_written_ += bytes_to_write; + } + + // Write one last "quit" message. + EXPECT_EQ(MOJO_RESULT_OK, + write_dispatcher_->WriteMessage("quit", 4, NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + } + + const scoped_refptr<Dispatcher> write_dispatcher_; + size_t* const messages_written_; + size_t* const bytes_written_; + + DISALLOW_COPY_AND_ASSIGN(WriterThread); +}; + +class ReaderThread : public base::SimpleThread { + public: + // |*messages_read| and |*bytes_read| belong to the thread while it's alive. + ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, + size_t* messages_read, size_t* bytes_read) + : base::SimpleThread("reader_thread"), + read_dispatcher_(read_dispatcher), + messages_read_(messages_read), + bytes_read_(bytes_read) { + *messages_read_ = 0; + *bytes_read_ = 0; + } + + virtual ~ReaderThread() { + Join(); + } + + private: + virtual void Run() OVERRIDE { + unsigned char buffer[kMaxMessageSize]; + MojoResult result; + Waiter w; + + // Read messages. + for (;;) { + // Wait for it to be readable. + w.Init(); + result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0); + EXPECT_TRUE(result == MOJO_RESULT_OK || + result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result; + if (result == MOJO_RESULT_OK) { + // Actually need to wait. + EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE)); + read_dispatcher_->RemoveWaiter(&w); + } + + // Now, try to do the read. + // Clear the buffer so that we can check the result. + memset(buffer, 0, sizeof(buffer)); + uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); + result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE); + EXPECT_TRUE(result == MOJO_RESULT_OK || + result == MOJO_RESULT_NOT_FOUND) << "result: " << result; + // We're racing with others to read, so maybe we failed. + if (result == MOJO_RESULT_NOT_FOUND) + continue; // In which case, try again. + // Check for quit. + if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) + return; + EXPECT_GE(buffer_size, 1u); + EXPECT_LE(buffer_size, kMaxMessageSize); + EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); + + (*messages_read_)++; + *bytes_read_ += buffer_size; + } + } + + static bool IsValidMessage(const unsigned char* buffer, + uint32_t message_size) { + size_t i; + for (i = 0; i < message_size; i++) { + if (buffer[i] != static_cast<unsigned char>(i)) + return false; + } + // Check that the remaining bytes weren't stomped on. + for (; i < kMaxMessageSize; i++) { + if (buffer[i] != 0) + return false; + } + return true; + } + + const scoped_refptr<Dispatcher> read_dispatcher_; + size_t* const messages_read_; + size_t* const bytes_read_; + + DISALLOW_COPY_AND_ASSIGN(ReaderThread); +}; + +TEST(MessagePipeDispatcherTest, Stress) { + static const size_t kNumWriters = 30; + static const size_t kNumReaders = kNumWriters; + + scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher()); + scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher()); + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + d_write->Init(mp, 0); + d_read->Init(mp, 1); + } + + size_t messages_written[kNumWriters]; + size_t bytes_written[kNumWriters]; + size_t messages_read[kNumReaders]; + size_t bytes_read[kNumReaders]; + { + // Make writers. + ScopedVector<WriterThread> writers; + for (size_t i = 0; i < kNumWriters; i++) { + writers.push_back( + new WriterThread(d_write, &messages_written[i], &bytes_written[i])); + } + + // Make readers. + ScopedVector<ReaderThread> readers; + for (size_t i = 0; i < kNumReaders; i++) { + readers.push_back( + new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); + } + + // Start writers. + for (size_t i = 0; i < kNumWriters; i++) + writers[i]->Start(); + + // Start readers. + for (size_t i = 0; i < kNumReaders; i++) + readers[i]->Start(); + + // TODO(vtl): Maybe I should have an event that triggers all the threads to + // start doing stuff for real (so that the first ones created/started aren't + // advantaged). + } // Joins all the threads. + + size_t total_messages_written = 0; + size_t total_bytes_written = 0; + for (size_t i = 0; i < kNumWriters; i++) { + total_messages_written += messages_written[i]; + total_bytes_written += bytes_written[i]; + } + size_t total_messages_read = 0; + size_t total_bytes_read = 0; + for (size_t i = 0; i < kNumReaders; i++) { + total_messages_read += messages_read[i]; + total_bytes_read += bytes_read[i]; + // We'd have to be really unlucky to have read no messages on a thread. + EXPECT_GT(messages_read[i], 0u) << "reader: " << i; + EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; + } + EXPECT_EQ(total_messages_written, total_messages_read); + EXPECT_EQ(total_bytes_written, total_bytes_read); + + EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); + EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); +} + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/message_pipe_unittest.cc b/mojo/system/message_pipe_unittest.cc new file mode 100644 index 0000000..4ecd4ce --- /dev/null +++ b/mojo/system/message_pipe_unittest.cc @@ -0,0 +1,540 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/message_pipe.h" + +#include <limits> + +#include "base/memory/ref_counted.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/time/time.h" +#include "mojo/system/waiter.h" +#include "mojo/system/waiter_test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +// Tests: +// - only default flags +// - reading messages from a port +// - when there are no/one/two messages available for that port +// - with buffer size 0 (and null buffer) -- should get size +// - with too-small buffer -- should get size +// - also verify that buffers aren't modified when/where they shouldn't be +// - writing messages to a port +// - in the obvious scenarios (as above) +// - to a port that's been closed +// - writing a message to a port, closing the other (would be the source) port, +// and reading it +TEST(MessagePipeTest, Basic) { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + + int32_t buffer[2]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Nothing to read yet on port 0. + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(kBufferSize, buffer_size); + EXPECT_EQ(123, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Ditto for port 1. + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Write from port 1 (to port 0). + buffer[0] = 789012345; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read from port 0. + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(789012345, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read again from port 0 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Write two messages from port 0 (to port 1). + buffer[0] = 123456789; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + buffer[0] = 234567890; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read from port 1 with buffer size 0 (should get the size of next message). + // Also test that giving a null buffer is okay when the buffer size is 0. + buffer_size = 0; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->ReadMessage(1, + NULL, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + + // Read from port 1 with buffer size 1 (too small; should get the size of next + // message). + buffer[0] = 123; + buffer[1] = 456; + buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(123, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read from port 1. + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(123456789, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read again from port 1. + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(234567890, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read again from port 1 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + // Write from port 0 (to port 1). + buffer[0] = 345678901; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Close port 0. + mp->Close(0); + + // Try to write from port 1 (to port 0). + buffer[0] = 456789012; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read from port 1; should still get message (even though port 0 was closed). + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(345678901, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read again from port 1 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + + mp->Close(1); +} + +TEST(MessagePipeTest, DiscardMode) { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + + int32_t buffer[2]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Write from port 1 (to port 0). + buffer[0] = 789012345; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read/discard from port 0 (no buffer); get size. + buffer_size = 0; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->ReadMessage(0, + NULL, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + + // Read again from port 0 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + // Write from port 1 (to port 0). + buffer[0] = 890123456; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read from port 0 (buffer big enough). + buffer[0] = 123; + buffer[1] = 456; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + EXPECT_EQ(890123456, buffer[0]); + EXPECT_EQ(456, buffer[1]); + + // Read again from port 0 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + // Write from port 1 (to port 0). + buffer[0] = 901234567; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Read/discard from port 0 (buffer too small); get size. + buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + EXPECT_EQ(static_cast<uint32_t>(sizeof(buffer[0])), buffer_size); + + // Read again from port 0 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + // Write from port 1 (to port 0). + buffer[0] = 123456789; + buffer[1] = 0; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(1, + buffer, static_cast<uint32_t>(sizeof(buffer[0])), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Discard from port 0. + buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->ReadMessage(0, + NULL, NULL, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + // Read again from port 0 -- it should be empty. + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)); + + mp->Close(0); + mp->Close(1); +} + +TEST(MessagePipeTest, InvalidParams) { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + + char buffer[1]; + MojoHandle handles[1]; + + // |WriteMessage|: + // Null buffer with nonzero buffer size. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + mp->WriteMessage(0, + NULL, 1, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + // Huge buffer size. + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->WriteMessage(0, + buffer, std::numeric_limits<uint32_t>::max(), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Null handles with nonzero handle count. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + mp->WriteMessage(0, + buffer, sizeof(buffer), + NULL, 1, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + // Huge handle count (implausibly big on some systems -- more than can be + // stored in a 32-bit address space). + // Note: This may return either |MOJO_RESULT_INVALID_ARGUMENT| or + // |MOJO_RESULT_RESOURCE_EXHAUSTED|, depending on whether it's plausible or + // not. + EXPECT_NE(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, sizeof(buffer), + handles, std::numeric_limits<uint32_t>::max(), + MOJO_WRITE_MESSAGE_FLAG_NONE)); + // Huge handle count (plausibly big). + EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, + mp->WriteMessage(0, + buffer, sizeof(buffer), + handles, std::numeric_limits<uint32_t>::max() / + sizeof(handles[0]), + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // |ReadMessage|: + // Null buffer with nonzero buffer size. + uint32_t buffer_size = 1; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + mp->ReadMessage(0, + NULL, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + // Null handles with nonzero handle count. + buffer_size = static_cast<uint32_t>(sizeof(buffer)); + uint32_t handle_count = 1; + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + mp->ReadMessage(0, + buffer, &buffer_size, + NULL, &handle_count, + MOJO_READ_MESSAGE_FLAG_NONE)); + + mp->Close(0); + mp->Close(1); +} + +TEST(MessagePipeTest, BasicWaiting) { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + Waiter waiter; + + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + uint32_t buffer_size; + + // Always writable (until the other port is closed). + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_WRITABLE, 0)); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(0, + &waiter, + MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE, + 0)); + + // Not yet readable. + EXPECT_EQ(MOJO_RESULT_OK, + mp->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 1)); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); + mp->RemoveWaiter(0, &waiter); + + // Write from port 0 (to port 1), to make port 1 readable. + buffer[0] = 123456789; + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + // Port 1 should already be readable now. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 2)); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(1, + &waiter, + MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE, + 0)); + // ... and still writable. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_WRITABLE, 3)); + + // Close port 0. + mp->Close(0); + + // Now port 1 should not be writable. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_WRITABLE, 4)); + + // But it should still be readable. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 5)); + + // Read from port 1. + buffer[0] = 0; + buffer_size = kBufferSize; + EXPECT_EQ(MOJO_RESULT_OK, + mp->ReadMessage(1, + buffer, &buffer_size, + NULL, NULL, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(123456789, buffer[0]); + + // Now port 1 should no longer be readable. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 6)); + + mp->Close(1); +} + +TEST(MessagePipeTest, ThreadedWaiting) { + int32_t buffer[1]; + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); + + MojoResult result; + + // Write to wake up waiter waiting for read. + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + test::SimpleWaiterThread thread(&result); + + EXPECT_EQ(MOJO_RESULT_OK, + mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0)); + thread.Start(); + + buffer[0] = 123456789; + // Write from port 0 (to port 1), which should wake up the waiter. + EXPECT_EQ(MOJO_RESULT_OK, + mp->WriteMessage(0, + buffer, kBufferSize, + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + + mp->RemoveWaiter(1, thread.waiter()); + + mp->Close(0); + mp->Close(1); + } // Joins |thread|. + // The waiter should have woken up successfully. + EXPECT_EQ(0, result); + + // Close to cancel waiter. + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + test::SimpleWaiterThread thread(&result); + + EXPECT_EQ(MOJO_RESULT_OK, + mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0)); + thread.Start(); + + // Close port 1 first -- this should result in the waiter being cancelled. + mp->CancelAllWaiters(1); + mp->Close(1); + + // Port 1 is closed, so |Dispatcher::RemoveWaiter()| wouldn't call into the + // |MessagePipe| to remove any waiter. + + mp->Close(0); + } // Joins |thread|. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + + // Close to make waiter un-wake-up-able. + { + scoped_refptr<MessagePipe> mp(new MessagePipe()); + test::SimpleWaiterThread thread(&result); + + EXPECT_EQ(MOJO_RESULT_OK, + mp->AddWaiter(1, thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0)); + thread.Start(); + + // Close port 0 first -- this should wake the waiter up, since port 1 will + // never be readable. + mp->CancelAllWaiters(0); + mp->Close(0); + + mp->RemoveWaiter(1, thread.waiter()); + + mp->CancelAllWaiters(1); + mp->Close(1); + } // Joins |thread|. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); +} + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/simple_dispatcher.cc b/mojo/system/simple_dispatcher.cc new file mode 100644 index 0000000..ce2baef --- /dev/null +++ b/mojo/system/simple_dispatcher.cc @@ -0,0 +1,49 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/simple_dispatcher.h" + +#include "base/logging.h" + +namespace mojo { +namespace system { + +SimpleDispatcher::SimpleDispatcher() { +} + +SimpleDispatcher::~SimpleDispatcher() { +} + +void SimpleDispatcher::StateChangedNoLock() { + lock().AssertAcquired(); + waiter_list_.AwakeWaitersForStateChange(SatisfiedFlagsNoLock(), + SatisfiableFlagsNoLock()); +} + +void SimpleDispatcher::CancelAllWaitersNoLock() { + lock().AssertAcquired(); + waiter_list_.CancelAllWaiters(); +} + +MojoResult SimpleDispatcher::AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + lock().AssertAcquired(); + + if ((flags & SatisfiedFlagsNoLock())) + return MOJO_RESULT_ALREADY_EXISTS; + if (!(flags & SatisfiableFlagsNoLock())) + return MOJO_RESULT_FAILED_PRECONDITION; + + waiter_list_.AddWaiter(waiter, flags, wake_result); + return MOJO_RESULT_OK; +} + +void SimpleDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) { + lock().AssertAcquired(); + waiter_list_.RemoveWaiter(waiter); +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/simple_dispatcher.h b/mojo/system/simple_dispatcher.h new file mode 100644 index 0000000..f306aba --- /dev/null +++ b/mojo/system/simple_dispatcher.h @@ -0,0 +1,58 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_SIMPLE_DISPATCHER_H_ +#define MOJO_SYSTEM_SIMPLE_DISPATCHER_H_ + +#include <list> + +#include "base/basictypes.h" +#include "mojo/system/dispatcher.h" +#include "mojo/system/waiter_list.h" + +namespace mojo { +namespace system { + +// A base class for simple dispatchers. "Simple" means that there's a one-to-one +// correspondence between handles and dispatchers (see the explanatory comment +// in core_impl.cc). This class implements the standard waiter-signalling +// mechanism in that case. +class SimpleDispatcher : public Dispatcher { + protected: + SimpleDispatcher(); + + friend class base::RefCountedThreadSafe<SimpleDispatcher>; + virtual ~SimpleDispatcher(); + + // To be called by subclasses when the state changes (so + // |SatisfiedFlagsNoLock()| and |SatisfiableFlagsNoLock()| should be checked + // again). Must be called under lock. + void StateChangedNoLock(); + + // These should return the wait flags that are satisfied by the object's + // current state and those that may eventually be satisfied by this object's + // state, respectively. They should be overridden by subclasses to reflect + // their notion of state. They are never called after the dispatcher has been + // closed. They are called under |lock_|. + virtual MojoWaitFlags SatisfiedFlagsNoLock() const = 0; + virtual MojoWaitFlags SatisfiableFlagsNoLock() const = 0; + + // |Dispatcher| implementation/overrides: + virtual void CancelAllWaitersNoLock() OVERRIDE; + virtual MojoResult AddWaiterImplNoLock(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) OVERRIDE; + virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE; + + private: + // Protected by |lock()|: + WaiterList waiter_list_; + + DISALLOW_COPY_AND_ASSIGN(SimpleDispatcher); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_SIMPLE_DISPATCHER_H_ diff --git a/mojo/system/simple_dispatcher_unittest.cc b/mojo/system/simple_dispatcher_unittest.cc new file mode 100644 index 0000000..9822f95 --- /dev/null +++ b/mojo/system/simple_dispatcher_unittest.cc @@ -0,0 +1,514 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a +// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase +// tolerance and reduce observed flakiness. + +#include "mojo/system/simple_dispatcher.h" + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_vector.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/time/time.h" +#include "mojo/system/test_utils.h" +#include "mojo/system/waiter.h" +#include "mojo/system/waiter_test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +const int64_t kMicrosPerMs = 1000; +const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms. + +class MockSimpleDispatcher : public SimpleDispatcher { + public: + MockSimpleDispatcher() + : satisfied_flags_(MOJO_WAIT_FLAG_NONE), + satisfiable_flags_(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE) {} + + void SetSatisfiedFlags(MojoWaitFlags new_satisfied_flags) { + base::AutoLock locker(lock()); + + // Any new flags that are set should be satisfiable. + CHECK_EQ(new_satisfied_flags & ~satisfied_flags_, + new_satisfied_flags & ~satisfied_flags_ & satisfiable_flags_); + + if (new_satisfied_flags == satisfied_flags_) + return; + + satisfied_flags_ = new_satisfied_flags; + StateChangedNoLock(); + } + + void SetSatisfiableFlags(MojoWaitFlags new_satisfiable_flags) { + base::AutoLock locker(lock()); + + if (new_satisfiable_flags == satisfiable_flags_) + return; + + satisfiable_flags_ = new_satisfiable_flags; + StateChangedNoLock(); + } + + private: + friend class base::RefCountedThreadSafe<MockSimpleDispatcher>; + virtual ~MockSimpleDispatcher() {} + + // |SimpleDispatcher| implementation: + virtual MojoWaitFlags SatisfiedFlagsNoLock() const OVERRIDE { + lock().AssertAcquired(); + return satisfied_flags_; + } + + virtual MojoWaitFlags SatisfiableFlagsNoLock() const OVERRIDE { + lock().AssertAcquired(); + return satisfiable_flags_; + } + + // Protected by |lock()|: + MojoWaitFlags satisfied_flags_; + MojoWaitFlags satisfiable_flags_; + + DISALLOW_COPY_AND_ASSIGN(MockSimpleDispatcher); +}; + +TEST(SimpleDispatcherTest, Basic) { + test::Stopwatch stopwatch; + int64_t elapsed_micros; + + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + Waiter w; + + // Try adding a readable waiter when already readable. + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + d->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0)); + // Shouldn't need to remove the waiter (it was not added). + + // Wait (forever) for writable when already writable. + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 1)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE); + stopwatch.Start(); + EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for zero time for writable when already writable. + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 2)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE); + stopwatch.Start(); + EXPECT_EQ(2, w.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for non-zero, finite time for writable when already writable. + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 3)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE); + stopwatch.Start(); + EXPECT_EQ(3, w.Wait(2 * kEpsilonMicros)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for zero time for writable when not writable (will time out). + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for non-zero, finite time for writable when not writable (will time + // out). + w.Init(); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4)); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + d->RemoveWaiter(&w); + + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); +} + +TEST(SimpleDispatcherTest, BasicUnsatisfiable) { + test::Stopwatch stopwatch; + int64_t elapsed_micros; + + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + Waiter w; + + // Try adding a writable waiter when it can never be writable. + w.Init(); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE); + d->SetSatisfiedFlags(0); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 5)); + // Shouldn't need to remove the waiter (it was not added). + + // Wait (forever) for writable and then it becomes never writable. + w.Init(); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 6)); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(MOJO_DEADLINE_INDEFINITE)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for zero time for writable and then it becomes never writable. + w.Init(); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 6)); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + // Wait for non-zero, finite time for writable and then it becomes never + // writable. + w.Init(); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE | MOJO_WAIT_FLAG_WRITABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 7)); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, w.Wait(2 * kEpsilonMicros)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + d->RemoveWaiter(&w); + + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); +} + +TEST(SimpleDispatcherTest, BasicClosed) { + test::Stopwatch stopwatch; + int64_t elapsed_micros; + + scoped_refptr<MockSimpleDispatcher> d; + Waiter w; + + // Try adding a writable waiter when the dispatcher has been closed. + d = new MockSimpleDispatcher(); + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 8)); + // Shouldn't need to remove the waiter (it was not added). + + // Wait (forever) for writable and then the dispatcher is closed. + d = new MockSimpleDispatcher(); + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 9)); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(MOJO_DEADLINE_INDEFINITE)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + // Don't need to remove waiters from closed dispatchers. + + // Wait for zero time for writable and then the dispatcher is closed. + d = new MockSimpleDispatcher(); + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 10)); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + // Don't need to remove waiters from closed dispatchers. + + // Wait for non-zero, finite time for writable and then the dispatcher is + // closed. + d = new MockSimpleDispatcher(); + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, d->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 11)); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_CANCELLED, w.Wait(2 * kEpsilonMicros)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + // Don't need to remove waiters from closed dispatchers. +} + +TEST(SimpleDispatcherTest, BasicThreaded) { + test::Stopwatch stopwatch; + bool did_wait; + MojoResult result; + int64_t elapsed_micros; + + // Wait for readable (already readable). + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + { + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + test::WaiterThread thread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 0, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + } // Joins the thread. + // If we closed earlier, then probably we'd get a |MOJO_RESULT_CANCELLED|. + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } + elapsed_micros = stopwatch.Elapsed(); + EXPECT_FALSE(did_wait); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + + // Wait for readable and becomes readable after some time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + test::WaiterThread thread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 1, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(1, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + // Wait for readable and becomes never-readable after some time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + test::WaiterThread thread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 2, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_NONE); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + // Wait for readable and dispatcher gets closed. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + test::WaiterThread thread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + 3, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the thread. + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + // Wait for readable and times out. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + { + test::WaiterThread thread(d, + MOJO_WAIT_FLAG_READABLE, + 2 * kEpsilonMicros, + 4, + &did_wait, &result); + stopwatch.Start(); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + // Not what we're waiting for. + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_WRITABLE); + } // Joins the thread (after its wait times out). + // If we closed earlier, then probably we'd get a |MOJO_RESULT_CANCELLED|. + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } + elapsed_micros = stopwatch.Elapsed(); + EXPECT_TRUE(did_wait); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); +} + +TEST(SimpleDispatcherTest, MultipleWaiters) { + static const size_t kNumWaiters = 20; + + bool did_wait[kNumWaiters]; + MojoResult result[kNumWaiters]; + + // All wait for readable and becomes readable after some time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + ScopedVector<test::WaiterThread> threads; + for (size_t i = 0; i < kNumWaiters; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the threads. + for (size_t i = 0; i < kNumWaiters; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(static_cast<MojoResult>(i), result[i]); + } + + // Some wait for readable, some for writable, and becomes readable after some + // time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + ScopedVector<test::WaiterThread> threads; + for (size_t i = 0; i < kNumWaiters / 2; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_WRITABLE, + MOJO_DEADLINE_INDEFINITE, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + // This will wake up the ones waiting to write. + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the threads. + for (size_t i = 0; i < kNumWaiters / 2; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(static_cast<MojoResult>(i), result[i]); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result[i]); + } + + // Some wait for readable, some for writable, and becomes readable and + // never-writable after some time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + ScopedVector<test::WaiterThread> threads; + for (size_t i = 0; i < kNumWaiters / 2; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_READABLE, + MOJO_DEADLINE_INDEFINITE, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_WRITABLE, + MOJO_DEADLINE_INDEFINITE, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + d->SetSatisfiableFlags(MOJO_WAIT_FLAG_READABLE); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the threads. + for (size_t i = 0; i < kNumWaiters / 2; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(static_cast<MojoResult>(i), result[i]); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result[i]); + } + + // Some wait for readable, some for writable, and becomes readable after some + // time. + { + scoped_refptr<MockSimpleDispatcher> d(new MockSimpleDispatcher()); + ScopedVector<test::WaiterThread> threads; + for (size_t i = 0; i < kNumWaiters / 2; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_READABLE, + 3 * kEpsilonMicros, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + threads.push_back(new test::WaiterThread(d, + MOJO_WAIT_FLAG_WRITABLE, + 1 * kEpsilonMicros, + static_cast<MojoResult>(i), + &did_wait[i], &result[i])); + threads.back()->Start(); + } + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + d->SetSatisfiedFlags(MOJO_WAIT_FLAG_READABLE); + // All those waiting for writable should have timed out. + EXPECT_EQ(MOJO_RESULT_OK, d->Close()); + } // Joins the threads. + for (size_t i = 0; i < kNumWaiters / 2; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(static_cast<MojoResult>(i), result[i]); + } + for (size_t i = kNumWaiters / 2; i < kNumWaiters; i++) { + EXPECT_TRUE(did_wait[i]); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result[i]); + } +} + +// TODO(vtl): Stress test? + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/test_utils.h b/mojo/system/test_utils.h new file mode 100644 index 0000000..2f1e956 --- /dev/null +++ b/mojo/system/test_utils.h @@ -0,0 +1,38 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_TEST_UTILS_H_ +#define MOJO_SYSTEM_TEST_UTILS_H_ + +#include "base/basictypes.h" +#include "base/time/time.h" + +namespace mojo { +namespace system { +namespace test { + +class Stopwatch { + public: + Stopwatch() {} + ~Stopwatch() {} + + void Start() { + start_time_ = base::TimeTicks::HighResNow(); + } + + int64_t Elapsed() { + return (base::TimeTicks::HighResNow() - start_time_).InMicroseconds(); + } + + private: + base::TimeTicks start_time_; + + DISALLOW_COPY_AND_ASSIGN(Stopwatch); +}; + +} // namespace test +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_TEST_UTILS_H_ diff --git a/mojo/system/waiter.cc b/mojo/system/waiter.cc new file mode 100644 index 0000000..6dda21f --- /dev/null +++ b/mojo/system/waiter.cc @@ -0,0 +1,80 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/waiter.h" + +#include <limits> + +#include "base/logging.h" +#include "base/time/time.h" + +namespace mojo { +namespace system { + +Waiter::Waiter() + : cv_(&lock_), + awoken_(false), + wait_result_(MOJO_RESULT_INTERNAL) { +} + +Waiter::~Waiter() { +} + +void Waiter::Init() { + awoken_ = false; + // NOTE(vtl): If performance ever becomes an issue, we can disable the setting + // of |wait_result_| (except the first one in |Awake()|) in Release builds. + wait_result_ = MOJO_RESULT_INTERNAL; +} + +// TODO(vtl): Fast-path the |deadline == 0| case? +MojoResult Waiter::Wait(MojoDeadline deadline) { + base::AutoLock locker(lock_); + + // Fast-path the already-awoken case: + if (awoken_) { + DCHECK_NE(wait_result_, MOJO_RESULT_INTERNAL); + return wait_result_; + } + + // |MojoDeadline| is actually a |uint64_t|, but we need a signed quantity. + // Treat any out-of-range deadline as "forever" (which is wrong, but okay + // since 2^63 microseconds is ~300000 years). Note that this also takes care + // of the |MOJO_DEADLINE_INDEFINITE| (= 2^64 - 1) case. + if (deadline > static_cast<uint64_t>(std::numeric_limits<int64_t>::max())) { + do { + cv_.Wait(); + } while (!awoken_); + } else { + // NOTE(vtl): This is very inefficient on POSIX, since pthreads condition + // variables take an absolute deadline. + const base::TimeTicks end_time = base::TimeTicks::HighResNow() + + base::TimeDelta::FromMicroseconds(static_cast<int64_t>(deadline)); + do { + base::TimeTicks now_time = base::TimeTicks::HighResNow(); + if (now_time >= end_time) + return MOJO_RESULT_DEADLINE_EXCEEDED; + + cv_.TimedWait(end_time - now_time); + } while (!awoken_); + } + + DCHECK_NE(wait_result_, MOJO_RESULT_INTERNAL); + return wait_result_; +} + +void Waiter::Awake(MojoResult wait_result) { + base::AutoLock locker(lock_); + + if (awoken_) + return; + + awoken_ = true; + wait_result_ = wait_result; + cv_.Signal(); + // |cv_.Wait()|/|cv_.TimedWait()| will return after |lock_| is released. +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/waiter.h b/mojo/system/waiter.h new file mode 100644 index 0000000..bdd33fc --- /dev/null +++ b/mojo/system/waiter.h @@ -0,0 +1,57 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_WAITER_H_ +#define MOJO_SYSTEM_WAITER_H_ + +#include "base/basictypes.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "mojo/public/system/core.h" + +namespace mojo { +namespace system { + +// IMPORTANT (all-caps gets your attention, right?): |Waiter| methods are called +// under other locks, in particular, |Dispatcher::lock_|s, so |Waiter| methods +// must never call out to other objects (in particular, |Dispatcher|s). This +// class is thread-safe. +class Waiter { + public: + Waiter(); + ~Waiter(); + + // A |Waiter| can be used multiple times; |Init()| should be called before + // each time it's used. + void Init(); + + // Waits until a suitable |Awake()| is called. + // Returns: + // - The |wake_result| passed to |Dispatcher::AddWaiter()| if it was woken up + // by that dispatcher for the reason specified by |flags| (in the call to + // |AddWaiter()|). + // - |MOJO_RESULT_CANCELLED| if a handle (on which |MojoWait()| was called) + // was closed; and + // - |MOJO_RESULT_FAILED_PRECONDITION| if the reasons for being awoken given + // by |flags| cannot (or can no longer) be satisfied (e.g., if the other + // end of a pipe is closed). + MojoResult Wait(MojoDeadline deadline); + + // Wake the waiter up with the given result (or no-op if it's been woken up + // already). + void Awake(MojoResult wait_result); + + private: + base::ConditionVariable cv_; // Associated to |lock_|. + base::Lock lock_; // Protects the following members. + bool awoken_; + MojoResult wait_result_; + + DISALLOW_COPY_AND_ASSIGN(Waiter); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_WAITER_H_ diff --git a/mojo/system/waiter_list.cc b/mojo/system/waiter_list.cc new file mode 100644 index 0000000..6f2c444 --- /dev/null +++ b/mojo/system/waiter_list.cc @@ -0,0 +1,57 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/waiter_list.h" + +#include "base/logging.h" +#include "mojo/system/waiter.h" + +namespace mojo { +namespace system { + +WaiterList::WaiterList() { +} + +WaiterList::~WaiterList() { + DCHECK(waiters_.empty()); +} + +void WaiterList::AwakeWaitersForStateChange(MojoWaitFlags satisfied_flags, + MojoWaitFlags satisfiable_flags) { + for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end(); + ++it) { + if (it->flags & satisfied_flags) + it->waiter->Awake(it->wake_result); + else if (!(it->flags & satisfiable_flags)) + it->waiter->Awake(MOJO_RESULT_FAILED_PRECONDITION); + } +} + +void WaiterList::CancelAllWaiters() { + for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end(); + ++it) { + it->waiter->Awake(MOJO_RESULT_CANCELLED); + } + waiters_.clear(); +} + +void WaiterList::AddWaiter(Waiter* waiter, + MojoWaitFlags flags, + MojoResult wake_result) { + waiters_.push_back(WaiterInfo(waiter, flags, wake_result)); +} + +void WaiterList::RemoveWaiter(Waiter* waiter) { + // We allow a thread to wait on the same handle multiple times simultaneously, + // so we need to scan the entire list and remove all occurrences of |waiter|. + for (WaiterInfoList::iterator it = waiters_.begin(); it != waiters_.end();) { + WaiterInfoList::iterator maybe_delete = it; + ++it; + if (maybe_delete->waiter == waiter) + waiters_.erase(maybe_delete); + } +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/waiter_list.h b/mojo/system/waiter_list.h new file mode 100644 index 0000000..23933e3 --- /dev/null +++ b/mojo/system/waiter_list.h @@ -0,0 +1,55 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_WAITER_LIST_H_ +#define MOJO_SYSTEM_WAITER_LIST_H_ + +#include <list> + +#include "base/basictypes.h" +#include "mojo/public/system/core.h" + +namespace mojo { +namespace system { + +class Waiter; + +// |WaiterList| tracks all the |Waiter|s that are waiting on a given +// handle/|Dispatcher|. There should be a |WaiterList| for each handle that can +// be waited on (in any way). In the simple case, the |WaiterList| is owned by +// the |Dispatcher|, whereas in more complex cases it is owned by the secondary +// object (see simple_dispatcher.* and the explanatory comment in core_impl.cc). +// This class is thread-unsafe (all concurrent access must be protected by some +// lock). +class WaiterList { + public: + WaiterList(); + ~WaiterList(); + + void AwakeWaitersForStateChange(MojoWaitFlags satisfied_flags, + MojoWaitFlags satisfiable_flags); + void CancelAllWaiters(); + void AddWaiter(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result); + void RemoveWaiter(Waiter* waiter); + + private: + struct WaiterInfo { + WaiterInfo(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result) + : waiter(waiter), flags(flags), wake_result(wake_result) {} + + Waiter* waiter; + MojoWaitFlags flags; + MojoResult wake_result; + }; + typedef std::list<WaiterInfo> WaiterInfoList; + + WaiterInfoList waiters_; + + DISALLOW_COPY_AND_ASSIGN(WaiterList); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_WAITER_LIST_H_ diff --git a/mojo/system/waiter_list_unittest.cc b/mojo/system/waiter_list_unittest.cc new file mode 100644 index 0000000..fa6564c --- /dev/null +++ b/mojo/system/waiter_list_unittest.cc @@ -0,0 +1,266 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// NOTE(vtl): These tests are inherently flaky (e.g., if run on a heavily-loaded +// system). Sorry. |kEpsilonMicros| may be increased to increase tolerance and +// reduce observed flakiness. + +#include "mojo/system/waiter_list.h" + +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/time/time.h" +#include "mojo/system/waiter.h" +#include "mojo/system/waiter_test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +const int64_t kMicrosPerMs = 1000; +const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms. + +TEST(WaiterListTest, BasicCancel) { + MojoResult result; + + // Cancel immediately after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0); + thread.Start(); + waiter_list.CancelAllWaiters(); + waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay. + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + + // Cancel before after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1); + waiter_list.CancelAllWaiters(); + thread.Start(); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + + // Cancel some time after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.CancelAllWaiters(); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); +} + +TEST(WaiterListTest, BasicAwakeSatisfied) { + MojoResult result; + + // Awake immediately after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0); + thread.Start(); + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread.waiter()); + } // Join |thread|. + EXPECT_EQ(0, result); + + // Awake before after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1); + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_WRITABLE, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread.waiter()); + waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay. + thread.Start(); + } // Join |thread|. + EXPECT_EQ(1, result); + + // Awake some time after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread.waiter()); + } // Join |thread|. + EXPECT_EQ(2, result); +} + +TEST(WaiterListTest, BasicAwakeUnsatisfiable) { + MojoResult result; + + // Awake (for unsatisfiability) immediately after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 0); + thread.Start(); + waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread.waiter()); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + + // Awake (for unsatisfiability) before after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1); + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE, + MOJO_WAIT_FLAG_READABLE); + waiter_list.RemoveWaiter(thread.waiter()); + thread.Start(); + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + + // Awake (for unsatisfiability) some time after thread start. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread(&result); + waiter_list.AddWaiter(thread.waiter(), MOJO_WAIT_FLAG_READABLE, 2); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread.waiter()); + waiter_list.RemoveWaiter(thread.waiter()); // Double-remove okay. + } // Join |thread|. + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); +} + +TEST(WaiterListTest, MultipleWaiters) { + MojoResult result_1; + MojoResult result_2; + MojoResult result_3; + MojoResult result_4; + + // Cancel two waiters. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread_1(&result_1); + waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 0); + thread_1.Start(); + test::SimpleWaiterThread thread_2(&result_2); + waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 1); + thread_2.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.CancelAllWaiters(); + } // Join threads. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result_1); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result_2); + + // Awake one waiter, cancel other. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread_1(&result_1); + waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 2); + thread_1.Start(); + test::SimpleWaiterThread thread_2(&result_2); + waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 3); + thread_2.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread_1.waiter()); + waiter_list.CancelAllWaiters(); + } // Join threads. + EXPECT_EQ(2, result_1); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result_2); + + // Cancel one waiter, awake other for unsatisfiability. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread_1(&result_1); + waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 4); + thread_1.Start(); + test::SimpleWaiterThread thread_2(&result_2); + waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 5); + thread_2.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_READABLE); + waiter_list.RemoveWaiter(thread_2.waiter()); + waiter_list.CancelAllWaiters(); + } // Join threads. + EXPECT_EQ(MOJO_RESULT_CANCELLED, result_1); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_2); + + // Cancel one waiter, awake other for unsatisfiability. + { + WaiterList waiter_list; + test::SimpleWaiterThread thread_1(&result_1); + waiter_list.AddWaiter(thread_1.waiter(), MOJO_WAIT_FLAG_READABLE, 6); + thread_1.Start(); + + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + + // Should do nothing. + waiter_list.AwakeWaitersForStateChange(0, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + + test::SimpleWaiterThread thread_2(&result_2); + waiter_list.AddWaiter(thread_2.waiter(), MOJO_WAIT_FLAG_WRITABLE, 7); + thread_2.Start(); + + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + + // Awake #1. + waiter_list.AwakeWaitersForStateChange(MOJO_WAIT_FLAG_READABLE, + MOJO_WAIT_FLAG_READABLE | + MOJO_WAIT_FLAG_WRITABLE); + waiter_list.RemoveWaiter(thread_1.waiter()); + + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + + test::SimpleWaiterThread thread_3(&result_3); + waiter_list.AddWaiter(thread_3.waiter(), MOJO_WAIT_FLAG_WRITABLE, 8); + thread_3.Start(); + + test::SimpleWaiterThread thread_4(&result_4); + waiter_list.AddWaiter(thread_4.waiter(), MOJO_WAIT_FLAG_READABLE, 9); + thread_4.Start(); + + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + + // Awake #2 and #3 for unsatisfiability. + waiter_list.AwakeWaitersForStateChange(0, MOJO_WAIT_FLAG_READABLE); + waiter_list.RemoveWaiter(thread_2.waiter()); + waiter_list.RemoveWaiter(thread_3.waiter()); + + // Cancel #4. + waiter_list.CancelAllWaiters(); + } // Join threads. + EXPECT_EQ(6, result_1); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_2); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result_3); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result_4); +} + +} // namespace +} // namespace system +} // namespace mojo diff --git a/mojo/system/waiter_test_utils.cc b/mojo/system/waiter_test_utils.cc new file mode 100644 index 0000000..37df616 --- /dev/null +++ b/mojo/system/waiter_test_utils.cc @@ -0,0 +1,63 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/system/waiter_test_utils.h" + +namespace mojo { +namespace system { +namespace test { + +SimpleWaiterThread::SimpleWaiterThread(MojoResult* result) + : base::SimpleThread("waiter_thread"), + result_(result) { + waiter_.Init(); + *result_ = -5420734; // Totally invalid result. +} + +SimpleWaiterThread::~SimpleWaiterThread() { + Join(); +} + +void SimpleWaiterThread::Run() { + *result_ = waiter_.Wait(MOJO_DEADLINE_INDEFINITE); +} + +WaiterThread::WaiterThread(scoped_refptr<Dispatcher> dispatcher, + MojoWaitFlags wait_flags, + MojoDeadline deadline, + MojoResult success_result, + bool* did_wait_out, + MojoResult* result_out) + : base::SimpleThread("waiter_thread"), + dispatcher_(dispatcher), + wait_flags_(wait_flags), + deadline_(deadline), + success_result_(success_result), + did_wait_out_(did_wait_out), + result_out_(result_out) { + *did_wait_out_ = false; + *result_out_ = -8542346; // Totally invalid result. +} + +WaiterThread::~WaiterThread() { + Join(); +} + +void WaiterThread::Run() { + waiter_.Init(); + + *result_out_ = dispatcher_->AddWaiter(&waiter_, + wait_flags_, + success_result_); + if (*result_out_ != MOJO_RESULT_OK) + return; + + *did_wait_out_ = true; + *result_out_ = waiter_.Wait(deadline_); + dispatcher_->RemoveWaiter(&waiter_); +} + +} // namespace test +} // namespace system +} // namespace mojo diff --git a/mojo/system/waiter_test_utils.h b/mojo/system/waiter_test_utils.h new file mode 100644 index 0000000..b1a7a33 --- /dev/null +++ b/mojo/system/waiter_test_utils.h @@ -0,0 +1,95 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_WAITER_TEST_UTILS_H_ +#define MOJO_SYSTEM_WAITER_TEST_UTILS_H_ + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/threading/simple_thread.h" +#include "mojo/public/system/core.h" +#include "mojo/system/dispatcher.h" +#include "mojo/system/waiter.h" + +namespace mojo { +namespace system { +namespace test { + +// This is a very simple thread that has a |Waiter|, on which it waits +// indefinitely (and records the result). It will create and initialize the +// |Waiter| on creation, but the caller must start the thread with |Start()|. It +// will join the thread on destruction. +// +// One usually uses it like: +// +// MojoResult result; +// { +// WaiterList waiter_list; +// test::SimpleWaiterThread thread(&result); +// waiter_list.AddWaiter(thread.waiter(), ...); +// thread.Start(); +// ... some stuff to wake the waiter ... +// waiter_list.RemoveWaiter(thread.waiter()); +// } // Join |thread|. +// EXPECT_EQ(..., result); +// +// There's a bit of unrealism in its use: In this sort of usage, calls such as +// |Waiter::Init()|, |AddWaiter()|, and |RemoveWaiter()| are done in the main +// (test) thread, not the waiter thread (as would actually happen in real code). +// (We accept this unrealism for simplicity, since |WaiterList| is +// thread-unsafe so making it more realistic would require adding nontrivial +// synchronization machinery.) +class SimpleWaiterThread : public base::SimpleThread { + public: + // For the duration of the lifetime of this object, |*result| belongs to it + // (in the sense that it will write to it whenever it wants). + explicit SimpleWaiterThread(MojoResult* result); + virtual ~SimpleWaiterThread(); // Joins the thread. + + Waiter* waiter() { return &waiter_; } + + private: + virtual void Run() OVERRIDE; + + MojoResult* const result_; + Waiter waiter_; + + DISALLOW_COPY_AND_ASSIGN(SimpleWaiterThread); +}; + +// This is a more complex and realistic thread that has a |Waiter|, on which it +// waits for the given deadline (with the given flags). Unlike +// |SimpleWaiterThread|, it requires the machinery of |Dispatcher|. +class WaiterThread : public base::SimpleThread { + public: + // Note: |*did_wait_out| and |*result| belong to this object while it's alive. + WaiterThread(scoped_refptr<Dispatcher> dispatcher, + MojoWaitFlags wait_flags, + MojoDeadline deadline, + MojoResult success_result, + bool* did_wait_out, + MojoResult* result_out); + virtual ~WaiterThread(); + + private: + virtual void Run() OVERRIDE; + + const scoped_refptr<Dispatcher> dispatcher_; + const MojoWaitFlags wait_flags_; + const MojoDeadline deadline_; + const MojoResult success_result_; + bool* const did_wait_out_; + MojoResult* const result_out_; + + Waiter waiter_; + + DISALLOW_COPY_AND_ASSIGN(WaiterThread); +}; + +} // namespace test +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_WAITER_TEST_UTILS_H_ diff --git a/mojo/system/waiter_unittest.cc b/mojo/system/waiter_unittest.cc new file mode 100644 index 0000000..4a69a89 --- /dev/null +++ b/mojo/system/waiter_unittest.cc @@ -0,0 +1,285 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// NOTE(vtl): These tests are inherently flaky (e.g., if run on a heavily-loaded +// system). Sorry. |kEpsilonMicros| may be increased to increase tolerance and +// reduce observed flakiness. + +#include "mojo/system/waiter.h" + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" // For |Sleep()|. +#include "base/threading/simple_thread.h" +#include "base/time/time.h" +#include "mojo/system/test_utils.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace system { +namespace { + +const int64_t kMicrosPerMs = 1000; +const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms. +const int64_t kPollTimeMicros = 10 * kMicrosPerMs; // 10 ms. + +class WaitingThread : public base::SimpleThread { + public: + explicit WaitingThread(MojoDeadline deadline) + : base::SimpleThread("waiting_thread"), + deadline_(deadline), + done_(false), + result_(MOJO_RESULT_UNKNOWN), + elapsed_micros_(-1) { + waiter_.Init(); + } + + virtual ~WaitingThread() { + Join(); + } + + void WaitUntilDone(MojoResult* result, int64_t* elapsed_micros) { + for (;;) { + { + base::AutoLock locker(lock_); + if (done_) { + *result = result_; + *elapsed_micros = elapsed_micros_; + break; + } + } + + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(kPollTimeMicros)); + } + } + + Waiter* waiter() { return &waiter_; } + + private: + virtual void Run() OVERRIDE { + test::Stopwatch stopwatch; + MojoResult result; + int64_t elapsed_micros; + + stopwatch.Start(); + result = waiter_.Wait(deadline_); + elapsed_micros = stopwatch.Elapsed(); + + { + base::AutoLock locker(lock_); + done_ = true; + result_ = result; + elapsed_micros_ = elapsed_micros; + } + } + + const MojoDeadline deadline_; + Waiter waiter_; // Thread-safe. + + base::Lock lock_; // Protects the following members. + bool done_; + MojoResult result_; + int64_t elapsed_micros_; + + DISALLOW_COPY_AND_ASSIGN(WaitingThread); +}; + +TEST(WaiterTest, Basic) { + MojoResult result; + int64_t elapsed_micros; + + // Finite deadline. + + // Awake immediately after thread start. + { + WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros)); + thread.Start(); + thread.waiter()->Awake(0); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(0, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + // Awake before after thread start. + { + WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros)); + thread.waiter()->Awake(MOJO_RESULT_CANCELLED); + thread.Start(); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + // Awake some time after thread start. + { + WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros)); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + thread.waiter()->Awake(1); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(1, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + } + + // Awake some longer time after thread start. + { + WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros)); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(5 * kEpsilonMicros)); + thread.waiter()->Awake(1); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(1, result); + EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros); + } + + // Don't awake -- time out (on another thread). + { + WaitingThread thread(static_cast<MojoDeadline>(2 * kEpsilonMicros)); + thread.Start(); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + } + + // No (indefinite) deadline. + + // Awake immediately after thread start. + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.Start(); + thread.waiter()->Awake(0); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(0, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + // Awake before after thread start. + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.waiter()->Awake(MOJO_RESULT_CANCELLED); + thread.Start(); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + // Awake some time after thread start. + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + thread.waiter()->Awake(1); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(1, result); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + } + + // Awake some longer time after thread start. + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(5 * kEpsilonMicros)); + thread.waiter()->Awake(1); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(1, result); + EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros); + } +} + +TEST(WaiterTest, TimeOut) { + test::Stopwatch stopwatch; + int64_t elapsed_micros; + + Waiter waiter; + + waiter.Init(); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + + waiter.Init(); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + waiter.Wait(static_cast<MojoDeadline>(2 * kEpsilonMicros))); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); + + waiter.Init(); + stopwatch.Start(); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, + waiter.Wait(static_cast<MojoDeadline>(5 * kEpsilonMicros))); + elapsed_micros = stopwatch.Elapsed(); + EXPECT_GT(elapsed_micros, (5-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (5+1) * kEpsilonMicros); +} + +// The first |Awake()| should always win. +TEST(WaiterTest, MultipleAwakes) { + MojoResult result; + int64_t elapsed_micros; + + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.Start(); + thread.waiter()->Awake(0); + thread.waiter()->Awake(1); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(0, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.waiter()->Awake(1); + thread.Start(); + thread.waiter()->Awake(0); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(1, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + { + WaitingThread thread(MOJO_DEADLINE_INDEFINITE); + thread.Start(); + thread.waiter()->Awake(10); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + thread.waiter()->Awake(20); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(10, result); + EXPECT_LT(elapsed_micros, kEpsilonMicros); + } + + { + WaitingThread thread(static_cast<MojoDeadline>(10 * kEpsilonMicros)); + thread.Start(); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(1 * kEpsilonMicros)); + thread.waiter()->Awake(MOJO_RESULT_FAILED_PRECONDITION); + base::PlatformThread::Sleep( + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); + thread.waiter()->Awake(0); + thread.WaitUntilDone(&result, &elapsed_micros); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + EXPECT_GT(elapsed_micros, (1-1) * kEpsilonMicros); + EXPECT_LT(elapsed_micros, (1+1) * kEpsilonMicros); + } +} + +} // namespace +} // namespace system +} // namespace mojo |