diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-26 23:39:53 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-26 23:39:53 +0000 |
commit | 71a7ad908faa7f1639459cdf4b4b7d2b8c5a5f56 (patch) | |
tree | 802c4296f6a0d4007acf1b2c79cf5dd4ee67cda8 /mojo | |
parent | a881c45a4d88a50a940ba412f8b6c9f74aec1ea4 (diff) | |
download | chromium_src-71a7ad908faa7f1639459cdf4b4b7d2b8c5a5f56.zip chromium_src-71a7ad908faa7f1639459cdf4b4b7d2b8c5a5f56.tar.gz chromium_src-71a7ad908faa7f1639459cdf4b4b7d2b8c5a5f56.tar.bz2 |
Mojo: Add a "secondary buffer" to MessageInTransit.
I think I really want to move away from "unowned buffer"
MessageInTransits, but I'll do that refactor after this change (since
it'd conflict badly with this change). Also, the format of the secondary
buffer isn't finalized yet.
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/177123011
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253633 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/system/message_in_transit.cc | 115 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 80 | ||||
-rw-r--r-- | mojo/system/proxy_message_pipe_endpoint.cc | 5 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 61 |
4 files changed, 202 insertions, 59 deletions
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index 8ab1148..3a0abe2 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -48,16 +48,15 @@ MessageInTransit::MessageInTransit(OwnedBuffer, uint32_t num_bytes, uint32_t num_handles, const void* bytes) - : owns_main_buffer_(true), + : owns_buffers_(true), main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)), - main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) { + main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)), + secondary_buffer_size_(0), + secondary_buffer_(NULL) { DCHECK_LE(num_bytes, kMaxMessageNumBytes); DCHECK_LE(num_handles, kMaxMessageNumHandles); - // Note: If dispatchers are subsequently attached (in particular, if - // |num_handles| is nonzero), then |total_size| will have to be adjusted. - // TODO(vtl): Figure out where we should really set this. - header()->total_size = static_cast<uint32_t>(main_buffer_size_); + // |total_size| is updated below, from the other values. header()->type = type; header()->subtype = subtype; header()->source_id = kInvalidEndpointId; @@ -66,6 +65,9 @@ MessageInTransit::MessageInTransit(OwnedBuffer, header()->num_handles = num_handles; header()->reserved0 = 0; header()->reserved1 = 0; + // Note: If dispatchers are subsequently attached (in particular, if + // |num_handles| is nonzero), then |total_size| will have to be adjusted. + UpdateTotalSize(); if (bytes) { memcpy(MessageInTransit::bytes(), bytes, num_bytes); @@ -78,16 +80,20 @@ MessageInTransit::MessageInTransit(OwnedBuffer, MessageInTransit::MessageInTransit(OwnedBuffer, const MessageInTransit& other) - : owns_main_buffer_(true), + : owns_buffers_(true), main_buffer_size_(other.main_buffer_size()), - main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) { + main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)), + secondary_buffer_size_(other.secondary_buffer_size()), + secondary_buffer_(secondary_buffer_size_ ? + base::AlignedAlloc(secondary_buffer_size_, + kMessageAlignment) : NULL) { DCHECK(!other.dispatchers_.get()); DCHECK_GE(main_buffer_size_, sizeof(Header)); DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); - memcpy(main_buffer_, other.main_buffer_, main_buffer_size_); + memcpy(main_buffer_, other.main_buffer(), main_buffer_size_); + memcpy(secondary_buffer_, other.secondary_buffer(), secondary_buffer_size_); - // TODO(vtl): This will change. DCHECK_EQ(main_buffer_size_, RoundUpMessageAlignment(sizeof(Header) + num_bytes())); } @@ -95,24 +101,41 @@ MessageInTransit::MessageInTransit(OwnedBuffer, MessageInTransit::MessageInTransit(UnownedBuffer, size_t message_size, void* buffer) - : owns_main_buffer_(false), - main_buffer_size_(message_size), - main_buffer_(buffer) { - DCHECK_GE(main_buffer_size_, sizeof(Header)); - DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); - DCHECK(main_buffer_); + : owns_buffers_(false), + main_buffer_size_(0), + main_buffer_(NULL), + secondary_buffer_size_(0), + secondary_buffer_(NULL) { + DCHECK_GE(message_size, sizeof(Header)); + DCHECK_EQ(message_size % kMessageAlignment, 0u); + DCHECK(buffer); + + Header* header = static_cast<Header*>(buffer); + DCHECK_EQ(header->total_size, message_size); + + main_buffer_size_ = + RoundUpMessageAlignment(sizeof(Header) + header->num_bytes); + DCHECK_LE(main_buffer_size_, message_size); + main_buffer_ = buffer; DCHECK_EQ(reinterpret_cast<uintptr_t>(main_buffer_) % kMessageAlignment, 0u); - // TODO(vtl): This will change. - DCHECK_EQ(main_buffer_size_, - RoundUpMessageAlignment(sizeof(Header) + num_bytes())); + + if (message_size > main_buffer_size_) { + secondary_buffer_size_ = message_size - main_buffer_size_; + secondary_buffer_ = static_cast<char*>(buffer) + main_buffer_size_; + DCHECK_EQ(reinterpret_cast<uintptr_t>(secondary_buffer_) % + kMessageAlignment, 0u); + } } MessageInTransit::~MessageInTransit() { - if (owns_main_buffer_) { + if (owns_buffers_) { base::AlignedFree(main_buffer_); + base::AlignedFree(secondary_buffer_); // Okay if null. #ifndef NDEBUG main_buffer_size_ = 0; main_buffer_ = NULL; + secondary_buffer_size_ = 0; + secondary_buffer_ = NULL; #endif } @@ -149,7 +172,7 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer, void MessageInTransit::SetDispatchers( scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) { DCHECK(dispatchers.get()); - DCHECK(owns_main_buffer_); + DCHECK(owns_buffers_); DCHECK(!dispatchers_.get()); dispatchers_ = dispatchers.Pass(); @@ -159,5 +182,55 @@ void MessageInTransit::SetDispatchers( #endif } +void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { + DCHECK(channel); + DCHECK(owns_buffers_); + DCHECK(!secondary_buffer_); + CHECK_EQ(num_handles(), + dispatchers_.get() ? dispatchers_->size() : static_cast<size_t>(0)); + + if (!num_handles()) + return; + + // The size of the secondary buffer. We'll start with the size of the entry + // size table (which will contain the size of the data for each handle), and + // add to it as we go along. + size_t size = RoundUpMessageAlignment(num_handles() * sizeof(uint32_t)); + // The maximum size that we'll need for the secondary buffer. We'll allocate + // this much. + size_t max_size = size; + // TODO(vtl): Iterate through dispatchers and query their maximum size (and + // add each, rounded up, to |max_size|). + + secondary_buffer_ = base::AlignedAlloc(max_size, kMessageAlignment); + // TODO(vtl): I wonder if it's faster to clear everything once, or to only + // clear padding as needed. + memset(secondary_buffer_, 0, max_size); + + uint32_t* entry_size_table = static_cast<uint32_t*>(secondary_buffer_); + for (size_t i = 0; i < dispatchers_->size(); i++) { + // The entry size table entry is already zero by default. + if (!(*dispatchers_)[i]) + continue; + + // TODO(vtl): Serialize this dispatcher (getting its actual size, and adding + // that (rounded up) to |size|. + entry_size_table[i] = 0; + + DCHECK((*dispatchers_)[i]->HasOneRef()); + (*dispatchers_)[i]->Close(); + } + + secondary_buffer_size_ = static_cast<uint32_t>(size); + UpdateTotalSize(); +} + +void MessageInTransit::UpdateTotalSize() { + DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u); + DCHECK_EQ(secondary_buffer_size_ % kMessageAlignment, 0u); + header()->total_size = + static_cast<uint32_t>(main_buffer_size_ + secondary_buffer_size_); +} + } // namespace system } // namespace mojo diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index b3ac389..60d5c25 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -17,6 +17,8 @@ namespace mojo { namespace system { +class Channel; + // This class is used to represent data in transit. It is thread-unsafe. class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { public: @@ -87,30 +89,48 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // Makes this message "own" the given set of dispatchers. The dispatchers must // not be referenced from anywhere else (in particular, not from the handle // table), i.e., each dispatcher must have a reference count of 1. This - // message must also own its main buffer and not already have dispatchers. + // message must also own its buffers and not already have dispatchers. void SetDispatchers( scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers); - // Gets the "main" buffer for a |MessageInTransit|. A |MessageInTransit| can - // be serialized by writing the main buffer. The returned pointer will be - // aligned to a multiple of |kMessageAlignment| bytes, and the size of the - // buffer (see below) will also be a multiple of |kMessageAlignment|. + // Serializes any dispatchers to the secondary buffer. This message must own + // its buffers (in order to have dispatchers in the first place), and the + // secondary buffer must not yet exist (so this must only be called once). The + // caller must ensure (e.g., by holding on to a reference) that |channel| + // stays alive through the call. + void SerializeAndCloseDispatchers(Channel* channel); + + // |MessageInTransit| buffers: A |MessageInTransit| can be serialized by + // writing the main buffer and then, if it has one, the secondary buffer. Both + // buffers are |kMessageAlignment|-byte aligned and a multiple of + // |kMessageAlignment| bytes in size. + // + // The main buffer consists of the header (of type |Header|, which is an + // internal detail of this class) followed immediately by the message data + // (accessed by |bytes()| and of size |num_bytes()|, and also + // |kMessageAlignment|-byte aligned), and then any padding needed to make the + // main buffer a multiple of |kMessageAlignment| bytes in size. // - // The main buffer always consists of the header (of type |Header|, which is - // an internal detail), followed by the message data, accessed by |bytes()| - // (of size |num_bytes()|, and also |kMessageAlignment|-aligned), and then any - // necessary padding to make |main_buffer_size()| a multiple of - // |kMessageAlignment|. - // TODO(vtl): Add a "secondary" buffer, so that this makes more sense. + // The secondary buffer consists first of a table of |uint32_t|s with + // |num_handles()| entries; each entry in the table is the (unpadded) size of + // the data for a handle (a size of 0 indicates in invalid handle/null + // dispatcher). The table is followed by padding, then the first entry and + // padding, the second entry and padding, etc. (padding as required to + // maintain |kMessageAlignment|-byte alignment). + + // Gets the main buffer and its size (in number of bytes), respectively. const void* main_buffer() const { return main_buffer_; } - - // Gets the size of the main buffer (in number of bytes). size_t main_buffer_size() const { return main_buffer_size_; } + // Gets the secondary buffer and its size (in number of bytes), respectively. + const void* secondary_buffer() const { return secondary_buffer_; } + size_t secondary_buffer_size() const { return secondary_buffer_size_; } + + // Gets the total size of the message (see comment in |Header|, below). + size_t total_size() const { return header()->total_size; } + // Gets the size of the message data. - uint32_t num_bytes() const { - return header()->num_bytes; - } + uint32_t num_bytes() const { return header()->num_bytes; } // Gets the message data (of size |num_bytes()| bytes). const void* bytes() const { @@ -118,9 +138,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { } void* bytes() { return static_cast<char*>(main_buffer_) + sizeof(Header); } - uint32_t num_handles() const { - return header()->num_handles; - } + uint32_t num_handles() const { return header()->num_handles; } Type type() const { return header()->type; } Subtype subtype() const { return header()->subtype; } @@ -139,8 +157,6 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { return dispatchers_.get(); } - // TODO(vtl): Add whatever's necessary to transport handles. - // Rounds |n| up to a multiple of |kMessageAlignment|. static inline size_t RoundUpMessageAlignment(size_t n) { return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1); @@ -155,12 +171,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { struct Header { // Total size of the message, including the header, the message data // ("bytes") including padding (to make it a multiple of |kMessageAlignment| - // bytes), and serialized handle information (TODO(vtl)). + // bytes), and serialized handle information. Note that this may not be the + // correct value if dispatchers are attached but + // |SerializeAndCloseDispatchers()| has not been called. uint32_t total_size; - Type type; - Subtype subtype; - EndpointId source_id; - EndpointId destination_id; + Type type; // 2 bytes. + Subtype subtype; // 2 bytes. + EndpointId source_id; // 4 bytes. + EndpointId destination_id; // 4 bytes. // Size of actual message data. uint32_t num_bytes; // Number of handles "attached". @@ -175,10 +193,18 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { } Header* header() { return static_cast<Header*>(main_buffer_); } - bool owns_main_buffer_; + void UpdateTotalSize(); + + // Whether we own |main_buffer_| and |secondary_buffer_| or not (that is, we + // own neither). + bool owns_buffers_; + size_t main_buffer_size_; void* main_buffer_; + size_t secondary_buffer_size_; + void* secondary_buffer_; // May be null. + // Any dispatchers that may be attached to this message. This is only // supported if this message owns its main buffer. These dispatchers should be // "owned" by this message, i.e., have a ref count of exactly 1. (We allow a diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc index f0a5de5..aafee31 100644 --- a/mojo/system/proxy_message_pipe_endpoint.cc +++ b/mojo/system/proxy_message_pipe_endpoint.cc @@ -68,11 +68,10 @@ void ProxyMessagePipeEndpoint::EnqueueMessage( } if (is_running()) { + message->SerializeAndCloseDispatchers(channel_.get()); + message->set_source_id(local_id_); message->set_destination_id(remote_id_); - // If it fails at this point, the message gets dropped. (This is no - // different from any other in-transit errors.) - // Note: |WriteMessage()| will destroy the message even on failure. if (!channel_->WriteMessage(message.Pass())) LOG(WARNING) << "Failed to write message to channel"; } else { diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index 433ea4f..fa1a5ce 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -6,6 +6,7 @@ #include <errno.h> #include <string.h> +#include <sys/uio.h> #include <unistd.h> #include <algorithm> @@ -263,9 +264,11 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { &read_buffer_[read_buffer_start], read_buffer_num_valid_bytes_, &message_size) && read_buffer_num_valid_bytes_ >= message_size) { + // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with + // some sort of "view" abstraction. MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size, &read_buffer_[read_buffer_start]); - DCHECK_EQ(message.main_buffer_size(), message_size); + DCHECK_EQ(message.total_size(), message_size); // Dispatch the message. delegate()->OnReadMessage(message); @@ -351,6 +354,49 @@ void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { delegate()->OnFatalError(fatal_error); } +// TODO(vtl): This will collide with yzshen's work. This is just a hacky, +// minimally-invasive function that does what I want (write/resume writing a +// |MessageInTransit| that may consist of more than one buffer). +ssize_t WriteMessageInTransit(int fd, + MessageInTransit* message, + size_t offset, + size_t* bytes_to_write) { + *bytes_to_write = message->total_size() - offset; + if (!message->secondary_buffer_size()) { + // Only write from the main buffer. + DCHECK_LT(offset, message->main_buffer_size()); + DCHECK_LE(*bytes_to_write, message->main_buffer_size()); + return HANDLE_EINTR( + write(fd, + static_cast<const char*>(message->main_buffer()) + offset, + *bytes_to_write)); + } + if (offset >= message->main_buffer_size()) { + // Only write from the secondary buffer. + DCHECK_LT(offset - message->main_buffer_size(), + message->secondary_buffer_size()); + DCHECK_LE(*bytes_to_write, message->secondary_buffer_size()); + return HANDLE_EINTR( + write(fd, + static_cast<const char*>(message->secondary_buffer()) + + (offset - message->main_buffer_size()), + *bytes_to_write)); + } + // Write from both buffers. (Note that using |writev()| is measurably slower + // than using |write()| -- at least in a microbenchmark -- but much faster + // than using two |write()|s.) + DCHECK_EQ(*bytes_to_write, message->main_buffer_size() - offset + + message->secondary_buffer_size()); + struct iovec iov[2] = { + { const_cast<char*>( + static_cast<const char*>(message->main_buffer()) + offset), + message->main_buffer_size() - offset }, + { const_cast<void*>(message->secondary_buffer()), + message->secondary_buffer_size() } + }; + return HANDLE_EINTR(writev(fd, iov, 2)); +} + bool RawChannelPosix::WriteFrontMessageNoLock() { write_lock_.AssertAcquired(); @@ -358,13 +404,12 @@ bool RawChannelPosix::WriteFrontMessageNoLock() { DCHECK(!write_message_queue_.empty()); MessageInTransit* message = write_message_queue_.front(); - DCHECK_LT(write_message_offset_, message->main_buffer_size()); - size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; - ssize_t bytes_written = HANDLE_EINTR( - write(fd_.get().fd, - static_cast<const char*>(message->main_buffer()) + - write_message_offset_, - bytes_to_write)); + DCHECK_LT(write_message_offset_, message->total_size()); + size_t bytes_to_write; + ssize_t bytes_written = WriteMessageInTransit(fd_.get().fd, + message, + write_message_offset_, + &bytes_to_write); if (bytes_written < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { PLOG(ERROR) << "write of size " << bytes_to_write; |