diff options
author | rockot <rockot@chromium.org> | 2016-03-14 06:18:30 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-03-14 13:19:38 +0000 |
commit | b1e74df4227184deaaee4d192a5fb3c48116f081 (patch) | |
tree | 708cfea1358da266b57bfaedcf4586dc6395d51d | |
parent | a1f711bef18fcac6046c5760dba0236d9bdf5392 (diff) | |
download | chromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.zip chromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.tar.gz chromium_src-b1e74df4227184deaaee4d192a5fb3c48116f081.tar.bz2 |
[mojo] Implement pipe fusion API
Sometimes it's desirable to fuse two pipes together. This is
particularly useful when a service provides some kind of interface
pipe later than its consumers would like to have such a pipe
available. Rather than require the service to be rewritten to
accommodate such cases, a consumer could create its own
disconnected pipe and fuse it lazily with a service endpoint.
BUG=591742
Review URL: https://codereview.chromium.org/1785843002
Cr-Commit-Position: refs/heads/master@{#380964}
-rw-r--r-- | mojo/edk/embedder/entrypoints.cc | 4 | ||||
-rw-r--r-- | mojo/edk/system/core.cc | 38 | ||||
-rw-r--r-- | mojo/edk/system/core.h | 1 | ||||
-rw-r--r-- | mojo/edk/system/message_pipe_dispatcher.cc | 25 | ||||
-rw-r--r-- | mojo/edk/system/message_pipe_dispatcher.h | 5 | ||||
-rw-r--r-- | mojo/edk/system/message_pipe_unittest.cc | 156 | ||||
-rw-r--r-- | mojo/edk/system/node_controller.cc | 7 | ||||
-rw-r--r-- | mojo/edk/system/node_controller.h | 3 | ||||
-rw-r--r-- | mojo/edk/system/ports/node.cc | 138 | ||||
-rw-r--r-- | mojo/edk/system/ports/node.h | 7 | ||||
-rw-r--r-- | mojo/public/c/system/message_pipe.h | 25 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/interface_request.h | 10 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc | 36 | ||||
-rw-r--r-- | mojo/public/cpp/system/message_pipe.h | 8 | ||||
-rw-r--r-- | mojo/public/interfaces/bindings/tests/sample_interfaces.mojom | 4 | ||||
-rw-r--r-- | mojo/public/platform/native/system_thunks.cc | 5 | ||||
-rw-r--r-- | mojo/public/platform/native/system_thunks.h | 4 |
17 files changed, 450 insertions, 26 deletions
diff --git a/mojo/edk/embedder/entrypoints.cc b/mojo/edk/embedder/entrypoints.cc index 23b919d..6acd985 100644 --- a/mojo/edk/embedder/entrypoints.cc +++ b/mojo/edk/embedder/entrypoints.cc @@ -103,6 +103,10 @@ MojoResult MojoReadMessage(MojoHandle message_pipe_handle, message_pipe_handle, bytes, num_bytes, handles, num_handles, flags); } +MojoResult MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1) { + return g_core->FuseMessagePipes(handle0, handle1); +} + MojoResult MojoCreateDataPipe(const MojoCreateDataPipeOptions* options, MojoHandle* data_pipe_producer_handle, MojoHandle* data_pipe_consumer_handle) { diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc index a54e8b1..d436307 100644 --- a/mojo/edk/system/core.cc +++ b/mojo/edk/system/core.cc @@ -451,6 +451,8 @@ MojoResult Core::CreateMessagePipe( if (*message_pipe_handle1 == MOJO_HANDLE_INVALID) { scoped_refptr<Dispatcher> unused; unused->Close(); + + base::AutoLock lock(handles_lock_); handles_.GetAndRemoveDispatcher(*message_pipe_handle0, &unused); return MOJO_RESULT_RESOURCE_EXHAUSTED; } @@ -523,6 +525,41 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags); } +MojoResult Core::FuseMessagePipes(MojoHandle handle0, MojoHandle handle1) { + RequestContext request_context; + scoped_refptr<Dispatcher> dispatcher0; + scoped_refptr<Dispatcher> dispatcher1; + + bool valid_handles = true; + { + base::AutoLock lock(handles_lock_); + MojoResult result0 = handles_.GetAndRemoveDispatcher(handle0, &dispatcher0); + MojoResult result1 = handles_.GetAndRemoveDispatcher(handle1, &dispatcher1); + if (result0 != MOJO_RESULT_OK || result1 != MOJO_RESULT_OK || + dispatcher0->GetType() != Dispatcher::Type::MESSAGE_PIPE || + dispatcher1->GetType() != Dispatcher::Type::MESSAGE_PIPE) + valid_handles = false; + } + + if (!valid_handles) { + if (dispatcher0) + dispatcher0->Close(); + if (dispatcher1) + dispatcher1->Close(); + return MOJO_RESULT_INVALID_ARGUMENT; + } + + MessagePipeDispatcher* mpd0 = + static_cast<MessagePipeDispatcher*>(dispatcher0.get()); + MessagePipeDispatcher* mpd1 = + static_cast<MessagePipeDispatcher*>(dispatcher1.get()); + + if (!mpd0->Fuse(mpd1)) + return MOJO_RESULT_FAILED_PRECONDITION; + + return MOJO_RESULT_OK; +} + MojoResult Core::CreateDataPipe( const MojoCreateDataPipeOptions* options, MojoHandle* data_pipe_producer_handle, @@ -568,6 +605,7 @@ MojoResult Core::CreateDataPipe( *data_pipe_consumer_handle == MOJO_HANDLE_INVALID) { if (*data_pipe_producer_handle != MOJO_HANDLE_INVALID) { scoped_refptr<Dispatcher> unused; + base::AutoLock lock(handles_lock_); handles_.GetAndRemoveDispatcher(*data_pipe_producer_handle, &unused); } producer->Close(); diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h index 65b6933..11aff94 100644 --- a/mojo/edk/system/core.h +++ b/mojo/edk/system/core.h @@ -170,6 +170,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { MojoHandle* handles, uint32_t* num_handles, MojoReadMessageFlags flags); + MojoResult FuseMessagePipes(MojoHandle handle0, MojoHandle handle1); // These methods correspond to the API functions defined in // "mojo/public/c/system/data_pipe.h": diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc index c630b8b..1722567 100644 --- a/mojo/edk/system/message_pipe_dispatcher.cc +++ b/mojo/edk/system/message_pipe_dispatcher.cc @@ -99,6 +99,31 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, make_scoped_refptr(new PortObserverThunk(this))); } +bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { + node_controller_->SetPortObserver(port_, nullptr); + node_controller_->SetPortObserver(other->port_, nullptr); + + ports::PortRef port0; + { + base::AutoLock lock(signal_lock_); + port0 = port_; + port_closed_ = true; + awakables_.CancelAll(); + } + + ports::PortRef port1; + { + base::AutoLock lock(other->signal_lock_); + port1 = other->port_; + other->port_closed_ = true; + other->awakables_.CancelAll(); + } + + // Both ports are always closed by this call. + int rv = node_controller_->MergeLocalPorts(port0, port1); + return rv == ports::OK; +} + Dispatcher::Type MessagePipeDispatcher::GetType() const { return Type::MESSAGE_PIPE; } diff --git a/mojo/edk/system/message_pipe_dispatcher.h b/mojo/edk/system/message_pipe_dispatcher.h index 7400515..b457ab9 100644 --- a/mojo/edk/system/message_pipe_dispatcher.h +++ b/mojo/edk/system/message_pipe_dispatcher.h @@ -39,6 +39,11 @@ class MessagePipeDispatcher : public Dispatcher { uint64_t pipe_id, int endpoint); + // Fuses this pipe with |other|. Returns |true| on success or |false| on + // failure. Regardless of the return value, both dispatchers are closed by + // this call. + bool Fuse(MessagePipeDispatcher* other); + // Dispatcher: Type GetType() const override; MojoResult Close() override; diff --git a/mojo/edk/system/message_pipe_unittest.cc b/mojo/edk/system/message_pipe_unittest.cc index 5abea44..bfafbb7 100644 --- a/mojo/edk/system/message_pipe_unittest.cc +++ b/mojo/edk/system/message_pipe_unittest.cc @@ -54,6 +54,8 @@ class MessagePipeTest : public test::MojoTestBase { DISALLOW_COPY_AND_ASSIGN(MessagePipeTest); }; +using FuseMessagePipeTest = test::MojoTestBase; + TEST_F(MessagePipeTest, WriteData) { ASSERT_EQ(MOJO_RESULT_OK, WriteMessage(pipe0_, kHelloWorld, sizeof(kHelloWorld))); @@ -489,6 +491,160 @@ TEST_F(MessagePipeTest, MAYBE_SharedBufferHandlePingPong) { #endif // !defined(OS_IOS) +TEST_F(FuseMessagePipeTest, Basic) { + // Test that we can fuse pipes and they still work. + + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c)); + + // Handles b and c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + const std::string kTestMessage1 = "Hello, world!"; + const std::string kTestMessage2 = "Goodbye, world!"; + + WriteMessage(a, kTestMessage1); + EXPECT_EQ(kTestMessage1, ReadMessage(d)); + + WriteMessage(d, kTestMessage2); + EXPECT_EQ(kTestMessage2, ReadMessage(a)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d)); +} + +TEST_F(FuseMessagePipeTest, FuseAfterPeerWrite) { + // Test that messages written before fusion are eventually delivered. + + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + const std::string kTestMessage1 = "Hello, world!"; + const std::string kTestMessage2 = "Goodbye, world!"; + WriteMessage(a, kTestMessage1); + WriteMessage(d, kTestMessage2); + + EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c)); + + // Handles b and c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + EXPECT_EQ(kTestMessage1, ReadMessage(d)); + EXPECT_EQ(kTestMessage2, ReadMessage(a)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d)); +} + +TEST_F(FuseMessagePipeTest, NoFuseAfterWrite) { + // Test that a pipe endpoint which has been written to cannot be fused. + + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + WriteMessage(b, "shouldn't have done that!"); + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, MojoFuseMessagePipes(b, c)); + + // Handles b and c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d)); +} + +TEST_F(FuseMessagePipeTest, NoFuseSelf) { + // Test that a pipe's own endpoints can't be fused together. + + MojoHandle a, b; + CreateMessagePipe(&a, &b); + + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, MojoFuseMessagePipes(a, b)); + + // Handles a and b should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); +} + +TEST_F(FuseMessagePipeTest, FuseInvalidArguments) { + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(b)); + + // Can't fuse an invalid handle. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoFuseMessagePipes(b, c)); + + // Handle c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + // Can't fuse a non-message pipe handle. + MojoHandle e, f; + CreateDataPipe(&e, &f, 16); + + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoFuseMessagePipes(e, d)); + + // Handles d and e should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(d)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(e)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(f)); +} + +TEST_F(FuseMessagePipeTest, FuseAfterPeerClosure) { + // Test that peer closure prior to fusion can still be detected after fusion. + + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c)); + + // Handles b and c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoWait(d, MOJO_HANDLE_SIGNAL_PEER_CLOSED, + MOJO_DEADLINE_INDEFINITE, nullptr)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d)); +} + +TEST_F(FuseMessagePipeTest, FuseAfterPeerWriteAndClosure) { + // Test that peer write and closure prior to fusion still results in the + // both message arrival and awareness of peer closure. + + MojoHandle a, b, c, d; + CreateMessagePipe(&a, &b); + CreateMessagePipe(&c, &d); + + const std::string kTestMessage = "ayyy lmao"; + WriteMessage(a, kTestMessage); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoFuseMessagePipes(b, c)); + + // Handles b and c should be closed. + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(b)); + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, MojoClose(c)); + + EXPECT_EQ(kTestMessage, ReadMessage(d)); + EXPECT_EQ(MOJO_RESULT_OK, MojoWait(d, MOJO_HANDLE_SIGNAL_PEER_CLOSED, + MOJO_DEADLINE_INDEFINITE, nullptr)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(d)); +} + } // namespace } // namespace edk } // namespace mojo diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc index 4849806..1472f03 100644 --- a/mojo/edk/system/node_controller.cc +++ b/mojo/edk/system/node_controller.cc @@ -209,6 +209,13 @@ void NodeController::MergePortIntoParent(const std::string& token, parent->RequestPortMerge(port.name(), token); } +int NodeController::MergeLocalPorts(const ports::PortRef& port0, + const ports::PortRef& port1) { + int rv = node_->MergeLocalPorts(port0, port1); + AcceptIncomingMessages(); + return rv; +} + scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( size_t num_bytes) { // TODO(amistry): Fix sync broker and re-enable on OSX. diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h index c103e03..75d2892 100644 --- a/mojo/edk/system/node_controller.h +++ b/mojo/edk/system/node_controller.h @@ -90,6 +90,9 @@ class NodeController : public ports::NodeDelegate, void MergePortIntoParent(const std::string& token, const ports::PortRef& port); + // Merges two local ports together. + int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1); + // Creates a new shared buffer for use in the current process. scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); diff --git a/mojo/edk/system/ports/node.cc b/mojo/edk/system/ports/node.cc index f26e831..da2bdb0 100644 --- a/mojo/edk/system/ports/node.cc +++ b/mojo/edk/system/ports/node.cc @@ -388,6 +388,33 @@ int Node::MergePorts(const PortRef& port_ref, return OK; } +int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) { + Port* port0 = port0_ref.port(); + Port* port1 = port1_ref.port(); + int rv; + { + // |ports_lock_| must be held when acquiring overlapping port locks. + base::AutoLock ports_lock(ports_lock_); + base::AutoLock port0_lock(port0->lock); + base::AutoLock port1_lock(port1->lock); + + DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_ + << " and " << port1_ref.name() << "@" << name_; + + if (port0->state != Port::kReceiving || port1->state != Port::kReceiving) + rv = ERROR_PORT_STATE_UNEXPECTED; + else + rv = MergePorts_Locked(port0_ref, port1_ref); + } + + if (rv != OK) { + ClosePort(port0_ref); + ClosePort(port1_ref); + } + + return rv; +} + int Node::LostConnectionToNode(const NodeName& node_name) { // We can no longer send events to the given node. We also can't expect any // PortAccepted events. @@ -763,32 +790,12 @@ int Node::OnMergePort(const PortName& port_name, // Both ports are locked. Now all we have to do is swap their peer // information and set them up as proxies. - std::swap(port->peer_node_name, new_port->peer_node_name); - std::swap(port->peer_port_name, new_port->peer_port_name); - std::swap(port->peer_closed, new_port->peer_closed); - - port->state = Port::kBuffering; - if (port->peer_closed) - port->remove_proxy_on_last_message = true; - - new_port->state = Port::kBuffering; - if (new_port->peer_closed) - new_port->remove_proxy_on_last_message = true; - - int rv1 = BeginProxying_Locked(port.get(), port_name); - int rv2 = BeginProxying_Locked(new_port.get(), event.new_port_name); + PortRef port0_ref(port_name, port); + PortRef port1_ref(event.new_port_name, new_port); + int rv = MergePorts_Locked(port0_ref, port1_ref); + if (rv == OK) + return rv; - if (rv1 == OK && rv2 == OK) - return OK; - - // If either proxy failed to initialize (e.g. had undeliverable messages - // or ended up in a bad state somehow), we keep the system in a consistent - // state by undoing the peer swap and closing both merge ports. - - std::swap(port->peer_node_name, new_port->peer_node_name); - std::swap(port->peer_port_name, new_port->peer_port_name); - port->state = Port::kReceiving; - new_port->state = Port::kReceiving; close_new_port = true; close_target_port = true; } @@ -849,6 +856,87 @@ scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { return iter->second; } +int Node::MergePorts_Locked(const PortRef& port0_ref, + const PortRef& port1_ref) { + Port* port0 = port0_ref.port(); + Port* port1 = port1_ref.port(); + + ports_lock_.AssertAcquired(); + port0->lock.AssertAcquired(); + port1->lock.AssertAcquired(); + + CHECK(port0->state == Port::kReceiving); + CHECK(port1->state == Port::kReceiving); + + // Ports cannot be merged with their own receiving peer! + if (port0->peer_node_name == name_ && + port0->peer_port_name == port1_ref.name()) + return ERROR_PORT_STATE_UNEXPECTED; + + if (port1->peer_node_name == name_ && + port1->peer_port_name == port0_ref.name()) + return ERROR_PORT_STATE_UNEXPECTED; + + // Only merge if both ports have never sent a message. + if (port0->next_sequence_num_to_send == kInitialSequenceNum && + port1->next_sequence_num_to_send == kInitialSequenceNum) { + // Swap the ports' peer information and switch them both into buffering + // (eventually proxying) mode. + + std::swap(port0->peer_node_name, port1->peer_node_name); + std::swap(port0->peer_port_name, port1->peer_port_name); + std::swap(port0->peer_closed, port1->peer_closed); + + port0->state = Port::kBuffering; + if (port0->peer_closed) + port0->remove_proxy_on_last_message = true; + + port1->state = Port::kBuffering; + if (port1->peer_closed) + port1->remove_proxy_on_last_message = true; + + int rv1 = BeginProxying_Locked(port0, port0_ref.name()); + int rv2 = BeginProxying_Locked(port1, port1_ref.name()); + + if (rv1 == OK && rv2 == OK) { + // If either merged port had a closed peer, its new peer needs to be + // informed of this. + if (port1->peer_closed) { + ObserveClosureEventData data; + data.last_sequence_num = port0->last_sequence_num_to_receive; + delegate_->ForwardMessage( + port0->peer_node_name, + NewInternalMessage(port0->peer_port_name, + EventType::kObserveClosure, data)); + } + + if (port0->peer_closed) { + ObserveClosureEventData data; + data.last_sequence_num = port1->last_sequence_num_to_receive; + delegate_->ForwardMessage( + port1->peer_node_name, + NewInternalMessage(port1->peer_port_name, + EventType::kObserveClosure, data)); + } + + return OK; + } + + // If either proxy failed to initialize (e.g. had undeliverable messages + // or ended up in a bad state somehow), we keep the system in a consistent + // state by undoing the peer swap. + std::swap(port0->peer_node_name, port1->peer_node_name); + std::swap(port0->peer_port_name, port1->peer_port_name); + std::swap(port0->peer_closed, port1->peer_closed); + port0->remove_proxy_on_last_message = false; + port1->remove_proxy_on_last_message = false; + port0->state = Port::kReceiving; + port1->state = Port::kReceiving; + } + + return ERROR_PORT_STATE_UNEXPECTED; +} + void Node::WillSendPort_Locked(Port* port, const NodeName& to_node_name, PortName* port_name, diff --git a/mojo/edk/system/ports/node.h b/mojo/edk/system/ports/node.h index cb1d5fd..2dc513c 100644 --- a/mojo/edk/system/ports/node.h +++ b/mojo/edk/system/ports/node.h @@ -137,6 +137,12 @@ class Node { const NodeName& destination_node_name, const PortName& destination_port_name); + // Like above but merges two ports local to this node. Because both ports are + // local this can also verify that neither port has been written to before the + // merge. If this fails for any reason, both ports are closed. Otherwise OK + // is returned and the ports' receiving peers are connected to each other. + int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref); + // Called to inform this node that communication with another node is lost // indefinitely. This triggers cleanup of ports bound to this node. int LostConnectionToNode(const NodeName& node_name); @@ -157,6 +163,7 @@ class Node { scoped_refptr<Port> GetPort(const PortName& port_name); scoped_refptr<Port> GetPort_Locked(const PortName& port_name); + int MergePorts_Locked(const PortRef& port0_ref, const PortRef& port1_ref); void WillSendPort_Locked(Port* port, const NodeName& to_node_name, PortName* port_name, diff --git a/mojo/public/c/system/message_pipe.h b/mojo/public/c/system/message_pipe.h index 10da525..8c5c215 100644 --- a/mojo/public/c/system/message_pipe.h +++ b/mojo/public/c/system/message_pipe.h @@ -181,6 +181,31 @@ MOJO_SYSTEM_EXPORT MojoResult uint32_t* num_handles, // Optional in/out. MojoReadMessageFlags flags); +// Fuses two message pipe endpoints together. Given two pipes: +// +// A <-> B and C <-> D +// +// Fusing handle B and handle C results in a single pipe: +// +// A <-> D +// +// Handles B and C are ALWAYS closed. Any unread messages at C will eventually +// be delivered to A, and any unread messages at B will eventually be delivered +// to D. +// +// NOTE: A handle may only be fused if it is an open message pipe handle which +// has not been written to. +// +// Returns: +// |MOJO_RESULT_OK| on success. +// |MOJO_RESULT_FAILED_PRECONDITION| if both handles were valid message pipe +// handles but could not be merged (e.g. one of them has been written to). +// |MOJO_INVALID_ARGUMENT| if either handle is not a fusable message pipe +// handle. +MOJO_SYSTEM_EXPORT MojoResult + MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1); + + #ifdef __cplusplus } // extern "C" #endif diff --git a/mojo/public/cpp/bindings/interface_request.h b/mojo/public/cpp/bindings/interface_request.h index f67566d..807cd5d 100644 --- a/mojo/public/cpp/bindings/interface_request.h +++ b/mojo/public/cpp/bindings/interface_request.h @@ -117,6 +117,16 @@ InterfaceRequest<Interface> GetProxy(InterfacePtr<Interface>* ptr) { return MakeRequest<Interface>(std::move(pipe.handle1)); } +// Fuses an InterfaceRequest<T> endpoint with an InterfacePtrInfo<T> endpoint. +// Returns |true| on success or |false| on failure. +template <typename Interface> +bool FuseInterface(InterfaceRequest<Interface> request, + InterfacePtrInfo<Interface> proxy_info) { + MojoResult result = FuseMessagePipes(request.PassMessagePipe(), + proxy_info.PassHandle()); + return result == MOJO_RESULT_OK; +} + } // namespace mojo #endif // MOJO_PUBLIC_CPP_BINDINGS_INTERFACE_REQUEST_H_ diff --git a/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc b/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc index d63b542..633d8f1 100644 --- a/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc +++ b/mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc @@ -714,6 +714,42 @@ TEST_F(InterfacePtrTest, Scoping) { EXPECT_TRUE(a_impl.d_called()); } +class PingTestImpl : public sample::PingTest { + public: + explicit PingTestImpl(InterfaceRequest<sample::PingTest> request) + : binding_(this, std::move(request)) {} + ~PingTestImpl() override {} + + private: + // sample::PingTest: + void Ping(const PingCallback& callback) override { callback.Run(); } + + Binding<sample::PingTest> binding_; +}; + +// Tests that FuseProxy does what it's supposed to do. +TEST_F(InterfacePtrTest, Fusion) { + sample::PingTestPtr proxy; + PingTestImpl impl(GetProxy(&proxy)); + + // Create another PingTest pipe. + sample::PingTestPtr ptr; + sample::PingTestRequest request = GetProxy(&ptr); + + // Fuse the new pipe to the one hanging off |impl|. + EXPECT_TRUE(FuseInterface(std::move(request), proxy.PassInterface())); + + // Ping! + bool called = false; + base::RunLoop loop; + ptr->Ping([&called, &loop] { + called = true; + loop.Quit(); + }); + loop.Run(); + EXPECT_TRUE(called); +} + } // namespace } // namespace test } // namespace mojo diff --git a/mojo/public/cpp/system/message_pipe.h b/mojo/public/cpp/system/message_pipe.h index 94818e8..a56021d 100644 --- a/mojo/public/cpp/system/message_pipe.h +++ b/mojo/public/cpp/system/message_pipe.h @@ -88,6 +88,14 @@ inline MojoResult ReadMessageRaw(MessagePipeHandle message_pipe, message_pipe.value(), bytes, num_bytes, handles, num_handles, flags); } +// Fuses two message pipes together at the given handles. See +// |MojoFuseMessagePipes()| for complete documentation. +inline MojoResult FuseMessagePipes(ScopedMessagePipeHandle message_pipe0, + ScopedMessagePipeHandle message_pipe1) { + return MojoFuseMessagePipes(message_pipe0.release().value(), + message_pipe1.release().value()); +} + // A wrapper class that automatically creates a message pipe and owns both // handles. class MessagePipe { diff --git a/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom b/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom index a2dd73f..5960d75 100644 --- a/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom +++ b/mojo/public/interfaces/bindings/tests/sample_interfaces.mojom @@ -13,6 +13,10 @@ enum Enum { VALUE }; +interface PingTest { + Ping() => (); +}; + interface Provider { EchoString(string a) => (string a); EchoStrings(string a, string b) => (string a, string b); diff --git a/mojo/public/platform/native/system_thunks.cc b/mojo/public/platform/native/system_thunks.cc index fd14d48..72d879b 100644 --- a/mojo/public/platform/native/system_thunks.cc +++ b/mojo/public/platform/native/system_thunks.cc @@ -200,6 +200,11 @@ MojoResult MojoCancelWatch(MojoHandle handle, uintptr_t context) { return g_thunks.CancelWatch(handle, context); } +MojoResult MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1) { + assert(g_thunks.FuseMessagePipes); + return g_thunks.FuseMessagePipes(handle0, handle1); +} + extern "C" THUNK_EXPORT size_t MojoSetSystemThunks( const MojoSystemThunks* system_thunks) { if (system_thunks->size >= sizeof(g_thunks)) diff --git a/mojo/public/platform/native/system_thunks.h b/mojo/public/platform/native/system_thunks.h index b50970e..6790436 100644 --- a/mojo/public/platform/native/system_thunks.h +++ b/mojo/public/platform/native/system_thunks.h @@ -119,6 +119,7 @@ struct MojoSystemThunks { MojoWatchCallback callback, uintptr_t context); MojoResult (*CancelWatch)(MojoHandle handle, uintptr_t context); + MojoResult (*FuseMessagePipes)(MojoHandle handle0, MojoHandle handle1); }; #pragma pack(pop) @@ -151,7 +152,8 @@ inline MojoSystemThunks MojoMakeSystemThunks() { MojoRemoveHandle, MojoGetReadyHandles, MojoWatch, - MojoCancelWatch}; + MojoCancelWatch, + MojoFuseMessagePipes}; return system_thunks; } #endif |