diff options
author | sammc@chromium.org <sammc@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-08-19 06:14:02 +0000 |
---|---|---|
committer | sammc@chromium.org <sammc@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-08-19 06:15:27 +0000 |
commit | 4e99a933bf3cfc73ba8d4b6086634cf72449227f (patch) | |
tree | 6772b725b73e07e31322ccc6f1a30f28b297e269 /device | |
parent | 3eef1b35d0ea704a274d65e2d662ecdb83768eb2 (diff) | |
download | chromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.zip chromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.tar.gz chromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.tar.bz2 |
Add data pipe wrappers to be used to implement serial send.
BUG=389016
Review URL: https://codereview.chromium.org/466623003
Cr-Commit-Position: refs/heads/master@{#290507}
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@290507 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'device')
-rw-r--r-- | device/device_tests.gyp | 1 | ||||
-rw-r--r-- | device/serial/BUILD.gn | 4 | ||||
-rw-r--r-- | device/serial/data_sender.cc | 280 | ||||
-rw-r--r-- | device/serial/data_sender.h | 112 | ||||
-rw-r--r-- | device/serial/data_sink_receiver.cc | 306 | ||||
-rw-r--r-- | device/serial/data_sink_receiver.h | 114 | ||||
-rw-r--r-- | device/serial/data_sink_unittest.cc | 413 | ||||
-rw-r--r-- | device/serial/data_stream.mojom | 22 | ||||
-rw-r--r-- | device/serial/serial.gyp | 4 |
9 files changed, 1256 insertions, 0 deletions
diff --git a/device/device_tests.gyp b/device/device_tests.gyp index 118e785..7e913e6 100644 --- a/device/device_tests.gyp +++ b/device/device_tests.gyp @@ -47,6 +47,7 @@ 'hid/hid_report_descriptor_unittest.cc', 'hid/hid_service_unittest.cc', 'hid/input_service_linux_unittest.cc', + 'serial/data_sink_unittest.cc', 'serial/data_source_unittest.cc', 'serial/serial_connection_unittest.cc', 'serial/serial_service_unittest.cc', diff --git a/device/serial/BUILD.gn b/device/serial/BUILD.gn index 58cc637..d6b1313 100644 --- a/device/serial/BUILD.gn +++ b/device/serial/BUILD.gn @@ -15,6 +15,10 @@ static_library("serial") { "buffer.h", "data_receiver.cc", "data_receiver.h", + "data_sender.cc", + "data_sender.h", + "data_sink_receiver.cc", + "data_sink_receiver.h", "data_source_sender.cc", "data_source_sender.h", "serial_connection.cc", diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc new file mode 100644 index 0000000..409f5f4 --- /dev/null +++ b/device/serial/data_sender.cc @@ -0,0 +1,280 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "device/serial/data_sender.h" + +#include "base/bind.h" +#include "base/message_loop/message_loop.h" +#include "device/serial/async_waiter.h" + +namespace device { + +// Represents a send that is not yet fulfilled. +class DataSender::PendingSend { + public: + PendingSend(const base::StringPiece& data, + const DataSentCallback& callback, + const SendErrorCallback& error_callback, + int32_t fatal_error_value); + + // Invoked to report that |num_bytes| of data have been sent. Subtracts the + // number of bytes that were part of this send from |num_bytes|. Returns + // whether this send has been completed. If this send has been completed, this + // calls |callback_|. + bool ReportBytesSent(uint32_t* num_bytes); + + // Invoked to report that |num_bytes| of data have been sent and then an + // error, |error| was encountered. Subtracts the number of bytes that were + // part of this send from |num_bytes|. If this send was not completed before + // the error, this calls |error_callback_| to report the error. Otherwise, + // this calls |callback_|. Returns the number of bytes sent but not acked. + uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error); + + // Reports |fatal_error_value_| to |receive_error_callback_|. + void DispatchFatalError(); + + // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK + // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent + // or the error if one is encountered writing to |handle|. + MojoResult SendData(mojo::DataPipeProducerHandle handle); + + private: + // Invoked to update |bytes_acked_| and |num_bytes|. + void ReportBytesSentInternal(uint32_t* num_bytes); + + // The data to send. + const base::StringPiece data_; + + // The callback to report success. + const DataSentCallback callback_; + + // The callback to report errors. + const SendErrorCallback error_callback_; + + // The error value to report when DispatchFatalError() is called. + const int32_t fatal_error_value_; + + // The number of bytes sent to the data pipe. + uint32_t bytes_sent_; + + // The number of bytes acked. + uint32_t bytes_acked_; +}; + +DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink, + uint32_t buffer_size, + int32_t fatal_error_value) + : sink_(sink.Pass()), + fatal_error_value_(fatal_error_value), + shut_down_(false) { + sink_.set_error_handler(this); + MojoCreateDataPipeOptions options = { + sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size, + }; + options.struct_size = sizeof(options); + mojo::ScopedDataPipeConsumerHandle remote_handle; + MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle); + DCHECK_EQ(MOJO_RESULT_OK, result); + sink_->Init(remote_handle.Pass()); + sink_.set_client(this); +} + +DataSender::~DataSender() { + ShutDown(); +} + +bool DataSender::Send(const base::StringPiece& data, + const DataSentCallback& callback, + const SendErrorCallback& error_callback) { + DCHECK(!callback.is_null() && !error_callback.is_null()); + if (!pending_cancel_.is_null() || shut_down_) + return false; + + pending_sends_.push(linked_ptr<PendingSend>( + new PendingSend(data, callback, error_callback, fatal_error_value_))); + SendInternal(); + return true; +} + +bool DataSender::Cancel(int32_t error, const CancelCallback& callback) { + DCHECK(!callback.is_null()); + if (!pending_cancel_.is_null() || shut_down_) + return false; + if (pending_sends_.empty() && sends_awaiting_ack_.empty()) { + base::MessageLoop::current()->PostTask(FROM_HERE, callback); + return true; + } + + pending_cancel_ = callback; + sink_->Cancel(error); + return true; +} + +void DataSender::ReportBytesSent(uint32_t bytes_sent) { + if (shut_down_) + return; + + while (bytes_sent != 0 && !sends_awaiting_ack_.empty() && + sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) { + sends_awaiting_ack_.pop(); + } + if (bytes_sent > 0 && !pending_sends_.empty()) { + bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent); + DCHECK(!finished); + if (finished) { + ShutDown(); + return; + } + } + if (bytes_sent != 0) { + ShutDown(); + return; + } + if (pending_sends_.empty() && sends_awaiting_ack_.empty()) + RunCancelCallback(); +} + +void DataSender::ReportBytesSentAndError( + uint32_t bytes_sent, + int32_t error, + const mojo::Callback<void(uint32_t)>& callback) { + if (shut_down_) + return; + + uint32_t bytes_to_flush = 0; + while (!sends_awaiting_ack_.empty()) { + bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError( + &bytes_sent, error); + sends_awaiting_ack_.pop(); + } + while (!pending_sends_.empty()) { + bytes_to_flush += + pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error); + pending_sends_.pop(); + } + callback.Run(bytes_to_flush); + RunCancelCallback(); +} + +void DataSender::OnConnectionError() { + ShutDown(); +} + +void DataSender::SendInternal() { + while (!pending_sends_.empty()) { + MojoResult result = pending_sends_.front()->SendData(handle_.get()); + if (result == MOJO_RESULT_OK) { + sends_awaiting_ack_.push(pending_sends_.front()); + pending_sends_.pop(); + } else if (result == MOJO_RESULT_SHOULD_WAIT) { + waiter_.reset(new AsyncWaiter( + handle_.get(), + MOJO_HANDLE_SIGNAL_WRITABLE, + base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this)))); + return; + } else { + ShutDown(); + return; + } + } +} + +void DataSender::OnDoneWaiting(MojoResult result) { + waiter_.reset(); + if (result != MOJO_RESULT_OK) { + ShutDown(); + return; + } + SendInternal(); +} + +void DataSender::RunCancelCallback() { + DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty()); + if (pending_cancel_.is_null()) + return; + + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(pending_cancel_)); + pending_cancel_.Reset(); +} + +void DataSender::ShutDown() { + waiter_.reset(); + shut_down_ = true; + while (!pending_sends_.empty()) { + pending_sends_.front()->DispatchFatalError(); + pending_sends_.pop(); + } + while (!sends_awaiting_ack_.empty()) { + sends_awaiting_ack_.front()->DispatchFatalError(); + sends_awaiting_ack_.pop(); + } + RunCancelCallback(); +} + +DataSender::PendingSend::PendingSend(const base::StringPiece& data, + const DataSentCallback& callback, + const SendErrorCallback& error_callback, + int32_t fatal_error_value) + : data_(data), + callback_(callback), + error_callback_(error_callback), + fatal_error_value_(fatal_error_value), + bytes_sent_(0), + bytes_acked_(0) { +} + +bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) { + ReportBytesSentInternal(num_bytes); + if (bytes_acked_ < data_.size()) + return false; + + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(callback_, bytes_acked_)); + return true; +} + +uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes, + int32_t error) { + ReportBytesSentInternal(num_bytes); + if (*num_bytes > 0) { + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(callback_, bytes_acked_)); + return 0; + } + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(error_callback_, bytes_acked_, error)); + return bytes_sent_ - bytes_acked_; +} + +void DataSender::PendingSend::DispatchFatalError() { + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_)); +} + +MojoResult DataSender::PendingSend::SendData( + mojo::DataPipeProducerHandle handle) { + uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_; + MojoResult result = mojo::WriteDataRaw(handle, + data_.data() + bytes_sent_, + &bytes_to_send, + MOJO_WRITE_DATA_FLAG_NONE); + if (result != MOJO_RESULT_OK) + return result; + + bytes_sent_ += bytes_to_send; + return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; +} + +void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) { + bytes_acked_ += *num_bytes; + if (bytes_acked_ > bytes_sent_) { + *num_bytes = bytes_acked_ - bytes_sent_; + bytes_acked_ = bytes_sent_; + } else { + *num_bytes = 0; + } +} + +} // namespace device diff --git a/device/serial/data_sender.h b/device/serial/data_sender.h new file mode 100644 index 0000000..ab21db4 --- /dev/null +++ b/device/serial/data_sender.h @@ -0,0 +1,112 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DEVICE_SERIAL_DATA_SENDER_H_ +#define DEVICE_SERIAL_DATA_SENDER_H_ + +#include <queue> + +#include "base/callback.h" +#include "base/memory/linked_ptr.h" +#include "base/strings/string_piece.h" +#include "device/serial/buffer.h" +#include "device/serial/data_stream.mojom.h" +#include "mojo/public/cpp/system/data_pipe.h" + +namespace device { + +class AsyncWaiter; + +// A DataSender sends data to a DataSink. +class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler { + public: + typedef base::Callback<void(uint32_t bytes_sent)> DataSentCallback; + typedef base::Callback<void(uint32_t bytes_sent, int32_t error)> + SendErrorCallback; + typedef base::Callback<void()> CancelCallback; + + // Constructs a DataSender to send data to |sink|, using a data pipe with a + // buffer size of |buffer_size|, with connection errors reported as + // |fatal_error_value|. + DataSender(mojo::InterfacePtr<serial::DataSink> sink, + uint32_t buffer_size, + int32_t fatal_error_value); + + virtual ~DataSender(); + + // Starts an asynchronous send of |data|. If the send completes successfully, + // |callback| will be called. Otherwise, |error_callback| will be called with + // the error. If there is a cancel in progress or there has been a connection + // error, this will not start a send and returns false. It is the caller's + // responsibility to ensure |data| remains valid until one of |callback| and + // |error_callback| is called. + bool Send(const base::StringPiece& data, + const DataSentCallback& callback, + const SendErrorCallback& error_callback); + + // Requests the cancellation of any in-progress sends. Calls to Send() will + // fail until |callback| is called. + bool Cancel(int32_t error, const CancelCallback& callback); + + private: + class PendingSend; + + // serial::DataSinkClient overrides. + virtual void ReportBytesSent(uint32_t bytes_sent) OVERRIDE; + virtual void ReportBytesSentAndError( + uint32_t bytes_sent, + int32_t error, + const mojo::Callback<void(uint32_t)>& callback) OVERRIDE; + + // mojo::ErrorHandler override. + virtual void OnConnectionError() OVERRIDE; + + // Copies data from |pending_sends_| into the data pipe and starts |waiter_| + // waiting if the pipe is full. When a PendingSend in |pending_sends_| has + // been fully copied into the data pipe, it moves to |sends_awaiting_ack_|. + void SendInternal(); + + // Invoked when |handle_| is ready for writes. Calls SendInternal(). + void OnDoneWaiting(MojoResult result); + + // Dispatches a cancel callback if one is pending. + void RunCancelCallback(); + + // Shuts down this DataSender and dispatches fatal errors to all pending + // operations. + void ShutDown(); + + // The control connection to the data sink. + mojo::InterfacePtr<serial::DataSink> sink_; + + // The data connection to the data sink. + mojo::ScopedDataPipeProducerHandle handle_; + + // The error value to report in the event of a fatal error. + const int32_t fatal_error_value_; + + // A waiter used to wait until |handle_| is writable if we are waiting. + scoped_ptr<AsyncWaiter> waiter_; + + // A queue of PendingSend that have not yet been fully written to the data + // pipe. + std::queue<linked_ptr<PendingSend> > pending_sends_; + + // A queue of PendingSend that have been written to the data pipe, but have + // not yet been acked by the DataSink. + std::queue<linked_ptr<PendingSend> > sends_awaiting_ack_; + + // The callback to report cancel completion if a cancel operation is in + // progress. + CancelCallback pending_cancel_; + + // Whether we have encountered a fatal error and shut down. + bool shut_down_; + + DISALLOW_COPY_AND_ASSIGN(DataSender); +}; + +} // namespace device + +#endif // DEVICE_SERIAL_DATA_SENDER_H_ diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc new file mode 100644 index 0000000..ede0468 --- /dev/null +++ b/device/serial/data_sink_receiver.cc @@ -0,0 +1,306 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "device/serial/data_sink_receiver.h" + +#include <limits> + +#include "base/bind.h" +#include "device/serial/async_waiter.h" + +namespace device { + +// Represents a flush of data that has not been completed. +class DataSinkReceiver::PendingFlush { + public: + PendingFlush(); + + // Initializes this PendingFlush with |num_bytes|, the number of bytes to + // flush. + void SetNumBytesToFlush(uint32_t num_bytes); + + // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns + // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than + // |bytes_to_flush_| bytes were flushed or the error if one is encountered + // discarding data from |handle|. + MojoResult Flush(mojo::DataPipeConsumerHandle handle); + + // Whether this PendingFlush has received the number of bytes to flush. + bool received_flush() { return received_flush_; } + + private: + // Whether this PendingFlush has received the number of bytes to flush. + bool received_flush_; + + // The remaining number of bytes to flush. + uint32_t bytes_to_flush_; +}; + +// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by +// a DataSinkReceiver. +class DataSinkReceiver::Buffer : public ReadOnlyBuffer { + public: + Buffer(scoped_refptr<DataSinkReceiver> receiver, + const char* buffer, + uint32_t buffer_size); + virtual ~Buffer(); + + void Cancel(int32_t error); + + // ReadOnlyBuffer overrides. + virtual const char* GetData() OVERRIDE; + virtual uint32_t GetSize() OVERRIDE; + virtual void Done(uint32_t bytes_read) OVERRIDE; + virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE; + + private: + // The DataSinkReceiver whose data pipe we are providing a view. + scoped_refptr<DataSinkReceiver> receiver_; + + const char* buffer_; + uint32_t buffer_size_; + + // Whether this receive has been cancelled. + bool cancelled_; + + // If |cancelled_|, contains the cancellation error to report. + int32_t cancellation_error_; +}; + +DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, + const CancelCallback& cancel_callback, + const ErrorCallback& error_callback) + : ready_callback_(ready_callback), + cancel_callback_(cancel_callback), + error_callback_(error_callback), + buffer_in_use_(NULL), + shut_down_(false), + weak_factory_(this) { +} + +void DataSinkReceiver::ShutDown() { + shut_down_ = true; + if (waiter_) + waiter_.reset(); +} + +DataSinkReceiver::~DataSinkReceiver() { +} + +void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { + if (!handle.is_valid() || handle_.is_valid()) { + DispatchFatalError(); + return; + } + + handle_ = handle.Pass(); + StartWaiting(); +} + +void DataSinkReceiver::Cancel(int32_t error) { + // If we have sent a ReportBytesSentAndError but have not received the + // response, that ReportBytesSentAndError message will appear to the + // DataSinkClient to be caused by this Cancel message. In that case, we ignore + // the cancel. + if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) + return; + + // If there is a buffer is in use, mark the buffer as cancelled and notify the + // client by calling |cancel_callback_|. The sink implementation may or may + // not take the cancellation into account when deciding what error (if any) to + // return. If the sink returns an error, we ignore the cancellation error. + // Otherwise, if the sink does not report an error, we override that with the + // cancellation error. Once a cancellation has been received, the next report + // sent to the client will always contain an error; the error returned by the + // sink or the cancellation error if the sink does not return an error. + if (buffer_in_use_) { + buffer_in_use_->Cancel(error); + if (!cancel_callback_.is_null()) + cancel_callback_.Run(error); + return; + } + // If there is no buffer in use, immediately report the error and cancel the + // waiting for the data pipe if one exists. This transitions straight into the + // state after the sink has returned an error. + waiter_.reset(); + ReportBytesSentAndError(0, error); +} + +void DataSinkReceiver::OnConnectionError() { + DispatchFatalError(); +} + +void DataSinkReceiver::StartWaiting() { + DCHECK(!waiter_ && !shut_down_); + waiter_.reset( + new AsyncWaiter(handle_.get(), + MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); +} + +void DataSinkReceiver::OnDoneWaiting(MojoResult result) { + DCHECK(waiter_ && !shut_down_); + waiter_.reset(); + if (result != MOJO_RESULT_OK) { + DispatchFatalError(); + return; + } + // If there are any queued flushes (from ReportBytesSentAndError()), let them + // flush data from the data pipe. + if (!pending_flushes_.empty()) { + MojoResult result = pending_flushes_.front()->Flush(handle_.get()); + if (result == MOJO_RESULT_OK) { + pending_flushes_.pop(); + } else if (result != MOJO_RESULT_SHOULD_WAIT) { + DispatchFatalError(); + return; + } + StartWaiting(); + return; + } + const void* data = NULL; + uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); + result = mojo::BeginReadDataRaw( + handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); + if (result != MOJO_RESULT_OK) { + DispatchFatalError(); + return; + } + buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); + ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); +} + +void DataSinkReceiver::Done(uint32_t bytes_read) { + if (!DoneInternal(bytes_read)) + return; + client()->ReportBytesSent(bytes_read); + StartWaiting(); +} + +void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { + if (!DoneInternal(bytes_read)) + return; + ReportBytesSentAndError(bytes_read, error); +} + +bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { + if (shut_down_) + return false; + + DCHECK(buffer_in_use_); + buffer_in_use_ = NULL; + MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); + if (result != MOJO_RESULT_OK) { + DispatchFatalError(); + return false; + } + return true; +} + +void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, + int32_t error) { + // When we encounter an error, we must discard the data from any sends already + // in the data pipe before we can resume dispatching data to the sink. We add + // a pending flush here. The response containing the number of bytes to flush + // is handled in SetNumBytesToFlush(). The actual flush is handled in + // OnDoneWaiting(). + pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); + client()->ReportBytesSentAndError( + bytes_read, + error, + base::Bind(&DataSinkReceiver::SetNumBytesToFlush, + weak_factory_.GetWeakPtr())); +} + +void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { + DCHECK(!pending_flushes_.empty()); + DCHECK(!pending_flushes_.back()->received_flush()); + pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); + if (!waiter_) + StartWaiting(); +} + +void DataSinkReceiver::DispatchFatalError() { + if (shut_down_) + return; + + ShutDown(); + if (!error_callback_.is_null()) + error_callback_.Run(); +} + +DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver, + const char* buffer, + uint32_t buffer_size) + : receiver_(receiver), + buffer_(buffer), + buffer_size_(buffer_size), + cancelled_(false), + cancellation_error_(0) { +} + +DataSinkReceiver::Buffer::~Buffer() { + if (!receiver_) + return; + if (cancelled_) + receiver_->DoneWithError(0, cancellation_error_); + else + receiver_->Done(0); +} + +void DataSinkReceiver::Buffer::Cancel(int32_t error) { + cancelled_ = true; + cancellation_error_ = error; +} + +const char* DataSinkReceiver::Buffer::GetData() { + return buffer_; +} + +uint32_t DataSinkReceiver::Buffer::GetSize() { + return buffer_size_; +} + +void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { + if (cancelled_) + receiver_->DoneWithError(bytes_read, cancellation_error_); + else + receiver_->Done(bytes_read); + receiver_ = NULL; + buffer_ = NULL; + buffer_size_ = 0; +} + +void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, + int32_t error) { + receiver_->DoneWithError(bytes_read, error); + receiver_ = NULL; + buffer_ = NULL; + buffer_size_ = 0; +} + +DataSinkReceiver::PendingFlush::PendingFlush() + : received_flush_(false), bytes_to_flush_(0) { +} + +void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) { + DCHECK(!received_flush_); + received_flush_ = true; + bytes_to_flush_ = num_bytes; +} + +MojoResult DataSinkReceiver::PendingFlush::Flush( + mojo::DataPipeConsumerHandle handle) { + DCHECK(received_flush_); + uint32_t num_bytes = bytes_to_flush_; + MojoResult result = + mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD); + if (result != MOJO_RESULT_OK) + return result; + DCHECK(num_bytes <= bytes_to_flush_); + bytes_to_flush_ -= num_bytes; + return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT; +} + +} // namespace device diff --git a/device/serial/data_sink_receiver.h b/device/serial/data_sink_receiver.h new file mode 100644 index 0000000..2e63d7a --- /dev/null +++ b/device/serial/data_sink_receiver.h @@ -0,0 +1,114 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DEVICE_SERIAL_DATA_SINK_RECEIVER_H_ +#define DEVICE_SERIAL_DATA_SINK_RECEIVER_H_ + +#include <queue> + +#include "base/callback.h" +#include "base/memory/linked_ptr.h" +#include "base/memory/ref_counted.h" +#include "base/memory/weak_ptr.h" +#include "device/serial/buffer.h" +#include "device/serial/data_stream.mojom.h" +#include "mojo/public/cpp/system/data_pipe.h" + +namespace device { + +class AsyncWaiter; + +class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>, + public mojo::InterfaceImpl<serial::DataSink> { + public: + typedef base::Callback<void(scoped_ptr<ReadOnlyBuffer>)> ReadyCallback; + typedef base::Callback<void(int32_t error)> CancelCallback; + typedef base::Callback<void()> ErrorCallback; + + // Constructs a DataSinkReceiver. Whenever the pipe is ready for reading, the + // |ready_callback| will be called with the ReadOnlyBuffer to read. + // |ready_callback| will not be called again until the previous ReadOnlyBuffer + // is destroyed. If a connection error occurs, |error_callback| will be called + // and the DataSinkReceiver will act as if ShutDown() had been called. If + // |cancel_callback| is valid, it will be called when the DataSinkClient + // requests cancellation of the in-progress read. + DataSinkReceiver(const ReadyCallback& ready_callback, + const CancelCallback& cancel_callback, + const ErrorCallback& error_callback); + + // Shuts down this DataSinkReceiver. After shut down, |ready_callback|, + // |cancel_callback| and |error_callback| will never be called. + void ShutDown(); + + private: + class Buffer; + class PendingFlush; + friend class base::RefCounted<DataSinkReceiver>; + + virtual ~DataSinkReceiver(); + + // mojo::InterfaceImpl<serial::DataSink> overrides. + virtual void Init(mojo::ScopedDataPipeConsumerHandle handle) OVERRIDE; + virtual void Cancel(int32_t error) OVERRIDE; + virtual void OnConnectionError() OVERRIDE; + + // Starts waiting for |handle_| to be ready for reads. + void StartWaiting(); + + // Invoked when |handle_| is ready for reads. + void OnDoneWaiting(MojoResult result); + + // Reports a successful read of |bytes_read|. + void Done(uint32_t bytes_read); + + // Reports a partially successful or unsuccessful read of |bytes_read| + // with an error of |error|. + void DoneWithError(uint32_t bytes_read, int32_t error); + + // Finishes the two-phase data pipe read. + bool DoneInternal(uint32_t bytes_read); + + // Sends an ReportBytesSentAndError message to the client. + void ReportBytesSentAndError(uint32_t bytes_read, int32_t error); + + // Invoked in response to an ReportBytesSentAndError call to the client with + // the number of bytes to flush. + void SetNumBytesToFlush(uint32_t bytes_to_flush); + + // Reports a fatal error to the client and shuts down. + void DispatchFatalError(); + + // The data connection to the data sender. + mojo::ScopedDataPipeConsumerHandle handle_; + + // The callback to call when |handle_| has data ready to read. + const ReadyCallback ready_callback_; + + // The callback to call when the client has requested cancellation. + const CancelCallback cancel_callback_; + + // The callback to call if a fatal error occurs. + const ErrorCallback error_callback_; + + // The queue of pending flushes. + std::queue<linked_ptr<PendingFlush> > pending_flushes_; + + // A waiter used to wait until |handle_| is readable if we are waiting. + scoped_ptr<AsyncWaiter> waiter_; + + // The buffer passed to |ready_callback_| if one exists. This is not owned, + // but the Buffer will call Done or DoneWithError before being deleted. + Buffer* buffer_in_use_; + + // Whether we have encountered a fatal error and shut down. + bool shut_down_; + + base::WeakPtrFactory<DataSinkReceiver> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(DataSinkReceiver); +}; + +} // namespace device + +#endif // DEVICE_SERIAL_DATA_SINK_RECEIVER_H_ diff --git a/device/serial/data_sink_unittest.cc b/device/serial/data_sink_unittest.cc new file mode 100644 index 0000000..46688d2 --- /dev/null +++ b/device/serial/data_sink_unittest.cc @@ -0,0 +1,413 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/bind.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/strings/string_piece.h" +#include "device/serial/async_waiter.h" +#include "device/serial/data_sender.h" +#include "device/serial/data_sink_receiver.h" +#include "device/serial/data_stream.mojom.h" +#include "mojo/public/cpp/bindings/interface_ptr.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace device { + +class DataSinkTest : public testing::Test { + public: + enum Event { + EVENT_NONE, + EVENT_READ_BUFFER_READY, + EVENT_CANCEL_RECEIVED, + EVENT_SEND_SUCCESS, + EVENT_SEND_ERROR, + EVENT_CANCEL_COMPLETE, + EVENT_ERROR, + }; + + DataSinkTest() + : bytes_sent_(0), + send_error_(0), + has_send_error_(false), + cancel_error_(0), + seen_connection_error_(false), + expected_event_(EVENT_NONE) {} + + virtual void SetUp() OVERRIDE { + message_loop_.reset(new base::MessageLoop); + mojo::InterfacePtr<serial::DataSink> sink_handle; + sink_receiver_ = mojo::WeakBindToProxy( + new DataSinkReceiver( + base::Bind(&DataSinkTest::OnDataToRead, base::Unretained(this)), + base::Bind(&DataSinkTest::OnCancel, base::Unretained(this)), + base::Bind(&DataSinkTest::OnError, base::Unretained(this))), + &sink_handle); + sender_.reset(new DataSender(sink_handle.Pass(), kBufferSize, kFatalError)); + } + + virtual void TearDown() OVERRIDE { + read_buffer_.reset(); + message_loop_.reset(); + if (sink_receiver_) + EXPECT_TRUE(sink_receiver_->HasOneRef()); + } + + void WaitForEvent(Event event) { + expected_event_ = event; + base::RunLoop run_loop; + stop_run_loop_ = run_loop.QuitClosure(); + run_loop.Run(); + } + + void EventReceived(Event event) { + if (event == expected_event_ && !stop_run_loop_.is_null()) + stop_run_loop_.Run(); + } + + void OnError() { + seen_connection_error_ = true; + EventReceived(EVENT_ERROR); + } + + void ExpectDataAndReadAll(const base::StringPiece& expected_data) { + ExpectData(expected_data, static_cast<uint32_t>(expected_data.size()), 0); + } + + void ExpectData(const base::StringPiece& expected_data, + uint32_t bytes_to_read, + int32_t error) { + DCHECK(bytes_to_read <= static_cast<uint32_t>(expected_data.size())); + std::string data; + while (data.size() < static_cast<size_t>(expected_data.size())) { + if (!read_buffer_) + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(read_buffer_); + data.append(read_buffer_->GetData(), read_buffer_->GetSize()); + if (bytes_to_read <= read_buffer_->GetSize()) { + if (error) + read_buffer_->DoneWithError(bytes_to_read, error); + else + read_buffer_->Done(bytes_to_read); + read_buffer_.reset(); + break; + } else { + bytes_to_read -= read_buffer_->GetSize(); + read_buffer_->Done(read_buffer_->GetSize()); + read_buffer_.reset(); + } + } + // If we terminate early, we may not see all of the data. In that case, + // check that the part we saw matches what we expected. + if (static_cast<uint32_t>(data.size()) < + static_cast<uint32_t>(expected_data.size()) && + data.size() >= bytes_to_read) { + EXPECT_EQ(expected_data.substr(0, data.size()), data); + return; + } + EXPECT_EQ(expected_data, data); + } + + void ExpectSendSuccess(uint32_t expected_bytes_sent) { + bytes_sent_ = 0; + WaitForEvent(EVENT_SEND_SUCCESS); + EXPECT_EQ(expected_bytes_sent, bytes_sent_); + EXPECT_FALSE(has_send_error_); + } + + void ExpectSendError(uint32_t expected_bytes_sent, int32_t expected_error) { + bytes_sent_ = 0; + has_send_error_ = 0; + send_error_ = 0; + WaitForEvent(EVENT_SEND_ERROR); + EXPECT_EQ(expected_bytes_sent, bytes_sent_); + EXPECT_TRUE(has_send_error_); + EXPECT_EQ(expected_error, send_error_); + } + + void ExpectCancel(int32_t expected_error) { + cancel_error_ = 0; + WaitForEvent(EVENT_CANCEL_RECEIVED); + EXPECT_EQ(expected_error, cancel_error_); + } + + void ExpectCancelResult() { WaitForEvent(EVENT_CANCEL_COMPLETE); } + + bool Send(const base::StringPiece& data) { + return sender_->Send( + data, + base::Bind(&DataSinkTest::OnDataSent, base::Unretained(this)), + base::Bind(&DataSinkTest::OnSendError, base::Unretained(this))); + } + + void OnDataSent(uint32_t bytes_sent) { + bytes_sent_ = bytes_sent; + has_send_error_ = false; + EventReceived(EVENT_SEND_SUCCESS); + } + + void OnSendError(uint32_t bytes_sent, int32_t error) { + bytes_sent_ = bytes_sent; + send_error_ = error; + has_send_error_ = true; + EventReceived(EVENT_SEND_ERROR); + } + + void OnDataToRead(scoped_ptr<ReadOnlyBuffer> buffer) { + read_buffer_ = buffer.Pass(); + read_buffer_contents_ = + std::string(read_buffer_->GetData(), read_buffer_->GetSize()); + EventReceived(EVENT_READ_BUFFER_READY); + } + + bool Cancel(int32_t error) { + return sender_->Cancel( + error, base::Bind(&DataSinkTest::CancelAck, base::Unretained(this))); + } + + void CancelAck() { EventReceived(EVENT_CANCEL_COMPLETE); } + + void OnCancel(int32_t error) { + cancel_error_ = error; + EventReceived(EVENT_CANCEL_RECEIVED); + } + + protected: + static const int32_t kFatalError; + static const uint32_t kBufferSize; + scoped_ptr<base::MessageLoop> message_loop_; + base::Closure stop_run_loop_; + + scoped_refptr<DataSinkReceiver> sink_receiver_; + scoped_ptr<DataSender> sender_; + + uint32_t bytes_sent_; + int32_t send_error_; + bool has_send_error_; + int32_t cancel_error_; + scoped_ptr<ReadOnlyBuffer> read_buffer_; + std::string read_buffer_contents_; + + bool seen_connection_error_; + + Event expected_event_; + + private: + DISALLOW_COPY_AND_ASSIGN(DataSinkTest); +}; + +const int32_t DataSinkTest::kFatalError = -10; +const uint32_t DataSinkTest::kBufferSize = 100; + +TEST_F(DataSinkTest, Basic) { + ASSERT_TRUE(Send("a")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, LargeSend) { + std::string pattern = "1234567890"; + std::string data; + for (uint32_t i = 0; i < kBufferSize; i++) + data.append(pattern); + ASSERT_TRUE(Send(data)); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll(data)); + ExpectSendSuccess(static_cast<uint32_t>(data.size())); +} + +TEST_F(DataSinkTest, ErrorAndAllData) { + ASSERT_TRUE(Send("a")); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 1, -1)); + ExpectSendError(1, -1); +} + +TEST_F(DataSinkTest, ErrorAndPartialData) { + ASSERT_TRUE(Send("ab")); + ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, -1)); + ExpectSendError(1, -1); + + ASSERT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, ErrorAndPartialDataAtPacketBoundary) { + ASSERT_TRUE(Send("ab")); + ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 2, -1)); + ExpectSendError(2, -1); + + ASSERT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, ErrorInSecondPacket) { + ASSERT_TRUE(Send("a")); + ASSERT_TRUE(Send("b")); + ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 2, -1)); + ExpectSendSuccess(1); + ExpectSendError(1, -1); + + ASSERT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, ErrorBeforeLargeSend) { + ASSERT_TRUE(Send("a")); + std::string pattern = "123456789"; + std::string data; + for (int i = 0; i < 100; i++) + data.append("1234567890"); + ASSERT_TRUE(Send(data)); + ASSERT_NO_FATAL_FAILURE(ExpectData("a" + data, 1, -1)); + ExpectSendError(1, -1); + ExpectSendError(0, -1); +} + +TEST_F(DataSinkTest, ErrorOnly) { + ASSERT_TRUE(Send("a")); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1)); + ExpectSendError(0, -1); + + ASSERT_TRUE(Send("b")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("b")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, Cancel) { + ASSERT_TRUE(Send("a")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + // Check that sends fail until the cancel operation completes. + EXPECT_FALSE(Send("b")); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1)); + // Check that the error reported by the sink implementation is reported to the + // client - not the cancellation error. + ExpectSendError(0, -1); + ExpectCancelResult(); + + EXPECT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, CancelSinkSucceeds) { + ASSERT_TRUE(Send("a")); + EXPECT_TRUE(Send("b")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, 0)); + ExpectSendError(1, -2); + ExpectCancelResult(); + + EXPECT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, CancelTwice) { + ASSERT_TRUE(Send("a")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -2)); + ExpectSendError(0, -2); + ExpectCancelResult(); + + EXPECT_TRUE(Send("b")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-3)); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-3)); + ASSERT_NO_FATAL_FAILURE(ExpectData("b", 0, -3)); + ExpectSendError(0, -3); + ExpectCancelResult(); + + EXPECT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, CancelTwiceWithNoSendBetween) { + ASSERT_TRUE(Send("a")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -2)); + ExpectSendError(0, -2); + ExpectCancelResult(); + ASSERT_TRUE(Cancel(-3)); + ExpectCancelResult(); + + EXPECT_TRUE(Send("b")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("b")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, CancelDuringError) { + ASSERT_TRUE(Send("a")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1)); + ExpectSendError(0, -1); + ExpectCancelResult(); + + EXPECT_TRUE(Send("a")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, CancelWithoutSend) { + ASSERT_TRUE(Cancel(-2)); + ExpectCancelResult(); + + EXPECT_TRUE(Send("a")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a")); + ExpectSendSuccess(1); + EXPECT_EQ(0, cancel_error_); +} + +TEST_F(DataSinkTest, CancelPartialPacket) { + ASSERT_TRUE(Send("ab")); + WaitForEvent(EVENT_READ_BUFFER_READY); + ASSERT_TRUE(Cancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2)); + ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, -2)); + ExpectSendError(1, -2); + ExpectCancelResult(); + + EXPECT_TRUE(Send("c")); + ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c")); + ExpectSendSuccess(1); +} + +TEST_F(DataSinkTest, SinkReceiverShutdown) { + ASSERT_TRUE(Send("a")); + ASSERT_TRUE(Send(std::string(kBufferSize * 10, 'b'))); + ASSERT_TRUE(Cancel(-1)); + sink_receiver_->ShutDown(); + sink_receiver_ = NULL; + ExpectSendError(0, kFatalError); + ExpectSendError(0, kFatalError); + ExpectCancelResult(); + ASSERT_FALSE(Send("a")); + ASSERT_FALSE(Cancel(-1)); +} + +TEST_F(DataSinkTest, SenderShutdown) { + ASSERT_TRUE(Send("a")); + ASSERT_TRUE(Send(std::string(kBufferSize * 10, 'b'))); + ASSERT_TRUE(Cancel(-1)); + sender_.reset(); + ExpectSendError(0, kFatalError); + ExpectSendError(0, kFatalError); + ExpectCancelResult(); + if (!seen_connection_error_) + WaitForEvent(EVENT_ERROR); + EXPECT_TRUE(seen_connection_error_); +} + +} // namespace device diff --git a/device/serial/data_stream.mojom b/device/serial/data_stream.mojom index f303f20..689b381 100644 --- a/device/serial/data_stream.mojom +++ b/device/serial/data_stream.mojom @@ -21,4 +21,26 @@ interface DataSourceClient { OnError(uint32 error_location, int32 error); }; +[Client=DataSinkClient] +interface DataSink { + // Initializes this DataSink with a data pipe handle to use for data + // transmission. + Init(handle<data_pipe_consumer> consumer_handle); + + // Requests the cancellation of any data that has been written to the pipe, + // but has not yet been sent to the sink. + Cancel(int32 error); +}; + +interface DataSinkClient { + // Reports that the sink has successfully received |bytes_sent| bytes of data. + ReportBytesSent(uint32 bytes_sent); + + // Reports that the sink has received |bytes_sent| bytes of data (possibly 0) + // and encountered an error: |error|. The client should respond with + // |bytes_to_flush|, the number of bytes enqueued in the data pipe but not yet + // acked so the correct number of bytes can be flushed from the pipe. + ReportBytesSentAndError(uint32 bytes_sent, int32 error) => (uint32 bytes_to_flush); +}; + } diff --git a/device/serial/serial.gyp b/device/serial/serial.gyp index 3bf050d..37553a1 100644 --- a/device/serial/serial.gyp +++ b/device/serial/serial.gyp @@ -58,6 +58,10 @@ 'buffer.h', 'data_receiver.cc', 'data_receiver.h', + 'data_sender.cc', + 'data_sender.h', + 'data_sink_receiver.cc', + 'data_sink_receiver.h', 'data_source_sender.cc', 'data_source_sender.h', 'serial_connection.cc', |