diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-28 23:30:55 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-28 23:30:55 +0000 |
commit | dd95a33519efb72a82dbe4ec16cc6cd90c8c8850 (patch) | |
tree | a5f7875bdcb887a55a80f51afeabb3af0b8248ad /mojo | |
parent | 14ffc08c27ffb79d5b263728ae8d19a1ffa3c7f8 (diff) | |
download | chromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.zip chromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.tar.gz chromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.tar.bz2 |
Mojo: Get rid of |MessageInTransit|s with unowned buffers.
Replace them with |MessageInTransit::View|s.
(Also modify our serialization hooks a little, and add a stub for
deserialization -- which isn't hooked up to anything yet.)
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/184683003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254268 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/system/channel.cc | 31 | ||||
-rw-r--r-- | mojo/system/channel.h | 7 | ||||
-rw-r--r-- | mojo/system/dispatcher.cc | 35 | ||||
-rw-r--r-- | mojo/system/dispatcher.h | 40 | ||||
-rw-r--r-- | mojo/system/message_in_transit.cc | 116 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 106 | ||||
-rw-r--r-- | mojo/system/message_pipe.cc | 1 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.cc | 3 | ||||
-rw-r--r-- | mojo/system/raw_channel.cc | 19 | ||||
-rw-r--r-- | mojo/system/raw_channel.h | 5 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix_unittest.cc | 36 |
11 files changed, 216 insertions, 183 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc index c45ce4c..1c5d336 100644 --- a/mojo/system/channel.cc +++ b/mojo/system/channel.cc @@ -135,19 +135,19 @@ Channel::~Channel() { << " endpoints still present"; } -void Channel::OnReadMessage(const MessageInTransit& message) { - switch (message.type()) { +void Channel::OnReadMessage(const MessageInTransit::View& message_view) { + switch (message_view.type()) { case MessageInTransit::kTypeMessagePipeEndpoint: case MessageInTransit::kTypeMessagePipe: - OnReadMessageForDownstream(message); + OnReadMessageForDownstream(message_view); break; case MessageInTransit::kTypeChannel: - OnReadMessageForChannel(message); + OnReadMessageForChannel(message_view); break; default: HandleRemoteError(base::StringPrintf( "Received message of invalid type %u", - static_cast<unsigned>(message.type()))); + static_cast<unsigned>(message_view.type()))); break; } } @@ -157,11 +157,12 @@ void Channel::OnFatalError(FatalError fatal_error) { NOTIMPLEMENTED(); } -void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { - DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint || - message.type() == MessageInTransit::kTypeMessagePipe); +void Channel::OnReadMessageForDownstream( + const MessageInTransit::View& message_view) { + DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint || + message_view.type() == MessageInTransit::kTypeMessagePipe); - MessageInTransit::EndpointId local_id = message.destination_id(); + MessageInTransit::EndpointId local_id = message_view.destination_id(); if (local_id == MessageInTransit::kInvalidEndpointId) { HandleRemoteError("Received message with no destination ID"); return; @@ -195,16 +196,15 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { // We need to duplicate the message, because |EnqueueMessage()| will take // ownership of it. // TODO(vtl): Need to enforce limits on message size and handle count. - scoped_ptr<MessageInTransit> own_message( - new MessageInTransit(MessageInTransit::OWNED_BUFFER, message)); - std::vector<DispatcherTransport> transports(message.num_handles()); + scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); + std::vector<DispatcherTransport> transports(message->num_handles()); // TODO(vtl): Create dispatchers for handles. // TODO(vtl): It's bad that the current API will create equivalent dispatchers // for the freshly-created ones, which is totally redundant. Make a version of // |EnqueueMessage()| that passes ownership. if (endpoint_info.message_pipe->EnqueueMessage( - MessagePipe::GetPeerPort(endpoint_info.port), own_message.Pass(), - message.num_handles() ? &transports : NULL) != MOJO_RESULT_OK) { + MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), + transports.empty() ? NULL : &transports) != MOJO_RESULT_OK) { HandleLocalError(base::StringPrintf( "Failed to enqueue message to local destination ID %u", static_cast<unsigned>(local_id))); @@ -212,7 +212,8 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) { } } -void Channel::OnReadMessageForChannel(const MessageInTransit& message) { +void Channel::OnReadMessageForChannel( + const MessageInTransit::View& message_view) { // TODO(vtl): Currently no channel-only messages yet. HandleRemoteError("Received invalid channel message"); NOTREACHED(); diff --git a/mojo/system/channel.h b/mojo/system/channel.h index e0285dc..336aae9 100644 --- a/mojo/system/channel.h +++ b/mojo/system/channel.h @@ -93,12 +93,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel virtual ~Channel(); // |RawChannel::Delegate| implementation: - virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE; + virtual void OnReadMessage( + const MessageInTransit::View& message_view) OVERRIDE; virtual void OnFatalError(FatalError fatal_error) OVERRIDE; // Helpers for |OnReadMessage|: - void OnReadMessageForDownstream(const MessageInTransit& message); - void OnReadMessageForChannel(const MessageInTransit& message); + void OnReadMessageForDownstream(const MessageInTransit::View& message_view); + void OnReadMessageForChannel(const MessageInTransit::View& message_view); // Handles errors (e.g., invalid messages) from the remote side. void HandleRemoteError(const base::StringPiece& error_message); diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc index 3d970d0..896d044 100644 --- a/mojo/system/dispatcher.cc +++ b/mojo/system/dispatcher.cc @@ -37,12 +37,23 @@ size_t Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize( } // static -size_t Dispatcher::MessageInTransitAccess::SerializeAndClose( +bool Dispatcher::MessageInTransitAccess::SerializeAndClose( Dispatcher* dispatcher, void* destination, - Channel* channel) { + Channel* channel, + size_t* actual_size) { DCHECK(dispatcher); - return dispatcher->SerializeAndClose(destination, channel); + return dispatcher->SerializeAndClose(destination, channel, actual_size); +} + +// static +scoped_refptr<Dispatcher> Dispatcher::MessageInTransitAccess::Deserialize( + Channel* channel, + int32_t type, + const void* source, + size_t size) { + // TODO(vtl) + return scoped_refptr<Dispatcher>(); } MojoResult Dispatcher::Close() { @@ -322,8 +333,9 @@ size_t Dispatcher::GetMaximumSerializedSizeImplNoLock( return 0; } -size_t Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/, - Channel* /*channel*/) { +bool Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/, + Channel* /*channel*/, + size_t* /*actual_size*/) { lock_.AssertAcquired(); DCHECK(is_closed_); // By default, serializing isn't supported, so just close. @@ -367,9 +379,12 @@ size_t Dispatcher::GetMaximumSerializedSize(const Channel* channel) const { return GetMaximumSerializedSizeImplNoLock(channel); } -size_t Dispatcher::SerializeAndClose(void* destination, Channel* channel) { +bool Dispatcher::SerializeAndClose(void* destination, + Channel* channel, + size_t* actual_size) { DCHECK(destination); DCHECK(channel); + DCHECK(actual_size); DCHECK(HasOneRef()); base::AutoLock locker(lock_); @@ -386,9 +401,11 @@ size_t Dispatcher::SerializeAndClose(void* destination, Channel* channel) { // No need to cancel waiters: we shouldn't have any (and shouldn't be in // |Core|'s handle table. - size_t size = SerializeAndCloseImplNoLock(destination, channel); - DCHECK_LE(size, max_size); - return size; + if (!SerializeAndCloseImplNoLock(destination, channel, actual_size)) + return false; + + DCHECK_LE(*actual_size, max_size); + return true; } // DispatcherTransport --------------------------------------------------------- diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h index 73bf77b..7b2bdb5 100644 --- a/mojo/system/dispatcher.h +++ b/mojo/system/dispatcher.h @@ -142,8 +142,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : }; // A |MessageInTransit| may serialize dispatchers that are attached to it to a - // given |Channel|. These may only be called on such dispatchers. See the - // |Dispatcher| methods of the same names for more details. + // given |Channel| and then (probably in a different process) deserialize. // TODO(vtl): Consider making another wrapper similar to |DispatcherTransport| // (but with an owning, unique reference), and having // |CreateEquivalentDispatcherAndCloseImplNoLock()| return that wrapper (and @@ -152,11 +151,26 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : private: friend class MessageInTransit; + // Serialization API. These functions may only be called on such + // dispatchers. (|channel| is the |Channel| to which the dispatcher is to be + // serialized.) See the |Dispatcher| methods of the same names for more + // details. + // TODO(vtl): Consider replacing this API below with a proper two-phase one + // ("StartSerialize()" and "EndSerializeAndClose()", with the lock possibly + // being held across their invocations). static size_t GetMaximumSerializedSize(const Dispatcher* dispatcher, const Channel* channel); - static size_t SerializeAndClose(Dispatcher* dispatcher, - void* destination, - Channel* channel); + static bool SerializeAndClose(Dispatcher* dispatcher, + void* destination, + Channel* channel, + size_t* actual_size); + + // Deserialization API. + // TODO(vtl): Implement this. + static scoped_refptr<Dispatcher> Deserialize(Channel* channel, + int32_t type, + const void* source, + size_t size); }; protected: @@ -222,8 +236,9 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : // being passed over a message pipe. virtual size_t GetMaximumSerializedSizeImplNoLock( const Channel* channel) const; - virtual size_t SerializeAndCloseImplNoLock(void* destination, - Channel* channel); + virtual bool SerializeAndCloseImplNoLock(void* destination, + Channel* channel, + size_t* actual_size); // Available to subclasses. (Note: Returns a non-const reference, just like // |base::AutoLock|'s constructor takes a non-const reference.) @@ -261,10 +276,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher : // Serializes this dispatcher to the given |Channel| by writing to // |destination| and then closes this dispatcher. It may write no more than // was indicated by |GetMaximumSerializedSize()|. (WARNING: Beware of races, - // e.g., if something can be mutated between the two calls!) This may return - // zero to indicate failure to serialize (but this dispatcher will still be - // closed). - size_t SerializeAndClose(void* destination, Channel* channel); + // e.g., if something can be mutated between the two calls!) Returns true on + // success, in which case |*actual_size| is set to the amount it actually + // wrote to |destination|. On failure, |*actual_size| should not be modified; + // however, the dispatcher will still be closed. + bool SerializeAndClose(void* destination, + Channel* channel, + size_t* actual_size); // This protects the following members as well as any state added by // subclasses. diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index 16d3e0c..5b6236b 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -46,14 +46,22 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId MessageInTransit::kInvalidEndpointId; STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment; -MessageInTransit::MessageInTransit(OwnedBuffer, - Type type, + +MessageInTransit::View::View(size_t message_size, const void* buffer) + : message_size_(message_size), + buffer_(buffer) { + size_t next_message_size = 0; + DCHECK(MessageInTransit::GetNextMessageSize(buffer_, message_size_, + &next_message_size)); + DCHECK_EQ(message_size_, next_message_size); +} + +MessageInTransit::MessageInTransit(Type type, Subtype subtype, uint32_t num_bytes, uint32_t num_handles, const void* bytes) - : owns_buffers_(true), - main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)), + : main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)), main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)), secondary_buffer_size_(0), secondary_buffer_(NULL) { @@ -67,8 +75,6 @@ MessageInTransit::MessageInTransit(OwnedBuffer, header()->destination_id = kInvalidEndpointId; header()->num_bytes = num_bytes; header()->num_handles = num_handles; - header()->reserved0 = 0; - header()->reserved1 = 0; // Note: If dispatchers are subsequently attached (in particular, if // |num_handles| is nonzero), then |total_size| will have to be adjusted. UpdateTotalSize(); @@ -82,66 +88,33 @@ MessageInTransit::MessageInTransit(OwnedBuffer, } } -MessageInTransit::MessageInTransit(OwnedBuffer, - const MessageInTransit& other) - : owns_buffers_(true), - main_buffer_size_(other.main_buffer_size()), +MessageInTransit::MessageInTransit(const View& message_view) + : main_buffer_size_(message_view.main_buffer_size()), main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)), - secondary_buffer_size_(other.secondary_buffer_size()), + secondary_buffer_size_(message_view.secondary_buffer_size()), secondary_buffer_(secondary_buffer_size_ ? base::AlignedAlloc(secondary_buffer_size_, kMessageAlignment) : NULL) { - DCHECK(!other.dispatchers_.get()); DCHECK_GE(main_buffer_size_, sizeof(Header)); DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); - memcpy(main_buffer_, other.main_buffer(), main_buffer_size_); - memcpy(secondary_buffer_, other.secondary_buffer(), secondary_buffer_size_); + memcpy(main_buffer_, message_view.main_buffer(), main_buffer_size_); + memcpy(secondary_buffer_, message_view.secondary_buffer(), + secondary_buffer_size_); DCHECK_EQ(main_buffer_size_, RoundUpMessageAlignment(sizeof(Header) + num_bytes())); } -MessageInTransit::MessageInTransit(UnownedBuffer, - size_t message_size, - void* buffer) - : owns_buffers_(false), - main_buffer_size_(0), - main_buffer_(NULL), - secondary_buffer_size_(0), - secondary_buffer_(NULL) { - DCHECK_GE(message_size, sizeof(Header)); - DCHECK_EQ(message_size % kMessageAlignment, 0u); - DCHECK(buffer); - - Header* header = static_cast<Header*>(buffer); - DCHECK_EQ(header->total_size, message_size); - - main_buffer_size_ = - RoundUpMessageAlignment(sizeof(Header) + header->num_bytes); - DCHECK_LE(main_buffer_size_, message_size); - main_buffer_ = buffer; - DCHECK_EQ(reinterpret_cast<uintptr_t>(main_buffer_) % kMessageAlignment, 0u); - - if (message_size > main_buffer_size_) { - secondary_buffer_size_ = message_size - main_buffer_size_; - secondary_buffer_ = static_cast<char*>(buffer) + main_buffer_size_; - DCHECK_EQ(reinterpret_cast<uintptr_t>(secondary_buffer_) % - kMessageAlignment, 0u); - } -} - MessageInTransit::~MessageInTransit() { - if (owns_buffers_) { - base::AlignedFree(main_buffer_); - base::AlignedFree(secondary_buffer_); // Okay if null. + base::AlignedFree(main_buffer_); + base::AlignedFree(secondary_buffer_); // Okay if null. #ifndef NDEBUG - main_buffer_size_ = 0; - main_buffer_ = NULL; - secondary_buffer_size_ = 0; - secondary_buffer_ = NULL; + main_buffer_size_ = 0; + main_buffer_ = NULL; + secondary_buffer_size_ = 0; + secondary_buffer_ = NULL; #endif - } if (dispatchers_.get()) { for (size_t i = 0; i < dispatchers_->size(); i++) { @@ -176,7 +149,6 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer, void MessageInTransit::SetDispatchers( scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) { DCHECK(dispatchers.get()); - DCHECK(owns_buffers_); DCHECK(!dispatchers_.get()); dispatchers_ = dispatchers.Pass(); @@ -188,7 +160,6 @@ void MessageInTransit::SetDispatchers( void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { DCHECK(channel); - DCHECK(owns_buffers_); DCHECK(!secondary_buffer_); CHECK_EQ(num_handles(), dispatchers_.get() ? dispatchers_->size() : static_cast<size_t>(0)); @@ -201,13 +172,12 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { // table, and add to it as we go along. size_t size = handle_table_size; for (size_t i = 0; i < dispatchers_->size(); i++) { - if (!(*dispatchers_)[i]) - continue; - - size += RoundUpMessageAlignment( - Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize( - (*dispatchers_)[i].get(), channel)); - // TODO(vtl): Check for overflow? + if (Dispatcher* dispatcher = (*dispatchers_)[i]) { + size += RoundUpMessageAlignment( + Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize( + dispatcher, channel)); + // TODO(vtl): Check for overflow? + } } secondary_buffer_ = base::AlignedAlloc(size, kMessageAlignment); @@ -222,19 +192,25 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { static_cast<HandleTableEntry*>(secondary_buffer_); size_t current_offset = handle_table_size; for (size_t i = 0; i < dispatchers_->size(); i++) { - if (!(*dispatchers_)[i]) { - // The |handle_table[i].size| is already zero, designating this entry as - // invalid. + Dispatcher* dispatcher = (*dispatchers_)[i]; + if (!dispatcher) { + COMPILE_ASSERT(Dispatcher::kTypeUnknown == 0, + need_Dispatcher_kTypeUnknown_to_be_zero); continue; } - handle_table[i].offset = static_cast<uint32_t>(current_offset); - handle_table[i].size = static_cast<uint32_t>( - Dispatcher::MessageInTransitAccess::SerializeAndClose( - (*dispatchers_)[i].get(), - static_cast<char*>(secondary_buffer_) + current_offset, - channel)); - current_offset += RoundUpMessageAlignment(handle_table[i].size); + size_t actual_size = 0; + if (Dispatcher::MessageInTransitAccess::SerializeAndClose( + dispatcher, static_cast<char*>(secondary_buffer_) + current_offset, + channel, &actual_size)) { + handle_table[i].type = static_cast<int32_t>(dispatcher->GetType()); + handle_table[i].offset = static_cast<uint32_t>(current_offset); + handle_table[i].size = static_cast<uint32_t>(actual_size); + } + // (Nothing to do on failure, since |secondary_buffer_| was cleared, and + // |kTypeUnknown| is zero.) + + current_offset += RoundUpMessageAlignment(actual_size); DCHECK_LE(current_offset, size); } diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index 1503e44..d8eb07d 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -63,35 +63,64 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // quantity (which must be a power of 2). static const size_t kMessageAlignment = 8; - enum OwnedBuffer { OWNED_BUFFER }; - // Constructor for a |MessageInTransit| that owns its buffer. |bytes| is - // optional; if null, the message data will be zero-initialized. - MessageInTransit(OwnedBuffer, - Type type, + // Forward-declare |Header| so that |View| can use it: + private: + struct Header; + public: + // This represents a view of serialized message data in a raw buffer. + class View { + public: + // Constructs a view from the given buffer of the given size. (The size must + // be as provided by |MessageInTransit::GetNextMessageSize()|.) The buffer + // must remain alive/unmodified through the lifetime of this object. + // |buffer| should be |kMessageAlignment|-byte aligned. + View(size_t message_size, const void* buffer); + + // API parallel to that for |MessageInTransit| itself (mostly getters for + // header data). + const void* main_buffer() const { return buffer_; } + size_t main_buffer_size() const { + return RoundUpMessageAlignment(sizeof(Header) + header()->num_bytes); + } + const void* secondary_buffer() const { + return (message_size_ > main_buffer_size()) ? + static_cast<const char*>(buffer_) + main_buffer_size() : NULL; + } + size_t secondary_buffer_size() const { + return message_size_ - main_buffer_size(); + } + size_t total_size() const { return header()->total_size; } + uint32_t num_bytes() const { return header()->num_bytes; } + const void* bytes() const { + return static_cast<const char*>(buffer_) + sizeof(Header); + } + uint32_t num_handles() const { return header()->num_handles; } + Type type() const { return header()->type; } + Subtype subtype() const { return header()->subtype; } + EndpointId source_id() const { return header()->source_id; } + EndpointId destination_id() const { return header()->destination_id; } + + private: + const Header* header() const { return static_cast<const Header*>(buffer_); } + + const size_t message_size_; + const void* const buffer_; + + // Though this struct is trivial, disallow copy and assign, since it doesn't + // own its data. (If you're copying/assigning this, you're probably doing + // something wrong.) + DISALLOW_COPY_AND_ASSIGN(View); + }; + + // |bytes| is optional; if null, the message data will be zero-initialized. + MessageInTransit(Type type, Subtype subtype, uint32_t num_bytes, uint32_t num_handles, const void* bytes); - // "Copy" constructor. The input |MessageInTransit| may own its buffer or not; - // however, it must not have any dispatchers. The constructed - // |MessageInTransit| will own its buffer. - MessageInTransit(OwnedBuffer, - const MessageInTransit& other); - - enum UnownedBuffer { UNOWNED_BUFFER }; - // Constructor for a |MessageInTransit| that is a "view" into another buffer. - // |buffer| should point to a fully-serialized |MessageInTransit|, and should - // be aligned on a |kMessageAlignment|-byte boundary. |message_size| should be - // the value provided by |GetNextMessageSize()|, and |buffer| should have at - // least that many bytes available. |buffer| should live (without change to - // the first |message_size| bytes) at least as long the new |MessageInTransit| - // does. - // - // Note: You probably don't want to heap-allocate this kind of - // |MessageInTransit| (and, e.g., put it into a |scoped_ptr|); you definitely - // don't want to pass it as a parameter in a |scoped_ptr|. Whenever you use - // this, you can probably create it directly on the stack. - MessageInTransit(UnownedBuffer, size_t message_size, void* buffer); + // Constructs a |MessageInTransit| from a |View|. + explicit MessageInTransit(const View& message_view); + ~MessageInTransit(); // Gets the size of the next message from |buffer|, which has |buffer_size| @@ -108,13 +137,12 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // Makes this message "own" the given set of dispatchers. The dispatchers must // not be referenced from anywhere else (in particular, not from the handle // table), i.e., each dispatcher must have a reference count of 1. This - // message must also own its buffers and not already have dispatchers. + // message must not already have dispatchers. void SetDispatchers( scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers); - // Serializes any dispatchers to the secondary buffer. This message must own - // its buffers (in order to have dispatchers in the first place), and the - // secondary buffer must not yet exist (so this must only be called once). The + // Serializes any dispatchers to the secondary buffer. This message must not + // already have a secondary buffer (so this must only be called once). The // caller must ensure (e.g., by holding on to a reference) that |channel| // stays alive through the call. void SerializeAndCloseDispatchers(Channel* channel); @@ -184,14 +212,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { uint32_t num_bytes; // Number of handles "attached". uint32_t num_handles; - // To be used soon. - uint32_t reserved0; - uint32_t reserved1; }; struct HandleTableEntry { + int32_t type; // From |Dispatcher::Type| (|kTypeUnknown| for "invalid"). uint32_t offset; - uint32_t size; // (Not including any padding.) A size of 0 means "invalid". + uint32_t size; // (Not including any padding.) + uint32_t unused; }; const Header* header() const { @@ -201,21 +228,16 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { void UpdateTotalSize(); - // Whether we own |main_buffer_| and |secondary_buffer_| or not (that is, we - // own neither). - bool owns_buffers_; - size_t main_buffer_size_; void* main_buffer_; size_t secondary_buffer_size_; void* secondary_buffer_; // May be null. - // Any dispatchers that may be attached to this message. This is only - // supported if this message owns its main buffer. These dispatchers should be - // "owned" by this message, i.e., have a ref count of exactly 1. (We allow a - // dispatcher entry to be null, in case it couldn't be duplicated for some - // reason.) + // Any dispatchers that may be attached to this message. These dispatchers + // should be "owned" by this message, i.e., have a ref count of exactly 1. (We + // allow a dispatcher entry to be null, in case it couldn't be duplicated for + // some reason.) scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers_; DISALLOW_COPY_AND_ASSIGN(MessageInTransit); diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc index d95df38..0092abd 100644 --- a/mojo/system/message_pipe.cc +++ b/mojo/system/message_pipe.cc @@ -68,7 +68,6 @@ MojoResult MessagePipe::WriteMessage( return EnqueueMessage( GetPeerPort(port), make_scoped_ptr(new MessageInTransit( - MessageInTransit::OWNED_BUFFER, MessageInTransit::kTypeMessagePipeEndpoint, MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, num_handles, bytes)), diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc index aafee31..58c72da 100644 --- a/mojo/system/proxy_message_pipe_endpoint.cc +++ b/mojo/system/proxy_message_pipe_endpoint.cc @@ -47,8 +47,7 @@ void ProxyMessagePipeEndpoint::OnPeerClose() { is_peer_open_ = false; EnqueueMessage(make_scoped_ptr( - new MessageInTransit(MessageInTransit::OWNED_BUFFER, - MessageInTransit::kTypeMessagePipe, + new MessageInTransit(MessageInTransit::kTypeMessagePipe, MessageInTransit::kSubtypeMessagePipePeerClosed, 0, 0, NULL))); } diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc index 5d8da3b..029e18a 100644 --- a/mojo/system/raw_channel.cc +++ b/mojo/system/raw_channel.cc @@ -192,11 +192,11 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; - // Keep reading data in a loop, and dispatches messages if enough data is + // Keep reading data in a loop, and dispatch messages if enough data is // received. Exit the loop if any of the following happens: - // - one or more messages were dispatched; - // - the last read failed, was a partial read or would block; - // - |Shutdown()| was called. + // - one or more messages were dispatched; + // - the last read failed, was a partial read or would block; + // - |Shutdown()| was called. do { if (io_result != IO_SUCCEEDED) { read_stopped_ = true; @@ -219,19 +219,18 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { // - |message_size| is only valid if |GetNextMessageSize()| returns true. // TODO(vtl): Use |message_size| more intelligently (e.g., to request the // next read). + // TODO(vtl): Validate that |message_size| is sane. while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( &read_buffer_->buffer_[read_buffer_start], remaining_bytes, &message_size) && remaining_bytes >= message_size) { - // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with - // some sort of "view" abstraction. - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, - &read_buffer_->buffer_[read_buffer_start]); - DCHECK_EQ(message.total_size(), message_size); + MessageInTransit::View + message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); + DCHECK_EQ(message_view.total_size(), message_size); // Dispatch the message. - delegate_->OnReadMessage(message); + delegate_->OnReadMessage(message_view); if (read_stopped_) { // |Shutdown()| was called in |OnReadMessage()|. // TODO(vtl): Add test for this case. diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h index 4b0f0f4..40a65bf 100644 --- a/mojo/system/raw_channel.h +++ b/mojo/system/raw_channel.h @@ -14,6 +14,7 @@ #include "base/synchronization/lock.h" #include "mojo/system/constants.h" #include "mojo/system/embedder/scoped_platform_handle.h" +#include "mojo/system/message_in_transit.h" #include "mojo/system/system_impl_export.h" namespace base { @@ -23,8 +24,6 @@ class MessageLoopForIO; namespace mojo { namespace system { -class MessageInTransit; - // |RawChannel| is an interface to objects that wrap an OS "pipe". It presents // the following interface to users: // - Receives and dispatches messages on an I/O thread (running a @@ -56,7 +55,7 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // Called when a message is read. This may call |Shutdown()| on the // |RawChannel|, but must not destroy it. - virtual void OnReadMessage(const MessageInTransit& message) = 0; + virtual void OnReadMessage(const MessageInTransit::View& message_view) = 0; // Called when there's a fatal error, which leads to the channel no longer // being viable. diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc index 5af66c3..b0d5fc6 100644 --- a/mojo/system/raw_channel_posix_unittest.cc +++ b/mojo/system/raw_channel_posix_unittest.cc @@ -41,8 +41,7 @@ scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) { for (size_t i = 0; i < num_bytes; i++) bytes[i] = static_cast<unsigned char>(i + num_bytes); return make_scoped_ptr( - new MessageInTransit(MessageInTransit::OWNED_BUFFER, - MessageInTransit::kTypeMessagePipeEndpoint, + new MessageInTransit(MessageInTransit::kTypeMessagePipeEndpoint, MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, 0, bytes.data())); } @@ -107,7 +106,8 @@ class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { virtual ~WriteOnlyRawChannelDelegate() {} // |RawChannel::Delegate| implementation: - virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE { + virtual void OnReadMessage( + const MessageInTransit::View& /*message_view*/) OVERRIDE { NOTREACHED(); } virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE { @@ -145,16 +145,15 @@ class TestMessageReaderAndChecker { // If we've read the whole message.... if (bytes_.size() >= message_size) { bool rv = true; - MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, - message_size, bytes_.data()); - CHECK_EQ(message.main_buffer_size(), message_size); + MessageInTransit::View message_view(message_size, bytes_.data()); + CHECK_EQ(message_view.main_buffer_size(), message_size); - if (message.num_bytes() != expected_size) { + if (message_view.num_bytes() != expected_size) { LOG(ERROR) << "Wrong size: " << message_size << " instead of " << expected_size << " bytes."; rv = false; - } else if (!CheckMessageData(message.bytes(), - message.num_bytes())) { + } else if (!CheckMessageData(message_view.bytes(), + message_view.num_bytes())) { LOG(ERROR) << "Incorrect message bytes."; rv = false; } @@ -162,7 +161,7 @@ class TestMessageReaderAndChecker { // Erase message data. bytes_.erase(bytes_.begin(), bytes_.begin() + - message.main_buffer_size()); + message_view.main_buffer_size()); return rv; } } @@ -228,7 +227,8 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { virtual ~ReadCheckerRawChannelDelegate() {} // |RawChannel::Delegate| implementation (called on the I/O thread): - virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { + virtual void OnReadMessage( + const MessageInTransit::View& message_view) OVERRIDE { size_t position; size_t expected_size; bool should_signal = false; @@ -242,10 +242,10 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { should_signal = true; } - EXPECT_EQ(expected_size, message.num_bytes()) << position; - if (message.num_bytes() == expected_size) { - EXPECT_TRUE(CheckMessageData(message.bytes(), message.num_bytes())) - << position; + EXPECT_EQ(expected_size, message_view.num_bytes()) << position; + if (message_view.num_bytes() == expected_size) { + EXPECT_TRUE(CheckMessageData(message_view.bytes(), + message_view.num_bytes())) << position; } if (should_signal) @@ -352,11 +352,13 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { virtual ~ReadCountdownRawChannelDelegate() {} // |RawChannel::Delegate| implementation (called on the I/O thread): - virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { + virtual void OnReadMessage( + const MessageInTransit::View& message_view) OVERRIDE { EXPECT_LT(count_, expected_count_); count_++; - EXPECT_TRUE(CheckMessageData(message.bytes(), message.num_bytes())); + EXPECT_TRUE(CheckMessageData(message_view.bytes(), + message_view.num_bytes())); if (count_ >= expected_count_) done_event_.Signal(); |