summaryrefslogtreecommitdiffstats
path: root/mojo/system
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-28 21:55:45 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-28 21:55:45 +0000
commit9646057ea365fd0d0ccdde7d4989517d1e56bdd3 (patch)
tree650c949fbfbf6f536b46e0f9436343e0ae4e9465 /mojo/system
parent45e8f89a95998a616d73121b15da567e06472176 (diff)
downloadchromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.zip
chromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.tar.gz
chromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.tar.bz2
Mojo: Refactor the sending of FDs (a.k.a. PlatformHandles) on POSIX.
This should (hopefully) solve some correctness bugs. (The semantics of sending control messages with FDs on a SOCK_STREAM Unix domain socket are somewhat tricky. In particular, if you do a sendmsg() with some FDs and some data, followed by sending more data (using sendmsg() again, say), then a recvmsg() on the other side may receive the FDs together with data from multiple sendmsg()s, or possibly also incomplete data from the first sendmsg().) Separately: I'll add more careful tests , and also re-enable MultiprocessMessagePipeTest.PlatformHandlePassing on Mac -- which caught the bug. R=yzshen@chromium.org Review URL: https://codereview.chromium.org/294393014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@273367 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r--mojo/system/message_in_transit.cc14
-rw-r--r--mojo/system/message_in_transit.h10
-rw-r--r--mojo/system/raw_channel.cc77
-rw-r--r--mojo/system/raw_channel.h15
-rw-r--r--mojo/system/raw_channel_posix.cc108
-rw-r--r--mojo/system/transport_data.cc21
-rw-r--r--mojo/system/transport_data.h14
7 files changed, 195 insertions, 64 deletions
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc
index f7afa3b..944e12b 100644
--- a/mojo/system/message_in_transit.cc
+++ b/mojo/system/message_in_transit.cc
@@ -20,6 +20,8 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
MessageInTransit::kTypeMessagePipe;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
MessageInTransit::kTypeChannel;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type
+ MessageInTransit::kTypeRawChannel;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
MessageInTransit::kSubtypeMessagePipeEndpointData;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
@@ -28,6 +30,8 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck;
+STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype
+ MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles;
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
MessageInTransit::kInvalidEndpointId;
STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment;
@@ -158,6 +162,7 @@ void MessageInTransit::SetDispatchers(
scoped_ptr<DispatcherVector> dispatchers) {
DCHECK(dispatchers);
DCHECK(!dispatchers_);
+ DCHECK(!transport_data_);
dispatchers_ = dispatchers.Pass();
#ifndef NDEBUG
@@ -166,6 +171,15 @@ void MessageInTransit::SetDispatchers(
#endif
}
+void MessageInTransit::SetTransportData(
+ scoped_ptr<TransportData> transport_data) {
+ DCHECK(transport_data);
+ DCHECK(!transport_data_);
+ DCHECK(!dispatchers_);
+
+ transport_data_ = transport_data.Pass();
+}
+
void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
DCHECK(channel);
DCHECK(!transport_data_);
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 66bb456..5a4a614 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -46,8 +46,10 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
static const Type kTypeMessagePipeEndpoint = 0;
// Messages that are forwarded to |MessagePipe|s.
static const Type kTypeMessagePipe = 1;
- // Messages that are consumed by the channel.
+ // Messages that are consumed by the |Channel|.
static const Type kTypeChannel = 2;
+ // Messages that are consumed by the |RawChannel| (implementation).
+ static const Type kTypeRawChannel = 3;
typedef uint16_t Subtype;
// Subtypes for type |kTypeMessagePipeEndpoint|:
@@ -58,6 +60,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
static const Subtype kSubtypeChannelRunMessagePipeEndpoint = 0;
static const Subtype kSubtypeChannelRemoveMessagePipeEndpoint = 1;
static const Subtype kSubtypeChannelRemoveMessagePipeEndpointAck = 2;
+ // Subtypes for type |kTypeRawChannel|:
+ static const Subtype kSubtypeRawChannelPosixExtraPlatformHandles = 0;
typedef uint32_t EndpointId;
// Never a valid endpoint ID.
@@ -152,6 +156,10 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// message must not already have dispatchers.
void SetDispatchers(scoped_ptr<DispatcherVector> dispatchers);
+ // Sets the |TransportData| for this message. This should only be done when
+ // there are no dispatchers and no existing |TransportData|.
+ void SetTransportData(scoped_ptr<TransportData> transport_data);
+
// Serializes any dispatchers to the secondary buffer. This message must not
// already have a secondary buffer (so this must only be called once). The
// caller must ensure (e.g., by holding on to a reference) that |channel|
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
index 6fffa86..3aa3ca2 100644
--- a/mojo/system/raw_channel.cc
+++ b/mojo/system/raw_channel.cc
@@ -130,6 +130,10 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
return;
}
+ // TODO(vtl): We could actually send out buffers from multiple messages, with
+ // the "stopping" condition being reaching a message with platform handles
+ // attached.
+
// Write from both buffers.
DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
transport_data_buffer_size);
@@ -311,39 +315,48 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
return;
}
- embedder::ScopedPlatformHandleVectorPtr platform_handles;
- if (message_view.transport_data_buffer()) {
- size_t num_platform_handles;
- const void* platform_handle_table;
- TransportData::GetPlatformHandleTable(
- message_view.transport_data_buffer(),
- &num_platform_handles,
- &platform_handle_table);
-
- if (num_platform_handles > 0) {
- platform_handles =
- GetReadPlatformHandles(num_platform_handles,
- platform_handle_table).Pass();
- if (!platform_handles) {
- LOG(WARNING) << "Invalid number of platform handles received";
- read_stopped_ = true;
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
- return;
+ if (message_view.type() == MessageInTransit::kTypeRawChannel) {
+ if (!OnReadMessageForRawChannel(message_view)) {
+ read_stopped_ = true;
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
+ return;
+ }
+ } else {
+ embedder::ScopedPlatformHandleVectorPtr platform_handles;
+ if (message_view.transport_data_buffer()) {
+ size_t num_platform_handles;
+ const void* platform_handle_table;
+ TransportData::GetPlatformHandleTable(
+ message_view.transport_data_buffer(),
+ &num_platform_handles,
+ &platform_handle_table);
+
+ if (num_platform_handles > 0) {
+ platform_handles =
+ GetReadPlatformHandles(num_platform_handles,
+ platform_handle_table).Pass();
+ if (!platform_handles) {
+ LOG(WARNING) << "Invalid number of platform handles received";
+ read_stopped_ = true;
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
+ return;
+ }
}
}
- }
- // TODO(vtl): In the case that we aren't expecting any platform handles,
- // for the POSIX implementation, we should confirm that none are stored.
+ // TODO(vtl): In the case that we aren't expecting any platform handles,
+ // for the POSIX implementation, we should confirm that none are stored.
- // Dispatch the message.
- DCHECK(delegate_);
- delegate_->OnReadMessage(message_view, platform_handles.Pass());
- if (read_stopped_) {
- // |Shutdown()| was called in |OnReadMessage()|.
- // TODO(vtl): Add test for this case.
- return;
+ // Dispatch the message.
+ DCHECK(delegate_);
+ delegate_->OnReadMessage(message_view, platform_handles.Pass());
+ if (read_stopped_) {
+ // |Shutdown()| was called in |OnReadMessage()|.
+ // TODO(vtl): Add test for this case.
+ return;
+ }
}
+
did_dispatch_message = true;
// Update our state.
@@ -418,6 +431,14 @@ void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
write_buffer_->message_queue_.push_back(message.release());
}
+bool RawChannel::OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) {
+ // No non-implementation specific |RawChannel| control messages.
+ LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
+ << ")";
+ return false;
+}
+
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
// TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index 8021e2f..59600bf 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -89,8 +89,10 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// This must be called (on the I/O thread) before this object is destroyed.
void Shutdown();
- // Writes the given message (or schedules it to be written). This is
- // thread-safe. Returns true on success.
+ // Writes the given message (or schedules it to be written). |message| must
+ // have no |Dispatcher|s still attached (i.e.,
+ // |SerializeAndCloseDispatchers()| should have been called). This method is
+ // thread-safe and may be called from any thread. Returns true on success.
bool WriteMessage(scoped_ptr<MessageInTransit> message);
// Returns true if the write buffer is empty (i.e., all messages written using
@@ -209,6 +211,15 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// called (on any thread) with |write_lock_| held.
virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message);
+ // Handles any control messages targeted to the |RawChannel| (or
+ // implementation subclass). Implementation subclasses may override this to
+ // handle any implementation-specific control messages, but should call
+ // |RawChannel::OnReadMessageForRawChannel()| for any remaining messages.
+ // Returns true on success and false on error (e.g., invalid control message).
+ // This is only called on the I/O thread.
+ virtual bool OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view);
+
// Reads into |read_buffer()|.
// This class guarantees that:
// - the area indicated by |GetBuffer()| will stay valid until read completion
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index cc6324d..245ad81 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -41,10 +41,13 @@ class RawChannelPosix : public RawChannel,
private:
// |RawChannel| protected methods:
- // Actually override |EnqueueMessageNoLock()| so that we can send multiple
- // messages with FDs if necessary.
+ // Actually override this so that we can send multiple messages with (only)
+ // FDs if necessary.
virtual void EnqueueMessageNoLock(
scoped_ptr<MessageInTransit> message) OVERRIDE;
+ // Override this to handle those extra FD-only messages.
+ virtual bool OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) OVERRIDE;
virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE;
virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
@@ -118,11 +121,57 @@ size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
void RawChannelPosix::EnqueueMessageNoLock(
scoped_ptr<MessageInTransit> message) {
- // TODO(vtl): Split any message with too many platform handles into multiple
- // messages.
+ if (message->transport_data()) {
+ embedder::PlatformHandleVector* const platform_handles =
+ message->transport_data()->platform_handles();
+ if (platform_handles &&
+ platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) {
+ // We can't attach all the FDs to a single message, so we have to "split"
+ // the message. Send as many control messages as needed first with FDs
+ // attached (and no data).
+ size_t i = 0;
+ for (; platform_handles->size() - i >
+ embedder::kPlatformChannelMaxNumHandles;
+ i += embedder::kPlatformChannelMaxNumHandles) {
+ scoped_ptr<MessageInTransit> fd_message(
+ new MessageInTransit(
+ MessageInTransit::kTypeRawChannel,
+ MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles,
+ 0,
+ NULL));
+ embedder::ScopedPlatformHandleVectorPtr fds(
+ new embedder::PlatformHandleVector(
+ platform_handles->begin() + i,
+ platform_handles->begin() + i +
+ embedder::kPlatformChannelMaxNumHandles));
+ fd_message->SetTransportData(
+ make_scoped_ptr(new TransportData(fds.Pass())));
+ RawChannel::EnqueueMessageNoLock(fd_message.Pass());
+ }
+
+ // Remove the handles that we "moved" into the other messages.
+ platform_handles->erase(platform_handles->begin(),
+ platform_handles->begin() + i);
+ }
+ }
+
RawChannel::EnqueueMessageNoLock(message.Pass());
}
+bool RawChannelPosix::OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) {
+ DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel);
+
+ if (message_view.subtype() ==
+ MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) {
+ // We don't need to do anything. |RawChannel| won't extract the platform
+ // handles, and they'll be accumulated in |Read()|.
+ return true;
+ }
+
+ return RawChannel::OnReadMessageForRawChannel(message_view);
+}
+
RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(!pending_read_);
@@ -141,11 +190,6 @@ RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
embedder::kPlatformChannelMaxNumHandles);
- if (read_result != 1) {
- LOG(WARNING) << "Invalid control message with platform handles";
- return IO_FAILED;
- }
-
// We should never accumulate more than |TransportData::kMaxPlatformHandles
// + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
// possible because we could have accumulated all the handles for a message,
@@ -158,9 +202,6 @@ RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
read_platform_handles_.clear();
return IO_FAILED;
}
-
- // This wasn't a data read (platform handles just get stashed).
- return ScheduleRead();
}
if (read_result > 0) {
@@ -218,33 +259,39 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock(
DCHECK(!pending_write_);
+ size_t num_platform_handles = 0;
+ ssize_t write_result;
if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
- size_t num_platform_handles;
embedder::PlatformHandle* platform_handles;
void* serialization_data; // Actually unused.
write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles,
&platform_handles,
&serialization_data);
DCHECK_GT(num_platform_handles, 0u);
+ DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles);
DCHECK(platform_handles);
- DCHECK(serialization_data);
-
- size_t num_to_send = std::min(num_platform_handles,
- embedder::kPlatformChannelMaxNumHandles);
- bool succeeded = embedder::PlatformChannelSendHandles(fd_.get(),
- platform_handles,
- num_to_send);
- if (succeeded) {
- *platform_handles_written = num_to_send;
- *bytes_written = 0;
- return IO_SUCCEEDED;
+
+ // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
}
+
+ write_result = embedder::PlatformChannelSendmsgWithHandles(
+ fd_.get(), iov, buffer_count, platform_handles, num_platform_handles);
+ for (size_t i = 0; i < num_platform_handles; i++)
+ platform_handles[i].CloseIfNecessary();
} else {
std::vector<WriteBuffer::Buffer> buffers;
write_buffer_no_lock()->GetBuffers(&buffers);
DCHECK(!buffers.empty());
- ssize_t write_result;
if (buffers.size() == 1) {
write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr,
buffers[0].size);
@@ -252,7 +299,6 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock(
const size_t kMaxBufferCount = 10;
iovec iov[kMaxBufferCount];
size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
-
for (size_t i = 0; i < buffer_count; ++i) {
iov[i].iov_base = const_cast<char*>(buffers[i].addr);
iov[i].iov_len = buffers[i].size;
@@ -261,12 +307,12 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock(
write_result = embedder::PlatformChannelWritev(fd_.get(), iov,
buffer_count);
}
+ }
- if (write_result >= 0) {
- *platform_handles_written = 0;
- *bytes_written = static_cast<size_t>(write_result);
- return IO_SUCCEEDED;
- }
+ if (write_result >= 0) {
+ *platform_handles_written = num_platform_handles;
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
}
if (errno != EAGAIN && errno != EWOULDBLOCK) {
diff --git a/mojo/system/transport_data.cc b/mojo/system/transport_data.cc
index d7b4e5f..10a3228e 100644
--- a/mojo/system/transport_data.cc
+++ b/mojo/system/transport_data.cc
@@ -63,8 +63,7 @@ struct TransportData::PrivateStructForCompileAsserts {
};
TransportData::TransportData(scoped_ptr<DispatcherVector> dispatchers,
- Channel* channel)
- : buffer_size_(0) {
+ Channel* channel) {
DCHECK(dispatchers);
DCHECK(channel);
@@ -193,6 +192,17 @@ TransportData::TransportData(scoped_ptr<DispatcherVector> dispatchers,
// |dispatchers_| will be destroyed as it goes out of scope.
}
+#if defined(OS_POSIX)
+TransportData::TransportData(
+ embedder::ScopedPlatformHandleVectorPtr platform_handles)
+ : buffer_size_(sizeof(Header)),
+ platform_handles_(platform_handles.Pass()) {
+ buffer_.reset(static_cast<char*>(
+ base::AlignedAlloc(buffer_size_, MessageInTransit::kMessageAlignment)));
+ memset(buffer_.get(), 0, buffer_size_);
+}
+#endif // defined(OS_POSIX)
+
TransportData::~TransportData() {
}
@@ -212,8 +222,15 @@ const char* TransportData::ValidateBuffer(
const Header* header = static_cast<const Header*>(buffer);
const size_t num_handles = header->num_handles;
+
+#if !defined(OS_POSIX)
+ // On POSIX, we send control messages with platform handles (but no handles)
+ // attached (see the comments for
+ // |TransportData(embedder::ScopedPlatformHandleVectorPtr)|. (This check isn't
+ // important security-wise anyway.)
if (num_handles == 0)
return "Message has no handles attached, but secondary buffer present";
+#endif
// Sanity-check |num_handles| (before multiplying it against anything).
if (num_handles > kMaxMessageNumHandles)
diff --git a/mojo/system/transport_data.h b/mojo/system/transport_data.h
index 8215b7c..ddaa20b 100644
--- a/mojo/system/transport_data.h
+++ b/mojo/system/transport_data.h
@@ -12,6 +12,7 @@
#include "base/macros.h"
#include "base/memory/aligned_memory.h"
#include "base/memory/scoped_ptr.h"
+#include "build/build_config.h"
#include "mojo/embedder/platform_handle.h"
#include "mojo/embedder/platform_handle_vector.h"
#include "mojo/system/dispatcher.h"
@@ -20,6 +21,8 @@
namespace mojo {
namespace system {
+class Channel;
+
// This class is used by |MessageInTransit| to represent handles (|Dispatcher|s)
// in various stages of serialization.
//
@@ -87,6 +90,17 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData {
static const size_t kMaxPlatformHandles;
TransportData(scoped_ptr<DispatcherVector> dispatchers, Channel* channel);
+
+#if defined(OS_POSIX)
+ // This is a hacky POSIX-only constructor to directly attach only platform
+ // handles to a message, used by |RawChannelPosix| to split messages with too
+ // many platform handles into multiple messages. |Header| will be present, but
+ // be zero. (No other information will be attached, and
+ // |RawChannel::GetSerializedPlatformHandleSize()| should return zero.)
+ explicit TransportData(
+ embedder::ScopedPlatformHandleVectorPtr platform_handles);
+#endif
+
~TransportData();
const void* buffer() const { return buffer_.get(); }