summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrockot <rockot@chromium.org>2016-03-01 19:46:37 -0800
committerCommit bot <commit-bot@chromium.org>2016-03-02 03:48:53 +0000
commit49b69e6b27a12426ba56b6b5271ccec9b54c94c2 (patch)
tree503a0782b56020a0e95dc50035f447c4c5ad81b3
parenteff7ca929ee4c19c2f8bb2544a31009599217bd1 (diff)
downloadchromium_src-49b69e6b27a12426ba56b6b5271ccec9b54c94c2.zip
chromium_src-49b69e6b27a12426ba56b6b5271ccec9b54c94c2.tar.gz
chromium_src-49b69e6b27a12426ba56b6b5271ccec9b54c94c2.tar.bz2
[mojo-edk] Add MojoWatch and MojoCancelWatch APIs
This adds MojoWatch() and MojoCancelWatch() APIs to support efficient asynchronous handle event notifications. BUG=590495 Review URL: https://codereview.chromium.org/1748503002 Cr-Commit-Position: refs/heads/master@{#378677}
-rw-r--r--mojo/edk/embedder/entrypoints.cc11
-rw-r--r--mojo/edk/system/BUILD.gn7
-rw-r--r--mojo/edk/system/awakable_list.cc13
-rw-r--r--mojo/edk/system/awakable_list.h14
-rw-r--r--mojo/edk/system/core.cc50
-rw-r--r--mojo/edk/system/core.h5
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.cc26
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.h4
-rw-r--r--mojo/edk/system/data_pipe_producer_dispatcher.cc26
-rw-r--r--mojo/edk/system/data_pipe_producer_dispatcher.h4
-rw-r--r--mojo/edk/system/dispatcher.cc10
-rw-r--r--mojo/edk/system/dispatcher.h9
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.cc24
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.h4
-rw-r--r--mojo/edk/system/request_context.cc77
-rw-r--r--mojo/edk/system/request_context.h82
-rw-r--r--mojo/edk/system/watch_unittest.cc374
-rw-r--r--mojo/edk/system/watcher.cc52
-rw-r--r--mojo/edk/system/watcher.h85
-rw-r--r--mojo/edk/system/watcher_set.cc57
-rw-r--r--mojo/edk/system/watcher_set.h54
-rw-r--r--mojo/edk/test/mojo_test_base.cc43
-rw-r--r--mojo/edk/test/mojo_test_base.h18
-rw-r--r--mojo/mojo_edk.gyp6
-rw-r--r--mojo/mojo_edk_tests.gyp1
-rw-r--r--mojo/public/c/system/functions.h76
26 files changed, 1127 insertions, 5 deletions
diff --git a/mojo/edk/embedder/entrypoints.cc b/mojo/edk/embedder/entrypoints.cc
index 102b25f..23b919d 100644
--- a/mojo/edk/embedder/entrypoints.cc
+++ b/mojo/edk/embedder/entrypoints.cc
@@ -42,6 +42,17 @@ MojoResult MojoWaitMany(const MojoHandle* handles,
signals_states);
}
+MojoResult MojoWatch(MojoHandle handle,
+ MojoHandleSignals signals,
+ MojoWatchCallback callback,
+ uintptr_t context) {
+ return g_core->Watch(handle, signals, callback, context);
+}
+
+MojoResult MojoCancelWatch(MojoHandle handle, uintptr_t context) {
+ return g_core->CancelWatch(handle, context);
+}
+
MojoResult MojoCreateWaitSet(MojoHandle* wait_set_handle) {
return g_core->CreateWaitSet(wait_set_handle);
}
diff --git a/mojo/edk/system/BUILD.gn b/mojo/edk/system/BUILD.gn
index 75a097d..b9cac17 100644
--- a/mojo/edk/system/BUILD.gn
+++ b/mojo/edk/system/BUILD.gn
@@ -64,12 +64,18 @@ component("system") {
"ports_message.h",
"remote_message_pipe_bootstrap.cc",
"remote_message_pipe_bootstrap.h",
+ "request_context.cc",
+ "request_context.h",
"shared_buffer_dispatcher.cc",
"shared_buffer_dispatcher.h",
"wait_set_dispatcher.cc",
"wait_set_dispatcher.h",
"waiter.cc",
"waiter.h",
+ "watcher.cc",
+ "watcher.h",
+ "watcher_set.cc",
+ "watcher_set.h",
]
defines = [
@@ -147,6 +153,7 @@ test("mojo_system_unittests") {
"waiter_test_utils.cc",
"waiter_test_utils.h",
"waiter_unittest.cc",
+ "watch_unittest.cc",
]
if (!is_ios) {
diff --git a/mojo/edk/system/awakable_list.cc b/mojo/edk/system/awakable_list.cc
index 429e691..2045f32 100644
--- a/mojo/edk/system/awakable_list.cc
+++ b/mojo/edk/system/awakable_list.cc
@@ -39,6 +39,7 @@ void AwakableList::AwakeForStateChange(const HandleSignalsState& state) {
}
}
awakables_.erase(last, awakables_.end());
+ watchers_.NotifyForStateChange(state);
}
void AwakableList::CancelAll() {
@@ -47,6 +48,7 @@ void AwakableList::CancelAll() {
it->awakable->Awake(MOJO_RESULT_CANCELLED, it->context);
}
awakables_.clear();
+ watchers_.NotifyClosed();
}
void AwakableList::Add(Awakable* awakable,
@@ -70,5 +72,16 @@ void AwakableList::Remove(Awakable* awakable) {
awakables_.erase(last, awakables_.end());
}
+MojoResult AwakableList::AddWatcher(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context,
+ const HandleSignalsState& current_state) {
+ return watchers_.Add(signals, callback, context, current_state);
+}
+
+MojoResult AwakableList::RemoveWatcher(uintptr_t context) {
+ return watchers_.Remove(context);
+}
+
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/awakable_list.h b/mojo/edk/system/awakable_list.h
index 4e788ee..a80f5d6 100644
--- a/mojo/edk/system/awakable_list.h
+++ b/mojo/edk/system/awakable_list.h
@@ -5,11 +5,14 @@
#ifndef MOJO_EDK_SYSTEM_AWAKABLE_LIST_H_
#define MOJO_EDK_SYSTEM_AWAKABLE_LIST_H_
+#include <stddef.h>
#include <stdint.h>
#include <vector>
#include "mojo/edk/system/system_impl_export.h"
+#include "mojo/edk/system/watcher.h"
+#include "mojo/edk/system/watcher_set.h"
#include "mojo/public/c/system/types.h"
#include "mojo/public/cpp/system/macros.h"
@@ -36,6 +39,13 @@ class MOJO_SYSTEM_IMPL_EXPORT AwakableList {
void Add(Awakable* awakable, MojoHandleSignals signals, uintptr_t context);
void Remove(Awakable* awakable);
+ // Add and remove Watchers to this AwakableList.
+ MojoResult AddWatcher(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context,
+ const HandleSignalsState& current_state);
+ MojoResult RemoveWatcher(uintptr_t context);
+
private:
struct AwakeInfo {
AwakeInfo(Awakable* awakable, MojoHandleSignals signals, uintptr_t context)
@@ -49,6 +59,10 @@ class MOJO_SYSTEM_IMPL_EXPORT AwakableList {
AwakeInfoList awakables_;
+ // TODO: Remove AwakableList and instead use WatcherSet directly in
+ // dispatchers.
+ WatcherSet watchers_;
+
MOJO_DISALLOW_COPY_AND_ASSIGN(AwakableList);
};
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index e0f4131..a54e8b1 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -31,6 +31,7 @@
#include "mojo/edk/system/platform_handle_dispatcher.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/remote_message_pipe_bootstrap.h"
+#include "mojo/edk/system/request_context.h"
#include "mojo/edk/system/shared_buffer_dispatcher.h"
#include "mojo/edk/system/wait_set_dispatcher.h"
#include "mojo/edk/system/waiter.h"
@@ -47,6 +48,14 @@ const uint32_t kMaxHandlesPerMessage = 1024 * 1024;
// too; for now we just use a constant. This only affects bootstrap pipes.
const uint64_t kUnknownPipeIdForDebug = 0x7f7f7f7f7f7f7f7fUL;
+void CallWatchCallback(MojoWatchCallback callback,
+ uintptr_t context,
+ MojoResult result,
+ const HandleSignalsState& signals_state) {
+ callback(context, result,
+ static_cast<MojoHandleSignalsState>(signals_state));
+}
+
} // namespace
Core::Core() {}
@@ -268,6 +277,7 @@ MojoTimeTicks Core::GetTimeTicksNow() {
}
MojoResult Core::Close(MojoHandle handle) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher;
{
base::AutoLock lock(handles_lock_);
@@ -283,6 +293,7 @@ MojoResult Core::Wait(MojoHandle handle,
MojoHandleSignals signals,
MojoDeadline deadline,
MojoHandleSignalsState* signals_state) {
+ RequestContext request_context;
uint32_t unused = static_cast<uint32_t>(-1);
HandleSignalsState hss;
MojoResult rv = WaitManyInternal(&handle, &signals, 1, deadline, &unused,
@@ -298,6 +309,7 @@ MojoResult Core::WaitMany(const MojoHandle* handles,
MojoDeadline deadline,
uint32_t* result_index,
MojoHandleSignalsState* signals_state) {
+ RequestContext request_context;
if (num_handles < 1)
return MOJO_RESULT_INVALID_ARGUMENT;
if (num_handles > GetConfiguration().max_wait_many_num_handles)
@@ -319,7 +331,28 @@ MojoResult Core::WaitMany(const MojoHandle* handles,
return rv;
}
+MojoResult Core::Watch(MojoHandle handle,
+ MojoHandleSignals signals,
+ MojoWatchCallback callback,
+ uintptr_t context) {
+ RequestContext request_context;
+ scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle);
+ if (!dispatcher)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return dispatcher->Watch(
+ signals, base::Bind(&CallWatchCallback, callback, context), context);
+}
+
+MojoResult Core::CancelWatch(MojoHandle handle, uintptr_t context) {
+ RequestContext request_context;
+ scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle);
+ if (!dispatcher)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return dispatcher->CancelWatch(context);
+}
+
MojoResult Core::CreateWaitSet(MojoHandle* wait_set_handle) {
+ RequestContext request_context;
if (!wait_set_handle)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -338,6 +371,7 @@ MojoResult Core::CreateWaitSet(MojoHandle* wait_set_handle) {
MojoResult Core::AddHandle(MojoHandle wait_set_handle,
MojoHandle handle,
MojoHandleSignals signals) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle));
if (!wait_set_dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -351,6 +385,7 @@ MojoResult Core::AddHandle(MojoHandle wait_set_handle,
MojoResult Core::RemoveHandle(MojoHandle wait_set_handle,
MojoHandle handle) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> wait_set_dispatcher(GetDispatcher(wait_set_handle));
if (!wait_set_dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -367,6 +402,7 @@ MojoResult Core::GetReadyHandles(MojoHandle wait_set_handle,
MojoHandle* handles,
MojoResult* results,
MojoHandleSignalsState* signals_states) {
+ RequestContext request_context;
if (!handles || !count || !(*count) || !results)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -396,6 +432,7 @@ MojoResult Core::CreateMessagePipe(
const MojoCreateMessagePipeOptions* options,
MojoHandle* message_pipe_handle0,
MojoHandle* message_pipe_handle1) {
+ RequestContext request_context;
ports::PortRef port0, port1;
GetNodeController()->node()->CreatePortPair(&port0, &port1);
@@ -427,6 +464,7 @@ MojoResult Core::WriteMessage(MojoHandle message_pipe_handle,
const MojoHandle* handles,
uint32_t num_handles,
MojoWriteMessageFlags flags) {
+ RequestContext request_context;
auto dispatcher = GetDispatcher(message_pipe_handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -476,6 +514,7 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle,
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags) {
+ RequestContext request_context;
CHECK((!num_handles || !*num_handles || handles) &&
(!num_bytes || !*num_bytes || bytes));
auto dispatcher = GetDispatcher(message_pipe_handle);
@@ -488,6 +527,7 @@ MojoResult Core::CreateDataPipe(
const MojoCreateDataPipeOptions* options,
MojoHandle* data_pipe_producer_handle,
MojoHandle* data_pipe_consumer_handle) {
+ RequestContext request_context;
if (options && options->struct_size != sizeof(MojoCreateDataPipeOptions))
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -542,6 +582,7 @@ MojoResult Core::WriteData(MojoHandle data_pipe_producer_handle,
const void* elements,
uint32_t* num_bytes,
MojoWriteDataFlags flags) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_producer_handle));
if (!dispatcher)
@@ -554,6 +595,7 @@ MojoResult Core::BeginWriteData(MojoHandle data_pipe_producer_handle,
void** buffer,
uint32_t* buffer_num_bytes,
MojoWriteDataFlags flags) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_producer_handle));
if (!dispatcher)
@@ -564,6 +606,7 @@ MojoResult Core::BeginWriteData(MojoHandle data_pipe_producer_handle,
MojoResult Core::EndWriteData(MojoHandle data_pipe_producer_handle,
uint32_t num_bytes_written) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_producer_handle));
if (!dispatcher)
@@ -576,6 +619,7 @@ MojoResult Core::ReadData(MojoHandle data_pipe_consumer_handle,
void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_consumer_handle));
if (!dispatcher)
@@ -588,6 +632,7 @@ MojoResult Core::BeginReadData(MojoHandle data_pipe_consumer_handle,
const void** buffer,
uint32_t* buffer_num_bytes,
MojoReadDataFlags flags) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_consumer_handle));
if (!dispatcher)
@@ -598,6 +643,7 @@ MojoResult Core::BeginReadData(MojoHandle data_pipe_consumer_handle,
MojoResult Core::EndReadData(MojoHandle data_pipe_consumer_handle,
uint32_t num_bytes_read) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(
GetDispatcher(data_pipe_consumer_handle));
if (!dispatcher)
@@ -610,6 +656,7 @@ MojoResult Core::CreateSharedBuffer(
const MojoCreateSharedBufferOptions* options,
uint64_t num_bytes,
MojoHandle* shared_buffer_handle) {
+ RequestContext request_context;
MojoCreateSharedBufferOptions validated_options = {};
MojoResult result = SharedBufferDispatcher::ValidateCreateOptions(
options, &validated_options);
@@ -638,6 +685,7 @@ MojoResult Core::DuplicateBufferHandle(
MojoHandle buffer_handle,
const MojoDuplicateBufferHandleOptions* options,
MojoHandle* new_buffer_handle) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(GetDispatcher(buffer_handle));
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -664,6 +712,7 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle,
uint64_t num_bytes,
void** buffer,
MojoMapBufferFlags flags) {
+ RequestContext request_context;
scoped_refptr<Dispatcher> dispatcher(GetDispatcher(buffer_handle));
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -687,6 +736,7 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle,
}
MojoResult Core::UnmapBuffer(void* buffer) {
+ RequestContext request_context;
base::AutoLock lock(mapping_table_lock_);
return mapping_table_.RemoveMapping(buffer);
}
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index e2a22b0..65b6933 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -132,6 +132,11 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
MojoDeadline deadline,
uint32_t* result_index,
MojoHandleSignalsState* signals_states);
+ MojoResult Watch(MojoHandle handle,
+ MojoHandleSignals signals,
+ MojoWatchCallback callback,
+ uintptr_t context);
+ MojoResult CancelWatch(MojoHandle handle, uintptr_t context);
// These methods correspond to the API functions defined in
// "mojo/public/c/system/wait_set.h":
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index 1f6f079..269f8b2 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -21,6 +21,7 @@
#include "mojo/edk/system/data_pipe_control_message.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
+#include "mojo/edk/system/request_context.h"
#include "mojo/public/c/system/data_pipe.h"
namespace mojo {
@@ -86,6 +87,29 @@ MojoResult DataPipeConsumerDispatcher::Close() {
return CloseNoLock();
}
+
+MojoResult DataPipeConsumerDispatcher::Watch(
+ MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakable_list_.AddWatcher(
+ signals, callback, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
+ base::AutoLock lock(lock_);
+
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakable_list_.RemoveWatcher(context);
+}
+
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) {
@@ -474,6 +498,8 @@ void DataPipeConsumerDispatcher::OnPortStatusChanged() {
}
void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
+ RequestContext request_context;
+
lock_.AssertAcquired();
bool was_peer_closed = peer_closed_;
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h
index c75833b..945aa07 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h
@@ -43,6 +43,10 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final
// Dispatcher:
Type GetType() const override;
MojoResult Close() override;
+ MojoResult Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) override;
+ MojoResult CancelWatch(uintptr_t context) override;
MojoResult ReadData(void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) override;
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index cbbade8..634fb68 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -20,6 +20,8 @@
#include "mojo/edk/system/data_pipe_control_message.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
+#include "mojo/edk/system/request_context.h"
+#include "mojo/public/c/system/data_pipe.h"
namespace mojo {
namespace edk {
@@ -85,6 +87,28 @@ MojoResult DataPipeProducerDispatcher::Close() {
return CloseNoLock();
}
+MojoResult DataPipeProducerDispatcher::Watch(
+ MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakable_list_.AddWatcher(
+ signals, callback, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
+ base::AutoLock lock(lock_);
+
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakable_list_.RemoveWatcher(context);
+}
+
MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
uint32_t* num_bytes,
MojoWriteDataFlags flags) {
@@ -452,6 +476,8 @@ void DataPipeProducerDispatcher::OnPortStatusChanged() {
}
void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
+ RequestContext request_context;
+
lock_.AssertAcquired();
bool was_peer_closed = peer_closed_;
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.h b/mojo/edk/system/data_pipe_producer_dispatcher.h
index c1020b0..cfdeb96 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.h
@@ -42,6 +42,10 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeProducerDispatcher final
// Dispatcher:
Type GetType() const override;
MojoResult Close() override;
+ MojoResult Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) override;
+ MojoResult CancelWatch(uintptr_t context) override;
MojoResult WriteData(const void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) override;
diff --git a/mojo/edk/system/dispatcher.cc b/mojo/edk/system/dispatcher.cc
index 8a2e15e..6705542 100644
--- a/mojo/edk/system/dispatcher.cc
+++ b/mojo/edk/system/dispatcher.cc
@@ -22,6 +22,16 @@ Dispatcher::DispatcherInTransit::DispatcherInTransit(
Dispatcher::DispatcherInTransit::~DispatcherInTransit() {}
+MojoResult Dispatcher::Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
+MojoResult Dispatcher::CancelWatch(uintptr_t context) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
+}
+
MojoResult Dispatcher::WriteMessage(const void* bytes,
uint32_t num_bytes,
const DispatcherInTransit* dispatchers,
diff --git a/mojo/edk/system/dispatcher.h b/mojo/edk/system/dispatcher.h
index 89876eb..e8bcf33 100644
--- a/mojo/edk/system/dispatcher.h
+++ b/mojo/edk/system/dispatcher.h
@@ -20,6 +20,7 @@
#include "mojo/edk/system/handle_signals_state.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/system_impl_export.h"
+#include "mojo/edk/system/watcher.h"
#include "mojo/public/c/system/buffer.h"
#include "mojo/public/c/system/data_pipe.h"
#include "mojo/public/c/system/message_pipe.h"
@@ -65,6 +66,14 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
virtual Type GetType() const = 0;
virtual MojoResult Close() = 0;
+ ///////////// Watch API ////////////////////
+
+ virtual MojoResult Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context);
+
+ virtual MojoResult CancelWatch(uintptr_t context);
+
///////////// Message pipe API /////////////
virtual MojoResult WriteMessage(const void* bytes,
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index c7b4a32..ce35f64 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -13,6 +13,7 @@
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/node_controller.h"
#include "mojo/edk/system/ports_message.h"
+#include "mojo/edk/system/request_context.h"
#include "mojo/public/c/system/macros.h"
namespace mojo {
@@ -97,6 +98,27 @@ MojoResult MessagePipeDispatcher::Close() {
return CloseNoLock();
}
+MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) {
+ base::AutoLock lock(signal_lock_);
+
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakables_.AddWatcher(
+ signals, callback, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
+ base::AutoLock lock(signal_lock_);
+
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return awakables_.RemoveWatcher(context);
+}
+
MojoResult MessagePipeDispatcher::WriteMessage(
const void* bytes,
uint32_t num_bytes,
@@ -579,6 +601,8 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
}
void MessagePipeDispatcher::OnPortStatusChanged() {
+ RequestContext request_context;
+
base::AutoLock lock(signal_lock_);
// We stop observing our port as soon as it's transferred, but this can race
diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h
index f898ea8..7400515 100644
--- a/mojo/edk/system/message_pipe_dispatcher.h
+++ b/mojo/edk/system/message_pipe_dispatcher.h
@@ -42,6 +42,10 @@ class MessagePipeDispatcher : public Dispatcher {
// Dispatcher:
Type GetType() const override;
MojoResult Close() override;
+ MojoResult Watch(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context) override;
+ MojoResult CancelWatch(uintptr_t context) override;
MojoResult WriteMessage(const void* bytes,
uint32_t num_bytes,
const DispatcherInTransit* dispatchers,
diff --git a/mojo/edk/system/request_context.cc b/mojo/edk/system/request_context.cc
new file mode 100644
index 0000000..4b7e011
--- /dev/null
+++ b/mojo/edk/system/request_context.cc
@@ -0,0 +1,77 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/request_context.h"
+
+#include "base/lazy_instance.h"
+#include "base/logging.h"
+#include "base/threading/thread_local.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+base::LazyInstance<base::ThreadLocalPointer<RequestContext>>::Leaky
+ g_current_context;
+
+} // namespace
+
+RequestContext::RequestContext() {
+ // We allow nested RequestContexts to exist as long as they aren't actually
+ // used for anything.
+ if (!g_current_context.Pointer()->Get())
+ g_current_context.Pointer()->Set(this);
+}
+
+RequestContext::~RequestContext() {
+ // NOTE: Callbacks invoked by this destructor are allowed to initiate new
+ // EDK requests on this thread, so we need to reset the thread-local context
+ // pointer before calling them.
+ if (IsCurrent())
+ g_current_context.Pointer()->Set(nullptr);
+
+ for (const WatchNotifyFinalizer& watch : watch_notify_finalizers_.container())
+ watch.watcher->MaybeInvokeCallback(watch.result, watch.state);
+
+ for (const scoped_refptr<Watcher>& watcher :
+ watch_cancel_finalizers_.container())
+ watcher->Cancel();
+}
+
+// static
+RequestContext* RequestContext::current() {
+ DCHECK(g_current_context.Pointer()->Get());
+ return g_current_context.Pointer()->Get();
+}
+
+void RequestContext::AddWatchNotifyFinalizer(
+ scoped_refptr<Watcher> watcher,
+ MojoResult result,
+ const HandleSignalsState& state) {
+ DCHECK(IsCurrent());
+ watch_notify_finalizers_->push_back(
+ WatchNotifyFinalizer(watcher, result, state));
+}
+
+void RequestContext::AddWatchCancelFinalizer(scoped_refptr<Watcher> watcher) {
+ DCHECK(IsCurrent());
+ watch_cancel_finalizers_->push_back(watcher);
+}
+
+bool RequestContext::IsCurrent() const {
+ return g_current_context.Pointer()->Get() == this;
+}
+
+RequestContext::WatchNotifyFinalizer::WatchNotifyFinalizer(
+ scoped_refptr<Watcher> watcher,
+ MojoResult result,
+ const HandleSignalsState& state)
+ : watcher(watcher), result(result), state(state) {
+}
+
+RequestContext::WatchNotifyFinalizer::~WatchNotifyFinalizer() {}
+
+} // namespace edk
+} // namespace mojo
diff --git a/mojo/edk/system/request_context.h b/mojo/edk/system/request_context.h
new file mode 100644
index 0000000..75ba61f
--- /dev/null
+++ b/mojo/edk/system/request_context.h
@@ -0,0 +1,82 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_
+#define MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_
+
+#include "base/containers/stack_container.h"
+#include "base/macros.h"
+#include "mojo/edk/system/handle_signals_state.h"
+#include "mojo/edk/system/watcher.h"
+
+namespace mojo {
+namespace edk {
+
+// A RequestContext is a thread-local object which exists for the duration of
+// a single system API call. It is constructed immediately upon EDK entry and
+// destructed immediately before returning to the caller, after any internal
+// locks have been released.
+//
+// NOTE: It is legal to construct a RequestContext while another one already
+// exists on the current thread, but it is not safe to use the nested context
+// for any reason. Therefore it is important to always use
+// |RequestContext::current()| rather than referring to any local instance
+// directly.
+class RequestContext {
+ public:
+ RequestContext();
+ ~RequestContext();
+
+ // Returns the current thread-local RequestContext.
+ static RequestContext* current();
+
+ // Adds a finalizer to this RequestContext corresponding to a watch callback
+ // which should be triggered in response to some handle state change. If
+ // the Watcher hasn't been cancelled by the time this RequestContext is
+ // destroyed, its WatchCallback will be invoked with |result| and |state|
+ // arguments.
+ void AddWatchNotifyFinalizer(scoped_refptr<Watcher> watcher,
+ MojoResult result,
+ const HandleSignalsState& state);
+
+ // Adds a finalizer to this RequestContext which cancels a watch.
+ void AddWatchCancelFinalizer(scoped_refptr<Watcher> watcher);
+
+ private:
+ // Is this request context the current one?
+ bool IsCurrent() const;
+
+ struct WatchNotifyFinalizer {
+ WatchNotifyFinalizer(scoped_refptr<Watcher> watcher,
+ MojoResult result,
+ const HandleSignalsState& state);
+ ~WatchNotifyFinalizer();
+
+ scoped_refptr<Watcher> watcher;
+ MojoResult result;
+ HandleSignalsState state;
+ };
+
+ // Chosen by fair dice roll.
+ //
+ // TODO: We should measure the distribution of # of finalizers typical to
+ // any RequestContext and adjust this number accordingly. It's probably
+ // almost always 1, but 4 seems like a harmless upper bound for now.
+ static const size_t kStaticWatchFinalizersCapacity = 4;
+
+ using WatchNotifyFinalizerList =
+ base::StackVector<WatchNotifyFinalizer, kStaticWatchFinalizersCapacity>;
+ using WatchCancelFinalizerList =
+ base::StackVector<scoped_refptr<Watcher>, kStaticWatchFinalizersCapacity>;
+
+ WatchNotifyFinalizerList watch_notify_finalizers_;
+ WatchCancelFinalizerList watch_cancel_finalizers_;
+
+ DISALLOW_COPY_AND_ASSIGN(RequestContext);
+};
+
+} // namespace edk
+} // namespace mojo
+
+#endif // MOJO_EDK_SYSTEM_REQUEST_CONTEXT_H_
diff --git a/mojo/edk/system/watch_unittest.cc b/mojo/edk/system/watch_unittest.cc
new file mode 100644
index 0000000..fd0c13d
--- /dev/null
+++ b/mojo/edk/system/watch_unittest.cc
@@ -0,0 +1,374 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <functional>
+
+#include "base/macros.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "mojo/edk/test/mojo_test_base.h"
+#include "mojo/public/c/system/functions.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace mojo {
+namespace edk {
+namespace {
+
+void IgnoreResult(uintptr_t context,
+ MojoResult result,
+ MojoHandleSignalsState signals) {
+}
+
+// A test helper class for watching a handle. The WatchHelper instance is used
+// as a watch context for a single watch callback.
+class WatchHelper {
+ public:
+ using Callback =
+ std::function<void(MojoResult result, MojoHandleSignalsState state)>;
+
+ WatchHelper() {}
+ ~WatchHelper() {
+ CHECK(!watching_);
+ }
+
+ void Watch(MojoHandle handle,
+ MojoHandleSignals signals,
+ const Callback& callback) {
+ CHECK(!watching_);
+
+ handle_ = handle;
+ callback_ = callback;
+ watching_ = true;
+ CHECK_EQ(MOJO_RESULT_OK, MojoWatch(handle_, signals, &WatchHelper::OnNotify,
+ reinterpret_cast<uintptr_t>(this)));
+ }
+
+ bool is_watching() const { return watching_; }
+
+ void Cancel() {
+ CHECK_EQ(MOJO_RESULT_OK,
+ MojoCancelWatch(handle_, reinterpret_cast<uintptr_t>(this)));
+ CHECK(watching_);
+ watching_ = false;
+ }
+
+ private:
+ static void OnNotify(uintptr_t context,
+ MojoResult result,
+ MojoHandleSignalsState state) {
+ WatchHelper* watcher = reinterpret_cast<WatchHelper*>(context);
+ CHECK(watcher->watching_);
+ if (result == MOJO_RESULT_CANCELLED)
+ watcher->watching_ = false;
+ watcher->callback_(result, state);
+ }
+
+ bool watching_ = false;
+ MojoHandle handle_;
+ Callback callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatchHelper);
+};
+
+class WatchTest : public test::MojoTestBase {
+ public:
+ WatchTest() {}
+ ~WatchTest() override {}
+
+ protected:
+
+ private:
+ base::MessageLoop message_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatchTest);
+};
+
+TEST_F(WatchTest, NotifyBasic) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ base::RunLoop loop;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_TRUE(b_watcher.is_watching());
+ loop.Quit();
+ });
+
+ WriteMessage(a, "Hello!");
+ loop.Run();
+
+ EXPECT_TRUE(b_watcher.is_watching());
+ b_watcher.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, NotifyUnsatisfiable) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ base::RunLoop loop;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
+ EXPECT_EQ(0u,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_EQ(0u,
+ state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_TRUE(b_watcher.is_watching());
+ loop.Quit();
+ });
+
+ CloseHandle(a);
+ loop.Run();
+
+ b_watcher.Cancel();
+
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, NotifyCancellation) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ base::RunLoop loop;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
+ EXPECT_EQ(0u, state.satisfied_signals);
+ EXPECT_EQ(0u, state.satisfiable_signals);
+ EXPECT_FALSE(b_watcher.is_watching());
+ loop.Quit();
+ });
+
+ CloseHandle(b);
+ loop.Run();
+
+ CloseHandle(a);
+}
+
+TEST_F(WatchTest, InvalidArguemnts) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ uintptr_t context = reinterpret_cast<uintptr_t>(this);
+ EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE,
+ &IgnoreResult, context));
+
+ // Can't cancel a watch that doesn't exist.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoCancelWatch(a, ~context));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoCancelWatch(b, context));
+
+ CloseHandle(a);
+ CloseHandle(b);
+
+ // Can't watch a handle that doesn't exist.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ MojoWatch(b, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context));
+}
+
+TEST_F(WatchTest, NoDuplicateContext) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ // Try to add the same watch twice; should fail.
+ uintptr_t context = reinterpret_cast<uintptr_t>(this);
+ EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE,
+ &IgnoreResult, context));
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
+ MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context));
+
+ // Cancel and add it again; should be OK.
+ EXPECT_EQ(MOJO_RESULT_OK, MojoCancelWatch(a, context));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoWatch(a, MOJO_HANDLE_SIGNAL_READABLE,
+ &IgnoreResult, context));
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, MultipleWatches) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ // Add multiple watchers to |b| and see that they are both notified by a
+ // single write to |a|.
+ base::RunLoop loop;
+ int expected_notifications = 2;
+ auto on_readable = [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_GT(expected_notifications, 0);
+ if (--expected_notifications == 0)
+ loop.Quit();
+ };
+ WatchHelper watcher1;
+ WatchHelper watcher2;
+ watcher1.Watch(b, MOJO_HANDLE_SIGNAL_READABLE, on_readable);
+ watcher2.Watch(b, MOJO_HANDLE_SIGNAL_READABLE, on_readable);
+
+ WriteMessage(a, "Ping!");
+ loop.Run();
+
+ watcher1.Cancel();
+ watcher2.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, WatchWhileSatisfied) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ // Write to |a| and then start watching |b|. The callback should be invoked
+ // synchronously.
+ WriteMessage(a, "hey");
+ bool signaled = false;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ signaled = true;
+ });
+ EXPECT_TRUE(signaled);
+ b_watcher.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, WatchWhileUnsatisfiable) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ // Close |a| and then try to watch |b|. MojoWatch() should fail.
+ CloseHandle(a);
+ uintptr_t context = reinterpret_cast<uintptr_t>(this);
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ MojoWatch(b, MOJO_HANDLE_SIGNAL_READABLE, &IgnoreResult, context));
+
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, RespondFromCallback) {
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ // Watch |a| and |b|. Write to |a|, then write to |b| from within the callback
+ // which notifies it of the available message.
+ const std::string kTestMessage = "hello worlds.";
+ base::RunLoop loop;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_TRUE(b_watcher.is_watching());
+
+ // Echo a's message back to it.
+ WriteMessage(b, ReadMessage(b));
+ });
+
+ WatchHelper a_watcher;
+ a_watcher.Watch(
+ a, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_TRUE(a_watcher.is_watching());
+
+ // Expect to receive back the message that was originally sent to |b|.
+ EXPECT_EQ(kTestMessage, ReadMessage(a));
+
+ loop.Quit();
+ });
+
+ WriteMessage(a, kTestMessage);
+ loop.Run();
+
+ a_watcher.Cancel();
+ b_watcher.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, WatchDataPipeConsumer) {
+ MojoHandle a, b;
+ CreateDataPipe(&a, &b, 64);
+
+ base::RunLoop loop;
+ WatchHelper b_watcher;
+ b_watcher.Watch(
+ b, MOJO_HANDLE_SIGNAL_READABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
+ EXPECT_TRUE(b_watcher.is_watching());
+ loop.Quit();
+ });
+
+ WriteData(a, "Hello!");
+ loop.Run();
+
+ EXPECT_TRUE(b_watcher.is_watching());
+ b_watcher.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+TEST_F(WatchTest, WatchDataPipeProducer) {
+ MojoHandle a, b;
+ CreateDataPipe(&a, &b, 8);
+
+ // Fill the pipe to capacity so writes will block.
+ WriteData(a, "xxxxxxxx");
+
+ base::RunLoop loop;
+ WatchHelper a_watcher;
+ a_watcher.Watch(
+ a, MOJO_HANDLE_SIGNAL_WRITABLE,
+ [&] (MojoResult result, MojoHandleSignalsState state) {
+ EXPECT_EQ(MOJO_RESULT_OK, result);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE,
+ state.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
+ EXPECT_TRUE(a_watcher.is_watching());
+ loop.Quit();
+ });
+
+ EXPECT_EQ("xxxxxxxx", ReadData(b, 8));
+ loop.Run();
+
+ EXPECT_TRUE(a_watcher.is_watching());
+ a_watcher.Cancel();
+
+ CloseHandle(a);
+ CloseHandle(b);
+}
+
+} // namespace
+} // namespace edk
+} // namespace mojo
diff --git a/mojo/edk/system/watcher.cc b/mojo/edk/system/watcher.cc
new file mode 100644
index 0000000..4bc9dbb
--- /dev/null
+++ b/mojo/edk/system/watcher.cc
@@ -0,0 +1,52 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/watcher.h"
+
+#include "mojo/edk/system/handle_signals_state.h"
+#include "mojo/edk/system/request_context.h"
+
+namespace mojo {
+namespace edk {
+
+Watcher::Watcher(MojoHandleSignals signals, const WatchCallback& callback)
+ : signals_(signals), callback_(callback) {
+}
+
+void Watcher::MaybeInvokeCallback(MojoResult result,
+ const HandleSignalsState& state) {
+ base::AutoLock lock(lock_);
+ if (is_cancelled_)
+ return;
+
+ callback_.Run(result, state);
+}
+
+void Watcher::NotifyForStateChange(const HandleSignalsState& signals_state) {
+ RequestContext* request_context = RequestContext::current();
+ if (signals_state.satisfies(signals_)) {
+ request_context->AddWatchNotifyFinalizer(
+ make_scoped_refptr(this), MOJO_RESULT_OK, signals_state);
+ } else if (!signals_state.can_satisfy(signals_)) {
+ request_context->AddWatchNotifyFinalizer(make_scoped_refptr(this),
+ MOJO_RESULT_FAILED_PRECONDITION,
+ signals_state);
+ }
+}
+
+void Watcher::NotifyClosed() {
+ static const HandleSignalsState closed_state = {0, 0};
+ RequestContext::current()->AddWatchNotifyFinalizer(
+ make_scoped_refptr(this), MOJO_RESULT_CANCELLED, closed_state);
+}
+
+void Watcher::Cancel() {
+ base::AutoLock lock(lock_);
+ is_cancelled_ = true;
+}
+
+Watcher::~Watcher() {}
+
+} // namespace edk
+} // namespace mojo
diff --git a/mojo/edk/system/watcher.h b/mojo/edk/system/watcher.h
new file mode 100644
index 0000000..119355e
--- /dev/null
+++ b/mojo/edk/system/watcher.h
@@ -0,0 +1,85 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_WATCHER_H_
+#define MOJO_EDK_SYSTEM_WATCHER_H_
+
+#include "base/callback.h"
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "mojo/public/c/system/functions.h"
+#include "mojo/public/c/system/types.h"
+
+namespace mojo {
+namespace edk {
+
+struct HandleSignalsState;
+
+// This object corresponds to a watch added by a single call to |MojoWatch()|.
+//
+// An event may occur at any time which should trigger a Watcher to run its
+// callback, but the callback needs to be deferred until all EDK locks are
+// released. At the same time, a watch may be cancelled at any time by
+// |MojoCancelWatch()| and it is not OK for the callback to be invoked after
+// that happens.
+//
+// Therefore a Watcher needs to have some associated thread-safe state to track
+// its cancellation, which is why it's ref-counted.
+class Watcher : public base::RefCountedThreadSafe<Watcher> {
+ public:
+ using WatchCallback =
+ base::Callback<void(MojoResult, const HandleSignalsState&)>;
+
+ // Constructs a new Watcher which watches for |signals| to be satisfied on a
+ // handle and which invokes |callback| either when one such signal is
+ // satisfied, or all such signals become unsatisfiable.
+ Watcher(MojoHandleSignals signals, const WatchCallback& callback);
+
+ // Runs the Watcher's callback with the given arguments if it hasn't been
+ // cancelled yet.
+ void MaybeInvokeCallback(MojoResult result, const HandleSignalsState& state);
+
+ // Notifies the Watcher of a state change. This may result in the Watcher
+ // adding a finalizer to the current RequestContext to invoke its callback,
+ // cancellation notwithstanding.
+ void NotifyForStateChange(const HandleSignalsState& signals_state);
+
+ // Notifies the Watcher of handle closure. This always results in the Watcher
+ // adding a finalizer to the current RequestContext to invoke its callback,
+ // cancellation notwithstanding.
+ void NotifyClosed();
+
+ // Explicitly cancels the watch, guaranteeing that its callback will never be
+ // be invoked again.
+ void Cancel();
+
+ private:
+ friend class base::RefCountedThreadSafe<Watcher>;
+
+ ~Watcher();
+
+ // The set of signals which are watched by this Watcher.
+ const MojoHandleSignals signals_;
+
+ // The callback to invoke with a result and signal state any time signals in
+ // |signals_| are satisfied or become permanently unsatisfiable.
+ const WatchCallback callback_;
+
+ // Guards |is_cancelled_|.
+ base::Lock lock_;
+
+ // Indicates whether the watch has been cancelled. A |Watcher| may exist for a
+ // brief period of time after being cancelled if it's been attached as a
+ // RequestContext finalizer. In such cases the callback must not be invoked,
+ // hence this flag.
+ bool is_cancelled_ = false;
+
+ DISALLOW_COPY_AND_ASSIGN(Watcher);
+};
+
+} // namespace edk
+} // namespace mojo
+
+#endif // MOJO_EDK_SYSTEM_WATCHER_H_
diff --git a/mojo/edk/system/watcher_set.cc b/mojo/edk/system/watcher_set.cc
new file mode 100644
index 0000000..878f29a
--- /dev/null
+++ b/mojo/edk/system/watcher_set.cc
@@ -0,0 +1,57 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/watcher_set.h"
+
+#include "mojo/edk/system/request_context.h"
+#include "mojo/public/c/system/types.h"
+
+namespace mojo {
+namespace edk {
+
+WatcherSet::WatcherSet() {}
+
+WatcherSet::~WatcherSet() {}
+
+void WatcherSet::NotifyForStateChange(const HandleSignalsState& state) {
+ for (const auto& entry : watchers_)
+ entry.second->NotifyForStateChange(state);
+}
+
+void WatcherSet::NotifyClosed() {
+ for (const auto& entry : watchers_)
+ entry.second->NotifyClosed();
+}
+
+MojoResult WatcherSet::Add(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context,
+ const HandleSignalsState& current_state) {
+ auto it = watchers_.find(context);
+ if (it != watchers_.end())
+ return MOJO_RESULT_ALREADY_EXISTS;
+
+ if (!current_state.can_satisfy(signals))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ scoped_refptr<Watcher> watcher(new Watcher(signals, callback));
+ watchers_.insert(std::make_pair(context, watcher));
+
+ watcher->NotifyForStateChange(current_state);
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherSet::Remove(uintptr_t context) {
+ auto it = watchers_.find(context);
+ if (it == watchers_.end())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ RequestContext::current()->AddWatchCancelFinalizer(it->second);
+ watchers_.erase(it);
+ return MOJO_RESULT_OK;
+}
+
+} // namespace edk
+} // namespace mojo
diff --git a/mojo/edk/system/watcher_set.h b/mojo/edk/system/watcher_set.h
new file mode 100644
index 0000000..8ae54a1b
--- /dev/null
+++ b/mojo/edk/system/watcher_set.h
@@ -0,0 +1,54 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_WATCHER_SET_H_
+#define MOJO_EDK_SYSTEM_WATCHER_SET_H_
+
+#include <unordered_map>
+
+#include "base/callback.h"
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "mojo/edk/system/handle_signals_state.h"
+#include "mojo/edk/system/watcher.h"
+#include "mojo/public/c/system/types.h"
+
+namespace mojo {
+namespace edk {
+
+// A WatcherSet maintains a set of Watchers attached to a single handle and
+// keyed on an arbitrary user context.
+class WatcherSet {
+ public:
+ WatcherSet();
+ ~WatcherSet();
+
+ // Notifies all Watchers of a state change.
+ void NotifyForStateChange(const HandleSignalsState& state);
+
+ // Notifies all Watchers that their watched handle has been closed.
+ void NotifyClosed();
+
+ // Adds a new watcher to watch for signals in |signals| to be satisfied or
+ // unsatisfiable. |current_state| is the current signals state of the
+ // handle being watched.
+ MojoResult Add(MojoHandleSignals signals,
+ const Watcher::WatchCallback& callback,
+ uintptr_t context,
+ const HandleSignalsState& current_state);
+
+ // Removes a watcher from the set.
+ MojoResult Remove(uintptr_t context);
+
+ private:
+ // A map of watchers keyed on context value.
+ std::unordered_map<uintptr_t, scoped_refptr<Watcher>> watchers_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherSet);
+};
+
+} // namespace edk
+} // namespace mojo
+
+#endif // MOJO_EDK_SYSTEM_WATCHER_SET_H_
diff --git a/mojo/edk/test/mojo_test_base.cc b/mojo/edk/test/mojo_test_base.cc
index 27c92bb..7ec067a 100644
--- a/mojo/edk/test/mojo_test_base.cc
+++ b/mojo/edk/test/mojo_test_base.cc
@@ -9,6 +9,7 @@
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/system/handle_signals_state.h"
#include "mojo/public/c/system/buffer.h"
+#include "mojo/public/c/system/data_pipe.h"
#include "mojo/public/c/system/functions.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -221,6 +222,48 @@ void MojoTestBase::ExpectBufferContents(MojoHandle h,
EXPECT_EQ(MOJO_RESULT_OK, MojoUnmapBuffer(static_cast<void*>(data)));
}
+// static
+void MojoTestBase::CreateDataPipe(MojoHandle *p0,
+ MojoHandle* p1,
+ size_t capacity) {
+ MojoCreateDataPipeOptions options;
+ options.struct_size = static_cast<uint32_t>(sizeof(options));
+ options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
+ options.element_num_bytes = 1;
+ options.capacity_num_bytes = static_cast<uint32_t>(capacity);
+
+ MojoCreateDataPipe(&options, p0, p1);
+ CHECK_NE(*p0, MOJO_HANDLE_INVALID);
+ CHECK_NE(*p1, MOJO_HANDLE_INVALID);
+}
+
+// static
+void MojoTestBase::WriteData(MojoHandle producer, const std::string& data) {
+ CHECK_EQ(MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE,
+ MOJO_DEADLINE_INDEFINITE, nullptr),
+ MOJO_RESULT_OK);
+ uint32_t num_bytes = static_cast<uint32_t>(data.size());
+ CHECK_EQ(MojoWriteData(producer, data.data(), &num_bytes,
+ MOJO_WRITE_DATA_FLAG_ALL_OR_NONE),
+ MOJO_RESULT_OK);
+ CHECK_EQ(num_bytes, static_cast<uint32_t>(data.size()));
+}
+
+// static
+std::string MojoTestBase::ReadData(MojoHandle consumer, size_t size) {
+ CHECK_EQ(MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, nullptr),
+ MOJO_RESULT_OK);
+ std::vector<char> buffer(size);
+ uint32_t num_bytes = static_cast<uint32_t>(size);
+ CHECK_EQ(MojoReadData(consumer, buffer.data(), &num_bytes,
+ MOJO_WRITE_DATA_FLAG_ALL_OR_NONE),
+ MOJO_RESULT_OK);
+ CHECK_EQ(num_bytes, static_cast<uint32_t>(size));
+
+ return std::string(buffer.data(), buffer.size());
+}
+
} // namespace test
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/test/mojo_test_base.h b/mojo/edk/test/mojo_test_base.h
index e44316d..71c9276 100644
--- a/mojo/edk/test/mojo_test_base.h
+++ b/mojo/edk/test/mojo_test_base.h
@@ -14,10 +14,6 @@
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
-#include "base/message_loop/message_loop.h"
-#include "base/run_loop.h"
-#include "base/task_runner.h"
-#include "base/thread_task_runner_handle.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/test/multiprocess_test_helper.h"
#include "mojo/public/c/system/types.h"
@@ -131,10 +127,22 @@ class MojoTestBase : public testing::Test {
size_t offset,
const base::StringPiece& s);
+ //////// Data pipe test utilities /////////
+
+ // Creates a new data pipe.
+ static void CreateDataPipe(MojoHandle* producer,
+ MojoHandle* consumer,
+ size_t capacity);
+
+ // Writes data to a data pipe.
+ static void WriteData(MojoHandle producer, const std::string& data);
+
+ // Reads data from a data pipe.
+ static std::string ReadData(MojoHandle consumer, size_t size);
+
private:
friend class ClientController;
- base::MessageLoop message_loop_;
std::vector<scoped_ptr<ClientController>> clients_;
DISALLOW_COPY_AND_ASSIGN(MojoTestBase);
diff --git a/mojo/mojo_edk.gyp b/mojo/mojo_edk.gyp
index 144d8c0..f95cde4 100644
--- a/mojo/mojo_edk.gyp
+++ b/mojo/mojo_edk.gyp
@@ -112,12 +112,18 @@
'edk/system/ports_message.h',
'edk/system/remote_message_pipe_bootstrap.cc',
'edk/system/remote_message_pipe_bootstrap.h',
+ 'edk/system/request_context.cc',
+ 'edk/system/request_context.h',
'edk/system/shared_buffer_dispatcher.cc',
'edk/system/shared_buffer_dispatcher.h',
'edk/system/wait_set_dispatcher.cc',
'edk/system/wait_set_dispatcher.h',
'edk/system/waiter.cc',
'edk/system/waiter.h',
+ 'edk/system/watcher.cc',
+ 'edk/system/watcher.h',
+ 'edk/system/watcher_set.cc',
+ 'edk/system/watcher_set.h',
# Test-only code:
# TODO(vtl): It's a little unfortunate that these end up in the same
# component as non-test-only code. In the static build, this code
diff --git a/mojo/mojo_edk_tests.gyp b/mojo/mojo_edk_tests.gyp
index 03e60d3..94ba7233 100644
--- a/mojo/mojo_edk_tests.gyp
+++ b/mojo/mojo_edk_tests.gyp
@@ -227,6 +227,7 @@
'edk/system/waiter_test_utils.cc',
'edk/system/waiter_test_utils.h',
'edk/system/waiter_unittest.cc',
+ 'edk/system/watch_unittest.cc',
],
'conditions': [
['OS=="ios"', {
diff --git a/mojo/public/c/system/functions.h b/mojo/public/c/system/functions.h
index cd357b5..6c7e2df 100644
--- a/mojo/public/c/system/functions.h
+++ b/mojo/public/c/system/functions.h
@@ -9,6 +9,7 @@
#ifndef MOJO_PUBLIC_C_SYSTEM_FUNCTIONS_H_
#define MOJO_PUBLIC_C_SYSTEM_FUNCTIONS_H_
+#include <stddef.h>
#include <stdint.h>
#include "mojo/public/c/system/system_export.h"
@@ -18,6 +19,13 @@
extern "C" {
#endif
+// A callback used to notify watchers registered via |MojoWatch()|. Called when
+// some watched signals are satisfied or become unsatisfiable. See the
+// documentation for |MojoWatch()| for more details.
+typedef void (*MojoWatchCallback)(uintptr_t context,
+ MojoResult result,
+ struct MojoHandleSignalsState signals_state);
+
// Note: Pointer parameters that are labelled "optional" may be null (at least
// under some circumstances). Non-const pointer parameters are also labeled
// "in", "out", or "in/out", to indicate how they are used. (Note that how/if
@@ -129,6 +137,74 @@ MojoWaitMany(const MojoHandle* handles,
uint32_t* result_index, // Optional out
struct MojoHandleSignalsState* signals_states); // Optional out
+// Watches the given handle for one of the following events to happen:
+// - A signal indicated by |signals| is satisfied.
+// - It becomes known that no signal indicated by |signals| will ever be
+// satisfied. (See the description of the |MOJO_RESULT_CANCELLED| and
+// |MOJO_RESULT_FAILED_PRECONDITION| return values below.)
+// - The handle is closed.
+//
+// |handle|: The handle to watch. Must be an open message pipe or data pipe
+// handle.
+// |signals|: The signals to watch for.
+// |callback|: A function to be called any time one of the above events happens.
+// The function must be safe to call from any thread at any time.
+// |context|: User-provided context passed to |callback| when called. |context|
+// is used to uniquely identify a registered watch and can be used to cancel
+// the watch later using |MojoCancelWatch()|.
+//
+// Returns:
+// |MOJO_RESULT_OK| if the watch has been successfully registered. Note that
+// if the signals are already satisfied this may synchronously invoke
+// |callback| before returning.
+// |MOJO_RESULT_CANCELLED| if the watch was cancelled. In this case it is not
+// necessary to explicitly call |MojoCancelWatch()|, and in fact it may be
+// an error to do so as the handle may have been closed.
+// |MOJO_RESULT_INVALID_ARGUMENT| if |handle| is not an open message pipe
+// handle.
+// |MOJO_RESULT_FAILED_PRECONDITION| if it is already known that |signals| can
+// never be satisfied.
+// |MOJO_RESULT_ALREADY_EXISTS| if there is already a watch registered for
+// the same combination of |handle| and |context|.
+//
+// Callback result codes:
+// The callback may be called at any time on any thread with one of the
+// following result codes to indicate various events:
+//
+// |MOJO_RESULT_OK| indicates that some signal in |signals| has been
+// satisfied.
+// |MOJO_RESULT_FAILED_PRECONDITION| indicates that no signals in |signals|
+// can ever be satisfied again.
+// |MOJO_RESULT_CANCELLED| indicates that the handle has been closed. In this
+// case the watch is implicitly cancelled and there is no need to call
+// |MojoCancelWatch()|.
+MOJO_SYSTEM_EXPORT MojoResult
+MojoWatch(MojoHandle handle,
+ MojoHandleSignals signals,
+ MojoWatchCallback callback,
+ uintptr_t context);
+
+// Cancels a handle watch corresponding to some prior call to |MojoWatch()|.
+//
+// NOTE: If the watch callback corresponding to |context| is currently running
+// this will block until the callback completes execution. It is therefore
+// illegal to call |MojoCancelWatch()| on a given |handle| and |context| from
+// within the associated callback itself, as this will always deadlock.
+//
+// After |MojoCancelWatch()| function returns, the watch's associated callback
+// will NEVER be called again by Mojo.
+//
+// |context|: The same user-provided context given to some prior call to
+// |MojoWatch()|. Only the watch corresponding to this context will be
+// cancelled.
+//
+// Returns:
+// |MOJO_RESULT_OK| if the watch corresponding to |context| was cancelled.
+// |MOJO_RESULT_INVALID_ARGUMENT| if no watch was registered with |context|
+// for the given |handle|, or if |handle| is invalid.
+MOJO_SYSTEM_EXPORT MojoResult
+MojoCancelWatch(MojoHandle handle, uintptr_t context);
+
#ifdef __cplusplus
} // extern "C"
#endif