summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrockot <rockot@chromium.org>2016-03-14 06:18:30 -0700
committerCommit bot <commit-bot@chromium.org>2016-03-14 13:19:38 +0000
commitb1e74df4227184deaaee4d192a5fb3c48116f081 (patch)
tree708cfea1358da266b57bfaedcf4586dc6395d51d
parenta1f711bef18fcac6046c5760dba0236d9bdf5392 (diff)
downloadchromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.zip
chromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.tar.gz
chromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.tar.bz2
[mojo] Implement pipe fusion API
Sometimes it's desirable to fuse two pipes together. This is particularly useful when a service provides some kind of interface pipe later than its consumers would like to have such a pipe available. Rather than require the service to be rewritten to accommodate such cases, a consumer could create its own disconnected pipe and fuse it lazily with a service endpoint. BUG=591742 Review URL: https://codereview.chromium.org/1785843002 Cr-Commit-Position: refs/heads/master@{#380964}
-rw-r--r--mojo/edk/embedder/entrypoints.cc4
-rw-r--r--mojo/edk/system/core.cc38
-rw-r--r--mojo/edk/system/core.h1
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.cc25
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.h5
-rw-r--r--mojo/edk/system/message_pipe_unittest.cc156
-rw-r--r--mojo/edk/system/node_controller.cc7
-rw-r--r--mojo/edk/system/node_controller.h3
-rw-r--r--mojo/edk/system/ports/node.cc138
-rw-r--r--mojo/edk/system/ports/node.h7
-rw-r--r--mojo/public/c/system/message_pipe.h25
-rw-r--r--mojo/public/cpp/bindings/interface_request.h10
-rw-r--r--mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc36
-rw-r--r--mojo/public/cpp/system/message_pipe.h8
-rw-r--r--mojo/public/interfaces/bindings/tests/sample_interfaces.mojom4
-rw-r--r--mojo/public/platform/native/system_thunks.cc5
-rw-r--r--mojo/public/platform/native/system_thunks.h4
17 files changed, 450 insertions, 26 deletions
diff --git a/mojo/edk/embedder/entrypoints.cc b/mojo/edk/embedder/entrypoints.cc
index 23b919d..6acd985 100644
--- a/mojo/edk/embedder/entrypoints.cc
+++ b/mojo/edk/embedder/entrypoints.cc
@@ -103,6 +103,10 @@ MojoResult MojoReadMessage(MojoHandle message_pipe_handle,
message_pipe_handle, bytes, num_bytes, handles, num_handles, flags);
}
+MojoResult MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1) {
+ return g_core->FuseMessagePipes(handle0, handle1);
+}
+
MojoResult MojoCreateDataPipe(const MojoCreateDataPipeOptions* options,
MojoHandle* data_pipe_producer_handle,
MojoHandle* data_pipe_consumer_handle) {
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index a54e8b1..d436307 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -451,6 +451,8 @@ MojoResult Core::CreateMessagePipe(
if (*message_pipe_handle1 == MOJO_HANDLE_INVALID) {
scoped_refptr<Dispatcher> unused;
unused->Close();
+
+ base::AutoLock lock(handles_lock_);
handles_.GetAndRemoveDispatcher(*message_pipe_handle0, &unused);
return MOJO_RESULT_RESOURCE_EXHAUSTED;
}
@@ -523,6 +525,41 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle,
return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags);
}
+MojoResult Core::FuseMessagePipes(MojoHandle handle0, MojoHandle handle1) {
+ RequestContext request_context;
+ scoped_refptr<Dispatcher> dispatcher0;
+ scoped_refptr<Dispatcher> dispatcher1;
+
+ bool valid_handles = true;
+ {
+ base::AutoLock lock(handles_lock_);
+ MojoResult result0 = handles_.GetAndRemoveDispatcher(handle0, &dispatcher0);
+ MojoResult result1 = handles_.GetAndRemoveDispatcher(handle1, &dispatcher1);
+ if (result0 != MOJO_RESULT_OK || result1 != MOJO_RESULT_OK ||
+ dispatcher0->GetType() != Dispatcher::Type::MESSAGE_PIPE ||
+ dispatcher1->GetType() != Dispatcher::Type::MESSAGE_PIPE)
+ valid_handles = false;
+ }
+
+ if (!valid_handles) {
+ if (dispatcher0)
+ dispatcher0->Close();
+ if (dispatcher1)
+ dispatcher1->Close();
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ MessagePipeDispatcher* mpd0 =
+ static_cast<MessagePipeDispatcher*>(dispatcher0.get());
+ MessagePipeDispatcher* mpd1 =
+ static_cast<MessagePipeDispatcher*>(dispatcher1.get());
+
+ if (!mpd0->Fuse(mpd1))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ return MOJO_RESULT_OK;
+}
+
MojoResult Core::CreateDataPipe(
const MojoCreateDataPipeOptions* options,
MojoHandle* data_pipe_producer_handle,
@@ -568,6 +605,7 @@ MojoResult Core::CreateDataPipe(
*data_pipe_consumer_handle == MOJO_HANDLE_INVALID) {
if (*data_pipe_producer_handle != MOJO_HANDLE_INVALID) {
scoped_refptr<Dispatcher> unused;
+ base::AutoLock lock(handles_lock_);
handles_.GetAndRemoveDispatcher(*data_pipe_producer_handle, &unused);
}
producer->Close();
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index 65b6933..11aff94 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -170,6 +170,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
MojoHandle* handles,
uint32_t* num_handles,
MojoReadMessageFlags flags);
+ MojoResult FuseMessagePipes(MojoHandle handle0, MojoHandle handle1);
// These methods correspond to the API functions defined in
// "mojo/public/c/system/data_pipe.h":
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index c630b8b..1722567 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -99,6 +99,31 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
make_scoped_refptr(new PortObserverThunk(this)));
}
+bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
+ node_controller_->SetPortObserver(port_, nullptr);
+ node_controller_->SetPortObserver(other->port_, nullptr);
+
+ ports::PortRef port0;
+ {
+ base::AutoLock lock(signal_lock_);
+ port0 = port_;
+ port_closed_ = true;
+ awakables_.CancelAll();
+ }
+
+ ports::PortRef port1;
+ {
+ base::AutoLock lock(other->signal_lock_);
+ port1 = other->port_;
+ other->port_closed_ = true;
+ other->awakables_.CancelAll();
+ }
+
+ // Both ports are always closed by this call.
+ int rv = node_controller_->MergeLocalPorts(port0, port1);
+ return rv == ports::OK;
+}
+
Dispatcher::Type MessagePipeDispatcher::GetType() const {
return Type::MESSAGE_PIPE;
}
diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h
index 7400515..b457ab9 100644
--- a/mojo/edk/system/message_pipe_dispatcher.h
+++ b/mojo/edk/system/message_pipe_dispatcher.h
@@ -39,6 +39,11 @@ class MessagePipeDispatcher : public Dispatcher {
uint64_t pipe_id,
int endpoint);
+ // Fuses this pipe with |other|. Returns |true| on success or |false| on
+ // failure. Regardless of the return value, both dispatchers are closed by
+ // this call.
+ bool Fuse(MessagePipeDispatcher* other);
+
// Dispatcher:
Type GetType() const override;
MojoResult Close() override;
diff --git a/mojo/edk/system/message_pipe_unittest.cc b/mojo/edk/system/message_pipe_unittest.cc
index 5abea44..bfafbb7 100644
--- a/mojo/edk/system/message_pipe_unittest.cc
+++ b/mojo/edk/system/message_pipe_unittest.cc
@@ -54,6 +54,8 @@ class MessagePipeTest : public test::MojoTestBase {
DISALLOW_COPY_AND_ASSIGN(MessagePipeTest);
};
+using FuseMessagePipeTest = test::MojoTestBase;
+
TEST_F(MessagePipeTest, WriteData) {
ASSERT_EQ(MOJO_RESULT_OK,
WriteMessage(pipe0_, kHelloWorld, sizeof(kHelloWorld)));
@@ -489,6 +491,160 @@ TEST_F(MessagePipeTest, MAYBE_SharedBufferHandlePingPong) {
#endif // !defined(OS_IOS)
+TEST_F(FuseMessagePipeTest, Basic) {
+ // Test that we can fuse pipes and they still work.
+
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c));
+
+ // Handles b and c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ const std::string kTestMessage1 = "Hello, world!";
+ const std::string kTestMessage2 = "Goodbye, world!";
+
+ WriteMessage(a, kTestMessage1);
+ EXPECT_EQ(kTestMessage1, ReadMessage(d));
+
+ WriteMessage(d, kTestMessage2);
+ EXPECT_EQ(kTestMessage2, ReadMessage(a));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d));
+}
+
+TEST_F(FuseMessagePipeTest, FuseAfterPeerWrite) {
+ // Test that messages written before fusion are eventually delivered.
+
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ const std::string kTestMessage1 = "Hello, world!";
+ const std::string kTestMessage2 = "Goodbye, world!";
+ WriteMessage(a, kTestMessage1);
+ WriteMessage(d, kTestMessage2);
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c));
+
+ // Handles b and c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ EXPECT_EQ(kTestMessage1, ReadMessage(d));
+ EXPECT_EQ(kTestMessage2, ReadMessage(a));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d));
+}
+
+TEST_F(FuseMessagePipeTest, NoFuseAfterWrite) {
+ // Test that a pipe endpoint which has been written to cannot be fused.
+
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ WriteMessage(b, "shouldn't have done that!");
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, MojoFuseMessagePipes(b, c));
+
+ // Handles b and c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d));
+}
+
+TEST_F(FuseMessagePipeTest, NoFuseSelf) {
+ // Test that a pipe's own endpoints can't be fused together.
+
+ MojoHandle a, b;
+ CreateMessagePipe(&a, &b);
+
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, MojoFuseMessagePipes(a, b));
+
+ // Handles a and b should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+}
+
+TEST_F(FuseMessagePipeTest, FuseInvalidArguments) {
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(b));
+
+ // Can't fuse an invalid handle.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoFuseMessagePipes(b, c));
+
+ // Handle c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ // Can't fuse a non-message pipe handle.
+ MojoHandle e, f;
+ CreateDataPipe(&e, &f, 16);
+
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoFuseMessagePipes(e, d));
+
+ // Handles d and e should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(d));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(e));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(f));
+}
+
+TEST_F(FuseMessagePipeTest, FuseAfterPeerClosure) {
+ // Test that peer closure prior to fusion can still be detected after fusion.
+
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c));
+
+ // Handles b and c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoWait(d, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ MOJO_DEADLINE_INDEFINITE, nullptr));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d));
+}
+
+TEST_F(FuseMessagePipeTest, FuseAfterPeerWriteAndClosure) {
+ // Test that peer write and closure prior to fusion still results in the
+ // both message arrival and awareness of peer closure.
+
+ MojoHandle a, b, c, d;
+ CreateMessagePipe(&a, &b);
+ CreateMessagePipe(&c, &d);
+
+ const std::string kTestMessage = "ayyy lmao";
+ WriteMessage(a, kTestMessage);
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c));
+
+ // Handles b and c should be closed.
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c));
+
+ EXPECT_EQ(kTestMessage, ReadMessage(d));
+ EXPECT_EQ(MOJO_RESULT_OK, MojoWait(d, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ MOJO_DEADLINE_INDEFINITE, nullptr));
+
+ EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d));
+}
+
} // namespace
} // namespace edk
} // namespace mojo
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 4849806..1472f03 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -209,6 +209,13 @@ void NodeController::MergePortIntoParent(const std::string& token,
parent->RequestPortMerge(port.name(), token);
}
+int NodeController::MergeLocalPorts(const ports::PortRef& port0,
+ const ports::PortRef& port1) {
+ int rv = node_->MergeLocalPorts(port0, port1);
+ AcceptIncomingMessages();
+ return rv;
+}
+
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
size_t num_bytes) {
// TODO(amistry): Fix sync broker and re-enable on OSX.
diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h
index c103e03..75d2892 100644
--- a/mojo/edk/system/node_controller.h
+++ b/mojo/edk/system/node_controller.h
@@ -90,6 +90,9 @@ class NodeController : public ports::NodeDelegate,
void MergePortIntoParent(const std::string& token,
const ports::PortRef& port);
+ // Merges two local ports together.
+ int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
+
// Creates a new shared buffer for use in the current process.
scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc
index f26e831..da2bdb0 100644
--- a/mojo/edk/system/ports/node.cc
+++ b/mojo/edk/system/ports/node.cc
@@ -388,6 +388,33 @@ int Node::MergePorts(const PortRef& port_ref,
return OK;
}
+int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
+ Port* port0 = port0_ref.port();
+ Port* port1 = port1_ref.port();
+ int rv;
+ {
+ // |ports_lock_| must be held when acquiring overlapping port locks.
+ base::AutoLock ports_lock(ports_lock_);
+ base::AutoLock port0_lock(port0->lock);
+ base::AutoLock port1_lock(port1->lock);
+
+ DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
+ << " and " << port1_ref.name() << "@" << name_;
+
+ if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
+ rv = ERROR_PORT_STATE_UNEXPECTED;
+ else
+ rv = MergePorts_Locked(port0_ref, port1_ref);
+ }
+
+ if (rv != OK) {
+ ClosePort(port0_ref);
+ ClosePort(port1_ref);
+ }
+
+ return rv;
+}
+
int Node::LostConnectionToNode(const NodeName& node_name) {
// We can no longer send events to the given node. We also can't expect any
// PortAccepted events.
@@ -763,32 +790,12 @@ int Node::OnMergePort(const PortName& port_name,
// Both ports are locked. Now all we have to do is swap their peer
// information and set them up as proxies.
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- std::swap(port->peer_closed, new_port->peer_closed);
-
- port->state = Port::kBuffering;
- if (port->peer_closed)
- port->remove_proxy_on_last_message = true;
-
- new_port->state = Port::kBuffering;
- if (new_port->peer_closed)
- new_port->remove_proxy_on_last_message = true;
-
- int rv1 = BeginProxying_Locked(port.get(), port_name);
- int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name);
+ PortRef port0_ref(port_name, port);
+ PortRef port1_ref(event.new_port_name, new_port);
+ int rv = MergePorts_Locked(port0_ref, port1_ref);
+ if (rv == OK)
+ return rv;
- if (rv1 == OK && rv2 == OK)
- return OK;
-
- // If either proxy failed to initialize (e.g. had undeliverable messages
- // or ended up in a bad state somehow), we keep the system in a consistent
- // state by undoing the peer swap and closing both merge ports.
-
- std::swap(port->peer_node_name, new_port->peer_node_name);
- std::swap(port->peer_port_name, new_port->peer_port_name);
- port->state = Port::kReceiving;
- new_port->state = Port::kReceiving;
close_new_port = true;
close_target_port = true;
}
@@ -849,6 +856,87 @@ scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
return iter->second;
}
+int Node::MergePorts_Locked(const PortRef& port0_ref,
+ const PortRef& port1_ref) {
+ Port* port0 = port0_ref.port();
+ Port* port1 = port1_ref.port();
+
+ ports_lock_.AssertAcquired();
+ port0->lock.AssertAcquired();
+ port1->lock.AssertAcquired();
+
+ CHECK(port0->state == Port::kReceiving);
+ CHECK(port1->state == Port::kReceiving);
+
+ // Ports cannot be merged with their own receiving peer!
+ if (port0->peer_node_name == name_ &&
+ port0->peer_port_name == port1_ref.name())
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ if (port1->peer_node_name == name_ &&
+ port1->peer_port_name == port0_ref.name())
+ return ERROR_PORT_STATE_UNEXPECTED;
+
+ // Only merge if both ports have never sent a message.
+ if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
+ port1->next_sequence_num_to_send == kInitialSequenceNum) {
+ // Swap the ports' peer information and switch them both into buffering
+ // (eventually proxying) mode.
+
+ std::swap(port0->peer_node_name, port1->peer_node_name);
+ std::swap(port0->peer_port_name, port1->peer_port_name);
+ std::swap(port0->peer_closed, port1->peer_closed);
+
+ port0->state = Port::kBuffering;
+ if (port0->peer_closed)
+ port0->remove_proxy_on_last_message = true;
+
+ port1->state = Port::kBuffering;
+ if (port1->peer_closed)
+ port1->remove_proxy_on_last_message = true;
+
+ int rv1 = BeginProxying_Locked(port0, port0_ref.name());
+ int rv2 = BeginProxying_Locked(port1, port1_ref.name());
+
+ if (rv1 == OK && rv2 == OK) {
+ // If either merged port had a closed peer, its new peer needs to be
+ // informed of this.
+ if (port1->peer_closed) {
+ ObserveClosureEventData data;
+ data.last_sequence_num = port0->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port0->peer_node_name,
+ NewInternalMessage(port0->peer_port_name,
+ EventType::kObserveClosure, data));
+ }
+
+ if (port0->peer_closed) {
+ ObserveClosureEventData data;
+ data.last_sequence_num = port1->last_sequence_num_to_receive;
+ delegate_->ForwardMessage(
+ port1->peer_node_name,
+ NewInternalMessage(port1->peer_port_name,
+ EventType::kObserveClosure, data));
+ }
+
+ return OK;
+ }
+
+ // If either proxy failed to initialize (e.g. had undeliverable messages
+ // or ended up in a bad state somehow), we keep the system in a consistent
+ // state by undoing the peer swap.
+ std::swap(port0->peer_node_name, port1->peer_node_name);
+ std::swap(port0->peer_port_name, port1->peer_port_name);
+ std::swap(port0->peer_closed, port1->peer_closed);
+ port0->remove_proxy_on_last_message = false;
+ port1->remove_proxy_on_last_message = false;
+ port0->state = Port::kReceiving;
+ port1->state = Port::kReceiving;
+ }
+
+ return ERROR_PORT_STATE_UNEXPECTED;
+}
+
void Node::WillSendPort_Locked(Port* port,
const NodeName& to_node_name,
PortName* port_name,
diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h
index cb1d5fd..2dc513c 100644
--- a/mojo/edk/system/ports/node.h
+++ b/mojo/edk/system/ports/node.h
@@ -137,6 +137,12 @@ class Node {
const NodeName& destination_node_name,
const PortName& destination_port_name);
+ // Like above but merges two ports local to this node. Because both ports are
+ // local this can also verify that neither port has been written to before the
+ // merge. If this fails for any reason, both ports are closed. Otherwise OK
+ // is returned and the ports' receiving peers are connected to each other.
+ int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
+
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
@@ -157,6 +163,7 @@ class Node {
scoped_refptr<Port> GetPort(const PortName& port_name);
scoped_refptr<Port> GetPort_Locked(const PortName& port_name);
+ int MergePorts_Locked(const PortRef& port0_ref, const PortRef& port1_ref);
void WillSendPort_Locked(Port* port,
const NodeName& to_node_name,
PortName* port_name,
diff --git a/mojo/public/c/system/message_pipe.h b/mojo/public/c/system/message_pipe.h
index 10da525..8c5c215 100644
--- a/mojo/public/c/system/message_pipe.h
+++ b/mojo/public/c/system/message_pipe.h
@@ -181,6 +181,31 @@ MOJO_SYSTEM_EXPORT MojoResult
uint32_t* num_handles, // Optional in/out.
MojoReadMessageFlags flags);
+// Fuses two message pipe endpoints together. Given two pipes:
+//
+// A <-> B and C <-> D
+//
+// Fusing handle B and handle C results in a single pipe:
+//
+// A <-> D
+//
+// Handles B and C are ALWAYS closed. Any unread messages at C will eventually
+// be delivered to A, and any unread messages at B will eventually be delivered
+// to D.
+//
+// NOTE: A handle may only be fused if it is an open message pipe handle which
+// has not been written to.
+//
+// Returns:
+// |MOJO_RESULT_OK| on success.
+// |MOJO_RESULT_FAILED_PRECONDITION| if both handles were valid message pipe
+// handles but could not be merged (e.g. one of them has been written to).
+// |MOJO_INVALID_ARGUMENT| if either handle is not a fusable message pipe
+// handle.
+MOJO_SYSTEM_EXPORT MojoResult
+ MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1);
+
+
#ifdef __cplusplus
} // extern "C"
#endif
diff --git a/mojo/public/cpp/bindings/interface_request.h b/mojo/public/cpp/bindings/interface_request.h
index f67566d..807cd5d 100644
--- a/mojo/public/cpp/bindings/interface_request.h
+++ b/mojo/public/cpp/bindings/interface_request.h
@@ -117,6 +117,16 @@ InterfaceRequest<Interface> GetProxy(InterfacePtr<Interface>* ptr) {
return MakeRequest<Interface>(std::move(pipe.handle1));
}
+// Fuses an InterfaceRequest<T> endpoint with an InterfacePtrInfo<T> endpoint.
+// Returns |true| on success or |false| on failure.
+template <typename Interface>
+bool FuseInterface(InterfaceRequest<Interface> request,
+ InterfacePtrInfo<Interface> proxy_info) {
+ MojoResult result = FuseMessagePipes(request.PassMessagePipe(),
+ proxy_info.PassHandle());
+ return result == MOJO_RESULT_OK;
+}
+
} // namespace mojo
#endif // MOJO_PUBLIC_CPP_BINDINGS_INTERFACE_REQUEST_H_
diff --git a/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc b/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc
index d63b542..633d8f1 100644
--- a/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc
+++ b/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc
@@ -714,6 +714,42 @@ TEST_F(InterfacePtrTest, Scoping) {
EXPECT_TRUE(a_impl.d_called());
}
+class PingTestImpl : public sample::PingTest {
+ public:
+ explicit PingTestImpl(InterfaceRequest<sample::PingTest> request)
+ : binding_(this, std::move(request)) {}
+ ~PingTestImpl() override {}
+
+ private:
+ // sample::PingTest:
+ void Ping(const PingCallback& callback) override { callback.Run(); }
+
+ Binding<sample::PingTest> binding_;
+};
+
+// Tests that FuseProxy does what it's supposed to do.
+TEST_F(InterfacePtrTest, Fusion) {
+ sample::PingTestPtr proxy;
+ PingTestImpl impl(GetProxy(&proxy));
+
+ // Create another PingTest pipe.
+ sample::PingTestPtr ptr;
+ sample::PingTestRequest request = GetProxy(&ptr);
+
+ // Fuse the new pipe to the one hanging off |impl|.
+ EXPECT_TRUE(FuseInterface(std::move(request), proxy.PassInterface()));
+
+ // Ping!
+ bool called = false;
+ base::RunLoop loop;
+ ptr->Ping([&called, &loop] {
+ called = true;
+ loop.Quit();
+ });
+ loop.Run();
+ EXPECT_TRUE(called);
+}
+
} // namespace
} // namespace test
} // namespace mojo
diff --git a/mojo/public/cpp/system/message_pipe.h b/mojo/public/cpp/system/message_pipe.h
index 94818e8..a56021d 100644
--- a/mojo/public/cpp/system/message_pipe.h
+++ b/mojo/public/cpp/system/message_pipe.h
@@ -88,6 +88,14 @@ inline MojoResult ReadMessageRaw(MessagePipeHandle message_pipe,
message_pipe.value(), bytes, num_bytes, handles, num_handles, flags);
}
+// Fuses two message pipes together at the given handles. See
+// |MojoFuseMessagePipes()| for complete documentation.
+inline MojoResult FuseMessagePipes(ScopedMessagePipeHandle message_pipe0,
+ ScopedMessagePipeHandle message_pipe1) {
+ return MojoFuseMessagePipes(message_pipe0.release().value(),
+ message_pipe1.release().value());
+}
+
// A wrapper class that automatically creates a message pipe and owns both
// handles.
class MessagePipe {
diff --git a/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom b/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom
index a2dd73f..5960d75 100644
--- a/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom
+++ b/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom
@@ -13,6 +13,10 @@ enum Enum {
VALUE
};
+interface PingTest {
+ Ping() => ();
+};
+
interface Provider {
EchoString(string a) => (string a);
EchoStrings(string a, string b) => (string a, string b);
diff --git a/mojo/public/platform/native/system_thunks.cc b/mojo/public/platform/native/system_thunks.cc
index fd14d48..72d879b 100644
--- a/mojo/public/platform/native/system_thunks.cc
+++ b/mojo/public/platform/native/system_thunks.cc
@@ -200,6 +200,11 @@ MojoResult MojoCancelWatch(MojoHandle handle, uintptr_t context) {
return g_thunks.CancelWatch(handle, context);
}
+MojoResult MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1) {
+ assert(g_thunks.FuseMessagePipes);
+ return g_thunks.FuseMessagePipes(handle0, handle1);
+}
+
extern "C" THUNK_EXPORT size_t MojoSetSystemThunks(
const MojoSystemThunks* system_thunks) {
if (system_thunks->size >= sizeof(g_thunks))
diff --git a/mojo/public/platform/native/system_thunks.h b/mojo/public/platform/native/system_thunks.h
index b50970e..6790436 100644
--- a/mojo/public/platform/native/system_thunks.h
+++ b/mojo/public/platform/native/system_thunks.h
@@ -119,6 +119,7 @@ struct MojoSystemThunks {
MojoWatchCallback callback,
uintptr_t context);
MojoResult (*CancelWatch)(MojoHandle handle, uintptr_t context);
+ MojoResult (*FuseMessagePipes)(MojoHandle handle0, MojoHandle handle1);
};
#pragma pack(pop)
@@ -151,7 +152,8 @@ inline MojoSystemThunks MojoMakeSystemThunks() {
MojoRemoveHandle,
MojoGetReadyHandles,
MojoWatch,
- MojoCancelWatch};
+ MojoCancelWatch,
+ MojoFuseMessagePipes};
return system_thunks;
}
#endif