diff options
author | jam <jam@chromium.org> | 2015-10-13 12:27:29 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-10-13 19:28:10 +0000 |
commit | 3dbf31732ef535b9d8376c6eb36dda831a8cd20f (patch) | |
tree | 879c547c3cc4708bd88fed09a92b2718439cbc69 /mojo | |
parent | 74a4cfc31efbd98f18d1a18d03e224a2f3832efa (diff) | |
download | chromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.zip chromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.tar.gz chromium_src-3dbf31732ef535b9d8376c6eb36dda831a8cd20f.tar.bz2 |
Ensure that the memory for a two-phase read stays valid even if more data is received.
Also simplify some code since pointers can be STL iterators, and avoid an unnecessary copy of the data in DataPipeConsumerDispatcher::OnReadMessage by not creating an unnecessary MessageInTransit.
BUG=478251
Review URL: https://codereview.chromium.org/1401223003
Cr-Commit-Position: refs/heads/master@{#353823}
Diffstat (limited to 'mojo')
-rw-r--r-- | mojo/edk/system/data_pipe_consumer_dispatcher.cc | 43 | ||||
-rw-r--r-- | mojo/edk/system/data_pipe_consumer_dispatcher.h | 3 | ||||
-rw-r--r-- | mojo/edk/system/data_pipe_unittest.cc | 61 | ||||
-rw-r--r-- | mojo/edk/system/message_pipe_dispatcher.cc | 21 | ||||
-rw-r--r-- | mojo/edk/system/raw_channel.cc | 6 |
5 files changed, 101 insertions, 33 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc index 72e33f4..6e0c35b 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc @@ -81,8 +81,7 @@ DataPipeConsumerDispatcher::Deserialize( SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); buffer += sizeof(SharedMemoryHeader); if (header->data_size) { - rv->data_.resize(header->data_size); - memcpy(&rv->data_[0], buffer, header->data_size); + rv->data_.assign(buffer, buffer + header->data_size); buffer += header->data_size; } @@ -140,9 +139,6 @@ DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { SerializeInternal(); scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); - rv->channel_ = channel_; - channel_ = nullptr; - rv->options_ = options_; data_.swap(rv->data_); serialized_read_buffer_.swap(rv->serialized_read_buffer_); rv->serialized_platform_handle_ = serialized_platform_handle_.Pass(); @@ -251,6 +247,15 @@ MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( in_two_phase_read_ = false; two_phase_max_bytes_read_ = 0; + if (!data_received_during_two_phase_read_.empty()) { + if (data_.empty()) { + data_received_during_two_phase_read_.swap(data_); + } else { + data_.insert(data_.end(), data_received_during_two_phase_read_.begin(), + data_received_during_two_phase_read_.end()); + data_received_during_two_phase_read_.clear(); + } + } HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); if (!new_state.equals(old_state)) @@ -394,8 +399,8 @@ bool DataPipeConsumerDispatcher::IsBusyNoLock() const { void DataPipeConsumerDispatcher::OnReadMessage( const MessageInTransit::View& message_view, ScopedPlatformHandleVectorPtr platform_handles) { - scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); - + const char* bytes_start = static_cast<const char*>(message_view.bytes()); + const char* bytes_end = bytes_start + message_view.num_bytes(); if (started_transport_.Try()) { // We're not in the middle of being sent. @@ -405,16 +410,20 @@ void DataPipeConsumerDispatcher::OnReadMessage( locker.reset(new base::AutoLock(lock())); } - size_t old_size = data_.size(); - data_.resize(old_size + message->num_bytes()); - memcpy(&data_[old_size], message->bytes(), message->num_bytes()); - if (!old_size) - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); + if (in_two_phase_read_) { + data_received_during_two_phase_read_.insert( + data_received_during_two_phase_read_.end(), bytes_start, bytes_end); + } else { + bool was_empty = data_.empty(); + data_.insert(data_.end(), bytes_start, bytes_end); + if (was_empty) + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); + } started_transport_.Release(); } else { - size_t old_size = data_.size(); - data_.resize(old_size + message->num_bytes()); - memcpy(&data_[old_size], message->bytes(), message->num_bytes()); + // See comment in MessagePipeDispatcher about why we can't and don't need + // to lock here. + data_.insert(data_.end(), bytes_start, bytes_end); } } @@ -457,6 +466,7 @@ void DataPipeConsumerDispatcher::OnError(Error error) { } void DataPipeConsumerDispatcher::SerializeInternal() { + DCHECK(!in_two_phase_read_); // We need to stop watching handle immediately, even though not on IO thread, // so that other messages aren't read after this. if (channel_) { @@ -467,8 +477,9 @@ void DataPipeConsumerDispatcher::SerializeInternal() { CHECK(serialized_write_buffer.empty()); channel_ = nullptr; - serialized_ = true; } + + serialized_ = true; } } // namespace edk diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.h b/mojo/edk/system/data_pipe_consumer_dispatcher.h index 6a55d83..7c09346 100644 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.h +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.h @@ -102,6 +102,9 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final bool in_two_phase_read_; uint32_t two_phase_max_bytes_read_; + // If we get data from the channel while we're in two-phase read, we can't + // resize data_ since it's being used. So instead we store it temporarly. + std::vector<char> data_received_during_two_phase_read_; bool error_; diff --git a/mojo/edk/system/data_pipe_unittest.cc b/mojo/edk/system/data_pipe_unittest.cc index 7b15da4..beda4592 100644 --- a/mojo/edk/system/data_pipe_unittest.cc +++ b/mojo/edk/system/data_pipe_unittest.cc @@ -1294,13 +1294,72 @@ TEST_F(DataPipeTest, WriteCloseProducerReadNoData) { const void* read_buffer_ptr = nullptr; num_bytes = 0u; ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, - ReadData(&read_buffer_ptr, &num_bytes)); + BeginReadData(&read_buffer_ptr, &num_bytes)); // Ditto for discard. num_bytes = 10u; ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes)); } +// Test that during a two phase read the memory stays valid even if more data +// comes in. +TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) { + const char kTestData[] = "hello world"; + const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData)); + + const MojoCreateDataPipeOptions options = { + kSizeOfOptions, // |struct_size|. + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. + 1u, // |element_num_bytes|. + 1000u // |capacity_num_bytes|. + }; + ASSERT_EQ(MOJO_RESULT_OK, Create(&options)); + MojoHandleSignalsState hss; + + // Write some data. + uint32_t num_bytes = kTestDataSize; + ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes)); + ASSERT_EQ(kTestDataSize, num_bytes); + + // Wait for the data. + hss = MojoHandleSignalsState(); + ASSERT_EQ(MOJO_RESULT_OK, + MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, &hss)); + ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); + ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, + hss.satisfiable_signals); + + // Begin a two-phase read. + const void* read_buffer_ptr = nullptr; + uint32_t read_buffer_size = 0u; + ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size)); + + // Write more data. + const char kExtraData[] = "bye world"; + const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData)); + num_bytes = kExtraDataSize; + ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes)); + ASSERT_EQ(kExtraDataSize, num_bytes); + + // Close the producer. + CloseProducer(); + + // Wait. (Note that once the consumer knows that the producer is closed, it + // must also have received the extra data). + hss = MojoHandleSignalsState(); + ASSERT_EQ(MOJO_RESULT_OK, + MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, + MOJO_DEADLINE_INDEFINITE, &hss)); + ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); + ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, + hss.satisfiable_signals); + + // Read the two phase memory to check it's still valid. + ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); + EndReadData(read_buffer_size); +} + // Test that two-phase reads/writes behave correctly when given invalid // arguments. TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) { diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc index 1af5b4d..e3bc13b 100644 --- a/mojo/edk/system/message_pipe_dispatcher.cc +++ b/mojo/edk/system/message_pipe_dispatcher.cc @@ -348,12 +348,8 @@ void MessagePipeDispatcher::SerializeInternal() { } DCHECK(serialized_message_queue_.empty()); - // see comment in method below, this is only temporary till we implement a - // solution with shared buffer while (!message_queue_.IsEmpty()) { scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); - size_t cur_size = serialized_message_queue_.size(); - // When MojoWriteMessage is called, the MessageInTransit doesn't have // dispatchers set and CreateEquivaent... is called since the dispatchers @@ -371,15 +367,14 @@ void MessagePipeDispatcher::SerializeInternal() { message->SerializeAndCloseDispatchers(); // cont'd below - size_t main_buffer_size = message->main_buffer_size(); size_t transport_data_buffer_size = message->transport_data() ? message->transport_data()->buffer_size() : 0; - size_t total_size = message->total_size(); - serialized_message_queue_.resize(cur_size + total_size); - memcpy(&serialized_message_queue_[cur_size], message->main_buffer(), - main_buffer_size); + serialized_message_queue_.insert( + serialized_message_queue_.end(), + static_cast<const char*>(message->main_buffer()), + static_cast<const char*>(message->main_buffer()) + main_buffer_size); // cont'd if (transport_data_buffer_size != 0) { @@ -404,9 +399,11 @@ void MessagePipeDispatcher::SerializeInternal() { } } - memcpy(&serialized_message_queue_[ - cur_size + total_size - transport_data_buffer_size], - message->transport_data()->buffer(), transport_data_buffer_size); + serialized_message_queue_.insert( + serialized_message_queue_.end(), + static_cast<const char*>(message->transport_data()->buffer()), + static_cast<const char*>(message->transport_data()->buffer()) + + transport_data_buffer_size); #else NOTREACHED() << "TODO(jam) implement"; #endif diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc index 5ba9a6e..819762b3 100644 --- a/mojo/edk/system/raw_channel.cc +++ b/mojo/edk/system/raw_channel.cc @@ -481,10 +481,8 @@ void RawChannel::SerializeWriteBuffer( std::vector<WriteBuffer::Buffer> buffers; write_buffer_no_lock()->GetBuffers(&buffers); for (size_t i = 0; i < buffers.size(); ++i) { - size_t orig_size = buffer->size(); - buffer->resize(orig_size + buffers[i].size); - memcpy(&((*buffer)[orig_size]), buffers[i].addr, - buffers[i].size); + buffer->insert(buffer->end(), buffers[i].addr, + buffers[i].addr + buffers[i].size); } write_buffer_->message_queue_.DiscardMessage(); } |