summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-09 21:31:35 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-09 21:31:35 +0000
commita33c75b331607d78dc09b2a89be88076c18e269f (patch)
tree7aed37c008cb2a6414c09be8bbf8262d01b38025
parent6fd8aa75b22c68dcfbc3d6d30d1ce96cc20bf917 (diff)
downloadchromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.zip
chromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.tar.gz
chromium_src-a33c75b331607d78dc09b2a89be88076c18e269f.tar.bz2
Mojo: (Theoretically) implement sending PlatformHandles (i.e., FDs) across a RawChannel on POSIX.
The read side isn't implemented yet, so I can't test it yet. R=yzshen@chromium.org Review URL: https://codereview.chromium.org/279613002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@269412 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--mojo/system/channel_unittest.cc3
-rw-r--r--mojo/system/message_in_transit.h1
-rw-r--r--mojo/system/raw_channel.cc123
-rw-r--r--mojo/system/raw_channel.h63
-rw-r--r--mojo/system/raw_channel_posix.cc103
-rw-r--r--mojo/system/raw_channel_win.cc21
-rw-r--r--mojo/system/transport_data.h17
7 files changed, 229 insertions, 102 deletions
diff --git a/mojo/system/channel_unittest.cc b/mojo/system/channel_unittest.cc
index 671b251..a96c4d1 100644
--- a/mojo/system/channel_unittest.cc
+++ b/mojo/system/channel_unittest.cc
@@ -126,7 +126,6 @@ class MockRawChannelOnInitFails : public RawChannel {
// |RawChannel| public methods:
virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE {
- CHECK(false);
return 0;
}
@@ -140,7 +139,7 @@ class MockRawChannelOnInitFails : public RawChannel {
CHECK(false);
return IO_FAILED;
}
- virtual IOResult WriteNoLock(size_t*) OVERRIDE {
+ virtual IOResult WriteNoLock(size_t*, size_t*) OVERRIDE {
CHECK(false);
return IO_FAILED;
}
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 38c38d4..8b9fbbb 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -163,6 +163,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// Gets the transport data buffer (if any).
const TransportData* transport_data() const { return transport_data_.get(); }
+ TransportData* transport_data() { return transport_data_.get(); }
// Gets the total size of the message (see comment in |Header|, below).
size_t total_size() const { return header()->total_size; }
diff --git a/mojo/system/raw_channel.cc b/mojo/system/raw_channel.cc
index 27a8c28..548f8af 100644
--- a/mojo/system/raw_channel.cc
+++ b/mojo/system/raw_channel.cc
@@ -21,10 +21,15 @@ namespace system {
const size_t kReadSize = 4096;
-RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
+// RawChannel::ReadBuffer ------------------------------------------------------
+
+RawChannel::ReadBuffer::ReadBuffer()
+ : buffer_(kReadSize),
+ num_valid_bytes_(0) {
}
-RawChannel::ReadBuffer::~ReadBuffer() {}
+RawChannel::ReadBuffer::~ReadBuffer() {
+}
void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
@@ -32,12 +37,62 @@ void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
*size = kReadSize;
}
-RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
+// RawChannel::WriteBuffer -----------------------------------------------------
+
+RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
+ : serialized_platform_handle_size_(serialized_platform_handle_size),
+ platform_handles_offset_(0),
+ data_offset_(0) {
+}
RawChannel::WriteBuffer::~WriteBuffer() {
STLDeleteElements(&message_queue_);
}
+bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
+ if (message_queue_.empty())
+ return false;
+
+ const TransportData* transport_data =
+ message_queue_.front()->transport_data();
+ if (!transport_data)
+ return false;
+
+ const std::vector<embedder::PlatformHandle>* all_platform_handles =
+ transport_data->platform_handles();
+ if (!all_platform_handles) {
+ DCHECK_EQ(platform_handles_offset_, 0u);
+ return false;
+ }
+ if (platform_handles_offset_ >= all_platform_handles->size()) {
+ DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
+ return false;
+ }
+
+ return true;
+}
+
+void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
+ size_t* num_platform_handles,
+ embedder::PlatformHandle** platform_handles,
+ void** serialization_data) {
+ DCHECK(HavePlatformHandlesToSend());
+
+ TransportData* transport_data = message_queue_.front()->transport_data();
+ std::vector<embedder::PlatformHandle>* all_platform_handles =
+ transport_data->platform_handles();
+ *num_platform_handles =
+ all_platform_handles->size() - platform_handles_offset_;
+ *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
+ size_t serialization_data_offset =
+ transport_data->platform_handle_table_offset();
+ DCHECK_GT(serialization_data_offset, 0u);
+ serialization_data_offset +=
+ platform_handles_offset_ * serialized_platform_handle_size_;
+ *serialization_data =
+ static_cast<char*>(transport_data->buffer()) + serialization_data_offset;
+}
+
void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
buffers->clear();
@@ -45,42 +100,42 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
return;
MessageInTransit* message = message_queue_.front();
- DCHECK_LT(offset_, message->total_size());
- size_t bytes_to_write = message->total_size() - offset_;
+ DCHECK_LT(data_offset_, message->total_size());
+ size_t bytes_to_write = message->total_size() - data_offset_;
size_t transport_data_buffer_size = message->transport_data() ?
message->transport_data()->buffer_size() : 0;
if (!transport_data_buffer_size) {
// Only write from the main buffer.
- DCHECK_LT(offset_, message->main_buffer_size());
+ DCHECK_LT(data_offset_, message->main_buffer_size());
DCHECK_LE(bytes_to_write, message->main_buffer_size());
Buffer buffer = {
- static_cast<const char*>(message->main_buffer()) + offset_,
+ static_cast<const char*>(message->main_buffer()) + data_offset_,
bytes_to_write};
buffers->push_back(buffer);
return;
}
- if (offset_ >= message->main_buffer_size()) {
+ if (data_offset_ >= message->main_buffer_size()) {
// Only write from the transport data buffer.
- DCHECK_LT(offset_ - message->main_buffer_size(),
+ DCHECK_LT(data_offset_ - message->main_buffer_size(),
transport_data_buffer_size);
DCHECK_LE(bytes_to_write, transport_data_buffer_size);
Buffer buffer = {
static_cast<const char*>(message->transport_data()->buffer()) +
- (offset_ - message->main_buffer_size()),
+ (data_offset_ - message->main_buffer_size()),
bytes_to_write};
buffers->push_back(buffer);
return;
}
// Write from both buffers.
- DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
+ DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
transport_data_buffer_size);
Buffer buffer1 = {
- static_cast<const char*>(message->main_buffer()) + offset_,
- message->main_buffer_size() - offset_
+ static_cast<const char*>(message->main_buffer()) + data_offset_,
+ message->main_buffer_size() - data_offset_
};
buffers->push_back(buffer1);
Buffer buffer2 = {
@@ -90,6 +145,8 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
buffers->push_back(buffer2);
}
+// RawChannel ------------------------------------------------------------------
+
RawChannel::RawChannel()
: message_loop_for_io_(NULL),
delegate_(NULL),
@@ -123,7 +180,7 @@ bool RawChannel::Init(Delegate* delegate) {
DCHECK(!read_buffer_);
read_buffer_.reset(new ReadBuffer);
DCHECK(!write_buffer_);
- write_buffer_.reset(new WriteBuffer);
+ write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
if (!OnInit()) {
delegate_ = NULL;
@@ -157,13 +214,6 @@ void RawChannel::Shutdown() {
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
- // TODO(vtl)
- if (message->transport_data() &&
- message->transport_data()->has_platform_handles()) {
- NOTIMPLEMENTED();
- return false;
- }
-
base::AutoLock locker(write_lock_);
if (write_stopped_)
return false;
@@ -174,14 +224,16 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
}
write_buffer_->message_queue_.push_front(message.release());
- DCHECK_EQ(write_buffer_->offset_, 0u);
+ DCHECK_EQ(write_buffer_->data_offset_, 0u);
+ size_t platform_handles_written = 0;
size_t bytes_written = 0;
- IOResult io_result = WriteNoLock(&bytes_written);
+ IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
if (io_result == IO_PENDING)
return true;
bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
+ platform_handles_written,
bytes_written);
if (!result) {
// Even if we're on the I/O thread, don't call |OnFatalError()| in the
@@ -312,7 +364,9 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
} while (io_result != IO_PENDING);
}
-void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
+void RawChannel::OnWriteCompleted(bool result,
+ size_t platform_handles_written,
+ size_t bytes_written) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
bool did_fail = false;
@@ -325,7 +379,9 @@ void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
return;
}
- did_fail = !OnWriteCompletedNoLock(result, bytes_written);
+ did_fail = !OnWriteCompletedNoLock(result,
+ platform_handles_written,
+ bytes_written);
}
if (did_fail)
@@ -339,22 +395,26 @@ void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
delegate_->OnFatalError(fatal_error);
}
-bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
+bool RawChannel::OnWriteCompletedNoLock(bool result,
+ size_t platform_handles_written,
+ size_t bytes_written) {
write_lock_.AssertAcquired();
DCHECK(!write_stopped_);
DCHECK(!write_buffer_->message_queue_.empty());
if (result) {
- write_buffer_->offset_ += bytes_written;
+ write_buffer_->platform_handles_offset_ += platform_handles_written;
+ write_buffer_->data_offset_ += bytes_written;
MessageInTransit* message = write_buffer_->message_queue_.front();
- if (write_buffer_->offset_ >= message->total_size()) {
+ if (write_buffer_->data_offset_ >= message->total_size()) {
// Complete write.
- DCHECK_EQ(write_buffer_->offset_, message->total_size());
+ DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
write_buffer_->message_queue_.pop_front();
delete message;
- write_buffer_->offset_ = 0;
+ write_buffer_->platform_handles_offset_ = 0;
+ write_buffer_->data_offset_ = 0;
if (write_buffer_->message_queue_.empty())
return true;
@@ -369,7 +429,8 @@ bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
write_stopped_ = true;
STLDeleteElements(&write_buffer_->message_queue_);
- write_buffer_->offset_ = 0;
+ write_buffer_->platform_handles_offset_ = 0;
+ write_buffer_->data_offset_ = 0;
return false;
}
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index 1747db5..5eb9d0f 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -136,9 +136,24 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
size_t size;
};
- WriteBuffer();
+ explicit WriteBuffer(size_t serialized_platform_handle_size);
~WriteBuffer();
+ // Returns true if there are (more) platform handles to be sent (from the
+ // front of |message_queue_|).
+ bool HavePlatformHandlesToSend() const;
+ // Gets platform handles to be sent (from the front of |message_queue_|).
+ // This should only be called if |HavePlatformHandlesToSend()| returned
+ // true. There are two components to this: the actual |PlatformHandle|s
+ // (which should be closed once sent) and any additional serialization
+ // information (which will be embedded in the message's data; there are
+ // |GetSerializedPlatformHandleSize()| bytes per handle). Once all platform
+ // handles have been sent, the message data should be written next (see
+ // |GetBuffers()|).
+ void GetPlatformHandlesToSend(size_t* num_platform_handles,
+ embedder::PlatformHandle** platform_handles,
+ void** serialization_data);
+
// Gets buffers to be written. These buffers will always come from the front
// of |message_queue_|. Once they are completely written, the front
// |MessageInTransit| should be popped (and destroyed); this is done in
@@ -148,12 +163,19 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
private:
friend class RawChannel;
+ const size_t serialized_platform_handle_size_;
+
// TODO(vtl): When C++11 is available, switch this to a deque of
// |scoped_ptr|/|unique_ptr|s.
std::deque<MessageInTransit*> message_queue_;
- // The first message may have been partially sent. |offset_| indicates the
- // position in the first message where to start the next write.
- size_t offset_;
+ // Platform handles are sent before the message data, but doing so may
+ // require several passes. |platform_handles_offset_| indicates the position
+ // in the first message's vector of platform handles to send next.
+ size_t platform_handles_offset_;
+ // The first message's data may have been partially sent. |data_offset_|
+ // indicates the position in the first message's data to start the next
+ // write.
+ size_t data_offset_;
DISALLOW_COPY_AND_ASSIGN(WriteBuffer);
};
@@ -177,32 +199,33 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// - the method is called on the I/O thread WITHOUT |write_lock_| held.
//
// The implementing subclass must guarantee that:
- // - |bytes_read| is untouched if the method returns values other than
- // IO_SUCCEEDED;
- // - if the method returns IO_PENDING, |OnReadCompleted()| will be called on
+ // - |bytes_read| is untouched unless |Read()| returns |IO_SUCCEEDED|;
+ // - if the method returns |IO_PENDING|, |OnReadCompleted()| will be called on
// the I/O thread to report the result, unless |Shutdown()| is called.
virtual IOResult Read(size_t* bytes_read) = 0;
// Similar to |Read()|, except that the implementing subclass must also
// guarantee that the method doesn't succeed synchronously, i.e., it only
- // returns IO_FAILED or IO_PENDING.
+ // returns |IO_FAILED| or |IO_PENDING|.
virtual IOResult ScheduleRead() = 0;
// Writes contents in |write_buffer_no_lock()|.
// This class guarantees that:
- // - the area indicated by |GetBuffers()| will stay valid until write
- // completion (but please also see the comments for |OnShutdownNoLock()|);
+ // - the |PlatformHandle|s given by |GetPlatformHandlesToSend()| and the
+ // buffer(s) given by |GetBuffers()| will remain valid until write
+ // completion (see also the comments for |OnShutdownNoLock()|);
// - a second write is not started if there is a pending write;
// - the method is called under |write_lock_|.
//
// The implementing subclass must guarantee that:
- // - |bytes_written| is untouched if the method returns values other than
- // IO_SUCCEEDED;
- // - if the method returns IO_PENDING, |OnWriteCompleted()| will be called on
- // the I/O thread to report the result, unless |Shutdown()| is called.
- virtual IOResult WriteNoLock(size_t* bytes_written) = 0;
+ // - |platform_handles_written| and |bytes_written| are untouched unless
+ // |WriteNoLock()| returns |IO_SUCCEEDED|;
+ // - if the method returns |IO_PENDING|, |OnWriteCompleted()| will be called
+ // on the I/O thread to report the result, unless |Shutdown()| is called.
+ virtual IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) = 0;
// Similar to |WriteNoLock()|, except that the implementing subclass must also
// guarantee that the method doesn't succeed synchronously, i.e., it only
- // returns IO_FAILED or IO_PENDING.
+ // returns |IO_FAILED| or |IO_PENDING|.
virtual IOResult ScheduleWriteNoLock() = 0;
// Must be called on the I/O thread WITHOUT |write_lock_| held.
@@ -217,7 +240,9 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// Must be called on the I/O thread WITHOUT |write_lock_| held.
void OnReadCompleted(bool result, size_t bytes_read);
// Must be called on the I/O thread WITHOUT |write_lock_| held.
- void OnWriteCompleted(bool result, size_t bytes_written);
+ void OnWriteCompleted(bool result,
+ size_t platform_handles_written,
+ size_t bytes_written);
private:
// Calls |delegate_->OnFatalError(fatal_error)|. Must be called on the I/O
@@ -229,7 +254,9 @@ class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
// false or any error occurs during the method execution, cancels pending
// writes and returns false.
// Must be called only if |write_stopped_| is false and under |write_lock_|.
- bool OnWriteCompletedNoLock(bool result, size_t bytes_written);
+ bool OnWriteCompletedNoLock(bool result,
+ size_t platform_handles_written,
+ size_t bytes_written);
// Set in |Init()| and never changed (hence usable on any thread without
// locking):
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index 10ef535..1fd4a31 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -40,7 +40,8 @@ class RawChannelPosix : public RawChannel,
// |RawChannel| protected methods:
virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE;
- virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) OVERRIDE;
virtual IOResult ScheduleWriteNoLock() OVERRIDE;
virtual bool OnInit() OVERRIDE;
virtual void OnShutdownNoLock(
@@ -155,49 +156,66 @@ RawChannel::IOResult RawChannelPosix::ScheduleRead() {
return IO_PENDING;
}
-RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
+RawChannel::IOResult RawChannelPosix::WriteNoLock(
+ size_t* platform_handles_written,
+ size_t* bytes_written) {
write_lock().AssertAcquired();
DCHECK(!pending_write_);
- std::vector<WriteBuffer::Buffer> buffers;
- write_buffer_no_lock()->GetBuffers(&buffers);
- DCHECK(!buffers.empty());
-
- ssize_t write_result = -1;
- if (buffers.size() == 1) {
- write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr,
- buffers[0].size);
+ if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
+ size_t num_platform_handles;
+ embedder::PlatformHandle* platform_handles;
+ void* serialization_data; // Actually unused.
+ write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles,
+ &platform_handles,
+ &serialization_data);
+ DCHECK_GT(num_platform_handles, 0u);
+ DCHECK(platform_handles);
+ DCHECK(serialization_data);
+
+ size_t num_to_send = std::min(num_platform_handles,
+ embedder::kPlatformChannelMaxNumHandles);
+ bool succeeded = embedder::PlatformChannelSendHandles(fd_.get(),
+ platform_handles,
+ num_to_send);
+ if (succeeded) {
+ *platform_handles_written = num_to_send;
+ *bytes_written = 0;
+ return IO_SUCCEEDED;
+ }
} else {
- // Note that using |writev()|/|sendmsg()| is measurably slower than using
- // |write()| -- at least in a microbenchmark -- but much faster than using
- // multiple |write()|s. (|sendmsg()| is also measurably slightly slower than
- // |writev()|.)
- //
- // On Linux, we need to use |sendmsg()| since it's the only way to suppress
- // |SIGPIPE| (on Mac, this is suppressed on the socket itself using
- // |setsockopt()|, since |MSG_NOSIGNAL| is not supported -- see
- // platform_channel_pair_posix.cc).
- const size_t kMaxBufferCount = 10;
- iovec iov[kMaxBufferCount];
- size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
-
- for (size_t i = 0; i < buffer_count; ++i) {
- iov[i].iov_base = const_cast<char*>(buffers[i].addr);
- iov[i].iov_len = buffers[i].size;
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+
+ ssize_t write_result;
+ if (buffers.size() == 1) {
+ write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr,
+ buffers[0].size);
+ } else {
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = embedder::PlatformChannelWritev(fd_.get(), iov,
+ buffer_count);
}
- write_result = embedder::PlatformChannelWritev(fd_.get(), iov,
- buffer_count);
- }
-
- if (write_result >= 0) {
- *bytes_written = static_cast<size_t>(write_result);
- return IO_SUCCEEDED;
+ if (write_result >= 0) {
+ *platform_handles_written = 0;
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
+ }
}
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "write";
+ PLOG(ERROR) << "sendmsg/write/writev";
return IO_FAILED;
}
@@ -220,9 +238,8 @@ RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
return IO_PENDING;
}
- if (message_loop_for_io()->WatchFileDescriptor(
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
- write_watcher_.get(), this)) {
+ if (message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, false,
+ base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this)) {
pending_write_ = true;
return IO_PENDING;
}
@@ -300,6 +317,7 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
IOResult result = IO_FAILED;
+ size_t platform_handles_written = 0;
size_t bytes_written = 0;
{
base::AutoLock locker(write_lock());
@@ -307,11 +325,14 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK(pending_write_);
pending_write_ = false;
- result = WriteNoLock(&bytes_written);
+ result = WriteNoLock(&platform_handles_written, &bytes_written);
}
- if (result != IO_PENDING)
- OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
+ if (result != IO_PENDING) {
+ OnWriteCompleted(result == IO_SUCCEEDED,
+ platform_handles_written,
+ bytes_written);
+ }
}
void RawChannelPosix::WaitToWrite() {
@@ -328,7 +349,7 @@ void RawChannelPosix::WaitToWrite() {
DCHECK(pending_write_);
pending_write_ = false;
}
- OnWriteCompleted(false, 0);
+ OnWriteCompleted(false, 0, 0);
}
}
diff --git a/mojo/system/raw_channel_win.cc b/mojo/system/raw_channel_win.cc
index 28c7302..515311e 100644
--- a/mojo/system/raw_channel_win.cc
+++ b/mojo/system/raw_channel_win.cc
@@ -162,7 +162,8 @@ class RawChannelWin : public RawChannel {
// |RawChannel| private methods:
virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE;
- virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) OVERRIDE;
virtual IOResult ScheduleWriteNoLock() OVERRIDE;
virtual bool OnInit() OVERRIDE;
virtual void OnShutdownNoLock(
@@ -331,9 +332,9 @@ void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written,
if (error != ERROR_SUCCESS) {
LOG(ERROR) << "WriteFile: " << logging::SystemErrorCodeToString(error);
- owner_->OnWriteCompleted(false, 0);
+ owner_->OnWriteCompleted(false, 0, 0);
} else {
- owner_->OnWriteCompleted(true, bytes_written);
+ owner_->OnWriteCompleted(true, 0, bytes_written);
}
}
@@ -424,12 +425,19 @@ RawChannel::IOResult RawChannelWin::ScheduleRead() {
return io_result;
}
-RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) {
+RawChannel::IOResult RawChannelWin::WriteNoLock(
+ size_t* platform_handles_written,
+ size_t* bytes_written) {
write_lock().AssertAcquired();
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
+ if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
+ // TODO(vtl): Implement.
+ NOTIMPLEMENTED();
+ }
+
std::vector<WriteBuffer::Buffer> buffers;
write_buffer_no_lock()->GetBuffers(&buffers);
DCHECK(!buffers.empty());
@@ -447,6 +455,7 @@ RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) {
}
if (result && skip_completion_port_on_success_) {
+ *platform_handles_written = 0;
*bytes_written = bytes_written_dword;
return IO_SUCCEEDED;
}
@@ -470,8 +479,10 @@ RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() {
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
+ // TODO(vtl): Do something with |platform_handles_written|.
+ size_t platform_handles_written = 0;
size_t bytes_written = 0;
- IOResult io_result = WriteNoLock(&bytes_written);
+ IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
if (io_result == IO_SUCCEEDED) {
DCHECK(skip_completion_port_on_success_);
diff --git a/mojo/system/transport_data.h b/mojo/system/transport_data.h
index 4dc9238..0f03c81 100644
--- a/mojo/system/transport_data.h
+++ b/mojo/system/transport_data.h
@@ -83,17 +83,20 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData {
~TransportData();
const void* buffer() const { return buffer_.get(); }
+ void* buffer() { return buffer_.get(); }
size_t buffer_size() const { return buffer_size_; }
+ uint32_t platform_handle_table_offset() const {
+ return header()->platform_handle_table_offset;
+ }
+
// Gets attached platform-specific handles; this may return null if there are
// none. Note that the caller may mutate the set of platform-specific handles.
- std::vector<embedder::PlatformHandle>* platform_handles() {
+ const std::vector<embedder::PlatformHandle>* platform_handles() const {
return platform_handles_.get();
}
-
- // Returns true if there are platform-specific handles attached.
- bool has_platform_handles() const {
- return platform_handles_ && !platform_handles_->empty();
+ std::vector<embedder::PlatformHandle>* platform_handles() {
+ return platform_handles_.get();
}
// Receive-side functions:
@@ -143,6 +146,10 @@ class MOJO_SYSTEM_IMPL_EXPORT TransportData {
// The maximum total number of platform handles that may be attached.
static const size_t kMaxPlatformHandles;
+ const Header* header() const {
+ return reinterpret_cast<const Header*>(buffer_.get());
+ }
+
size_t buffer_size_;
scoped_ptr<char, base::AlignedFreeDeleter> buffer_; // Never null.