diff options
26 files changed, 1127 insertions, 5 deletions
diff --git a/mojo/edk/embedder/entrypoints.cc b/mojo/edk/embedder/entrypoints.cc index 102b25f..23b919d 100644 --- a/mojo/edk/embedder/entrypoints.cc +++ b/mojo/edk/embedder/entrypoints.cc @@ -42,6 +42,17 @@ MojoResult MojoWaitMany(const MojoHandle* handles, signals_states); } +MojoResult MojoWatch(MojoHandle handle, + MojoHandleSignals signals, + MojoWatchCallback callback, + uintptr_t context) { + return g_core->Watch(handle, signals, callback, context); +} + +MojoResult MojoCancelWatch(MojoHandle handle, uintptr_t context) { + return g_core->CancelWatch(handle, context); +} + MojoResult MojoCreateWaitSet(MojoHandle* wait_set_handle) { return g_core->CreateWaitSet(wait_set_handle); } diff --git a/mojo/edk/system/BUILD.gn b/mojo/edk/system/BUILD.gn index 75a097d..b9cac17 100644 --- a/mojo/edk/system/BUILD.gn +++ b/mojo/edk/system/BUILD.gn @@ -64,12 +64,18 @@ component("system") { "ports_message.h", "remote_message_pipe_bootstrap.cc", "remote_message_pipe_bootstrap.h", + "request_context.cc", + "request_context.h", "shared_buffer_dispatcher.cc", "shared_buffer_dispatcher.h", "wait_set_dispatcher.cc", "wait_set_dispatcher.h", "waiter.cc", "waiter.h", + "watcher.cc", + "watcher.h", + "watcher_set.cc", + "watcher_set.h", ] defines = [ @@ -147,6 +153,7 @@ test("mojo_system_unittests") { "waiter_test_utils.cc", "waiter_test_utils.h", "waiter_unittest.cc", + "watch_unittest.cc", ] if (!is_ios) { diff --git a/mojo/edk/system/awakable_list.cc b/mojo/edk/system/awakable_list.cc index 429e691..2045f32 100644 --- a/mojo/edk/system/awakable_list.cc +++ b/mojo/edk/system/awakable_list.cc @@ -39,6 +39,7 @@ void AwakableList::AwakeForStateChange(const HandleSignalsState& state) { } } awakables_.erase(last, awakables_.end()); + watchers_.NotifyForStateChange(state); } void AwakableList::CancelAll() { @@ -47,6 +48,7 @@ void AwakableList::CancelAll() { it->awakable->Awake(MOJO_RESULT_CANCELLED, it->context); } awakables_.clear(); + watchers_.NotifyClosed(); } void AwakableList::Add(Awakable* awakable, @@ -70,5 +72,16 @@ void AwakableList::Remove(Awakable* awakable) { awakables_.erase(last, awakables_.end()); } +MojoResult AwakableList::AddWatcher(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context, + const HandleSignalsState& current_state) { + return watchers_.Add(signals, callback, context, current_state); +} + +MojoResult AwakableList::RemoveWatcher(uintptr_t context) { + return watchers_.Remove(context); +} + } // namespace edk } // namespace mojo diff --git a/mojo/edk/system/awakable_list.h b/mojo/edk/system/awakable_list.h index 4e788ee..a80f5d6 100644 --- a/mojo/edk/system/awakable_list.h +++ b/mojo/edk/system/awakable_list.h @@ -5,11 +5,14 @@ #ifndef MOJO_EDK_SYSTEM_AWAKABLE_LIST_H_ #define MOJO_EDK_SYSTEM_AWAKABLE_LIST_H_ +#include <stddef.h> #include <stdint.h> #include <vector> #include "mojo/edk/system/system_impl_export.h" +#include "mojo/edk/system/watcher.h" +#include "mojo/edk/system/watcher_set.h" #include "mojo/public/c/system/types.h" #include "mojo/public/cpp/system/macros.h" @@ -36,6 +39,13 @@ class MOJO_SYSTEM_IMPL_EXPORT AwakableList { void Add(Awakable* awakable, MojoHandleSignals signals, uintptr_t context); void Remove(Awakable* awakable); + // Add and remove Watchers to this AwakableList. + MojoResult AddWatcher(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context, + const HandleSignalsState& current_state); + MojoResult RemoveWatcher(uintptr_t context); + private: struct AwakeInfo { AwakeInfo(Awakable* awakable, MojoHandleSignals signals, uintptr_t context) @@ -49,6 +59,10 @@ class MOJO_SYSTEM_IMPL_EXPORT AwakableList { AwakeInfoList awakables_; + // TODO: Remove AwakableList and instead use WatcherSet directly in + // dispatchers. + WatcherSet watchers_; + MOJO_DISALLOW_COPY_AND_ASSIGN(AwakableList); }; diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc index e0f4131..a54e8b1 100644 --- a/mojo/edk/system/core.cc +++ b/mojo/edk/system/core.cc @@ -31,6 +31,7 @@ #include "mojo/edk/system/platform_handle_dispatcher.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/remote_message_pipe_bootstrap.h" +#include "mojo/edk/system/request_context.h" #include "mojo/edk/system/shared_buffer_dispatcher.h" #include "mojo/edk/system/wait_set_dispatcher.h" #include "mojo/edk/system/waiter.h" @@ -47,6 +48,14 @@ const uint32_t kMaxHandlesPerMessage = 1024 * 1024; // too; for now we just use a constant. This only affects bootstrap pipes. const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL; +void CallWatchCallback(MojoWatchCallback callback, + uintptr_t context, + MojoResult result, + const HandleSignalsState& signals_state) { + callback(context, result, + static_cast<MojoHandleSignalsState>(signals_state)); +} + } // namespace Core::Core() {} @@ -268,6 +277,7 @@ MojoTimeTicks Core::GetTimeTicksNow() { } MojoResult Core::Close(MojoHandle handle) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher; { base::AutoLock lock(handles_lock_); @@ -283,6 +293,7 @@ MojoResult Core::Wait(MojoHandle handle, MojoHandleSignals signals, MojoDeadline deadline, MojoHandleSignalsState* signals_state) { + RequestContext request_context; uint32_t unused = static_cast<uint32_t>(-1); HandleSignalsState hss; MojoResult rv = WaitManyInternal(&handle, &signals, 1, deadline, &unused, @@ -298,6 +309,7 @@ MojoResult Core::WaitMany(const MojoHandle* handles, MojoDeadline deadline, uint32_t* result_index, MojoHandleSignalsState* signals_state) { + RequestContext request_context; if (num_handles < 1) return MOJO_RESULT_INVALID_ARGUMENT; if (num_handles > GetConfiguration().max_wait_many_num_handles) @@ -319,7 +331,28 @@ MojoResult Core::WaitMany(const MojoHandle* handles, return rv; } +MojoResult Core::Watch(MojoHandle handle, + MojoHandleSignals signals, + MojoWatchCallback callback, + uintptr_t context) { + RequestContext request_context; + scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle); + if (!dispatcher) + return MOJO_RESULT_INVALID_ARGUMENT; + return dispatcher->Watch( + signals, base::Bind(&CallWatchCallback, callback, context), context); +} + +MojoResult Core::CancelWatch(MojoHandle handle, uintptr_t context) { + RequestContext request_context; + scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle); + if (!dispatcher) + return MOJO_RESULT_INVALID_ARGUMENT; + return dispatcher->CancelWatch(context); +} + MojoResult Core::CreateWaitSet(MojoHandle* wait_set_handle) { + RequestContext request_context; if (!wait_set_handle) return MOJO_RESULT_INVALID_ARGUMENT; @@ -338,6 +371,7 @@ MojoResult Core::CreateWaitSet(MojoHandle* wait_set_handle) { MojoResult Core::AddHandle(MojoHandle wait_set_handle, MojoHandle handle, MojoHandleSignals signals) { + RequestContext request_context; scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle)); if (!wait_set_dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; @@ -351,6 +385,7 @@ MojoResult Core::AddHandle(MojoHandle wait_set_handle, MojoResult Core::RemoveHandle(MojoHandle wait_set_handle, MojoHandle handle) { + RequestContext request_context; scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle)); if (!wait_set_dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; @@ -367,6 +402,7 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle, MojoHandle* handles, MojoResult* results, MojoHandleSignalsState* signals_states) { + RequestContext request_context; if (!handles || !count || !(*count) || !results) return MOJO_RESULT_INVALID_ARGUMENT; @@ -396,6 +432,7 @@ MojoResult Core::CreateMessagePipe( const MojoCreateMessagePipeOptions* options, MojoHandle* message_pipe_handle0, MojoHandle* message_pipe_handle1) { + RequestContext request_context; ports::PortRef port0, port1; GetNodeController()->node()->CreatePortPair(&port0, &port1); @@ -427,6 +464,7 @@ MojoResult Core::WriteMessage(MojoHandle message_pipe_handle, const MojoHandle* handles, uint32_t num_handles, MojoWriteMessageFlags flags) { + RequestContext request_context; auto dispatcher = GetDispatcher(message_pipe_handle); if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; @@ -476,6 +514,7 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, MojoHandle* handles, uint32_t* num_handles, MojoReadMessageFlags flags) { + RequestContext request_context; CHECK((!num_handles || !*num_handles || handles) && (!num_bytes || !*num_bytes || bytes)); auto dispatcher = GetDispatcher(message_pipe_handle); @@ -488,6 +527,7 @@ MojoResult Core::CreateDataPipe( const MojoCreateDataPipeOptions* options, MojoHandle* data_pipe_producer_handle, MojoHandle* data_pipe_consumer_handle) { + RequestContext request_context; if (options && options->struct_size != sizeof(MojoCreateDataPipeOptions)) return MOJO_RESULT_INVALID_ARGUMENT; @@ -542,6 +582,7 @@ MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle, const void* elements, uint32_t* num_bytes, MojoWriteDataFlags flags) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_producer_handle)); if (!dispatcher) @@ -554,6 +595,7 @@ MojoResult Core::BeginWriteData(MojoHandle data_pipe_producer_handle, void** buffer, uint32_t* buffer_num_bytes, MojoWriteDataFlags flags) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_producer_handle)); if (!dispatcher) @@ -564,6 +606,7 @@ MojoResult Core::BeginWriteData(MojoHandle data_pipe_producer_handle, MojoResult Core::EndWriteData(MojoHandle data_pipe_producer_handle, uint32_t num_bytes_written) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_producer_handle)); if (!dispatcher) @@ -576,6 +619,7 @@ MojoResult Core::ReadData(MojoHandle data_pipe_consumer_handle, void* elements, uint32_t* num_bytes, MojoReadDataFlags flags) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_consumer_handle)); if (!dispatcher) @@ -588,6 +632,7 @@ MojoResult Core::BeginReadData(MojoHandle data_pipe_consumer_handle, const void** buffer, uint32_t* buffer_num_bytes, MojoReadDataFlags flags) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_consumer_handle)); if (!dispatcher) @@ -598,6 +643,7 @@ MojoResult Core::BeginReadData(MojoHandle data_pipe_consumer_handle, MojoResult Core::EndReadData(MojoHandle data_pipe_consumer_handle, uint32_t num_bytes_read) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher( GetDispatcher(data_pipe_consumer_handle)); if (!dispatcher) @@ -610,6 +656,7 @@ MojoResult Core::CreateSharedBuffer( const MojoCreateSharedBufferOptions* options, uint64_t num_bytes, MojoHandle* shared_buffer_handle) { + RequestContext request_context; MojoCreateSharedBufferOptions validated_options = {}; MojoResult result = SharedBufferDispatcher::ValidateCreateOptions( options, &validated_options); @@ -638,6 +685,7 @@ MojoResult Core::DuplicateBufferHandle( MojoHandle buffer_handle, const MojoDuplicateBufferHandleOptions* options, MojoHandle* new_buffer_handle) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher(GetDispatcher(buffer_handle)); if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; @@ -664,6 +712,7 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle, uint64_t num_bytes, void** buffer, MojoMapBufferFlags flags) { + RequestContext request_context; scoped_refptr<Dispatcher> dispatcher(GetDispatcher(buffer_handle)); if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; @@ -687,6 +736,7 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle, } MojoResult Core::UnmapBuffer(void* buffer) { + RequestContext request_context; base::AutoLock lock(mapping_table_lock_); return mapping_table_.RemoveMapping(buffer); } diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h index e2a22b0..65b6933 100644 --- a/mojo/edk/system/core.h +++ b/mojo/edk/system/core.h @@ -132,6 +132,11 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { MojoDeadline deadline, uint32_t* result_index, MojoHandleSignalsState* signals_states); + MojoResult Watch(MojoHandle handle, + MojoHandleSignals signals, + MojoWatchCallback callback, + uintptr_t context); + MojoResult CancelWatch(MojoHandle handle, uintptr_t context); // These methods correspond to the API functions defined in // "mojo/public/c/system/wait_set.h": diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc index 1f6f079..269f8b2 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc @@ -21,6 +21,7 @@ #include "mojo/edk/system/data_pipe_control_message.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports_message.h" +#include "mojo/edk/system/request_context.h" #include "mojo/public/c/system/data_pipe.h" namespace mojo { @@ -86,6 +87,29 @@ MojoResult DataPipeConsumerDispatcher::Close() { return CloseNoLock(); } + +MojoResult DataPipeConsumerDispatcher::Watch( + MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) { + base::AutoLock lock(lock_); + + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakable_list_.AddWatcher( + signals, callback, context, GetHandleSignalsStateNoLock()); +} + +MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { + base::AutoLock lock(lock_); + + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakable_list_.RemoveWatcher(context); +} + MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, uint32_t* num_bytes, MojoReadDataFlags flags) { @@ -474,6 +498,8 @@ void DataPipeConsumerDispatcher::OnPortStatusChanged() { } void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { + RequestContext request_context; + lock_.AssertAcquired(); bool was_peer_closed = peer_closed_; diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h index c75833b..945aa07 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.h +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h @@ -43,6 +43,10 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final // Dispatcher: Type GetType() const override; MojoResult Close() override; + MojoResult Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) override; + MojoResult CancelWatch(uintptr_t context) override; MojoResult ReadData(void* elements, uint32_t* num_bytes, MojoReadDataFlags flags) override; diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc index cbbade8..634fb68 100644 --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc @@ -20,6 +20,8 @@ #include "mojo/edk/system/data_pipe_control_message.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports_message.h" +#include "mojo/edk/system/request_context.h" +#include "mojo/public/c/system/data_pipe.h" namespace mojo { namespace edk { @@ -85,6 +87,28 @@ MojoResult DataPipeProducerDispatcher::Close() { return CloseNoLock(); } +MojoResult DataPipeProducerDispatcher::Watch( + MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) { + base::AutoLock lock(lock_); + + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakable_list_.AddWatcher( + signals, callback, context, GetHandleSignalsStateNoLock()); +} + +MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { + base::AutoLock lock(lock_); + + if (is_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakable_list_.RemoveWatcher(context); +} + MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, uint32_t* num_bytes, MojoWriteDataFlags flags) { @@ -452,6 +476,8 @@ void DataPipeProducerDispatcher::OnPortStatusChanged() { } void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { + RequestContext request_context; + lock_.AssertAcquired(); bool was_peer_closed = peer_closed_; diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h index c1020b0..cfdeb96 100644 --- a/mojo/edk/system/data_pipe_producer_dispatcher.h +++ b/mojo/edk/system/data_pipe_producer_dispatcher.h @@ -42,6 +42,10 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeProducerDispatcher final // Dispatcher: Type GetType() const override; MojoResult Close() override; + MojoResult Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) override; + MojoResult CancelWatch(uintptr_t context) override; MojoResult WriteData(const void* elements, uint32_t* num_bytes, MojoReadDataFlags flags) override; diff --git a/mojo/edk/system/dispatcher.cc b/mojo/edk/system/dispatcher.cc index 8a2e15e..6705542 100644 --- a/mojo/edk/system/dispatcher.cc +++ b/mojo/edk/system/dispatcher.cc @@ -22,6 +22,16 @@ Dispatcher::DispatcherInTransit::DispatcherInTransit( Dispatcher::DispatcherInTransit::~DispatcherInTransit() {} +MojoResult Dispatcher::Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) { + return MOJO_RESULT_INVALID_ARGUMENT; +} + +MojoResult Dispatcher::CancelWatch(uintptr_t context) { + return MOJO_RESULT_INVALID_ARGUMENT; +} + MojoResult Dispatcher::WriteMessage(const void* bytes, uint32_t num_bytes, const DispatcherInTransit* dispatchers, diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h index 89876eb..e8bcf33 100644 --- a/mojo/edk/system/dispatcher.h +++ b/mojo/edk/system/dispatcher.h @@ -20,6 +20,7 @@ #include "mojo/edk/system/handle_signals_state.h" #include "mojo/edk/system/ports/name.h" #include "mojo/edk/system/system_impl_export.h" +#include "mojo/edk/system/watcher.h" #include "mojo/public/c/system/buffer.h" #include "mojo/public/c/system/data_pipe.h" #include "mojo/public/c/system/message_pipe.h" @@ -65,6 +66,14 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher virtual Type GetType() const = 0; virtual MojoResult Close() = 0; + ///////////// Watch API //////////////////// + + virtual MojoResult Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context); + + virtual MojoResult CancelWatch(uintptr_t context); + ///////////// Message pipe API ///////////// virtual MojoResult WriteMessage(const void* bytes, diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc index c7b4a32..ce35f64 100644 --- a/mojo/edk/system/message_pipe_dispatcher.cc +++ b/mojo/edk/system/message_pipe_dispatcher.cc @@ -13,6 +13,7 @@ #include "mojo/edk/system/core.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports_message.h" +#include "mojo/edk/system/request_context.h" #include "mojo/public/c/system/macros.h" namespace mojo { @@ -97,6 +98,27 @@ MojoResult MessagePipeDispatcher::Close() { return CloseNoLock(); } +MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) { + base::AutoLock lock(signal_lock_); + + if (port_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakables_.AddWatcher( + signals, callback, context, GetHandleSignalsStateNoLock()); +} + +MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { + base::AutoLock lock(signal_lock_); + + if (port_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; + + return awakables_.RemoveWatcher(context); +} + MojoResult MessagePipeDispatcher::WriteMessage( const void* bytes, uint32_t num_bytes, @@ -579,6 +601,8 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { } void MessagePipeDispatcher::OnPortStatusChanged() { + RequestContext request_context; + base::AutoLock lock(signal_lock_); // We stop observing our port as soon as it's transferred, but this can race diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h index f898ea8..7400515 100644 --- a/mojo/edk/system/message_pipe_dispatcher.h +++ b/mojo/edk/system/message_pipe_dispatcher.h @@ -42,6 +42,10 @@ class MessagePipeDispatcher : public Dispatcher { // Dispatcher: Type GetType() const override; MojoResult Close() override; + MojoResult Watch(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context) override; + MojoResult CancelWatch(uintptr_t context) override; MojoResult WriteMessage(const void* bytes, uint32_t num_bytes, const DispatcherInTransit* dispatchers, diff --git a/mojo/edk/system/request_context.cc b/mojo/edk/system/request_context.cc new file mode 100644 index 0000000..4b7e011 --- /dev/null +++ b/mojo/edk/system/request_context.cc @@ -0,0 +1,77 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/request_context.h" + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/threading/thread_local.h" + +namespace mojo { +namespace edk { + +namespace { + +base::LazyInstance<base::ThreadLocalPointer<RequestContext>>::Leaky + g_current_context; + +} // namespace + +RequestContext::RequestContext() { + // We allow nested RequestContexts to exist as long as they aren't actually + // used for anything. + if (!g_current_context.Pointer()->Get()) + g_current_context.Pointer()->Set(this); +} + +RequestContext::~RequestContext() { + // NOTE: Callbacks invoked by this destructor are allowed to initiate new + // EDK requests on this thread, so we need to reset the thread-local context + // pointer before calling them. + if (IsCurrent()) + g_current_context.Pointer()->Set(nullptr); + + for (const WatchNotifyFinalizer& watch : watch_notify_finalizers_.container()) + watch.watcher->MaybeInvokeCallback(watch.result, watch.state); + + for (const scoped_refptr<Watcher>& watcher : + watch_cancel_finalizers_.container()) + watcher->Cancel(); +} + +// static +RequestContext* RequestContext::current() { + DCHECK(g_current_context.Pointer()->Get()); + return g_current_context.Pointer()->Get(); +} + +void RequestContext::AddWatchNotifyFinalizer( + scoped_refptr<Watcher> watcher, + MojoResult result, + const HandleSignalsState& state) { + DCHECK(IsCurrent()); + watch_notify_finalizers_->push_back( + WatchNotifyFinalizer(watcher, result, state)); +} + +void RequestContext::AddWatchCancelFinalizer(scoped_refptr<Watcher> watcher) { + DCHECK(IsCurrent()); + watch_cancel_finalizers_->push_back(watcher); +} + +bool RequestContext::IsCurrent() const { + return g_current_context.Pointer()->Get() == this; +} + +RequestContext::WatchNotifyFinalizer::WatchNotifyFinalizer( + scoped_refptr<Watcher> watcher, + MojoResult result, + const HandleSignalsState& state) + : watcher(watcher), result(result), state(state) { +} + +RequestContext::WatchNotifyFinalizer::~WatchNotifyFinalizer() {} + +} // namespace edk +} // namespace mojo diff --git a/mojo/edk/system/request_context.h b/mojo/edk/system/request_context.h new file mode 100644 index 0000000..75ba61f --- /dev/null +++ b/mojo/edk/system/request_context.h @@ -0,0 +1,82 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_ +#define MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_ + +#include "base/containers/stack_container.h" +#include "base/macros.h" +#include "mojo/edk/system/handle_signals_state.h" +#include "mojo/edk/system/watcher.h" + +namespace mojo { +namespace edk { + +// A RequestContext is a thread-local object which exists for the duration of +// a single system API call. It is constructed immediately upon EDK entry and +// destructed immediately before returning to the caller, after any internal +// locks have been released. +// +// NOTE: It is legal to construct a RequestContext while another one already +// exists on the current thread, but it is not safe to use the nested context +// for any reason. Therefore it is important to always use +// |RequestContext::current()| rather than referring to any local instance +// directly. +class RequestContext { + public: + RequestContext(); + ~RequestContext(); + + // Returns the current thread-local RequestContext. + static RequestContext* current(); + + // Adds a finalizer to this RequestContext corresponding to a watch callback + // which should be triggered in response to some handle state change. If + // the Watcher hasn't been cancelled by the time this RequestContext is + // destroyed, its WatchCallback will be invoked with |result| and |state| + // arguments. + void AddWatchNotifyFinalizer(scoped_refptr<Watcher> watcher, + MojoResult result, + const HandleSignalsState& state); + + // Adds a finalizer to this RequestContext which cancels a watch. + void AddWatchCancelFinalizer(scoped_refptr<Watcher> watcher); + + private: + // Is this request context the current one? + bool IsCurrent() const; + + struct WatchNotifyFinalizer { + WatchNotifyFinalizer(scoped_refptr<Watcher> watcher, + MojoResult result, + const HandleSignalsState& state); + ~WatchNotifyFinalizer(); + + scoped_refptr<Watcher> watcher; + MojoResult result; + HandleSignalsState state; + }; + + // Chosen by fair dice roll. + // + // TODO: We should measure the distribution of # of finalizers typical to + // any RequestContext and adjust this number accordingly. It's probably + // almost always 1, but 4 seems like a harmless upper bound for now. + static const size_t kStaticWatchFinalizersCapacity = 4; + + using WatchNotifyFinalizerList = + base::StackVector<WatchNotifyFinalizer, kStaticWatchFinalizersCapacity>; + using WatchCancelFinalizerList = + base::StackVector<scoped_refptr<Watcher>, kStaticWatchFinalizersCapacity>; + + WatchNotifyFinalizerList watch_notify_finalizers_; + WatchCancelFinalizerList watch_cancel_finalizers_; + + DISALLOW_COPY_AND_ASSIGN(RequestContext); +}; + +} // namespace edk +} // namespace mojo + +#endif // MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_ diff --git a/mojo/edk/system/watch_unittest.cc b/mojo/edk/system/watch_unittest.cc new file mode 100644 index 0000000..fd0c13d --- /dev/null +++ b/mojo/edk/system/watch_unittest.cc @@ -0,0 +1,374 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <functional> + +#include "base/macros.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "mojo/edk/test/mojo_test_base.h" +#include "mojo/public/c/system/functions.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace mojo { +namespace edk { +namespace { + +void IgnoreResult(uintptr_t context, + MojoResult result, + MojoHandleSignalsState signals) { +} + +// A test helper class for watching a handle. The WatchHelper instance is used +// as a watch context for a single watch callback. +class WatchHelper { + public: + using Callback = + std::function<void(MojoResult result, MojoHandleSignalsState state)>; + + WatchHelper() {} + ~WatchHelper() { + CHECK(!watching_); + } + + void Watch(MojoHandle handle, + MojoHandleSignals signals, + const Callback& callback) { + CHECK(!watching_); + + handle_ = handle; + callback_ = callback; + watching_ = true; + CHECK_EQ(MOJO_RESULT_OK, MojoWatch(handle_, signals, &WatchHelper::OnNotify, + reinterpret_cast<uintptr_t>(this))); + } + + bool is_watching() const { return watching_; } + + void Cancel() { + CHECK_EQ(MOJO_RESULT_OK, + MojoCancelWatch(handle_, reinterpret_cast<uintptr_t>(this))); + CHECK(watching_); + watching_ = false; + } + + private: + static void OnNotify(uintptr_t context, + MojoResult result, + MojoHandleSignalsState state) { + WatchHelper* watcher = reinterpret_cast<WatchHelper*>(context); + CHECK(watcher->watching_); + if (result == MOJO_RESULT_CANCELLED) + watcher->watching_ = false; + watcher->callback_(result, state); + } + + bool watching_ = false; + MojoHandle handle_; + Callback callback_; + + DISALLOW_COPY_AND_ASSIGN(WatchHelper); +}; + +class WatchTest : public test::MojoTestBase { + public: + WatchTest() {} + ~WatchTest() override {} + + protected: + + private: + base::MessageLoop message_loop_; + + DISALLOW_COPY_AND_ASSIGN(WatchTest); +}; + +TEST_F(WatchTest, NotifyBasic) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + base::RunLoop loop; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_TRUE(b_watcher.is_watching()); + loop.Quit(); + }); + + WriteMessage(a, "Hello!"); + loop.Run(); + + EXPECT_TRUE(b_watcher.is_watching()); + b_watcher.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, NotifyUnsatisfiable) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + base::RunLoop loop; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); + EXPECT_EQ(0u, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_EQ(0u, + state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_TRUE(b_watcher.is_watching()); + loop.Quit(); + }); + + CloseHandle(a); + loop.Run(); + + b_watcher.Cancel(); + + CloseHandle(b); +} + +TEST_F(WatchTest, NotifyCancellation) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + base::RunLoop loop; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_CANCELLED, result); + EXPECT_EQ(0u, state.satisfied_signals); + EXPECT_EQ(0u, state.satisfiable_signals); + EXPECT_FALSE(b_watcher.is_watching()); + loop.Quit(); + }); + + CloseHandle(b); + loop.Run(); + + CloseHandle(a); +} + +TEST_F(WatchTest, InvalidArguemnts) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + uintptr_t context = reinterpret_cast<uintptr_t>(this); + EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, + &IgnoreResult, context)); + + // Can't cancel a watch that doesn't exist. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoCancelWatch(a, ~context)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoCancelWatch(b, context)); + + CloseHandle(a); + CloseHandle(b); + + // Can't watch a handle that doesn't exist. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + MojoWatch(b, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context)); +} + +TEST_F(WatchTest, NoDuplicateContext) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + // Try to add the same watch twice; should fail. + uintptr_t context = reinterpret_cast<uintptr_t>(this); + EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, + &IgnoreResult, context)); + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, + MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context)); + + // Cancel and add it again; should be OK. + EXPECT_EQ(MOJO_RESULT_OK, MojoCancelWatch(a, context)); + EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, + &IgnoreResult, context)); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, MultipleWatches) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + // Add multiple watchers to |b| and see that they are both notified by a + // single write to |a|. + base::RunLoop loop; + int expected_notifications = 2; + auto on_readable = [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_GT(expected_notifications, 0); + if (--expected_notifications == 0) + loop.Quit(); + }; + WatchHelper watcher1; + WatchHelper watcher2; + watcher1.Watch(b, MOJO_HANDLE_SIGNAL_READABLE, on_readable); + watcher2.Watch(b, MOJO_HANDLE_SIGNAL_READABLE, on_readable); + + WriteMessage(a, "Ping!"); + loop.Run(); + + watcher1.Cancel(); + watcher2.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, WatchWhileSatisfied) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + // Write to |a| and then start watching |b|. The callback should be invoked + // synchronously. + WriteMessage(a, "hey"); + bool signaled = false; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + signaled = true; + }); + EXPECT_TRUE(signaled); + b_watcher.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, WatchWhileUnsatisfiable) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + // Close |a| and then try to watch |b|. MojoWatch() should fail. + CloseHandle(a); + uintptr_t context = reinterpret_cast<uintptr_t>(this); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, + MojoWatch(b, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context)); + + CloseHandle(b); +} + +TEST_F(WatchTest, RespondFromCallback) { + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + // Watch |a| and |b|. Write to |a|, then write to |b| from within the callback + // which notifies it of the available message. + const std::string kTestMessage = "hello worlds."; + base::RunLoop loop; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_TRUE(b_watcher.is_watching()); + + // Echo a's message back to it. + WriteMessage(b, ReadMessage(b)); + }); + + WatchHelper a_watcher; + a_watcher.Watch( + a, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_TRUE(a_watcher.is_watching()); + + // Expect to receive back the message that was originally sent to |b|. + EXPECT_EQ(kTestMessage, ReadMessage(a)); + + loop.Quit(); + }); + + WriteMessage(a, kTestMessage); + loop.Run(); + + a_watcher.Cancel(); + b_watcher.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, WatchDataPipeConsumer) { + MojoHandle a, b; + CreateDataPipe(&a, &b, 64); + + base::RunLoop loop; + WatchHelper b_watcher; + b_watcher.Watch( + b, MOJO_HANDLE_SIGNAL_READABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE); + EXPECT_TRUE(b_watcher.is_watching()); + loop.Quit(); + }); + + WriteData(a, "Hello!"); + loop.Run(); + + EXPECT_TRUE(b_watcher.is_watching()); + b_watcher.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +TEST_F(WatchTest, WatchDataPipeProducer) { + MojoHandle a, b; + CreateDataPipe(&a, &b, 8); + + // Fill the pipe to capacity so writes will block. + WriteData(a, "xxxxxxxx"); + + base::RunLoop loop; + WatchHelper a_watcher; + a_watcher.Watch( + a, MOJO_HANDLE_SIGNAL_WRITABLE, + [&] (MojoResult result, MojoHandleSignalsState state) { + EXPECT_EQ(MOJO_RESULT_OK, result); + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, + state.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE); + EXPECT_TRUE(a_watcher.is_watching()); + loop.Quit(); + }); + + EXPECT_EQ("xxxxxxxx", ReadData(b, 8)); + loop.Run(); + + EXPECT_TRUE(a_watcher.is_watching()); + a_watcher.Cancel(); + + CloseHandle(a); + CloseHandle(b); +} + +} // namespace +} // namespace edk +} // namespace mojo diff --git a/mojo/edk/system/watcher.cc b/mojo/edk/system/watcher.cc new file mode 100644 index 0000000..4bc9dbb --- /dev/null +++ b/mojo/edk/system/watcher.cc @@ -0,0 +1,52 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/watcher.h" + +#include "mojo/edk/system/handle_signals_state.h" +#include "mojo/edk/system/request_context.h" + +namespace mojo { +namespace edk { + +Watcher::Watcher(MojoHandleSignals signals, const WatchCallback& callback) + : signals_(signals), callback_(callback) { +} + +void Watcher::MaybeInvokeCallback(MojoResult result, + const HandleSignalsState& state) { + base::AutoLock lock(lock_); + if (is_cancelled_) + return; + + callback_.Run(result, state); +} + +void Watcher::NotifyForStateChange(const HandleSignalsState& signals_state) { + RequestContext* request_context = RequestContext::current(); + if (signals_state.satisfies(signals_)) { + request_context->AddWatchNotifyFinalizer( + make_scoped_refptr(this), MOJO_RESULT_OK, signals_state); + } else if (!signals_state.can_satisfy(signals_)) { + request_context->AddWatchNotifyFinalizer(make_scoped_refptr(this), + MOJO_RESULT_FAILED_PRECONDITION, + signals_state); + } +} + +void Watcher::NotifyClosed() { + static const HandleSignalsState closed_state = {0, 0}; + RequestContext::current()->AddWatchNotifyFinalizer( + make_scoped_refptr(this), MOJO_RESULT_CANCELLED, closed_state); +} + +void Watcher::Cancel() { + base::AutoLock lock(lock_); + is_cancelled_ = true; +} + +Watcher::~Watcher() {} + +} // namespace edk +} // namespace mojo diff --git a/mojo/edk/system/watcher.h b/mojo/edk/system/watcher.h new file mode 100644 index 0000000..119355e --- /dev/null +++ b/mojo/edk/system/watcher.h @@ -0,0 +1,85 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_EDK_SYSTEM_WATCHER_H_ +#define MOJO_EDK_SYSTEM_WATCHER_H_ + +#include "base/callback.h" +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/lock.h" +#include "mojo/public/c/system/functions.h" +#include "mojo/public/c/system/types.h" + +namespace mojo { +namespace edk { + +struct HandleSignalsState; + +// This object corresponds to a watch added by a single call to |MojoWatch()|. +// +// An event may occur at any time which should trigger a Watcher to run its +// callback, but the callback needs to be deferred until all EDK locks are +// released. At the same time, a watch may be cancelled at any time by +// |MojoCancelWatch()| and it is not OK for the callback to be invoked after +// that happens. +// +// Therefore a Watcher needs to have some associated thread-safe state to track +// its cancellation, which is why it's ref-counted. +class Watcher : public base::RefCountedThreadSafe<Watcher> { + public: + using WatchCallback = + base::Callback<void(MojoResult, const HandleSignalsState&)>; + + // Constructs a new Watcher which watches for |signals| to be satisfied on a + // handle and which invokes |callback| either when one such signal is + // satisfied, or all such signals become unsatisfiable. + Watcher(MojoHandleSignals signals, const WatchCallback& callback); + + // Runs the Watcher's callback with the given arguments if it hasn't been + // cancelled yet. + void MaybeInvokeCallback(MojoResult result, const HandleSignalsState& state); + + // Notifies the Watcher of a state change. This may result in the Watcher + // adding a finalizer to the current RequestContext to invoke its callback, + // cancellation notwithstanding. + void NotifyForStateChange(const HandleSignalsState& signals_state); + + // Notifies the Watcher of handle closure. This always results in the Watcher + // adding a finalizer to the current RequestContext to invoke its callback, + // cancellation notwithstanding. + void NotifyClosed(); + + // Explicitly cancels the watch, guaranteeing that its callback will never be + // be invoked again. + void Cancel(); + + private: + friend class base::RefCountedThreadSafe<Watcher>; + + ~Watcher(); + + // The set of signals which are watched by this Watcher. + const MojoHandleSignals signals_; + + // The callback to invoke with a result and signal state any time signals in + // |signals_| are satisfied or become permanently unsatisfiable. + const WatchCallback callback_; + + // Guards |is_cancelled_|. + base::Lock lock_; + + // Indicates whether the watch has been cancelled. A |Watcher| may exist for a + // brief period of time after being cancelled if it's been attached as a + // RequestContext finalizer. In such cases the callback must not be invoked, + // hence this flag. + bool is_cancelled_ = false; + + DISALLOW_COPY_AND_ASSIGN(Watcher); +}; + +} // namespace edk +} // namespace mojo + +#endif // MOJO_EDK_SYSTEM_WATCHER_H_ diff --git a/mojo/edk/system/watcher_set.cc b/mojo/edk/system/watcher_set.cc new file mode 100644 index 0000000..878f29a --- /dev/null +++ b/mojo/edk/system/watcher_set.cc @@ -0,0 +1,57 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/watcher_set.h" + +#include "mojo/edk/system/request_context.h" +#include "mojo/public/c/system/types.h" + +namespace mojo { +namespace edk { + +WatcherSet::WatcherSet() {} + +WatcherSet::~WatcherSet() {} + +void WatcherSet::NotifyForStateChange(const HandleSignalsState& state) { + for (const auto& entry : watchers_) + entry.second->NotifyForStateChange(state); +} + +void WatcherSet::NotifyClosed() { + for (const auto& entry : watchers_) + entry.second->NotifyClosed(); +} + +MojoResult WatcherSet::Add(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context, + const HandleSignalsState& current_state) { + auto it = watchers_.find(context); + if (it != watchers_.end()) + return MOJO_RESULT_ALREADY_EXISTS; + + if (!current_state.can_satisfy(signals)) + return MOJO_RESULT_FAILED_PRECONDITION; + + scoped_refptr<Watcher> watcher(new Watcher(signals, callback)); + watchers_.insert(std::make_pair(context, watcher)); + + watcher->NotifyForStateChange(current_state); + + return MOJO_RESULT_OK; +} + +MojoResult WatcherSet::Remove(uintptr_t context) { + auto it = watchers_.find(context); + if (it == watchers_.end()) + return MOJO_RESULT_INVALID_ARGUMENT; + + RequestContext::current()->AddWatchCancelFinalizer(it->second); + watchers_.erase(it); + return MOJO_RESULT_OK; +} + +} // namespace edk +} // namespace mojo diff --git a/mojo/edk/system/watcher_set.h b/mojo/edk/system/watcher_set.h new file mode 100644 index 0000000..8ae54a1b --- /dev/null +++ b/mojo/edk/system/watcher_set.h @@ -0,0 +1,54 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_EDK_SYSTEM_WATCHER_SET_H_ +#define MOJO_EDK_SYSTEM_WATCHER_SET_H_ + +#include <unordered_map> + +#include "base/callback.h" +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "mojo/edk/system/handle_signals_state.h" +#include "mojo/edk/system/watcher.h" +#include "mojo/public/c/system/types.h" + +namespace mojo { +namespace edk { + +// A WatcherSet maintains a set of Watchers attached to a single handle and +// keyed on an arbitrary user context. +class WatcherSet { + public: + WatcherSet(); + ~WatcherSet(); + + // Notifies all Watchers of a state change. + void NotifyForStateChange(const HandleSignalsState& state); + + // Notifies all Watchers that their watched handle has been closed. + void NotifyClosed(); + + // Adds a new watcher to watch for signals in |signals| to be satisfied or + // unsatisfiable. |current_state| is the current signals state of the + // handle being watched. + MojoResult Add(MojoHandleSignals signals, + const Watcher::WatchCallback& callback, + uintptr_t context, + const HandleSignalsState& current_state); + + // Removes a watcher from the set. + MojoResult Remove(uintptr_t context); + + private: + // A map of watchers keyed on context value. + std::unordered_map<uintptr_t, scoped_refptr<Watcher>> watchers_; + + DISALLOW_COPY_AND_ASSIGN(WatcherSet); +}; + +} // namespace edk +} // namespace mojo + +#endif // MOJO_EDK_SYSTEM_WATCHER_SET_H_ diff --git a/mojo/edk/test/mojo_test_base.cc b/mojo/edk/test/mojo_test_base.cc index 27c92bb..7ec067a 100644 --- a/mojo/edk/test/mojo_test_base.cc +++ b/mojo/edk/test/mojo_test_base.cc @@ -9,6 +9,7 @@ #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/system/handle_signals_state.h" #include "mojo/public/c/system/buffer.h" +#include "mojo/public/c/system/data_pipe.h" #include "mojo/public/c/system/functions.h" #include "testing/gtest/include/gtest/gtest.h" @@ -221,6 +222,48 @@ void MojoTestBase::ExpectBufferContents(MojoHandle h, EXPECT_EQ(MOJO_RESULT_OK, MojoUnmapBuffer(static_cast<void*>(data))); } +// static +void MojoTestBase::CreateDataPipe(MojoHandle *p0, + MojoHandle* p1, + size_t capacity) { + MojoCreateDataPipeOptions options; + options.struct_size = static_cast<uint32_t>(sizeof(options)); + options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; + options.element_num_bytes = 1; + options.capacity_num_bytes = static_cast<uint32_t>(capacity); + + MojoCreateDataPipe(&options, p0, p1); + CHECK_NE(*p0, MOJO_HANDLE_INVALID); + CHECK_NE(*p1, MOJO_HANDLE_INVALID); +} + +// static +void MojoTestBase::WriteData(MojoHandle producer, const std::string& data) { + CHECK_EQ(MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE, + MOJO_DEADLINE_INDEFINITE, nullptr), + MOJO_RESULT_OK); + uint32_t num_bytes = static_cast<uint32_t>(data.size()); + CHECK_EQ(MojoWriteData(producer, data.data(), &num_bytes, + MOJO_WRITE_DATA_FLAG_ALL_OR_NONE), + MOJO_RESULT_OK); + CHECK_EQ(num_bytes, static_cast<uint32_t>(data.size())); +} + +// static +std::string MojoTestBase::ReadData(MojoHandle consumer, size_t size) { + CHECK_EQ(MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, nullptr), + MOJO_RESULT_OK); + std::vector<char> buffer(size); + uint32_t num_bytes = static_cast<uint32_t>(size); + CHECK_EQ(MojoReadData(consumer, buffer.data(), &num_bytes, + MOJO_WRITE_DATA_FLAG_ALL_OR_NONE), + MOJO_RESULT_OK); + CHECK_EQ(num_bytes, static_cast<uint32_t>(size)); + + return std::string(buffer.data(), buffer.size()); +} + } // namespace test } // namespace edk } // namespace mojo diff --git a/mojo/edk/test/mojo_test_base.h b/mojo/edk/test/mojo_test_base.h index e44316d..71c9276 100644 --- a/mojo/edk/test/mojo_test_base.h +++ b/mojo/edk/test/mojo_test_base.h @@ -14,10 +14,6 @@ #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" -#include "base/message_loop/message_loop.h" -#include "base/run_loop.h" -#include "base/task_runner.h" -#include "base/thread_task_runner_handle.h" #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/test/multiprocess_test_helper.h" #include "mojo/public/c/system/types.h" @@ -131,10 +127,22 @@ class MojoTestBase : public testing::Test { size_t offset, const base::StringPiece& s); + //////// Data pipe test utilities ///////// + + // Creates a new data pipe. + static void CreateDataPipe(MojoHandle* producer, + MojoHandle* consumer, + size_t capacity); + + // Writes data to a data pipe. + static void WriteData(MojoHandle producer, const std::string& data); + + // Reads data from a data pipe. + static std::string ReadData(MojoHandle consumer, size_t size); + private: friend class ClientController; - base::MessageLoop message_loop_; std::vector<scoped_ptr<ClientController>> clients_; DISALLOW_COPY_AND_ASSIGN(MojoTestBase); diff --git a/mojo/mojo_edk.gyp b/mojo/mojo_edk.gyp index 144d8c0..f95cde4 100644 --- a/mojo/mojo_edk.gyp +++ b/mojo/mojo_edk.gyp @@ -112,12 +112,18 @@ 'edk/system/ports_message.h', 'edk/system/remote_message_pipe_bootstrap.cc', 'edk/system/remote_message_pipe_bootstrap.h', + 'edk/system/request_context.cc', + 'edk/system/request_context.h', 'edk/system/shared_buffer_dispatcher.cc', 'edk/system/shared_buffer_dispatcher.h', 'edk/system/wait_set_dispatcher.cc', 'edk/system/wait_set_dispatcher.h', 'edk/system/waiter.cc', 'edk/system/waiter.h', + 'edk/system/watcher.cc', + 'edk/system/watcher.h', + 'edk/system/watcher_set.cc', + 'edk/system/watcher_set.h', # Test-only code: # TODO(vtl): It's a little unfortunate that these end up in the same # component as non-test-only code. In the static build, this code diff --git a/mojo/mojo_edk_tests.gyp b/mojo/mojo_edk_tests.gyp index 03e60d3..94ba7233 100644 --- a/mojo/mojo_edk_tests.gyp +++ b/mojo/mojo_edk_tests.gyp @@ -227,6 +227,7 @@ 'edk/system/waiter_test_utils.cc', 'edk/system/waiter_test_utils.h', 'edk/system/waiter_unittest.cc', + 'edk/system/watch_unittest.cc', ], 'conditions': [ ['OS=="ios"', { diff --git a/mojo/public/c/system/functions.h b/mojo/public/c/system/functions.h index cd357b5..6c7e2df 100644 --- a/mojo/public/c/system/functions.h +++ b/mojo/public/c/system/functions.h @@ -9,6 +9,7 @@ #ifndef MOJO_PUBLIC_C_SYSTEM_FUNCTIONS_H_ #define MOJO_PUBLIC_C_SYSTEM_FUNCTIONS_H_ +#include <stddef.h> #include <stdint.h> #include "mojo/public/c/system/system_export.h" @@ -18,6 +19,13 @@ extern "C" { #endif +// A callback used to notify watchers registered via |MojoWatch()|. Called when +// some watched signals are satisfied or become unsatisfiable. See the +// documentation for |MojoWatch()| for more details. +typedef void (*MojoWatchCallback)(uintptr_t context, + MojoResult result, + struct MojoHandleSignalsState signals_state); + // Note: Pointer parameters that are labelled "optional" may be null (at least // under some circumstances). Non-const pointer parameters are also labeled // "in", "out", or "in/out", to indicate how they are used. (Note that how/if @@ -129,6 +137,74 @@ MojoWaitMany(const MojoHandle* handles, uint32_t* result_index, // Optional out struct MojoHandleSignalsState* signals_states); // Optional out +// Watches the given handle for one of the following events to happen: +// - A signal indicated by |signals| is satisfied. +// - It becomes known that no signal indicated by |signals| will ever be +// satisfied. (See the description of the |MOJO_RESULT_CANCELLED| and +// |MOJO_RESULT_FAILED_PRECONDITION| return values below.) +// - The handle is closed. +// +// |handle|: The handle to watch. Must be an open message pipe or data pipe +// handle. +// |signals|: The signals to watch for. +// |callback|: A function to be called any time one of the above events happens. +// The function must be safe to call from any thread at any time. +// |context|: User-provided context passed to |callback| when called. |context| +// is used to uniquely identify a registered watch and can be used to cancel +// the watch later using |MojoCancelWatch()|. +// +// Returns: +// |MOJO_RESULT_OK| if the watch has been successfully registered. Note that +// if the signals are already satisfied this may synchronously invoke +// |callback| before returning. +// |MOJO_RESULT_CANCELLED| if the watch was cancelled. In this case it is not +// necessary to explicitly call |MojoCancelWatch()|, and in fact it may be +// an error to do so as the handle may have been closed. +// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not an open message pipe +// handle. +// |MOJO_RESULT_FAILED_PRECONDITION| if it is already known that |signals| can +// never be satisfied. +// |MOJO_RESULT_ALREADY_EXISTS| if there is already a watch registered for +// the same combination of |handle| and |context|. +// +// Callback result codes: +// The callback may be called at any time on any thread with one of the +// following result codes to indicate various events: +// +// |MOJO_RESULT_OK| indicates that some signal in |signals| has been +// satisfied. +// |MOJO_RESULT_FAILED_PRECONDITION| indicates that no signals in |signals| +// can ever be satisfied again. +// |MOJO_RESULT_CANCELLED| indicates that the handle has been closed. In this +// case the watch is implicitly cancelled and there is no need to call +// |MojoCancelWatch()|. +MOJO_SYSTEM_EXPORT MojoResult +MojoWatch(MojoHandle handle, + MojoHandleSignals signals, + MojoWatchCallback callback, + uintptr_t context); + +// Cancels a handle watch corresponding to some prior call to |MojoWatch()|. +// +// NOTE: If the watch callback corresponding to |context| is currently running +// this will block until the callback completes execution. It is therefore +// illegal to call |MojoCancelWatch()| on a given |handle| and |context| from +// within the associated callback itself, as this will always deadlock. +// +// After |MojoCancelWatch()| function returns, the watch's associated callback +// will NEVER be called again by Mojo. +// +// |context|: The same user-provided context given to some prior call to +// |MojoWatch()|. Only the watch corresponding to this context will be +// cancelled. +// +// Returns: +// |MOJO_RESULT_OK| if the watch corresponding to |context| was cancelled. +// |MOJO_RESULT_INVALID_ARGUMENT| if no watch was registered with |context| +// for the given |handle|, or if |handle| is invalid. +MOJO_SYSTEM_EXPORT MojoResult +MojoCancelWatch(MojoHandle handle, uintptr_t context); + #ifdef __cplusplus } // extern "C" #endif |