summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrockot <rockot@chromium.org>2015-05-11 15:53:22 -0700
committerCommit bot <commit-bot@chromium.org>2015-05-11 22:53:35 +0000
commitdbb3bb6bfa0b222fb6745608cd571bb0ec8bef39 (patch)
tree3e54e478182b353dac3e151dbf6ccdd2ede01f33
parentbbdef2a03b74719929213b97f032077374806eca (diff)
downloadchromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.zip
chromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.tar.gz
chromium_src-dbb3bb6bfa0b222fb6745608cd571bb0ec8bef39.tar.bz2
Mojo IPC threading fixes
Two independent but related fixes here: 1. IPCSupportInitializer could previously outlive its IO runner, resulting in Mojo not being shut down in time. The EDK posts tasks directly to a MessageLoopForIO pointer because it's assumed to remain valid until EDK shutdown. This CL limits IPCSupportInitializer's lifetime to that of the IO thread, regardless of ScopedIPCSupport lifetime. 2. In single-process mode, the child IO thread cannot be used to create the client ChannelMojo, because all channels in-process must be created on the EDK IO thread. This problem only surfaced in practice because #1 above changed some shutdown ordering and tickled a DCHECK in EDK shutdown. This CL has ChannelMojo hop to its IO runner (only if necessary) before creating underlying messaging pipes. BUG=None R=morrita@chromium.org Review URL: https://codereview.chromium.org/1130413002 Cr-Commit-Position: refs/heads/master@{#329262}
-rw-r--r--ipc/mojo/ipc_channel_mojo.cc96
-rw-r--r--ipc/mojo/ipc_channel_mojo.h23
-rw-r--r--ipc/mojo/scoped_ipc_support.cc183
3 files changed, 233 insertions, 69 deletions
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index cff2789..9badfdf 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -7,6 +7,7 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
+#include "base/thread_task_runner_handle.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_attachment_set.h"
@@ -74,7 +75,10 @@ class ClientChannelMojo : public ChannelMojo,
const mojo::Callback<void(int32_t)>& callback) override;
private:
+ void BindPipe(mojo::ScopedMessagePipeHandle handle);
+
mojo::Binding<ClientChannel> binding_;
+ base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
};
@@ -84,7 +88,8 @@ ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate,
const ChannelHandle& handle,
Listener* listener)
: ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener),
- binding_(this) {
+ binding_(this),
+ weak_factory_(this) {
}
ClientChannelMojo::~ClientChannelMojo() {
@@ -92,7 +97,8 @@ ClientChannelMojo::~ClientChannelMojo() {
void ClientChannelMojo::OnPipeAvailable(
mojo::embedder::ScopedPlatformHandle handle) {
- binding_.Bind(CreateMessagingPipe(handle.Pass()));
+ CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
+ weak_factory_.GetWeakPtr()));
}
void ClientChannelMojo::OnConnectionError() {
@@ -107,6 +113,10 @@ void ClientChannelMojo::Init(
callback.Run(GetSelfPID());
}
+void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
+ binding_.Bind(handle.Pass());
+}
+
//------------------------------------------------------------------------------
class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
@@ -125,11 +135,15 @@ class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
void Close() override;
private:
+ void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
+ mojo::ScopedMessagePipeHandle handle);
+
// ClientChannelClient implementation
void ClientChannelWasInitialized(int32_t peer_pid);
mojo::InterfacePtr<ClientChannel> client_channel_;
mojo::ScopedMessagePipeHandle message_pipe_;
+ base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
};
@@ -138,7 +152,8 @@ ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate,
scoped_refptr<base::TaskRunner> io_runner,
const ChannelHandle& handle,
Listener* listener)
- : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener) {
+ : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener),
+ weak_factory_(this) {
}
ServerChannelMojo::~ServerChannelMojo() {
@@ -155,14 +170,20 @@ void ServerChannelMojo::OnPipeAvailable(
listener()->OnChannelError();
return;
}
+ CreateMessagingPipe(
+ handle.Pass(),
+ base::Bind(&ServerChannelMojo::InitClientChannel,
+ weak_factory_.GetWeakPtr(), base::Passed(&peer)));
+}
+void ServerChannelMojo::InitClientChannel(
+ mojo::ScopedMessagePipeHandle peer_handle,
+ mojo::ScopedMessagePipeHandle handle) {
client_channel_.Bind(
- mojo::InterfacePtrInfo<ClientChannel>(
- CreateMessagingPipe(handle.Pass()), 0u));
+ mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
client_channel_.set_error_handler(this);
client_channel_->Init(
- peer.Pass(),
- static_cast<int32_t>(GetSelfPID()),
+ peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
base::Unretained(this)));
}
@@ -190,13 +211,26 @@ base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
#endif
-} // namespace
+} // namespace
//------------------------------------------------------------------------------
+ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
+ scoped_refptr<base::TaskRunner> io_runner)
+ : io_runner(io_runner) {
+}
+
+ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
+}
+
void ChannelMojo::ChannelInfoDeleter::operator()(
mojo::embedder::ChannelInfo* ptr) const {
- mojo::embedder::DestroyChannelOnIOThread(ptr);
+ if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
+ mojo::embedder::DestroyChannelOnIOThread(ptr);
+ } else {
+ io_runner->PostTask(
+ FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
+ }
}
//------------------------------------------------------------------------------
@@ -254,6 +288,7 @@ ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate,
listener_(listener),
peer_pid_(base::kNullProcessId),
io_runner_(io_runner),
+ channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
@@ -280,14 +315,47 @@ void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) {
delegate_->OnChannelCreated(weak_factory_.GetWeakPtr());
}
-mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe(
- mojo::embedder::ScopedPlatformHandle handle) {
- DCHECK(!channel_info_.get());
+void ChannelMojo::CreateMessagingPipe(
+ mojo::embedder::ScopedPlatformHandle handle,
+ const CreateMessagingPipeCallback& callback) {
+ auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
+ weak_factory_.GetWeakPtr(), callback);
+ if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
+ CreateMessagingPipeOnIOThread(
+ handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
+ } else {
+ io_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
+ base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
+ return_callback));
+ }
+}
+
+// static
+void ChannelMojo::CreateMessagingPipeOnIOThread(
+ mojo::embedder::ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> callback_runner,
+ const CreateMessagingPipeOnIOThreadCallback& callback) {
mojo::embedder::ChannelInfo* channel_info;
mojo::ScopedMessagePipeHandle pipe =
mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
- channel_info_.reset(channel_info);
- return pipe.Pass();
+ if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
+ callback.Run(pipe.Pass(), channel_info);
+ } else {
+ callback_runner->PostTask(
+ FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
+ }
+}
+
+void ChannelMojo::OnMessagingPipeCreated(
+ const CreateMessagingPipeCallback& callback,
+ mojo::ScopedMessagePipeHandle handle,
+ mojo::embedder::ChannelInfo* channel_info) {
+ DCHECK(!channel_info_.get());
+ channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
+ channel_info, ChannelInfoDeleter(io_runner_));
+ callback.Run(handle.Pass());
}
bool ChannelMojo::Connect() {
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
index 2f98b17..3a1d98a 100644
--- a/ipc/mojo/ipc_channel_mojo.h
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -51,6 +51,12 @@ class IPC_MOJO_EXPORT ChannelMojo
public MojoBootstrap::Delegate,
public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) {
public:
+ using CreateMessagingPipeCallback =
+ base::Callback<void(mojo::ScopedMessagePipeHandle)>;
+ using CreateMessagingPipeOnIOThreadCallback =
+ base::Callback<void(mojo::ScopedMessagePipeHandle,
+ mojo::embedder::ChannelInfo*)>;
+
class Delegate {
public:
virtual ~Delegate() {}
@@ -125,8 +131,8 @@ class IPC_MOJO_EXPORT ChannelMojo
Mode mode,
Listener* listener);
- mojo::ScopedMessagePipeHandle CreateMessagingPipe(
- mojo::embedder::ScopedPlatformHandle handle);
+ void CreateMessagingPipe(mojo::embedder::ScopedPlatformHandle handle,
+ const CreateMessagingPipeCallback& callback);
void InitMessageReader(mojo::ScopedMessagePipeHandle pipe, int32_t peer_pid);
Listener* listener() const { return listener_; }
@@ -134,7 +140,12 @@ class IPC_MOJO_EXPORT ChannelMojo
private:
struct ChannelInfoDeleter {
+ explicit ChannelInfoDeleter(scoped_refptr<base::TaskRunner> io_runner);
+ ~ChannelInfoDeleter();
+
void operator()(mojo::embedder::ChannelInfo* ptr) const;
+
+ scoped_refptr<base::TaskRunner> io_runner;
};
// ChannelMojo needs to kill its MessagePipeReader in delayed manner
@@ -144,6 +155,14 @@ class IPC_MOJO_EXPORT ChannelMojo
void InitOnIOThread(ChannelMojo::Delegate* delegate);
+ static void CreateMessagingPipeOnIOThread(
+ mojo::embedder::ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> callback_runner,
+ const CreateMessagingPipeOnIOThreadCallback& callback);
+ void OnMessagingPipeCreated(const CreateMessagingPipeCallback& callback,
+ mojo::ScopedMessagePipeHandle handle,
+ mojo::embedder::ChannelInfo* channel_info);
+
scoped_ptr<MojoBootstrap> bootstrap_;
base::WeakPtr<Delegate> delegate_;
Mode mode_;
diff --git a/ipc/mojo/scoped_ipc_support.cc b/ipc/mojo/scoped_ipc_support.cc
index fafc9c2..0da35e8 100644
--- a/ipc/mojo/scoped_ipc_support.cc
+++ b/ipc/mojo/scoped_ipc_support.cc
@@ -7,10 +7,12 @@
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
+#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
+#include "base/thread_task_runner_handle.h"
#include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
#include "third_party/mojo/src/mojo/edk/embedder/process_delegate.h"
@@ -22,80 +24,155 @@ class IPCSupportInitializer : public mojo::embedder::ProcessDelegate {
public:
IPCSupportInitializer()
: init_count_(0),
- shutting_down_(false) {
- }
-
- ~IPCSupportInitializer() override {}
+ shutting_down_(false),
+ was_shut_down_(false),
+ observer_(nullptr),
+ weak_factory_(this) {}
- void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner) {
- base::AutoLock locker(lock_);
- DCHECK((init_count_ == 0 && !io_thread_task_runner_) ||
- io_thread_task_runner_ == io_thread_task_runner);
-
- if (shutting_down_) {
- // If reinitialized before a pending shutdown task is executed, we
- // effectively cancel the shutdown task.
- DCHECK(init_count_ == 1);
- shutting_down_ = false;
- return;
- }
+ ~IPCSupportInitializer() override { DCHECK(!observer_); }
- init_count_++;
- if (init_count_ == 1) {
- io_thread_task_runner_ = io_thread_task_runner;
- mojo::embedder::InitIPCSupport(mojo::embedder::ProcessType::NONE,
- io_thread_task_runner_,
- this, io_thread_task_runner_,
- mojo::embedder::ScopedPlatformHandle());
- }
- }
+ void Init(scoped_refptr<base::TaskRunner> io_thread_task_runner);
+ void ShutDown();
- void ShutDown() {
- base::AutoLock locker(lock_);
- DCHECK(init_count_ > 0);
- DCHECK(!shutting_down_);
+ // Forces the initializer to shut down even if scopers are still holding it.
+ void ForceShutdown();
- if (init_count_ > 1) {
- init_count_--;
- return;
+ private:
+ // This watches for destruction of the MessageLoop that IPCSupportInitializer
+ // uses for IO, and guarantees that the initializer is shut down if it still
+ // exists when the loop is being destroyed.
+ class MessageLoopObserver : public base::MessageLoop::DestructionObserver {
+ public:
+ MessageLoopObserver(
+ scoped_refptr<base::TaskRunner> initializer_task_runner,
+ base::WeakPtr<IPCSupportInitializer> weak_initializer)
+ : initializer_task_runner_(initializer_task_runner),
+ weak_initializer_(weak_initializer) {}
+
+ ~MessageLoopObserver() override {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
}
- shutting_down_ = true;
- if (base::MessageLoop::current() &&
- base::MessageLoop::current()->task_runner() == io_thread_task_runner_) {
- base::AutoUnlock unlocker_(lock_);
- ShutDownOnIOThread();
- } else {
- io_thread_task_runner_->PostTask(
+ private:
+ // base::MessageLoop::DestructionObserver:
+ void WillDestroyCurrentMessageLoop() override {
+ initializer_task_runner_->PostTask(
FROM_HERE,
- base::Bind(&IPCSupportInitializer::ShutDownOnIOThread,
- base::Unretained(this)));
+ base::Bind(&IPCSupportInitializer::ForceShutdown, weak_initializer_));
}
- }
- private:
- void ShutDownOnIOThread() {
- base::AutoLock locker(lock_);
- if (shutting_down_) {
- DCHECK(init_count_ == 1);
- mojo::embedder::ShutdownIPCSupportOnIOThread();
- init_count_ = 0;
- shutting_down_ = false;
- io_thread_task_runner_ = nullptr;
- }
- }
+ scoped_refptr<base::TaskRunner> initializer_task_runner_;
+ base::WeakPtr<IPCSupportInitializer> weak_initializer_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver);
+ };
+ void ShutDownOnIOThread();
+
+ // mojo::embedder::ProcessDelegate:
void OnShutdownComplete() override {}
+ static void WatchMessageLoopOnIOThread(MessageLoopObserver* observer);
+
base::Lock lock_;
size_t init_count_;
bool shutting_down_;
+ // This is used to track whether shutdown has occurred yet, since we can be
+ // shut down by either the scoper or IO MessageLoop destruction.
+ bool was_shut_down_;
+
+ // The message loop destruction observer we have watching our IO loop. This
+ // is created on the initializer's own thread but is used and destroyed on the
+ // IO thread.
+ MessageLoopObserver* observer_;
+
scoped_refptr<base::TaskRunner> io_thread_task_runner_;
+ base::WeakPtrFactory<IPCSupportInitializer> weak_factory_;
+
DISALLOW_COPY_AND_ASSIGN(IPCSupportInitializer);
};
+void IPCSupportInitializer::Init(
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ base::AutoLock locker(lock_);
+ DCHECK((init_count_ == 0 && !io_thread_task_runner_) ||
+ io_thread_task_runner_ == io_thread_task_runner);
+
+ if (shutting_down_) {
+ // If reinitialized before a pending shutdown task is executed, we
+ // effectively cancel the shutdown task.
+ DCHECK(init_count_ == 1);
+ shutting_down_ = false;
+ return;
+ }
+
+ init_count_++;
+ if (init_count_ == 1) {
+ was_shut_down_ = false;
+ observer_ = new MessageLoopObserver(base::ThreadTaskRunnerHandle::Get(),
+ weak_factory_.GetWeakPtr());
+ io_thread_task_runner_ = io_thread_task_runner;
+ io_thread_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&WatchMessageLoopOnIOThread, observer_));
+ mojo::embedder::InitIPCSupport(
+ mojo::embedder::ProcessType::NONE, io_thread_task_runner_, this,
+ io_thread_task_runner_, mojo::embedder::ScopedPlatformHandle());
+ }
+}
+
+void IPCSupportInitializer::ShutDown() {
+ {
+ base::AutoLock locker(lock_);
+ if (shutting_down_ || was_shut_down_)
+ return;
+ DCHECK(init_count_ > 0);
+ if (init_count_ > 1) {
+ init_count_--;
+ return;
+ }
+ }
+ ForceShutdown();
+}
+
+void IPCSupportInitializer::ForceShutdown() {
+ base::AutoLock locker(lock_);
+ if (shutting_down_ || was_shut_down_)
+ return;
+ shutting_down_ = true;
+ if (base::MessageLoop::current() &&
+ base::MessageLoop::current()->task_runner() == io_thread_task_runner_) {
+ base::AutoUnlock unlocker_(lock_);
+ ShutDownOnIOThread();
+ } else {
+ io_thread_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&IPCSupportInitializer::ShutDownOnIOThread,
+ base::Unretained(this)));
+ }
+}
+
+void IPCSupportInitializer::ShutDownOnIOThread() {
+ base::AutoLock locker(lock_);
+ if (shutting_down_ && !was_shut_down_) {
+ mojo::embedder::ShutdownIPCSupportOnIOThread();
+ init_count_ = 0;
+ shutting_down_ = false;
+ io_thread_task_runner_ = nullptr;
+ was_shut_down_ = true;
+ if (observer_) {
+ delete observer_;
+ observer_ = nullptr;
+ }
+ }
+}
+
+// static
+void IPCSupportInitializer::WatchMessageLoopOnIOThread(
+ MessageLoopObserver* observer) {
+ base::MessageLoop::current()->AddDestructionObserver(observer);
+}
+
base::LazyInstance<IPCSupportInitializer>::Leaky ipc_support_initializer;
} // namespace