summaryrefslogtreecommitdiffstats
path: root/ipc/mojo
diff options
context:
space:
mode:
authormorrita@chromium.org <morrita@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-04 23:44:17 +0000
committermorrita@chromium.org <morrita@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-04 23:44:17 +0000
commit6486088e8bb6dc810157503edfa3c75a58e9e49d (patch)
treeeda9fabacac5b6796c142ec4cc402b47bdf97dd8 /ipc/mojo
parentd93dbd1248f4e556c9c1c1005f5d051e3fe1efc8 (diff)
downloadchromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.zip
chromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.tar.gz
chromium_src-6486088e8bb6dc810157503edfa3c75a58e9e49d.tar.bz2
Introduce ChannelMojo
This CL introduces ChannelMojo IPC::Channel implementation and optionally applies it for renderer-browser IPC channel. Current stability is like 5-seconds browser and There are rough edges. It often closes the channel so needs to be more robust. Even though the level of stability, having it in the tree will helps team to try and improve it. BUG=377980 R=darin@chromium.org,jam@chromium.org,viettrungluu@chromium.org TEST=ipc_channel_mojo_unittest.cc Review URL: https://codereview.chromium.org/382333002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@287402 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc/mojo')
-rw-r--r--ipc/mojo/BUILD.gn41
-rw-r--r--ipc/mojo/DEPS4
-rw-r--r--ipc/mojo/ipc_channel_mojo.cc596
-rw-r--r--ipc/mojo/ipc_channel_mojo.h131
-rw-r--r--ipc/mojo/ipc_channel_mojo_unittest.cc260
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.cc144
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.h98
-rw-r--r--ipc/mojo/ipc_mojo.gyp68
-rw-r--r--ipc/mojo/run_all_unittests.cc42
9 files changed, 1384 insertions, 0 deletions
diff --git a/ipc/mojo/BUILD.gn b/ipc/mojo/BUILD.gn
new file mode 100644
index 0000000..accc89d
--- /dev/null
+++ b/ipc/mojo/BUILD.gn
@@ -0,0 +1,41 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+component("mojo") {
+ sources = [
+ "ipc_channel_mojo.cc",
+ "ipc_channel_mojo.h",
+ "ipc_message_pipe_reader.cc",
+ "ipc_message_pipe_reader.h",
+ ]
+
+ deps = [
+ "//base",
+ "//ipc",
+ "//mojo/public/cpp/bindings",
+ "//mojo/system",
+ # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect.
+ "//base/third_party/dynamic_annotations",
+ ]
+}
+
+test("ipc_mojo_unittests") {
+ sources = [
+ "ipc_channel_mojo_unittest.cc",
+ "run_all_unittests.cc",
+ ]
+
+ deps = [
+ "//base",
+ # TODO(viettrungluu): Needed for base/lazy_instance.h, which is suspect.
+ "//base/test:test_support",
+ "//base/third_party/dynamic_annotations",
+ "//ipc",
+ "//ipc:test_support",
+ "//ipc/mojo",
+ "//mojo/environment:chromium",
+ "//mojo/system",
+ "//url",
+ ]
+}
diff --git a/ipc/mojo/DEPS b/ipc/mojo/DEPS
new file mode 100644
index 0000000..6706a1f
--- /dev/null
+++ b/ipc/mojo/DEPS
@@ -0,0 +1,4 @@
+include_rules = [
+ "+mojo/public",
+ "+mojo/embedder",
+] \ No newline at end of file
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
new file mode 100644
index 0000000..27d35b7
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -0,0 +1,596 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/lazy_instance.h"
+#include "ipc/ipc_listener.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+#include "ipc/file_descriptor_set_posix.h"
+#endif
+
+namespace IPC {
+
+namespace {
+
+// IPC::Listener for bootstrap channels.
+// It should never receive any message.
+class NullListener : public Listener {
+ public:
+ virtual bool OnMessageReceived(const Message&) OVERRIDE {
+ NOTREACHED();
+ return false;
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnBadMessageReceived(const Message& message) OVERRIDE {
+ NOTREACHED();
+ }
+};
+
+base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER;
+
+class MojoChannelFactory : public ChannelFactory {
+ public:
+ MojoChannelFactory(
+ ChannelHandle channel_handle,
+ Channel::Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner)
+ : channel_handle_(channel_handle),
+ mode_(mode),
+ io_thread_task_runner_(io_thread_task_runner) {
+ }
+
+ virtual std::string GetName() const OVERRIDE {
+ return channel_handle_.name;
+ }
+
+ virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE {
+ return ChannelMojo::Create(
+ channel_handle_,
+ mode_,
+ listener,
+ io_thread_task_runner_).PassAs<Channel>();
+ }
+
+ private:
+ ChannelHandle channel_handle_;
+ Channel::Mode mode_;
+ scoped_refptr<base::TaskRunner> io_thread_task_runner_;
+};
+
+mojo::embedder::PlatformHandle ToPlatformHandle(
+ const ChannelHandle& handle) {
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ return mojo::embedder::PlatformHandle(handle.socket.fd);
+#elif defined(OS_WIN)
+ return mojo::embedder::PlatformHandle(handle.pipe.handle);
+#else
+#error "Unsupported Platform!"
+#endif
+}
+
+//------------------------------------------------------------------------------
+
+// TODO(morrita): This should be built using higher-level Mojo construct
+// for clarity and extensibility.
+class HelloMessage {
+ public:
+ static Pickle CreateRequest(int32 pid) {
+ Pickle request;
+ request.WriteString(kHelloRequestMagic);
+ request.WriteInt(pid);
+ return request;
+ }
+
+ static bool ReadRequest(Pickle& pickle, int32* pid) {
+ PickleIterator iter(pickle);
+ std::string hello;
+ if (!iter.ReadString(&hello)) {
+ DLOG(WARNING) << "Failed to Read magic string.";
+ return false;
+ }
+
+ if (hello != kHelloRequestMagic) {
+ DLOG(WARNING) << "Magic mismatch:" << hello;
+ return false;
+ }
+
+ int read_pid;
+ if (!iter.ReadInt(&read_pid)) {
+ DLOG(WARNING) << "Failed to Read PID.";
+ return false;
+ }
+
+ *pid = read_pid;
+ return true;
+ }
+
+ static Pickle CreateResponse(int32 pid) {
+ Pickle request;
+ request.WriteString(kHelloResponseMagic);
+ request.WriteInt(pid);
+ return request;
+ }
+
+ static bool ReadResponse(Pickle& pickle, int32* pid) {
+ PickleIterator iter(pickle);
+ std::string hello;
+ if (!iter.ReadString(&hello)) {
+ DLOG(WARNING) << "Failed to read magic string.";
+ return false;
+ }
+
+ if (hello != kHelloResponseMagic) {
+ DLOG(WARNING) << "Magic mismatch:" << hello;
+ return false;
+ }
+
+ int read_pid;
+ if (!iter.ReadInt(&read_pid)) {
+ DLOG(WARNING) << "Failed to read PID.";
+ return false;
+ }
+
+ *pid = read_pid;
+ return true;
+ }
+
+ private:
+ static const char* kHelloRequestMagic;
+ static const char* kHelloResponseMagic;
+};
+
+const char* HelloMessage::kHelloRequestMagic = "MREQ";
+const char* HelloMessage::kHelloResponseMagic = "MRES";
+
+} // namespace
+
+//------------------------------------------------------------------------------
+
+// A MessagePipeReader implemenation for IPC::Message communication.
+class ChannelMojo::MessageReader : public internal::MessagePipeReader {
+ public:
+ MessageReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : internal::MessagePipeReader(pipe.Pass()),
+ owner_(owner) {}
+
+ bool Send(scoped_ptr<Message> message);
+ virtual void OnMessageReceived() OVERRIDE;
+ virtual void OnPipeClosed() OVERRIDE;
+ virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ private:
+ ChannelMojo* owner_;
+};
+
+void ChannelMojo::MessageReader::OnMessageReceived() {
+ Message message(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+
+ std::vector<MojoHandle> handle_buffer;
+ TakeHandleBuffer(&handle_buffer);
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ for (size_t i = 0; i < handle_buffer.size(); ++i) {
+ mojo::embedder::ScopedPlatformHandle platform_handle;
+ MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle(
+ handle_buffer[i], &platform_handle);
+ if (unwrap_result != MOJO_RESULT_OK) {
+ DLOG(WARNING) << "Pipe failed to covert handles. Closing: "
+ << unwrap_result;
+ CloseWithError(unwrap_result);
+ return;
+ }
+
+ bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd);
+ DCHECK(ok);
+ }
+#else
+ DCHECK(handle_buffer.empty());
+#endif
+
+ message.TraceMessageEnd();
+ owner_->OnMessageReceived(message);
+}
+
+void ChannelMojo::MessageReader::OnPipeClosed() {
+ if (!owner_)
+ return;
+ owner_->OnPipeClosed(this);
+ owner_ = NULL;
+}
+
+void ChannelMojo::MessageReader::OnPipeError(MojoResult error) {
+ if (!owner_)
+ return;
+ owner_->OnPipeError(this);
+}
+
+bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) {
+ DCHECK(IsValid());
+
+ message->TraceMessageBegin();
+ std::vector<MojoHandle> handles;
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ if (message->HasFileDescriptors()) {
+ FileDescriptorSet* fdset = message->file_descriptor_set();
+ for (size_t i = 0; i < fdset->size(); ++i) {
+ MojoHandle wrapped_handle;
+ MojoResult wrap_result = CreatePlatformHandleWrapper(
+ mojo::embedder::ScopedPlatformHandle(
+ mojo::embedder::PlatformHandle(
+ fdset->GetDescriptorAt(i))),
+ &wrapped_handle);
+ if (MOJO_RESULT_OK != wrap_result) {
+ DLOG(WARNING) << "Pipe failed to wrap handles. Closing: "
+ << wrap_result;
+ CloseWithError(wrap_result);
+ return false;
+ }
+
+ handles.push_back(wrapped_handle);
+ }
+ }
+#endif
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ message->data(), static_cast<uint32>(message->size()),
+ handles.empty() ? NULL : &handles[0],
+ static_cast<uint32>(handles.size()),
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ CloseWithError(write_result);
+ return false;
+ }
+
+ return true;
+}
+
+//------------------------------------------------------------------------------
+
+// MessagePipeReader implemenation for control messages.
+// Actual message handling is implemented by sublcasses.
+class ChannelMojo::ControlReader : public internal::MessagePipeReader {
+ public:
+ ControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : internal::MessagePipeReader(pipe.Pass()),
+ owner_(owner) {}
+
+ virtual bool Connect() { return true; }
+ virtual void OnPipeClosed() OVERRIDE;
+ virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ protected:
+ ChannelMojo* owner_;
+};
+
+void ChannelMojo::ControlReader::OnPipeClosed() {
+ if (!owner_)
+ return;
+ owner_->OnPipeClosed(this);
+ owner_ = NULL;
+}
+
+void ChannelMojo::ControlReader::OnPipeError(MojoResult error) {
+ if (!owner_)
+ return;
+ owner_->OnPipeError(this);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for server-side ChannelMojo.
+class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader {
+ public:
+ ServerControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : ControlReader(pipe.Pass(), owner) { }
+
+ virtual bool Connect() OVERRIDE;
+ virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+ MojoResult SendHelloRequest();
+ MojoResult RespondHelloResponse();
+
+ mojo::ScopedMessagePipeHandle message_pipe_;
+};
+
+bool ChannelMojo::ServerControlReader::Connect() {
+ MojoResult result = SendHelloRequest();
+ if (result != MOJO_RESULT_OK) {
+ CloseWithError(result);
+ return false;
+ }
+
+ return true;
+}
+
+MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() {
+ DCHECK(IsValid());
+ DCHECK(!message_pipe_.is_valid());
+
+ mojo::ScopedMessagePipeHandle self;
+ mojo::ScopedMessagePipeHandle peer;
+ MojoResult create_result = mojo::CreateMessagePipe(
+ NULL, &message_pipe_, &peer);
+ if (MOJO_RESULT_OK != create_result) {
+ DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
+ return create_result;
+ }
+
+ MojoHandle peer_to_send = peer.get().value();
+ Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID());
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ request.data(), static_cast<uint32>(request.size()),
+ &peer_to_send, 1,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ DLOG(WARNING) << "Writing Hello request failed: " << create_result;
+ return write_result;
+ }
+
+ // |peer| is sent and no longer owned by |this|.
+ (void)peer.release();
+ return MOJO_RESULT_OK;
+}
+
+MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() {
+ Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+
+ int32 read_pid = 0;
+ if (!HelloMessage::ReadResponse(request, &read_pid)) {
+ DLOG(ERROR) << "Failed to parse Hello response.";
+ return MOJO_RESULT_UNKNOWN;
+ }
+
+ base::ProcessId pid = static_cast<base::ProcessId>(read_pid);
+ owner_->set_peer_pid(pid);
+ owner_->OnConnected(message_pipe_.Pass());
+ return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ServerControlReader::OnMessageReceived() {
+ MojoResult result = RespondHelloResponse();
+ if (result != MOJO_RESULT_OK)
+ CloseWithError(result);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for client-side ChannelMojo.
+class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader {
+ public:
+ ClientControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+ : ControlReader(pipe.Pass(), owner) {}
+
+ virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+ MojoResult RespondHelloRequest(MojoHandle message_channel);
+};
+
+MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest(
+ MojoHandle message_channel) {
+ DCHECK(IsValid());
+
+ mojo::ScopedMessagePipeHandle received_pipe(
+ (mojo::MessagePipeHandle(message_channel)));
+
+ int32 read_request = 0;
+ Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+ static_cast<uint32>(data_buffer().size()));
+ if (!HelloMessage::ReadRequest(request, &read_request)) {
+ DLOG(ERROR) << "Hello request has wrong magic.";
+ return MOJO_RESULT_UNKNOWN;
+ }
+
+ base::ProcessId pid = read_request;
+ Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID());
+ MojoResult write_result = MojoWriteMessage(
+ handle(),
+ response.data(), static_cast<uint32>(response.size()),
+ NULL, 0,
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+ if (MOJO_RESULT_OK != write_result) {
+ DLOG(ERROR) << "Writing Hello response failed: " << write_result;
+ return write_result;
+ }
+
+ owner_->set_peer_pid(pid);
+ owner_->OnConnected(received_pipe.Pass());
+ return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ClientControlReader::OnMessageReceived() {
+ std::vector<MojoHandle> handle_buffer;
+ TakeHandleBuffer(&handle_buffer);
+ if (handle_buffer.size() != 1) {
+ DLOG(ERROR) << "Hello request doesn't contains required handle: "
+ << handle_buffer.size();
+ CloseWithError(MOJO_RESULT_UNKNOWN);
+ return;
+ }
+
+ MojoResult result = RespondHelloRequest(handle_buffer[0]);
+ if (result != MOJO_RESULT_OK) {
+ DLOG(ERROR) << "Failed to respond Hello request. Closing: "
+ << result;
+ CloseWithError(result);
+ }
+}
+
+//------------------------------------------------------------------------------
+
+void ChannelMojo::ChannelInfoDeleter::operator()(
+ mojo::embedder::ChannelInfo* ptr) const {
+ mojo::embedder::DestroyChannelOnIOThread(ptr);
+}
+
+//------------------------------------------------------------------------------
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return make_scoped_ptr(new ChannelMojo(
+ bootstrap.Pass(), mode, listener, io_thread_task_runner));
+}
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+ const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return Create(
+ Channel::Create(channel_handle, mode, g_null_listener.Pointer()),
+ mode, listener, io_thread_task_runner);
+}
+
+// static
+scoped_ptr<ChannelFactory> ChannelMojo::CreateFactory(
+ const ChannelHandle &channel_handle, Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+ return make_scoped_ptr(
+ new MojoChannelFactory(
+ channel_handle, mode,
+ io_thread_task_runner)).PassAs<ChannelFactory>();
+}
+
+ChannelMojo::ChannelMojo(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner)
+ : weak_factory_(this),
+ bootstrap_(bootstrap.Pass()),
+ mode_(mode), listener_(listener),
+ peer_pid_(base::kNullProcessId) {
+ DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT);
+ mojo::ScopedMessagePipeHandle control_pipe
+ = mojo::embedder::CreateChannel(
+ mojo::embedder::ScopedPlatformHandle(
+ ToPlatformHandle(bootstrap_->TakePipeHandle())),
+ io_thread_task_runner,
+ base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)),
+ io_thread_task_runner);
+
+ // MessagePipeReader, that is crated in InitOnIOThread(), should live only in
+ // IO thread, but IPC::Channel can be instantiated outside of it.
+ // So we move the creation to the appropriate thread.
+ if (base::MessageLoopProxy::current() == io_thread_task_runner) {
+ InitOnIOThread(control_pipe.Pass());
+ } else {
+ io_thread_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelMojo::InitOnIOThread,
+ weak_factory_.GetWeakPtr(),
+ base::Passed(control_pipe.Pass())));
+ }
+}
+
+ChannelMojo::~ChannelMojo() {
+ Close();
+}
+
+void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) {
+ control_reader_ = CreateControlReader(control_pipe.Pass());
+}
+
+scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader(
+ mojo::ScopedMessagePipeHandle pipe) {
+ if (MODE_SERVER == mode_) {
+ return make_scoped_ptr(
+ new ServerControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+ }
+
+ DCHECK(mode_ == MODE_CLIENT);
+ return make_scoped_ptr(
+ new ClientControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+}
+
+bool ChannelMojo::Connect() {
+ DCHECK(!message_reader_);
+ return control_reader_->Connect();
+}
+
+void ChannelMojo::Close() {
+ control_reader_.reset();
+ message_reader_.reset();
+ channel_info_.reset();
+}
+
+void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) {
+ message_reader_ = make_scoped_ptr(new MessageReader(pipe.Pass(), this));
+
+ for (size_t i = 0; i < pending_messages_.size(); ++i) {
+ message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
+ pending_messages_[i] = NULL;
+ }
+
+ pending_messages_.clear();
+
+ listener_->OnChannelConnected(GetPeerPID());
+}
+
+void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
+ Close();
+}
+
+void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
+ listener_->OnChannelError();
+}
+
+
+bool ChannelMojo::Send(Message* message) {
+ if (!message_reader_) {
+ pending_messages_.push_back(message);
+ return true;
+ }
+
+ return message_reader_->Send(make_scoped_ptr(message));
+}
+
+base::ProcessId ChannelMojo::GetPeerPID() const {
+ return peer_pid_;
+}
+
+base::ProcessId ChannelMojo::GetSelfPID() const {
+ return bootstrap_->GetSelfPID();
+}
+
+ChannelHandle ChannelMojo::TakePipeHandle() {
+ return bootstrap_->TakePipeHandle();
+}
+
+void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo* info) {
+ channel_info_.reset(info);
+}
+
+void ChannelMojo::OnMessageReceived(Message& message) {
+ listener_->OnMessageReceived(message);
+ if (message.dispatch_error())
+ listener_->OnBadMessageReceived(message);
+}
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+int ChannelMojo::GetClientFileDescriptor() const {
+ return bootstrap_->GetClientFileDescriptor();
+}
+
+int ChannelMojo::TakeClientFileDescriptor() {
+ return bootstrap_->TakeClientFileDescriptor();
+}
+#endif // defined(OS_POSIX) && !defined(OS_NACL)
+
+} // namespace IPC
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
new file mode 100644
index 0000000..b00abc9
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -0,0 +1,131 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_MOJO_H_
+#define IPC_IPC_CHANNEL_MOJO_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
+#include "base/memory/weak_ptr.h"
+#include "ipc/ipc_channel.h"
+#include "ipc/ipc_channel_factory.h"
+#include "ipc/ipc_export.h"
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace mojo {
+namespace embedder {
+struct ChannelInfo;
+}
+}
+
+namespace IPC {
+
+// Mojo-based IPC::Channel implementation over a platform handle.
+//
+// ChannelMojo builds Mojo MessagePipe using underlying pipe given by
+// "bootstrap" IPC::Channel which creates and owns platform pipe like
+// named socket. The bootstrap Channel is used only for establishing
+// the underlying connection. ChannelMojo takes its handle over once
+// the it is made and puts MessagePipe on it.
+//
+// ChannelMojo has a couple of MessagePipes:
+//
+// * The first MessagePipe, which is built on top of bootstrap handle,
+// is the "control" pipe. It is used to communicate out-of-band
+// control messages that aren't visible from IPC::Listener.
+//
+// * The second MessagePipe, which is created by the server channel
+// and sent to client Channel over the control pipe, is used
+// to send IPC::Messages as an IPC::Sender.
+//
+// TODO(morrita): Extract handle creation part of IPC::Channel into
+// separate class to clarify what ChannelMojo relies
+// on.
+// TODO(morrita): Add APIs to create extra MessagePipes to let
+// Mojo-based objects talk over this Channel.
+//
+class IPC_MOJO_EXPORT ChannelMojo : public Channel {
+ public:
+ // Create ChannelMojo on top of given |bootstrap| channel.
+ static scoped_ptr<ChannelMojo> Create(
+ scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ // Create ChannelMojo. A bootstrap channel is created as well.
+ static scoped_ptr<ChannelMojo> Create(
+ const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ // Create a factory object for ChannelMojo.
+ // The factory is used to create Mojo-based ChannelProxy family.
+ static scoped_ptr<ChannelFactory> CreateFactory(
+ const ChannelHandle &channel_handle, Mode mode,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ virtual ~ChannelMojo();
+
+ // Channel implementation
+ virtual bool Connect() OVERRIDE;
+ virtual void Close() OVERRIDE;
+ virtual bool Send(Message* message) OVERRIDE;
+ virtual base::ProcessId GetPeerPID() const OVERRIDE;
+ virtual base::ProcessId GetSelfPID() const OVERRIDE;
+ virtual ChannelHandle TakePipeHandle() OVERRIDE;
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ virtual int GetClientFileDescriptor() const OVERRIDE;
+ virtual int TakeClientFileDescriptor() OVERRIDE;
+#endif // defined(OS_POSIX) && !defined(OS_NACL)
+
+ // Called from MessagePipeReader implementations
+ void OnMessageReceived(Message& message);
+ void OnConnected(mojo::ScopedMessagePipeHandle pipe);
+ void OnPipeClosed(internal::MessagePipeReader* reader);
+ void OnPipeError(internal::MessagePipeReader* reader);
+ void set_peer_pid(base::ProcessId pid) { peer_pid_ = pid; }
+
+ private:
+ struct ChannelInfoDeleter {
+ void operator()(mojo::embedder::ChannelInfo* ptr) const;
+ };
+
+ // ChannelMojo needs to kill its MessagePipeReader in delayed manner
+ // because the channel wants to kill these readers during the
+ // notifications invoked by them.
+ typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
+
+ class ControlReader;
+ class ServerControlReader;
+ class ClientControlReader;
+ class MessageReader;
+
+ ChannelMojo(scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+ scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+ void InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe);
+ scoped_ptr<ControlReader> CreateControlReader(
+ mojo::ScopedMessagePipeHandle pipe);
+ void DidCreateChannel(mojo::embedder::ChannelInfo*);
+
+ base::WeakPtrFactory<ChannelMojo> weak_factory_;
+ scoped_ptr<Channel> bootstrap_;
+ Mode mode_;
+ Listener* listener_;
+ base::ProcessId peer_pid_;
+ scoped_ptr<mojo::embedder::ChannelInfo,
+ ChannelInfoDeleter> channel_info_;
+
+ scoped_ptr<ControlReader, ReaderDeleter> control_reader_;
+ scoped_ptr<MessageReader, ReaderDeleter> message_reader_;
+ ScopedVector<Message> pending_messages_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
+};
+
+} // namespace IPC
+
+#endif // IPC_IPC_CHANNEL_MOJO_H_
diff --git a/ipc/mojo/ipc_channel_mojo_unittest.cc b/ipc/mojo/ipc_channel_mojo_unittest.cc
new file mode 100644
index 0000000..915029d
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo_unittest.cc
@@ -0,0 +1,260 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/base_paths.h"
+#include "base/files/file.h"
+#include "base/message_loop/message_loop.h"
+#include "base/path_service.h"
+#include "base/pickle.h"
+#include "base/threading/thread.h"
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_test_base.h"
+#include "ipc/ipc_test_channel_listener.h"
+
+#if defined(OS_POSIX)
+#include "base/file_descriptor_posix.h"
+#endif
+
+namespace {
+
+class ListenerThatExpectsOK : public IPC::Listener {
+ public:
+ ListenerThatExpectsOK() {}
+
+ virtual ~ListenerThatExpectsOK() {}
+
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ PickleIterator iter(message);
+ std::string should_be_ok;
+ EXPECT_TRUE(iter.ReadString(&should_be_ok));
+ EXPECT_EQ(should_be_ok, "OK");
+ base::MessageLoop::current()->Quit();
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ static void SendOK(IPC::Sender* sender) {
+ IPC::Message* message = new IPC::Message(
+ 0, 2, IPC::Message::PRIORITY_NORMAL);
+ message->WriteString(std::string("OK"));
+ ASSERT_TRUE(sender->Send(message));
+ }
+};
+
+class ListenerThatShouldBeNeverCalled : public IPC::Listener {
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ NOTREACHED();
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ NOTREACHED();
+ }
+
+ virtual void OnBadMessageReceived(const IPC::Message& message) OVERRIDE {
+ NOTREACHED();
+ }
+};
+
+class ChannelClient {
+ public:
+ explicit ChannelClient(IPC::Listener* listener, const char* name) {
+ scoped_ptr<IPC::Channel> bootstrap(IPC::Channel::CreateClient(
+ IPCTestBase::GetChannelName(name),
+ &never_called_));
+ channel_ = IPC::ChannelMojo::Create(
+ bootstrap.Pass(), IPC::Channel::MODE_CLIENT, listener,
+ main_message_loop_.message_loop_proxy());
+ }
+
+ void Connect() {
+ CHECK(channel_->Connect());
+ }
+
+ IPC::ChannelMojo* channel() const { return channel_.get(); }
+
+ private:
+ scoped_ptr<IPC::ChannelMojo> channel_;
+ ListenerThatShouldBeNeverCalled never_called_;
+ base::MessageLoopForIO main_message_loop_;
+};
+
+class IPCChannelMojoTest : public IPCTestBase {
+ public:
+ void CreateMojoChannel(IPC::Listener* listener);
+
+ protected:
+ virtual void SetUp() OVERRIDE {
+ IPCTestBase::SetUp();
+ }
+
+ ListenerThatShouldBeNeverCalled never_called_;
+};
+
+
+void IPCChannelMojoTest::CreateMojoChannel(IPC::Listener* listener) {
+ CreateChannel(&never_called_);
+ scoped_ptr<IPC::Channel> mojo_channel = IPC::ChannelMojo::Create(
+ ReleaseChannel(), IPC::Channel::MODE_SERVER, listener,
+ io_thread_task_runner()).PassAs<IPC::Channel>();
+ SetChannel(mojo_channel.PassAs<IPC::Channel>());
+}
+
+class TestChannelListenerWithExtraExpectations
+ : public IPC::TestChannelListener {
+ public:
+ TestChannelListenerWithExtraExpectations()
+ : is_connected_called_(false) {
+ }
+
+ virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+ IPC::TestChannelListener::OnChannelConnected(peer_pid);
+ EXPECT_TRUE(base::kNullProcessId != peer_pid);
+ is_connected_called_ = true;
+ }
+
+ bool is_connected_called() const { return is_connected_called_; }
+
+ private:
+ bool is_connected_called_;
+};
+
+TEST_F(IPCChannelMojoTest, ConnectedFromClient) {
+ Init("IPCChannelMojoTestClient");
+
+ // Set up IPC channel and start client.
+ TestChannelListenerWithExtraExpectations listener;
+ CreateMojoChannel(&listener);
+ listener.Init(sender());
+ ASSERT_TRUE(ConnectChannel());
+ ASSERT_TRUE(StartClient());
+
+ IPC::TestChannelListener::SendOneMessage(
+ sender(), "hello from parent");
+
+ base::MessageLoop::current()->Run();
+ EXPECT_TRUE(base::kNullProcessId != this->channel()->GetPeerPID());
+
+ this->channel()->Close();
+
+ EXPECT_TRUE(WaitForClientShutdown());
+ EXPECT_TRUE(listener.is_connected_called());
+ EXPECT_TRUE(listener.HasSentAll());
+
+ DestroyChannel();
+}
+
+// A long running process that connects to us
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestClient) {
+ TestChannelListenerWithExtraExpectations listener;
+ ChannelClient client(&listener, "IPCChannelMojoTestClient");
+ client.Connect();
+ listener.Init(client.channel());
+
+ IPC::TestChannelListener::SendOneMessage(
+ client.channel(), "hello from child");
+ base::MessageLoop::current()->Run();
+ EXPECT_TRUE(listener.is_connected_called());
+ EXPECT_TRUE(listener.HasSentAll());
+
+ return 0;
+}
+
+#if defined(OS_POSIX)
+class ListenerThatExpectsFile : public IPC::Listener {
+ public:
+ ListenerThatExpectsFile()
+ : sender_(NULL) {}
+
+ virtual ~ListenerThatExpectsFile() {}
+
+ virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+ PickleIterator iter(message);
+ base::FileDescriptor desc;
+ EXPECT_TRUE(message.ReadFileDescriptor(&iter, &desc));
+ std::string content(GetSendingFileContent().size(), ' ');
+ base::File file(desc.fd);
+ file.Read(0, &content[0], content.size());
+ EXPECT_EQ(content, GetSendingFileContent());
+ base::MessageLoop::current()->Quit();
+ ListenerThatExpectsOK::SendOK(sender_);
+ return true;
+ }
+
+ virtual void OnChannelError() OVERRIDE {
+ NOTREACHED();
+ }
+
+ static std::string GetSendingFileContent() {
+ return "Hello";
+ }
+
+ static base::FilePath GetSendingFilePath() {
+ base::FilePath path;
+ bool ok = PathService::Get(base::DIR_CACHE, &path);
+ EXPECT_TRUE(ok);
+ return path.Append("ListenerThatExpectsFile.txt");
+ }
+
+ static void WriteAndSendFile(IPC::Sender* sender, base::File& file) {
+ std::string content = GetSendingFileContent();
+ file.WriteAtCurrentPos(content.data(), content.size());
+ file.Flush();
+ IPC::Message* message = new IPC::Message(
+ 0, 2, IPC::Message::PRIORITY_NORMAL);
+ message->WriteFileDescriptor(
+ base::FileDescriptor(file.TakePlatformFile(), false));
+ ASSERT_TRUE(sender->Send(message));
+ }
+
+ void set_sender(IPC::Sender* sender) { sender_ = sender; }
+
+ private:
+ IPC::Sender* sender_;
+};
+
+
+TEST_F(IPCChannelMojoTest, SendPlatformHandle) {
+ Init("IPCChannelMojoTestSendPlatformHandleClient");
+
+ ListenerThatExpectsOK listener;
+ CreateMojoChannel(&listener);
+ ASSERT_TRUE(ConnectChannel());
+ ASSERT_TRUE(StartClient());
+
+ base::File file(ListenerThatExpectsFile::GetSendingFilePath(),
+ base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE |
+ base::File::FLAG_READ);
+ ListenerThatExpectsFile::WriteAndSendFile(channel(), file);
+ base::MessageLoop::current()->Run();
+
+ this->channel()->Close();
+
+ EXPECT_TRUE(WaitForClientShutdown());
+ DestroyChannel();
+}
+
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestSendPlatformHandleClient) {
+ ListenerThatExpectsFile listener;
+ ChannelClient client(
+ &listener, "IPCChannelMojoTestSendPlatformHandleClient");
+ client.Connect();
+ listener.set_sender(client.channel());
+
+ base::MessageLoop::current()->Run();
+
+ return 0;
+}
+#endif
+
+} // namespace
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
new file mode 100644
index 0000000..91022ac
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -0,0 +1,144 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "mojo/public/cpp/environment/environment.h"
+
+namespace IPC {
+namespace internal {
+
+MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
+ : pipe_wait_id_(0),
+ pipe_(handle.Pass()) {
+ StartWaiting();
+}
+
+MessagePipeReader::~MessagePipeReader() {
+ CHECK(!IsValid());
+}
+
+void MessagePipeReader::Close() {
+ StopWaiting();
+ pipe_.reset();
+ OnPipeClosed();
+}
+
+void MessagePipeReader::CloseWithError(MojoResult error) {
+ OnPipeError(error);
+ Close();
+}
+
+// static
+void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
+ reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
+}
+
+void MessagePipeReader::StartWaiting() {
+ DCHECK(pipe_.is_valid());
+ DCHECK(!pipe_wait_id_);
+ // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
+ // MessagePipe.
+ //
+ // TODO(morrita): Should we re-set the signal when we get new
+ // message to send?
+ pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
+ pipe_.get().value(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE,
+ &InvokePipeIsReady,
+ this);
+}
+
+void MessagePipeReader::StopWaiting() {
+ if (!pipe_wait_id_)
+ return;
+ mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
+ pipe_wait_id_ = 0;
+}
+
+void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+ pipe_wait_id_ = 0;
+
+ if (wait_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION happens when the pipe is
+ // closed before the waiter is scheduled in a backend thread.
+ if (wait_result != MOJO_RESULT_ABORTED &&
+ wait_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING) << "Pipe got error from the waiter. Closing: "
+ << wait_result;
+ OnPipeError(wait_result);
+ }
+
+ Close();
+ return;
+ }
+
+ while (pipe_.is_valid()) {
+ MojoResult read_result = ReadMessageBytes();
+ if (read_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ if (read_result != MOJO_RESULT_OK) {
+ // FAILED_PRECONDITION means that all the received messages
+ // got consumed and the peer is already closed.
+ if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
+ DLOG(WARNING)
+ << "Pipe got error from ReadMessage(). Closing: " << read_result;
+ OnPipeError(read_result);
+ }
+
+ Close();
+ break;
+ }
+
+ OnMessageReceived();
+ }
+
+ if (pipe_.is_valid())
+ StartWaiting();
+}
+
+MojoResult MessagePipeReader::ReadMessageBytes() {
+ DCHECK(handle_buffer_.empty());
+
+ uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
+ uint32_t num_handles = 0;
+ MojoResult result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ data_buffer_.resize(num_bytes);
+ handle_buffer_.resize(num_handles);
+ if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
+ // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
+ // it needs more bufer. So we re-read it with resized buffers.
+ result = MojoReadMessage(pipe_.get().value(),
+ num_bytes ? &data_buffer_[0] : NULL,
+ &num_bytes,
+ num_handles ? &handle_buffer_[0] : NULL,
+ &num_handles,
+ MOJO_READ_MESSAGE_FLAG_NONE);
+ }
+
+ DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
+ DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
+ return result;
+}
+
+void MessagePipeReader::DelayedDeleter::operator()(
+ MessagePipeReader* ptr) const {
+ ptr->Close();
+ base::MessageLoopProxy::current()->PostTask(
+ FROM_HERE, base::Bind(&DeleteNow, ptr));
+}
+
+} // namespace internal
+} // namespace IPC
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
new file mode 100644
index 0000000..ecfa018
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -0,0 +1,98 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_MESSAGE_PIPE_READER_H_
+#define IPC_IPC_MESSAGE_PIPE_READER_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/c/environment/async_waiter.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace IPC {
+namespace internal {
+
+// A helper class to handle bytestream directly over mojo::MessagePipe
+// in template-method pattern. MessagePipeReader manages the lifetime
+// of given MessagePipe and participates the event loop, and
+// read the stream and call the client when it is ready.
+//
+// Each client has to:
+//
+// * Provide a subclass implemenation of a specific use of a MessagePipe
+// and implement callbacks.
+// * Create the subclass instance with a MessagePipeHandle.
+// The constructor automatically start listening on the pipe.
+//
+// MessageReader has to be used in IO thread. It isn't thread-safe.
+//
+class MessagePipeReader {
+ public:
+ // Delay the object deletion using the current message loop.
+ // This is intended to used by MessagePipeReader owners.
+ class DelayedDeleter {
+ public:
+ typedef base::DefaultDeleter<MessagePipeReader> DefaultType;
+
+ static void DeleteNow(MessagePipeReader* ptr) { delete ptr; }
+
+ DelayedDeleter() {}
+ DelayedDeleter(const DefaultType&) {}
+ DelayedDeleter& operator=(const DefaultType&) { return *this; }
+
+ void operator()(MessagePipeReader* ptr) const;
+ };
+
+ explicit MessagePipeReader(mojo::ScopedMessagePipeHandle handle);
+ virtual ~MessagePipeReader();
+
+ MojoHandle handle() const { return pipe_.get().value(); }
+
+ // Returns received bytes.
+ const std::vector<char>& data_buffer() const {
+ return data_buffer_;
+ }
+
+ // Delegate received handles ownership. The subclass should take the
+ // ownership over in its OnMessageReceived(). They will leak otherwise.
+ void TakeHandleBuffer(std::vector<MojoHandle>* handle_buffer) {
+ handle_buffer_.swap(*handle_buffer);
+ }
+
+ // Close and destroy the MessagePipe.
+ void Close();
+ // Close the mesage pipe with notifying the client with the error.
+ void CloseWithError(MojoResult error);
+ // Return true if the MessagePipe is alive.
+ bool IsValid() { return pipe_.is_valid(); }
+
+ //
+ // The client have to implment these callback to get the readiness
+ // event from the reader
+ //
+ virtual void OnMessageReceived() = 0;
+ virtual void OnPipeClosed() = 0;
+ virtual void OnPipeError(MojoResult error) = 0;
+
+ private:
+ static void InvokePipeIsReady(void* closure, MojoResult result);
+
+ MojoResult ReadMessageBytes();
+ void PipeIsReady(MojoResult wait_result);
+ void StartWaiting();
+ void StopWaiting();
+
+ std::vector<char> data_buffer_;
+ std::vector<MojoHandle> handle_buffer_;
+ MojoAsyncWaitID pipe_wait_id_;
+ mojo::ScopedMessagePipeHandle pipe_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
+};
+
+} // namespace internal
+} // namespace IPC
+
+#endif // IPC_IPC_MESSAGE_PIPE_READER_H_
diff --git a/ipc/mojo/ipc_mojo.gyp b/ipc/mojo/ipc_mojo.gyp
new file mode 100644
index 0000000..c408a17
--- /dev/null
+++ b/ipc/mojo/ipc_mojo.gyp
@@ -0,0 +1,68 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+{
+ 'variables': {
+ 'chromium_code': 1,
+ },
+ 'includes': [
+ ],
+ 'targets': [
+ {
+ 'target_name': 'ipc_mojo',
+ 'type': '<(component)',
+ 'variables': {
+ },
+ 'defines': [
+ 'IPC_MOJO_IMPLEMENTATION',
+ ],
+ 'dependencies': [
+ '../ipc.gyp:ipc',
+ '../../base/base.gyp:base',
+ '../../base/third_party/dynamic_annotations/dynamic_annotations.gyp:dynamic_annotations',
+ '../../mojo/mojo_base.gyp:mojo_cpp_bindings',
+ '../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_base.gyp:mojo_system_impl',
+ ],
+ 'sources': [
+ 'ipc_channel_mojo.cc',
+ 'ipc_channel_mojo.h',
+ 'ipc_message_pipe_reader.cc',
+ 'ipc_message_pipe_reader.h',
+ ],
+ # TODO(gregoryd): direct_dependent_settings should be shared with the
+ # 64-bit target, but it doesn't work due to a bug in gyp
+ 'direct_dependent_settings': {
+ 'include_dirs': [
+ '..',
+ ],
+ },
+ },
+ {
+ 'target_name': 'ipc_mojo_unittests',
+ 'type': '<(gtest_target_type)',
+ 'dependencies': [
+ '../ipc.gyp:ipc',
+ '../ipc.gyp:test_support_ipc',
+ '../../base/base.gyp:base',
+ '../../base/base.gyp:base_i18n',
+ '../../base/base.gyp:test_support_base',
+ '../../mojo/mojo_base.gyp:mojo_cpp_bindings',
+ '../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_base.gyp:mojo_system_impl',
+ '../../testing/gtest.gyp:gtest',
+ 'ipc_mojo',
+ ],
+ 'include_dirs': [
+ '..'
+ ],
+ 'sources': [
+ 'run_all_unittests.cc',
+ 'ipc_channel_mojo_unittest.cc',
+ ],
+ 'conditions': [
+ ],
+ },
+ ],
+}
diff --git a/ipc/mojo/run_all_unittests.cc b/ipc/mojo/run_all_unittests.cc
new file mode 100644
index 0000000..1734e5e
--- /dev/null
+++ b/ipc/mojo/run_all_unittests.cc
@@ -0,0 +1,42 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/at_exit.h"
+#include "base/bind.h"
+#include "base/test/launcher/unit_test_launcher.h"
+#include "base/test/test_suite.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_ANDROID)
+#include "base/android/jni_android.h"
+#include "base/test/test_file_util.h"
+#endif
+
+namespace {
+
+class NoAtExitBaseTestSuite : public base::TestSuite {
+ public:
+ NoAtExitBaseTestSuite(int argc, char** argv)
+ : base::TestSuite(argc, argv, false) {
+ }
+};
+
+int RunTestSuite(int argc, char** argv) {
+ return NoAtExitBaseTestSuite(argc, argv).Run();
+}
+
+} // namespace
+
+int main(int argc, char** argv) {
+ mojo::embedder::Init();
+#if defined(OS_ANDROID)
+ JNIEnv* env = base::android::AttachCurrentThread();
+ file_util::RegisterContentUriTestUtils(env);
+#else
+ base::AtExitManager at_exit;
+#endif
+ return base::LaunchUnitTestsSerially(argc,
+ argv,
+ base::Bind(&RunTestSuite, argc, argv));
+}