summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-28 23:30:55 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-28 23:30:55 +0000
commitdd95a33519efb72a82dbe4ec16cc6cd90c8c8850 (patch)
treea5f7875bdcb887a55a80f51afeabb3af0b8248ad /mojo
parent14ffc08c27ffb79d5b263728ae8d19a1ffa3c7f8 (diff)
downloadchromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.zip
chromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.tar.gz
chromium_src-dd95a33519efb72a82dbe4ec16cc6cd90c8c8850.tar.bz2
Mojo: Get rid of |MessageInTransit|s with unowned buffers.
Replace them with |MessageInTransit::View|s. (Also modify our serialization hooks a little, and add a stub for deserialization -- which isn't hooked up to anything yet.) R=yzshen@chromium.org Review URL: https://codereview.chromium.org/184683003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254268 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/system/channel.cc31
-rw-r--r--mojo/system/channel.h7
-rw-r--r--mojo/system/dispatcher.cc35
-rw-r--r--mojo/system/dispatcher.h40
-rw-r--r--mojo/system/message_in_transit.cc116
-rw-r--r--mojo/system/message_in_transit.h106
-rw-r--r--mojo/system/message_pipe.cc1
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.cc3
-rw-r--r--mojo/system/raw_channel.cc19
-rw-r--r--mojo/system/raw_channel.h5
-rw-r--r--mojo/system/raw_channel_posix_unittest.cc36
11 files changed, 216 insertions, 183 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
index c45ce4c..1c5d336 100644
--- a/mojo/system/channel.cc
+++ b/mojo/system/channel.cc
@@ -135,19 +135,19 @@ Channel::~Channel() {
<< " endpoints still present";
}
-void Channel::OnReadMessage(const MessageInTransit& message) {
- switch (message.type()) {
+void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
+ switch (message_view.type()) {
case MessageInTransit::kTypeMessagePipeEndpoint:
case MessageInTransit::kTypeMessagePipe:
- OnReadMessageForDownstream(message);
+ OnReadMessageForDownstream(message_view);
break;
case MessageInTransit::kTypeChannel:
- OnReadMessageForChannel(message);
+ OnReadMessageForChannel(message_view);
break;
default:
HandleRemoteError(base::StringPrintf(
"Received message of invalid type %u",
- static_cast<unsigned>(message.type())));
+ static_cast<unsigned>(message_view.type())));
break;
}
}
@@ -157,11 +157,12 @@ void Channel::OnFatalError(FatalError fatal_error) {
NOTIMPLEMENTED();
}
-void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
- DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
- message.type() == MessageInTransit::kTypeMessagePipe);
+void Channel::OnReadMessageForDownstream(
+ const MessageInTransit::View& message_view) {
+ DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
+ message_view.type() == MessageInTransit::kTypeMessagePipe);
- MessageInTransit::EndpointId local_id = message.destination_id();
+ MessageInTransit::EndpointId local_id = message_view.destination_id();
if (local_id == MessageInTransit::kInvalidEndpointId) {
HandleRemoteError("Received message with no destination ID");
return;
@@ -195,16 +196,15 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
// We need to duplicate the message, because |EnqueueMessage()| will take
// ownership of it.
// TODO(vtl): Need to enforce limits on message size and handle count.
- scoped_ptr<MessageInTransit> own_message(
- new MessageInTransit(MessageInTransit::OWNED_BUFFER, message));
- std::vector<DispatcherTransport> transports(message.num_handles());
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
+ std::vector<DispatcherTransport> transports(message->num_handles());
// TODO(vtl): Create dispatchers for handles.
// TODO(vtl): It's bad that the current API will create equivalent dispatchers
// for the freshly-created ones, which is totally redundant. Make a version of
// |EnqueueMessage()| that passes ownership.
if (endpoint_info.message_pipe->EnqueueMessage(
- MessagePipe::GetPeerPort(endpoint_info.port), own_message.Pass(),
- message.num_handles() ? &transports : NULL) != MOJO_RESULT_OK) {
+ MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(),
+ transports.empty() ? NULL : &transports) != MOJO_RESULT_OK) {
HandleLocalError(base::StringPrintf(
"Failed to enqueue message to local destination ID %u",
static_cast<unsigned>(local_id)));
@@ -212,7 +212,8 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
}
}
-void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
+void Channel::OnReadMessageForChannel(
+ const MessageInTransit::View& message_view) {
// TODO(vtl): Currently no channel-only messages yet.
HandleRemoteError("Received invalid channel message");
NOTREACHED();
diff --git a/mojo/system/channel.h b/mojo/system/channel.h
index e0285dc..336aae9 100644
--- a/mojo/system/channel.h
+++ b/mojo/system/channel.h
@@ -93,12 +93,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
virtual ~Channel();
// |RawChannel::Delegate| implementation:
- virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE;
+ virtual void OnReadMessage(
+ const MessageInTransit::View& message_view) OVERRIDE;
virtual void OnFatalError(FatalError fatal_error) OVERRIDE;
// Helpers for |OnReadMessage|:
- void OnReadMessageForDownstream(const MessageInTransit& message);
- void OnReadMessageForChannel(const MessageInTransit& message);
+ void OnReadMessageForDownstream(const MessageInTransit::View& message_view);
+ void OnReadMessageForChannel(const MessageInTransit::View& message_view);
// Handles errors (e.g., invalid messages) from the remote side.
void HandleRemoteError(const base::StringPiece& error_message);
diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc
index 3d970d0..896d044 100644
--- a/mojo/system/dispatcher.cc
+++ b/mojo/system/dispatcher.cc
@@ -37,12 +37,23 @@ size_t Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize(
}
// static
-size_t Dispatcher::MessageInTransitAccess::SerializeAndClose(
+bool Dispatcher::MessageInTransitAccess::SerializeAndClose(
Dispatcher* dispatcher,
void* destination,
- Channel* channel) {
+ Channel* channel,
+ size_t* actual_size) {
DCHECK(dispatcher);
- return dispatcher->SerializeAndClose(destination, channel);
+ return dispatcher->SerializeAndClose(destination, channel, actual_size);
+}
+
+// static
+scoped_refptr<Dispatcher> Dispatcher::MessageInTransitAccess::Deserialize(
+ Channel* channel,
+ int32_t type,
+ const void* source,
+ size_t size) {
+ // TODO(vtl)
+ return scoped_refptr<Dispatcher>();
}
MojoResult Dispatcher::Close() {
@@ -322,8 +333,9 @@ size_t Dispatcher::GetMaximumSerializedSizeImplNoLock(
return 0;
}
-size_t Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/,
- Channel* /*channel*/) {
+bool Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/,
+ Channel* /*channel*/,
+ size_t* /*actual_size*/) {
lock_.AssertAcquired();
DCHECK(is_closed_);
// By default, serializing isn't supported, so just close.
@@ -367,9 +379,12 @@ size_t Dispatcher::GetMaximumSerializedSize(const Channel* channel) const {
return GetMaximumSerializedSizeImplNoLock(channel);
}
-size_t Dispatcher::SerializeAndClose(void* destination, Channel* channel) {
+bool Dispatcher::SerializeAndClose(void* destination,
+ Channel* channel,
+ size_t* actual_size) {
DCHECK(destination);
DCHECK(channel);
+ DCHECK(actual_size);
DCHECK(HasOneRef());
base::AutoLock locker(lock_);
@@ -386,9 +401,11 @@ size_t Dispatcher::SerializeAndClose(void* destination, Channel* channel) {
// No need to cancel waiters: we shouldn't have any (and shouldn't be in
// |Core|'s handle table.
- size_t size = SerializeAndCloseImplNoLock(destination, channel);
- DCHECK_LE(size, max_size);
- return size;
+ if (!SerializeAndCloseImplNoLock(destination, channel, actual_size))
+ return false;
+
+ DCHECK_LE(*actual_size, max_size);
+ return true;
}
// DispatcherTransport ---------------------------------------------------------
diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h
index 73bf77b..7b2bdb5 100644
--- a/mojo/system/dispatcher.h
+++ b/mojo/system/dispatcher.h
@@ -142,8 +142,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
};
// A |MessageInTransit| may serialize dispatchers that are attached to it to a
- // given |Channel|. These may only be called on such dispatchers. See the
- // |Dispatcher| methods of the same names for more details.
+ // given |Channel| and then (probably in a different process) deserialize.
// TODO(vtl): Consider making another wrapper similar to |DispatcherTransport|
// (but with an owning, unique reference), and having
// |CreateEquivalentDispatcherAndCloseImplNoLock()| return that wrapper (and
@@ -152,11 +151,26 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
private:
friend class MessageInTransit;
+ // Serialization API. These functions may only be called on such
+ // dispatchers. (|channel| is the |Channel| to which the dispatcher is to be
+ // serialized.) See the |Dispatcher| methods of the same names for more
+ // details.
+ // TODO(vtl): Consider replacing this API below with a proper two-phase one
+ // ("StartSerialize()" and "EndSerializeAndClose()", with the lock possibly
+ // being held across their invocations).
static size_t GetMaximumSerializedSize(const Dispatcher* dispatcher,
const Channel* channel);
- static size_t SerializeAndClose(Dispatcher* dispatcher,
- void* destination,
- Channel* channel);
+ static bool SerializeAndClose(Dispatcher* dispatcher,
+ void* destination,
+ Channel* channel,
+ size_t* actual_size);
+
+ // Deserialization API.
+ // TODO(vtl): Implement this.
+ static scoped_refptr<Dispatcher> Deserialize(Channel* channel,
+ int32_t type,
+ const void* source,
+ size_t size);
};
protected:
@@ -222,8 +236,9 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
// being passed over a message pipe.
virtual size_t GetMaximumSerializedSizeImplNoLock(
const Channel* channel) const;
- virtual size_t SerializeAndCloseImplNoLock(void* destination,
- Channel* channel);
+ virtual bool SerializeAndCloseImplNoLock(void* destination,
+ Channel* channel,
+ size_t* actual_size);
// Available to subclasses. (Note: Returns a non-const reference, just like
// |base::AutoLock|'s constructor takes a non-const reference.)
@@ -261,10 +276,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
// Serializes this dispatcher to the given |Channel| by writing to
// |destination| and then closes this dispatcher. It may write no more than
// was indicated by |GetMaximumSerializedSize()|. (WARNING: Beware of races,
- // e.g., if something can be mutated between the two calls!) This may return
- // zero to indicate failure to serialize (but this dispatcher will still be
- // closed).
- size_t SerializeAndClose(void* destination, Channel* channel);
+ // e.g., if something can be mutated between the two calls!) Returns true on
+ // success, in which case |*actual_size| is set to the amount it actually
+ // wrote to |destination|. On failure, |*actual_size| should not be modified;
+ // however, the dispatcher will still be closed.
+ bool SerializeAndClose(void* destination,
+ Channel* channel,
+ size_t* actual_size);
// This protects the following members as well as any state added by
// subclasses.
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index 16d3e0c..5b6236b 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -46,14 +46,22 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
MessageInTransit::kInvalidEndpointId;
STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
-MessageInTransit::MessageInTransit(OwnedBuffer,
- Type type,
+
+MessageInTransit::View::View(size_t message_size, const void* buffer)
+ : message_size_(message_size),
+ buffer_(buffer) {
+ size_t next_message_size = 0;
+ DCHECK(MessageInTransit::GetNextMessageSize(buffer_, message_size_,
+ &next_message_size));
+ DCHECK_EQ(message_size_, next_message_size);
+}
+
+MessageInTransit::MessageInTransit(Type type,
Subtype subtype,
uint32_t num_bytes,
uint32_t num_handles,
const void* bytes)
- : owns_buffers_(true),
- main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)),
+ : main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)),
secondary_buffer_size_(0),
secondary_buffer_(NULL) {
@@ -67,8 +75,6 @@ MessageInTransit::MessageInTransit(OwnedBuffer,
header()->destination_id = kInvalidEndpointId;
header()->num_bytes = num_bytes;
header()->num_handles = num_handles;
- header()->reserved0 = 0;
- header()->reserved1 = 0;
// Note: If dispatchers are subsequently attached (in particular, if
// |num_handles| is nonzero), then |total_size| will have to be adjusted.
UpdateTotalSize();
@@ -82,66 +88,33 @@ MessageInTransit::MessageInTransit(OwnedBuffer,
}
}
-MessageInTransit::MessageInTransit(OwnedBuffer,
- const MessageInTransit& other)
- : owns_buffers_(true),
- main_buffer_size_(other.main_buffer_size()),
+MessageInTransit::MessageInTransit(const View& message_view)
+ : main_buffer_size_(message_view.main_buffer_size()),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)),
- secondary_buffer_size_(other.secondary_buffer_size()),
+ secondary_buffer_size_(message_view.secondary_buffer_size()),
secondary_buffer_(secondary_buffer_size_ ?
base::AlignedAlloc(secondary_buffer_size_,
kMessageAlignment) : NULL) {
- DCHECK(!other.dispatchers_.get());
DCHECK_GE(main_buffer_size_, sizeof(Header));
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);
- memcpy(main_buffer_, other.main_buffer(), main_buffer_size_);
- memcpy(secondary_buffer_, other.secondary_buffer(), secondary_buffer_size_);
+ memcpy(main_buffer_, message_view.main_buffer(), main_buffer_size_);
+ memcpy(secondary_buffer_, message_view.secondary_buffer(),
+ secondary_buffer_size_);
DCHECK_EQ(main_buffer_size_,
RoundUpMessageAlignment(sizeof(Header) + num_bytes()));
}
-MessageInTransit::MessageInTransit(UnownedBuffer,
- size_t message_size,
- void* buffer)
- : owns_buffers_(false),
- main_buffer_size_(0),
- main_buffer_(NULL),
- secondary_buffer_size_(0),
- secondary_buffer_(NULL) {
- DCHECK_GE(message_size, sizeof(Header));
- DCHECK_EQ(message_size % kMessageAlignment, 0u);
- DCHECK(buffer);
-
- Header* header = static_cast<Header*>(buffer);
- DCHECK_EQ(header->total_size, message_size);
-
- main_buffer_size_ =
- RoundUpMessageAlignment(sizeof(Header) + header->num_bytes);
- DCHECK_LE(main_buffer_size_, message_size);
- main_buffer_ = buffer;
- DCHECK_EQ(reinterpret_cast<uintptr_t>(main_buffer_) % kMessageAlignment, 0u);
-
- if (message_size > main_buffer_size_) {
- secondary_buffer_size_ = message_size - main_buffer_size_;
- secondary_buffer_ = static_cast<char*>(buffer) + main_buffer_size_;
- DCHECK_EQ(reinterpret_cast<uintptr_t>(secondary_buffer_) %
- kMessageAlignment, 0u);
- }
-}
-
MessageInTransit::~MessageInTransit() {
- if (owns_buffers_) {
- base::AlignedFree(main_buffer_);
- base::AlignedFree(secondary_buffer_); // Okay if null.
+ base::AlignedFree(main_buffer_);
+ base::AlignedFree(secondary_buffer_); // Okay if null.
#ifndef NDEBUG
- main_buffer_size_ = 0;
- main_buffer_ = NULL;
- secondary_buffer_size_ = 0;
- secondary_buffer_ = NULL;
+ main_buffer_size_ = 0;
+ main_buffer_ = NULL;
+ secondary_buffer_size_ = 0;
+ secondary_buffer_ = NULL;
#endif
- }
if (dispatchers_.get()) {
for (size_t i = 0; i < dispatchers_->size(); i++) {
@@ -176,7 +149,6 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer,
void MessageInTransit::SetDispatchers(
scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) {
DCHECK(dispatchers.get());
- DCHECK(owns_buffers_);
DCHECK(!dispatchers_.get());
dispatchers_ = dispatchers.Pass();
@@ -188,7 +160,6 @@ void MessageInTransit::SetDispatchers(
void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
DCHECK(channel);
- DCHECK(owns_buffers_);
DCHECK(!secondary_buffer_);
CHECK_EQ(num_handles(),
dispatchers_.get() ? dispatchers_->size() : static_cast<size_t>(0));
@@ -201,13 +172,12 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
// table, and add to it as we go along.
size_t size = handle_table_size;
for (size_t i = 0; i < dispatchers_->size(); i++) {
- if (!(*dispatchers_)[i])
- continue;
-
- size += RoundUpMessageAlignment(
- Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize(
- (*dispatchers_)[i].get(), channel));
- // TODO(vtl): Check for overflow?
+ if (Dispatcher* dispatcher = (*dispatchers_)[i]) {
+ size += RoundUpMessageAlignment(
+ Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize(
+ dispatcher, channel));
+ // TODO(vtl): Check for overflow?
+ }
}
secondary_buffer_ = base::AlignedAlloc(size, kMessageAlignment);
@@ -222,19 +192,25 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
static_cast<HandleTableEntry*>(secondary_buffer_);
size_t current_offset = handle_table_size;
for (size_t i = 0; i < dispatchers_->size(); i++) {
- if (!(*dispatchers_)[i]) {
- // The |handle_table[i].size| is already zero, designating this entry as
- // invalid.
+ Dispatcher* dispatcher = (*dispatchers_)[i];
+ if (!dispatcher) {
+ COMPILE_ASSERT(Dispatcher::kTypeUnknown == 0,
+ need_Dispatcher_kTypeUnknown_to_be_zero);
continue;
}
- handle_table[i].offset = static_cast<uint32_t>(current_offset);
- handle_table[i].size = static_cast<uint32_t>(
- Dispatcher::MessageInTransitAccess::SerializeAndClose(
- (*dispatchers_)[i].get(),
- static_cast<char*>(secondary_buffer_) + current_offset,
- channel));
- current_offset += RoundUpMessageAlignment(handle_table[i].size);
+ size_t actual_size = 0;
+ if (Dispatcher::MessageInTransitAccess::SerializeAndClose(
+ dispatcher, static_cast<char*>(secondary_buffer_) + current_offset,
+ channel, &actual_size)) {
+ handle_table[i].type = static_cast<int32_t>(dispatcher->GetType());
+ handle_table[i].offset = static_cast<uint32_t>(current_offset);
+ handle_table[i].size = static_cast<uint32_t>(actual_size);
+ }
+ // (Nothing to do on failure, since |secondary_buffer_| was cleared, and
+ // |kTypeUnknown| is zero.)
+
+ current_offset += RoundUpMessageAlignment(actual_size);
DCHECK_LE(current_offset, size);
}
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 1503e44..d8eb07d 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -63,35 +63,64 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// quantity (which must be a power of 2).
static const size_t kMessageAlignment = 8;
- enum OwnedBuffer { OWNED_BUFFER };
- // Constructor for a |MessageInTransit| that owns its buffer. |bytes| is
- // optional; if null, the message data will be zero-initialized.
- MessageInTransit(OwnedBuffer,
- Type type,
+ // Forward-declare |Header| so that |View| can use it:
+ private:
+ struct Header;
+ public:
+ // This represents a view of serialized message data in a raw buffer.
+ class View {
+ public:
+ // Constructs a view from the given buffer of the given size. (The size must
+ // be as provided by |MessageInTransit::GetNextMessageSize()|.) The buffer
+ // must remain alive/unmodified through the lifetime of this object.
+ // |buffer| should be |kMessageAlignment|-byte aligned.
+ View(size_t message_size, const void* buffer);
+
+ // API parallel to that for |MessageInTransit| itself (mostly getters for
+ // header data).
+ const void* main_buffer() const { return buffer_; }
+ size_t main_buffer_size() const {
+ return RoundUpMessageAlignment(sizeof(Header) + header()->num_bytes);
+ }
+ const void* secondary_buffer() const {
+ return (message_size_ > main_buffer_size()) ?
+ static_cast<const char*>(buffer_) + main_buffer_size() : NULL;
+ }
+ size_t secondary_buffer_size() const {
+ return message_size_ - main_buffer_size();
+ }
+ size_t total_size() const { return header()->total_size; }
+ uint32_t num_bytes() const { return header()->num_bytes; }
+ const void* bytes() const {
+ return static_cast<const char*>(buffer_) + sizeof(Header);
+ }
+ uint32_t num_handles() const { return header()->num_handles; }
+ Type type() const { return header()->type; }
+ Subtype subtype() const { return header()->subtype; }
+ EndpointId source_id() const { return header()->source_id; }
+ EndpointId destination_id() const { return header()->destination_id; }
+
+ private:
+ const Header* header() const { return static_cast<const Header*>(buffer_); }
+
+ const size_t message_size_;
+ const void* const buffer_;
+
+ // Though this struct is trivial, disallow copy and assign, since it doesn't
+ // own its data. (If you're copying/assigning this, you're probably doing
+ // something wrong.)
+ DISALLOW_COPY_AND_ASSIGN(View);
+ };
+
+ // |bytes| is optional; if null, the message data will be zero-initialized.
+ MessageInTransit(Type type,
Subtype subtype,
uint32_t num_bytes,
uint32_t num_handles,
const void* bytes);
- // "Copy" constructor. The input |MessageInTransit| may own its buffer or not;
- // however, it must not have any dispatchers. The constructed
- // |MessageInTransit| will own its buffer.
- MessageInTransit(OwnedBuffer,
- const MessageInTransit& other);
-
- enum UnownedBuffer { UNOWNED_BUFFER };
- // Constructor for a |MessageInTransit| that is a "view" into another buffer.
- // |buffer| should point to a fully-serialized |MessageInTransit|, and should
- // be aligned on a |kMessageAlignment|-byte boundary. |message_size| should be
- // the value provided by |GetNextMessageSize()|, and |buffer| should have at
- // least that many bytes available. |buffer| should live (without change to
- // the first |message_size| bytes) at least as long the new |MessageInTransit|
- // does.
- //
- // Note: You probably don't want to heap-allocate this kind of
- // |MessageInTransit| (and, e.g., put it into a |scoped_ptr|); you definitely
- // don't want to pass it as a parameter in a |scoped_ptr|. Whenever you use
- // this, you can probably create it directly on the stack.
- MessageInTransit(UnownedBuffer, size_t message_size, void* buffer);
+ // Constructs a |MessageInTransit| from a |View|.
+ explicit MessageInTransit(const View& message_view);
+
~MessageInTransit();
// Gets the size of the next message from |buffer|, which has |buffer_size|
@@ -108,13 +137,12 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// Makes this message "own" the given set of dispatchers. The dispatchers must
// not be referenced from anywhere else (in particular, not from the handle
// table), i.e., each dispatcher must have a reference count of 1. This
- // message must also own its buffers and not already have dispatchers.
+ // message must not already have dispatchers.
void SetDispatchers(
scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers);
- // Serializes any dispatchers to the secondary buffer. This message must own
- // its buffers (in order to have dispatchers in the first place), and the
- // secondary buffer must not yet exist (so this must only be called once). The
+ // Serializes any dispatchers to the secondary buffer. This message must not
+ // already have a secondary buffer (so this must only be called once). The
// caller must ensure (e.g., by holding on to a reference) that |channel|
// stays alive through the call.
void SerializeAndCloseDispatchers(Channel* channel);
@@ -184,14 +212,13 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
uint32_t num_bytes;
// Number of handles "attached".
uint32_t num_handles;
- // To be used soon.
- uint32_t reserved0;
- uint32_t reserved1;
};
struct HandleTableEntry {
+ int32_t type; // From |Dispatcher::Type| (|kTypeUnknown| for "invalid").
uint32_t offset;
- uint32_t size; // (Not including any padding.) A size of 0 means "invalid".
+ uint32_t size; // (Not including any padding.)
+ uint32_t unused;
};
const Header* header() const {
@@ -201,21 +228,16 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
void UpdateTotalSize();
- // Whether we own |main_buffer_| and |secondary_buffer_| or not (that is, we
- // own neither).
- bool owns_buffers_;
-
size_t main_buffer_size_;
void* main_buffer_;
size_t secondary_buffer_size_;
void* secondary_buffer_; // May be null.
- // Any dispatchers that may be attached to this message. This is only
- // supported if this message owns its main buffer. These dispatchers should be
- // "owned" by this message, i.e., have a ref count of exactly 1. (We allow a
- // dispatcher entry to be null, in case it couldn't be duplicated for some
- // reason.)
+ // Any dispatchers that may be attached to this message. These dispatchers
+ // should be "owned" by this message, i.e., have a ref count of exactly 1. (We
+ // allow a dispatcher entry to be null, in case it couldn't be duplicated for
+ // some reason.)
scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers_;
DISALLOW_COPY_AND_ASSIGN(MessageInTransit);
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index d95df38..0092abd 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -68,7 +68,6 @@ MojoResult MessagePipe::WriteMessage(
return EnqueueMessage(
GetPeerPort(port),
make_scoped_ptr(new MessageInTransit(
- MessageInTransit::OWNED_BUFFER,
MessageInTransit::kTypeMessagePipeEndpoint,
MessageInTransit::kSubtypeMessagePipeEndpointData,
num_bytes, num_handles, bytes)),
diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
index aafee31..58c72da 100644
--- a/mojo/system/proxy_message_pipe_endpoint.cc
+++ b/mojo/system/proxy_message_pipe_endpoint.cc
@@ -47,8 +47,7 @@ void ProxyMessagePipeEndpoint::OnPeerClose() {
is_peer_open_ = false;
EnqueueMessage(make_scoped_ptr(
- new MessageInTransit(MessageInTransit::OWNED_BUFFER,
- MessageInTransit::kTypeMessagePipe,
+ new MessageInTransit(MessageInTransit::kTypeMessagePipe,
MessageInTransit::kSubtypeMessagePipePeerClosed,
0, 0, NULL)));
}
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
index 5d8da3b..029e18a 100644
--- a/mojo/system/raw_channel.cc
+++ b/mojo/system/raw_channel.cc
@@ -192,11 +192,11 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
- // Keep reading data in a loop, and dispatches messages if enough data is
+ // Keep reading data in a loop, and dispatch messages if enough data is
// received. Exit the loop if any of the following happens:
- // - one or more messages were dispatched;
- // - the last read failed, was a partial read or would block;
- // - |Shutdown()| was called.
+ // - one or more messages were dispatched;
+ // - the last read failed, was a partial read or would block;
+ // - |Shutdown()| was called.
do {
if (io_result != IO_SUCCEEDED) {
read_stopped_ = true;
@@ -219,19 +219,18 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
// - |message_size| is only valid if |GetNextMessageSize()| returns true.
// TODO(vtl): Use |message_size| more intelligently (e.g., to request the
// next read).
+ // TODO(vtl): Validate that |message_size| is sane.
while (remaining_bytes > 0 &&
MessageInTransit::GetNextMessageSize(
&read_buffer_->buffer_[read_buffer_start], remaining_bytes,
&message_size) &&
remaining_bytes >= message_size) {
- // TODO(vtl): FIXME -- replace "unowned buffer" |MessageInTransit|s with
- // some sort of "view" abstraction.
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER, message_size,
- &read_buffer_->buffer_[read_buffer_start]);
- DCHECK_EQ(message.total_size(), message_size);
+ MessageInTransit::View
+ message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
+ DCHECK_EQ(message_view.total_size(), message_size);
// Dispatch the message.
- delegate_->OnReadMessage(message);
+ delegate_->OnReadMessage(message_view);
if (read_stopped_) {
// |Shutdown()| was called in |OnReadMessage()|.
// TODO(vtl): Add test for this case.
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index 4b0f0f4..40a65bf 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -14,6 +14,7 @@
#include "base/synchronization/lock.h"
#include "mojo/system/constants.h"
#include "mojo/system/embedder/scoped_platform_handle.h"
+#include "mojo/system/message_in_transit.h"
#include "mojo/system/system_impl_export.h"
namespace base {
@@ -23,8 +24,6 @@ class MessageLoopForIO;
namespace mojo {
namespace system {
-class MessageInTransit;
-
// |RawChannel| is an interface to objects that wrap an OS "pipe". It presents
// the following interface to users:
// - Receives and dispatches messages on an I/O thread (running a
@@ -56,7 +55,7 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// Called when a message is read. This may call |Shutdown()| on the
// |RawChannel|, but must not destroy it.
- virtual void OnReadMessage(const MessageInTransit& message) = 0;
+ virtual void OnReadMessage(const MessageInTransit::View& message_view) = 0;
// Called when there's a fatal error, which leads to the channel no longer
// being viable.
diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc
index 5af66c3..b0d5fc6 100644
--- a/mojo/system/raw_channel_posix_unittest.cc
+++ b/mojo/system/raw_channel_posix_unittest.cc
@@ -41,8 +41,7 @@ scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) {
for (size_t i = 0; i < num_bytes; i++)
bytes[i] = static_cast<unsigned char>(i + num_bytes);
return make_scoped_ptr(
- new MessageInTransit(MessageInTransit::OWNED_BUFFER,
- MessageInTransit::kTypeMessagePipeEndpoint,
+ new MessageInTransit(MessageInTransit::kTypeMessagePipeEndpoint,
MessageInTransit::kSubtypeMessagePipeEndpointData,
num_bytes, 0, bytes.data()));
}
@@ -107,7 +106,8 @@ class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
virtual ~WriteOnlyRawChannelDelegate() {}
// |RawChannel::Delegate| implementation:
- virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
+ virtual void OnReadMessage(
+ const MessageInTransit::View& /*message_view*/) OVERRIDE {
NOTREACHED();
}
virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
@@ -145,16 +145,15 @@ class TestMessageReaderAndChecker {
// If we've read the whole message....
if (bytes_.size() >= message_size) {
bool rv = true;
- MessageInTransit message(MessageInTransit::UNOWNED_BUFFER,
- message_size, bytes_.data());
- CHECK_EQ(message.main_buffer_size(), message_size);
+ MessageInTransit::View message_view(message_size, bytes_.data());
+ CHECK_EQ(message_view.main_buffer_size(), message_size);
- if (message.num_bytes() != expected_size) {
+ if (message_view.num_bytes() != expected_size) {
LOG(ERROR) << "Wrong size: " << message_size << " instead of "
<< expected_size << " bytes.";
rv = false;
- } else if (!CheckMessageData(message.bytes(),
- message.num_bytes())) {
+ } else if (!CheckMessageData(message_view.bytes(),
+ message_view.num_bytes())) {
LOG(ERROR) << "Incorrect message bytes.";
rv = false;
}
@@ -162,7 +161,7 @@ class TestMessageReaderAndChecker {
// Erase message data.
bytes_.erase(bytes_.begin(),
bytes_.begin() +
- message.main_buffer_size());
+ message_view.main_buffer_size());
return rv;
}
}
@@ -228,7 +227,8 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
virtual ~ReadCheckerRawChannelDelegate() {}
// |RawChannel::Delegate| implementation (called on the I/O thread):
- virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
+ virtual void OnReadMessage(
+ const MessageInTransit::View& message_view) OVERRIDE {
size_t position;
size_t expected_size;
bool should_signal = false;
@@ -242,10 +242,10 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
should_signal = true;
}
- EXPECT_EQ(expected_size, message.num_bytes()) << position;
- if (message.num_bytes() == expected_size) {
- EXPECT_TRUE(CheckMessageData(message.bytes(), message.num_bytes()))
- << position;
+ EXPECT_EQ(expected_size, message_view.num_bytes()) << position;
+ if (message_view.num_bytes() == expected_size) {
+ EXPECT_TRUE(CheckMessageData(message_view.bytes(),
+ message_view.num_bytes())) << position;
}
if (should_signal)
@@ -352,11 +352,13 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
virtual ~ReadCountdownRawChannelDelegate() {}
// |RawChannel::Delegate| implementation (called on the I/O thread):
- virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
+ virtual void OnReadMessage(
+ const MessageInTransit::View& message_view) OVERRIDE {
EXPECT_LT(count_, expected_count_);
count_++;
- EXPECT_TRUE(CheckMessageData(message.bytes(), message.num_bytes()));
+ EXPECT_TRUE(CheckMessageData(message_view.bytes(),
+ message_view.num_bytes()));
if (count_ >= expected_count_)
done_event_.Signal();