diff options
Diffstat (limited to 'mojo/edk/system/data_pipe_producer_dispatcher.cc')
-rw-r--r-- | mojo/edk/system/data_pipe_producer_dispatcher.cc | 357 |
1 files changed, 357 insertions, 0 deletions
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc new file mode 100644 index 0000000..5da76d5 --- /dev/null +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc @@ -0,0 +1,357 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/data_pipe_producer_dispatcher.h" + +#include "base/bind.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "mojo/edk/embedder/embedder_internal.h" +#include "mojo/edk/system/configuration.h" +#include "mojo/edk/system/data_pipe.h" + +namespace mojo { +namespace edk { + +void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) { + if (message_pipe.is_valid()) { + channel_ = RawChannel::Create(message_pipe.Pass()); + internal::g_io_thread_task_runner->PostTask( + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); + } +} + +void DataPipeProducerDispatcher::InitOnIO() { + base::AutoLock locker(lock()); + if (channel_) + channel_->Init(this); +} + +void DataPipeProducerDispatcher::CloseOnIO() { + base::AutoLock locker(lock()); + if (channel_) { + channel_->Shutdown(); + channel_ = nullptr; + } +} + +Dispatcher::Type DataPipeProducerDispatcher::GetType() const { + return Type::DATA_PIPE_PRODUCER; +} + +scoped_refptr<DataPipeProducerDispatcher> +DataPipeProducerDispatcher::Deserialize( + const void* source, + size_t size, + PlatformHandleVector* platform_handles) { + MojoCreateDataPipeOptions options; + ScopedPlatformHandle platform_handle = + DataPipe::Deserialize(source, size, platform_handles, &options, + nullptr, 0); + + scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); + if (platform_handle.is_valid()) + rv->Init(platform_handle.Pass()); + return rv; +} + +DataPipeProducerDispatcher::DataPipeProducerDispatcher( + const MojoCreateDataPipeOptions& options) + : options_(options), channel_(nullptr), error_(false) { +} + +DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { + // |Close()|/|CloseImplNoLock()| should have taken care of the channel. + DCHECK(!channel_); +} + +void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { + lock().AssertAcquired(); + awakable_list_.CancelAll(); +} + +void DataPipeProducerDispatcher::CloseImplNoLock() { + lock().AssertAcquired(); + internal::g_io_thread_task_runner->PostTask( + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); +} + +scoped_refptr<Dispatcher> +DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { + lock().AssertAcquired(); + + scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); + rv->channel_ = channel_; + channel_ = nullptr; + rv->options_ = options_; + return scoped_refptr<Dispatcher>(rv.get()); +} + +MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( + const void* elements, + uint32_t* num_bytes, + MojoWriteDataFlags flags) { + lock().AssertAcquired(); + if (InTwoPhaseWrite()) + return MOJO_RESULT_BUSY; + if (error_) + return MOJO_RESULT_FAILED_PRECONDITION; + if (*num_bytes % options_.element_num_bytes != 0) + return MOJO_RESULT_INVALID_ARGUMENT; + if (*num_bytes == 0) + return MOJO_RESULT_OK; // Nothing to do. + + // For now, we ignore options.capacity_num_bytes as a total of all pending + // writes (and just treat it per message). We will implement that later if + // we need to. All current uses want all their data to be sent, and it's not + // clear that this backpressure should be done at the mojo layer or at a + // higher application layer. + bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; + uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; + if (min_num_bytes_to_write > options_.capacity_num_bytes) { + // Don't return "should wait" since you can't wait for a specified amount of + // data. + return MOJO_RESULT_OUT_OF_RANGE; + } + + uint32_t num_bytes_to_write = + std::min(*num_bytes, options_.capacity_num_bytes); + if (num_bytes_to_write == 0) + return MOJO_RESULT_SHOULD_WAIT; + + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); + + *num_bytes = num_bytes_to_write; + WriteDataIntoMessages(elements, num_bytes_to_write); + + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); + if (!new_state.equals(old_state)) + awakable_list_.AwakeForStateChange(new_state); + return MOJO_RESULT_OK; +} + +MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( + void** buffer, + uint32_t* buffer_num_bytes, + MojoWriteDataFlags flags) { + lock().AssertAcquired(); + if (InTwoPhaseWrite()) + return MOJO_RESULT_BUSY; + if (error_) + return MOJO_RESULT_FAILED_PRECONDITION; + + bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; + uint32_t min_num_bytes_to_write = 0; + if (all_or_none) { + min_num_bytes_to_write = *buffer_num_bytes; + if (min_num_bytes_to_write % options_.element_num_bytes != 0) + return MOJO_RESULT_INVALID_ARGUMENT; + if (min_num_bytes_to_write > options_.capacity_num_bytes) { + // Don't return "should wait" since you can't wait for a specified amount + // of data. + return MOJO_RESULT_OUT_OF_RANGE; + } + } + + // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. + if (*buffer_num_bytes == 0) + *buffer_num_bytes = options_.capacity_num_bytes; + + two_phase_data_.resize(*buffer_num_bytes); + *buffer = &two_phase_data_[0]; + + // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes + // we can construct a MessageInTransit here. But then we need to make + // MessageInTransit support changing its data size later. + + return MOJO_RESULT_OK; +} + +MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( + uint32_t num_bytes_written) { + lock().AssertAcquired(); + if (!InTwoPhaseWrite()) + return MOJO_RESULT_FAILED_PRECONDITION; + + // Note: Allow successful completion of the two-phase write even if the other + // side has been closed. + MojoResult rv = MOJO_RESULT_OK; + if (num_bytes_written > two_phase_data_.size() || + num_bytes_written % options_.element_num_bytes != 0) { + rv = MOJO_RESULT_INVALID_ARGUMENT; + } else if (channel_) { + WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); + } + + // Two-phase write ended even on failure. + two_phase_data_.clear(); + // If we're now writable, we *became* writable (since we weren't writable + // during the two-phase write), so awake producer awakables. + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); + if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) + awakable_list_.AwakeForStateChange(new_state); + + return rv; +} + +HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() + const { + lock().AssertAcquired(); + + HandleSignalsState rv; + if (!error_) { + if (!InTwoPhaseWrite()) + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; + } else { + rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; + } + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; + return rv; +} + +MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( + Awakable* awakable, + MojoHandleSignals signals, + uint32_t context, + HandleSignalsState* signals_state) { + lock().AssertAcquired(); + HandleSignalsState state = GetHandleSignalsStateImplNoLock(); + if (state.satisfies(signals)) { + if (signals_state) + *signals_state = state; + return MOJO_RESULT_ALREADY_EXISTS; + } + if (!state.can_satisfy(signals)) { + if (signals_state) + *signals_state = state; + return MOJO_RESULT_FAILED_PRECONDITION; + } + + awakable_list_.Add(awakable, signals, context); + return MOJO_RESULT_OK; +} + +void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( + Awakable* awakable, + HandleSignalsState* signals_state) { + lock().AssertAcquired(); + awakable_list_.Remove(awakable); + if (signals_state) + *signals_state = GetHandleSignalsStateImplNoLock(); +} + +void DataPipeProducerDispatcher::StartSerializeImplNoLock( + size_t* max_size, + size_t* max_platform_handles) { + DCHECK(HasOneRef()); // Only one ref => no need to take the lock. + + if (channel_) { + std::vector<char> temp; + serialized_platform_handle_ = channel_->ReleaseHandle(&temp); + channel_ = nullptr; + DCHECK(temp.empty()); + } + DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), + false, max_size, max_platform_handles); +} + +bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( + void* destination, + size_t* actual_size, + PlatformHandleVector* platform_handles) { + DCHECK(HasOneRef()); // Only one ref => no need to take the lock. + + DataPipe::EndSerialize( + options_, + serialized_platform_handle_.Pass(), + ScopedPlatformHandle(), 0, + destination, actual_size, platform_handles); + CloseImplNoLock(); + return true; +} + +void DataPipeProducerDispatcher::TransportStarted() { + started_transport_.Acquire(); +} + +void DataPipeProducerDispatcher::TransportEnded() { + started_transport_.Release(); +} + +bool DataPipeProducerDispatcher::IsBusyNoLock() const { + lock().AssertAcquired(); + return InTwoPhaseWrite(); +} + +void DataPipeProducerDispatcher::OnReadMessage( + const MessageInTransit::View& message_view, + ScopedPlatformHandleVectorPtr platform_handles) { + NOTREACHED(); +} + +void DataPipeProducerDispatcher::OnError(Error error) { + switch (error) { + case ERROR_READ_SHUTDOWN: + case ERROR_READ_BROKEN: + case ERROR_READ_BAD_MESSAGE: + case ERROR_READ_UNKNOWN: + LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages"; + break; + case ERROR_WRITE: + // Write errors are slightly notable: they probably shouldn't happen under + // normal operation (but maybe the other side crashed). + LOG(WARNING) << "DataPipeProducerDispatcher write error"; + break; + } + + error_ = true; + if (started_transport_.Try()) { + base::AutoLock locker(lock()); + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); + + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); + channel_ = nullptr; + started_transport_.Release(); + } else { + // We must be waiting to call ReleaseHandle. It will call Shutdown. + } +} + +bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { + return !two_phase_data_.empty(); +} + +bool DataPipeProducerDispatcher::WriteDataIntoMessages( + const void* elements, + uint32_t num_bytes) { + // The maximum amount of data to send per message (make it a multiple of the + // element size. + size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; + max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; + DCHECK_GT(max_message_num_bytes, 0u); + + uint32_t offset = 0; + while (offset < num_bytes) { + uint32_t message_num_bytes = + std::min(static_cast<uint32_t>(max_message_num_bytes), + num_bytes - offset); + scoped_ptr<MessageInTransit> message(new MessageInTransit( + MessageInTransit::Type::MESSAGE, message_num_bytes, + static_cast<const char*>(elements) + offset)); + if (!channel_->WriteMessage(message.Pass())) { + error_ = true; + return false; + } + + offset += message_num_bytes; + } + + return true; +} + +} // namespace edk +} // namespace mojo |