// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/quic/quic_data_stream.h" #include "base/logging.h" #include "net/quic/quic_session.h" #include "net/quic/quic_utils.h" #include "net/quic/quic_write_blocked_list.h" using base::StringPiece; using std::min; namespace net { #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") namespace { // This is somewhat arbitrary. It's possible, but unlikely, we will either fail // to set a priority client-side, or cancel a stream before stripping the // priority from the wire server-side. In either case, start out with a // priority in the middle. QuicPriority kDefaultPriority = 3; } // namespace QuicDataStream::QuicDataStream(QuicStreamId id, QuicSession* session) : ReliableQuicStream(id, session), visitor_(NULL), headers_decompressed_(false), priority_(kDefaultPriority), decompression_failed_(false), priority_parsed_(false) { DCHECK_NE(kCryptoStreamId, id); // Don't receive any callbacks from the sequencer until headers // are complete. sequencer()->SetBlockedUntilFlush(); } QuicDataStream::~QuicDataStream() { } size_t QuicDataStream::WriteHeaders( const SpdyHeaderBlock& header_block, bool fin, QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { size_t bytes_written = session()->WriteHeaders( id(), header_block, fin, ack_notifier_delegate); if (fin) { // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent. set_fin_sent(true); CloseWriteSide(); } return bytes_written; } size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { if (FinishedReadingHeaders()) { // If the headers have been read, simply delegate to the sequencer's // Readv method. return sequencer()->Readv(iov, iov_len); } // Otherwise, copy decompressed header data into |iov|. size_t bytes_consumed = 0; size_t iov_index = 0; while (iov_index < iov_len && decompressed_headers_.length() > bytes_consumed) { size_t bytes_to_read = min(iov[iov_index].iov_len, decompressed_headers_.length() - bytes_consumed); char* iov_ptr = static_cast(iov[iov_index].iov_base); memcpy(iov_ptr, decompressed_headers_.data() + bytes_consumed, bytes_to_read); bytes_consumed += bytes_to_read; ++iov_index; } decompressed_headers_.erase(0, bytes_consumed); if (FinishedReadingHeaders()) { sequencer()->FlushBufferedFrames(); } return bytes_consumed; } int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { if (FinishedReadingHeaders()) { return sequencer()->GetReadableRegions(iov, iov_len); } if (iov_len == 0) { return 0; } iov[0].iov_base = static_cast( const_cast(decompressed_headers_.data())); iov[0].iov_len = decompressed_headers_.length(); return 1; } bool QuicDataStream::IsDoneReading() const { if (!headers_decompressed_ || !decompressed_headers_.empty()) { return false; } return sequencer()->IsClosed(); } bool QuicDataStream::HasBytesToRead() const { return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); } void QuicDataStream::set_priority(QuicPriority priority) { DCHECK_EQ(0u, stream_bytes_written()); priority_ = priority; } QuicPriority QuicDataStream::EffectivePriority() const { return priority(); } uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { if (!FinishedReadingHeaders()) { LOG(DFATAL) << "ProcessRawData called before headers have been finished"; return 0; } return ProcessData(data, data_len); } const IPEndPoint& QuicDataStream::GetPeerAddress() { return session()->peer_address(); } bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { return session()->GetSSLInfo(ssl_info); } uint32 QuicDataStream::ProcessHeaderData() { if (decompressed_headers_.empty()) { return 0; } size_t bytes_processed = ProcessData(decompressed_headers_.data(), decompressed_headers_.length()); if (bytes_processed == decompressed_headers_.length()) { decompressed_headers_.clear(); } else { decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); } return bytes_processed; } void QuicDataStream::OnStreamHeaders(StringPiece headers_data) { headers_data.AppendToString(&decompressed_headers_); ProcessHeaderData(); } void QuicDataStream::OnStreamHeadersPriority(QuicPriority priority) { DCHECK(session()->connection()->is_server()); set_priority(priority); } void QuicDataStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { headers_decompressed_ = true; if (fin) { sequencer()->OnStreamFrame(QuicStreamFrame(id(), fin, 0, IOVector())); } ProcessHeaderData(); if (FinishedReadingHeaders()) { sequencer()->FlushBufferedFrames(); } } void QuicDataStream::OnClose() { ReliableQuicStream::OnClose(); if (visitor_) { Visitor* visitor = visitor_; // Calling Visitor::OnClose() may result the destruction of the visitor, // so we need to ensure we don't call it again. visitor_ = NULL; visitor->OnClose(this); } } bool QuicDataStream::FinishedReadingHeaders() { return headers_decompressed_ && decompressed_headers_.empty(); } } // namespace net