diff options
Diffstat (limited to 'mojo/edk/system/raw_channel.cc')
-rw-r--r-- | mojo/edk/system/raw_channel.cc | 631 |
1 files changed, 631 insertions, 0 deletions
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc new file mode 100644 index 0000000..acada52 --- /dev/null +++ b/mojo/edk/system/raw_channel.cc @@ -0,0 +1,631 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/raw_channel.h" + +#include <string.h> + +#include <algorithm> + +#include "base/bind.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "mojo/edk/embedder/embedder_internal.h" +#include "mojo/edk/system/message_in_transit.h" +#include "mojo/edk/system/transport_data.h" + +namespace mojo { +namespace edk { + +const size_t kReadSize = 4096; + +// RawChannel::ReadBuffer ------------------------------------------------------ + +RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { +} + +RawChannel::ReadBuffer::~ReadBuffer() { +} + +void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { + DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); + *addr = &buffer_[0] + num_valid_bytes_; + *size = kReadSize; +} + +// RawChannel::WriteBuffer ----------------------------------------------------- + +RawChannel::WriteBuffer::WriteBuffer() + : serialized_platform_handle_size_(0), + platform_handles_offset_(0), + data_offset_(0) { +} + +RawChannel::WriteBuffer::~WriteBuffer() { + message_queue_.Clear(); +} + +bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { + if (message_queue_.IsEmpty()) + return false; + + const TransportData* transport_data = + message_queue_.PeekMessage()->transport_data(); + if (!transport_data) + return false; + + const PlatformHandleVector* all_platform_handles = + transport_data->platform_handles(); + if (!all_platform_handles) { + DCHECK_EQ(platform_handles_offset_, 0u); + return false; + } + if (platform_handles_offset_ >= all_platform_handles->size()) { + DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); + return false; + } + + return true; +} + +void RawChannel::WriteBuffer::GetPlatformHandlesToSend( + size_t* num_platform_handles, + PlatformHandle** platform_handles, + void** serialization_data) { + DCHECK(HavePlatformHandlesToSend()); + + MessageInTransit* message = message_queue_.PeekMessage(); + TransportData* transport_data = message->transport_data(); + PlatformHandleVector* all_platform_handles = + transport_data->platform_handles(); + *num_platform_handles = + all_platform_handles->size() - platform_handles_offset_; + *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; + + if (serialized_platform_handle_size_ > 0) { + size_t serialization_data_offset = + transport_data->platform_handle_table_offset(); + serialization_data_offset += + platform_handles_offset_ * serialized_platform_handle_size_; + *serialization_data = static_cast<char*>(transport_data->buffer()) + + serialization_data_offset; + } else { + *serialization_data = nullptr; + } +} + +void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { + buffers->clear(); + + if (message_queue_.IsEmpty()) + return; + + const MessageInTransit* message = message_queue_.PeekMessage(); + DCHECK_LT(data_offset_, message->total_size()); + size_t bytes_to_write = message->total_size() - data_offset_; + + size_t transport_data_buffer_size = + message->transport_data() ? message->transport_data()->buffer_size() : 0; + + if (!transport_data_buffer_size) { + // Only write from the main buffer. + DCHECK_LT(data_offset_, message->main_buffer_size()); + DCHECK_LE(bytes_to_write, message->main_buffer_size()); + Buffer buffer = { + static_cast<const char*>(message->main_buffer()) + data_offset_, + bytes_to_write}; + + buffers->push_back(buffer); + return; + } + + if (data_offset_ >= message->main_buffer_size()) { + // Only write from the transport data buffer. + DCHECK_LT(data_offset_ - message->main_buffer_size(), + transport_data_buffer_size); + DCHECK_LE(bytes_to_write, transport_data_buffer_size); + Buffer buffer = { + static_cast<const char*>(message->transport_data()->buffer()) + + (data_offset_ - message->main_buffer_size()), + bytes_to_write}; + + buffers->push_back(buffer); + return; + } + + // TODO(vtl): We could actually send out buffers from multiple messages, with + // the "stopping" condition being reaching a message with platform handles + // attached. + + // Write from both buffers. + DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + + transport_data_buffer_size); + Buffer buffer1 = { + static_cast<const char*>(message->main_buffer()) + data_offset_, + message->main_buffer_size() - data_offset_}; + buffers->push_back(buffer1); + Buffer buffer2 = { + static_cast<const char*>(message->transport_data()->buffer()), + transport_data_buffer_size}; + buffers->push_back(buffer2); +} + +// RawChannel ------------------------------------------------------------------ + +RawChannel::RawChannel() + : message_loop_for_io_(nullptr), + delegate_(nullptr), + write_ready_(false), + write_stopped_(false), + error_occurred_(false), + weak_ptr_factory_(this) { + read_buffer_.reset(new ReadBuffer); + write_buffer_.reset(new WriteBuffer()); +} + +RawChannel::~RawChannel() { + DCHECK(!read_buffer_); + DCHECK(!write_buffer_); + + // Only want to decrement counter if Init was called. + if (message_loop_for_io_) { + // No need to take the |write_lock_| here -- if there are still weak + // pointers outstanding, then we're hosed anyway (since we wouldn't be able + // to invalidate them cleanly, since we might not be on the I/O thread). + // DCHECK(!weak_ptr_factory_.HasWeakPtrs()); + internal::ChannelShutdown(); + } +} + +void RawChannel::Init(Delegate* delegate) { + internal::ChannelStarted(); + DCHECK(delegate); + + base::AutoLock read_locker(read_lock_); + // solves race where initialiing on io thread while main thread is serializing + // this channel and releases handle. + base::AutoLock locker(write_lock_); + + DCHECK(!delegate_); + delegate_ = delegate; + + DCHECK(!message_loop_for_io_); + message_loop_for_io_ = + static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); + + OnInit(); + + if (read_buffer_->num_valid_bytes()) { + // We had serialized read buffer data through SetInitialReadBufferData call. + // Make sure we read messages out of it now, otherwise the delegate won't + // get notified if no other data gets written to the pipe. + // Although this means that we can call back synchronously into the caller, + // that's easier than posting a task to do this. That is because if we post + // a task, a pending read could have started and we wouldn't be able to move + // the read buffer since it can be in use by the OS in an async operation. + bool did_dispatch_message = false; + bool stop_dispatching = false; + DispatchMessages(&did_dispatch_message, &stop_dispatching); + } + + IOResult io_result = ScheduleRead(); + if (io_result != IO_PENDING) { + // This will notify the delegate about the read failure. Although we're on + // the I/O thread, don't call it in the nested context. + message_loop_for_io_->PostTask( + FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, + weak_ptr_factory_.GetWeakPtr(), io_result, 0)); + } + // Note: |ScheduleRead()| failure is treated as a read failure (by notifying + // the delegate), not an initialization failure. + + write_ready_ = true; + write_buffer_->serialized_platform_handle_size_ = + GetSerializedPlatformHandleSize(); + if (!write_buffer_->message_queue_.IsEmpty()) + SendQueuedMessagesNoLock(); +} + +void RawChannel::Shutdown() { + weak_ptr_factory_.InvalidateWeakPtrs(); + + // Normally, we want to flush any pending writes before shutting down. This + // doesn't apply when 1) we don't have a handle (for obvious reasons) or + // 2) when the other side already quit and asked us to close the handle to + // ensure that we read everything out of the pipe first. + if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) { + { + base::AutoLock read_locker(read_lock_); + base::AutoLock locker(write_lock_); + OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); + } + delete this; + return; + } + + base::AutoLock read_locker(read_lock_); + base::AutoLock locker(write_lock_); + DCHECK(read_buffer_->num_valid_bytes() == 0) << + "RawChannel::Shutdown called but there is pending data to be read"; + + // happens on shutdown if didn't call init when doing createduplicate + if (message_loop_for_io()) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + } + + // Reset the delegate so that it won't receive further calls. + delegate_ = nullptr; + + bool empty = write_buffer_->message_queue_.IsEmpty(); + + // We may have no messages to write. However just because our end of the pipe + // wrote everything doesn't mean that the other end read it. We don't want to + // call FlushFileBuffers since a) that only works for server end of the pipe, + // and b) it pauses this thread (which can block a process on another, or + // worse hang if both pipes are in the same process). + scoped_ptr<MessageInTransit> quit_message(new MessageInTransit( + MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr)); + EnqueueMessageNoLock(quit_message.Pass()); + write_stopped_ = true; + + if (empty) + SendQueuedMessagesNoLock(); +} + +ScopedPlatformHandle RawChannel::ReleaseHandle( + std::vector<char>* read_buffer) { + ScopedPlatformHandle rv; + { + base::AutoLock read_locker(read_lock_); + base::AutoLock locker(write_lock_); + rv = ReleaseHandleNoLock(read_buffer); + + // TODO(jam); if we use these, use nolock versions of these methods that are + // copied. + if (!write_buffer_->message_queue_.IsEmpty()) { + NOTREACHED() << "TODO(JAM)"; + } + + delegate_ = nullptr; + + // The Unretained is safe because above cancelled IO so we shouldn't get any + // channel errors. + // |message_loop_for_io_| might not be set yet + internal::g_io_thread_task_runner->PostTask( + FROM_HERE, + base::Bind(&RawChannel::Shutdown, base::Unretained(this))); + } + + return rv; +} + +// Reminder: This must be thread-safe. +bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { + DCHECK(message); + base::AutoLock locker(write_lock_); + if (write_stopped_) + return false; + + bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); + EnqueueMessageNoLock(message.Pass()); + if (queue_was_empty && write_ready_) + return SendQueuedMessagesNoLock(); + + return true; +} + +bool RawChannel::SendQueuedMessagesNoLock() { + DCHECK_EQ(write_buffer_->data_offset_, 0u); + + size_t platform_handles_written = 0; + size_t bytes_written = 0; + IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); + if (io_result == IO_PENDING) + return true; + + bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, + bytes_written); + if (!result) { + // Even if we're on the I/O thread, don't call |OnError()| in the nested + // context. + message_loop_for_io_->PostTask( + FROM_HERE, + base::Bind(&RawChannel::LockAndCallOnError, + weak_ptr_factory_.GetWeakPtr(), + Delegate::ERROR_WRITE)); + } + + return result; +} + +// Reminder: This must be thread-safe. +bool RawChannel::IsWriteBufferEmpty() { + base::AutoLock locker(write_lock_); + return write_buffer_->message_queue_.IsEmpty(); +} + +bool RawChannel::IsReadBufferEmpty() { + base::AutoLock locker(read_lock_); + return read_buffer_->num_valid_bytes_ != 0; +} + +void RawChannel::SetInitialReadBufferData(char* data, size_t size) { + base::AutoLock locker(read_lock_); + // TODO(jam): copy power of 2 algorithm below? or share. + read_buffer_->buffer_.resize(size+kReadSize); + memcpy(&read_buffer_->buffer_[0], data, size); + read_buffer_->num_valid_bytes_ = size; +} + +void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + + base::AutoLock locker(read_lock_); + + // Keep reading data in a loop, and dispatch messages if enough data is + // received. Exit the loop if any of the following happens: + // - one or more messages were dispatched; + // - the last read failed, was a partial read or would block; + // - |Shutdown()| was called. + do { + switch (io_result) { + case IO_SUCCEEDED: + break; + case IO_FAILED_SHUTDOWN: + case IO_FAILED_BROKEN: + case IO_FAILED_UNKNOWN: + CallOnError(ReadIOResultToError(io_result)); + return; // |this| may have been destroyed in |CallOnError()|. + case IO_PENDING: + NOTREACHED(); + return; + } + + read_buffer_->num_valid_bytes_ += bytes_read; + + // Dispatch all the messages that we can. + bool did_dispatch_message = false; + bool stop_dispatching = false; + DispatchMessages(&did_dispatch_message, &stop_dispatching); + if (stop_dispatching) + return; + + if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < + kReadSize) { + // Use power-of-2 buffer sizes. + // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the + // maximum message size to whatever extent necessary). + // TODO(vtl): We may often be able to peek at the header and get the real + // required extra space (which may be much bigger than |kReadSize|). + size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); + while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) + new_size *= 2; + + // TODO(vtl): It's suboptimal to zero out the fresh memory. + read_buffer_->buffer_.resize(new_size, 0); + } + + // (1) If we dispatched any messages, stop reading for now (and let the + // message loop do its thing for another round). + // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only + // a single message. Risks: slower, more complex if we want to avoid lots of + // copying. ii. Keep reading until there's no more data and dispatch all the + // messages we can. Risks: starvation of other users of the message loop.) + // (2) If we didn't max out |kReadSize|, stop reading for now. + bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; + bytes_read = 0; + io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); + } while (io_result != IO_PENDING); +} + +void RawChannel::OnWriteCompleted(IOResult io_result, + size_t platform_handles_written, + size_t bytes_written) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + DCHECK_NE(io_result, IO_PENDING); + + bool did_fail = false; + { + base::AutoLock locker(write_lock_); + did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, + bytes_written); + } + + if (did_fail) { + LockAndCallOnError(Delegate::ERROR_WRITE); + return; // |this| may have been destroyed in |CallOnError()|. + } +} + +void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { + write_lock_.AssertAcquired(); + DCHECK(HandleForDebuggingNoLock().is_valid()); + write_buffer_->message_queue_.AddMessage(message.Pass()); +} + +bool RawChannel::OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) { + if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { + message_loop_for_io_->PostTask( + FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, + weak_ptr_factory_.GetWeakPtr(), + Delegate::ERROR_READ_SHUTDOWN)); + return true; + } + + // No non-implementation specific |RawChannel| control messages. + LOG(ERROR) << "Invalid control message (type " << message_view.type() + << ")"; + return false; +} + +RawChannel::Delegate::Error RawChannel::ReadIOResultToError( + IOResult io_result) { + switch (io_result) { + case IO_FAILED_SHUTDOWN: + return Delegate::ERROR_READ_SHUTDOWN; + case IO_FAILED_BROKEN: + return Delegate::ERROR_READ_BROKEN; + case IO_FAILED_UNKNOWN: + return Delegate::ERROR_READ_UNKNOWN; + case IO_SUCCEEDED: + case IO_PENDING: + NOTREACHED(); + break; + } + return Delegate::ERROR_READ_UNKNOWN; +} + +void RawChannel::CallOnError(Delegate::Error error) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); + read_lock_.AssertAcquired(); + error_occurred_ = true; + if (delegate_) { + delegate_->OnError(error); + } else { + // We depend on delegate to delete since it could be waiting to call + // ReleaseHandle. + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); + } +} + +void RawChannel::LockAndCallOnError(Delegate::Error error) { + base::AutoLock locker(read_lock_); + CallOnError(error); +} + +bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, + size_t platform_handles_written, + size_t bytes_written) { + write_lock_.AssertAcquired(); + + DCHECK(!write_buffer_->message_queue_.IsEmpty()); + + if (io_result == IO_SUCCEEDED) { + write_buffer_->platform_handles_offset_ += platform_handles_written; + write_buffer_->data_offset_ += bytes_written; + + MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); + if (write_buffer_->data_offset_ >= message->total_size()) { + // Complete write. + CHECK_EQ(write_buffer_->data_offset_, message->total_size()); + write_buffer_->message_queue_.DiscardMessage(); + write_buffer_->platform_handles_offset_ = 0; + write_buffer_->data_offset_ = 0; + + if (write_buffer_->message_queue_.IsEmpty()) + return true; + } + + // Schedule the next write. + io_result = ScheduleWriteNoLock(); + if (io_result == IO_PENDING) + return true; + DCHECK_NE(io_result, IO_SUCCEEDED); + } + + write_stopped_ = true; + write_buffer_->message_queue_.Clear(); + write_buffer_->platform_handles_offset_ = 0; + write_buffer_->data_offset_ = 0; + return false; +} + +void RawChannel::DispatchMessages(bool* did_dispatch_message, + bool* stop_dispatching) { + *did_dispatch_message = false; + *stop_dispatching = false; + // Tracks the offset of the first undispatched message in |read_buffer_|. + // Currently, we copy data to ensure that this is zero at the beginning. + size_t read_buffer_start = 0; + size_t remaining_bytes = read_buffer_->num_valid_bytes_; + size_t message_size; + // Note that we rely on short-circuit evaluation here: + // - |read_buffer_start| may be an invalid index into + // |read_buffer_->buffer_| if |remaining_bytes| is zero. + // - |message_size| is only valid if |GetNextMessageSize()| returns true. + // TODO(vtl): Use |message_size| more intelligently (e.g., to request the + // next read). + // TODO(vtl): Validate that |message_size| is sane. + while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( + &read_buffer_->buffer_[read_buffer_start], + remaining_bytes, &message_size) && + remaining_bytes >= message_size) { + MessageInTransit::View message_view( + message_size, &read_buffer_->buffer_[read_buffer_start]); + DCHECK_EQ(message_view.total_size(), message_size); + + const char* error_message = nullptr; + if (!message_view.IsValid(GetSerializedPlatformHandleSize(), + &error_message)) { + DCHECK(error_message); + LOG(ERROR) << "Received invalid message: " << error_message; + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); + *stop_dispatching = true; + return; // |this| may have been destroyed in |CallOnError()|. + } + + if (message_view.type() != MessageInTransit::Type::MESSAGE) { + if (!OnReadMessageForRawChannel(message_view)) { + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); + *stop_dispatching = true; + return; // |this| may have been destroyed in |CallOnError()|. + } + } else { + ScopedPlatformHandleVectorPtr platform_handles; + if (message_view.transport_data_buffer()) { + size_t num_platform_handles; + const void* platform_handle_table; + TransportData::GetPlatformHandleTable( + message_view.transport_data_buffer(), &num_platform_handles, + &platform_handle_table); + + if (num_platform_handles > 0) { + platform_handles = + GetReadPlatformHandles(num_platform_handles, + platform_handle_table).Pass(); + if (!platform_handles) { + LOG(ERROR) << "Invalid number of platform handles received"; + CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); + *stop_dispatching = true; + return; // |this| may have been destroyed in |CallOnError()|. + } + } + } + + // TODO(vtl): In the case that we aren't expecting any platform handles, + // for the POSIX implementation, we should confirm that none are stored. + + // Dispatch the message. + // Note: it's valid to get here without a delegate. i.e. after Shutdown + // is called, if this object still has a valid handle we keep it alive + // until the other side closes it in response to the RAW_CHANNEL_QUIT + // message. In the meantime the sender could have sent us a message. + if (delegate_) + delegate_->OnReadMessage(message_view, platform_handles.Pass()); + } + + *did_dispatch_message = true; + + // Update our state. + read_buffer_start += message_size; + remaining_bytes -= message_size; + } + + if (read_buffer_start > 0) { + // Move data back to start. + read_buffer_->num_valid_bytes_ = remaining_bytes; + if (read_buffer_->num_valid_bytes_ > 0) { + memmove(&read_buffer_->buffer_[0], + &read_buffer_->buffer_[read_buffer_start], remaining_bytes); + } + read_buffer_start = 0; + } +} + +} // namespace edk +} // namespace mojo |