diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-21 00:59:11 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-21 00:59:11 +0000 |
commit | 1c6b12a885c3fffbae05e45d2223cc1178c25788 (patch) | |
tree | 15761a6552935a10899c2e0e9452eb133b287031 /net | |
parent | 8fee49ddb65c07e8265053bb44ace7296eb99c23 (diff) | |
download | chromium_src-1c6b12a885c3fffbae05e45d2223cc1178c25788.zip chromium_src-1c6b12a885c3fffbae05e45d2223cc1178c25788.tar.gz chromium_src-1c6b12a885c3fffbae05e45d2223cc1178c25788.tar.bz2 |
[SPDY] Fix bug in session flow control which leaves streams stalled
Use the term "send-stalled" in names consistently.
Also remove more boilerplate from unit tests.
BUG=178943
Review URL: https://codereview.chromium.org/12580010
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@189478 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy3_unittest.cc | 2 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 74 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 32 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 459 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 27 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 20 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 61 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.h | 8 |
10 files changed, 571 insertions, 140 deletions
diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc index 5ae724b..0bc3ff6 100644 --- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc @@ -2498,7 +2498,7 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, FlowControlStallResumeAfterSettings) { EXPECT_TRUE(upload_data_stream.IsEOF()); // But the body is not yet fully sent (kUploadData is not yet sent). EXPECT_FALSE(stream->stream()->body_sent()); - EXPECT_TRUE(stream->stream()->stalled_by_flow_control()); + EXPECT_TRUE(stream->stream()->send_stalled_by_flow_control()); data.ForceNextRead(); // Read in SETTINGS frame to unstall. rv = callback.WaitForResult(); diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index c40ae84..56d2bac 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -770,7 +770,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, // controlled. This is why we need the session to mark the stream // as stalled - because only the session knows for sure when the // stall occurs. - stream->set_stalled_by_flow_control(true); + stream->set_send_stalled_by_flow_control(true); net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); @@ -780,7 +780,9 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, effective_window_size = std::min(effective_window_size, session_send_window_size_); if (effective_window_size <= 0) { - stream->set_stalled_by_flow_control(true); + DCHECK(IsSendStalled()); + stream->set_send_stalled_by_flow_control(true); + QueueSendStalledStream(stream); net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); @@ -1331,8 +1333,8 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { // If this is an active stream, call the callback. const scoped_refptr<SpdyStream> stream(it2->second); active_streams_.erase(it2); - if (stream) - stream->OnClose(status); + DCHECK(stream); + stream->OnClose(status); ProcessPendingStreamRequests(); } @@ -2194,6 +2196,70 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, base::Bind(&NetLogSpdySessionWindowUpdateCallback, delta_window_size, session_send_window_size_)); + + DCHECK(!IsSendStalled()); + ResumeSendStalledStreams(); +} + +void SpdySession::QueueSendStalledStream( + const scoped_refptr<SpdyStream>& stream) { + DCHECK(stream->send_stalled_by_flow_control()); + stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); +} + +namespace { + +// Helper function to return the total size of an array of objects +// with .size() member functions. +template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { + size_t total_size = 0; + for (size_t i = 0; i < N; ++i) { + total_size += arr[i].size(); + } + return total_size; +} + +} // namespace + +void SpdySession::ResumeSendStalledStreams() { + DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); + + // We don't have to worry about new streams being queued, since + // doing so would cause IsSendStalled() to return true. But we do + // have to worry about streams being closed, as well as ourselves + // being closed. + + while (!IsClosed() && !IsSendStalled()) { + size_t old_size = 0; + if (DCHECK_IS_ON()) + old_size = GetTotalSize(stream_send_unstall_queue_); + + SpdyStreamId stream_id = PopStreamToPossiblyResume(); + if (stream_id == 0) + break; + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + // The stream may actually still be send-stalled after this (due + // to its own send window) but that's okay -- it'll then be + // resumed once its send window increases. + if (it != active_streams_.end()) + it->second->PossiblyResumeIfSendStalled(); + + // The size should decrease unless we got send-stalled again. + if (!IsSendStalled()) + DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); + } +} + +SpdyStreamId SpdySession::PopStreamToPossiblyResume() { + for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { + std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; + if (!queue->empty()) { + SpdyStreamId stream_id = queue->front(); + queue->pop_front(); + return stream_id; + } + } + return 0; } void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 2fe0102..8451bb3 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -8,6 +8,7 @@ #include <deque> #include <list> #include <map> +#include <queue> #include <set> #include <string> @@ -417,6 +418,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, return stream_initial_recv_window_size_; } + // Returns true if no stream in the session can send data due to + // session flow control. + bool IsSendStalled() const { + return + flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && + session_send_window_size_ == 0; + } + const BoundNetLog& net_log() const { return net_log_; } int GetPeerAddress(IPEndPoint* address) const; @@ -440,6 +449,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, private: friend class base::RefCounted<SpdySession>; friend class SpdyStreamRequest; + friend class SpdySessionSpdy3Test; // Allow tests to access our innards for testing purposes. FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy2Test, ClientPing); @@ -459,10 +469,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlInactiveStream31); FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31); - FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, - ResumeAfterSendWindowSizeIncrease31); - FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, - ResumeByPriorityAfterSendWindowSizeIncrease31); typedef std::deque<SpdyStreamRequest*> PendingStreamRequestQueue; typedef std::set<SpdyStreamRequest*> PendingStreamRequestCompletionSet; @@ -470,7 +476,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap; typedef std::map<std::string, std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap; - typedef std::priority_queue<SpdyIOBuffer> OutputQueue; typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap; @@ -663,6 +668,18 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // to overflow, does nothing. void IncreaseSendWindowSize(int32 delta_window_size); + // Queue a send-stalled stream for possibly resuming once we're not + // send-stalled anymore. + void QueueSendStalledStream(const scoped_refptr<SpdyStream>& stream); + + // Go through the queue of send-stalled streams and try to resume as + // many as possible. + void ResumeSendStalledStreams(); + + // Returns the next stream to possibly resume, or 0 if the queue is + // empty. + SpdyStreamId PopStreamToPossiblyResume(); + // If session flow control is turned on, called by CreateDataFrame() // (which is in turn called by a stream) to decrease this session's // send window size by |delta_window_size|, which must be at least 1 @@ -746,6 +763,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // them into a separate ActiveStreamMap, and not deliver network events to // them? ActiveStreamMap active_streams_; + // Map of all the streams that have already started to be pushed by the // server, but do not have consumers yet. PushedStreamMap unclaimed_pushed_streams_; @@ -847,6 +865,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, int32 session_recv_window_size_; int32 session_unacked_recv_window_bytes_; + // A queue of stream IDs that have been send-stalled at some point + // in the past. + std::deque<SpdyStreamId> stream_send_unstall_queue_[NUM_PRIORITIES]; + BoundNetLog net_log_; // Outside of tests, these should always be true. diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 8707048..7bf48f7 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -8,6 +8,7 @@ #include "base/memory/scoped_ptr.h" #include "base/memory/scoped_vector.h" #include "base/pending_task.h" +#include "base/string_piece.h" #include "base/string_util.h" #include "net/base/cert_test_util.h" #include "net/base/host_cache.h" @@ -36,6 +37,10 @@ static const char kTestUrl[] = "http://www.example.org/"; static const char kTestHost[] = "www.example.org"; static const int kTestPort = 80; +const char kBodyData[] = "Body data"; +const size_t kBodyDataSize = arraysize(kBodyData); +const base::StringPiece kBodyDataStringPiece(kBodyData, kBodyDataSize); + static int g_delta_seconds = 0; base::TimeTicks TheNearFuture() { return base::TimeTicks::Now() + base::TimeDelta::FromSeconds(g_delta_seconds); @@ -102,6 +107,18 @@ class SpdySessionSpdy3Test : public PlatformTest { return session->InitializeWithSocket(connection.release(), false, OK); } + void StallSessionSend(SpdySession* session) { + // Reduce the send window size to 0 to stall. + while (session->session_send_window_size_ > 0) { + session->DecreaseSendWindowSize( + std::min(kMaxSpdyFrameChunkSize, session->session_send_window_size_)); + } + } + + void UnstallSessionSend(SpdySession* session, int32 delta_window_size) { + session->IncreaseSendWindowSize(delta_window_size); + } + scoped_refptr<TransportSocketParams> transport_params_; SpdySessionDependencies session_deps_; scoped_refptr<HttpNetworkSession> http_session_; @@ -2313,10 +2330,8 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { ASSERT_TRUE(stream.get() != NULL); EXPECT_EQ(0u, stream->stream_id()); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(msg_data_size)); - memcpy(buf->data(), msg_data.data(), msg_data_size); test::StreamDelegateSendImmediate delegate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), buf.get()); + stream.get(), scoped_ptr<SpdyHeaderBlock>(), msg_data); stream->SetDelegate(&delegate); stream->set_spdy_headers( @@ -2377,12 +2392,9 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) { EXPECT_EQ(OK, delegate.WaitForClose()); } -// TODO(akalin): Re-enable the tests below when we fix the unstall bug -// mentioned in crbug.com/178943. - // Cause a stall by reducing the flow control send window to 0. The // stream should resume when that window is then increased. -TEST_F(SpdySessionSpdy3Test, DISABLED_ResumeAfterSendWindowSizeIncrease31) { +TEST_F(SpdySessionSpdy3Test, ResumeAfterSendWindowSizeIncrease31) { const char kStreamUrl[] = "http://www.google.com/"; GURL url(kStreamUrl); @@ -2393,8 +2405,6 @@ TEST_F(SpdySessionSpdy3Test, DISABLED_ResumeAfterSendWindowSizeIncrease31) { ConstructSpdyWindowUpdate( kSessionFlowControlStreamId, kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize)); - const char kBodyData[] = "Body data"; - const size_t kBodyDataSize = arraysize(kBodyData); scoped_ptr<SpdyFrame> req( ConstructSpdyPost(kStreamUrl, 1, kBodyDataSize, LOWEST, NULL, 0)); scoped_ptr<SpdyFrame> msg( @@ -2431,10 +2441,8 @@ TEST_F(SpdySessionSpdy3Test, DISABLED_ResumeAfterSendWindowSizeIncrease31) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kBodyDataSize)); - memcpy(buf->data(), kBodyData, kBodyDataSize); - test::StreamDelegateWithBody delegate(stream.get(), buf); + test::StreamDelegateWithBody delegate(stream.get(), kBodyDataStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -2448,21 +2456,17 @@ TEST_F(SpdySessionSpdy3Test, DISABLED_ResumeAfterSendWindowSizeIncrease31) { data.RunFor(3); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); - // Reduce the send window size to 0 to stall. - while (session->session_send_window_size_ > 0) { - session->DecreaseSendWindowSize( - std::min(kMaxSpdyFrameChunkSize, session->session_send_window_size_)); - } + StallSessionSend(session); - stream->QueueStreamData(buf.get(), buf->size(), DATA_FLAG_NONE); + EXPECT_EQ(ERR_IO_PENDING, delegate.OnSendBody()); - EXPECT_TRUE(stream->stalled_by_flow_control()); + EXPECT_TRUE(stream->send_stalled_by_flow_control()); - session->IncreaseSendWindowSize(kBodyDataSize); + UnstallSessionSend(session, kBodyDataSize); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); data.RunFor(3); @@ -2471,22 +2475,20 @@ TEST_F(SpdySessionSpdy3Test, DISABLED_ResumeAfterSendWindowSizeIncrease31) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kBodyData, kBodyDataSize), delegate.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate.received_data()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate.body_data_sent()); } // Cause a stall by reducing the flow control send window to 0. The -// stream should resume when that window is then increased. -TEST_F(SpdySessionSpdy3Test, - DISABLED_ResumeByPriorityAfterSendWindowSizeIncrease31) { +// streams should resume in priority order when that window is then +// increased. +TEST_F(SpdySessionSpdy3Test, ResumeByPriorityAfterSendWindowSizeIncrease31) { const char kStreamUrl[] = "http://www.google.com/"; GURL url(kStreamUrl); session_deps_.enable_spdy_31 = true; session_deps_.host_resolver->set_synchronous_mode(true); - const char kBodyData[] = "Body data"; - const size_t kBodyDataSize = arraysize(kBodyData); scoped_ptr<SpdyFrame> initial_window_update( ConstructSpdyWindowUpdate( kSessionFlowControlStreamId, @@ -2538,10 +2540,8 @@ TEST_F(SpdySessionSpdy3Test, scoped_refptr<SpdyStream> stream1 = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream1.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kBodyDataSize)); - memcpy(buf->data(), kBodyData, kBodyDataSize); - test::StreamDelegateWithBody delegate1(stream1.get(), buf); + test::StreamDelegateWithBody delegate1(stream1.get(), kBodyDataStringPiece); stream1->SetDelegate(&delegate1); EXPECT_FALSE(stream1->HasUrl()); @@ -2559,10 +2559,8 @@ TEST_F(SpdySessionSpdy3Test, scoped_refptr<SpdyStream> stream2 = CreateStreamSynchronously(session, url, MEDIUM, BoundNetLog()); ASSERT_TRUE(stream2.get() != NULL); - scoped_refptr<IOBufferWithSize> buf2(new IOBufferWithSize(kBodyDataSize)); - memcpy(buf2->data(), kBodyData, kBodyDataSize); - test::StreamDelegateWithBody delegate2(stream2.get(), buf2); + test::StreamDelegateWithBody delegate2(stream2.get(), kBodyDataStringPiece); stream2->SetDelegate(&delegate2); EXPECT_FALSE(stream2->HasUrl()); @@ -2577,37 +2575,33 @@ TEST_F(SpdySessionSpdy3Test, data.RunFor(2); EXPECT_EQ(3u, stream2->stream_id()); - EXPECT_FALSE(stream1->stalled_by_flow_control()); - EXPECT_FALSE(stream2->stalled_by_flow_control()); + EXPECT_FALSE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); - // Reduce the send window size to 0 to stall. - while (session->session_send_window_size_ > 0) { - session->DecreaseSendWindowSize( - std::min(kMaxSpdyFrameChunkSize, session->session_send_window_size_)); - } + StallSessionSend(session); - stream1->QueueStreamData(buf.get(), buf->size(), DATA_FLAG_NONE); - stream2->QueueStreamData(buf.get(), buf->size(), DATA_FLAG_NONE); + EXPECT_EQ(ERR_IO_PENDING, delegate1.OnSendBody()); + EXPECT_EQ(ERR_IO_PENDING, delegate2.OnSendBody()); - EXPECT_TRUE(stream1->stalled_by_flow_control()); - EXPECT_TRUE(stream2->stalled_by_flow_control()); + EXPECT_TRUE(stream1->send_stalled_by_flow_control()); + EXPECT_TRUE(stream2->send_stalled_by_flow_control()); // This should unstall only stream2. - session->IncreaseSendWindowSize(kBodyDataSize); + UnstallSessionSend(session, kBodyDataSize); - EXPECT_TRUE(stream1->stalled_by_flow_control()); - EXPECT_FALSE(stream2->stalled_by_flow_control()); + EXPECT_TRUE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); data.RunFor(2); - EXPECT_TRUE(stream1->stalled_by_flow_control()); - EXPECT_FALSE(stream2->stalled_by_flow_control()); + EXPECT_TRUE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); // This should then unstall stream1. - session->IncreaseSendWindowSize(kBodyDataSize); + UnstallSessionSend(session, kBodyDataSize); - EXPECT_FALSE(stream1->stalled_by_flow_control()); - EXPECT_FALSE(stream2->stalled_by_flow_control()); + EXPECT_FALSE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); data.RunFor(3); @@ -2617,14 +2611,369 @@ TEST_F(SpdySessionSpdy3Test, EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kBodyData, kBodyDataSize), delegate1.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate1.received_data()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); - EXPECT_EQ(std::string(kBodyData, kBodyDataSize), delegate2.received_data()); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.received_data()); EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); } +// Delegate that closes a given stream after sending its body. +class StreamClosingDelegate : public test::StreamDelegateWithBody { + public: + StreamClosingDelegate(const scoped_refptr<SpdyStream>& stream, + base::StringPiece data) + : StreamDelegateWithBody(stream, data) {} + + virtual ~StreamClosingDelegate() {} + + void set_stream_to_close(const scoped_refptr<SpdyStream>& stream_to_close) { + stream_to_close_ = stream_to_close; + } + + virtual int OnSendBody() OVERRIDE { + int rv = test::StreamDelegateWithBody::OnSendBody(); + if (stream_to_close_) { + stream_to_close_->Close(); + stream_to_close_ = NULL; + } + return rv; + } + + private: + scoped_refptr<SpdyStream> stream_to_close_; +}; + +// Cause a stall by reducing the flow control send window to +// 0. Unstalling the session should properly handle deleted streams. +TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { + const char kStreamUrl[] = "http://www.google.com/"; + GURL url(kStreamUrl); + + session_deps_.enable_spdy_31 = true; + session_deps_.host_resolver->set_synchronous_mode(true); + + scoped_ptr<SpdyFrame> initial_window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, + kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize)); + scoped_ptr<SpdyFrame> req1( + ConstructSpdyPost(kStreamUrl, 1, kBodyDataSize, LOWEST, NULL, 0)); + scoped_ptr<SpdyFrame> req2( + ConstructSpdyPost(kStreamUrl, 3, kBodyDataSize, LOWEST, NULL, 0)); + scoped_ptr<SpdyFrame> req3( + ConstructSpdyPost(kStreamUrl, 5, kBodyDataSize, LOWEST, NULL, 0)); + scoped_ptr<SpdyFrame> msg2( + ConstructSpdyBodyFrame(3, kBodyData, kBodyDataSize, false)); + MockWrite writes[] = { + CreateMockWrite(*initial_window_update, 0), + CreateMockWrite(*req1, 1), + CreateMockWrite(*req2, 3), + CreateMockWrite(*req3, 5), + CreateMockWrite(*msg2, 7), + }; + + scoped_ptr<SpdyFrame> resp1(ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr<SpdyFrame> resp2(ConstructSpdyGetSynReply(NULL, 0, 3)); + scoped_ptr<SpdyFrame> resp3(ConstructSpdyGetSynReply(NULL, 0, 5)); + scoped_ptr<SpdyFrame> echo2( + ConstructSpdyBodyFrame(3, kBodyData, kBodyDataSize, false)); + MockRead reads[] = { + CreateMockRead(*resp1, 2), + CreateMockRead(*resp2, 4), + CreateMockRead(*resp3, 6), + CreateMockRead(*echo2, 8), + MockRead(ASYNC, 0, 0, 9), // EOF + }; + + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + CreateDeterministicNetworkSession(); + scoped_refptr<SpdySession> session = GetSession(pair_); + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM_AND_SESSION, + session->flow_control_state()); + + scoped_refptr<SpdyStream> stream1 = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream1.get() != NULL); + + test::StreamDelegateWithBody delegate1(stream1.get(), kBodyDataStringPiece); + stream1->SetDelegate(&delegate1); + + EXPECT_FALSE(stream1->HasUrl()); + + stream1->set_spdy_headers( + ConstructPostHeaderBlock(kStreamUrl, kBodyDataSize)); + EXPECT_TRUE(stream1->HasUrl()); + EXPECT_EQ(kStreamUrl, stream1->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream1->SendRequest(true)); + + data.RunFor(3); + EXPECT_EQ(1u, stream1->stream_id()); + + scoped_refptr<SpdyStream> stream2 = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream2.get() != NULL); + + StreamClosingDelegate delegate2(stream2.get(), kBodyDataStringPiece); + stream2->SetDelegate(&delegate2); + + EXPECT_FALSE(stream2->HasUrl()); + + stream2->set_spdy_headers( + ConstructPostHeaderBlock(kStreamUrl, kBodyDataSize)); + EXPECT_TRUE(stream2->HasUrl()); + EXPECT_EQ(kStreamUrl, stream2->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream2->SendRequest(true)); + + data.RunFor(2); + EXPECT_EQ(3u, stream2->stream_id()); + + scoped_refptr<SpdyStream> stream3 = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream3.get() != NULL); + + test::StreamDelegateWithBody delegate3(stream3.get(), kBodyDataStringPiece); + stream3->SetDelegate(&delegate3); + + EXPECT_FALSE(stream3->HasUrl()); + + stream3->set_spdy_headers( + ConstructPostHeaderBlock(kStreamUrl, kBodyDataSize)); + EXPECT_TRUE(stream3->HasUrl()); + EXPECT_EQ(kStreamUrl, stream3->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream3->SendRequest(true)); + + data.RunFor(2); + EXPECT_EQ(5u, stream3->stream_id()); + + EXPECT_FALSE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); + EXPECT_FALSE(stream3->send_stalled_by_flow_control()); + + StallSessionSend(session); + + EXPECT_EQ(ERR_IO_PENDING, delegate1.OnSendBody()); + EXPECT_EQ(ERR_IO_PENDING, delegate2.OnSendBody()); + EXPECT_EQ(ERR_IO_PENDING, delegate3.OnSendBody()); + + EXPECT_TRUE(stream1->send_stalled_by_flow_control()); + EXPECT_TRUE(stream2->send_stalled_by_flow_control()); + EXPECT_TRUE(stream3->send_stalled_by_flow_control()); + + SpdyStreamId stream_id1 = stream1->stream_id(); + SpdyStreamId stream_id2 = stream2->stream_id(); + SpdyStreamId stream_id3 = stream3->stream_id(); + + // Close stream1 preemptively. + stream1 = NULL; + session->CloseStream(stream_id1, ERR_CONNECTION_CLOSED); + + EXPECT_FALSE(session->IsStreamActive(stream_id1)); + EXPECT_TRUE(session->IsStreamActive(stream_id2)); + EXPECT_TRUE(session->IsStreamActive(stream_id3)); + + // Unstall stream2, which should then close stream3. + delegate2.set_stream_to_close(stream3); + stream3 = NULL; + UnstallSessionSend(session, kBodyDataSize); + + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); + EXPECT_FALSE(session->IsStreamActive(stream_id1)); + EXPECT_TRUE(session->IsStreamActive(stream_id2)); + EXPECT_FALSE(session->IsStreamActive(stream_id3)); + + data.RunFor(3); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate1.WaitForClose()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate2.WaitForClose()); + EXPECT_EQ(OK, delegate3.WaitForClose()); + + EXPECT_TRUE(delegate1.send_headers_completed()); + EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); + EXPECT_EQ("", delegate1.received_data()); + EXPECT_EQ(0, delegate1.body_data_sent()); + + EXPECT_TRUE(delegate2.send_headers_completed()); + EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); + EXPECT_EQ(kBodyDataStringPiece.as_string(), delegate2.received_data()); + EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); + + EXPECT_TRUE(delegate3.send_headers_completed()); + EXPECT_EQ("200", delegate3.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate3.GetResponseHeaderValue(":version")); + EXPECT_EQ("", delegate3.received_data()); + EXPECT_EQ(0, delegate3.body_data_sent()); +} + +// Delegate that closes a given session after sending its body. +class SessionClosingDelegate : public test::StreamDelegateWithBody { + public: + SessionClosingDelegate(const scoped_refptr<SpdyStream>& stream, + base::StringPiece data) + : StreamDelegateWithBody(stream, data) {} + + virtual ~SessionClosingDelegate() {} + + void set_session_to_close( + const scoped_refptr<SpdySession>& session_to_close) { + session_to_close_ = session_to_close; + } + + virtual int OnSendBody() OVERRIDE { + int rv = test::StreamDelegateWithBody::OnSendBody(); + if (session_to_close_) { + session_to_close_->CloseSessionOnError( + ERR_CONNECTION_CLOSED, + true, + "Closed by SessionClosingDelegate"); + session_to_close_ = NULL; + } + return rv; + } + + private: + scoped_refptr<SpdySession> session_to_close_; +}; + +// Cause a stall by reducing the flow control send window to +// 0. Unstalling the session should properly handle the session itself +// being closed. +TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedSession31) { + const char kStreamUrl[] = "http://www.google.com/"; + GURL url(kStreamUrl); + + session_deps_.enable_spdy_31 = true; + session_deps_.host_resolver->set_synchronous_mode(true); + + scoped_ptr<SpdyFrame> initial_window_update( + ConstructSpdyWindowUpdate( + kSessionFlowControlStreamId, + kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize)); + scoped_ptr<SpdyFrame> req1( + ConstructSpdyPost(kStreamUrl, 1, kBodyDataSize, LOWEST, NULL, 0)); + scoped_ptr<SpdyFrame> req2( + ConstructSpdyPost(kStreamUrl, 3, kBodyDataSize, LOWEST, NULL, 0)); + scoped_ptr<SpdyFrame> msg2( + ConstructSpdyBodyFrame(3, kBodyData, kBodyDataSize, false)); + MockWrite writes[] = { + CreateMockWrite(*initial_window_update, 0), + CreateMockWrite(*req1, 1), + CreateMockWrite(*req2, 3), + CreateMockWrite(*msg2, 5), + }; + + scoped_ptr<SpdyFrame> resp1(ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr<SpdyFrame> resp2(ConstructSpdyGetSynReply(NULL, 0, 3)); + MockRead reads[] = { + CreateMockRead(*resp1, 2), + CreateMockRead(*resp2, 4), + MockRead(ASYNC, 0, 0, 6), // EOF + }; + + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + CreateDeterministicNetworkSession(); + scoped_refptr<SpdySession> session = GetSession(pair_); + InitializeSession( + http_session_.get(), session.get(), test_host_port_pair_); + EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM_AND_SESSION, + session->flow_control_state()); + + scoped_refptr<SpdyStream> stream1 = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream1.get() != NULL); + + SessionClosingDelegate delegate1(stream1.get(), kBodyDataStringPiece); + stream1->SetDelegate(&delegate1); + + EXPECT_FALSE(stream1->HasUrl()); + + stream1->set_spdy_headers( + ConstructPostHeaderBlock(kStreamUrl, kBodyDataSize)); + EXPECT_TRUE(stream1->HasUrl()); + EXPECT_EQ(kStreamUrl, stream1->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream1->SendRequest(true)); + + data.RunFor(3); + EXPECT_EQ(1u, stream1->stream_id()); + + scoped_refptr<SpdyStream> stream2 = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream2.get() != NULL); + + test::StreamDelegateWithBody delegate2(stream2.get(), kBodyDataStringPiece); + stream2->SetDelegate(&delegate2); + + EXPECT_FALSE(stream2->HasUrl()); + + stream2->set_spdy_headers( + ConstructPostHeaderBlock(kStreamUrl, kBodyDataSize)); + EXPECT_TRUE(stream2->HasUrl()); + EXPECT_EQ(kStreamUrl, stream2->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream2->SendRequest(true)); + + data.RunFor(2); + EXPECT_EQ(3u, stream2->stream_id()); + + EXPECT_FALSE(stream1->send_stalled_by_flow_control()); + EXPECT_FALSE(stream2->send_stalled_by_flow_control()); + + StallSessionSend(session); + + EXPECT_EQ(ERR_IO_PENDING, delegate1.OnSendBody()); + EXPECT_EQ(ERR_IO_PENDING, delegate2.OnSendBody()); + + EXPECT_TRUE(stream1->send_stalled_by_flow_control()); + EXPECT_TRUE(stream2->send_stalled_by_flow_control()); + + EXPECT_TRUE(spdy_session_pool_->HasSession(pair_)); + + // Unstall stream1, which should then close the session. + delegate1.set_session_to_close(session); + stream1 = NULL; + stream2 = NULL; + UnstallSessionSend(session, kBodyDataSize); + session = NULL; + + EXPECT_FALSE(spdy_session_pool_->HasSession(pair_)); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate1.WaitForClose()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate2.WaitForClose()); + + EXPECT_TRUE(delegate1.send_headers_completed()); + EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); + EXPECT_EQ("", delegate1.received_data()); + EXPECT_EQ(0, delegate1.body_data_sent()); + + EXPECT_TRUE(delegate2.send_headers_completed()); + EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); + EXPECT_EQ("", delegate2.received_data()); + EXPECT_EQ(0, delegate2.body_data_sent()); +} + } // namespace net diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index a7868ef..6767821 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -56,7 +56,7 @@ SpdyStream::SpdyStream(SpdySession* session, stream_id_(0), priority_(HIGHEST), slot_(0), - stalled_by_flow_control_(false), + send_stalled_by_flow_control_(false), send_window_size_(kSpdyStreamInitialWindowSize), recv_window_size_(kSpdyStreamInitialWindowSize), unacked_recv_window_bytes_(0), @@ -236,16 +236,6 @@ void SpdyStream::set_spdy_headers(scoped_ptr<SpdyHeaderBlock> headers) { request_.reset(headers.release()); } -void SpdyStream::PossiblyResumeIfStalled() { - DCHECK(!closed()); - - if (send_window_size_ > 0 && stalled_by_flow_control_) { - stalled_by_flow_control_ = false; - io_state_ = STATE_SEND_BODY; - DoLoop(OK); - } -} - void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); @@ -260,7 +250,7 @@ void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { DCHECK_GE(delta_window_size, kint32min - send_window_size_); } send_window_size_ += delta_window_size; - PossiblyResumeIfStalled(); + PossiblyResumeIfSendStalled(); } void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { @@ -292,7 +282,7 @@ void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_, delta_window_size, send_window_size_)); - PossiblyResumeIfStalled(); + PossiblyResumeIfSendStalled(); } void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { @@ -625,6 +615,17 @@ bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { return session_->GetSSLCertRequestInfo(cert_request_info); } +void SpdyStream::PossiblyResumeIfSendStalled() { + DCHECK(!closed()); + + if (send_stalled_by_flow_control_ && !session_->IsSendStalled() && + send_window_size_ > 0) { + send_stalled_by_flow_control_ = false; + io_state_ = STATE_SEND_BODY; + DoLoop(OK); + } +} + bool SpdyStream::HasUrl() const { if (pushed_) return response_received(); diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 2544fff..db04f68 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -151,10 +151,10 @@ class NET_EXPORT_PRIVATE SpdyStream recv_window_size_ = window_size; } - bool stalled_by_flow_control() { return stalled_by_flow_control_; } + bool send_stalled_by_flow_control() { return send_stalled_by_flow_control_; } - void set_stalled_by_flow_control(bool stalled) { - stalled_by_flow_control_ = stalled; + void set_send_stalled_by_flow_control(bool stalled) { + send_stalled_by_flow_control_ = stalled; } // If stream flow control is turned on, called by the session to @@ -279,6 +279,13 @@ class NET_EXPORT_PRIVATE SpdyStream // true when SSL is in use. bool GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info); + // If the stream is stalled on sending data, but the session is not + // stalled on sending data and |send_window_size_| is positive, then + // set |send_stalled_by_flow_control_| to false and unstall the data + // sending. Called by the session or by the stream itself. Must be + // called only when the stream is still open. + void PossiblyResumeIfSendStalled(); + bool is_idle() const { return io_state_ == STATE_OPEN || io_state_ == STATE_DONE; } @@ -316,11 +323,6 @@ class NET_EXPORT_PRIVATE SpdyStream virtual ~SpdyStream(); - // If the stream is stalled and if |send_window_size_| is positive, - // then set |stalled_by_flow_control_| to false and unstall the - // stream. Must be called only when the stream is still open. - void PossiblyResumeIfStalled(); - void OnGetDomainBoundCertComplete(int result); // Try to make progress sending/receiving the request/response. @@ -373,7 +375,7 @@ class NET_EXPORT_PRIVATE SpdyStream size_t slot_; // Flow control variables. - bool stalled_by_flow_control_; + bool send_stalled_by_flow_control_; int32 send_window_size_; int32 recv_window_size_; int32 unacked_recv_window_bytes_; diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 67461d5..755e55c 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -7,6 +7,7 @@ #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/string_piece.h" #include "net/base/completion_callback.h" #include "net/base/net_log_unittest.h" #include "net/spdy/buffered_spdy_framer.h" @@ -33,6 +34,7 @@ namespace { const char kStreamUrl[] = "http://www.google.com/"; const char kPostBody[] = "\0hello!\xff"; const size_t kPostBodyLength = arraysize(kPostBody); +const base::StringPiece kPostBodyStringPiece(kPostBody, kPostBodyLength); class SpdyStreamSpdy2Test : public testing::Test { protected: @@ -112,11 +114,9 @@ TEST_F(SpdyStreamSpdy2Test, SendDataAfterOpen) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); StreamDelegateSendImmediate delegate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), buf.get()); + stream.get(), scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -183,15 +183,13 @@ TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, HIGHEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(6)); - memcpy(buf->data(), "hello!", 6); scoped_ptr<SpdyHeaderBlock> message_headers(new SpdyHeaderBlock); (*message_headers)["opcode"] = "1"; (*message_headers)["length"] = "6"; (*message_headers)["fin"] = "1"; StreamDelegateSendImmediate delegate( - stream.get(), message_headers.Pass(), buf.get()); + stream.get(), message_headers.Pass(), base::StringPiece("hello!", 6)); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -303,11 +301,9 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, log.bound()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); StreamDelegateSendImmediate delegate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), buf.get()); + stream.get(), scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index 5d8a722..a50f8dc 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -5,6 +5,7 @@ #include "base/basictypes.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/string_piece.h" #include "net/base/completion_callback.h" #include "net/base/net_log_unittest.h" #include "net/spdy/buffered_spdy_framer.h" @@ -31,6 +32,7 @@ namespace { const char kStreamUrl[] = "http://www.google.com/"; const char kPostBody[] = "\0hello!\xff"; const size_t kPostBodyLength = arraysize(kPostBody); +const base::StringPiece kPostBodyStringPiece(kPostBody, kPostBodyLength); class SpdyStreamSpdy3Test : public testing::Test { protected: @@ -109,11 +111,9 @@ TEST_F(SpdyStreamSpdy3Test, SendDataAfterOpen) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); StreamDelegateSendImmediate delegate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), buf.get()); + stream.get(), scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -180,8 +180,6 @@ TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, HIGHEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(6)); - memcpy(buf->data(), "hello!", 6); TestCompletionCallback callback; scoped_ptr<SpdyHeaderBlock> message_headers(new SpdyHeaderBlock); (*message_headers)[":opcode"] = "1"; @@ -189,7 +187,7 @@ TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) { (*message_headers)[":fin"] = "1"; StreamDelegateSendImmediate delegate( - stream.get(), message_headers.Pass(), buf.get()); + stream.get(), message_headers.Pass(), base::StringPiece("hello!", 6)); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -303,11 +301,9 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, log.bound()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); StreamDelegateSendImmediate delegate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), buf.get()); + stream.get(), scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -381,11 +377,9 @@ TEST_F(SpdyStreamSpdy3Test, IncreaseSendWindowSizeOverflow) { CreateStreamSynchronously(session, url, LOWEST, log.bound()); ASSERT_TRUE(stream.get() != NULL); - scoped_ptr<StreamDelegateSendImmediate> delegate( - new StreamDelegateSendImmediate( - stream.get(), scoped_ptr<SpdyHeaderBlock>(), - new IOBufferWithSize(kPostBodyLength))); - stream->SetDelegate(delegate.get()); + StreamDelegateSendImmediate delegate( + stream.get(), scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); + stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); EXPECT_EQ(0u, stream->stream_id()); @@ -397,11 +391,11 @@ TEST_F(SpdyStreamSpdy3Test, IncreaseSendWindowSizeOverflow) { stream->IncreaseSendWindowSize(delta_window_size); EXPECT_EQ(old_send_window_size, stream->send_window_size()); - EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate->WaitForClose()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); } -// Cause a stall by reducing the flow control send window to 0. The -// stream should resume when that window is then increased. +// Cause a send stall by reducing the flow control send window to +// 0. The stream should resume when that window is then increased. TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { GURL url(kStreamUrl); @@ -440,10 +434,8 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); - StreamDelegateWithBody delegate(stream.get(), buf); + StreamDelegateWithBody delegate(stream.get(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -457,7 +449,7 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { data.RunFor(2); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); // Reduce the send window size to 0 to stall. while (stream->send_window_size() > 0) { @@ -465,13 +457,13 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { std::min(kMaxSpdyFrameChunkSize, stream->send_window_size())); } - stream->QueueStreamData(buf.get(), buf->size(), DATA_FLAG_NONE); + EXPECT_EQ(ERR_IO_PENDING, delegate.OnSendBody()); - EXPECT_TRUE(stream->stalled_by_flow_control()); + EXPECT_TRUE(stream->send_stalled_by_flow_control()); stream->IncreaseSendWindowSize(kPostBodyLength); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); data.RunFor(3); @@ -484,8 +476,9 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncrease) { EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.body_data_sent()); } -// Cause a stall by reducing the flow control send window to 0. The -// stream should resume when that window is then adjusted positively. +// Cause a send stall by reducing the flow control send window to +// 0. The stream should resume when that window is then adjusted +// positively. TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjust) { GURL url(kStreamUrl); @@ -524,10 +517,8 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjust) { scoped_refptr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); ASSERT_TRUE(stream.get() != NULL); - scoped_refptr<IOBufferWithSize> buf(new IOBufferWithSize(kPostBodyLength)); - memcpy(buf->data(), kPostBody, kPostBodyLength); - StreamDelegateWithBody delegate(stream.get(), buf); + StreamDelegateWithBody delegate(stream.get(), kPostBodyStringPiece); stream->SetDelegate(&delegate); EXPECT_FALSE(stream->HasUrl()); @@ -541,7 +532,7 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjust) { data.RunFor(2); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); // Reduce the send window size to 0 to stall. while (stream->send_window_size() > 0) { @@ -549,21 +540,21 @@ TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjust) { std::min(kMaxSpdyFrameChunkSize, stream->send_window_size())); } - stream->QueueStreamData(buf.get(), buf->size(), DATA_FLAG_NONE); + EXPECT_EQ(ERR_IO_PENDING, delegate.OnSendBody()); - EXPECT_TRUE(stream->stalled_by_flow_control()); + EXPECT_TRUE(stream->send_stalled_by_flow_control()); stream->AdjustSendWindowSize(-static_cast<int>(kPostBodyLength)); - EXPECT_TRUE(stream->stalled_by_flow_control()); + EXPECT_TRUE(stream->send_stalled_by_flow_control()); stream->AdjustSendWindowSize(kPostBodyLength); - EXPECT_TRUE(stream->stalled_by_flow_control()); + EXPECT_TRUE(stream->send_stalled_by_flow_control()); stream->AdjustSendWindowSize(kPostBodyLength); - EXPECT_FALSE(stream->stalled_by_flow_control()); + EXPECT_FALSE(stream->send_stalled_by_flow_control()); data.RunFor(3); diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc index c608dbc..3b2235f 100644 --- a/net/spdy/spdy_stream_test_util.cc +++ b/net/spdy/spdy_stream_test_util.cc @@ -107,10 +107,10 @@ std::string StreamDelegateBase::GetResponseHeaderValue( StreamDelegateSendImmediate::StreamDelegateSendImmediate( const scoped_refptr<SpdyStream>& stream, scoped_ptr<SpdyHeaderBlock> headers, - IOBufferWithSize* buf) + base::StringPiece data) : StreamDelegateBase(stream), headers_(headers.Pass()), - buf_(buf) {} + data_(data) {} StreamDelegateSendImmediate::~StreamDelegateSendImmediate() { } @@ -135,17 +135,19 @@ int StreamDelegateSendImmediate::OnResponseReceived( if (headers_.get()) { stream()->QueueHeaders(headers_.Pass()); } - if (buf_) { - stream()->QueueStreamData(buf_.get(), buf_->size(), DATA_FLAG_NONE); + if (data_.data()) { + scoped_refptr<StringIOBuffer> buf(new StringIOBuffer(data_.as_string())); + stream()->QueueStreamData(buf, buf->size(), DATA_FLAG_NONE); } return status; } StreamDelegateWithBody::StreamDelegateWithBody( const scoped_refptr<SpdyStream>& stream, - IOBufferWithSize* buf) + base::StringPiece data) : StreamDelegateBase(stream), - buf_(new DrainableIOBuffer(buf, buf->size())), + buf_(new DrainableIOBuffer(new StringIOBuffer(data.as_string()), + data.size())), body_data_sent_(0) {} StreamDelegateWithBody::~StreamDelegateWithBody() { diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h index b72bd80..bc82365 100644 --- a/net/spdy/spdy_stream_test_util.h +++ b/net/spdy/spdy_stream_test_util.h @@ -8,6 +8,8 @@ #include "base/compiler_specific.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/string_piece.h" +#include "net/base/io_buffer.h" #include "net/base/test_completion_callback.h" #include "net/spdy/spdy_stream.h" @@ -85,7 +87,7 @@ class StreamDelegateSendImmediate : public StreamDelegateBase { // Both |headers| and |buf| can be NULL. StreamDelegateSendImmediate(const scoped_refptr<SpdyStream>& stream, scoped_ptr<SpdyHeaderBlock> headers, - IOBufferWithSize* buf); + base::StringPiece data); virtual ~StreamDelegateSendImmediate(); virtual int OnSendBody() OVERRIDE; @@ -96,14 +98,14 @@ class StreamDelegateSendImmediate : public StreamDelegateBase { private: scoped_ptr<SpdyHeaderBlock> headers_; - scoped_refptr<IOBufferWithSize> buf_; + base::StringPiece data_; }; // Test delegate that sends body data. class StreamDelegateWithBody : public StreamDelegateBase { public: StreamDelegateWithBody(const scoped_refptr<SpdyStream>& stream, - IOBufferWithSize* buf); + base::StringPiece data); virtual ~StreamDelegateWithBody(); virtual int OnSendBody() OVERRIDE; |