diff options
author | amistry <amistry@chromium.org> | 2015-09-02 11:04:22 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-09-02 18:05:40 +0000 |
commit | 0b0e7480d9fd3816cbee891bf1618bc02b112cb4 (patch) | |
tree | 340e6c6a5cfaeb0951737f49dbf035b3884c5044 /ipc/mojo | |
parent | 52c41cb25a551989d516abd79f0134d70a17b8d0 (diff) | |
download | chromium_src-0b0e7480d9fd3816cbee891bf1618bc02b112cb4.zip chromium_src-0b0e7480d9fd3816cbee891bf1618bc02b112cb4.tar.gz chromium_src-0b0e7480d9fd3816cbee891bf1618bc02b112cb4.tar.bz2 |
Fix races with MessagePipeReader due to the Mojo IPC channel being thread-safe.
This change does three things:
1. Checks the threading of MessagePipeReader using a ThreadChecker.
2. Eliminates races on |pending_send_error_| by using atomic ops.
3. Documents and removes checks which are no longer valid because Send()
can be called on a non-IO thread.
BUG=522888,492867
TESTED=Enabled Mojo IPC channel and ran StatsTableBrowserTest.StartWithStatTable
with tsan 100 times.
Before: IPC-related races and DCHECK failures
After: No IPC-related races and DCHECK failures.
Review URL: https://codereview.chromium.org/1318453002
Cr-Commit-Position: refs/heads/master@{#346967}
Diffstat (limited to 'ipc/mojo')
-rw-r--r-- | ipc/mojo/ipc_message_pipe_reader.cc | 32 | ||||
-rw-r--r-- | ipc/mojo/ipc_message_pipe_reader.h | 18 |
2 files changed, 37 insertions, 13 deletions
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc index 2202a57..6609f26 100644 --- a/ipc/mojo/ipc_message_pipe_reader.cc +++ b/ipc/mojo/ipc_message_pipe_reader.cc @@ -19,6 +19,7 @@ namespace internal { MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, MessagePipeReader::Delegate* delegate) : pipe_(handle.Pass()), + handle_copy_(pipe_.get().value()), delegate_(delegate), async_waiter_( new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, @@ -27,40 +28,45 @@ MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, } MessagePipeReader::~MessagePipeReader() { + DCHECK(thread_checker_.CalledOnValidThread()); // The pipe should be closed before deletion. CHECK(!IsValid()); - DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); } void MessagePipeReader::Close() { - // All pending errors should be signaled before Close(). - DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); + DCHECK(thread_checker_.CalledOnValidThread()); async_waiter_.reset(); pipe_.reset(); OnPipeClosed(); } void MessagePipeReader::CloseWithError(MojoResult error) { + DCHECK(thread_checker_.CalledOnValidThread()); OnPipeError(error); Close(); } void MessagePipeReader::CloseWithErrorIfPending() { - if (pending_send_error_ == MOJO_RESULT_OK) + DCHECK(thread_checker_.CalledOnValidThread()); + MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_); + if (pending_error == MOJO_RESULT_OK) return; - MojoResult error = pending_send_error_; - pending_send_error_ = MOJO_RESULT_OK; - CloseWithError(error); + // NOTE: This races with Send(), and therefore the value of + // pending_send_error() can change. + CloseWithError(pending_error); return; } void MessagePipeReader::CloseWithErrorLater(MojoResult error) { - pending_send_error_ = error; + DCHECK_NE(error, MOJO_RESULT_OK); + // NOTE: No assumptions about the value of |pending_send_error_| or whether or + // not the error has been signaled can be made. If Send() is called + // immediately before Close() and errors, it's possible for the error to not + // be signaled. + base::subtle::NoBarrier_Store(&pending_send_error_, error); } bool MessagePipeReader::Send(scoped_ptr<Message> message) { - DCHECK(IsValid()); - TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), "MessagePipeReader::Send", message->flags(), @@ -111,6 +117,7 @@ void MessagePipeReader::OnMessageReceived() { } void MessagePipeReader::OnPipeClosed() { + DCHECK(thread_checker_.CalledOnValidThread()); if (!delegate_) return; delegate_->OnPipeClosed(this); @@ -118,12 +125,14 @@ void MessagePipeReader::OnPipeClosed() { } void MessagePipeReader::OnPipeError(MojoResult error) { + DCHECK(thread_checker_.CalledOnValidThread()); if (!delegate_) return; delegate_->OnPipeError(this); } MojoResult MessagePipeReader::ReadMessageBytes() { + DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(handle_buffer_.empty()); uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); @@ -153,6 +162,7 @@ MojoResult MessagePipeReader::ReadMessageBytes() { } void MessagePipeReader::ReadAvailableMessages() { + DCHECK(thread_checker_.CalledOnValidThread()); while (pipe_.is_valid()) { MojoResult read_result = ReadMessageBytes(); if (read_result == MOJO_RESULT_SHOULD_WAIT) @@ -171,6 +181,7 @@ void MessagePipeReader::ReadAvailableMessages() { } void MessagePipeReader::ReadMessagesThenWait() { + DCHECK(thread_checker_.CalledOnValidThread()); while (true) { ReadAvailableMessages(); if (!pipe_.is_valid()) @@ -197,6 +208,7 @@ void MessagePipeReader::ReadMessagesThenWait() { } void MessagePipeReader::PipeIsReady(MojoResult wait_result) { + DCHECK(thread_checker_.CalledOnValidThread()); CloseWithErrorIfPending(); if (!IsValid()) { // There was a pending error and it closed the pipe. diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h index 937930e..b9c11c6 100644 --- a/ipc/mojo/ipc_message_pipe_reader.h +++ b/ipc/mojo/ipc_message_pipe_reader.h @@ -7,8 +7,10 @@ #include <vector> +#include "base/atomicops.h" #include "base/compiler_specific.h" #include "base/memory/scoped_ptr.h" +#include "base/threading/thread_checker.h" #include "ipc/ipc_message.h" #include "third_party/mojo/src/mojo/public/c/environment/async_waiter.h" #include "third_party/mojo/src/mojo/public/cpp/system/core.h" @@ -30,7 +32,9 @@ class AsyncHandleWaiter; // * Create the subclass instance with a MessagePipeHandle. // The constructor automatically start listening on the pipe. // -// MessageReader has to be used in IO thread. It isn't thread-safe. +// All functions must be called on the IO thread, except for Send(), which can +// be called on any thread. All |Delegate| functions will be called on the IO +// thread. // class MessagePipeReader { public: @@ -62,7 +66,7 @@ class MessagePipeReader { MessagePipeReader(mojo::ScopedMessagePipeHandle handle, Delegate* delegate); virtual ~MessagePipeReader(); - MojoHandle handle() const { return pipe_.get().value(); } + MojoHandle handle() const { return handle_copy_; } // Returns received bytes. const std::vector<char>& data_buffer() const { @@ -100,10 +104,18 @@ class MessagePipeReader { std::vector<char> data_buffer_; std::vector<MojoHandle> handle_buffer_; mojo::ScopedMessagePipeHandle pipe_; + // Constant copy of the message pipe handle. For use by Send(), which can run + // concurrently on non-IO threads. + // TODO(amistry): This isn't quite right because handles can be re-used and + // using this can run into the ABA problem. Currently, this is highly unlikely + // because Mojo internally uses an increasing uint32_t as handle values, but + // this could change. See crbug.com/524894. + const MojoHandle handle_copy_; // |delegate_| and |async_waiter_| are null once the message pipe is closed. Delegate* delegate_; scoped_ptr<AsyncHandleWaiter> async_waiter_; - MojoResult pending_send_error_; + base::subtle::Atomic32 pending_send_error_; + base::ThreadChecker thread_checker_; DISALLOW_COPY_AND_ASSIGN(MessagePipeReader); }; |