diff options
Diffstat (limited to 'mojo/edk/system/message_pipe.cc')
-rw-r--r-- | mojo/edk/system/message_pipe.cc | 384 |
1 files changed, 0 insertions, 384 deletions
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc deleted file mode 100644 index 554401e..0000000 --- a/mojo/edk/system/message_pipe.cc +++ /dev/null @@ -1,384 +0,0 @@ -// Copyright 2013 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/edk/system/message_pipe.h" - -#include "base/logging.h" -#include "mojo/edk/system/channel.h" -#include "mojo/edk/system/channel_endpoint.h" -#include "mojo/edk/system/channel_endpoint_id.h" -#include "mojo/edk/system/incoming_endpoint.h" -#include "mojo/edk/system/local_message_pipe_endpoint.h" -#include "mojo/edk/system/message_in_transit.h" -#include "mojo/edk/system/message_pipe_dispatcher.h" -#include "mojo/edk/system/message_pipe_endpoint.h" -#include "mojo/edk/system/proxy_message_pipe_endpoint.h" - -namespace mojo { -namespace system { - -// static -MessagePipe* MessagePipe::CreateLocalLocal() { - MessagePipe* message_pipe = new MessagePipe(); - message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); - message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); - return message_pipe; -} - -// static -MessagePipe* MessagePipe::CreateLocalProxy( - scoped_refptr<ChannelEndpoint>* channel_endpoint) { - DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. - MessagePipe* message_pipe = new MessagePipe(); - message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); - *channel_endpoint = new ChannelEndpoint(message_pipe, 1); - message_pipe->endpoints_[1].reset( - new ProxyMessagePipeEndpoint(channel_endpoint->get())); - return message_pipe; -} - -// static -MessagePipe* MessagePipe::CreateLocalProxyFromExisting( - MessageInTransitQueue* message_queue, - ChannelEndpoint* channel_endpoint) { - DCHECK(message_queue); - MessagePipe* message_pipe = new MessagePipe(); - message_pipe->endpoints_[0].reset( - new LocalMessagePipeEndpoint(message_queue)); - if (channel_endpoint) { - bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); - message_pipe->endpoints_[1].reset( - new ProxyMessagePipeEndpoint(channel_endpoint)); - if (!attached_to_channel) - message_pipe->OnDetachFromChannel(1); - } else { - // This means that the proxy side was already closed; we only need to inform - // the local side of this. - // TODO(vtl): This is safe to do without locking (but perhaps slightly - // dubious), since no other thread has access to |message_pipe| yet. - message_pipe->endpoints_[0]->OnPeerClose(); - } - return message_pipe; -} - -// static -MessagePipe* MessagePipe::CreateProxyLocal( - scoped_refptr<ChannelEndpoint>* channel_endpoint) { - DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. - MessagePipe* message_pipe = new MessagePipe(); - *channel_endpoint = new ChannelEndpoint(message_pipe, 0); - message_pipe->endpoints_[0].reset( - new ProxyMessagePipeEndpoint(channel_endpoint->get())); - message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint()); - return message_pipe; -} - -// static -unsigned MessagePipe::GetPeerPort(unsigned port) { - DCHECK(port == 0 || port == 1); - return port ^ 1; -} - -// static -bool MessagePipe::Deserialize(Channel* channel, - const void* source, - size_t size, - scoped_refptr<MessagePipe>* message_pipe, - unsigned* port) { - DCHECK(!*message_pipe); // Not technically wrong, but unlikely. - - if (size != channel->GetSerializedEndpointSize()) { - LOG(ERROR) << "Invalid serialized message pipe"; - return false; - } - - scoped_refptr<IncomingEndpoint> incoming_endpoint = - channel->DeserializeEndpoint(source); - if (!incoming_endpoint) - return false; - - *message_pipe = incoming_endpoint->ConvertToMessagePipe(); - DCHECK(*message_pipe); - *port = 0; - return true; -} - -MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { - DCHECK(port == 0 || port == 1); - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - - return endpoints_[port]->GetType(); -} - -void MessagePipe::CancelAllAwakables(unsigned port) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - endpoints_[port]->CancelAllAwakables(); -} - -void MessagePipe::Close(unsigned port) { - DCHECK(port == 0 || port == 1); - - unsigned peer_port = GetPeerPort(port); - - base::AutoLock locker(lock_); - // The endpoint's |OnPeerClose()| may have been called first and returned - // false, which would have resulted in its destruction. - if (!endpoints_[port]) - return; - - endpoints_[port]->Close(); - if (endpoints_[peer_port]) { - if (!endpoints_[peer_port]->OnPeerClose()) - endpoints_[peer_port].reset(); - } - endpoints_[port].reset(); -} - -// TODO(vtl): Handle flags. -MojoResult MessagePipe::WriteMessage( - unsigned port, - UserPointer<const void> bytes, - uint32_t num_bytes, - std::vector<DispatcherTransport>* transports, - MojoWriteMessageFlags flags) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - return EnqueueMessageNoLock( - GetPeerPort(port), - make_scoped_ptr(new MessageInTransit( - MessageInTransit::kTypeEndpoint, - MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)), - transports); -} - -MojoResult MessagePipe::ReadMessage(unsigned port, - UserPointer<void> bytes, - UserPointer<uint32_t> num_bytes, - DispatcherVector* dispatchers, - uint32_t* num_dispatchers, - MojoReadMessageFlags flags) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - - return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, - num_dispatchers, flags); -} - -HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(const_cast<base::Lock&>(lock_)); - DCHECK(endpoints_[port]); - - return endpoints_[port]->GetHandleSignalsState(); -} - -MojoResult MessagePipe::AddAwakable(unsigned port, - Awakable* awakable, - MojoHandleSignals signals, - uint32_t context, - HandleSignalsState* signals_state) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - - return endpoints_[port]->AddAwakable(awakable, signals, context, - signals_state); -} - -void MessagePipe::RemoveAwakable(unsigned port, - Awakable* awakable, - HandleSignalsState* signals_state) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - - endpoints_[port]->RemoveAwakable(awakable, signals_state); -} - -void MessagePipe::StartSerialize(unsigned /*port*/, - Channel* channel, - size_t* max_size, - size_t* max_platform_handles) { - *max_size = channel->GetSerializedEndpointSize(); - *max_platform_handles = 0; -} - -bool MessagePipe::EndSerialize( - unsigned port, - Channel* channel, - void* destination, - size_t* actual_size, - embedder::PlatformHandleVector* /*platform_handles*/) { - DCHECK(port == 0 || port == 1); - - base::AutoLock locker(lock_); - DCHECK(endpoints_[port]); - - // The port being serialized must be local. - DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal); - - unsigned peer_port = GetPeerPort(port); - MessageInTransitQueue* message_queue = - static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) - ->message_queue(); - // The replacement for |endpoints_[port]|, if any. - MessagePipeEndpoint* replacement_endpoint = nullptr; - - // The three cases below correspond to the ones described above - // |Channel::SerializeEndpoint...()| (in channel.h). - if (!endpoints_[peer_port]) { - // Case 1: (known-)closed peer port. There's no reason for us to continue to - // exist afterwards. - channel->SerializeEndpointWithClosedPeer(destination, message_queue); - } else if (endpoints_[peer_port]->GetType() == - MessagePipeEndpoint::kTypeLocal) { - // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint| - // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that - // the |Channel| returns to us. - scoped_refptr<ChannelEndpoint> channel_endpoint = - channel->SerializeEndpointWithLocalPeer(destination, message_queue, - this, port); - replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get()); - } else { - // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and - // pass it to the |Channel|. There's no reason for us to continue to exist - // afterwards. - DCHECK_EQ(endpoints_[peer_port]->GetType(), - MessagePipeEndpoint::kTypeProxy); - ProxyMessagePipeEndpoint* peer_endpoint = - static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); - scoped_refptr<ChannelEndpoint> peer_channel_endpoint = - peer_endpoint->ReleaseChannelEndpoint(); - channel->SerializeEndpointWithRemotePeer(destination, message_queue, - peer_channel_endpoint); - // No need to call |Close()| after |ReleaseChannelEndpoint()|. - endpoints_[peer_port].reset(); - } - - endpoints_[port]->Close(); - endpoints_[port].reset(replacement_endpoint); - - *actual_size = channel->GetSerializedEndpointSize(); - return true; -} - -bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { - base::AutoLock locker(lock_); - - if (!endpoints_[port]) { - // This will happen only on the rare occasion that the call to - // |OnReadMessage()| is racing with us calling - // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, - // and the |ChannelEndpoint| can retry (calling the new client's - // |OnReadMessage()|). - return false; - } - - // This is called when the |ChannelEndpoint| for the - // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). - // We need to pass this message on to its peer port (typically a - // |LocalMessagePipeEndpoint|). - MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), - make_scoped_ptr(message), nullptr); - DLOG_IF(WARNING, result != MOJO_RESULT_OK) - << "EnqueueMessageNoLock() failed (result = " << result << ")"; - return true; -} - -void MessagePipe::OnDetachFromChannel(unsigned port) { - Close(port); -} - -MessagePipe::MessagePipe() { -} - -MessagePipe::~MessagePipe() { - // Owned by the dispatchers. The owning dispatchers should only release us via - // their |Close()| method, which should inform us of being closed via our - // |Close()|. Thus these should already be null. - DCHECK(!endpoints_[0]); - DCHECK(!endpoints_[1]); -} - -MojoResult MessagePipe::EnqueueMessageNoLock( - unsigned port, - scoped_ptr<MessageInTransit> message, - std::vector<DispatcherTransport>* transports) { - DCHECK(port == 0 || port == 1); - DCHECK(message); - - DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); - DCHECK(endpoints_[GetPeerPort(port)]); - - // The destination port need not be open, unlike the source port. - if (!endpoints_[port]) - return MOJO_RESULT_FAILED_PRECONDITION; - - if (transports) { - MojoResult result = AttachTransportsNoLock(port, message.get(), transports); - if (result != MOJO_RESULT_OK) - return result; - } - - // The endpoint's |EnqueueMessage()| may not report failure. - endpoints_[port]->EnqueueMessage(message.Pass()); - return MOJO_RESULT_OK; -} - -MojoResult MessagePipe::AttachTransportsNoLock( - unsigned port, - MessageInTransit* message, - std::vector<DispatcherTransport>* transports) { - DCHECK(!message->has_dispatchers()); - - // You're not allowed to send either handle to a message pipe over the message - // pipe, so check for this. (The case of trying to write a handle to itself is - // taken care of by |Core|. That case kind of makes sense, but leads to - // complications if, e.g., both sides try to do the same thing with their - // respective handles simultaneously. The other case, of trying to write the - // peer handle to a handle, doesn't make sense -- since no handle will be - // available to read the message from.) - for (size_t i = 0; i < transports->size(); i++) { - if (!(*transports)[i].is_valid()) - continue; - if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) { - MessagePipeDispatcherTransport mp_transport((*transports)[i]); - if (mp_transport.GetMessagePipe() == this) { - // The other case should have been disallowed by |Core|. (Note: |port| - // is the peer port of the handle given to |WriteMessage()|.) - DCHECK_EQ(mp_transport.GetPort(), port); - return MOJO_RESULT_INVALID_ARGUMENT; - } - } - } - - // Clone the dispatchers and attach them to the message. (This must be done as - // a separate loop, since we want to leave the dispatchers alone on failure.) - scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); - dispatchers->reserve(transports->size()); - for (size_t i = 0; i < transports->size(); i++) { - if ((*transports)[i].is_valid()) { - dispatchers->push_back( - (*transports)[i].CreateEquivalentDispatcherAndClose()); - } else { - LOG(WARNING) << "Enqueueing null dispatcher"; - dispatchers->push_back(nullptr); - } - } - message->SetDispatchers(dispatchers.Pass()); - return MOJO_RESULT_OK; -} - -} // namespace system -} // namespace mojo |