summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorviettrungluu <viettrungluu@chromium.org>2014-09-24 12:23:09 -0700
committerCommit bot <commit-bot@chromium.org>2014-09-24 19:23:29 +0000
commit23ae8173cca87c9dd9ca71cf3142f75a88e43919 (patch)
treed5b218afedf74134a50a71e67e751f6f7c3c80a0
parentfaa99f8c03c439f26c5ff497d3804b0818f1cf2f (diff)
downloadchromium_src-23ae8173cca87c9dd9ca71cf3142f75a88e43919.zip
chromium_src-23ae8173cca87c9dd9ca71cf3142f75a88e43919.tar.gz
chromium_src-23ae8173cca87c9dd9ca71cf3142f75a88e43919.tar.bz2
Mojo: Have |ProxyMessagePipeEndpoint|s constructed with a |ChannelEndpoint|.
This eliminates the need for |ProxyMessagePipeEndpoint::Attach()|. R=brettw@chromium.org Review URL: https://codereview.chromium.org/588193004 Cr-Commit-Position: refs/heads/master@{#296485}
-rw-r--r--mojo/embedder/embedder.cc33
-rw-r--r--mojo/system/channel_unittest.cc24
-rw-r--r--mojo/system/message_pipe.cc48
-rw-r--r--mojo/system/message_pipe.h18
-rw-r--r--mojo/system/message_pipe_dispatcher.cc21
-rw-r--r--mojo/system/message_pipe_dispatcher.h8
-rw-r--r--mojo/system/message_pipe_perftest.cc14
-rw-r--r--mojo/system/message_pipe_test_utils.cc15
-rw-r--r--mojo/system/message_pipe_test_utils.h12
-rw-r--r--mojo/system/multiprocess_message_pipe_unittest.cc37
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.cc18
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.h4
-rw-r--r--mojo/system/remote_message_pipe_unittest.cc125
13 files changed, 208 insertions, 169 deletions
diff --git a/mojo/embedder/embedder.cc b/mojo/embedder/embedder.cc
index 5b2478a..ca9bcd7 100644
--- a/mojo/embedder/embedder.cc
+++ b/mojo/embedder/embedder.cc
@@ -14,7 +14,6 @@
#include "mojo/system/core.h"
#include "mojo/system/entrypoints.h"
#include "mojo/system/message_in_transit.h"
-#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
#include "mojo/system/platform_handle_dispatcher.h"
#include "mojo/system/raw_channel.h"
@@ -42,7 +41,7 @@ namespace {
scoped_refptr<system::Channel> MakeChannel(
system::Core* core,
ScopedPlatformHandle platform_handle,
- scoped_refptr<system::MessagePipe> message_pipe) {
+ scoped_refptr<system::ChannelEndpoint> channel_endpoint) {
DCHECK(platform_handle.is_valid());
// Create and initialize a |system::Channel|.
@@ -58,9 +57,9 @@ scoped_refptr<system::Channel> MakeChannel(
// Once |Init()| has succeeded, we have to return |channel| (since
// |Shutdown()| will have to be called on it).
- // Attach the message pipe endpoint.
- system::MessageInTransit::EndpointId endpoint_id = channel->AttachEndpoint(
- make_scoped_refptr(new system::ChannelEndpoint(message_pipe.get(), 1)));
+ // Attach the endpoint.
+ system::MessageInTransit::EndpointId endpoint_id =
+ channel->AttachEndpoint(channel_endpoint);
if (endpoint_id == system::MessageInTransit::kInvalidEndpointId) {
// This means that, e.g., the other endpoint of the message pipe was closed
// first. But it's not necessarily an error per se.
@@ -83,11 +82,11 @@ void CreateChannelHelper(
system::Core* core,
ScopedPlatformHandle platform_handle,
scoped_ptr<ChannelInfo> channel_info,
- scoped_refptr<system::MessagePipe> message_pipe,
+ scoped_refptr<system::ChannelEndpoint> channel_endpoint,
DidCreateChannelCallback callback,
scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
channel_info->channel =
- MakeChannel(core, platform_handle.Pass(), message_pipe);
+ MakeChannel(core, platform_handle.Pass(), channel_endpoint);
// Hand the channel back to the embedder.
if (callback_thread_task_runner.get()) {
@@ -111,18 +110,18 @@ ScopedMessagePipeHandle CreateChannelOnIOThread(
DCHECK(platform_handle.is_valid());
DCHECK(channel_info);
- std::pair<scoped_refptr<system::MessagePipeDispatcher>,
- scoped_refptr<system::MessagePipe> > remote_message_pipe =
- system::MessagePipeDispatcher::CreateRemoteMessagePipe();
+ scoped_refptr<system::ChannelEndpoint> channel_endpoint;
+ scoped_refptr<system::MessagePipeDispatcher> dispatcher =
+ system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
system::Core* core = system::entrypoints::GetCore();
DCHECK(core);
ScopedMessagePipeHandle rv(
- MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
+ MessagePipeHandle(core->AddDispatcher(dispatcher)));
*channel_info = new ChannelInfo();
(*channel_info)->channel =
- MakeChannel(core, platform_handle.Pass(), remote_message_pipe.second);
+ MakeChannel(core, platform_handle.Pass(), channel_endpoint);
return rv.Pass();
}
@@ -134,14 +133,14 @@ ScopedMessagePipeHandle CreateChannel(
scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
DCHECK(platform_handle.is_valid());
- std::pair<scoped_refptr<system::MessagePipeDispatcher>,
- scoped_refptr<system::MessagePipe> > remote_message_pipe =
- system::MessagePipeDispatcher::CreateRemoteMessagePipe();
+ scoped_refptr<system::ChannelEndpoint> channel_endpoint;
+ scoped_refptr<system::MessagePipeDispatcher> dispatcher =
+ system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
system::Core* core = system::entrypoints::GetCore();
DCHECK(core);
ScopedMessagePipeHandle rv(
- MessagePipeHandle(core->AddDispatcher(remote_message_pipe.first)));
+ MessagePipeHandle(core->AddDispatcher(dispatcher)));
scoped_ptr<ChannelInfo> channel_info(new ChannelInfo());
channel_info->io_thread_task_runner = io_thread_task_runner;
@@ -152,7 +151,7 @@ ScopedMessagePipeHandle CreateChannel(
base::Unretained(core),
base::Passed(&platform_handle),
base::Passed(&channel_info),
- remote_message_pipe.second,
+ channel_endpoint,
callback,
callback_thread_task_runner));
} else {
diff --git a/mojo/system/channel_unittest.cc b/mojo/system/channel_unittest.cc
index 18329a8..e710096 100644
--- a/mojo/system/channel_unittest.cc
+++ b/mojo/system/channel_unittest.cc
@@ -194,10 +194,12 @@ TEST_F(ChannelTest, CloseBeforeRun) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ scoped_refptr<MessagePipe> mp(
+ MessagePipe::CreateLocalProxy(&channel_endpoint));
- MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
+ MessageInTransit::EndpointId local_id =
+ channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
mp->Close(0);
@@ -232,10 +234,12 @@ TEST_F(ChannelTest, ShutdownAfterAttach) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ scoped_refptr<MessagePipe> mp(
+ MessagePipe::CreateLocalProxy(&channel_endpoint));
- MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
+ MessageInTransit::EndpointId local_id =
+ channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
// TODO(vtl): Currently, we always "expect" a |RunMessagePipeEndpoint()| after
@@ -282,10 +286,12 @@ TEST_F(ChannelTest, WaitAfterAttachRunAndShutdown) {
base::Bind(&ChannelTest::InitChannelOnIOThread, base::Unretained(this)));
EXPECT_EQ(TRISTATE_TRUE, init_result());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ scoped_refptr<MessagePipe> mp(
+ MessagePipe::CreateLocalProxy(&channel_endpoint));
- MessageInTransit::EndpointId local_id = channel()->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp.get(), 1)));
+ MessageInTransit::EndpointId local_id =
+ channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(Channel::kBootstrapEndpointId, local_id);
EXPECT_TRUE(channel()->RunMessagePipeEndpoint(local_id,
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index 872ed30..4a75a29 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -15,31 +15,36 @@
namespace mojo {
namespace system {
-MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
- scoped_ptr<MessagePipeEndpoint> endpoint1) {
- endpoints_[0].reset(endpoint0.release());
- endpoints_[1].reset(endpoint1.release());
-}
-
// static
MessagePipe* MessagePipe::CreateLocalLocal() {
- return new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+ return message_pipe;
}
// static
-MessagePipe* MessagePipe::CreateLocalProxy() {
- return new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
- scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint));
+MessagePipe* MessagePipe::CreateLocalProxy(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+ DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
+ message_pipe->endpoints_[1].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+ return message_pipe;
}
// static
-MessagePipe* MessagePipe::CreateProxyLocal() {
- return new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint),
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
+MessagePipe* MessagePipe::CreateProxyLocal(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+ DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ MessagePipe* message_pipe = new MessagePipe();
+ *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
+ message_pipe->endpoints_[0].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+ message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+ return message_pipe;
}
// static
@@ -165,13 +170,16 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
<< "Direct message pipe passing across multiple channels not yet "
"implemented; will proxy";
+ scoped_refptr<ChannelEndpoint> channel_endpoint(
+ new ChannelEndpoint(this, port));
scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
new ProxyMessagePipeEndpoint(
+ channel_endpoint.get(),
static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
is_peer_open));
endpoints_[port].swap(replacement_endpoint);
- return make_scoped_refptr(new ChannelEndpoint(this, port));
+ return channel_endpoint;
}
MojoResult MessagePipe::EnqueueMessage(unsigned port,
@@ -188,7 +196,6 @@ bool MessagePipe::Attach(unsigned port, ChannelEndpoint* channel_endpoint) {
return false;
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
- endpoints_[port]->Attach(channel_endpoint);
return true;
}
@@ -217,6 +224,9 @@ void MessagePipe::OnRemove(unsigned port) {
endpoints_[port].reset();
}
+MessagePipe::MessagePipe() {
+}
+
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index 07b8923..26f3fa7 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -34,22 +34,23 @@ class Waiter;
class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
: public base::RefCountedThreadSafe<MessagePipe> {
public:
- MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
- scoped_ptr<MessagePipeEndpoint> endpoint1);
-
// Creates a |MessagePipe| with two new |LocalMessagePipeEndpoint|s.
static MessagePipe* CreateLocalLocal();
// Creates a |MessagePipe| with a |LocalMessagePipeEndpoint| on port 0 and a
- // |ProxyMessagePipeEndpoint| on port 1.
- static MessagePipe* CreateLocalProxy();
+ // |ProxyMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the
+ // (newly-created) |ChannelEndpoint| for the latter.
+ static MessagePipe* CreateLocalProxy(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint);
// Creates a |MessagePipe| with a |ProxyMessagePipeEndpoint| on port 0 and a
- // |LocalMessagePipeEndpoint| on port 1.
+ // |LocalMessagePipeEndpoint| on port 1. |*channel_endpoint| is set to the
+ // (newly-created) |ChannelEndpoint| for the former.
// Note: This is really only needed in tests (outside of tests, this
// configuration arises from a local message pipe having its port 0
// "converted" using |ConvertLocalToProxy()|).
- static MessagePipe* CreateProxyLocal();
+ static MessagePipe* CreateProxyLocal(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint);
// Gets the other port number (i.e., 0 -> 1, 1 -> 0).
static unsigned GetPeerPort(unsigned port);
@@ -95,11 +96,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe
scoped_ptr<MessageInTransit> message);
// These are used by |Channel|.
+ // TODO(vtl): Remove |Attach()|.
bool Attach(unsigned port, ChannelEndpoint* channel_endpoint);
void Run(unsigned port);
void OnRemove(unsigned port);
private:
+ MessagePipe();
+
friend class base::RefCountedThreadSafe<MessagePipe>;
virtual ~MessagePipe();
diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc
index 85c0d38..94aceb0 100644
--- a/mojo/system/message_pipe_dispatcher.cc
+++ b/mojo/system/message_pipe_dispatcher.cc
@@ -83,14 +83,15 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
}
// static
-std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
-MessagePipeDispatcher::CreateRemoteMessagePipe() {
- scoped_refptr<MessagePipe> message_pipe(MessagePipe::CreateLocalProxy());
+scoped_refptr<MessagePipeDispatcher>
+MessagePipeDispatcher::CreateRemoteMessagePipe(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+ scoped_refptr<MessagePipe> message_pipe(
+ MessagePipe::CreateLocalProxy(channel_endpoint));
scoped_refptr<MessagePipeDispatcher> dispatcher(
new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
dispatcher->Init(message_pipe, 0);
-
- return std::make_pair(dispatcher, message_pipe);
+ return dispatcher;
}
// static
@@ -103,8 +104,9 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
return scoped_refptr<MessagePipeDispatcher>();
}
- std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
- remote_message_pipe = CreateRemoteMessagePipe();
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ scoped_refptr<MessagePipeDispatcher> dispatcher =
+ CreateRemoteMessagePipe(&channel_endpoint);
MessageInTransit::EndpointId remote_id =
static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
@@ -117,8 +119,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
return scoped_refptr<MessagePipeDispatcher>();
}
MessageInTransit::EndpointId local_id =
- channel->AttachEndpoint(make_scoped_refptr(
- new ChannelEndpoint(remote_message_pipe.second.get(), 1)));
+ channel->AttachEndpoint(channel_endpoint);
if (local_id == MessageInTransit::kInvalidEndpointId) {
LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
"attach; remote ID = " << remote_id << ")";
@@ -135,7 +136,7 @@ scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
// TODO(vtl): FIXME -- Need some error handling here.
channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
- return remote_message_pipe.first;
+ return dispatcher;
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h
index 1c843fc..7a3ed93 100644
--- a/mojo/system/message_pipe_dispatcher.h
+++ b/mojo/system/message_pipe_dispatcher.h
@@ -5,8 +5,6 @@
#ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
#define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
-#include <utility>
-
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "mojo/system/dispatcher.h"
@@ -16,6 +14,7 @@
namespace mojo {
namespace system {
+class ChannelEndpoint;
class MessagePipe;
class MessagePipeDispatcherTransport;
@@ -51,9 +50,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher {
// the message pipe, port 0).
// TODO(vtl): This currently uses |kDefaultCreateOptions|, which is okay since
// there aren't any options, but eventually options should be plumbed through.
- static std::pair<scoped_refptr<MessagePipeDispatcher>,
- scoped_refptr<MessagePipe> >
- CreateRemoteMessagePipe();
+ static scoped_refptr<MessagePipeDispatcher> CreateRemoteMessagePipe(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint);
// The "opposite" of |SerializeAndClose()|. (Typically this is called by
// |Dispatcher::Deserialize()|.)
diff --git a/mojo/system/message_pipe_perftest.cc b/mojo/system/message_pipe_perftest.cc
index 4c1a1ab..33cffee 100644
--- a/mojo/system/message_pipe_perftest.cc
+++ b/mojo/system/message_pipe_perftest.cc
@@ -110,10 +110,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
- scoped_refptr<MessagePipe> mp(new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
- scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
- channel_thread.Start(client_platform_handle.Pass(), mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ channel_thread.Start(client_platform_handle.Pass(), ep);
std::string buffer(1000000, '\0');
int rv = 0;
@@ -158,10 +157,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(PingPongClient) {
TEST_F(MultiprocessMessagePipePerfTest, PingPong) {
helper()->StartChild("PingPongClient");
- scoped_refptr<MessagePipe> mp(new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
- scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
- Init(mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ Init(ep);
// This values are set to align with one at ipc_pertests.cc for comparison.
const size_t kMsgSize[5] = {12, 144, 1728, 20736, 248832};
diff --git a/mojo/system/message_pipe_test_utils.cc b/mojo/system/message_pipe_test_utils.cc
index 40f80d2..c70f0a0c 100644
--- a/mojo/system/message_pipe_test_utils.cc
+++ b/mojo/system/message_pipe_test_utils.cc
@@ -6,7 +6,9 @@
#include "base/bind.h"
#include "base/threading/platform_thread.h" // For |Sleep()|.
+#include "mojo/system/channel.h"
#include "mojo/system/channel_endpoint.h"
+#include "mojo/system/message_pipe.h"
#include "mojo/system/waiter.h"
namespace mojo {
@@ -40,14 +42,14 @@ ChannelThread::~ChannelThread() {
}
void ChannelThread::Start(embedder::ScopedPlatformHandle platform_handle,
- scoped_refptr<MessagePipe> message_pipe) {
+ scoped_refptr<ChannelEndpoint> channel_endpoint) {
test_io_thread_.Start();
test_io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&ChannelThread::InitChannelOnIOThread,
base::Unretained(this),
base::Passed(&platform_handle),
- message_pipe));
+ channel_endpoint));
}
void ChannelThread::Stop() {
@@ -68,7 +70,7 @@ void ChannelThread::Stop() {
void ChannelThread::InitChannelOnIOThread(
embedder::ScopedPlatformHandle platform_handle,
- scoped_refptr<MessagePipe> message_pipe) {
+ scoped_refptr<ChannelEndpoint> channel_endpoint) {
CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop());
CHECK(platform_handle.is_valid());
@@ -83,8 +85,7 @@ void ChannelThread::InitChannelOnIOThread(
// receive/process messages (which it can do as soon as it's hooked up to
// the IO thread message loop, and that message loop runs) before the
// message pipe endpoint is attached.
- CHECK_EQ(channel_->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(message_pipe.get(), 1))),
+ CHECK_EQ(channel_->AttachEndpoint(channel_endpoint),
Channel::kBootstrapEndpointId);
CHECK(channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
Channel::kBootstrapEndpointId));
@@ -104,8 +105,8 @@ MultiprocessMessagePipeTestBase::MultiprocessMessagePipeTestBase()
MultiprocessMessagePipeTestBase::~MultiprocessMessagePipeTestBase() {
}
-void MultiprocessMessagePipeTestBase::Init(scoped_refptr<MessagePipe> mp) {
- channel_thread_.Start(helper_.server_platform_handle.Pass(), mp);
+void MultiprocessMessagePipeTestBase::Init(scoped_refptr<ChannelEndpoint> ep) {
+ channel_thread_.Start(helper_.server_platform_handle.Pass(), ep);
}
#endif
diff --git a/mojo/system/message_pipe_test_utils.h b/mojo/system/message_pipe_test_utils.h
index 4db7bdc..1d1f861 100644
--- a/mojo/system/message_pipe_test_utils.h
+++ b/mojo/system/message_pipe_test_utils.h
@@ -9,11 +9,15 @@
#include "mojo/common/test/multiprocess_test_helper.h"
#include "mojo/embedder/simple_platform_support.h"
#include "mojo/system/channel.h"
-#include "mojo/system/message_pipe.h"
#include "mojo/system/test_utils.h"
namespace mojo {
namespace system {
+
+class Channel;
+class ChannelEndpoint;
+class MessagePipe;
+
namespace test {
MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp,
@@ -26,12 +30,12 @@ class ChannelThread {
~ChannelThread();
void Start(embedder::ScopedPlatformHandle platform_handle,
- scoped_refptr<MessagePipe> message_pipe);
+ scoped_refptr<ChannelEndpoint> channel_endpoint);
void Stop();
private:
void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,
- scoped_refptr<MessagePipe> message_pipe);
+ scoped_refptr<ChannelEndpoint> channel_endpoint);
void ShutdownChannelOnIOThread();
embedder::PlatformSupport* const platform_support_;
@@ -48,7 +52,7 @@ class MultiprocessMessagePipeTestBase : public testing::Test {
virtual ~MultiprocessMessagePipeTestBase();
protected:
- void Init(scoped_refptr<MessagePipe> mp);
+ void Init(scoped_refptr<ChannelEndpoint> ep);
embedder::PlatformSupport* platform_support() { return &platform_support_; }
mojo::test::MultiprocessTestHelper* helper() { return &helper_; }
diff --git a/mojo/system/multiprocess_message_pipe_unittest.cc b/mojo/system/multiprocess_message_pipe_unittest.cc
index 829786d..7991c28 100644
--- a/mojo/system/multiprocess_message_pipe_unittest.cc
+++ b/mojo/system/multiprocess_message_pipe_unittest.cc
@@ -17,7 +17,7 @@
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
-#include "build/build_config.h" // TODO(vtl): Remove this.
+#include "build/build_config.h" // TODO(vtl): Remove this.
#include "mojo/common/test/test_utils.h"
#include "mojo/embedder/platform_shared_buffer.h"
#include "mojo/embedder/scoped_platform_handle.h"
@@ -48,8 +48,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- channel_thread.Start(client_platform_handle.Pass(), mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ channel_thread.Start(client_platform_handle.Pass(), ep);
const std::string quitquitquit("quitquitquit");
int rv = 0;
@@ -103,8 +104,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
TEST_F(MultiprocessMessagePipeTest, Basic) {
helper()->StartChild("EchoEcho");
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- Init(mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ Init(ep);
std::string hello("hello");
EXPECT_EQ(MOJO_RESULT_OK,
@@ -147,8 +149,9 @@ TEST_F(MultiprocessMessagePipeTest, Basic) {
TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
helper()->StartChild("EchoEcho");
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- Init(mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ Init(ep);
static const size_t kNumMessages = 1001;
for (size_t i = 0; i < kNumMessages; i++) {
@@ -213,8 +216,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- channel_thread.Start(client_platform_handle.Pass(), mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ channel_thread.Start(client_platform_handle.Pass(), ep);
// Wait for the first message from our parent.
HandleSignalsState hss;
@@ -312,8 +316,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckSharedBuffer) {
TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) {
helper()->StartChild("CheckSharedBuffer");
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- Init(mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ Init(ep);
// Make a shared buffer.
scoped_refptr<SharedBufferDispatcher> dispatcher;
@@ -407,8 +412,9 @@ MOJO_MULTIPROCESS_TEST_CHILD_MAIN(CheckPlatformHandleFile) {
embedder::ScopedPlatformHandle client_platform_handle =
mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
CHECK(client_platform_handle.is_valid());
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- channel_thread.Start(client_platform_handle.Pass(), mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ channel_thread.Start(client_platform_handle.Pass(), ep);
HandleSignalsState hss;
CHECK_EQ(test::WaitIfNecessary(mp, MOJO_HANDLE_SIGNAL_READABLE, &hss),
@@ -465,8 +471,9 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_PlatformHandlePassing) {
helper()->StartChild("CheckPlatformHandleFile");
- scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy());
- Init(mp);
+ scoped_refptr<ChannelEndpoint> ep;
+ scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalProxy(&ep));
+ Init(ep);
base::FilePath unused;
base::ScopedFILE fp(
diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
index f3a3b34..a52099f 100644
--- a/mojo/system/proxy_message_pipe_endpoint.cc
+++ b/mojo/system/proxy_message_pipe_endpoint.cc
@@ -14,14 +14,20 @@
namespace mojo {
namespace system {
-ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
- : is_running_(false), is_peer_open_(true) {
+ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
+ ChannelEndpoint* channel_endpoint)
+ : channel_endpoint_(channel_endpoint),
+ is_running_(false),
+ is_peer_open_(true) {
}
ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
+ ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open)
- : is_running_(false), is_peer_open_(is_peer_open) {
+ : channel_endpoint_(channel_endpoint),
+ is_running_(false),
+ is_peer_open_(is_peer_open) {
paused_message_queue_.Swap(local_message_pipe_endpoint->message_queue());
local_message_pipe_endpoint->Close();
}
@@ -72,12 +78,6 @@ void ProxyMessagePipeEndpoint::EnqueueMessage(
}
}
-void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) {
- DCHECK(channel_endpoint);
- DCHECK(!is_attached());
- channel_endpoint_ = channel_endpoint;
-}
-
bool ProxyMessagePipeEndpoint::Run() {
// Assertions about current state:
DCHECK(is_attached());
diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h
index 95126f3..fdc5e72 100644
--- a/mojo/system/proxy_message_pipe_endpoint.h
+++ b/mojo/system/proxy_message_pipe_endpoint.h
@@ -37,12 +37,13 @@ class MessagePipe;
class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
: public MessagePipeEndpoint {
public:
- ProxyMessagePipeEndpoint();
+ explicit ProxyMessagePipeEndpoint(ChannelEndpoint* channel_endpoint);
// Constructs a |ProxyMessagePipeEndpoint| that replaces the given
// |LocalMessagePipeEndpoint| (which this constructor will close), taking its
// message queue's contents. This is done when transferring a message pipe
// handle over a remote message pipe.
ProxyMessagePipeEndpoint(
+ ChannelEndpoint* channel_endpoint,
LocalMessagePipeEndpoint* local_message_pipe_endpoint,
bool is_peer_open);
virtual ~ProxyMessagePipeEndpoint();
@@ -51,7 +52,6 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
virtual Type GetType() const OVERRIDE;
virtual bool OnPeerClose() OVERRIDE;
virtual void EnqueueMessage(scoped_ptr<MessageInTransit> message) OVERRIDE;
- virtual void Attach(ChannelEndpoint* channel_endpoint) OVERRIDE;
virtual bool Run() OVERRIDE;
virtual void OnRemove() OVERRIDE;
diff --git a/mojo/system/remote_message_pipe_unittest.cc b/mojo/system/remote_message_pipe_unittest.cc
index 8a3b9e6..ee1c80f 100644
--- a/mojo/system/remote_message_pipe_unittest.cc
+++ b/mojo/system/remote_message_pipe_unittest.cc
@@ -60,31 +60,29 @@ class RemoteMessagePipeTest : public testing::Test {
}
protected:
- // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
- // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
- // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
- void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
- scoped_refptr<MessagePipe> mp1) {
+ // This connects the two given |ChannelEndpoint|s.
+ void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
+ scoped_refptr<ChannelEndpoint> ep1) {
io_thread_.PostTaskAndWait(
FROM_HERE,
- base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
+ base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
base::Unretained(this),
- mp0,
- mp1));
+ ep0,
+ ep1));
}
- // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
- // It assumes/requires that this is the bootstrap case, i.e., that the
- // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
- // returns *without* waiting for it to finish connecting.
- void BootstrapMessagePipeNoWait(unsigned channel_index,
- scoped_refptr<MessagePipe> mp) {
+ // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
+ // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
+ // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
+ // it to finish connecting.
+ void BootstrapChannelEndpointNoWait(unsigned channel_index,
+ scoped_refptr<ChannelEndpoint> ep) {
io_thread_.PostTask(
FROM_HERE,
- base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
+ base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
base::Unretained(this),
channel_index,
- mp));
+ ep));
}
void RestoreInitialState() {
@@ -129,8 +127,8 @@ class RemoteMessagePipeTest : public testing::Test {
RawChannel::Create(platform_handles_[channel_index].Pass())));
}
- void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
- scoped_refptr<MessagePipe> mp1) {
+ void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
+ scoped_refptr<ChannelEndpoint> ep1) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
if (!channels_[0].get())
@@ -138,26 +136,21 @@ class RemoteMessagePipeTest : public testing::Test {
if (!channels_[1].get())
CreateAndInitChannel(1);
- MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp0.get(), 1)));
- MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp1.get(), 0)));
+ MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
+ MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
}
- void BootstrapMessagePipeOnIOThread(unsigned channel_index,
- scoped_refptr<MessagePipe> mp) {
+ void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
+ scoped_refptr<ChannelEndpoint> ep) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
CHECK(channel_index == 0 || channel_index == 1);
- unsigned port = channel_index ^ 1u;
-
CreateAndInitChannel(channel_index);
MessageInTransit::EndpointId endpoint_id =
- channels_[channel_index]->AttachEndpoint(
- make_scoped_refptr(new ChannelEndpoint(mp.get(), port)));
+ channels_[channel_index]->AttachEndpoint(ep);
if (endpoint_id == MessageInTransit::kInvalidEndpointId)
return;
@@ -194,9 +187,11 @@ TEST_F(RemoteMessagePipeTest, Basic) {
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
// Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
@@ -303,15 +298,19 @@ TEST_F(RemoteMessagePipeTest, Multiplex) {
// Connect message pipes as in the |Basic| test.
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
// Now put another message pipe on the channel.
- scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp2, mp3);
+ scoped_refptr<ChannelEndpoint> ep2;
+ scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
+ scoped_refptr<ChannelEndpoint> ep3;
+ scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
+ ConnectChannelEndpoints(ep2, ep3);
// Write: MP 2, port 0 -> MP 3, port 1.
@@ -450,7 +449,8 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
// connected to MP 1, port 0, which will be attached to channel 1. This leaves
// MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
// Write to MP 0, port 0.
EXPECT_EQ(MOJO_RESULT_OK,
@@ -460,12 +460,13 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
- BootstrapMessagePipeNoWait(0, mp0);
+ BootstrapChannelEndpointNoWait(0, ep0);
// Close MP 0, port 0 before channel 1 is even connected.
mp0->Close(0);
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
// Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
// it later, it might already be readable.)
@@ -473,7 +474,7 @@ TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
ASSERT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
- BootstrapMessagePipeNoWait(1, mp1);
+ BootstrapChannelEndpointNoWait(1, ep1);
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
@@ -508,9 +509,11 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) {
HandleSignalsState hss;
uint32_t context = 0;
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<MessagePipeDispatcher> dispatcher(
@@ -676,9 +679,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
HandleSignalsState hss;
uint32_t context = 0;
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<SharedBufferDispatcher> dispatcher;
@@ -810,9 +815,11 @@ TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
uint32_t context = 0;
HandleSignalsState hss;
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
base::FilePath unused;
base::ScopedFILE fp(
@@ -913,11 +920,13 @@ TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
for (unsigned i = 0; i < 256; i++) {
DVLOG(2) << "---------------------------------------- " << i;
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- BootstrapMessagePipeNoWait(0, mp0);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ BootstrapChannelEndpointNoWait(0, ep0);
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- BootstrapMessagePipeNoWait(1, mp1);
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ BootstrapChannelEndpointNoWait(1, ep1);
if (i & 1u) {
io_thread()->task_runner()->PostTask(
@@ -951,9 +960,11 @@ TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
HandleSignalsState hss;
uint32_t context = 0;
- scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
- scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
- ConnectMessagePipes(mp0, mp1);
+ scoped_refptr<ChannelEndpoint> ep0;
+ scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
+ scoped_refptr<ChannelEndpoint> ep1;
+ scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
+ ConnectChannelEndpoints(ep0, ep1);
// We'll try to pass this dispatcher.
scoped_refptr<MessagePipeDispatcher> dispatcher(