summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-25 21:14:39 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-25 21:14:39 +0000
commit27198c59523168aad3644a32a3a8d0644696d088 (patch)
tree062ed04597c42d46f504bfd96c492b044cefbbcc /mojo
parent8aecd702877467893896ceea552f8503fa2d4c8a (diff)
downloadchromium_src-27198c59523168aad3644a32a3a8d0644696d088.zip
chromium_src-27198c59523168aad3644a32a3a8d0644696d088.tar.gz
chromium_src-27198c59523168aad3644a32a3a8d0644696d088.tar.bz2
Mojo: Get rid of LocalMessagePipeEndpoint::MessageQueueEntry.
Instead, add the ability to attach dispatchers directly to a MessageInTransit, and just use a queue of MessageInTransits. R=yzshen@chromium.org Review URL: https://codereview.chromium.org/177713007 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253249 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/system/local_message_pipe_endpoint.cc122
-rw-r--r--mojo/system/local_message_pipe_endpoint.h37
-rw-r--r--mojo/system/message_in_transit.cc25
-rw-r--r--mojo/system/message_in_transit.h30
4 files changed, 100 insertions, 114 deletions
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc
index a58e489..723f05b 100644
--- a/mojo/system/local_message_pipe_endpoint.cc
+++ b/mojo/system/local_message_pipe_endpoint.cc
@@ -7,70 +7,13 @@
#include <string.h>
#include "base/logging.h"
+#include "base/stl_util.h"
#include "mojo/system/dispatcher.h"
#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
-LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
- : message_(NULL) {
-}
-
-// See comment in header file.
-LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
- const MessageQueueEntry& other)
- : message_(NULL) {
- DCHECK(!other.message_);
- DCHECK(other.dispatchers_.empty());
-}
-
-LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
- if (message_)
- delete message_;
- // Close all the dispatchers.
- for (size_t i = 0; i < dispatchers_.size(); i++) {
- if (!dispatchers_[i].get())
- continue;
-
- // Note: Taking the |Dispatcher| locks is okay, since no one else should
- // have a reference to the dispatchers (and the locks shouldn't be held).
- DCHECK(dispatchers_[i]->HasOneRef());
- dispatchers_[i]->Close();
- }
-}
-
-void LocalMessagePipeEndpoint::MessageQueueEntry::Init(
- scoped_ptr<MessageInTransit> message,
- std::vector<DispatcherTransport>* transports) {
- DCHECK(message.get());
- DCHECK(!transports || !transports->empty());
- DCHECK(!message_);
- DCHECK(dispatchers_.empty());
-
- message_ = message.release();
- if (transports) {
- dispatchers_.reserve(transports->size());
- for (size_t i = 0; i < transports->size(); i++) {
- if ((*transports)[i].is_valid()) {
- dispatchers_.push_back(
- (*transports)[i].CreateEquivalentDispatcherAndClose());
-
-#ifndef NDEBUG
- // It's important that we have "ownership" of these dispatchers. In
- // particular, they must not be in the global handle table (i.e., have
- // live handles referring to them). If we need to destroy any queued
- // messages, we need to know that any handles in them should be closed.
- DCHECK(dispatchers_[i]->HasOneRef());
-#endif
- } else {
- LOG(WARNING) << "Enqueueing null dispatcher";
- dispatchers_.push_back(scoped_refptr<Dispatcher>());
- }
- }
- }
-}
-
LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
: is_open_(true),
is_peer_open_(true) {
@@ -78,12 +21,14 @@ LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
DCHECK(!is_open_);
+ DCHECK(message_queue_.empty()); // Should be implied by not being open.
}
void LocalMessagePipeEndpoint::Close() {
DCHECK(is_open_);
is_open_ = false;
- message_queue_.clear();
+
+ STLDeleteElements(&message_queue_);
}
void LocalMessagePipeEndpoint::OnPeerClose() {
@@ -110,11 +55,25 @@ MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
DCHECK(is_peer_open_);
DCHECK(!transports || !transports->empty());
+ // "Move" the dispatchers
+ if (transports) {
+ scoped_ptr<std::vector<scoped_refptr<Dispatcher> > >
+ dispatchers(new std::vector<scoped_refptr<Dispatcher> >());
+ dispatchers->reserve(transports->size());
+ for (size_t i = 0; i < transports->size(); i++) {
+ if ((*transports)[i].is_valid()) {
+ dispatchers->push_back(
+ (*transports)[i].CreateEquivalentDispatcherAndClose());
+ } else {
+ LOG(WARNING) << "Enqueueing null dispatcher";
+ dispatchers->push_back(scoped_refptr<Dispatcher>());
+ }
+ }
+ message->SetDispatchers(dispatchers.Pass());
+ }
+
bool was_empty = message_queue_.empty();
- // TODO(vtl): Use |emplace_back()| (and a suitable constructor, instead of
- // |Init()|) when that becomes available.
- message_queue_.push_back(MessageQueueEntry());
- message_queue_.back().Init(message.Pass(), transports);
+ message_queue_.push_back(message.release());
if (was_empty) {
waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
SatisfiableFlags());
@@ -147,30 +106,35 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage(
// TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
// and release the lock immediately.
bool enough_space = true;
- const MessageInTransit* queued_message = message_queue_.front().message();
+ MessageInTransit* message = message_queue_.front();
if (num_bytes)
- *num_bytes = queued_message->num_bytes();
- if (queued_message->num_bytes() <= max_bytes)
- memcpy(bytes, queued_message->bytes(), queued_message->num_bytes());
+ *num_bytes = message->num_bytes();
+ if (message->num_bytes() <= max_bytes)
+ memcpy(bytes, message->bytes(), message->num_bytes());
else
enough_space = false;
- std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
- message_queue_.front().dispatchers();
- if (num_dispatchers)
- *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
- if (enough_space) {
- if (queued_dispatchers->empty()) {
- // Nothing to do.
- } else if (queued_dispatchers->size() <= max_num_dispatchers) {
- DCHECK(dispatchers);
- dispatchers->swap(*queued_dispatchers);
- } else {
- enough_space = false;
+ if (std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
+ message->dispatchers()) {
+ if (num_dispatchers)
+ *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
+ if (enough_space) {
+ if (queued_dispatchers->empty()) {
+ // Nothing to do.
+ } else if (queued_dispatchers->size() <= max_num_dispatchers) {
+ DCHECK(dispatchers);
+ dispatchers->swap(*queued_dispatchers);
+ } else {
+ enough_space = false;
+ }
}
+ } else {
+ if (num_dispatchers)
+ *num_dispatchers = 0;
}
if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
+ delete message;
message_queue_.pop_front();
// Now it's empty, thus no longer readable.
diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h
index 98a665e..45dbe2a 100644
--- a/mojo/system/local_message_pipe_endpoint.h
+++ b/mojo/system/local_message_pipe_endpoint.h
@@ -45,45 +45,16 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
virtual void RemoveWaiter(Waiter* waiter) OVERRIDE;
private:
- class MessageQueueEntry {
- public:
- MessageQueueEntry();
- // Provide an explicit copy constructor, so that we can use this directly in
- // a (C++03) STL container. However, we only allow the case where |other| is
- // empty. (We don't provide a nontrivial constructor, because it wouldn't be
- // useful with these constraints. This will change with C++11.)
- MessageQueueEntry(const MessageQueueEntry& other);
- ~MessageQueueEntry();
-
- // Initialize, creating equivalent "duplicate" dispatchers. |transports|
- // should be non-null only if nonempty.
- // TODO(vtl): This would simply be a constructor, but we don't have C++11's
- // emplace operations yet, and I don't want to copy |dispatchers_|.
- void Init(scoped_ptr<MessageInTransit> message,
- std::vector<DispatcherTransport>* transports);
-
- MessageInTransit* message() {
- return message_;
- }
- std::vector<scoped_refptr<Dispatcher> >* dispatchers() {
- return &dispatchers_;
- }
-
- private:
- MessageInTransit* message_;
- std::vector<scoped_refptr<Dispatcher> > dispatchers_;
-
- // We don't need assignment, however.
- DISALLOW_ASSIGN(MessageQueueEntry);
- };
-
MojoWaitFlags SatisfiedFlags();
MojoWaitFlags SatisfiableFlags();
bool is_open_;
bool is_peer_open_;
- std::deque<MessageQueueEntry> message_queue_;
+ // Queue of incoming messages; owns its entries.
+ // TODO(vtl): When C++11 is available, switch this to a deque of
+ // |scoped_ptr|s.
+ std::deque<MessageInTransit*> message_queue_;
WaiterList waiter_list_;
DISALLOW_COPY_AND_ASSIGN(LocalMessagePipeEndpoint);
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index 80ebf08..3b7f315 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -78,6 +78,7 @@ MessageInTransit::MessageInTransit(OwnedBuffer,
: owns_main_buffer_(true),
main_buffer_size_(other.main_buffer_size()),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) {
+ DCHECK(!other.dispatchers_.get());
DCHECK_GE(main_buffer_size_, sizeof(Header));
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);
@@ -111,6 +112,17 @@ MessageInTransit::~MessageInTransit() {
main_buffer_ = NULL;
#endif
}
+
+ if (dispatchers_.get()) {
+ for (size_t i = 0; i < dispatchers_->size(); i++) {
+ if (!(*dispatchers_)[i])
+ continue;
+
+ DCHECK((*dispatchers_)[i]->HasOneRef());
+ (*dispatchers_)[i]->Close();
+ }
+ dispatchers_.reset();
+ }
}
// static
@@ -131,5 +143,18 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer,
return true;
}
+void MessageInTransit::SetDispatchers(
+ scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) {
+ DCHECK(dispatchers.get());
+ DCHECK(owns_main_buffer_);
+ DCHECK(!dispatchers_.get());
+
+ dispatchers_ = dispatchers.Pass();
+#ifndef NDEBUG
+ for (size_t i = 0; i < dispatchers_->size(); i++)
+ DCHECK(!(*dispatchers_)[i] || (*dispatchers_)[i]->HasOneRef());
+#endif
+}
+
} // namespace system
} // namespace mojo
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 0cec0ae..eda0963 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -7,7 +7,11 @@
#include <stdint.h>
+#include <vector>
+
#include "base/macros.h"
+#include "base/memory/scoped_ptr.h"
+#include "mojo/system/dispatcher.h"
#include "mojo/system/system_impl_export.h"
namespace mojo {
@@ -47,8 +51,9 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
uint32_t num_bytes,
uint32_t num_handles,
const void* bytes);
- // "Copy" constructor. The input |MessageInTransit| may own its buffer or not.
- // The constructed |MessageInTransit| will own its buffer.
+ // "Copy" constructor. The input |MessageInTransit| may own its buffer or not;
+ // however, it must not have any dispatchers. The constructed
+ // |MessageInTransit| will own its buffer.
MessageInTransit(OwnedBuffer,
const MessageInTransit& other);
@@ -79,6 +84,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
size_t buffer_size,
size_t* next_message_size);
+ // Makes this message "own" the given set of dispatchers. The dispatchers must
+ // not be referenced from anywhere else (in particular, not from the handle
+ // table), i.e., each dispatcher must have a reference count of 1. This
+ // message must also own its main buffer and not already have dispatchers.
+ void SetDispatchers(
+ scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers);
+
// Gets the "main" buffer for a |MessageInTransit|. A |MessageInTransit| can
// be serialized by writing the main buffer. The returned pointer will be
// aligned to a multiple of |kMessageAlignment| bytes, and the size of the
@@ -120,6 +132,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
header()->destination_id = destination_id;
}
+ // Gets the dispatchers attached to this message; this may return null if
+ // there are none. Note that the caller may mutate the set of dispatchers
+ // (e.g., take ownership of all the dispatchers, leaving the vector empty).
+ std::vector<scoped_refptr<Dispatcher> >* dispatchers() {
+ return dispatchers_.get();
+ }
+
// TODO(vtl): Add whatever's necessary to transport handles.
// Rounds |n| up to a multiple of |kMessageAlignment|.
@@ -158,6 +177,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
size_t main_buffer_size_;
void* main_buffer_;
+ // Any dispatchers that may be attached to this message. This is only
+ // supported if this message owns its main buffer. These dispatchers should be
+ // "owned" by this message, i.e., have a ref count of exactly 1. (We allow a
+ // dispatcher entry to be null, in case it couldn't be duplicated for some
+ // reason.)
+ scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers_;
+
DISALLOW_COPY_AND_ASSIGN(MessageInTransit);
};