diff options
Diffstat (limited to 'net/quic/reliable_quic_stream.cc')
-rw-r--r-- | net/quic/reliable_quic_stream.cc | 124 |
1 files changed, 114 insertions, 10 deletions
diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc index bc099e2..edcb026 100644 --- a/net/quic/reliable_quic_stream.cc +++ b/net/quic/reliable_quic_stream.cc @@ -25,6 +25,85 @@ struct iovec MakeIovec(StringPiece data) { } // namespace +// Wrapper that aggregates OnAckNotifications for packets sent using +// WriteOrBufferData and delivers them to the original +// QuicAckNotifier::DelegateInterface after all bytes written using +// WriteOrBufferData are acked. This level of indirection is +// necessary because the delegate interface provides no mechanism that +// WriteOrBufferData can use to inform it that the write required +// multiple WritevData calls or that only part of the data has been +// sent out by the time ACKs start arriving. +class ReliableQuicStream::ProxyAckNotifierDelegate + : public QuicAckNotifier::DelegateInterface { + public: + explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) + : delegate_(delegate), + pending_acks_(0), + wrote_last_data_(false), + num_original_packets_(0), + num_original_bytes_(0), + num_retransmitted_packets_(0), + num_retransmitted_bytes_(0) { + } + + virtual void OnAckNotification(int num_original_packets, + int num_original_bytes, + int num_retransmitted_packets, + int num_retransmitted_bytes) OVERRIDE { + DCHECK_LT(0, pending_acks_); + --pending_acks_; + num_original_packets_ += num_original_packets; + num_original_bytes_ += num_original_bytes; + num_retransmitted_packets_ += num_retransmitted_packets; + num_retransmitted_bytes_ += num_retransmitted_bytes; + + if (wrote_last_data_ && pending_acks_ == 0) { + delegate_->OnAckNotification(num_original_packets_, + num_original_bytes_, + num_retransmitted_packets_, + num_retransmitted_bytes_); + } + } + + void WroteData(bool last_data) { + DCHECK(!wrote_last_data_); + ++pending_acks_; + wrote_last_data_ = last_data; + } + + protected: + // Delegates are ref counted. + virtual ~ProxyAckNotifierDelegate() { + } + + private: + // Original delegate. delegate_->OnAckNotification will be called when: + // wrote_last_data_ == true and pending_acks_ == 0 + scoped_refptr<DelegateInterface> delegate_; + + // Number of outstanding acks. + int pending_acks_; + + // True if no pending writes remain. + bool wrote_last_data_; + + // Accumulators. + int num_original_packets_; + int num_original_bytes_; + int num_retransmitted_packets_; + int num_retransmitted_bytes_; + + DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); +}; + +ReliableQuicStream::PendingData::PendingData( + string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) + : data(data_in), delegate(delegate_in) { +} + +ReliableQuicStream::PendingData::~PendingData() { +} + ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) : sequencer_(this), @@ -118,7 +197,10 @@ QuicVersion ReliableQuicStream::version() const { return session()->connection()->version(); } -void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { +void ReliableQuicStream::WriteOrBufferData( + StringPiece data, + bool fin, + QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { if (data.empty() && !fin) { LOG(DFATAL) << "data.empty() && !fin"; return; @@ -129,38 +211,60 @@ void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { return; } + scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; + if (ack_notifier_delegate != NULL) { + proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); + } + QuicConsumedData consumed_data(0, false); fin_buffered_ = fin; if (queued_data_.empty()) { struct iovec iov(MakeIovec(data)); - consumed_data = WritevData(&iov, 1, fin, NULL); + consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); DCHECK_LE(consumed_data.bytes_consumed, data.length()); } + bool write_completed; // If there's unconsumed data or an unconsumed fin, queue it. if (consumed_data.bytes_consumed < data.length() || (fin && !consumed_data.fin_consumed)) { - queued_data_.push_back( - string(data.data() + consumed_data.bytes_consumed, - data.length() - consumed_data.bytes_consumed)); + StringPiece remainder(data.substr(consumed_data.bytes_consumed)); + queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); + write_completed = false; + } else { + write_completed = true; + } + + if ((proxy_delegate.get() != NULL) && + (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { + proxy_delegate->WroteData(write_completed); } } void ReliableQuicStream::OnCanWrite() { bool fin = false; while (!queued_data_.empty()) { - const string& data = queued_data_.front(); + PendingData* pending_data = &queued_data_.front(); + ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); if (queued_data_.size() == 1 && fin_buffered_) { fin = true; } - struct iovec iov(MakeIovec(data)); - QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); - if (consumed_data.bytes_consumed == data.size() && + struct iovec iov(MakeIovec(pending_data->data)); + QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); + if (consumed_data.bytes_consumed == pending_data->data.size() && fin == consumed_data.fin_consumed) { queued_data_.pop_front(); + if (delegate != NULL) { + delegate->WroteData(true); + } } else { - queued_data_.front().erase(0, consumed_data.bytes_consumed); + if (consumed_data.bytes_consumed > 0) { + pending_data->data.erase(0, consumed_data.bytes_consumed); + if (delegate != NULL) { + delegate->WroteData(false); + } + } break; } } |