summaryrefslogtreecommitdiffstats
path: root/net/spdy/spdy_session.cc
diff options
context:
space:
mode:
Diffstat (limited to 'net/spdy/spdy_session.cc')
-rw-r--r--net/spdy/spdy_session.cc176
1 files changed, 103 insertions, 73 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 8fa52db..760e4e3 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -794,9 +794,6 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
len = new_len;
flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
}
- if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
- DecreaseSendWindowSize(static_cast<int32>(len));
- stream->DecreaseSendWindowSize(static_cast<int32>(len));
}
if (net_log().IsLoggingAllEvents()) {
@@ -816,7 +813,17 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), static_cast<uint32>(len), flags));
- return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
+ scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
+
+ if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
+ DecreaseSendWindowSize(static_cast<int32>(len));
+ data_buffer->AddConsumeCallback(
+ base::Bind(&SpdySession::OnWriteBufferConsumed,
+ weak_factory_.GetWeakPtr(),
+ static_cast<size_t>(len)));
+ }
+
+ return data_buffer.Pass();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
@@ -1447,7 +1454,7 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
DecreaseRecvWindowSize(static_cast<int32>(len));
buffer->AddConsumeCallback(
- base::Bind(&SpdySession::IncreaseRecvWindowSize,
+ base::Bind(&SpdySession::OnReadBufferConsumed,
weak_factory_.GetWeakPtr()));
}
} else {
@@ -2186,9 +2193,26 @@ SSLClientSocket* SpdySession::GetSSLClientSocket() const {
return ssl_socket;
}
-void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
+void SpdySession::OnWriteBufferConsumed(
+ size_t frame_payload_size,
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+ if (consume_source == SpdyBuffer::DISCARD) {
+ // If we're discarding a frame or part of it, increase the send
+ // window by the number of discarded bytes. (Although if we're
+ // discarding part of a frame, it's probably because of a write
+ // error and we'll be tearing down the session soon.)
+ size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
+ DCHECK_GT(remaining_payload_bytes, 0u);
+ IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
+ }
+ // For consumed bytes, the send window is increased when we receive
+ // a WINDOW_UPDATE frame.
+}
+void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
DCHECK_GE(delta_window_size, 1);
// Check for overflow.
@@ -2214,67 +2238,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
ResumeSendStalledStreams();
}
-void SpdySession::QueueSendStalledStream(
- const scoped_refptr<SpdyStream>& stream) {
- DCHECK(stream->send_stalled_by_flow_control());
- stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id());
-}
-
-namespace {
-
-// Helper function to return the total size of an array of objects
-// with .size() member functions.
-template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
- size_t total_size = 0;
- for (size_t i = 0; i < N; ++i) {
- total_size += arr[i].size();
- }
- return total_size;
-}
-
-} // namespace
-
-void SpdySession::ResumeSendStalledStreams() {
- DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
-
- // We don't have to worry about new streams being queued, since
- // doing so would cause IsSendStalled() to return true. But we do
- // have to worry about streams being closed, as well as ourselves
- // being closed.
-
- while (!IsClosed() && !IsSendStalled()) {
- size_t old_size = 0;
- if (DCHECK_IS_ON())
- old_size = GetTotalSize(stream_send_unstall_queue_);
-
- SpdyStreamId stream_id = PopStreamToPossiblyResume();
- if (stream_id == 0)
- break;
- ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
- // The stream may actually still be send-stalled after this (due
- // to its own send window) but that's okay -- it'll then be
- // resumed once its send window increases.
- if (it != active_streams_.end())
- it->second->PossiblyResumeIfSendStalled();
-
- // The size should decrease unless we got send-stalled again.
- if (!IsSendStalled())
- DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
- }
-}
-
-SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
- for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
- std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
- if (!queue->empty()) {
- SpdyStreamId stream_id = queue->front();
- queue->pop_front();
- return stream_id;
- }
- }
- return 0;
-}
-
void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
@@ -2295,16 +2258,22 @@ void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
-delta_window_size, session_send_window_size_));
}
-void SpdySession::IncreaseRecvWindowSize(size_t delta_window_size) {
- if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION)
- return;
+void SpdySession::OnReadBufferConsumed(
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+ DCHECK_GE(consume_size, 1u);
+ DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
+ IncreaseRecvWindowSize(static_cast<int32>(consume_size));
+}
+void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
DCHECK_GE(session_unacked_recv_window_bytes_, 0);
DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
- DCHECK_GE(delta_window_size, 1u);
+ DCHECK_GE(delta_window_size, 1);
// Check for overflow.
- DCHECK_LE(delta_window_size,
- static_cast<size_t>(kint32max - session_recv_window_size_));
+ DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
session_recv_window_size_ += delta_window_size;
net_log_.AddEvent(
@@ -2343,4 +2312,65 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
-delta_window_size, session_recv_window_size_));
}
+void SpdySession::QueueSendStalledStream(
+ const scoped_refptr<SpdyStream>& stream) {
+ DCHECK(stream->send_stalled_by_flow_control());
+ stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id());
+}
+
+namespace {
+
+// Helper function to return the total size of an array of objects
+// with .size() member functions.
+template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
+ size_t total_size = 0;
+ for (size_t i = 0; i < N; ++i) {
+ total_size += arr[i].size();
+ }
+ return total_size;
+}
+
+} // namespace
+
+void SpdySession::ResumeSendStalledStreams() {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+
+ // We don't have to worry about new streams being queued, since
+ // doing so would cause IsSendStalled() to return true. But we do
+ // have to worry about streams being closed, as well as ourselves
+ // being closed.
+
+ while (!IsClosed() && !IsSendStalled()) {
+ size_t old_size = 0;
+ if (DCHECK_IS_ON())
+ old_size = GetTotalSize(stream_send_unstall_queue_);
+
+ SpdyStreamId stream_id = PopStreamToPossiblyResume();
+ if (stream_id == 0)
+ break;
+ ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
+ // The stream may actually still be send-stalled after this (due
+ // to its own send window) but that's okay -- it'll then be
+ // resumed once its send window increases.
+ if (it != active_streams_.end())
+ it->second->PossiblyResumeIfSendStalled();
+
+ // The size should decrease unless we got send-stalled again.
+ if (!IsSendStalled())
+ DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
+ }
+}
+
+SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
+ for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
+ std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
+ if (!queue->empty()) {
+ SpdyStreamId stream_id = queue->front();
+ queue->pop_front();
+ return stream_id;
+ }
+ }
+ return 0;
+}
+
} // namespace net