summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-14 00:00:53 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-14 00:00:53 +0000
commitae3d357e37bcafa1544a624a0929ea90ef8bd1b2 (patch)
treeffa6287da80977d534b8d8641754a6756f9dbf03 /mojo
parent481d2498a8c07fd290ae0e9d6b25dcaf6198c05f (diff)
downloadchromium_src-ae3d357e37bcafa1544a624a0929ea90ef8bd1b2.zip
chromium_src-ae3d357e37bcafa1544a624a0929ea90ef8bd1b2.tar.gz
chromium_src-ae3d357e37bcafa1544a624a0929ea90ef8bd1b2.tar.bz2
Mojo: More data pipe work -- consumer-side dispatcher.
Define/implement DataPipeConsumerDispatcher, and the parts of the DataPipe base class that it interacts with. R=sky@chromium.org Review URL: https://codereview.chromium.org/101623013 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@240809 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp2
-rw-r--r--mojo/public/system/core.h22
-rw-r--r--mojo/system/core_impl.cc1
-rw-r--r--mojo/system/data_pipe.cc80
-rw-r--r--mojo/system/data_pipe.h31
-rw-r--r--mojo/system/data_pipe_consumer_dispatcher.cc115
-rw-r--r--mojo/system/data_pipe_consumer_dispatcher.h59
7 files changed, 303 insertions, 7 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index 51f9050..b829011 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -80,6 +80,8 @@
'system/core_impl.h',
'system/data_pipe.cc',
'system/data_pipe.h',
+ 'system/data_pipe_consumer_dispatcher.cc',
+ 'system/data_pipe_consumer_dispatcher.h',
'system/data_pipe_producer_dispatcher.cc',
'system/data_pipe_producer_dispatcher.h',
'system/dispatcher.cc',
diff --git a/mojo/public/system/core.h b/mojo/public/system/core.h
index 8d4cb02..7e819fff 100644
--- a/mojo/public/system/core.h
+++ b/mojo/public/system/core.h
@@ -225,27 +225,43 @@ struct MojoCreateDataPipeOptions {
// |MojoWriteDataFlags|: Used to specify different modes to |MojoWriteData()|
// and |MojoBeginWriteData()|.
// |MOJO_WRITE_DATA_FLAG_NONE| - No flags; default mode.
-// TODO(vtl)
+// |MOJO_WRITE_DATA_FLAG_ALL_OR_NOTHING| - Write either all the elements
+// requested or none of them.
typedef uint32_t MojoWriteDataFlags;
#ifdef __cplusplus
const MojoWriteDataFlags MOJO_WRITE_DATA_FLAG_NONE = 0;
+const MojoWriteDataFlags MOJO_WRITE_DATA_FLAG_ALL_OR_NONE = 1 << 0;
#else
#define MOJO_WRITE_DATA_FLAG_NONE ((MojoWriteDataFlags) 0)
+#define MOJO_WRITE_DATA_FLAG_ALL_OR_NONE ((MojoWriteDataFlags) 1 << 0)
#endif
// |MojoReadDataFlags|: Used to specify different modes to |MojoReadData()| and
// |MojoBeginReadData()|.
// |MOJO_READ_DATA_FLAG_NONE| - No flags; default mode.
-// TODO(vtl)
+// |MOJO_READ_DATA_FLAG_ALL_OR_NONE| - Read (or discard) either the requested
+// number of elements or none.
+// |MOJO_READ_DATA_FLAG_DISCARD| - Discard (up to) the requested number of
+// elements.
+// |MOJO_READ_DATA_FLAG_QUERY| - Query the number of elements available to
+// read. For use with |MojoReadData()| only. Mutually exclusive with
+// |MOJO_READ_DATA_FLAG_DISCARD| and |MOJO_READ_DATA_FLAG_ALL_OR_NONE| is
+// ignored if this flag is set.
typedef uint32_t MojoReadDataFlags;
#ifdef __cplusplus
const MojoReadDataFlags MOJO_READ_DATA_FLAG_NONE = 0;
+const MojoReadDataFlags MOJO_READ_DATA_FLAG_ALL_OR_NONE = 1 << 0;
+const MojoReadDataFlags MOJO_READ_DATA_FLAG_DISCARD = 1 << 1;
+const MojoReadDataFlags MOJO_READ_DATA_FLAG_QUERY = 1 << 2;
#else
#define MOJO_READ_DATA_FLAG_NONE ((MojoReadDataFlags) 0)
+#define MOJO_READ_DATA_FLAG_ALL_OR_NONE ((MojoReadDataFlags) 1 << 0)
+#define MOJO_READ_DATA_FLAG_DISCARD ((MojoReadDataFlags) 1 << 1)
+#define MOJO_READ_DATA_FLAG_QUERY ((MojoReadDataFlags) 1 << 2)
#endif
// Functions -------------------------------------------------------------------
@@ -430,6 +446,8 @@ MOJO_SYSTEM_EXPORT MojoResult MojoEndWriteData(
MojoHandle data_pipe_producer_handle,
uint32_t num_elements_written);
+// TODO(vtl): Note to self: If |MOJO_READ_DATA_FLAG_QUERY| is set, then
+// |elements| must be null (and nothing will be read).
MOJO_SYSTEM_EXPORT MojoResult MojoReadData(
MojoHandle data_pipe_consumer_handle,
void* elements,
diff --git a/mojo/system/core_impl.cc b/mojo/system/core_impl.cc
index 5b26d9f..c946a25 100644
--- a/mojo/system/core_impl.cc
+++ b/mojo/system/core_impl.cc
@@ -8,6 +8,7 @@
#include "base/logging.h"
#include "base/time/time.h"
+#include "mojo/system/data_pipe_consumer_dispatcher.h"
#include "mojo/system/data_pipe_producer_dispatcher.h"
#include "mojo/system/dispatcher.h"
#include "mojo/system/limits.h"
diff --git a/mojo/system/data_pipe.cc b/mojo/system/data_pipe.cc
index 3d6aeb5..0f8cb9f 100644
--- a/mojo/system/data_pipe.cc
+++ b/mojo/system/data_pipe.cc
@@ -88,6 +88,86 @@ void DataPipe::ProducerRemoveWaiter(Waiter* waiter) {
producer_waiter_list_->RemoveWaiter(waiter);
}
+void DataPipe::ConsumerCancelAllWaiters() {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+ consumer_waiter_list_->CancelAllWaiters();
+}
+
+void DataPipe::ConsumerClose() {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+ consumer_waiter_list_.reset();
+ ConsumerCloseImplNoLock();
+}
+
+MojoResult DataPipe::ConsumerReadData(void* elements,
+ uint32_t* num_elements,
+ MojoReadDataFlags flags) {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
+ return ConsumerDiscardDataNoLock(num_elements,
+ (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
+ }
+ if ((flags & MOJO_READ_DATA_FLAG_QUERY))
+ return ConsumerQueryDataNoLock(num_elements);
+
+ const void* buffer = NULL;
+ uint32_t buffer_num_elements = 0;
+ MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer,
+ &buffer_num_elements,
+ flags);
+ if (rv != MOJO_RESULT_OK)
+ return rv;
+
+ uint32_t num_elements_to_read = std::min(*num_elements, buffer_num_elements);
+ memcpy(elements, buffer, num_elements_to_read * element_size_);
+
+ rv = ConsumerEndReadDataImplNoLock(num_elements_to_read);
+ if (rv != MOJO_RESULT_OK)
+ return rv;
+
+ *num_elements = num_elements_to_read;
+ return MOJO_RESULT_OK;
+}
+
+MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+ return ConsumerBeginReadDataImplNoLock(buffer, buffer_num_elements, flags);
+}
+
+MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+ return ConsumerEndReadDataImplNoLock(num_elements_read);
+}
+
+MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+
+ if ((flags & ConsumerSatisfiedFlagsNoLock()))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ if (!(flags & ConsumerSatisfiableFlagsNoLock()))
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ consumer_waiter_list_->AddWaiter(waiter, flags, wake_result);
+ return MOJO_RESULT_OK;
+}
+
+void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
+ base::AutoLock locker(lock_);
+ DCHECK(has_local_consumer_no_lock());
+ consumer_waiter_list_->RemoveWaiter(waiter);
+}
+
DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer)
: producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL) {
diff --git a/mojo/system/data_pipe.h b/mojo/system/data_pipe.h
index 44dc592..2196671 100644
--- a/mojo/system/data_pipe.h
+++ b/mojo/system/data_pipe.h
@@ -31,11 +31,11 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
// corresponding names.
void ProducerCancelAllWaiters();
void ProducerClose();
- // This does not validate |elements| or |num_elements| (or |*num_elements|).
+ // This does not validate its arguments.
MojoResult ProducerWriteData(const void* elements,
uint32_t* num_elements,
MojoWriteDataFlags flags);
- // This does not validate |buffer| or |buffer_num_elements|.
+ // This does not validate its arguments.
MojoResult ProducerBeginWriteData(void** buffer,
uint32_t* buffer_num_elements,
MojoWriteDataFlags flags);
@@ -45,15 +45,23 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
MojoResult wake_result);
void ProducerRemoveWaiter(Waiter* waiter);
-/* TODO(vtl)
+ // These are called by the consumer dispatcher to implement its methods of
+ // corresponding names.
void ConsumerCancelAllWaiters();
void ConsumerClose();
-...
+ // This does not validate its arguments.
+ MojoResult ConsumerReadData(void* elements,
+ uint32_t* num_elements,
+ MojoReadDataFlags flags);
+ // This does not validate its arguments.
+ MojoResult ConsumerBeginReadData(const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags);
+ MojoResult ConsumerEndReadData(uint32_t num_elements_read);
MojoResult ConsumerAddWaiter(Waiter* waiter,
MojoWaitFlags flags,
MojoResult wake_result);
void ConsumerRemoveWaiter(Waiter* waiter);
-*/
// Thread-safe and fast (doesn't take the lock).
size_t element_size() const { return element_size_; }
@@ -73,6 +81,19 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
virtual MojoWaitFlags ProducerSatisfiedFlagsNoLock() = 0;
virtual MojoWaitFlags ProducerSatisfiableFlagsNoLock() = 0;
+ virtual void ConsumerCloseImplNoLock() = 0;
+ virtual MojoResult ConsumerDiscardDataNoLock(uint32_t* num_elements,
+ bool all_or_none) = 0;
+ virtual MojoResult ConsumerQueryDataNoLock(uint32_t* num_elements) = 0;
+ virtual MojoResult ConsumerBeginReadDataImplNoLock(
+ const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) = 0;
+ virtual MojoResult ConsumerEndReadDataImplNoLock(
+ uint32_t num_elements_read) = 0;
+ virtual MojoWaitFlags ConsumerSatisfiedFlagsNoLock() = 0;
+ virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() = 0;
+
private:
friend class base::RefCountedThreadSafe<DataPipe>;
virtual ~DataPipe();
diff --git a/mojo/system/data_pipe_consumer_dispatcher.cc b/mojo/system/data_pipe_consumer_dispatcher.cc
new file mode 100644
index 0000000..f0fc451
--- /dev/null
+++ b/mojo/system/data_pipe_consumer_dispatcher.cc
@@ -0,0 +1,115 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/system/data_pipe_consumer_dispatcher.h"
+
+#include "base/logging.h"
+#include "mojo/system/data_pipe.h"
+#include "mojo/system/memory.h"
+
+namespace mojo {
+namespace system {
+
+DataPipeConsumerDispatcher::DataPipeConsumerDispatcher() {
+}
+
+void DataPipeConsumerDispatcher::Init(scoped_refptr<DataPipe> data_pipe) {
+ DCHECK(data_pipe.get());
+ data_pipe_ = data_pipe;
+}
+
+DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
+ // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
+ DCHECK(!data_pipe_.get());
+}
+
+void DataPipeConsumerDispatcher::CancelAllWaitersNoLock() {
+ lock().AssertAcquired();
+ data_pipe_->ConsumerCancelAllWaiters();
+}
+
+MojoResult DataPipeConsumerDispatcher::CloseImplNoLock() {
+ lock().AssertAcquired();
+ data_pipe_->ConsumerClose();
+ data_pipe_ = NULL;
+ return MOJO_RESULT_OK;
+}
+
+MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
+ void* elements,
+ uint32_t* num_elements,
+ MojoReadDataFlags flags) {
+ lock().AssertAcquired();
+
+ if (!VerifyUserPointer<uint32_t>(num_elements, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ // These flags are mutally exclusive.
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD) &&
+ (flags & MOJO_READ_DATA_FLAG_QUERY))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
+ (flags & MOJO_READ_DATA_FLAG_QUERY)) {
+ DVLOG_IF(2, elements) << "Discard/query mode: ignoring non-null |elements|";
+ elements = NULL; // Null it out for safety.
+ } else {
+ // Only verify |elements| if we're neither discarding nor querying.
+ if (!VerifyUserPointerForSize(elements, data_pipe_->element_size(),
+ *num_elements))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ return data_pipe_->ConsumerReadData(elements, num_elements, flags);
+}
+
+MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
+ const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) {
+ lock().AssertAcquired();
+
+ if (!VerifyUserPointer<const void*>(buffer, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (!VerifyUserPointer<uint32_t>(buffer_num_elements, 1))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ // These flags may not be used in two-phase mode.
+ if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
+ (flags & MOJO_READ_DATA_FLAG_QUERY))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return data_pipe_->ConsumerBeginReadData(buffer, buffer_num_elements, flags);
+}
+
+MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
+ uint32_t num_elements_read) {
+ lock().AssertAcquired();
+
+ return data_pipe_->ConsumerEndReadData(num_elements_read);
+}
+
+MojoResult DataPipeConsumerDispatcher::AddWaiterImplNoLock(
+ Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) {
+ lock().AssertAcquired();
+ return data_pipe_->ConsumerAddWaiter(waiter, flags, wake_result);
+}
+
+void DataPipeConsumerDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
+ lock().AssertAcquired();
+ data_pipe_->ConsumerRemoveWaiter(waiter);
+}
+
+scoped_refptr<Dispatcher>
+DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ lock().AssertAcquired();
+
+ scoped_refptr<DataPipeConsumerDispatcher> rv =
+ new DataPipeConsumerDispatcher();
+ rv->Init(data_pipe_);
+ data_pipe_ = NULL;
+ return scoped_refptr<Dispatcher>(rv.get());
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/data_pipe_consumer_dispatcher.h b/mojo/system/data_pipe_consumer_dispatcher.h
new file mode 100644
index 0000000..cd24db06
--- /dev/null
+++ b/mojo/system/data_pipe_consumer_dispatcher.h
@@ -0,0 +1,59 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_
+#define MOJO_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "mojo/system/dispatcher.h"
+#include "mojo/system/system_impl_export.h"
+
+namespace mojo {
+namespace system {
+
+class DataPipe;
+
+// This is the |Dispatcher| implementation for the consumer handle for data
+// pipes (created by the Mojo primitive |MojoCreateDataPipe()|). This class is
+// thread-safe.
+class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher : public Dispatcher {
+ public:
+ DataPipeConsumerDispatcher();
+
+ // Must be called before any other methods.
+ void Init(scoped_refptr<DataPipe> data_pipe);
+
+ private:
+ friend class base::RefCountedThreadSafe<DataPipeConsumerDispatcher>;
+ virtual ~DataPipeConsumerDispatcher();
+
+ // |Dispatcher| implementation/overrides:
+ virtual void CancelAllWaitersNoLock() OVERRIDE;
+ virtual MojoResult CloseImplNoLock() OVERRIDE;
+ virtual MojoResult ReadDataImplNoLock(void* elements,
+ uint32_t* num_elements,
+ MojoReadDataFlags flags) OVERRIDE;
+ virtual MojoResult BeginReadDataImplNoLock(const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) OVERRIDE;
+ virtual MojoResult EndReadDataImplNoLock(uint32_t num_elements_read) OVERRIDE;
+ virtual MojoResult AddWaiterImplNoLock(Waiter* waiter,
+ MojoWaitFlags flags,
+ MojoResult wake_result) OVERRIDE;
+ virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE;
+ virtual scoped_refptr<Dispatcher>
+ CreateEquivalentDispatcherAndCloseImplNoLock() OVERRIDE;
+
+ // Protected by |lock()|:
+ scoped_refptr<DataPipe> data_pipe_; // This will be null if closed.
+
+ DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_DATA_PIPE_CONSUMER_DISPATCHER_H_