summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-12 10:32:09 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-12 10:32:09 +0000
commit13d073cd6ee741112c278197c8f0ad5d7ca63f57 (patch)
treecf2623e6e3d9311863c48cc89deb62b2754a4f14 /mojo
parente6f9c69af889109deb27caee726c506a36984f51 (diff)
downloadchromium_src-13d073cd6ee741112c278197c8f0ad5d7ca63f57.zip
chromium_src-13d073cd6ee741112c278197c8f0ad5d7ca63f57.tar.gz
chromium_src-13d073cd6ee741112c278197c8f0ad5d7ca63f57.tar.bz2
Mojo: More plumbing to support sending handles over MessagePipes.
R=darin Review URL: https://codereview.chromium.org/68993005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@234475 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo')
-rw-r--r--mojo/system/channel.cc4
-rw-r--r--mojo/system/dispatcher.cc2
-rw-r--r--mojo/system/dispatcher.h6
-rw-r--r--mojo/system/local_message_pipe_endpoint.cc18
-rw-r--r--mojo/system/local_message_pipe_endpoint.h12
-rw-r--r--mojo/system/message_pipe.cc19
-rw-r--r--mojo/system/message_pipe.h4
-rw-r--r--mojo/system/message_pipe_dispatcher.cc12
-rw-r--r--mojo/system/message_pipe_endpoint.cc3
-rw-r--r--mojo/system/message_pipe_endpoint.h17
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.cc15
-rw-r--r--mojo/system/proxy_message_pipe_endpoint.h4
12 files changed, 74 insertions, 42 deletions
diff --git a/mojo/system/channel.cc b/mojo/system/channel.cc
index 095e44c..1160d73 100644
--- a/mojo/system/channel.cc
+++ b/mojo/system/channel.cc
@@ -186,8 +186,8 @@ void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
MessageInTransit* own_message = MessageInTransit::Create(
message.type(), message.subtype(), message.data(), message.data_size());
if (endpoint_info.message_pipe->EnqueueMessage(
- MessagePipe::GetPeerPort(endpoint_info.port),
- own_message) != MOJO_RESULT_OK) {
+ MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) !=
+ MOJO_RESULT_OK) {
HandleLocalError(base::StringPrintf(
"Failed to enqueue message to local destination ID %u",
static_cast<unsigned>(local_id)));
diff --git a/mojo/system/dispatcher.cc b/mojo/system/dispatcher.cc
index bb3327f..a2c5126 100644
--- a/mojo/system/dispatcher.cc
+++ b/mojo/system/dispatcher.cc
@@ -38,7 +38,7 @@ MojoResult Dispatcher::ReadMessage(
uint32_t max_num_dispatchers,
std::vector<scoped_refptr<Dispatcher> >* dispatchers,
MojoReadMessageFlags flags) {
- DCHECK(max_num_dispatchers == 0 || !!dispatchers);
+ DCHECK(max_num_dispatchers == 0 || (dispatchers && dispatchers->empty()));
base::AutoLock locker(lock_);
if (is_closed_)
diff --git a/mojo/system/dispatcher.h b/mojo/system/dispatcher.h
index d1721d6..abd1ed1 100644
--- a/mojo/system/dispatcher.h
+++ b/mojo/system/dispatcher.h
@@ -42,9 +42,9 @@ class MOJO_SYSTEM_EXPORT Dispatcher :
MojoResult WriteMessage(const void* bytes, uint32_t num_bytes,
const std::vector<Dispatcher*>* dispatchers,
MojoWriteMessageFlags flags);
- // |dispatchers| must be non-null if |max_num_dispatchers| is nonzero. On
- // success, it will be set to the dispatchers to be received (and assigned
- // handles) as part of the message.
+ // |dispatchers| must be non-null but empty, if |max_num_dispatchers| is
+ // nonzero. On success, it will be set to the dispatchers to be received (and
+ // assigned handles) as part of the message.
MojoResult ReadMessage(
void* bytes, uint32_t* num_bytes,
uint32_t max_num_dispatchers,
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc
index 8f1cdf3..2886a1d 100644
--- a/mojo/system/local_message_pipe_endpoint.cc
+++ b/mojo/system/local_message_pipe_endpoint.cc
@@ -51,10 +51,18 @@ bool LocalMessagePipeEndpoint::OnPeerClose() {
return true;
}
-MojoResult LocalMessagePipeEndpoint::EnqueueMessage(MessageInTransit* message) {
+MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) {
DCHECK(is_open_);
DCHECK(is_peer_open_);
+ // TODO(vtl)
+ if (dispatchers) {
+ message->Destroy();
+ return MOJO_RESULT_UNIMPLEMENTED;
+ }
+
bool was_empty = message_queue_.empty();
message_queue_.push_back(message);
if (was_empty) {
@@ -70,9 +78,11 @@ void LocalMessagePipeEndpoint::CancelAllWaiters() {
waiter_list_.CancelAllWaiters();
}
+// TODO(vtl): Support receiving handles.
MojoResult LocalMessagePipeEndpoint::ReadMessage(
void* bytes, uint32_t* num_bytes,
- MojoHandle* handles, uint32_t* num_handles,
+ uint32_t max_num_dispatchers,
+ std::vector<scoped_refptr<Dispatcher> >* dispatchers,
MojoReadMessageFlags flags) {
DCHECK(is_open_);
@@ -96,10 +106,6 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage(
else
not_enough_space = true;
- // TODO(vtl): Support receiving handles.
- if (num_handles)
- *num_handles = 0;
-
if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
message_queue_.pop_front();
message->Destroy();
diff --git a/mojo/system/local_message_pipe_endpoint.h b/mojo/system/local_message_pipe_endpoint.h
index 7e8703d..99ebec2 100644
--- a/mojo/system/local_message_pipe_endpoint.h
+++ b/mojo/system/local_message_pipe_endpoint.h
@@ -25,14 +25,18 @@ class MOJO_SYSTEM_EXPORT LocalMessagePipeEndpoint : public MessagePipeEndpoint {
// |MessagePipeEndpoint| implementation:
virtual void Close() OVERRIDE;
virtual bool OnPeerClose() OVERRIDE;
- virtual MojoResult EnqueueMessage(MessageInTransit* message) OVERRIDE;
+ virtual MojoResult EnqueueMessage(
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
// There's a dispatcher for |LocalMessagePipeEndpoint|s, so we have to
// implement/override these:
virtual void CancelAllWaiters() OVERRIDE;
- virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
- MojoHandle* handles, uint32_t* num_handles,
- MojoReadMessageFlags flags) OVERRIDE;
+ virtual MojoResult ReadMessage(
+ void* bytes, uint32_t* num_bytes,
+ uint32_t max_num_dispatchers,
+ std::vector<scoped_refptr<Dispatcher> >* dispatchers,
+ MojoReadMessageFlags flags) OVERRIDE;
virtual MojoResult AddWaiter(Waiter* waiter,
MojoWaitFlags flags,
MojoResult wake_result) OVERRIDE;
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index 7f95a90..b964352 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -64,7 +64,7 @@ void MessagePipe::Close(unsigned port) {
MojoResult MessagePipe::WriteMessage(
unsigned port,
const void* bytes, uint32_t num_bytes,
- const std::vector<Dispatcher*>* /*dispatchers*/,
+ const std::vector<Dispatcher*>* dispatchers,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
return EnqueueMessage(
@@ -72,7 +72,8 @@ MojoResult MessagePipe::WriteMessage(
MessageInTransit::Create(
MessageInTransit::kTypeMessagePipeEndpoint,
MessageInTransit::kSubtypeMessagePipeEndpointData,
- bytes, num_bytes));
+ bytes, num_bytes),
+ dispatchers);
}
MojoResult MessagePipe::ReadMessage(
@@ -87,7 +88,7 @@ MojoResult MessagePipe::ReadMessage(
DCHECK(endpoints_[port].get());
return endpoints_[port]->ReadMessage(bytes, num_bytes,
- NULL, NULL,
+ max_num_dispatchers, dispatchers,
flags);
}
@@ -112,13 +113,17 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
endpoints_[port]->RemoveWaiter(waiter);
}
-MojoResult MessagePipe::EnqueueMessage(unsigned port,
- MessageInTransit* message) {
+MojoResult MessagePipe::EnqueueMessage(
+ unsigned port,
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) {
DCHECK(port == 0 || port == 1);
DCHECK(message);
- if (message->type() == MessageInTransit::kTypeMessagePipe)
+ if (message->type() == MessageInTransit::kTypeMessagePipe) {
+ DCHECK(!dispatchers);
return HandleControlMessage(port, message);
+ }
DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
@@ -131,7 +136,7 @@ MojoResult MessagePipe::EnqueueMessage(unsigned port,
return MOJO_RESULT_FAILED_PRECONDITION;
}
- return endpoints_[port]->EnqueueMessage(message);
+ return endpoints_[port]->EnqueueMessage(message, dispatchers);
}
void MessagePipe::Attach(unsigned port,
diff --git a/mojo/system/message_pipe.h b/mojo/system/message_pipe.h
index fc5ec63..e5fab3d 100644
--- a/mojo/system/message_pipe.h
+++ b/mojo/system/message_pipe.h
@@ -68,7 +68,9 @@ class MOJO_SYSTEM_EXPORT MessagePipe :
// messages (typically to a |LocalMessagePipeEndpoint|). Unlike
// |WriteMessage()|, |port| is the *destination* port. Takes ownership of
// |message|.
- MojoResult EnqueueMessage(unsigned port, MessageInTransit* message);
+ MojoResult EnqueueMessage(unsigned port,
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers);
// These are used by |Channel|.
void Attach(unsigned port,
diff --git a/mojo/system/message_pipe_dispatcher.cc b/mojo/system/message_pipe_dispatcher.cc
index df4def1..e0270f8 100644
--- a/mojo/system/message_pipe_dispatcher.cc
+++ b/mojo/system/message_pipe_dispatcher.cc
@@ -45,6 +45,9 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
const void* bytes, uint32_t num_bytes,
const std::vector<Dispatcher*>* dispatchers,
MojoWriteMessageFlags flags) {
+ DCHECK(!dispatchers || (dispatchers->size() > 0 &&
+ dispatchers->size() <= kMaxMessageNumHandles));
+
lock().AssertAcquired();
if (!VerifyUserPointer<void>(bytes, num_bytes))
@@ -52,15 +55,6 @@ MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
if (num_bytes > kMaxMessageNumBytes)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
- if (dispatchers) {
- DCHECK_GT(dispatchers->size(), 0u);
- DCHECK_LE(dispatchers->size(), kMaxMessageNumHandles);
-
- // TODO(vtl)
- NOTIMPLEMENTED();
- return MOJO_RESULT_UNIMPLEMENTED;
- }
-
return message_pipe_->WriteMessage(port_,
bytes, num_bytes,
dispatchers,
diff --git a/mojo/system/message_pipe_endpoint.cc b/mojo/system/message_pipe_endpoint.cc
index 977ee2b..c2e387a 100644
--- a/mojo/system/message_pipe_endpoint.cc
+++ b/mojo/system/message_pipe_endpoint.cc
@@ -16,7 +16,8 @@ void MessagePipeEndpoint::CancelAllWaiters() {
MojoResult MessagePipeEndpoint::ReadMessage(
void* /*bytes*/, uint32_t* /*num_bytes*/,
- MojoHandle* /*handles*/, uint32_t* /*num_handles*/,
+ uint32_t /*max_num_dispatchers*/,
+ std::vector<scoped_refptr<Dispatcher> >* /*dispatchers*/,
MojoReadMessageFlags /*flags*/) {
NOTREACHED();
return MOJO_RESULT_INTERNAL;
diff --git a/mojo/system/message_pipe_endpoint.h b/mojo/system/message_pipe_endpoint.h
index 14e9897..00c67e3 100644
--- a/mojo/system/message_pipe_endpoint.h
+++ b/mojo/system/message_pipe_endpoint.h
@@ -5,6 +5,10 @@
#ifndef MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_
#define MOJO_SYSTEM_MESSAGE_PIPE_ENDPOINT_H_
+#include <stdint.h>
+
+#include <vector>
+
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "mojo/public/system/core.h"
@@ -15,6 +19,7 @@ namespace mojo {
namespace system {
class Channel;
+class Dispatcher;
class Waiter;
// This is an interface to one of the ends of a message pipe, and is used by
@@ -34,7 +39,9 @@ class MOJO_SYSTEM_EXPORT MessagePipeEndpoint {
// Returns false if the endpoint should be closed and destroyed, else true.
virtual bool OnPeerClose() = 0;
// Takes ownership of |message|.
- virtual MojoResult EnqueueMessage(MessageInTransit* message) = 0;
+ virtual MojoResult EnqueueMessage(
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) = 0;
// Implementations must override these if they represent a local endpoint,
// i.e., one for which there's a |MessagePipeDispatcher| (and thus a handle).
@@ -45,9 +52,11 @@ class MOJO_SYSTEM_EXPORT MessagePipeEndpoint {
// though |MessagePipe|'s implementation may have to do a little more if the
// operation involves both endpoints.
virtual void CancelAllWaiters();
- virtual MojoResult ReadMessage(void* bytes, uint32_t* num_bytes,
- MojoHandle* handles, uint32_t* num_handles,
- MojoReadMessageFlags flags);
+ virtual MojoResult ReadMessage(
+ void* bytes, uint32_t* num_bytes,
+ uint32_t max_num_dispatchers,
+ std::vector<scoped_refptr<Dispatcher> >* dispatchers,
+ MojoReadMessageFlags flags);
virtual MojoResult AddWaiter(Waiter* waiter,
MojoWaitFlags flags,
MojoResult wake_result);
diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
index fce2e43..f9b9e56 100644
--- a/mojo/system/proxy_message_pipe_endpoint.cc
+++ b/mojo/system/proxy_message_pipe_endpoint.cc
@@ -54,7 +54,7 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() {
if (EnqueueMessage(MessageInTransit::Create(
MessageInTransit::kTypeMessagePipe,
MessageInTransit::kSubtypeMessagePipePeerClosed,
- NULL, 0)) != MOJO_RESULT_OK) {
+ NULL, 0), NULL) != MOJO_RESULT_OK) {
// TODO(vtl): Do something more sensible on error here?
LOG(WARNING) << "Failed to send peer closed control message";
}
@@ -65,13 +65,22 @@ bool ProxyMessagePipeEndpoint::OnPeerClose() {
return !paused_message_queue_.empty();
}
-MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(MessageInTransit* message) {
+MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) {
DCHECK(is_open_);
// If our (local) peer isn't open, we should only be enqueueing our own
// control messages.
DCHECK(is_peer_open_ ||
(message->type() == MessageInTransit::kTypeMessagePipe));
+ // TODO(vtl): Support sending handles over OS pipes.
+ if (dispatchers) {
+ message->Destroy();
+ NOTIMPLEMENTED();
+ return MOJO_RESULT_UNIMPLEMENTED;
+ }
+
MojoResult rv = MOJO_RESULT_OK;
if (is_running()) {
@@ -116,7 +125,7 @@ bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
paused_message_queue_.begin();
it != paused_message_queue_.end();
++it) {
- result = EnqueueMessage(*it);
+ result = EnqueueMessage(*it, NULL);
if (result != MOJO_RESULT_OK) {
// TODO(vtl): Do something more sensible on error here?
LOG(WARNING) << "Failed to send message";
diff --git a/mojo/system/proxy_message_pipe_endpoint.h b/mojo/system/proxy_message_pipe_endpoint.h
index 43b59b4..6b11d26 100644
--- a/mojo/system/proxy_message_pipe_endpoint.h
+++ b/mojo/system/proxy_message_pipe_endpoint.h
@@ -45,7 +45,9 @@ class MOJO_SYSTEM_EXPORT ProxyMessagePipeEndpoint : public MessagePipeEndpoint {
// |MessagePipeEndpoint| implementation:
virtual void Close() OVERRIDE;
virtual bool OnPeerClose() OVERRIDE;
- virtual MojoResult EnqueueMessage(MessageInTransit* message) OVERRIDE;
+ virtual MojoResult EnqueueMessage(
+ MessageInTransit* message,
+ const std::vector<Dispatcher*>* dispatchers) OVERRIDE;
virtual void Attach(scoped_refptr<Channel> channel,
MessageInTransit::EndpointId local_id) OVERRIDE;
virtual bool Run(MessageInTransit::EndpointId remote_id) OVERRIDE;