// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/local_message_pipe_endpoint.h" #include #include "base/logging.h" #include "mojo/system/dispatcher.h" #include "mojo/system/message_in_transit.h" namespace mojo { namespace system { LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() : is_open_(true), is_peer_open_(true) { } LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { DCHECK(!is_open_); DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open. } MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const { return kTypeLocal; } bool LocalMessagePipeEndpoint::OnPeerClose() { DCHECK(is_open_); DCHECK(is_peer_open_); MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); is_peer_open_ = false; MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); if (new_satisfied_flags != old_satisfied_flags || new_satisfiable_flags != old_satisfiable_flags) { waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, new_satisfiable_flags); } return true; } void LocalMessagePipeEndpoint::EnqueueMessage( scoped_ptr message) { DCHECK(is_open_); DCHECK(is_peer_open_); bool was_empty = message_queue_.IsEmpty(); message_queue_.AddMessage(message.Pass()); if (was_empty) { waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), SatisfiableFlags()); } } void LocalMessagePipeEndpoint::Close() { DCHECK(is_open_); is_open_ = false; message_queue_.Clear(); } void LocalMessagePipeEndpoint::CancelAllWaiters() { DCHECK(is_open_); waiter_list_.CancelAllWaiters(); } MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes, uint32_t* num_bytes, DispatcherVector* dispatchers, uint32_t* num_dispatchers, MojoReadMessageFlags flags) { DCHECK(is_open_); DCHECK(!dispatchers || dispatchers->empty()); const uint32_t max_bytes = num_bytes ? *num_bytes : 0; const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; if (message_queue_.IsEmpty()) { return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT : MOJO_RESULT_FAILED_PRECONDITION; } // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop // and release the lock immediately. bool enough_space = true; MessageInTransit* message = message_queue_.PeekMessage(); if (num_bytes) *num_bytes = message->num_bytes(); if (message->num_bytes() <= max_bytes) memcpy(bytes, message->bytes(), message->num_bytes()); else enough_space = false; if (DispatcherVector* queued_dispatchers = message->dispatchers()) { if (num_dispatchers) *num_dispatchers = static_cast(queued_dispatchers->size()); if (enough_space) { if (queued_dispatchers->empty()) { // Nothing to do. } else if (queued_dispatchers->size() <= max_num_dispatchers) { DCHECK(dispatchers); dispatchers->swap(*queued_dispatchers); } else { enough_space = false; } } } else { if (num_dispatchers) *num_dispatchers = 0; } message = NULL; if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { message_queue_.DiscardMessage(); // Now it's empty, thus no longer readable. if (message_queue_.IsEmpty()) { // It's currently not possible to wait for non-readability, but we should // do the state change anyway. waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), SatisfiableFlags()); } } if (!enough_space) return MOJO_RESULT_RESOURCE_EXHAUSTED; return MOJO_RESULT_OK; } MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result) { DCHECK(is_open_); if ((flags & SatisfiedFlags())) return MOJO_RESULT_ALREADY_EXISTS; if (!(flags & SatisfiableFlags())) return MOJO_RESULT_FAILED_PRECONDITION; waiter_list_.AddWaiter(waiter, flags, wake_result); return MOJO_RESULT_OK; } void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { DCHECK(is_open_); waiter_list_.RemoveWaiter(waiter); } MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() { MojoWaitFlags satisfied_flags = 0; if (!message_queue_.IsEmpty()) satisfied_flags |= MOJO_WAIT_FLAG_READABLE; if (is_peer_open_) satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; return satisfied_flags; } MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() { MojoWaitFlags satisfiable_flags = 0; if (!message_queue_.IsEmpty() || is_peer_open_) satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; if (is_peer_open_) satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; return satisfiable_flags; } } // namespace system } // namespace mojo