diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-25 21:14:39 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-25 21:14:39 +0000 |
commit | 27198c59523168aad3644a32a3a8d0644696d088 (patch) | |
tree | 062ed04597c42d46f504bfd96c492b044cefbbcc /mojo | |
parent | 8aecd702877467893896ceea552f8503fa2d4c8a (diff) | |
download | chromium_src-27198c59523168aad3644a32a3a8d0644696d088.zip chromium_src-27198c59523168aad3644a32a3a8d0644696d088.tar.gz chromium_src-27198c59523168aad3644a32a3a8d0644696d088.tar.bz2 |
Mojo: Get rid of LocalMessagePipeEndpoint::MessageQueueEntry.
Instead, add the ability to attach dispatchers directly to a
MessageInTransit, and just use a queue of MessageInTransits.
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/177713007
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253249 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/system/local_message_pipe_endpoint.cc | 122 | ||||
-rw-r--r-- | mojo/system/local_message_pipe_endpoint.h | 37 | ||||
-rw-r--r-- | mojo/system/message_in_transit.cc | 25 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 30 |
4 files changed, 100 insertions, 114 deletions
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc index a58e489..723f05b 100644 --- a/mojo/system/local_message_pipe_endpoint.cc +++ b/mojo/system/local_message_pipe_endpoint.cc @@ -7,70 +7,13 @@ #include <string.h> #include "base/logging.h" +#include "base/stl_util.h" #include "mojo/system/dispatcher.h" #include "mojo/system/message_in_transit.h" namespace mojo { namespace system { -LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry() - : message_(NULL) { -} - -// See comment in header file. -LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry( - const MessageQueueEntry& other) - : message_(NULL) { - DCHECK(!other.message_); - DCHECK(other.dispatchers_.empty()); -} - -LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() { - if (message_) - delete message_; - // Close all the dispatchers. - for (size_t i = 0; i < dispatchers_.size(); i++) { - if (!dispatchers_[i].get()) - continue; - - // Note: Taking the |Dispatcher| locks is okay, since no one else should - // have a reference to the dispatchers (and the locks shouldn't be held). - DCHECK(dispatchers_[i]->HasOneRef()); - dispatchers_[i]->Close(); - } -} - -void LocalMessagePipeEndpoint::MessageQueueEntry::Init( - scoped_ptr<MessageInTransit> message, - std::vector<DispatcherTransport>* transports) { - DCHECK(message.get()); - DCHECK(!transports || !transports->empty()); - DCHECK(!message_); - DCHECK(dispatchers_.empty()); - - message_ = message.release(); - if (transports) { - dispatchers_.reserve(transports->size()); - for (size_t i = 0; i < transports->size(); i++) { - if ((*transports)[i].is_valid()) { - dispatchers_.push_back( - (*transports)[i].CreateEquivalentDispatcherAndClose()); - -#ifndef NDEBUG - // It's important that we have "ownership" of these dispatchers. In - // particular, they must not be in the global handle table (i.e., have - // live handles referring to them). If we need to destroy any queued - // messages, we need to know that any handles in them should be closed. - DCHECK(dispatchers_[i]->HasOneRef()); -#endif - } else { - LOG(WARNING) << "Enqueueing null dispatcher"; - dispatchers_.push_back(scoped_refptr<Dispatcher>()); - } - } - } -} - LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() : is_open_(true), is_peer_open_(true) { @@ -78,12 +21,14 @@ LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { DCHECK(!is_open_); + DCHECK(message_queue_.empty()); // Should be implied by not being open. } void LocalMessagePipeEndpoint::Close() { DCHECK(is_open_); is_open_ = false; - message_queue_.clear(); + + STLDeleteElements(&message_queue_); } void LocalMessagePipeEndpoint::OnPeerClose() { @@ -110,11 +55,25 @@ MojoResult LocalMessagePipeEndpoint::EnqueueMessage( DCHECK(is_peer_open_); DCHECK(!transports || !transports->empty()); + // "Move" the dispatchers + if (transports) { + scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > + dispatchers(new std::vector<scoped_refptr<Dispatcher> >()); + dispatchers->reserve(transports->size()); + for (size_t i = 0; i < transports->size(); i++) { + if ((*transports)[i].is_valid()) { + dispatchers->push_back( + (*transports)[i].CreateEquivalentDispatcherAndClose()); + } else { + LOG(WARNING) << "Enqueueing null dispatcher"; + dispatchers->push_back(scoped_refptr<Dispatcher>()); + } + } + message->SetDispatchers(dispatchers.Pass()); + } + bool was_empty = message_queue_.empty(); - // TODO(vtl): Use |emplace_back()| (and a suitable constructor, instead of - // |Init()|) when that becomes available. - message_queue_.push_back(MessageQueueEntry()); - message_queue_.back().Init(message.Pass(), transports); + message_queue_.push_back(message.release()); if (was_empty) { waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), SatisfiableFlags()); @@ -147,30 +106,35 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage( // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop // and release the lock immediately. bool enough_space = true; - const MessageInTransit* queued_message = message_queue_.front().message(); + MessageInTransit* message = message_queue_.front(); if (num_bytes) - *num_bytes = queued_message->num_bytes(); - if (queued_message->num_bytes() <= max_bytes) - memcpy(bytes, queued_message->bytes(), queued_message->num_bytes()); + *num_bytes = message->num_bytes(); + if (message->num_bytes() <= max_bytes) + memcpy(bytes, message->bytes(), message->num_bytes()); else enough_space = false; - std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers = - message_queue_.front().dispatchers(); - if (num_dispatchers) - *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); - if (enough_space) { - if (queued_dispatchers->empty()) { - // Nothing to do. - } else if (queued_dispatchers->size() <= max_num_dispatchers) { - DCHECK(dispatchers); - dispatchers->swap(*queued_dispatchers); - } else { - enough_space = false; + if (std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers = + message->dispatchers()) { + if (num_dispatchers) + *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); + if (enough_space) { + if (queued_dispatchers->empty()) { + // Nothing to do. + } else if (queued_dispatchers->size() <= max_num_dispatchers) { + DCHECK(dispatchers); + dispatchers->swap(*queued_dispatchers); + } else { + enough_space = false; + } } + } else { + if (num_dispatchers) + *num_dispatchers = 0; } if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { + delete message; message_queue_.pop_front(); // Now it's empty, thus no longer readable. diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h index 98a665e..45dbe2a 100644 --- a/mojo/system/local_message_pipe_endpoint.h +++ b/mojo/system/local_message_pipe_endpoint.h @@ -45,45 +45,16 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint virtual void RemoveWaiter(Waiter* waiter) OVERRIDE; private: - class MessageQueueEntry { - public: - MessageQueueEntry(); - // Provide an explicit copy constructor, so that we can use this directly in - // a (C++03) STL container. However, we only allow the case where |other| is - // empty. (We don't provide a nontrivial constructor, because it wouldn't be - // useful with these constraints. This will change with C++11.) - MessageQueueEntry(const MessageQueueEntry& other); - ~MessageQueueEntry(); - - // Initialize, creating equivalent "duplicate" dispatchers. |transports| - // should be non-null only if nonempty. - // TODO(vtl): This would simply be a constructor, but we don't have C++11's - // emplace operations yet, and I don't want to copy |dispatchers_|. - void Init(scoped_ptr<MessageInTransit> message, - std::vector<DispatcherTransport>* transports); - - MessageInTransit* message() { - return message_; - } - std::vector<scoped_refptr<Dispatcher> >* dispatchers() { - return &dispatchers_; - } - - private: - MessageInTransit* message_; - std::vector<scoped_refptr<Dispatcher> > dispatchers_; - - // We don't need assignment, however. - DISALLOW_ASSIGN(MessageQueueEntry); - }; - MojoWaitFlags SatisfiedFlags(); MojoWaitFlags SatisfiableFlags(); bool is_open_; bool is_peer_open_; - std::deque<MessageQueueEntry> message_queue_; + // Queue of incoming messages; owns its entries. + // TODO(vtl): When C++11 is available, switch this to a deque of + // |scoped_ptr|s. + std::deque<MessageInTransit*> message_queue_; WaiterList waiter_list_; DISALLOW_COPY_AND_ASSIGN(LocalMessagePipeEndpoint); diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index 80ebf08..3b7f315 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -78,6 +78,7 @@ MessageInTransit::MessageInTransit(OwnedBuffer, : owns_main_buffer_(true), main_buffer_size_(other.main_buffer_size()), main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) { + DCHECK(!other.dispatchers_.get()); DCHECK_GE(main_buffer_size_, sizeof(Header)); DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); @@ -111,6 +112,17 @@ MessageInTransit::~MessageInTransit() { main_buffer_ = NULL; #endif } + + if (dispatchers_.get()) { + for (size_t i = 0; i < dispatchers_->size(); i++) { + if (!(*dispatchers_)[i]) + continue; + + DCHECK((*dispatchers_)[i]->HasOneRef()); + (*dispatchers_)[i]->Close(); + } + dispatchers_.reset(); + } } // static @@ -131,5 +143,18 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer, return true; } +void MessageInTransit::SetDispatchers( + scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) { + DCHECK(dispatchers.get()); + DCHECK(owns_main_buffer_); + DCHECK(!dispatchers_.get()); + + dispatchers_ = dispatchers.Pass(); +#ifndef NDEBUG + for (size_t i = 0; i < dispatchers_->size(); i++) + DCHECK(!(*dispatchers_)[i] || (*dispatchers_)[i]->HasOneRef()); +#endif +} + } // namespace system } // namespace mojo diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index 0cec0ae..eda0963 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -7,7 +7,11 @@ #include <stdint.h> +#include <vector> + #include "base/macros.h" +#include "base/memory/scoped_ptr.h" +#include "mojo/system/dispatcher.h" #include "mojo/system/system_impl_export.h" namespace mojo { @@ -47,8 +51,9 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { uint32_t num_bytes, uint32_t num_handles, const void* bytes); - // "Copy" constructor. The input |MessageInTransit| may own its buffer or not. - // The constructed |MessageInTransit| will own its buffer. + // "Copy" constructor. The input |MessageInTransit| may own its buffer or not; + // however, it must not have any dispatchers. The constructed + // |MessageInTransit| will own its buffer. MessageInTransit(OwnedBuffer, const MessageInTransit& other); @@ -79,6 +84,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { size_t buffer_size, size_t* next_message_size); + // Makes this message "own" the given set of dispatchers. The dispatchers must + // not be referenced from anywhere else (in particular, not from the handle + // table), i.e., each dispatcher must have a reference count of 1. This + // message must also own its main buffer and not already have dispatchers. + void SetDispatchers( + scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers); + // Gets the "main" buffer for a |MessageInTransit|. A |MessageInTransit| can // be serialized by writing the main buffer. The returned pointer will be // aligned to a multiple of |kMessageAlignment| bytes, and the size of the @@ -120,6 +132,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { header()->destination_id = destination_id; } + // Gets the dispatchers attached to this message; this may return null if + // there are none. Note that the caller may mutate the set of dispatchers + // (e.g., take ownership of all the dispatchers, leaving the vector empty). + std::vector<scoped_refptr<Dispatcher> >* dispatchers() { + return dispatchers_.get(); + } + // TODO(vtl): Add whatever's necessary to transport handles. // Rounds |n| up to a multiple of |kMessageAlignment|. @@ -158,6 +177,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { size_t main_buffer_size_; void* main_buffer_; + // Any dispatchers that may be attached to this message. This is only + // supported if this message owns its main buffer. These dispatchers should be + // "owned" by this message, i.e., have a ref count of exactly 1. (We allow a + // dispatcher entry to be null, in case it couldn't be duplicated for some + // reason.) + scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers_; + DISALLOW_COPY_AND_ASSIGN(MessageInTransit); }; |