diff options
author | amistry <amistry@chromium.org> | 2015-12-09 23:29:03 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-12-10 07:29:54 +0000 |
commit | 428364d72b6ccf283fff9ce0c9ea58dac3805615 (patch) | |
tree | 436c6f07d190b82bc901cd5543209aa6e24306e3 | |
parent | a9c213e88b8f2df057b249e1e6734d10e6d5825d (diff) | |
download | chromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.zip chromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.tar.gz chromium_src-428364d72b6ccf283fff9ce0c9ea58dac3805615.tar.bz2 |
WaitSet implementation for old EDK.
BUG=556865
Review URL: https://codereview.chromium.org/1461213002
Cr-Commit-Position: refs/heads/master@{#364309}
-rw-r--r-- | mojo/public/c/system/wait_set.h | 11 | ||||
-rw-r--r-- | mojo/public/platform/native/system_thunks.cc | 27 | ||||
-rw-r--r-- | mojo/public/platform/native/system_thunks.h | 18 | ||||
-rw-r--r-- | third_party/mojo/mojo_edk_system_impl.gypi | 2 | ||||
-rw-r--r-- | third_party/mojo/mojo_edk_tests.gyp | 1 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/BUILD.gn | 3 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/core.cc | 39 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/dispatcher.cc | 61 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/dispatcher.h | 32 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc | 277 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h | 94 | ||||
-rw-r--r-- | third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc | 459 |
12 files changed, 1014 insertions, 10 deletions
diff --git a/mojo/public/c/system/wait_set.h b/mojo/public/c/system/wait_set.h index c237403..3f3365a 100644 --- a/mojo/public/c/system/wait_set.h +++ b/mojo/public/c/system/wait_set.h @@ -38,6 +38,10 @@ MOJO_SYSTEM_EXPORT MojoResult MojoCreateWaitSet( // any number of different wait sets. To modify the signals being waited for, // the handle must first be removed, and then added with the new signals. // +// If a handle is closed while still in the wait set, it is implicitly removed +// from the set after being returned from |MojoGetReadyHandles()| with the +// result |MOJO_RESULT_CANCELLED|. +// // It is safe to add a handle to a wait set while performing a wait on another // thread. If the added handle already has its signals satisfied, the waiting // thread will be woken. @@ -80,7 +84,7 @@ MOJO_SYSTEM_EXPORT MojoResult MojoRemoveHandle( // |MOJO_HANDLE_SIGNAL_READABLE| signal. Since handles may be added and removed // from a wait set concurrently, it is possible for a wait set to satisfy // |MOJO_HANDLE_SIGNAL_READABLE|, but not have any ready handles when -// |MojoGetReadyHandle()| is called. These spurious wake-ups must be gracefully +// |MojoGetReadyHandles()| is called. These spurious wake-ups must be gracefully // handled. // // |*count| on input, must contain the maximum number of ready handles to be @@ -99,11 +103,12 @@ MOJO_SYSTEM_EXPORT MojoResult MojoRemoveHandle( // |signals_state| (optional) if non-null, must point to an array of size // |*count| of |MojoHandleSignalsState|. It will be populated with the signals // state of the corresponding handle in |*handles|. See documentation for -// |MojoHandleSignalsState|. +// |MojoHandleSignalsState| for more details about the meaning of each array +// entry. The array will always be updated for every returned handle. // // Mojo signals and satisfiability are logically 'level-triggered'. Therefore, // if a signal continues to be satisfied and is not removed from the wait set, -// subsequent calls to |MojoGetReadyHandle()| will return the same handle. +// subsequent calls to |MojoGetReadyHandles()| will return the same handle. // // If multiple handles have their signals satisfied, the order in which handles // are returned is undefined. The same handle, if not removed, may be returned diff --git a/mojo/public/platform/native/system_thunks.cc b/mojo/public/platform/native/system_thunks.cc index ed3227f..74aaa08 100644 --- a/mojo/public/platform/native/system_thunks.cc +++ b/mojo/public/platform/native/system_thunks.cc @@ -158,6 +158,33 @@ MojoResult MojoUnmapBuffer(void* buffer) { return g_thunks.UnmapBuffer(buffer); } +MojoResult MojoCreateWaitSet(MojoHandle* wait_set) { + assert(g_thunks.CreateWaitSet); + return g_thunks.CreateWaitSet(wait_set); +} + +MojoResult MojoAddHandle(MojoHandle wait_set, + MojoHandle handle, + MojoHandleSignals signals) { + assert(g_thunks.AddHandle); + return g_thunks.AddHandle(wait_set, handle, signals); +} + +MojoResult MojoRemoveHandle(MojoHandle wait_set, MojoHandle handle) { + assert(g_thunks.RemoveHandle); + return g_thunks.RemoveHandle(wait_set, handle); +} + +MojoResult MojoGetReadyHandles(MojoHandle wait_set, + uint32_t* count, + MojoHandle* handles, + MojoResult* results, + struct MojoHandleSignalsState* signals_states) { + assert(g_thunks.GetReadyHandles); + return g_thunks.GetReadyHandles(wait_set, count, handles, results, + signals_states); +} + extern "C" THUNK_EXPORT size_t MojoSetSystemThunks( const MojoSystemThunks* system_thunks) { if (system_thunks->size >= sizeof(g_thunks)) diff --git a/mojo/public/platform/native/system_thunks.h b/mojo/public/platform/native/system_thunks.h index bb6ca96..8c8ea1e 100644 --- a/mojo/public/platform/native/system_thunks.h +++ b/mojo/public/platform/native/system_thunks.h @@ -101,6 +101,18 @@ struct MojoSystemThunks { void** buffer, MojoMapBufferFlags flags); MojoResult (*UnmapBuffer)(void* buffer); + + MojoResult (*CreateWaitSet)(MojoHandle* wait_set); + MojoResult (*AddHandle)(MojoHandle wait_set, + MojoHandle handle, + MojoHandleSignals signals); + MojoResult (*RemoveHandle)(MojoHandle wait_set, + MojoHandle handle); + MojoResult (*GetReadyHandles)(MojoHandle wait_set, + uint32_t* count, + MojoHandle* handles, + MojoResult* results, + struct MojoHandleSignalsState* signals_states); }; #pragma pack(pop) @@ -127,7 +139,11 @@ inline MojoSystemThunks MojoMakeSystemThunks() { MojoCreateSharedBuffer, MojoDuplicateBufferHandle, MojoMapBuffer, - MojoUnmapBuffer}; + MojoUnmapBuffer, + MojoCreateWaitSet, + MojoAddHandle, + MojoRemoveHandle, + MojoGetReadyHandles}; return system_thunks; } #endif diff --git a/third_party/mojo/mojo_edk_system_impl.gypi b/third_party/mojo/mojo_edk_system_impl.gypi index fc455ed..bdb629c 100644 --- a/third_party/mojo/mojo_edk_system_impl.gypi +++ b/third_party/mojo/mojo_edk_system_impl.gypi @@ -130,6 +130,8 @@ 'src/mojo/edk/system/transport_data.h', 'src/mojo/edk/system/unique_identifier.cc', 'src/mojo/edk/system/unique_identifier.h', + 'src/mojo/edk/system/wait_set_dispatcher.cc', + 'src/mojo/edk/system/wait_set_dispatcher.h', 'src/mojo/edk/system/waiter.cc', 'src/mojo/edk/system/waiter.h', # Test-only code: diff --git a/third_party/mojo/mojo_edk_tests.gyp b/third_party/mojo/mojo_edk_tests.gyp index c02e21d..c0916bf 100644 --- a/third_party/mojo/mojo_edk_tests.gyp +++ b/third_party/mojo/mojo_edk_tests.gyp @@ -218,6 +218,7 @@ 'src/mojo/edk/system/test_utils.h', 'src/mojo/edk/system/thread_annotations_unittest.cc', 'src/mojo/edk/system/unique_identifier_unittest.cc', + 'src/mojo/edk/system/wait_set_dispatcher_unittest.cc', 'src/mojo/edk/system/waiter_test_utils.cc', 'src/mojo/edk/system/waiter_test_utils.h', 'src/mojo/edk/system/waiter_unittest.cc', diff --git a/third_party/mojo/src/mojo/edk/system/BUILD.gn b/third_party/mojo/src/mojo/edk/system/BUILD.gn index 71dcac2..656ac72 100644 --- a/third_party/mojo/src/mojo/edk/system/BUILD.gn +++ b/third_party/mojo/src/mojo/edk/system/BUILD.gn @@ -110,6 +110,8 @@ component("system") { "transport_data.h", "unique_identifier.cc", "unique_identifier.h", + "wait_set_dispatcher.cc", + "wait_set_dispatcher.h", "waiter.cc", "waiter.h", ] @@ -210,6 +212,7 @@ test("mojo_system_unittests") { "test_channel_endpoint_client.h", "thread_annotations_unittest.cc", "unique_identifier_unittest.cc", + "wait_set_dispatcher_unittest.cc", "waiter_test_utils.cc", "waiter_test_utils.h", "waiter_unittest.cc", diff --git a/third_party/mojo/src/mojo/edk/system/core.cc b/third_party/mojo/src/mojo/edk/system/core.cc index d82c845..a11798f 100644 --- a/third_party/mojo/src/mojo/edk/system/core.cc +++ b/third_party/mojo/src/mojo/edk/system/core.cc @@ -4,8 +4,10 @@ #include "third_party/mojo/src/mojo/edk/system/core.h" +#include <utility> #include <vector> +#include "base/containers/stack_container.h" #include "base/logging.h" #include "base/time/time.h" #include "mojo/public/c/system/macros.h" @@ -23,6 +25,7 @@ #include "third_party/mojo/src/mojo/edk/system/message_pipe.h" #include "third_party/mojo/src/mojo/edk/system/message_pipe_dispatcher.h" #include "third_party/mojo/src/mojo/edk/system/shared_buffer_dispatcher.h" +#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h" #include "third_party/mojo/src/mojo/edk/system/waiter.h" namespace mojo { @@ -189,7 +192,16 @@ MojoResult Core::CreateWaitSet(UserPointer<MojoHandle> wait_set_handle) { if (wait_set_handle.IsNull()) return MOJO_RESULT_INVALID_ARGUMENT; - return MOJO_RESULT_UNIMPLEMENTED; + scoped_refptr<WaitSetDispatcher> dispatcher = new WaitSetDispatcher(); + MojoHandle h = AddDispatcher(dispatcher); + if (h == MOJO_HANDLE_INVALID) { + LOG(ERROR) << "Handle table full"; + dispatcher->Close(); + return MOJO_RESULT_RESOURCE_EXHAUSTED; + } + + wait_set_handle.Put(h); + return MOJO_RESULT_OK; } MojoResult Core::AddHandle(MojoHandle wait_set_handle, @@ -203,7 +215,7 @@ MojoResult Core::AddHandle(MojoHandle wait_set_handle, if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; - return MOJO_RESULT_UNIMPLEMENTED; + return wait_set_dispatcher->AddWaitingDispatcher(dispatcher, signals, handle); } MojoResult Core::RemoveHandle(MojoHandle wait_set_handle, @@ -216,7 +228,7 @@ MojoResult Core::RemoveHandle(MojoHandle wait_set_handle, if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; - return MOJO_RESULT_UNIMPLEMENTED; + return wait_set_dispatcher->RemoveWaitingDispatcher(dispatcher); } MojoResult Core::GetReadyHandles( @@ -225,14 +237,29 @@ MojoResult Core::GetReadyHandles( UserPointer<MojoHandle> handles, UserPointer<MojoResult> results, UserPointer<MojoHandleSignalsState> signals_states) { + if (count.IsNull() || !count.Get() || handles.IsNull() || results.IsNull()) + return MOJO_RESULT_INVALID_ARGUMENT; + scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle)); if (!wait_set_dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; - if (count.IsNull() || !count.Get() || handles.IsNull() || results.IsNull()) - return MOJO_RESULT_INVALID_ARGUMENT; + const uint32_t in_count = count.Get(); + DispatcherVector awoken_dispatchers; + base::StackVector<uintptr_t, 16> contexts; + contexts->assign(in_count, MOJO_HANDLE_INVALID); + + MojoResult result = wait_set_dispatcher->GetReadyDispatchers( + count, &awoken_dispatchers, results, MakeUserPointer(contexts->data())); + + const uint32_t out_count = count.Get(); + for (uint32_t i = 0; i < out_count; i++) { + handles.At(i).Put(static_cast<MojoHandle>(contexts[i])); + if (!signals_states.IsNull()) + signals_states.At(i).Put(awoken_dispatchers[i]->GetHandleSignalsState()); + } - return MOJO_RESULT_UNIMPLEMENTED; + return result; } MojoResult Core::CreateMessagePipe( diff --git a/third_party/mojo/src/mojo/edk/system/dispatcher.cc b/third_party/mojo/src/mojo/edk/system/dispatcher.cc index 2c11407..ba4fe18 100644 --- a/third_party/mojo/src/mojo/edk/system/dispatcher.cc +++ b/third_party/mojo/src/mojo/edk/system/dispatcher.cc @@ -74,6 +74,7 @@ scoped_refptr<Dispatcher> Dispatcher::TransportDataAccess::Deserialize( size_t size, embedder::PlatformHandleVector* platform_handles) { switch (static_cast<Dispatcher::Type>(type)) { + case Type::WAIT_SET: case Type::UNKNOWN: DVLOG(2) << "Deserializing invalid handle"; return nullptr; @@ -250,6 +251,37 @@ void Dispatcher::RemoveAwakable(Awakable* awakable, RemoveAwakableImplNoLock(awakable, handle_signals_state); } +MojoResult Dispatcher::AddWaitingDispatcher( + const scoped_refptr<Dispatcher>& dispatcher, + MojoHandleSignals signals, + uintptr_t context) { + MutexLocker locker(&mutex_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return AddWaitingDispatcherImplNoLock(dispatcher, signals, context); +} + +MojoResult Dispatcher::RemoveWaitingDispatcher( + const scoped_refptr<Dispatcher>& dispatcher) { + MutexLocker locker(&mutex_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return RemoveWaitingDispatcherImplNoLock(dispatcher); +} + +MojoResult Dispatcher::GetReadyDispatchers(UserPointer<uint32_t> count, + DispatcherVector* dispatchers, + UserPointer<MojoResult> results, + UserPointer<uintptr_t> contexts) { + MutexLocker locker(&mutex_); + if (is_closed_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return GetReadyDispatchersImplNoLock(count, dispatchers, results, contexts); +} + Dispatcher::Dispatcher() : is_closed_(false) { } @@ -389,6 +421,35 @@ MojoResult Dispatcher::AddAwakableImplNoLock( return MOJO_RESULT_FAILED_PRECONDITION; } +MojoResult Dispatcher::AddWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& /*dispatcher*/, + MojoHandleSignals /*signals*/, + uintptr_t /*context*/) { + mutex_.AssertHeld(); + DCHECK(!is_closed_); + // By default, not supported. Only needed for wait set dispatchers. + return MOJO_RESULT_INVALID_ARGUMENT; +} + +MojoResult Dispatcher::RemoveWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& /*dispatcher*/) { + mutex_.AssertHeld(); + DCHECK(!is_closed_); + // By default, not supported. Only needed for wait set dispatchers. + return MOJO_RESULT_INVALID_ARGUMENT; +} + +MojoResult Dispatcher::GetReadyDispatchersImplNoLock( + UserPointer<uint32_t> /*count*/, + DispatcherVector* /*dispatchers*/, + UserPointer<MojoResult> /*results*/, + UserPointer<uintptr_t> /*contexts*/) { + mutex_.AssertHeld(); + DCHECK(!is_closed_); + // By default, not supported. Only needed for wait set dispatchers. + return MOJO_RESULT_INVALID_ARGUMENT; +} + void Dispatcher::RemoveAwakableImplNoLock(Awakable* /*awakable*/, HandleSignalsState* signals_state) { mutex_.AssertHeld(); diff --git a/third_party/mojo/src/mojo/edk/system/dispatcher.h b/third_party/mojo/src/mojo/edk/system/dispatcher.h index c0bb69b..e9af1c0 100644 --- a/third_party/mojo/src/mojo/edk/system/dispatcher.h +++ b/third_party/mojo/src/mojo/edk/system/dispatcher.h @@ -66,6 +66,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher DATA_PIPE_PRODUCER, DATA_PIPE_CONSUMER, SHARED_BUFFER, + WAIT_SET, // "Private" types (not exposed via the public interface): PLATFORM_HANDLE = -1 @@ -153,6 +154,25 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher // |*signals_state| will be set to the current handle signals state. void RemoveAwakable(Awakable* awakable, HandleSignalsState* signals_state); + // Adds a dispatcher to wait on. When the dispatcher satisfies |signals|, it + // will be returned in the next call to |GetReadyDispatchers()|. If + // |dispatcher| has been added, it must be removed before adding again, + // otherwise |MOJO_RESULT_ALREADY_EXISTS| will be returned. + MojoResult AddWaitingDispatcher(const scoped_refptr<Dispatcher>& dispatcher, + MojoHandleSignals signals, + uintptr_t context); + // Removes a dispatcher to wait on. If |dispatcher| has not been added, + // |MOJO_RESULT_NOT_FOUND| will be returned. + MojoResult RemoveWaitingDispatcher( + const scoped_refptr<Dispatcher>& dispatcher); + // Returns a set of ready dispatchers. |*count| is the maximum number of + // dispatchers to return, and will contain the number of dispatchers returned + // in |dispatchers| on completion. + MojoResult GetReadyDispatchers(UserPointer<uint32_t> count, + DispatcherVector* dispatchers, + UserPointer<MojoResult> results, + UserPointer<uintptr_t> contexts); + // A dispatcher must be put into a special state in order to be sent across a // message pipe. Outside of tests, only |HandleTableAccess| is allowed to do // this, since there are requirements on the handle table (see below). @@ -284,6 +304,18 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher virtual void RemoveAwakableImplNoLock(Awakable* awakable, HandleSignalsState* signals_state) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + virtual MojoResult AddWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher, + MojoHandleSignals signals, + uintptr_t context) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + virtual MojoResult RemoveWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher) + MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + virtual MojoResult GetReadyDispatchersImplNoLock( + UserPointer<uint32_t> count, + DispatcherVector* dispatchers, + UserPointer<MojoResult> results, + UserPointer<uintptr_t> contexts) MOJO_EXCLUSIVE_LOCKS_REQUIRED(mutex_); // These implement the API used to serialize dispatchers to a |Channel| // (described below). They will only be called on a dispatcher that's attached diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc new file mode 100644 index 0000000..45518e7 --- /dev/null +++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc @@ -0,0 +1,277 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h" + +#include <algorithm> +#include <utility> + +#include "base/logging.h" +#include "third_party/mojo/src/mojo/edk/system/awakable.h" + +namespace mojo { +namespace system { + +class WaitSetDispatcher::Waiter final : public Awakable { + public: + explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {} + ~Waiter() {} + + // |Awakable| implementation. + bool Awake(MojoResult result, uintptr_t context) override { + // Note: This is called with various Mojo locks held. + dispatcher_->WakeDispatcher(result, context); + // Removes |this| from the dispatcher's list of waiters. + return false; + } + + private: + WaitSetDispatcher* const dispatcher_; +}; + +WaitSetDispatcher::WaitSetDispatcher() + : waiter_(new WaitSetDispatcher::Waiter(this)) {} + +WaitSetDispatcher::~WaitSetDispatcher() { + DCHECK(waiting_dispatchers_.empty()); + DCHECK(awoken_queue_.empty()); + DCHECK(processed_dispatchers_.empty()); +} + +Dispatcher::Type WaitSetDispatcher::GetType() const { + return Type::WAIT_SET; +} + +void WaitSetDispatcher::CloseImplNoLock() { + mutex().AssertHeld(); + for (const auto& entry : waiting_dispatchers_) + entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr); + waiting_dispatchers_.clear(); + + MutexLocker locker(&awoken_mutex_); + awoken_queue_.clear(); + processed_dispatchers_.clear(); +} + +MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher, + MojoHandleSignals signals, + uintptr_t context) { + mutex().AssertHeld(); + if (dispatcher == this) + return MOJO_RESULT_INVALID_ARGUMENT; + + uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); + auto it = waiting_dispatchers_.find(dispatcher_handle); + if (it != waiting_dispatchers_.end()) { + return MOJO_RESULT_ALREADY_EXISTS; + } + + const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals, + dispatcher_handle, nullptr); + if (result == MOJO_RESULT_INVALID_ARGUMENT) { + // Dispatcher is closed. + return result; + } else if (result != MOJO_RESULT_OK) { + WakeDispatcher(result, dispatcher_handle); + } + + WaitState state; + state.dispatcher = dispatcher; + state.context = context; + state.signals = signals; + bool inserted = + waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state)) + .second; + DCHECK(inserted); + + return MOJO_RESULT_OK; +} + +MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher) { + mutex().AssertHeld(); + uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); + auto it = waiting_dispatchers_.find(dispatcher_handle); + if (it == waiting_dispatchers_.end()) + return MOJO_RESULT_NOT_FOUND; + + dispatcher->RemoveAwakable(waiter_.get(), nullptr); + // At this point, it should not be possible for |waiter_| to be woken with + // |dispatcher|. + waiting_dispatchers_.erase(it); + + MutexLocker locker(&awoken_mutex_); + int num_erased = 0; + for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) { + if (it->first == dispatcher_handle) { + it = awoken_queue_.erase(it); + num_erased++; + } else { + ++it; + } + } + // The dispatcher should only exist in the queue once. + DCHECK_LE(num_erased, 1); + processed_dispatchers_.erase( + std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(), + dispatcher_handle), + processed_dispatchers_.end()); + + return MOJO_RESULT_OK; +} + +MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock( + UserPointer<uint32_t> count, + DispatcherVector* dispatchers, + UserPointer<MojoResult> results, + UserPointer<uintptr_t> contexts) { + mutex().AssertHeld(); + dispatchers->clear(); + + // Re-queue any already retrieved dispatchers. These should be the dispatchers + // that were returned on the last call to this function. This loop is + // necessary to preserve the logically level-triggering behaviour of waiting + // in Mojo. In particular, if no action is taken on a signal, that signal + // continues to be satisfied, and therefore a |MojoWait()| on that + // handle/signal continues to return immediately. + std::deque<uintptr_t> pending; + { + MutexLocker locker(&awoken_mutex_); + pending.swap(processed_dispatchers_); + } + for (uintptr_t d : pending) { + auto it = waiting_dispatchers_.find(d); + // Anything in |processed_dispatchers_| should also be in + // |waiting_dispatchers_| since dispatchers are removed from both in + // |RemoveWaitingDispatcherImplNoLock()|. + DCHECK(it != waiting_dispatchers_.end()); + + // |awoken_mutex_| cannot be held here because + // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This + // mutex is held while running |WakeDispatcher()| below, which needs to + // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in + // a deadlock. + const MojoResult result = it->second.dispatcher->AddAwakable( + waiter_.get(), it->second.signals, d, nullptr); + + if (result == MOJO_RESULT_INVALID_ARGUMENT) { + // Dispatcher is closed. Implicitly remove it from the wait set since + // it may be impossible to remove using |MojoRemoveHandle()|. + waiting_dispatchers_.erase(it); + } else if (result != MOJO_RESULT_OK) { + WakeDispatcher(result, d); + } + } + + const uint32_t max_woken = count.Get(); + uint32_t num_woken = 0; + + MutexLocker locker(&awoken_mutex_); + while (!awoken_queue_.empty() && num_woken < max_woken) { + uintptr_t d = awoken_queue_.front().first; + MojoResult result = awoken_queue_.front().second; + awoken_queue_.pop_front(); + + auto it = waiting_dispatchers_.find(d); + DCHECK(it != waiting_dispatchers_.end()); + + results.At(num_woken).Put(result); + dispatchers->push_back(it->second.dispatcher); + if (!contexts.IsNull()) + contexts.At(num_woken).Put(it->second.context); + + if (result != MOJO_RESULT_CANCELLED) { + processed_dispatchers_.push_back(d); + } else { + waiting_dispatchers_.erase(it); + } + + num_woken++; + } + + count.Put(num_woken); + if (!num_woken) + return MOJO_RESULT_SHOULD_WAIT; + + return MOJO_RESULT_OK; +} + +void WaitSetDispatcher::CancelAllAwakablesNoLock() { + mutex().AssertHeld(); + MutexLocker locker(&awakable_mutex_); + awakable_list_.CancelAll(); +} + +MojoResult WaitSetDispatcher::AddAwakableImplNoLock( + Awakable* awakable, + MojoHandleSignals signals, + uintptr_t context, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + + HandleSignalsState state(GetHandleSignalsStateImplNoLock()); + if (state.satisfies(signals)) { + if (signals_state) + *signals_state = state; + return MOJO_RESULT_ALREADY_EXISTS; + } + if (!state.can_satisfy(signals)) { + if (signals_state) + *signals_state = state; + return MOJO_RESULT_FAILED_PRECONDITION; + } + + MutexLocker locker(&awakable_mutex_); + awakable_list_.Add(awakable, signals, context); + return MOJO_RESULT_OK; +} + +void WaitSetDispatcher::RemoveAwakableImplNoLock( + Awakable* awakable, + HandleSignalsState* signals_state) { + mutex().AssertHeld(); + MutexLocker locker(&awakable_mutex_); + awakable_list_.Remove(awakable); + if (signals_state) + *signals_state = GetHandleSignalsStateImplNoLock(); +} + +HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const { + mutex().AssertHeld(); + HandleSignalsState rv; + rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; + MutexLocker locker(&awoken_mutex_); + if (!awoken_queue_.empty() || !processed_dispatchers_.empty()) + rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; + return rv; +} + +scoped_refptr<Dispatcher> +WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { + mutex().AssertHeld(); + LOG(ERROR) << "Attempting to serialize WaitSet"; + CloseImplNoLock(); + return new WaitSetDispatcher(); +} + +void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) { + { + MutexLocker locker(&awoken_mutex_); + + if (result == MOJO_RESULT_ALREADY_EXISTS) + result = MOJO_RESULT_OK; + + awoken_queue_.push_back(std::make_pair(context, result)); + } + + MutexLocker locker(&awakable_mutex_); + HandleSignalsState signals_state; + signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; + signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; + awakable_list_.AwakeForStateChange(signals_state); +} + +} // namespace system +} // namespace mojo diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h new file mode 100644 index 0000000..a53f61a --- /dev/null +++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h @@ -0,0 +1,94 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_ +#define THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_ + +#include <deque> +#include <map> +#include <utility> + +#include "base/memory/scoped_ptr.h" +#include "mojo/public/cpp/system/macros.h" +#include "third_party/mojo/src/mojo/edk/system/awakable_list.h" +#include "third_party/mojo/src/mojo/edk/system/dispatcher.h" +#include "third_party/mojo/src/mojo/edk/system/mutex.h" +#include "third_party/mojo/src/mojo/edk/system/system_impl_export.h" + +namespace mojo { +namespace system { + +class MOJO_SYSTEM_IMPL_EXPORT WaitSetDispatcher : public Dispatcher { + public: + WaitSetDispatcher(); + ~WaitSetDispatcher() override; + + // |Dispatcher| public methods: + Type GetType() const override; + + private: + // Internal implementation of Awakable. + class Waiter; + + struct WaitState { + scoped_refptr<Dispatcher> dispatcher; + MojoHandleSignals signals; + uintptr_t context; + }; + + // |Dispatcher| protected methods: + void CloseImplNoLock() override; + void CancelAllAwakablesNoLock() override; + MojoResult AddAwakableImplNoLock(Awakable* awakable, + MojoHandleSignals signals, + uintptr_t context, + HandleSignalsState* signals_state) override; + void RemoveAwakableImplNoLock(Awakable* awakable, + HandleSignalsState* signals_state) override; + MojoResult AddWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher, + MojoHandleSignals signals, + uintptr_t context) override; + MojoResult RemoveWaitingDispatcherImplNoLock( + const scoped_refptr<Dispatcher>& dispatcher) override; + MojoResult GetReadyDispatchersImplNoLock( + UserPointer<uint32_t> count, + DispatcherVector* dispatchers, + UserPointer<MojoResult> results, + UserPointer<uintptr_t> contexts) override; + HandleSignalsState GetHandleSignalsStateImplNoLock() const override; + scoped_refptr<Dispatcher> CreateEquivalentDispatcherAndCloseImplNoLock() + override; + + // Signal that the dispatcher indexed by |context| has been woken up with + // |result| and is now ready. + void WakeDispatcher(MojoResult result, uintptr_t context); + + // Map of dispatchers being waited on. Key is a Dispatcher* casted to a + // uintptr_t, and should be treated as an opaque value and not casted back. + std::map<uintptr_t, WaitState> waiting_dispatchers_ MOJO_GUARDED_BY(mutex()); + + // Separate mutex that can be locked without locking |mutex()|. + mutable Mutex awoken_mutex_ MOJO_ACQUIRED_AFTER(mutex()); + // List of dispatchers that have been woken up. Any dispatcher in this queue + // will NOT currently be waited on. + std::deque<std::pair<uintptr_t, MojoResult>> awoken_queue_ + MOJO_GUARDED_BY(awoken_mutex_); + // List of dispatchers that have been woken up and retrieved on the last call + // to |GetReadyDispatchers()|. + std::deque<uintptr_t> processed_dispatchers_ MOJO_GUARDED_BY(awoken_mutex_); + + // Separate mutex that can be locked without locking |mutex()|. + Mutex awakable_mutex_ MOJO_ACQUIRED_AFTER(mutex()); + // List of dispatchers being waited on. + AwakableList awakable_list_ MOJO_GUARDED_BY(awakable_mutex_); + + // Waiter used to wait on dispatchers. + scoped_ptr<Waiter> waiter_; +}; + +} // namespace system +} // namespace mojo + +#endif // THIRD_PARTY_MOJO_SRC_MOJO_EDK_SYSTEM_WAIT_SET_DISPATCHER_H_ diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc new file mode 100644 index 0000000..86f5e36 --- /dev/null +++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher_unittest.cc @@ -0,0 +1,459 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h" + +#include <algorithm> + +#include "base/memory/ref_counted.h" +#include "mojo/public/cpp/system/macros.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "third_party/mojo/src/mojo/edk/system/message_pipe.h" +#include "third_party/mojo/src/mojo/edk/system/message_pipe_dispatcher.h" +#include "third_party/mojo/src/mojo/edk/system/test_utils.h" +#include "third_party/mojo/src/mojo/edk/system/waiter.h" + +namespace mojo { +namespace system { +namespace { + +class WaitSetDispatcherTest : public ::testing::Test { + public: + WaitSetDispatcherTest() {} + ~WaitSetDispatcherTest() override {} + + void SetUp() override { + dispatcher0_ = MessagePipeDispatcher::Create( + MessagePipeDispatcher::kDefaultCreateOptions); + dispatcher1_ = MessagePipeDispatcher::Create( + MessagePipeDispatcher::kDefaultCreateOptions); + + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); + dispatcher0_->Init(mp, 0); + dispatcher1_->Init(mp, 1); + + dispatchers_to_close_.push_back(dispatcher0_); + dispatchers_to_close_.push_back(dispatcher1_); + } + + void TearDown() override { + for (auto& d : dispatchers_to_close_) + d->Close(); + } + + MojoResult GetOneReadyDispatcher( + const scoped_refptr<WaitSetDispatcher>& wait_set, + scoped_refptr<Dispatcher>* ready_dispatcher, + uintptr_t* context) { + uint32_t count = 1; + MojoResult dispatcher_result = MOJO_RESULT_UNKNOWN; + DispatcherVector dispatchers; + MojoResult result = wait_set->GetReadyDispatchers( + MakeUserPointer(&count), + &dispatchers, + MakeUserPointer(&dispatcher_result), + MakeUserPointer(context)); + if (result == MOJO_RESULT_OK) { + CHECK_EQ(1u, dispatchers.size()); + *ready_dispatcher = dispatchers[0]; + return dispatcher_result; + } + return result; + } + + void CloseOnShutdown(const scoped_refptr<Dispatcher>& dispatcher) { + dispatchers_to_close_.push_back(dispatcher); + } + + protected: + scoped_refptr<MessagePipeDispatcher> dispatcher0_; + scoped_refptr<MessagePipeDispatcher> dispatcher1_; + + DispatcherVector dispatchers_to_close_; + + DISALLOW_COPY_AND_ASSIGN(WaitSetDispatcherTest); +}; + +TEST_F(WaitSetDispatcherTest, Basic) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 1)); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher1_, + MOJO_HANDLE_SIGNAL_WRITABLE, 2)); + + Waiter w; + uintptr_t context = 0; + w.Init(); + HandleSignalsState hss; + // |dispatcher1_| should already be writable. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + + scoped_refptr<Dispatcher> woken_dispatcher; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); + EXPECT_EQ(dispatcher1_, woken_dispatcher); + EXPECT_EQ(2u, context); + // If a ready dispatcher isn't removed, it will continue to be returned. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + woken_dispatcher = nullptr; + context = 0; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); + EXPECT_EQ(dispatcher1_, woken_dispatcher); + EXPECT_EQ(2u, context); + ASSERT_EQ(MOJO_RESULT_OK, wait_set->RemoveWaitingDispatcher(dispatcher1_)); + + // No ready dispatcher. + hss = HandleSignalsState(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_FALSE(hss.satisfies(MOJO_HANDLE_SIGNAL_READABLE)); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + + // Write to |dispatcher1_|, which should make |dispatcher0_| readable. + char buffer[] = "abcd"; + w.Init(); + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher1_->WriteMessage(UserPointer<const void>(buffer), + sizeof(buffer), nullptr, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr)); + woken_dispatcher = nullptr; + context = 0; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); + EXPECT_EQ(dispatcher0_, woken_dispatcher); + EXPECT_EQ(1u, context); + + // Again, if a ready dispatcher isn't removed, it will continue to be + // returned. + woken_dispatcher = nullptr; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + EXPECT_EQ(dispatcher0_, woken_dispatcher); + + wait_set->RemoveAwakable(&w, nullptr); +} + +TEST_F(WaitSetDispatcherTest, HandleWithoutRemoving) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 1)); + + Waiter w; + uintptr_t context = 0; + w.Init(); + HandleSignalsState hss; + // No ready dispatcher. + hss = HandleSignalsState(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_FALSE(hss.satisfies(MOJO_HANDLE_SIGNAL_READABLE)); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); + scoped_refptr<Dispatcher> woken_dispatcher; + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + + // The tested behaviour below should be repeatable. + for (size_t i = 0; i < 3; i++) { + // Write to |dispatcher1_|, which should make |dispatcher0_| readable. + char buffer[] = "abcd"; + w.Init(); + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher1_->WriteMessage(UserPointer<const void>(buffer), + sizeof(buffer), nullptr, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr)); + woken_dispatcher = nullptr; + context = 0; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); + EXPECT_EQ(dispatcher0_, woken_dispatcher); + EXPECT_EQ(1u, context); + + // Read from |dispatcher0_| which should change it's state to non-readable. + char read_buffer[sizeof(buffer) + 5]; + uint32_t num_bytes = sizeof(read_buffer); + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher0_->ReadMessage(UserPointer<void>(read_buffer), + MakeUserPointer(&num_bytes), + nullptr, nullptr, + MOJO_READ_MESSAGE_FLAG_NONE)); + EXPECT_EQ(sizeof(buffer), num_bytes); + + // No dispatchers are ready. + w.Init(); + woken_dispatcher = nullptr; + context = 0; + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); + EXPECT_EQ(nullptr, woken_dispatcher); + EXPECT_EQ(0u, context); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); + } + + wait_set->RemoveAwakable(&w, nullptr); +} + +TEST_F(WaitSetDispatcherTest, MultipleReady) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + + scoped_refptr<MessagePipeDispatcher> mp1_dispatcher0 = + MessagePipeDispatcher::Create( + MessagePipeDispatcher::kDefaultCreateOptions); + scoped_refptr<MessagePipeDispatcher> mp1_dispatcher1 = + MessagePipeDispatcher::Create( + MessagePipeDispatcher::kDefaultCreateOptions); + CloseOnShutdown(mp1_dispatcher0); + CloseOnShutdown(mp1_dispatcher1); + { + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); + mp1_dispatcher0->Init(mp, 0); + mp1_dispatcher1->Init(mp, 1); + } + + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher1_, + MOJO_HANDLE_SIGNAL_WRITABLE, 0)); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(mp1_dispatcher0, + MOJO_HANDLE_SIGNAL_WRITABLE, 0)); + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(mp1_dispatcher1, + MOJO_HANDLE_SIGNAL_WRITABLE, 0)); + + Waiter w; + w.Init(); + HandleSignalsState hss; + // The three writable dispatchers should be ready. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + + scoped_refptr<Dispatcher> woken_dispatcher; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + // Don't know which dispatcher was returned, just that it was one of the + // writable ones. + EXPECT_TRUE(woken_dispatcher == dispatcher1_ || + woken_dispatcher == mp1_dispatcher0 || + woken_dispatcher == mp1_dispatcher1); + + DispatcherVector dispatchers_vector; + uint32_t count = 4; + MojoResult results[4]; + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->GetReadyDispatchers(MakeUserPointer(&count), + &dispatchers_vector, + MakeUserPointer(results), + MakeUserPointer<uintptr_t>(nullptr))); + EXPECT_EQ(3u, count); + std::sort(dispatchers_vector.begin(), dispatchers_vector.end()); + DispatcherVector expected_dispatchers; + expected_dispatchers.push_back(dispatcher1_); + expected_dispatchers.push_back(mp1_dispatcher0); + expected_dispatchers.push_back(mp1_dispatcher1); + std::sort(expected_dispatchers.begin(), expected_dispatchers.end()); + EXPECT_EQ(expected_dispatchers, dispatchers_vector); + + // If a ready dispatcher isn't removed, it will continue to be returned. + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + count = 4; + dispatchers_vector.clear(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->GetReadyDispatchers(MakeUserPointer(&count), + &dispatchers_vector, + MakeUserPointer(results), + MakeUserPointer<uintptr_t>(nullptr))); + EXPECT_EQ(3u, count); + std::sort(dispatchers_vector.begin(), dispatchers_vector.end()); + EXPECT_EQ(expected_dispatchers, dispatchers_vector); + + // Remove one. It shouldn't be returned any longer. + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->RemoveWaitingDispatcher(expected_dispatchers.back())); + expected_dispatchers.pop_back(); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + count = 4; + dispatchers_vector.clear(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->GetReadyDispatchers(MakeUserPointer(&count), + &dispatchers_vector, + MakeUserPointer(results), + MakeUserPointer<uintptr_t>(nullptr))); + EXPECT_EQ(2u, count); + std::sort(dispatchers_vector.begin(), dispatchers_vector.end()); + EXPECT_EQ(expected_dispatchers, dispatchers_vector); + + // Write to |dispatcher1_|, which should make |dispatcher0_| readable. + char buffer[] = "abcd"; + w.Init(); + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher1_->WriteMessage(UserPointer<const void>(buffer), + sizeof(buffer), nullptr, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + expected_dispatchers.push_back(dispatcher0_); + std::sort(expected_dispatchers.begin(), expected_dispatchers.end()); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + count = 4; + dispatchers_vector.clear(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->GetReadyDispatchers(MakeUserPointer(&count), + &dispatchers_vector, + MakeUserPointer(results), + MakeUserPointer<uintptr_t>(nullptr))); + EXPECT_EQ(3u, count); + std::sort(dispatchers_vector.begin(), dispatchers_vector.end()); + EXPECT_EQ(expected_dispatchers, dispatchers_vector); +} + +TEST_F(WaitSetDispatcherTest, InvalidParams) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + + // Can't add a wait set to itself. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + wait_set->AddWaitingDispatcher(wait_set, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + + // Can't add twice. + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + + // Remove a dispatcher that wasn't added. + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + wait_set->RemoveWaitingDispatcher(dispatcher1_)); + + // Add to a closed wait set. + wait_set->Close(); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); +} + +TEST_F(WaitSetDispatcherTest, NotSatisfiable) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + + // Wait sets can only satisfy MOJO_HANDLE_SIGNAL_READABLE. + Waiter w; + w.Init(); + HandleSignalsState hss; + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_NONE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); + + hss = HandleSignalsState(); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 0, &hss)); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_NONE, hss.satisfied_signals); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); +} + +TEST_F(WaitSetDispatcherTest, ClosedDispatchers) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + + Waiter w; + w.Init(); + HandleSignalsState hss; + // A dispatcher that was added and then closed will be cancelled. + ASSERT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher0_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); + dispatcher0_->Close(); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr)); + EXPECT_TRUE(wait_set->GetHandleSignalsState().satisfies( + MOJO_HANDLE_SIGNAL_READABLE)); + scoped_refptr<Dispatcher> woken_dispatcher; + EXPECT_EQ(MOJO_RESULT_CANCELLED, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + EXPECT_EQ(dispatcher0_, woken_dispatcher); + + // Dispatcher will be implicitly removed because it may be impossible to + // remove explicitly. + woken_dispatcher = nullptr; + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + EXPECT_EQ(MOJO_RESULT_NOT_FOUND, + wait_set->RemoveWaitingDispatcher(dispatcher0_)); + + // A dispatcher that's not satisfiable should give an error. + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(dispatcher1_, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr)); + EXPECT_TRUE(wait_set->GetHandleSignalsState().satisfies( + MOJO_HANDLE_SIGNAL_READABLE)); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + EXPECT_EQ(dispatcher1_, woken_dispatcher); + + wait_set->RemoveAwakable(&w, nullptr); +} + +TEST_F(WaitSetDispatcherTest, NestedSets) { + scoped_refptr<WaitSetDispatcher> wait_set = new WaitSetDispatcher(); + CloseOnShutdown(wait_set); + scoped_refptr<WaitSetDispatcher> nested_wait_set = new WaitSetDispatcher(); + CloseOnShutdown(nested_wait_set); + + Waiter w; + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddWaitingDispatcher(nested_wait_set, + MOJO_HANDLE_SIGNAL_READABLE, 0)); + EXPECT_EQ(MOJO_RESULT_OK, + wait_set->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, nullptr)); + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); + + // Writable signal is immediately satisfied by the message pipe. + w.Init(); + EXPECT_EQ(MOJO_RESULT_OK, + nested_wait_set->AddWaitingDispatcher( + dispatcher0_, MOJO_HANDLE_SIGNAL_WRITABLE, 0)); + EXPECT_EQ(MOJO_RESULT_OK, w.Wait(0, nullptr)); + scoped_refptr<Dispatcher> woken_dispatcher; + EXPECT_EQ(MOJO_RESULT_OK, + GetOneReadyDispatcher(wait_set, &woken_dispatcher, nullptr)); + EXPECT_EQ(nested_wait_set, woken_dispatcher); + + wait_set->RemoveAwakable(&w, nullptr); +} + +} // namespace +} // namespace system +} // namespace mojo |