// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "ipc/mojo/ipc_message_pipe_reader.h" #include "base/bind.h" #include "base/bind_helpers.h" #include "base/location.h" #include "base/logging.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" #include "ipc/mojo/async_handle_waiter.h" #include "ipc/mojo/ipc_channel_mojo.h" namespace IPC { namespace internal { MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle, MessagePipeReader::Delegate* delegate) : pipe_(handle.Pass()), delegate_(delegate), async_waiter_( new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady, base::Unretained(this)))), pending_send_error_(MOJO_RESULT_OK) { } MessagePipeReader::~MessagePipeReader() { // The pipe should be closed before deletion. CHECK(!IsValid()); DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); } void MessagePipeReader::Close() { // All pending errors should be signaled before Close(). DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK); async_waiter_.reset(); pipe_.reset(); OnPipeClosed(); } void MessagePipeReader::CloseWithError(MojoResult error) { OnPipeError(error); Close(); } void MessagePipeReader::CloseWithErrorIfPending() { if (pending_send_error_ == MOJO_RESULT_OK) return; MojoResult error = pending_send_error_; pending_send_error_ = MOJO_RESULT_OK; CloseWithError(error); return; } void MessagePipeReader::CloseWithErrorLater(MojoResult error) { pending_send_error_ = error; } bool MessagePipeReader::Send(scoped_ptr<Message> message) { DCHECK(IsValid()); message->TraceMessageBegin(); std::vector<MojoHandle> handles; MojoResult result = MOJO_RESULT_OK; result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); if (result == MOJO_RESULT_OK) { result = MojoWriteMessage(handle(), message->data(), static_cast<uint32>(message->size()), handles.empty() ? nullptr : &handles[0], static_cast<uint32>(handles.size()), MOJO_WRITE_MESSAGE_FLAG_NONE); } if (result != MOJO_RESULT_OK) { std::for_each(handles.begin(), handles.end(), &MojoClose); // We cannot call CloseWithError() here as Send() is protected by // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We // cannot call CloseWithError() also because Send() can be called from // non-UI thread while OnPipeError() expects to be called on IO thread. CloseWithErrorLater(result); return false; } return true; } void MessagePipeReader::OnMessageReceived() { Message message(data_buffer().empty() ? "" : &data_buffer()[0], static_cast<uint32>(data_buffer().size())); std::vector<MojoHandle> handle_buffer; TakeHandleBuffer(&handle_buffer); MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message); if (write_result != MOJO_RESULT_OK) { CloseWithError(write_result); return; } message.TraceMessageEnd(); delegate_->OnMessageReceived(message); } void MessagePipeReader::OnPipeClosed() { if (!delegate_) return; delegate_->OnPipeClosed(this); delegate_ = nullptr; } void MessagePipeReader::OnPipeError(MojoResult error) { if (!delegate_) return; delegate_->OnPipeError(this); } MojoResult MessagePipeReader::ReadMessageBytes() { DCHECK(handle_buffer_.empty()); uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); uint32_t num_handles = 0; MojoResult result = MojoReadMessage(pipe_.get().value(), num_bytes ? &data_buffer_[0] : nullptr, &num_bytes, nullptr, &num_handles, MOJO_READ_MESSAGE_FLAG_NONE); data_buffer_.resize(num_bytes); handle_buffer_.resize(num_handles); if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that // it needs more bufer. So we re-read it with resized buffers. result = MojoReadMessage(pipe_.get().value(), num_bytes ? &data_buffer_[0] : nullptr, &num_bytes, num_handles ? &handle_buffer_[0] : nullptr, &num_handles, MOJO_READ_MESSAGE_FLAG_NONE); } DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); return result; } void MessagePipeReader::ReadAvailableMessages() { while (pipe_.is_valid()) { MojoResult read_result = ReadMessageBytes(); if (read_result == MOJO_RESULT_SHOULD_WAIT) break; if (read_result != MOJO_RESULT_OK) { // FAILED_PRECONDITION means that all the received messages // got consumed and the peer is already closed. if (read_result != MOJO_RESULT_FAILED_PRECONDITION) { DLOG(WARNING) << "Pipe got error from ReadMessage(). Closing: " << read_result; OnPipeError(read_result); } Close(); break; } OnMessageReceived(); } } void MessagePipeReader::ReadMessagesThenWait() { while (true) { ReadAvailableMessages(); if (!pipe_.is_valid()) break; // |Wait()| is safe to call only after all messages are read. // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise. // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in // MessagePipe. MojoResult result = async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE); // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages // that have been arrived after the last |ReadAvailableMessages()|. // We have to consume then and retry in that case. if (result != MOJO_RESULT_ALREADY_EXISTS) { if (result != MOJO_RESULT_OK) { LOG(ERROR) << "Failed to wait on the pipe. Result is " << result; OnPipeError(result); Close(); } break; } } } void MessagePipeReader::PipeIsReady(MojoResult wait_result) { CloseWithErrorIfPending(); if (!IsValid()) { // There was a pending error and it closed the pipe. // We cannot do the work anymore. return; } if (wait_result != MOJO_RESULT_OK) { if (wait_result != MOJO_RESULT_ABORTED) { // FAILED_PRECONDITION happens every time the peer is dead so // it isn't worth polluting the log message. LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION) << "Pipe got error from the waiter. Closing: " << wait_result; OnPipeError(wait_result); } Close(); return; } ReadMessagesThenWait(); } void MessagePipeReader::DelayedDeleter::operator()( MessagePipeReader* ptr) const { ptr->Close(); base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, base::Bind(&DeleteNow, ptr)); } } // namespace internal } // namespace IPC