From 132566930a1da7373ba91e1c30aec0825d70590d Mon Sep 17 00:00:00 2001 From: "akalin@chromium.org" Date: Fri, 1 Mar 2013 22:12:20 +0000 Subject: [SPDY] Implement per-session flow control Basically, we treat a WINDOW_UPDATE frame with stream_id 0 as an update for the session window. Convert the flow_control boolean into a tri-state FlowControlState enum. Add a kProtoSPDY31 constant in next_proto.h and turn on per-session flow control only if SPDY 3.1 is negotiated. BUG=176592 Review URL: https://codereview.chromium.org/12277015 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@185603 0039d316-1c4b-4281-b951-d872f2087c98 --- net/base/net_log_event_type_list.h | 29 ++- net/socket/next_proto.h | 3 +- net/spdy/buffered_spdy_framer.h | 3 +- net/spdy/spdy_protocol.h | 24 ++- net/spdy/spdy_session.cc | 244 +++++++++++++++++---- net/spdy/spdy_session.h | 71 ++++++- net/spdy/spdy_session_spdy2_unittest.cc | 46 +++- net/spdy/spdy_session_spdy3_unittest.cc | 365 +++++++++++++++++++++++++++++++- net/spdy/spdy_stream.cc | 19 +- net/spdy/spdy_stream.h | 40 ++-- net/spdy/spdy_test_util_spdy3.cc | 5 +- net/spdy/spdy_test_util_spdy3.h | 1 + 12 files changed, 751 insertions(+), 99 deletions(-) diff --git a/net/base/net_log_event_type_list.h b/net/base/net_log_event_type_list.h index f25ae52..75291d5 100644 --- a/net/base/net_log_event_type_list.h +++ b/net/base/net_log_event_type_list.h @@ -1070,14 +1070,28 @@ EVENT_TYPE(SPDY_SESSION_GOAWAY) // "stream_id": , // "delta" : , // } -EVENT_TYPE(SPDY_SESSION_RECEIVED_WINDOW_UPDATE) +EVENT_TYPE(SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME) // Sending of a SPDY WINDOW_UPDATE frame (which controls the receive window). // { // "stream_id": , // "delta" : , // } -EVENT_TYPE(SPDY_SESSION_SENT_WINDOW_UPDATE) +EVENT_TYPE(SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME) + +// This event indicates that the send window has been updated for a session. +// { +// "delta": , +// "new_window": , +// } +EVENT_TYPE(SPDY_SESSION_UPDATE_SEND_WINDOW) + +// This event indicates that the recv window has been updated for a session. +// { +// "delta": , +// "new_window": , +// } +EVENT_TYPE(SPDY_SESSION_UPDATE_RECV_WINDOW) // Sending of a SPDY CREDENTIAL frame (which sends a certificate or // certificate chain to the server). @@ -1103,8 +1117,11 @@ EVENT_TYPE(SPDY_SESSION_SEND_DATA) // } EVENT_TYPE(SPDY_SESSION_RECV_DATA) -// Logs that a stream is stalled on the send window being closed. -EVENT_TYPE(SPDY_SESSION_STALLED_ON_SEND_WINDOW) +// Logs that a stream is stalled on the session send window being closed. +EVENT_TYPE(SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW) + +// Logs that a stream is stalled on its send window being closed. +EVENT_TYPE(SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW) // Session is closing // { @@ -1174,7 +1191,7 @@ EVENT_TYPE(SPDY_STREAM) // Logs that a stream attached to a pushed stream. EVENT_TYPE(SPDY_STREAM_ADOPTED_PUSH_STREAM) -// This event indicates that the send window has been updated +// This event indicates that the send window has been updated for a stream. // { // "id": , // "delta": , @@ -1182,7 +1199,7 @@ EVENT_TYPE(SPDY_STREAM_ADOPTED_PUSH_STREAM) // } EVENT_TYPE(SPDY_STREAM_UPDATE_SEND_WINDOW) -// This event indicates that the recv window has been updated +// This event indicates that the recv window has been updated for a stream. // { // "id": , // "delta": , diff --git a/net/socket/next_proto.h b/net/socket/next_proto.h index 6c26197..a60437f 100644 --- a/net/socket/next_proto.h +++ b/net/socket/next_proto.h @@ -18,7 +18,8 @@ enum NextProto { kProtoSPDY2 = 3, kProtoSPDY21 = 4, kProtoSPDY3 = 5, - kProtoMaximumVersion = 6, + kProtoSPDY31 = 6, + kProtoMaximumVersion = 7, }; } // namespace net diff --git a/net/spdy/buffered_spdy_framer.h b/net/spdy/buffered_spdy_framer.h index ba669cb..ff0b082 100644 --- a/net/spdy/buffered_spdy_framer.h +++ b/net/spdy/buffered_spdy_framer.h @@ -50,7 +50,8 @@ class NET_EXPORT_PRIVATE BufferedSpdyFramerVisitorInterface { // Called when data is received. // |stream_id| The stream receiving data. // |data| A buffer containing the data received. - // |len| The length of the data buffer. + // |len| The length of the data buffer (at most 2^24 - 1 for SPDY/3, + // but 2^16 - 1 - 8 for SPDY/4). // When the other side has finished sending data on this stream, // this method will be called with a zero-length buffer. virtual void OnStreamFrameData(SpdyStreamId stream_id, diff --git a/net/spdy/spdy_protocol.h b/net/spdy/spdy_protocol.h index 5b6ddbd..18e7ad5 100644 --- a/net/spdy/spdy_protocol.h +++ b/net/spdy/spdy_protocol.h @@ -152,11 +152,26 @@ namespace net { const int32 kSpdyVersion2 = 2; const int32 kSpdyVersion3 = 3; +// A SPDY stream id is a 31 bit entity. +typedef uint32 SpdyStreamId; + +// Specifies the stream ID used to denote the current session (for +// flow control). +const SpdyStreamId kSessionFlowControlStreamId = 0; + // Initial window size for a Spdy stream const int32 kSpdyStreamInitialWindowSize = 64 * 1024; // 64 KBytes -// Maximum window size for a Spdy stream -const int32 kSpdyStreamMaximumWindowSize = 0x7FFFFFFF; // Max signed 32bit int +// Initial window size for a Spdy session +// +// TODO(akalin): Update this once we settle on the correct session +// initial window size. +// +// TODO(akalin): Upstream this. +const int32 kSpdySessionInitialWindowSize = 64 * 1024; // 64 KBytes + +// Maximum window size for a Spdy stream or session. +const int32 kSpdyMaximumWindowSize = 0x7FFFFFFF; // Max signed 32bit int // SPDY 2 dictionary. // This is just a hacked dictionary to use for shrinking HTTP-like headers. @@ -445,9 +460,6 @@ enum SpdyGoAwayStatus { GOAWAY_NUM_STATUS_CODES = 3 }; -// A SPDY stream id is a 31 bit entity. -typedef uint32 SpdyStreamId; - // A SPDY priority is a number between 0 and 7 (inclusive). // SPDY priority range is version-dependant. For SPDY 2 and below, priority is a // number between 0 and 3. @@ -735,7 +747,7 @@ class SpdyWindowUpdateIR : public SpdyFrameWithStreamIdIR { int32 delta() const { return delta_; } void set_delta(int32 delta) { DCHECK_LT(0, delta); - DCHECK_LE(delta, kSpdyStreamMaximumWindowSize); + DCHECK_LE(delta, kSpdyMaximumWindowSize); delta_ = delta; } diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 2f8d3aa..714b4f0 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -126,15 +126,26 @@ base::Value* NetLogSpdySettingsCallback(const SettingsMap* settings, return dict; } -base::Value* NetLogSpdyWindowUpdateCallback(SpdyStreamId stream_id, - uint32 delta, - NetLog::LogLevel /* log_level */) { +base::Value* NetLogSpdyWindowUpdateFrameCallback( + SpdyStreamId stream_id, + uint32 delta, + NetLog::LogLevel /* log_level */) { base::DictionaryValue* dict = new base::DictionaryValue(); dict->SetInteger("stream_id", static_cast(stream_id)); dict->SetInteger("delta", delta); return dict; } +base::Value* NetLogSpdySessionWindowUpdateCallback( + int32 delta, + int32 window_size, + NetLog::LogLevel /* log_level */) { + DictionaryValue* dict = new DictionaryValue(); + dict->SetInteger("delta", delta); + dict->SetInteger("window_size", window_size); + return dict; +} + base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, int size, SpdyDataFlags flags, @@ -259,11 +270,14 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, next_ping_id_(1), last_activity_time_(base::TimeTicks::Now()), check_ping_status_pending_(false), - flow_control_(false), + flow_control_state_(FLOW_CONTROL_NONE), stream_initial_send_window_size_(kSpdyStreamInitialWindowSize), stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ? kDefaultInitialRecvWindowSize : stream_initial_recv_window_size), + session_send_window_size_(0), + session_recv_window_size_(0), + session_unacked_recv_window_bytes_(0), net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)), verify_domain_authentication_(verify_domain_authentication), enable_sending_initial_settings_(enable_sending_initial_settings), @@ -360,10 +374,18 @@ net::Error SpdySession::InitializeWithSocket( host_port_pair().ToString())); } - DCHECK(protocol >= kProtoSPDY2); - DCHECK(protocol <= kProtoSPDY3); - int version = (protocol == kProtoSPDY3) ? kSpdyVersion3 : kSpdyVersion2; - flow_control_ = (protocol >= kProtoSPDY3); + DCHECK_GE(protocol, kProtoSPDY2); + DCHECK_LE(protocol, kProtoSPDY31); + int version = (protocol >= kProtoSPDY3) ? kSpdyVersion3 : kSpdyVersion2; + if (protocol >= kProtoSPDY31) { + flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION; + session_send_window_size_ = kSpdySessionInitialWindowSize; + session_recv_window_size_ = kSpdySessionInitialWindowSize; + } else if (protocol >= kProtoSPDY3) { + flow_control_state_ = FLOW_CONTROL_STREAM; + } else { + flow_control_state_ = FLOW_CONTROL_NONE; + } buffered_spdy_framer_.reset(new BufferedSpdyFramer(version, enable_compression_)); @@ -687,9 +709,11 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, flags = static_cast(flags & ~DATA_FLAG_FIN); } - // Obey send window size of the stream if flow control is enabled. - if (flow_control_) { - if (stream->send_window_size() <= 0) { + // Obey send window size of the stream (and session, if applicable) + // if flow control is enabled. + if (flow_control_state_ >= FLOW_CONTROL_STREAM) { + int32 effective_window_size = stream->send_window_size(); + if (effective_window_size <= 0) { // Because we queue frames onto the session, it is possible that // a stream was not flow controlled at the time it attempted the // write, but when we go to fulfill the write, it is now flow @@ -698,15 +722,29 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, // stall occurs. stream->set_stalled_by_flow_control(true); net_log().AddEvent( - NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, + NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); return NULL; } - int new_len = std::min(len, stream->send_window_size()); + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { + effective_window_size = + std::min(effective_window_size, session_send_window_size_); + if (effective_window_size <= 0) { + stream->set_stalled_by_flow_control(true); + net_log().AddEvent( + NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, + NetLog::IntegerCallback("stream_id", stream_id)); + return NULL; + } + } + + int new_len = std::min(len, effective_window_size); if (new_len < len) { len = new_len; flags = static_cast(flags & ~DATA_FLAG_FIN); } + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) + DecreaseSendWindowSize(static_cast(len)); stream->DecreaseSendWindowSize(static_cast(len)); } @@ -1302,12 +1340,16 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, const char* data, size_t len, SpdyDataFlags flags) { + DCHECK_LT(len, 1u << 24); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RECV_DATA, base::Bind(&NetLogSpdyDataCallback, stream_id, len, flags)); } + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) + DecreaseRecvWindowSize(static_cast(len)); + // By the time data comes in, the stream may already be inactive. if (!IsStreamActive(stream_id)) return; @@ -1646,46 +1688,86 @@ void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { DCHECK_LE(delta_window_size, static_cast(kint32max)); net_log_.AddEvent( - NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE, - base::Bind(&NetLogSpdyWindowUpdateCallback, + NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, + base::Bind(&NetLogSpdyWindowUpdateFrameCallback, stream_id, delta_window_size)); - if (!IsStreamActive(stream_id)) { + if (flow_control_state_ < FLOW_CONTROL_STREAM) { + LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id + << " when flow control is not turned on"; + return; + } + + if ((stream_id == kSessionFlowControlStreamId) && + flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { + LOG(WARNING) << "Received WINDOW_UPDATE for session when " + << "session flow control is not turned on"; + return; + } + + if ((stream_id != kSessionFlowControlStreamId) && + !IsStreamActive(stream_id)) { LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; return; } if (delta_window_size < 1u) { - ResetStream(stream_id, RST_STREAM_FLOW_CONTROL_ERROR, - base::StringPrintf( - "Received WINDOW_UPDATE with an invalid " - "delta_window_size %ud", delta_window_size)); + if (stream_id == kSessionFlowControlStreamId) { + LOG(WARNING) << "Received session WINDOW_UPDATE with an " + << "invalid delta_window_size " << delta_window_size; + // TODO(akalin): Figure out whether we should instead send a + // GOAWAY and close the connection here. + } else { + ResetStream(stream_id, RST_STREAM_FLOW_CONTROL_ERROR, + base::StringPrintf( + "Received WINDOW_UPDATE with an invalid " + "delta_window_size %ud", delta_window_size)); + } return; } - scoped_refptr stream = active_streams_[stream_id]; - CHECK_EQ(stream->stream_id(), stream_id); - CHECK(!stream->cancelled()); - - if (flow_control_) + if (stream_id == kSessionFlowControlStreamId) { + IncreaseSendWindowSize(static_cast(delta_window_size)); + } else { + scoped_refptr stream = active_streams_[stream_id]; + CHECK_EQ(stream->stream_id(), stream_id); + CHECK(!stream->cancelled()); stream->IncreaseSendWindowSize(static_cast(delta_window_size)); + } } -void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, - uint32 delta_window_size) { +void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, + uint32 delta_window_size) { + CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); CHECK(IsStreamActive(stream_id)); scoped_refptr stream = active_streams_[stream_id]; CHECK_EQ(stream->stream_id(), stream_id); + SendWindowUpdateFrame(stream_id, delta_window_size, stream->priority()); +} +void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { + if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) + return; + + DCHECK_GE(session_unacked_recv_window_bytes_, 0); + DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); + DCHECK_GE(delta_window_size, 1); + // Check for overflow. + DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); + + session_recv_window_size_ += delta_window_size; net_log_.AddEvent( - NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE, - base::Bind(&NetLogSpdyWindowUpdateCallback, - stream_id, delta_window_size)); + NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdySessionWindowUpdateCallback, + delta_window_size, session_recv_window_size_)); - DCHECK(buffered_spdy_framer_.get()); - scoped_ptr window_update_frame( - buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); - QueueFrame(window_update_frame.release(), stream->priority()); + session_unacked_recv_window_bytes_ += delta_window_size; + if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { + SendWindowUpdateFrame(kSessionFlowControlStreamId, + session_unacked_recv_window_bytes_, + HIGHEST); + session_unacked_recv_window_bytes_ = 0; + } } // Given a cwnd that we would have sent to the server, modify it based on the @@ -1782,7 +1864,7 @@ void SpdySession::HandleSetting(uint32 id, uint32 value) { ProcessPendingCreateStreams(); break; case SETTINGS_INITIAL_WINDOW_SIZE: { - if (!flow_control_) { + if (flow_control_state_ < FLOW_CONTROL_STREAM) { LOG(WARNING) << "SETTINGS_INITIAL_WINDOW_SIZE setting received " << "when flow control is turned off"; // TODO(akalin): Figure out whether we should instead send a @@ -1810,7 +1892,7 @@ void SpdySession::HandleSetting(uint32 id, uint32 value) { } void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { - DCHECK(flow_control_); + DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); ActiveStreamMap::iterator it; for (it = active_streams_.begin(); it != active_streams_.end(); ++it) { const scoped_refptr& stream = it->second; @@ -1839,6 +1921,29 @@ void SpdySession::SendPrefacePing() { WritePingFrame(next_ping_id_); } +void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, + uint32 delta_window_size, + RequestPriority priority) { + CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); + if (IsStreamActive(stream_id)) { + scoped_refptr stream = active_streams_[stream_id]; + CHECK_EQ(stream->stream_id(), stream_id); + } else { + CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + CHECK_EQ(stream_id, kSessionFlowControlStreamId); + } + + net_log_.AddEvent( + NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, + base::Bind(&NetLogSpdyWindowUpdateFrameCallback, + stream_id, delta_window_size)); + + DCHECK(buffered_spdy_framer_.get()); + scoped_ptr window_update_frame( + buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); + QueueFrame(window_update_frame.release(), priority); +} + void SpdySession::WritePingFrame(uint32 unique_id) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr ping_frame( @@ -2005,4 +2110,71 @@ SSLClientSocket* SpdySession::GetSSLClientSocket() const { return ssl_socket; } +void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + + DCHECK_GE(delta_window_size, 1); + + // Check for overflow. + int32 max_delta_window_size = kint32max - session_send_window_size_; + if (delta_window_size > max_delta_window_size) { + LOG(WARNING) << "Received WINDOW_UPDATE [delta: " + << delta_window_size + << "] for session overflows session_send_window_size_ " + << "[current: " << session_send_window_size_ << "]"; + // TODO(akalin): Figure out whether we should instead send a + // GOAWAY and close the connection here. + return; + } + + session_send_window_size_ += delta_window_size; + + net_log_.AddEvent( + NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, + base::Bind(&NetLogSpdySessionWindowUpdateCallback, + delta_window_size, session_send_window_size_)); +} + +void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + + // We only call this method when sending a frame. Therefore, + // |delta_window_size| should be within the valid frame size range. + DCHECK_GE(delta_window_size, 1); + DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); + + // |send_window_size_| should have been at least |delta_window_size| for + // this call to happen. + DCHECK_GE(session_send_window_size_, delta_window_size); + + session_send_window_size_ -= delta_window_size; + + net_log_.AddEvent( + NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, + base::Bind(&NetLogSpdySessionWindowUpdateCallback, + -delta_window_size, session_send_window_size_)); +} + +void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + DCHECK_GE(delta_window_size, 1); + + // |delta_window_size| should never cause + // |session_recv_window_size_| to go negative. If we do, it's a + // client-side bug. + if (delta_window_size > session_recv_window_size_) { + NOTREACHED() << "Received session WINDOW_UPDATE with an " + << "invalid delta_window_size " << delta_window_size; + // TODO(akalin): Figure out whether we should instead send a + // GOAWAY and close the connection here. + return; + } + + session_recv_window_size_ -= delta_window_size; + net_log_.AddEvent( + NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdySessionWindowUpdateCallback, + -delta_window_size, session_recv_window_size_)); +} + } // namespace net diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 80a2dd3..865589d 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -101,8 +101,16 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING == class NET_EXPORT SpdySession : public base::RefCounted, public BufferedSpdyFramerVisitorInterface { public: + // TODO(akalin): Use base::TickClock when it becomes available. typedef base::TimeTicks (*TimeFunc)(void); + // How we handle flow control (version-dependent). + enum FlowControlState { + FLOW_CONTROL_NONE, + FLOW_CONTROL_STREAM, + FLOW_CONTROL_STREAM_AND_SESSION + }; + // Defines an interface for producing SpdyIOBuffers. class NET_EXPORT_PRIVATE SpdyIOBufferProducer { public: @@ -266,9 +274,17 @@ class NET_EXPORT SpdySession : public base::RefCounted, // if server bound certs are not supported in this session. ServerBoundCertService* GetServerBoundCertService() const; - // Send WINDOW_UPDATE frame, called by a stream whenever receive window - // size is increased. - void SendWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size); + // Send a WINDOW_UPDATE frame for a stream. Called by a stream + // whenever receive window size is increased. + void SendStreamWindowUpdate(SpdyStreamId stream_id, + uint32 delta_window_size); + + // Called by a stream to increase this session's receive window size + // by |delta_window_size|, which must be at least 1 and must not + // cause this session's receive window size to overflow, possibly + // also sending a WINDOW_UPDATE frame. Does nothing if session flow + // control is turned off. + void IncreaseRecvWindowSize(int32 delta_window_size); // If session is closed, no new streams/transactions should be created. bool IsClosed() const { return state_ == STATE_CLOSED; } @@ -331,9 +347,9 @@ class NET_EXPORT SpdySession : public base::RefCounted, return pending_create_stream_queues_[priority].size(); } - // Returns true if flow control is enabled for the session. - bool is_flow_control_enabled() const { - return flow_control_; + // Returns the (version-dependent) flow control state. + FlowControlState flow_control_state() const { + return flow_control_state_; } // Returns the current |stream_initial_send_window_size_|. @@ -374,10 +390,17 @@ class NET_EXPORT SpdySession : public base::RefCounted, FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy2Test, FailedPing); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy2Test, GetActivePushStream); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy2Test, DeleteExpiredPushStreams); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy2Test, ProtocolNegotiation); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ClientPing); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, FailedPing); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, GetActivePushStream); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, DeleteExpiredPushStreams); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation31); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, IncreaseRecvWindowSize); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustRecvWindowSize31); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustSendWindowSize31); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31); struct PendingCreateStream { PendingCreateStream(const GURL& url, RequestPriority priority, @@ -479,6 +502,10 @@ class NET_EXPORT SpdySession : public base::RefCounted, // Send PING if there are no PINGs in flight and we haven't heard from server. void SendPrefacePing(); + // Send a single WINDOW_UPDATE frame. + void SendWindowUpdateFrame(SpdyStreamId stream_id, uint32 delta_window_size, + RequestPriority priority); + // Send the PING frame. void WritePingFrame(uint32 unique_id); @@ -572,6 +599,28 @@ class NET_EXPORT SpdySession : public base::RefCounted, bool fin, const SpdyHeaderBlock& headers) OVERRIDE; + // If session flow control is turned on, called by OnWindowUpdate() + // (which is in turn called by the framer) to increase this + // session's send window size by |delta_window_size| from a + // WINDOW_UPDATE frome, which must be at least 1. If + // |delta_window_size| would cause this session's send window size + // to overflow, does nothing. + void IncreaseSendWindowSize(int32 delta_window_size); + + // If session flow control is turned on, called by CreateDataFrame() + // (which is in turn called by a stream) to decrease this session's + // send window size by |delta_window_size|, which must be at least 1 + // and at most kMaxSpdyFrameChunkSize. |delta_window_size| must not + // cause this session's send window size to go negative. + void DecreaseSendWindowSize(int32 delta_window_size); + + // If session flow control is turned on, called by OnStreamFrameData + // (which is in turn called by the framer) to decrease this + // session's receive window size by |delta_window_size|, which must + // be at least 1 and must not cause this session's receive window + // size to go negative. + void DecreaseRecvWindowSize(int32 delta_window_size); + // -------------------------- // Helper methods for testing // -------------------------- @@ -720,8 +769,8 @@ class NET_EXPORT SpdySession : public base::RefCounted, // status. bool check_ping_status_pending_; - // Indicate if flow control is enabled or not. - bool flow_control_; + // The (version-dependent) flow control state. + FlowControlState flow_control_state_; // Initial send window size for this session's streams. Can be // changed by an arriving SETTINGS frame. Newly created streams use @@ -735,6 +784,12 @@ class NET_EXPORT SpdySession : public base::RefCounted, // window size. int32 stream_initial_recv_window_size_; + // Session flow control variables. All zero unless session flow + // control is turned on. + int32 session_send_window_size_; + int32 session_recv_window_size_; + int32 session_unacked_recv_window_bytes_; + BoundNetLog net_log_; // Outside of tests, these should always be true. diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index 01e7e4c..93263b3 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -143,13 +143,16 @@ class SpdySessionSpdy2Test : public PlatformTest { // Creates an initialized session to |pair_|. scoped_refptr CreateInitializedSession() { scoped_refptr session = GetSession(pair_); - InitializeSession(http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ( + OK, + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_)); return session; } - void InitializeSession(HttpNetworkSession* http_session, - SpdySession* session, - const HostPortPair& host_port_pair) { + net::Error InitializeSession(HttpNetworkSession* http_session, + SpdySession* session, + const HostPortPair& host_port_pair) { transport_params_ = new TransportSocketParams( host_port_pair, MEDIUM, false, false, OnHostResolutionCallback()); @@ -160,8 +163,7 @@ class SpdySessionSpdy2Test : public PlatformTest { http_session->GetTransportSocketPool( HttpNetworkSession::NORMAL_SOCKET_POOL), BoundNetLog())); - EXPECT_EQ(OK, - session->InitializeWithSocket(connection.release(), false, OK)); + return session->InitializeWithSocket(connection.release(), false, OK); } scoped_refptr transport_params_; @@ -2001,4 +2003,36 @@ TEST_F(SpdySessionSpdy2Test, GoAwayWhileInDoLoop) { EXPECT_TRUE(data.at_read_eof()); } +// Within this framework, a SpdySession should be initialized with +// flow control disabled and with protocol version 2. +TEST_F(SpdySessionSpdy2Test, ProtocolNegotiation) { + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + scoped_refptr session = GetSession(pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_NONE, session->flow_control_state()); + EXPECT_TRUE(session->buffered_spdy_framer_ == NULL); + EXPECT_EQ(0, session->session_send_window_size_); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_NONE, session->flow_control_state()); + EXPECT_EQ(kSpdyVersion2, session->buffered_spdy_framer_->protocol_version()); + EXPECT_EQ(0, session->session_send_window_size_); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); +} + } // namespace net diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index e7e2d18..6dc72d4 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -16,10 +16,12 @@ #include "net/base/net_log_unittest.h" #include "net/base/test_data_directory.h" #include "net/base/test_data_stream.h" +#include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_io_buffer.h" #include "net/spdy/spdy_session_pool.h" #include "net/spdy/spdy_session_test_util.h" #include "net/spdy/spdy_stream.h" +#include "net/spdy/spdy_stream_test_util.h" #include "net/spdy/spdy_test_util_spdy3.h" #include "testing/platform_test.h" @@ -146,13 +148,16 @@ class SpdySessionSpdy3Test : public PlatformTest { // Creates an initialized session to |pair_|. scoped_refptr CreateInitializedSession() { scoped_refptr session = GetSession(pair_); - InitializeSession(http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ( + OK, + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_)); return session; } - void InitializeSession(HttpNetworkSession* http_session, - SpdySession* session, - const HostPortPair& host_port_pair) { + net::Error InitializeSession(HttpNetworkSession* http_session, + SpdySession* session, + const HostPortPair& host_port_pair) { transport_params_ = new TransportSocketParams( host_port_pair, MEDIUM, false, false, OnHostResolutionCallback()); @@ -163,8 +168,7 @@ class SpdySessionSpdy3Test : public PlatformTest { http_session->GetTransportSocketPool( HttpNetworkSession::NORMAL_SOCKET_POOL), BoundNetLog())); - EXPECT_EQ(OK, - session->InitializeWithSocket(connection.release(), false, OK)); + return session->InitializeWithSocket(connection.release(), false, OK); } scoped_refptr transport_params_; @@ -978,7 +982,9 @@ TEST_F(SpdySessionSpdy3Test, CloseSessionOnError) { spdy_session_pool_->Get(pair_, log.bound()); EXPECT_TRUE(spdy_session_pool_->HasSession(pair_)); - InitializeSession(http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(OK, + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_)); // Flush the SpdySession::OnReadComplete() task. MessageLoop::current()->RunUntilIdle(); @@ -2130,4 +2136,349 @@ TEST_F(SpdySessionSpdy3Test, GoAwayWhileInDoLoop) { EXPECT_TRUE(data.at_read_eof()); } +// Within this framework, a SpdySession should be initialized with +// flow control enabled only for streams and with protocol version 3 +// by default. +TEST_F(SpdySessionSpdy3Test, ProtocolNegotiation) { + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + scoped_refptr session = GetSession(pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_NONE, session->flow_control_state()); + EXPECT_TRUE(session->buffered_spdy_framer_ == NULL); + EXPECT_EQ(0, session->session_send_window_size_); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM, session->flow_control_state()); + EXPECT_EQ(kSpdyVersion3, session->buffered_spdy_framer_->protocol_version()); + EXPECT_EQ(0, session->session_send_window_size_); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); +} + +// Within this framework and with the "enable_spdy_31" flag, a +// SpdySession should be initialized with flow control enabled for +// streams and sessions and with protocol version 3. +TEST_F(SpdySessionSpdy3Test, ProtocolNegotiation31) { + session_deps_.enable_spdy_31 = true; + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + scoped_refptr session = GetSession(pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_NONE, session->flow_control_state()); + EXPECT_TRUE(session->buffered_spdy_framer_ == NULL); + EXPECT_EQ(0, session->session_send_window_size_); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM_AND_SESSION, + session->flow_control_state()); + EXPECT_EQ(kSpdyVersion3, session->buffered_spdy_framer_->protocol_version()); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); +} + +// SpdySession::IncreaseRecvWindowSize should be callable even if +// session flow control isn't turned on, but it should have no effect. +TEST_F(SpdySessionSpdy3Test, IncreaseRecvWindowSize) { + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + scoped_refptr session = GetSession(pair_); + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM, + session->flow_control_state()); + + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + session->IncreaseRecvWindowSize(100); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); +} + +// SpdySession::{Increase,Decrease}RecvWindowSize should properly +// adjust the session receive window size when the "enable_spdy_31" +// flag is set. In addition, SpdySession::IncreaseRecvWindowSize +// should trigger sending a WINDOW_UPDATE frame for a large enough +// delta. +TEST_F(SpdySessionSpdy3Test, AdjustRecvWindowSize31) { + session_deps_.enable_spdy_31 = true; + session_deps_.host_resolver->set_synchronous_mode(true); + + const int32 delta_window_size = 100; + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(ASYNC, 0, 1) // EOF + }; + scoped_ptr window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, + kSpdySessionInitialWindowSize + delta_window_size)); + MockWrite writes[] = { + CreateMockWrite(*window_update, 0), + }; + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + data.set_connect_data(connect_data); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + SSLSocketDataProvider ssl(SYNCHRONOUS, OK); + session_deps_.deterministic_socket_factory->AddSSLSocketDataProvider(&ssl); + + CreateDeterministicNetworkSession(); + scoped_refptr session = GetSession(pair_); + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM_AND_SESSION, + session->flow_control_state()); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + session->IncreaseRecvWindowSize(delta_window_size); + EXPECT_EQ(kSpdySessionInitialWindowSize + delta_window_size, + session->session_recv_window_size_); + EXPECT_EQ(delta_window_size, session->session_unacked_recv_window_bytes_); + + // Should trigger sending a WINDOW_UPDATE frame. + session->IncreaseRecvWindowSize(kSpdySessionInitialWindowSize); + EXPECT_EQ(2 * kSpdySessionInitialWindowSize + delta_window_size, + session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(2); + + session->DecreaseRecvWindowSize( + 2 * kSpdySessionInitialWindowSize + delta_window_size); + EXPECT_EQ(0, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); +} + +// SpdySession::{Increase,Decrease}SendWindowSize should properly +// adjust the session send window size when the "enable_spdy_31" flag +// is set. +TEST_F(SpdySessionSpdy3Test, AdjustSendWindowSize31) { + session_deps_.enable_spdy_31 = true; + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + scoped_refptr session = GetSession(pair_); + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM_AND_SESSION, + session->flow_control_state()); + + const int32 delta_window_size = 100; + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + + session->IncreaseSendWindowSize(delta_window_size); + EXPECT_EQ(kSpdySessionInitialWindowSize + delta_window_size, + session->session_send_window_size_); + + session->DecreaseSendWindowSize(delta_window_size); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); +} + +namespace { + +void ExpectOK(int status) { + EXPECT_EQ(OK, status); +} + +} // namespace + +// Send data back and forth; the send and receive windows should +// change appropriately. +TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { + session_deps_.enable_spdy_31 = true; + + const int32 msg_data_size = 100; + const std::string msg_data(msg_data_size, 'a'); + + MockConnect connect_data(SYNCHRONOUS, OK); + + const SpdyHeaderInfo kSynStartHeader = { + SYN_STREAM, + 1, + 0, + ConvertRequestPriorityToSpdyPriority(MEDIUM, 3), + 0, + CONTROL_FLAG_NONE, + false, + RST_STREAM_INVALID, + NULL, + 0, + DATA_FLAG_NONE + }; + static const char* const kGetHeaders[] = { + ":method", + "GET", + ":scheme", + "http", + ":host", + "www.google.com", + ":path", + "/", + ":version", + "HTTP/1.1", + }; + scoped_ptr req( + ConstructSpdyPacket( + kSynStartHeader, NULL, 0, kGetHeaders, arraysize(kGetHeaders) / 2)); + scoped_ptr msg( + ConstructSpdyBodyFrame(1, msg_data.data(), msg_data_size, false)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*msg, 2), + }; + + scoped_ptr resp(ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr echo( + ConstructSpdyBodyFrame(1, msg_data.data(), msg_data_size, false)); + scoped_ptr window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, msg_data_size)); + MockRead reads[] = { + CreateMockRead(*resp, 1), + CreateMockRead(*echo, 3), + CreateMockRead(*window_update, 4), + MockRead(ASYNC, 0, 5) // EOF + }; + + // Create SpdySession and SpdyStream and send the request. + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + data.set_connect_data(connect_data); + session_deps_.host_resolver->set_synchronous_mode(true); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + SSLSocketDataProvider ssl(SYNCHRONOUS, OK); + session_deps_.deterministic_socket_factory->AddSSLSocketDataProvider(&ssl); + + CreateDeterministicNetworkSession(); + + scoped_refptr session = CreateInitializedSession(); + + scoped_refptr stream; + TestCompletionCallback callback1; + GURL url1("http://www.google.com"); + EXPECT_EQ(OK, session->CreateStream(url1, MEDIUM, &stream, + BoundNetLog(), callback1.callback())); + EXPECT_EQ(0u, stream->stream_id()); + + scoped_refptr buf(new IOBufferWithSize(msg_data_size)); + memcpy(buf->data(), msg_data.data(), msg_data_size); + scoped_ptr delegate( + new test::TestSpdyStreamDelegate( + stream.get(), NULL, buf.get(), base::Bind(&ExpectOK))); + stream->SetDelegate(delegate.get()); + + scoped_ptr headers(new SpdyHeaderBlock); + (*headers)[":method"] = "GET"; + (*headers)[":scheme"] = url1.scheme(); + (*headers)[":host"] = url1.host(); + (*headers)[":path"] = url1.path(); + (*headers)[":version"] = "HTTP/1.1"; + + stream->set_spdy_headers(headers.Pass()); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(1); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(1); + + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(1); + + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(1); + + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(1); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + EXPECT_TRUE(data.at_write_eof()); + EXPECT_TRUE(data.at_read_eof()); + + // Normally done by the delegate, but not by our test delegate. + session->IncreaseRecvWindowSize(msg_data_size); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_recv_window_size_); + EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); + + stream->Close(); +} + } // namespace net diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 335516a..aed7af7 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -247,7 +247,7 @@ void SpdyStream::PossiblyResumeIfStalled() { } void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { - DCHECK(session_->is_flow_control_enabled()); + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); if (closed()) return; @@ -264,7 +264,7 @@ void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { } void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { - DCHECK(session_->is_flow_control_enabled()); + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); // Ignore late WINDOW_UPDATEs. if (closed()) @@ -296,7 +296,7 @@ void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { } void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { - DCHECK(session_->is_flow_control_enabled()); + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); if (closed()) return; @@ -319,9 +319,14 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { } void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { - if (!session_->is_flow_control_enabled()) + if (session_->flow_control_state() < SpdySession::FLOW_CONTROL_STREAM) return; + // Call back into the session, since this is the only + // window-size-related function that is called by the delegate + // instead of by the session. + session_->IncreaseRecvWindowSize(delta_window_size); + // By the time a read is processed by the delegate, this stream may // already be inactive. if (!session_->IsStreamActive(stream_id_)) @@ -342,7 +347,7 @@ void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { unacked_recv_window_bytes_ += delta_window_size; if (unacked_recv_window_bytes_ > session_->stream_initial_recv_window_size() / 2) { - session_->SendWindowUpdate( + session_->SendStreamWindowUpdate( stream_id_, static_cast(unacked_recv_window_bytes_)); unacked_recv_window_bytes_ = 0; } @@ -492,7 +497,7 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) { return; } - if (session_->is_flow_control_enabled()) + if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) DecreaseRecvWindowSize(static_cast(length)); // Track our bandwidth. @@ -852,7 +857,7 @@ void SpdyStream::UpdateHistograms() { void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { DCHECK(session_->IsStreamActive(stream_id_)); - DCHECK(session_->is_flow_control_enabled()); + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); DCHECK_GE(delta_window_size, 1); // Since we never decrease the initial window size, diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 3a5b327..78e3f8b 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -155,35 +155,35 @@ class NET_EXPORT_PRIVATE SpdyStream stalled_by_flow_control_ = stalled; } - // If flow control is turned on, called by the session to adjust - // this stream's send window size by |delta_window_size|, which is - // the difference between the SETTINGS_INITIAL_WINDOW_SIZE in the - // most recent SETTINGS frame and the previous initial send window - // size, possibly unstalling this stream. Although + // If stream flow control is turned on, called by the session to + // adjust this stream's send window size by |delta_window_size|, + // which is the difference between the SETTINGS_INITIAL_WINDOW_SIZE + // in the most recent SETTINGS frame and the previous initial send + // window size, possibly unstalling this stream. Although // |delta_window_size| may cause this stream's send window size to // go negative, it must not cause it to wrap around in either // direction. Does nothing if the stream is already closed. // - // If flow control is turned off, this must not be called. + // If stream flow control is turned off, this must not be called. void AdjustSendWindowSize(int32 delta_window_size); - // If flow control is turned on, called by the session to increase - // this stream's send window size by |delta_window_size| from a - // WINDOW_UPDATE frome, which must be at least 1, possibly + // If stream flow control is turned on, called by the session to + // increase this stream's send window size by |delta_window_size| + // from a WINDOW_UPDATE frome, which must be at least 1, possibly // unstalling this stream. If |delta_window_size| would cause this // stream's send window size to overflow, calls into the session to // reset this stream. Does nothing if the stream is already closed. // - // If flow control is turned off, this must not be called. + // If stream flow control is turned off, this must not be called. void IncreaseSendWindowSize(int32 delta_window_size); - // If flow control is turned on, called by the session to decrease - // this stream's send window size by |delta_window_size|, which must - // be at least 0 and at most kMaxSpdyFrameChunkSize. + // If stream flow control is turned on, called by the session to + // decrease this stream's send window size by |delta_window_size|, + // which must be at least 0 and at most kMaxSpdyFrameChunkSize. // |delta_window_size| must not cause this stream's send window size // to go negative. Does nothing if the stream is already closed. // - // If flow control is turned off, this must not be called. + // If stream flow control is turned off, this must not be called. void DecreaseSendWindowSize(int32 delta_window_size); // Called by the delegate to increase this stream's receive window @@ -191,9 +191,9 @@ class NET_EXPORT_PRIVATE SpdyStream // not cause this stream's receive window size to overflow, possibly // also sending a WINDOW_UPDATE frame. // - // Unlike the functions above, this may be called even when flow - // control is turned off, although this does nothing in that case - // (and also if the stream is inactive). + // Unlike the functions above, this may be called even when stream + // flow control is turned off, although this does nothing in that + // case (and also if the stream is inactive). void IncreaseRecvWindowSize(int32 delta_window_size); int GetPeerAddress(IPEndPoint* address) const; @@ -353,9 +353,9 @@ class NET_EXPORT_PRIVATE SpdyStream // stream has become stalled on flow control. SpdyFrame* ProduceNextFrame(); - // If the stream is active and flow control is turned on, called by - // OnDataReceived (which is in turn called by the session) to - // decrease this stream's receive window size by + // If the stream is active and stream flow control is turned on, + // called by OnDataReceived (which is in turn called by the session) + // to decrease this stream's receive window size by // |delta_window_size|, which must be at least 1 and must not cause // this stream's receive window size to go negative. void DecreaseRecvWindowSize(int32 delta_window_size); diff --git a/net/spdy/spdy_test_util_spdy3.cc b/net/spdy/spdy_test_util_spdy3.cc index 8832544..660c19d 100644 --- a/net/spdy/spdy_test_util_spdy3.cc +++ b/net/spdy/spdy_test_util_spdy3.cc @@ -897,6 +897,7 @@ SpdySessionDependencies::SpdySessionDependencies() enable_compression(false), enable_ping(false), enable_user_alternate_protocol_ports(false), + enable_spdy_31(false), stream_initial_recv_window_size(kSpdyStreamInitialWindowSize), time_func(&base::TimeTicks::Now), net_log(NULL) { @@ -922,6 +923,7 @@ SpdySessionDependencies::SpdySessionDependencies(ProxyService* proxy_service) enable_compression(false), enable_ping(false), enable_user_alternate_protocol_ports(false), + enable_spdy_31(false), stream_initial_recv_window_size(kSpdyStreamInitialWindowSize), time_func(&base::TimeTicks::Now), net_log(NULL) {} @@ -966,7 +968,8 @@ net::HttpNetworkSession::Params SpdySessionDependencies::CreateSessionParams( params.enable_spdy_ping_based_connection_checking = session_deps->enable_ping; params.enable_user_alternate_protocol_ports = session_deps->enable_user_alternate_protocol_ports; - params.spdy_default_protocol = kProtoSPDY3; + params.spdy_default_protocol = + session_deps->enable_spdy_31 ? kProtoSPDY31 : kProtoSPDY3; params.spdy_stream_initial_recv_window_size = session_deps->stream_initial_recv_window_size; params.time_func = session_deps->time_func; diff --git a/net/spdy/spdy_test_util_spdy3.h b/net/spdy/spdy_test_util_spdy3.h index 1c627a3..dfdf6af 100644 --- a/net/spdy/spdy_test_util_spdy3.h +++ b/net/spdy/spdy_test_util_spdy3.h @@ -409,6 +409,7 @@ struct SpdySessionDependencies { bool enable_compression; bool enable_ping; bool enable_user_alternate_protocol_ports; + bool enable_spdy_31; size_t stream_initial_recv_window_size; SpdySession::TimeFunc time_func; std::string trusted_spdy_proxy; -- cgit v1.1