diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-16 20:39:30 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-16 20:39:30 +0000 |
commit | 00798015ad24428d303b788b9cc6ae77214da9ba (patch) | |
tree | c9fb6a8222c4e97c3e082544ac165c2f3b82828e /mojo | |
parent | 4cb7fdaeb4a0fb53181b0285755ef022654d36bd (diff) | |
download | chromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.zip chromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.tar.gz chromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.tar.bz2 |
Mojo: "Finish" local (in-process) implementation of data pipes.
Still to do:
- Write tests.
- Fix bug in MojoReadData()/MojoWriteData() (see data_pipe.cc).
- Write comments in core.h.
R=darin@chromium.org
Review URL: https://codereview.chromium.org/113603003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@241015 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/mojo.gyp | 2 | ||||
-rw-r--r-- | mojo/public/system/core.h | 7 | ||||
-rw-r--r-- | mojo/system/core_impl.cc | 5 | ||||
-rw-r--r-- | mojo/system/data_pipe.cc | 103 | ||||
-rw-r--r-- | mojo/system/data_pipe.h | 25 | ||||
-rw-r--r-- | mojo/system/limits.h | 17 | ||||
-rw-r--r-- | mojo/system/local_data_pipe.cc | 226 | ||||
-rw-r--r-- | mojo/system/local_data_pipe.h | 79 |
8 files changed, 446 insertions, 18 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp index b829011..57d8240 100644 --- a/mojo/mojo.gyp +++ b/mojo/mojo.gyp @@ -87,6 +87,8 @@ 'system/dispatcher.cc', 'system/dispatcher.h', 'system/limits.h', + 'system/local_data_pipe.cc', + 'system/local_data_pipe.h', 'system/local_message_pipe_endpoint.cc', 'system/local_message_pipe_endpoint.h', 'system/memory.cc', diff --git a/mojo/public/system/core.h b/mojo/public/system/core.h index 7e819fff..097d8fb 100644 --- a/mojo/public/system/core.h +++ b/mojo/public/system/core.h @@ -200,6 +200,9 @@ const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_MAY_DISCARD = 1 << 0; // |MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD|: May discard data for // whatever reason; best-effort delivery. In particular, if the capacity // is reached, old data may be discard to make room for new data. +// +// |element_size * capacity_num_elements| must be less than 2^32 (i.e., it must +// fit into a 32-bit unsigned integer). // TODO(vtl): Finish this. typedef uint32_t MojoCreateDataPipeOptionsFlags; @@ -218,8 +221,8 @@ const MojoCreateDataPipeOptionsFlags struct MojoCreateDataPipeOptions { size_t struct_size; // Set to the size of this structure. MojoCreateDataPipeOptionsFlags flags; - uint32_t element_size; - uint32_t capacity_num_elements; + uint32_t element_size; // Must be nonzero. + uint32_t capacity_num_elements; // Zero means "default"/automatic. }; // |MojoWriteDataFlags|: Used to specify different modes to |MojoWriteData()| diff --git a/mojo/system/core_impl.cc b/mojo/system/core_impl.cc index c946a25..b8a35ee 100644 --- a/mojo/system/core_impl.cc +++ b/mojo/system/core_impl.cc @@ -12,6 +12,7 @@ #include "mojo/system/data_pipe_producer_dispatcher.h" #include "mojo/system/dispatcher.h" #include "mojo/system/limits.h" +#include "mojo/system/local_data_pipe.h" #include "mojo/system/memory.h" #include "mojo/system/message_pipe.h" #include "mojo/system/message_pipe_dispatcher.h" @@ -361,7 +362,6 @@ MojoResult CoreImpl::CreateDataPipe(const MojoCreateDataPipeOptions* options, if (!VerifyUserPointer<MojoHandle>(data_pipe_consumer_handle, 1)) return MOJO_RESULT_INVALID_ARGUMENT; -/* TODO(vtl): The rest of the code will look something like this: scoped_refptr<LocalDataPipe> data_pipe(new LocalDataPipe()); MojoResult result = data_pipe->Init(options); if (result != MOJO_RESULT_OK) @@ -393,9 +393,6 @@ MojoResult CoreImpl::CreateDataPipe(const MojoCreateDataPipeOptions* options, *data_pipe_producer_handle = producer_handle; *data_pipe_consumer_handle = consumer_handle; return MOJO_RESULT_OK; -*/ - NOTIMPLEMENTED(); - return MOJO_RESULT_UNIMPLEMENTED; } MojoResult CoreImpl::WriteData(MojoHandle data_pipe_producer_handle, diff --git a/mojo/system/data_pipe.cc b/mojo/system/data_pipe.cc index 0f8cb9f..f48210f 100644 --- a/mojo/system/data_pipe.cc +++ b/mojo/system/data_pipe.cc @@ -7,8 +7,10 @@ #include <string.h> #include <algorithm> +#include <limits> #include "base/logging.h" +#include "mojo/system/limits.h" #include "mojo/system/memory.h" #include "mojo/system/waiter_list.h" @@ -34,6 +36,11 @@ MojoResult DataPipe::ProducerWriteData(const void* elements, base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); + if (producer_in_two_phase_write_) + return MOJO_RESULT_BUSY; + + // TODO(vtl): This implementation may write less than requested, even if room + // is available. Fix this. (Probably make a subclass-specific impl.) void* buffer = NULL; uint32_t buffer_num_elements = *num_elements; MojoResult rv = ProducerBeginWriteDataImplNoLock(&buffer, @@ -58,13 +65,30 @@ MojoResult DataPipe::ProducerBeginWriteData(void** buffer, MojoWriteDataFlags flags) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); - return ProducerBeginWriteDataImplNoLock(buffer, buffer_num_elements, flags); + + if (producer_in_two_phase_write_) + return MOJO_RESULT_BUSY; + + MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer, + buffer_num_elements, + flags); + if (rv != MOJO_RESULT_OK) + return rv; + + producer_in_two_phase_write_ = true; + return MOJO_RESULT_OK; } MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) { base::AutoLock locker(lock_); DCHECK(has_local_producer_no_lock()); - return ProducerEndWriteDataImplNoLock(num_elements_written); + + if (!producer_in_two_phase_write_) + return MOJO_RESULT_FAILED_PRECONDITION; + + MojoResult rv = ProducerEndWriteDataImplNoLock(num_elements_written); + producer_in_two_phase_write_ = false; // End two-phase write even on failure. + return rv; } MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, @@ -107,6 +131,9 @@ MojoResult DataPipe::ConsumerReadData(void* elements, base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); + if (consumer_in_two_phase_read_) + return MOJO_RESULT_BUSY; + if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { return ConsumerDiscardDataNoLock(num_elements, (flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE)); @@ -114,6 +141,8 @@ MojoResult DataPipe::ConsumerReadData(void* elements, if ((flags & MOJO_READ_DATA_FLAG_QUERY)) return ConsumerQueryDataNoLock(num_elements); + // TODO(vtl): This implementation may write less than requested, even if room + // is available. Fix this. (Probably make a subclass-specific impl.) const void* buffer = NULL; uint32_t buffer_num_elements = 0; MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer, @@ -138,13 +167,30 @@ MojoResult DataPipe::ConsumerBeginReadData(const void** buffer, MojoReadDataFlags flags) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); - return ConsumerBeginReadDataImplNoLock(buffer, buffer_num_elements, flags); + + if (consumer_in_two_phase_read_) + return MOJO_RESULT_BUSY; + + MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer, + buffer_num_elements, + flags); + if (rv != MOJO_RESULT_OK) + return rv; + + consumer_in_two_phase_read_ = true; + return MOJO_RESULT_OK; } MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) { base::AutoLock locker(lock_); DCHECK(has_local_consumer_no_lock()); - return ConsumerEndReadDataImplNoLock(num_elements_read); + + if (!consumer_in_two_phase_read_) + return MOJO_RESULT_FAILED_PRECONDITION; + + MojoResult rv = ConsumerEndReadDataImplNoLock(num_elements_read); + consumer_in_two_phase_read_ = false; // End two-phase read even on failure. + return rv; } MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, @@ -169,8 +215,11 @@ void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) { } DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer) - : producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), - consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL) { + : element_size_(0), + producer_waiter_list_(has_local_producer ? new WaiterList() : NULL), + consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL), + producer_in_two_phase_write_(false), + consumer_in_two_phase_read_(false) { DCHECK(has_local_producer || has_local_consumer); } @@ -179,5 +228,47 @@ DataPipe::~DataPipe() { DCHECK(!has_local_consumer_no_lock()); } +MojoResult DataPipe::Init(bool may_discard, + size_t element_size, + size_t capacity_num_elements) { + // No need to lock: This method is not thread-safe. + + if (element_size == 0) + return MOJO_RESULT_INVALID_ARGUMENT; + if (!capacity_num_elements) { + // Set the capacity to the default (rounded down by element size, but always + // at least one element). + capacity_num_elements = + std::max(static_cast<size_t>(1), + kDefaultDataPipeCapacityBytes / element_size); + } + if (capacity_num_elements > + std::numeric_limits<uint32_t>::max() / element_size) + return MOJO_RESULT_INVALID_ARGUMENT; + if (capacity_num_elements * element_size > kMaxDataPipeCapacityBytes) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + may_discard_ = may_discard; + element_size_ = element_size; + capacity_num_elements_ = capacity_num_elements; + return MOJO_RESULT_OK; +} + +void DataPipe::AwakeProducerWaitersForStateChangeNoLock() { + lock_.AssertAcquired(); + if (!has_local_producer_no_lock()) + return; + producer_waiter_list_->AwakeWaitersForStateChange( + ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock()); +} + +void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() { + lock_.AssertAcquired(); + if (!has_local_consumer_no_lock()) + return; + consumer_waiter_list_->AwakeWaitersForStateChange( + ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock()); +} + } // namespace system } // namespace mojo diff --git a/mojo/system/data_pipe.h b/mojo/system/data_pipe.h index 2196671..3138435 100644 --- a/mojo/system/data_pipe.h +++ b/mojo/system/data_pipe.h @@ -63,13 +63,25 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe : MojoResult wake_result); void ConsumerRemoveWaiter(Waiter* waiter); - // Thread-safe and fast (doesn't take the lock). + // Thread-safe and fast (they don't take the lock): + bool may_discard() const { return may_discard_; } size_t element_size() const { return element_size_; } + size_t capacity_num_elements() const { return capacity_num_elements_; } protected: DataPipe(bool has_local_producer, bool has_local_consumer); - void Init(size_t element_size); + friend class base::RefCountedThreadSafe<DataPipe>; + virtual ~DataPipe(); + + // Not thread-safe; must be called before any other methods are called. This + // object is only usable on success. + MojoResult Init(bool may_discard, + size_t element_size, + size_t capacity_num_elements); + + void AwakeProducerWaitersForStateChangeNoLock(); + void AwakeConsumerWaitersForStateChangeNoLock(); virtual void ProducerCloseImplNoLock() = 0; virtual MojoResult ProducerBeginWriteDataImplNoLock( @@ -95,9 +107,6 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe : virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() = 0; private: - friend class base::RefCountedThreadSafe<DataPipe>; - virtual ~DataPipe(); - bool has_local_producer_no_lock() const { return !!producer_waiter_list_.get(); } @@ -105,12 +114,16 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe : return !!consumer_waiter_list_.get(); } - // Set by |Init()| and never changed afterwards. + // Set by |Init()| and never changed afterwards: + bool may_discard_; size_t element_size_; + size_t capacity_num_elements_; base::Lock lock_; // Protects the following members. scoped_ptr<WaiterList> producer_waiter_list_; scoped_ptr<WaiterList> consumer_waiter_list_; + bool producer_in_two_phase_write_; + bool consumer_in_two_phase_read_; DISALLOW_COPY_AND_ASSIGN(DataPipe); }; diff --git a/mojo/system/limits.h b/mojo/system/limits.h index 277c23a..6b1f9bc 100644 --- a/mojo/system/limits.h +++ b/mojo/system/limits.h @@ -2,9 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// TODO(vtl): Rename this file to constants.h. + #ifndef MOJO_SYSTEM_LIMITS_H_ #define MOJO_SYSTEM_LIMITS_H_ +#include <stddef.h> + namespace mojo { namespace system { @@ -19,6 +23,19 @@ const size_t kMaxMessageNumBytes = 4 * 1024 * 1024; const size_t kMaxMessageNumHandles = 10000; +// Maximum capacity of a data pipe, in bytes. This value must fit into a +// |uint32_t|. +// WARNING: If you bump it closer to 2^32, you must audit all the code to check +// that we don't overflow (2^31 would definitely be risky; up to 2^30 is +// probably okay). +const size_t kMaxDataPipeCapacityBytes = 256 * 1024 * 1024; // 256 MB. + +const size_t kDefaultDataPipeCapacityBytes = 1024 * 1024; // 1 MB. + +// Alignment for the "start" of the data buffer used by data pipes. (The +// alignment of elements will depend on this and the element size.) +const size_t kDataPipeBufferAlignmentBytes = 16; + } // namespace system } // namespace mojo diff --git a/mojo/system/local_data_pipe.cc b/mojo/system/local_data_pipe.cc new file mode 100644 index 0000000..d970102 --- /dev/null +++ b/mojo/system/local_data_pipe.cc @@ -0,0 +1,226 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// TODO(vtl): I currently potentially overflow in doing index calculations. +// E.g., |buffer_first_element_index_| and |buffer_current_num_elements_| fit +// into a |uint32_t|, but their sum may not. This is bad and poses a security +// risk. (We're currently saved by the limit on capacity -- the maximum size of +// the buffer, checked in |DataPipe::Init()|, is currently sufficiently small. + +#include "mojo/system/local_data_pipe.h" + +#include <algorithm> + +#include "base/logging.h" +#include "mojo/system/limits.h" + +namespace mojo { +namespace system { + +LocalDataPipe::LocalDataPipe() + : DataPipe(true, true), + producer_open_(true), + consumer_open_(true), + buffer_first_element_index_(0), + buffer_current_num_elements_(0), + two_phase_max_elements_written_(0), + two_phase_max_elements_read_(0) { +} + +MojoResult LocalDataPipe::Init(const MojoCreateDataPipeOptions* options) { + static const MojoCreateDataPipeOptions kDefaultOptions = { + sizeof(MojoCreateDataPipeOptions), // |struct_size|. + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. + 1u, // |element_size|. + static_cast<uint32_t>(kDefaultDataPipeCapacityBytes) + }; + if (!options) + options = &kDefaultOptions; + + if (options->struct_size < sizeof(*options)) + return MOJO_RESULT_INVALID_ARGUMENT; + + // Note: lazily allocate memory, since a common case will be that one of the + // handles is immediately passed off to another process. + return DataPipe::Init( + (options->flags & MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD), + static_cast<size_t>(options->element_size), + static_cast<size_t>(options->capacity_num_elements)); +} + +LocalDataPipe::~LocalDataPipe() { + DCHECK(!producer_open_); + DCHECK(!consumer_open_); +} + +void LocalDataPipe::ProducerCloseImplNoLock() { + DCHECK(producer_open_); + producer_open_ = false; + if (!buffer_current_num_elements_) { + buffer_.reset(); + buffer_current_num_elements_ = 0; + } + AwakeConsumerWaitersForStateChangeNoLock(); +} + +MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock( + void** buffer, + uint32_t* buffer_num_elements, + MojoWriteDataFlags flags) { + size_t max_elements_to_write = GetMaxElementsToWriteNoLock(); + // TODO(vtl): Consider this return value. + if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) && + *buffer_num_elements < max_elements_to_write) + return MOJO_RESULT_OUT_OF_RANGE; + + size_t next_index = (buffer_first_element_index_ + + buffer_current_num_elements_) % capacity_num_elements(); + EnsureBufferNoLock(); + *buffer = buffer_.get() + next_index * element_size(); + *buffer_num_elements = static_cast<uint32_t>(max_elements_to_write); + two_phase_max_elements_written_ = + static_cast<uint32_t>(max_elements_to_write); + return MOJO_RESULT_OK; +} + +MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock( + uint32_t num_elements_written) { + if (num_elements_written > two_phase_max_elements_written_) { + // Note: The two-phase write ends here even on failure. + two_phase_max_elements_written_ = 0; // For safety. + return MOJO_RESULT_INVALID_ARGUMENT; + } + + buffer_current_num_elements_ += num_elements_written; + DCHECK_LE(buffer_current_num_elements_, capacity_num_elements()); + two_phase_max_elements_written_ = 0; // For safety. + return MOJO_RESULT_OK; +} + +MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() { + MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; + if (consumer_open_ && buffer_current_num_elements_ < capacity_num_elements()) + rv |= MOJO_WAIT_FLAG_WRITABLE; + return rv; +} + +MojoWaitFlags LocalDataPipe::ProducerSatisfiableFlagsNoLock() { + MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; + if (consumer_open_) + rv |= MOJO_WAIT_FLAG_WRITABLE; + return rv; +} + +void LocalDataPipe::ConsumerCloseImplNoLock() { + DCHECK(consumer_open_); + consumer_open_ = false; + buffer_.reset(); + buffer_current_num_elements_ = 0; + AwakeProducerWaitersForStateChangeNoLock(); +} + +MojoResult LocalDataPipe::ConsumerDiscardDataNoLock(uint32_t* num_elements, + bool all_or_none) { + // TODO(vtl): Think about the error code in this case. + if (all_or_none && *num_elements > buffer_current_num_elements_) + return MOJO_RESULT_OUT_OF_RANGE; + + size_t num_elements_to_discard = + std::min(static_cast<size_t>(*num_elements), + buffer_current_num_elements_); + buffer_first_element_index_ = + (buffer_first_element_index_ + num_elements_to_discard) % + capacity_num_elements(); + buffer_current_num_elements_ -= num_elements_to_discard; + + AwakeProducerWaitersForStateChangeNoLock(); + AwakeConsumerWaitersForStateChangeNoLock(); + + *num_elements = static_cast<uint32_t>(num_elements_to_discard); + return MOJO_RESULT_OK; +} + +MojoResult LocalDataPipe::ConsumerQueryDataNoLock(uint32_t* num_elements) { + // Note: This cast is safe, since the capacity fits into a |uint32_t|. + *num_elements = static_cast<uint32_t>(buffer_current_num_elements_); + return MOJO_RESULT_OK; +} + +MojoResult LocalDataPipe::ConsumerBeginReadDataImplNoLock( + const void** buffer, + uint32_t* buffer_num_elements, + MojoReadDataFlags flags) { + size_t max_elements_to_read = GetMaxElementsToReadNoLock(); + // TODO(vtl): Consider this return value. + if ((flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE) && + *buffer_num_elements < max_elements_to_read) + return MOJO_RESULT_OUT_OF_RANGE; + // Note: This works even if |buffer_| is null. + *buffer = buffer_.get() + buffer_first_element_index_ * element_size(); + *buffer_num_elements = static_cast<uint32_t>(max_elements_to_read); + two_phase_max_elements_read_ = static_cast<uint32_t>(max_elements_to_read); + return MOJO_RESULT_OK; +} + +MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock( + uint32_t num_elements_read) { + if (num_elements_read > two_phase_max_elements_read_) { + // Note: The two-phase read ends here even on failure. + two_phase_max_elements_read_ = 0; // For safety. + return MOJO_RESULT_INVALID_ARGUMENT; + } + + buffer_first_element_index_ += num_elements_read; + DCHECK_LE(buffer_first_element_index_, capacity_num_elements()); + buffer_first_element_index_ %= capacity_num_elements(); + DCHECK_LE(num_elements_read, buffer_current_num_elements_); + buffer_current_num_elements_ -= num_elements_read; + two_phase_max_elements_read_ = 0; // For safety. + return MOJO_RESULT_OK; +} + +MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() { + MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; + if (buffer_current_num_elements_ > 0) + rv |= MOJO_WAIT_FLAG_READABLE; + return rv; +} + +MojoWaitFlags LocalDataPipe::ConsumerSatisfiableFlagsNoLock() { + MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE; + if (buffer_current_num_elements_ > 0 || producer_open_) + rv |= MOJO_WAIT_FLAG_READABLE; + return rv; +} + +void LocalDataPipe::EnsureBufferNoLock() { + DCHECK(producer_open_); + if (buffer_.get()) + return; + buffer_.reset(static_cast<char*>( + base::AlignedAlloc(static_cast<size_t>(capacity_num_elements()) * + element_size(), + kDataPipeBufferAlignmentBytes))); +} + +size_t LocalDataPipe::GetMaxElementsToWriteNoLock() { + size_t next_index = buffer_first_element_index_ + + buffer_current_num_elements_; + if (next_index >= capacity_num_elements()) { + next_index %= capacity_num_elements(); + DCHECK_GE(buffer_first_element_index_, next_index); + return buffer_first_element_index_ - next_index; + } + return capacity_num_elements() - next_index; +} + +size_t LocalDataPipe::GetMaxElementsToReadNoLock() { + if (buffer_first_element_index_ + buffer_current_num_elements_ > + capacity_num_elements()) + return capacity_num_elements() - buffer_first_element_index_; + return buffer_current_num_elements_; +} + +} // namespace system +} // namespace mojo diff --git a/mojo/system/local_data_pipe.h b/mojo/system/local_data_pipe.h new file mode 100644 index 0000000..0c9e4b6 --- /dev/null +++ b/mojo/system/local_data_pipe.h @@ -0,0 +1,79 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_SYSTEM_LOCAL_DATA_PIPE_H_ +#define MOJO_SYSTEM_LOCAL_DATA_PIPE_H_ + +#include "base/basictypes.h" +#include "base/memory/aligned_memory.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "mojo/system/data_pipe.h" +#include "mojo/system/system_impl_export.h" + +namespace mojo { +namespace system { + +// |LocalDataPipe| is a subclass that "implements" |DataPipe| for data pipes +// whose producer and consumer are both local. This class is thread-safe (with +// protection provided by |DataPipe|'s |lock_|. +class MOJO_SYSTEM_IMPL_EXPORT LocalDataPipe : public DataPipe { + public: + LocalDataPipe(); + + MojoResult Init(const MojoCreateDataPipeOptions* options); + + private: + friend class base::RefCountedThreadSafe<LocalDataPipe>; + virtual ~LocalDataPipe(); + + // |DataPipe| implementation: + virtual void ProducerCloseImplNoLock() OVERRIDE; + virtual MojoResult ProducerBeginWriteDataImplNoLock( + void** buffer, + uint32_t* buffer_num_elements, + MojoWriteDataFlags flags) OVERRIDE; + virtual MojoResult ProducerEndWriteDataImplNoLock( + uint32_t num_elements_written) OVERRIDE; + virtual MojoWaitFlags ProducerSatisfiedFlagsNoLock() OVERRIDE; + virtual MojoWaitFlags ProducerSatisfiableFlagsNoLock() OVERRIDE; + virtual void ConsumerCloseImplNoLock() OVERRIDE; + virtual MojoResult ConsumerDiscardDataNoLock(uint32_t* num_elements, + bool all_or_none) OVERRIDE; + virtual MojoResult ConsumerQueryDataNoLock(uint32_t* num_elements) OVERRIDE; + virtual MojoResult ConsumerBeginReadDataImplNoLock( + const void** buffer, + uint32_t* buffer_num_elements, + MojoReadDataFlags flags) OVERRIDE; + virtual MojoResult ConsumerEndReadDataImplNoLock( + uint32_t num_elements_read) OVERRIDE; + virtual MojoWaitFlags ConsumerSatisfiedFlagsNoLock() OVERRIDE; + virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() OVERRIDE; + + void EnsureBufferNoLock(); + + // Get the maximum (single) write/read size right now (in number of elements); + // result fits in a |uint32_t|. + size_t GetMaxElementsToWriteNoLock(); + size_t GetMaxElementsToReadNoLock(); + + // The members below are protected by |DataPipe|'s |lock_|: + bool producer_open_; + bool consumer_open_; + + scoped_ptr_malloc<char, base::ScopedPtrAlignedFree> buffer_; + // Circular buffer. + size_t buffer_first_element_index_; + size_t buffer_current_num_elements_; + + uint32_t two_phase_max_elements_written_; + uint32_t two_phase_max_elements_read_; + + DISALLOW_COPY_AND_ASSIGN(LocalDataPipe); +}; + +} // namespace system +} // namespace mojo + +#endif // MOJO_SYSTEM_LOCAL_DATA_PIPE_H_ |