// Copyright (c) 2015 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/quic/stream_sequencer_buffer.h" #include "base/basictypes.h" #include "base/logging.h" using std::min; namespace net { StreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset, QuicStreamOffset end_offset) : begin_offset(begin_offset), end_offset(end_offset) {} StreamSequencerBuffer::FrameInfo::FrameInfo() : length(1), timestamp(QuicTime::Zero()) {} StreamSequencerBuffer::FrameInfo::FrameInfo(size_t length, QuicTime timestamp) : length(length), timestamp(timestamp) {} StreamSequencerBuffer::StreamSequencerBuffer(size_t max_capacity_bytes) : max_buffer_capacity_bytes_(max_capacity_bytes), blocks_count_( ceil(static_cast(max_capacity_bytes) / kBlockSizeBytes)), total_bytes_read_(0), blocks_(blocks_count_) { Clear(); } StreamSequencerBuffer::~StreamSequencerBuffer() { Clear(); } void StreamSequencerBuffer::Clear() { for (size_t i = 0; i < blocks_count_; ++i) { if (blocks_[i] != nullptr) { RetireBlock(i); } } num_bytes_buffered_ = 0; // Reset gaps_ so that buffer is in a state as if all data before // total_bytes_read_ has been consumed, and those after total_bytes_read_ // has never arrived. gaps_ = std::list( 1, Gap(total_bytes_read_, std::numeric_limits::max())), frame_arrival_time_map_.clear(); } void StreamSequencerBuffer::RetireBlock(size_t idx) { DCHECK(blocks_[idx] != nullptr); delete blocks_[idx]; blocks_[idx] = nullptr; DVLOG(1) << "Retired block" << idx; } QuicErrorCode StreamSequencerBuffer::OnStreamData( QuicStreamOffset starting_offset, base::StringPiece data, QuicTime timestamp, size_t* const bytes_buffered) { *bytes_buffered = 0; QuicStreamOffset offset = starting_offset; size_t size = data.size(); if (size == 0) { LOG(DFATAL) << "Attempted to write 0 bytes of data."; return QUIC_INVALID_STREAM_FRAME; } // Find the first gap not ending before |offset|. This gap maybe the gap to // fill if the arriving frame doesn't overlaps with previous ones. std::list::iterator current_gap = gaps_.begin(); while (current_gap != gaps_.end() && current_gap->end_offset <= offset) { ++current_gap; } DCHECK(current_gap != gaps_.end()); // "duplication": might duplicate with data alread filled,but also might // overlap across different base::StringPiece objects already written. // In both cases, don't write the data, // and allow the caller of this method to handle the result. if (offset < current_gap->begin_offset && offset + size <= current_gap->begin_offset) { DVLOG(1) << "duplicated data at offset:" << offset << " len: " << size; return QUIC_NO_ERROR; } if (offset < current_gap->begin_offset && offset + size > current_gap->begin_offset) { // Beginning of new data overlaps data before current gap. return QUIC_INVALID_STREAM_DATA; } if (offset + size > current_gap->end_offset) { // End of new data overlaps with data after current gap. return QUIC_INVALID_STREAM_DATA; } // Write beyond the current range this buffer is covering. if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) { return QUIC_INTERNAL_ERROR; } size_t total_written = 0; size_t source_remaining = size; const char* source = data.data(); // Write data block by block. If corresponding block has not created yet, // create it first. // Stop when all data are written or reaches the logical end of the buffer. while (source_remaining > 0) { const size_t write_block_num = GetBlockIndex(offset); const size_t write_block_offset = GetInBlockOffset(offset); DCHECK_GT(blocks_count_, write_block_num); size_t block_capacity = GetBlockCapacity(write_block_num); size_t bytes_avail = block_capacity - write_block_offset; // If this write meets the upper boundary of the buffer, // reduce the available free bytes. if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) { bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset; } if (blocks_[write_block_num] == nullptr) { // TODO(danzh): Investigate if using a freelist would improve performance. // Same as RetireBlock(). blocks_[write_block_num] = new BufferBlock(); } const size_t bytes_to_copy = min(bytes_avail, source_remaining); char* dest = blocks_[write_block_num]->buffer + write_block_offset; DVLOG(1) << "write at offset: " << offset << " len: " << bytes_to_copy; memcpy(dest, source, bytes_to_copy); source += bytes_to_copy; source_remaining -= bytes_to_copy; offset += bytes_to_copy; total_written += bytes_to_copy; } DCHECK_GT(total_written, 0u); *bytes_buffered = total_written; UpdateGapList(current_gap, starting_offset, total_written); frame_arrival_time_map_.insert( std::make_pair(starting_offset, FrameInfo(size, timestamp))); num_bytes_buffered_ += total_written; return QUIC_NO_ERROR; } inline void StreamSequencerBuffer::UpdateGapList( std::list::iterator gap_with_new_data_written, QuicStreamOffset start_offset, size_t bytes_written) { if (gap_with_new_data_written->begin_offset == start_offset && gap_with_new_data_written->end_offset > start_offset + bytes_written) { // New data has been written into the left part of the buffer. gap_with_new_data_written->begin_offset = start_offset + bytes_written; } else if (gap_with_new_data_written->begin_offset < start_offset && gap_with_new_data_written->end_offset == start_offset + bytes_written) { // New data has been written into the right part of the buffer. gap_with_new_data_written->end_offset = start_offset; } else if (gap_with_new_data_written->begin_offset < start_offset && gap_with_new_data_written->end_offset > start_offset + bytes_written) { // New data has been written into the middle of the buffer. auto current = gap_with_new_data_written++; size_t current_end = current->end_offset; current->end_offset = start_offset; gaps_.insert(gap_with_new_data_written, Gap(start_offset + bytes_written, current_end)); } else if (gap_with_new_data_written->begin_offset == start_offset && gap_with_new_data_written->end_offset == start_offset + bytes_written) { // This gap has been filled with new data. So it's no longer a gap. gaps_.erase(gap_with_new_data_written); } } size_t StreamSequencerBuffer::Readv(const iovec* dest_iov, size_t dest_count) { size_t bytes_read = 0; for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) { char* dest = reinterpret_cast(dest_iov[i].iov_base); size_t dest_remaining = dest_iov[i].iov_len; while (dest_remaining > 0 && ReadableBytes() > 0) { size_t block_idx = NextBlockToRead(); size_t start_offset_in_block = ReadOffset(); size_t block_capacity = GetBlockCapacity(block_idx); size_t bytes_available_in_block = min(ReadableBytes(), block_capacity - start_offset_in_block); size_t bytes_to_copy = min(bytes_available_in_block, dest_remaining); DCHECK_GT(bytes_to_copy, 0u); DCHECK_NE(static_cast(nullptr), blocks_[block_idx]); memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block, bytes_to_copy); dest += bytes_to_copy; dest_remaining -= bytes_to_copy; num_bytes_buffered_ -= bytes_to_copy; total_bytes_read_ += bytes_to_copy; bytes_read += bytes_to_copy; // Retire the block if all the data is read out // and no other data is stored in this block. if (bytes_to_copy == bytes_available_in_block) { RetireBlockIfEmpty(block_idx); } } } if (bytes_read > 0) { UpdateFrameArrivalMap(total_bytes_read_); } return bytes_read; } int StreamSequencerBuffer::GetReadableRegions(struct iovec* iov, int iov_count) const { DCHECK(iov != nullptr); DCHECK_GT(iov_count, 0); if (ReadableBytes() == 0) { iov[0].iov_base = nullptr; iov[0].iov_len = 0; return 0; } size_t start_block_idx = NextBlockToRead(); QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1; DCHECK_GE(readable_offset_end + 1, total_bytes_read_); size_t end_block_offset = GetInBlockOffset(readable_offset_end); size_t end_block_idx = GetBlockIndex(readable_offset_end); // If readable region is within one block, deal with it seperately. if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) { iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); iov[0].iov_len = ReadableBytes(); DVLOG(1) << "get only block" << start_block_idx; return 1; } // Get first block iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset(); DVLOG(1) << "get first block" << start_block_idx << " with len " << iov[0].iov_len; DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len) << "there should be more available data"; // Get readable regions of the rest blocks till either 2nd to last block // before gap is met or |iov| is filled. For these blocks, one whole block is // a region. int iov_used = 1; size_t block_idx = (start_block_idx + iov_used) % blocks_count_; while (block_idx != end_block_idx && iov_used < iov_count) { DCHECK_NE(static_cast(nullptr), blocks_[block_idx]); iov[iov_used].iov_base = blocks_[block_idx]->buffer; iov[iov_used].iov_len = GetBlockCapacity(block_idx); DVLOG(1) << "get block" << block_idx; ++iov_used; block_idx = (start_block_idx + iov_used) % blocks_count_; } // Deal with last block if |iov| can hold more. if (iov_used < iov_count) { DCHECK_NE(static_cast(nullptr), blocks_[block_idx]); iov[iov_used].iov_base = blocks_[end_block_idx]->buffer; iov[iov_used].iov_len = end_block_offset + 1; DVLOG(1) << "get last block " << end_block_idx; ++iov_used; } return iov_used; } bool StreamSequencerBuffer::GetReadableRegion(iovec* iov, QuicTime* timestamp) const { if (ReadableBytes() == 0) { iov[0].iov_base = nullptr; iov[0].iov_len = 0; return false; } size_t start_block_idx = NextBlockToRead(); iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); size_t readable_bytes_in_block = min( GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes()); size_t region_len = 0; auto iter = frame_arrival_time_map_.begin(); *timestamp = iter->second.timestamp; DVLOG(1) << "readable bytes in block: " << readable_bytes_in_block; for (; iter != frame_arrival_time_map_.end() && region_len + iter->second.length <= readable_bytes_in_block; ++iter) { if (iter->second.timestamp != *timestamp) { // If reaches a frame arrive at another timestamp, stop expanding current // region. DVLOG(1) << "Meet frame with different timestamp."; break; } region_len += iter->second.length; DVLOG(1) << "Add bytes to region: " << iter->second.length; } if (iter == frame_arrival_time_map_.end() || iter->second.timestamp == *timestamp) { // If encountered the end of readable bytes before reaching a different // timestamp. DVLOG(1) << "Get all readable bytes in first block."; region_len = readable_bytes_in_block; } iov->iov_len = region_len; return true; } bool StreamSequencerBuffer::MarkConsumed(size_t bytes_used) { if (bytes_used > ReadableBytes()) { return false; } size_t bytes_to_consume = bytes_used; while (bytes_to_consume > 0) { size_t block_idx = NextBlockToRead(); size_t offset_in_block = ReadOffset(); size_t bytes_available = min( ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block); size_t bytes_read = min(bytes_to_consume, bytes_available); total_bytes_read_ += bytes_read; num_bytes_buffered_ -= bytes_read; bytes_to_consume -= bytes_read; // If advanced to the end of current block and end of buffer hasn't wrapped // to this block yet. if (bytes_available == bytes_read) { RetireBlockIfEmpty(block_idx); } } if (bytes_used > 0) { UpdateFrameArrivalMap(total_bytes_read_); } return true; } size_t StreamSequencerBuffer::FlushBufferedFrames() { size_t prev_total_bytes_read = total_bytes_read_; total_bytes_read_ = gaps_.back().begin_offset; Clear(); return total_bytes_read_ - prev_total_bytes_read; } size_t StreamSequencerBuffer::ReadableBytes() const { return gaps_.front().begin_offset - total_bytes_read_; } bool StreamSequencerBuffer::HasBytesToRead() const { return ReadableBytes() > 0; } QuicStreamOffset StreamSequencerBuffer::BytesConsumed() const { return total_bytes_read_; } size_t StreamSequencerBuffer::BytesBuffered() const { return num_bytes_buffered_; } size_t StreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const { return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes; } size_t StreamSequencerBuffer::GetInBlockOffset(QuicStreamOffset offset) const { return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes; } size_t StreamSequencerBuffer::ReadOffset() const { return GetInBlockOffset(total_bytes_read_); } size_t StreamSequencerBuffer::NextBlockToRead() const { return GetBlockIndex(total_bytes_read_); } void StreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) { DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0) << "RetireBlockIfEmpty() should only be called when advancing to next " "block" " or a gap has been reached."; // If the whole buffer becomes empty, the last piece of data has been read. if (Empty()) { RetireBlock(block_index); return; } // Check where the logical end of this buffer is. // Not empty if the end of circular buffer has been wrapped to this block. if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) { return; } // Read index remains in this block, which means a gap has been reached. if (NextBlockToRead() == block_index) { Gap first_gap = gaps_.front(); DCHECK(first_gap.begin_offset == total_bytes_read_); // Check where the next piece data is. // Not empty if next piece of data is still in this chunk. bool gap_extends_to_infinity = (first_gap.end_offset != std::numeric_limits::max()); bool gap_ends_in_this_block = (GetBlockIndex(first_gap.end_offset) == block_index); if (gap_extends_to_infinity || gap_ends_in_this_block) { return; } } RetireBlock(block_index); } bool StreamSequencerBuffer::Empty() const { return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_; } size_t StreamSequencerBuffer::GetBlockCapacity(size_t block_index) const { if ((block_index + 1) == blocks_count_) { size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes; if (result == 0) { // whole block result = kBlockSizeBytes; } return result; } else { return kBlockSizeBytes; } } void StreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) { // Get the frame before which all frames should be removed. auto next_frame = frame_arrival_time_map_.upper_bound(offset); DCHECK(next_frame != frame_arrival_time_map_.begin()); auto iter = frame_arrival_time_map_.begin(); while (iter != next_frame) { auto erased = *iter; iter = frame_arrival_time_map_.erase(iter); DVLOG(1) << "remove FrameInfo with offsest: " << erased.first << " len: " << erased.second.length; if (erased.first + erased.second.length > offset) { // If last frame is partially read out, update this FrameInfo and insert // it back. auto updated = std::make_pair( offset, FrameInfo(erased.first + erased.second.length - offset, erased.second.timestamp)); DVLOG(1) << "insert back FrameInfo with offset: " << updated.first << " len: " << updated.second.length; frame_arrival_time_map_.insert(updated); } } } } // namespace net