diff options
Diffstat (limited to 'net/spdy/spdy_session.cc')
-rw-r--r-- | net/spdy/spdy_session.cc | 176 |
1 files changed, 103 insertions, 73 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 8fa52db..760e4e3 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -794,9 +794,6 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, len = new_len; flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); } - if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) - DecreaseSendWindowSize(static_cast<int32>(len)); - stream->DecreaseSendWindowSize(static_cast<int32>(len)); } if (net_log().IsLoggingAllEvents()) { @@ -816,7 +813,17 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), static_cast<uint32>(len), flags)); - return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); + scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); + + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { + DecreaseSendWindowSize(static_cast<int32>(len)); + data_buffer->AddConsumeCallback( + base::Bind(&SpdySession::OnWriteBufferConsumed, + weak_factory_.GetWeakPtr(), + static_cast<size_t>(len))); + } + + return data_buffer.Pass(); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { @@ -1447,7 +1454,7 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { DecreaseRecvWindowSize(static_cast<int32>(len)); buffer->AddConsumeCallback( - base::Bind(&SpdySession::IncreaseRecvWindowSize, + base::Bind(&SpdySession::OnReadBufferConsumed, weak_factory_.GetWeakPtr())); } } else { @@ -2186,9 +2193,26 @@ SSLClientSocket* SpdySession::GetSSLClientSocket() const { return ssl_socket; } -void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { +void SpdySession::OnWriteBufferConsumed( + size_t frame_payload_size, + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + if (consume_source == SpdyBuffer::DISCARD) { + // If we're discarding a frame or part of it, increase the send + // window by the number of discarded bytes. (Although if we're + // discarding part of a frame, it's probably because of a write + // error and we'll be tearing down the session soon.) + size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); + DCHECK_GT(remaining_payload_bytes, 0u); + IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); + } + // For consumed bytes, the send window is increased when we receive + // a WINDOW_UPDATE frame. +} +void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(delta_window_size, 1); // Check for overflow. @@ -2214,67 +2238,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { ResumeSendStalledStreams(); } -void SpdySession::QueueSendStalledStream( - const scoped_refptr<SpdyStream>& stream) { - DCHECK(stream->send_stalled_by_flow_control()); - stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); -} - -namespace { - -// Helper function to return the total size of an array of objects -// with .size() member functions. -template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { - size_t total_size = 0; - for (size_t i = 0; i < N; ++i) { - total_size += arr[i].size(); - } - return total_size; -} - -} // namespace - -void SpdySession::ResumeSendStalledStreams() { - DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); - - // We don't have to worry about new streams being queued, since - // doing so would cause IsSendStalled() to return true. But we do - // have to worry about streams being closed, as well as ourselves - // being closed. - - while (!IsClosed() && !IsSendStalled()) { - size_t old_size = 0; - if (DCHECK_IS_ON()) - old_size = GetTotalSize(stream_send_unstall_queue_); - - SpdyStreamId stream_id = PopStreamToPossiblyResume(); - if (stream_id == 0) - break; - ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); - // The stream may actually still be send-stalled after this (due - // to its own send window) but that's okay -- it'll then be - // resumed once its send window increases. - if (it != active_streams_.end()) - it->second->PossiblyResumeIfSendStalled(); - - // The size should decrease unless we got send-stalled again. - if (!IsSendStalled()) - DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); - } -} - -SpdyStreamId SpdySession::PopStreamToPossiblyResume() { - for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { - std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; - if (!queue->empty()) { - SpdyStreamId stream_id = queue->front(); - queue->pop_front(); - return stream_id; - } - } - return 0; -} - void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); @@ -2295,16 +2258,22 @@ void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { -delta_window_size, session_send_window_size_)); } -void SpdySession::IncreaseRecvWindowSize(size_t delta_window_size) { - if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) - return; +void SpdySession::OnReadBufferConsumed( + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + DCHECK_GE(consume_size, 1u); + DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); + IncreaseRecvWindowSize(static_cast<int32>(consume_size)); +} +void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(session_unacked_recv_window_bytes_, 0); DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); - DCHECK_GE(delta_window_size, 1u); + DCHECK_GE(delta_window_size, 1); // Check for overflow. - DCHECK_LE(delta_window_size, - static_cast<size_t>(kint32max - session_recv_window_size_)); + DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); session_recv_window_size_ += delta_window_size; net_log_.AddEvent( @@ -2343,4 +2312,65 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { -delta_window_size, session_recv_window_size_)); } +void SpdySession::QueueSendStalledStream( + const scoped_refptr<SpdyStream>& stream) { + DCHECK(stream->send_stalled_by_flow_control()); + stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); +} + +namespace { + +// Helper function to return the total size of an array of objects +// with .size() member functions. +template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { + size_t total_size = 0; + for (size_t i = 0; i < N; ++i) { + total_size += arr[i].size(); + } + return total_size; +} + +} // namespace + +void SpdySession::ResumeSendStalledStreams() { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + + // We don't have to worry about new streams being queued, since + // doing so would cause IsSendStalled() to return true. But we do + // have to worry about streams being closed, as well as ourselves + // being closed. + + while (!IsClosed() && !IsSendStalled()) { + size_t old_size = 0; + if (DCHECK_IS_ON()) + old_size = GetTotalSize(stream_send_unstall_queue_); + + SpdyStreamId stream_id = PopStreamToPossiblyResume(); + if (stream_id == 0) + break; + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + // The stream may actually still be send-stalled after this (due + // to its own send window) but that's okay -- it'll then be + // resumed once its send window increases. + if (it != active_streams_.end()) + it->second->PossiblyResumeIfSendStalled(); + + // The size should decrease unless we got send-stalled again. + if (!IsSendStalled()) + DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); + } +} + +SpdyStreamId SpdySession::PopStreamToPossiblyResume() { + for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { + std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; + if (!queue->empty()) { + SpdyStreamId stream_id = queue->front(); + queue->pop_front(); + return stream_id; + } + } + return 0; +} + } // namespace net |