summaryrefslogtreecommitdiffstats
path: root/device
diff options
context:
space:
mode:
authorsammc@chromium.org <sammc@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-19 06:14:02 +0000
committersammc@chromium.org <sammc@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-08-19 06:15:27 +0000
commit4e99a933bf3cfc73ba8d4b6086634cf72449227f (patch)
tree6772b725b73e07e31322ccc6f1a30f28b297e269 /device
parent3eef1b35d0ea704a274d65e2d662ecdb83768eb2 (diff)
downloadchromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.zip
chromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.tar.gz
chromium_src-4e99a933bf3cfc73ba8d4b6086634cf72449227f.tar.bz2
Add data pipe wrappers to be used to implement serial send.
BUG=389016 Review URL: https://codereview.chromium.org/466623003 Cr-Commit-Position: refs/heads/master@{#290507} git-svn-id: svn://svn.chromium.org/chrome/trunk/src@290507 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'device')
-rw-r--r--device/device_tests.gyp1
-rw-r--r--device/serial/BUILD.gn4
-rw-r--r--device/serial/data_sender.cc280
-rw-r--r--device/serial/data_sender.h112
-rw-r--r--device/serial/data_sink_receiver.cc306
-rw-r--r--device/serial/data_sink_receiver.h114
-rw-r--r--device/serial/data_sink_unittest.cc413
-rw-r--r--device/serial/data_stream.mojom22
-rw-r--r--device/serial/serial.gyp4
9 files changed, 1256 insertions, 0 deletions
diff --git a/device/device_tests.gyp b/device/device_tests.gyp
index 118e785..7e913e6 100644
--- a/device/device_tests.gyp
+++ b/device/device_tests.gyp
@@ -47,6 +47,7 @@
'hid/hid_report_descriptor_unittest.cc',
'hid/hid_service_unittest.cc',
'hid/input_service_linux_unittest.cc',
+ 'serial/data_sink_unittest.cc',
'serial/data_source_unittest.cc',
'serial/serial_connection_unittest.cc',
'serial/serial_service_unittest.cc',
diff --git a/device/serial/BUILD.gn b/device/serial/BUILD.gn
index 58cc637..d6b1313 100644
--- a/device/serial/BUILD.gn
+++ b/device/serial/BUILD.gn
@@ -15,6 +15,10 @@ static_library("serial") {
"buffer.h",
"data_receiver.cc",
"data_receiver.h",
+ "data_sender.cc",
+ "data_sender.h",
+ "data_sink_receiver.cc",
+ "data_sink_receiver.h",
"data_source_sender.cc",
"data_source_sender.h",
"serial_connection.cc",
diff --git a/device/serial/data_sender.cc b/device/serial/data_sender.cc
new file mode 100644
index 0000000..409f5f4
--- /dev/null
+++ b/device/serial/data_sender.cc
@@ -0,0 +1,280 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_sender.h"
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+// Represents a send that is not yet fulfilled.
+class DataSender::PendingSend {
+ public:
+ PendingSend(const base::StringPiece& data,
+ const DataSentCallback& callback,
+ const SendErrorCallback& error_callback,
+ int32_t fatal_error_value);
+
+ // Invoked to report that |num_bytes| of data have been sent. Subtracts the
+ // number of bytes that were part of this send from |num_bytes|. Returns
+ // whether this send has been completed. If this send has been completed, this
+ // calls |callback_|.
+ bool ReportBytesSent(uint32_t* num_bytes);
+
+ // Invoked to report that |num_bytes| of data have been sent and then an
+ // error, |error| was encountered. Subtracts the number of bytes that were
+ // part of this send from |num_bytes|. If this send was not completed before
+ // the error, this calls |error_callback_| to report the error. Otherwise,
+ // this calls |callback_|. Returns the number of bytes sent but not acked.
+ uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
+
+ // Reports |fatal_error_value_| to |receive_error_callback_|.
+ void DispatchFatalError();
+
+ // Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
+ // if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
+ // or the error if one is encountered writing to |handle|.
+ MojoResult SendData(mojo::DataPipeProducerHandle handle);
+
+ private:
+ // Invoked to update |bytes_acked_| and |num_bytes|.
+ void ReportBytesSentInternal(uint32_t* num_bytes);
+
+ // The data to send.
+ const base::StringPiece data_;
+
+ // The callback to report success.
+ const DataSentCallback callback_;
+
+ // The callback to report errors.
+ const SendErrorCallback error_callback_;
+
+ // The error value to report when DispatchFatalError() is called.
+ const int32_t fatal_error_value_;
+
+ // The number of bytes sent to the data pipe.
+ uint32_t bytes_sent_;
+
+ // The number of bytes acked.
+ uint32_t bytes_acked_;
+};
+
+DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
+ uint32_t buffer_size,
+ int32_t fatal_error_value)
+ : sink_(sink.Pass()),
+ fatal_error_value_(fatal_error_value),
+ shut_down_(false) {
+ sink_.set_error_handler(this);
+ MojoCreateDataPipeOptions options = {
+ sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
+ };
+ options.struct_size = sizeof(options);
+ mojo::ScopedDataPipeConsumerHandle remote_handle;
+ MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
+ DCHECK_EQ(MOJO_RESULT_OK, result);
+ sink_->Init(remote_handle.Pass());
+ sink_.set_client(this);
+}
+
+DataSender::~DataSender() {
+ ShutDown();
+}
+
+bool DataSender::Send(const base::StringPiece& data,
+ const DataSentCallback& callback,
+ const SendErrorCallback& error_callback) {
+ DCHECK(!callback.is_null() && !error_callback.is_null());
+ if (!pending_cancel_.is_null() || shut_down_)
+ return false;
+
+ pending_sends_.push(linked_ptr<PendingSend>(
+ new PendingSend(data, callback, error_callback, fatal_error_value_)));
+ SendInternal();
+ return true;
+}
+
+bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
+ DCHECK(!callback.is_null());
+ if (!pending_cancel_.is_null() || shut_down_)
+ return false;
+ if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
+ base::MessageLoop::current()->PostTask(FROM_HERE, callback);
+ return true;
+ }
+
+ pending_cancel_ = callback;
+ sink_->Cancel(error);
+ return true;
+}
+
+void DataSender::ReportBytesSent(uint32_t bytes_sent) {
+ if (shut_down_)
+ return;
+
+ while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
+ sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
+ sends_awaiting_ack_.pop();
+ }
+ if (bytes_sent > 0 && !pending_sends_.empty()) {
+ bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
+ DCHECK(!finished);
+ if (finished) {
+ ShutDown();
+ return;
+ }
+ }
+ if (bytes_sent != 0) {
+ ShutDown();
+ return;
+ }
+ if (pending_sends_.empty() && sends_awaiting_ack_.empty())
+ RunCancelCallback();
+}
+
+void DataSender::ReportBytesSentAndError(
+ uint32_t bytes_sent,
+ int32_t error,
+ const mojo::Callback<void(uint32_t)>& callback) {
+ if (shut_down_)
+ return;
+
+ uint32_t bytes_to_flush = 0;
+ while (!sends_awaiting_ack_.empty()) {
+ bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
+ &bytes_sent, error);
+ sends_awaiting_ack_.pop();
+ }
+ while (!pending_sends_.empty()) {
+ bytes_to_flush +=
+ pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
+ pending_sends_.pop();
+ }
+ callback.Run(bytes_to_flush);
+ RunCancelCallback();
+}
+
+void DataSender::OnConnectionError() {
+ ShutDown();
+}
+
+void DataSender::SendInternal() {
+ while (!pending_sends_.empty()) {
+ MojoResult result = pending_sends_.front()->SendData(handle_.get());
+ if (result == MOJO_RESULT_OK) {
+ sends_awaiting_ack_.push(pending_sends_.front());
+ pending_sends_.pop();
+ } else if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(new AsyncWaiter(
+ handle_.get(),
+ MOJO_HANDLE_SIGNAL_WRITABLE,
+ base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
+ return;
+ } else {
+ ShutDown();
+ return;
+ }
+ }
+}
+
+void DataSender::OnDoneWaiting(MojoResult result) {
+ waiter_.reset();
+ if (result != MOJO_RESULT_OK) {
+ ShutDown();
+ return;
+ }
+ SendInternal();
+}
+
+void DataSender::RunCancelCallback() {
+ DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
+ if (pending_cancel_.is_null())
+ return;
+
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(pending_cancel_));
+ pending_cancel_.Reset();
+}
+
+void DataSender::ShutDown() {
+ waiter_.reset();
+ shut_down_ = true;
+ while (!pending_sends_.empty()) {
+ pending_sends_.front()->DispatchFatalError();
+ pending_sends_.pop();
+ }
+ while (!sends_awaiting_ack_.empty()) {
+ sends_awaiting_ack_.front()->DispatchFatalError();
+ sends_awaiting_ack_.pop();
+ }
+ RunCancelCallback();
+}
+
+DataSender::PendingSend::PendingSend(const base::StringPiece& data,
+ const DataSentCallback& callback,
+ const SendErrorCallback& error_callback,
+ int32_t fatal_error_value)
+ : data_(data),
+ callback_(callback),
+ error_callback_(error_callback),
+ fatal_error_value_(fatal_error_value),
+ bytes_sent_(0),
+ bytes_acked_(0) {
+}
+
+bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
+ ReportBytesSentInternal(num_bytes);
+ if (bytes_acked_ < data_.size())
+ return false;
+
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(callback_, bytes_acked_));
+ return true;
+}
+
+uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
+ int32_t error) {
+ ReportBytesSentInternal(num_bytes);
+ if (*num_bytes > 0) {
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(callback_, bytes_acked_));
+ return 0;
+ }
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
+ return bytes_sent_ - bytes_acked_;
+}
+
+void DataSender::PendingSend::DispatchFatalError() {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
+}
+
+MojoResult DataSender::PendingSend::SendData(
+ mojo::DataPipeProducerHandle handle) {
+ uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
+ MojoResult result = mojo::WriteDataRaw(handle,
+ data_.data() + bytes_sent_,
+ &bytes_to_send,
+ MOJO_WRITE_DATA_FLAG_NONE);
+ if (result != MOJO_RESULT_OK)
+ return result;
+
+ bytes_sent_ += bytes_to_send;
+ return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
+}
+
+void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
+ bytes_acked_ += *num_bytes;
+ if (bytes_acked_ > bytes_sent_) {
+ *num_bytes = bytes_acked_ - bytes_sent_;
+ bytes_acked_ = bytes_sent_;
+ } else {
+ *num_bytes = 0;
+ }
+}
+
+} // namespace device
diff --git a/device/serial/data_sender.h b/device/serial/data_sender.h
new file mode 100644
index 0000000..ab21db4
--- /dev/null
+++ b/device/serial/data_sender.h
@@ -0,0 +1,112 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef DEVICE_SERIAL_DATA_SENDER_H_
+#define DEVICE_SERIAL_DATA_SENDER_H_
+
+#include <queue>
+
+#include "base/callback.h"
+#include "base/memory/linked_ptr.h"
+#include "base/strings/string_piece.h"
+#include "device/serial/buffer.h"
+#include "device/serial/data_stream.mojom.h"
+#include "mojo/public/cpp/system/data_pipe.h"
+
+namespace device {
+
+class AsyncWaiter;
+
+// A DataSender sends data to a DataSink.
+class DataSender : public serial::DataSinkClient, public mojo::ErrorHandler {
+ public:
+ typedef base::Callback<void(uint32_t bytes_sent)> DataSentCallback;
+ typedef base::Callback<void(uint32_t bytes_sent, int32_t error)>
+ SendErrorCallback;
+ typedef base::Callback<void()> CancelCallback;
+
+ // Constructs a DataSender to send data to |sink|, using a data pipe with a
+ // buffer size of |buffer_size|, with connection errors reported as
+ // |fatal_error_value|.
+ DataSender(mojo::InterfacePtr<serial::DataSink> sink,
+ uint32_t buffer_size,
+ int32_t fatal_error_value);
+
+ virtual ~DataSender();
+
+ // Starts an asynchronous send of |data|. If the send completes successfully,
+ // |callback| will be called. Otherwise, |error_callback| will be called with
+ // the error. If there is a cancel in progress or there has been a connection
+ // error, this will not start a send and returns false. It is the caller's
+ // responsibility to ensure |data| remains valid until one of |callback| and
+ // |error_callback| is called.
+ bool Send(const base::StringPiece& data,
+ const DataSentCallback& callback,
+ const SendErrorCallback& error_callback);
+
+ // Requests the cancellation of any in-progress sends. Calls to Send() will
+ // fail until |callback| is called.
+ bool Cancel(int32_t error, const CancelCallback& callback);
+
+ private:
+ class PendingSend;
+
+ // serial::DataSinkClient overrides.
+ virtual void ReportBytesSent(uint32_t bytes_sent) OVERRIDE;
+ virtual void ReportBytesSentAndError(
+ uint32_t bytes_sent,
+ int32_t error,
+ const mojo::Callback<void(uint32_t)>& callback) OVERRIDE;
+
+ // mojo::ErrorHandler override.
+ virtual void OnConnectionError() OVERRIDE;
+
+ // Copies data from |pending_sends_| into the data pipe and starts |waiter_|
+ // waiting if the pipe is full. When a PendingSend in |pending_sends_| has
+ // been fully copied into the data pipe, it moves to |sends_awaiting_ack_|.
+ void SendInternal();
+
+ // Invoked when |handle_| is ready for writes. Calls SendInternal().
+ void OnDoneWaiting(MojoResult result);
+
+ // Dispatches a cancel callback if one is pending.
+ void RunCancelCallback();
+
+ // Shuts down this DataSender and dispatches fatal errors to all pending
+ // operations.
+ void ShutDown();
+
+ // The control connection to the data sink.
+ mojo::InterfacePtr<serial::DataSink> sink_;
+
+ // The data connection to the data sink.
+ mojo::ScopedDataPipeProducerHandle handle_;
+
+ // The error value to report in the event of a fatal error.
+ const int32_t fatal_error_value_;
+
+ // A waiter used to wait until |handle_| is writable if we are waiting.
+ scoped_ptr<AsyncWaiter> waiter_;
+
+ // A queue of PendingSend that have not yet been fully written to the data
+ // pipe.
+ std::queue<linked_ptr<PendingSend> > pending_sends_;
+
+ // A queue of PendingSend that have been written to the data pipe, but have
+ // not yet been acked by the DataSink.
+ std::queue<linked_ptr<PendingSend> > sends_awaiting_ack_;
+
+ // The callback to report cancel completion if a cancel operation is in
+ // progress.
+ CancelCallback pending_cancel_;
+
+ // Whether we have encountered a fatal error and shut down.
+ bool shut_down_;
+
+ DISALLOW_COPY_AND_ASSIGN(DataSender);
+};
+
+} // namespace device
+
+#endif // DEVICE_SERIAL_DATA_SENDER_H_
diff --git a/device/serial/data_sink_receiver.cc b/device/serial/data_sink_receiver.cc
new file mode 100644
index 0000000..ede0468
--- /dev/null
+++ b/device/serial/data_sink_receiver.cc
@@ -0,0 +1,306 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "device/serial/data_sink_receiver.h"
+
+#include <limits>
+
+#include "base/bind.h"
+#include "device/serial/async_waiter.h"
+
+namespace device {
+
+// Represents a flush of data that has not been completed.
+class DataSinkReceiver::PendingFlush {
+ public:
+ PendingFlush();
+
+ // Initializes this PendingFlush with |num_bytes|, the number of bytes to
+ // flush.
+ void SetNumBytesToFlush(uint32_t num_bytes);
+
+ // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns
+ // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than
+ // |bytes_to_flush_| bytes were flushed or the error if one is encountered
+ // discarding data from |handle|.
+ MojoResult Flush(mojo::DataPipeConsumerHandle handle);
+
+ // Whether this PendingFlush has received the number of bytes to flush.
+ bool received_flush() { return received_flush_; }
+
+ private:
+ // Whether this PendingFlush has received the number of bytes to flush.
+ bool received_flush_;
+
+ // The remaining number of bytes to flush.
+ uint32_t bytes_to_flush_;
+};
+
+// A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
+// a DataSinkReceiver.
+class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
+ public:
+ Buffer(scoped_refptr<DataSinkReceiver> receiver,
+ const char* buffer,
+ uint32_t buffer_size);
+ virtual ~Buffer();
+
+ void Cancel(int32_t error);
+
+ // ReadOnlyBuffer overrides.
+ virtual const char* GetData() OVERRIDE;
+ virtual uint32_t GetSize() OVERRIDE;
+ virtual void Done(uint32_t bytes_read) OVERRIDE;
+ virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE;
+
+ private:
+ // The DataSinkReceiver whose data pipe we are providing a view.
+ scoped_refptr<DataSinkReceiver> receiver_;
+
+ const char* buffer_;
+ uint32_t buffer_size_;
+
+ // Whether this receive has been cancelled.
+ bool cancelled_;
+
+ // If |cancelled_|, contains the cancellation error to report.
+ int32_t cancellation_error_;
+};
+
+DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
+ const CancelCallback& cancel_callback,
+ const ErrorCallback& error_callback)
+ : ready_callback_(ready_callback),
+ cancel_callback_(cancel_callback),
+ error_callback_(error_callback),
+ buffer_in_use_(NULL),
+ shut_down_(false),
+ weak_factory_(this) {
+}
+
+void DataSinkReceiver::ShutDown() {
+ shut_down_ = true;
+ if (waiter_)
+ waiter_.reset();
+}
+
+DataSinkReceiver::~DataSinkReceiver() {
+}
+
+void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) {
+ if (!handle.is_valid() || handle_.is_valid()) {
+ DispatchFatalError();
+ return;
+ }
+
+ handle_ = handle.Pass();
+ StartWaiting();
+}
+
+void DataSinkReceiver::Cancel(int32_t error) {
+ // If we have sent a ReportBytesSentAndError but have not received the
+ // response, that ReportBytesSentAndError message will appear to the
+ // DataSinkClient to be caused by this Cancel message. In that case, we ignore
+ // the cancel.
+ if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush())
+ return;
+
+ // If there is a buffer is in use, mark the buffer as cancelled and notify the
+ // client by calling |cancel_callback_|. The sink implementation may or may
+ // not take the cancellation into account when deciding what error (if any) to
+ // return. If the sink returns an error, we ignore the cancellation error.
+ // Otherwise, if the sink does not report an error, we override that with the
+ // cancellation error. Once a cancellation has been received, the next report
+ // sent to the client will always contain an error; the error returned by the
+ // sink or the cancellation error if the sink does not return an error.
+ if (buffer_in_use_) {
+ buffer_in_use_->Cancel(error);
+ if (!cancel_callback_.is_null())
+ cancel_callback_.Run(error);
+ return;
+ }
+ // If there is no buffer in use, immediately report the error and cancel the
+ // waiting for the data pipe if one exists. This transitions straight into the
+ // state after the sink has returned an error.
+ waiter_.reset();
+ ReportBytesSentAndError(0, error);
+}
+
+void DataSinkReceiver::OnConnectionError() {
+ DispatchFatalError();
+}
+
+void DataSinkReceiver::StartWaiting() {
+ DCHECK(!waiter_ && !shut_down_);
+ waiter_.reset(
+ new AsyncWaiter(handle_.get(),
+ MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&DataSinkReceiver::OnDoneWaiting, this)));
+}
+
+void DataSinkReceiver::OnDoneWaiting(MojoResult result) {
+ DCHECK(waiter_ && !shut_down_);
+ waiter_.reset();
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return;
+ }
+ // If there are any queued flushes (from ReportBytesSentAndError()), let them
+ // flush data from the data pipe.
+ if (!pending_flushes_.empty()) {
+ MojoResult result = pending_flushes_.front()->Flush(handle_.get());
+ if (result == MOJO_RESULT_OK) {
+ pending_flushes_.pop();
+ } else if (result != MOJO_RESULT_SHOULD_WAIT) {
+ DispatchFatalError();
+ return;
+ }
+ StartWaiting();
+ return;
+ }
+ const void* data = NULL;
+ uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
+ result = mojo::BeginReadDataRaw(
+ handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return;
+ }
+ buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes);
+ ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
+}
+
+void DataSinkReceiver::Done(uint32_t bytes_read) {
+ if (!DoneInternal(bytes_read))
+ return;
+ client()->ReportBytesSent(bytes_read);
+ StartWaiting();
+}
+
+void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
+ if (!DoneInternal(bytes_read))
+ return;
+ ReportBytesSentAndError(bytes_read, error);
+}
+
+bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
+ if (shut_down_)
+ return false;
+
+ DCHECK(buffer_in_use_);
+ buffer_in_use_ = NULL;
+ MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read);
+ if (result != MOJO_RESULT_OK) {
+ DispatchFatalError();
+ return false;
+ }
+ return true;
+}
+
+void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
+ int32_t error) {
+ // When we encounter an error, we must discard the data from any sends already
+ // in the data pipe before we can resume dispatching data to the sink. We add
+ // a pending flush here. The response containing the number of bytes to flush
+ // is handled in SetNumBytesToFlush(). The actual flush is handled in
+ // OnDoneWaiting().
+ pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush()));
+ client()->ReportBytesSentAndError(
+ bytes_read,
+ error,
+ base::Bind(&DataSinkReceiver::SetNumBytesToFlush,
+ weak_factory_.GetWeakPtr()));
+}
+
+void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) {
+ DCHECK(!pending_flushes_.empty());
+ DCHECK(!pending_flushes_.back()->received_flush());
+ pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush);
+ if (!waiter_)
+ StartWaiting();
+}
+
+void DataSinkReceiver::DispatchFatalError() {
+ if (shut_down_)
+ return;
+
+ ShutDown();
+ if (!error_callback_.is_null())
+ error_callback_.Run();
+}
+
+DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver,
+ const char* buffer,
+ uint32_t buffer_size)
+ : receiver_(receiver),
+ buffer_(buffer),
+ buffer_size_(buffer_size),
+ cancelled_(false),
+ cancellation_error_(0) {
+}
+
+DataSinkReceiver::Buffer::~Buffer() {
+ if (!receiver_)
+ return;
+ if (cancelled_)
+ receiver_->DoneWithError(0, cancellation_error_);
+ else
+ receiver_->Done(0);
+}
+
+void DataSinkReceiver::Buffer::Cancel(int32_t error) {
+ cancelled_ = true;
+ cancellation_error_ = error;
+}
+
+const char* DataSinkReceiver::Buffer::GetData() {
+ return buffer_;
+}
+
+uint32_t DataSinkReceiver::Buffer::GetSize() {
+ return buffer_size_;
+}
+
+void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
+ if (cancelled_)
+ receiver_->DoneWithError(bytes_read, cancellation_error_);
+ else
+ receiver_->Done(bytes_read);
+ receiver_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
+ int32_t error) {
+ receiver_->DoneWithError(bytes_read, error);
+ receiver_ = NULL;
+ buffer_ = NULL;
+ buffer_size_ = 0;
+}
+
+DataSinkReceiver::PendingFlush::PendingFlush()
+ : received_flush_(false), bytes_to_flush_(0) {
+}
+
+void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
+ DCHECK(!received_flush_);
+ received_flush_ = true;
+ bytes_to_flush_ = num_bytes;
+}
+
+MojoResult DataSinkReceiver::PendingFlush::Flush(
+ mojo::DataPipeConsumerHandle handle) {
+ DCHECK(received_flush_);
+ uint32_t num_bytes = bytes_to_flush_;
+ MojoResult result =
+ mojo::ReadDataRaw(handle, NULL, &num_bytes, MOJO_READ_DATA_FLAG_DISCARD);
+ if (result != MOJO_RESULT_OK)
+ return result;
+ DCHECK(num_bytes <= bytes_to_flush_);
+ bytes_to_flush_ -= num_bytes;
+ return bytes_to_flush_ == 0 ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
+}
+
+} // namespace device
diff --git a/device/serial/data_sink_receiver.h b/device/serial/data_sink_receiver.h
new file mode 100644
index 0000000..2e63d7a
--- /dev/null
+++ b/device/serial/data_sink_receiver.h
@@ -0,0 +1,114 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef DEVICE_SERIAL_DATA_SINK_RECEIVER_H_
+#define DEVICE_SERIAL_DATA_SINK_RECEIVER_H_
+
+#include <queue>
+
+#include "base/callback.h"
+#include "base/memory/linked_ptr.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "device/serial/buffer.h"
+#include "device/serial/data_stream.mojom.h"
+#include "mojo/public/cpp/system/data_pipe.h"
+
+namespace device {
+
+class AsyncWaiter;
+
+class DataSinkReceiver : public base::RefCounted<DataSinkReceiver>,
+ public mojo::InterfaceImpl<serial::DataSink> {
+ public:
+ typedef base::Callback<void(scoped_ptr<ReadOnlyBuffer>)> ReadyCallback;
+ typedef base::Callback<void(int32_t error)> CancelCallback;
+ typedef base::Callback<void()> ErrorCallback;
+
+ // Constructs a DataSinkReceiver. Whenever the pipe is ready for reading, the
+ // |ready_callback| will be called with the ReadOnlyBuffer to read.
+ // |ready_callback| will not be called again until the previous ReadOnlyBuffer
+ // is destroyed. If a connection error occurs, |error_callback| will be called
+ // and the DataSinkReceiver will act as if ShutDown() had been called. If
+ // |cancel_callback| is valid, it will be called when the DataSinkClient
+ // requests cancellation of the in-progress read.
+ DataSinkReceiver(const ReadyCallback& ready_callback,
+ const CancelCallback& cancel_callback,
+ const ErrorCallback& error_callback);
+
+ // Shuts down this DataSinkReceiver. After shut down, |ready_callback|,
+ // |cancel_callback| and |error_callback| will never be called.
+ void ShutDown();
+
+ private:
+ class Buffer;
+ class PendingFlush;
+ friend class base::RefCounted<DataSinkReceiver>;
+
+ virtual ~DataSinkReceiver();
+
+ // mojo::InterfaceImpl<serial::DataSink> overrides.
+ virtual void Init(mojo::ScopedDataPipeConsumerHandle handle) OVERRIDE;
+ virtual void Cancel(int32_t error) OVERRIDE;
+ virtual void OnConnectionError() OVERRIDE;
+
+ // Starts waiting for |handle_| to be ready for reads.
+ void StartWaiting();
+
+ // Invoked when |handle_| is ready for reads.
+ void OnDoneWaiting(MojoResult result);
+
+ // Reports a successful read of |bytes_read|.
+ void Done(uint32_t bytes_read);
+
+ // Reports a partially successful or unsuccessful read of |bytes_read|
+ // with an error of |error|.
+ void DoneWithError(uint32_t bytes_read, int32_t error);
+
+ // Finishes the two-phase data pipe read.
+ bool DoneInternal(uint32_t bytes_read);
+
+ // Sends an ReportBytesSentAndError message to the client.
+ void ReportBytesSentAndError(uint32_t bytes_read, int32_t error);
+
+ // Invoked in response to an ReportBytesSentAndError call to the client with
+ // the number of bytes to flush.
+ void SetNumBytesToFlush(uint32_t bytes_to_flush);
+
+ // Reports a fatal error to the client and shuts down.
+ void DispatchFatalError();
+
+ // The data connection to the data sender.
+ mojo::ScopedDataPipeConsumerHandle handle_;
+
+ // The callback to call when |handle_| has data ready to read.
+ const ReadyCallback ready_callback_;
+
+ // The callback to call when the client has requested cancellation.
+ const CancelCallback cancel_callback_;
+
+ // The callback to call if a fatal error occurs.
+ const ErrorCallback error_callback_;
+
+ // The queue of pending flushes.
+ std::queue<linked_ptr<PendingFlush> > pending_flushes_;
+
+ // A waiter used to wait until |handle_| is readable if we are waiting.
+ scoped_ptr<AsyncWaiter> waiter_;
+
+ // The buffer passed to |ready_callback_| if one exists. This is not owned,
+ // but the Buffer will call Done or DoneWithError before being deleted.
+ Buffer* buffer_in_use_;
+
+ // Whether we have encountered a fatal error and shut down.
+ bool shut_down_;
+
+ base::WeakPtrFactory<DataSinkReceiver> weak_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(DataSinkReceiver);
+};
+
+} // namespace device
+
+#endif // DEVICE_SERIAL_DATA_SINK_RECEIVER_H_
diff --git a/device/serial/data_sink_unittest.cc b/device/serial/data_sink_unittest.cc
new file mode 100644
index 0000000..46688d2
--- /dev/null
+++ b/device/serial/data_sink_unittest.cc
@@ -0,0 +1,413 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "base/strings/string_piece.h"
+#include "device/serial/async_waiter.h"
+#include "device/serial/data_sender.h"
+#include "device/serial/data_sink_receiver.h"
+#include "device/serial/data_stream.mojom.h"
+#include "mojo/public/cpp/bindings/interface_ptr.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace device {
+
+class DataSinkTest : public testing::Test {
+ public:
+ enum Event {
+ EVENT_NONE,
+ EVENT_READ_BUFFER_READY,
+ EVENT_CANCEL_RECEIVED,
+ EVENT_SEND_SUCCESS,
+ EVENT_SEND_ERROR,
+ EVENT_CANCEL_COMPLETE,
+ EVENT_ERROR,
+ };
+
+ DataSinkTest()
+ : bytes_sent_(0),
+ send_error_(0),
+ has_send_error_(false),
+ cancel_error_(0),
+ seen_connection_error_(false),
+ expected_event_(EVENT_NONE) {}
+
+ virtual void SetUp() OVERRIDE {
+ message_loop_.reset(new base::MessageLoop);
+ mojo::InterfacePtr<serial::DataSink> sink_handle;
+ sink_receiver_ = mojo::WeakBindToProxy(
+ new DataSinkReceiver(
+ base::Bind(&DataSinkTest::OnDataToRead, base::Unretained(this)),
+ base::Bind(&DataSinkTest::OnCancel, base::Unretained(this)),
+ base::Bind(&DataSinkTest::OnError, base::Unretained(this))),
+ &sink_handle);
+ sender_.reset(new DataSender(sink_handle.Pass(), kBufferSize, kFatalError));
+ }
+
+ virtual void TearDown() OVERRIDE {
+ read_buffer_.reset();
+ message_loop_.reset();
+ if (sink_receiver_)
+ EXPECT_TRUE(sink_receiver_->HasOneRef());
+ }
+
+ void WaitForEvent(Event event) {
+ expected_event_ = event;
+ base::RunLoop run_loop;
+ stop_run_loop_ = run_loop.QuitClosure();
+ run_loop.Run();
+ }
+
+ void EventReceived(Event event) {
+ if (event == expected_event_ && !stop_run_loop_.is_null())
+ stop_run_loop_.Run();
+ }
+
+ void OnError() {
+ seen_connection_error_ = true;
+ EventReceived(EVENT_ERROR);
+ }
+
+ void ExpectDataAndReadAll(const base::StringPiece& expected_data) {
+ ExpectData(expected_data, static_cast<uint32_t>(expected_data.size()), 0);
+ }
+
+ void ExpectData(const base::StringPiece& expected_data,
+ uint32_t bytes_to_read,
+ int32_t error) {
+ DCHECK(bytes_to_read <= static_cast<uint32_t>(expected_data.size()));
+ std::string data;
+ while (data.size() < static_cast<size_t>(expected_data.size())) {
+ if (!read_buffer_)
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(read_buffer_);
+ data.append(read_buffer_->GetData(), read_buffer_->GetSize());
+ if (bytes_to_read <= read_buffer_->GetSize()) {
+ if (error)
+ read_buffer_->DoneWithError(bytes_to_read, error);
+ else
+ read_buffer_->Done(bytes_to_read);
+ read_buffer_.reset();
+ break;
+ } else {
+ bytes_to_read -= read_buffer_->GetSize();
+ read_buffer_->Done(read_buffer_->GetSize());
+ read_buffer_.reset();
+ }
+ }
+ // If we terminate early, we may not see all of the data. In that case,
+ // check that the part we saw matches what we expected.
+ if (static_cast<uint32_t>(data.size()) <
+ static_cast<uint32_t>(expected_data.size()) &&
+ data.size() >= bytes_to_read) {
+ EXPECT_EQ(expected_data.substr(0, data.size()), data);
+ return;
+ }
+ EXPECT_EQ(expected_data, data);
+ }
+
+ void ExpectSendSuccess(uint32_t expected_bytes_sent) {
+ bytes_sent_ = 0;
+ WaitForEvent(EVENT_SEND_SUCCESS);
+ EXPECT_EQ(expected_bytes_sent, bytes_sent_);
+ EXPECT_FALSE(has_send_error_);
+ }
+
+ void ExpectSendError(uint32_t expected_bytes_sent, int32_t expected_error) {
+ bytes_sent_ = 0;
+ has_send_error_ = 0;
+ send_error_ = 0;
+ WaitForEvent(EVENT_SEND_ERROR);
+ EXPECT_EQ(expected_bytes_sent, bytes_sent_);
+ EXPECT_TRUE(has_send_error_);
+ EXPECT_EQ(expected_error, send_error_);
+ }
+
+ void ExpectCancel(int32_t expected_error) {
+ cancel_error_ = 0;
+ WaitForEvent(EVENT_CANCEL_RECEIVED);
+ EXPECT_EQ(expected_error, cancel_error_);
+ }
+
+ void ExpectCancelResult() { WaitForEvent(EVENT_CANCEL_COMPLETE); }
+
+ bool Send(const base::StringPiece& data) {
+ return sender_->Send(
+ data,
+ base::Bind(&DataSinkTest::OnDataSent, base::Unretained(this)),
+ base::Bind(&DataSinkTest::OnSendError, base::Unretained(this)));
+ }
+
+ void OnDataSent(uint32_t bytes_sent) {
+ bytes_sent_ = bytes_sent;
+ has_send_error_ = false;
+ EventReceived(EVENT_SEND_SUCCESS);
+ }
+
+ void OnSendError(uint32_t bytes_sent, int32_t error) {
+ bytes_sent_ = bytes_sent;
+ send_error_ = error;
+ has_send_error_ = true;
+ EventReceived(EVENT_SEND_ERROR);
+ }
+
+ void OnDataToRead(scoped_ptr<ReadOnlyBuffer> buffer) {
+ read_buffer_ = buffer.Pass();
+ read_buffer_contents_ =
+ std::string(read_buffer_->GetData(), read_buffer_->GetSize());
+ EventReceived(EVENT_READ_BUFFER_READY);
+ }
+
+ bool Cancel(int32_t error) {
+ return sender_->Cancel(
+ error, base::Bind(&DataSinkTest::CancelAck, base::Unretained(this)));
+ }
+
+ void CancelAck() { EventReceived(EVENT_CANCEL_COMPLETE); }
+
+ void OnCancel(int32_t error) {
+ cancel_error_ = error;
+ EventReceived(EVENT_CANCEL_RECEIVED);
+ }
+
+ protected:
+ static const int32_t kFatalError;
+ static const uint32_t kBufferSize;
+ scoped_ptr<base::MessageLoop> message_loop_;
+ base::Closure stop_run_loop_;
+
+ scoped_refptr<DataSinkReceiver> sink_receiver_;
+ scoped_ptr<DataSender> sender_;
+
+ uint32_t bytes_sent_;
+ int32_t send_error_;
+ bool has_send_error_;
+ int32_t cancel_error_;
+ scoped_ptr<ReadOnlyBuffer> read_buffer_;
+ std::string read_buffer_contents_;
+
+ bool seen_connection_error_;
+
+ Event expected_event_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(DataSinkTest);
+};
+
+const int32_t DataSinkTest::kFatalError = -10;
+const uint32_t DataSinkTest::kBufferSize = 100;
+
+TEST_F(DataSinkTest, Basic) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, LargeSend) {
+ std::string pattern = "1234567890";
+ std::string data;
+ for (uint32_t i = 0; i < kBufferSize; i++)
+ data.append(pattern);
+ ASSERT_TRUE(Send(data));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll(data));
+ ExpectSendSuccess(static_cast<uint32_t>(data.size()));
+}
+
+TEST_F(DataSinkTest, ErrorAndAllData) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 1, -1));
+ ExpectSendError(1, -1);
+}
+
+TEST_F(DataSinkTest, ErrorAndPartialData) {
+ ASSERT_TRUE(Send("ab"));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, -1));
+ ExpectSendError(1, -1);
+
+ ASSERT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, ErrorAndPartialDataAtPacketBoundary) {
+ ASSERT_TRUE(Send("ab"));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 2, -1));
+ ExpectSendError(2, -1);
+
+ ASSERT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, ErrorInSecondPacket) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_TRUE(Send("b"));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 2, -1));
+ ExpectSendSuccess(1);
+ ExpectSendError(1, -1);
+
+ ASSERT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, ErrorBeforeLargeSend) {
+ ASSERT_TRUE(Send("a"));
+ std::string pattern = "123456789";
+ std::string data;
+ for (int i = 0; i < 100; i++)
+ data.append("1234567890");
+ ASSERT_TRUE(Send(data));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a" + data, 1, -1));
+ ExpectSendError(1, -1);
+ ExpectSendError(0, -1);
+}
+
+TEST_F(DataSinkTest, ErrorOnly) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1));
+ ExpectSendError(0, -1);
+
+ ASSERT_TRUE(Send("b"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("b"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, Cancel) {
+ ASSERT_TRUE(Send("a"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ // Check that sends fail until the cancel operation completes.
+ EXPECT_FALSE(Send("b"));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1));
+ // Check that the error reported by the sink implementation is reported to the
+ // client - not the cancellation error.
+ ExpectSendError(0, -1);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, CancelSinkSucceeds) {
+ ASSERT_TRUE(Send("a"));
+ EXPECT_TRUE(Send("b"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, 0));
+ ExpectSendError(1, -2);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, CancelTwice) {
+ ASSERT_TRUE(Send("a"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -2));
+ ExpectSendError(0, -2);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("b"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-3));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-3));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("b", 0, -3));
+ ExpectSendError(0, -3);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, CancelTwiceWithNoSendBetween) {
+ ASSERT_TRUE(Send("a"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -2));
+ ExpectSendError(0, -2);
+ ExpectCancelResult();
+ ASSERT_TRUE(Cancel(-3));
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("b"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("b"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, CancelDuringError) {
+ ASSERT_TRUE(Send("a"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("a", 0, -1));
+ ExpectSendError(0, -1);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("a"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, CancelWithoutSend) {
+ ASSERT_TRUE(Cancel(-2));
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("a"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("a"));
+ ExpectSendSuccess(1);
+ EXPECT_EQ(0, cancel_error_);
+}
+
+TEST_F(DataSinkTest, CancelPartialPacket) {
+ ASSERT_TRUE(Send("ab"));
+ WaitForEvent(EVENT_READ_BUFFER_READY);
+ ASSERT_TRUE(Cancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectCancel(-2));
+ ASSERT_NO_FATAL_FAILURE(ExpectData("ab", 1, -2));
+ ExpectSendError(1, -2);
+ ExpectCancelResult();
+
+ EXPECT_TRUE(Send("c"));
+ ASSERT_NO_FATAL_FAILURE(ExpectDataAndReadAll("c"));
+ ExpectSendSuccess(1);
+}
+
+TEST_F(DataSinkTest, SinkReceiverShutdown) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_TRUE(Send(std::string(kBufferSize * 10, 'b')));
+ ASSERT_TRUE(Cancel(-1));
+ sink_receiver_->ShutDown();
+ sink_receiver_ = NULL;
+ ExpectSendError(0, kFatalError);
+ ExpectSendError(0, kFatalError);
+ ExpectCancelResult();
+ ASSERT_FALSE(Send("a"));
+ ASSERT_FALSE(Cancel(-1));
+}
+
+TEST_F(DataSinkTest, SenderShutdown) {
+ ASSERT_TRUE(Send("a"));
+ ASSERT_TRUE(Send(std::string(kBufferSize * 10, 'b')));
+ ASSERT_TRUE(Cancel(-1));
+ sender_.reset();
+ ExpectSendError(0, kFatalError);
+ ExpectSendError(0, kFatalError);
+ ExpectCancelResult();
+ if (!seen_connection_error_)
+ WaitForEvent(EVENT_ERROR);
+ EXPECT_TRUE(seen_connection_error_);
+}
+
+} // namespace device
diff --git a/device/serial/data_stream.mojom b/device/serial/data_stream.mojom
index f303f20..689b381 100644
--- a/device/serial/data_stream.mojom
+++ b/device/serial/data_stream.mojom
@@ -21,4 +21,26 @@ interface DataSourceClient {
OnError(uint32 error_location, int32 error);
};
+[Client=DataSinkClient]
+interface DataSink {
+ // Initializes this DataSink with a data pipe handle to use for data
+ // transmission.
+ Init(handle<data_pipe_consumer> consumer_handle);
+
+ // Requests the cancellation of any data that has been written to the pipe,
+ // but has not yet been sent to the sink.
+ Cancel(int32 error);
+};
+
+interface DataSinkClient {
+ // Reports that the sink has successfully received |bytes_sent| bytes of data.
+ ReportBytesSent(uint32 bytes_sent);
+
+ // Reports that the sink has received |bytes_sent| bytes of data (possibly 0)
+ // and encountered an error: |error|. The client should respond with
+ // |bytes_to_flush|, the number of bytes enqueued in the data pipe but not yet
+ // acked so the correct number of bytes can be flushed from the pipe.
+ ReportBytesSentAndError(uint32 bytes_sent, int32 error) => (uint32 bytes_to_flush);
+};
+
}
diff --git a/device/serial/serial.gyp b/device/serial/serial.gyp
index 3bf050d..37553a1 100644
--- a/device/serial/serial.gyp
+++ b/device/serial/serial.gyp
@@ -58,6 +58,10 @@
'buffer.h',
'data_receiver.cc',
'data_receiver.h',
+ 'data_sender.cc',
+ 'data_sender.h',
+ 'data_sink_receiver.cc',
+ 'data_sink_receiver.h',
'data_source_sender.cc',
'data_source_sender.h',
'serial_connection.cc',