diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-05-28 21:55:45 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-05-28 21:55:45 +0000 |
commit | 9646057ea365fd0d0ccdde7d4989517d1e56bdd3 (patch) | |
tree | 650c949fbfbf6f536b46e0f9436343e0ae4e9465 /mojo/system | |
parent | 45e8f89a95998a616d73121b15da567e06472176 (diff) | |
download | chromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.zip chromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.tar.gz chromium_src-9646057ea365fd0d0ccdde7d4989517d1e56bdd3.tar.bz2 |
Mojo: Refactor the sending of FDs (a.k.a. PlatformHandles) on POSIX.
This should (hopefully) solve some correctness bugs. (The semantics of
sending control messages with FDs on a SOCK_STREAM Unix domain socket
are somewhat tricky. In particular, if you do a sendmsg() with some FDs
and some data, followed by sending more data (using sendmsg() again,
say), then a recvmsg() on the other side may receive the FDs together
with data from multiple sendmsg()s, or possibly also incomplete data
from the first sendmsg().)
Separately: I'll add more careful tests , and also re-enable
MultiprocessMessagePipeTest.PlatformHandlePassing on Mac -- which caught
the bug.
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/294393014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@273367 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r-- | mojo/system/message_in_transit.cc | 14 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 10 | ||||
-rw-r--r-- | mojo/system/raw_channel.cc | 77 | ||||
-rw-r--r-- | mojo/system/raw_channel.h | 15 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 108 | ||||
-rw-r--r-- | mojo/system/transport_data.cc | 21 | ||||
-rw-r--r-- | mojo/system/transport_data.h | 14 |
7 files changed, 195 insertions, 64 deletions
diff --git a/mojo/system/message_in_transit.cc b/mojo/system/message_in_transit.cc index f7afa3b..944e12b 100644 --- a/mojo/system/message_in_transit.cc +++ b/mojo/system/message_in_transit.cc @@ -20,6 +20,8 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type MessageInTransit::kTypeMessagePipe; STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type MessageInTransit::kTypeChannel; +STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Type + MessageInTransit::kTypeRawChannel; STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype MessageInTransit::kSubtypeMessagePipeEndpointData; STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype @@ -28,6 +30,8 @@ STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint; STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck; +STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::Subtype + MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles; STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId MessageInTransit::kInvalidEndpointId; STATIC_CONST_MEMBER_DEFINITION const size_t MessageInTransit::kMessageAlignment; @@ -158,6 +162,7 @@ void MessageInTransit::SetDispatchers( scoped_ptr<DispatcherVector> dispatchers) { DCHECK(dispatchers); DCHECK(!dispatchers_); + DCHECK(!transport_data_); dispatchers_ = dispatchers.Pass(); #ifndef NDEBUG @@ -166,6 +171,15 @@ void MessageInTransit::SetDispatchers( #endif } +void MessageInTransit::SetTransportData( + scoped_ptr<TransportData> transport_data) { + DCHECK(transport_data); + DCHECK(!transport_data_); + DCHECK(!dispatchers_); + + transport_data_ = transport_data.Pass(); +} + void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) { DCHECK(channel); DCHECK(!transport_data_); diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index 66bb456..5a4a614 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -46,8 +46,10 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { static const Type kTypeMessagePipeEndpoint = 0; // Messages that are forwarded to |MessagePipe|s. static const Type kTypeMessagePipe = 1; - // Messages that are consumed by the channel. + // Messages that are consumed by the |Channel|. static const Type kTypeChannel = 2; + // Messages that are consumed by the |RawChannel| (implementation). + static const Type kTypeRawChannel = 3; typedef uint16_t Subtype; // Subtypes for type |kTypeMessagePipeEndpoint|: @@ -58,6 +60,8 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { static const Subtype kSubtypeChannelRunMessagePipeEndpoint = 0; static const Subtype kSubtypeChannelRemoveMessagePipeEndpoint = 1; static const Subtype kSubtypeChannelRemoveMessagePipeEndpointAck = 2; + // Subtypes for type |kTypeRawChannel|: + static const Subtype kSubtypeRawChannelPosixExtraPlatformHandles = 0; typedef uint32_t EndpointId; // Never a valid endpoint ID. @@ -152,6 +156,10 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // message must not already have dispatchers. void SetDispatchers(scoped_ptr<DispatcherVector> dispatchers); + // Sets the |TransportData| for this message. This should only be done when + // there are no dispatchers and no existing |TransportData|. + void SetTransportData(scoped_ptr<TransportData> transport_data); + // Serializes any dispatchers to the secondary buffer. This message must not // already have a secondary buffer (so this must only be called once). The // caller must ensure (e.g., by holding on to a reference) that |channel| diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc index 6fffa86..3aa3ca2 100644 --- a/mojo/system/raw_channel.cc +++ b/mojo/system/raw_channel.cc @@ -130,6 +130,10 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { return; } + // TODO(vtl): We could actually send out buffers from multiple messages, with + // the "stopping" condition being reaching a message with platform handles + // attached. + // Write from both buffers. DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + transport_data_buffer_size); @@ -311,39 +315,48 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { return; } - embedder::ScopedPlatformHandleVectorPtr platform_handles; - if (message_view.transport_data_buffer()) { - size_t num_platform_handles; - const void* platform_handle_table; - TransportData::GetPlatformHandleTable( - message_view.transport_data_buffer(), - &num_platform_handles, - &platform_handle_table); - - if (num_platform_handles > 0) { - platform_handles = - GetReadPlatformHandles(num_platform_handles, - platform_handle_table).Pass(); - if (!platform_handles) { - LOG(WARNING) << "Invalid number of platform handles received"; - read_stopped_ = true; - CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); - return; + if (message_view.type() == MessageInTransit::kTypeRawChannel) { + if (!OnReadMessageForRawChannel(message_view)) { + read_stopped_ = true; + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); + return; + } + } else { + embedder::ScopedPlatformHandleVectorPtr platform_handles; + if (message_view.transport_data_buffer()) { + size_t num_platform_handles; + const void* platform_handle_table; + TransportData::GetPlatformHandleTable( + message_view.transport_data_buffer(), + &num_platform_handles, + &platform_handle_table); + + if (num_platform_handles > 0) { + platform_handles = + GetReadPlatformHandles(num_platform_handles, + platform_handle_table).Pass(); + if (!platform_handles) { + LOG(WARNING) << "Invalid number of platform handles received"; + read_stopped_ = true; + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); + return; + } } } - } - // TODO(vtl): In the case that we aren't expecting any platform handles, - // for the POSIX implementation, we should confirm that none are stored. + // TODO(vtl): In the case that we aren't expecting any platform handles, + // for the POSIX implementation, we should confirm that none are stored. - // Dispatch the message. - DCHECK(delegate_); - delegate_->OnReadMessage(message_view, platform_handles.Pass()); - if (read_stopped_) { - // |Shutdown()| was called in |OnReadMessage()|. - // TODO(vtl): Add test for this case. - return; + // Dispatch the message. + DCHECK(delegate_); + delegate_->OnReadMessage(message_view, platform_handles.Pass()); + if (read_stopped_) { + // |Shutdown()| was called in |OnReadMessage()|. + // TODO(vtl): Add test for this case. + return; + } } + did_dispatch_message = true; // Update our state. @@ -418,6 +431,14 @@ void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { write_buffer_->message_queue_.push_back(message.release()); } +bool RawChannel::OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) { + // No non-implementation specific |RawChannel| control messages. + LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() + << ")"; + return false; +} + void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h index 8021e2f..59600bf 100644 --- a/mojo/system/raw_channel.h +++ b/mojo/system/raw_channel.h @@ -89,8 +89,10 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // This must be called (on the I/O thread) before this object is destroyed. void Shutdown(); - // Writes the given message (or schedules it to be written). This is - // thread-safe. Returns true on success. + // Writes the given message (or schedules it to be written). |message| must + // have no |Dispatcher|s still attached (i.e., + // |SerializeAndCloseDispatchers()| should have been called). This method is + // thread-safe and may be called from any thread. Returns true on success. bool WriteMessage(scoped_ptr<MessageInTransit> message); // Returns true if the write buffer is empty (i.e., all messages written using @@ -209,6 +211,15 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // called (on any thread) with |write_lock_| held. virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message); + // Handles any control messages targeted to the |RawChannel| (or + // implementation subclass). Implementation subclasses may override this to + // handle any implementation-specific control messages, but should call + // |RawChannel::OnReadMessageForRawChannel()| for any remaining messages. + // Returns true on success and false on error (e.g., invalid control message). + // This is only called on the I/O thread. + virtual bool OnReadMessageForRawChannel( + const MessageInTransit::View& message_view); + // Reads into |read_buffer()|. // This class guarantees that: // - the area indicated by |GetBuffer()| will stay valid until read completion diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index cc6324d..245ad81 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -41,10 +41,13 @@ class RawChannelPosix : public RawChannel, private: // |RawChannel| protected methods: - // Actually override |EnqueueMessageNoLock()| so that we can send multiple - // messages with FDs if necessary. + // Actually override this so that we can send multiple messages with (only) + // FDs if necessary. virtual void EnqueueMessageNoLock( scoped_ptr<MessageInTransit> message) OVERRIDE; + // Override this to handle those extra FD-only messages. + virtual bool OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) OVERRIDE; virtual IOResult Read(size_t* bytes_read) OVERRIDE; virtual IOResult ScheduleRead() OVERRIDE; virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( @@ -118,11 +121,57 @@ size_t RawChannelPosix::GetSerializedPlatformHandleSize() const { void RawChannelPosix::EnqueueMessageNoLock( scoped_ptr<MessageInTransit> message) { - // TODO(vtl): Split any message with too many platform handles into multiple - // messages. + if (message->transport_data()) { + embedder::PlatformHandleVector* const platform_handles = + message->transport_data()->platform_handles(); + if (platform_handles && + platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) { + // We can't attach all the FDs to a single message, so we have to "split" + // the message. Send as many control messages as needed first with FDs + // attached (and no data). + size_t i = 0; + for (; platform_handles->size() - i > + embedder::kPlatformChannelMaxNumHandles; + i += embedder::kPlatformChannelMaxNumHandles) { + scoped_ptr<MessageInTransit> fd_message( + new MessageInTransit( + MessageInTransit::kTypeRawChannel, + MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles, + 0, + NULL)); + embedder::ScopedPlatformHandleVectorPtr fds( + new embedder::PlatformHandleVector( + platform_handles->begin() + i, + platform_handles->begin() + i + + embedder::kPlatformChannelMaxNumHandles)); + fd_message->SetTransportData( + make_scoped_ptr(new TransportData(fds.Pass()))); + RawChannel::EnqueueMessageNoLock(fd_message.Pass()); + } + + // Remove the handles that we "moved" into the other messages. + platform_handles->erase(platform_handles->begin(), + platform_handles->begin() + i); + } + } + RawChannel::EnqueueMessageNoLock(message.Pass()); } +bool RawChannelPosix::OnReadMessageForRawChannel( + const MessageInTransit::View& message_view) { + DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel); + + if (message_view.subtype() == + MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) { + // We don't need to do anything. |RawChannel| won't extract the platform + // handles, and they'll be accumulated in |Read()|. + return true; + } + + return RawChannel::OnReadMessageForRawChannel(message_view); +} + RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); DCHECK(!pending_read_); @@ -141,11 +190,6 @@ RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles, embedder::kPlatformChannelMaxNumHandles); - if (read_result != 1) { - LOG(WARNING) << "Invalid control message with platform handles"; - return IO_FAILED; - } - // We should never accumulate more than |TransportData::kMaxPlatformHandles // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is // possible because we could have accumulated all the handles for a message, @@ -158,9 +202,6 @@ RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { read_platform_handles_.clear(); return IO_FAILED; } - - // This wasn't a data read (platform handles just get stashed). - return ScheduleRead(); } if (read_result > 0) { @@ -218,33 +259,39 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock( DCHECK(!pending_write_); + size_t num_platform_handles = 0; + ssize_t write_result; if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { - size_t num_platform_handles; embedder::PlatformHandle* platform_handles; void* serialization_data; // Actually unused. write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles, &platform_handles, &serialization_data); DCHECK_GT(num_platform_handles, 0u); + DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles); DCHECK(platform_handles); - DCHECK(serialization_data); - - size_t num_to_send = std::min(num_platform_handles, - embedder::kPlatformChannelMaxNumHandles); - bool succeeded = embedder::PlatformChannelSendHandles(fd_.get(), - platform_handles, - num_to_send); - if (succeeded) { - *platform_handles_written = num_to_send; - *bytes_written = 0; - return IO_SUCCEEDED; + + // TODO(vtl): Reduce code duplication. (This is duplicated from below.) + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; } + + write_result = embedder::PlatformChannelSendmsgWithHandles( + fd_.get(), iov, buffer_count, platform_handles, num_platform_handles); + for (size_t i = 0; i < num_platform_handles; i++) + platform_handles[i].CloseIfNecessary(); } else { std::vector<WriteBuffer::Buffer> buffers; write_buffer_no_lock()->GetBuffers(&buffers); DCHECK(!buffers.empty()); - ssize_t write_result; if (buffers.size() == 1) { write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr, buffers[0].size); @@ -252,7 +299,6 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock( const size_t kMaxBufferCount = 10; iovec iov[kMaxBufferCount]; size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); - for (size_t i = 0; i < buffer_count; ++i) { iov[i].iov_base = const_cast<char*>(buffers[i].addr); iov[i].iov_len = buffers[i].size; @@ -261,12 +307,12 @@ RawChannel::IOResult RawChannelPosix::WriteNoLock( write_result = embedder::PlatformChannelWritev(fd_.get(), iov, buffer_count); } + } - if (write_result >= 0) { - *platform_handles_written = 0; - *bytes_written = static_cast<size_t>(write_result); - return IO_SUCCEEDED; - } + if (write_result >= 0) { + *platform_handles_written = num_platform_handles; + *bytes_written = static_cast<size_t>(write_result); + return IO_SUCCEEDED; } if (errno != EAGAIN && errno != EWOULDBLOCK) { diff --git a/mojo/system/transport_data.cc b/mojo/system/transport_data.cc index d7b4e5f..10a3228e 100644 --- a/mojo/system/transport_data.cc +++ b/mojo/system/transport_data.cc @@ -63,8 +63,7 @@ struct TransportData::PrivateStructForCompileAsserts { }; TransportData::TransportData(scoped_ptr<DispatcherVector> dispatchers, - Channel* channel) - : buffer_size_(0) { + Channel* channel) { DCHECK(dispatchers); DCHECK(channel); @@ -193,6 +192,17 @@ TransportData::TransportData(scoped_ptr<DispatcherVector> dispatchers, // |dispatchers_| will be destroyed as it goes out of scope. } +#if defined(OS_POSIX) +TransportData::TransportData( + embedder::ScopedPlatformHandleVectorPtr platform_handles) + : buffer_size_(sizeof(Header)), + platform_handles_(platform_handles.Pass()) { + buffer_.reset(static_cast<char*>( + base::AlignedAlloc(buffer_size_, MessageInTransit::kMessageAlignment))); + memset(buffer_.get(), 0, buffer_size_); +} +#endif // defined(OS_POSIX) + TransportData::~TransportData() { } @@ -212,8 +222,15 @@ const char* TransportData::ValidateBuffer( const Header* header = static_cast<const Header*>(buffer); const size_t num_handles = header->num_handles; + +#if !defined(OS_POSIX) + // On POSIX, we send control messages with platform handles (but no handles) + // attached (see the comments for + // |TransportData(embedder::ScopedPlatformHandleVectorPtr)|. (This check isn't + // important security-wise anyway.) if (num_handles == 0) return "Message has no handles attached, but secondary buffer present"; +#endif // Sanity-check |num_handles| (before multiplying it against anything). if (num_handles > kMaxMessageNumHandles) diff --git a/mojo/system/transport_data.h b/mojo/system/transport_data.h index 8215b7c..ddaa20b 100644 --- a/mojo/system/transport_data.h +++ b/mojo/system/transport_data.h @@ -12,6 +12,7 @@ #include "base/macros.h" #include "base/memory/aligned_memory.h" #include "base/memory/scoped_ptr.h" +#include "build/build_config.h" #include "mojo/embedder/platform_handle.h" #include "mojo/embedder/platform_handle_vector.h" #include "mojo/system/dispatcher.h" @@ -20,6 +21,8 @@ namespace mojo { namespace system { +class Channel; + // This class is used by |MessageInTransit| to represent handles (|Dispatcher|s) // in various stages of serialization. // @@ -87,6 +90,17 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData { static const size_t kMaxPlatformHandles; TransportData(scoped_ptr<DispatcherVector> dispatchers, Channel* channel); + +#if defined(OS_POSIX) + // This is a hacky POSIX-only constructor to directly attach only platform + // handles to a message, used by |RawChannelPosix| to split messages with too + // many platform handles into multiple messages. |Header| will be present, but + // be zero. (No other information will be attached, and + // |RawChannel::GetSerializedPlatformHandleSize()| should return zero.) + explicit TransportData( + embedder::ScopedPlatformHandleVectorPtr platform_handles); +#endif + ~TransportData(); const void* buffer() const { return buffer_.get(); } |