summaryrefslogtreecommitdiffstats
path: root/mojo/system
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/system')
-rw-r--r--mojo/system/channel.cc2
-rw-r--r--mojo/system/message_in_transit.cc63
-rw-r--r--mojo/system/message_in_transit.h4
-rw-r--r--mojo/system/message_pipe.cc163
-rw-r--r--mojo/system/message_pipe.h29
5 files changed, 141 insertions, 120 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
index 6999a3b..ca65284 100644
--- a/mojo/system/channel.cc
+++ b/mojo/system/channel.cc
@@ -352,7 +352,7 @@ void Channel::OnReadMessageForDownstream(
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
message->DeserializeDispatchers(this);
MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
- MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), NULL);
+ MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
if (result != MOJO_RESULT_OK) {
// TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
// has been closed (in an unavoidable race). This might also be a "remote"
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index 442cd71..c15d862 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -16,6 +16,28 @@
namespace mojo {
namespace system {
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeMessagePipeEndpoint;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeMessagePipe;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeChannel;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeMessagePipeEndpointData;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
+ MessageInTransit::kInvalidEndpointId;
+STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
+STATIC_CONST_MEMBER_DEFINITION const size_t
+ MessageInTransit::kMaxSerializedDispatcherSize;
+STATIC_CONST_MEMBER_DEFINITION const size_t
+ MessageInTransit::kMaxSerializedDispatcherPlatformHandles;
+
struct MessageInTransit::PrivateStructForCompileAsserts {
// The size of |Header| must be a multiple of the alignment.
COMPILE_ASSERT(sizeof(Header) % kMessageAlignment == 0,
@@ -40,34 +62,13 @@ struct MessageInTransit::PrivateStructForCompileAsserts {
sizeof_MessageInTransit_HandleTableEntry_invalid);
};
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
- MessageInTransit::kTypeMessagePipeEndpoint;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
- MessageInTransit::kTypeMessagePipe;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
- MessageInTransit::kTypeChannel;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeMessagePipeEndpointData;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck;
-STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
- MessageInTransit::kInvalidEndpointId;
-STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
-STATIC_CONST_MEMBER_DEFINITION const size_t
- MessageInTransit::kMaxSerializedDispatcherSize;
-STATIC_CONST_MEMBER_DEFINITION const size_t
- MessageInTransit::kMaxSerializedDispatcherPlatformHandles;
-
// For each attached (Mojo) handle, there'll be a handle table entry and
// serialized dispatcher data.
// static
const size_t MessageInTransit::kMaxSecondaryBufferSize = kMaxMessageNumHandles *
(sizeof(HandleTableEntry) + kMaxSerializedDispatcherSize);
+// static
const size_t MessageInTransit::kMaxPlatformHandles =
kMaxMessageNumHandles * kMaxSerializedDispatcherPlatformHandles;
@@ -221,10 +222,14 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
if (!num_handles())
return;
- size_t handle_table_size = num_handles() * sizeof(HandleTableEntry);
- // The size of the secondary buffer. We'll start with the size of the handle
- // table, and add to it as we go along.
- size_t size = handle_table_size;
+ // The offset to the start of the (Mojo) handle table.
+ // TODO(vtl): Add a header to the secondary buffer.
+ const size_t handle_table_start_offset = 0;
+ // The offset to the start of the serialized dispatcher data.
+ const size_t serialized_dispatcher_start_offset =
+ handle_table_start_offset + num_handles() * sizeof(HandleTableEntry);
+ // The size of the secondary buffer we'll add to this as we go along).
+ size_t size = serialized_dispatcher_start_offset;
size_t num_platform_handles = 0;
#if DCHECK_IS_ON
std::vector<size_t> all_max_sizes(num_handles());
@@ -265,9 +270,9 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
platform_handles_.reset(new std::vector<embedder::PlatformHandle>());
}
- HandleTableEntry* handle_table =
- static_cast<HandleTableEntry*>(secondary_buffer_);
- size_t current_offset = handle_table_size;
+ HandleTableEntry* handle_table = reinterpret_cast<HandleTableEntry*>(
+ static_cast<char*>(secondary_buffer_) + handle_table_start_offset);
+ size_t current_offset = serialized_dispatcher_start_offset;
for (size_t i = 0; i < num_handles(); i++) {
Dispatcher* dispatcher = (*dispatchers_)[i].get();
if (!dispatcher) {
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 9e578af..4018dd6 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -242,8 +242,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// To allow us to make assertions about |Header| in the .cc file.
struct PrivateStructForCompileAsserts;
- // "Header" for the data. Must be a multiple of |kMessageAlignment| bytes in
- // size. Must be POD.
+ // Header for the data (main buffer). Must be a multiple of
+ // |kMessageAlignment| bytes in size. Must be POD.
struct Header {
// Total size of the message, including the header, the message data
// ("bytes") including padding (to make it a multiple of |kMessageAlignment|
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index 9752934..e154064 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -74,7 +74,7 @@ MojoResult MessagePipe::WriteMessage(
DCHECK(port == 0 || port == 1);
uint32_t num_handles =
transports ? static_cast<uint32_t>(transports->size()) : 0;
- return EnqueueMessage(
+ return EnqueueMessageInternal(
GetPeerPort(port),
make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeMessagePipeEndpoint,
@@ -145,84 +145,8 @@ void MessagePipe::ConvertLocalToProxy(unsigned port) {
MojoResult MessagePipe::EnqueueMessage(
unsigned port,
- scoped_ptr<MessageInTransit> message,
- std::vector<DispatcherTransport>* transports) {
- DCHECK(port == 0 || port == 1);
- DCHECK(message.get());
- if (message->num_handles() == 0) {
- // If |message->num_handles()| is 0, then |transports| should be null and
- // |message| should not have dispatchers.
- DCHECK(!transports);
- DCHECK(!message->has_dispatchers());
- } else {
- // Otherwise either |transports| must be (non-null and) of the right size
- // and the message shouldn't have dispatchers, or |transports| must be null
- // and the message should have the right number of dispatchers.
- DCHECK((transports && transports->size() == message->num_handles() &&
- !message->has_dispatchers()) ||
- (!transports && message->has_dispatchers() &&
- message->dispatchers()->size() == message->num_handles()));
- }
-
- if (message->type() == MessageInTransit::kTypeMessagePipe) {
- DCHECK(!transports);
- return HandleControlMessage(port, message.Pass());
- }
-
- DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[GetPeerPort(port)].get());
-
- // The destination port need not be open, unlike the source port.
- if (!endpoints_[port].get())
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- if (transports) {
- DCHECK(!message->dispatchers());
-
- // You're not allowed to send either handle to a message pipe over the
- // message pipe, so check for this. (The case of trying to write a handle to
- // itself is taken care of by |Core|. That case kind of makes sense, but
- // leads to complications if, e.g., both sides try to do the same thing with
- // their respective handles simultaneously. The other case, of trying to
- // write the peer handle to a handle, doesn't make sense -- since no handle
- // will be available to read the message from.)
- for (size_t i = 0; i < transports->size(); i++) {
- if (!(*transports)[i].is_valid())
- continue;
- if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
- MessagePipeDispatcherTransport mp_transport((*transports)[i]);
- if (mp_transport.GetMessagePipe() == this) {
- // The other case should have been disallowed by |Core|. (Note: |port|
- // is the peer port of the handle given to |WriteMessage()|.)
- DCHECK_EQ(mp_transport.GetPort(), port);
- return MOJO_RESULT_INVALID_ARGUMENT;
- }
- }
- }
-
- // Clone the dispatchers and attach them to the message. (This must be done
- // as a separate loop, since we want to leave the dispatchers alone on
- // failure.)
- scoped_ptr<std::vector<scoped_refptr<Dispatcher> > >
- dispatchers(new std::vector<scoped_refptr<Dispatcher> >());
- dispatchers->reserve(transports->size());
- for (size_t i = 0; i < transports->size(); i++) {
- if ((*transports)[i].is_valid()) {
- dispatchers->push_back(
- (*transports)[i].CreateEquivalentDispatcherAndClose());
- } else {
- LOG(WARNING) << "Enqueueing null dispatcher";
- dispatchers->push_back(scoped_refptr<Dispatcher>());
- }
- }
- message->SetDispatchers(dispatchers.Pass());
- }
-
- // The endpoint's |EnqueueMessage()| may not report failure.
- endpoints_[port]->EnqueueMessage(message.Pass());
- return MOJO_RESULT_OK;
+ scoped_ptr<MessageInTransit> message) {
+ return EnqueueMessageInternal(port, message.Pass(), NULL);
}
bool MessagePipe::Attach(unsigned port,
@@ -275,6 +199,87 @@ MessagePipe::~MessagePipe() {
DCHECK(!endpoints_[1].get());
}
+MojoResult MessagePipe::EnqueueMessageInternal(
+ unsigned port,
+ scoped_ptr<MessageInTransit> message,
+ std::vector<DispatcherTransport>* transports) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(message.get());
+
+ if (message->type() == MessageInTransit::kTypeMessagePipe) {
+ DCHECK(!transports);
+ return HandleControlMessage(port, message.Pass());
+ }
+
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[GetPeerPort(port)].get());
+
+ // The destination port need not be open, unlike the source port.
+ if (!endpoints_[port].get())
+ return MOJO_RESULT_FAILED_PRECONDITION;
+
+ if (transports) {
+ MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
+ if (result != MOJO_RESULT_OK)
+ return result;
+ }
+
+ if (message->has_dispatchers())
+ DCHECK_EQ(message->dispatchers()->size(), message->num_handles());
+
+ // The endpoint's |EnqueueMessage()| may not report failure.
+ endpoints_[port]->EnqueueMessage(message.Pass());
+ return MOJO_RESULT_OK;
+}
+
+MojoResult MessagePipe::AttachTransportsNoLock(
+ unsigned port,
+ MessageInTransit* message,
+ std::vector<DispatcherTransport>* transports) {
+ DCHECK(!message->has_dispatchers());
+ DCHECK_EQ(transports->size(), message->num_handles());
+
+ // You're not allowed to send either handle to a message pipe over the message
+ // pipe, so check for this. (The case of trying to write a handle to itself is
+ // taken care of by |Core|. That case kind of makes sense, but leads to
+ // complications if, e.g., both sides try to do the same thing with their
+ // respective handles simultaneously. The other case, of trying to write the
+ // peer handle to a handle, doesn't make sense -- since no handle will be
+ // available to read the message from.)
+ for (size_t i = 0; i < transports->size(); i++) {
+ if (!(*transports)[i].is_valid())
+ continue;
+ if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
+ MessagePipeDispatcherTransport mp_transport((*transports)[i]);
+ if (mp_transport.GetMessagePipe() == this) {
+ // The other case should have been disallowed by |Core|. (Note: |port|
+ // is the peer port of the handle given to |WriteMessage()|.)
+ DCHECK_EQ(mp_transport.GetPort(), port);
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+ }
+ }
+
+ // Clone the dispatchers and attach them to the message. (This must be done as
+ // a separate loop, since we want to leave the dispatchers alone on failure.)
+ scoped_ptr<std::vector<scoped_refptr<Dispatcher> > >
+ dispatchers(new std::vector<scoped_refptr<Dispatcher> >());
+ dispatchers->reserve(transports->size());
+ for (size_t i = 0; i < transports->size(); i++) {
+ if ((*transports)[i].is_valid()) {
+ dispatchers->push_back(
+ (*transports)[i].CreateEquivalentDispatcherAndClose());
+ } else {
+ LOG(WARNING) << "Enqueueing null dispatcher";
+ dispatchers->push_back(scoped_refptr<Dispatcher>());
+ }
+ }
+ message->SetDispatchers(dispatchers.Pass());
+ return MOJO_RESULT_OK;
+}
+
MojoResult MessagePipe::HandleControlMessage(
unsigned /*port*/,
scoped_ptr<MessageInTransit> message) {
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index 6392ab3..7fef65b 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -71,14 +71,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe :
// endpoint.
void ConvertLocalToProxy(unsigned port);
- // This is used internally by |WriteMessage()| and by |Channel| to enqueue
- // messages (typically to a |LocalMessagePipeEndpoint|). Unlike
- // |WriteMessage()|, |port| is the *destination* port. |transports| should be
- // non-null only if it's nonempty, and only if |message| has no dispatchers
- // attached.
+ // This is used by |Channel| to enqueue messages (typically to a
+ // |LocalMessagePipeEndpoint|). Unlike |WriteMessage()|, |port| is the
+ // *destination* port.
MojoResult EnqueueMessage(unsigned port,
- scoped_ptr<MessageInTransit> message,
- std::vector<DispatcherTransport>* transports);
+ scoped_ptr<MessageInTransit> message);
// These are used by |Channel|.
bool Attach(unsigned port,
@@ -91,8 +88,22 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe :
friend class base::RefCountedThreadSafe<MessagePipe>;
virtual ~MessagePipe();
- // Used by |EnqueueMessage()| to handle control messages that are actually
- // meant for us.
+ // This is used internally by |WriteMessage()| and by |EnqueueMessage()|.
+ // |transports| may be non-null only if it's nonempty and |message| has no
+ // dispatchers attached.
+ MojoResult EnqueueMessageInternal(
+ unsigned port,
+ scoped_ptr<MessageInTransit> message,
+ std::vector<DispatcherTransport>* transports);
+
+ // Helper for |EnqueueMessageInternal()|. Must be called with |lock_| held.
+ MojoResult AttachTransportsNoLock(
+ unsigned port,
+ MessageInTransit* message,
+ std::vector<DispatcherTransport>* transports);
+
+ // Used by |EnqueueMessageInternal()| to handle control messages that are
+ // actually meant for us.
MojoResult HandleControlMessage(unsigned port,
scoped_ptr<MessageInTransit> message);