summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoramistry <amistry@chromium.org>2015-12-09 23:29:03 -0800
committerCommit bot <commit-bot@chromium.org>2015-12-10 07:29:54 +0000
commit428364d72b6ccf283fff9ce0c9ea58dac3805615 (patch)
tree436c6f07d190b82bc901cd5543209aa6e24306e3
parenta9c213e88b8f2df057b249e1e6734d10e6d5825d (diff)
downloadchromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.zip
chromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.tar.gz
chromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.tar.bz2
WaitSet implementation for old EDK.
BUG=556865 Review URL: https://codereview.chromium.org/1461213002 Cr-Commit-Position: refs/heads/master@{#364309}
-rw-r--r--mojo/public/c/system/wait_set.h11
-rw-r--r--mojo/public/platform/native/system_thunks.cc27
-rw-r--r--mojo/public/platform/native/system_thunks.h18
-rw-r--r--third_party/mojo/mojo_edk_system_impl.gypi2
-rw-r--r--third_party/mojo/mojo_edk_tests.gyp1
-rw-r--r--third_party/mojo/src/mojo/edk/system/BUILD.gn3
-rw-r--r--third_party/mojo/src/mojo/edk/system/core.cc39
-rw-r--r--third_party/mojo/src/mojo/edk/system/dispatcher.cc61
-rw-r--r--third_party/mojo/src/mojo/edk/system/dispatcher.h32
-rw-r--r--third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc277
-rw-r--r--third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h94
-rw-r--r--third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc459
12 files changed, 1014 insertions, 10 deletions
diff --git a/mojo/public/c/system/wait_set.h b/mojo/public/c/system/wait_set.h
index c237403..3f3365a 100644
--- a/mojo/public/c/system/wait_set.h
+++ b/mojo/public/c/system/wait_set.h
@@ -38,6 +38,10 @@ MOJO_SYSTEM_EXPORT MojoResult MojoCreateWaitSet(
// any number of different wait sets. To modify the signals being waited for,
// the handle must first be removed, and then added with the new signals.
//
+// If a handle is closed while still in the wait set, it is implicitly removed
+// from the set after being returned from |MojoGetReadyHandles()| with the
+// result |MOJO_RESULT_CANCELLED|.
+//
// It is safe to add a handle to a wait set while performing a wait on another
// thread. If the added handle already has its signals satisfied, the waiting
// thread will be woken.
@@ -80,7 +84,7 @@ MOJO_SYSTEM_EXPORT MojoResult MojoRemoveHandle(
// |MOJO_HANDLE_SIGNAL_READABLE| signal. Since handles may be added and removed
// from a wait set concurrently, it is possible for a wait set to satisfy
// |MOJO_HANDLE_SIGNAL_READABLE|, but not have any ready handles when
-// |MojoGetReadyHandle()| is called. These spurious wake-ups must be gracefully
+// |MojoGetReadyHandles()| is called. These spurious wake-ups must be gracefully
// handled.
//
// |*count| on input, must contain the maximum number of ready handles to be
@@ -99,11 +103,12 @@ MOJO_SYSTEM_EXPORT MojoResult MojoRemoveHandle(
// |signals_state| (optional) if non-null, must point to an array of size
// |*count| of |MojoHandleSignalsState|. It will be populated with the signals
// state of the corresponding handle in |*handles|. See documentation for
-// |MojoHandleSignalsState|.
+// |MojoHandleSignalsState| for more details about the meaning of each array
+// entry. The array will always be updated for every returned handle.
//
// Mojo signals and satisfiability are logically 'level-triggered'. Therefore,
// if a signal continues to be satisfied and is not removed from the wait set,
-// subsequent calls to |MojoGetReadyHandle()| will return the same handle.
+// subsequent calls to |MojoGetReadyHandles()| will return the same handle.
//
// If multiple handles have their signals satisfied, the order in which handles
// are returned is undefined. The same handle, if not removed, may be returned
diff --git a/mojo/public/platform/native/system_thunks.cc b/mojo/public/platform/native/system_thunks.cc
index ed3227f..74aaa08 100644
--- a/mojo/public/platform/native/system_thunks.cc
+++ b/mojo/public/platform/native/system_thunks.cc
@@ -158,6 +158,33 @@ MojoResult MojoUnmapBuffer(void* buffer) {
return g_thunks.UnmapBuffer(buffer);
}
+MojoResult MojoCreateWaitSet(MojoHandle* wait_set) {
+ assert(g_thunks.CreateWaitSet);
+ return g_thunks.CreateWaitSet(wait_set);
+}
+
+MojoResult MojoAddHandle(MojoHandle wait_set,
+ MojoHandle handle,
+ MojoHandleSignals signals) {
+ assert(g_thunks.AddHandle);
+ return g_thunks.AddHandle(wait_set, handle, signals);
+}
+
+MojoResult MojoRemoveHandle(MojoHandle wait_set, MojoHandle handle) {
+ assert(g_thunks.RemoveHandle);
+ return g_thunks.RemoveHandle(wait_set, handle);
+}
+
+MojoResult MojoGetReadyHandles(MojoHandle wait_set,
+ uint32_t* count,
+ MojoHandle* handles,
+ MojoResult* results,
+ struct MojoHandleSignalsState* signals_states) {
+ assert(g_thunks.GetReadyHandles);
+ return g_thunks.GetReadyHandles(wait_set, count, handles, results,
+ signals_states);
+}
+
extern "C" THUNK_EXPORT size_t MojoSetSystemThunks(
const MojoSystemThunks* system_thunks) {
if (system_thunks->size >= sizeof(g_thunks))
diff --git a/mojo/public/platform/native/system_thunks.h b/mojo/public/platform/native/system_thunks.h
index bb6ca96..8c8ea1e 100644
--- a/mojo/public/platform/native/system_thunks.h
+++ b/mojo/public/platform/native/system_thunks.h
@@ -101,6 +101,18 @@ struct MojoSystemThunks {
void** buffer,
MojoMapBufferFlags flags);
MojoResult (*UnmapBuffer)(void* buffer);
+
+ MojoResult (*CreateWaitSet)(MojoHandle* wait_set);
+ MojoResult (*AddHandle)(MojoHandle wait_set,
+ MojoHandle handle,
+ MojoHandleSignals signals);
+ MojoResult (*RemoveHandle)(MojoHandle wait_set,
+ MojoHandle handle);
+ MojoResult (*GetReadyHandles)(MojoHandle wait_set,
+ uint32_t* count,
+ MojoHandle* handles,
+ MojoResult* results,
+ struct MojoHandleSignalsState* signals_states);
};
#pragma pack(pop)
@@ -127,7 +139,11 @@ inline MojoSystemThunks MojoMakeSystemThunks() {
MojoCreateSharedBuffer,
MojoDuplicateBufferHandle,
MojoMapBuffer,
- MojoUnmapBuffer};
+ MojoUnmapBuffer,
+ MojoCreateWaitSet,
+ MojoAddHandle,
+ MojoRemoveHandle,
+ MojoGetReadyHandles};
return system_thunks;
}
#endif
diff --git a/third_party/mojo/mojo_edk_system_impl.gypi b/third_party/mojo/mojo_edk_system_impl.gypi
index fc455ed..bdb629c 100644
--- a/third_party/mojo/mojo_edk_system_impl.gypi
+++ b/third_party/mojo/mojo_edk_system_impl.gypi
@@ -130,6 +130,8 @@
'src/mojo/edk/system/transport_data.h',
'src/mojo/edk/system/unique_identifier.cc',
'src/mojo/edk/system/unique_identifier.h',
+ 'src/mojo/edk/system/wait_set_dispatcher.cc',
+ 'src/mojo/edk/system/wait_set_dispatcher.h',
'src/mojo/edk/system/waiter.cc',
'src/mojo/edk/system/waiter.h',
# Test-only code:
diff --git a/third_party/mojo/mojo_edk_tests.gyp b/third_party/mojo/mojo_edk_tests.gyp
index c02e21d..c0916bf 100644
--- a/third_party/mojo/mojo_edk_tests.gyp
+++ b/third_party/mojo/mojo_edk_tests.gyp
@@ -218,6 +218,7 @@
'src/mojo/edk/system/test_utils.h',
'src/mojo/edk/system/thread_annotations_unittest.cc',
'src/mojo/edk/system/unique_identifier_unittest.cc',
+ 'src/mojo/edk/system/wait_set_dispatcher_unittest.cc',
'src/mojo/edk/system/waiter_test_utils.cc',
'src/mojo/edk/system/waiter_test_utils.h',
'src/mojo/edk/system/waiter_unittest.cc',
diff --git a/third_party/mojo/src/mojo/edk/system/BUILD.gn b/third_party/mojo/src/mojo/edk/system/BUILD.gn
index 71dcac2..656ac72 100644
--- a/third_party/mojo/src/mojo/edk/system/BUILD.gn
+++ b/third_party/mojo/src/mojo/edk/system/BUILD.gn
@@ -110,6 +110,8 @@ component("system") {
"transport_data.h",
"unique_identifier.cc",
"unique_identifier.h",
+ "wait_set_dispatcher.cc",
+ "wait_set_dispatcher.h",
"waiter.cc",
"waiter.h",
]
@@ -210,6 +212,7 @@ test("mojo_system_unittests") {
"test_channel_endpoint_client.h",
"thread_annotations_unittest.cc",
"unique_identifier_unittest.cc",
+ "wait_set_dispatcher_unittest.cc",
"waiter_test_utils.cc",
"waiter_test_utils.h",
"waiter_unittest.cc",
diff --git a/third_party/mojo/src/mojo/edk/system/core.cc b/third_party/mojo/src/mojo/edk/system/core.cc
index d82c845..a11798f 100644
--- a/third_party/mojo/src/mojo/edk/system/core.cc
+++ b/third_party/mojo/src/mojo/edk/system/core.cc
@@ -4,8 +4,10 @@
#include "third_party/mojo/src/mojo/edk/system/core.h"
+#include <utility>
#include <vector>
+#include "base/containers/stack_container.h"
#include "base/logging.h"
#include "base/time/time.h"
#include "mojo/public/c/system/macros.h"
@@ -23,6 +25,7 @@
#include "third_party/mojo/src/mojo/edk/system/message_pipe.h"
#include "third_party/mojo/src/mojo/edk/system/message_pipe_dispatcher.h"
#include "third_party/mojo/src/mojo/edk/system/shared_buffer_dispatcher.h"
+#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h"
#include "third_party/mojo/src/mojo/edk/system/waiter.h"
namespace mojo {
@@ -189,7 +192,16 @@ MojoResult Core::CreateWaitSet(UserPointer<MojoHandle> wait_set_handle) {
if (wait_set_handle.IsNull())
return MOJO_RESULT_INVALID_ARGUMENT;
- return MOJO_RESULT_UNIMPLEMENTED;
+ scoped_refptr<WaitSetDispatcher> dispatcher = new WaitSetDispatcher();
+ MojoHandle h = AddDispatcher(dispatcher);
+ if (h == MOJO_HANDLE_INVALID) {
+ LOG(ERROR) << "Handle table full";
+ dispatcher->Close();
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+ }
+
+ wait_set_handle.Put(h);
+ return MOJO_RESULT_OK;
}
MojoResult Core::AddHandle(MojoHandle wait_set_handle,
@@ -203,7 +215,7 @@ MojoResult Core::AddHandle(MojoHandle wait_set_handle,
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
- return MOJO_RESULT_UNIMPLEMENTED;
+ return wait_set_dispatcher->AddWaitingDispatcher(dispatcher, signals, handle);
}
MojoResult Core::RemoveHandle(MojoHandle wait_set_handle,
@@ -216,7 +228,7 @@ MojoResult Core::RemoveHandle(MojoHandle wait_set_handle,
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
- return MOJO_RESULT_UNIMPLEMENTED;
+ return wait_set_dispatcher->RemoveWaitingDispatcher(dispatcher);
}
MojoResult Core::GetReadyHandles(
@@ -225,14 +237,29 @@ MojoResult Core::GetReadyHandles(
UserPointer<MojoHandle> handles,
UserPointer<MojoResult> results,
UserPointer<MojoHandleSignalsState> signals_states) {
+ if (count.IsNull() || !count.Get() || handles.IsNull() || results.IsNull())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle));
if (!wait_set_dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
- if (count.IsNull() || !count.Get() || handles.IsNull() || results.IsNull())
- return MOJO_RESULT_INVALID_ARGUMENT;
+ const uint32_t in_count = count.Get();
+ DispatcherVector awoken_dispatchers;
+ base::StackVector<uintptr_t, 16> contexts;
+ contexts->assign(in_count, MOJO_HANDLE_INVALID);
+
+ MojoResult result = wait_set_dispatcher->GetReadyDispatchers(
+ count, &awoken_dispatchers, results, MakeUserPointer(contexts->data()));
+
+ const uint32_t out_count = count.Get();
+ for (uint32_t i = 0; i < out_count; i++) {
+ handles.At(i).Put(static_cast<MojoHandle>(contexts[i]));
+ if (!signals_states.IsNull())
+ signals_states.At(i).Put(awoken_dispatchers[i]->GetHandleSignalsState());
+ }
- return MOJO_RESULT_UNIMPLEMENTED;
+ return result;
}
MojoResult Core::CreateMessagePipe(
diff --git a/third_party/mojo/src/mojo/edk/system/dispatcher.cc b/third_party/mojo/src/mojo/edk/system/dispatcher.cc
index 2c11407..ba4fe18 100644
--- a/third_party/mojo/src/mojo/edk/system/dispatcher.cc
+++ b/third_party/mojo/src/mojo/edk/system/dispatcher.cc
@@ -74,6 +74,7 @@ scoped_refptr<Dispatcher> Dispatcher::TransportDataAccess::Deserialize(
size_t size,
embedder::PlatformHandleVector* platform_handles) {
switch (static_cast<Dispatcher::Type>(type)) {
+ case Type::WAIT_SET:
case Type::UNKNOWN:
DVLOG(2) << "Deserializing invalid handle";
return nullptr;
@@ -250,6 +251,37 @@ void Dispatcher::RemoveAwakable(Awakable* awakable,
RemoveAwakableImplNoLock(awakable, handle_signals_state);
}
+MojoResult Dispatcher::AddWaitingDispatcher(
+ const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) {
+ MutexLocker locker(&mutex_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return AddWaitingDispatcherImplNoLock(dispatcher, signals, context);
+}
+
+MojoResult Dispatcher::RemoveWaitingDispatcher(
+ const scoped_refptr<Dispatcher>& dispatcher) {
+ MutexLocker locker(&mutex_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return RemoveWaitingDispatcherImplNoLock(dispatcher);
+}
+
+MojoResult Dispatcher::GetReadyDispatchers(UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts) {
+ MutexLocker locker(&mutex_);
+ if (is_closed_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return GetReadyDispatchersImplNoLock(count, dispatchers, results, contexts);
+}
+
Dispatcher::Dispatcher() : is_closed_(false) {
}
@@ -389,6 +421,35 @@ MojoResult Dispatcher::AddAwakableImplNoLock(
return MOJO_RESULT_FAILED_PRECONDITION;
}
+MojoResult Dispatcher::AddWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& /*dispatcher*/,
+ MojoHandleSignals /*signals*/,
+ uintptr_t /*context*/) {
+ mutex_.AssertHeld();
+ DCHECK(!is_closed_);
+ // By default, not supported. Only needed for wait set dispatchers.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::RemoveWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& /*dispatcher*/) {
+ mutex_.AssertHeld();
+ DCHECK(!is_closed_);
+ // By default, not supported. Only needed for wait set dispatchers.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::GetReadyDispatchersImplNoLock(
+ UserPointer<uint32_t> /*count*/,
+ DispatcherVector* /*dispatchers*/,
+ UserPointer<MojoResult> /*results*/,
+ UserPointer<uintptr_t> /*contexts*/) {
+ mutex_.AssertHeld();
+ DCHECK(!is_closed_);
+ // By default, not supported. Only needed for wait set dispatchers.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
void Dispatcher::RemoveAwakableImplNoLock(Awakable* /*awakable*/,
HandleSignalsState* signals_state) {
mutex_.AssertHeld();
diff --git a/third_party/mojo/src/mojo/edk/system/dispatcher.h b/third_party/mojo/src/mojo/edk/system/dispatcher.h
index c0bb69b..e9af1c0 100644
--- a/third_party/mojo/src/mojo/edk/system/dispatcher.h
+++ b/third_party/mojo/src/mojo/edk/system/dispatcher.h
@@ -66,6 +66,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
DATA_PIPE_PRODUCER,
DATA_PIPE_CONSUMER,
SHARED_BUFFER,
+ WAIT_SET,
// "Private" types (not exposed via the public interface):
PLATFORM_HANDLE = -1
@@ -153,6 +154,25 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
// |*signals_state| will be set to the current handle signals state.
void RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state);
+ // Adds a dispatcher to wait on. When the dispatcher satisfies |signals|, it
+ // will be returned in the next call to |GetReadyDispatchers()|. If
+ // |dispatcher| has been added, it must be removed before adding again,
+ // otherwise |MOJO_RESULT_ALREADY_EXISTS| will be returned.
+ MojoResult AddWaitingDispatcher(const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context);
+ // Removes a dispatcher to wait on. If |dispatcher| has not been added,
+ // |MOJO_RESULT_NOT_FOUND| will be returned.
+ MojoResult RemoveWaitingDispatcher(
+ const scoped_refptr<Dispatcher>& dispatcher);
+ // Returns a set of ready dispatchers. |*count| is the maximum number of
+ // dispatchers to return, and will contain the number of dispatchers returned
+ // in |dispatchers| on completion.
+ MojoResult GetReadyDispatchers(UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts);
+
// A dispatcher must be put into a special state in order to be sent across a
// message pipe. Outside of tests, only |HandleTableAccess| is allowed to do
// this, since there are requirements on the handle table (see below).
@@ -284,6 +304,18 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
virtual void RemoveAwakableImplNoLock(Awakable* awakable,
HandleSignalsState* signals_state)
MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ virtual MojoResult AddWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ virtual MojoResult RemoveWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ virtual MojoResult GetReadyDispatchersImplNoLock(
+ UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// These implement the API used to serialize dispatchers to a |Channel|
// (described below). They will only be called on a dispatcher that's attached
diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc
new file mode 100644
index 0000000..45518e7
--- /dev/null
+++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc
@@ -0,0 +1,277 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "base/logging.h"
+#include "third_party/mojo/src/mojo/edk/system/awakable.h"
+
+namespace mojo {
+namespace system {
+
+class WaitSetDispatcher::Waiter final : public Awakable {
+ public:
+ explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
+ ~Waiter() {}
+
+ // |Awakable| implementation.
+ bool Awake(MojoResult result, uintptr_t context) override {
+ // Note: This is called with various Mojo locks held.
+ dispatcher_->WakeDispatcher(result, context);
+ // Removes |this| from the dispatcher's list of waiters.
+ return false;
+ }
+
+ private:
+ WaitSetDispatcher* const dispatcher_;
+};
+
+WaitSetDispatcher::WaitSetDispatcher()
+ : waiter_(new WaitSetDispatcher::Waiter(this)) {}
+
+WaitSetDispatcher::~WaitSetDispatcher() {
+ DCHECK(waiting_dispatchers_.empty());
+ DCHECK(awoken_queue_.empty());
+ DCHECK(processed_dispatchers_.empty());
+}
+
+Dispatcher::Type WaitSetDispatcher::GetType() const {
+ return Type::WAIT_SET;
+}
+
+void WaitSetDispatcher::CloseImplNoLock() {
+ mutex().AssertHeld();
+ for (const auto& entry : waiting_dispatchers_)
+ entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
+ waiting_dispatchers_.clear();
+
+ MutexLocker locker(&awoken_mutex_);
+ awoken_queue_.clear();
+ processed_dispatchers_.clear();
+}
+
+MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) {
+ mutex().AssertHeld();
+ if (dispatcher == this)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
+ auto it = waiting_dispatchers_.find(dispatcher_handle);
+ if (it != waiting_dispatchers_.end()) {
+ return MOJO_RESULT_ALREADY_EXISTS;
+ }
+
+ const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
+ dispatcher_handle, nullptr);
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) {
+ // Dispatcher is closed.
+ return result;
+ } else if (result != MOJO_RESULT_OK) {
+ WakeDispatcher(result, dispatcher_handle);
+ }
+
+ WaitState state;
+ state.dispatcher = dispatcher;
+ state.context = context;
+ state.signals = signals;
+ bool inserted =
+ waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state))
+ .second;
+ DCHECK(inserted);
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher) {
+ mutex().AssertHeld();
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
+ auto it = waiting_dispatchers_.find(dispatcher_handle);
+ if (it == waiting_dispatchers_.end())
+ return MOJO_RESULT_NOT_FOUND;
+
+ dispatcher->RemoveAwakable(waiter_.get(), nullptr);
+ // At this point, it should not be possible for |waiter_| to be woken with
+ // |dispatcher|.
+ waiting_dispatchers_.erase(it);
+
+ MutexLocker locker(&awoken_mutex_);
+ int num_erased = 0;
+ for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
+ if (it->first == dispatcher_handle) {
+ it = awoken_queue_.erase(it);
+ num_erased++;
+ } else {
+ ++it;
+ }
+ }
+ // The dispatcher should only exist in the queue once.
+ DCHECK_LE(num_erased, 1);
+ processed_dispatchers_.erase(
+ std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
+ dispatcher_handle),
+ processed_dispatchers_.end());
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock(
+ UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts) {
+ mutex().AssertHeld();
+ dispatchers->clear();
+
+ // Re-queue any already retrieved dispatchers. These should be the dispatchers
+ // that were returned on the last call to this function. This loop is
+ // necessary to preserve the logically level-triggering behaviour of waiting
+ // in Mojo. In particular, if no action is taken on a signal, that signal
+ // continues to be satisfied, and therefore a |MojoWait()| on that
+ // handle/signal continues to return immediately.
+ std::deque<uintptr_t> pending;
+ {
+ MutexLocker locker(&awoken_mutex_);
+ pending.swap(processed_dispatchers_);
+ }
+ for (uintptr_t d : pending) {
+ auto it = waiting_dispatchers_.find(d);
+ // Anything in |processed_dispatchers_| should also be in
+ // |waiting_dispatchers_| since dispatchers are removed from both in
+ // |RemoveWaitingDispatcherImplNoLock()|.
+ DCHECK(it != waiting_dispatchers_.end());
+
+ // |awoken_mutex_| cannot be held here because
+ // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
+ // mutex is held while running |WakeDispatcher()| below, which needs to
+ // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
+ // a deadlock.
+ const MojoResult result = it->second.dispatcher->AddAwakable(
+ waiter_.get(), it->second.signals, d, nullptr);
+
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) {
+ // Dispatcher is closed. Implicitly remove it from the wait set since
+ // it may be impossible to remove using |MojoRemoveHandle()|.
+ waiting_dispatchers_.erase(it);
+ } else if (result != MOJO_RESULT_OK) {
+ WakeDispatcher(result, d);
+ }
+ }
+
+ const uint32_t max_woken = count.Get();
+ uint32_t num_woken = 0;
+
+ MutexLocker locker(&awoken_mutex_);
+ while (!awoken_queue_.empty() && num_woken < max_woken) {
+ uintptr_t d = awoken_queue_.front().first;
+ MojoResult result = awoken_queue_.front().second;
+ awoken_queue_.pop_front();
+
+ auto it = waiting_dispatchers_.find(d);
+ DCHECK(it != waiting_dispatchers_.end());
+
+ results.At(num_woken).Put(result);
+ dispatchers->push_back(it->second.dispatcher);
+ if (!contexts.IsNull())
+ contexts.At(num_woken).Put(it->second.context);
+
+ if (result != MOJO_RESULT_CANCELLED) {
+ processed_dispatchers_.push_back(d);
+ } else {
+ waiting_dispatchers_.erase(it);
+ }
+
+ num_woken++;
+ }
+
+ count.Put(num_woken);
+ if (!num_woken)
+ return MOJO_RESULT_SHOULD_WAIT;
+
+ return MOJO_RESULT_OK;
+}
+
+void WaitSetDispatcher::CancelAllAwakablesNoLock() {
+ mutex().AssertHeld();
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.CancelAll();
+}
+
+MojoResult WaitSetDispatcher::AddAwakableImplNoLock(
+ Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) {
+ mutex().AssertHeld();
+
+ HandleSignalsState state(GetHandleSignalsStateImplNoLock());
+ if (state.satisfies(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_ALREADY_EXISTS;
+ }
+ if (!state.can_satisfy(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.Add(awakable, signals, context);
+ return MOJO_RESULT_OK;
+}
+
+void WaitSetDispatcher::RemoveAwakableImplNoLock(
+ Awakable* awakable,
+ HandleSignalsState* signals_state) {
+ mutex().AssertHeld();
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.Remove(awakable);
+ if (signals_state)
+ *signals_state = GetHandleSignalsStateImplNoLock();
+}
+
+HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const {
+ mutex().AssertHeld();
+ HandleSignalsState rv;
+ rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ MutexLocker locker(&awoken_mutex_);
+ if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
+ rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ return rv;
+}
+
+scoped_refptr<Dispatcher>
+WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ mutex().AssertHeld();
+ LOG(ERROR) << "Attempting to serialize WaitSet";
+ CloseImplNoLock();
+ return new WaitSetDispatcher();
+}
+
+void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
+ {
+ MutexLocker locker(&awoken_mutex_);
+
+ if (result == MOJO_RESULT_ALREADY_EXISTS)
+ result = MOJO_RESULT_OK;
+
+ awoken_queue_.push_back(std::make_pair(context, result));
+ }
+
+ MutexLocker locker(&awakable_mutex_);
+ HandleSignalsState signals_state;
+ signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ awakable_list_.AwakeForStateChange(signals_state);
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h
new file mode 100644
index 0000000..a53f61a
--- /dev/null
+++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h
@@ -0,0 +1,94 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_
+#define THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_
+
+#include <deque>
+#include <map>
+#include <utility>
+
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/cpp/system/macros.h"
+#include "third_party/mojo/src/mojo/edk/system/awakable_list.h"
+#include "third_party/mojo/src/mojo/edk/system/dispatcher.h"
+#include "third_party/mojo/src/mojo/edk/system/mutex.h"
+#include "third_party/mojo/src/mojo/edk/system/system_impl_export.h"
+
+namespace mojo {
+namespace system {
+
+class MOJO_SYSTEM_IMPL_EXPORT WaitSetDispatcher : public Dispatcher {
+ public:
+ WaitSetDispatcher();
+ ~WaitSetDispatcher() override;
+
+ // |Dispatcher| public methods:
+ Type GetType() const override;
+
+ private:
+ // Internal implementation of Awakable.
+ class Waiter;
+
+ struct WaitState {
+ scoped_refptr<Dispatcher> dispatcher;
+ MojoHandleSignals signals;
+ uintptr_t context;
+ };
+
+ // |Dispatcher| protected methods:
+ void CloseImplNoLock() override;
+ void CancelAllAwakablesNoLock() override;
+ MojoResult AddAwakableImplNoLock(Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) override;
+ void RemoveAwakableImplNoLock(Awakable* awakable,
+ HandleSignalsState* signals_state) override;
+ MojoResult AddWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) override;
+ MojoResult RemoveWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher) override;
+ MojoResult GetReadyDispatchersImplNoLock(
+ UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts) override;
+ HandleSignalsState GetHandleSignalsStateImplNoLock() const override;
+ scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock()
+ override;
+
+ // Signal that the dispatcher indexed by |context| has been woken up with
+ // |result| and is now ready.
+ void WakeDispatcher(MojoResult result, uintptr_t context);
+
+ // Map of dispatchers being waited on. Key is a Dispatcher* casted to a
+ // uintptr_t, and should be treated as an opaque value and not casted back.
+ std::map<uintptr_t, WaitState> waiting_dispatchers_ MOJO_GUARDED_BY(mutex());
+
+ // Separate mutex that can be locked without locking |mutex()|.
+ mutable Mutex awoken_mutex_ MOJO_ACQUIRED_AFTER(mutex());
+ // List of dispatchers that have been woken up. Any dispatcher in this queue
+ // will NOT currently be waited on.
+ std::deque<std::pair<uintptr_t, MojoResult>> awoken_queue_
+ MOJO_GUARDED_BY(awoken_mutex_);
+ // List of dispatchers that have been woken up and retrieved on the last call
+ // to |GetReadyDispatchers()|.
+ std::deque<uintptr_t> processed_dispatchers_ MOJO_GUARDED_BY(awoken_mutex_);
+
+ // Separate mutex that can be locked without locking |mutex()|.
+ Mutex awakable_mutex_ MOJO_ACQUIRED_AFTER(mutex());
+ // List of dispatchers being waited on.
+ AwakableList awakable_list_ MOJO_GUARDED_BY(awakable_mutex_);
+
+ // Waiter used to wait on dispatchers.
+ scoped_ptr<Waiter> waiter_;
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_
diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc
new file mode 100644
index 0000000..86f5e36
--- /dev/null
+++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc
@@ -0,0 +1,459 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h"
+
+#include <algorithm>
+
+#include "base/memory/ref_counted.h"
+#include "mojo/public/cpp/system/macros.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/mojo/src/mojo/edk/system/message_pipe.h"
+#include "third_party/mojo/src/mojo/edk/system/message_pipe_dispatcher.h"
+#include "third_party/mojo/src/mojo/edk/system/test_utils.h"
+#include "third_party/mojo/src/mojo/edk/system/waiter.h"
+
+namespace mojo {
+namespace system {
+namespace {
+
+class WaitSetDispatcherTest : public ::testing::Test {
+ public:
+ WaitSetDispatcherTest() {}
+ ~WaitSetDispatcherTest() override {}
+
+ void SetUp() override {
+ dispatcher0_ = MessagePipeDispatcher::Create(
+ MessagePipeDispatcher::kDefaultCreateOptions);
+ dispatcher1_ = MessagePipeDispatcher::Create(
+ MessagePipeDispatcher::kDefaultCreateOptions);
+
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
+ dispatcher0_->Init(mp, 0);
+ dispatcher1_->Init(mp, 1);
+
+ dispatchers_to_close_.push_back(dispatcher0_);
+ dispatchers_to_close_.push_back(dispatcher1_);
+ }
+
+ void TearDown() override {
+ for (auto& d : dispatchers_to_close_)
+ d->Close();
+ }
+
+ MojoResult GetOneReadyDispatcher(
+ const scoped_refptr<WaitSetDispatcher>& wait_set,
+ scoped_refptr<Dispatcher>* ready_dispatcher,
+ uintptr_t* context) {
+ uint32_t count = 1;
+ MojoResult dispatcher_result = MOJO_RESULT_UNKNOWN;
+ DispatcherVector dispatchers;
+ MojoResult result = wait_set->GetReadyDispatchers(
+ MakeUserPointer(&count),
+ &dispatchers,
+ MakeUserPointer(&dispatcher_result),
+ MakeUserPointer(context));
+ if (result == MOJO_RESULT_OK) {
+ CHECK_EQ(1u, dispatchers.size());
+ *ready_dispatcher = dispatchers[0];
+ return dispatcher_result;
+ }
+ return result;
+ }
+
+ void CloseOnShutdown(const scoped_refptr<Dispatcher>& dispatcher) {
+ dispatchers_to_close_.push_back(dispatcher);
+ }
+
+ protected:
+ scoped_refptr<MessagePipeDispatcher> dispatcher0_;
+ scoped_refptr<MessagePipeDispatcher> dispatcher1_;
+
+ DispatcherVector dispatchers_to_close_;
+
+ DISALLOW_COPY_AND_ASSIGN(WaitSetDispatcherTest);
+};
+
+TEST_F(WaitSetDispatcherTest, Basic) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 1));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher1_,
+ MOJO_HANDLE_SIGNAL_WRITABLE, 2));
+
+ Waiter w;
+ uintptr_t context = 0;
+ w.Init();
+ HandleSignalsState hss;
+ // |dispatcher1_| should already be writable.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+
+ scoped_refptr<Dispatcher> woken_dispatcher;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context));
+ EXPECT_EQ(dispatcher1_, woken_dispatcher);
+ EXPECT_EQ(2u, context);
+ // If a ready dispatcher isn't removed, it will continue to be returned.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ woken_dispatcher = nullptr;
+ context = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context));
+ EXPECT_EQ(dispatcher1_, woken_dispatcher);
+ EXPECT_EQ(2u, context);
+ ASSERT_EQ(MOJO_RESULT_OK, wait_set->RemoveWaitingDispatcher(dispatcher1_));
+
+ // No ready dispatcher.
+ hss = HandleSignalsState();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_FALSE(hss.satisfies(MOJO_HANDLE_SIGNAL_READABLE));
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+
+ // Write to |dispatcher1_|, which should make |dispatcher0_| readable.
+ char buffer[] = "abcd";
+ w.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ dispatcher1_->WriteMessage(UserPointer<const void>(buffer),
+ sizeof(buffer), nullptr,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr));
+ woken_dispatcher = nullptr;
+ context = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context));
+ EXPECT_EQ(dispatcher0_, woken_dispatcher);
+ EXPECT_EQ(1u, context);
+
+ // Again, if a ready dispatcher isn't removed, it will continue to be
+ // returned.
+ woken_dispatcher = nullptr;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ EXPECT_EQ(dispatcher0_, woken_dispatcher);
+
+ wait_set->RemoveAwakable(&w, nullptr);
+}
+
+TEST_F(WaitSetDispatcherTest, HandleWithoutRemoving) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 1));
+
+ Waiter w;
+ uintptr_t context = 0;
+ w.Init();
+ HandleSignalsState hss;
+ // No ready dispatcher.
+ hss = HandleSignalsState();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_FALSE(hss.satisfies(MOJO_HANDLE_SIGNAL_READABLE));
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
+ scoped_refptr<Dispatcher> woken_dispatcher;
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+
+ // The tested behaviour below should be repeatable.
+ for (size_t i = 0; i < 3; i++) {
+ // Write to |dispatcher1_|, which should make |dispatcher0_| readable.
+ char buffer[] = "abcd";
+ w.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ dispatcher1_->WriteMessage(UserPointer<const void>(buffer),
+ sizeof(buffer), nullptr,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr));
+ woken_dispatcher = nullptr;
+ context = 0;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context));
+ EXPECT_EQ(dispatcher0_, woken_dispatcher);
+ EXPECT_EQ(1u, context);
+
+ // Read from |dispatcher0_| which should change it's state to non-readable.
+ char read_buffer[sizeof(buffer) + 5];
+ uint32_t num_bytes = sizeof(read_buffer);
+ ASSERT_EQ(MOJO_RESULT_OK,
+ dispatcher0_->ReadMessage(UserPointer<void>(read_buffer),
+ MakeUserPointer(&num_bytes),
+ nullptr, nullptr,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(buffer), num_bytes);
+
+ // No dispatchers are ready.
+ w.Init();
+ woken_dispatcher = nullptr;
+ context = 0;
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context));
+ EXPECT_EQ(nullptr, woken_dispatcher);
+ EXPECT_EQ(0u, context);
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
+ }
+
+ wait_set->RemoveAwakable(&w, nullptr);
+}
+
+TEST_F(WaitSetDispatcherTest, MultipleReady) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+
+ scoped_refptr<MessagePipeDispatcher> mp1_dispatcher0 =
+ MessagePipeDispatcher::Create(
+ MessagePipeDispatcher::kDefaultCreateOptions);
+ scoped_refptr<MessagePipeDispatcher> mp1_dispatcher1 =
+ MessagePipeDispatcher::Create(
+ MessagePipeDispatcher::kDefaultCreateOptions);
+ CloseOnShutdown(mp1_dispatcher0);
+ CloseOnShutdown(mp1_dispatcher1);
+ {
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
+ mp1_dispatcher0->Init(mp, 0);
+ mp1_dispatcher1->Init(mp, 1);
+ }
+
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher1_,
+ MOJO_HANDLE_SIGNAL_WRITABLE, 0));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(mp1_dispatcher0,
+ MOJO_HANDLE_SIGNAL_WRITABLE, 0));
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(mp1_dispatcher1,
+ MOJO_HANDLE_SIGNAL_WRITABLE, 0));
+
+ Waiter w;
+ w.Init();
+ HandleSignalsState hss;
+ // The three writable dispatchers should be ready.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+
+ scoped_refptr<Dispatcher> woken_dispatcher;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ // Don't know which dispatcher was returned, just that it was one of the
+ // writable ones.
+ EXPECT_TRUE(woken_dispatcher == dispatcher1_ ||
+ woken_dispatcher == mp1_dispatcher0 ||
+ woken_dispatcher == mp1_dispatcher1);
+
+ DispatcherVector dispatchers_vector;
+ uint32_t count = 4;
+ MojoResult results[4];
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->GetReadyDispatchers(MakeUserPointer(&count),
+ &dispatchers_vector,
+ MakeUserPointer(results),
+ MakeUserPointer<uintptr_t>(nullptr)));
+ EXPECT_EQ(3u, count);
+ std::sort(dispatchers_vector.begin(), dispatchers_vector.end());
+ DispatcherVector expected_dispatchers;
+ expected_dispatchers.push_back(dispatcher1_);
+ expected_dispatchers.push_back(mp1_dispatcher0);
+ expected_dispatchers.push_back(mp1_dispatcher1);
+ std::sort(expected_dispatchers.begin(), expected_dispatchers.end());
+ EXPECT_EQ(expected_dispatchers, dispatchers_vector);
+
+ // If a ready dispatcher isn't removed, it will continue to be returned.
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+ count = 4;
+ dispatchers_vector.clear();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->GetReadyDispatchers(MakeUserPointer(&count),
+ &dispatchers_vector,
+ MakeUserPointer(results),
+ MakeUserPointer<uintptr_t>(nullptr)));
+ EXPECT_EQ(3u, count);
+ std::sort(dispatchers_vector.begin(), dispatchers_vector.end());
+ EXPECT_EQ(expected_dispatchers, dispatchers_vector);
+
+ // Remove one. It shouldn't be returned any longer.
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->RemoveWaitingDispatcher(expected_dispatchers.back()));
+ expected_dispatchers.pop_back();
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+ count = 4;
+ dispatchers_vector.clear();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->GetReadyDispatchers(MakeUserPointer(&count),
+ &dispatchers_vector,
+ MakeUserPointer(results),
+ MakeUserPointer<uintptr_t>(nullptr)));
+ EXPECT_EQ(2u, count);
+ std::sort(dispatchers_vector.begin(), dispatchers_vector.end());
+ EXPECT_EQ(expected_dispatchers, dispatchers_vector);
+
+ // Write to |dispatcher1_|, which should make |dispatcher0_| readable.
+ char buffer[] = "abcd";
+ w.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ dispatcher1_->WriteMessage(UserPointer<const void>(buffer),
+ sizeof(buffer), nullptr,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+ expected_dispatchers.push_back(dispatcher0_);
+ std::sort(expected_dispatchers.begin(), expected_dispatchers.end());
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+ count = 4;
+ dispatchers_vector.clear();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->GetReadyDispatchers(MakeUserPointer(&count),
+ &dispatchers_vector,
+ MakeUserPointer(results),
+ MakeUserPointer<uintptr_t>(nullptr)));
+ EXPECT_EQ(3u, count);
+ std::sort(dispatchers_vector.begin(), dispatchers_vector.end());
+ EXPECT_EQ(expected_dispatchers, dispatchers_vector);
+}
+
+TEST_F(WaitSetDispatcherTest, InvalidParams) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+
+ // Can't add a wait set to itself.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ wait_set->AddWaitingDispatcher(wait_set,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+
+ // Can't add twice.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+
+ // Remove a dispatcher that wasn't added.
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ wait_set->RemoveWaitingDispatcher(dispatcher1_));
+
+ // Add to a closed wait set.
+ wait_set->Close();
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+}
+
+TEST_F(WaitSetDispatcherTest, NotSatisfiable) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+
+ // Wait sets can only satisfy MOJO_HANDLE_SIGNAL_READABLE.
+ Waiter w;
+ w.Init();
+ HandleSignalsState hss;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_NONE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+
+ hss = HandleSignalsState();
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 0, &hss));
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_NONE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
+}
+
+TEST_F(WaitSetDispatcherTest, ClosedDispatchers) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+
+ Waiter w;
+ w.Init();
+ HandleSignalsState hss;
+ // A dispatcher that was added and then closed will be cancelled.
+ ASSERT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher0_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
+ dispatcher0_->Close();
+ EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr));
+ EXPECT_TRUE(wait_set->GetHandleSignalsState().satisfies(
+ MOJO_HANDLE_SIGNAL_READABLE));
+ scoped_refptr<Dispatcher> woken_dispatcher;
+ EXPECT_EQ(MOJO_RESULT_CANCELLED,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ EXPECT_EQ(dispatcher0_, woken_dispatcher);
+
+ // Dispatcher will be implicitly removed because it may be impossible to
+ // remove explicitly.
+ woken_dispatcher = nullptr;
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
+ wait_set->RemoveWaitingDispatcher(dispatcher0_));
+
+ // A dispatcher that's not satisfiable should give an error.
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(dispatcher1_,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr));
+ EXPECT_TRUE(wait_set->GetHandleSignalsState().satisfies(
+ MOJO_HANDLE_SIGNAL_READABLE));
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ EXPECT_EQ(dispatcher1_, woken_dispatcher);
+
+ wait_set->RemoveAwakable(&w, nullptr);
+}
+
+TEST_F(WaitSetDispatcherTest, NestedSets) {
+ scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(wait_set);
+ scoped_refptr<WaitSetDispatcher> nested_wait_set = new WaitSetDispatcher();
+ CloseOnShutdown(nested_wait_set);
+
+ Waiter w;
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddWaitingDispatcher(nested_wait_set,
+ MOJO_HANDLE_SIGNAL_READABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, nullptr));
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
+
+ // Writable signal is immediately satisfied by the message pipe.
+ w.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ nested_wait_set->AddWaitingDispatcher(
+ dispatcher0_, MOJO_HANDLE_SIGNAL_WRITABLE, 0));
+ EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr));
+ scoped_refptr<Dispatcher> woken_dispatcher;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr));
+ EXPECT_EQ(nested_wait_set, woken_dispatcher);
+
+ wait_set->RemoveAwakable(&w, nullptr);
+}
+
+} // namespace
+} // namespace system
+} // namespace mojo