// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "ipc/ipc_channel_reader.h" #include #include #include "base/message_loop/message_loop.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_message.h" #include "ipc/ipc_message_attachment_set.h" #include "ipc/ipc_message_macros.h" namespace IPC { namespace internal { ChannelReader::ChannelReader(Listener* listener) : listener_(listener), max_input_buffer_size_(Channel::kMaximumReadBufferSize) { memset(input_buf_, 0, sizeof(input_buf_)); } ChannelReader::~ChannelReader() { DCHECK(blocked_ids_.empty()); } ChannelReader::DispatchState ChannelReader::ProcessIncomingMessages() { while (true) { int bytes_read = 0; ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, &bytes_read); if (read_state == READ_FAILED) return DISPATCH_ERROR; if (read_state == READ_PENDING) return DISPATCH_FINISHED; DCHECK(bytes_read > 0); if (!TranslateInputData(input_buf_, bytes_read)) return DISPATCH_ERROR; DispatchState state = DispatchMessages(); if (state != DISPATCH_FINISHED) return state; } } ChannelReader::DispatchState ChannelReader::AsyncReadComplete(int bytes_read) { if (!TranslateInputData(input_buf_, bytes_read)) return DISPATCH_ERROR; return DispatchMessages(); } bool ChannelReader::IsInternalMessage(const Message& m) { return m.routing_id() == MSG_ROUTING_NONE && m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE && m.type() <= Channel::HELLO_MESSAGE_TYPE; } bool ChannelReader::IsHelloMessage(const Message& m) { return m.routing_id() == MSG_ROUTING_NONE && m.type() == Channel::HELLO_MESSAGE_TYPE; } void ChannelReader::CleanUp() { if (!blocked_ids_.empty()) { StopObservingAttachmentBroker(); blocked_ids_.clear(); } } void ChannelReader::DispatchMessage(Message* m) { EmitLogBeforeDispatch(*m); listener_->OnMessageReceived(*m); HandleDispatchError(*m); } bool ChannelReader::TranslateInputData(const char* input_data, int input_data_len) { const char* p; const char* end; // Possibly combine with the overflow buffer to make a larger buffer. if (input_overflow_buf_.empty()) { p = input_data; end = input_data + input_data_len; } else { if (!CheckMessageSize(input_overflow_buf_.size() + input_data_len)) return false; input_overflow_buf_.append(input_data, input_data_len); p = input_overflow_buf_.data(); end = p + input_overflow_buf_.size(); } size_t next_message_size = 0; // Dispatch all complete messages in the data buffer. while (p < end) { Message::NextMessageInfo info; Message::FindNext(p, end, &info); if (info.message_found) { int pickle_len = static_cast(info.pickle_end - p); Message translated_message(p, pickle_len); if (!HandleTranslatedMessage(&translated_message, info.attachment_ids)) return false; p = info.message_end; } else { // Last message is partial. next_message_size = info.message_size; if (!CheckMessageSize(next_message_size)) return false; break; } } // Account for the case where last message's byte is in the next data chunk. size_t next_message_buffer_size = next_message_size ? next_message_size + Channel::kReadBufferSize - 1: 0; // Save any partial data in the overflow buffer. if (p != input_overflow_buf_.data()) input_overflow_buf_.assign(p, end - p); if (!input_overflow_buf_.empty()) { // We have something in the overflow buffer, which means that we will // append the next data chunk (instead of parsing it directly). So we // resize the buffer to fit the next message, to avoid repeatedly // growing the buffer as we receive all message' data chunks. if (next_message_buffer_size > input_overflow_buf_.capacity()) { input_overflow_buf_.reserve(next_message_buffer_size); } } // Trim the buffer if we can if (next_message_buffer_size < max_input_buffer_size_ && input_overflow_buf_.size() < max_input_buffer_size_ && input_overflow_buf_.capacity() > max_input_buffer_size_) { // std::string doesn't really have a method to shrink capacity to // a specific value, so we have to swap with another string. std::string trimmed_buf; trimmed_buf.reserve(max_input_buffer_size_); if (trimmed_buf.capacity() > max_input_buffer_size_) { // Since we don't control how much space reserve() actually reserves, // we have to go other way around and change the max size to avoid // getting into the outer if() again. max_input_buffer_size_ = trimmed_buf.capacity(); } trimmed_buf.assign(input_overflow_buf_.data(), input_overflow_buf_.size()); input_overflow_buf_.swap(trimmed_buf); } if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) return false; return true; } bool ChannelReader::HandleTranslatedMessage( Message* translated_message, const AttachmentIdVector& attachment_ids) { // Immediately handle internal messages. if (IsInternalMessage(*translated_message)) { EmitLogBeforeDispatch(*translated_message); HandleInternalMessage(*translated_message); HandleDispatchError(*translated_message); return true; } translated_message->set_sender_pid(GetSenderPID()); // Immediately handle attachment broker messages. if (DispatchAttachmentBrokerMessage(*translated_message)) { // Ideally, the log would have been emitted prior to dispatching the // message, but that would require this class to know more about the // internals of attachment brokering, which should be avoided. EmitLogBeforeDispatch(*translated_message); HandleDispatchError(*translated_message); return true; } return HandleExternalMessage(translated_message, attachment_ids); } bool ChannelReader::HandleExternalMessage( Message* external_message, const AttachmentIdVector& attachment_ids) { for (const auto& id : attachment_ids) external_message->AddPlaceholderBrokerableAttachmentWithId(id); if (!GetNonBrokeredAttachments(external_message)) return false; // If there are no queued messages, attempt to immediately dispatch the // newly translated message. if (queued_messages_.empty()) { DCHECK(blocked_ids_.empty()); AttachmentIdSet blocked_ids = GetBrokeredAttachments(external_message); if (blocked_ids.empty()) { DispatchMessage(external_message); return true; } blocked_ids_.swap(blocked_ids); StartObservingAttachmentBroker(); } // Make a deep copy of |external_message| to add to the queue. scoped_ptr m(new Message(*external_message)); queued_messages_.push_back(m.release()); return true; } void ChannelReader::HandleDispatchError(const Message& message) { if (message.dispatch_error()) listener_->OnBadMessageReceived(message); } void ChannelReader::EmitLogBeforeDispatch(const Message& message) { #ifdef IPC_MESSAGE_LOG_ENABLED std::string name; Logging::GetInstance()->GetMessageText(message.type(), &name, &message, NULL); TRACE_EVENT_WITH_FLOW1("ipc,toplevel", "ChannelReader::DispatchInputData", message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "name", name); #else TRACE_EVENT_WITH_FLOW2("ipc,toplevel", "ChannelReader::DispatchInputData", message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "class", IPC_MESSAGE_ID_CLASS(message.type()), "line", IPC_MESSAGE_ID_LINE(message.type())); #endif } bool ChannelReader::DispatchAttachmentBrokerMessage(const Message& message) { #if USE_ATTACHMENT_BROKER if (IsAttachmentBrokerEndpoint() && GetAttachmentBroker()) { return GetAttachmentBroker()->OnMessageReceived(message); } #endif // USE_ATTACHMENT_BROKER return false; } ChannelReader::DispatchState ChannelReader::DispatchMessages() { while (!queued_messages_.empty()) { if (!blocked_ids_.empty()) return DISPATCH_WAITING_ON_BROKER; Message* m = queued_messages_.front(); AttachmentIdSet blocked_ids = GetBrokeredAttachments(m); if (!blocked_ids.empty()) { blocked_ids_.swap(blocked_ids); StartObservingAttachmentBroker(); return DISPATCH_WAITING_ON_BROKER; } DispatchMessage(m); queued_messages_.erase(queued_messages_.begin()); } return DISPATCH_FINISHED; } ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( Message* msg) { std::set blocked_ids; #if USE_ATTACHMENT_BROKER MessageAttachmentSet* set = msg->attachment_set(); std::vector> brokerable_attachments_copy(set->GetBrokerableAttachments()); for (const auto& attachment : brokerable_attachments_copy) { if (attachment->NeedsBrokering()) { AttachmentBroker* broker = GetAttachmentBroker(); DCHECK(broker); scoped_refptr brokered_attachment; bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), &brokered_attachment); if (!result) { blocked_ids.insert(attachment->GetIdentifier()); continue; } set->ReplacePlaceholderWithAttachment(brokered_attachment); } } #endif // USE_ATTACHMENT_BROKER return blocked_ids; } void ChannelReader::ReceivedBrokerableAttachmentWithId( const BrokerableAttachment::AttachmentId& id) { if (blocked_ids_.empty()) return; auto it = find(blocked_ids_.begin(), blocked_ids_.end(), id); if (it != blocked_ids_.end()) blocked_ids_.erase(it); if (blocked_ids_.empty()) { StopObservingAttachmentBroker(); DispatchMessages(); } } void ChannelReader::StartObservingAttachmentBroker() { #if USE_ATTACHMENT_BROKER GetAttachmentBroker()->AddObserver( this, base::MessageLoopForIO::current()->task_runner()); #endif // USE_ATTACHMENT_BROKER } void ChannelReader::StopObservingAttachmentBroker() { #if USE_ATTACHMENT_BROKER GetAttachmentBroker()->RemoveObserver(this); #endif // USE_ATTACHMENT_BROKER } bool ChannelReader::CheckMessageSize(size_t size) { if (size <= Channel::kMaximumMessageSize) { return true; } input_overflow_buf_.clear(); LOG(ERROR) << "IPC message is too big: " << size; return false; } } // namespace internal } // namespace IPC