// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/channel.h" #include "base/basictypes.h" #include "base/bind.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/message_loop/message_loop.h" #include "base/strings/stringprintf.h" namespace mojo { namespace system { COMPILE_ASSERT(Channel::kBootstrapEndpointId != MessageInTransit::kInvalidEndpointId, kBootstrapEndpointId_is_invalid); STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId Channel::kBootstrapEndpointId; Channel::EndpointInfo::EndpointInfo() { } Channel::EndpointInfo::EndpointInfo(scoped_refptr message_pipe, unsigned port) : message_pipe(message_pipe), port(port) { } Channel::EndpointInfo::~EndpointInfo() { } Channel::Channel() : next_local_id_(kBootstrapEndpointId) { } bool Channel::Init(const PlatformChannelHandle& handle) { DCHECK(creation_thread_checker_.CalledOnValidThread()); // No need to take |lock_|, since this must be called before this object // becomes thread-safe. DCHECK(!raw_channel_.get()); raw_channel_.reset( RawChannel::Create(handle, this, base::MessageLoop::current())); if (!raw_channel_->Init()) { raw_channel_.reset(); return false; } return true; } void Channel::Shutdown() { DCHECK(creation_thread_checker_.CalledOnValidThread()); base::AutoLock locker(lock_); DCHECK(raw_channel_.get()); raw_channel_->Shutdown(); raw_channel_.reset(); // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that // it's empty? } MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint( scoped_refptr message_pipe, unsigned port) { MessageInTransit::EndpointId local_id; { base::AutoLock locker(lock_); while (next_local_id_ == MessageInTransit::kInvalidEndpointId || local_id_to_endpoint_info_map_.find(next_local_id_) != local_id_to_endpoint_info_map_.end()) next_local_id_++; local_id = next_local_id_; next_local_id_++; // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid // some expensive reference count increment/decrements.) Once this is done, // we should be able to delete |EndpointInfo|'s default constructor. local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port); } message_pipe->Attach(port, scoped_refptr(this), local_id); return local_id; } void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id, MessageInTransit::EndpointId remote_id) { EndpointInfo endpoint_info; { base::AutoLock locker(lock_); IdToEndpointInfoMap::const_iterator it = local_id_to_endpoint_info_map_.find(local_id); CHECK(it != local_id_to_endpoint_info_map_.end()); endpoint_info = it->second; } endpoint_info.message_pipe->Run(endpoint_info.port, remote_id); } bool Channel::WriteMessage(MessageInTransit* message) { base::AutoLock locker(lock_); if (!raw_channel_.get()) { // TODO(vtl): I think this is probably not an error condition, but I should // think about it (and the shutdown sequence) more carefully. LOG(INFO) << "WriteMessage() after shutdown"; return false; } return raw_channel_->WriteMessage(message); } void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) { DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); base::AutoLock locker_(lock_); local_id_to_endpoint_info_map_.erase(local_id); } Channel::~Channel() { // The channel should have been shut down first. DCHECK(!raw_channel_.get()); } void Channel::OnReadMessage(const MessageInTransit& message) { switch (message.type()) { case MessageInTransit::kTypeMessagePipeEndpoint: case MessageInTransit::kTypeMessagePipe: OnReadMessageForDownstream(message); break; case MessageInTransit::TYPE_CHANNEL: OnReadMessageForChannel(message); break; default: HandleRemoteError(base::StringPrintf( "Received message of invalid type %u", static_cast(message.type()))); break; } } void Channel::OnFatalError(FatalError fatal_error) { // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead. NOTIMPLEMENTED(); } void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint || message.type() == MessageInTransit::kTypeMessagePipe); MessageInTransit::EndpointId local_id = message.destination_id(); if (local_id == MessageInTransit::kInvalidEndpointId) { HandleRemoteError("Received message with no destination ID"); return; } EndpointInfo endpoint_info; { base::AutoLock locker(lock_); // Since we own |raw_channel_|, and this method and |Shutdown()| should only // be called from the creation thread, |raw_channel_| should never be null // here. DCHECK(raw_channel_.get()); IdToEndpointInfoMap::const_iterator it = local_id_to_endpoint_info_map_.find(local_id); if (it == local_id_to_endpoint_info_map_.end()) { HandleRemoteError(base::StringPrintf( "Received a message for nonexistent local destination ID %u", static_cast(local_id))); return; } endpoint_info = it->second; } // We need to duplicate the message, because |EnqueueMessage()| will take // ownership of it. MessageInTransit* own_message = MessageInTransit::Create( message.type(), message.subtype(), message.data(), message.data_size()); if (endpoint_info.message_pipe->EnqueueMessage( MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) != MOJO_RESULT_OK) { HandleLocalError(base::StringPrintf( "Failed to enqueue message to local destination ID %u", static_cast(local_id))); return; } } void Channel::OnReadMessageForChannel(const MessageInTransit& message) { // TODO(vtl): Currently no channel-only messages yet. HandleRemoteError("Received invalid channel message"); NOTREACHED(); } void Channel::HandleRemoteError(const base::StringPiece& error_message) { // TODO(vtl): Is this how we really want to handle this? LOG(INFO) << error_message; } void Channel::HandleLocalError(const base::StringPiece& error_message) { // TODO(vtl): Is this how we really want to handle this? LOG(FATAL) << error_message; } } // namespace system } // namespace mojo