diff options
author | rockot <rockot@chromium.org> | 2015-05-11 15:53:22 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-05-11 22:53:35 +0000 |
commit | dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39 (patch) | |
tree | 3e54e478182b353dac3e151dbf6ccdd2ede01f33 /ipc/mojo | |
parent | bbdef2a03b74719929213b97f032077374806eca (diff) | |
download | chromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.zip chromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.tar.gz chromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.tar.bz2 |
Mojo IPC threading fixes
Two independent but related fixes here:
1. IPCSupportInitializer could previously outlive its IO
runner, resulting in Mojo not being shut down in time.
The EDK posts tasks directly to a MessageLoopForIO
pointer because it's assumed to remain valid until
EDK shutdown. This CL limits IPCSupportInitializer's
lifetime to that of the IO thread, regardless of
ScopedIPCSupport lifetime.
2. In single-process mode, the child IO thread cannot be
used to create the client ChannelMojo, because all channels
in-process must be created on the EDK IO thread. This problem only
surfaced in practice because #1 above changed some shutdown
ordering and tickled a DCHECK in EDK shutdown.
This CL has ChannelMojo hop to its IO runner (only if
necessary) before creating underlying messaging pipes.
BUG=None
R=morrita@chromium.org
Review URL: https://codereview.chromium.org/1130413002
Cr-Commit-Position: refs/heads/master@{#329262}
Diffstat (limited to 'ipc/mojo')
-rw-r--r-- | ipc/mojo/ipc_channel_mojo.cc | 96 | ||||
-rw-r--r-- | ipc/mojo/ipc_channel_mojo.h | 23 | ||||
-rw-r--r-- | ipc/mojo/scoped_ipc_support.cc | 183 |
3 files changed, 233 insertions, 69 deletions
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc index cff2789..9badfdf 100644 --- a/ipc/mojo/ipc_channel_mojo.cc +++ b/ipc/mojo/ipc_channel_mojo.cc @@ -7,6 +7,7 @@ #include "base/bind.h" #include "base/bind_helpers.h" #include "base/lazy_instance.h" +#include "base/thread_task_runner_handle.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_message_attachment_set.h" @@ -74,7 +75,10 @@ class ClientChannelMojo : public ChannelMojo, const mojo::Callback<void(int32_t)>& callback) override; private: + void BindPipe(mojo::ScopedMessagePipeHandle handle); + mojo::Binding<ClientChannel> binding_; + base::WeakPtrFactory<ClientChannelMojo> weak_factory_; DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo); }; @@ -84,7 +88,8 @@ ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate, const ChannelHandle& handle, Listener* listener) : ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener), - binding_(this) { + binding_(this), + weak_factory_(this) { } ClientChannelMojo::~ClientChannelMojo() { @@ -92,7 +97,8 @@ ClientChannelMojo::~ClientChannelMojo() { void ClientChannelMojo::OnPipeAvailable( mojo::embedder::ScopedPlatformHandle handle) { - binding_.Bind(CreateMessagingPipe(handle.Pass())); + CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe, + weak_factory_.GetWeakPtr())); } void ClientChannelMojo::OnConnectionError() { @@ -107,6 +113,10 @@ void ClientChannelMojo::Init( callback.Run(GetSelfPID()); } +void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) { + binding_.Bind(handle.Pass()); +} + //------------------------------------------------------------------------------ class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler { @@ -125,11 +135,15 @@ class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler { void Close() override; private: + void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle, + mojo::ScopedMessagePipeHandle handle); + // ClientChannelClient implementation void ClientChannelWasInitialized(int32_t peer_pid); mojo::InterfacePtr<ClientChannel> client_channel_; mojo::ScopedMessagePipeHandle message_pipe_; + base::WeakPtrFactory<ServerChannelMojo> weak_factory_; DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo); }; @@ -138,7 +152,8 @@ ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate, scoped_refptr<base::TaskRunner> io_runner, const ChannelHandle& handle, Listener* listener) - : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener) { + : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener), + weak_factory_(this) { } ServerChannelMojo::~ServerChannelMojo() { @@ -155,14 +170,20 @@ void ServerChannelMojo::OnPipeAvailable( listener()->OnChannelError(); return; } + CreateMessagingPipe( + handle.Pass(), + base::Bind(&ServerChannelMojo::InitClientChannel, + weak_factory_.GetWeakPtr(), base::Passed(&peer))); +} +void ServerChannelMojo::InitClientChannel( + mojo::ScopedMessagePipeHandle peer_handle, + mojo::ScopedMessagePipeHandle handle) { client_channel_.Bind( - mojo::InterfacePtrInfo<ClientChannel>( - CreateMessagingPipe(handle.Pass()), 0u)); + mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u)); client_channel_.set_error_handler(this); client_channel_->Init( - peer.Pass(), - static_cast<int32_t>(GetSelfPID()), + peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()), base::Bind(&ServerChannelMojo::ClientChannelWasInitialized, base::Unretained(this))); } @@ -190,13 +211,26 @@ base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) { #endif -} // namespace +} // namespace //------------------------------------------------------------------------------ +ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter( + scoped_refptr<base::TaskRunner> io_runner) + : io_runner(io_runner) { +} + +ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() { +} + void ChannelMojo::ChannelInfoDeleter::operator()( mojo::embedder::ChannelInfo* ptr) const { - mojo::embedder::DestroyChannelOnIOThread(ptr); + if (base::ThreadTaskRunnerHandle::Get() == io_runner) { + mojo::embedder::DestroyChannelOnIOThread(ptr); + } else { + io_runner->PostTask( + FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr)); + } } //------------------------------------------------------------------------------ @@ -254,6 +288,7 @@ ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate, listener_(listener), peer_pid_(base::kNullProcessId), io_runner_(io_runner), + channel_info_(nullptr, ChannelInfoDeleter(nullptr)), weak_factory_(this) { // Create MojoBootstrap after all members are set as it touches // ChannelMojo from a different thread. @@ -280,14 +315,47 @@ void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) { delegate_->OnChannelCreated(weak_factory_.GetWeakPtr()); } -mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe( - mojo::embedder::ScopedPlatformHandle handle) { - DCHECK(!channel_info_.get()); +void ChannelMojo::CreateMessagingPipe( + mojo::embedder::ScopedPlatformHandle handle, + const CreateMessagingPipeCallback& callback) { + auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated, + weak_factory_.GetWeakPtr(), callback); + if (base::ThreadTaskRunnerHandle::Get() == io_runner_) { + CreateMessagingPipeOnIOThread( + handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback); + } else { + io_runner_->PostTask( + FROM_HERE, + base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread, + base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(), + return_callback)); + } +} + +// static +void ChannelMojo::CreateMessagingPipeOnIOThread( + mojo::embedder::ScopedPlatformHandle handle, + scoped_refptr<base::TaskRunner> callback_runner, + const CreateMessagingPipeOnIOThreadCallback& callback) { mojo::embedder::ChannelInfo* channel_info; mojo::ScopedMessagePipeHandle pipe = mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info); - channel_info_.reset(channel_info); - return pipe.Pass(); + if (base::ThreadTaskRunnerHandle::Get() == callback_runner) { + callback.Run(pipe.Pass(), channel_info); + } else { + callback_runner->PostTask( + FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info)); + } +} + +void ChannelMojo::OnMessagingPipeCreated( + const CreateMessagingPipeCallback& callback, + mojo::ScopedMessagePipeHandle handle, + mojo::embedder::ChannelInfo* channel_info) { + DCHECK(!channel_info_.get()); + channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>( + channel_info, ChannelInfoDeleter(io_runner_)); + callback.Run(handle.Pass()); } bool ChannelMojo::Connect() { diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h index 2f98b17..3a1d98a 100644 --- a/ipc/mojo/ipc_channel_mojo.h +++ b/ipc/mojo/ipc_channel_mojo.h @@ -51,6 +51,12 @@ class IPC_MOJO_EXPORT ChannelMojo public MojoBootstrap::Delegate, public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) { public: + using CreateMessagingPipeCallback = + base::Callback<void(mojo::ScopedMessagePipeHandle)>; + using CreateMessagingPipeOnIOThreadCallback = + base::Callback<void(mojo::ScopedMessagePipeHandle, + mojo::embedder::ChannelInfo*)>; + class Delegate { public: virtual ~Delegate() {} @@ -125,8 +131,8 @@ class IPC_MOJO_EXPORT ChannelMojo Mode mode, Listener* listener); - mojo::ScopedMessagePipeHandle CreateMessagingPipe( - mojo::embedder::ScopedPlatformHandle handle); + void CreateMessagingPipe(mojo::embedder::ScopedPlatformHandle handle, + const CreateMessagingPipeCallback& callback); void InitMessageReader(mojo::ScopedMessagePipeHandle pipe, int32_t peer_pid); Listener* listener() const { return listener_; } @@ -134,7 +140,12 @@ class IPC_MOJO_EXPORT ChannelMojo private: struct ChannelInfoDeleter { + explicit ChannelInfoDeleter(scoped_refptr<base::TaskRunner> io_runner); + ~ChannelInfoDeleter(); + void operator()(mojo::embedder::ChannelInfo* ptr) const; + + scoped_refptr<base::TaskRunner> io_runner; }; // ChannelMojo needs to kill its MessagePipeReader in delayed manner @@ -144,6 +155,14 @@ class IPC_MOJO_EXPORT ChannelMojo void InitOnIOThread(ChannelMojo::Delegate* delegate); + static void CreateMessagingPipeOnIOThread( + mojo::embedder::ScopedPlatformHandle handle, + scoped_refptr<base::TaskRunner> callback_runner, + const CreateMessagingPipeOnIOThreadCallback& callback); + void OnMessagingPipeCreated(const CreateMessagingPipeCallback& callback, + mojo::ScopedMessagePipeHandle handle, + mojo::embedder::ChannelInfo* channel_info); + scoped_ptr<MojoBootstrap> bootstrap_; base::WeakPtr<Delegate> delegate_; Mode mode_; diff --git a/ipc/mojo/scoped_ipc_support.cc b/ipc/mojo/scoped_ipc_support.cc index fafc9c2..0da35e8 100644 --- a/ipc/mojo/scoped_ipc_support.cc +++ b/ipc/mojo/scoped_ipc_support.cc @@ -7,10 +7,12 @@ #include "base/bind.h" #include "base/lazy_instance.h" #include "base/logging.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" +#include "base/thread_task_runner_handle.h" #include "third_party/mojo/src/mojo/edk/embedder/embedder.h" #include "third_party/mojo/src/mojo/edk/embedder/process_delegate.h" @@ -22,80 +24,155 @@ class IPCSupportInitializer : public mojo::embedder::ProcessDelegate { public: IPCSupportInitializer() : init_count_(0), - shutting_down_(false) { - } - - ~IPCSupportInitializer() override {} + shutting_down_(false), + was_shut_down_(false), + observer_(nullptr), + weak_factory_(this) {} - void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner) { - base::AutoLock locker(lock_); - DCHECK((init_count_ == 0 && !io_thread_task_runner_) || - io_thread_task_runner_ == io_thread_task_runner); - - if (shutting_down_) { - // If reinitialized before a pending shutdown task is executed, we - // effectively cancel the shutdown task. - DCHECK(init_count_ == 1); - shutting_down_ = false; - return; - } + ~IPCSupportInitializer() override { DCHECK(!observer_); } - init_count_++; - if (init_count_ == 1) { - io_thread_task_runner_ = io_thread_task_runner; - mojo::embedder::InitIPCSupport(mojo::embedder::ProcessType::NONE, - io_thread_task_runner_, - this, io_thread_task_runner_, - mojo::embedder::ScopedPlatformHandle()); - } - } + void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner); + void ShutDown(); - void ShutDown() { - base::AutoLock locker(lock_); - DCHECK(init_count_ > 0); - DCHECK(!shutting_down_); + // Forces the initializer to shut down even if scopers are still holding it. + void ForceShutdown(); - if (init_count_ > 1) { - init_count_--; - return; + private: + // This watches for destruction of the MessageLoop that IPCSupportInitializer + // uses for IO, and guarantees that the initializer is shut down if it still + // exists when the loop is being destroyed. + class MessageLoopObserver : public base::MessageLoop::DestructionObserver { + public: + MessageLoopObserver( + scoped_refptr<base::TaskRunner> initializer_task_runner, + base::WeakPtr<IPCSupportInitializer> weak_initializer) + : initializer_task_runner_(initializer_task_runner), + weak_initializer_(weak_initializer) {} + + ~MessageLoopObserver() override { + base::MessageLoop::current()->RemoveDestructionObserver(this); } - shutting_down_ = true; - if (base::MessageLoop::current() && - base::MessageLoop::current()->task_runner() == io_thread_task_runner_) { - base::AutoUnlock unlocker_(lock_); - ShutDownOnIOThread(); - } else { - io_thread_task_runner_->PostTask( + private: + // base::MessageLoop::DestructionObserver: + void WillDestroyCurrentMessageLoop() override { + initializer_task_runner_->PostTask( FROM_HERE, - base::Bind(&IPCSupportInitializer::ShutDownOnIOThread, - base::Unretained(this))); + base::Bind(&IPCSupportInitializer::ForceShutdown, weak_initializer_)); } - } - private: - void ShutDownOnIOThread() { - base::AutoLock locker(lock_); - if (shutting_down_) { - DCHECK(init_count_ == 1); - mojo::embedder::ShutdownIPCSupportOnIOThread(); - init_count_ = 0; - shutting_down_ = false; - io_thread_task_runner_ = nullptr; - } - } + scoped_refptr<base::TaskRunner> initializer_task_runner_; + base::WeakPtr<IPCSupportInitializer> weak_initializer_; + + DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver); + }; + void ShutDownOnIOThread(); + + // mojo::embedder::ProcessDelegate: void OnShutdownComplete() override {} + static void WatchMessageLoopOnIOThread(MessageLoopObserver* observer); + base::Lock lock_; size_t init_count_; bool shutting_down_; + // This is used to track whether shutdown has occurred yet, since we can be + // shut down by either the scoper or IO MessageLoop destruction. + bool was_shut_down_; + + // The message loop destruction observer we have watching our IO loop. This + // is created on the initializer's own thread but is used and destroyed on the + // IO thread. + MessageLoopObserver* observer_; + scoped_refptr<base::TaskRunner> io_thread_task_runner_; + base::WeakPtrFactory<IPCSupportInitializer> weak_factory_; + DISALLOW_COPY_AND_ASSIGN(IPCSupportInitializer); }; +void IPCSupportInitializer::Init( + scoped_refptr<base::TaskRunner> io_thread_task_runner) { + base::AutoLock locker(lock_); + DCHECK((init_count_ == 0 && !io_thread_task_runner_) || + io_thread_task_runner_ == io_thread_task_runner); + + if (shutting_down_) { + // If reinitialized before a pending shutdown task is executed, we + // effectively cancel the shutdown task. + DCHECK(init_count_ == 1); + shutting_down_ = false; + return; + } + + init_count_++; + if (init_count_ == 1) { + was_shut_down_ = false; + observer_ = new MessageLoopObserver(base::ThreadTaskRunnerHandle::Get(), + weak_factory_.GetWeakPtr()); + io_thread_task_runner_ = io_thread_task_runner; + io_thread_task_runner_->PostTask( + FROM_HERE, base::Bind(&WatchMessageLoopOnIOThread, observer_)); + mojo::embedder::InitIPCSupport( + mojo::embedder::ProcessType::NONE, io_thread_task_runner_, this, + io_thread_task_runner_, mojo::embedder::ScopedPlatformHandle()); + } +} + +void IPCSupportInitializer::ShutDown() { + { + base::AutoLock locker(lock_); + if (shutting_down_ || was_shut_down_) + return; + DCHECK(init_count_ > 0); + if (init_count_ > 1) { + init_count_--; + return; + } + } + ForceShutdown(); +} + +void IPCSupportInitializer::ForceShutdown() { + base::AutoLock locker(lock_); + if (shutting_down_ || was_shut_down_) + return; + shutting_down_ = true; + if (base::MessageLoop::current() && + base::MessageLoop::current()->task_runner() == io_thread_task_runner_) { + base::AutoUnlock unlocker_(lock_); + ShutDownOnIOThread(); + } else { + io_thread_task_runner_->PostTask( + FROM_HERE, base::Bind(&IPCSupportInitializer::ShutDownOnIOThread, + base::Unretained(this))); + } +} + +void IPCSupportInitializer::ShutDownOnIOThread() { + base::AutoLock locker(lock_); + if (shutting_down_ && !was_shut_down_) { + mojo::embedder::ShutdownIPCSupportOnIOThread(); + init_count_ = 0; + shutting_down_ = false; + io_thread_task_runner_ = nullptr; + was_shut_down_ = true; + if (observer_) { + delete observer_; + observer_ = nullptr; + } + } +} + +// static +void IPCSupportInitializer::WatchMessageLoopOnIOThread( + MessageLoopObserver* observer) { + base::MessageLoop::current()->AddDestructionObserver(observer); +} + base::LazyInstance<IPCSupportInitializer>::Leaky ipc_support_initializer; } // namespace |