summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authorjam <jam@chromium.org>2015-10-13 12:27:29 -0700
committerCommit bot <commit-bot@chromium.org>2015-10-13 19:28:10 +0000
commit3dbf31732ef535b9d8376c6eb36dda831a8cd20f (patch)
tree879c547c3cc4708bd88fed09a92b2718439cbc69 /mojo
parent74a4cfc31efbd98f18d1a18d03e224a2f3832efa (diff)
downloadchromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.zip
chromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.tar.gz
chromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.tar.bz2
Ensure that the memory for a two-phase read stays valid even if more data is received.
Also simplify some code since pointers can be STL iterators, and avoid an unnecessary copy of the data in DataPipeConsumerDispatcher::OnReadMessage by not creating an unnecessary MessageInTransit. BUG=478251 Review URL: https://codereview.chromium.org/1401223003 Cr-Commit-Position: refs/heads/master@{#353823}
Diffstat (limited to 'mojo')
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.cc43
-rw-r--r--mojo/edk/system/data_pipe_consumer_dispatcher.h3
-rw-r--r--mojo/edk/system/data_pipe_unittest.cc61
-rw-r--r--mojo/edk/system/message_pipe_dispatcher.cc21
-rw-r--r--mojo/edk/system/raw_channel.cc6
5 files changed, 101 insertions, 33 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index 72e33f4..6e0c35b 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -81,8 +81,7 @@ DataPipeConsumerDispatcher::Deserialize(
SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
buffer += sizeof(SharedMemoryHeader);
if (header->data_size) {
- rv->data_.resize(header->data_size);
- memcpy(&rv->data_[0], buffer, header->data_size);
+ rv->data_.assign(buffer, buffer + header->data_size);
buffer += header->data_size;
}
@@ -140,9 +139,6 @@ DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
SerializeInternal();
scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
- rv->channel_ = channel_;
- channel_ = nullptr;
- rv->options_ = options_;
data_.swap(rv->data_);
serialized_read_buffer_.swap(rv->serialized_read_buffer_);
rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
@@ -251,6 +247,15 @@ MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
in_two_phase_read_ = false;
two_phase_max_bytes_read_ = 0;
+ if (!data_received_during_two_phase_read_.empty()) {
+ if (data_.empty()) {
+ data_received_during_two_phase_read_.swap(data_);
+ } else {
+ data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
+ data_received_during_two_phase_read_.end());
+ data_received_during_two_phase_read_.clear();
+ }
+ }
HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
if (!new_state.equals(old_state))
@@ -394,8 +399,8 @@ bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
void DataPipeConsumerDispatcher::OnReadMessage(
const MessageInTransit::View& message_view,
ScopedPlatformHandleVectorPtr platform_handles) {
- scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
-
+ const char* bytes_start = static_cast<const char*>(message_view.bytes());
+ const char* bytes_end = bytes_start + message_view.num_bytes();
if (started_transport_.Try()) {
// We're not in the middle of being sent.
@@ -405,16 +410,20 @@ void DataPipeConsumerDispatcher::OnReadMessage(
locker.reset(new base::AutoLock(lock()));
}
- size_t old_size = data_.size();
- data_.resize(old_size + message->num_bytes());
- memcpy(&data_[old_size], message->bytes(), message->num_bytes());
- if (!old_size)
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ if (in_two_phase_read_) {
+ data_received_during_two_phase_read_.insert(
+ data_received_during_two_phase_read_.end(), bytes_start, bytes_end);
+ } else {
+ bool was_empty = data_.empty();
+ data_.insert(data_.end(), bytes_start, bytes_end);
+ if (was_empty)
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
+ }
started_transport_.Release();
} else {
- size_t old_size = data_.size();
- data_.resize(old_size + message->num_bytes());
- memcpy(&data_[old_size], message->bytes(), message->num_bytes());
+ // See comment in MessagePipeDispatcher about why we can't and don't need
+ // to lock here.
+ data_.insert(data_.end(), bytes_start, bytes_end);
}
}
@@ -457,6 +466,7 @@ void DataPipeConsumerDispatcher::OnError(Error error) {
}
void DataPipeConsumerDispatcher::SerializeInternal() {
+ DCHECK(!in_two_phase_read_);
// We need to stop watching handle immediately, even though not on IO thread,
// so that other messages aren't read after this.
if (channel_) {
@@ -467,8 +477,9 @@ void DataPipeConsumerDispatcher::SerializeInternal() {
CHECK(serialized_write_buffer.empty());
channel_ = nullptr;
- serialized_ = true;
}
+
+ serialized_ = true;
}
} // namespace edk
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h
index 6a55d83..7c09346 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.h
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h
@@ -102,6 +102,9 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final
bool in_two_phase_read_;
uint32_t two_phase_max_bytes_read_;
+ // If we get data from the channel while we're in two-phase read, we can't
+ // resize data_ since it's being used. So instead we store it temporarly.
+ std::vector<char> data_received_during_two_phase_read_;
bool error_;
diff --git a/mojo/edk/system/data_pipe_unittest.cc b/mojo/edk/system/data_pipe_unittest.cc
index 7b15da4..beda4592 100644
--- a/mojo/edk/system/data_pipe_unittest.cc
+++ b/mojo/edk/system/data_pipe_unittest.cc
@@ -1294,13 +1294,72 @@ TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
const void* read_buffer_ptr = nullptr;
num_bytes = 0u;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- ReadData(&read_buffer_ptr, &num_bytes));
+ BeginReadData(&read_buffer_ptr, &num_bytes));
// Ditto for discard.
num_bytes = 10u;
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
}
+// Test that during a two phase read the memory stays valid even if more data
+// comes in.
+TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
+ const char kTestData[] = "hello world";
+ const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 1000u // |capacity_num_bytes|.
+ };
+ ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
+ MojoHandleSignalsState hss;
+
+ // Write some data.
+ uint32_t num_bytes = kTestDataSize;
+ ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
+ ASSERT_EQ(kTestDataSize, num_bytes);
+
+ // Wait for the data.
+ hss = MojoHandleSignalsState();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
+ MOJO_DEADLINE_INDEFINITE, &hss));
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // Begin a two-phase read.
+ const void* read_buffer_ptr = nullptr;
+ uint32_t read_buffer_size = 0u;
+ ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
+
+ // Write more data.
+ const char kExtraData[] = "bye world";
+ const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
+ num_bytes = kExtraDataSize;
+ ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
+ ASSERT_EQ(kExtraDataSize, num_bytes);
+
+ // Close the producer.
+ CloseProducer();
+
+ // Wait. (Note that once the consumer knows that the producer is closed, it
+ // must also have received the extra data).
+ hss = MojoHandleSignalsState();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ MOJO_DEADLINE_INDEFINITE, &hss));
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
+ ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // Read the two phase memory to check it's still valid.
+ ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
+ EndReadData(read_buffer_size);
+}
+
// Test that two-phase reads/writes behave correctly when given invalid
// arguments.
TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index 1af5b4d..e3bc13b 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -348,12 +348,8 @@ void MessagePipeDispatcher::SerializeInternal() {
}
DCHECK(serialized_message_queue_.empty());
- // see comment in method below, this is only temporary till we implement a
- // solution with shared buffer
while (!message_queue_.IsEmpty()) {
scoped_ptr<MessageInTransit> message = message_queue_.GetMessage();
- size_t cur_size = serialized_message_queue_.size();
-
// When MojoWriteMessage is called, the MessageInTransit doesn't have
// dispatchers set and CreateEquivaent... is called since the dispatchers
@@ -371,15 +367,14 @@ void MessagePipeDispatcher::SerializeInternal() {
message->SerializeAndCloseDispatchers();
// cont'd below
-
size_t main_buffer_size = message->main_buffer_size();
size_t transport_data_buffer_size = message->transport_data() ?
message->transport_data()->buffer_size() : 0;
- size_t total_size = message->total_size();
- serialized_message_queue_.resize(cur_size + total_size);
- memcpy(&serialized_message_queue_[cur_size], message->main_buffer(),
- main_buffer_size);
+ serialized_message_queue_.insert(
+ serialized_message_queue_.end(),
+ static_cast<const char*>(message->main_buffer()),
+ static_cast<const char*>(message->main_buffer()) + main_buffer_size);
// cont'd
if (transport_data_buffer_size != 0) {
@@ -404,9 +399,11 @@ void MessagePipeDispatcher::SerializeInternal() {
}
}
- memcpy(&serialized_message_queue_[
- cur_size + total_size - transport_data_buffer_size],
- message->transport_data()->buffer(), transport_data_buffer_size);
+ serialized_message_queue_.insert(
+ serialized_message_queue_.end(),
+ static_cast<const char*>(message->transport_data()->buffer()),
+ static_cast<const char*>(message->transport_data()->buffer()) +
+ transport_data_buffer_size);
#else
NOTREACHED() << "TODO(jam) implement";
#endif
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
index 5ba9a6e..819762b3 100644
--- a/mojo/edk/system/raw_channel.cc
+++ b/mojo/edk/system/raw_channel.cc
@@ -481,10 +481,8 @@ void RawChannel::SerializeWriteBuffer(
std::vector<WriteBuffer::Buffer> buffers;
write_buffer_no_lock()->GetBuffers(&buffers);
for (size_t i = 0; i < buffers.size(); ++i) {
- size_t orig_size = buffer->size();
- buffer->resize(orig_size + buffers[i].size);
- memcpy(&((*buffer)[orig_size]), buffers[i].addr,
- buffers[i].size);
+ buffer->insert(buffer->end(), buffers[i].addr,
+ buffers[i].addr + buffers[i].size);
}
write_buffer_->message_queue_.DiscardMessage();
}