diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-05-09 21:31:35 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-05-09 21:31:35 +0000 |
commit | a33c75b331607d78dc09b2a89be88076c18e269f (patch) | |
tree | 7aed37c008cb2a6414c09be8bbf8262d01b38025 | |
parent | 6fd8aa75b22c68dcfbc3d6d30d1ce96cc20bf917 (diff) | |
download | chromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.zip chromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.tar.gz chromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.tar.bz2 |
Mojo: (Theoretically) implement sending PlatformHandles (i.e., FDs) across a RawChannel on POSIX.
The read side isn't implemented yet, so I can't test it yet.
R=yzshen@chromium.org
Review URL: https://codereview.chromium.org/279613002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@269412 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | mojo/system/channel_unittest.cc | 3 | ||||
-rw-r--r-- | mojo/system/message_in_transit.h | 1 | ||||
-rw-r--r-- | mojo/system/raw_channel.cc | 123 | ||||
-rw-r--r-- | mojo/system/raw_channel.h | 63 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 103 | ||||
-rw-r--r-- | mojo/system/raw_channel_win.cc | 21 | ||||
-rw-r--r-- | mojo/system/transport_data.h | 17 |
7 files changed, 229 insertions, 102 deletions
diff --git a/mojo/system/channel_unittest.cc b/mojo/system/channel_unittest.cc index 671b251..a96c4d1 100644 --- a/mojo/system/channel_unittest.cc +++ b/mojo/system/channel_unittest.cc @@ -126,7 +126,6 @@ class MockRawChannelOnInitFails : public RawChannel { // |RawChannel| public methods: virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE { - CHECK(false); return 0; } @@ -140,7 +139,7 @@ class MockRawChannelOnInitFails : public RawChannel { CHECK(false); return IO_FAILED; } - virtual IOResult WriteNoLock(size_t*) OVERRIDE { + virtual IOResult WriteNoLock(size_t*, size_t*) OVERRIDE { CHECK(false); return IO_FAILED; } diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index 38c38d4..8b9fbbb 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -163,6 +163,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { // Gets the transport data buffer (if any). const TransportData* transport_data() const { return transport_data_.get(); } + TransportData* transport_data() { return transport_data_.get(); } // Gets the total size of the message (see comment in |Header|, below). size_t total_size() const { return header()->total_size; } diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc index 27a8c28..548f8af 100644 --- a/mojo/system/raw_channel.cc +++ b/mojo/system/raw_channel.cc @@ -21,10 +21,15 @@ namespace system { const size_t kReadSize = 4096; -RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { +// RawChannel::ReadBuffer ------------------------------------------------------ + +RawChannel::ReadBuffer::ReadBuffer() + : buffer_(kReadSize), + num_valid_bytes_(0) { } -RawChannel::ReadBuffer::~ReadBuffer() {} +RawChannel::ReadBuffer::~ReadBuffer() { +} void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); @@ -32,12 +37,62 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { *size = kReadSize; } -RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} +// RawChannel::WriteBuffer ----------------------------------------------------- + +RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) + : serialized_platform_handle_size_(serialized_platform_handle_size), + platform_handles_offset_(0), + data_offset_(0) { +} RawChannel::WriteBuffer::~WriteBuffer() { STLDeleteElements(&message_queue_); } +bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { + if (message_queue_.empty()) + return false; + + const TransportData* transport_data = + message_queue_.front()->transport_data(); + if (!transport_data) + return false; + + const std::vector<embedder::PlatformHandle>* all_platform_handles = + transport_data->platform_handles(); + if (!all_platform_handles) { + DCHECK_EQ(platform_handles_offset_, 0u); + return false; + } + if (platform_handles_offset_ >= all_platform_handles->size()) { + DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); + return false; + } + + return true; +} + +void RawChannel::WriteBuffer::GetPlatformHandlesToSend( + size_t* num_platform_handles, + embedder::PlatformHandle** platform_handles, + void** serialization_data) { + DCHECK(HavePlatformHandlesToSend()); + + TransportData* transport_data = message_queue_.front()->transport_data(); + std::vector<embedder::PlatformHandle>* all_platform_handles = + transport_data->platform_handles(); + *num_platform_handles = + all_platform_handles->size() - platform_handles_offset_; + *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; + size_t serialization_data_offset = + transport_data->platform_handle_table_offset(); + DCHECK_GT(serialization_data_offset, 0u); + serialization_data_offset += + platform_handles_offset_ * serialized_platform_handle_size_; + *serialization_data = + static_cast<char*>(transport_data->buffer()) + serialization_data_offset; +} + void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { buffers->clear(); @@ -45,42 +100,42 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { return; MessageInTransit* message = message_queue_.front(); - DCHECK_LT(offset_, message->total_size()); - size_t bytes_to_write = message->total_size() - offset_; + DCHECK_LT(data_offset_, message->total_size()); + size_t bytes_to_write = message->total_size() - data_offset_; size_t transport_data_buffer_size = message->transport_data() ? message->transport_data()->buffer_size() : 0; if (!transport_data_buffer_size) { // Only write from the main buffer. - DCHECK_LT(offset_, message->main_buffer_size()); + DCHECK_LT(data_offset_, message->main_buffer_size()); DCHECK_LE(bytes_to_write, message->main_buffer_size()); Buffer buffer = { - static_cast<const char*>(message->main_buffer()) + offset_, + static_cast<const char*>(message->main_buffer()) + data_offset_, bytes_to_write}; buffers->push_back(buffer); return; } - if (offset_ >= message->main_buffer_size()) { + if (data_offset_ >= message->main_buffer_size()) { // Only write from the transport data buffer. - DCHECK_LT(offset_ - message->main_buffer_size(), + DCHECK_LT(data_offset_ - message->main_buffer_size(), transport_data_buffer_size); DCHECK_LE(bytes_to_write, transport_data_buffer_size); Buffer buffer = { static_cast<const char*>(message->transport_data()->buffer()) + - (offset_ - message->main_buffer_size()), + (data_offset_ - message->main_buffer_size()), bytes_to_write}; buffers->push_back(buffer); return; } // Write from both buffers. - DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ + + DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + transport_data_buffer_size); Buffer buffer1 = { - static_cast<const char*>(message->main_buffer()) + offset_, - message->main_buffer_size() - offset_ + static_cast<const char*>(message->main_buffer()) + data_offset_, + message->main_buffer_size() - data_offset_ }; buffers->push_back(buffer1); Buffer buffer2 = { @@ -90,6 +145,8 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { buffers->push_back(buffer2); } +// RawChannel ------------------------------------------------------------------ + RawChannel::RawChannel() : message_loop_for_io_(NULL), delegate_(NULL), @@ -123,7 +180,7 @@ bool RawChannel::Init(Delegate* delegate) { DCHECK(!read_buffer_); read_buffer_.reset(new ReadBuffer); DCHECK(!write_buffer_); - write_buffer_.reset(new WriteBuffer); + write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); if (!OnInit()) { delegate_ = NULL; @@ -157,13 +214,6 @@ void RawChannel::Shutdown() { bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { DCHECK(message); - // TODO(vtl) - if (message->transport_data() && - message->transport_data()->has_platform_handles()) { - NOTIMPLEMENTED(); - return false; - } - base::AutoLock locker(write_lock_); if (write_stopped_) return false; @@ -174,14 +224,16 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { } write_buffer_->message_queue_.push_front(message.release()); - DCHECK_EQ(write_buffer_->offset_, 0u); + DCHECK_EQ(write_buffer_->data_offset_, 0u); + size_t platform_handles_written = 0; size_t bytes_written = 0; - IOResult io_result = WriteNoLock(&bytes_written); + IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); if (io_result == IO_PENDING) return true; bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, + platform_handles_written, bytes_written); if (!result) { // Even if we're on the I/O thread, don't call |OnFatalError()| in the @@ -312,7 +364,9 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { } while (io_result != IO_PENDING); } -void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { +void RawChannel::OnWriteCompleted(bool result, + size_t platform_handles_written, + size_t bytes_written) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); bool did_fail = false; @@ -325,7 +379,9 @@ void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { return; } - did_fail = !OnWriteCompletedNoLock(result, bytes_written); + did_fail = !OnWriteCompletedNoLock(result, + platform_handles_written, + bytes_written); } if (did_fail) @@ -339,22 +395,26 @@ void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { delegate_->OnFatalError(fatal_error); } -bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { +bool RawChannel::OnWriteCompletedNoLock(bool result, + size_t platform_handles_written, + size_t bytes_written) { write_lock_.AssertAcquired(); DCHECK(!write_stopped_); DCHECK(!write_buffer_->message_queue_.empty()); if (result) { - write_buffer_->offset_ += bytes_written; + write_buffer_->platform_handles_offset_ += platform_handles_written; + write_buffer_->data_offset_ += bytes_written; MessageInTransit* message = write_buffer_->message_queue_.front(); - if (write_buffer_->offset_ >= message->total_size()) { + if (write_buffer_->data_offset_ >= message->total_size()) { // Complete write. - DCHECK_EQ(write_buffer_->offset_, message->total_size()); + DCHECK_EQ(write_buffer_->data_offset_, message->total_size()); write_buffer_->message_queue_.pop_front(); delete message; - write_buffer_->offset_ = 0; + write_buffer_->platform_handles_offset_ = 0; + write_buffer_->data_offset_ = 0; if (write_buffer_->message_queue_.empty()) return true; @@ -369,7 +429,8 @@ bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { write_stopped_ = true; STLDeleteElements(&write_buffer_->message_queue_); - write_buffer_->offset_ = 0; + write_buffer_->platform_handles_offset_ = 0; + write_buffer_->data_offset_ = 0; return false; } diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h index 1747db5..5eb9d0f 100644 --- a/mojo/system/raw_channel.h +++ b/mojo/system/raw_channel.h @@ -136,9 +136,24 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { size_t size; }; - WriteBuffer(); + explicit WriteBuffer(size_t serialized_platform_handle_size); ~WriteBuffer(); + // Returns true if there are (more) platform handles to be sent (from the + // front of |message_queue_|). + bool HavePlatformHandlesToSend() const; + // Gets platform handles to be sent (from the front of |message_queue_|). + // This should only be called if |HavePlatformHandlesToSend()| returned + // true. There are two components to this: the actual |PlatformHandle|s + // (which should be closed once sent) and any additional serialization + // information (which will be embedded in the message's data; there are + // |GetSerializedPlatformHandleSize()| bytes per handle). Once all platform + // handles have been sent, the message data should be written next (see + // |GetBuffers()|). + void GetPlatformHandlesToSend(size_t* num_platform_handles, + embedder::PlatformHandle** platform_handles, + void** serialization_data); + // Gets buffers to be written. These buffers will always come from the front // of |message_queue_|. Once they are completely written, the front // |MessageInTransit| should be popped (and destroyed); this is done in @@ -148,12 +163,19 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { private: friend class RawChannel; + const size_t serialized_platform_handle_size_; + // TODO(vtl): When C++11 is available, switch this to a deque of // |scoped_ptr|/|unique_ptr|s. std::deque<MessageInTransit*> message_queue_; - // The first message may have been partially sent. |offset_| indicates the - // position in the first message where to start the next write. - size_t offset_; + // Platform handles are sent before the message data, but doing so may + // require several passes. |platform_handles_offset_| indicates the position + // in the first message's vector of platform handles to send next. + size_t platform_handles_offset_; + // The first message's data may have been partially sent. |data_offset_| + // indicates the position in the first message's data to start the next + // write. + size_t data_offset_; DISALLOW_COPY_AND_ASSIGN(WriteBuffer); }; @@ -177,32 +199,33 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // - the method is called on the I/O thread WITHOUT |write_lock_| held. // // The implementing subclass must guarantee that: - // - |bytes_read| is untouched if the method returns values other than - // IO_SUCCEEDED; - // - if the method returns IO_PENDING, |OnReadCompleted()| will be called on + // - |bytes_read| is untouched unless |Read()| returns |IO_SUCCEEDED|; + // - if the method returns |IO_PENDING|, |OnReadCompleted()| will be called on // the I/O thread to report the result, unless |Shutdown()| is called. virtual IOResult Read(size_t* bytes_read) = 0; // Similar to |Read()|, except that the implementing subclass must also // guarantee that the method doesn't succeed synchronously, i.e., it only - // returns IO_FAILED or IO_PENDING. + // returns |IO_FAILED| or |IO_PENDING|. virtual IOResult ScheduleRead() = 0; // Writes contents in |write_buffer_no_lock()|. // This class guarantees that: - // - the area indicated by |GetBuffers()| will stay valid until write - // completion (but please also see the comments for |OnShutdownNoLock()|); + // - the |PlatformHandle|s given by |GetPlatformHandlesToSend()| and the + // buffer(s) given by |GetBuffers()| will remain valid until write + // completion (see also the comments for |OnShutdownNoLock()|); // - a second write is not started if there is a pending write; // - the method is called under |write_lock_|. // // The implementing subclass must guarantee that: - // - |bytes_written| is untouched if the method returns values other than - // IO_SUCCEEDED; - // - if the method returns IO_PENDING, |OnWriteCompleted()| will be called on - // the I/O thread to report the result, unless |Shutdown()| is called. - virtual IOResult WriteNoLock(size_t* bytes_written) = 0; + // - |platform_handles_written| and |bytes_written| are untouched unless + // |WriteNoLock()| returns |IO_SUCCEEDED|; + // - if the method returns |IO_PENDING|, |OnWriteCompleted()| will be called + // on the I/O thread to report the result, unless |Shutdown()| is called. + virtual IOResult WriteNoLock(size_t* platform_handles_written, + size_t* bytes_written) = 0; // Similar to |WriteNoLock()|, except that the implementing subclass must also // guarantee that the method doesn't succeed synchronously, i.e., it only - // returns IO_FAILED or IO_PENDING. + // returns |IO_FAILED| or |IO_PENDING|. virtual IOResult ScheduleWriteNoLock() = 0; // Must be called on the I/O thread WITHOUT |write_lock_| held. @@ -217,7 +240,9 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // Must be called on the I/O thread WITHOUT |write_lock_| held. void OnReadCompleted(bool result, size_t bytes_read); // Must be called on the I/O thread WITHOUT |write_lock_| held. - void OnWriteCompleted(bool result, size_t bytes_written); + void OnWriteCompleted(bool result, + size_t platform_handles_written, + size_t bytes_written); private: // Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O @@ -229,7 +254,9 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel { // false or any error occurs during the method execution, cancels pending // writes and returns false. // Must be called only if |write_stopped_| is false and under |write_lock_|. - bool OnWriteCompletedNoLock(bool result, size_t bytes_written); + bool OnWriteCompletedNoLock(bool result, + size_t platform_handles_written, + size_t bytes_written); // Set in |Init()| and never changed (hence usable on any thread without // locking): diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index 10ef535..1fd4a31 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -40,7 +40,8 @@ class RawChannelPosix : public RawChannel, // |RawChannel| protected methods: virtual IOResult Read(size_t* bytes_read) OVERRIDE; virtual IOResult ScheduleRead() OVERRIDE; - virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; + virtual IOResult WriteNoLock(size_t* platform_handles_written, + size_t* bytes_written) OVERRIDE; virtual IOResult ScheduleWriteNoLock() OVERRIDE; virtual bool OnInit() OVERRIDE; virtual void OnShutdownNoLock( @@ -155,49 +156,66 @@ RawChannel::IOResult RawChannelPosix::ScheduleRead() { return IO_PENDING; } -RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { +RawChannel::IOResult RawChannelPosix::WriteNoLock( + size_t* platform_handles_written, + size_t* bytes_written) { write_lock().AssertAcquired(); DCHECK(!pending_write_); - std::vector<WriteBuffer::Buffer> buffers; - write_buffer_no_lock()->GetBuffers(&buffers); - DCHECK(!buffers.empty()); - - ssize_t write_result = -1; - if (buffers.size() == 1) { - write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr, - buffers[0].size); + if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { + size_t num_platform_handles; + embedder::PlatformHandle* platform_handles; + void* serialization_data; // Actually unused. + write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles, + &platform_handles, + &serialization_data); + DCHECK_GT(num_platform_handles, 0u); + DCHECK(platform_handles); + DCHECK(serialization_data); + + size_t num_to_send = std::min(num_platform_handles, + embedder::kPlatformChannelMaxNumHandles); + bool succeeded = embedder::PlatformChannelSendHandles(fd_.get(), + platform_handles, + num_to_send); + if (succeeded) { + *platform_handles_written = num_to_send; + *bytes_written = 0; + return IO_SUCCEEDED; + } } else { - // Note that using |writev()|/|sendmsg()| is measurably slower than using - // |write()| -- at least in a microbenchmark -- but much faster than using - // multiple |write()|s. (|sendmsg()| is also measurably slightly slower than - // |writev()|.) - // - // On Linux, we need to use |sendmsg()| since it's the only way to suppress - // |SIGPIPE| (on Mac, this is suppressed on the socket itself using - // |setsockopt()|, since |MSG_NOSIGNAL| is not supported -- see - // platform_channel_pair_posix.cc). - const size_t kMaxBufferCount = 10; - iovec iov[kMaxBufferCount]; - size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); - - for (size_t i = 0; i < buffer_count; ++i) { - iov[i].iov_base = const_cast<char*>(buffers[i].addr); - iov[i].iov_len = buffers[i].size; + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); + + ssize_t write_result; + if (buffers.size() == 1) { + write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr, + buffers[0].size); + } else { + const size_t kMaxBufferCount = 10; + iovec iov[kMaxBufferCount]; + size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); + + for (size_t i = 0; i < buffer_count; ++i) { + iov[i].iov_base = const_cast<char*>(buffers[i].addr); + iov[i].iov_len = buffers[i].size; + } + + write_result = embedder::PlatformChannelWritev(fd_.get(), iov, + buffer_count); } - write_result = embedder::PlatformChannelWritev(fd_.get(), iov, - buffer_count); - } - - if (write_result >= 0) { - *bytes_written = static_cast<size_t>(write_result); - return IO_SUCCEEDED; + if (write_result >= 0) { + *platform_handles_written = 0; + *bytes_written = static_cast<size_t>(write_result); + return IO_SUCCEEDED; + } } if (errno != EAGAIN && errno != EWOULDBLOCK) { - PLOG(ERROR) << "write"; + PLOG(ERROR) << "sendmsg/write/writev"; return IO_FAILED; } @@ -220,9 +238,8 @@ RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { return IO_PENDING; } - if (message_loop_for_io()->WatchFileDescriptor( - fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, - write_watcher_.get(), this)) { + if (message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, false, + base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this)) { pending_write_ = true; return IO_PENDING; } @@ -300,6 +317,7 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); IOResult result = IO_FAILED; + size_t platform_handles_written = 0; size_t bytes_written = 0; { base::AutoLock locker(write_lock()); @@ -307,11 +325,14 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK(pending_write_); pending_write_ = false; - result = WriteNoLock(&bytes_written); + result = WriteNoLock(&platform_handles_written, &bytes_written); } - if (result != IO_PENDING) - OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); + if (result != IO_PENDING) { + OnWriteCompleted(result == IO_SUCCEEDED, + platform_handles_written, + bytes_written); + } } void RawChannelPosix::WaitToWrite() { @@ -328,7 +349,7 @@ void RawChannelPosix::WaitToWrite() { DCHECK(pending_write_); pending_write_ = false; } - OnWriteCompleted(false, 0); + OnWriteCompleted(false, 0, 0); } } diff --git a/mojo/system/raw_channel_win.cc b/mojo/system/raw_channel_win.cc index 28c7302..515311e 100644 --- a/mojo/system/raw_channel_win.cc +++ b/mojo/system/raw_channel_win.cc @@ -162,7 +162,8 @@ class RawChannelWin : public RawChannel { // |RawChannel| private methods: virtual IOResult Read(size_t* bytes_read) OVERRIDE; virtual IOResult ScheduleRead() OVERRIDE; - virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; + virtual IOResult WriteNoLock(size_t* platform_handles_written, + size_t* bytes_written) OVERRIDE; virtual IOResult ScheduleWriteNoLock() OVERRIDE; virtual bool OnInit() OVERRIDE; virtual void OnShutdownNoLock( @@ -331,9 +332,9 @@ void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written, if (error != ERROR_SUCCESS) { LOG(ERROR) << "WriteFile: " << logging::SystemErrorCodeToString(error); - owner_->OnWriteCompleted(false, 0); + owner_->OnWriteCompleted(false, 0, 0); } else { - owner_->OnWriteCompleted(true, bytes_written); + owner_->OnWriteCompleted(true, 0, bytes_written); } } @@ -424,12 +425,19 @@ RawChannel::IOResult RawChannelWin::ScheduleRead() { return io_result; } -RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) { +RawChannel::IOResult RawChannelWin::WriteNoLock( + size_t* platform_handles_written, + size_t* bytes_written) { write_lock().AssertAcquired(); DCHECK(io_handler_); DCHECK(!io_handler_->pending_write_no_lock()); + if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { + // TODO(vtl): Implement. + NOTIMPLEMENTED(); + } + std::vector<WriteBuffer::Buffer> buffers; write_buffer_no_lock()->GetBuffers(&buffers); DCHECK(!buffers.empty()); @@ -447,6 +455,7 @@ RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) { } if (result && skip_completion_port_on_success_) { + *platform_handles_written = 0; *bytes_written = bytes_written_dword; return IO_SUCCEEDED; } @@ -470,8 +479,10 @@ RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() { DCHECK(io_handler_); DCHECK(!io_handler_->pending_write_no_lock()); + // TODO(vtl): Do something with |platform_handles_written|. + size_t platform_handles_written = 0; size_t bytes_written = 0; - IOResult io_result = WriteNoLock(&bytes_written); + IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); if (io_result == IO_SUCCEEDED) { DCHECK(skip_completion_port_on_success_); diff --git a/mojo/system/transport_data.h b/mojo/system/transport_data.h index 4dc9238..0f03c81 100644 --- a/mojo/system/transport_data.h +++ b/mojo/system/transport_data.h @@ -83,17 +83,20 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData { ~TransportData(); const void* buffer() const { return buffer_.get(); } + void* buffer() { return buffer_.get(); } size_t buffer_size() const { return buffer_size_; } + uint32_t platform_handle_table_offset() const { + return header()->platform_handle_table_offset; + } + // Gets attached platform-specific handles; this may return null if there are // none. Note that the caller may mutate the set of platform-specific handles. - std::vector<embedder::PlatformHandle>* platform_handles() { + const std::vector<embedder::PlatformHandle>* platform_handles() const { return platform_handles_.get(); } - - // Returns true if there are platform-specific handles attached. - bool has_platform_handles() const { - return platform_handles_ && !platform_handles_->empty(); + std::vector<embedder::PlatformHandle>* platform_handles() { + return platform_handles_.get(); } // Receive-side functions: @@ -143,6 +146,10 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData { // The maximum total number of platform handles that may be attached. static const size_t kMaxPlatformHandles; + const Header* header() const { + return reinterpret_cast<const Header*>(buffer_.get()); + } + size_t buffer_size_; scoped_ptr<char, base::AlignedFreeDeleter> buffer_; // Never null. |