summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--BUILD.gn1
-rw-r--r--build/all.gyp6
-rw-r--r--content/browser/renderer_host/render_process_host_impl.cc29
-rw-r--r--content/browser/renderer_host/render_process_host_impl.h3
-rw-r--r--content/child/child_thread.cc59
-rw-r--r--content/child/child_thread.h15
-rw-r--r--content/common/BUILD.gn1
-rw-r--r--content/content_common.gypi1
-rw-r--r--content/gpu/gpu_child_thread.cc2
-rw-r--r--content/public/common/content_switches.cc4
-rw-r--r--content/public/common/content_switches.h1
-rw-r--r--content/renderer/render_thread_impl.cc10
-rw-r--r--content/utility/utility_thread_impl.cc2
-rw-r--r--ipc/BUILD.gn10
-rw-r--r--ipc/ipc.gyp6
-rw-r--r--ipc/ipc.gypi2
-rw-r--r--ipc/ipc_channel.h8
-rw-r--r--ipc/ipc_channel_factory.cc42
-rw-r--r--ipc/ipc_channel_factory.h33
-rw-r--r--ipc/ipc_channel_nacl.cc9
-rw-r--r--ipc/ipc_channel_nacl.h2
-rw-r--r--ipc/ipc_channel_posix.cc13
-rw-r--r--ipc/ipc_channel_posix.h4
-rw-r--r--ipc/ipc_channel_proxy.cc34
-rw-r--r--ipc/ipc_channel_proxy.h10
-rw-r--r--ipc/ipc_channel_reader.cc4
-rw-r--r--ipc/ipc_channel_reader.h5
-rw-r--r--ipc/ipc_channel_unittest.cc88
-rw-r--r--ipc/ipc_channel_win.cc10
-rw-r--r--ipc/ipc_channel_win.h2
-rw-r--r--ipc/ipc_export.h15
-rw-r--r--ipc/ipc_message.h1
-rw-r--r--ipc/ipc_sync_channel.cc14
-rw-r--r--ipc/ipc_sync_channel.h8
-rw-r--r--ipc/ipc_test_base.cc13
-rw-r--r--ipc/ipc_test_base.h6
-rw-r--r--ipc/ipc_test_channel_listener.cc62
-rw-r--r--ipc/ipc_test_channel_listener.h44
-rw-r--r--ipc/ipc_test_sink.cc9
-rw-r--r--ipc/ipc_test_sink.h2
-rw-r--r--ipc/mojo/BUILD.gn41
-rw-r--r--ipc/mojo/DEPS4
-rw-r--r--ipc/mojo/ipc_channel_mojo.cc596
-rw-r--r--ipc/mojo/ipc_channel_mojo.h131
-rw-r--r--ipc/mojo/ipc_channel_mojo_unittest.cc260
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.cc144
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.h98
-rw-r--r--ipc/mojo/ipc_mojo.gyp68
-rw-r--r--ipc/mojo/run_all_unittests.cc42
49 files changed, 1848 insertions, 126 deletions
diff --git a/BUILD.gn b/BUILD.gn
index b0f0885..d65662d 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -46,6 +46,7 @@ group("root") {
"//google_apis",
"//google_apis/gcm",
"//ipc",
+ "//ipc/mojo",
"//media",
"//media/cast",
"//mojo",
diff --git a/build/all.gyp b/build/all.gyp
index 133917b..96d4ea8 100644
--- a/build/all.gyp
+++ b/build/all.gyp
@@ -86,6 +86,7 @@
'../gpu/gpu.gyp:*',
'../gpu/tools/tools.gyp:*',
'../ipc/ipc.gyp:*',
+ '../ipc/mojo/ipc_mojo.gyp:*',
'../jingle/jingle.gyp:*',
'../media/cast/cast.gyp:*',
'../media/media.gyp:*',
@@ -291,6 +292,7 @@
'../gpu/gles2_conform_support/gles2_conform_support.gyp:gles2_conform_support',
'../gpu/gpu.gyp:gpu_unittests',
'../ipc/ipc.gyp:ipc_tests',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests',
'../jingle/jingle.gyp:jingle_unittests',
'../media/cast/cast.gyp:cast_unittests',
'../media/media.gyp:media_unittests',
@@ -849,6 +851,7 @@
'../google_apis/gcm/gcm.gyp:gcm_unit_tests',
'../gpu/gpu.gyp:gpu_unittests',
'../ipc/ipc.gyp:ipc_tests',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests',
'../jingle/jingle.gyp:jingle_unittests',
'../media/media.gyp:media_unittests',
'../ppapi/ppapi_internal.gyp:ppapi_unittests',
@@ -885,6 +888,7 @@
'../google_apis/gcm/gcm.gyp:gcm_unit_tests',
'../gpu/gpu.gyp:gpu_unittests',
'../ipc/ipc.gyp:ipc_tests',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests',
'../jingle/jingle.gyp:jingle_unittests',
'../media/media.gyp:media_unittests',
'../ppapi/ppapi_internal.gyp:ppapi_unittests',
@@ -982,6 +986,7 @@
'../google_apis/gcm/gcm.gyp:gcm_unit_tests',
'../gpu/gpu.gyp:gpu_unittests',
'../ipc/ipc.gyp:ipc_tests',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests',
'../jingle/jingle.gyp:jingle_unittests',
'../media/media.gyp:media_unittests',
'../ppapi/ppapi_internal.gyp:ppapi_unittests',
@@ -1077,6 +1082,7 @@
'../gpu/gpu.gyp:angle_unittests',
'../gpu/gpu.gyp:gpu_unittests',
'../ipc/ipc.gyp:ipc_tests',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests',
'../jingle/jingle.gyp:jingle_unittests',
'../media/cast/cast.gyp:cast_unittests',
'../media/media.gyp:media_unittests',
diff --git a/content/browser/renderer_host/render_process_host_impl.cc b/content/browser/renderer_host/render_process_host_impl.cc
index a1592c3..bbd5949 100644
--- a/content/browser/renderer_host/render_process_host_impl.cc
+++ b/content/browser/renderer_host/render_process_host_impl.cc
@@ -137,6 +137,7 @@
#include "ipc/ipc_channel.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_switches.h"
+#include "ipc/mojo/ipc_channel_mojo.h"
#include "media/base/media_switches.h"
#include "net/url_request/url_request_context_getter.h"
#include "ppapi/shared_impl/ppapi_switches.h"
@@ -594,11 +595,7 @@ bool RenderProcessHostImpl::Init() {
// Setup the IPC channel.
const std::string channel_id =
IPC::Channel::GenerateVerifiedChannelID(std::string());
- channel_ = IPC::ChannelProxy::Create(
- channel_id,
- IPC::Channel::MODE_SERVER,
- this,
- BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO).get());
+ channel_ = CreateChannelProxy(channel_id);
// Setup the Mojo channel.
mojo_application_host_->Init();
@@ -678,6 +675,27 @@ void RenderProcessHostImpl::MaybeActivateMojo() {
mojo_application_host_->Activate(this, GetHandle());
}
+bool RenderProcessHostImpl::ShouldUseMojoChannel() const {
+ const CommandLine& command_line = *CommandLine::ForCurrentProcess();
+ return command_line.HasSwitch(switches::kEnableRendererMojoChannel);
+}
+
+scoped_ptr<IPC::ChannelProxy> RenderProcessHostImpl::CreateChannelProxy(
+ const std::string& channel_id) {
+ scoped_refptr<base::SingleThreadTaskRunner> runner =
+ BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO);
+ if (ShouldUseMojoChannel()) {
+ VLOG(1) << "Mojo Channel is enabled on host";
+ return IPC::ChannelProxy::Create(
+ IPC::ChannelMojo::CreateFactory(
+ channel_id, IPC::Channel::MODE_SERVER, runner),
+ this, runner.get());
+ }
+
+ return IPC::ChannelProxy::Create(
+ channel_id, IPC::Channel::MODE_SERVER, this, runner.get());
+}
+
void RenderProcessHostImpl::CreateMessageFilters() {
DCHECK_CURRENTLY_ON(BrowserThread::UI);
AddFilter(new ResourceSchedulerFilter(GetID()));
@@ -1128,6 +1146,7 @@ void RenderProcessHostImpl::PropagateBrowserCommandLineToRenderer(
switches::kEnablePinch,
switches::kEnablePreciseMemoryInfo,
switches::kEnablePreparsedJsCaching,
+ switches::kEnableRendererMojoChannel,
switches::kEnableSeccompFilterSandbox,
switches::kEnableSkiaBenchmarking,
switches::kEnableSmoothScrolling,
diff --git a/content/browser/renderer_host/render_process_host_impl.h b/content/browser/renderer_host/render_process_host_impl.h
index f4f29ac..18dac77 100644
--- a/content/browser/renderer_host/render_process_host_impl.h
+++ b/content/browser/renderer_host/render_process_host_impl.h
@@ -279,6 +279,9 @@ class CONTENT_EXPORT RenderProcessHostImpl
friend class VisitRelayingRenderProcessHost;
void MaybeActivateMojo();
+ bool ShouldUseMojoChannel() const;
+ scoped_ptr<IPC::ChannelProxy> CreateChannelProxy(
+ const std::string& channel_id);
// Creates and adds the IO thread message filters.
void CreateMessageFilters();
diff --git a/content/child/child_thread.cc b/content/child/child_thread.cc
index b153f04..7e58c1f 100644
--- a/content/child/child_thread.cc
+++ b/content/child/child_thread.cc
@@ -48,6 +48,7 @@
#include "ipc/ipc_switches.h"
#include "ipc/ipc_sync_channel.h"
#include "ipc/ipc_sync_message_filter.h"
+#include "ipc/mojo/ipc_channel_mojo.h"
#if defined(OS_WIN)
#include "content/common/handle_enumerator_win.h"
@@ -192,6 +193,17 @@ void QuitMainThreadMessageLoop() {
} // namespace
+ChildThread::Options::Options()
+ : channel_name(CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kProcessChannelID)),
+ use_mojo_channel(false) {}
+
+ChildThread::Options::Options(bool mojo)
+ : channel_name(CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kProcessChannelID)),
+ use_mojo_channel(mojo) {}
+
+
ChildThread::ChildThreadMessageRouter::ChildThreadMessageRouter(
IPC::Sender* sender)
: sender_(sender) {}
@@ -204,20 +216,43 @@ ChildThread::ChildThread()
: router_(this),
channel_connected_factory_(this),
in_browser_process_(false) {
- channel_name_ = CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
- switches::kProcessChannelID);
- Init();
+ Init(Options());
}
-ChildThread::ChildThread(const std::string& channel_name)
- : channel_name_(channel_name),
- router_(this),
+ChildThread::ChildThread(const Options& options)
+ : router_(this),
channel_connected_factory_(this),
in_browser_process_(true) {
- Init();
+ Init(options);
}
-void ChildThread::Init() {
+scoped_ptr<IPC::SyncChannel> ChildThread::CreateChannel(bool use_mojo_channel) {
+ if (use_mojo_channel) {
+ VLOG(1) << "Mojo is enabled on child";
+ return IPC::SyncChannel::Create(
+ IPC::ChannelMojo::CreateFactory(
+ channel_name_,
+ IPC::Channel::MODE_CLIENT,
+ ChildProcess::current()->io_message_loop_proxy()),
+ this,
+ ChildProcess::current()->io_message_loop_proxy(),
+ true,
+ ChildProcess::current()->GetShutDownEvent());
+ }
+
+ VLOG(1) << "Mojo is disabled on child";
+ return IPC::SyncChannel::Create(
+ channel_name_,
+ IPC::Channel::MODE_CLIENT,
+ this,
+ ChildProcess::current()->io_message_loop_proxy(),
+ true,
+ ChildProcess::current()->GetShutDownEvent());
+}
+
+void ChildThread::Init(const Options& options) {
+ channel_name_ = options.channel_name;
+
g_lazy_tls.Pointer()->Set(this);
on_channel_error_called_ = false;
message_loop_ = base::MessageLoop::current();
@@ -227,13 +262,7 @@ void ChildThread::Init() {
// the logger, and the logger does not like being created on the IO thread.
IPC::Logging::GetInstance();
#endif
- channel_ =
- IPC::SyncChannel::Create(channel_name_,
- IPC::Channel::MODE_CLIENT,
- this,
- ChildProcess::current()->io_message_loop_proxy(),
- true,
- ChildProcess::current()->GetShutDownEvent());
+ channel_ = CreateChannel(options.use_mojo_channel);
#ifdef IPC_MESSAGE_LOG_ENABLED
if (!in_browser_process_)
IPC::Logging::GetInstance()->SetIPCSender(this);
diff --git a/content/child/child_thread.h b/content/child/child_thread.h
index 8bad5ef..88b52ab 100644
--- a/content/child/child_thread.h
+++ b/content/child/child_thread.h
@@ -53,10 +53,20 @@ struct RequestInfo;
// The main thread of a child process derives from this class.
class CONTENT_EXPORT ChildThread : public IPC::Listener, public IPC::Sender {
public:
+ struct CONTENT_EXPORT Options {
+ Options();
+ explicit Options(bool mojo);
+ Options(std::string name, bool mojo)
+ : channel_name(name), use_mojo_channel(mojo) {}
+
+ std::string channel_name;
+ bool use_mojo_channel;
+ };
+
// Creates the thread.
ChildThread();
// Used for single-process mode and for in process gpu mode.
- explicit ChildThread(const std::string& channel_name);
+ explicit ChildThread(const Options& options);
// ChildProcess::main_thread() is reset after Shutdown(), and before the
// destructor, so any subsystem that relies on ChildProcess::main_thread()
// must be terminated before Shutdown returns. In particular, if a subsystem
@@ -173,7 +183,8 @@ class CONTENT_EXPORT ChildThread : public IPC::Listener, public IPC::Sender {
IPC::Sender* const sender_;
};
- void Init();
+ void Init(const Options& options);
+ scoped_ptr<IPC::SyncChannel> CreateChannel(bool use_mojo_channel);
// IPC message handlers.
void OnShutdown();
diff --git a/content/common/BUILD.gn b/content/common/BUILD.gn
index 0e44b2c..02efe32 100644
--- a/content/common/BUILD.gn
+++ b/content/common/BUILD.gn
@@ -48,6 +48,7 @@ source_set("common") {
deps += [
"//cc",
"//ipc",
+ "//ipc/mojo",
"//mojo/environment:chromium",
"//mojo/system",
# TODO: the dependency on gl_in_process_context should be decoupled from
diff --git a/content/content_common.gypi b/content/content_common.gypi
index 9d780a2..1d2b409 100644
--- a/content/content_common.gypi
+++ b/content/content_common.gypi
@@ -553,6 +553,7 @@
'../gpu/gpu.gyp:gpu_ipc',
'../gpu/skia_bindings/skia_bindings.gyp:gpu_skia_bindings',
'../ipc/ipc.gyp:ipc',
+ '../ipc/mojo/ipc_mojo.gyp:ipc_mojo',
'../media/media.gyp:media',
'../media/media.gyp:shared_memory_support',
'../mojo/mojo_base.gyp:mojo_application_bindings',
diff --git a/content/gpu/gpu_child_thread.cc b/content/gpu/gpu_child_thread.cc
index 8bfec29..4f77446 100644
--- a/content/gpu/gpu_child_thread.cc
+++ b/content/gpu/gpu_child_thread.cc
@@ -61,7 +61,7 @@ GpuChildThread::GpuChildThread(GpuWatchdogThread* watchdog_thread,
}
GpuChildThread::GpuChildThread(const std::string& channel_id)
- : ChildThread(channel_id),
+ : ChildThread(Options(channel_id, false)),
dead_on_arrival_(false),
in_browser_process_(true) {
#if defined(OS_WIN)
diff --git a/content/public/common/content_switches.cc b/content/public/common/content_switches.cc
index 1b7a1ef..1bc2a24 100644
--- a/content/public/common/content_switches.cc
+++ b/content/public/common/content_switches.cc
@@ -425,6 +425,10 @@ const char kEnablePreparsedJsCaching[] = "enable-preparsed-js-caching";
const char kEnableRegionBasedColumns[] =
"enable-region-based-columns";
+// Replaces renderer-browser IPC channel with ChnanelMojo.
+const char kEnableRendererMojoChannel[] =
+ "enable-renderer-mojo-channel";
+
// Enables targeted style recalculation optimizations.
const char kEnableTargetedStyleRecalc[] =
"enable-targeted-style-recalc";
diff --git a/content/public/common/content_switches.h b/content/public/common/content_switches.h
index ae77186..47b5fdc 100644
--- a/content/public/common/content_switches.h
+++ b/content/public/common/content_switches.h
@@ -124,6 +124,7 @@ CONTENT_EXPORT extern const char kEnablePinch[];
CONTENT_EXPORT extern const char kEnablePreciseMemoryInfo[];
extern const char kEnablePreparsedJsCaching[];
CONTENT_EXPORT extern const char kEnableRegionBasedColumns[];
+CONTENT_EXPORT extern const char kEnableRendererMojoChannel[];
CONTENT_EXPORT extern const char kEnableSandboxLogging[];
extern const char kEnableSeccompFilterSandbox[];
extern const char kEnableSkiaBenchmarking[];
diff --git a/content/renderer/render_thread_impl.cc b/content/renderer/render_thread_impl.cc
index 5f4d4fb..e652d22 100644
--- a/content/renderer/render_thread_impl.cc
+++ b/content/renderer/render_thread_impl.cc
@@ -295,6 +295,11 @@ void CreateRenderFrameSetup(mojo::InterfaceRequest<RenderFrameSetup> request) {
mojo::BindToRequest(new RenderFrameSetupImpl(), &request);
}
+bool ShouldUseMojoChannel() {
+ return CommandLine::ForCurrentProcess()->HasSwitch(
+ switches::kEnableRendererMojoChannel);
+}
+
} // namespace
// For measuring memory usage after each task. Behind a command line flag.
@@ -363,12 +368,13 @@ RenderThreadImpl* RenderThreadImpl::current() {
// When we run plugins in process, we actually run them on the render thread,
// which means that we need to make the render thread pump UI events.
-RenderThreadImpl::RenderThreadImpl() {
+RenderThreadImpl::RenderThreadImpl()
+ : ChildThread(Options(ShouldUseMojoChannel())) {
Init();
}
RenderThreadImpl::RenderThreadImpl(const std::string& channel_name)
- : ChildThread(channel_name) {
+ : ChildThread(Options(channel_name, ShouldUseMojoChannel())) {
Init();
}
diff --git a/content/utility/utility_thread_impl.cc b/content/utility/utility_thread_impl.cc
index c94312f..bb51842 100644
--- a/content/utility/utility_thread_impl.cc
+++ b/content/utility/utility_thread_impl.cc
@@ -37,7 +37,7 @@ UtilityThreadImpl::UtilityThreadImpl() : single_process_(false) {
}
UtilityThreadImpl::UtilityThreadImpl(const std::string& channel_name)
- : ChildThread(channel_name),
+ : ChildThread(Options(channel_name, false)),
single_process_(true) {
Init();
}
diff --git a/ipc/BUILD.gn b/ipc/BUILD.gn
index 415c08d..2380596 100644
--- a/ipc/BUILD.gn
+++ b/ipc/BUILD.gn
@@ -8,6 +8,8 @@ component("ipc") {
"file_descriptor_set_posix.h",
"ipc_channel.cc",
"ipc_channel.h",
+ "ipc_channel_factory.cc",
+ "ipc_channel_factory.h",
"ipc_channel_common.cc",
"ipc_channel_handle.h",
"ipc_channel_nacl.cc",
@@ -95,8 +97,6 @@ if (!is_android) {
"ipc_sync_channel_unittest.cc",
"ipc_sync_message_unittest.cc",
"ipc_sync_message_unittest.h",
- "ipc_test_base.cc",
- "ipc_test_base.h",
"sync_socket_unittest.cc",
"unix_domain_socket_util_unittest.cc",
]
@@ -131,8 +131,6 @@ if (!is_android) {
test("ipc_perftests") {
sources = [
"ipc_perftests.cc",
- "ipc_test_base.cc",
- "ipc_test_base.h",
]
# TODO(brettw) hook up Android testing.
@@ -165,6 +163,10 @@ static_library("test_support") {
"ipc_multiprocess_test.h",
"ipc_test_sink.cc",
"ipc_test_sink.h",
+ "ipc_test_base.cc",
+ "ipc_test_base.h",
+ "ipc_test_channel_listener.h",
+ "ipc_test_channel_listener.cc",
]
deps = [
":ipc",
diff --git a/ipc/ipc.gyp b/ipc/ipc.gyp
index a6e9269..6d90ab6 100644
--- a/ipc/ipc.gyp
+++ b/ipc/ipc.gyp
@@ -55,8 +55,6 @@
'ipc_sync_channel_unittest.cc',
'ipc_sync_message_unittest.cc',
'ipc_sync_message_unittest.h',
- 'ipc_test_base.cc',
- 'ipc_test_base.h',
'run_all_unittests.cc',
'sync_socket_unittest.cc',
'unix_domain_socket_util_unittest.cc',
@@ -132,6 +130,10 @@
'sources': [
'ipc_multiprocess_test.cc',
'ipc_multiprocess_test.h',
+ 'ipc_test_base.cc',
+ 'ipc_test_base.h',
+ 'ipc_test_channel_listener.cc',
+ 'ipc_test_channel_listener.h',
'ipc_test_sink.cc',
'ipc_test_sink.h',
],
diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi
index 5cf5051..5f68757 100644
--- a/ipc/ipc.gypi
+++ b/ipc/ipc.gypi
@@ -15,6 +15,8 @@
'file_descriptor_set_posix.h',
'ipc_channel.cc',
'ipc_channel.h',
+ 'ipc_channel_factory.cc',
+ 'ipc_channel_factory.h',
'ipc_channel_common.cc',
'ipc_channel_handle.h',
'ipc_channel_nacl.cc',
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index bca3ea0..fbd4ae0 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -170,6 +170,14 @@ class IPC_EXPORT Channel : public Sender {
// listener.
virtual base::ProcessId GetPeerPID() const = 0;
+ // Get its own process id. This value is told to the peer.
+ virtual base::ProcessId GetSelfPID() const = 0;
+
+ // Return connected ChannelHandle which the channel has owned.
+ // This method transfers the ownership to the caller
+ // so the channel isn't valid after the call.
+ virtual ChannelHandle TakePipeHandle() WARN_UNUSED_RESULT = 0;
+
// Send a message over the Channel to the listener on the other end.
//
// |message| must be allocated using operator new. This object will be
diff --git a/ipc/ipc_channel_factory.cc b/ipc/ipc_channel_factory.cc
new file mode 100644
index 0000000..4cb1790
--- /dev/null
+++ b/ipc/ipc_channel_factory.cc
@@ -0,0 +1,42 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_channel_factory.h"
+
+namespace IPC {
+
+namespace {
+
+class PlatformChannelFactory : public ChannelFactory {
+ public:
+ PlatformChannelFactory(ChannelHandle handle,
+ Channel::Mode mode)
+ : handle_(handle), mode_(mode) {
+ }
+
+ virtual std::string GetName() const OVERRIDE {
+ return handle_.name;
+ }
+
+ virtual scoped_ptr<Channel> BuildChannel(
+ Listener* listener) OVERRIDE {
+ return Channel::Create(handle_, mode_, listener);
+ }
+
+ private:
+ ChannelHandle handle_;
+ Channel::Mode mode_;
+
+ DISALLOW_COPY_AND_ASSIGN(PlatformChannelFactory);
+};
+
+} // namespace
+
+// static
+scoped_ptr<ChannelFactory> ChannelFactory::Create(
+ const ChannelHandle& handle, Channel::Mode mode) {
+ return scoped_ptr<ChannelFactory>(new PlatformChannelFactory(handle, mode));
+}
+
+} // namespace IPC
diff --git a/ipc/ipc_channel_factory.h b/ipc/ipc_channel_factory.h
new file mode 100644
index 0000000..30bfd8c
--- /dev/null
+++ b/ipc/ipc_channel_factory.h
@@ -0,0 +1,33 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_FACTORY_H_
+#define IPC_IPC_CHANNEL_FACTORY_H_
+
+#include <string>
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "ipc/ipc_channel.h"
+
+namespace IPC {
+
+// Encapsulates how a Channel is created. A ChannelFactory can be
+// passed to the constructor of ChannelProxy or SyncChannel to tell them
+// how to create underlying channel.
+class ChannelFactory {
+ public:
+ // Creates a factory for "native" channel built through
+ // IPC::Channel::Create().
+ static scoped_ptr<ChannelFactory> Create(
+ const ChannelHandle& handle, Channel::Mode mode);
+
+ virtual ~ChannelFactory() { }
+ virtual std::string GetName() const = 0;
+ virtual scoped_ptr<Channel> BuildChannel(Listener* listener) = 0;
+};
+
+} // namespace IPC
+
+#endif // IPC_IPC_CHANNEL_FACTORY_H_
diff --git a/ipc/ipc_channel_nacl.cc b/ipc/ipc_channel_nacl.cc
index 0928ba6..704f7d8 100644
--- a/ipc/ipc_channel_nacl.cc
+++ b/ipc/ipc_channel_nacl.cc
@@ -145,6 +145,15 @@ base::ProcessId ChannelNacl::GetPeerPID() const {
return -1;
}
+base::ProcessId ChannelNacl::GetSelfPID() const {
+ return -1;
+}
+
+ChannelHandle ChannelNacl::TakePipeHandle() {
+ // NACL doesn't have any platform-level handles.
+ return ChannelHandle();
+}
+
bool ChannelNacl::Connect() {
if (pipe_ == -1) {
DLOG(WARNING) << "Channel creation failed: " << pipe_name_;
diff --git a/ipc/ipc_channel_nacl.h b/ipc/ipc_channel_nacl.h
index c47892d..e217b88 100644
--- a/ipc/ipc_channel_nacl.h
+++ b/ipc/ipc_channel_nacl.h
@@ -42,6 +42,8 @@ class ChannelNacl : public Channel,
// Channel implementation.
virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
virtual bool Connect() OVERRIDE;
virtual void Close() OVERRIDE;
virtual bool Send(Message* message) OVERRIDE;
diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc
index 8ddf73a..ac9de55 100644
--- a/ipc/ipc_channel_posix.cc
+++ b/ipc/ipc_channel_posix.cc
@@ -747,7 +747,7 @@ void ChannelPosix::ClosePipeOnError() {
}
}
-int ChannelPosix::GetHelloMessageProcId() {
+int ChannelPosix::GetHelloMessageProcId() const {
int pid = base::GetCurrentProcId();
#if defined(OS_LINUX)
// Our process may be in a sandbox with a separate PID namespace.
@@ -1050,6 +1050,17 @@ base::ProcessId ChannelPosix::GetPeerPID() const {
return peer_pid_;
}
+base::ProcessId ChannelPosix::GetSelfPID() const {
+ return GetHelloMessageProcId();
+}
+
+ChannelHandle ChannelPosix::TakePipeHandle() {
+ ChannelHandle handle = ChannelHandle(pipe_name_,
+ base::FileDescriptor(pipe_, false));
+ pipe_ = -1;
+ return handle;
+}
+
//------------------------------------------------------------------------------
// Channel's methods
diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h
index 7f17b2f..a235739 100644
--- a/ipc/ipc_channel_posix.h
+++ b/ipc/ipc_channel_posix.h
@@ -62,6 +62,8 @@ class IPC_EXPORT ChannelPosix : public Channel,
virtual void Close() OVERRIDE;
virtual bool Send(Message* message) OVERRIDE;
virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
virtual int GetClientFileDescriptor() const OVERRIDE;
virtual int TakeClientFileDescriptor() OVERRIDE;
@@ -94,7 +96,7 @@ class IPC_EXPORT ChannelPosix : public Channel,
bool AcceptConnection();
void ClosePipeOnError();
- int GetHelloMessageProcId();
+ int GetHelloMessageProcId() const;
void QueueHelloMessage();
void CloseFileDescriptors(Message* msg);
void QueueCloseFDMessage(int fd, int hops);
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc
index 7441c65..2ea722d 100644
--- a/ipc/ipc_channel_proxy.cc
+++ b/ipc/ipc_channel_proxy.cc
@@ -11,6 +11,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
+#include "ipc/ipc_channel_factory.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
@@ -48,11 +49,10 @@ void ChannelProxy::Context::ClearIPCTaskRunner() {
ipc_task_runner_ = NULL;
}
-void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
- const Channel::Mode& mode) {
+void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
DCHECK(!channel_);
- channel_id_ = handle.name;
- channel_ = Channel::Create(handle, mode, this);
+ channel_id_ = factory->GetName();
+ channel_ = factory->BuildChannel(this);
}
bool ChannelProxy::Context::TryFilters(const Message& message) {
@@ -315,6 +315,16 @@ scoped_ptr<ChannelProxy> ChannelProxy::Create(
return channel.Pass();
}
+// static
+scoped_ptr<ChannelProxy> ChannelProxy::Create(
+ scoped_ptr<ChannelFactory> factory,
+ Listener* listener,
+ base::SingleThreadTaskRunner* ipc_task_runner) {
+ scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
+ channel->Init(factory.Pass(), true);
+ return channel.Pass();
+}
+
ChannelProxy::ChannelProxy(Context* context)
: context_(context),
did_init_(false) {
@@ -334,8 +344,6 @@ ChannelProxy::~ChannelProxy() {
void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
Channel::Mode mode,
bool create_pipe_now) {
- DCHECK(CalledOnValidThread());
- DCHECK(!did_init_);
#if defined(OS_POSIX)
// When we are creating a server on POSIX, we need its file descriptor
// to be created immediately so that it can be accessed and passed
@@ -345,17 +353,25 @@ void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
create_pipe_now = true;
}
#endif // defined(OS_POSIX)
+ Init(ChannelFactory::Create(channel_handle, mode),
+ create_pipe_now);
+}
+
+void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
+ bool create_pipe_now) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(!did_init_);
if (create_pipe_now) {
// Create the channel immediately. This effectively sets up the
// low-level pipe so that the client can connect. Without creating
// the pipe immediately, it is possible for a listener to attempt
// to connect and get an error since the pipe doesn't exist yet.
- context_->CreateChannel(channel_handle, mode);
+ context_->CreateChannel(factory.Pass());
} else {
context_->ipc_task_runner()->PostTask(
- FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
- channel_handle, mode));
+ FROM_HERE, base::Bind(&Context::CreateChannel,
+ context_.get(), Passed(factory.Pass())));
}
// complete initialization on the background thread
diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h
index 2e483ac..e76e56f 100644
--- a/ipc/ipc_channel_proxy.h
+++ b/ipc/ipc_channel_proxy.h
@@ -22,6 +22,7 @@ class SingleThreadTaskRunner;
namespace IPC {
+class ChannelFactory;
class MessageFilter;
class MessageFilterRouter;
class SendCallbackHelper;
@@ -70,6 +71,11 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
Listener* listener,
base::SingleThreadTaskRunner* ipc_task_runner);
+ static scoped_ptr<ChannelProxy> Create(
+ scoped_ptr<ChannelFactory> factory,
+ Listener* listener,
+ base::SingleThreadTaskRunner* ipc_task_runner);
+
virtual ~ChannelProxy();
// Initializes the channel proxy. Only call this once to initialize a channel
@@ -78,6 +84,7 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
// thread.
void Init(const IPC::ChannelHandle& channel_handle, Channel::Mode mode,
bool create_pipe_now);
+ void Init(scoped_ptr<ChannelFactory> factory, bool create_pipe_now);
// Close the IPC::Channel. This operation completes asynchronously, once the
// background thread processes the command to close the channel. It is ok to
@@ -171,8 +178,7 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
friend class SendCallbackHelper;
// Create the Channel
- void CreateChannel(const IPC::ChannelHandle& channel_handle,
- const Channel::Mode& mode);
+ void CreateChannel(scoped_ptr<ChannelFactory> factory);
// Methods called on the IO thread.
void OnSendMessage(scoped_ptr<Message> message_ptr);
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc
index 9a3cc3c..28a889a 100644
--- a/ipc/ipc_channel_reader.cc
+++ b/ipc/ipc_channel_reader.cc
@@ -38,13 +38,13 @@ bool ChannelReader::AsyncReadComplete(int bytes_read) {
return DispatchInputData(input_buf_, bytes_read);
}
-bool ChannelReader::IsInternalMessage(const Message& m) const {
+bool ChannelReader::IsInternalMessage(const Message& m) {
return m.routing_id() == MSG_ROUTING_NONE &&
m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE &&
m.type() <= Channel::HELLO_MESSAGE_TYPE;
}
-bool ChannelReader::IsHelloMessage(const Message& m) const {
+bool ChannelReader::IsHelloMessage(const Message& m) {
return m.routing_id() == MSG_ROUTING_NONE &&
m.type() == Channel::HELLO_MESSAGE_TYPE;
}
diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h
index 1303846..9dec8c1 100644
--- a/ipc/ipc_channel_reader.h
+++ b/ipc/ipc_channel_reader.h
@@ -7,6 +7,7 @@
#include "base/basictypes.h"
#include "ipc/ipc_channel.h"
+#include "ipc/ipc_export.h"
namespace IPC {
namespace internal {
@@ -44,11 +45,11 @@ class ChannelReader {
// Returns true if the given message is internal to the IPC implementation,
// like the "hello" message sent on channel set-up.
- bool IsInternalMessage(const Message& m) const;
+ bool IsInternalMessage(const Message& m);
// Returns true if the given message is an Hello message
// sent on channel set-up.
- bool IsHelloMessage(const Message& m) const;
+ bool IsHelloMessage(const Message& m);
protected:
enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
diff --git a/ipc/ipc_channel_unittest.cc b/ipc/ipc_channel_unittest.cc
index b9665db..1f85311 100644
--- a/ipc/ipc_channel_unittest.cc
+++ b/ipc/ipc_channel_unittest.cc
@@ -15,76 +15,10 @@
#include "base/threading/thread.h"
#include "ipc/ipc_message.h"
#include "ipc/ipc_test_base.h"
+#include "ipc/ipc_test_channel_listener.h"
namespace {
-const size_t kLongMessageStringNumBytes = 50000;
-
-static void Send(IPC::Sender* sender, const char* text) {
- static int message_index = 0;
-
- IPC::Message* message = new IPC::Message(0,
- 2,
- IPC::Message::PRIORITY_NORMAL);
- message->WriteInt(message_index++);
- message->WriteString(std::string(text));
-
- // Make sure we can handle large messages.
- char junk[kLongMessageStringNumBytes];
- memset(junk, 'a', sizeof(junk)-1);
- junk[sizeof(junk)-1] = 0;
- message->WriteString(std::string(junk));
-
- // DEBUG: printf("[%u] sending message [%s]\n", GetCurrentProcessId(), text);
- sender->Send(message);
-}
-
-// A generic listener that expects messages of a certain type (see
-// OnMessageReceived()), and either sends a generic response or quits after the
-// 50th message (or on channel error).
-class GenericChannelListener : public IPC::Listener {
- public:
- GenericChannelListener() : sender_(NULL), messages_left_(50) {}
- virtual ~GenericChannelListener() {}
-
- virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
- PickleIterator iter(message);
-
- int ignored;
- EXPECT_TRUE(iter.ReadInt(&ignored));
- std::string data;
- EXPECT_TRUE(iter.ReadString(&data));
- std::string big_string;
- EXPECT_TRUE(iter.ReadString(&big_string));
- EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length());
-
- SendNextMessage();
- return true;
- }
-
- virtual void OnChannelError() OVERRIDE {
- // There is a race when closing the channel so the last message may be lost.
- EXPECT_LE(messages_left_, 1);
- base::MessageLoop::current()->Quit();
- }
-
- void Init(IPC::Sender* s) {
- sender_ = s;
- }
-
- protected:
- void SendNextMessage() {
- if (--messages_left_ <= 0)
- base::MessageLoop::current()->Quit();
- else
- Send(sender_, "Foo");
- }
-
- private:
- IPC::Sender* sender_;
- int messages_left_;
-};
-
class IPCChannelTest : public IPCTestBase {
};
@@ -124,13 +58,13 @@ TEST_F(IPCChannelTest, ChannelTest) {
Init("GenericClient");
// Set up IPC channel and start client.
- GenericChannelListener listener;
+ IPC::TestChannelListener listener;
CreateChannel(&listener);
listener.Init(sender());
ASSERT_TRUE(ConnectChannel());
ASSERT_TRUE(StartClient());
- Send(sender(), "hello from parent");
+ IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
// Run message loop.
base::MessageLoop::current()->Run();
@@ -149,7 +83,7 @@ TEST_F(IPCChannelTest, ChannelTestExistingPipe) {
// Create pipe manually using the standard Chromium name and set up IPC
// channel.
- GenericChannelListener listener;
+ IPC::TestChannelListener listener;
std::string name("\\\\.\\pipe\\chrome.");
name.append(GetChannelName("GenericClient"));
HANDLE pipe = CreateNamedPipeA(name.c_str(),
@@ -169,7 +103,7 @@ TEST_F(IPCChannelTest, ChannelTestExistingPipe) {
ASSERT_TRUE(ConnectChannel());
ASSERT_TRUE(StartClient());
- Send(sender(), "hello from parent");
+ IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
// Run message loop.
base::MessageLoop::current()->Run();
@@ -191,13 +125,13 @@ TEST_F(IPCChannelTest, ChannelProxyTest) {
thread.StartWithOptions(options);
// Set up IPC channel proxy.
- GenericChannelListener listener;
+ IPC::TestChannelListener listener;
CreateChannelProxy(&listener, thread.message_loop_proxy().get());
listener.Init(sender());
ASSERT_TRUE(StartClient());
- Send(sender(), "hello from parent");
+ IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
// Run message loop.
base::MessageLoop::current()->Run();
@@ -209,7 +143,7 @@ TEST_F(IPCChannelTest, ChannelProxyTest) {
thread.Stop();
}
-class ChannelListenerWithOnConnectedSend : public GenericChannelListener {
+class ChannelListenerWithOnConnectedSend : public IPC::TestChannelListener {
public:
ChannelListenerWithOnConnectedSend() {}
virtual ~ChannelListenerWithOnConnectedSend() {}
@@ -237,7 +171,7 @@ TEST_F(IPCChannelTest, MAYBE_SendMessageInChannelConnected) {
ASSERT_TRUE(ConnectChannel());
ASSERT_TRUE(StartClient());
- Send(sender(), "hello from parent");
+ IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
// Run message loop.
base::MessageLoop::current()->Run();
@@ -251,7 +185,7 @@ TEST_F(IPCChannelTest, MAYBE_SendMessageInChannelConnected) {
MULTIPROCESS_IPC_TEST_CLIENT_MAIN(GenericClient) {
base::MessageLoopForIO main_message_loop;
- GenericChannelListener listener;
+ IPC::TestChannelListener listener;
// Set up IPC channel.
scoped_ptr<IPC::Channel> channel(IPC::Channel::CreateClient(
@@ -259,7 +193,7 @@ MULTIPROCESS_IPC_TEST_CLIENT_MAIN(GenericClient) {
&listener));
CHECK(channel->Connect());
listener.Init(channel.get());
- Send(channel.get(), "hello from child");
+ IPC::TestChannelListener::SendOneMessage(channel.get(), "hello from child");
base::MessageLoop::current()->Run();
return 0;
diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc
index 0dcde17..eabf85e 100644
--- a/ipc/ipc_channel_win.cc
+++ b/ipc/ipc_channel_win.cc
@@ -137,6 +137,16 @@ base::ProcessId ChannelWin::GetPeerPID() const {
return peer_pid_;
}
+base::ProcessId ChannelWin::GetSelfPID() const {
+ return GetCurrentProcessId();
+}
+
+ChannelHandle ChannelWin::TakePipeHandle() {
+ ChannelHandle handle = ChannelHandle(pipe_);
+ pipe_ = INVALID_HANDLE_VALUE;
+ return handle;
+}
+
// static
bool ChannelWin::IsNamedServerInitialized(
const std::string& channel_id) {
diff --git a/ipc/ipc_channel_win.h b/ipc/ipc_channel_win.h
index bb20950..2bbb294 100644
--- a/ipc/ipc_channel_win.h
+++ b/ipc/ipc_channel_win.h
@@ -35,6 +35,8 @@ class ChannelWin : public Channel,
virtual void Close() OVERRIDE;
virtual bool Send(Message* message) OVERRIDE;
virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
static bool IsNamedServerInitialized(const std::string& channel_id);
diff --git a/ipc/ipc_export.h b/ipc/ipc_export.h
index 776b3ee..9f8411c 100644
--- a/ipc/ipc_export.h
+++ b/ipc/ipc_export.h
@@ -17,16 +17,31 @@
#define IPC_EXPORT __declspec(dllimport)
#endif // defined(IPC_IMPLEMENTATION)
+#if defined(IPC_MOJO_IMPLEMENTATION)
+#define IPC_MOJO_EXPORT __declspec(dllexport)
+#else
+#define IPC_MOJO_EXPORT __declspec(dllimport)
+#endif // defined(IPC_MOJO_IMPLEMENTATION)
+
#else // defined(WIN32)
+
#if defined(IPC_IMPLEMENTATION)
#define IPC_EXPORT __attribute__((visibility("default")))
#else
#define IPC_EXPORT
#endif
+
+#if defined(IPC_MOJO_IMPLEMENTATION)
+#define IPC_MOJO_EXPORT __attribute__((visibility("default")))
+#else
+#define IPC_MOJO_EXPORT
+#endif
+
#endif
#else // defined(COMPONENT_BUILD)
#define IPC_EXPORT
+#define IPC_MOJO_EXPORT
#endif
#endif // IPC_IPC_EXPORT_H_
diff --git a/ipc/ipc_message.h b/ipc/ipc_message.h
index fc37d72..198e6c0 100644
--- a/ipc/ipc_message.h
+++ b/ipc/ipc_message.h
@@ -222,6 +222,7 @@ class IPC_EXPORT Message : public Pickle {
protected:
friend class Channel;
+ friend class ChannelMojo;
friend class ChannelNacl;
friend class ChannelPosix;
friend class ChannelWin;
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index a7ed230..0c4702c 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -13,6 +13,7 @@
#include "base/synchronization/waitable_event_watcher.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/thread_local.h"
+#include "ipc/ipc_channel_factory.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
#include "ipc/ipc_sync_message.h"
@@ -420,6 +421,19 @@ scoped_ptr<SyncChannel> SyncChannel::Create(
// static
scoped_ptr<SyncChannel> SyncChannel::Create(
+ scoped_ptr<ChannelFactory> factory,
+ Listener* listener,
+ base::SingleThreadTaskRunner* ipc_task_runner,
+ bool create_pipe_now,
+ base::WaitableEvent* shutdown_event) {
+ scoped_ptr<SyncChannel> channel =
+ Create(listener, ipc_task_runner, shutdown_event);
+ channel->Init(factory.Pass(), create_pipe_now);
+ return channel.Pass();
+}
+
+// static
+scoped_ptr<SyncChannel> SyncChannel::Create(
Listener* listener,
base::SingleThreadTaskRunner* ipc_task_runner,
WaitableEvent* shutdown_event) {
diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h
index 8984184..f8207ce 100644
--- a/ipc/ipc_sync_channel.h
+++ b/ipc/ipc_sync_channel.h
@@ -23,6 +23,7 @@ class WaitableEvent;
namespace IPC {
class SyncMessage;
+class ChannelFactory;
// This is similar to ChannelProxy, with the added feature of supporting sending
// synchronous messages.
@@ -75,6 +76,13 @@ class IPC_EXPORT SyncChannel : public ChannelProxy {
bool create_pipe_now,
base::WaitableEvent* shutdown_event);
+ static scoped_ptr<SyncChannel> Create(
+ scoped_ptr<ChannelFactory> factory,
+ Listener* listener,
+ base::SingleThreadTaskRunner* ipc_task_runner,
+ bool create_pipe_now,
+ base::WaitableEvent* shutdown_event);
+
// Creates an uninitialized sync channel. Call ChannelProxy::Init to
// initialize the channel. This two-step setup allows message filters to be
// added before any messages are sent or received.
diff --git a/ipc/ipc_test_base.cc b/ipc/ipc_test_base.cc
index 589ee98..f893c28 100644
--- a/ipc/ipc_test_base.cc
+++ b/ipc/ipc_test_base.cc
@@ -59,6 +59,15 @@ bool IPCTestBase::ConnectChannel() {
return channel_->Connect();
}
+scoped_ptr<IPC::Channel> IPCTestBase::ReleaseChannel() {
+ return channel_.Pass();
+}
+
+void IPCTestBase::SetChannel(scoped_ptr<IPC::Channel> channel) {
+ channel_ = channel.Pass();
+}
+
+
void IPCTestBase::DestroyChannel() {
DCHECK(channel_.get());
channel_.reset();
@@ -120,3 +129,7 @@ bool IPCTestBase::WaitForClientShutdown() {
client_process_ = base::kNullProcessHandle;
return rv;
}
+
+scoped_refptr<base::TaskRunner> IPCTestBase::io_thread_task_runner() {
+ return message_loop_->message_loop_proxy();
+}
diff --git a/ipc/ipc_test_base.h b/ipc/ipc_test_base.h
index 5bd3e96..ce3328a 100644
--- a/ipc/ipc_test_base.h
+++ b/ipc/ipc_test_base.h
@@ -47,6 +47,11 @@ class IPCTestBase : public base::MultiProcessTest {
bool ConnectChannel();
void DestroyChannel();
+ // Releases or replaces existing channel.
+ // These are useful for testing specific types of channel subclasses.
+ scoped_ptr<IPC::Channel> ReleaseChannel();
+ void SetChannel(scoped_ptr<IPC::Channel> channel);
+
// Use this instead of CreateChannel() if you want to use some different
// channel specification (then use ConnectChannel() as usual).
void CreateChannelFromChannelHandle(const IPC::ChannelHandle& channel_handle,
@@ -81,6 +86,7 @@ class IPCTestBase : public base::MultiProcessTest {
IPC::ChannelProxy* channel_proxy() { return channel_proxy_.get(); }
const base::ProcessHandle& client_process() const { return client_process_; }
+ scoped_refptr<base::TaskRunner> io_thread_task_runner();
private:
std::string test_client_name_;
diff --git a/ipc/ipc_test_channel_listener.cc b/ipc/ipc_test_channel_listener.cc
new file mode 100644
index 0000000..e98f6b7
--- /dev/null
+++ b/ipc/ipc_test_channel_listener.cc
@@ -0,0 +1,62 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_test_channel_listener.h"
+
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_sender.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace IPC {
+
+// static
+void TestChannelListener::SendOneMessage(IPC::Sender* sender,
+ const char* text) {
+ static int message_index = 0;
+
+ IPC::Message* message = new IPC::Message(0,
+ 2,
+ IPC::Message::PRIORITY_NORMAL);
+ message->WriteInt(message_index++);
+ message->WriteString(std::string(text));
+
+ // Make sure we can handle large messages.
+ char junk[kLongMessageStringNumBytes];
+ memset(junk, 'a', sizeof(junk)-1);
+ junk[sizeof(junk)-1] = 0;
+ message->WriteString(std::string(junk));
+
+ sender->Send(message);
+}
+
+
+bool TestChannelListener::OnMessageReceived(const IPC::Message& message) {
+ PickleIterator iter(message);
+
+ int ignored;
+ EXPECT_TRUE(iter.ReadInt(&ignored));
+ std::string data;
+ EXPECT_TRUE(iter.ReadString(&data));
+ std::string big_string;
+ EXPECT_TRUE(iter.ReadString(&big_string));
+ EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length());
+
+ SendNextMessage();
+ return true;
+}
+
+void TestChannelListener::OnChannelError() {
+ // There is a race when closing the channel so the last message may be lost.
+ EXPECT_LE(messages_left_, 1);
+ base::MessageLoop::current()->Quit();
+}
+
+void TestChannelListener::SendNextMessage() {
+ if (--messages_left_ <= 0)
+ base::MessageLoop::current()->Quit();
+ else
+ SendOneMessage(sender_, "Foo");
+}
+
+}
diff --git a/ipc/ipc_test_channel_listener.h b/ipc/ipc_test_channel_listener.h
new file mode 100644
index 0000000..047e15d
--- /dev/null
+++ b/ipc/ipc_test_channel_listener.h
@@ -0,0 +1,44 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_TEST_CHANNEL_LISTENER_H_
+#define IPC_IPC_TEST_CHANNEL_LISTENER_H_
+
+#include "ipc/ipc_listener.h"
+
+namespace IPC {
+
+class Sender;
+
+// A generic listener that expects messages of a certain type (see
+// OnMessageReceived()), and either sends a generic response or quits after the
+// 50th message (or on channel error).
+class TestChannelListener : public Listener {
+ public:
+ static const size_t kLongMessageStringNumBytes = 50000;
+ static void SendOneMessage(Sender* sender, const char* text);
+
+ TestChannelListener() : sender_(NULL), messages_left_(50) {}
+ virtual ~TestChannelListener() {}
+
+ virtual bool OnMessageReceived(const Message& message) OVERRIDE;
+ virtual void OnChannelError() OVERRIDE;
+
+ void Init(Sender* s) {
+ sender_ = s;
+ }
+
+ bool HasSentAll() const { return 0 == messages_left_; }
+
+ protected:
+ void SendNextMessage();
+
+ private:
+ Sender* sender_;
+ int messages_left_;
+};
+
+}
+
+#endif // IPC_IPC_TEST_CHANNEL_LISTENER_H_
diff --git a/ipc/ipc_test_sink.cc b/ipc/ipc_test_sink.cc
index 9e9d1fd..b1a21bf 100644
--- a/ipc/ipc_test_sink.cc
+++ b/ipc/ipc_test_sink.cc
@@ -35,6 +35,15 @@ base::ProcessId TestSink::GetPeerPID() const {
return base::ProcessId();
}
+base::ProcessId TestSink::GetSelfPID() const {
+ NOTIMPLEMENTED();
+ return base::ProcessId();
+}
+
+ChannelHandle TestSink::TakePipeHandle() {
+ NOTIMPLEMENTED();
+ return ChannelHandle();
+}
bool TestSink::OnMessageReceived(const Message& msg) {
ObserverListBase<Listener>::Iterator it(filter_list_);
diff --git a/ipc/ipc_test_sink.h b/ipc/ipc_test_sink.h
index 1a213ee1..54bab34 100644
--- a/ipc/ipc_test_sink.h
+++ b/ipc/ipc_test_sink.h
@@ -81,6 +81,8 @@ class TestSink : public Channel {
virtual bool Connect() OVERRIDE WARN_UNUSED_RESULT;
virtual void Close() OVERRIDE;
virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
#if defined(OS_POSIX) && !defined(OS_NACL)
virtual int GetClientFileDescriptor() const OVERRIDE;
diff --git a/ipc/mojo/BUILD.gn b/ipc/mojo/BUILD.gn
new file mode 100644
index 0000000..accc89d
--- /dev/null
+++ b/ipc/mojo/BUILD.gn
@@ -0,0 +1,41 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+component("mojo") {
+ sources = [
+ "ipc_channel_mojo.cc",
+ "ipc_channel_mojo.h",
+ "ipc_message_pipe_reader.cc",
+ "ipc_message_pipe_reader.h",
+ ]
+
+ deps = [
+ "//base",
+ "//ipc",
+ "//mojo/public/cpp/bindings",
+ "//mojo/system",
+ # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect.
+ "//base/third_party/dynamic_annotations",
+ ]
+}
+
+test("ipc_mojo_unittests") {
+ sources = [
+ "ipc_channel_mojo_unittest.cc",
+ "run_all_unittests.cc",
+ ]
+
+ deps = [
+ "//base",
+ # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect.
+ "//base/test:test_support",
+ "//base/third_party/dynamic_annotations",
+ "//ipc",
+ "//ipc:test_support",
+ "//ipc/mojo",
+ "//mojo/environment:chromium",
+ "//mojo/system",
+ "//url",
+ ]
+}
diff --git a/ipc/mojo/DEPS b/ipc/mojo/DEPS
new file mode 100644
index 0000000..6706a1f
--- /dev/null
+++ b/ipc/mojo/DEPS
@@ -0,0 +1,4 @@
+include_rules = [
+ "+mojo/public",
+ "+mojo/embedder",
+] \ No newline at end of file
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
new file mode 100644
index 0000000..27d35b7
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -0,0 +1,596 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/lazy_instance.h"
+#include "ipc/ipc_listener.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+#include "ipc/file_descriptor_set_posix.h"
+#endif
+
+namespace IPC {
+
+namespace {
+
+// IPC::Listener for bootstrap channels.
+// It should never receive any message.
+class NullListener : public Listener {
+ public:
+ virtual bool OnMessageReceived(const Message&) OVERRIDE {
+ NOTREACHED();
+ return false;
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnBadMessageReceived(const Message& message) OVERRIDE {
+ NOTREACHED();
+ }
+};
+
+base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER;
+
+class MojoChannelFactory : public ChannelFactory {
+ public:
+ MojoChannelFactory(
+ ChannelHandle channel_handle,
+ Channel::Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner)
+ : channel_handle_(channel_handle),
+ mode_(mode),
+ io_thread_task_runner_(io_thread_task_runner) {
+ }
+
+ virtual std::string GetName() const OVERRIDE {
+ return channel_handle_.name;
+ }
+
+ virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE {
+ return ChannelMojo::Create(
+ channel_handle_,
+ mode_,
+ listener,
+ io_thread_task_runner_).PassAs<Channel>();
+ }
+
+ private:
+ ChannelHandle channel_handle_;
+ Channel::Mode mode_;
+ scoped_refptr<base::TaskRunner> io_thread_task_runner_;
+};
+
+mojo::embedder::PlatformHandle ToPlatformHandle(
+ const ChannelHandle& handle) {
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ return mojo::embedder::PlatformHandle(handle.socket.fd);
+#elif defined(OS_WIN)
+ return mojo::embedder::PlatformHandle(handle.pipe.handle);
+#else
+#error "Unsupported Platform!"
+#endif
+}
+
+//------------------------------------------------------------------------------
+
+// TODO(morrita): This should be built using higher-level Mojo construct
+// for clarity and extensibility.
+class HelloMessage {
+ public:
+ static Pickle CreateRequest(int32 pid) {
+ Pickle request;
+ request.WriteString(kHelloRequestMagic);
+ request.WriteInt(pid);
+ return request;
+ }
+
+ static bool ReadRequest(Pickle& pickle, int32* pid) {
+ PickleIterator iter(pickle);
+ std::string hello;
+ if (!iter.ReadString(&hello)) {
+ DLOG(WARNING) << "Failed to Read magic string.";
+ return false;
+ }
+
+ if (hello != kHelloRequestMagic) {
+ DLOG(WARNING) << "Magic mismatch:" << hello;
+ return false;
+ }
+
+ int read_pid;
+ if (!iter.ReadInt(&read_pid)) {
+ DLOG(WARNING) << "Failed to Read PID.";
+ return false;
+ }
+
+ *pid = read_pid;
+ return true;
+ }
+
+ static Pickle CreateResponse(int32 pid) {
+ Pickle request;
+ request.WriteString(kHelloResponseMagic);
+ request.WriteInt(pid);
+ return request;
+ }
+
+ static bool ReadResponse(Pickle& pickle, int32* pid) {
+ PickleIterator iter(pickle);
+ std::string hello;
+ if (!iter.ReadString(&hello)) {
+ DLOG(WARNING) << "Failed to read magic string.";
+ return false;
+ }
+
+ if (hello != kHelloResponseMagic) {
+ DLOG(WARNING) << "Magic mismatch:" << hello;
+ return false;
+ }
+
+ int read_pid;
+ if (!iter.ReadInt(&read_pid)) {
+ DLOG(WARNING) << "Failed to read PID.";
+ return false;
+ }
+
+ *pid = read_pid;
+ return true;
+ }
+
+ private:
+ static const char* kHelloRequestMagic;
+ static const char* kHelloResponseMagic;
+};
+
+const char* HelloMessage::kHelloRequestMagic = "MREQ";
+const char* HelloMessage::kHelloResponseMagic = "MRES";
+
+} // namespace
+
+//------------------------------------------------------------------------------
+
+// A MessagePipeReader implemenation for IPC::Message communication.
+class ChannelMojo::MessageReader : public internal::MessagePipeReader {
+ public:
+ MessageReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : internal::MessagePipeReader(pipe.Pass()),
+ owner_(owner) {}
+
+ bool Send(scoped_ptr<Message> message);
+ virtual void OnMessageReceived() OVERRIDE;
+ virtual void OnPipeClosed() OVERRIDE;
+ virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ private:
+ ChannelMojo* owner_;
+};
+
+void ChannelMojo::MessageReader::OnMessageReceived() {
+ Message message(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+
+ std::vector<MojoHandle> handle_buffer;
+ TakeHandleBuffer(&handle_buffer);
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ for (size_t i = 0; i < handle_buffer.size(); ++i) {
+ mojo::embedder::ScopedPlatformHandle platform_handle;
+ MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle(
+ handle_buffer[i], &platform_handle);
+ if (unwrap_result != MOJO_RESULT_OK) {
+ DLOG(WARNING) << "Pipe failed to covert handles. Closing: "
+ << unwrap_result;
+ CloseWithError(unwrap_result);
+ return;
+ }
+
+ bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd);
+ DCHECK(ok);
+ }
+#else
+ DCHECK(handle_buffer.empty());
+#endif
+
+ message.TraceMessageEnd();
+ owner_->OnMessageReceived(message);
+}
+
+void ChannelMojo::MessageReader::OnPipeClosed() {
+ if (!owner_)
+ return;
+ owner_->OnPipeClosed(this);
+ owner_ = NULL;
+}
+
+void ChannelMojo::MessageReader::OnPipeError(MojoResult error) {
+ if (!owner_)
+ return;
+ owner_->OnPipeError(this);
+}
+
+bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) {
+ DCHECK(IsValid());
+
+ message->TraceMessageBegin();
+ std::vector<MojoHandle> handles;
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ if (message->HasFileDescriptors()) {
+ FileDescriptorSet* fdset = message->file_descriptor_set();
+ for (size_t i = 0; i < fdset->size(); ++i) {
+ MojoHandle wrapped_handle;
+ MojoResult wrap_result = CreatePlatformHandleWrapper(
+ mojo::embedder::ScopedPlatformHandle(
+ mojo::embedder::PlatformHandle(
+ fdset->GetDescriptorAt(i))),
+ &wrapped_handle);
+ if (MOJO_RESULT_OK != wrap_result) {
+ DLOG(WARNING) << "Pipe failed to wrap handles. Closing: "
+ << wrap_result;
+ CloseWithError(wrap_result);
+ return false;
+ }
+
+ handles.push_back(wrapped_handle);
+ }
+ }
+#endif
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ message->data(), static_cast<uint32>(message->size()),
+ handles.empty() ? NULL : &handles[0],
+ static_cast<uint32>(handles.size()),
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ CloseWithError(write_result);
+ return false;
+ }
+
+ return true;
+}
+
+//------------------------------------------------------------------------------
+
+// MessagePipeReader implemenation for control messages.
+// Actual message handling is implemented by sublcasses.
+class ChannelMojo::ControlReader : public internal::MessagePipeReader {
+ public:
+ ControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : internal::MessagePipeReader(pipe.Pass()),
+ owner_(owner) {}
+
+ virtual bool Connect() { return true; }
+ virtual void OnPipeClosed() OVERRIDE;
+ virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ protected:
+ ChannelMojo* owner_;
+};
+
+void ChannelMojo::ControlReader::OnPipeClosed() {
+ if (!owner_)
+ return;
+ owner_->OnPipeClosed(this);
+ owner_ = NULL;
+}
+
+void ChannelMojo::ControlReader::OnPipeError(MojoResult error) {
+ if (!owner_)
+ return;
+ owner_->OnPipeError(this);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for server-side ChannelMojo.
+class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader {
+ public:
+ ServerControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : ControlReader(pipe.Pass(), owner) { }
+
+ virtual bool Connect() OVERRIDE;
+ virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+ MojoResult SendHelloRequest();
+ MojoResult RespondHelloResponse();
+
+ mojo::ScopedMessagePipeHandle message_pipe_;
+};
+
+bool ChannelMojo::ServerControlReader::Connect() {
+ MojoResult result = SendHelloRequest();
+ if (result != MOJO_RESULT_OK) {
+ CloseWithError(result);
+ return false;
+ }
+
+ return true;
+}
+
+MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() {
+ DCHECK(IsValid());
+ DCHECK(!message_pipe_.is_valid());
+
+ mojo::ScopedMessagePipeHandle self;
+ mojo::ScopedMessagePipeHandle peer;
+ MojoResult create_result = mojo::CreateMessagePipe(
+ NULL, &message_pipe_, &peer);
+ if (MOJO_RESULT_OK != create_result) {
+ DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
+ return create_result;
+ }
+
+ MojoHandle peer_to_send = peer.get().value();
+ Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID());
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ request.data(), static_cast<uint32>(request.size()),
+ &peer_to_send, 1,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ DLOG(WARNING) << "Writing Hello request failed: " << create_result;
+ return write_result;
+ }
+
+ // |peer| is sent and no longer owned by |this|.
+ (void)peer.release();
+ return MOJO_RESULT_OK;
+}
+
+MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() {
+ Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+
+ int32 read_pid = 0;
+ if (!HelloMessage::ReadResponse(request, &read_pid)) {
+ DLOG(ERROR) << "Failed to parse Hello response.";
+ return MOJO_RESULT_UNKNOWN;
+ }
+
+ base::ProcessId pid = static_cast<base::ProcessId>(read_pid);
+ owner_->set_peer_pid(pid);
+ owner_->OnConnected(message_pipe_.Pass());
+ return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ServerControlReader::OnMessageReceived() {
+ MojoResult result = RespondHelloResponse();
+ if (result != MOJO_RESULT_OK)
+ CloseWithError(result);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for client-side ChannelMojo.
+class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader {
+ public:
+ ClientControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : ControlReader(pipe.Pass(), owner) {}
+
+ virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+ MojoResult RespondHelloRequest(MojoHandle message_channel);
+};
+
+MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest(
+ MojoHandle message_channel) {
+ DCHECK(IsValid());
+
+ mojo::ScopedMessagePipeHandle received_pipe(
+ (mojo::MessagePipeHandle(message_channel)));
+
+ int32 read_request = 0;
+ Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+ if (!HelloMessage::ReadRequest(request, &read_request)) {
+ DLOG(ERROR) << "Hello request has wrong magic.";
+ return MOJO_RESULT_UNKNOWN;
+ }
+
+ base::ProcessId pid = read_request;
+ Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID());
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ response.data(), static_cast<uint32>(response.size()),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ DLOG(ERROR) << "Writing Hello response failed: " << write_result;
+ return write_result;
+ }
+
+ owner_->set_peer_pid(pid);
+ owner_->OnConnected(received_pipe.Pass());
+ return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ClientControlReader::OnMessageReceived() {
+ std::vector<MojoHandle> handle_buffer;
+ TakeHandleBuffer(&handle_buffer);
+ if (handle_buffer.size() != 1) {
+ DLOG(ERROR) << "Hello request doesn't contains required handle: "
+ << handle_buffer.size();
+ CloseWithError(MOJO_RESULT_UNKNOWN);
+ return;
+ }
+
+ MojoResult result = RespondHelloRequest(handle_buffer[0]);
+ if (result != MOJO_RESULT_OK) {
+ DLOG(ERROR) << "Failed to respond Hello request. Closing: "
+ << result;
+ CloseWithError(result);
+ }
+}
+
+//------------------------------------------------------------------------------
+
+void ChannelMojo::ChannelInfoDeleter::operator()(
+ mojo::embedder::ChannelInfo* ptr) const {
+ mojo::embedder::DestroyChannelOnIOThread(ptr);
+}
+
+//------------------------------------------------------------------------------
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return make_scoped_ptr(new ChannelMojo(
+ bootstrap.Pass(), mode, listener, io_thread_task_runner));
+}
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+ const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return Create(
+ Channel::Create(channel_handle, mode, g_null_listener.Pointer()),
+ mode, listener, io_thread_task_runner);
+}
+
+// static
+scoped_ptr<ChannelFactory> ChannelMojo::CreateFactory(
+ const ChannelHandle &channel_handle, Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return make_scoped_ptr(
+ new MojoChannelFactory(
+ channel_handle, mode,
+ io_thread_task_runner)).PassAs<ChannelFactory>();
+}
+
+ChannelMojo::ChannelMojo(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner)
+ : weak_factory_(this),
+ bootstrap_(bootstrap.Pass()),
+ mode_(mode), listener_(listener),
+ peer_pid_(base::kNullProcessId) {
+ DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT);
+ mojo::ScopedMessagePipeHandle control_pipe
+ = mojo::embedder::CreateChannel(
+ mojo::embedder::ScopedPlatformHandle(
+ ToPlatformHandle(bootstrap_->TakePipeHandle())),
+ io_thread_task_runner,
+ base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)),
+ io_thread_task_runner);
+
+ // MessagePipeReader, that is crated in InitOnIOThread(), should live only in
+ // IO thread, but IPC::Channel can be instantiated outside of it.
+ // So we move the creation to the appropriate thread.
+ if (base::MessageLoopProxy::current() == io_thread_task_runner) {
+ InitOnIOThread(control_pipe.Pass());
+ } else {
+ io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelMojo::InitOnIOThread,
+ weak_factory_.GetWeakPtr(),
+ base::Passed(control_pipe.Pass())));
+ }
+}
+
+ChannelMojo::~ChannelMojo() {
+ Close();
+}
+
+void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) {
+ control_reader_ = CreateControlReader(control_pipe.Pass());
+}
+
+scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader(
+ mojo::ScopedMessagePipeHandle pipe) {
+ if (MODE_SERVER == mode_) {
+ return make_scoped_ptr(
+ new ServerControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+ }
+
+ DCHECK(mode_ == MODE_CLIENT);
+ return make_scoped_ptr(
+ new ClientControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+}
+
+bool ChannelMojo::Connect() {
+ DCHECK(!message_reader_);
+ return control_reader_->Connect();
+}
+
+void ChannelMojo::Close() {
+ control_reader_.reset();
+ message_reader_.reset();
+ channel_info_.reset();
+}
+
+void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) {
+ message_reader_ = make_scoped_ptr(new MessageReader(pipe.Pass(), this));
+
+ for (size_t i = 0; i < pending_messages_.size(); ++i) {
+ message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
+ pending_messages_[i] = NULL;
+ }
+
+ pending_messages_.clear();
+
+ listener_->OnChannelConnected(GetPeerPID());
+}
+
+void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
+ Close();
+}
+
+void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
+ listener_->OnChannelError();
+}
+
+
+bool ChannelMojo::Send(Message* message) {
+ if (!message_reader_) {
+ pending_messages_.push_back(message);
+ return true;
+ }
+
+ return message_reader_->Send(make_scoped_ptr(message));
+}
+
+base::ProcessId ChannelMojo::GetPeerPID() const {
+ return peer_pid_;
+}
+
+base::ProcessId ChannelMojo::GetSelfPID() const {
+ return bootstrap_->GetSelfPID();
+}
+
+ChannelHandle ChannelMojo::TakePipeHandle() {
+ return bootstrap_->TakePipeHandle();
+}
+
+void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo* info) {
+ channel_info_.reset(info);
+}
+
+void ChannelMojo::OnMessageReceived(Message& message) {
+ listener_->OnMessageReceived(message);
+ if (message.dispatch_error())
+ listener_->OnBadMessageReceived(message);
+}
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+int ChannelMojo::GetClientFileDescriptor() const {
+ return bootstrap_->GetClientFileDescriptor();
+}
+
+int ChannelMojo::TakeClientFileDescriptor() {
+ return bootstrap_->TakeClientFileDescriptor();
+}
+#endif // defined(OS_POSIX) && !defined(OS_NACL)
+
+} // namespace IPC
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
new file mode 100644
index 0000000..b00abc9
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -0,0 +1,131 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_MOJO_H_
+#define IPC_IPC_CHANNEL_MOJO_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
+#include "base/memory/weak_ptr.h"
+#include "ipc/ipc_channel.h"
+#include "ipc/ipc_channel_factory.h"
+#include "ipc/ipc_export.h"
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace mojo {
+namespace embedder {
+struct ChannelInfo;
+}
+}
+
+namespace IPC {
+
+// Mojo-based IPC::Channel implementation over a platform handle.
+//
+// ChannelMojo builds Mojo MessagePipe using underlying pipe given by
+// "bootstrap" IPC::Channel which creates and owns platform pipe like
+// named socket. The bootstrap Channel is used only for establishing
+// the underlying connection. ChannelMojo takes its handle over once
+// the it is made and puts MessagePipe on it.
+//
+// ChannelMojo has a couple of MessagePipes:
+//
+// * The first MessagePipe, which is built on top of bootstrap handle,
+// is the "control" pipe. It is used to communicate out-of-band
+// control messages that aren't visible from IPC::Listener.
+//
+// * The second MessagePipe, which is created by the server channel
+// and sent to client Channel over the control pipe, is used
+// to send IPC::Messages as an IPC::Sender.
+//
+// TODO(morrita): Extract handle creation part of IPC::Channel into
+// separate class to clarify what ChannelMojo relies
+// on.
+// TODO(morrita): Add APIs to create extra MessagePipes to let
+// Mojo-based objects talk over this Channel.
+//
+class IPC_MOJO_EXPORT ChannelMojo : public Channel {
+ public:
+ // Create ChannelMojo on top of given |bootstrap| channel.
+ static scoped_ptr<ChannelMojo> Create(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ // Create ChannelMojo. A bootstrap channel is created as well.
+ static scoped_ptr<ChannelMojo> Create(
+ const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ // Create a factory object for ChannelMojo.
+ // The factory is used to create Mojo-based ChannelProxy family.
+ static scoped_ptr<ChannelFactory> CreateFactory(
+ const ChannelHandle &channel_handle, Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ virtual ~ChannelMojo();
+
+ // Channel implementation
+ virtual bool Connect() OVERRIDE;
+ virtual void Close() OVERRIDE;
+ virtual bool Send(Message* message) OVERRIDE;
+ virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ virtual int GetClientFileDescriptor() const OVERRIDE;
+ virtual int TakeClientFileDescriptor() OVERRIDE;
+#endif // defined(OS_POSIX) && !defined(OS_NACL)
+
+ // Called from MessagePipeReader implementations
+ void OnMessageReceived(Message& message);
+ void OnConnected(mojo::ScopedMessagePipeHandle pipe);
+ void OnPipeClosed(internal::MessagePipeReader* reader);
+ void OnPipeError(internal::MessagePipeReader* reader);
+ void set_peer_pid(base::ProcessId pid) { peer_pid_ = pid; }
+
+ private:
+ struct ChannelInfoDeleter {
+ void operator()(mojo::embedder::ChannelInfo* ptr) const;
+ };
+
+ // ChannelMojo needs to kill its MessagePipeReader in delayed manner
+ // because the channel wants to kill these readers during the
+ // notifications invoked by them.
+ typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
+
+ class ControlReader;
+ class ServerControlReader;
+ class ClientControlReader;
+ class MessageReader;
+
+ ChannelMojo(scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ void InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe);
+ scoped_ptr<ControlReader> CreateControlReader(
+ mojo::ScopedMessagePipeHandle pipe);
+ void DidCreateChannel(mojo::embedder::ChannelInfo*);
+
+ base::WeakPtrFactory<ChannelMojo> weak_factory_;
+ scoped_ptr<Channel> bootstrap_;
+ Mode mode_;
+ Listener* listener_;
+ base::ProcessId peer_pid_;
+ scoped_ptr<mojo::embedder::ChannelInfo,
+ ChannelInfoDeleter> channel_info_;
+
+ scoped_ptr<ControlReader, ReaderDeleter> control_reader_;
+ scoped_ptr<MessageReader, ReaderDeleter> message_reader_;
+ ScopedVector<Message> pending_messages_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
+};
+
+} // namespace IPC
+
+#endif // IPC_IPC_CHANNEL_MOJO_H_
diff --git a/ipc/mojo/ipc_channel_mojo_unittest.cc b/ipc/mojo/ipc_channel_mojo_unittest.cc
new file mode 100644
index 0000000..915029d
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo_unittest.cc
@@ -0,0 +1,260 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/base_paths.h"
+#include "base/files/file.h"
+#include "base/message_loop/message_loop.h"
+#include "base/path_service.h"
+#include "base/pickle.h"
+#include "base/threading/thread.h"
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_test_base.h"
+#include "ipc/ipc_test_channel_listener.h"
+
+#if defined(OS_POSIX)
+#include "base/file_descriptor_posix.h"
+#endif
+
+namespace {
+
+class ListenerThatExpectsOK : public IPC::Listener {
+ public:
+ ListenerThatExpectsOK() {}
+
+ virtual ~ListenerThatExpectsOK() {}
+
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ PickleIterator iter(message);
+ std::string should_be_ok;
+ EXPECT_TRUE(iter.ReadString(&should_be_ok));
+ EXPECT_EQ(should_be_ok, "OK");
+ base::MessageLoop::current()->Quit();
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ static void SendOK(IPC::Sender* sender) {
+ IPC::Message* message = new IPC::Message(
+ 0, 2, IPC::Message::PRIORITY_NORMAL);
+ message->WriteString(std::string("OK"));
+ ASSERT_TRUE(sender->Send(message));
+ }
+};
+
+class ListenerThatShouldBeNeverCalled : public IPC::Listener {
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ NOTREACHED();
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnBadMessageReceived(const IPC::Message& message) OVERRIDE {
+ NOTREACHED();
+ }
+};
+
+class ChannelClient {
+ public:
+ explicit ChannelClient(IPC::Listener* listener, const char* name) {
+ scoped_ptr<IPC::Channel> bootstrap(IPC::Channel::CreateClient(
+ IPCTestBase::GetChannelName(name),
+ &never_called_));
+ channel_ = IPC::ChannelMojo::Create(
+ bootstrap.Pass(), IPC::Channel::MODE_CLIENT, listener,
+ main_message_loop_.message_loop_proxy());
+ }
+
+ void Connect() {
+ CHECK(channel_->Connect());
+ }
+
+ IPC::ChannelMojo* channel() const { return channel_.get(); }
+
+ private:
+ scoped_ptr<IPC::ChannelMojo> channel_;
+ ListenerThatShouldBeNeverCalled never_called_;
+ base::MessageLoopForIO main_message_loop_;
+};
+
+class IPCChannelMojoTest : public IPCTestBase {
+ public:
+ void CreateMojoChannel(IPC::Listener* listener);
+
+ protected:
+ virtual void SetUp() OVERRIDE {
+ IPCTestBase::SetUp();
+ }
+
+ ListenerThatShouldBeNeverCalled never_called_;
+};
+
+
+void IPCChannelMojoTest::CreateMojoChannel(IPC::Listener* listener) {
+ CreateChannel(&never_called_);
+ scoped_ptr<IPC::Channel> mojo_channel = IPC::ChannelMojo::Create(
+ ReleaseChannel(), IPC::Channel::MODE_SERVER, listener,
+ io_thread_task_runner()).PassAs<IPC::Channel>();
+ SetChannel(mojo_channel.PassAs<IPC::Channel>());
+}
+
+class TestChannelListenerWithExtraExpectations
+ : public IPC::TestChannelListener {
+ public:
+ TestChannelListenerWithExtraExpectations()
+ : is_connected_called_(false) {
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ IPC::TestChannelListener::OnChannelConnected(peer_pid);
+ EXPECT_TRUE(base::kNullProcessId != peer_pid);
+ is_connected_called_ = true;
+ }
+
+ bool is_connected_called() const { return is_connected_called_; }
+
+ private:
+ bool is_connected_called_;
+};
+
+TEST_F(IPCChannelMojoTest, ConnectedFromClient) {
+ Init("IPCChannelMojoTestClient");
+
+ // Set up IPC channel and start client.
+ TestChannelListenerWithExtraExpectations listener;
+ CreateMojoChannel(&listener);
+ listener.Init(sender());
+ ASSERT_TRUE(ConnectChannel());
+ ASSERT_TRUE(StartClient());
+
+ IPC::TestChannelListener::SendOneMessage(
+ sender(), "hello from parent");
+
+ base::MessageLoop::current()->Run();
+ EXPECT_TRUE(base::kNullProcessId != this->channel()->GetPeerPID());
+
+ this->channel()->Close();
+
+ EXPECT_TRUE(WaitForClientShutdown());
+ EXPECT_TRUE(listener.is_connected_called());
+ EXPECT_TRUE(listener.HasSentAll());
+
+ DestroyChannel();
+}
+
+// A long running process that connects to us
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestClient) {
+ TestChannelListenerWithExtraExpectations listener;
+ ChannelClient client(&listener, "IPCChannelMojoTestClient");
+ client.Connect();
+ listener.Init(client.channel());
+
+ IPC::TestChannelListener::SendOneMessage(
+ client.channel(), "hello from child");
+ base::MessageLoop::current()->Run();
+ EXPECT_TRUE(listener.is_connected_called());
+ EXPECT_TRUE(listener.HasSentAll());
+
+ return 0;
+}
+
+#if defined(OS_POSIX)
+class ListenerThatExpectsFile : public IPC::Listener {
+ public:
+ ListenerThatExpectsFile()
+ : sender_(NULL) {}
+
+ virtual ~ListenerThatExpectsFile() {}
+
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ PickleIterator iter(message);
+ base::FileDescriptor desc;
+ EXPECT_TRUE(message.ReadFileDescriptor(&iter, &desc));
+ std::string content(GetSendingFileContent().size(), ' ');
+ base::File file(desc.fd);
+ file.Read(0, &content[0], content.size());
+ EXPECT_EQ(content, GetSendingFileContent());
+ base::MessageLoop::current()->Quit();
+ ListenerThatExpectsOK::SendOK(sender_);
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ static std::string GetSendingFileContent() {
+ return "Hello";
+ }
+
+ static base::FilePath GetSendingFilePath() {
+ base::FilePath path;
+ bool ok = PathService::Get(base::DIR_CACHE, &path);
+ EXPECT_TRUE(ok);
+ return path.Append("ListenerThatExpectsFile.txt");
+ }
+
+ static void WriteAndSendFile(IPC::Sender* sender, base::File& file) {
+ std::string content = GetSendingFileContent();
+ file.WriteAtCurrentPos(content.data(), content.size());
+ file.Flush();
+ IPC::Message* message = new IPC::Message(
+ 0, 2, IPC::Message::PRIORITY_NORMAL);
+ message->WriteFileDescriptor(
+ base::FileDescriptor(file.TakePlatformFile(), false));
+ ASSERT_TRUE(sender->Send(message));
+ }
+
+ void set_sender(IPC::Sender* sender) { sender_ = sender; }
+
+ private:
+ IPC::Sender* sender_;
+};
+
+
+TEST_F(IPCChannelMojoTest, SendPlatformHandle) {
+ Init("IPCChannelMojoTestSendPlatformHandleClient");
+
+ ListenerThatExpectsOK listener;
+ CreateMojoChannel(&listener);
+ ASSERT_TRUE(ConnectChannel());
+ ASSERT_TRUE(StartClient());
+
+ base::File file(ListenerThatExpectsFile::GetSendingFilePath(),
+ base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE |
+ base::File::FLAG_READ);
+ ListenerThatExpectsFile::WriteAndSendFile(channel(), file);
+ base::MessageLoop::current()->Run();
+
+ this->channel()->Close();
+
+ EXPECT_TRUE(WaitForClientShutdown());
+ DestroyChannel();
+}
+
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestSendPlatformHandleClient) {
+ ListenerThatExpectsFile listener;
+ ChannelClient client(
+ &listener, "IPCChannelMojoTestSendPlatformHandleClient");
+ client.Connect();
+ listener.set_sender(client.channel());
+
+ base::MessageLoop::current()->Run();
+
+ return 0;
+}
+#endif
+
+} // namespace
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
new file mode 100644
index 0000000..91022ac
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -0,0 +1,144 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "mojo/public/cpp/environment/environment.h"
+
+namespace IPC {
+namespace internal {
+
+MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
+ : pipe_wait_id_(0),
+ pipe_(handle.Pass()) {
+ StartWaiting();
+}
+
+MessagePipeReader::~MessagePipeReader() {
+ CHECK(!IsValid());
+}
+
+void MessagePipeReader::Close() {
+ StopWaiting();
+ pipe_.reset();
+ OnPipeClosed();
+}
+
+void MessagePipeReader::CloseWithError(MojoResult error) {
+ OnPipeError(error);
+ Close();
+}
+
+// static
+void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
+ reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
+}
+
+void MessagePipeReader::StartWaiting() {
+ DCHECK(pipe_.is_valid());
+ DCHECK(!pipe_wait_id_);
+ // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
+ // MessagePipe.
+ //
+ // TODO(morrita): Should we re-set the signal when we get new
+ // message to send?
+ pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
+ pipe_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ &InvokePipeIsReady,
+ this);
+}
+
+void MessagePipeReader::StopWaiting() {
+ if (!pipe_wait_id_)
+ return;
+ mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
+ pipe_wait_id_ = 0;
+}
+
+void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+ pipe_wait_id_ = 0;
+
+ if (wait_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION happens when the pipe is
+ // closed before the waiter is scheduled in a backend thread.
+ if (wait_result != MOJO_RESULT_ABORTED &&
+ wait_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING) << "Pipe got error from the waiter. Closing: "
+ << wait_result;
+ OnPipeError(wait_result);
+ }
+
+ Close();
+ return;
+ }
+
+ while (pipe_.is_valid()) {
+ MojoResult read_result = ReadMessageBytes();
+ if (read_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ if (read_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION means that all the received messages
+ // got consumed and the peer is already closed.
+ if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING)
+ << "Pipe got error from ReadMessage(). Closing: " << read_result;
+ OnPipeError(read_result);
+ }
+
+ Close();
+ break;
+ }
+
+ OnMessageReceived();
+ }
+
+ if (pipe_.is_valid())
+ StartWaiting();
+}
+
+MojoResult MessagePipeReader::ReadMessageBytes() {
+ DCHECK(handle_buffer_.empty());
+
+ uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
+ uint32_t num_handles = 0;
+ MojoResult result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ data_buffer_.resize(num_bytes);
+ handle_buffer_.resize(num_handles);
+ if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
+ // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
+ // it needs more bufer. So we re-read it with resized buffers.
+ result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ num_handles ? &handle_buffer_[0] : NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ }
+
+ DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
+ DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
+ return result;
+}
+
+void MessagePipeReader::DelayedDeleter::operator()(
+ MessagePipeReader* ptr) const {
+ ptr->Close();
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE, base::Bind(&DeleteNow, ptr));
+}
+
+} // namespace internal
+} // namespace IPC
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
new file mode 100644
index 0000000..ecfa018
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -0,0 +1,98 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_MESSAGE_PIPE_READER_H_
+#define IPC_IPC_MESSAGE_PIPE_READER_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/c/environment/async_waiter.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace IPC {
+namespace internal {
+
+// A helper class to handle bytestream directly over mojo::MessagePipe
+// in template-method pattern. MessagePipeReader manages the lifetime
+// of given MessagePipe and participates the event loop, and
+// read the stream and call the client when it is ready.
+//
+// Each client has to:
+//
+// * Provide a subclass implemenation of a specific use of a MessagePipe
+// and implement callbacks.
+// * Create the subclass instance with a MessagePipeHandle.
+// The constructor automatically start listening on the pipe.
+//
+// MessageReader has to be used in IO thread. It isn't thread-safe.
+//
+class MessagePipeReader {
+ public:
+ // Delay the object deletion using the current message loop.
+ // This is intended to used by MessagePipeReader owners.
+ class DelayedDeleter {
+ public:
+ typedef base::DefaultDeleter<MessagePipeReader> DefaultType;
+
+ static void DeleteNow(MessagePipeReader* ptr) { delete ptr; }
+
+ DelayedDeleter() {}
+ DelayedDeleter(const DefaultType&) {}
+ DelayedDeleter& operator=(const DefaultType&) { return *this; }
+
+ void operator()(MessagePipeReader* ptr) const;
+ };
+
+ explicit MessagePipeReader(mojo::ScopedMessagePipeHandle handle);
+ virtual ~MessagePipeReader();
+
+ MojoHandle handle() const { return pipe_.get().value(); }
+
+ // Returns received bytes.
+ const std::vector<char>& data_buffer() const {
+ return data_buffer_;
+ }
+
+ // Delegate received handles ownership. The subclass should take the
+ // ownership over in its OnMessageReceived(). They will leak otherwise.
+ void TakeHandleBuffer(std::vector<MojoHandle>* handle_buffer) {
+ handle_buffer_.swap(*handle_buffer);
+ }
+
+ // Close and destroy the MessagePipe.
+ void Close();
+ // Close the mesage pipe with notifying the client with the error.
+ void CloseWithError(MojoResult error);
+ // Return true if the MessagePipe is alive.
+ bool IsValid() { return pipe_.is_valid(); }
+
+ //
+ // The client have to implment these callback to get the readiness
+ // event from the reader
+ //
+ virtual void OnMessageReceived() = 0;
+ virtual void OnPipeClosed() = 0;
+ virtual void OnPipeError(MojoResult error) = 0;
+
+ private:
+ static void InvokePipeIsReady(void* closure, MojoResult result);
+
+ MojoResult ReadMessageBytes();
+ void PipeIsReady(MojoResult wait_result);
+ void StartWaiting();
+ void StopWaiting();
+
+ std::vector<char> data_buffer_;
+ std::vector<MojoHandle> handle_buffer_;
+ MojoAsyncWaitID pipe_wait_id_;
+ mojo::ScopedMessagePipeHandle pipe_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
+};
+
+} // namespace internal
+} // namespace IPC
+
+#endif // IPC_IPC_MESSAGE_PIPE_READER_H_
diff --git a/ipc/mojo/ipc_mojo.gyp b/ipc/mojo/ipc_mojo.gyp
new file mode 100644
index 0000000..c408a17
--- /dev/null
+++ b/ipc/mojo/ipc_mojo.gyp
@@ -0,0 +1,68 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+{
+ 'variables': {
+ 'chromium_code': 1,
+ },
+ 'includes': [
+ ],
+ 'targets': [
+ {
+ 'target_name': 'ipc_mojo',
+ 'type': '<(component)',
+ 'variables': {
+ },
+ 'defines': [
+ 'IPC_MOJO_IMPLEMENTATION',
+ ],
+ 'dependencies': [
+ '../ipc.gyp:ipc',
+ '../../base/base.gyp:base',
+ '../../base/third_party/dynamic_annotations/dynamic_annotations.gyp:dynamic_annotations',
+ '../../mojo/mojo_base.gyp:mojo_cpp_bindings',
+ '../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_base.gyp:mojo_system_impl',
+ ],
+ 'sources': [
+ 'ipc_channel_mojo.cc',
+ 'ipc_channel_mojo.h',
+ 'ipc_message_pipe_reader.cc',
+ 'ipc_message_pipe_reader.h',
+ ],
+ # TODO(gregoryd): direct_dependent_settings should be shared with the
+ # 64-bit target, but it doesn't work due to a bug in gyp
+ 'direct_dependent_settings': {
+ 'include_dirs': [
+ '..',
+ ],
+ },
+ },
+ {
+ 'target_name': 'ipc_mojo_unittests',
+ 'type': '<(gtest_target_type)',
+ 'dependencies': [
+ '../ipc.gyp:ipc',
+ '../ipc.gyp:test_support_ipc',
+ '../../base/base.gyp:base',
+ '../../base/base.gyp:base_i18n',
+ '../../base/base.gyp:test_support_base',
+ '../../mojo/mojo_base.gyp:mojo_cpp_bindings',
+ '../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_base.gyp:mojo_system_impl',
+ '../../testing/gtest.gyp:gtest',
+ 'ipc_mojo',
+ ],
+ 'include_dirs': [
+ '..'
+ ],
+ 'sources': [
+ 'run_all_unittests.cc',
+ 'ipc_channel_mojo_unittest.cc',
+ ],
+ 'conditions': [
+ ],
+ },
+ ],
+}
diff --git a/ipc/mojo/run_all_unittests.cc b/ipc/mojo/run_all_unittests.cc
new file mode 100644
index 0000000..1734e5e
--- /dev/null
+++ b/ipc/mojo/run_all_unittests.cc
@@ -0,0 +1,42 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/at_exit.h"
+#include "base/bind.h"
+#include "base/test/launcher/unit_test_launcher.h"
+#include "base/test/test_suite.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_ANDROID)
+#include "base/android/jni_android.h"
+#include "base/test/test_file_util.h"
+#endif
+
+namespace {
+
+class NoAtExitBaseTestSuite : public base::TestSuite {
+ public:
+ NoAtExitBaseTestSuite(int argc, char** argv)
+ : base::TestSuite(argc, argv, false) {
+ }
+};
+
+int RunTestSuite(int argc, char** argv) {
+ return NoAtExitBaseTestSuite(argc, argv).Run();
+}
+
+} // namespace
+
+int main(int argc, char** argv) {
+ mojo::embedder::Init();
+#if defined(OS_ANDROID)
+ JNIEnv* env = base::android::AttachCurrentThread();
+ file_util::RegisterContentUriTestUtils(env);
+#else
+ base::AtExitManager at_exit;
+#endif
+ return base::LaunchUnitTestsSerially(argc,
+ argv,
+ base::Bind(&RunTestSuite, argc, argv));
+}