summaryrefslogtreecommitdiffstats
path: root/net/quic/reliable_quic_stream.cc
diff options
context:
space:
mode:
Diffstat (limited to 'net/quic/reliable_quic_stream.cc')
-rw-r--r--net/quic/reliable_quic_stream.cc124
1 files changed, 114 insertions, 10 deletions
diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc
index bc099e2..edcb026 100644
--- a/net/quic/reliable_quic_stream.cc
+++ b/net/quic/reliable_quic_stream.cc
@@ -25,6 +25,85 @@ struct iovec MakeIovec(StringPiece data) {
} // namespace
+// Wrapper that aggregates OnAckNotifications for packets sent using
+// WriteOrBufferData and delivers them to the original
+// QuicAckNotifier::DelegateInterface after all bytes written using
+// WriteOrBufferData are acked. This level of indirection is
+// necessary because the delegate interface provides no mechanism that
+// WriteOrBufferData can use to inform it that the write required
+// multiple WritevData calls or that only part of the data has been
+// sent out by the time ACKs start arriving.
+class ReliableQuicStream::ProxyAckNotifierDelegate
+ : public QuicAckNotifier::DelegateInterface {
+ public:
+ explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
+ : delegate_(delegate),
+ pending_acks_(0),
+ wrote_last_data_(false),
+ num_original_packets_(0),
+ num_original_bytes_(0),
+ num_retransmitted_packets_(0),
+ num_retransmitted_bytes_(0) {
+ }
+
+ virtual void OnAckNotification(int num_original_packets,
+ int num_original_bytes,
+ int num_retransmitted_packets,
+ int num_retransmitted_bytes) OVERRIDE {
+ DCHECK_LT(0, pending_acks_);
+ --pending_acks_;
+ num_original_packets_ += num_original_packets;
+ num_original_bytes_ += num_original_bytes;
+ num_retransmitted_packets_ += num_retransmitted_packets;
+ num_retransmitted_bytes_ += num_retransmitted_bytes;
+
+ if (wrote_last_data_ && pending_acks_ == 0) {
+ delegate_->OnAckNotification(num_original_packets_,
+ num_original_bytes_,
+ num_retransmitted_packets_,
+ num_retransmitted_bytes_);
+ }
+ }
+
+ void WroteData(bool last_data) {
+ DCHECK(!wrote_last_data_);
+ ++pending_acks_;
+ wrote_last_data_ = last_data;
+ }
+
+ protected:
+ // Delegates are ref counted.
+ virtual ~ProxyAckNotifierDelegate() {
+ }
+
+ private:
+ // Original delegate. delegate_->OnAckNotification will be called when:
+ // wrote_last_data_ == true and pending_acks_ == 0
+ scoped_refptr<DelegateInterface> delegate_;
+
+ // Number of outstanding acks.
+ int pending_acks_;
+
+ // True if no pending writes remain.
+ bool wrote_last_data_;
+
+ // Accumulators.
+ int num_original_packets_;
+ int num_original_bytes_;
+ int num_retransmitted_packets_;
+ int num_retransmitted_bytes_;
+
+ DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
+};
+
+ReliableQuicStream::PendingData::PendingData(
+ string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
+ : data(data_in), delegate(delegate_in) {
+}
+
+ReliableQuicStream::PendingData::~PendingData() {
+}
+
ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
QuicSession* session)
: sequencer_(this),
@@ -118,7 +197,10 @@ QuicVersion ReliableQuicStream::version() const {
return session()->connection()->version();
}
-void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
+void ReliableQuicStream::WriteOrBufferData(
+ StringPiece data,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
if (data.empty() && !fin) {
LOG(DFATAL) << "data.empty() && !fin";
return;
@@ -129,38 +211,60 @@ void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
return;
}
+ scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
+ if (ack_notifier_delegate != NULL) {
+ proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
+ }
+
QuicConsumedData consumed_data(0, false);
fin_buffered_ = fin;
if (queued_data_.empty()) {
struct iovec iov(MakeIovec(data));
- consumed_data = WritevData(&iov, 1, fin, NULL);
+ consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
DCHECK_LE(consumed_data.bytes_consumed, data.length());
}
+ bool write_completed;
// If there's unconsumed data or an unconsumed fin, queue it.
if (consumed_data.bytes_consumed < data.length() ||
(fin && !consumed_data.fin_consumed)) {
- queued_data_.push_back(
- string(data.data() + consumed_data.bytes_consumed,
- data.length() - consumed_data.bytes_consumed));
+ StringPiece remainder(data.substr(consumed_data.bytes_consumed));
+ queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
+ write_completed = false;
+ } else {
+ write_completed = true;
+ }
+
+ if ((proxy_delegate.get() != NULL) &&
+ (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
+ proxy_delegate->WroteData(write_completed);
}
}
void ReliableQuicStream::OnCanWrite() {
bool fin = false;
while (!queued_data_.empty()) {
- const string& data = queued_data_.front();
+ PendingData* pending_data = &queued_data_.front();
+ ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
if (queued_data_.size() == 1 && fin_buffered_) {
fin = true;
}
- struct iovec iov(MakeIovec(data));
- QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
- if (consumed_data.bytes_consumed == data.size() &&
+ struct iovec iov(MakeIovec(pending_data->data));
+ QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
+ if (consumed_data.bytes_consumed == pending_data->data.size() &&
fin == consumed_data.fin_consumed) {
queued_data_.pop_front();
+ if (delegate != NULL) {
+ delegate->WroteData(true);
+ }
} else {
- queued_data_.front().erase(0, consumed_data.bytes_consumed);
+ if (consumed_data.bytes_consumed > 0) {
+ pending_data->data.erase(0, consumed_data.bytes_consumed);
+ if (delegate != NULL) {
+ delegate->WroteData(false);
+ }
+ }
break;
}
}