diff options
-rw-r--r-- | ipc/mojo/ipc_channel_mojo.cc | 90 | ||||
-rw-r--r-- | ipc/mojo/ipc_channel_mojo.h | 13 | ||||
-rw-r--r-- | ipc/mojo/ipc_message_pipe_reader.cc | 86 | ||||
-rw-r--r-- | ipc/mojo/ipc_message_pipe_reader.h | 28 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/associated_interface_ptr.h | 3 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h | 6 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/lib/interface_endpoint_client.cc | 5 | ||||
-rw-r--r-- | mojo/public/cpp/bindings/lib/interface_endpoint_client.h | 1 |
8 files changed, 173 insertions, 59 deletions
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc index a0dabad..db79c11 100644 --- a/ipc/mojo/ipc_channel_mojo.cc +++ b/ipc/mojo/ipc_channel_mojo.cc @@ -223,7 +223,11 @@ scoped_ptr<ChannelFactory> ChannelMojo::CreateClientFactory( ChannelMojo::ChannelMojo(mojo::ScopedMessagePipeHandle handle, Mode mode, Listener* listener) - : listener_(listener), waiting_connect_(true), weak_factory_(this) { + : task_runner_(base::ThreadTaskRunnerHandle::Get()), + pipe_(handle.get()), + listener_(listener), + waiting_connect_(true), + weak_factory_(this) { // Create MojoBootstrap after all members are set as it touches // ChannelMojo from a different thread. bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, this); @@ -234,15 +238,27 @@ ChannelMojo::~ChannelMojo() { } bool ChannelMojo::Connect() { + base::AutoLock lock(lock_); DCHECK(!message_reader_); bootstrap_->Connect(); return true; } void ChannelMojo::Close() { - message_reader_.reset(); - // We might Close() before we Connect(). - waiting_connect_ = false; + scoped_ptr<internal::MessagePipeReader, ReaderDeleter> reader; + { + base::AutoLock lock(lock_); + if (!message_reader_) + return; + // The reader's destructor may re-enter Close, so we swap it out first to + // avoid deadlock when freeing it below. + std::swap(message_reader_, reader); + + // We might Close() before we Connect(). + waiting_connect_ = false; + } + + reader.reset(); } // MojoBootstrap::Delegate implementation @@ -264,39 +280,49 @@ void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender, mojom::ChannelAssociatedPtr sender_ptr; sender_ptr.Bind(std::move(sender)); scoped_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter> reader( - new internal::MessagePipeReader(std::move(sender_ptr), + new internal::MessagePipeReader(pipe_, std::move(sender_ptr), std::move(receiver), peer_pid, this)); - for (size_t i = 0; i < pending_messages_.size(); ++i) { - bool sent = reader->Send(std::move(pending_messages_[i])); - if (!sent) { - // OnChannelError() is notified by OnPipeError(). + bool connected = true; + { + base::AutoLock lock(lock_); + for (size_t i = 0; i < pending_messages_.size(); ++i) { + if (!reader->Send(std::move(pending_messages_[i]))) { + LOG(ERROR) << "Failed to flush pending messages"; + pending_messages_.clear(); + connected = false; + break; + } + } + + if (connected) { + // We set |message_reader_| here and won't get any |pending_messages_| + // hereafter. Although we might have some if there is an error, we don't + // care. They cannot be sent anyway. + message_reader_ = std::move(reader); pending_messages_.clear(); - LOG(ERROR) << "Failed to flush pending messages"; - return; + waiting_connect_ = false; } } - // We set |message_reader_| here and won't get any |pending_messages_| - // hereafter. Although we might have some if there is an error, we don't - // care. They cannot be sent anyway. - message_reader_ = std::move(reader); - pending_messages_.clear(); - waiting_connect_ = false; - - listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID())); + if (connected) + listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID())); + else + OnPipeError(); } -void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) { - Close(); -} - -void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) { - listener_->OnChannelError(); +void ChannelMojo::OnPipeError() { + if (task_runner_->RunsTasksOnCurrentThread()) { + listener_->OnChannelError(); + } else { + task_runner_->PostTask( + FROM_HERE, + base::Bind(&ChannelMojo::OnPipeError, weak_factory_.GetWeakPtr())); + } } - bool ChannelMojo::Send(Message* message) { + base::AutoLock lock(lock_); if (!message_reader_) { pending_messages_.push_back(make_scoped_ptr(message)); // Counts as OK before the connection is established, but it's an @@ -304,10 +330,20 @@ bool ChannelMojo::Send(Message* message) { return waiting_connect_; } - return message_reader_->Send(make_scoped_ptr(message)); + if (!message_reader_->Send(make_scoped_ptr(message))) { + OnPipeError(); + return false; + } + + return true; +} + +bool ChannelMojo::IsSendThreadSafe() const { + return true; } base::ProcessId ChannelMojo::GetPeerPID() const { + base::AutoLock lock(lock_); if (!message_reader_) return base::kNullProcessId; diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h index 8c5de24..5968d84 100644 --- a/ipc/mojo/ipc_channel_mojo.h +++ b/ipc/mojo/ipc_channel_mojo.h @@ -10,9 +10,12 @@ #include <vector> #include "base/macros.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/memory/scoped_vector.h" #include "base/memory/weak_ptr.h" +#include "base/synchronization/lock.h" +#include "base/task_runner.h" #include "build/build_config.h" #include "ipc/ipc_channel.h" #include "ipc/ipc_channel_factory.h" @@ -58,6 +61,7 @@ class IPC_MOJO_EXPORT ChannelMojo bool Connect() override; void Close() override; bool Send(Message* message) override; + bool IsSendThreadSafe() const override; base::ProcessId GetPeerPID() const override; base::ProcessId GetSelfPID() const override; @@ -83,8 +87,7 @@ class IPC_MOJO_EXPORT ChannelMojo // MessagePipeReader::Delegate void OnMessageReceived(const Message& message) override; - void OnPipeClosed(internal::MessagePipeReader* reader) override; - void OnPipeError(internal::MessagePipeReader* reader) override; + void OnPipeError() override; private: ChannelMojo(mojo::ScopedMessagePipeHandle handle, @@ -100,9 +103,15 @@ class IPC_MOJO_EXPORT ChannelMojo // notifications invoked by them. typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter; + // A TaskRunner which runs tasks on the ChannelMojo's owning thread. + scoped_refptr<base::TaskRunner> task_runner_; + + const mojo::MessagePipeHandle pipe_; scoped_ptr<MojoBootstrap> bootstrap_; Listener* listener_; + // Guards access to the fields below. + mutable base::Lock lock_; scoped_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_; std::vector<scoped_ptr<Message>> pending_messages_; bool waiting_connect_; diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc index 495a3a5..b657bfd 100644 --- a/ipc/mojo/ipc_message_pipe_reader.cc +++ b/ipc/mojo/ipc_message_pipe_reader.cc @@ -5,20 +5,54 @@ #include "ipc/mojo/ipc_message_pipe_reader.h" #include <stdint.h> + #include <utility> #include "base/bind.h" #include "base/bind_helpers.h" #include "base/location.h" #include "base/logging.h" +#include "base/macros.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" #include "ipc/mojo/ipc_channel_mojo.h" +#include "mojo/public/cpp/bindings/message.h" namespace IPC { namespace internal { +namespace { + +// Used by Send() to capture a serialized Channel::Receive message. +class MessageSerializer : public mojo::MessageReceiverWithResponder { + public: + MessageSerializer() {} + ~MessageSerializer() override {} + + mojo::Message* message() { return &message_; } + + private: + // mojo::MessageReceiverWithResponder + bool Accept(mojo::Message* message) override { + message->MoveTo(&message_); + return true; + } + + bool AcceptWithResponder(mojo::Message* message, + mojo::MessageReceiver* responder) override { + NOTREACHED(); + return false; + } + + mojo::Message message_; + + DISALLOW_COPY_AND_ASSIGN(MessageSerializer); +}; + +} // namespace + MessagePipeReader::MessagePipeReader( + mojo::MessagePipeHandle pipe, mojom::ChannelAssociatedPtr sender, mojo::AssociatedInterfaceRequest<mojom::Channel> receiver, base::ProcessId peer_pid, @@ -26,7 +60,9 @@ MessagePipeReader::MessagePipeReader( : delegate_(delegate), peer_pid_(peer_pid), sender_(std::move(sender)), - binding_(this, std::move(receiver)) { + binding_(this, std::move(receiver)), + sender_interface_id_(sender_.interface_id()), + sender_pipe_(pipe) { sender_.set_connection_error_handler( base::Bind(&MessagePipeReader::OnPipeError, base::Unretained(this), MOJO_RESULT_FAILED_PRECONDITION)); @@ -45,12 +81,6 @@ void MessagePipeReader::Close() { sender_.reset(); if (binding_.is_bound()) binding_.Close(); - OnPipeClosed(); -} - -void MessagePipeReader::CloseWithError(MojoResult error) { - DCHECK(thread_checker_.CalledOnValidThread()); - OnPipeError(error); } bool MessagePipeReader::Send(scoped_ptr<Message> message) { @@ -61,17 +91,35 @@ bool MessagePipeReader::Send(scoped_ptr<Message> message) { mojo::Array<mojom::SerializedHandlePtr> handles(nullptr); MojoResult result = MOJO_RESULT_OK; result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles); - if (result != MOJO_RESULT_OK) { - CloseWithError(result); + if (result != MOJO_RESULT_OK) return false; - } + mojo::Array<uint8_t> data(message->size()); std::copy(reinterpret_cast<const uint8_t*>(message->data()), reinterpret_cast<const uint8_t*>(message->data()) + message->size(), &data[0]); - sender_->Receive(std::move(data), std::move(handles)); + + MessageSerializer serializer; + mojom::ChannelProxy proxy(&serializer); + proxy.Receive(std::move(data), std::move(handles)); + mojo::Message* mojo_message = serializer.message(); + + size_t num_handles = mojo_message->handles()->size(); + DCHECK_LE(num_handles, std::numeric_limits<uint32_t>::max()); + + mojo_message->set_interface_id(sender_interface_id_); + result = mojo::WriteMessageRaw( + sender_pipe_, mojo_message->data(), mojo_message->data_num_bytes(), + reinterpret_cast<const MojoHandle*>(mojo_message->handles()->data()), + static_cast<uint32_t>(num_handles), MOJO_WRITE_MESSAGE_FLAG_NONE); + + // If the write was successful, the handles have been transferred and they + // should not be closed when the message is destroyed. + if (result == MOJO_RESULT_OK) + mojo_message->mutable_handles()->clear(); + DVLOG(4) << "Send " << message->type() << ": " << message->size(); - return true; + return result == MOJO_RESULT_OK; } void MessagePipeReader::Receive( @@ -86,29 +134,21 @@ void MessagePipeReader::Receive( MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(std::move(handles), &message); if (write_result != MOJO_RESULT_OK) { - CloseWithError(write_result); + OnPipeError(write_result); return; } TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), - "MessagePipeReader::OnMessageReceived", + "MessagePipeReader::Receive", message.flags(), TRACE_EVENT_FLAG_FLOW_IN); delegate_->OnMessageReceived(message); } -void MessagePipeReader::OnPipeClosed() { - DCHECK(thread_checker_.CalledOnValidThread()); - if (!delegate_) - return; - delegate_->OnPipeClosed(this); - delegate_ = nullptr; -} - void MessagePipeReader::OnPipeError(MojoResult error) { DCHECK(thread_checker_.CalledOnValidThread()); if (delegate_) - delegate_->OnPipeError(this); + delegate_->OnPipeError(); Close(); } diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h index 3989c25..d2ba9de 100644 --- a/ipc/mojo/ipc_message_pipe_reader.h +++ b/ipc/mojo/ipc_message_pipe_reader.h @@ -19,6 +19,7 @@ #include "ipc/mojo/ipc.mojom.h" #include "mojo/public/cpp/bindings/associated_binding.h" #include "mojo/public/cpp/system/core.h" +#include "mojo/public/cpp/system/message_pipe.h" namespace IPC { namespace internal { @@ -46,8 +47,7 @@ class MessagePipeReader : public mojom::Channel { class Delegate { public: virtual void OnMessageReceived(const Message& message) = 0; - virtual void OnPipeClosed(MessagePipeReader* reader) = 0; - virtual void OnPipeError(MessagePipeReader* reader) = 0; + virtual void OnPipeError() = 0; }; // Delay the object deletion using the current message loop. @@ -65,11 +65,18 @@ class MessagePipeReader : public mojom::Channel { void operator()(MessagePipeReader* ptr) const; }; - // Both parameters must be non-null. - // Build a reader that reads messages from |receive_handle| and lets + // Builds a reader that reads messages from |receive_handle| and lets // |delegate| know. + // + // |pipe| is the message pipe handle corresponding to the channel's master + // interface. This is the message pipe underlying both |sender| and + // |receiver|. + // + // Both |sender| and |receiver| must be non-null. + // // Note that MessagePipeReader doesn't delete |delegate|. - MessagePipeReader(mojom::ChannelAssociatedPtr sender, + MessagePipeReader(mojo::MessagePipeHandle pipe, + mojom::ChannelAssociatedPtr sender, mojo::AssociatedInterfaceRequest<mojom::Channel> receiver, base::ProcessId peer_pid, Delegate* delegate); @@ -77,12 +84,12 @@ class MessagePipeReader : public mojom::Channel { // Close and destroy the MessagePipe. void Close(); - // Close the mesage pipe with notifying the client with the error. - void CloseWithError(MojoResult error); // Return true if the MessagePipe is alive. bool IsValid() { return sender_; } + // Sends an IPC::Message to the other end of the pipe. Safe to call from any + // thread. bool Send(scoped_ptr<Message> message); base::ProcessId GetPeerPid() const { return peer_pid_; } @@ -92,6 +99,7 @@ class MessagePipeReader : public mojom::Channel { void OnPipeError(MojoResult error); private: + // mojom::Channel: void Receive(mojo::Array<uint8_t> data, mojo::Array<mojom::SerializedHandlePtr> handles) override; @@ -100,6 +108,12 @@ class MessagePipeReader : public mojom::Channel { base::ProcessId peer_pid_; mojom::ChannelAssociatedPtr sender_; mojo::AssociatedBinding<mojom::Channel> binding_; + + // Raw message pipe handle and interface ID we use to send legacy IPC messages + // over the associated pipe. + const uint32_t sender_interface_id_; + const mojo::MessagePipeHandle sender_pipe_; + base::ThreadChecker thread_checker_; DISALLOW_COPY_AND_ASSIGN(MessagePipeReader); diff --git a/mojo/public/cpp/bindings/associated_interface_ptr.h b/mojo/public/cpp/bindings/associated_interface_ptr.h index d3a848a..48caad6 100644 --- a/mojo/public/cpp/bindings/associated_interface_ptr.h +++ b/mojo/public/cpp/bindings/associated_interface_ptr.h @@ -79,6 +79,9 @@ class AssociatedInterfacePtr { // Returns the version number of the interface that the remote side supports. uint32_t version() const { return internal_state_.version(); } + // Returns the internal interface ID of this associated interface. + uint32_t interface_id() const { return internal_state_.interface_id(); } + // Queries the max version that the remote side supports. On completion, the // result will be returned as the input of |callback|. The version number of // this object will also be updated. diff --git a/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h b/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h index f792884..1b9e35c 100644 --- a/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h +++ b/mojo/public/cpp/bindings/lib/associated_interface_ptr_state.h @@ -19,6 +19,7 @@ #include "mojo/public/cpp/bindings/lib/interface_id.h" #include "mojo/public/cpp/bindings/lib/multiplex_router.h" #include "mojo/public/cpp/bindings/lib/scoped_interface_endpoint_handle.h" +#include "mojo/public/cpp/system/message_pipe.h" namespace mojo { namespace internal { @@ -40,6 +41,11 @@ class AssociatedInterfacePtrState { uint32_t version() const { return version_; } + uint32_t interface_id() const { + DCHECK(is_bound()); + return endpoint_client_->interface_id(); + } + void QueryVersion(const Callback<void(uint32_t)>& callback) { // It is safe to capture |this| because the callback won't be run after this // object goes away. diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc index 58bc885..8094ad0 100644 --- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc +++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc @@ -149,6 +149,11 @@ AssociatedGroup* InterfaceEndpointClient::associated_group() { return associated_group_.get(); } +uint32_t InterfaceEndpointClient::interface_id() const { + DCHECK(thread_checker_.CalledOnValidThread()); + return handle_.id(); +} + ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(!has_pending_responders()); diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.h b/mojo/public/cpp/bindings/lib/interface_endpoint_client.h index 12ecbcc..548ca38 100644 --- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.h +++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.h @@ -60,6 +60,7 @@ class InterfaceEndpointClient : public MessageReceiverWithResponder { MultiplexRouter* router() const { return handle_.router(); } AssociatedGroup* associated_group(); + uint32_t interface_id() const; // After this call the object is in an invalid state and shouldn't be reused. ScopedInterfaceEndpointHandle PassHandle(); |