summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ipc/mojo/ipc_channel_mojo.cc90
-rw-r--r--ipc/mojo/ipc_channel_mojo.h13
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.cc86
-rw-r--r--ipc/mojo/ipc_message_pipe_reader.h28
-rw-r--r--mojo/public/cpp/bindings/associated_interface_ptr.h3
-rw-r--r--mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h6
-rw-r--r--mojo/public/cpp/bindings/lib/interface_endpoint_client.cc5
-rw-r--r--mojo/public/cpp/bindings/lib/interface_endpoint_client.h1
8 files changed, 173 insertions, 59 deletions
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index a0dabad..db79c11 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -223,7 +223,11 @@ scoped_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
ChannelMojo::ChannelMojo(mojo::ScopedMessagePipeHandle handle,
Mode mode,
Listener* listener)
- : listener_(listener), waiting_connect_(true), weak_factory_(this) {
+ : task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ pipe_(handle.get()),
+ listener_(listener),
+ waiting_connect_(true),
+ weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, this);
@@ -234,15 +238,27 @@ ChannelMojo::~ChannelMojo() {
}
bool ChannelMojo::Connect() {
+ base::AutoLock lock(lock_);
DCHECK(!message_reader_);
bootstrap_->Connect();
return true;
}
void ChannelMojo::Close() {
- message_reader_.reset();
- // We might Close() before we Connect().
- waiting_connect_ = false;
+ scoped_ptr<internal::MessagePipeReader, ReaderDeleter> reader;
+ {
+ base::AutoLock lock(lock_);
+ if (!message_reader_)
+ return;
+ // The reader's destructor may re-enter Close, so we swap it out first to
+ // avoid deadlock when freeing it below.
+ std::swap(message_reader_, reader);
+
+ // We might Close() before we Connect().
+ waiting_connect_ = false;
+ }
+
+ reader.reset();
}
// MojoBootstrap::Delegate implementation
@@ -264,39 +280,49 @@ void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
mojom::ChannelAssociatedPtr sender_ptr;
sender_ptr.Bind(std::move(sender));
scoped_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter> reader(
- new internal::MessagePipeReader(std::move(sender_ptr),
+ new internal::MessagePipeReader(pipe_, std::move(sender_ptr),
std::move(receiver), peer_pid, this));
- for (size_t i = 0; i < pending_messages_.size(); ++i) {
- bool sent = reader->Send(std::move(pending_messages_[i]));
- if (!sent) {
- // OnChannelError() is notified by OnPipeError().
+ bool connected = true;
+ {
+ base::AutoLock lock(lock_);
+ for (size_t i = 0; i < pending_messages_.size(); ++i) {
+ if (!reader->Send(std::move(pending_messages_[i]))) {
+ LOG(ERROR) << "Failed to flush pending messages";
+ pending_messages_.clear();
+ connected = false;
+ break;
+ }
+ }
+
+ if (connected) {
+ // We set |message_reader_| here and won't get any |pending_messages_|
+ // hereafter. Although we might have some if there is an error, we don't
+ // care. They cannot be sent anyway.
+ message_reader_ = std::move(reader);
pending_messages_.clear();
- LOG(ERROR) << "Failed to flush pending messages";
- return;
+ waiting_connect_ = false;
}
}
- // We set |message_reader_| here and won't get any |pending_messages_|
- // hereafter. Although we might have some if there is an error, we don't
- // care. They cannot be sent anyway.
- message_reader_ = std::move(reader);
- pending_messages_.clear();
- waiting_connect_ = false;
-
- listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
+ if (connected)
+ listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
+ else
+ OnPipeError();
}
-void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
- Close();
-}
-
-void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
- listener_->OnChannelError();
+void ChannelMojo::OnPipeError() {
+ if (task_runner_->RunsTasksOnCurrentThread()) {
+ listener_->OnChannelError();
+ } else {
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelMojo::OnPipeError, weak_factory_.GetWeakPtr()));
+ }
}
-
bool ChannelMojo::Send(Message* message) {
+ base::AutoLock lock(lock_);
if (!message_reader_) {
pending_messages_.push_back(make_scoped_ptr(message));
// Counts as OK before the connection is established, but it's an
@@ -304,10 +330,20 @@ bool ChannelMojo::Send(Message* message) {
return waiting_connect_;
}
- return message_reader_->Send(make_scoped_ptr(message));
+ if (!message_reader_->Send(make_scoped_ptr(message))) {
+ OnPipeError();
+ return false;
+ }
+
+ return true;
+}
+
+bool ChannelMojo::IsSendThreadSafe() const {
+ return true;
}
base::ProcessId ChannelMojo::GetPeerPID() const {
+ base::AutoLock lock(lock_);
if (!message_reader_)
return base::kNullProcessId;
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
index 8c5de24..5968d84 100644
--- a/ipc/mojo/ipc_channel_mojo.h
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -10,9 +10,12 @@
#include <vector>
#include "base/macros.h"
+#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
+#include "base/synchronization/lock.h"
+#include "base/task_runner.h"
#include "build/build_config.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_channel_factory.h"
@@ -58,6 +61,7 @@ class IPC_MOJO_EXPORT ChannelMojo
bool Connect() override;
void Close() override;
bool Send(Message* message) override;
+ bool IsSendThreadSafe() const override;
base::ProcessId GetPeerPID() const override;
base::ProcessId GetSelfPID() const override;
@@ -83,8 +87,7 @@ class IPC_MOJO_EXPORT ChannelMojo
// MessagePipeReader::Delegate
void OnMessageReceived(const Message& message) override;
- void OnPipeClosed(internal::MessagePipeReader* reader) override;
- void OnPipeError(internal::MessagePipeReader* reader) override;
+ void OnPipeError() override;
private:
ChannelMojo(mojo::ScopedMessagePipeHandle handle,
@@ -100,9 +103,15 @@ class IPC_MOJO_EXPORT ChannelMojo
// notifications invoked by them.
typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
+ // A TaskRunner which runs tasks on the ChannelMojo's owning thread.
+ scoped_refptr<base::TaskRunner> task_runner_;
+
+ const mojo::MessagePipeHandle pipe_;
scoped_ptr<MojoBootstrap> bootstrap_;
Listener* listener_;
+ // Guards access to the fields below.
+ mutable base::Lock lock_;
scoped_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_;
std::vector<scoped_ptr<Message>> pending_messages_;
bool waiting_connect_;
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
index 495a3a5..b657bfd 100644
--- a/ipc/mojo/ipc_message_pipe_reader.cc
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -5,20 +5,54 @@
#include "ipc/mojo/ipc_message_pipe_reader.h"
#include <stdint.h>
+
#include <utility>
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
+#include "base/macros.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "ipc/mojo/ipc_channel_mojo.h"
+#include "mojo/public/cpp/bindings/message.h"
namespace IPC {
namespace internal {
+namespace {
+
+// Used by Send() to capture a serialized Channel::Receive message.
+class MessageSerializer : public mojo::MessageReceiverWithResponder {
+ public:
+ MessageSerializer() {}
+ ~MessageSerializer() override {}
+
+ mojo::Message* message() { return &message_; }
+
+ private:
+ // mojo::MessageReceiverWithResponder
+ bool Accept(mojo::Message* message) override {
+ message->MoveTo(&message_);
+ return true;
+ }
+
+ bool AcceptWithResponder(mojo::Message* message,
+ mojo::MessageReceiver* responder) override {
+ NOTREACHED();
+ return false;
+ }
+
+ mojo::Message message_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageSerializer);
+};
+
+} // namespace
+
MessagePipeReader::MessagePipeReader(
+ mojo::MessagePipeHandle pipe,
mojom::ChannelAssociatedPtr sender,
mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
base::ProcessId peer_pid,
@@ -26,7 +60,9 @@ MessagePipeReader::MessagePipeReader(
: delegate_(delegate),
peer_pid_(peer_pid),
sender_(std::move(sender)),
- binding_(this, std::move(receiver)) {
+ binding_(this, std::move(receiver)),
+ sender_interface_id_(sender_.interface_id()),
+ sender_pipe_(pipe) {
sender_.set_connection_error_handler(
base::Bind(&MessagePipeReader::OnPipeError, base::Unretained(this),
MOJO_RESULT_FAILED_PRECONDITION));
@@ -45,12 +81,6 @@ void MessagePipeReader::Close() {
sender_.reset();
if (binding_.is_bound())
binding_.Close();
- OnPipeClosed();
-}
-
-void MessagePipeReader::CloseWithError(MojoResult error) {
- DCHECK(thread_checker_.CalledOnValidThread());
- OnPipeError(error);
}
bool MessagePipeReader::Send(scoped_ptr<Message> message) {
@@ -61,17 +91,35 @@ bool MessagePipeReader::Send(scoped_ptr<Message> message) {
mojo::Array<mojom::SerializedHandlePtr> handles(nullptr);
MojoResult result = MOJO_RESULT_OK;
result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
- if (result != MOJO_RESULT_OK) {
- CloseWithError(result);
+ if (result != MOJO_RESULT_OK)
return false;
- }
+
mojo::Array<uint8_t> data(message->size());
std::copy(reinterpret_cast<const uint8_t*>(message->data()),
reinterpret_cast<const uint8_t*>(message->data()) + message->size(),
&data[0]);
- sender_->Receive(std::move(data), std::move(handles));
+
+ MessageSerializer serializer;
+ mojom::ChannelProxy proxy(&serializer);
+ proxy.Receive(std::move(data), std::move(handles));
+ mojo::Message* mojo_message = serializer.message();
+
+ size_t num_handles = mojo_message->handles()->size();
+ DCHECK_LE(num_handles, std::numeric_limits<uint32_t>::max());
+
+ mojo_message->set_interface_id(sender_interface_id_);
+ result = mojo::WriteMessageRaw(
+ sender_pipe_, mojo_message->data(), mojo_message->data_num_bytes(),
+ reinterpret_cast<const MojoHandle*>(mojo_message->handles()->data()),
+ static_cast<uint32_t>(num_handles), MOJO_WRITE_MESSAGE_FLAG_NONE);
+
+ // If the write was successful, the handles have been transferred and they
+ // should not be closed when the message is destroyed.
+ if (result == MOJO_RESULT_OK)
+ mojo_message->mutable_handles()->clear();
+
DVLOG(4) << "Send " << message->type() << ": " << message->size();
- return true;
+ return result == MOJO_RESULT_OK;
}
void MessagePipeReader::Receive(
@@ -86,29 +134,21 @@ void MessagePipeReader::Receive(
MojoResult write_result =
ChannelMojo::WriteToMessageAttachmentSet(std::move(handles), &message);
if (write_result != MOJO_RESULT_OK) {
- CloseWithError(write_result);
+ OnPipeError(write_result);
return;
}
TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
- "MessagePipeReader::OnMessageReceived",
+ "MessagePipeReader::Receive",
message.flags(),
TRACE_EVENT_FLAG_FLOW_IN);
delegate_->OnMessageReceived(message);
}
-void MessagePipeReader::OnPipeClosed() {
- DCHECK(thread_checker_.CalledOnValidThread());
- if (!delegate_)
- return;
- delegate_->OnPipeClosed(this);
- delegate_ = nullptr;
-}
-
void MessagePipeReader::OnPipeError(MojoResult error) {
DCHECK(thread_checker_.CalledOnValidThread());
if (delegate_)
- delegate_->OnPipeError(this);
+ delegate_->OnPipeError();
Close();
}
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
index 3989c25..d2ba9de 100644
--- a/ipc/mojo/ipc_message_pipe_reader.h
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -19,6 +19,7 @@
#include "ipc/mojo/ipc.mojom.h"
#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/system/core.h"
+#include "mojo/public/cpp/system/message_pipe.h"
namespace IPC {
namespace internal {
@@ -46,8 +47,7 @@ class MessagePipeReader : public mojom::Channel {
class Delegate {
public:
virtual void OnMessageReceived(const Message& message) = 0;
- virtual void OnPipeClosed(MessagePipeReader* reader) = 0;
- virtual void OnPipeError(MessagePipeReader* reader) = 0;
+ virtual void OnPipeError() = 0;
};
// Delay the object deletion using the current message loop.
@@ -65,11 +65,18 @@ class MessagePipeReader : public mojom::Channel {
void operator()(MessagePipeReader* ptr) const;
};
- // Both parameters must be non-null.
- // Build a reader that reads messages from |receive_handle| and lets
+ // Builds a reader that reads messages from |receive_handle| and lets
// |delegate| know.
+ //
+ // |pipe| is the message pipe handle corresponding to the channel's master
+ // interface. This is the message pipe underlying both |sender| and
+ // |receiver|.
+ //
+ // Both |sender| and |receiver| must be non-null.
+ //
// Note that MessagePipeReader doesn't delete |delegate|.
- MessagePipeReader(mojom::ChannelAssociatedPtr sender,
+ MessagePipeReader(mojo::MessagePipeHandle pipe,
+ mojom::ChannelAssociatedPtr sender,
mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
base::ProcessId peer_pid,
Delegate* delegate);
@@ -77,12 +84,12 @@ class MessagePipeReader : public mojom::Channel {
// Close and destroy the MessagePipe.
void Close();
- // Close the mesage pipe with notifying the client with the error.
- void CloseWithError(MojoResult error);
// Return true if the MessagePipe is alive.
bool IsValid() { return sender_; }
+ // Sends an IPC::Message to the other end of the pipe. Safe to call from any
+ // thread.
bool Send(scoped_ptr<Message> message);
base::ProcessId GetPeerPid() const { return peer_pid_; }
@@ -92,6 +99,7 @@ class MessagePipeReader : public mojom::Channel {
void OnPipeError(MojoResult error);
private:
+ // mojom::Channel:
void Receive(mojo::Array<uint8_t> data,
mojo::Array<mojom::SerializedHandlePtr> handles) override;
@@ -100,6 +108,12 @@ class MessagePipeReader : public mojom::Channel {
base::ProcessId peer_pid_;
mojom::ChannelAssociatedPtr sender_;
mojo::AssociatedBinding<mojom::Channel> binding_;
+
+ // Raw message pipe handle and interface ID we use to send legacy IPC messages
+ // over the associated pipe.
+ const uint32_t sender_interface_id_;
+ const mojo::MessagePipeHandle sender_pipe_;
+
base::ThreadChecker thread_checker_;
DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
diff --git a/mojo/public/cpp/bindings/associated_interface_ptr.h b/mojo/public/cpp/bindings/associated_interface_ptr.h
index d3a848a..48caad6 100644
--- a/mojo/public/cpp/bindings/associated_interface_ptr.h
+++ b/mojo/public/cpp/bindings/associated_interface_ptr.h
@@ -79,6 +79,9 @@ class AssociatedInterfacePtr {
// Returns the version number of the interface that the remote side supports.
uint32_t version() const { return internal_state_.version(); }
+ // Returns the internal interface ID of this associated interface.
+ uint32_t interface_id() const { return internal_state_.interface_id(); }
+
// Queries the max version that the remote side supports. On completion, the
// result will be returned as the input of |callback|. The version number of
// this object will also be updated.
diff --git a/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h b/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h
index f792884..1b9e35c 100644
--- a/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h
+++ b/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h
@@ -19,6 +19,7 @@
#include "mojo/public/cpp/bindings/lib/interface_id.h"
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
#include "mojo/public/cpp/bindings/lib/scoped_interface_endpoint_handle.h"
+#include "mojo/public/cpp/system/message_pipe.h"
namespace mojo {
namespace internal {
@@ -40,6 +41,11 @@ class AssociatedInterfacePtrState {
uint32_t version() const { return version_; }
+ uint32_t interface_id() const {
+ DCHECK(is_bound());
+ return endpoint_client_->interface_id();
+ }
+
void QueryVersion(const Callback<void(uint32_t)>& callback) {
// It is safe to capture |this| because the callback won't be run after this
// object goes away.
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
index 58bc885..8094ad0 100644
--- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
+++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
@@ -149,6 +149,11 @@ AssociatedGroup* InterfaceEndpointClient::associated_group() {
return associated_group_.get();
}
+uint32_t InterfaceEndpointClient::interface_id() const {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ return handle_.id();
+}
+
ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(!has_pending_responders());
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.h b/mojo/public/cpp/bindings/lib/interface_endpoint_client.h
index 12ecbcc..548ca38 100644
--- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.h
+++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.h
@@ -60,6 +60,7 @@ class InterfaceEndpointClient : public MessageReceiverWithResponder {
MultiplexRouter* router() const { return handle_.router(); }
AssociatedGroup* associated_group();
+ uint32_t interface_id() const;
// After this call the object is in an invalid state and shouldn't be reused.
ScopedInterfaceEndpointHandle PassHandle();