diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-17 19:23:49 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-17 19:23:49 +0000 |
commit | 09a8d917eb7bc6b45277b24a851ccec6e8e499a9 (patch) | |
tree | abeb2c359417e5681f241c0d8eba10ceacc69aa1 | |
parent | 15c709e5127e3fd3c27b4bd2ac7d642608582f23 (diff) | |
download | chromium_src-09a8d917eb7bc6b45277b24a851ccec6e8e499a9.zip chromium_src-09a8d917eb7bc6b45277b24a851ccec6e8e499a9.tar.gz chromium_src-09a8d917eb7bc6b45277b24a851ccec6e8e499a9.tar.bz2 |
[SPDY] Avoid leaking bytes from the session flow control receive window
Add a way to add ConsumeCallbacks to a SpdyBuffer in order to be notified
when Consume() is called. Use that to ensure that flow control receive
windows are updated appropriately regardless of what the SpdyStream's
delegate does.
Make IncreaseRevWindowSize private in both SpdyStream
and SpdySession.
BUG=176592
Review URL: https://codereview.chromium.org/14311002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194655 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/spdy/spdy_buffer.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_buffer.h | 16 | ||||
-rw-r--r-- | net/spdy/spdy_buffer_unittest.cc | 30 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 5 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket.cc | 7 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 62 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 18 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 126 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 72 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 20 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.cc | 20 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.h | 8 |
14 files changed, 300 insertions, 120 deletions
diff --git a/net/spdy/spdy_buffer.cc b/net/spdy/spdy_buffer.cc index dc96158..a48b35d 100644 --- a/net/spdy/spdy_buffer.cc +++ b/net/spdy/spdy_buffer.cc @@ -6,6 +6,7 @@ #include <cstring> +#include "base/callback.h" #include "base/logging.h" #include "net/base/io_buffer.h" #include "net/spdy/spdy_protocol.h" @@ -38,7 +39,10 @@ SpdyBuffer::SpdyBuffer(const char* data, size_t size) : frame_(MakeSpdyFrame(data, size)), offset_(0) {} -SpdyBuffer::~SpdyBuffer() {} +SpdyBuffer::~SpdyBuffer() { + if (GetRemainingSize() > 0) + Consume(GetRemainingSize()); +} const char* SpdyBuffer::GetRemainingData() const { return frame_->data() + offset_; @@ -48,10 +52,18 @@ size_t SpdyBuffer::GetRemainingSize() const { return frame_->size() - offset_; } +void SpdyBuffer::AddConsumeCallback(const ConsumeCallback& consume_callback) { + consume_callbacks_.push_back(consume_callback); +} + void SpdyBuffer::Consume(size_t consume_size) { DCHECK_GE(consume_size, 1u); DCHECK_LE(consume_size, GetRemainingSize()); offset_ += consume_size; + for (std::vector<ConsumeCallback>::const_iterator it = + consume_callbacks_.begin(); it != consume_callbacks_.end(); ++it) { + it->Run(consume_size); + } }; IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() { diff --git a/net/spdy/spdy_buffer.h b/net/spdy/spdy_buffer.h index c4c5b8f..4d04184 100644 --- a/net/spdy/spdy_buffer.h +++ b/net/spdy/spdy_buffer.h @@ -6,8 +6,10 @@ #define NET_SPDY_SPDY_BUFFER_H_ #include <cstddef> +#include <vector> #include "base/basictypes.h" +#include "base/callback_forward.h" #include "base/memory/scoped_ptr.h" #include "net/base/net_export.h" @@ -26,6 +28,10 @@ class SpdyFrame; // fact that IOBuffer member functions are not virtual. class NET_EXPORT_PRIVATE SpdyBuffer { public: + // A Callback that gets called whenever Consume() is called with the + // number of bytes consumed. + typedef base::Callback<void(size_t)> ConsumeCallback; + // Construct with the data in the given frame. Assumes that data is // owned by |frame| or outlives it. explicit SpdyBuffer(scoped_ptr<SpdyFrame> frame); @@ -42,10 +48,15 @@ class NET_EXPORT_PRIVATE SpdyBuffer { // Returns the number of remaining (unconsumed) bytes. size_t GetRemainingSize() const; + // Add a callback which is called whenever Consume() is called. Used + // mainly to update flow control windows. The ConsumeCallback should + // not do anything complicated; ideally it should only update a + // counter. In particular, it must *not* cause the SpdyBuffer itself + // to be destroyed. + void AddConsumeCallback(const ConsumeCallback& consume_callback); + // Consume the given number of bytes, which must be positive but not // greater than GetRemainingSize(). - // - // TODO(akalin): Add a way to get notified when Consume() is called. void Consume(size_t consume_size); // Returns an IOBuffer pointing to the data starting at @@ -56,6 +67,7 @@ class NET_EXPORT_PRIVATE SpdyBuffer { private: const scoped_ptr<SpdyFrame> frame_; + std::vector<ConsumeCallback> consume_callbacks_; size_t offset_; DISALLOW_COPY_AND_ASSIGN(SpdyBuffer); diff --git a/net/spdy/spdy_buffer_unittest.cc b/net/spdy/spdy_buffer_unittest.cc index fbd7705..bcf25d2 100644 --- a/net/spdy/spdy_buffer_unittest.cc +++ b/net/spdy/spdy_buffer_unittest.cc @@ -8,6 +8,7 @@ #include <string> #include "base/basictypes.h" +#include "base/bind.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "net/base/io_buffer.h" @@ -53,18 +54,45 @@ TEST_F(SpdyBufferTest, DataConstructor) { EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); } +void IncrementBy(size_t* x, size_t delta) { + *x += delta; +} + // Construct a SpdyBuffer and call Consume() on it, which should -// update the remaining data pointer and size appropriately. +// update the remaining data pointer and size appropriately, as well +// as calling the consume callbacks. TEST_F(SpdyBufferTest, Consume) { SpdyBuffer buffer(kData, kDataSize); + size_t x1 = 0; + size_t x2 = 0; + buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x1)); + buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x2)); + EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); buffer.Consume(5); EXPECT_EQ(std::string(kData + 5, kDataSize - 5), BufferToString(buffer)); + EXPECT_EQ(5u, x1); + EXPECT_EQ(5u, x2); buffer.Consume(kDataSize - 5); EXPECT_EQ(0u, buffer.GetRemainingSize()); + EXPECT_EQ(kDataSize, x1); + EXPECT_EQ(kDataSize, x2); +} + +// Construct a SpdyBuffer and attach a ConsumeCallback to it. The +// callback should be called when the SpdyBuffer is destroyed. +TEST_F(SpdyBufferTest, ConsumeOnDestruction) { + size_t x = 0; + + { + SpdyBuffer buffer(kData, kDataSize); + buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x)); + } + + EXPECT_EQ(kDataSize, x); } // Make sure the IOBuffer returned by GetIOBufferForRemainingData() diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 9053bc3..c9f3ab9 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -136,10 +136,7 @@ int SpdyHttpStream::ReadResponseBody( // If we have data buffered, complete the IO immediately. if (!response_body_queue_.IsEmpty()) { - size_t bytes_consumed = response_body_queue_.Dequeue(buf->data(), buf_len); - if (stream_) - stream_->IncreaseRecvWindowSize(bytes_consumed); - return bytes_consumed; + return response_body_queue_.Dequeue(buf->data(), buf_len); } else if (stream_closed_) { return closed_stream_status_; } diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc index c09a5bd..245a58b 100644 --- a/net/spdy/spdy_proxy_client_socket.cc +++ b/net/spdy/spdy_proxy_client_socket.cc @@ -217,12 +217,7 @@ int SpdyProxyClientSocket::Read(IOBuffer* buf, int buf_len, } size_t SpdyProxyClientSocket::PopulateUserReadBuffer(char* data, size_t len) { - size_t bytes_consumed = read_buffer_queue_.Dequeue(data, len); - - if (bytes_consumed > 0 && spdy_stream_) - spdy_stream_->IncreaseRecvWindowSize(bytes_consumed); - - return bytes_consumed; + return read_buffer_queue_.Dequeue(data, len); } int SpdyProxyClientSocket::Write(IOBuffer* buf, int buf_len, diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index ba84ea1..8fa52db 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -1439,14 +1439,17 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, if (it == active_streams_.end()) return; - // Only decrease the window size for data for active streams. - if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) - DecreaseRecvWindowSize(static_cast<int32>(len)); - scoped_ptr<SpdyBuffer> buffer; if (data) { DCHECK_GT(len, 0u); buffer.reset(new SpdyBuffer(data, len)); + + if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { + DecreaseRecvWindowSize(static_cast<int32>(len)); + buffer->AddConsumeCallback( + base::Bind(&SpdySession::IncreaseRecvWindowSize, + weak_factory_.GetWeakPtr())); + } } else { DCHECK_EQ(len, 0u); } @@ -1842,31 +1845,6 @@ void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, SendWindowUpdateFrame(stream_id, delta_window_size, stream->priority()); } -void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { - if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) - return; - - DCHECK_GE(session_unacked_recv_window_bytes_, 0); - DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); - DCHECK_GE(delta_window_size, 1); - // Check for overflow. - DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); - - session_recv_window_size_ += delta_window_size; - net_log_.AddEvent( - NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, - base::Bind(&NetLogSpdySessionWindowUpdateCallback, - delta_window_size, session_recv_window_size_)); - - session_unacked_recv_window_bytes_ += delta_window_size; - if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { - SendWindowUpdateFrame(kSessionFlowControlStreamId, - session_unacked_recv_window_bytes_, - HIGHEST); - session_unacked_recv_window_bytes_ = 0; - } -} - // Given a cwnd that we would have sent to the server, modify it based on the // field trial policy. uint32 ApplyCwndFieldTrialPolicy(int cwnd) { @@ -2317,6 +2295,32 @@ void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { -delta_window_size, session_send_window_size_)); } +void SpdySession::IncreaseRecvWindowSize(size_t delta_window_size) { + if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) + return; + + DCHECK_GE(session_unacked_recv_window_bytes_, 0); + DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); + DCHECK_GE(delta_window_size, 1u); + // Check for overflow. + DCHECK_LE(delta_window_size, + static_cast<size_t>(kint32max - session_recv_window_size_)); + + session_recv_window_size_ += delta_window_size; + net_log_.AddEvent( + NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdySessionWindowUpdateCallback, + delta_window_size, session_recv_window_size_)); + + session_unacked_recv_window_bytes_ += delta_window_size; + if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { + SendWindowUpdateFrame(kSessionFlowControlStreamId, + session_unacked_recv_window_bytes_, + HIGHEST); + session_unacked_recv_window_bytes_ = 0; + } +} + void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(delta_window_size, 1); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 55ac9f9..0746398 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -316,13 +316,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, void SendStreamWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size); - // Called by a stream to increase this session's receive window size - // by |delta_window_size|, which must be at least 1 and must not - // cause this session's receive window size to overflow, possibly - // also sending a WINDOW_UPDATE frame. Does nothing if session flow - // control is turned off. - void IncreaseRecvWindowSize(int32 delta_window_size); - // If session is closed, no new streams/transactions should be created. bool IsClosed() const { return state_ == STATE_CLOSED; } @@ -470,6 +463,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustSendWindowSize31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlInactiveStream31); + FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, + SessionFlowControlNoReceiveLeaks31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31); typedef std::deque<SpdyStreamRequest*> PendingStreamRequestQueue; @@ -684,6 +679,15 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // cause this session's send window size to go negative. void DecreaseSendWindowSize(int32 delta_window_size); + // Called by SpdyBuffers (via ConsumeCallbacks) to increase this + // session's receive window size by |delta_window_size|, which must + // be at least 1 and must not cause this session's receive window + // size to overflow, possibly also sending a WINDOW_UPDATE + // frame. Also called during initialization to set the initial + // receive window size. Does nothing if session flow control is + // turned off. + void IncreaseRecvWindowSize(size_t delta_window_size); + // If session flow control is turned on, called by OnStreamFrameData // (which is in turn called by the framer) to decrease this // session's receive window size by |delta_window_size|, which must diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 1c63960..c15d3fd 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -2274,6 +2274,108 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlInactiveStream31) { EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); } +// A delegate that drops any received data. +class DropReceivedDataDelegate : public test::StreamDelegateSendImmediate { + public: + DropReceivedDataDelegate(const scoped_refptr<SpdyStream>& stream, + base::StringPiece data) + : StreamDelegateSendImmediate( + stream, scoped_ptr<SpdyHeaderBlock>(), data) {} + + virtual ~DropReceivedDataDelegate() {} + + // Drop any received data. + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE { + return OK; + } +}; + +// Send data back and forth but use a delegate that drops its received +// data. The receive window should still increase to its original +// value, i.e. we shouldn't "leak" receive window bytes. +TEST_F(SpdySessionSpdy3Test, SessionFlowControlNoReceiveLeaks31) { + const char kStreamUrl[] = "http://www.google.com/"; + + session_deps_.enable_spdy_31 = true; + + const int32 msg_data_size = 100; + const std::string msg_data(msg_data_size, 'a'); + + MockConnect connect_data(SYNCHRONOUS, OK); + + scoped_ptr<SpdyFrame> initial_window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, + kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize)); + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, 1, msg_data_size, MEDIUM, NULL, 0)); + scoped_ptr<SpdyFrame> msg( + ConstructSpdyBodyFrame(1, msg_data.data(), msg_data_size, false)); + MockWrite writes[] = { + CreateMockWrite(*initial_window_update, 0), + CreateMockWrite(*req, 1), + CreateMockWrite(*msg, 3), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr<SpdyFrame> echo( + ConstructSpdyBodyFrame(1, msg_data.data(), msg_data_size, false)); + scoped_ptr<SpdyFrame> window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, msg_data_size)); + MockRead reads[] = { + CreateMockRead(*resp, 2), + CreateMockRead(*echo, 4), + MockRead(ASYNC, 0, 5) // EOF + }; + + // Create SpdySession and SpdyStream and send the request. + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + data.set_connect_data(connect_data); + session_deps_.host_resolver->set_synchronous_mode(true); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + SSLSocketDataProvider ssl(SYNCHRONOUS, OK); + session_deps_.deterministic_socket_factory->AddSSLSocketDataProvider(&ssl); + + CreateDeterministicNetworkSession(); + + scoped_refptr<SpdySession> session = CreateInitializedSession(); + + GURL url(kStreamUrl); + scoped_refptr<SpdyStream> stream = + CreateStreamSynchronously(session, url, MEDIUM, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + EXPECT_EQ(0u, stream->stream_id()); + + DropReceivedDataDelegate delegate(stream, msg_data); + stream->SetDelegate(&delegate); + + stream->set_spdy_headers( + ConstructPostHeaderBlock(url.spec(), msg_data_size)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); + EXPECT_EQ(0, session->session_unacked_recv_window_bytes_); + + data.RunFor(5); + + EXPECT_TRUE(data.at_write_eof()); + EXPECT_TRUE(data.at_read_eof()); + + EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); + EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); + + stream->Close(); + + EXPECT_EQ(OK, delegate.WaitForClose()); + + EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); + EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); +} + // Send data back and forth; the send and receive windows should // change appropriately. TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { @@ -2384,15 +2486,19 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { EXPECT_TRUE(data.at_write_eof()); EXPECT_TRUE(data.at_read_eof()); - // Normally done by the delegate, but not by our test delegate. - session->IncreaseRecvWindowSize(msg_data_size); + EXPECT_EQ(msg_data, delegate.TakeReceivedData()); + // Draining the delegate's read queue should increase our receive + // window. EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); stream->Close(); EXPECT_EQ(OK, delegate.WaitForClose()); + + EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_); + EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_); } // Cause a stall by reducing the flow control send window to 0. The @@ -2478,7 +2584,7 @@ TEST_F(SpdySessionSpdy3Test, ResumeAfterSendWindowSizeIncrease31) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate.body_data_sent()); } @@ -2614,13 +2720,13 @@ TEST_F(SpdySessionSpdy3Test, ResumeByPriorityAfterSendWindowSizeIncrease31) { EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); - EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate1.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate1.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); - EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); } @@ -2808,19 +2914,19 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); - EXPECT_EQ("", delegate1.received_data()); + EXPECT_EQ(std::string(), delegate1.TakeReceivedData()); EXPECT_EQ(0, delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); - EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); EXPECT_TRUE(delegate3.send_headers_completed()); EXPECT_EQ("200", delegate3.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate3.GetResponseHeaderValue(":version")); - EXPECT_EQ("", delegate3.received_data()); + EXPECT_EQ(std::string(), delegate3.TakeReceivedData()); EXPECT_EQ(0, delegate3.body_data_sent()); } @@ -2969,13 +3075,13 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedSession31) { EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); - EXPECT_EQ("", delegate1.received_data()); + EXPECT_EQ(std::string(), delegate1.TakeReceivedData()); EXPECT_EQ(0, delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); - EXPECT_EQ("", delegate2.received_data()); + EXPECT_EQ(std::string(), delegate2.TakeReceivedData()); EXPECT_EQ(0, delegate2.body_data_sent()); } diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 3de3716..2d7f5e6 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -311,41 +311,6 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { stream_id_, -delta_window_size, send_window_size_)); } -void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { - if (session_->flow_control_state() < SpdySession::FLOW_CONTROL_STREAM) - return; - - // Call back into the session, since this is the only - // window-size-related function that is called by the delegate - // instead of by the session. - session_->IncreaseRecvWindowSize(delta_window_size); - - // By the time a read is processed by the delegate, this stream may - // already be inactive. - if (!session_->IsStreamActive(stream_id_)) - return; - - DCHECK_GE(unacked_recv_window_bytes_, 0); - DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); - DCHECK_GE(delta_window_size, 1); - // Check for overflow. - DCHECK_LE(delta_window_size, kint32max - recv_window_size_); - - recv_window_size_ += delta_window_size; - net_log_.AddEvent( - NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, - base::Bind(&NetLogSpdyStreamWindowUpdateCallback, - stream_id_, delta_window_size, recv_window_size_)); - - unacked_recv_window_bytes_ += delta_window_size; - if (unacked_recv_window_bytes_ > - session_->stream_initial_recv_window_size() / 2) { - session_->SendStreamWindowUpdate( - stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); - unacked_recv_window_bytes_ = 0; - } -} - int SpdyStream::GetPeerAddress(IPEndPoint* address) const { return session_->GetPeerAddress(address); } @@ -488,8 +453,12 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { size_t length = buffer->GetRemainingSize(); DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); - if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) + if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { DecreaseRecvWindowSize(static_cast<int32>(length)); + buffer->AddConsumeCallback( + base::Bind(&SpdyStream::IncreaseRecvWindowSize, + weak_ptr_factory_.GetWeakPtr())); + } // Track our bandwidth. metrics_.RecordBytes(length); @@ -920,6 +889,37 @@ void SpdyStream::UpdateHistograms() { UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); } +void SpdyStream::IncreaseRecvWindowSize(size_t delta_window_size) { + if (session_->flow_control_state() < SpdySession::FLOW_CONTROL_STREAM) + return; + + // By the time a read is processed by the delegate, this stream may + // already be inactive. + if (!session_->IsStreamActive(stream_id_)) + return; + + DCHECK_GE(unacked_recv_window_bytes_, 0); + DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); + DCHECK_GE(delta_window_size, 1u); + // Check for overflow. + DCHECK_LE(delta_window_size, + static_cast<size_t>(kint32max - recv_window_size_)); + + recv_window_size_ += delta_window_size; + net_log_.AddEvent( + NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, + base::Bind(&NetLogSpdyStreamWindowUpdateCallback, + stream_id_, delta_window_size, recv_window_size_)); + + unacked_recv_window_bytes_ += delta_window_size; + if (unacked_recv_window_bytes_ > + session_->stream_initial_recv_window_size() / 2) { + session_->SendStreamWindowUpdate( + stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); + unacked_recv_window_bytes_ = 0; + } +} + void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { DCHECK(session_->IsStreamActive(stream_id_)); DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index fd145ea..1ae204e 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -178,16 +178,6 @@ class NET_EXPORT_PRIVATE SpdyStream // If stream flow control is turned off, this must not be called. void DecreaseSendWindowSize(int32 delta_window_size); - // Called by the delegate to increase this stream's receive window - // size by |delta_window_size|, which must be at least 1 and must - // not cause this stream's receive window size to overflow, possibly - // also sending a WINDOW_UPDATE frame. - // - // Unlike the functions above, this may be called even when stream - // flow control is turned off, although this does nothing in that - // case (and also if the stream is inactive). - void IncreaseRecvWindowSize(int32 delta_window_size); - int GetPeerAddress(IPEndPoint* address) const; int GetLocalAddress(IPEndPoint* address) const; @@ -348,6 +338,16 @@ class NET_EXPORT_PRIVATE SpdyStream scoped_ptr<SpdyFrame> ProduceHeaderFrame( scoped_ptr<SpdyHeaderBlock> header_block); + // Called by SpdyBuffers (via ConsumeCallbacks) to increase this + // stream's receive window size by |delta_window_size|, which must + // be at least 1 and must not cause this stream's receive window + // size to overflow, possibly also sending a WINDOW_UPDATE frame. + // + // Unlike the functions above, this may be called even when stream + // flow control is turned off, although this does nothing in that + // case (and also if the stream is inactive). + void IncreaseRecvWindowSize(size_t delta_window_size); + // If the stream is active and stream flow control is turned on, // called by OnDataReceived (which is in turn called by the session) // to decrease this stream's receive window size by diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 64cb31a..a337309 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -134,7 +134,8 @@ TEST_F(SpdyStreamSpdy2Test, SendDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue("status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); } @@ -211,7 +212,7 @@ TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("101", delegate.GetResponseHeaderValue("status")); EXPECT_EQ(1, delegate.headers_sent()); - EXPECT_EQ(std::string(), delegate.received_data()); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); EXPECT_EQ(6, delegate.data_sent()); } @@ -328,7 +329,8 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue("status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); // Check that the NetLog was filled reasonably. diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index 78c6855..7f787be 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -131,7 +131,8 @@ TEST_F(SpdyStreamSpdy3Test, SendDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); } @@ -209,7 +210,7 @@ TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("101", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ(1, delegate.headers_sent()); - EXPECT_EQ(std::string(), delegate.received_data()); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); EXPECT_EQ(6, delegate.data_sent()); } @@ -328,7 +329,8 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); // Check that the NetLog was filled reasonably. @@ -478,7 +480,8 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.body_data_sent()); } @@ -569,7 +572,8 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjust) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.received_data()); + EXPECT_EQ(std::string(kPostBody, kPostBodyLength), + delegate.TakeReceivedData()); EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.body_data_sent()); } diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc index 0165c20..4af3382 100644 --- a/net/spdy/spdy_stream_test_util.cc +++ b/net/spdy/spdy_stream_test_util.cc @@ -4,6 +4,9 @@ #include "net/spdy/spdy_stream_test_util.h" +#include <cstddef> + +#include "base/stl_util.h" #include "net/base/completion_callback.h" #include "net/spdy/spdy_stream.h" #include "testing/gtest/include/gtest/gtest.h" @@ -78,10 +81,8 @@ void StreamDelegateBase::OnHeadersSent() { } int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { - if (buffer) { - received_data_ += std::string(buffer->GetRemainingData(), - buffer->GetRemainingSize()); - } + if (buffer) + received_data_queue_.Enqueue(buffer.Pass()); return OK; } @@ -102,6 +103,17 @@ int StreamDelegateBase::WaitForClose() { return result; } +std::string StreamDelegateBase::TakeReceivedData() { + size_t len = received_data_queue_.GetTotalSize(); + std::string received_data(len, '\0'); + if (len > 0) { + EXPECT_EQ( + len, + received_data_queue_.Dequeue(string_as_array(&received_data), len)); + } + return received_data; +} + std::string StreamDelegateBase::GetResponseHeaderValue( const std::string& name) const { SpdyHeaderBlock::const_iterator it = response_.find(name); diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h index a7ca912..1e59ad8 100644 --- a/net/spdy/spdy_stream_test_util.h +++ b/net/spdy/spdy_stream_test_util.h @@ -11,6 +11,7 @@ #include "base/strings/string_piece.h" #include "net/base/io_buffer.h" #include "net/base/test_completion_callback.h" +#include "net/spdy/spdy_read_queue.h" #include "net/spdy/spdy_stream.h" namespace net { @@ -62,9 +63,12 @@ class StreamDelegateBase : public SpdyStream::Delegate { // to OnClose(). int WaitForClose(); + // Drains all data from the underlying read queue and returns it as + // a string. + std::string TakeReceivedData(); + std::string GetResponseHeaderValue(const std::string& name) const; bool send_headers_completed() const { return send_headers_completed_; } - const std::string& received_data() const { return received_data_; } int headers_sent() const { return headers_sent_; } int data_sent() const { return data_sent_; } @@ -76,7 +80,7 @@ class StreamDelegateBase : public SpdyStream::Delegate { TestCompletionCallback callback_; bool send_headers_completed_; SpdyHeaderBlock response_; - std::string received_data_; + SpdyReadQueue received_data_queue_; int headers_sent_; int data_sent_; }; |