summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-16 20:39:30 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-16 20:39:30 +0000
commit00798015ad24428d303b788b9cc6ae77214da9ba (patch)
treec9fb6a8222c4e97c3e082544ac165c2f3b82828e /mojo
parent4cb7fdaeb4a0fb53181b0285755ef022654d36bd (diff)
downloadchromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.zip
chromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.tar.gz
chromium_src-00798015ad24428d303b788b9cc6ae77214da9ba.tar.bz2
Mojo: "Finish" local (in-process) implementation of data pipes.
Still to do: - Write tests. - Fix bug in MojoReadData()/MojoWriteData() (see data_pipe.cc). - Write comments in core.h. R=darin@chromium.org Review URL: https://codereview.chromium.org/113603003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@241015 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/mojo.gyp2
-rw-r--r--mojo/public/system/core.h7
-rw-r--r--mojo/system/core_impl.cc5
-rw-r--r--mojo/system/data_pipe.cc103
-rw-r--r--mojo/system/data_pipe.h25
-rw-r--r--mojo/system/limits.h17
-rw-r--r--mojo/system/local_data_pipe.cc226
-rw-r--r--mojo/system/local_data_pipe.h79
8 files changed, 446 insertions, 18 deletions
diff --git a/mojo/mojo.gyp b/mojo/mojo.gyp
index b829011..57d8240 100644
--- a/mojo/mojo.gyp
+++ b/mojo/mojo.gyp
@@ -87,6 +87,8 @@
'system/dispatcher.cc',
'system/dispatcher.h',
'system/limits.h',
+ 'system/local_data_pipe.cc',
+ 'system/local_data_pipe.h',
'system/local_message_pipe_endpoint.cc',
'system/local_message_pipe_endpoint.h',
'system/memory.cc',
diff --git a/mojo/public/system/core.h b/mojo/public/system/core.h
index 7e819fff..097d8fb 100644
--- a/mojo/public/system/core.h
+++ b/mojo/public/system/core.h
@@ -200,6 +200,9 @@ const MojoReadMessageFlags MOJO_READ_MESSAGE_FLAG_MAY_DISCARD = 1 << 0;
// |MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD|: May discard data for
// whatever reason; best-effort delivery. In particular, if the capacity
// is reached, old data may be discard to make room for new data.
+//
+// |element_size * capacity_num_elements| must be less than 2^32 (i.e., it must
+// fit into a 32-bit unsigned integer).
// TODO(vtl): Finish this.
typedef uint32_t MojoCreateDataPipeOptionsFlags;
@@ -218,8 +221,8 @@ const MojoCreateDataPipeOptionsFlags
struct MojoCreateDataPipeOptions {
size_t struct_size; // Set to the size of this structure.
MojoCreateDataPipeOptionsFlags flags;
- uint32_t element_size;
- uint32_t capacity_num_elements;
+ uint32_t element_size; // Must be nonzero.
+ uint32_t capacity_num_elements; // Zero means "default"/automatic.
};
// |MojoWriteDataFlags|: Used to specify different modes to |MojoWriteData()|
diff --git a/mojo/system/core_impl.cc b/mojo/system/core_impl.cc
index c946a25..b8a35ee 100644
--- a/mojo/system/core_impl.cc
+++ b/mojo/system/core_impl.cc
@@ -12,6 +12,7 @@
#include "mojo/system/data_pipe_producer_dispatcher.h"
#include "mojo/system/dispatcher.h"
#include "mojo/system/limits.h"
+#include "mojo/system/local_data_pipe.h"
#include "mojo/system/memory.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
@@ -361,7 +362,6 @@ MojoResult CoreImpl::CreateDataPipe(const MojoCreateDataPipeOptions* options,
if (!VerifyUserPointer<MojoHandle>(data_pipe_consumer_handle, 1))
return MOJO_RESULT_INVALID_ARGUMENT;
-/* TODO(vtl): The rest of the code will look something like this:
scoped_refptr<LocalDataPipe> data_pipe(new LocalDataPipe());
MojoResult result = data_pipe->Init(options);
if (result != MOJO_RESULT_OK)
@@ -393,9 +393,6 @@ MojoResult CoreImpl::CreateDataPipe(const MojoCreateDataPipeOptions* options,
*data_pipe_producer_handle = producer_handle;
*data_pipe_consumer_handle = consumer_handle;
return MOJO_RESULT_OK;
-*/
- NOTIMPLEMENTED();
- return MOJO_RESULT_UNIMPLEMENTED;
}
MojoResult CoreImpl::WriteData(MojoHandle data_pipe_producer_handle,
diff --git a/mojo/system/data_pipe.cc b/mojo/system/data_pipe.cc
index 0f8cb9f..f48210f 100644
--- a/mojo/system/data_pipe.cc
+++ b/mojo/system/data_pipe.cc
@@ -7,8 +7,10 @@
#include <string.h>
#include <algorithm>
+#include <limits>
#include "base/logging.h"
+#include "mojo/system/limits.h"
#include "mojo/system/memory.h"
#include "mojo/system/waiter_list.h"
@@ -34,6 +36,11 @@ MojoResult DataPipe::ProducerWriteData(const void* elements,
base::AutoLock locker(lock_);
DCHECK(has_local_producer_no_lock());
+ if (producer_in_two_phase_write_)
+ return MOJO_RESULT_BUSY;
+
+ // TODO(vtl): This implementation may write less than requested, even if room
+ // is available. Fix this. (Probably make a subclass-specific impl.)
void* buffer = NULL;
uint32_t buffer_num_elements = *num_elements;
MojoResult rv = ProducerBeginWriteDataImplNoLock(&buffer,
@@ -58,13 +65,30 @@ MojoResult DataPipe::ProducerBeginWriteData(void** buffer,
MojoWriteDataFlags flags) {
base::AutoLock locker(lock_);
DCHECK(has_local_producer_no_lock());
- return ProducerBeginWriteDataImplNoLock(buffer, buffer_num_elements, flags);
+
+ if (producer_in_two_phase_write_)
+ return MOJO_RESULT_BUSY;
+
+ MojoResult rv = ProducerBeginWriteDataImplNoLock(buffer,
+ buffer_num_elements,
+ flags);
+ if (rv != MOJO_RESULT_OK)
+ return rv;
+
+ producer_in_two_phase_write_ = true;
+ return MOJO_RESULT_OK;
}
MojoResult DataPipe::ProducerEndWriteData(uint32_t num_elements_written) {
base::AutoLock locker(lock_);
DCHECK(has_local_producer_no_lock());
- return ProducerEndWriteDataImplNoLock(num_elements_written);
+
+ if (!producer_in_two_phase_write_)
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ MojoResult rv = ProducerEndWriteDataImplNoLock(num_elements_written);
+ producer_in_two_phase_write_ = false; // End two-phase write even on failure.
+ return rv;
}
MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
@@ -107,6 +131,9 @@ MojoResult DataPipe::ConsumerReadData(void* elements,
base::AutoLock locker(lock_);
DCHECK(has_local_consumer_no_lock());
+ if (consumer_in_two_phase_read_)
+ return MOJO_RESULT_BUSY;
+
if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
return ConsumerDiscardDataNoLock(num_elements,
(flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE));
@@ -114,6 +141,8 @@ MojoResult DataPipe::ConsumerReadData(void* elements,
if ((flags & MOJO_READ_DATA_FLAG_QUERY))
return ConsumerQueryDataNoLock(num_elements);
+ // TODO(vtl): This implementation may write less than requested, even if room
+ // is available. Fix this. (Probably make a subclass-specific impl.)
const void* buffer = NULL;
uint32_t buffer_num_elements = 0;
MojoResult rv = ConsumerBeginReadDataImplNoLock(&buffer,
@@ -138,13 +167,30 @@ MojoResult DataPipe::ConsumerBeginReadData(const void** buffer,
MojoReadDataFlags flags) {
base::AutoLock locker(lock_);
DCHECK(has_local_consumer_no_lock());
- return ConsumerBeginReadDataImplNoLock(buffer, buffer_num_elements, flags);
+
+ if (consumer_in_two_phase_read_)
+ return MOJO_RESULT_BUSY;
+
+ MojoResult rv = ConsumerBeginReadDataImplNoLock(buffer,
+ buffer_num_elements,
+ flags);
+ if (rv != MOJO_RESULT_OK)
+ return rv;
+
+ consumer_in_two_phase_read_ = true;
+ return MOJO_RESULT_OK;
}
MojoResult DataPipe::ConsumerEndReadData(uint32_t num_elements_read) {
base::AutoLock locker(lock_);
DCHECK(has_local_consumer_no_lock());
- return ConsumerEndReadDataImplNoLock(num_elements_read);
+
+ if (!consumer_in_two_phase_read_)
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ MojoResult rv = ConsumerEndReadDataImplNoLock(num_elements_read);
+ consumer_in_two_phase_read_ = false; // End two-phase read even on failure.
+ return rv;
}
MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
@@ -169,8 +215,11 @@ void DataPipe::ConsumerRemoveWaiter(Waiter* waiter) {
}
DataPipe::DataPipe(bool has_local_producer, bool has_local_consumer)
- : producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
- consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL) {
+ : element_size_(0),
+ producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
+ consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
+ producer_in_two_phase_write_(false),
+ consumer_in_two_phase_read_(false) {
DCHECK(has_local_producer || has_local_consumer);
}
@@ -179,5 +228,47 @@ DataPipe::~DataPipe() {
DCHECK(!has_local_consumer_no_lock());
}
+MojoResult DataPipe::Init(bool may_discard,
+ size_t element_size,
+ size_t capacity_num_elements) {
+ // No need to lock: This method is not thread-safe.
+
+ if (element_size == 0)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (!capacity_num_elements) {
+ // Set the capacity to the default (rounded down by element size, but always
+ // at least one element).
+ capacity_num_elements =
+ std::max(static_cast<size_t>(1),
+ kDefaultDataPipeCapacityBytes / element_size);
+ }
+ if (capacity_num_elements >
+ std::numeric_limits<uint32_t>::max() / element_size)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ if (capacity_num_elements * element_size > kMaxDataPipeCapacityBytes)
+ return MOJO_RESULT_RESOURCE_EXHAUSTED;
+
+ may_discard_ = may_discard;
+ element_size_ = element_size;
+ capacity_num_elements_ = capacity_num_elements;
+ return MOJO_RESULT_OK;
+}
+
+void DataPipe::AwakeProducerWaitersForStateChangeNoLock() {
+ lock_.AssertAcquired();
+ if (!has_local_producer_no_lock())
+ return;
+ producer_waiter_list_->AwakeWaitersForStateChange(
+ ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock());
+}
+
+void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
+ lock_.AssertAcquired();
+ if (!has_local_consumer_no_lock())
+ return;
+ consumer_waiter_list_->AwakeWaitersForStateChange(
+ ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/system/data_pipe.h b/mojo/system/data_pipe.h
index 2196671..3138435 100644
--- a/mojo/system/data_pipe.h
+++ b/mojo/system/data_pipe.h
@@ -63,13 +63,25 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
MojoResult wake_result);
void ConsumerRemoveWaiter(Waiter* waiter);
- // Thread-safe and fast (doesn't take the lock).
+ // Thread-safe and fast (they don't take the lock):
+ bool may_discard() const { return may_discard_; }
size_t element_size() const { return element_size_; }
+ size_t capacity_num_elements() const { return capacity_num_elements_; }
protected:
DataPipe(bool has_local_producer, bool has_local_consumer);
- void Init(size_t element_size);
+ friend class base::RefCountedThreadSafe<DataPipe>;
+ virtual ~DataPipe();
+
+ // Not thread-safe; must be called before any other methods are called. This
+ // object is only usable on success.
+ MojoResult Init(bool may_discard,
+ size_t element_size,
+ size_t capacity_num_elements);
+
+ void AwakeProducerWaitersForStateChangeNoLock();
+ void AwakeConsumerWaitersForStateChangeNoLock();
virtual void ProducerCloseImplNoLock() = 0;
virtual MojoResult ProducerBeginWriteDataImplNoLock(
@@ -95,9 +107,6 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() = 0;
private:
- friend class base::RefCountedThreadSafe<DataPipe>;
- virtual ~DataPipe();
-
bool has_local_producer_no_lock() const {
return !!producer_waiter_list_.get();
}
@@ -105,12 +114,16 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe :
return !!consumer_waiter_list_.get();
}
- // Set by |Init()| and never changed afterwards.
+ // Set by |Init()| and never changed afterwards:
+ bool may_discard_;
size_t element_size_;
+ size_t capacity_num_elements_;
base::Lock lock_; // Protects the following members.
scoped_ptr<WaiterList> producer_waiter_list_;
scoped_ptr<WaiterList> consumer_waiter_list_;
+ bool producer_in_two_phase_write_;
+ bool consumer_in_two_phase_read_;
DISALLOW_COPY_AND_ASSIGN(DataPipe);
};
diff --git a/mojo/system/limits.h b/mojo/system/limits.h
index 277c23a..6b1f9bc 100644
--- a/mojo/system/limits.h
+++ b/mojo/system/limits.h
@@ -2,9 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+// TODO(vtl): Rename this file to constants.h.
+
#ifndef MOJO_SYSTEM_LIMITS_H_
#define MOJO_SYSTEM_LIMITS_H_
+#include <stddef.h>
+
namespace mojo {
namespace system {
@@ -19,6 +23,19 @@ const size_t kMaxMessageNumBytes = 4 * 1024 * 1024;
const size_t kMaxMessageNumHandles = 10000;
+// Maximum capacity of a data pipe, in bytes. This value must fit into a
+// |uint32_t|.
+// WARNING: If you bump it closer to 2^32, you must audit all the code to check
+// that we don't overflow (2^31 would definitely be risky; up to 2^30 is
+// probably okay).
+const size_t kMaxDataPipeCapacityBytes = 256 * 1024 * 1024; // 256 MB.
+
+const size_t kDefaultDataPipeCapacityBytes = 1024 * 1024; // 1 MB.
+
+// Alignment for the "start" of the data buffer used by data pipes. (The
+// alignment of elements will depend on this and the element size.)
+const size_t kDataPipeBufferAlignmentBytes = 16;
+
} // namespace system
} // namespace mojo
diff --git a/mojo/system/local_data_pipe.cc b/mojo/system/local_data_pipe.cc
new file mode 100644
index 0000000..d970102
--- /dev/null
+++ b/mojo/system/local_data_pipe.cc
@@ -0,0 +1,226 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// TODO(vtl): I currently potentially overflow in doing index calculations.
+// E.g., |buffer_first_element_index_| and |buffer_current_num_elements_| fit
+// into a |uint32_t|, but their sum may not. This is bad and poses a security
+// risk. (We're currently saved by the limit on capacity -- the maximum size of
+// the buffer, checked in |DataPipe::Init()|, is currently sufficiently small.
+
+#include "mojo/system/local_data_pipe.h"
+
+#include <algorithm>
+
+#include "base/logging.h"
+#include "mojo/system/limits.h"
+
+namespace mojo {
+namespace system {
+
+LocalDataPipe::LocalDataPipe()
+ : DataPipe(true, true),
+ producer_open_(true),
+ consumer_open_(true),
+ buffer_first_element_index_(0),
+ buffer_current_num_elements_(0),
+ two_phase_max_elements_written_(0),
+ two_phase_max_elements_read_(0) {
+}
+
+MojoResult LocalDataPipe::Init(const MojoCreateDataPipeOptions* options) {
+ static const MojoCreateDataPipeOptions kDefaultOptions = {
+ sizeof(MojoCreateDataPipeOptions), // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_size|.
+ static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)
+ };
+ if (!options)
+ options = &kDefaultOptions;
+
+ if (options->struct_size < sizeof(*options))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ // Note: lazily allocate memory, since a common case will be that one of the
+ // handles is immediately passed off to another process.
+ return DataPipe::Init(
+ (options->flags & MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD),
+ static_cast<size_t>(options->element_size),
+ static_cast<size_t>(options->capacity_num_elements));
+}
+
+LocalDataPipe::~LocalDataPipe() {
+ DCHECK(!producer_open_);
+ DCHECK(!consumer_open_);
+}
+
+void LocalDataPipe::ProducerCloseImplNoLock() {
+ DCHECK(producer_open_);
+ producer_open_ = false;
+ if (!buffer_current_num_elements_) {
+ buffer_.reset();
+ buffer_current_num_elements_ = 0;
+ }
+ AwakeConsumerWaitersForStateChangeNoLock();
+}
+
+MojoResult LocalDataPipe::ProducerBeginWriteDataImplNoLock(
+ void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoWriteDataFlags flags) {
+ size_t max_elements_to_write = GetMaxElementsToWriteNoLock();
+ // TODO(vtl): Consider this return value.
+ if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
+ *buffer_num_elements < max_elements_to_write)
+ return MOJO_RESULT_OUT_OF_RANGE;
+
+ size_t next_index = (buffer_first_element_index_ +
+ buffer_current_num_elements_) % capacity_num_elements();
+ EnsureBufferNoLock();
+ *buffer = buffer_.get() + next_index * element_size();
+ *buffer_num_elements = static_cast<uint32_t>(max_elements_to_write);
+ two_phase_max_elements_written_ =
+ static_cast<uint32_t>(max_elements_to_write);
+ return MOJO_RESULT_OK;
+}
+
+MojoResult LocalDataPipe::ProducerEndWriteDataImplNoLock(
+ uint32_t num_elements_written) {
+ if (num_elements_written > two_phase_max_elements_written_) {
+ // Note: The two-phase write ends here even on failure.
+ two_phase_max_elements_written_ = 0; // For safety.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ buffer_current_num_elements_ += num_elements_written;
+ DCHECK_LE(buffer_current_num_elements_, capacity_num_elements());
+ two_phase_max_elements_written_ = 0; // For safety.
+ return MOJO_RESULT_OK;
+}
+
+MojoWaitFlags LocalDataPipe::ProducerSatisfiedFlagsNoLock() {
+ MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
+ if (consumer_open_ && buffer_current_num_elements_ < capacity_num_elements())
+ rv |= MOJO_WAIT_FLAG_WRITABLE;
+ return rv;
+}
+
+MojoWaitFlags LocalDataPipe::ProducerSatisfiableFlagsNoLock() {
+ MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
+ if (consumer_open_)
+ rv |= MOJO_WAIT_FLAG_WRITABLE;
+ return rv;
+}
+
+void LocalDataPipe::ConsumerCloseImplNoLock() {
+ DCHECK(consumer_open_);
+ consumer_open_ = false;
+ buffer_.reset();
+ buffer_current_num_elements_ = 0;
+ AwakeProducerWaitersForStateChangeNoLock();
+}
+
+MojoResult LocalDataPipe::ConsumerDiscardDataNoLock(uint32_t* num_elements,
+ bool all_or_none) {
+ // TODO(vtl): Think about the error code in this case.
+ if (all_or_none && *num_elements > buffer_current_num_elements_)
+ return MOJO_RESULT_OUT_OF_RANGE;
+
+ size_t num_elements_to_discard =
+ std::min(static_cast<size_t>(*num_elements),
+ buffer_current_num_elements_);
+ buffer_first_element_index_ =
+ (buffer_first_element_index_ + num_elements_to_discard) %
+ capacity_num_elements();
+ buffer_current_num_elements_ -= num_elements_to_discard;
+
+ AwakeProducerWaitersForStateChangeNoLock();
+ AwakeConsumerWaitersForStateChangeNoLock();
+
+ *num_elements = static_cast<uint32_t>(num_elements_to_discard);
+ return MOJO_RESULT_OK;
+}
+
+MojoResult LocalDataPipe::ConsumerQueryDataNoLock(uint32_t* num_elements) {
+ // Note: This cast is safe, since the capacity fits into a |uint32_t|.
+ *num_elements = static_cast<uint32_t>(buffer_current_num_elements_);
+ return MOJO_RESULT_OK;
+}
+
+MojoResult LocalDataPipe::ConsumerBeginReadDataImplNoLock(
+ const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) {
+ size_t max_elements_to_read = GetMaxElementsToReadNoLock();
+ // TODO(vtl): Consider this return value.
+ if ((flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE) &&
+ *buffer_num_elements < max_elements_to_read)
+ return MOJO_RESULT_OUT_OF_RANGE;
+ // Note: This works even if |buffer_| is null.
+ *buffer = buffer_.get() + buffer_first_element_index_ * element_size();
+ *buffer_num_elements = static_cast<uint32_t>(max_elements_to_read);
+ two_phase_max_elements_read_ = static_cast<uint32_t>(max_elements_to_read);
+ return MOJO_RESULT_OK;
+}
+
+MojoResult LocalDataPipe::ConsumerEndReadDataImplNoLock(
+ uint32_t num_elements_read) {
+ if (num_elements_read > two_phase_max_elements_read_) {
+ // Note: The two-phase read ends here even on failure.
+ two_phase_max_elements_read_ = 0; // For safety.
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ buffer_first_element_index_ += num_elements_read;
+ DCHECK_LE(buffer_first_element_index_, capacity_num_elements());
+ buffer_first_element_index_ %= capacity_num_elements();
+ DCHECK_LE(num_elements_read, buffer_current_num_elements_);
+ buffer_current_num_elements_ -= num_elements_read;
+ two_phase_max_elements_read_ = 0; // For safety.
+ return MOJO_RESULT_OK;
+}
+
+MojoWaitFlags LocalDataPipe::ConsumerSatisfiedFlagsNoLock() {
+ MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
+ if (buffer_current_num_elements_ > 0)
+ rv |= MOJO_WAIT_FLAG_READABLE;
+ return rv;
+}
+
+MojoWaitFlags LocalDataPipe::ConsumerSatisfiableFlagsNoLock() {
+ MojoWaitFlags rv = MOJO_WAIT_FLAG_NONE;
+ if (buffer_current_num_elements_ > 0 || producer_open_)
+ rv |= MOJO_WAIT_FLAG_READABLE;
+ return rv;
+}
+
+void LocalDataPipe::EnsureBufferNoLock() {
+ DCHECK(producer_open_);
+ if (buffer_.get())
+ return;
+ buffer_.reset(static_cast<char*>(
+ base::AlignedAlloc(static_cast<size_t>(capacity_num_elements()) *
+ element_size(),
+ kDataPipeBufferAlignmentBytes)));
+}
+
+size_t LocalDataPipe::GetMaxElementsToWriteNoLock() {
+ size_t next_index = buffer_first_element_index_ +
+ buffer_current_num_elements_;
+ if (next_index >= capacity_num_elements()) {
+ next_index %= capacity_num_elements();
+ DCHECK_GE(buffer_first_element_index_, next_index);
+ return buffer_first_element_index_ - next_index;
+ }
+ return capacity_num_elements() - next_index;
+}
+
+size_t LocalDataPipe::GetMaxElementsToReadNoLock() {
+ if (buffer_first_element_index_ + buffer_current_num_elements_ >
+ capacity_num_elements())
+ return capacity_num_elements() - buffer_first_element_index_;
+ return buffer_current_num_elements_;
+}
+
+} // namespace system
+} // namespace mojo
diff --git a/mojo/system/local_data_pipe.h b/mojo/system/local_data_pipe.h
new file mode 100644
index 0000000..0c9e4b6
--- /dev/null
+++ b/mojo/system/local_data_pipe.h
@@ -0,0 +1,79 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_SYSTEM_LOCAL_DATA_PIPE_H_
+#define MOJO_SYSTEM_LOCAL_DATA_PIPE_H_
+
+#include "base/basictypes.h"
+#include "base/memory/aligned_memory.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/system/data_pipe.h"
+#include "mojo/system/system_impl_export.h"
+
+namespace mojo {
+namespace system {
+
+// |LocalDataPipe| is a subclass that "implements" |DataPipe| for data pipes
+// whose producer and consumer are both local. This class is thread-safe (with
+// protection provided by |DataPipe|'s |lock_|.
+class MOJO_SYSTEM_IMPL_EXPORT LocalDataPipe : public DataPipe {
+ public:
+ LocalDataPipe();
+
+ MojoResult Init(const MojoCreateDataPipeOptions* options);
+
+ private:
+ friend class base::RefCountedThreadSafe<LocalDataPipe>;
+ virtual ~LocalDataPipe();
+
+ // |DataPipe| implementation:
+ virtual void ProducerCloseImplNoLock() OVERRIDE;
+ virtual MojoResult ProducerBeginWriteDataImplNoLock(
+ void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoWriteDataFlags flags) OVERRIDE;
+ virtual MojoResult ProducerEndWriteDataImplNoLock(
+ uint32_t num_elements_written) OVERRIDE;
+ virtual MojoWaitFlags ProducerSatisfiedFlagsNoLock() OVERRIDE;
+ virtual MojoWaitFlags ProducerSatisfiableFlagsNoLock() OVERRIDE;
+ virtual void ConsumerCloseImplNoLock() OVERRIDE;
+ virtual MojoResult ConsumerDiscardDataNoLock(uint32_t* num_elements,
+ bool all_or_none) OVERRIDE;
+ virtual MojoResult ConsumerQueryDataNoLock(uint32_t* num_elements) OVERRIDE;
+ virtual MojoResult ConsumerBeginReadDataImplNoLock(
+ const void** buffer,
+ uint32_t* buffer_num_elements,
+ MojoReadDataFlags flags) OVERRIDE;
+ virtual MojoResult ConsumerEndReadDataImplNoLock(
+ uint32_t num_elements_read) OVERRIDE;
+ virtual MojoWaitFlags ConsumerSatisfiedFlagsNoLock() OVERRIDE;
+ virtual MojoWaitFlags ConsumerSatisfiableFlagsNoLock() OVERRIDE;
+
+ void EnsureBufferNoLock();
+
+ // Get the maximum (single) write/read size right now (in number of elements);
+ // result fits in a |uint32_t|.
+ size_t GetMaxElementsToWriteNoLock();
+ size_t GetMaxElementsToReadNoLock();
+
+ // The members below are protected by |DataPipe|'s |lock_|:
+ bool producer_open_;
+ bool consumer_open_;
+
+ scoped_ptr_malloc<char, base::ScopedPtrAlignedFree> buffer_;
+ // Circular buffer.
+ size_t buffer_first_element_index_;
+ size_t buffer_current_num_elements_;
+
+ uint32_t two_phase_max_elements_written_;
+ uint32_t two_phase_max_elements_read_;
+
+ DISALLOW_COPY_AND_ASSIGN(LocalDataPipe);
+};
+
+} // namespace system
+} // namespace mojo
+
+#endif // MOJO_SYSTEM_LOCAL_DATA_PIPE_H_