summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorjam <jam@chromium.org>2015-10-16 15:44:02 -0700
committerCommit bot <commit-bot@chromium.org>2015-10-16 22:44:43 +0000
commite1336e604d3b3ac1932ee0a4f4b8a47d99864f5c (patch)
tree10f7c01a14bb84a90c663c34a3e1c0a2731fcbcf /mojo
parent1fb8da7702f22b9181f7d702a3a8d27b56a9dbf0 (diff)
downloadchromium_src-e1336e604d3b3ac1932ee0a4f4b8a47d99864f5c.zip
chromium_src-e1336e604d3b3ac1932ee0a4f4b8a47d99864f5c.tar.gz
chromium_src-e1336e604d3b3ac1932ee0a4f4b8a47d99864f5c.tar.bz2
Only connect to the named pipe when the delegate writes or when it starts watching the status of the pipe.
This avoids the case where an endpoint of a data or message pipe is created just to be sent to another process, but it starts reading immediately and then it has to serialize the data again. This speeds up the page cycler by about 10%. Also remove the IO thread caching in RawChannel as otherwise we'd also need another one in RawChannelWin::RawChannelIOHandler. There's no point in doing this caching since we only have one IO thread. If we have more in the future, then we can plumb it through. BUG=478251 Review URL: https://codereview.chromium.org/1411543002 Cr-Commit-Position: refs/heads/master@{#354628}
Diffstat (limited to 'mojo')
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.cc22
-rw-r--r--mojo/edk/system/data_pipe_producer_dispatcher.cc18
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.cc20
-rw-r--r--mojo/edk/system/raw_channel.cc76
-rw-r--r--mojo/edk/system/raw_channel.h26
-rw-r--r--mojo/edk/system/raw_channel_posix.cc42
-rw-r--r--mojo/edk/system/raw_channel_unittest.cc3
-rw-r--r--mojo/edk/system/raw_channel_win.cc64
8 files changed, 157 insertions, 114 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index 6858c96..fcd98aa 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -150,6 +150,8 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
uint32_t* num_bytes,
MojoReadDataFlags flags) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
if (in_two_phase_read_)
return MOJO_RESULT_BUSY;
@@ -206,6 +208,8 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
uint32_t* buffer_num_bytes,
MojoReadDataFlags flags) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
if (in_two_phase_read_)
return MOJO_RESULT_BUSY;
@@ -287,6 +291,8 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
uint32_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -455,16 +461,14 @@ void DataPipeConsumerDispatcher::OnError(Error error) {
// Although RawChannel still has a pointer to this object until Shutdown is
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
- if (!channel_)
- return;
-
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ if (channel_) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
+ channel_ = nullptr;
+ }
started_transport_.Release();
-
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
- channel_ = nullptr;
} else {
// We must be waiting to call ReleaseHandle. It will call Shutdown.
}
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index d1ad3fc..0d56f32 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -229,6 +229,8 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
uint32_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -335,15 +337,13 @@ void DataPipeProducerDispatcher::OnError(Error error) {
// Although RawChannel still has a pointer to this object until Shutdown is
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
- if (!channel_)
- return;
-
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
-
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
- channel_ = nullptr;
+ if (channel_) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
+ channel_ = nullptr;
+ }
started_transport_.Release();
} else {
// We must be waiting to call ReleaseHandle. It will call Shutdown.
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index d906aac..8dc2891 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -476,6 +476,8 @@ MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
uint32_t* num_dispatchers,
MojoReadMessageFlags flags) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
DCHECK(!dispatchers || dispatchers->empty());
const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes;
@@ -560,6 +562,8 @@ MojoResult MessagePipeDispatcher::AddAwakableImplNoLock(
uint32_t context,
HandleSignalsState* signals_state) {
lock().AssertAcquired();
+ if (channel_)
+ channel_->EnsureLazyInitialized();
HandleSignalsState state = GetHandleSignalsStateImplNoLock();
if (state.satisfies(signals)) {
if (signals_state)
@@ -729,15 +733,13 @@ void MessagePipeDispatcher::OnError(Error error) {
// Although RawChannel still has a pointer to this object until Shutdown is
// called, that is safe since this class always does a PostTask to the IO
// thread to self destruct.
- if (!channel_)
- return;
-
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
-
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
- channel_ = nullptr;
+ if (channel_) {
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
+ channel_ = nullptr;
+ }
started_transport_.Release();
} else {
// We must be waiting to call ReleaseHandle. It will call Shutdown.
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
index 69a694b..5da441a 100644
--- a/mojo/edk/system/raw_channel.cc
+++ b/mojo/edk/system/raw_channel.cc
@@ -165,12 +165,12 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) {
// RawChannel ------------------------------------------------------------------
RawChannel::RawChannel()
- : message_loop_for_io_(nullptr),
- delegate_(nullptr),
+ : delegate_(nullptr),
write_ready_(false),
write_stopped_(false),
error_occurred_(false),
pending_error_(false),
+ initialized_(false),
weak_ptr_factory_(this) {
read_buffer_.reset(new ReadBuffer);
write_buffer_.reset(new WriteBuffer());
@@ -179,15 +179,6 @@ RawChannel::RawChannel()
RawChannel::~RawChannel() {
DCHECK(!read_buffer_);
DCHECK(!write_buffer_);
-
- // Only want to decrement counter if Init was called.
- if (message_loop_for_io_) {
- // No need to take the |write_lock_| here -- if there are still weak
- // pointers outstanding, then we're hosed anyway (since we wouldn't be able
- // to invalidate them cleanly, since we might not be on the I/O thread).
- // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
- internal::ChannelShutdown();
- }
}
void RawChannel::Init(Delegate* delegate) {
@@ -202,9 +193,34 @@ void RawChannel::Init(Delegate* delegate) {
DCHECK(!delegate_);
delegate_ = delegate;
- DCHECK(!message_loop_for_io_);
- message_loop_for_io_ =
- static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
+ if (read_buffer_->num_valid_bytes_ ||
+ !write_buffer_->message_queue_.IsEmpty()) {
+ LazyInitialize();
+ }
+}
+
+void RawChannel::EnsureLazyInitialized() {
+ if (!initialized_) {
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannel::LockAndCallLazyInitialize,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+}
+
+void RawChannel::LockAndCallLazyInitialize() {
+ base::AutoLock read_locker(read_lock_);
+ base::AutoLock locker(write_lock_);
+ LazyInitialize();
+}
+
+void RawChannel::LazyInitialize() {
+ read_lock_.AssertAcquired();
+ write_lock_.AssertAcquired();
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ if (initialized_)
+ return;
+ initialized_ = true;
OnInit();
@@ -225,7 +241,7 @@ void RawChannel::Init(Delegate* delegate) {
if (io_result != IO_PENDING) {
// This will notify the delegate about the read failure. Although we're on
// the I/O thread, don't call it in the nested context.
- message_loop_for_io_->PostTask(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted,
weak_ptr_factory_.GetWeakPtr(), io_result, 0));
}
@@ -246,12 +262,14 @@ void RawChannel::Shutdown() {
// doesn't apply when 1) we don't have a handle (for obvious reasons) or
// 2) when the other side already quit and asked us to close the handle to
// ensure that we read everything out of the pipe first.
- if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
+ if (!IsHandleValid() || error_occurred_) {
{
base::AutoLock read_locker(read_lock_);
base::AutoLock locker(write_lock_);
OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}
+
+ internal::ChannelShutdown();
delete this;
return;
}
@@ -262,9 +280,7 @@ void RawChannel::Shutdown() {
"RawChannel::Shutdown called but there is pending data to be read";
// happens on shutdown if didn't call init when doing createduplicate
- if (message_loop_for_io()) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- }
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
// Reset the delegate so that it won't receive further calls.
delegate_ = nullptr;
@@ -281,8 +297,15 @@ void RawChannel::Shutdown() {
EnqueueMessageNoLock(quit_message.Pass());
write_stopped_ = true;
- if (empty)
+ if (!write_ready_) {
+ // Haven't lazy initialized yet. We need to initialize so that we can send
+ // the quit message. Otherwise there could be pending data in the pipe and
+ // we don't want the other side to get a write error until they get finish
+ // reading all the data.
+ LazyInitialize();
+ } else if (empty) {
SendQueuedMessagesNoLock();
+ }
}
ScopedPlatformHandle RawChannel::ReleaseHandle(
@@ -298,7 +321,6 @@ ScopedPlatformHandle RawChannel::ReleaseHandle(
// The Unretained is safe because above cancelled IO so we shouldn't get any
// channel errors.
- // |message_loop_for_io_| might not be set yet
internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
@@ -310,6 +332,7 @@ ScopedPlatformHandle RawChannel::ReleaseHandle(
// Reminder: This must be thread-safe.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
+ EnsureLazyInitialized();
base::AutoLock locker(write_lock_);
if (write_stopped_)
return false;
@@ -337,7 +360,7 @@ bool RawChannel::SendQueuedMessagesNoLock() {
// Even if we're on the I/O thread, don't call |OnError()| in the nested
// context.
pending_error_ = true;
- message_loop_for_io_->PostTask(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannel::LockAndCallOnError,
weak_ptr_factory_.GetWeakPtr(),
@@ -379,7 +402,7 @@ void RawChannel::SetSerializedData(
}
void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
read_lock_.AssertAcquired();
// Keep reading data in a loop, and dispatch messages if enough data is
// received. Exit the loop if any of the following happens:
@@ -440,7 +463,7 @@ void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
size_t platform_handles_written,
size_t bytes_written) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
write_lock_.AssertAcquired();
DCHECK_NE(io_result, IO_PENDING);
@@ -484,7 +507,6 @@ void RawChannel::SerializeWriteBuffer(
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
write_lock_.AssertAcquired();
- DCHECK(HandleForDebuggingNoLock().is_valid());
write_buffer_->message_queue_.AddMessage(message.Pass());
}
@@ -492,7 +514,7 @@ bool RawChannel::OnReadMessageForRawChannel(
const MessageInTransit::View& message_view) {
if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
pending_error_ = true;
- message_loop_for_io_->PostTask(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
weak_ptr_factory_.GetWeakPtr(),
Delegate::ERROR_READ_SHUTDOWN));
@@ -523,7 +545,7 @@ RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
}
void RawChannel::CallOnError(Delegate::Error error) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
read_lock_.AssertAcquired();
error_occurred_ = true;
if (delegate_) {
diff --git a/mojo/edk/system/raw_channel.h b/mojo/edk/system/raw_channel.h
index 8b0b010..a545942 100644
--- a/mojo/edk/system/raw_channel.h
+++ b/mojo/edk/system/raw_channel.h
@@ -17,10 +17,6 @@
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/cpp/system/macros.h"
-namespace base {
-class MessageLoopForIO;
-}
-
namespace mojo {
namespace edk {
@@ -92,8 +88,15 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// *not* take ownership of |delegate|. Both the I/O thread and |delegate| must
// remain alive until |Shutdown()| is called (unless this fails); |delegate|
// will no longer be used after |Shutdown()|.
+ // NOTE: for performance reasons, this doesn't connect to the raw pipe until
+ // either WriteMessage or EnsureLazyInitialized are called. If the delegate
+ // cares about reading data from the pipe or getting OnError notifications,
+ // they must call EnsureLazyInitialized at such point.
void Init(Delegate* delegate);
+ // This can be called on any thread. It's safe to call multiple times.
+ void EnsureLazyInitialized();
+
// This must be called (on the I/O thread) before this object is destroyed.
void Shutdown();
@@ -246,7 +249,6 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
size_t additional_platform_handles_written,
std::vector<char>* buffer);
- base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; }
base::Lock& write_lock() { return write_lock_; }
base::Lock& read_lock() { return read_lock_; }
@@ -275,7 +277,8 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
virtual bool OnReadMessageForRawChannel(
const MessageInTransit::View& message_view);
- virtual PlatformHandle HandleForDebuggingNoLock() = 0;
+ // Returns true iff the pipe handle is valid.
+ virtual bool IsHandleValid() = 0;
// Implementation must write any pending messages synchronously.
// TODO(jam): change to return shared memory with pending serialized msgs.
@@ -382,9 +385,11 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// Acquires read_lock_ and calls OnReadCompletedNoLock.
void CallOnReadCompleted(IOResult io_result, size_t bytes_read);
- // Set in |Init()| and never changed (hence usable on any thread without
- // locking):
- base::MessageLoopForIO* message_loop_for_io_;
+ // Used with PostTask to acquire both locks and call LazyInitialize.
+ void LockAndCallLazyInitialize();
+
+ // Connects to the OS pipe.
+ void LazyInitialize();
@@ -418,6 +423,9 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// either read or write lock. It's read with both acquired.
bool pending_error_;
+ // True iff we connected to the underying pipe.
+ bool initialized_;
+
// This is used for posting tasks from write threads to the I/O thread. It
// must only be accessed under |write_lock_|. The weak pointers it produces
// are only used/invalidated on the I/O thread.
diff --git a/mojo/edk/system/raw_channel_posix.cc b/mojo/edk/system/raw_channel_posix.cc
index 4e97a11..81bf246 100644
--- a/mojo/edk/system/raw_channel_posix.cc
+++ b/mojo/edk/system/raw_channel_posix.cc
@@ -19,6 +19,7 @@
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/lock.h"
+#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
@@ -36,6 +37,8 @@ class RawChannelPosix final : public RawChannel,
explicit RawChannelPosix(ScopedPlatformHandle handle);
~RawChannelPosix() override;
+ PlatformHandle GetFD() { return fd_.get(); }
+
private:
// |RawChannel| protected methods:
// Actually override this so that we can send multiple messages with (only)
@@ -48,7 +51,7 @@ class RawChannelPosix final : public RawChannel,
ScopedPlatformHandle ReleaseHandleNoLock(
std::vector<char>* serialized_read_buffer,
std::vector<char>* serialized_write_buffer) override;
- PlatformHandle HandleForDebuggingNoLock() override;
+ bool IsHandleValid() override;
IOResult Read(size_t* bytes_read) override;
IOResult ScheduleRead() override;
ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
@@ -172,12 +175,12 @@ ScopedPlatformHandle RawChannelPosix::ReleaseHandleNoLock(
return fd_.Pass();
}
-PlatformHandle RawChannelPosix::HandleForDebuggingNoLock() {
- return fd_.get();
+bool RawChannelPosix::IsHandleValid() {
+ return fd_.is_valid();
}
RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(!pending_read_);
IOResult rv = ReadImpl(bytes_read);
@@ -189,7 +192,7 @@ RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
}
RawChannel::IOResult RawChannelPosix::ScheduleRead() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(!pending_read_);
pending_read_ = true;
@@ -304,15 +307,16 @@ RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
// Set up to wait for the FD to become writable.
// If we're not on the I/O thread, we have to post a task to do this.
- if (base::MessageLoop::current() != message_loop_for_io()) {
- message_loop_for_io()->PostTask(FROM_HERE,
- base::Bind(&RawChannelPosix::WaitToWrite,
- weak_ptr_factory_.GetWeakPtr()));
+ if (!internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) {
+ internal::g_io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannelPosix::WaitToWrite,
+ weak_ptr_factory_.GetWeakPtr()));
pending_write_ = true;
return IO_PENDING;
}
- if (message_loop_for_io()->WatchFileDescriptor(
+ if (base::MessageLoopForIO::current()->WatchFileDescriptor(
fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
write_watcher_.get(), this)) {
pending_write_ = true;
@@ -323,7 +327,7 @@ RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
}
void RawChannelPosix::OnInit() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(!read_watcher_);
read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
@@ -333,7 +337,7 @@ void RawChannelPosix::OnInit() {
// I don't know how this can fail (unless |fd_| is bad, in which case it's a
// bug in our code). I also don't know if |WatchFileDescriptor()| actually
// fails cleanly.
- CHECK(message_loop_for_io()->WatchFileDescriptor(
+ CHECK(base::MessageLoopForIO::current()->WatchFileDescriptor(
fd_.get().fd, true, base::MessageLoopForIO::WATCH_READ,
read_watcher_.get(), this));
}
@@ -341,7 +345,7 @@ void RawChannelPosix::OnInit() {
void RawChannelPosix::OnShutdownNoLock(
scoped_ptr<ReadBuffer> /*read_buffer*/,
scoped_ptr<WriteBuffer> /*write_buffer*/) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
write_lock().AssertAcquired();
read_watcher_.reset(); // This will stop watching (if necessary).
@@ -358,7 +362,7 @@ void RawChannelPosix::OnShutdownNoLock(
void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_.get().fd);
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
if (!pending_read_) {
NOTREACHED();
@@ -390,7 +394,7 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_.get().fd);
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
IOResult io_result;
size_t platform_handles_written = 0;
@@ -458,11 +462,11 @@ RawChannel::IOResult RawChannelPosix::ReadImpl(size_t* bytes_read) {
}
void RawChannelPosix::WaitToWrite() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(write_watcher_);
- if (!message_loop_for_io()->WatchFileDescriptor(
+ if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
write_watcher_.get(), this)) {
base::AutoLock locker(write_lock());
@@ -490,8 +494,8 @@ size_t RawChannel::GetSerializedPlatformHandleSize() {
bool RawChannel::IsOtherEndOf(RawChannel* other) {
DCHECK_NE(other, this);
- PlatformHandle this_handle = HandleForDebuggingNoLock();
- PlatformHandle other_handle = other->HandleForDebuggingNoLock();
+ PlatformHandle this_handle = static_cast<RawChannelPosix*>(this)->GetFD();
+ PlatformHandle other_handle = static_cast<RawChannelPosix*>(other)->GetFD();
struct stat stat1, stat2;
if (fstat(this_handle.fd, &stat1) < 0)
diff --git a/mojo/edk/system/raw_channel_unittest.cc b/mojo/edk/system/raw_channel_unittest.cc
index af75543..86a91cab 100644
--- a/mojo/edk/system/raw_channel_unittest.cc
+++ b/mojo/edk/system/raw_channel_unittest.cc
@@ -58,6 +58,7 @@ bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
void InitOnIOThread(RawChannel* raw_channel, RawChannel::Delegate* delegate) {
raw_channel->Init(delegate);
+ raw_channel->EnsureLazyInitialized();
}
bool WriteTestMessageToHandle(const PlatformHandle& handle,
@@ -366,7 +367,7 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) {
static const size_t kNumWriterThreads = 10;
- static const size_t kNumWriteMessagesPerThread = 4000;
+ static const size_t kNumWriteMessagesPerThread = 400;
WriteOnlyRawChannelDelegate writer_delegate;
RawChannel* writer_rc = RawChannel::Create(handles[0].Pass());
diff --git a/mojo/edk/system/raw_channel_win.cc b/mojo/edk/system/raw_channel_win.cc
index 789b36d..a872355 100644
--- a/mojo/edk/system/raw_channel_win.cc
+++ b/mojo/edk/system/raw_channel_win.cc
@@ -16,6 +16,7 @@
#include "base/synchronization/lock.h"
#include "base/win/scoped_handle.h"
#include "base/win/windows_version.h"
+#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/system/transport_data.h"
#include "mojo/public/cpp/system/macros.h"
@@ -91,6 +92,21 @@ class RawChannelWin final : public RawChannel {
DCHECK(!io_handler_);
}
+ PlatformHandle GetHandle() {
+ // We need to acquire write_lock() and not read_lock() to avoid deadlocks.
+ // The reason is that we acquire read_lock() when calling the delegate,
+ // which in turn is a Dispatcher that has acquired its lock(). But it could
+ // already be in its lock() and calling IsOtherEndOf (i.e. this method).
+ base::AutoLock locker(write_lock());
+ if (handle_.is_valid())
+ return handle_.get();
+
+ if (!io_handler_)
+ return PlatformHandle();
+
+ return PlatformHandle(io_handler_->handle());
+ }
+
private:
// RawChannelIOHandler receives OS notifications for I/O completion. Currently
// this object is only used on the IO thread, other than ReleaseHandle. But
@@ -117,7 +133,6 @@ class RawChannelWin final : public RawChannel {
write_wait_object_(NULL),
read_event_signalled_(false),
write_event_signalled_(false),
- message_loop_for_io_(base::MessageLoop::current()->task_runner()),
weak_ptr_factory_(this) {
memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
@@ -145,20 +160,20 @@ class RawChannelWin final : public RawChannel {
// The following methods are only called by the owner on the I/O thread.
bool pending_read() const {
DCHECK(owner_);
- DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
return pending_read_;
}
base::MessageLoopForIO::IOContext* read_context() {
DCHECK(owner_);
- DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
return &read_context_;
}
// Instructs the object to wait for an OnObjectSignaled notification.
void OnPendingReadStarted() {
DCHECK(owner_);
- DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(!pending_read_);
pending_read_ = true;
read_event_signalled_ = false;
@@ -195,7 +210,7 @@ class RawChannelWin final : public RawChannel {
void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) {
DCHECK(owner_);
- DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
owner_->read_lock().AssertAcquired();
owner_->write_lock().AssertAcquired();
@@ -304,8 +319,7 @@ class RawChannelWin final : public RawChannel {
// Must be called on the I/O thread. It may be called before or after
// detaching from the owner.
void OnReadCompleted(DWORD bytes_read, DWORD error) {
- DCHECK(!owner_ ||
- base::MessageLoop::current() == owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(suppress_self_destruct_);
if (!owner_) {
pending_read_ = false;
@@ -344,8 +358,7 @@ class RawChannelWin final : public RawChannel {
// Must be called on the I/O thread. It may be called before or after
// detaching from the owner.
void OnWriteCompleted(DWORD bytes_written, DWORD error) {
- DCHECK(!owner_ ||
- base::MessageLoop::current() == owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(suppress_self_destruct_);
if (!owner_) {
@@ -383,8 +396,7 @@ class RawChannelWin final : public RawChannel {
}
void OnObjectSignaled(HANDLE object) {
- DCHECK(!owner_ ||
- base::MessageLoop::current() == owner_->message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
// Since this is called on the IO thread, no locks needed for owner_.
bool handle_is_valid = false;
if (owner_)
@@ -428,7 +440,7 @@ class RawChannelWin final : public RawChannel {
// that that is always a pointer to a valid RawChannelIOHandler.
RawChannelIOHandler* that = static_cast<RawChannelIOHandler*>(param);
that->read_event_signalled_ = true;
- that->message_loop_for_io_->PostTask(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannelIOHandler::OnObjectSignaled,
that->this_weakptr_, that->read_event_.Get()));
@@ -440,7 +452,7 @@ class RawChannelWin final : public RawChannel {
// that that is always a pointer to a valid RawChannelIOHandler.
RawChannelIOHandler* that = static_cast<RawChannelIOHandler*>(param);
that->write_event_signalled_ = true;
- that->message_loop_for_io_->PostTask(
+ internal::g_io_thread_task_runner->PostTask(
FROM_HERE,
base::Bind(&RawChannelIOHandler::OnObjectSignaled,
that->this_weakptr_, that->write_event_.Get()));
@@ -480,7 +492,6 @@ class RawChannelWin final : public RawChannel {
bool write_event_signalled_;
// These are used by the callbacks for the wait event watchers.
- scoped_refptr<base::SingleThreadTaskRunner> message_loop_for_io_;
base::WeakPtr<RawChannelIOHandler> this_weakptr_;
base::WeakPtrFactory<RawChannelIOHandler> weak_ptr_factory_;
@@ -505,18 +516,12 @@ class RawChannelWin final : public RawChannel {
serialized_write_buffer);
}
- PlatformHandle HandleForDebuggingNoLock() override {
- if (handle_.is_valid())
- return handle_.get();
-
- if (!io_handler_)
- return PlatformHandle();
-
- return PlatformHandle(io_handler_->handle());
+ bool IsHandleValid() override {
+ return GetHandle().is_valid();
}
IOResult Read(size_t* bytes_read) override {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
char* buffer = nullptr;
size_t bytes_to_read = 0;
@@ -553,7 +558,7 @@ class RawChannelWin final : public RawChannel {
if (!io_handler_)
return IO_PENDING; // OnInit could have earlied out.
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_read());
@@ -679,7 +684,7 @@ class RawChannelWin final : public RawChannel {
}
void OnInit() override {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
if (!handle_.is_valid()) {
LOG(ERROR) << "Note: RawChannelWin " << this
@@ -694,10 +699,7 @@ class RawChannelWin final : public RawChannel {
void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) override {
- // happens on shutdown if didn't call init when doing createduplicate
- if (message_loop_for_io()) {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- }
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
if (!io_handler_) {
// This is hit when creating a duplicate dispatcher since we don't call
@@ -748,8 +750,8 @@ size_t RawChannel::GetSerializedPlatformHandleSize() {
bool RawChannel::IsOtherEndOf(RawChannel* other) {
DCHECK_NE(other, this);
- PlatformHandle this_handle = HandleForDebuggingNoLock();
- PlatformHandle other_handle = other->HandleForDebuggingNoLock();
+ PlatformHandle this_handle = static_cast<RawChannelWin*>(this)->GetHandle();
+ PlatformHandle other_handle = static_cast<RawChannelWin*>(other)->GetHandle();
if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
WCHAR data1[_MAX_PATH + sizeof(FILE_NAME_INFO)];