summaryrefslogtreecommitdiffstats
path: root/mojo/edk
diff options
context:
space:
mode:
authorrockot <rockot@chromium.org>2014-12-15 16:54:14 -0800
committerCommit bot <commit-bot@chromium.org>2014-12-16 00:54:37 +0000
commit6f6c4e614de20d2509ee53e04b1c5351db71e1c6 (patch)
tree59b3af8852d3a35bf72981e29bdd4516d67abb4b /mojo/edk
parent3ac8a6d5596a29d00102e280ec7e7164c2807370 (diff)
downloadchromium_src-6f6c4e614de20d2509ee53e04b1c5351db71e1c6.zip
chromium_src-6f6c4e614de20d2509ee53e04b1c5351db71e1c6.tar.gz
chromium_src-6f6c4e614de20d2509ee53e04b1c5351db71e1c6.tar.bz2
Update mojo sdk to rev 59145288bae55b0fce4276b017df6a1117bcf00f
BUG=None TBR=jamesr@chromium.org for mojo Review URL: https://codereview.chromium.org/799113004 Cr-Commit-Position: refs/heads/master@{#308479}
Diffstat (limited to 'mojo/edk')
-rw-r--r--mojo/edk/embedder/embedder.cc2
-rw-r--r--mojo/edk/js/tests/sample_service_tests.js4
-rw-r--r--mojo/edk/mojo_edk.gni7
-rw-r--r--mojo/edk/mojo_edk_system_impl.gypi6
-rw-r--r--mojo/edk/mojo_edk_tests.gyp8
-rw-r--r--mojo/edk/system/BUILD.gn2
-rw-r--r--mojo/edk/system/awakable.h4
-rw-r--r--mojo/edk/system/awakable_list.cc14
-rw-r--r--mojo/edk/system/awakable_list_unittest.cc69
-rw-r--r--mojo/edk/system/channel.cc236
-rw-r--r--mojo/edk/system/channel.h100
-rw-r--r--mojo/edk/system/channel_unittest.cc6
-rw-r--r--mojo/edk/system/incoming_endpoint.cc59
-rw-r--r--mojo/edk/system/incoming_endpoint.h54
-rw-r--r--mojo/edk/system/local_message_pipe_endpoint.cc5
-rw-r--r--mojo/edk/system/local_message_pipe_endpoint.h5
-rw-r--r--mojo/edk/system/message_in_transit.cc4
-rw-r--r--mojo/edk/system/message_in_transit.h4
-rw-r--r--mojo/edk/system/message_pipe.cc95
-rw-r--r--mojo/edk/system/message_pipe.h9
-rw-r--r--mojo/edk/system/message_pipe_test_utils.cc4
-rw-r--r--mojo/edk/system/remote_message_pipe_unittest.cc36
-rw-r--r--mojo/edk/system/waiter.cc5
-rw-r--r--mojo/edk/system/waiter.h2
-rw-r--r--mojo/edk/test/BUILD.gn63
25 files changed, 543 insertions, 260 deletions
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index d538810..9704fef 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -44,7 +44,7 @@ system::ChannelId MakeChannel(
return 0;
}
- channel->AttachAndRunEndpoint(channel_endpoint, true);
+ channel->SetBootstrapEndpoint(channel_endpoint);
DCHECK(internal::g_channel_manager);
return internal::g_channel_manager->AddChannel(
diff --git a/mojo/edk/js/tests/sample_service_tests.js b/mojo/edk/js/tests/sample_service_tests.js
index ca4f8e6..1b62e97 100644
--- a/mojo/edk/js/tests/sample_service_tests.js
+++ b/mojo/edk/js/tests/sample_service_tests.js
@@ -155,8 +155,8 @@ define([
serviceImpl.accept(message);
};
- var receiver = new SimpleMessageReceiver();
- var serviceProxy = new sample.Service.proxyClass(receiver);
+ var serviceProxy = new sample.Service.proxyClass;
+ serviceProxy.receiver_ = new SimpleMessageReceiver;
checkDefaultValues();
diff --git a/mojo/edk/mojo_edk.gni b/mojo/edk/mojo_edk.gni
index 34b82d4..1d1d992 100644
--- a/mojo/edk/mojo_edk.gni
+++ b/mojo/edk/mojo_edk.gni
@@ -19,7 +19,7 @@ import("../public/mojo_sdk.gni")
template("mojo_edk_source_set") {
mojo_sdk_source_set(target_name) {
restrict_external_deps = false
-
+
if (defined(invoker.visibility)) {
visibility = invoker.visibility
}
@@ -61,9 +61,8 @@ template("mojo_edk_source_set") {
# Check that the EDK target was not mistakenly given as an absolute
# path.
assert(get_path_info(edk_target, "abspath") != edk_target)
- allow_circular_includes_from += [
- rebase_path(edk_target, ".", mojo_root)
- ]
+ allow_circular_includes_from +=
+ [ rebase_path(edk_target, ".", mojo_root) ]
}
}
diff --git a/mojo/edk/mojo_edk_system_impl.gypi b/mojo/edk/mojo_edk_system_impl.gypi
index bab747b..2061527 100644
--- a/mojo/edk/mojo_edk_system_impl.gypi
+++ b/mojo/edk/mojo_edk_system_impl.gypi
@@ -73,6 +73,8 @@
'system/handle_signals_state.h',
'system/handle_table.cc',
'system/handle_table.h',
+ 'system/incoming_endpoint.cc',
+ 'system/incoming_endpoint.h',
'system/local_data_pipe.cc',
'system/local_data_pipe.h',
'system/local_message_pipe_endpoint.cc',
@@ -121,8 +123,8 @@
},
'conditions': [
['OS=="android"', {
- "dependencies": [
- "<(DEPTH)/third_party/ashmem/ashmem.gyp:ashmem",
+ 'dependencies': [
+ '<(DEPTH)/third_party/ashmem/ashmem.gyp:ashmem',
],
}],
],
diff --git a/mojo/edk/mojo_edk_tests.gyp b/mojo/edk/mojo_edk_tests.gyp
index a571069..bcd71e7 100644
--- a/mojo/edk/mojo_edk_tests.gyp
+++ b/mojo/edk/mojo_edk_tests.gyp
@@ -30,7 +30,7 @@
},
# TODO(vtl): Reorganize the mojo_public_*_unittests.
{
- # GN version: //mojo/public/cpp/bindings/tests:mojo_public_bindings_unittests
+ # GN version: //mojo/edk/test:mojo_public_bindings_unittests
'target_name': 'mojo_public_bindings_unittests',
'type': 'executable',
'dependencies': [
@@ -64,7 +64,7 @@
],
},
{
- # GN version: //mojo/public/cpp/environment/tests:mojo_public_environment_unittests
+ # GN version: //mojo/edk/test:mojo_public_environment_unittests
'target_name': 'mojo_public_environment_unittests',
'type': 'executable',
'dependencies': [
@@ -83,7 +83,7 @@
],
},
{
- # GN version: //mojo/public/cpp/application/tests:mojo_public_application_unittests
+ # GN version: //mojo/edk/test:mojo_public_application_unittests
'target_name': 'mojo_public_application_unittests',
'type': 'executable',
'dependencies': [
@@ -140,7 +140,7 @@
],
},
{
- # GN version: //mojo/public/c/system/tests:perftests
+ # GN version: //mojo/edk/test:mojo_public_system_perftests
'target_name': 'mojo_public_system_perftests',
'type': 'executable',
'dependencies': [
diff --git a/mojo/edk/system/BUILD.gn b/mojo/edk/system/BUILD.gn
index 8d24a08..59df9c8 100644
--- a/mojo/edk/system/BUILD.gn
+++ b/mojo/edk/system/BUILD.gn
@@ -51,6 +51,8 @@ component("system") {
"handle_signals_state.h",
"handle_table.cc",
"handle_table.h",
+ "incoming_endpoint.cc",
+ "incoming_endpoint.h",
"local_data_pipe.cc",
"local_data_pipe.h",
"local_message_pipe_endpoint.cc",
diff --git a/mojo/edk/system/awakable.h b/mojo/edk/system/awakable.h
index 5479912..057d3eb 100644
--- a/mojo/edk/system/awakable.h
+++ b/mojo/edk/system/awakable.h
@@ -20,7 +20,9 @@ class MOJO_SYSTEM_IMPL_EXPORT Awakable {
// * As this is called from any thread, this must be thread-safe.
// * As this is called inside a lock, this must not call anything that takes
// "non-terminal" locks, i.e., those which are always safe to take.
- virtual void Awake(MojoResult result, uintptr_t context) = 0;
+ // This should return false if this must not be called again for the same
+ // reason (e.g., for the same call to |AwakableList::Add()|).
+ virtual bool Awake(MojoResult result, uintptr_t context) = 0;
protected:
Awakable() {}
diff --git a/mojo/edk/system/awakable_list.cc b/mojo/edk/system/awakable_list.cc
index 6af305b..e74e2c0 100644
--- a/mojo/edk/system/awakable_list.cc
+++ b/mojo/edk/system/awakable_list.cc
@@ -19,12 +19,18 @@ AwakableList::~AwakableList() {
}
void AwakableList::AwakeForStateChange(const HandleSignalsState& state) {
- for (AwakeInfoList::iterator it = awakables_.begin(); it != awakables_.end();
- ++it) {
+ for (AwakeInfoList::iterator it = awakables_.begin();
+ it != awakables_.end();) {
+ bool keep = true;
if (state.satisfies(it->signals))
- it->awakable->Awake(MOJO_RESULT_OK, it->context);
+ keep = it->awakable->Awake(MOJO_RESULT_OK, it->context);
else if (!state.can_satisfy(it->signals))
- it->awakable->Awake(MOJO_RESULT_FAILED_PRECONDITION, it->context);
+ keep = it->awakable->Awake(MOJO_RESULT_FAILED_PRECONDITION, it->context);
+ AwakeInfoList::iterator maybe_delete = it;
+ ++it;
+
+ if (!keep)
+ awakables_.erase(maybe_delete);
}
}
diff --git a/mojo/edk/system/awakable_list_unittest.cc b/mojo/edk/system/awakable_list_unittest.cc
index 201b751..0ef6a7a 100644
--- a/mojo/edk/system/awakable_list_unittest.cc
+++ b/mojo/edk/system/awakable_list_unittest.cc
@@ -284,6 +284,75 @@ TEST(AwakableListTest, MultipleAwakables) {
EXPECT_EQ(10u, context4);
}
+class KeepAwakable : public Awakable {
+ public:
+ KeepAwakable() : awake_count(0) {}
+
+ bool Awake(MojoResult result, uintptr_t context) override {
+ awake_count++;
+ return true;
+ }
+
+ int awake_count;
+
+ DISALLOW_COPY_AND_ASSIGN(KeepAwakable);
+};
+
+class RemoveAwakable : public Awakable {
+ public:
+ RemoveAwakable() : awake_count(0) {}
+
+ bool Awake(MojoResult result, uintptr_t context) override {
+ awake_count++;
+ return false;
+ }
+
+ int awake_count;
+
+ DISALLOW_COPY_AND_ASSIGN(RemoveAwakable);
+};
+
+TEST(AwakableListTest, KeepAwakablesReturningTrue) {
+ KeepAwakable keep0;
+ KeepAwakable keep1;
+ RemoveAwakable remove0;
+ RemoveAwakable remove1;
+ RemoveAwakable remove2;
+
+ HandleSignalsState hss(MOJO_HANDLE_SIGNAL_WRITABLE,
+ MOJO_HANDLE_SIGNAL_WRITABLE);
+
+ AwakableList remove_all;
+ remove_all.Add(&remove0, MOJO_HANDLE_SIGNAL_WRITABLE, 0);
+ remove_all.Add(&remove1, MOJO_HANDLE_SIGNAL_WRITABLE, 0);
+
+ remove_all.AwakeForStateChange(hss);
+ EXPECT_EQ(remove0.awake_count, 1);
+ EXPECT_EQ(remove1.awake_count, 1);
+
+ remove_all.AwakeForStateChange(hss);
+ EXPECT_EQ(remove0.awake_count, 1);
+ EXPECT_EQ(remove1.awake_count, 1);
+
+ AwakableList remove_first;
+ remove_first.Add(&remove2, MOJO_HANDLE_SIGNAL_WRITABLE, 0);
+ remove_first.Add(&keep0, MOJO_HANDLE_SIGNAL_WRITABLE, 0);
+ remove_first.Add(&keep1, MOJO_HANDLE_SIGNAL_WRITABLE, 0);
+
+ remove_first.AwakeForStateChange(hss);
+ EXPECT_EQ(keep0.awake_count, 1);
+ EXPECT_EQ(keep1.awake_count, 1);
+ EXPECT_EQ(remove2.awake_count, 1);
+
+ remove_first.AwakeForStateChange(hss);
+ EXPECT_EQ(keep0.awake_count, 2);
+ EXPECT_EQ(keep1.awake_count, 2);
+ EXPECT_EQ(remove2.awake_count, 1);
+
+ remove_first.Remove(&keep0);
+ remove_first.Remove(&keep1);
+}
+
} // namespace
} // namespace system
} // namespace mojo
diff --git a/mojo/edk/system/channel.cc b/mojo/edk/system/channel.cc
index 1e26145..a7162dd 100644
--- a/mojo/edk/system/channel.cc
+++ b/mojo/edk/system/channel.cc
@@ -16,6 +16,18 @@
namespace mojo {
namespace system {
+namespace {
+
+struct SerializedEndpoint {
+ // This is the endpoint ID on the receiving side, and should be a "remote ID".
+ // (The receiving side should already have had an endpoint attached and been
+ // run via the |Channel|s. This endpoint will have both IDs assigned, so this
+ // ID is only needed to associate that endpoint with a particular dispatcher.)
+ ChannelEndpointId receiver_endpoint_id;
+};
+
+} // namespace
+
Channel::Channel(embedder::PlatformSupport* platform_support)
: platform_support_(platform_support),
is_running_(false),
@@ -90,57 +102,25 @@ void Channel::WillShutdownSoon() {
channel_manager_ = nullptr;
}
-// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
-// keeps the endpoint alive even after the lock is released. Otherwise, there's
-// the temptation to simply pass the result of |new ChannelEndpoint(...)|
-// directly to this function, which wouldn't be sufficient for safety.
-ChannelEndpointId Channel::AttachAndRunEndpoint(
- scoped_refptr<ChannelEndpoint> endpoint,
- bool is_bootstrap) {
+void Channel::SetBootstrapEndpoint(scoped_refptr<ChannelEndpoint> endpoint) {
DCHECK(endpoint);
- ChannelEndpointId local_id;
- ChannelEndpointId remote_id;
+ // Used for both local and remote IDs.
+ ChannelEndpointId bootstrap_id = ChannelEndpointId::GetBootstrap();
+
{
base::AutoLock locker(lock_);
DLOG_IF(WARNING, is_shutting_down_)
- << "AttachEndpoint() while shutting down";
+ << "SetBootstrapEndpoint() while shutting down";
- if (is_bootstrap) {
- local_id = ChannelEndpointId::GetBootstrap();
- DCHECK(local_id_to_endpoint_map_.find(local_id) ==
- local_id_to_endpoint_map_.end());
-
- remote_id = ChannelEndpointId::GetBootstrap();
- } else {
- do {
- local_id = local_id_generator_.GetNext();
- } while (local_id_to_endpoint_map_.find(local_id) !=
- local_id_to_endpoint_map_.end());
+ // Bootstrap endpoint should be the first.
+ DCHECK(local_id_to_endpoint_map_.empty());
- // TODO(vtl): We also need to check for collisions of remote IDs here.
- remote_id = remote_id_generator_.GetNext();
- }
-
- local_id_to_endpoint_map_[local_id] = endpoint;
+ local_id_to_endpoint_map_[bootstrap_id] = endpoint;
}
- if (!is_bootstrap) {
- if (!SendControlMessage(
- MessageInTransit::kSubtypeChannelAttachAndRunEndpoint, local_id,
- remote_id)) {
- HandleLocalError(base::StringPrintf(
- "Failed to send message to run remote message pipe endpoint (local "
- "ID %u, remote ID %u)",
- static_cast<unsigned>(local_id.value()),
- static_cast<unsigned>(remote_id.value())));
- // TODO(vtl): Should we continue on to |AttachAndRun()|?
- }
- }
-
- endpoint->AttachAndRun(this, local_id, remote_id);
- return remote_id;
+ endpoint->AttachAndRun(this, bootstrap_id, bootstrap_id);
}
bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
@@ -191,33 +171,50 @@ void Channel::DetachEndpoint(ChannelEndpoint* endpoint,
// Send a remove message outside the lock.
}
- if (!SendControlMessage(
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, local_id,
- remote_id)) {
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelRemoveEndpoint,
+ local_id, remote_id)) {
HandleLocalError(base::StringPrintf(
- "Failed to send message to remove remote message pipe endpoint (local "
- "ID %u, remote ID %u)",
+ "Failed to send message to remove remote endpoint (local ID %u, remote "
+ "ID %u)",
static_cast<unsigned>(local_id.value()),
static_cast<unsigned>(remote_id.value())));
}
}
-scoped_refptr<MessagePipe> Channel::PassIncomingMessagePipe(
- ChannelEndpointId local_id) {
+size_t Channel::GetSerializedEndpointSize() const {
+ return sizeof(SerializedEndpoint);
+}
+
+void Channel::SerializeEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
+ void* destination) {
+ SerializedEndpoint* s = static_cast<SerializedEndpoint*>(destination);
+ s->receiver_endpoint_id = AttachAndRunEndpoint(endpoint);
+ DVLOG(2) << "Serializing endpoint (remote ID = " << s->receiver_endpoint_id
+ << ")";
+}
+
+scoped_refptr<IncomingEndpoint> Channel::DeserializeEndpoint(
+ const void* source) {
+ const SerializedEndpoint* s = static_cast<const SerializedEndpoint*>(source);
+ ChannelEndpointId local_id = s->receiver_endpoint_id;
// No need to check the validity of |local_id| -- if it's not valid, it simply
- // won't be in |incoming_message_pipes_|.
+ // won't be in |incoming_endpoints_|.
DVLOG_IF(2, !local_id.is_valid() || !local_id.is_remote())
- << "Attempt to get invalid incoming message pipe for ID " << local_id;
+ << "Attempt to get incoming endpoint for invalid ID " << local_id;
base::AutoLock locker(lock_);
- auto it = incoming_message_pipes_.find(local_id);
- if (it == incoming_message_pipes_.end())
+ auto it = incoming_endpoints_.find(local_id);
+ if (it == incoming_endpoints_.end()) {
+ LOG(ERROR) << "Failed to deserialize endpoint (ID = " << local_id << ")";
return nullptr;
+ }
+
+ DVLOG(2) << "Deserializing endpoint (new local ID = " << local_id << ")";
- scoped_refptr<MessagePipe> rv;
+ scoped_refptr<IncomingEndpoint> rv;
rv.swap(it->second);
- incoming_message_pipes_.erase(it);
+ incoming_endpoints_.erase(it);
return rv;
}
@@ -356,32 +353,32 @@ void Channel::OnReadMessageForChannel(
switch (message_view.subtype()) {
case MessageInTransit::kSubtypeChannelAttachAndRunEndpoint:
- DVLOG(2) << "Handling channel message to attach and run message pipe "
- "(local ID " << message_view.destination_id()
- << ", remote ID " << message_view.source_id() << ")";
+ DVLOG(2) << "Handling channel message to attach and run endpoint (local "
+ "ID " << message_view.destination_id() << ", remote ID "
+ << message_view.source_id() << ")";
if (!OnAttachAndRunEndpoint(message_view.destination_id(),
message_view.source_id())) {
HandleRemoteError(
- "Received invalid channel message to attach and run message pipe");
+ "Received invalid channel message to attach and run endpoint");
}
break;
- case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
- DVLOG(2) << "Handling channel message to remove message pipe (local ID "
+ case MessageInTransit::kSubtypeChannelRemoveEndpoint:
+ DVLOG(2) << "Handling channel message to remove endpoint (local ID "
<< message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
- if (!OnRemoveMessagePipeEndpoint(message_view.destination_id(),
- message_view.source_id())) {
+ if (!OnRemoveEndpoint(message_view.destination_id(),
+ message_view.source_id())) {
HandleRemoteError(
- "Received invalid channel message to remove message pipe");
+ "Received invalid channel message to remove endpoint");
}
break;
- case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
- DVLOG(2) << "Handling channel message to ack remove message pipe (local "
- "ID " << message_view.destination_id() << ", remote ID "
+ case MessageInTransit::kSubtypeChannelRemoveEndpointAck:
+ DVLOG(2) << "Handling channel message to ack remove endpoint (local ID "
+ << message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
- if (!OnRemoveMessagePipeEndpointAck(message_view.destination_id())) {
+ if (!OnRemoveEndpointAck(message_view.destination_id())) {
HandleRemoteError(
- "Received invalid channel message to ack remove message pipe");
+ "Received invalid channel message to ack remove endpoint");
}
break;
default:
@@ -406,10 +403,10 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id,
return false;
}
- // Create a message pipe and thus an endpoint (outside the lock).
- scoped_refptr<ChannelEndpoint> endpoint;
- scoped_refptr<MessagePipe> message_pipe(
- MessagePipe::CreateLocalProxy(&endpoint));
+ // Create/initialize an |IncomingEndpoint| and thus an endpoint (outside the
+ // lock).
+ scoped_refptr<IncomingEndpoint> incoming_endpoint(new IncomingEndpoint());
+ scoped_refptr<ChannelEndpoint> endpoint = incoming_endpoint->Init();
bool success = true;
{
@@ -417,21 +414,20 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id,
if (local_id_to_endpoint_map_.find(local_id) ==
local_id_to_endpoint_map_.end()) {
- DCHECK(incoming_message_pipes_.find(local_id) ==
- incoming_message_pipes_.end());
+ DCHECK(incoming_endpoints_.find(local_id) == incoming_endpoints_.end());
// TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll
// avoid some refcount churn.)
local_id_to_endpoint_map_[local_id] = endpoint;
- incoming_message_pipes_[local_id] = message_pipe;
+ incoming_endpoints_[local_id] = incoming_endpoint;
} else {
- // We need to call |Close()| on the message pipe outside the lock.
+ // We need to call |Close()| outside the lock.
success = false;
}
}
if (!success) {
DVLOG(2) << "Received attach and run endpoint for existing local ID";
- message_pipe->Close(0);
+ incoming_endpoint->Close();
return false;
}
@@ -439,8 +435,8 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id,
return true;
}
-bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
- ChannelEndpointId remote_id) {
+bool Channel::OnRemoveEndpoint(ChannelEndpointId local_id,
+ ChannelEndpointId remote_id) {
DCHECK(creation_thread_checker_.CalledOnValidThread());
scoped_refptr<ChannelEndpoint> endpoint;
@@ -449,7 +445,7 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
if (it == local_id_to_endpoint_map_.end()) {
- DVLOG(2) << "Remove message pipe endpoint error: not found";
+ DVLOG(2) << "Remove endpoint error: not found";
return false;
}
@@ -465,12 +461,11 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
endpoint->DetachFromChannel();
- if (!SendControlMessage(
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
- local_id, remote_id)) {
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelRemoveEndpointAck,
+ local_id, remote_id)) {
HandleLocalError(base::StringPrintf(
- "Failed to send message to remove remote message pipe endpoint ack "
- "(local ID %u, remote ID %u)",
+ "Failed to send message to ack remove remote endpoint (local ID %u, "
+ "remote ID %u)",
static_cast<unsigned>(local_id.value()),
static_cast<unsigned>(remote_id.value())));
}
@@ -478,19 +473,19 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
return true;
}
-bool Channel::OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id) {
+bool Channel::OnRemoveEndpointAck(ChannelEndpointId local_id) {
DCHECK(creation_thread_checker_.CalledOnValidThread());
base::AutoLock locker(lock_);
IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
if (it == local_id_to_endpoint_map_.end()) {
- DVLOG(2) << "Remove message pipe endpoint ack error: not found";
+ DVLOG(2) << "Remove endpoint ack error: not found";
return false;
}
if (it->second) {
- DVLOG(2) << "Remove message pipe endpoint ack error: wrong state";
+ DVLOG(2) << "Remove endpoint ack error: wrong state";
return false;
}
@@ -498,18 +493,6 @@ bool Channel::OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id) {
return true;
}
-bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
- ChannelEndpointId local_id,
- ChannelEndpointId remote_id) {
- DVLOG(2) << "Sending channel control message: subtype " << subtype
- << ", local ID " << local_id << ", remote ID " << remote_id;
- scoped_ptr<MessageInTransit> message(new MessageInTransit(
- MessageInTransit::kTypeChannel, subtype, 0, nullptr));
- message->set_source_id(local_id);
- message->set_destination_id(remote_id);
- return WriteMessage(message.Pass());
-}
-
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this? Probably we want to
// terminate the connection, since it's spewing invalid stuff.
@@ -525,5 +508,58 @@ void Channel::HandleLocalError(const base::StringPiece& error_message) {
LOG(WARNING) << error_message;
}
+// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
+// keeps the endpoint alive even after the lock is released. Otherwise, there's
+// the temptation to simply pass the result of |new ChannelEndpoint(...)|
+// directly to this function, which wouldn't be sufficient for safety.
+ChannelEndpointId Channel::AttachAndRunEndpoint(
+ scoped_refptr<ChannelEndpoint> endpoint) {
+ DCHECK(endpoint);
+
+ ChannelEndpointId local_id;
+ ChannelEndpointId remote_id;
+ {
+ base::AutoLock locker(lock_);
+
+ DLOG_IF(WARNING, is_shutting_down_)
+ << "AttachAndRunEndpoint() while shutting down";
+
+ do {
+ local_id = local_id_generator_.GetNext();
+ } while (local_id_to_endpoint_map_.find(local_id) !=
+ local_id_to_endpoint_map_.end());
+
+ // TODO(vtl): We also need to check for collisions of remote IDs here.
+ remote_id = remote_id_generator_.GetNext();
+
+ local_id_to_endpoint_map_[local_id] = endpoint;
+ }
+
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelAttachAndRunEndpoint,
+ local_id, remote_id)) {
+ HandleLocalError(base::StringPrintf(
+ "Failed to send message to run remote endpoint (local ID %u, remote ID "
+ "%u)",
+ static_cast<unsigned>(local_id.value()),
+ static_cast<unsigned>(remote_id.value())));
+ // TODO(vtl): Should we continue on to |AttachAndRun()|?
+ }
+
+ endpoint->AttachAndRun(this, local_id, remote_id);
+ return remote_id;
+}
+
+bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
+ ChannelEndpointId local_id,
+ ChannelEndpointId remote_id) {
+ DVLOG(2) << "Sending channel control message: subtype " << subtype
+ << ", local ID " << local_id << ", remote ID " << remote_id;
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::kTypeChannel, subtype, 0, nullptr));
+ message->set_source_id(local_id);
+ message->set_destination_id(remote_id);
+ return WriteMessage(message.Pass());
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/edk/system/channel.h b/mojo/edk/system/channel.h
index 227d202..4d65d9b 100644
--- a/mojo/edk/system/channel.h
+++ b/mojo/edk/system/channel.h
@@ -17,8 +17,8 @@
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
+#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
-#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/c/system/types.h"
@@ -45,9 +45,10 @@ class ChannelManager;
// reference is kept on its creation thread and is released after |Shutdown()|
// is called, but other threads may have temporarily "dangling" references).
//
-// Note the lock order (in order of allowable acquisition): |MessagePipe|,
-// |ChannelEndpoint|, |Channel|. Thus |Channel| may not call into
-// |ChannelEndpoint| with |Channel|'s lock held.
+// Note the lock order (in order of allowable acquisition):
+// |ChannelEndpointClient| (e.g., |MessagePipe|), |ChannelEndpoint|, |Channel|.
+// Thus |Channel| may not call into |ChannelEndpoint| with |Channel|'s lock
+// held.
class MOJO_SYSTEM_IMPL_EXPORT Channel
: public base::RefCountedThreadSafe<Channel>,
public RawChannel::Delegate {
@@ -79,25 +80,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// If set, the channel manager associated with this channel will be reset.
void WillShutdownSoon();
- // Attaches the given endpoint to this channel and runs it. |is_bootstrap|
- // should be set if and only if it is the first endpoint on the channel. This
- // assigns the endpoint both local and remote IDs. If |is_bootstrap| is set,
- // both are the bootstrap ID (given by |ChannelEndpointId::GetBootstrap()|);
- // if not, it will also send a |kSubtypeChannelAttachAndRunEndpoint| message
- // to the remote side to tell it to create an endpoint as well.
+ // Called to set (i.e., attach and run) the bootstrap (first) endpoint on the
+ // channel. Both the local and remote IDs are the bootstrap ID (given by
+ // |ChannelEndpointId::GetBootstrap()|).
//
- // (Bootstrapping is symmetric: Both sides attach and run endpoints with
- // |is_bootstrap| set, which establishes the first message pipe across a
- // channel.)
- //
- // This returns the *remote* ID (which will be the bootstrap ID in the
- // bootstrap case, and a "remote ID", i.e., one for which |is_remote()|
- // returns true, otherwise).
- //
- // TODO(vtl): Maybe limit the number of attached message pipes.
- ChannelEndpointId AttachAndRunEndpoint(
- scoped_refptr<ChannelEndpoint> endpoint,
- bool is_bootstrap);
+ // (Bootstrapping is symmetric: Both sides call this, which will establish the
+ // first connection across a channel.)
+ void SetBootstrapEndpoint(scoped_refptr<ChannelEndpoint> endpoint);
// This forwards |message| verbatim to |raw_channel_|.
bool WriteMessage(scoped_ptr<MessageInTransit> message);
@@ -116,10 +105,25 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
ChannelEndpointId local_id,
ChannelEndpointId remote_id);
- // Takes ownership of an incoming message pipe (i.e., one that was created via
- // a |kSubtypeChannelAttachAndRunEndpoint| message).
- scoped_refptr<MessagePipe> PassIncomingMessagePipe(
- ChannelEndpointId local_id);
+ // Returns the size of a serialized endpoint (see |SerializeEndpoint()| and
+ // |DeserializeEndpoint()| below). This value will remain constant for a given
+ // instance of |Channel|.
+ size_t GetSerializedEndpointSize() const;
+
+ // Serializes the given endpoint, writing to |destination| auxiliary
+ // information to be transmitted to the peer |Channel| via some other means.
+ // |destination| should point to a buffer of (at least) the size returned by
+ // |GetSerializedEndpointSize()| (exactly that much data will be written).
+ void SerializeEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
+ void* destination);
+
+ // Deserializes an endpoint that was sent from the peer |Channel| (using
+ // |SerializeEndpoint()|. |source| should be (a copy of) the data that
+ // |SerializeEndpoint()| wrote, and must be (at least)
+ // |GetSerializedEndpointSize()| bytes. This returns the deserialized
+ // |IncomingEndpoint| (which can be converted into a |MessagePipe|) or null on
+ // error.
+ scoped_refptr<IncomingEndpoint> DeserializeEndpoint(const void* source);
// See |RawChannel::GetSerializedPlatformHandleSize()|.
size_t GetSerializedPlatformHandleSize() const;
@@ -149,11 +153,11 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// Handles "attach and run endpoint" messages.
bool OnAttachAndRunEndpoint(ChannelEndpointId local_id,
ChannelEndpointId remote_id);
- // Handles "remove message pipe endpoint" messages.
- bool OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
- ChannelEndpointId remote_id);
- // Handles "remove message pipe endpoint ack" messages.
- bool OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id);
+ // Handles "remove endpoint" messages.
+ bool OnRemoveEndpoint(ChannelEndpointId local_id,
+ ChannelEndpointId remote_id);
+ // Handles "remove endpoint ack" messages.
+ bool OnRemoveEndpointAck(ChannelEndpointId local_id);
// Handles errors (e.g., invalid messages) from the remote side. Callable from
// any thread.
@@ -162,6 +166,16 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// thread.
void HandleLocalError(const base::StringPiece& error_message);
+ // Helper for |SerializeEndpoint()|: Attaches the given (non-bootstrap)
+ // endpoint to this channel and runs it. This assigns the endpoint both local
+ // and remote IDs. This will also send a |kSubtypeChannelAttachAndRunEndpoint|
+ // message to the remote side to tell it to create an endpoint as well. This
+ // returns the *remote* ID (one for which |is_remote()| returns true).
+ //
+ // TODO(vtl): Maybe limit the number of attached message pipes.
+ ChannelEndpointId AttachAndRunEndpoint(
+ scoped_refptr<ChannelEndpoint> endpoint);
+
// Helper to send channel control messages. Returns true on success. Should be
// called *without* |lock_| held. Callable from any thread.
bool SendControlMessage(MessageInTransit::Subtype subtype,
@@ -172,9 +186,10 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
embedder::PlatformSupport* const platform_support_;
- // Note: |MessagePipe|s MUST NOT be used under |lock_|. I.e., |lock_| can only
- // be acquired after |MessagePipe::lock_|, never before. Thus to call into a
- // |MessagePipe|, a reference to the |MessagePipe| should be acquired from
+ // Note: |ChannelEndpointClient|s (in particular, |MessagePipe|s) MUST NOT be
+ // used under |lock_|. E.g., |lock_| can only be acquired after
+ // |MessagePipe::lock_|, never before. Thus to call into a
+ // |ChannelEndpointClinet|, a reference should be acquired from
// |local_id_to_endpoint_map_| under |lock_| and then the lock released.
base::Lock lock_; // Protects the members below.
@@ -194,18 +209,11 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// Note: The IDs generated by this should be checked for existence before use.
LocalChannelEndpointIdGenerator local_id_generator_;
- typedef base::hash_map<ChannelEndpointId, scoped_refptr<MessagePipe>>
- IdToMessagePipeMap;
- // Map from local IDs to pending/incoming message pipes (i.e., those which do
- // not yet have a dispatcher attached).
- // TODO(vtl): This is a layering violation, since |Channel| shouldn't know
- // about |MessagePipe|. However, we can't just hang on to |ChannelEndpoint|s
- // (even if they have a reference to the |MessagePipe|) since their lifetimes
- // are tied to the "remote" side. When |ChannelEndpoint::DetachFromChannel()|
- // (eventually) results in |ChannelEndpoint::DetachFromClient()| being called.
- // We really need to hang on to the "local" side of the message pipe, to which
- // dispatchers will be "attached".
- IdToMessagePipeMap incoming_message_pipes_;
+ typedef base::hash_map<ChannelEndpointId, scoped_refptr<IncomingEndpoint>>
+ IdToIncomingEndpointMap;
+ // Map from local IDs to incoming endpoints (i.e., those received inside other
+ // messages, but not yet claimed via |DeserializeEndpoint()|).
+ IdToIncomingEndpointMap incoming_endpoints_;
// TODO(vtl): We need to keep track of remote IDs (so that we don't collide
// if/when we wrap).
RemoteChannelEndpointIdGenerator remote_id_generator_;
diff --git a/mojo/edk/system/channel_unittest.cc b/mojo/edk/system/channel_unittest.cc
index ce4eefe..741c99f 100644
--- a/mojo/edk/system/channel_unittest.cc
+++ b/mojo/edk/system/channel_unittest.cc
@@ -199,7 +199,7 @@ TEST_F(ChannelTest, CloseBeforeRun) {
mp->Close(0);
- channel()->AttachAndRunEndpoint(channel_endpoint, true);
+ channel()->SetBootstrapEndpoint(channel_endpoint);
io_thread()->PostTaskAndWait(
FROM_HERE, base::Bind(&ChannelTest::ShutdownChannelOnIOThread,
@@ -225,7 +225,7 @@ TEST_F(ChannelTest, ShutdownAfterAttach) {
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));
- channel()->AttachAndRunEndpoint(channel_endpoint, true);
+ channel()->SetBootstrapEndpoint(channel_endpoint);
Waiter waiter;
waiter.Init();
@@ -268,7 +268,7 @@ TEST_F(ChannelTest, WaitAfterAttachRunAndShutdown) {
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));
- channel()->AttachAndRunEndpoint(channel_endpoint, true);
+ channel()->SetBootstrapEndpoint(channel_endpoint);
io_thread()->PostTaskAndWait(
FROM_HERE, base::Bind(&ChannelTest::ShutdownChannelOnIOThread,
diff --git a/mojo/edk/system/incoming_endpoint.cc b/mojo/edk/system/incoming_endpoint.cc
new file mode 100644
index 0000000..14f1a71
--- /dev/null
+++ b/mojo/edk/system/incoming_endpoint.cc
@@ -0,0 +1,59 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/incoming_endpoint.h"
+
+#include "base/logging.h"
+#include "mojo/edk/system/channel_endpoint.h"
+#include "mojo/edk/system/message_in_transit.h"
+#include "mojo/edk/system/message_pipe.h"
+
+namespace mojo {
+namespace system {
+
+IncomingEndpoint::IncomingEndpoint() {
+}
+
+scoped_refptr<ChannelEndpoint> IncomingEndpoint::Init() {
+ endpoint_ = new ChannelEndpoint(this, 0);
+ return endpoint_;
+}
+
+scoped_refptr<MessagePipe> IncomingEndpoint::ConvertToMessagePipe() {
+ base::AutoLock locker(lock_);
+ scoped_refptr<MessagePipe> message_pipe(
+ MessagePipe::CreateLocalProxyFromExisting(&message_queue_,
+ endpoint_.get()));
+ DCHECK(message_queue_.IsEmpty());
+ endpoint_ = nullptr;
+ return message_pipe;
+}
+
+void IncomingEndpoint::Close() {
+ base::AutoLock locker(lock_);
+ if (endpoint_) {
+ endpoint_->DetachFromClient();
+ endpoint_ = nullptr;
+ }
+}
+
+bool IncomingEndpoint::OnReadMessage(unsigned /*port*/,
+ MessageInTransit* message) {
+ base::AutoLock locker(lock_);
+ if (!endpoint_)
+ return false;
+
+ message_queue_.AddMessage(make_scoped_ptr(message));
+ return true;
+}
+
+void IncomingEndpoint::OnDetachFromChannel(unsigned /*port*/) {
+ Close();
+}
+
+IncomingEndpoint::~IncomingEndpoint() {
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/edk/system/incoming_endpoint.h b/mojo/edk/system/incoming_endpoint.h
new file mode 100644
index 0000000..cee34f0
--- /dev/null
+++ b/mojo/edk/system/incoming_endpoint.h
@@ -0,0 +1,54 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_SYSTEM_INCOMING_ENDPOINT_H_
+#define MOJO_EDK_SYSTEM_INCOMING_ENDPOINT_H_
+
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "mojo/edk/system/channel_endpoint_client.h"
+#include "mojo/edk/system/message_in_transit_queue.h"
+#include "mojo/edk/system/system_impl_export.h"
+
+namespace mojo {
+namespace system {
+
+class ChannelEndpoint;
+class MessagePipe;
+
+// This is a simple |ChannelEndpointClient| that only receives messages. It's
+// used for endpoints that are "received" by |Channel|, but not yet turned into
+// |MessagePipe|s.
+class MOJO_SYSTEM_IMPL_EXPORT IncomingEndpoint : public ChannelEndpointClient {
+ public:
+ IncomingEndpoint();
+
+ // Must be called before any other method.
+ scoped_refptr<ChannelEndpoint> Init();
+
+ scoped_refptr<MessagePipe> ConvertToMessagePipe();
+
+ // Must be called before destroying this object if |ConvertToMessagePipe()|
+ // wasn't called (but |Init()| was).
+ void Close();
+
+ // |ChannelEndpointClient| methods:
+ bool OnReadMessage(unsigned port, MessageInTransit* message) override;
+ void OnDetachFromChannel(unsigned port) override;
+
+ private:
+ virtual ~IncomingEndpoint();
+
+ base::Lock lock_; // Protects the following members.
+ scoped_refptr<ChannelEndpoint> endpoint_;
+ MessageInTransitQueue message_queue_;
+
+ DISALLOW_COPY_AND_ASSIGN(IncomingEndpoint);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_EDK_SYSTEM_INCOMING_ENDPOINT_H_
diff --git a/mojo/edk/system/local_message_pipe_endpoint.cc b/mojo/edk/system/local_message_pipe_endpoint.cc
index 576ad50..1800aa4 100644
--- a/mojo/edk/system/local_message_pipe_endpoint.cc
+++ b/mojo/edk/system/local_message_pipe_endpoint.cc
@@ -13,8 +13,11 @@
namespace mojo {
namespace system {
-LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
+LocalMessagePipeEndpoint::LocalMessagePipeEndpoint(
+ MessageInTransitQueue* message_queue)
: is_open_(true), is_peer_open_(true) {
+ if (message_queue)
+ message_queue_.Swap(message_queue);
}
LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
diff --git a/mojo/edk/system/local_message_pipe_endpoint.h b/mojo/edk/system/local_message_pipe_endpoint.h
index eb1c6ee..f792d90 100644
--- a/mojo/edk/system/local_message_pipe_endpoint.h
+++ b/mojo/edk/system/local_message_pipe_endpoint.h
@@ -19,7 +19,10 @@ namespace system {
class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
: public MessagePipeEndpoint {
public:
- LocalMessagePipeEndpoint();
+ // If |message_queue| is non-null, its contents will be taken as the queue of
+ // (already-received) messages.
+ explicit LocalMessagePipeEndpoint(
+ MessageInTransitQueue* message_queue = nullptr);
~LocalMessagePipeEndpoint() override;
// |MessagePipeEndpoint| implementation:
diff --git a/mojo/edk/system/message_in_transit.cc b/mojo/edk/system/message_in_transit.cc
index 1387dd82..f6fcd30 100644
--- a/mojo/edk/system/message_in_transit.cc
+++ b/mojo/edk/system/message_in_transit.cc
@@ -25,9 +25,9 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
MessageInTransit::kSubtypeChannelAttachAndRunEndpoint;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint;
+ MessageInTransit::kSubtypeChannelRemoveEndpoint;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck;
+ MessageInTransit::kSubtypeChannelRemoveEndpointAck;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles;
STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
diff --git a/mojo/edk/system/message_in_transit.h b/mojo/edk/system/message_in_transit.h
index 0f92b05..023362a 100644
--- a/mojo/edk/system/message_in_transit.h
+++ b/mojo/edk/system/message_in_transit.h
@@ -56,8 +56,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
static const Subtype kSubtypeEndpointData = 0;
// Subtypes for type |kTypeChannel|:
static const Subtype kSubtypeChannelAttachAndRunEndpoint = 0;
- static const Subtype kSubtypeChannelRemoveMessagePipeEndpoint = 1;
- static const Subtype kSubtypeChannelRemoveMessagePipeEndpointAck = 2;
+ static const Subtype kSubtypeChannelRemoveEndpoint = 1;
+ static const Subtype kSubtypeChannelRemoveEndpointAck = 2;
// Subtypes for type |kTypeRawChannel|:
static const Subtype kSubtypeRawChannelPosixExtraPlatformHandles = 0;
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
index 3e243b6..1b0427b 100644
--- a/mojo/edk/system/message_pipe.cc
+++ b/mojo/edk/system/message_pipe.cc
@@ -9,6 +9,7 @@
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/endpoint_relayer.h"
+#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/local_message_pipe_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
@@ -18,19 +19,6 @@
namespace mojo {
namespace system {
-namespace {
-
-// TODO(vtl): Move this into |Channel| (and possible further).
-struct SerializedMessagePipe {
- // This is the endpoint ID on the receiving side, and should be a "remote ID".
- // (The receiving side should already have had an endpoint attached and been
- // run via the |Channel|s. This endpoint will have both IDs assigned, so this
- // ID is only needed to associate that endpoint with a particular dispatcher.)
- ChannelEndpointId receiver_endpoint_id;
-};
-
-} // namespace
-
// static
MessagePipe* MessagePipe::CreateLocalLocal() {
MessagePipe* message_pipe = new MessagePipe();
@@ -52,6 +40,30 @@ MessagePipe* MessagePipe::CreateLocalProxy(
}
// static
+MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
+ MessageInTransitQueue* message_queue,
+ ChannelEndpoint* channel_endpoint) {
+ DCHECK(message_queue);
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(
+ new LocalMessagePipeEndpoint(message_queue));
+ if (channel_endpoint) {
+ bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
+ message_pipe->endpoints_[1].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint));
+ if (!attached_to_channel)
+ message_pipe->OnDetachFromChannel(1);
+ } else {
+ // This means that the proxy side was already closed; we only need to inform
+ // the local side of this.
+ // TODO(vtl): This is safe to do without locking (but perhaps slightly
+ // dubious), since no other thread has access to |message_pipe| yet.
+ message_pipe->endpoints_[0]->OnPeerClose();
+ }
+ return message_pipe;
+}
+
+// static
MessagePipe* MessagePipe::CreateProxyLocal(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
@@ -77,22 +89,18 @@ bool MessagePipe::Deserialize(Channel* channel,
unsigned* port) {
DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
- if (size != sizeof(SerializedMessagePipe)) {
+ if (size != channel->GetSerializedEndpointSize()) {
LOG(ERROR) << "Invalid serialized message pipe";
return false;
}
- const SerializedMessagePipe* s =
- static_cast<const SerializedMessagePipe*>(source);
- *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
- if (!*message_pipe) {
- LOG(ERROR) << "Failed to deserialize message pipe (ID = "
- << s->receiver_endpoint_id << ")";
+ scoped_refptr<IncomingEndpoint> incoming_endpoint =
+ channel->DeserializeEndpoint(source);
+ if (!incoming_endpoint)
return false;
- }
- DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
- << s->receiver_endpoint_id << ")";
+ *message_pipe = incoming_endpoint->ConvertToMessagePipe();
+ DCHECK(*message_pipe);
*port = 0;
return true;
}
@@ -200,10 +208,10 @@ void MessagePipe::RemoveAwakable(unsigned port,
}
void MessagePipe::StartSerialize(unsigned /*port*/,
- Channel* /*channel*/,
+ Channel* channel,
size_t* max_size,
size_t* max_platform_handles) {
- *max_size = sizeof(SerializedMessagePipe);
+ *max_size = channel->GetSerializedEndpointSize();
*max_platform_handles = 0;
}
@@ -249,25 +257,22 @@ bool MessagePipe::EndSerialize(
//
// TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
+ // The replacement for |endpoints_[port]|, if any.
+ MessagePipeEndpoint* replacement_endpoint = nullptr;
+
unsigned peer_port = GetPeerPort(port);
- if (!endpoints_[peer_port]) {
- // Case 1.
+ if (!endpoints_[peer_port]) { // Case 1.
channel_endpoint = new ChannelEndpoint(
nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
endpoints_[port].get())->message_queue());
- endpoints_[port]->Close();
- endpoints_[port].reset();
} else if (endpoints_[peer_port]->GetType() ==
- MessagePipeEndpoint::kTypeLocal) {
- // Case 2.
+ MessagePipeEndpoint::kTypeLocal) { // Case 2.
channel_endpoint = new ChannelEndpoint(
this, port, static_cast<LocalMessagePipeEndpoint*>(
endpoints_[port].get())->message_queue());
- endpoints_[port]->Close();
- endpoints_[port].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint.get()));
- } else {
- // Case 3.
+ replacement_endpoint =
+ new ProxyMessagePipeEndpoint(channel_endpoint.get());
+ } else { // Case 3.
DLOG(WARNING) << "Direct message pipe passing across multiple channels "
"not yet implemented; will proxy";
@@ -302,22 +307,18 @@ bool MessagePipe::EndSerialize(
relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
- endpoints_[port]->Close();
- endpoints_[port].reset();
// No need to call |Close()| after |ReleaseChannelEndpoint()|.
endpoints_[peer_port].reset();
}
- }
- SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
+ endpoints_[port]->Close();
+ endpoints_[port].reset(replacement_endpoint);
+ }
- // Convert the local endpoint to a proxy endpoint (moving the message queue)
- // and attach it to the channel.
- s->receiver_endpoint_id =
- channel->AttachAndRunEndpoint(channel_endpoint, false);
- DVLOG(2) << "Serializing message pipe (remote ID = "
- << s->receiver_endpoint_id << ")";
- *actual_size = sizeof(SerializedMessagePipe);
+ // TODO(vtl): More/most of the above should be moved into (some variant of)
+ // |Channel::SerializeEndpoint()|.
+ channel->SerializeEndpoint(channel_endpoint, destination);
+ *actual_size = channel->GetSerializedEndpointSize();
return true;
}
diff --git a/mojo/edk/system/message_pipe.h b/mojo/edk/system/message_pipe.h
index 431b5a6..6b75eb8 100644
--- a/mojo/edk/system/message_pipe.h
+++ b/mojo/edk/system/message_pipe.h
@@ -31,6 +31,7 @@ namespace system {
class Awakable;
class Channel;
class ChannelEndpoint;
+class MessageInTransitQueue;
// |MessagePipe| is the secondary object implementing a message pipe (see the
// explanatory comment in core.cc). It is typically owned by the dispatcher(s)
@@ -46,6 +47,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe : public ChannelEndpointClient {
static MessagePipe* CreateLocalProxy(
scoped_refptr<ChannelEndpoint>* channel_endpoint);
+ // Similar to |CreateLocalProxy()|, except that it'll do so from an existing
+ // |ChannelEndpoint| (whose |ReplaceClient()| it'll call) and take
+ // |message_queue|'s contents as already-received incoming messages. If
+ // |channel_endpoint| is null, this will create a "half-open" message pipe.
+ static MessagePipe* CreateLocalProxyFromExisting(
+ MessageInTransitQueue* message_queue,
+ ChannelEndpoint* channel_endpoint);
+
// Creates a |MessagePipe| with a |ProxyMessagePipeEndpoint| on port 0 and a
// |LocalMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the
// (newly-created) |ChannelEndpoint| for the former.
diff --git a/mojo/edk/system/message_pipe_test_utils.cc b/mojo/edk/system/message_pipe_test_utils.cc
index 38fc899..ba6a0e0 100644
--- a/mojo/edk/system/message_pipe_test_utils.cc
+++ b/mojo/edk/system/message_pipe_test_utils.cc
@@ -76,13 +76,13 @@ void ChannelThread::InitChannelOnIOThread(
channel_ = new Channel(platform_support_);
CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass())));
- // Attach and run the endpoint.
+ // Start the bootstrap endpoint.
// Note: On the "server" (parent process) side, we need not attach/run the
// endpoint immediately. However, on the "client" (child process) side, this
// *must* be done here -- otherwise, the |Channel| may receive/process
// messages (which it can do as soon as it's hooked up to the IO thread
// message loop, and that message loop runs) before the endpoint is attached.
- channel_->AttachAndRunEndpoint(channel_endpoint, true);
+ channel_->SetBootstrapEndpoint(channel_endpoint);
}
void ChannelThread::ShutdownChannelOnIOThread() {
diff --git a/mojo/edk/system/remote_message_pipe_unittest.cc b/mojo/edk/system/remote_message_pipe_unittest.cc
index d34602c..8a02f2e 100644
--- a/mojo/edk/system/remote_message_pipe_unittest.cc
+++ b/mojo/edk/system/remote_message_pipe_unittest.cc
@@ -27,6 +27,7 @@
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
+#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
#include "mojo/edk/system/platform_handle_dispatcher.h"
@@ -139,8 +140,8 @@ class RemoteMessagePipeTest : public testing::Test {
if (!channels_[1])
CreateAndInitChannel(1);
- channels_[0]->AttachAndRunEndpoint(ep0, true);
- channels_[1]->AttachAndRunEndpoint(ep1, true);
+ channels_[0]->SetBootstrapEndpoint(ep0);
+ channels_[1]->SetBootstrapEndpoint(ep1);
}
void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
@@ -149,7 +150,7 @@ class RemoteMessagePipeTest : public testing::Test {
CHECK(channel_index == 0 || channel_index == 1);
CreateAndInitChannel(channel_index);
- channels_[channel_index]->AttachAndRunEndpoint(ep, true);
+ channels_[channel_index]->SetBootstrapEndpoint(ep);
}
void RestoreInitialStateOnIOThread() {
@@ -333,8 +334,9 @@ TEST_F(RemoteMessagePipeTest, Multiplex) {
scoped_refptr<ChannelEndpoint> ep2;
scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
ASSERT_TRUE(channels(0));
- ChannelEndpointId remote_id = channels(0)->AttachAndRunEndpoint(ep2, false);
- EXPECT_TRUE(remote_id.is_remote());
+ size_t endpoint_info_size = channels(0)->GetSerializedEndpointSize();
+ scoped_ptr<char[]> endpoint_info(new char[endpoint_info_size]);
+ channels(0)->SerializeEndpoint(ep2, endpoint_info.get());
waiter.Init();
ASSERT_EQ(
@@ -342,9 +344,9 @@ TEST_F(RemoteMessagePipeTest, Multiplex) {
mp1->AddAwakable(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
EXPECT_EQ(MOJO_RESULT_OK,
- mp0->WriteMessage(0, UserPointer<const void>(&remote_id),
- sizeof(remote_id), nullptr,
- MOJO_WRITE_MESSAGE_FLAG_NONE));
+ mp0->WriteMessage(0, UserPointer<const void>(endpoint_info.get()),
+ static_cast<uint32_t>(endpoint_info_size),
+ nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
EXPECT_EQ(123u, context);
@@ -354,18 +356,22 @@ TEST_F(RemoteMessagePipeTest, Multiplex) {
hss.satisfied_signals);
EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
- ChannelEndpointId received_id;
- buffer_size = static_cast<uint32_t>(sizeof(received_id));
+ EXPECT_EQ(endpoint_info_size, channels(1)->GetSerializedEndpointSize());
+ scoped_ptr<char[]> received_endpoint_info(new char[endpoint_info_size]);
+ buffer_size = static_cast<uint32_t>(endpoint_info_size);
EXPECT_EQ(MOJO_RESULT_OK,
- mp1->ReadMessage(1, UserPointer<void>(&received_id),
+ mp1->ReadMessage(1, UserPointer<void>(received_endpoint_info.get()),
MakeUserPointer(&buffer_size), nullptr, nullptr,
MOJO_READ_MESSAGE_FLAG_NONE));
- EXPECT_EQ(sizeof(received_id), static_cast<size_t>(buffer_size));
- EXPECT_EQ(remote_id, received_id);
+ EXPECT_EQ(endpoint_info_size, static_cast<size_t>(buffer_size));
+ EXPECT_EQ(0, memcmp(received_endpoint_info.get(), endpoint_info.get(),
+ endpoint_info_size));
// Warning: The local side of mp3 is port 0, not port 1.
- scoped_refptr<MessagePipe> mp3 =
- channels(1)->PassIncomingMessagePipe(received_id);
+ scoped_refptr<IncomingEndpoint> incoming_endpoint =
+ channels(1)->DeserializeEndpoint(received_endpoint_info.get());
+ ASSERT_TRUE(incoming_endpoint);
+ scoped_refptr<MessagePipe> mp3 = incoming_endpoint->ConvertToMessagePipe();
ASSERT_TRUE(mp3);
// Write: MP 2, port 0 -> MP 3, port 1.
diff --git a/mojo/edk/system/waiter.cc b/mojo/edk/system/waiter.cc
index f9047cb..f18edc8 100644
--- a/mojo/edk/system/waiter.cc
+++ b/mojo/edk/system/waiter.cc
@@ -82,17 +82,18 @@ MojoResult Waiter::Wait(MojoDeadline deadline, uint32_t* context) {
return awake_result_;
}
-void Waiter::Awake(MojoResult result, uintptr_t context) {
+bool Waiter::Awake(MojoResult result, uintptr_t context) {
base::AutoLock locker(lock_);
if (awoken_)
- return;
+ return true;
awoken_ = true;
awake_result_ = result;
awake_context_ = context;
cv_.Signal();
// |cv_.Wait()|/|cv_.TimedWait()| will return after |lock_| is released.
+ return true;
}
} // namespace system
diff --git a/mojo/edk/system/waiter.h b/mojo/edk/system/waiter.h
index 999f286..b9b63cd 100644
--- a/mojo/edk/system/waiter.h
+++ b/mojo/edk/system/waiter.h
@@ -58,7 +58,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Waiter : public Awakable {
// Wake the waiter up with the given result and context (or no-op if it's been
// woken up already).
- void Awake(MojoResult result, uintptr_t context) override;
+ bool Awake(MojoResult result, uintptr_t context) override;
private:
base::ConditionVariable cv_; // Associated to |lock_|.
diff --git a/mojo/edk/test/BUILD.gn b/mojo/edk/test/BUILD.gn
index 3cf8ae8..8d032ad 100644
--- a/mojo/edk/test/BUILD.gn
+++ b/mojo/edk/test/BUILD.gn
@@ -21,16 +21,14 @@ mojo_edk_source_set("test_support") {
"//testing/gtest",
]
- mojo_edk_deps = [
- "mojo/edk/system",
- ]
+ mojo_edk_deps = [ "mojo/edk/system" ]
}
# GYP version: mojo/edk/mojo_edk.gyp:mojo_run_all_unittests
mojo_edk_source_set("run_all_unittests") {
testonly = true
sources = [
- "run_all_unittests.cc"
+ "run_all_unittests.cc",
]
deps = [
@@ -40,13 +38,9 @@ mojo_edk_source_set("run_all_unittests") {
"//testing/gtest",
]
- mojo_edk_deps = [
- "mojo/edk/system",
- ]
-
- mojo_sdk_deps = [
- "mojo/public/c/test_support",
- ]
+ mojo_edk_deps = [ "mojo/edk/system" ]
+
+ mojo_sdk_deps = [ "mojo/public/c/test_support" ]
}
# GYP version: mojo/edk/mojo_edk.gyp:mojo_run_all_perftests
@@ -58,13 +52,9 @@ mojo_edk_source_set("run_all_perftests") {
"//base/test:test_support",
]
- mojo_edk_deps = [
- "mojo/edk/system",
- ]
+ mojo_edk_deps = [ "mojo/edk/system" ]
- mojo_sdk_deps = [
- "mojo/public/c/test_support",
- ]
+ mojo_sdk_deps = [ "mojo/public/c/test_support" ]
sources = [
"run_all_perftests.cc",
@@ -79,12 +69,45 @@ mojo_edk_source_set("test_support_impl") {
"//base/test:test_support",
]
- mojo_sdk_deps = [
- "mojo/public/c/test_support",
- ]
+ mojo_sdk_deps = [ "mojo/public/c/test_support" ]
sources = [
"test_support_impl.cc",
"test_support_impl.h",
]
}
+
+# Public SDK test targets follow. These targets are not defined within the
+# public SDK itself as running the unittests requires the EDK.
+
+# GYP version: mojo/mojo_base.gyp:mojo_public_application_unittests
+test("mojo_public_application_unittests") {
+ deps = [
+ ":run_all_unittests",
+ "../../public/cpp/application/tests",
+ ]
+}
+
+# GYP version: mojo/mojo_base.gyp:mojo_public_bindings_unittests
+test("mojo_public_bindings_unittests") {
+ deps = [
+ ":run_all_unittests",
+ "../../public/cpp/bindings/tests",
+ ]
+}
+
+# GYP version: mojo/mojo_base.gyp:mojo_public_environment_unittests
+test("mojo_public_environment_unittests") {
+ deps = [
+ ":run_all_unittests",
+ "../../public/cpp/environment/tests",
+ ]
+}
+
+# GYP version: mojo/mojo_base.gyp:mojo_public_system_perftests
+test("mojo_public_system_perftests") {
+ deps = [
+ ":run_all_perftests",
+ "../../public/c/system/tests:perftests",
+ ]
+}