diff options
49 files changed, 1848 insertions, 126 deletions
@@ -46,6 +46,7 @@ group("root") { "//google_apis", "//google_apis/gcm", "//ipc", + "//ipc/mojo", "//media", "//media/cast", "//mojo", diff --git a/build/all.gyp b/build/all.gyp index 133917b..96d4ea8 100644 --- a/build/all.gyp +++ b/build/all.gyp @@ -86,6 +86,7 @@ '../gpu/gpu.gyp:*', '../gpu/tools/tools.gyp:*', '../ipc/ipc.gyp:*', + '../ipc/mojo/ipc_mojo.gyp:*', '../jingle/jingle.gyp:*', '../media/cast/cast.gyp:*', '../media/media.gyp:*', @@ -291,6 +292,7 @@ '../gpu/gles2_conform_support/gles2_conform_support.gyp:gles2_conform_support', '../gpu/gpu.gyp:gpu_unittests', '../ipc/ipc.gyp:ipc_tests', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests', '../jingle/jingle.gyp:jingle_unittests', '../media/cast/cast.gyp:cast_unittests', '../media/media.gyp:media_unittests', @@ -849,6 +851,7 @@ '../google_apis/gcm/gcm.gyp:gcm_unit_tests', '../gpu/gpu.gyp:gpu_unittests', '../ipc/ipc.gyp:ipc_tests', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests', '../jingle/jingle.gyp:jingle_unittests', '../media/media.gyp:media_unittests', '../ppapi/ppapi_internal.gyp:ppapi_unittests', @@ -885,6 +888,7 @@ '../google_apis/gcm/gcm.gyp:gcm_unit_tests', '../gpu/gpu.gyp:gpu_unittests', '../ipc/ipc.gyp:ipc_tests', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests', '../jingle/jingle.gyp:jingle_unittests', '../media/media.gyp:media_unittests', '../ppapi/ppapi_internal.gyp:ppapi_unittests', @@ -982,6 +986,7 @@ '../google_apis/gcm/gcm.gyp:gcm_unit_tests', '../gpu/gpu.gyp:gpu_unittests', '../ipc/ipc.gyp:ipc_tests', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests', '../jingle/jingle.gyp:jingle_unittests', '../media/media.gyp:media_unittests', '../ppapi/ppapi_internal.gyp:ppapi_unittests', @@ -1077,6 +1082,7 @@ '../gpu/gpu.gyp:angle_unittests', '../gpu/gpu.gyp:gpu_unittests', '../ipc/ipc.gyp:ipc_tests', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo_unittests', '../jingle/jingle.gyp:jingle_unittests', '../media/cast/cast.gyp:cast_unittests', '../media/media.gyp:media_unittests', diff --git a/content/browser/renderer_host/render_process_host_impl.cc b/content/browser/renderer_host/render_process_host_impl.cc index a1592c3..bbd5949 100644 --- a/content/browser/renderer_host/render_process_host_impl.cc +++ b/content/browser/renderer_host/render_process_host_impl.cc @@ -137,6 +137,7 @@ #include "ipc/ipc_channel.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_switches.h" +#include "ipc/mojo/ipc_channel_mojo.h" #include "media/base/media_switches.h" #include "net/url_request/url_request_context_getter.h" #include "ppapi/shared_impl/ppapi_switches.h" @@ -594,11 +595,7 @@ bool RenderProcessHostImpl::Init() { // Setup the IPC channel. const std::string channel_id = IPC::Channel::GenerateVerifiedChannelID(std::string()); - channel_ = IPC::ChannelProxy::Create( - channel_id, - IPC::Channel::MODE_SERVER, - this, - BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO).get()); + channel_ = CreateChannelProxy(channel_id); // Setup the Mojo channel. mojo_application_host_->Init(); @@ -678,6 +675,27 @@ void RenderProcessHostImpl::MaybeActivateMojo() { mojo_application_host_->Activate(this, GetHandle()); } +bool RenderProcessHostImpl::ShouldUseMojoChannel() const { + const CommandLine& command_line = *CommandLine::ForCurrentProcess(); + return command_line.HasSwitch(switches::kEnableRendererMojoChannel); +} + +scoped_ptr<IPC::ChannelProxy> RenderProcessHostImpl::CreateChannelProxy( + const std::string& channel_id) { + scoped_refptr<base::SingleThreadTaskRunner> runner = + BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO); + if (ShouldUseMojoChannel()) { + VLOG(1) << "Mojo Channel is enabled on host"; + return IPC::ChannelProxy::Create( + IPC::ChannelMojo::CreateFactory( + channel_id, IPC::Channel::MODE_SERVER, runner), + this, runner.get()); + } + + return IPC::ChannelProxy::Create( + channel_id, IPC::Channel::MODE_SERVER, this, runner.get()); +} + void RenderProcessHostImpl::CreateMessageFilters() { DCHECK_CURRENTLY_ON(BrowserThread::UI); AddFilter(new ResourceSchedulerFilter(GetID())); @@ -1128,6 +1146,7 @@ void RenderProcessHostImpl::PropagateBrowserCommandLineToRenderer( switches::kEnablePinch, switches::kEnablePreciseMemoryInfo, switches::kEnablePreparsedJsCaching, + switches::kEnableRendererMojoChannel, switches::kEnableSeccompFilterSandbox, switches::kEnableSkiaBenchmarking, switches::kEnableSmoothScrolling, diff --git a/content/browser/renderer_host/render_process_host_impl.h b/content/browser/renderer_host/render_process_host_impl.h index f4f29ac..18dac77 100644 --- a/content/browser/renderer_host/render_process_host_impl.h +++ b/content/browser/renderer_host/render_process_host_impl.h @@ -279,6 +279,9 @@ class CONTENT_EXPORT RenderProcessHostImpl friend class VisitRelayingRenderProcessHost; void MaybeActivateMojo(); + bool ShouldUseMojoChannel() const; + scoped_ptr<IPC::ChannelProxy> CreateChannelProxy( + const std::string& channel_id); // Creates and adds the IO thread message filters. void CreateMessageFilters(); diff --git a/content/child/child_thread.cc b/content/child/child_thread.cc index b153f04..7e58c1f 100644 --- a/content/child/child_thread.cc +++ b/content/child/child_thread.cc @@ -48,6 +48,7 @@ #include "ipc/ipc_switches.h" #include "ipc/ipc_sync_channel.h" #include "ipc/ipc_sync_message_filter.h" +#include "ipc/mojo/ipc_channel_mojo.h" #if defined(OS_WIN) #include "content/common/handle_enumerator_win.h" @@ -192,6 +193,17 @@ void QuitMainThreadMessageLoop() { } // namespace +ChildThread::Options::Options() + : channel_name(CommandLine::ForCurrentProcess()->GetSwitchValueASCII( + switches::kProcessChannelID)), + use_mojo_channel(false) {} + +ChildThread::Options::Options(bool mojo) + : channel_name(CommandLine::ForCurrentProcess()->GetSwitchValueASCII( + switches::kProcessChannelID)), + use_mojo_channel(mojo) {} + + ChildThread::ChildThreadMessageRouter::ChildThreadMessageRouter( IPC::Sender* sender) : sender_(sender) {} @@ -204,20 +216,43 @@ ChildThread::ChildThread() : router_(this), channel_connected_factory_(this), in_browser_process_(false) { - channel_name_ = CommandLine::ForCurrentProcess()->GetSwitchValueASCII( - switches::kProcessChannelID); - Init(); + Init(Options()); } -ChildThread::ChildThread(const std::string& channel_name) - : channel_name_(channel_name), - router_(this), +ChildThread::ChildThread(const Options& options) + : router_(this), channel_connected_factory_(this), in_browser_process_(true) { - Init(); + Init(options); } -void ChildThread::Init() { +scoped_ptr<IPC::SyncChannel> ChildThread::CreateChannel(bool use_mojo_channel) { + if (use_mojo_channel) { + VLOG(1) << "Mojo is enabled on child"; + return IPC::SyncChannel::Create( + IPC::ChannelMojo::CreateFactory( + channel_name_, + IPC::Channel::MODE_CLIENT, + ChildProcess::current()->io_message_loop_proxy()), + this, + ChildProcess::current()->io_message_loop_proxy(), + true, + ChildProcess::current()->GetShutDownEvent()); + } + + VLOG(1) << "Mojo is disabled on child"; + return IPC::SyncChannel::Create( + channel_name_, + IPC::Channel::MODE_CLIENT, + this, + ChildProcess::current()->io_message_loop_proxy(), + true, + ChildProcess::current()->GetShutDownEvent()); +} + +void ChildThread::Init(const Options& options) { + channel_name_ = options.channel_name; + g_lazy_tls.Pointer()->Set(this); on_channel_error_called_ = false; message_loop_ = base::MessageLoop::current(); @@ -227,13 +262,7 @@ void ChildThread::Init() { // the logger, and the logger does not like being created on the IO thread. IPC::Logging::GetInstance(); #endif - channel_ = - IPC::SyncChannel::Create(channel_name_, - IPC::Channel::MODE_CLIENT, - this, - ChildProcess::current()->io_message_loop_proxy(), - true, - ChildProcess::current()->GetShutDownEvent()); + channel_ = CreateChannel(options.use_mojo_channel); #ifdef IPC_MESSAGE_LOG_ENABLED if (!in_browser_process_) IPC::Logging::GetInstance()->SetIPCSender(this); diff --git a/content/child/child_thread.h b/content/child/child_thread.h index 8bad5ef..88b52ab 100644 --- a/content/child/child_thread.h +++ b/content/child/child_thread.h @@ -53,10 +53,20 @@ struct RequestInfo; // The main thread of a child process derives from this class. class CONTENT_EXPORT ChildThread : public IPC::Listener, public IPC::Sender { public: + struct CONTENT_EXPORT Options { + Options(); + explicit Options(bool mojo); + Options(std::string name, bool mojo) + : channel_name(name), use_mojo_channel(mojo) {} + + std::string channel_name; + bool use_mojo_channel; + }; + // Creates the thread. ChildThread(); // Used for single-process mode and for in process gpu mode. - explicit ChildThread(const std::string& channel_name); + explicit ChildThread(const Options& options); // ChildProcess::main_thread() is reset after Shutdown(), and before the // destructor, so any subsystem that relies on ChildProcess::main_thread() // must be terminated before Shutdown returns. In particular, if a subsystem @@ -173,7 +183,8 @@ class CONTENT_EXPORT ChildThread : public IPC::Listener, public IPC::Sender { IPC::Sender* const sender_; }; - void Init(); + void Init(const Options& options); + scoped_ptr<IPC::SyncChannel> CreateChannel(bool use_mojo_channel); // IPC message handlers. void OnShutdown(); diff --git a/content/common/BUILD.gn b/content/common/BUILD.gn index 0e44b2c..02efe32 100644 --- a/content/common/BUILD.gn +++ b/content/common/BUILD.gn @@ -48,6 +48,7 @@ source_set("common") { deps += [ "//cc", "//ipc", + "//ipc/mojo", "//mojo/environment:chromium", "//mojo/system", # TODO: the dependency on gl_in_process_context should be decoupled from diff --git a/content/content_common.gypi b/content/content_common.gypi index 9d780a2..1d2b409 100644 --- a/content/content_common.gypi +++ b/content/content_common.gypi @@ -553,6 +553,7 @@ '../gpu/gpu.gyp:gpu_ipc', '../gpu/skia_bindings/skia_bindings.gyp:gpu_skia_bindings', '../ipc/ipc.gyp:ipc', + '../ipc/mojo/ipc_mojo.gyp:ipc_mojo', '../media/media.gyp:media', '../media/media.gyp:shared_memory_support', '../mojo/mojo_base.gyp:mojo_application_bindings', diff --git a/content/gpu/gpu_child_thread.cc b/content/gpu/gpu_child_thread.cc index 8bfec29..4f77446 100644 --- a/content/gpu/gpu_child_thread.cc +++ b/content/gpu/gpu_child_thread.cc @@ -61,7 +61,7 @@ GpuChildThread::GpuChildThread(GpuWatchdogThread* watchdog_thread, } GpuChildThread::GpuChildThread(const std::string& channel_id) - : ChildThread(channel_id), + : ChildThread(Options(channel_id, false)), dead_on_arrival_(false), in_browser_process_(true) { #if defined(OS_WIN) diff --git a/content/public/common/content_switches.cc b/content/public/common/content_switches.cc index 1b7a1ef..1bc2a24 100644 --- a/content/public/common/content_switches.cc +++ b/content/public/common/content_switches.cc @@ -425,6 +425,10 @@ const char kEnablePreparsedJsCaching[] = "enable-preparsed-js-caching"; const char kEnableRegionBasedColumns[] = "enable-region-based-columns"; +// Replaces renderer-browser IPC channel with ChnanelMojo. +const char kEnableRendererMojoChannel[] = + "enable-renderer-mojo-channel"; + // Enables targeted style recalculation optimizations. const char kEnableTargetedStyleRecalc[] = "enable-targeted-style-recalc"; diff --git a/content/public/common/content_switches.h b/content/public/common/content_switches.h index ae77186..47b5fdc 100644 --- a/content/public/common/content_switches.h +++ b/content/public/common/content_switches.h @@ -124,6 +124,7 @@ CONTENT_EXPORT extern const char kEnablePinch[]; CONTENT_EXPORT extern const char kEnablePreciseMemoryInfo[]; extern const char kEnablePreparsedJsCaching[]; CONTENT_EXPORT extern const char kEnableRegionBasedColumns[]; +CONTENT_EXPORT extern const char kEnableRendererMojoChannel[]; CONTENT_EXPORT extern const char kEnableSandboxLogging[]; extern const char kEnableSeccompFilterSandbox[]; extern const char kEnableSkiaBenchmarking[]; diff --git a/content/renderer/render_thread_impl.cc b/content/renderer/render_thread_impl.cc index 5f4d4fb..e652d22 100644 --- a/content/renderer/render_thread_impl.cc +++ b/content/renderer/render_thread_impl.cc @@ -295,6 +295,11 @@ void CreateRenderFrameSetup(mojo::InterfaceRequest<RenderFrameSetup> request) { mojo::BindToRequest(new RenderFrameSetupImpl(), &request); } +bool ShouldUseMojoChannel() { + return CommandLine::ForCurrentProcess()->HasSwitch( + switches::kEnableRendererMojoChannel); +} + } // namespace // For measuring memory usage after each task. Behind a command line flag. @@ -363,12 +368,13 @@ RenderThreadImpl* RenderThreadImpl::current() { // When we run plugins in process, we actually run them on the render thread, // which means that we need to make the render thread pump UI events. -RenderThreadImpl::RenderThreadImpl() { +RenderThreadImpl::RenderThreadImpl() + : ChildThread(Options(ShouldUseMojoChannel())) { Init(); } RenderThreadImpl::RenderThreadImpl(const std::string& channel_name) - : ChildThread(channel_name) { + : ChildThread(Options(channel_name, ShouldUseMojoChannel())) { Init(); } diff --git a/content/utility/utility_thread_impl.cc b/content/utility/utility_thread_impl.cc index c94312f..bb51842 100644 --- a/content/utility/utility_thread_impl.cc +++ b/content/utility/utility_thread_impl.cc @@ -37,7 +37,7 @@ UtilityThreadImpl::UtilityThreadImpl() : single_process_(false) { } UtilityThreadImpl::UtilityThreadImpl(const std::string& channel_name) - : ChildThread(channel_name), + : ChildThread(Options(channel_name, false)), single_process_(true) { Init(); } diff --git a/ipc/BUILD.gn b/ipc/BUILD.gn index 415c08d..2380596 100644 --- a/ipc/BUILD.gn +++ b/ipc/BUILD.gn @@ -8,6 +8,8 @@ component("ipc") { "file_descriptor_set_posix.h", "ipc_channel.cc", "ipc_channel.h", + "ipc_channel_factory.cc", + "ipc_channel_factory.h", "ipc_channel_common.cc", "ipc_channel_handle.h", "ipc_channel_nacl.cc", @@ -95,8 +97,6 @@ if (!is_android) { "ipc_sync_channel_unittest.cc", "ipc_sync_message_unittest.cc", "ipc_sync_message_unittest.h", - "ipc_test_base.cc", - "ipc_test_base.h", "sync_socket_unittest.cc", "unix_domain_socket_util_unittest.cc", ] @@ -131,8 +131,6 @@ if (!is_android) { test("ipc_perftests") { sources = [ "ipc_perftests.cc", - "ipc_test_base.cc", - "ipc_test_base.h", ] # TODO(brettw) hook up Android testing. @@ -165,6 +163,10 @@ static_library("test_support") { "ipc_multiprocess_test.h", "ipc_test_sink.cc", "ipc_test_sink.h", + "ipc_test_base.cc", + "ipc_test_base.h", + "ipc_test_channel_listener.h", + "ipc_test_channel_listener.cc", ] deps = [ ":ipc", diff --git a/ipc/ipc.gyp b/ipc/ipc.gyp index a6e9269..6d90ab6 100644 --- a/ipc/ipc.gyp +++ b/ipc/ipc.gyp @@ -55,8 +55,6 @@ 'ipc_sync_channel_unittest.cc', 'ipc_sync_message_unittest.cc', 'ipc_sync_message_unittest.h', - 'ipc_test_base.cc', - 'ipc_test_base.h', 'run_all_unittests.cc', 'sync_socket_unittest.cc', 'unix_domain_socket_util_unittest.cc', @@ -132,6 +130,10 @@ 'sources': [ 'ipc_multiprocess_test.cc', 'ipc_multiprocess_test.h', + 'ipc_test_base.cc', + 'ipc_test_base.h', + 'ipc_test_channel_listener.cc', + 'ipc_test_channel_listener.h', 'ipc_test_sink.cc', 'ipc_test_sink.h', ], diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi index 5cf5051..5f68757 100644 --- a/ipc/ipc.gypi +++ b/ipc/ipc.gypi @@ -15,6 +15,8 @@ 'file_descriptor_set_posix.h', 'ipc_channel.cc', 'ipc_channel.h', + 'ipc_channel_factory.cc', + 'ipc_channel_factory.h', 'ipc_channel_common.cc', 'ipc_channel_handle.h', 'ipc_channel_nacl.cc', diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h index bca3ea0..fbd4ae0 100644 --- a/ipc/ipc_channel.h +++ b/ipc/ipc_channel.h @@ -170,6 +170,14 @@ class IPC_EXPORT Channel : public Sender { // listener. virtual base::ProcessId GetPeerPID() const = 0; + // Get its own process id. This value is told to the peer. + virtual base::ProcessId GetSelfPID() const = 0; + + // Return connected ChannelHandle which the channel has owned. + // This method transfers the ownership to the caller + // so the channel isn't valid after the call. + virtual ChannelHandle TakePipeHandle() WARN_UNUSED_RESULT = 0; + // Send a message over the Channel to the listener on the other end. // // |message| must be allocated using operator new. This object will be diff --git a/ipc/ipc_channel_factory.cc b/ipc/ipc_channel_factory.cc new file mode 100644 index 0000000..4cb1790 --- /dev/null +++ b/ipc/ipc_channel_factory.cc @@ -0,0 +1,42 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_channel_factory.h" + +namespace IPC { + +namespace { + +class PlatformChannelFactory : public ChannelFactory { + public: + PlatformChannelFactory(ChannelHandle handle, + Channel::Mode mode) + : handle_(handle), mode_(mode) { + } + + virtual std::string GetName() const OVERRIDE { + return handle_.name; + } + + virtual scoped_ptr<Channel> BuildChannel( + Listener* listener) OVERRIDE { + return Channel::Create(handle_, mode_, listener); + } + + private: + ChannelHandle handle_; + Channel::Mode mode_; + + DISALLOW_COPY_AND_ASSIGN(PlatformChannelFactory); +}; + +} // namespace + +// static +scoped_ptr<ChannelFactory> ChannelFactory::Create( + const ChannelHandle& handle, Channel::Mode mode) { + return scoped_ptr<ChannelFactory>(new PlatformChannelFactory(handle, mode)); +} + +} // namespace IPC diff --git a/ipc/ipc_channel_factory.h b/ipc/ipc_channel_factory.h new file mode 100644 index 0000000..30bfd8c --- /dev/null +++ b/ipc/ipc_channel_factory.h @@ -0,0 +1,33 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_CHANNEL_FACTORY_H_ +#define IPC_IPC_CHANNEL_FACTORY_H_ + +#include <string> +#include <vector> + +#include "base/memory/scoped_ptr.h" +#include "ipc/ipc_channel.h" + +namespace IPC { + +// Encapsulates how a Channel is created. A ChannelFactory can be +// passed to the constructor of ChannelProxy or SyncChannel to tell them +// how to create underlying channel. +class ChannelFactory { + public: + // Creates a factory for "native" channel built through + // IPC::Channel::Create(). + static scoped_ptr<ChannelFactory> Create( + const ChannelHandle& handle, Channel::Mode mode); + + virtual ~ChannelFactory() { } + virtual std::string GetName() const = 0; + virtual scoped_ptr<Channel> BuildChannel(Listener* listener) = 0; +}; + +} // namespace IPC + +#endif // IPC_IPC_CHANNEL_FACTORY_H_ diff --git a/ipc/ipc_channel_nacl.cc b/ipc/ipc_channel_nacl.cc index 0928ba6..704f7d8 100644 --- a/ipc/ipc_channel_nacl.cc +++ b/ipc/ipc_channel_nacl.cc @@ -145,6 +145,15 @@ base::ProcessId ChannelNacl::GetPeerPID() const { return -1; } +base::ProcessId ChannelNacl::GetSelfPID() const { + return -1; +} + +ChannelHandle ChannelNacl::TakePipeHandle() { + // NACL doesn't have any platform-level handles. + return ChannelHandle(); +} + bool ChannelNacl::Connect() { if (pipe_ == -1) { DLOG(WARNING) << "Channel creation failed: " << pipe_name_; diff --git a/ipc/ipc_channel_nacl.h b/ipc/ipc_channel_nacl.h index c47892d..e217b88 100644 --- a/ipc/ipc_channel_nacl.h +++ b/ipc/ipc_channel_nacl.h @@ -42,6 +42,8 @@ class ChannelNacl : public Channel, // Channel implementation. virtual base::ProcessId GetPeerPID() const OVERRIDE; + virtual base::ProcessId GetSelfPID() const OVERRIDE; + virtual ChannelHandle TakePipeHandle() OVERRIDE; virtual bool Connect() OVERRIDE; virtual void Close() OVERRIDE; virtual bool Send(Message* message) OVERRIDE; diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc index 8ddf73a..ac9de55 100644 --- a/ipc/ipc_channel_posix.cc +++ b/ipc/ipc_channel_posix.cc @@ -747,7 +747,7 @@ void ChannelPosix::ClosePipeOnError() { } } -int ChannelPosix::GetHelloMessageProcId() { +int ChannelPosix::GetHelloMessageProcId() const { int pid = base::GetCurrentProcId(); #if defined(OS_LINUX) // Our process may be in a sandbox with a separate PID namespace. @@ -1050,6 +1050,17 @@ base::ProcessId ChannelPosix::GetPeerPID() const { return peer_pid_; } +base::ProcessId ChannelPosix::GetSelfPID() const { + return GetHelloMessageProcId(); +} + +ChannelHandle ChannelPosix::TakePipeHandle() { + ChannelHandle handle = ChannelHandle(pipe_name_, + base::FileDescriptor(pipe_, false)); + pipe_ = -1; + return handle; +} + //------------------------------------------------------------------------------ // Channel's methods diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h index 7f17b2f..a235739 100644 --- a/ipc/ipc_channel_posix.h +++ b/ipc/ipc_channel_posix.h @@ -62,6 +62,8 @@ class IPC_EXPORT ChannelPosix : public Channel, virtual void Close() OVERRIDE; virtual bool Send(Message* message) OVERRIDE; virtual base::ProcessId GetPeerPID() const OVERRIDE; + virtual base::ProcessId GetSelfPID() const OVERRIDE; + virtual ChannelHandle TakePipeHandle() OVERRIDE; virtual int GetClientFileDescriptor() const OVERRIDE; virtual int TakeClientFileDescriptor() OVERRIDE; @@ -94,7 +96,7 @@ class IPC_EXPORT ChannelPosix : public Channel, bool AcceptConnection(); void ClosePipeOnError(); - int GetHelloMessageProcId(); + int GetHelloMessageProcId() const; void QueueHelloMessage(); void CloseFileDescriptors(Message* msg); void QueueCloseFDMessage(int fd, int hops); diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc index 7441c65..2ea722d 100644 --- a/ipc/ipc_channel_proxy.cc +++ b/ipc/ipc_channel_proxy.cc @@ -11,6 +11,7 @@ #include "base/memory/scoped_ptr.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" +#include "ipc/ipc_channel_factory.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_message_macros.h" @@ -48,11 +49,10 @@ void ChannelProxy::Context::ClearIPCTaskRunner() { ipc_task_runner_ = NULL; } -void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, - const Channel::Mode& mode) { +void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) { DCHECK(!channel_); - channel_id_ = handle.name; - channel_ = Channel::Create(handle, mode, this); + channel_id_ = factory->GetName(); + channel_ = factory->BuildChannel(this); } bool ChannelProxy::Context::TryFilters(const Message& message) { @@ -315,6 +315,16 @@ scoped_ptr<ChannelProxy> ChannelProxy::Create( return channel.Pass(); } +// static +scoped_ptr<ChannelProxy> ChannelProxy::Create( + scoped_ptr<ChannelFactory> factory, + Listener* listener, + base::SingleThreadTaskRunner* ipc_task_runner) { + scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner)); + channel->Init(factory.Pass(), true); + return channel.Pass(); +} + ChannelProxy::ChannelProxy(Context* context) : context_(context), did_init_(false) { @@ -334,8 +344,6 @@ ChannelProxy::~ChannelProxy() { void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, bool create_pipe_now) { - DCHECK(CalledOnValidThread()); - DCHECK(!did_init_); #if defined(OS_POSIX) // When we are creating a server on POSIX, we need its file descriptor // to be created immediately so that it can be accessed and passed @@ -345,17 +353,25 @@ void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, create_pipe_now = true; } #endif // defined(OS_POSIX) + Init(ChannelFactory::Create(channel_handle, mode), + create_pipe_now); +} + +void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory, + bool create_pipe_now) { + DCHECK(CalledOnValidThread()); + DCHECK(!did_init_); if (create_pipe_now) { // Create the channel immediately. This effectively sets up the // low-level pipe so that the client can connect. Without creating // the pipe immediately, it is possible for a listener to attempt // to connect and get an error since the pipe doesn't exist yet. - context_->CreateChannel(channel_handle, mode); + context_->CreateChannel(factory.Pass()); } else { context_->ipc_task_runner()->PostTask( - FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), - channel_handle, mode)); + FROM_HERE, base::Bind(&Context::CreateChannel, + context_.get(), Passed(factory.Pass()))); } // complete initialization on the background thread diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h index 2e483ac..e76e56f 100644 --- a/ipc/ipc_channel_proxy.h +++ b/ipc/ipc_channel_proxy.h @@ -22,6 +22,7 @@ class SingleThreadTaskRunner; namespace IPC { +class ChannelFactory; class MessageFilter; class MessageFilterRouter; class SendCallbackHelper; @@ -70,6 +71,11 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { Listener* listener, base::SingleThreadTaskRunner* ipc_task_runner); + static scoped_ptr<ChannelProxy> Create( + scoped_ptr<ChannelFactory> factory, + Listener* listener, + base::SingleThreadTaskRunner* ipc_task_runner); + virtual ~ChannelProxy(); // Initializes the channel proxy. Only call this once to initialize a channel @@ -78,6 +84,7 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { // thread. void Init(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, bool create_pipe_now); + void Init(scoped_ptr<ChannelFactory> factory, bool create_pipe_now); // Close the IPC::Channel. This operation completes asynchronously, once the // background thread processes the command to close the channel. It is ok to @@ -171,8 +178,7 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { friend class SendCallbackHelper; // Create the Channel - void CreateChannel(const IPC::ChannelHandle& channel_handle, - const Channel::Mode& mode); + void CreateChannel(scoped_ptr<ChannelFactory> factory); // Methods called on the IO thread. void OnSendMessage(scoped_ptr<Message> message_ptr); diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc index 9a3cc3c..28a889a 100644 --- a/ipc/ipc_channel_reader.cc +++ b/ipc/ipc_channel_reader.cc @@ -38,13 +38,13 @@ bool ChannelReader::AsyncReadComplete(int bytes_read) { return DispatchInputData(input_buf_, bytes_read); } -bool ChannelReader::IsInternalMessage(const Message& m) const { +bool ChannelReader::IsInternalMessage(const Message& m) { return m.routing_id() == MSG_ROUTING_NONE && m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && m.type() <= Channel::HELLO_MESSAGE_TYPE; } -bool ChannelReader::IsHelloMessage(const Message& m) const { +bool ChannelReader::IsHelloMessage(const Message& m) { return m.routing_id() == MSG_ROUTING_NONE && m.type() == Channel::HELLO_MESSAGE_TYPE; } diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h index 1303846..9dec8c1 100644 --- a/ipc/ipc_channel_reader.h +++ b/ipc/ipc_channel_reader.h @@ -7,6 +7,7 @@ #include "base/basictypes.h" #include "ipc/ipc_channel.h" +#include "ipc/ipc_export.h" namespace IPC { namespace internal { @@ -44,11 +45,11 @@ class ChannelReader { // Returns true if the given message is internal to the IPC implementation, // like the "hello" message sent on channel set-up. - bool IsInternalMessage(const Message& m) const; + bool IsInternalMessage(const Message& m); // Returns true if the given message is an Hello message // sent on channel set-up. - bool IsHelloMessage(const Message& m) const; + bool IsHelloMessage(const Message& m); protected: enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING }; diff --git a/ipc/ipc_channel_unittest.cc b/ipc/ipc_channel_unittest.cc index b9665db..1f85311 100644 --- a/ipc/ipc_channel_unittest.cc +++ b/ipc/ipc_channel_unittest.cc @@ -15,76 +15,10 @@ #include "base/threading/thread.h" #include "ipc/ipc_message.h" #include "ipc/ipc_test_base.h" +#include "ipc/ipc_test_channel_listener.h" namespace { -const size_t kLongMessageStringNumBytes = 50000; - -static void Send(IPC::Sender* sender, const char* text) { - static int message_index = 0; - - IPC::Message* message = new IPC::Message(0, - 2, - IPC::Message::PRIORITY_NORMAL); - message->WriteInt(message_index++); - message->WriteString(std::string(text)); - - // Make sure we can handle large messages. - char junk[kLongMessageStringNumBytes]; - memset(junk, 'a', sizeof(junk)-1); - junk[sizeof(junk)-1] = 0; - message->WriteString(std::string(junk)); - - // DEBUG: printf("[%u] sending message [%s]\n", GetCurrentProcessId(), text); - sender->Send(message); -} - -// A generic listener that expects messages of a certain type (see -// OnMessageReceived()), and either sends a generic response or quits after the -// 50th message (or on channel error). -class GenericChannelListener : public IPC::Listener { - public: - GenericChannelListener() : sender_(NULL), messages_left_(50) {} - virtual ~GenericChannelListener() {} - - virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE { - PickleIterator iter(message); - - int ignored; - EXPECT_TRUE(iter.ReadInt(&ignored)); - std::string data; - EXPECT_TRUE(iter.ReadString(&data)); - std::string big_string; - EXPECT_TRUE(iter.ReadString(&big_string)); - EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length()); - - SendNextMessage(); - return true; - } - - virtual void OnChannelError() OVERRIDE { - // There is a race when closing the channel so the last message may be lost. - EXPECT_LE(messages_left_, 1); - base::MessageLoop::current()->Quit(); - } - - void Init(IPC::Sender* s) { - sender_ = s; - } - - protected: - void SendNextMessage() { - if (--messages_left_ <= 0) - base::MessageLoop::current()->Quit(); - else - Send(sender_, "Foo"); - } - - private: - IPC::Sender* sender_; - int messages_left_; -}; - class IPCChannelTest : public IPCTestBase { }; @@ -124,13 +58,13 @@ TEST_F(IPCChannelTest, ChannelTest) { Init("GenericClient"); // Set up IPC channel and start client. - GenericChannelListener listener; + IPC::TestChannelListener listener; CreateChannel(&listener); listener.Init(sender()); ASSERT_TRUE(ConnectChannel()); ASSERT_TRUE(StartClient()); - Send(sender(), "hello from parent"); + IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent"); // Run message loop. base::MessageLoop::current()->Run(); @@ -149,7 +83,7 @@ TEST_F(IPCChannelTest, ChannelTestExistingPipe) { // Create pipe manually using the standard Chromium name and set up IPC // channel. - GenericChannelListener listener; + IPC::TestChannelListener listener; std::string name("\\\\.\\pipe\\chrome."); name.append(GetChannelName("GenericClient")); HANDLE pipe = CreateNamedPipeA(name.c_str(), @@ -169,7 +103,7 @@ TEST_F(IPCChannelTest, ChannelTestExistingPipe) { ASSERT_TRUE(ConnectChannel()); ASSERT_TRUE(StartClient()); - Send(sender(), "hello from parent"); + IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent"); // Run message loop. base::MessageLoop::current()->Run(); @@ -191,13 +125,13 @@ TEST_F(IPCChannelTest, ChannelProxyTest) { thread.StartWithOptions(options); // Set up IPC channel proxy. - GenericChannelListener listener; + IPC::TestChannelListener listener; CreateChannelProxy(&listener, thread.message_loop_proxy().get()); listener.Init(sender()); ASSERT_TRUE(StartClient()); - Send(sender(), "hello from parent"); + IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent"); // Run message loop. base::MessageLoop::current()->Run(); @@ -209,7 +143,7 @@ TEST_F(IPCChannelTest, ChannelProxyTest) { thread.Stop(); } -class ChannelListenerWithOnConnectedSend : public GenericChannelListener { +class ChannelListenerWithOnConnectedSend : public IPC::TestChannelListener { public: ChannelListenerWithOnConnectedSend() {} virtual ~ChannelListenerWithOnConnectedSend() {} @@ -237,7 +171,7 @@ TEST_F(IPCChannelTest, MAYBE_SendMessageInChannelConnected) { ASSERT_TRUE(ConnectChannel()); ASSERT_TRUE(StartClient()); - Send(sender(), "hello from parent"); + IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent"); // Run message loop. base::MessageLoop::current()->Run(); @@ -251,7 +185,7 @@ TEST_F(IPCChannelTest, MAYBE_SendMessageInChannelConnected) { MULTIPROCESS_IPC_TEST_CLIENT_MAIN(GenericClient) { base::MessageLoopForIO main_message_loop; - GenericChannelListener listener; + IPC::TestChannelListener listener; // Set up IPC channel. scoped_ptr<IPC::Channel> channel(IPC::Channel::CreateClient( @@ -259,7 +193,7 @@ MULTIPROCESS_IPC_TEST_CLIENT_MAIN(GenericClient) { &listener)); CHECK(channel->Connect()); listener.Init(channel.get()); - Send(channel.get(), "hello from child"); + IPC::TestChannelListener::SendOneMessage(channel.get(), "hello from child"); base::MessageLoop::current()->Run(); return 0; diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc index 0dcde17..eabf85e 100644 --- a/ipc/ipc_channel_win.cc +++ b/ipc/ipc_channel_win.cc @@ -137,6 +137,16 @@ base::ProcessId ChannelWin::GetPeerPID() const { return peer_pid_; } +base::ProcessId ChannelWin::GetSelfPID() const { + return GetCurrentProcessId(); +} + +ChannelHandle ChannelWin::TakePipeHandle() { + ChannelHandle handle = ChannelHandle(pipe_); + pipe_ = INVALID_HANDLE_VALUE; + return handle; +} + // static bool ChannelWin::IsNamedServerInitialized( const std::string& channel_id) { diff --git a/ipc/ipc_channel_win.h b/ipc/ipc_channel_win.h index bb20950..2bbb294 100644 --- a/ipc/ipc_channel_win.h +++ b/ipc/ipc_channel_win.h @@ -35,6 +35,8 @@ class ChannelWin : public Channel, virtual void Close() OVERRIDE; virtual bool Send(Message* message) OVERRIDE; virtual base::ProcessId GetPeerPID() const OVERRIDE; + virtual base::ProcessId GetSelfPID() const OVERRIDE; + virtual ChannelHandle TakePipeHandle() OVERRIDE; static bool IsNamedServerInitialized(const std::string& channel_id); diff --git a/ipc/ipc_export.h b/ipc/ipc_export.h index 776b3ee..9f8411c 100644 --- a/ipc/ipc_export.h +++ b/ipc/ipc_export.h @@ -17,16 +17,31 @@ #define IPC_EXPORT __declspec(dllimport) #endif // defined(IPC_IMPLEMENTATION) +#if defined(IPC_MOJO_IMPLEMENTATION) +#define IPC_MOJO_EXPORT __declspec(dllexport) +#else +#define IPC_MOJO_EXPORT __declspec(dllimport) +#endif // defined(IPC_MOJO_IMPLEMENTATION) + #else // defined(WIN32) + #if defined(IPC_IMPLEMENTATION) #define IPC_EXPORT __attribute__((visibility("default"))) #else #define IPC_EXPORT #endif + +#if defined(IPC_MOJO_IMPLEMENTATION) +#define IPC_MOJO_EXPORT __attribute__((visibility("default"))) +#else +#define IPC_MOJO_EXPORT +#endif + #endif #else // defined(COMPONENT_BUILD) #define IPC_EXPORT +#define IPC_MOJO_EXPORT #endif #endif // IPC_IPC_EXPORT_H_ diff --git a/ipc/ipc_message.h b/ipc/ipc_message.h index fc37d72..198e6c0 100644 --- a/ipc/ipc_message.h +++ b/ipc/ipc_message.h @@ -222,6 +222,7 @@ class IPC_EXPORT Message : public Pickle { protected: friend class Channel; + friend class ChannelMojo; friend class ChannelNacl; friend class ChannelPosix; friend class ChannelWin; diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc index a7ed230..0c4702c 100644 --- a/ipc/ipc_sync_channel.cc +++ b/ipc/ipc_sync_channel.cc @@ -13,6 +13,7 @@ #include "base/synchronization/waitable_event_watcher.h" #include "base/thread_task_runner_handle.h" #include "base/threading/thread_local.h" +#include "ipc/ipc_channel_factory.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_message_macros.h" #include "ipc/ipc_sync_message.h" @@ -420,6 +421,19 @@ scoped_ptr<SyncChannel> SyncChannel::Create( // static scoped_ptr<SyncChannel> SyncChannel::Create( + scoped_ptr<ChannelFactory> factory, + Listener* listener, + base::SingleThreadTaskRunner* ipc_task_runner, + bool create_pipe_now, + base::WaitableEvent* shutdown_event) { + scoped_ptr<SyncChannel> channel = + Create(listener, ipc_task_runner, shutdown_event); + channel->Init(factory.Pass(), create_pipe_now); + return channel.Pass(); +} + +// static +scoped_ptr<SyncChannel> SyncChannel::Create( Listener* listener, base::SingleThreadTaskRunner* ipc_task_runner, WaitableEvent* shutdown_event) { diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h index 8984184..f8207ce 100644 --- a/ipc/ipc_sync_channel.h +++ b/ipc/ipc_sync_channel.h @@ -23,6 +23,7 @@ class WaitableEvent; namespace IPC { class SyncMessage; +class ChannelFactory; // This is similar to ChannelProxy, with the added feature of supporting sending // synchronous messages. @@ -75,6 +76,13 @@ class IPC_EXPORT SyncChannel : public ChannelProxy { bool create_pipe_now, base::WaitableEvent* shutdown_event); + static scoped_ptr<SyncChannel> Create( + scoped_ptr<ChannelFactory> factory, + Listener* listener, + base::SingleThreadTaskRunner* ipc_task_runner, + bool create_pipe_now, + base::WaitableEvent* shutdown_event); + // Creates an uninitialized sync channel. Call ChannelProxy::Init to // initialize the channel. This two-step setup allows message filters to be // added before any messages are sent or received. diff --git a/ipc/ipc_test_base.cc b/ipc/ipc_test_base.cc index 589ee98..f893c28 100644 --- a/ipc/ipc_test_base.cc +++ b/ipc/ipc_test_base.cc @@ -59,6 +59,15 @@ bool IPCTestBase::ConnectChannel() { return channel_->Connect(); } +scoped_ptr<IPC::Channel> IPCTestBase::ReleaseChannel() { + return channel_.Pass(); +} + +void IPCTestBase::SetChannel(scoped_ptr<IPC::Channel> channel) { + channel_ = channel.Pass(); +} + + void IPCTestBase::DestroyChannel() { DCHECK(channel_.get()); channel_.reset(); @@ -120,3 +129,7 @@ bool IPCTestBase::WaitForClientShutdown() { client_process_ = base::kNullProcessHandle; return rv; } + +scoped_refptr<base::TaskRunner> IPCTestBase::io_thread_task_runner() { + return message_loop_->message_loop_proxy(); +} diff --git a/ipc/ipc_test_base.h b/ipc/ipc_test_base.h index 5bd3e96..ce3328a 100644 --- a/ipc/ipc_test_base.h +++ b/ipc/ipc_test_base.h @@ -47,6 +47,11 @@ class IPCTestBase : public base::MultiProcessTest { bool ConnectChannel(); void DestroyChannel(); + // Releases or replaces existing channel. + // These are useful for testing specific types of channel subclasses. + scoped_ptr<IPC::Channel> ReleaseChannel(); + void SetChannel(scoped_ptr<IPC::Channel> channel); + // Use this instead of CreateChannel() if you want to use some different // channel specification (then use ConnectChannel() as usual). void CreateChannelFromChannelHandle(const IPC::ChannelHandle& channel_handle, @@ -81,6 +86,7 @@ class IPCTestBase : public base::MultiProcessTest { IPC::ChannelProxy* channel_proxy() { return channel_proxy_.get(); } const base::ProcessHandle& client_process() const { return client_process_; } + scoped_refptr<base::TaskRunner> io_thread_task_runner(); private: std::string test_client_name_; diff --git a/ipc/ipc_test_channel_listener.cc b/ipc/ipc_test_channel_listener.cc new file mode 100644 index 0000000..e98f6b7 --- /dev/null +++ b/ipc/ipc_test_channel_listener.cc @@ -0,0 +1,62 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_test_channel_listener.h" + +#include "ipc/ipc_message.h" +#include "ipc/ipc_sender.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace IPC { + +// static +void TestChannelListener::SendOneMessage(IPC::Sender* sender, + const char* text) { + static int message_index = 0; + + IPC::Message* message = new IPC::Message(0, + 2, + IPC::Message::PRIORITY_NORMAL); + message->WriteInt(message_index++); + message->WriteString(std::string(text)); + + // Make sure we can handle large messages. + char junk[kLongMessageStringNumBytes]; + memset(junk, 'a', sizeof(junk)-1); + junk[sizeof(junk)-1] = 0; + message->WriteString(std::string(junk)); + + sender->Send(message); +} + + +bool TestChannelListener::OnMessageReceived(const IPC::Message& message) { + PickleIterator iter(message); + + int ignored; + EXPECT_TRUE(iter.ReadInt(&ignored)); + std::string data; + EXPECT_TRUE(iter.ReadString(&data)); + std::string big_string; + EXPECT_TRUE(iter.ReadString(&big_string)); + EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length()); + + SendNextMessage(); + return true; +} + +void TestChannelListener::OnChannelError() { + // There is a race when closing the channel so the last message may be lost. + EXPECT_LE(messages_left_, 1); + base::MessageLoop::current()->Quit(); +} + +void TestChannelListener::SendNextMessage() { + if (--messages_left_ <= 0) + base::MessageLoop::current()->Quit(); + else + SendOneMessage(sender_, "Foo"); +} + +} diff --git a/ipc/ipc_test_channel_listener.h b/ipc/ipc_test_channel_listener.h new file mode 100644 index 0000000..047e15d --- /dev/null +++ b/ipc/ipc_test_channel_listener.h @@ -0,0 +1,44 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_TEST_CHANNEL_LISTENER_H_ +#define IPC_IPC_TEST_CHANNEL_LISTENER_H_ + +#include "ipc/ipc_listener.h" + +namespace IPC { + +class Sender; + +// A generic listener that expects messages of a certain type (see +// OnMessageReceived()), and either sends a generic response or quits after the +// 50th message (or on channel error). +class TestChannelListener : public Listener { + public: + static const size_t kLongMessageStringNumBytes = 50000; + static void SendOneMessage(Sender* sender, const char* text); + + TestChannelListener() : sender_(NULL), messages_left_(50) {} + virtual ~TestChannelListener() {} + + virtual bool OnMessageReceived(const Message& message) OVERRIDE; + virtual void OnChannelError() OVERRIDE; + + void Init(Sender* s) { + sender_ = s; + } + + bool HasSentAll() const { return 0 == messages_left_; } + + protected: + void SendNextMessage(); + + private: + Sender* sender_; + int messages_left_; +}; + +} + +#endif // IPC_IPC_TEST_CHANNEL_LISTENER_H_ diff --git a/ipc/ipc_test_sink.cc b/ipc/ipc_test_sink.cc index 9e9d1fd..b1a21bf 100644 --- a/ipc/ipc_test_sink.cc +++ b/ipc/ipc_test_sink.cc @@ -35,6 +35,15 @@ base::ProcessId TestSink::GetPeerPID() const { return base::ProcessId(); } +base::ProcessId TestSink::GetSelfPID() const { + NOTIMPLEMENTED(); + return base::ProcessId(); +} + +ChannelHandle TestSink::TakePipeHandle() { + NOTIMPLEMENTED(); + return ChannelHandle(); +} bool TestSink::OnMessageReceived(const Message& msg) { ObserverListBase<Listener>::Iterator it(filter_list_); diff --git a/ipc/ipc_test_sink.h b/ipc/ipc_test_sink.h index 1a213ee1..54bab34 100644 --- a/ipc/ipc_test_sink.h +++ b/ipc/ipc_test_sink.h @@ -81,6 +81,8 @@ class TestSink : public Channel { virtual bool Connect() OVERRIDE WARN_UNUSED_RESULT; virtual void Close() OVERRIDE; virtual base::ProcessId GetPeerPID() const OVERRIDE; + virtual base::ProcessId GetSelfPID() const OVERRIDE; + virtual ChannelHandle TakePipeHandle() OVERRIDE; #if defined(OS_POSIX) && !defined(OS_NACL) virtual int GetClientFileDescriptor() const OVERRIDE; diff --git a/ipc/mojo/BUILD.gn b/ipc/mojo/BUILD.gn new file mode 100644 index 0000000..accc89d --- /dev/null +++ b/ipc/mojo/BUILD.gn @@ -0,0 +1,41 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +component("mojo") { + sources = [ + "ipc_channel_mojo.cc", + "ipc_channel_mojo.h", + "ipc_message_pipe_reader.cc", + "ipc_message_pipe_reader.h", + ] + + deps = [ + "//base", + "//ipc", + "//mojo/public/cpp/bindings", + "//mojo/system", + # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect. + "//base/third_party/dynamic_annotations", + ] +} + +test("ipc_mojo_unittests") { + sources = [ + "ipc_channel_mojo_unittest.cc", + "run_all_unittests.cc", + ] + + deps = [ + "//base", + # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect. + "//base/test:test_support", + "//base/third_party/dynamic_annotations", + "//ipc", + "//ipc:test_support", + "//ipc/mojo", + "//mojo/environment:chromium", + "//mojo/system", + "//url", + ] +} diff --git a/ipc/mojo/DEPS b/ipc/mojo/DEPS new file mode 100644 index 0000000..6706a1f --- /dev/null +++ b/ipc/mojo/DEPS @@ -0,0 +1,4 @@ +include_rules = [ + "+mojo/public", + "+mojo/embedder", +]
\ No newline at end of file diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc new file mode 100644 index 0000000..27d35b7 --- /dev/null +++ b/ipc/mojo/ipc_channel_mojo.cc @@ -0,0 +1,596 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/mojo/ipc_channel_mojo.h" + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/lazy_instance.h" +#include "ipc/ipc_listener.h" +#include "mojo/embedder/embedder.h" + +#if defined(OS_POSIX) && !defined(OS_NACL) +#include "ipc/file_descriptor_set_posix.h" +#endif + +namespace IPC { + +namespace { + +// IPC::Listener for bootstrap channels. +// It should never receive any message. +class NullListener : public Listener { + public: + virtual bool OnMessageReceived(const Message&) OVERRIDE { + NOTREACHED(); + return false; + } + + virtual void OnChannelConnected(int32 peer_pid) OVERRIDE { + NOTREACHED(); + } + + virtual void OnChannelError() OVERRIDE { + NOTREACHED(); + } + + virtual void OnBadMessageReceived(const Message& message) OVERRIDE { + NOTREACHED(); + } +}; + +base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER; + +class MojoChannelFactory : public ChannelFactory { + public: + MojoChannelFactory( + ChannelHandle channel_handle, + Channel::Mode mode, + scoped_refptr<base::TaskRunner> io_thread_task_runner) + : channel_handle_(channel_handle), + mode_(mode), + io_thread_task_runner_(io_thread_task_runner) { + } + + virtual std::string GetName() const OVERRIDE { + return channel_handle_.name; + } + + virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE { + return ChannelMojo::Create( + channel_handle_, + mode_, + listener, + io_thread_task_runner_).PassAs<Channel>(); + } + + private: + ChannelHandle channel_handle_; + Channel::Mode mode_; + scoped_refptr<base::TaskRunner> io_thread_task_runner_; +}; + +mojo::embedder::PlatformHandle ToPlatformHandle( + const ChannelHandle& handle) { +#if defined(OS_POSIX) && !defined(OS_NACL) + return mojo::embedder::PlatformHandle(handle.socket.fd); +#elif defined(OS_WIN) + return mojo::embedder::PlatformHandle(handle.pipe.handle); +#else +#error "Unsupported Platform!" +#endif +} + +//------------------------------------------------------------------------------ + +// TODO(morrita): This should be built using higher-level Mojo construct +// for clarity and extensibility. +class HelloMessage { + public: + static Pickle CreateRequest(int32 pid) { + Pickle request; + request.WriteString(kHelloRequestMagic); + request.WriteInt(pid); + return request; + } + + static bool ReadRequest(Pickle& pickle, int32* pid) { + PickleIterator iter(pickle); + std::string hello; + if (!iter.ReadString(&hello)) { + DLOG(WARNING) << "Failed to Read magic string."; + return false; + } + + if (hello != kHelloRequestMagic) { + DLOG(WARNING) << "Magic mismatch:" << hello; + return false; + } + + int read_pid; + if (!iter.ReadInt(&read_pid)) { + DLOG(WARNING) << "Failed to Read PID."; + return false; + } + + *pid = read_pid; + return true; + } + + static Pickle CreateResponse(int32 pid) { + Pickle request; + request.WriteString(kHelloResponseMagic); + request.WriteInt(pid); + return request; + } + + static bool ReadResponse(Pickle& pickle, int32* pid) { + PickleIterator iter(pickle); + std::string hello; + if (!iter.ReadString(&hello)) { + DLOG(WARNING) << "Failed to read magic string."; + return false; + } + + if (hello != kHelloResponseMagic) { + DLOG(WARNING) << "Magic mismatch:" << hello; + return false; + } + + int read_pid; + if (!iter.ReadInt(&read_pid)) { + DLOG(WARNING) << "Failed to read PID."; + return false; + } + + *pid = read_pid; + return true; + } + + private: + static const char* kHelloRequestMagic; + static const char* kHelloResponseMagic; +}; + +const char* HelloMessage::kHelloRequestMagic = "MREQ"; +const char* HelloMessage::kHelloResponseMagic = "MRES"; + +} // namespace + +//------------------------------------------------------------------------------ + +// A MessagePipeReader implemenation for IPC::Message communication. +class ChannelMojo::MessageReader : public internal::MessagePipeReader { + public: + MessageReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner) + : internal::MessagePipeReader(pipe.Pass()), + owner_(owner) {} + + bool Send(scoped_ptr<Message> message); + virtual void OnMessageReceived() OVERRIDE; + virtual void OnPipeClosed() OVERRIDE; + virtual void OnPipeError(MojoResult error) OVERRIDE; + + private: + ChannelMojo* owner_; +}; + +void ChannelMojo::MessageReader::OnMessageReceived() { + Message message(data_buffer().empty() ? "" : &data_buffer()[0], + static_cast<uint32>(data_buffer().size())); + + std::vector<MojoHandle> handle_buffer; + TakeHandleBuffer(&handle_buffer); +#if defined(OS_POSIX) && !defined(OS_NACL) + for (size_t i = 0; i < handle_buffer.size(); ++i) { + mojo::embedder::ScopedPlatformHandle platform_handle; + MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle( + handle_buffer[i], &platform_handle); + if (unwrap_result != MOJO_RESULT_OK) { + DLOG(WARNING) << "Pipe failed to covert handles. Closing: " + << unwrap_result; + CloseWithError(unwrap_result); + return; + } + + bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd); + DCHECK(ok); + } +#else + DCHECK(handle_buffer.empty()); +#endif + + message.TraceMessageEnd(); + owner_->OnMessageReceived(message); +} + +void ChannelMojo::MessageReader::OnPipeClosed() { + if (!owner_) + return; + owner_->OnPipeClosed(this); + owner_ = NULL; +} + +void ChannelMojo::MessageReader::OnPipeError(MojoResult error) { + if (!owner_) + return; + owner_->OnPipeError(this); +} + +bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) { + DCHECK(IsValid()); + + message->TraceMessageBegin(); + std::vector<MojoHandle> handles; +#if defined(OS_POSIX) && !defined(OS_NACL) + if (message->HasFileDescriptors()) { + FileDescriptorSet* fdset = message->file_descriptor_set(); + for (size_t i = 0; i < fdset->size(); ++i) { + MojoHandle wrapped_handle; + MojoResult wrap_result = CreatePlatformHandleWrapper( + mojo::embedder::ScopedPlatformHandle( + mojo::embedder::PlatformHandle( + fdset->GetDescriptorAt(i))), + &wrapped_handle); + if (MOJO_RESULT_OK != wrap_result) { + DLOG(WARNING) << "Pipe failed to wrap handles. Closing: " + << wrap_result; + CloseWithError(wrap_result); + return false; + } + + handles.push_back(wrapped_handle); + } + } +#endif + MojoResult write_result = MojoWriteMessage( + handle(), + message->data(), static_cast<uint32>(message->size()), + handles.empty() ? NULL : &handles[0], + static_cast<uint32>(handles.size()), + MOJO_WRITE_MESSAGE_FLAG_NONE); + if (MOJO_RESULT_OK != write_result) { + CloseWithError(write_result); + return false; + } + + return true; +} + +//------------------------------------------------------------------------------ + +// MessagePipeReader implemenation for control messages. +// Actual message handling is implemented by sublcasses. +class ChannelMojo::ControlReader : public internal::MessagePipeReader { + public: + ControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner) + : internal::MessagePipeReader(pipe.Pass()), + owner_(owner) {} + + virtual bool Connect() { return true; } + virtual void OnPipeClosed() OVERRIDE; + virtual void OnPipeError(MojoResult error) OVERRIDE; + + protected: + ChannelMojo* owner_; +}; + +void ChannelMojo::ControlReader::OnPipeClosed() { + if (!owner_) + return; + owner_->OnPipeClosed(this); + owner_ = NULL; +} + +void ChannelMojo::ControlReader::OnPipeError(MojoResult error) { + if (!owner_) + return; + owner_->OnPipeError(this); +} + +//------------------------------------------------------------------------------ + +// ControlReader for server-side ChannelMojo. +class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader { + public: + ServerControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner) + : ControlReader(pipe.Pass(), owner) { } + + virtual bool Connect() OVERRIDE; + virtual void OnMessageReceived() OVERRIDE; + + private: + MojoResult SendHelloRequest(); + MojoResult RespondHelloResponse(); + + mojo::ScopedMessagePipeHandle message_pipe_; +}; + +bool ChannelMojo::ServerControlReader::Connect() { + MojoResult result = SendHelloRequest(); + if (result != MOJO_RESULT_OK) { + CloseWithError(result); + return false; + } + + return true; +} + +MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() { + DCHECK(IsValid()); + DCHECK(!message_pipe_.is_valid()); + + mojo::ScopedMessagePipeHandle self; + mojo::ScopedMessagePipeHandle peer; + MojoResult create_result = mojo::CreateMessagePipe( + NULL, &message_pipe_, &peer); + if (MOJO_RESULT_OK != create_result) { + DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result; + return create_result; + } + + MojoHandle peer_to_send = peer.get().value(); + Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID()); + MojoResult write_result = MojoWriteMessage( + handle(), + request.data(), static_cast<uint32>(request.size()), + &peer_to_send, 1, + MOJO_WRITE_MESSAGE_FLAG_NONE); + if (MOJO_RESULT_OK != write_result) { + DLOG(WARNING) << "Writing Hello request failed: " << create_result; + return write_result; + } + + // |peer| is sent and no longer owned by |this|. + (void)peer.release(); + return MOJO_RESULT_OK; +} + +MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() { + Pickle request(data_buffer().empty() ? "" : &data_buffer()[0], + static_cast<uint32>(data_buffer().size())); + + int32 read_pid = 0; + if (!HelloMessage::ReadResponse(request, &read_pid)) { + DLOG(ERROR) << "Failed to parse Hello response."; + return MOJO_RESULT_UNKNOWN; + } + + base::ProcessId pid = static_cast<base::ProcessId>(read_pid); + owner_->set_peer_pid(pid); + owner_->OnConnected(message_pipe_.Pass()); + return MOJO_RESULT_OK; +} + +void ChannelMojo::ServerControlReader::OnMessageReceived() { + MojoResult result = RespondHelloResponse(); + if (result != MOJO_RESULT_OK) + CloseWithError(result); +} + +//------------------------------------------------------------------------------ + +// ControlReader for client-side ChannelMojo. +class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader { + public: + ClientControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner) + : ControlReader(pipe.Pass(), owner) {} + + virtual void OnMessageReceived() OVERRIDE; + + private: + MojoResult RespondHelloRequest(MojoHandle message_channel); +}; + +MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest( + MojoHandle message_channel) { + DCHECK(IsValid()); + + mojo::ScopedMessagePipeHandle received_pipe( + (mojo::MessagePipeHandle(message_channel))); + + int32 read_request = 0; + Pickle request(data_buffer().empty() ? "" : &data_buffer()[0], + static_cast<uint32>(data_buffer().size())); + if (!HelloMessage::ReadRequest(request, &read_request)) { + DLOG(ERROR) << "Hello request has wrong magic."; + return MOJO_RESULT_UNKNOWN; + } + + base::ProcessId pid = read_request; + Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID()); + MojoResult write_result = MojoWriteMessage( + handle(), + response.data(), static_cast<uint32>(response.size()), + NULL, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE); + if (MOJO_RESULT_OK != write_result) { + DLOG(ERROR) << "Writing Hello response failed: " << write_result; + return write_result; + } + + owner_->set_peer_pid(pid); + owner_->OnConnected(received_pipe.Pass()); + return MOJO_RESULT_OK; +} + +void ChannelMojo::ClientControlReader::OnMessageReceived() { + std::vector<MojoHandle> handle_buffer; + TakeHandleBuffer(&handle_buffer); + if (handle_buffer.size() != 1) { + DLOG(ERROR) << "Hello request doesn't contains required handle: " + << handle_buffer.size(); + CloseWithError(MOJO_RESULT_UNKNOWN); + return; + } + + MojoResult result = RespondHelloRequest(handle_buffer[0]); + if (result != MOJO_RESULT_OK) { + DLOG(ERROR) << "Failed to respond Hello request. Closing: " + << result; + CloseWithError(result); + } +} + +//------------------------------------------------------------------------------ + +void ChannelMojo::ChannelInfoDeleter::operator()( + mojo::embedder::ChannelInfo* ptr) const { + mojo::embedder::DestroyChannelOnIOThread(ptr); +} + +//------------------------------------------------------------------------------ + +// static +scoped_ptr<ChannelMojo> ChannelMojo::Create( + scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner) { + return make_scoped_ptr(new ChannelMojo( + bootstrap.Pass(), mode, listener, io_thread_task_runner)); +} + +// static +scoped_ptr<ChannelMojo> ChannelMojo::Create( + const ChannelHandle &channel_handle, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner) { + return Create( + Channel::Create(channel_handle, mode, g_null_listener.Pointer()), + mode, listener, io_thread_task_runner); +} + +// static +scoped_ptr<ChannelFactory> ChannelMojo::CreateFactory( + const ChannelHandle &channel_handle, Mode mode, + scoped_refptr<base::TaskRunner> io_thread_task_runner) { + return make_scoped_ptr( + new MojoChannelFactory( + channel_handle, mode, + io_thread_task_runner)).PassAs<ChannelFactory>(); +} + +ChannelMojo::ChannelMojo( + scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner) + : weak_factory_(this), + bootstrap_(bootstrap.Pass()), + mode_(mode), listener_(listener), + peer_pid_(base::kNullProcessId) { + DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT); + mojo::ScopedMessagePipeHandle control_pipe + = mojo::embedder::CreateChannel( + mojo::embedder::ScopedPlatformHandle( + ToPlatformHandle(bootstrap_->TakePipeHandle())), + io_thread_task_runner, + base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)), + io_thread_task_runner); + + // MessagePipeReader, that is crated in InitOnIOThread(), should live only in + // IO thread, but IPC::Channel can be instantiated outside of it. + // So we move the creation to the appropriate thread. + if (base::MessageLoopProxy::current() == io_thread_task_runner) { + InitOnIOThread(control_pipe.Pass()); + } else { + io_thread_task_runner->PostTask( + FROM_HERE, + base::Bind(&ChannelMojo::InitOnIOThread, + weak_factory_.GetWeakPtr(), + base::Passed(control_pipe.Pass()))); + } +} + +ChannelMojo::~ChannelMojo() { + Close(); +} + +void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) { + control_reader_ = CreateControlReader(control_pipe.Pass()); +} + +scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader( + mojo::ScopedMessagePipeHandle pipe) { + if (MODE_SERVER == mode_) { + return make_scoped_ptr( + new ServerControlReader(pipe.Pass(), this)).PassAs<ControlReader>(); + } + + DCHECK(mode_ == MODE_CLIENT); + return make_scoped_ptr( + new ClientControlReader(pipe.Pass(), this)).PassAs<ControlReader>(); +} + +bool ChannelMojo::Connect() { + DCHECK(!message_reader_); + return control_reader_->Connect(); +} + +void ChannelMojo::Close() { + control_reader_.reset(); + message_reader_.reset(); + channel_info_.reset(); +} + +void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) { + message_reader_ = make_scoped_ptr(new MessageReader(pipe.Pass(), this)); + + for (size_t i = 0; i < pending_messages_.size(); ++i) { + message_reader_->Send(make_scoped_ptr(pending_messages_[i])); + pending_messages_[i] = NULL; + } + + pending_messages_.clear(); + + listener_->OnChannelConnected(GetPeerPID()); +} + +void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) { + Close(); +} + +void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { + listener_->OnChannelError(); +} + + +bool ChannelMojo::Send(Message* message) { + if (!message_reader_) { + pending_messages_.push_back(message); + return true; + } + + return message_reader_->Send(make_scoped_ptr(message)); +} + +base::ProcessId ChannelMojo::GetPeerPID() const { + return peer_pid_; +} + +base::ProcessId ChannelMojo::GetSelfPID() const { + return bootstrap_->GetSelfPID(); +} + +ChannelHandle ChannelMojo::TakePipeHandle() { + return bootstrap_->TakePipeHandle(); +} + +void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo* info) { + channel_info_.reset(info); +} + +void ChannelMojo::OnMessageReceived(Message& message) { + listener_->OnMessageReceived(message); + if (message.dispatch_error()) + listener_->OnBadMessageReceived(message); +} + +#if defined(OS_POSIX) && !defined(OS_NACL) +int ChannelMojo::GetClientFileDescriptor() const { + return bootstrap_->GetClientFileDescriptor(); +} + +int ChannelMojo::TakeClientFileDescriptor() { + return bootstrap_->TakeClientFileDescriptor(); +} +#endif // defined(OS_POSIX) && !defined(OS_NACL) + +} // namespace IPC diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h new file mode 100644 index 0000000..b00abc9 --- /dev/null +++ b/ipc/mojo/ipc_channel_mojo.h @@ -0,0 +1,131 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_CHANNEL_MOJO_H_ +#define IPC_IPC_CHANNEL_MOJO_H_ + +#include <vector> + +#include "base/memory/scoped_ptr.h" +#include "base/memory/scoped_vector.h" +#include "base/memory/weak_ptr.h" +#include "ipc/ipc_channel.h" +#include "ipc/ipc_channel_factory.h" +#include "ipc/ipc_export.h" +#include "ipc/mojo/ipc_message_pipe_reader.h" +#include "mojo/public/cpp/system/core.h" + +namespace mojo { +namespace embedder { +struct ChannelInfo; +} +} + +namespace IPC { + +// Mojo-based IPC::Channel implementation over a platform handle. +// +// ChannelMojo builds Mojo MessagePipe using underlying pipe given by +// "bootstrap" IPC::Channel which creates and owns platform pipe like +// named socket. The bootstrap Channel is used only for establishing +// the underlying connection. ChannelMojo takes its handle over once +// the it is made and puts MessagePipe on it. +// +// ChannelMojo has a couple of MessagePipes: +// +// * The first MessagePipe, which is built on top of bootstrap handle, +// is the "control" pipe. It is used to communicate out-of-band +// control messages that aren't visible from IPC::Listener. +// +// * The second MessagePipe, which is created by the server channel +// and sent to client Channel over the control pipe, is used +// to send IPC::Messages as an IPC::Sender. +// +// TODO(morrita): Extract handle creation part of IPC::Channel into +// separate class to clarify what ChannelMojo relies +// on. +// TODO(morrita): Add APIs to create extra MessagePipes to let +// Mojo-based objects talk over this Channel. +// +class IPC_MOJO_EXPORT ChannelMojo : public Channel { + public: + // Create ChannelMojo on top of given |bootstrap| channel. + static scoped_ptr<ChannelMojo> Create( + scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner); + + // Create ChannelMojo. A bootstrap channel is created as well. + static scoped_ptr<ChannelMojo> Create( + const ChannelHandle &channel_handle, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner); + + // Create a factory object for ChannelMojo. + // The factory is used to create Mojo-based ChannelProxy family. + static scoped_ptr<ChannelFactory> CreateFactory( + const ChannelHandle &channel_handle, Mode mode, + scoped_refptr<base::TaskRunner> io_thread_task_runner); + + virtual ~ChannelMojo(); + + // Channel implementation + virtual bool Connect() OVERRIDE; + virtual void Close() OVERRIDE; + virtual bool Send(Message* message) OVERRIDE; + virtual base::ProcessId GetPeerPID() const OVERRIDE; + virtual base::ProcessId GetSelfPID() const OVERRIDE; + virtual ChannelHandle TakePipeHandle() OVERRIDE; + +#if defined(OS_POSIX) && !defined(OS_NACL) + virtual int GetClientFileDescriptor() const OVERRIDE; + virtual int TakeClientFileDescriptor() OVERRIDE; +#endif // defined(OS_POSIX) && !defined(OS_NACL) + + // Called from MessagePipeReader implementations + void OnMessageReceived(Message& message); + void OnConnected(mojo::ScopedMessagePipeHandle pipe); + void OnPipeClosed(internal::MessagePipeReader* reader); + void OnPipeError(internal::MessagePipeReader* reader); + void set_peer_pid(base::ProcessId pid) { peer_pid_ = pid; } + + private: + struct ChannelInfoDeleter { + void operator()(mojo::embedder::ChannelInfo* ptr) const; + }; + + // ChannelMojo needs to kill its MessagePipeReader in delayed manner + // because the channel wants to kill these readers during the + // notifications invoked by them. + typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter; + + class ControlReader; + class ServerControlReader; + class ClientControlReader; + class MessageReader; + + ChannelMojo(scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener, + scoped_refptr<base::TaskRunner> io_thread_task_runner); + + void InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe); + scoped_ptr<ControlReader> CreateControlReader( + mojo::ScopedMessagePipeHandle pipe); + void DidCreateChannel(mojo::embedder::ChannelInfo*); + + base::WeakPtrFactory<ChannelMojo> weak_factory_; + scoped_ptr<Channel> bootstrap_; + Mode mode_; + Listener* listener_; + base::ProcessId peer_pid_; + scoped_ptr<mojo::embedder::ChannelInfo, + ChannelInfoDeleter> channel_info_; + + scoped_ptr<ControlReader, ReaderDeleter> control_reader_; + scoped_ptr<MessageReader, ReaderDeleter> message_reader_; + ScopedVector<Message> pending_messages_; + + DISALLOW_COPY_AND_ASSIGN(ChannelMojo); +}; + +} // namespace IPC + +#endif // IPC_IPC_CHANNEL_MOJO_H_ diff --git a/ipc/mojo/ipc_channel_mojo_unittest.cc b/ipc/mojo/ipc_channel_mojo_unittest.cc new file mode 100644 index 0000000..915029d --- /dev/null +++ b/ipc/mojo/ipc_channel_mojo_unittest.cc @@ -0,0 +1,260 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/mojo/ipc_channel_mojo.h" + +#include "base/base_paths.h" +#include "base/files/file.h" +#include "base/message_loop/message_loop.h" +#include "base/path_service.h" +#include "base/pickle.h" +#include "base/threading/thread.h" +#include "ipc/ipc_message.h" +#include "ipc/ipc_test_base.h" +#include "ipc/ipc_test_channel_listener.h" + +#if defined(OS_POSIX) +#include "base/file_descriptor_posix.h" +#endif + +namespace { + +class ListenerThatExpectsOK : public IPC::Listener { + public: + ListenerThatExpectsOK() {} + + virtual ~ListenerThatExpectsOK() {} + + virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE { + PickleIterator iter(message); + std::string should_be_ok; + EXPECT_TRUE(iter.ReadString(&should_be_ok)); + EXPECT_EQ(should_be_ok, "OK"); + base::MessageLoop::current()->Quit(); + return true; + } + + virtual void OnChannelError() OVERRIDE { + NOTREACHED(); + } + + static void SendOK(IPC::Sender* sender) { + IPC::Message* message = new IPC::Message( + 0, 2, IPC::Message::PRIORITY_NORMAL); + message->WriteString(std::string("OK")); + ASSERT_TRUE(sender->Send(message)); + } +}; + +class ListenerThatShouldBeNeverCalled : public IPC::Listener { + virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE { + NOTREACHED(); + return true; + } + + virtual void OnChannelError() OVERRIDE { + NOTREACHED(); + } + + virtual void OnChannelConnected(int32 peer_pid) OVERRIDE { + NOTREACHED(); + } + + virtual void OnBadMessageReceived(const IPC::Message& message) OVERRIDE { + NOTREACHED(); + } +}; + +class ChannelClient { + public: + explicit ChannelClient(IPC::Listener* listener, const char* name) { + scoped_ptr<IPC::Channel> bootstrap(IPC::Channel::CreateClient( + IPCTestBase::GetChannelName(name), + &never_called_)); + channel_ = IPC::ChannelMojo::Create( + bootstrap.Pass(), IPC::Channel::MODE_CLIENT, listener, + main_message_loop_.message_loop_proxy()); + } + + void Connect() { + CHECK(channel_->Connect()); + } + + IPC::ChannelMojo* channel() const { return channel_.get(); } + + private: + scoped_ptr<IPC::ChannelMojo> channel_; + ListenerThatShouldBeNeverCalled never_called_; + base::MessageLoopForIO main_message_loop_; +}; + +class IPCChannelMojoTest : public IPCTestBase { + public: + void CreateMojoChannel(IPC::Listener* listener); + + protected: + virtual void SetUp() OVERRIDE { + IPCTestBase::SetUp(); + } + + ListenerThatShouldBeNeverCalled never_called_; +}; + + +void IPCChannelMojoTest::CreateMojoChannel(IPC::Listener* listener) { + CreateChannel(&never_called_); + scoped_ptr<IPC::Channel> mojo_channel = IPC::ChannelMojo::Create( + ReleaseChannel(), IPC::Channel::MODE_SERVER, listener, + io_thread_task_runner()).PassAs<IPC::Channel>(); + SetChannel(mojo_channel.PassAs<IPC::Channel>()); +} + +class TestChannelListenerWithExtraExpectations + : public IPC::TestChannelListener { + public: + TestChannelListenerWithExtraExpectations() + : is_connected_called_(false) { + } + + virtual void OnChannelConnected(int32 peer_pid) OVERRIDE { + IPC::TestChannelListener::OnChannelConnected(peer_pid); + EXPECT_TRUE(base::kNullProcessId != peer_pid); + is_connected_called_ = true; + } + + bool is_connected_called() const { return is_connected_called_; } + + private: + bool is_connected_called_; +}; + +TEST_F(IPCChannelMojoTest, ConnectedFromClient) { + Init("IPCChannelMojoTestClient"); + + // Set up IPC channel and start client. + TestChannelListenerWithExtraExpectations listener; + CreateMojoChannel(&listener); + listener.Init(sender()); + ASSERT_TRUE(ConnectChannel()); + ASSERT_TRUE(StartClient()); + + IPC::TestChannelListener::SendOneMessage( + sender(), "hello from parent"); + + base::MessageLoop::current()->Run(); + EXPECT_TRUE(base::kNullProcessId != this->channel()->GetPeerPID()); + + this->channel()->Close(); + + EXPECT_TRUE(WaitForClientShutdown()); + EXPECT_TRUE(listener.is_connected_called()); + EXPECT_TRUE(listener.HasSentAll()); + + DestroyChannel(); +} + +// A long running process that connects to us +MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestClient) { + TestChannelListenerWithExtraExpectations listener; + ChannelClient client(&listener, "IPCChannelMojoTestClient"); + client.Connect(); + listener.Init(client.channel()); + + IPC::TestChannelListener::SendOneMessage( + client.channel(), "hello from child"); + base::MessageLoop::current()->Run(); + EXPECT_TRUE(listener.is_connected_called()); + EXPECT_TRUE(listener.HasSentAll()); + + return 0; +} + +#if defined(OS_POSIX) +class ListenerThatExpectsFile : public IPC::Listener { + public: + ListenerThatExpectsFile() + : sender_(NULL) {} + + virtual ~ListenerThatExpectsFile() {} + + virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE { + PickleIterator iter(message); + base::FileDescriptor desc; + EXPECT_TRUE(message.ReadFileDescriptor(&iter, &desc)); + std::string content(GetSendingFileContent().size(), ' '); + base::File file(desc.fd); + file.Read(0, &content[0], content.size()); + EXPECT_EQ(content, GetSendingFileContent()); + base::MessageLoop::current()->Quit(); + ListenerThatExpectsOK::SendOK(sender_); + return true; + } + + virtual void OnChannelError() OVERRIDE { + NOTREACHED(); + } + + static std::string GetSendingFileContent() { + return "Hello"; + } + + static base::FilePath GetSendingFilePath() { + base::FilePath path; + bool ok = PathService::Get(base::DIR_CACHE, &path); + EXPECT_TRUE(ok); + return path.Append("ListenerThatExpectsFile.txt"); + } + + static void WriteAndSendFile(IPC::Sender* sender, base::File& file) { + std::string content = GetSendingFileContent(); + file.WriteAtCurrentPos(content.data(), content.size()); + file.Flush(); + IPC::Message* message = new IPC::Message( + 0, 2, IPC::Message::PRIORITY_NORMAL); + message->WriteFileDescriptor( + base::FileDescriptor(file.TakePlatformFile(), false)); + ASSERT_TRUE(sender->Send(message)); + } + + void set_sender(IPC::Sender* sender) { sender_ = sender; } + + private: + IPC::Sender* sender_; +}; + + +TEST_F(IPCChannelMojoTest, SendPlatformHandle) { + Init("IPCChannelMojoTestSendPlatformHandleClient"); + + ListenerThatExpectsOK listener; + CreateMojoChannel(&listener); + ASSERT_TRUE(ConnectChannel()); + ASSERT_TRUE(StartClient()); + + base::File file(ListenerThatExpectsFile::GetSendingFilePath(), + base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE | + base::File::FLAG_READ); + ListenerThatExpectsFile::WriteAndSendFile(channel(), file); + base::MessageLoop::current()->Run(); + + this->channel()->Close(); + + EXPECT_TRUE(WaitForClientShutdown()); + DestroyChannel(); +} + +MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestSendPlatformHandleClient) { + ListenerThatExpectsFile listener; + ChannelClient client( + &listener, "IPCChannelMojoTestSendPlatformHandleClient"); + client.Connect(); + listener.set_sender(client.channel()); + + base::MessageLoop::current()->Run(); + + return 0; +} +#endif + +} // namespace diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc new file mode 100644 index 0000000..91022ac --- /dev/null +++ b/ipc/mojo/ipc_message_pipe_reader.cc @@ -0,0 +1,144 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/mojo/ipc_message_pipe_reader.h" + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/message_loop/message_loop_proxy.h" +#include "mojo/public/cpp/environment/environment.h" + +namespace IPC { +namespace internal { + +MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle) + : pipe_wait_id_(0), + pipe_(handle.Pass()) { + StartWaiting(); +} + +MessagePipeReader::~MessagePipeReader() { + CHECK(!IsValid()); +} + +void MessagePipeReader::Close() { + StopWaiting(); + pipe_.reset(); + OnPipeClosed(); +} + +void MessagePipeReader::CloseWithError(MojoResult error) { + OnPipeError(error); + Close(); +} + +// static +void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) { + reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result); +} + +void MessagePipeReader::StartWaiting() { + DCHECK(pipe_.is_valid()); + DCHECK(!pipe_wait_id_); + // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in + // MessagePipe. + // + // TODO(morrita): Should we re-set the signal when we get new + // message to send? + pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait( + pipe_.get().value(), + MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, + &InvokePipeIsReady, + this); +} + +void MessagePipeReader::StopWaiting() { + if (!pipe_wait_id_) + return; + mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_); + pipe_wait_id_ = 0; +} + +void MessagePipeReader::PipeIsReady(MojoResult wait_result) { + pipe_wait_id_ = 0; + + if (wait_result != MOJO_RESULT_OK) { + // FAILED_PRECONDITION happens when the pipe is + // closed before the waiter is scheduled in a backend thread. + if (wait_result != MOJO_RESULT_ABORTED && + wait_result != MOJO_RESULT_FAILED_PRECONDITION) { + DLOG(WARNING) << "Pipe got error from the waiter. Closing: " + << wait_result; + OnPipeError(wait_result); + } + + Close(); + return; + } + + while (pipe_.is_valid()) { + MojoResult read_result = ReadMessageBytes(); + if (read_result == MOJO_RESULT_SHOULD_WAIT) + break; + if (read_result != MOJO_RESULT_OK) { + // FAILED_PRECONDITION means that all the received messages + // got consumed and the peer is already closed. + if (read_result != MOJO_RESULT_FAILED_PRECONDITION) { + DLOG(WARNING) + << "Pipe got error from ReadMessage(). Closing: " << read_result; + OnPipeError(read_result); + } + + Close(); + break; + } + + OnMessageReceived(); + } + + if (pipe_.is_valid()) + StartWaiting(); +} + +MojoResult MessagePipeReader::ReadMessageBytes() { + DCHECK(handle_buffer_.empty()); + + uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); + uint32_t num_handles = 0; + MojoResult result = MojoReadMessage(pipe_.get().value(), + num_bytes ? &data_buffer_[0] : NULL, + &num_bytes, + NULL, + &num_handles, + MOJO_READ_MESSAGE_FLAG_NONE); + data_buffer_.resize(num_bytes); + handle_buffer_.resize(num_handles); + if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { + // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that + // it needs more bufer. So we re-read it with resized buffers. + result = MojoReadMessage(pipe_.get().value(), + num_bytes ? &data_buffer_[0] : NULL, + &num_bytes, + num_handles ? &handle_buffer_[0] : NULL, + &num_handles, + MOJO_READ_MESSAGE_FLAG_NONE); + } + + DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); + DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); + return result; +} + +void MessagePipeReader::DelayedDeleter::operator()( + MessagePipeReader* ptr) const { + ptr->Close(); + base::MessageLoopProxy::current()->PostTask( + FROM_HERE, base::Bind(&DeleteNow, ptr)); +} + +} // namespace internal +} // namespace IPC diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h new file mode 100644 index 0000000..ecfa018 --- /dev/null +++ b/ipc/mojo/ipc_message_pipe_reader.h @@ -0,0 +1,98 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_MESSAGE_PIPE_READER_H_ +#define IPC_IPC_MESSAGE_PIPE_READER_H_ + +#include <vector> + +#include "base/memory/scoped_ptr.h" +#include "mojo/public/c/environment/async_waiter.h" +#include "mojo/public/cpp/system/core.h" + +namespace IPC { +namespace internal { + +// A helper class to handle bytestream directly over mojo::MessagePipe +// in template-method pattern. MessagePipeReader manages the lifetime +// of given MessagePipe and participates the event loop, and +// read the stream and call the client when it is ready. +// +// Each client has to: +// +// * Provide a subclass implemenation of a specific use of a MessagePipe +// and implement callbacks. +// * Create the subclass instance with a MessagePipeHandle. +// The constructor automatically start listening on the pipe. +// +// MessageReader has to be used in IO thread. It isn't thread-safe. +// +class MessagePipeReader { + public: + // Delay the object deletion using the current message loop. + // This is intended to used by MessagePipeReader owners. + class DelayedDeleter { + public: + typedef base::DefaultDeleter<MessagePipeReader> DefaultType; + + static void DeleteNow(MessagePipeReader* ptr) { delete ptr; } + + DelayedDeleter() {} + DelayedDeleter(const DefaultType&) {} + DelayedDeleter& operator=(const DefaultType&) { return *this; } + + void operator()(MessagePipeReader* ptr) const; + }; + + explicit MessagePipeReader(mojo::ScopedMessagePipeHandle handle); + virtual ~MessagePipeReader(); + + MojoHandle handle() const { return pipe_.get().value(); } + + // Returns received bytes. + const std::vector<char>& data_buffer() const { + return data_buffer_; + } + + // Delegate received handles ownership. The subclass should take the + // ownership over in its OnMessageReceived(). They will leak otherwise. + void TakeHandleBuffer(std::vector<MojoHandle>* handle_buffer) { + handle_buffer_.swap(*handle_buffer); + } + + // Close and destroy the MessagePipe. + void Close(); + // Close the mesage pipe with notifying the client with the error. + void CloseWithError(MojoResult error); + // Return true if the MessagePipe is alive. + bool IsValid() { return pipe_.is_valid(); } + + // + // The client have to implment these callback to get the readiness + // event from the reader + // + virtual void OnMessageReceived() = 0; + virtual void OnPipeClosed() = 0; + virtual void OnPipeError(MojoResult error) = 0; + + private: + static void InvokePipeIsReady(void* closure, MojoResult result); + + MojoResult ReadMessageBytes(); + void PipeIsReady(MojoResult wait_result); + void StartWaiting(); + void StopWaiting(); + + std::vector<char> data_buffer_; + std::vector<MojoHandle> handle_buffer_; + MojoAsyncWaitID pipe_wait_id_; + mojo::ScopedMessagePipeHandle pipe_; + + DISALLOW_COPY_AND_ASSIGN(MessagePipeReader); +}; + +} // namespace internal +} // namespace IPC + +#endif // IPC_IPC_MESSAGE_PIPE_READER_H_ diff --git a/ipc/mojo/ipc_mojo.gyp b/ipc/mojo/ipc_mojo.gyp new file mode 100644 index 0000000..c408a17 --- /dev/null +++ b/ipc/mojo/ipc_mojo.gyp @@ -0,0 +1,68 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +{ + 'variables': { + 'chromium_code': 1, + }, + 'includes': [ + ], + 'targets': [ + { + 'target_name': 'ipc_mojo', + 'type': '<(component)', + 'variables': { + }, + 'defines': [ + 'IPC_MOJO_IMPLEMENTATION', + ], + 'dependencies': [ + '../ipc.gyp:ipc', + '../../base/base.gyp:base', + '../../base/third_party/dynamic_annotations/dynamic_annotations.gyp:dynamic_annotations', + '../../mojo/mojo_base.gyp:mojo_cpp_bindings', + '../../mojo/mojo_base.gyp:mojo_environment_chromium', + '../../mojo/mojo_base.gyp:mojo_system_impl', + ], + 'sources': [ + 'ipc_channel_mojo.cc', + 'ipc_channel_mojo.h', + 'ipc_message_pipe_reader.cc', + 'ipc_message_pipe_reader.h', + ], + # TODO(gregoryd): direct_dependent_settings should be shared with the + # 64-bit target, but it doesn't work due to a bug in gyp + 'direct_dependent_settings': { + 'include_dirs': [ + '..', + ], + }, + }, + { + 'target_name': 'ipc_mojo_unittests', + 'type': '<(gtest_target_type)', + 'dependencies': [ + '../ipc.gyp:ipc', + '../ipc.gyp:test_support_ipc', + '../../base/base.gyp:base', + '../../base/base.gyp:base_i18n', + '../../base/base.gyp:test_support_base', + '../../mojo/mojo_base.gyp:mojo_cpp_bindings', + '../../mojo/mojo_base.gyp:mojo_environment_chromium', + '../../mojo/mojo_base.gyp:mojo_system_impl', + '../../testing/gtest.gyp:gtest', + 'ipc_mojo', + ], + 'include_dirs': [ + '..' + ], + 'sources': [ + 'run_all_unittests.cc', + 'ipc_channel_mojo_unittest.cc', + ], + 'conditions': [ + ], + }, + ], +} diff --git a/ipc/mojo/run_all_unittests.cc b/ipc/mojo/run_all_unittests.cc new file mode 100644 index 0000000..1734e5e --- /dev/null +++ b/ipc/mojo/run_all_unittests.cc @@ -0,0 +1,42 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/at_exit.h" +#include "base/bind.h" +#include "base/test/launcher/unit_test_launcher.h" +#include "base/test/test_suite.h" +#include "mojo/embedder/embedder.h" + +#if defined(OS_ANDROID) +#include "base/android/jni_android.h" +#include "base/test/test_file_util.h" +#endif + +namespace { + +class NoAtExitBaseTestSuite : public base::TestSuite { + public: + NoAtExitBaseTestSuite(int argc, char** argv) + : base::TestSuite(argc, argv, false) { + } +}; + +int RunTestSuite(int argc, char** argv) { + return NoAtExitBaseTestSuite(argc, argv).Run(); +} + +} // namespace + +int main(int argc, char** argv) { + mojo::embedder::Init(); +#if defined(OS_ANDROID) + JNIEnv* env = base::android::AttachCurrentThread(); + file_util::RegisterContentUriTestUtils(env); +#else + base::AtExitManager at_exit; +#endif + return base::LaunchUnitTestsSerially(argc, + argv, + base::Bind(&RunTestSuite, argc, argv)); +} |