summaryrefslogtreecommitdiffstats
path: root/ipc/mojo/ipc_message_pipe_reader.cc
diff options
context:
space:
mode:
authormorrita@chromium.org <morrita@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-04 23:44:17 +0000
committermorrita@chromium.org <morrita@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-04 23:44:17 +0000
commit6486088e8bb6dc810157503edfa3c75a58e9e49d (patch)
treeeda9fabacac5b6796c142ec4cc402b47bdf97dd8 /ipc/mojo/ipc_message_pipe_reader.cc
parentd93dbd1248f4e556c9c1c1005f5d051e3fe1efc8 (diff)
downloadchromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.zip
chromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.tar.gz
chromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.tar.bz2
Introduce ChannelMojo
This CL introduces ChannelMojo IPC::Channel implementation and optionally applies it for renderer-browser IPC channel. Current stability is like 5-seconds browser and There are rough edges. It often closes the channel so needs to be more robust. Even though the level of stability, having it in the tree will helps team to try and improve it. BUG=377980 R=darin@chromium.org,jam@chromium.org,viettrungluu@chromium.org TEST=ipc_channel_mojo_unittest.cc Review URL: https://codereview.chromium.org/382333002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@287402 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc/mojo/ipc_message_pipe_reader.cc')
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.cc144
1 files changed, 144 insertions, 0 deletions
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
new file mode 100644
index 0000000..91022ac
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -0,0 +1,144 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "mojo/public/cpp/environment/environment.h"
+
+namespace IPC {
+namespace internal {
+
+MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
+ : pipe_wait_id_(0),
+ pipe_(handle.Pass()) {
+ StartWaiting();
+}
+
+MessagePipeReader::~MessagePipeReader() {
+ CHECK(!IsValid());
+}
+
+void MessagePipeReader::Close() {
+ StopWaiting();
+ pipe_.reset();
+ OnPipeClosed();
+}
+
+void MessagePipeReader::CloseWithError(MojoResult error) {
+ OnPipeError(error);
+ Close();
+}
+
+// static
+void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
+ reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
+}
+
+void MessagePipeReader::StartWaiting() {
+ DCHECK(pipe_.is_valid());
+ DCHECK(!pipe_wait_id_);
+ // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
+ // MessagePipe.
+ //
+ // TODO(morrita): Should we re-set the signal when we get new
+ // message to send?
+ pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
+ pipe_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ &InvokePipeIsReady,
+ this);
+}
+
+void MessagePipeReader::StopWaiting() {
+ if (!pipe_wait_id_)
+ return;
+ mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
+ pipe_wait_id_ = 0;
+}
+
+void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+ pipe_wait_id_ = 0;
+
+ if (wait_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION happens when the pipe is
+ // closed before the waiter is scheduled in a backend thread.
+ if (wait_result != MOJO_RESULT_ABORTED &&
+ wait_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING) << "Pipe got error from the waiter. Closing: "
+ << wait_result;
+ OnPipeError(wait_result);
+ }
+
+ Close();
+ return;
+ }
+
+ while (pipe_.is_valid()) {
+ MojoResult read_result = ReadMessageBytes();
+ if (read_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ if (read_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION means that all the received messages
+ // got consumed and the peer is already closed.
+ if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING)
+ << "Pipe got error from ReadMessage(). Closing: " << read_result;
+ OnPipeError(read_result);
+ }
+
+ Close();
+ break;
+ }
+
+ OnMessageReceived();
+ }
+
+ if (pipe_.is_valid())
+ StartWaiting();
+}
+
+MojoResult MessagePipeReader::ReadMessageBytes() {
+ DCHECK(handle_buffer_.empty());
+
+ uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
+ uint32_t num_handles = 0;
+ MojoResult result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ data_buffer_.resize(num_bytes);
+ handle_buffer_.resize(num_handles);
+ if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
+ // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
+ // it needs more bufer. So we re-read it with resized buffers.
+ result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ num_handles ? &handle_buffer_[0] : NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ }
+
+ DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
+ DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
+ return result;
+}
+
+void MessagePipeReader::DelayedDeleter::operator()(
+ MessagePipeReader* ptr) const {
+ ptr->Close();
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE, base::Bind(&DeleteNow, ptr));
+}
+
+} // namespace internal
+} // namespace IPC