// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "device/serial/data_source_sender.h" #include #include "base/bind.h" #include "base/message_loop/message_loop.h" #include "device/serial/async_waiter.h" namespace device { // Represents a send that is not yet fulfilled. class DataSourceSender::PendingSend { public: PendingSend(DataSourceSender* sender, const ReadyCallback& callback); // Asynchronously fills |data| with up to |num_bytes| of data. Following this, // one of Done() and DoneWithError() will be called with the result. void GetData(void* data, uint32_t num_bytes); private: class Buffer; // Reports a successful write of |bytes_written|. void Done(uint32_t bytes_written); // Reports a partially successful or unsuccessful write of |bytes_written| // with an error of |error|. void DoneWithError(uint32_t bytes_written, int32_t error); // The DataSourceSender that owns this. DataSourceSender* sender_; // The callback to call to get data. ReadyCallback callback_; // Whether the buffer specified by GetData() has been passed to |callback_|, // but has not yet called Done() or DoneWithError(). bool buffer_in_use_; }; // A Writable implementation that provides a view of a data pipe owned by a // DataSourceSender. class DataSourceSender::PendingSend::Buffer : public WritableBuffer { public: Buffer(scoped_refptr sender, PendingSend* send, char* buffer, uint32_t buffer_size); virtual ~Buffer(); // WritableBuffer overrides. virtual char* GetData() OVERRIDE; virtual uint32_t GetSize() OVERRIDE; virtual void Done(uint32_t bytes_written) OVERRIDE; virtual void DoneWithError(uint32_t bytes_written, int32_t error) OVERRIDE; private: // The DataSourceSender whose data pipe we are providing a view. scoped_refptr sender_; // The PendingSend to which this buffer has been created in response. PendingSend* pending_send_; char* buffer_; uint32_t buffer_size_; }; DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, const ErrorCallback& error_callback) : ready_callback_(ready_callback), error_callback_(error_callback), bytes_sent_(0), shut_down_(false) { DCHECK(!ready_callback.is_null() && !error_callback.is_null()); } void DataSourceSender::ShutDown() { shut_down_ = true; waiter_.reset(); ready_callback_.Reset(); error_callback_.Reset(); } DataSourceSender::~DataSourceSender() { DCHECK(shut_down_); } void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { // This should never occur. |handle_| is only valid and |pending_send_| is // only set after Init is called. if (pending_send_ || handle_.is_valid() || shut_down_) { DispatchFatalError(); return; } handle_ = handle.Pass(); pending_send_.reset(new PendingSend(this, ready_callback_)); StartWaiting(); } void DataSourceSender::Resume() { if (pending_send_ || !handle_.is_valid()) { DispatchFatalError(); return; } pending_send_.reset(new PendingSend(this, ready_callback_)); StartWaiting(); } void DataSourceSender::OnConnectionError() { DispatchFatalError(); } void DataSourceSender::StartWaiting() { DCHECK(pending_send_ && !waiter_); waiter_.reset( new AsyncWaiter(handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, base::Bind(&DataSourceSender::OnDoneWaiting, this))); } void DataSourceSender::OnDoneWaiting(MojoResult result) { DCHECK(pending_send_ && !shut_down_ && waiter_); waiter_.reset(); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return; } void* data = NULL; uint32_t num_bytes = std::numeric_limits::max(); result = mojo::BeginWriteDataRaw( handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return; } pending_send_->GetData(static_cast(data), num_bytes); } void DataSourceSender::Done(uint32_t bytes_written) { DoneInternal(bytes_written); if (!shut_down_) StartWaiting(); } void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { DoneInternal(bytes_written); pending_send_.reset(); if (!shut_down_) client()->OnError(bytes_sent_, error); // We don't call StartWaiting here so we don't send any additional data until // Resume() is called. } void DataSourceSender::DoneInternal(uint32_t bytes_written) { DCHECK(pending_send_); if (shut_down_) return; bytes_sent_ += bytes_written; MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return; } } void DataSourceSender::DispatchFatalError() { if (shut_down_) return; error_callback_.Run(); ShutDown(); } DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, const ReadyCallback& callback) : sender_(sender), callback_(callback), buffer_in_use_(false) { } void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { DCHECK(!buffer_in_use_); buffer_in_use_ = true; callback_.Run(scoped_ptr( new Buffer(sender_, this, static_cast(data), num_bytes))); } void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { DCHECK(buffer_in_use_); buffer_in_use_ = false; sender_->Done(bytes_written); } void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, int32_t error) { DCHECK(buffer_in_use_); buffer_in_use_ = false; sender_->DoneWithError(bytes_written, error); } DataSourceSender::PendingSend::Buffer::Buffer( scoped_refptr sender, PendingSend* send, char* buffer, uint32_t buffer_size) : sender_(sender), pending_send_(send), buffer_(buffer), buffer_size_(buffer_size) { } DataSourceSender::PendingSend::Buffer::~Buffer() { if (sender_.get()) pending_send_->Done(0); } char* DataSourceSender::PendingSend::Buffer::GetData() { return buffer_; } uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { return buffer_size_; } void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { DCHECK(sender_.get()); pending_send_->Done(bytes_written); sender_ = NULL; pending_send_ = NULL; buffer_ = NULL; buffer_size_ = 0; } void DataSourceSender::PendingSend::Buffer::DoneWithError( uint32_t bytes_written, int32_t error) { DCHECK(sender_.get()); pending_send_->DoneWithError(bytes_written, error); sender_ = NULL; pending_send_ = NULL; buffer_ = NULL; buffer_size_ = 0; } } // namespace device