// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/quic/quic_stream_sequencer.h" #include #include #include #include "base/logging.h" #include "net/quic/quic_clock.h" #include "net/quic/quic_flags.h" #include "net/quic/quic_frame_list.h" #include "net/quic/quic_protocol.h" #include "net/quic/reliable_quic_stream.h" #include "net/quic/stream_sequencer_buffer.h" using std::min; using std::numeric_limits; using std::string; namespace net { QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream, const QuicClock* clock) : stream_(quic_stream), close_offset_(numeric_limits::max()), blocked_(false), num_frames_received_(0), num_duplicate_frames_received_(0), num_early_frames_received_(0), clock_(clock), ignore_read_data_(false) { if (FLAGS_quic_use_stream_sequencer_buffer) { DVLOG(1) << "Use StreamSequencerBuffer for stream: " << stream_->id(); buffered_frames_.reset( new StreamSequencerBuffer(kStreamReceiveWindowLimit)); } else { buffered_frames_.reset(new QuicFrameList()); } } QuicStreamSequencer::~QuicStreamSequencer() {} void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { ++num_frames_received_; const QuicStreamOffset byte_offset = frame.offset; const size_t data_len = frame.data.length(); if (data_len == 0 && !frame.fin) { // Stream frames must have data or a fin flag. stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, "Empty stream frame without FIN set."); return; } if (frame.fin) { CloseStreamAtOffset(frame.offset + data_len); if (data_len == 0) { return; } } size_t bytes_written; QuicErrorCode result = buffered_frames_->OnStreamData( byte_offset, frame.data, clock_->ApproximateNow(), &bytes_written); if (result == QUIC_INVALID_STREAM_DATA) { stream_->CloseConnectionWithDetails( QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); return; } if (result == QUIC_NO_ERROR && bytes_written == 0) { ++num_duplicate_frames_received_; // Silently ignore duplicates. return; } if (byte_offset > buffered_frames_->BytesConsumed()) { ++num_early_frames_received_; } if (blocked_) { return; } if (byte_offset == buffered_frames_->BytesConsumed()) { if (FLAGS_quic_implement_stop_reading && ignore_read_data_) { FlushBufferedFrames(); } else { stream_->OnDataAvailable(); } } } void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { const QuicStreamOffset kMaxOffset = numeric_limits::max(); // If there is a scheduled close, the new offset should match it. if (close_offset_ != kMaxOffset && offset != close_offset_) { stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); return; } close_offset_ = offset; MaybeCloseStream(); } bool QuicStreamSequencer::MaybeCloseStream() { if (blocked_ || !IsClosed()) { return false; } DVLOG(1) << "Passing up termination, as we've processed " << buffered_frames_->BytesConsumed() << " of " << close_offset_ << " bytes."; // This will cause the stream to consume the FIN. // Technically it's an error if |num_bytes_consumed| isn't exactly // equal to |close_offset|, but error handling seems silly at this point. if (FLAGS_quic_implement_stop_reading && ignore_read_data_) { // The sequencer is discarding stream data and must notify the stream on // receipt of a FIN because the consumer won't. stream_->OnFinRead(); } else { stream_->OnDataAvailable(); } buffered_frames_->Clear(); return true; } int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { DCHECK(!blocked_); return buffered_frames_->GetReadableRegions(iov, iov_len); } bool QuicStreamSequencer::GetReadableRegion(iovec* iov, QuicTime* timestamp) const { DCHECK(!blocked_); return buffered_frames_->GetReadableRegion(iov, timestamp); } int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { DCHECK(!blocked_); size_t bytes_read = buffered_frames_->Readv(iov, iov_len); stream_->AddBytesConsumed(bytes_read); return static_cast(bytes_read); } bool QuicStreamSequencer::HasBytesToRead() const { return buffered_frames_->HasBytesToRead(); } bool QuicStreamSequencer::IsClosed() const { return buffered_frames_->BytesConsumed() >= close_offset_; } void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { DCHECK(!blocked_); bool result = buffered_frames_->MarkConsumed(num_bytes_consumed); if (!result) { LOG(DFATAL) << "Invalid argument to MarkConsumed." << " expect to consume: " << num_bytes_consumed << ", but not enough bytes available."; stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); return; } stream_->AddBytesConsumed(num_bytes_consumed); } void QuicStreamSequencer::SetBlockedUntilFlush() { blocked_ = true; } void QuicStreamSequencer::SetUnblocked() { blocked_ = false; if (IsClosed() || HasBytesToRead()) { stream_->OnDataAvailable(); } } void QuicStreamSequencer::StopReading() { if (ignore_read_data_) { return; } ignore_read_data_ = true; FlushBufferedFrames(); } void QuicStreamSequencer::FlushBufferedFrames() { DCHECK(ignore_read_data_); size_t bytes_flushed = buffered_frames_->FlushBufferedFrames(); DVLOG(1) << "Flushing buffered data at offset " << buffered_frames_->BytesConsumed() << " length " << bytes_flushed << " for stream " << stream_->id(); stream_->AddBytesConsumed(bytes_flushed); MaybeCloseStream(); } size_t QuicStreamSequencer::NumBytesBuffered() const { return buffered_frames_->BytesBuffered(); } QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const { return buffered_frames_->BytesConsumed(); } } // namespace net