diff options
-rw-r--r-- | mojo/embedder/embedder.cc | 33 | ||||
-rw-r--r-- | mojo/system/channel_unittest.cc | 24 | ||||
-rw-r--r-- | mojo/system/message_pipe.cc | 48 | ||||
-rw-r--r-- | mojo/system/message_pipe.h | 18 | ||||
-rw-r--r-- | mojo/system/message_pipe_dispatcher.cc | 21 | ||||
-rw-r--r-- | mojo/system/message_pipe_dispatcher.h | 8 | ||||
-rw-r--r-- | mojo/system/message_pipe_perftest.cc | 14 | ||||
-rw-r--r-- | mojo/system/message_pipe_test_utils.cc | 15 | ||||
-rw-r--r-- | mojo/system/message_pipe_test_utils.h | 12 | ||||
-rw-r--r-- | mojo/system/multiprocess_message_pipe_unittest.cc | 37 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.cc | 18 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.h | 4 | ||||
-rw-r--r-- | mojo/system/remote_message_pipe_unittest.cc | 125 |
13 files changed, 208 insertions, 169 deletions
diff --git a/mojo/embedder/embedder.cc b/mojo/embedder/embedder.cc index 5b2478a..ca9bcd7 100644 --- a/mojo/embedder/embedder.cc +++ b/mojo/embedder/embedder.cc @@ -14,7 +14,6 @@ #include "mojo/system/core.h" #include "mojo/system/entrypoints.h" #include "mojo/system/message_in_transit.h" -#include "mojo/system/message_pipe.h" #include "mojo/system/message_pipe_dispatcher.h" #include "mojo/system/platform_handle_dispatcher.h" #include "mojo/system/raw_channel.h" @@ -42,7 +41,7 @@ namespace { scoped_refptr<system::Channel> MakeChannel( system::Core* core, ScopedPlatformHandle platform_handle, - scoped_refptr<system::MessagePipe> message_pipe) { + scoped_refptr<system::ChannelEndpoint> channel_endpoint) { DCHECK(platform_handle.is_valid()); // Create and initialize a |system::Channel|. @@ -58,9 +57,9 @@ scoped_refptr<system::Channel> MakeChannel( // Once |Init()| has succeeded, we have to return |channel| (since // |Shutdown()| will have to be called on it). - // Attach the message pipe endpoint. - system::MessageInTransit::EndpointId endpoint_id = channel->AttachEndpoint( - make_scoped_refptr(new system::ChannelEndpoint(message_pipe.get(), 1))); + // Attach the endpoint. + system::MessageInTransit::EndpointId endpoint_id = + channel->AttachEndpoint(channel_endpoint); if (endpoint_id == system::MessageInTransit::kInvalidEndpointId) { // This means that, e.g., the other endpoint of the message pipe was closed // first. But it's not necessarily an error per se. @@ -83,11 +82,11 @@ void CreateChannelHelper( system::Core* core, ScopedPlatformHandle platform_handle, scoped_ptr<ChannelInfo> channel_info, - scoped_refptr<system::MessagePipe> message_pipe, + scoped_refptr<system::ChannelEndpoint> channel_endpoint, DidCreateChannelCallback callback, scoped_refptr<base::TaskRunner> callback_thread_task_runner) { channel_info->channel = - MakeChannel(core, platform_handle.Pass(), message_pipe); + MakeChannel(core, platform_handle.Pass(), channel_endpoint); // Hand the channel back to the embedder. if (callback_thread_task_runner.get()) { @@ -111,18 +110,18 @@ ScopedMessagePipeHandle CreateChannelOnIOThread( DCHECK(platform_handle.is_valid()); DCHECK(channel_info); - std::pair<scoped_refptr<system::MessagePipeDispatcher>, - scoped_refptr<system::MessagePipe> > remote_message_pipe = - system::MessagePipeDispatcher::CreateRemoteMessagePipe(); + scoped_refptr<system::ChannelEndpoint> channel_endpoint; + scoped_refptr<system::MessagePipeDispatcher> dispatcher = + system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint); system::Core* core = system::entrypoints::GetCore(); DCHECK(core); ScopedMessagePipeHandle rv( - MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first))); + MessagePipeHandle(core->AddDispatcher(dispatcher))); *channel_info = new ChannelInfo(); (*channel_info)->channel = - MakeChannel(core, platform_handle.Pass(), remote_message_pipe.second); + MakeChannel(core, platform_handle.Pass(), channel_endpoint); return rv.Pass(); } @@ -134,14 +133,14 @@ ScopedMessagePipeHandle CreateChannel( scoped_refptr<base::TaskRunner> callback_thread_task_runner) { DCHECK(platform_handle.is_valid()); - std::pair<scoped_refptr<system::MessagePipeDispatcher>, - scoped_refptr<system::MessagePipe> > remote_message_pipe = - system::MessagePipeDispatcher::CreateRemoteMessagePipe(); + scoped_refptr<system::ChannelEndpoint> channel_endpoint; + scoped_refptr<system::MessagePipeDispatcher> dispatcher = + system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint); system::Core* core = system::entrypoints::GetCore(); DCHECK(core); ScopedMessagePipeHandle rv( - MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first))); + MessagePipeHandle(core->AddDispatcher(dispatcher))); scoped_ptr<ChannelInfo> channel_info(new ChannelInfo()); channel_info->io_thread_task_runner = io_thread_task_runner; @@ -152,7 +151,7 @@ ScopedMessagePipeHandle CreateChannel( base::Unretained(core), base::Passed(&platform_handle), base::Passed(&channel_info), - remote_message_pipe.second, + channel_endpoint, callback, callback_thread_task_runner)); } else { diff --git a/mojo/system/channel_unittest.cc b/mojo/system/channel_unittest.cc index 18329a8..e710096 100644 --- a/mojo/system/channel_unittest.cc +++ b/mojo/system/channel_unittest.cc @@ -194,10 +194,12 @@ TEST_F(ChannelTest, CloseBeforeRun) { base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this))); EXPECT_EQ(TRISTATE_TRUE, init_result()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); + scoped_refptr<ChannelEndpoint> channel_endpoint; + scoped_refptr<MessagePipe> mp( + MessagePipe::CreateLocalProxy(&channel_endpoint)); - MessageInTransit::EndpointId local_id = channel()->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp.get(), 1))); + MessageInTransit::EndpointId local_id = + channel()->AttachEndpoint(channel_endpoint); EXPECT_EQ(Channel::kBootstrapEndpointId, local_id); mp->Close(0); @@ -232,10 +234,12 @@ TEST_F(ChannelTest, ShutdownAfterAttach) { base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this))); EXPECT_EQ(TRISTATE_TRUE, init_result()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); + scoped_refptr<ChannelEndpoint> channel_endpoint; + scoped_refptr<MessagePipe> mp( + MessagePipe::CreateLocalProxy(&channel_endpoint)); - MessageInTransit::EndpointId local_id = channel()->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp.get(), 1))); + MessageInTransit::EndpointId local_id = + channel()->AttachEndpoint(channel_endpoint); EXPECT_EQ(Channel::kBootstrapEndpointId, local_id); // TODO(vtl): Currently, we always "expect" a |RunMessagePipeEndpoint()| after @@ -282,10 +286,12 @@ TEST_F(ChannelTest, WaitAfterAttachRunAndShutdown) { base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this))); EXPECT_EQ(TRISTATE_TRUE, init_result()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); + scoped_refptr<ChannelEndpoint> channel_endpoint; + scoped_refptr<MessagePipe> mp( + MessagePipe::CreateLocalProxy(&channel_endpoint)); - MessageInTransit::EndpointId local_id = channel()->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp.get(), 1))); + MessageInTransit::EndpointId local_id = + channel()->AttachEndpoint(channel_endpoint); EXPECT_EQ(Channel::kBootstrapEndpointId, local_id); EXPECT_TRUE(channel()->RunMessagePipeEndpoint(local_id, diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc index 872ed30..4a75a29 100644 --- a/mojo/system/message_pipe.cc +++ b/mojo/system/message_pipe.cc @@ -15,31 +15,36 @@ namespace mojo { namespace system { -MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0, - scoped_ptr<MessagePipeEndpoint> endpoint1) { - endpoints_[0].reset(endpoint0.release()); - endpoints_[1].reset(endpoint1.release()); -} - // static MessagePipe* MessagePipe::CreateLocalLocal() { - return new MessagePipe( - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint), - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint)); + MessagePipe* message_pipe = new MessagePipe(); + message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); + message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); + return message_pipe; } // static -MessagePipe* MessagePipe::CreateLocalProxy() { - return new MessagePipe( - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint), - scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint)); +MessagePipe* MessagePipe::CreateLocalProxy( + scoped_refptr<ChannelEndpoint>* channel_endpoint) { + DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. + MessagePipe* message_pipe = new MessagePipe(); + message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); + *channel_endpoint = new ChannelEndpoint(message_pipe, 1); + message_pipe->endpoints_[1].reset( + new ProxyMessagePipeEndpoint(channel_endpoint->get())); + return message_pipe; } // static -MessagePipe* MessagePipe::CreateProxyLocal() { - return new MessagePipe( - scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint), - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint)); +MessagePipe* MessagePipe::CreateProxyLocal( + scoped_refptr<ChannelEndpoint>* channel_endpoint) { + DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. + MessagePipe* message_pipe = new MessagePipe(); + *channel_endpoint = new ChannelEndpoint(message_pipe, 0); + message_pipe->endpoints_[0].reset( + new ProxyMessagePipeEndpoint(channel_endpoint->get())); + message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); + return message_pipe; } // static @@ -165,13 +170,16 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { << "Direct message pipe passing across multiple channels not yet " "implemented; will proxy"; + scoped_refptr<ChannelEndpoint> channel_endpoint( + new ChannelEndpoint(this, port)); scoped_ptr<MessagePipeEndpoint> replacement_endpoint( new ProxyMessagePipeEndpoint( + channel_endpoint.get(), static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()), is_peer_open)); endpoints_[port].swap(replacement_endpoint); - return make_scoped_refptr(new ChannelEndpoint(this, port)); + return channel_endpoint; } MojoResult MessagePipe::EnqueueMessage(unsigned port, @@ -188,7 +196,6 @@ bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) { return false; DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy); - endpoints_[port]->Attach(channel_endpoint); return true; } @@ -217,6 +224,9 @@ void MessagePipe::OnRemove(unsigned port) { endpoints_[port].reset(); } +MessagePipe::MessagePipe() { +} + MessagePipe::~MessagePipe() { // Owned by the dispatchers. The owning dispatchers should only release us via // their |Close()| method, which should inform us of being closed via our diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h index 07b8923..26f3fa7 100644 --- a/mojo/system/message_pipe.h +++ b/mojo/system/message_pipe.h @@ -34,22 +34,23 @@ class Waiter; class MOJO_SYSTEM_IMPL_EXPORT MessagePipe : public base::RefCountedThreadSafe<MessagePipe> { public: - MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0, - scoped_ptr<MessagePipeEndpoint> endpoint1); - // Creates a |MessagePipe| with two new |LocalMessagePipeEndpoint|s. static MessagePipe* CreateLocalLocal(); // Creates a |MessagePipe| with a |LocalMessagePipeEndpoint| on port 0 and a - // |ProxyMessagePipeEndpoint| on port 1. - static MessagePipe* CreateLocalProxy(); + // |ProxyMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the + // (newly-created) |ChannelEndpoint| for the latter. + static MessagePipe* CreateLocalProxy( + scoped_refptr<ChannelEndpoint>* channel_endpoint); // Creates a |MessagePipe| with a |ProxyMessagePipeEndpoint| on port 0 and a - // |LocalMessagePipeEndpoint| on port 1. + // |LocalMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the + // (newly-created) |ChannelEndpoint| for the former. // Note: This is really only needed in tests (outside of tests, this // configuration arises from a local message pipe having its port 0 // "converted" using |ConvertLocalToProxy()|). - static MessagePipe* CreateProxyLocal(); + static MessagePipe* CreateProxyLocal( + scoped_refptr<ChannelEndpoint>* channel_endpoint); // Gets the other port number (i.e., 0 -> 1, 1 -> 0). static unsigned GetPeerPort(unsigned port); @@ -95,11 +96,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe scoped_ptr<MessageInTransit> message); // These are used by |Channel|. + // TODO(vtl): Remove |Attach()|. bool Attach(unsigned port, ChannelEndpoint* channel_endpoint); void Run(unsigned port); void OnRemove(unsigned port); private: + MessagePipe(); + friend class base::RefCountedThreadSafe<MessagePipe>; virtual ~MessagePipe(); diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc index 85c0d38..94aceb0 100644 --- a/mojo/system/message_pipe_dispatcher.cc +++ b/mojo/system/message_pipe_dispatcher.cc @@ -83,14 +83,15 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { } // static -std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> > -MessagePipeDispatcher::CreateRemoteMessagePipe() { - scoped_refptr<MessagePipe> message_pipe(MessagePipe::CreateLocalProxy()); +scoped_refptr<MessagePipeDispatcher> +MessagePipeDispatcher::CreateRemoteMessagePipe( + scoped_refptr<ChannelEndpoint>* channel_endpoint) { + scoped_refptr<MessagePipe> message_pipe( + MessagePipe::CreateLocalProxy(channel_endpoint)); scoped_refptr<MessagePipeDispatcher> dispatcher( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); dispatcher->Init(message_pipe, 0); - - return std::make_pair(dispatcher, message_pipe); + return dispatcher; } // static @@ -103,8 +104,9 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( return scoped_refptr<MessagePipeDispatcher>(); } - std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> > - remote_message_pipe = CreateRemoteMessagePipe(); + scoped_refptr<ChannelEndpoint> channel_endpoint; + scoped_refptr<MessagePipeDispatcher> dispatcher = + CreateRemoteMessagePipe(&channel_endpoint); MessageInTransit::EndpointId remote_id = static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id; @@ -117,8 +119,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( return scoped_refptr<MessagePipeDispatcher>(); } MessageInTransit::EndpointId local_id = - channel->AttachEndpoint(make_scoped_refptr( - new ChannelEndpoint(remote_message_pipe.second.get(), 1))); + channel->AttachEndpoint(channel_endpoint); if (local_id == MessageInTransit::kInvalidEndpointId) { LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to " "attach; remote ID = " << remote_id << ")"; @@ -135,7 +136,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( // TODO(vtl): FIXME -- Need some error handling here. channel->RunRemoteMessagePipeEndpoint(local_id, remote_id); - return remote_message_pipe.first; + return dispatcher; } MessagePipeDispatcher::~MessagePipeDispatcher() { diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h index 1c843fc..7a3ed93 100644 --- a/mojo/system/message_pipe_dispatcher.h +++ b/mojo/system/message_pipe_dispatcher.h @@ -5,8 +5,6 @@ #ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ #define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_ -#include <utility> - #include "base/macros.h" #include "base/memory/ref_counted.h" #include "mojo/system/dispatcher.h" @@ -16,6 +14,7 @@ namespace mojo { namespace system { +class ChannelEndpoint; class MessagePipe; class MessagePipeDispatcherTransport; @@ -51,9 +50,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher { // the message pipe, port 0). // TODO(vtl): This currently uses |kDefaultCreateOptions|, which is okay since // there aren't any options, but eventually options should be plumbed through. - static std::pair<scoped_refptr<MessagePipeDispatcher>, - scoped_refptr<MessagePipe> > - CreateRemoteMessagePipe(); + static scoped_refptr<MessagePipeDispatcher> CreateRemoteMessagePipe( + scoped_refptr<ChannelEndpoint>* channel_endpoint); // The "opposite" of |SerializeAndClose()|. (Typically this is called by // |Dispatcher::Deserialize()|.) diff --git a/mojo/system/message_pipe_perftest.cc b/mojo/system/message_pipe_perftest.cc index 4c1a1ab..33cffee 100644 --- a/mojo/system/message_pipe_perftest.cc +++ b/mojo/system/message_pipe_perftest.cc @@ -110,10 +110,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) { embedder::ScopedPlatformHandle client_platform_handle = mojo::test::MultiprocessTestHelper::client_platform_handle.Pass(); CHECK(client_platform_handle.is_valid()); - scoped_refptr<MessagePipe> mp(new MessagePipe( - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), - scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); - channel_thread.Start(client_platform_handle.Pass(), mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + channel_thread.Start(client_platform_handle.Pass(), ep); std::string buffer(1000000, '\0'); int rv = 0; @@ -158,10 +157,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) { TEST_F(MultiprocessMessagePipePerfTest, PingPong) { helper()->StartChild("PingPongClient"); - scoped_refptr<MessagePipe> mp(new MessagePipe( - scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), - scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); - Init(mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + Init(ep); // This values are set to align with one at ipc_pertests.cc for comparison. const size_t kMsgSize[5] = {12, 144, 1728, 20736, 248832}; diff --git a/mojo/system/message_pipe_test_utils.cc b/mojo/system/message_pipe_test_utils.cc index 40f80d2..c70f0a0c 100644 --- a/mojo/system/message_pipe_test_utils.cc +++ b/mojo/system/message_pipe_test_utils.cc @@ -6,7 +6,9 @@ #include "base/bind.h" #include "base/threading/platform_thread.h" // For |Sleep()|. +#include "mojo/system/channel.h" #include "mojo/system/channel_endpoint.h" +#include "mojo/system/message_pipe.h" #include "mojo/system/waiter.h" namespace mojo { @@ -40,14 +42,14 @@ ChannelThread::~ChannelThread() { } void ChannelThread::Start(embedder::ScopedPlatformHandle platform_handle, - scoped_refptr<MessagePipe> message_pipe) { + scoped_refptr<ChannelEndpoint> channel_endpoint) { test_io_thread_.Start(); test_io_thread_.PostTaskAndWait( FROM_HERE, base::Bind(&ChannelThread::InitChannelOnIOThread, base::Unretained(this), base::Passed(&platform_handle), - message_pipe)); + channel_endpoint)); } void ChannelThread::Stop() { @@ -68,7 +70,7 @@ void ChannelThread::Stop() { void ChannelThread::InitChannelOnIOThread( embedder::ScopedPlatformHandle platform_handle, - scoped_refptr<MessagePipe> message_pipe) { + scoped_refptr<ChannelEndpoint> channel_endpoint) { CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop()); CHECK(platform_handle.is_valid()); @@ -83,8 +85,7 @@ void ChannelThread::InitChannelOnIOThread( // receive/process messages (which it can do as soon as it's hooked up to // the IO thread message loop, and that message loop runs) before the // message pipe endpoint is attached. - CHECK_EQ(channel_->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(message_pipe.get(), 1))), + CHECK_EQ(channel_->AttachEndpoint(channel_endpoint), Channel::kBootstrapEndpointId); CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId)); @@ -104,8 +105,8 @@ MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase() MultiprocessMessagePipeTestBase::~MultiprocessMessagePipeTestBase() { } -void MultiprocessMessagePipeTestBase::Init(scoped_refptr<MessagePipe> mp) { - channel_thread_.Start(helper_.server_platform_handle.Pass(), mp); +void MultiprocessMessagePipeTestBase::Init(scoped_refptr<ChannelEndpoint> ep) { + channel_thread_.Start(helper_.server_platform_handle.Pass(), ep); } #endif diff --git a/mojo/system/message_pipe_test_utils.h b/mojo/system/message_pipe_test_utils.h index 4db7bdc..1d1f861 100644 --- a/mojo/system/message_pipe_test_utils.h +++ b/mojo/system/message_pipe_test_utils.h @@ -9,11 +9,15 @@ #include "mojo/common/test/multiprocess_test_helper.h" #include "mojo/embedder/simple_platform_support.h" #include "mojo/system/channel.h" -#include "mojo/system/message_pipe.h" #include "mojo/system/test_utils.h" namespace mojo { namespace system { + +class Channel; +class ChannelEndpoint; +class MessagePipe; + namespace test { MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, @@ -26,12 +30,12 @@ class ChannelThread { ~ChannelThread(); void Start(embedder::ScopedPlatformHandle platform_handle, - scoped_refptr<MessagePipe> message_pipe); + scoped_refptr<ChannelEndpoint> channel_endpoint); void Stop(); private: void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle, - scoped_refptr<MessagePipe> message_pipe); + scoped_refptr<ChannelEndpoint> channel_endpoint); void ShutdownChannelOnIOThread(); embedder::PlatformSupport* const platform_support_; @@ -48,7 +52,7 @@ class MultiprocessMessagePipeTestBase : public testing::Test { virtual ~MultiprocessMessagePipeTestBase(); protected: - void Init(scoped_refptr<MessagePipe> mp); + void Init(scoped_refptr<ChannelEndpoint> ep); embedder::PlatformSupport* platform_support() { return &platform_support_; } mojo::test::MultiprocessTestHelper* helper() { return &helper_; } diff --git a/mojo/system/multiprocess_message_pipe_unittest.cc b/mojo/system/multiprocess_message_pipe_unittest.cc index 829786d..7991c28 100644 --- a/mojo/system/multiprocess_message_pipe_unittest.cc +++ b/mojo/system/multiprocess_message_pipe_unittest.cc @@ -17,7 +17,7 @@ #include "base/location.h" #include "base/logging.h" #include "base/macros.h" -#include "build/build_config.h" // TODO(vtl): Remove this. +#include "build/build_config.h" // TODO(vtl): Remove this. #include "mojo/common/test/test_utils.h" #include "mojo/embedder/platform_shared_buffer.h" #include "mojo/embedder/scoped_platform_handle.h" @@ -48,8 +48,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) { embedder::ScopedPlatformHandle client_platform_handle = mojo::test::MultiprocessTestHelper::client_platform_handle.Pass(); CHECK(client_platform_handle.is_valid()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - channel_thread.Start(client_platform_handle.Pass(), mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + channel_thread.Start(client_platform_handle.Pass(), ep); const std::string quitquitquit("quitquitquit"); int rv = 0; @@ -103,8 +104,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) { TEST_F(MultiprocessMessagePipeTest, Basic) { helper()->StartChild("EchoEcho"); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - Init(mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + Init(ep); std::string hello("hello"); EXPECT_EQ(MOJO_RESULT_OK, @@ -147,8 +149,9 @@ TEST_F(MultiprocessMessagePipeTest, Basic) { TEST_F(MultiprocessMessagePipeTest, QueueMessages) { helper()->StartChild("EchoEcho"); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - Init(mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + Init(ep); static const size_t kNumMessages = 1001; for (size_t i = 0; i < kNumMessages; i++) { @@ -213,8 +216,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) { embedder::ScopedPlatformHandle client_platform_handle = mojo::test::MultiprocessTestHelper::client_platform_handle.Pass(); CHECK(client_platform_handle.is_valid()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - channel_thread.Start(client_platform_handle.Pass(), mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + channel_thread.Start(client_platform_handle.Pass(), ep); // Wait for the first message from our parent. HandleSignalsState hss; @@ -312,8 +316,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) { TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) { helper()->StartChild("CheckSharedBuffer"); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - Init(mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + Init(ep); // Make a shared buffer. scoped_refptr<SharedBufferDispatcher> dispatcher; @@ -407,8 +412,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) { embedder::ScopedPlatformHandle client_platform_handle = mojo::test::MultiprocessTestHelper::client_platform_handle.Pass(); CHECK(client_platform_handle.is_valid()); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - channel_thread.Start(client_platform_handle.Pass(), mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + channel_thread.Start(client_platform_handle.Pass(), ep); HandleSignalsState hss; CHECK_EQ(test::WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE, &hss), @@ -465,8 +471,9 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_PlatformHandlePassing) { helper()->StartChild("CheckPlatformHandleFile"); - scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy()); - Init(mp); + scoped_refptr<ChannelEndpoint> ep; + scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep)); + Init(ep); base::FilePath unused; base::ScopedFILE fp( diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc index f3a3b34..a52099f 100644 --- a/mojo/system/proxy_message_pipe_endpoint.cc +++ b/mojo/system/proxy_message_pipe_endpoint.cc @@ -14,14 +14,20 @@ namespace mojo { namespace system { -ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() - : is_running_(false), is_peer_open_(true) { +ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( + ChannelEndpoint* channel_endpoint) + : channel_endpoint_(channel_endpoint), + is_running_(false), + is_peer_open_(true) { } ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( + ChannelEndpoint* channel_endpoint, LocalMessagePipeEndpoint* local_message_pipe_endpoint, bool is_peer_open) - : is_running_(false), is_peer_open_(is_peer_open) { + : channel_endpoint_(channel_endpoint), + is_running_(false), + is_peer_open_(is_peer_open) { paused_message_queue_.Swap(local_message_pipe_endpoint->message_queue()); local_message_pipe_endpoint->Close(); } @@ -72,12 +78,6 @@ void ProxyMessagePipeEndpoint::EnqueueMessage( } } -void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) { - DCHECK(channel_endpoint); - DCHECK(!is_attached()); - channel_endpoint_ = channel_endpoint; -} - bool ProxyMessagePipeEndpoint::Run() { // Assertions about current state: DCHECK(is_attached()); diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h index 95126f3..fdc5e72 100644 --- a/mojo/system/proxy_message_pipe_endpoint.h +++ b/mojo/system/proxy_message_pipe_endpoint.h @@ -37,12 +37,13 @@ class MessagePipe; class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint : public MessagePipeEndpoint { public: - ProxyMessagePipeEndpoint(); + explicit ProxyMessagePipeEndpoint(ChannelEndpoint* channel_endpoint); // Constructs a |ProxyMessagePipeEndpoint| that replaces the given // |LocalMessagePipeEndpoint| (which this constructor will close), taking its // message queue's contents. This is done when transferring a message pipe // handle over a remote message pipe. ProxyMessagePipeEndpoint( + ChannelEndpoint* channel_endpoint, LocalMessagePipeEndpoint* local_message_pipe_endpoint, bool is_peer_open); virtual ~ProxyMessagePipeEndpoint(); @@ -51,7 +52,6 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint virtual Type GetType() const OVERRIDE; virtual bool OnPeerClose() OVERRIDE; virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE; - virtual void Attach(ChannelEndpoint* channel_endpoint) OVERRIDE; virtual bool Run() OVERRIDE; virtual void OnRemove() OVERRIDE; diff --git a/mojo/system/remote_message_pipe_unittest.cc b/mojo/system/remote_message_pipe_unittest.cc index 8a3b9e6..ee1c80f 100644 --- a/mojo/system/remote_message_pipe_unittest.cc +++ b/mojo/system/remote_message_pipe_unittest.cc @@ -60,31 +60,29 @@ class RemoteMessagePipeTest : public testing::Test { } protected: - // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1, - // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP - // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s. - void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0, - scoped_refptr<MessagePipe> mp1) { + // This connects the two given |ChannelEndpoint|s. + void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0, + scoped_refptr<ChannelEndpoint> ep1) { io_thread_.PostTaskAndWait( FROM_HERE, - base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread, + base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread, base::Unretained(this), - mp0, - mp1)); + ep0, + ep1)); } - // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|. - // It assumes/requires that this is the bootstrap case, i.e., that the - // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This - // returns *without* waiting for it to finish connecting. - void BootstrapMessagePipeNoWait(unsigned channel_index, - scoped_refptr<MessagePipe> mp) { + // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires + // that this is the bootstrap case, i.e., that the endpoint IDs are both/will + // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for + // it to finish connecting. + void BootstrapChannelEndpointNoWait(unsigned channel_index, + scoped_refptr<ChannelEndpoint> ep) { io_thread_.PostTask( FROM_HERE, - base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread, + base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread, base::Unretained(this), channel_index, - mp)); + ep)); } void RestoreInitialState() { @@ -129,8 +127,8 @@ class RemoteMessagePipeTest : public testing::Test { RawChannel::Create(platform_handles_[channel_index].Pass()))); } - void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0, - scoped_refptr<MessagePipe> mp1) { + void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0, + scoped_refptr<ChannelEndpoint> ep1) { CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); if (!channels_[0].get()) @@ -138,26 +136,21 @@ class RemoteMessagePipeTest : public testing::Test { if (!channels_[1].get()) CreateAndInitChannel(1); - MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp0.get(), 1))); - MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp1.get(), 0))); + MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0); + MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1); CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1)); CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0)); } - void BootstrapMessagePipeOnIOThread(unsigned channel_index, - scoped_refptr<MessagePipe> mp) { + void BootstrapChannelEndpointOnIOThread(unsigned channel_index, + scoped_refptr<ChannelEndpoint> ep) { CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop()); CHECK(channel_index == 0 || channel_index == 1); - unsigned port = channel_index ^ 1u; - CreateAndInitChannel(channel_index); MessageInTransit::EndpointId endpoint_id = - channels_[channel_index]->AttachEndpoint( - make_scoped_refptr(new ChannelEndpoint(mp.get(), port))); + channels_[channel_index]->AttachEndpoint(ep); if (endpoint_id == MessageInTransit::kInvalidEndpointId) return; @@ -194,9 +187,11 @@ TEST_F(RemoteMessagePipeTest, Basic) { // connected to MP 1, port 0, which will be attached to channel 1. This leaves // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1. @@ -303,15 +298,19 @@ TEST_F(RemoteMessagePipeTest, Multiplex) { // Connect message pipes as in the |Basic| test. - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); // Now put another message pipe on the channel. - scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp2, mp3); + scoped_refptr<ChannelEndpoint> ep2; + scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2)); + scoped_refptr<ChannelEndpoint> ep3; + scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3)); + ConnectChannelEndpoints(ep2, ep3); // Write: MP 2, port 0 -> MP 3, port 1. @@ -450,7 +449,8 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) { // connected to MP 1, port 0, which will be attached to channel 1. This leaves // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); // Write to MP 0, port 0. EXPECT_EQ(MOJO_RESULT_OK, @@ -460,12 +460,13 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) { NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); - BootstrapMessagePipeNoWait(0, mp0); + BootstrapChannelEndpointNoWait(0, ep0); // Close MP 0, port 0 before channel 1 is even connected. mp0->Close(0); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do // it later, it might already be readable.) @@ -473,7 +474,7 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) { ASSERT_EQ(MOJO_RESULT_OK, mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL)); - BootstrapMessagePipeNoWait(1, mp1); + BootstrapChannelEndpointNoWait(1, ep1); // Wait. EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context)); @@ -508,9 +509,11 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) { HandleSignalsState hss; uint32_t context = 0; - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); // We'll try to pass this dispatcher. scoped_refptr<MessagePipeDispatcher> dispatcher( @@ -676,9 +679,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) { HandleSignalsState hss; uint32_t context = 0; - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); // We'll try to pass this dispatcher. scoped_refptr<SharedBufferDispatcher> dispatcher; @@ -810,9 +815,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) { uint32_t context = 0; HandleSignalsState hss; - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); base::FilePath unused; base::ScopedFILE fp( @@ -913,11 +920,13 @@ TEST_F(RemoteMessagePipeTest, RacingClosesStress) { for (unsigned i = 0; i < 256; i++) { DVLOG(2) << "---------------------------------------- " << i; - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - BootstrapMessagePipeNoWait(0, mp0); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + BootstrapChannelEndpointNoWait(0, ep0); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - BootstrapMessagePipeNoWait(1, mp1); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + BootstrapChannelEndpointNoWait(1, ep1); if (i & 1u) { io_thread()->task_runner()->PostTask( @@ -951,9 +960,11 @@ TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) { HandleSignalsState hss; uint32_t context = 0; - scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy()); - scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal()); - ConnectMessagePipes(mp0, mp1); + scoped_refptr<ChannelEndpoint> ep0; + scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0)); + scoped_refptr<ChannelEndpoint> ep1; + scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1)); + ConnectChannelEndpoints(ep0, ep1); // We'll try to pass this dispatcher. scoped_refptr<MessagePipeDispatcher> dispatcher( |