summaryrefslogtreecommitdiffstats
path: root/mojo/system
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-05 05:58:10 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-05 05:58:10 +0000
commit23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d (patch)
treee60007ea90c0ad1af1653f772f689054b9c7bb4d /mojo/system
parenta0c57c178a064a6a28daa15356876c4298948258 (diff)
downloadchromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.zip
chromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.tar.gz
chromium_src-23b89dfa99470b2cc6d0e25ca47d708d9e2cd77d.tar.bz2
Mojo: First pass at cross-process passing of MessagePipe handles.
There's still stuff left to do: - Some high-priority TODOs are marked with an additional FIXME (mostly for my benefit). - Notably, we need better error handling (in particular, if the other process spews garbage, we should correctly detect this and not die). - There's a crasher bug (see RemoteMessagePipeTest.HandlePassing). - Also, we need more testing, and more end-to-end testing. R=darin@chromium.org Review URL: https://codereview.chromium.org/180953003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254946 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r--mojo/system/channel.cc61
-rw-r--r--mojo/system/channel.h9
-rw-r--r--mojo/system/dispatcher.cc30
-rw-r--r--mojo/system/dispatcher.h10
-rw-r--r--mojo/system/embedder/embedder.cc17
-rw-r--r--mojo/system/local_message_pipe_endpoint.h3
-rw-r--r--mojo/system/message_in_transit.cc51
-rw-r--r--mojo/system/message_in_transit.h15
-rw-r--r--mojo/system/message_pipe.cc44
-rw-r--r--mojo/system/message_pipe.h9
-rw-r--r--mojo/system/message_pipe_dispatcher.cc85
-rw-r--r--mojo/system/message_pipe_dispatcher.h19
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.cc20
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.h8
-rw-r--r--mojo/system/remote_message_pipe_posix_unittest.cc67
15 files changed, 384 insertions, 64 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
index 804e4d7..02dc0da 100644
--- a/mojo/system/channel.cc
+++ b/mojo/system/channel.cc
@@ -105,13 +105,38 @@ void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
IdToEndpointInfoMap::const_iterator it =
local_id_to_endpoint_info_map_.find(local_id);
+ // TODO(vtl): FIXME -- This check is wrong if this is in response to a
+ // |kSubtypeChannelRunMessagePipeEndpoint| message. We should report error.
CHECK(it != local_id_to_endpoint_info_map_.end());
endpoint_info = it->second;
}
+ // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
+ // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
}
+void Channel::RunRemoteMessagePipeEndpoint(
+ MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id) {
+ base::AutoLock locker(lock_);
+
+ DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
+ local_id_to_endpoint_info_map_.end());
+
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::kTypeChannel,
+ MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
+ 0, 0, NULL));
+ message->set_source_id(local_id);
+ message->set_destination_id(remote_id);
+ if (!raw_channel_->WriteMessage(message.Pass())) {
+ // TODO(vtl): FIXME -- I guess we should report the error back somehow so
+ // that the dispatcher can be closed?
+ CHECK(false) << "Not yet handled";
+ }
+}
+
bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
base::AutoLock locker(lock_);
if (!raw_channel_.get()) {
@@ -202,26 +227,36 @@ void Channel::OnReadMessageForDownstream(
// ownership of it.
// TODO(vtl): Need to enforce limits on message size and handle count.
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
- std::vector<DispatcherTransport> transports(message->num_handles());
- // TODO(vtl): Create dispatchers for handles.
- // TODO(vtl): It's bad that the current API will create equivalent dispatchers
- // for the freshly-created ones, which is totally redundant. Make a version of
- // |EnqueueMessage()| that passes ownership.
- if (endpoint_info.message_pipe->EnqueueMessage(
- MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(),
- transports.empty() ? NULL : &transports) != MOJO_RESULT_OK) {
+ message->DeserializeDispatchers(this);
+ MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
+ MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), NULL);
+ if (result != MOJO_RESULT_OK) {
HandleLocalError(base::StringPrintf(
- "Failed to enqueue message to local destination ID %u",
- static_cast<unsigned>(local_id)));
+ "Failed to enqueue message to local destination ID %u (result %d)",
+ static_cast<unsigned>(local_id), static_cast<int>(result)));
return;
}
}
void Channel::OnReadMessageForChannel(
const MessageInTransit::View& message_view) {
- // TODO(vtl): Currently no channel-only messages yet.
- HandleRemoteError("Received invalid channel message");
- NOTREACHED();
+ DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
+
+ switch (message_view.subtype()) {
+ case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
+ // TODO(vtl): FIXME -- Error handling (also validation of
+ // source/destination IDs).
+ DVLOG(2) << "Handling channel message to run message pipe (local ID = "
+ << message_view.destination_id() << ", remote ID = "
+ << message_view.source_id() << ")";
+ RunMessagePipeEndpoint(message_view.destination_id(),
+ message_view.source_id());
+ break;
+ default:
+ HandleRemoteError("Received invalid channel message");
+ NOTREACHED();
+ break;
+ }
}
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
diff --git a/mojo/system/channel.h b/mojo/system/channel.h
index 336aae9..c50bf49 100644
--- a/mojo/system/channel.h
+++ b/mojo/system/channel.h
@@ -80,6 +80,15 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
void RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id);
+ // Tells the other side of the channel to run a message pipe endpoint (which
+ // must already be attached); |local_id| and |remote_id| are relative to this
+ // channel (i.e., |local_id| is the other side's remote ID and |remote_id| is
+ // its local ID).
+ // TODO(vtl): Maybe we should just have a flag argument to
+ // |RunMessagePipeEndpoint()| that tells it to do this.
+ void RunRemoteMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
+ MessageInTransit::EndpointId remote_id);
+
// This forwards |message| verbatim to |raw_channel_|.
bool WriteMessage(scoped_ptr<MessageInTransit> message);
diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc
index 896d044..c22a73b 100644
--- a/mojo/system/dispatcher.cc
+++ b/mojo/system/dispatcher.cc
@@ -6,6 +6,7 @@
#include "base/logging.h"
#include "mojo/system/constants.h"
+#include "mojo/system/message_pipe_dispatcher.h"
namespace mojo {
namespace system {
@@ -39,11 +40,11 @@ size_t Dispatcher::MessageInTransitAccess::GetMaximumSerializedSize(
// static
bool Dispatcher::MessageInTransitAccess::SerializeAndClose(
Dispatcher* dispatcher,
- void* destination,
Channel* channel,
+ void* destination,
size_t* actual_size) {
DCHECK(dispatcher);
- return dispatcher->SerializeAndClose(destination, channel, actual_size);
+ return dispatcher->SerializeAndClose(channel, destination, actual_size);
}
// static
@@ -52,7 +53,20 @@ scoped_refptr<Dispatcher> Dispatcher::MessageInTransitAccess::Deserialize(
int32_t type,
const void* source,
size_t size) {
- // TODO(vtl)
+ switch (static_cast<int32_t>(type)) {
+ case kTypeUnknown:
+ DVLOG(2) << "Deserializing invalid handle";
+ return scoped_refptr<Dispatcher>();
+ case kTypeMessagePipe:
+ return scoped_refptr<Dispatcher>(
+ MessagePipeDispatcher::Deserialize(channel, source, size));
+ case kTypeDataPipeProducer:
+ case kTypeDataPipeConsumer:
+ LOG(WARNING) << "Deserialization of dispatcher type " << type
+ << " not supported";
+ return scoped_refptr<Dispatcher>();
+ }
+ LOG(WARNING) << "Unknown dispatcher type " << type;
return scoped_refptr<Dispatcher>();
}
@@ -333,8 +347,8 @@ size_t Dispatcher::GetMaximumSerializedSizeImplNoLock(
return 0;
}
-bool Dispatcher::SerializeAndCloseImplNoLock(void* /*destination*/,
- Channel* /*channel*/,
+bool Dispatcher::SerializeAndCloseImplNoLock(Channel* /*channel*/,
+ void* /*destination*/,
size_t* /*actual_size*/) {
lock_.AssertAcquired();
DCHECK(is_closed_);
@@ -379,8 +393,8 @@ size_t Dispatcher::GetMaximumSerializedSize(const Channel* channel) const {
return GetMaximumSerializedSizeImplNoLock(channel);
}
-bool Dispatcher::SerializeAndClose(void* destination,
- Channel* channel,
+bool Dispatcher::SerializeAndClose(Channel* channel,
+ void* destination,
size_t* actual_size) {
DCHECK(destination);
DCHECK(channel);
@@ -401,7 +415,7 @@ bool Dispatcher::SerializeAndClose(void* destination,
// No need to cancel waiters: we shouldn't have any (and shouldn't be in
// |Core|'s handle table.
- if (!SerializeAndCloseImplNoLock(destination, channel, actual_size))
+ if (!SerializeAndCloseImplNoLock(channel, destination, actual_size))
return false;
DCHECK_LE(*actual_size, max_size);
diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h
index 7b2bdb5..b4cd526 100644
--- a/mojo/system/dispatcher.h
+++ b/mojo/system/dispatcher.h
@@ -161,8 +161,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
static size_t GetMaximumSerializedSize(const Dispatcher* dispatcher,
const Channel* channel);
static bool SerializeAndClose(Dispatcher* dispatcher,
- void* destination,
Channel* channel,
+ void* destination,
size_t* actual_size);
// Deserialization API.
@@ -236,8 +236,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
// being passed over a message pipe.
virtual size_t GetMaximumSerializedSizeImplNoLock(
const Channel* channel) const;
- virtual bool SerializeAndCloseImplNoLock(void* destination,
- Channel* channel,
+ virtual bool SerializeAndCloseImplNoLock(Channel* channel,
+ void* destination,
size_t* actual_size);
// Available to subclasses. (Note: Returns a non-const reference, just like
@@ -280,8 +280,8 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher :
// success, in which case |*actual_size| is set to the amount it actually
// wrote to |destination|. On failure, |*actual_size| should not be modified;
// however, the dispatcher will still be closed.
- bool SerializeAndClose(void* destination,
- Channel* channel,
+ bool SerializeAndClose(Channel* channel,
+ void* destination,
size_t* actual_size);
// This protects the following members as well as any state added by
diff --git a/mojo/system/embedder/embedder.cc b/mojo/system/embedder/embedder.cc
index 84949c7..03b5440 100644
--- a/mojo/system/embedder/embedder.cc
+++ b/mojo/system/embedder/embedder.cc
@@ -10,10 +10,8 @@
#include "base/memory/scoped_ptr.h"
#include "mojo/system/channel.h"
#include "mojo/system/core_impl.h"
-#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
-#include "mojo/system/proxy_message_pipe_endpoint.h"
namespace mojo {
namespace embedder {
@@ -57,24 +55,19 @@ MojoHandle CreateChannel(
DidCreateChannelOnIOThreadCallback callback) {
DCHECK(platform_handle.is_valid());
- scoped_refptr<system::MessagePipe> message_pipe(
- new system::MessagePipe(scoped_ptr<system::MessagePipeEndpoint>(
- new system::LocalMessagePipeEndpoint()),
- scoped_ptr<system::MessagePipeEndpoint>(
- new system::ProxyMessagePipeEndpoint())));
- scoped_refptr<system::MessagePipeDispatcher> dispatcher(
- new system::MessagePipeDispatcher());
- dispatcher->Init(message_pipe, 0);
+ std::pair<scoped_refptr<system::MessagePipeDispatcher>,
+ scoped_refptr<system::MessagePipe> > remote_message_pipe =
+ system::MessagePipeDispatcher::CreateRemoteMessagePipe();
system::CoreImpl* core_impl = static_cast<system::CoreImpl*>(Core::Get());
DCHECK(core_impl);
- MojoHandle rv = core_impl->AddDispatcher(dispatcher);
+ MojoHandle rv = core_impl->AddDispatcher(remote_message_pipe.first);
// TODO(vtl): Do we properly handle the failure case here?
if (rv != MOJO_HANDLE_INVALID) {
io_thread_task_runner->PostTask(FROM_HERE,
base::Bind(&CreateChannelOnIOThread,
base::Passed(&platform_handle),
- message_pipe,
+ remote_message_pipe.second,
callback));
}
return rv;
diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h
index d923321..bae0786 100644
--- a/mojo/system/local_message_pipe_endpoint.h
+++ b/mojo/system/local_message_pipe_endpoint.h
@@ -41,6 +41,9 @@ class MOJO_SYSTEM_IMPL_EXPORT LocalMessagePipeEndpoint
MojoResult wake_result) OVERRIDE;
virtual void RemoveWaiter(Waiter* waiter) OVERRIDE;
+ // This is only to be used by |ProxyMessagePipeEndpoint|:
+ MessageInTransitQueue* message_queue() { return &message_queue_; }
+
private:
MojoWaitFlags SatisfiedFlags();
MojoWaitFlags SatisfiableFlags();
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index a444a8e..9ac129e 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -89,6 +89,8 @@ MessageInTransit::MessageInTransit(Type type,
}
}
+// TODO(vtl): Do I really want/need to copy the secondary buffer here, or should
+// I just create (deserialize) the dispatchers right away?
MessageInTransit::MessageInTransit(const View& message_view)
: main_buffer_size_(message_view.main_buffer_size()),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)),
@@ -200,16 +202,19 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
continue;
}
+ void* destination = static_cast<char*>(secondary_buffer_) + current_offset;
size_t actual_size = 0;
if (Dispatcher::MessageInTransitAccess::SerializeAndClose(
- dispatcher, static_cast<char*>(secondary_buffer_) + current_offset,
- channel, &actual_size)) {
+ dispatcher, channel, destination, &actual_size)) {
handle_table[i].type = static_cast<int32_t>(dispatcher->GetType());
handle_table[i].offset = static_cast<uint32_t>(current_offset);
handle_table[i].size = static_cast<uint32_t>(actual_size);
+ } else {
+ // (Nothing to do on failure, since |secondary_buffer_| was cleared, and
+ // |kTypeUnknown| is zero.)
+ // The handle will simply be closed.
+ LOG(ERROR) << "Failed to serialize handle to remote message pipe";
}
- // (Nothing to do on failure, since |secondary_buffer_| was cleared, and
- // |kTypeUnknown| is zero.)
current_offset += RoundUpMessageAlignment(actual_size);
DCHECK_LE(current_offset, size);
@@ -218,6 +223,44 @@ void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
UpdateTotalSize();
}
+void MessageInTransit::DeserializeDispatchers(Channel* channel) {
+ DCHECK(!dispatchers_.get());
+
+ if (!num_handles())
+ return;
+
+ // TODO(vtl): Restrict |num_handles()| to a sane range. (Maybe this should be
+ // done earlier?)
+
+ dispatchers_.reset(
+ new std::vector<scoped_refptr<Dispatcher> >(num_handles()));
+
+ size_t handle_table_size = num_handles() * sizeof(HandleTableEntry);
+ if (secondary_buffer_size_ < handle_table_size) {
+ LOG(ERROR) << "Serialized handle table too small";
+ return;
+ }
+
+ const HandleTableEntry* handle_table =
+ static_cast<const HandleTableEntry*>(secondary_buffer_);
+ for (size_t i = 0; i < num_handles(); i++) {
+ size_t offset = handle_table[i].offset;
+ size_t size = handle_table[i].size;
+ // TODO(vtl): Sanity-check the size.
+ if (offset % kMessageAlignment != 0 || offset > secondary_buffer_size_ ||
+ offset + size > secondary_buffer_size_) {
+ // TODO(vtl): Maybe should report error (and make it possible to kill the
+ // connection with extreme prejudice).
+ LOG(ERROR) << "Invalid serialized handle table entry";
+ continue;
+ }
+
+ const void* source = static_cast<const char*>(secondary_buffer_) + offset;
+ (*dispatchers_)[i] = Dispatcher::MessageInTransitAccess::Deserialize(
+ channel, handle_table[i].type, source, size);
+ }
+}
+
void MessageInTransit::UpdateTotalSize() {
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);
DCHECK_EQ(secondary_buffer_size_ % kMessageAlignment, 0u);
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index cbb7271..7a8475d 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -54,6 +54,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
static const Subtype kSubtypeMessagePipeEndpointData = 0;
// Subtypes for type |kTypeMessagePipe|:
static const Subtype kSubtypeMessagePipePeerClosed = 0;
+ // Subtypes for type |kTypeChannel|:
+ static const Subtype kSubtypeChannelRunMessagePipeEndpoint = 0;
typedef uint32_t EndpointId;
// Never a valid endpoint ID.
@@ -146,6 +148,12 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// stays alive through the call.
void SerializeAndCloseDispatchers(Channel* channel);
+ // Deserializes any dispatchers from the secondary buffer. This message must
+ // not have any dispatchers attached.
+ // TODO(vtl): Having to copy the secondary buffer (in the constructor from a
+ // |View|) is suboptimal. Maybe this should just be done in the constructor?
+ void DeserializeDispatchers(Channel* channel);
+
// Gets the main buffer and its size (in number of bytes), respectively.
const void* main_buffer() const { return main_buffer_; }
size_t main_buffer_size() const { return main_buffer_size_; }
@@ -185,6 +193,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
return dispatchers_.get();
}
+ // Returns true if this message has dispatchers attached.
+ bool has_dispatchers() const {
+ return dispatchers_.get() && !dispatchers_->empty();
+ }
+
// Rounds |n| up to a multiple of |kMessageAlignment|.
static inline size_t RoundUpMessageAlignment(size_t n) {
return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1);
@@ -215,7 +228,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
struct HandleTableEntry {
int32_t type; // From |Dispatcher::Type| (|kTypeUnknown| for "invalid").
- uint32_t offset;
+ uint32_t offset; // Relative to the start of the secondary buffer.
uint32_t size; // (Not including any padding.)
uint32_t unused;
};
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index b18fb75..e450569 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -35,6 +35,8 @@ unsigned MessagePipe::GetPeerPort(unsigned port) {
MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+
return endpoints_[port]->GetType();
}
@@ -116,15 +118,49 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
endpoints_[port]->RemoveWaiter(waiter);
}
+void MessagePipe::ConvertLocalToProxy(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+ DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
+
+ bool is_peer_open = !!endpoints_[GetPeerPort(port)].get();
+
+ // TODO(vtl): Hopefully this will work if the peer has been closed and when
+ // the peer is local. If the peer is remote, we should do something more
+ // sophisticated.
+ DCHECK(!is_peer_open ||
+ endpoints_[GetPeerPort(port)]->GetType() ==
+ MessagePipeEndpoint::kTypeLocal);
+
+ scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
+ new ProxyMessagePipeEndpoint(
+ static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
+ is_peer_open));
+ endpoints_[port].swap(replacement_endpoint);
+}
+
MojoResult MessagePipe::EnqueueMessage(
unsigned port,
scoped_ptr<MessageInTransit> message,
std::vector<DispatcherTransport>* transports) {
DCHECK(port == 0 || port == 1);
DCHECK(message.get());
- DCHECK((!transports && message->num_handles() == 0) ||
- (transports && transports->size() > 0 &&
- message->num_handles() == transports->size()));
+ if (message->num_handles() == 0) {
+ // If |message->num_handles()| is 0, then |transports| should be null and
+ // |message| should not have dispatchers.
+ DCHECK(!transports);
+ DCHECK(!message->has_dispatchers());
+ } else {
+ // Otherwise either |transports| must be (non-null and) of the right size
+ // and the message shouldn't have dispatchers, or |transports| must be null
+ // and the message should have the right number of dispatchers.
+ DCHECK((transports && transports->size() == message->num_handles() &&
+ !message->has_dispatchers()) ||
+ (!transports && message->has_dispatchers() &&
+ message->dispatchers()->size() == message->num_handles()));
+ }
if (message->type() == MessageInTransit::kTypeMessagePipe) {
DCHECK(!transports);
@@ -141,6 +177,8 @@ MojoResult MessagePipe::EnqueueMessage(
return MOJO_RESULT_FAILED_PRECONDITION;
if (transports) {
+ DCHECK(!message->dispatchers());
+
// You're not allowed to send either handle to a message pipe over the
// message pipe, so check for this. (The case of trying to write a handle to
// itself is taken care of by |CoreImpl|. That case kind of makes sense, but
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index 33e49a1..6815dcb 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -68,10 +68,15 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipe :
MojoResult wake_result);
void RemoveWaiter(unsigned port, Waiter* waiter);
+ // This is called by the dispatcher to convert a local endpoint to a proxy
+ // endpoint.
+ void ConvertLocalToProxy(unsigned port);
+
// This is used internally by |WriteMessage()| and by |Channel| to enqueue
// messages (typically to a |LocalMessagePipeEndpoint|). Unlike
- // |WriteMessage()|, |port| is the *destination* port. |dispatchers| should be
- // non-null only if it's nonempty.
+ // |WriteMessage()|, |port| is the *destination* port. |transports| should be
+ // non-null only if it's nonempty, and only if |message| has no dispatchers
+ // attached.
MojoResult EnqueueMessage(unsigned port,
scoped_ptr<MessageInTransit> message,
std::vector<DispatcherTransport>* transports);
diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc
index 2390acb..bf9e5b0 100644
--- a/mojo/system/message_pipe_dispatcher.cc
+++ b/mojo/system/message_pipe_dispatcher.cc
@@ -5,15 +5,27 @@
#include "mojo/system/message_pipe_dispatcher.h"
#include "base/logging.h"
+#include "mojo/system/channel.h"
#include "mojo/system/constants.h"
+#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/memory.h"
+#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe.h"
+#include "mojo/system/proxy_message_pipe_endpoint.h"
namespace mojo {
namespace system {
+namespace {
+
const unsigned kInvalidPort = static_cast<unsigned>(-1);
+struct SerializedMessagePipeDispatcher {
+ MessageInTransit::EndpointId endpoint_id;
+};
+
+} // namespace
+
// MessagePipeDispatcher -------------------------------------------------------
MessagePipeDispatcher::MessagePipeDispatcher()
@@ -33,6 +45,45 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
return kTypeMessagePipe;
}
+// static
+std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
+MessagePipeDispatcher::CreateRemoteMessagePipe() {
+ scoped_refptr<MessagePipe> message_pipe(
+ new MessagePipe(
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
+ scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
+ dispatcher->Init(message_pipe, 0);
+
+ return std::make_pair(dispatcher, message_pipe);
+}
+
+// static
+scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
+ Channel* channel,
+ const void* source,
+ size_t size) {
+ if (size != sizeof(SerializedMessagePipeDispatcher)) {
+ LOG(ERROR) << "Invalid serialized message pipe dispatcher";
+ return scoped_refptr<MessagePipeDispatcher>();
+ }
+
+ std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
+ remote_message_pipe = CreateRemoteMessagePipe();
+
+ MessageInTransit::EndpointId remote_id =
+ static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
+ MessageInTransit::EndpointId local_id =
+ channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
+ DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
+ << remote_id << ", new local ID = " << local_id << ")";
+
+ channel->RunMessagePipeEndpoint(local_id, remote_id);
+ // TODO(vtl): FIXME -- Need some error handling here.
+ channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
+ return remote_message_pipe.first;
+}
+
MessagePipeDispatcher::~MessagePipeDispatcher() {
// |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
DCHECK(!message_pipe_.get());
@@ -121,6 +172,40 @@ void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
message_pipe_->RemoveWaiter(port_, waiter);
}
+size_t MessagePipeDispatcher::GetMaximumSerializedSizeImplNoLock(
+ const Channel* /*channel*/) const {
+ lock().AssertAcquired();
+ return sizeof(SerializedMessagePipeDispatcher);
+}
+
+bool MessagePipeDispatcher::SerializeAndCloseImplNoLock(Channel* channel,
+ void* destination,
+ size_t* actual_size) {
+ lock().AssertAcquired();
+
+ // Convert the local endpoint to a proxy endpoint (moving the message queue).
+ message_pipe_->ConvertLocalToProxy(port_);
+
+ // Attach the new proxy endpoint to the channel.
+ MessageInTransit::EndpointId endpoint_id =
+ channel->AttachMessagePipeEndpoint(message_pipe_, port_);
+ DCHECK_NE(endpoint_id, MessageInTransit::kInvalidEndpointId);
+
+ DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
+ << ")";
+
+ // We now have a local ID. Before we can run the proxy endpoint, we need to
+ // get an ack back from the other side with the remote ID.
+ static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
+ endpoint_id;
+
+ message_pipe_ = NULL;
+ port_ = kInvalidPort;
+
+ *actual_size = sizeof(SerializedMessagePipeDispatcher);
+ return true;
+}
+
// MessagePipeDispatcherTransport ----------------------------------------------
MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
diff --git a/mojo/system/message_pipe_dispatcher.h b/mojo/system/message_pipe_dispatcher.h
index f9ce641..34e4b96 100644
--- a/mojo/system/message_pipe_dispatcher.h
+++ b/mojo/system/message_pipe_dispatcher.h
@@ -5,6 +5,8 @@
#ifndef MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
#define MOJO_SYSTEM_MESSAGE_PIPE_DISPATCHER_H_
+#include <utility>
+
#include "base/basictypes.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
@@ -28,6 +30,18 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher {
virtual Type GetType() const OVERRIDE;
+ // Creates a |MessagePipe| with a local endpoint (at port 0) and a proxy
+ // endpoint, and creates/initializes a |MessagePipeDispatcher| (attached to
+ // the message pipe, port 0).
+ static std::pair<scoped_refptr<MessagePipeDispatcher>,
+ scoped_refptr<MessagePipe> > CreateRemoteMessagePipe();
+
+ // The "opposite" of |SerializeAndClose()|. (Typically this is called by
+ // |Dispatcher::Deserialize()|.)
+ static scoped_refptr<MessagePipeDispatcher> Deserialize(Channel* channel,
+ const void* source,
+ size_t size);
+
private:
friend class MessagePipeDispatcherTransport;
@@ -62,6 +76,11 @@ class MOJO_SYSTEM_IMPL_EXPORT MessagePipeDispatcher : public Dispatcher {
MojoWaitFlags flags,
MojoResult wake_result) OVERRIDE;
virtual void RemoveWaiterImplNoLock(Waiter* waiter) OVERRIDE;
+ virtual size_t GetMaximumSerializedSizeImplNoLock(
+ const Channel* channel) const OVERRIDE;
+ virtual bool SerializeAndCloseImplNoLock(Channel* channel,
+ void* destination,
+ size_t* actual_size) OVERRIDE;
// Protected by |lock()|:
scoped_refptr<MessagePipe> message_pipe_; // This will be null if closed.
diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
index 95fe180..d5ef656 100644
--- a/mojo/system/proxy_message_pipe_endpoint.cc
+++ b/mojo/system/proxy_message_pipe_endpoint.cc
@@ -8,6 +8,7 @@
#include "base/logging.h"
#include "mojo/system/channel.h"
+#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_pipe_dispatcher.h"
namespace mojo {
@@ -20,6 +21,18 @@ ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
is_peer_open_(true) {
}
+ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
+ LocalMessagePipeEndpoint* local_message_pipe_endpoint,
+ bool is_peer_open)
+ : local_id_(MessageInTransit::kInvalidEndpointId),
+ remote_id_(MessageInTransit::kInvalidEndpointId),
+ is_open_(true),
+ is_peer_open_(is_peer_open),
+ paused_message_queue_(MessageInTransitQueue::PassContents(),
+ local_message_pipe_endpoint->message_queue()) {
+ local_message_pipe_endpoint->Close();
+}
+
ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
DCHECK(!is_running());
DCHECK(!is_attached());
@@ -61,13 +74,6 @@ void ProxyMessagePipeEndpoint::EnqueueMessage(
scoped_ptr<MessageInTransit> message) {
DCHECK(is_open_);
- if (message->dispatchers() && !message->dispatchers()->empty()) {
- // Since the dispatchers are attached to the message, they'll be closed on
- // message destruction.
- LOG(ERROR) << "Sending handles over remote message pipes not yet supported "
- "(sent handles will simply be closed)";
- }
-
if (is_running()) {
message->SerializeAndCloseDispatchers(channel_.get());
diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h
index f3cc221..c1ed3c0 100644
--- a/mojo/system/proxy_message_pipe_endpoint.h
+++ b/mojo/system/proxy_message_pipe_endpoint.h
@@ -20,6 +20,7 @@ namespace mojo {
namespace system {
class Channel;
+class LocalMessagePipeEndpoint;
class MessagePipe;
// A |ProxyMessagePipeEndpoint| connects an end of a |MessagePipe| to a
@@ -41,6 +42,13 @@ class MOJO_SYSTEM_IMPL_EXPORT ProxyMessagePipeEndpoint
: public MessagePipeEndpoint {
public:
ProxyMessagePipeEndpoint();
+ // Constructs a |ProxyMessagePipeEndpoint| that replaces the given
+ // |LocalMessagePipeEndpoint| (which this constructor will close), taking its
+ // message queue's contents. This is done when transferring a message pipe
+ // handle over a remote message pipe.
+ ProxyMessagePipeEndpoint(
+ LocalMessagePipeEndpoint* local_message_pipe_endpoint,
+ bool is_peer_open);
virtual ~ProxyMessagePipeEndpoint();
// |MessagePipeEndpoint| implementation:
diff --git a/mojo/system/remote_message_pipe_posix_unittest.cc b/mojo/system/remote_message_pipe_posix_unittest.cc
index c76d5b6..bd279e8 100644
--- a/mojo/system/remote_message_pipe_posix_unittest.cc
+++ b/mojo/system/remote_message_pipe_posix_unittest.cc
@@ -439,7 +439,7 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) {
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
ConnectMessagePipes(mp0, mp1);
- // We'll try to pass one of these dispatc
+ // We'll try to pass this dispatcher.
scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
scoped_refptr<MessagePipe> local_mp(new MessagePipe());
dispatcher->Init(local_mp, 0);
@@ -459,9 +459,7 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) {
std::vector<DispatcherTransport> transports;
transports.push_back(transport);
EXPECT_EQ(MOJO_RESULT_OK,
- mp0->WriteMessage(0,
- hello, sizeof(hello),
- &transports,
+ mp0->WriteMessage(0, hello, sizeof(hello), &transports,
MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
@@ -481,22 +479,73 @@ TEST_F(RemoteMessagePipeTest, HandlePassing) {
std::vector<scoped_refptr<Dispatcher> > read_dispatchers;
uint32_t read_num_dispatchers = 10; // Maximum to get.
EXPECT_EQ(MOJO_RESULT_OK,
- mp1->ReadMessage(1,
- read_buffer, &read_buffer_size,
+ mp1->ReadMessage(1, read_buffer, &read_buffer_size,
&read_dispatchers, &read_num_dispatchers,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(hello, read_buffer);
EXPECT_EQ(1u, read_dispatchers.size());
EXPECT_EQ(1u, read_num_dispatchers);
+ ASSERT_TRUE(read_dispatchers[0].get());
+ EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
+
+ EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
+ dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
+
+ // Write to "local_mp", port 1.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ local_mp->WriteMessage(1, hello, sizeof(hello), NULL,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
+ // here. (We don't crash if I sleep and then close.)
+
+ // Wait for the dispatcher to become readable.
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ dispatcher->AddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 456));
+ EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ dispatcher->RemoveWaiter(&waiter);
+
+ // Read from the dispatcher.
+ memset(read_buffer, 0, sizeof(read_buffer));
+ read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
+ EXPECT_STREQ(hello, read_buffer);
+
+ // Prepare to wait on "local_mp", port 1.
+ waiter.Init();
+ EXPECT_EQ(MOJO_RESULT_OK,
+ local_mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
+
+ // Write to the dispatcher.
+ EXPECT_EQ(MOJO_RESULT_OK,
+ dispatcher->WriteMessage(hello, sizeof(hello), NULL,
+ MOJO_WRITE_MESSAGE_FLAG_NONE));
+
+ // Wait.
+ EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
+ local_mp->RemoveWaiter(1, &waiter);
+
+ // Read from "local_mp", port 1.
+ memset(read_buffer, 0, sizeof(read_buffer));
+ read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
+ MOJO_READ_MESSAGE_FLAG_NONE));
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
+ EXPECT_STREQ(hello, read_buffer);
- // TODO(vtl): Once we can pass the local message pipe handle (over a remote
- // message pipe), this will fail.
- EXPECT_FALSE(read_dispatchers[0].get());
+ // TODO(vtl): Also test that messages queued up before the handle was sent are
+ // delivered properly.
// Close everything that belongs to us.
mp0->Close(0);
mp1->Close(1);
+ EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
// Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
local_mp->Close(1);
}