diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-18 08:31:58 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-18 08:31:58 +0000 |
commit | 8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a (patch) | |
tree | 6a11658779d98ea44c2a195d9ba1009e27e46e43 /net | |
parent | f9961d8588581ef6aac7b112c59fe8bd762cda44 (diff) | |
download | chromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.zip chromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.tar.gz chromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.tar.bz2 |
[SPDY] Avoid leaking bytes from the session flow control send window
Add a ConsumeSource parameter to SpdyBuffer::ConsumeCallback. Use
it to detect when a DATA frame to be written is dropped before it
is written.
Put all the flow control functions together and clean them up a bit.
BUG=176592
Review URL: https://chromiumcodereview.appspot.com/14188025
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194851 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_buffer.cc | 17 | ||||
-rw-r--r-- | net/spdy/spdy_buffer.h | 31 | ||||
-rw-r--r-- | net/spdy/spdy_buffer_unittest.cc | 15 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 176 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 83 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 115 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 158 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 82 |
8 files changed, 436 insertions, 241 deletions
diff --git a/net/spdy/spdy_buffer.cc b/net/spdy/spdy_buffer.cc index a48b35d..ef3e640 100644 --- a/net/spdy/spdy_buffer.cc +++ b/net/spdy/spdy_buffer.cc @@ -41,7 +41,7 @@ SpdyBuffer::SpdyBuffer(const char* data, size_t size) : SpdyBuffer::~SpdyBuffer() { if (GetRemainingSize() > 0) - Consume(GetRemainingSize()); + ConsumeHelper(GetRemainingSize(), DISCARD); } const char* SpdyBuffer::GetRemainingData() const { @@ -57,17 +57,22 @@ void SpdyBuffer::AddConsumeCallback(const ConsumeCallback& consume_callback) { } void SpdyBuffer::Consume(size_t consume_size) { + ConsumeHelper(consume_size, CONSUME); +}; + +IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() { + return new WrappedIOBuffer(GetRemainingData()); +} + +void SpdyBuffer::ConsumeHelper(size_t consume_size, + ConsumeSource consume_source) { DCHECK_GE(consume_size, 1u); DCHECK_LE(consume_size, GetRemainingSize()); offset_ += consume_size; for (std::vector<ConsumeCallback>::const_iterator it = consume_callbacks_.begin(); it != consume_callbacks_.end(); ++it) { - it->Run(consume_size); + it->Run(consume_size, consume_source); } }; -IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() { - return new WrappedIOBuffer(GetRemainingData()); -} - } // namespace net diff --git a/net/spdy/spdy_buffer.h b/net/spdy/spdy_buffer.h index 4d04184..69d39ea 100644 --- a/net/spdy/spdy_buffer.h +++ b/net/spdy/spdy_buffer.h @@ -28,9 +28,21 @@ class SpdyFrame; // fact that IOBuffer member functions are not virtual. class NET_EXPORT_PRIVATE SpdyBuffer { public: - // A Callback that gets called whenever Consume() is called with the - // number of bytes consumed. - typedef base::Callback<void(size_t)> ConsumeCallback; + // The source of a call to a ConsumeCallback. + enum ConsumeSource { + // Called via a call to Consume(). + CONSUME, + // Called via the SpdyBuffer being destroyed. + DISCARD + }; + + // A Callback that gets called when bytes are consumed with the + // (non-zero) number of bytes consumed and the source of the + // consume. May be called any number of times with CONSUME as the + // source followed by at most one call with DISCARD as the + // source. The sum of the number of bytes consumed equals the total + // size of the buffer. + typedef base::Callback<void(size_t, ConsumeSource)> ConsumeCallback; // Construct with the data in the given frame. Assumes that data is // owned by |frame| or outlives it. @@ -40,6 +52,8 @@ class NET_EXPORT_PRIVATE SpdyBuffer { // non-NULL and |size| must be non-zero. SpdyBuffer(const char* data, size_t size); + // If there are bytes remaining in the buffer, triggers a call to + // any consume callbacks with a DISCARD source. ~SpdyBuffer(); // Returns the remaining (unconsumed) data. @@ -48,11 +62,10 @@ class NET_EXPORT_PRIVATE SpdyBuffer { // Returns the number of remaining (unconsumed) bytes. size_t GetRemainingSize() const; - // Add a callback which is called whenever Consume() is called. Used - // mainly to update flow control windows. The ConsumeCallback should - // not do anything complicated; ideally it should only update a - // counter. In particular, it must *not* cause the SpdyBuffer itself - // to be destroyed. + // Add a callback to be called when bytes are consumed. The + // ConsumeCallback should not do anything complicated; ideally it + // should only update a counter. In particular, it must *not* cause + // the SpdyBuffer itself to be destroyed. void AddConsumeCallback(const ConsumeCallback& consume_callback); // Consume the given number of bytes, which must be positive but not @@ -66,6 +79,8 @@ class NET_EXPORT_PRIVATE SpdyBuffer { IOBuffer* GetIOBufferForRemainingData(); private: + void ConsumeHelper(size_t consume_size, ConsumeSource consume_source); + const scoped_ptr<SpdyFrame> frame_; std::vector<ConsumeCallback> consume_callbacks_; size_t offset_; diff --git a/net/spdy/spdy_buffer_unittest.cc b/net/spdy/spdy_buffer_unittest.cc index bcf25d2..5a8bfc8 100644 --- a/net/spdy/spdy_buffer_unittest.cc +++ b/net/spdy/spdy_buffer_unittest.cc @@ -54,7 +54,11 @@ TEST_F(SpdyBufferTest, DataConstructor) { EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); } -void IncrementBy(size_t* x, size_t delta) { +void IncrementBy(size_t* x, + SpdyBuffer::ConsumeSource expected_consume_source, + size_t delta, + SpdyBuffer::ConsumeSource consume_source) { + EXPECT_EQ(expected_consume_source, consume_source); *x += delta; } @@ -66,8 +70,10 @@ TEST_F(SpdyBufferTest, Consume) { size_t x1 = 0; size_t x2 = 0; - buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x1)); - buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x2)); + buffer.AddConsumeCallback( + base::Bind(&IncrementBy, &x1, SpdyBuffer::CONSUME)); + buffer.AddConsumeCallback( + base::Bind(&IncrementBy, &x2, SpdyBuffer::CONSUME)); EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); @@ -89,7 +95,8 @@ TEST_F(SpdyBufferTest, ConsumeOnDestruction) { { SpdyBuffer buffer(kData, kDataSize); - buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x)); + buffer.AddConsumeCallback( + base::Bind(&IncrementBy, &x, SpdyBuffer::DISCARD)); } EXPECT_EQ(kDataSize, x); diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 8fa52db..760e4e3 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -794,9 +794,6 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, len = new_len; flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); } - if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) - DecreaseSendWindowSize(static_cast<int32>(len)); - stream->DecreaseSendWindowSize(static_cast<int32>(len)); } if (net_log().IsLoggingAllEvents()) { @@ -816,7 +813,17 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), static_cast<uint32>(len), flags)); - return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); + scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); + + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { + DecreaseSendWindowSize(static_cast<int32>(len)); + data_buffer->AddConsumeCallback( + base::Bind(&SpdySession::OnWriteBufferConsumed, + weak_factory_.GetWeakPtr(), + static_cast<size_t>(len))); + } + + return data_buffer.Pass(); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { @@ -1447,7 +1454,7 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { DecreaseRecvWindowSize(static_cast<int32>(len)); buffer->AddConsumeCallback( - base::Bind(&SpdySession::IncreaseRecvWindowSize, + base::Bind(&SpdySession::OnReadBufferConsumed, weak_factory_.GetWeakPtr())); } } else { @@ -2186,9 +2193,26 @@ SSLClientSocket* SpdySession::GetSSLClientSocket() const { return ssl_socket; } -void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { +void SpdySession::OnWriteBufferConsumed( + size_t frame_payload_size, + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + if (consume_source == SpdyBuffer::DISCARD) { + // If we're discarding a frame or part of it, increase the send + // window by the number of discarded bytes. (Although if we're + // discarding part of a frame, it's probably because of a write + // error and we'll be tearing down the session soon.) + size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); + DCHECK_GT(remaining_payload_bytes, 0u); + IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); + } + // For consumed bytes, the send window is increased when we receive + // a WINDOW_UPDATE frame. +} +void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(delta_window_size, 1); // Check for overflow. @@ -2214,67 +2238,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { ResumeSendStalledStreams(); } -void SpdySession::QueueSendStalledStream( - const scoped_refptr<SpdyStream>& stream) { - DCHECK(stream->send_stalled_by_flow_control()); - stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); -} - -namespace { - -// Helper function to return the total size of an array of objects -// with .size() member functions. -template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { - size_t total_size = 0; - for (size_t i = 0; i < N; ++i) { - total_size += arr[i].size(); - } - return total_size; -} - -} // namespace - -void SpdySession::ResumeSendStalledStreams() { - DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); - - // We don't have to worry about new streams being queued, since - // doing so would cause IsSendStalled() to return true. But we do - // have to worry about streams being closed, as well as ourselves - // being closed. - - while (!IsClosed() && !IsSendStalled()) { - size_t old_size = 0; - if (DCHECK_IS_ON()) - old_size = GetTotalSize(stream_send_unstall_queue_); - - SpdyStreamId stream_id = PopStreamToPossiblyResume(); - if (stream_id == 0) - break; - ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); - // The stream may actually still be send-stalled after this (due - // to its own send window) but that's okay -- it'll then be - // resumed once its send window increases. - if (it != active_streams_.end()) - it->second->PossiblyResumeIfSendStalled(); - - // The size should decrease unless we got send-stalled again. - if (!IsSendStalled()) - DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); - } -} - -SpdyStreamId SpdySession::PopStreamToPossiblyResume() { - for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { - std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; - if (!queue->empty()) { - SpdyStreamId stream_id = queue->front(); - queue->pop_front(); - return stream_id; - } - } - return 0; -} - void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); @@ -2295,16 +2258,22 @@ void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { -delta_window_size, session_send_window_size_)); } -void SpdySession::IncreaseRecvWindowSize(size_t delta_window_size) { - if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) - return; +void SpdySession::OnReadBufferConsumed( + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + DCHECK_GE(consume_size, 1u); + DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); + IncreaseRecvWindowSize(static_cast<int32>(consume_size)); +} +void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(session_unacked_recv_window_bytes_, 0); DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); - DCHECK_GE(delta_window_size, 1u); + DCHECK_GE(delta_window_size, 1); // Check for overflow. - DCHECK_LE(delta_window_size, - static_cast<size_t>(kint32max - session_recv_window_size_)); + DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); session_recv_window_size_ += delta_window_size; net_log_.AddEvent( @@ -2343,4 +2312,65 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { -delta_window_size, session_recv_window_size_)); } +void SpdySession::QueueSendStalledStream( + const scoped_refptr<SpdyStream>& stream) { + DCHECK(stream->send_stalled_by_flow_control()); + stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); +} + +namespace { + +// Helper function to return the total size of an array of objects +// with .size() member functions. +template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { + size_t total_size = 0; + for (size_t i = 0; i < N; ++i) { + total_size += arr[i].size(); + } + return total_size; +} + +} // namespace + +void SpdySession::ResumeSendStalledStreams() { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + + // We don't have to worry about new streams being queued, since + // doing so would cause IsSendStalled() to return true. But we do + // have to worry about streams being closed, as well as ourselves + // being closed. + + while (!IsClosed() && !IsSendStalled()) { + size_t old_size = 0; + if (DCHECK_IS_ON()) + old_size = GetTotalSize(stream_send_unstall_queue_); + + SpdyStreamId stream_id = PopStreamToPossiblyResume(); + if (stream_id == 0) + break; + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + // The stream may actually still be send-stalled after this (due + // to its own send window) but that's okay -- it'll then be + // resumed once its send window increases. + if (it != active_streams_.end()) + it->second->PossiblyResumeIfSendStalled(); + + // The size should decrease unless we got send-stalled again. + if (!IsSendStalled()) + DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); + } +} + +SpdyStreamId SpdySession::PopStreamToPossiblyResume() { + for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { + std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; + if (!queue->empty()) { + SpdyStreamId stream_id = queue->front(); + queue->pop_front(); + return stream_id; + } + } + return 0; +} + } // namespace net diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 0746398..63d104d 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -458,13 +458,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation4); - FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, IncreaseRecvWindowSize); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustRecvWindowSize31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustSendWindowSize31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlInactiveStream31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlNoReceiveLeaks31); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, + SessionFlowControlNoSendLeaks31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31); typedef std::deque<SpdyStreamRequest*> PendingStreamRequestQueue; @@ -652,14 +653,59 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, bool fin, const SpdyHeaderBlock& headers) OVERRIDE; - // If session flow control is turned on, called by OnWindowUpdate() - // (which is in turn called by the framer) to increase this - // session's send window size by |delta_window_size| from a - // WINDOW_UPDATE frome, which must be at least 1. If - // |delta_window_size| would cause this session's send window size - // to overflow, does nothing. + // Called when bytes are consumed from a SpdyBuffer for a DATA frame + // that is to be written or is being written. Increases the send + // window size accordingly if some or all of the SpdyBuffer is being + // discarded. + // + // If session flow control is turned off, this must not be called. + void OnWriteBufferConsumed(size_t frame_payload_size, + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source); + + // Called by OnWindowUpdate() (which is in turn called by the + // framer) to increase this session's send window size by + // |delta_window_size| from a WINDOW_UPDATE frome, which must be at + // least 1. If |delta_window_size| would cause this session's send + // window size to overflow, does nothing. + // + // If session flow control is turned off, this must not be called. void IncreaseSendWindowSize(int32 delta_window_size); + // If session flow control is turned on, called by CreateDataFrame() + // (which is in turn called by a stream) to decrease this session's + // send window size by |delta_window_size|, which must be at least 1 + // and at most kMaxSpdyFrameChunkSize. |delta_window_size| must not + // cause this session's send window size to go negative. + // + // If session flow control is turned off, this must not be called. + void DecreaseSendWindowSize(int32 delta_window_size); + + // Called when bytes are consumed by the delegate from a SpdyBuffer + // containing received data. Increases the receive window size + // accordingly. + // + // If session flow control is turned off, this must not be called. + void OnReadBufferConsumed(size_t consume_size, + SpdyBuffer::ConsumeSource consume_source); + + // Called by OnReadBufferConsume to increase this session's receive + // window size by |delta_window_size|, which must be at least 1 and + // must not cause this session's receive window size to overflow, + // possibly also sending a WINDOW_UPDATE frame. Also called during + // initialization to set the initial receive window size. + // + // If session flow control is turned off, this must not be called. + void IncreaseRecvWindowSize(int32 delta_window_size); + + // Called by OnStreamFrameData (which is in turn called by the + // framer) to decrease this session's receive window size by + // |delta_window_size|, which must be at least 1 and must not cause + // this session's receive window size to go negative. + // + // If session flow control is turned off, this must not be called. + void DecreaseRecvWindowSize(int32 delta_window_size); + // Queue a send-stalled stream for possibly resuming once we're not // send-stalled anymore. void QueueSendStalledStream(const scoped_refptr<SpdyStream>& stream); @@ -672,29 +718,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // empty. SpdyStreamId PopStreamToPossiblyResume(); - // If session flow control is turned on, called by CreateDataFrame() - // (which is in turn called by a stream) to decrease this session's - // send window size by |delta_window_size|, which must be at least 1 - // and at most kMaxSpdyFrameChunkSize. |delta_window_size| must not - // cause this session's send window size to go negative. - void DecreaseSendWindowSize(int32 delta_window_size); - - // Called by SpdyBuffers (via ConsumeCallbacks) to increase this - // session's receive window size by |delta_window_size|, which must - // be at least 1 and must not cause this session's receive window - // size to overflow, possibly also sending a WINDOW_UPDATE - // frame. Also called during initialization to set the initial - // receive window size. Does nothing if session flow control is - // turned off. - void IncreaseRecvWindowSize(size_t delta_window_size); - - // If session flow control is turned on, called by OnStreamFrameData - // (which is in turn called by the framer) to decrease this - // session's receive window size by |delta_window_size|, which must - // be at least 1 and must not cause this session's receive window - // size to go negative. - void DecreaseRecvWindowSize(int32 delta_window_size); - // -------------------------- // Helper methods for testing // -------------------------- diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index c15d3fd..f927bb4 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -2103,34 +2103,6 @@ TEST_F(SpdySessionSpdy3Test, ProtocolNegotiation4) { EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); } -// SpdySession::IncreaseRecvWindowSize should be callable even if -// session flow control isn't turned on, but it should have no effect. -TEST_F(SpdySessionSpdy3Test, IncreaseRecvWindowSize) { - session_deps_.host_resolver->set_synchronous_mode(true); - - MockConnect connect_data(SYNCHRONOUS, OK); - MockRead reads[] = { - MockRead(SYNCHRONOUS, 0, 0) // EOF - }; - StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); - data.set_connect_data(connect_data); - session_deps_.socket_factory->AddSocketDataProvider(&data); - - CreateNetworkSession(); - scoped_refptr<SpdySession> session = GetSession(pair_); - InitializeSession( - http_session_.get(), session.get(), test_host_port_pair_); - EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM, - session->flow_control_state()); - - EXPECT_EQ(0, session->session_recv_window_size_); - EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); - - session->IncreaseRecvWindowSize(100); - EXPECT_EQ(0, session->session_recv_window_size_); - EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); -} - // SpdySession::{Increase,Decrease}RecvWindowSize should properly // adjust the session receive window size when the "enable_spdy_31" // flag is set. In addition, SpdySession::IncreaseRecvWindowSize @@ -2376,6 +2348,87 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlNoReceiveLeaks31) { EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); } +// Send data back and forth but close the stream before its data frame +// can be written to the socket. The send window should then increase +// to its original value, i.e. we shouldn't "leak" send window bytes. +TEST_F(SpdySessionSpdy3Test, SessionFlowControlNoSendLeaks31) { + const char kStreamUrl[] = "http://www.google.com/"; + + session_deps_.enable_spdy_31 = true; + + const int32 msg_data_size = 100; + const std::string msg_data(msg_data_size, 'a'); + + MockConnect connect_data(SYNCHRONOUS, OK); + + scoped_ptr<SpdyFrame> initial_window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, + kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize)); + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, 1, msg_data_size, MEDIUM, NULL, 0)); + MockWrite writes[] = { + CreateMockWrite(*initial_window_update, 0), + CreateMockWrite(*req, 1), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyGetSynReply(NULL, 0, 1)); + MockRead reads[] = { + CreateMockRead(*resp, 2), + MockRead(ASYNC, 0, 3) // EOF + }; + + // Create SpdySession and SpdyStream and send the request. + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + data.set_connect_data(connect_data); + session_deps_.host_resolver->set_synchronous_mode(true); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + SSLSocketDataProvider ssl(SYNCHRONOUS, OK); + session_deps_.deterministic_socket_factory->AddSSLSocketDataProvider(&ssl); + + CreateDeterministicNetworkSession(); + + scoped_refptr<SpdySession> session = CreateInitializedSession(); + + GURL url(kStreamUrl); + scoped_refptr<SpdyStream> stream = + CreateStreamSynchronously(session, url, MEDIUM, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + EXPECT_EQ(0u, stream->stream_id()); + + test::StreamDelegateSendImmediate delegate( + stream.get(), scoped_ptr<SpdyHeaderBlock>(), msg_data); + stream->SetDelegate(&delegate); + + stream->set_spdy_headers( + ConstructPostHeaderBlock(url.spec(), msg_data_size)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + + data.RunFor(2); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + + data.RunFor(1); + + EXPECT_TRUE(data.at_write_eof()); + EXPECT_TRUE(data.at_read_eof()); + + EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size, + session->session_send_window_size_); + + // Closing the stream should increase the session's send window. + stream->Close(); + + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); + + EXPECT_EQ(OK, delegate.WaitForClose()); +} + // Send data back and forth; the send and receive windows should // change appropriately. TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { @@ -2488,8 +2541,9 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { EXPECT_EQ(msg_data, delegate.TakeReceivedData()); - // Draining the delegate's read queue should increase our receive - // window. + // Draining the delegate's read queue should increase the session's + // receive window. + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); @@ -2497,6 +2551,7 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { EXPECT_EQ(OK, delegate.WaitForClose()); + EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_); EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); } diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 2d7f5e6..482b12c 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -256,15 +256,32 @@ void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { PossiblyResumeIfSendStalled(); } +void SpdyStream::OnWriteBufferConsumed( + size_t frame_payload_size, + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); + if (consume_source == SpdyBuffer::DISCARD) { + // If we're discarding a frame or part of it, increase the send + // window by the number of discarded bytes. (Although if we're + // discarding part of a frame, it's probably because of a write + // error and we'll be tearing down the stream soon.) + size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); + DCHECK_GT(remaining_payload_bytes, 0u); + IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); + } + // For consumed bytes, the send window is increased when we receive + // a WINDOW_UPDATE frame. +} + void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); + DCHECK_GE(delta_window_size, 1); // Ignore late WINDOW_UPDATEs. if (closed()) return; - DCHECK_GE(delta_window_size, 1); - if (send_window_size_ > 0) { // Check for overflow. int32 max_delta_window_size = kint32max - send_window_size_; @@ -296,7 +313,7 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { // We only call this method when sending a frame. Therefore, // |delta_window_size| should be within the valid frame size range. - DCHECK_GE(delta_window_size, 0); + DCHECK_GE(delta_window_size, 1); DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); // |send_window_size_| should have been at least |delta_window_size| for @@ -311,6 +328,68 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { stream_id_, -delta_window_size, send_window_size_)); } +void SpdyStream::OnReadBufferConsumed( + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source) { + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); + DCHECK_GE(consume_size, 1u); + DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); + IncreaseRecvWindowSize(static_cast<int32>(consume_size)); +} + +void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); + + // By the time a read is processed by the delegate, this stream may + // already be inactive. + if (!session_->IsStreamActive(stream_id_)) + return; + + DCHECK_GE(unacked_recv_window_bytes_, 0); + DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); + DCHECK_GE(delta_window_size, 1); + // Check for overflow. + DCHECK_LE(delta_window_size, kint32max - recv_window_size_); + + recv_window_size_ += delta_window_size; + net_log_.AddEvent( + NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdyStreamWindowUpdateCallback, + stream_id_, delta_window_size, recv_window_size_)); + + unacked_recv_window_bytes_ += delta_window_size; + if (unacked_recv_window_bytes_ > + session_->stream_initial_recv_window_size() / 2) { + session_->SendStreamWindowUpdate( + stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); + unacked_recv_window_bytes_ = 0; + } +} + +void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { + DCHECK(session_->IsStreamActive(stream_id_)); + DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); + DCHECK_GE(delta_window_size, 1); + + // Since we never decrease the initial window size, + // |delta_window_size| should never cause |recv_window_size_| to go + // negative. If we do, it's a client-side bug, so we use + // PROTOCOL_ERROR for lack of a better error code. + if (delta_window_size > recv_window_size_) { + session_->ResetStream( + stream_id_, RST_STREAM_PROTOCOL_ERROR, + "Invalid delta_window_size for DecreaseRecvWindowSize"); + NOTREACHED(); + return; + } + + recv_window_size_ -= delta_window_size; + net_log_.AddEvent( + NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdyStreamWindowUpdateCallback, + stream_id_, -delta_window_size, recv_window_size_)); +} + int SpdyStream::GetPeerAddress(IPEndPoint* address) const { return session_->GetPeerAddress(address); } @@ -456,7 +535,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { DecreaseRecvWindowSize(static_cast<int32>(length)); buffer->AddConsumeCallback( - base::Bind(&SpdyStream::IncreaseRecvWindowSize, + base::Bind(&SpdyStream::OnReadBufferConsumed, weak_ptr_factory_.GetWeakPtr())); } @@ -569,6 +648,22 @@ void SpdyStream::QueueStreamData(IOBuffer* data, if (!data_buffer) return; + if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { + DCHECK_GE(data_buffer->GetRemainingSize(), + session_->GetDataFrameMinimumSize()); + size_t payload_size = + data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); + DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); + DecreaseSendWindowSize(static_cast<int32>(payload_size)); + // This currently isn't strictly needed, since write frames are + // discarded only if the stream is about to be closed. But have it + // here anyway just in case this changes. + data_buffer->AddConsumeCallback( + base::Bind(&SpdyStream::OnWriteBufferConsumed, + weak_ptr_factory_.GetWeakPtr(), + payload_size)); + } + session_->EnqueueStreamWrite( this, DATA, scoped_ptr<SpdyBufferProducer>( @@ -889,59 +984,4 @@ void SpdyStream::UpdateHistograms() { UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); } -void SpdyStream::IncreaseRecvWindowSize(size_t delta_window_size) { - if (session_->flow_control_state() < SpdySession::FLOW_CONTROL_STREAM) - return; - - // By the time a read is processed by the delegate, this stream may - // already be inactive. - if (!session_->IsStreamActive(stream_id_)) - return; - - DCHECK_GE(unacked_recv_window_bytes_, 0); - DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); - DCHECK_GE(delta_window_size, 1u); - // Check for overflow. - DCHECK_LE(delta_window_size, - static_cast<size_t>(kint32max - recv_window_size_)); - - recv_window_size_ += delta_window_size; - net_log_.AddEvent( - NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, - base::Bind(&NetLogSpdyStreamWindowUpdateCallback, - stream_id_, delta_window_size, recv_window_size_)); - - unacked_recv_window_bytes_ += delta_window_size; - if (unacked_recv_window_bytes_ > - session_->stream_initial_recv_window_size() / 2) { - session_->SendStreamWindowUpdate( - stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); - unacked_recv_window_bytes_ = 0; - } -} - -void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { - DCHECK(session_->IsStreamActive(stream_id_)); - DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); - DCHECK_GE(delta_window_size, 1); - - // Since we never decrease the initial window size, - // |delta_window_size| should never cause |recv_window_size_| to go - // negative. If we do, it's a client-side bug, so we use - // PROTOCOL_ERROR for lack of a better error code. - if (delta_window_size > recv_window_size_) { - session_->ResetStream( - stream_id_, RST_STREAM_PROTOCOL_ERROR, - "Invalid delta_window_size for DecreaseRecvWindowSize"); - NOTREACHED(); - return; - } - - recv_window_size_ -= delta_window_size; - net_log_.AddEvent( - NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, - base::Bind(&NetLogSpdyStreamWindowUpdateCallback, - stream_id_, -delta_window_size, recv_window_size_)); -} - } // namespace net diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 1ae204e..d3ef69f 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -147,24 +147,35 @@ class NET_EXPORT_PRIVATE SpdyStream send_stalled_by_flow_control_ = stalled; } - // If stream flow control is turned on, called by the session to - // adjust this stream's send window size by |delta_window_size|, - // which is the difference between the SETTINGS_INITIAL_WINDOW_SIZE - // in the most recent SETTINGS frame and the previous initial send - // window size, possibly unstalling this stream. Although - // |delta_window_size| may cause this stream's send window size to - // go negative, it must not cause it to wrap around in either - // direction. Does nothing if the stream is already closed. + // Called by the session to adjust this stream's send window size by + // |delta_window_size|, which is the difference between the + // SETTINGS_INITIAL_WINDOW_SIZE in the most recent SETTINGS frame + // and the previous initial send window size, possibly unstalling + // this stream. Although |delta_window_size| may cause this stream's + // send window size to go negative, it must not cause it to wrap + // around in either direction. Does nothing if the stream is already + // closed. // // If stream flow control is turned off, this must not be called. void AdjustSendWindowSize(int32 delta_window_size); - // If stream flow control is turned on, called by the session to - // increase this stream's send window size by |delta_window_size| - // from a WINDOW_UPDATE frome, which must be at least 1, possibly - // unstalling this stream. If |delta_window_size| would cause this - // stream's send window size to overflow, calls into the session to - // reset this stream. Does nothing if the stream is already closed. + // Called when bytes are consumed from a SpdyBuffer for a DATA frame + // that is to be written or is being written. Increases the send + // window size accordingly if some or all of the SpdyBuffer is being + // discarded. + // + // If stream flow control is turned off, this must not be called. + void OnWriteBufferConsumed(size_t frame_payload_size, + size_t consume_size, + SpdyBuffer::ConsumeSource consume_source); + + // Called by the session to increase this stream's send window size + // by |delta_window_size| (which must be at least 1) from a received + // WINDOW_UPDATE frame or from a dropped DATA frame that was + // intended to be sent, possibly unstalling this stream. If + // |delta_window_size| would cause this stream's send window size to + // overflow, calls into the session to reset this stream. Does + // nothing if the stream is already closed. // // If stream flow control is turned off, this must not be called. void IncreaseSendWindowSize(int32 delta_window_size); @@ -178,6 +189,32 @@ class NET_EXPORT_PRIVATE SpdyStream // If stream flow control is turned off, this must not be called. void DecreaseSendWindowSize(int32 delta_window_size); + // Called when bytes are consumed by the delegate from a SpdyBuffer + // containing received data. Increases the receive window size + // accordingly. + // + // If stream flow control is turned off, this must not be called. + void OnReadBufferConsumed(size_t consume_size, + SpdyBuffer::ConsumeSource consume_source); + + // Called by OnReadBufferConsume to increase this stream's receive + // window size by |delta_window_size|, which must be at least 1 and + // must not cause this stream's receive window size to overflow, + // possibly also sending a WINDOW_UPDATE frame. Does nothing if the + // stream is not active. + // + // If stream flow control is turned off, this must not be called. + void IncreaseRecvWindowSize(int32 delta_window_size); + + // Called by OnDataReceived (which is in turn called by the session) + // to decrease this stream's receive window size by + // |delta_window_size|, which must be at least 1 and must not cause + // this stream's receive window size to go negative. + // + // If stream flow control is turned off or the stream is not active, + // this must not be called. + void DecreaseRecvWindowSize(int32 delta_window_size); + int GetPeerAddress(IPEndPoint* address) const; int GetLocalAddress(IPEndPoint* address) const; @@ -338,23 +375,6 @@ class NET_EXPORT_PRIVATE SpdyStream scoped_ptr<SpdyFrame> ProduceHeaderFrame( scoped_ptr<SpdyHeaderBlock> header_block); - // Called by SpdyBuffers (via ConsumeCallbacks) to increase this - // stream's receive window size by |delta_window_size|, which must - // be at least 1 and must not cause this stream's receive window - // size to overflow, possibly also sending a WINDOW_UPDATE frame. - // - // Unlike the functions above, this may be called even when stream - // flow control is turned off, although this does nothing in that - // case (and also if the stream is inactive). - void IncreaseRecvWindowSize(size_t delta_window_size); - - // If the stream is active and stream flow control is turned on, - // called by OnDataReceived (which is in turn called by the session) - // to decrease this stream's receive window size by - // |delta_window_size|, which must be at least 1 and must not cause - // this stream's receive window size to go negative. - void DecreaseRecvWindowSize(int32 delta_window_size); - base::WeakPtrFactory<SpdyStream> weak_ptr_factory_; // There is a small period of time between when a server pushed stream is |