// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "ipc/mojo/ipc_message_pipe_reader.h" #include "base/bind.h" #include "base/bind_helpers.h" #include "base/location.h" #include "base/logging.h" #include "base/message_loop/message_loop_proxy.h" #include "mojo/public/cpp/environment/environment.h" namespace IPC { namespace internal { MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle) : pipe_wait_id_(0), pipe_(handle.Pass()) { StartWaiting(); } MessagePipeReader::~MessagePipeReader() { CHECK(!IsValid()); } void MessagePipeReader::Close() { StopWaiting(); pipe_.reset(); OnPipeClosed(); } void MessagePipeReader::CloseWithError(MojoResult error) { OnPipeError(error); Close(); } // static void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) { reinterpret_cast(closure)->PipeIsReady(result); } void MessagePipeReader::StartWaiting() { DCHECK(pipe_.is_valid()); DCHECK(!pipe_wait_id_); // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in // MessagePipe. // // TODO(morrita): Should we re-set the signal when we get new // message to send? pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait( pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, &InvokePipeIsReady, this); } void MessagePipeReader::StopWaiting() { if (!pipe_wait_id_) return; mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_); pipe_wait_id_ = 0; } void MessagePipeReader::PipeIsReady(MojoResult wait_result) { pipe_wait_id_ = 0; if (wait_result != MOJO_RESULT_OK) { if (wait_result != MOJO_RESULT_ABORTED) { // FAILED_PRECONDITION happens every time the peer is dead so // it isn't worth polluting the log message. DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION) << "Pipe got error from the waiter. Closing: " << wait_result; OnPipeError(wait_result); } Close(); return; } while (pipe_.is_valid()) { MojoResult read_result = ReadMessageBytes(); if (read_result == MOJO_RESULT_SHOULD_WAIT) break; if (read_result != MOJO_RESULT_OK) { // FAILED_PRECONDITION means that all the received messages // got consumed and the peer is already closed. if (read_result != MOJO_RESULT_FAILED_PRECONDITION) { DLOG(WARNING) << "Pipe got error from ReadMessage(). Closing: " << read_result; OnPipeError(read_result); } Close(); break; } OnMessageReceived(); } if (pipe_.is_valid()) StartWaiting(); } MojoResult MessagePipeReader::ReadMessageBytes() { DCHECK(handle_buffer_.empty()); uint32_t num_bytes = static_cast(data_buffer_.size()); uint32_t num_handles = 0; MojoResult result = MojoReadMessage(pipe_.get().value(), num_bytes ? &data_buffer_[0] : NULL, &num_bytes, NULL, &num_handles, MOJO_READ_MESSAGE_FLAG_NONE); data_buffer_.resize(num_bytes); handle_buffer_.resize(num_handles); if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that // it needs more bufer. So we re-read it with resized buffers. result = MojoReadMessage(pipe_.get().value(), num_bytes ? &data_buffer_[0] : NULL, &num_bytes, num_handles ? &handle_buffer_[0] : NULL, &num_handles, MOJO_READ_MESSAGE_FLAG_NONE); } DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); return result; } void MessagePipeReader::DelayedDeleter::operator()( MessagePipeReader* ptr) const { ptr->Close(); base::MessageLoopProxy::current()->PostTask( FROM_HERE, base::Bind(&DeleteNow, ptr)); } } // namespace internal } // namespace IPC