From 450c50209ee3bfddef034f09d8a6fa3c47d979d9 Mon Sep 17 00:00:00 2001 From: "agayev@chromium.org" Date: Thu, 26 Aug 2010 02:38:28 +0000 Subject: SPDY flow control: add support for receive window size BUG=48100 TEST=net_unittests Review URL: http://codereview.chromium.org/3137014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@57453 0039d316-1c4b-4281-b951-d872f2087c98 --- net/http/http_network_layer.cc | 2 +- net/http/http_network_transaction.h | 1 + net/http/http_stream_handle.h | 2 + net/spdy/spdy_http_stream.cc | 2 + net/spdy/spdy_network_transaction_unittest.cc | 105 +++++++++++++++++++++++--- net/spdy/spdy_session.cc | 16 ++++ net/spdy/spdy_session.h | 13 +++- net/spdy/spdy_stream.cc | 35 +++++++++ net/spdy/spdy_stream.h | 16 ++++ 9 files changed, 179 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/http/http_network_layer.cc b/net/http/http_network_layer.cc index 95f9ea9..292be21 100644 --- a/net/http/http_network_layer.cc +++ b/net/http/http_network_layer.cc @@ -210,7 +210,7 @@ void HttpNetworkLayer::EnableSpdy(const std::string& mode) { use_alt_protocols = false; HttpStreamFactory::set_use_alternate_protocols(false); } else if (option == kEnableFlowControl) { - SpdySession::SetFlowControl(true); + SpdySession::set_flow_control(true); } else if (option == kForceAltProtocols) { HttpAlternateProtocols::PortProtocolPair pair; pair.port = 443; diff --git a/net/http/http_network_transaction.h b/net/http/http_network_transaction.h index c4cb8d0..718a770 100644 --- a/net/http/http_network_transaction.h +++ b/net/http/http_network_transaction.h @@ -69,6 +69,7 @@ class HttpNetworkTransaction : public HttpTransaction, private: FRIEND_TEST_ALL_PREFIXES(HttpNetworkTransactionTest, ResetStateForRestart); FRIEND_TEST_ALL_PREFIXES(SpdyNetworkTransactionTest, WindowUpdateReceived); + FRIEND_TEST_ALL_PREFIXES(SpdyNetworkTransactionTest, WindowUpdateSent); FRIEND_TEST_ALL_PREFIXES(SpdyNetworkTransactionTest, WindowUpdateOverflow); FRIEND_TEST_ALL_PREFIXES(SpdyNetworkTransactionTest, FlowControlStallResume); diff --git a/net/http/http_stream_handle.h b/net/http/http_stream_handle.h index 2d43abb..07e7d69 100644 --- a/net/http/http_stream_handle.h +++ b/net/http/http_stream_handle.h @@ -96,6 +96,8 @@ class HttpStreamHandle : public HttpStream { } private: + FRIEND_TEST_ALL_PREFIXES(SpdyNetworkTransactionTest, WindowUpdateSent); + scoped_ptr connection_; scoped_ptr stream_; diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index de0cc14..8072d53 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -246,6 +246,8 @@ int SpdyHttpStream::ReadResponseBody( } bytes_read += bytes_to_copy; } + if (spdy_session_->flow_control()) + stream_->IncreaseRecvWindowSize(bytes_read); return bytes_read; } else if (stream_->closed()) { return stream_->response_status(); diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index b3738a6..884ec06 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -1447,12 +1447,12 @@ TEST_P(SpdyNetworkTransactionTest, ResponseWithTwoSynReplies) { // a reply with a body, to cause a graceful shutdown. // TODO(agayev): develop a socket data provider where both, reads and -// writes are ordered so that writing tests like these are easy, right now -// we are working around the limitations as described above and it's not -// deterministic, tests may fail under specific circumstances. -// TODO(mbelshe): Disabling until we have deterministic sockets! -TEST_P(SpdyNetworkTransactionTest, DISABLED_WindowUpdateReceived) { - SpdySession::SetFlowControl(true); +// writes are ordered so that writing tests like these are easy and rewrite +// all these tests using it. Right now we are working around the +// limitations as described above and it's not deterministic, tests may +// fail under specific circumstances. +TEST_P(SpdyNetworkTransactionTest, WindowUpdateReceived) { + SpdySession::set_flow_control(true); static int kFrameCount = 2; scoped_ptr content( @@ -1480,6 +1480,7 @@ TEST_P(SpdyNetworkTransactionTest, DISABLED_WindowUpdateReceived) { MockRead reads[] = { CreateMockRead(*window_update_dummy), CreateMockRead(*window_update_dummy), + CreateMockRead(*window_update_dummy), CreateMockRead(*window_update), // Four updates, therefore window CreateMockRead(*window_update), // size should increase by CreateMockRead(*window_update), // kDeltaWindowSize * 4 @@ -1523,13 +1524,95 @@ TEST_P(SpdyNetworkTransactionTest, DISABLED_WindowUpdateReceived) { kMaxSpdyFrameChunkSize * kFrameCount, stream->stream()->send_window_size()); helper.VerifyDataConsumed(); - SpdySession::SetFlowControl(false); + SpdySession::set_flow_control(false); +} + +// Test that received data frames and sent WINDOW_UPDATE frames change +// the recv_window_size_ correctly. +TEST_P(SpdyNetworkTransactionTest, WindowUpdateSent) { + SpdySession::set_flow_control(true); + + scoped_ptr req(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); + scoped_ptr window_update( + ConstructSpdyWindowUpdate(1, kUploadDataSize)); + + MockWrite writes[] = { + CreateMockWrite(*req), + CreateMockWrite(*window_update), + }; + + scoped_ptr resp( + ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr body_no_fin( + ConstructSpdyBodyFrame(1, false)); + scoped_ptr body_fin( + ConstructSpdyBodyFrame(1, NULL, 0, true)); + MockRead reads[] = { + CreateMockRead(*resp), + CreateMockRead(*body_no_fin), + MockRead(true, ERR_IO_PENDING, 0), // Force a pause + CreateMockRead(*body_fin), + MockRead(true, ERR_IO_PENDING, 0), // Force a pause + MockRead(true, 0, 0) // EOF + }; + + scoped_refptr data( + new DelayedSocketData(1, reads, arraysize(reads), + writes, arraysize(writes))); + + NormalSpdyTransactionHelper helper(CreateGetRequest(), + BoundNetLog(), GetParam()); + helper.AddData(data.get()); + helper.RunPreTestSetup(); + HttpNetworkTransaction* trans = helper.trans(); + + TestCompletionCallback callback; + int rv = trans->Start(&helper.request(), &callback, BoundNetLog()); + + EXPECT_EQ(ERR_IO_PENDING, rv); + rv = callback.WaitForResult(); + EXPECT_EQ(OK, rv); + + SpdyHttpStream* stream = + static_cast(trans->stream_.get()->stream_.get()); + ASSERT_TRUE(stream != NULL); + ASSERT_TRUE(stream->stream() != NULL); + + EXPECT_EQ(spdy::kInitialWindowSize - kUploadDataSize, + stream->stream()->recv_window_size()); + + const HttpResponseInfo* response = trans->GetResponseInfo(); + ASSERT_TRUE(response != NULL); + ASSERT_TRUE(response->headers != NULL); + EXPECT_EQ("HTTP/1.1 200 OK", response->headers->GetStatusLine()); + EXPECT_TRUE(response->was_fetched_via_spdy); + + // Issue a read which will cause a WINDOW_UPDATE to be sent and window + // size increased to default. + scoped_refptr buf = new net::IOBuffer(kUploadDataSize); + rv = trans->Read(buf, kUploadDataSize, NULL); + EXPECT_EQ(kUploadDataSize, rv); + std::string content(buf->data(), buf->data()+kUploadDataSize); + EXPECT_STREQ(kUploadData, content.c_str()); + + // Schedule the reading of empty data frame with FIN + data->CompleteRead(); + + // Force write of WINDOW_UPDATE which was scheduled during the above + // read. + MessageLoop::current()->RunAllPending(); + + // Read EOF. + data->CompleteRead(); + + helper.VerifyDataConsumed(); + SpdySession::set_flow_control(false); } // Test that WINDOW_UPDATE frame causing overflow is handled correctly. We // use the same trick as in the above test to enforce our scenario. TEST_P(SpdyNetworkTransactionTest, WindowUpdateOverflow) { - SpdySession::SetFlowControl(true); + SpdySession::set_flow_control(true); // number of full frames we hope to write (but will not, used to // set content-length header correctly) @@ -1602,7 +1685,7 @@ TEST_P(SpdyNetworkTransactionTest, WindowUpdateOverflow) { helper.session()->spdy_session_pool()->CloseAllSessions(); helper.VerifyDataConsumed(); - SpdySession::SetFlowControl(false); + SpdySession::set_flow_control(false); } // Test that after hitting a send window size of 0, the write process @@ -1621,7 +1704,7 @@ TEST_P(SpdyNetworkTransactionTest, WindowUpdateOverflow) { // After that, next read is artifically enforced, which causes a // WINDOW_UPDATE to be read and I/O process resumes. TEST_P(SpdyNetworkTransactionTest, FlowControlStallResume) { - SpdySession::SetFlowControl(true); + SpdySession::set_flow_control(true); // Number of frames we need to send to zero out the window size: data // frames plus SYN_STREAM plus the last data frame; also we need another @@ -1711,7 +1794,7 @@ TEST_P(SpdyNetworkTransactionTest, FlowControlStallResume) { rv = callback.WaitForResult(); helper.VerifyDataConsumed(); - SpdySession::SetFlowControl(false); + SpdySession::set_flow_control(false); } TEST_P(SpdyNetworkTransactionTest, CancelledTransaction) { diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 64a04cc..8738a6a 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -172,6 +172,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, received_settings_(false), in_session_pool_(true), initial_send_window_size_(spdy::kInitialWindowSize), + initial_recv_window_size_(spdy::kInitialWindowSize), net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) { net_log_.BeginEvent( NetLog::TYPE_SPDY_SESSION, @@ -369,6 +370,7 @@ int SpdySession::CreateStreamImpl( stream->set_path(path); stream->set_net_log(stream_net_log); stream->set_send_window_size(initial_send_window_size_); + stream->set_recv_window_size(initial_recv_window_size_); ActivateStream(stream); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", @@ -1255,6 +1257,20 @@ void SpdySession::OnWindowUpdate( stream->IncreaseSendWindowSize(delta_window_size); } +void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id, + int delta_window_size) { + DCHECK(IsStreamActive(stream_id)); + scoped_refptr stream = active_streams_[stream_id]; + CHECK_EQ(stream->stream_id(), stream_id); + + LOG(INFO) << "Sending a WINDOW_UPDATE frame for stream " << stream_id + << " with delta window size " << delta_window_size; + + scoped_ptr window_update_frame( + spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size)); + QueueFrame(window_update_frame.get(), stream->priority(), stream); +} + void SpdySession::SendSettings() { const SpdySettingsStorage& settings_storage = session_->spdy_settings(); const spdy::SpdySettings& settings = settings_storage.Get(host_port_pair()); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 480f9ad..d200e6c 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -141,7 +141,12 @@ class SpdySession : public base::RefCounted, static bool SSLMode() { return use_ssl_; } // Enable or disable flow control. - static void SetFlowControl(bool enable) { use_flow_control_ = enable; } + static void set_flow_control(bool enable) { use_flow_control_ = enable; } + static bool flow_control() { return use_flow_control_; } + + // Send WINDOW_UPDATE frame, called by a stream whenever receive window + // size is increased. + void SendWindowUpdate(spdy::SpdyStreamId stream_id, int delta_window_size); // If session is closed, no new streams/transactions should be created. bool IsClosed() const { return state_ == CLOSED; } @@ -366,6 +371,12 @@ class SpdySession : public base::RefCounted, // initial send window size. int initial_send_window_size_; + // Initial receive window size for the session; there are plans to add a + // command line switch that would cause a SETTINGS frame with window size + // announcement to be sent on startup; newly created streams will use + // this value for the initial receive window size. + int initial_recv_window_size_; + BoundNetLog net_log_; static bool use_ssl_; diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 95e8496..d9af4c0 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -18,6 +18,7 @@ SpdyStream::SpdyStream( priority_(0), stalled_by_flow_control_(false), send_window_size_(spdy::kInitialWindowSize), + recv_window_size_(spdy::kInitialWindowSize), pushed_(pushed), metrics_(Singleton::get()), response_received_(false), @@ -142,6 +143,37 @@ void SpdyStream::DecreaseSendWindowSize(int delta_window_size) { send_window_size_ -= delta_window_size; } +void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) { + DCHECK_GE(delta_window_size, 1); + // By the time a read is isued, stream may become inactive. + if (!session_->IsStreamActive(stream_id_)) + return; + int new_window_size = recv_window_size_ + delta_window_size; + if (recv_window_size_ > 0) + DCHECK(new_window_size > 0); + + LOG(INFO) << "Increasing stream " << stream_id_ + << " recv_window_size_ [current:" << recv_window_size_ << "]" + << " by " << delta_window_size << " bytes"; + recv_window_size_ = new_window_size; + session_->SendWindowUpdate(stream_id_, delta_window_size); +} + +void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) { + DCHECK_GE(delta_window_size, 1); + + LOG(INFO) << "Decreasing stream " << stream_id_ + << " recv_window_size_ [current:" << recv_window_size_ << "]" + << " by " << delta_window_size << " bytes"; + recv_window_size_ -= delta_window_size; + + // Since we never decrease the initial window size, we should never hit + // a negative |recv_window_size_|, if we do, it's a flow-control violation. + if (recv_window_size_ < 0) + session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR); +} + + base::Time SpdyStream::GetRequestTime() const { return request_time_; } @@ -217,6 +249,9 @@ void SpdyStream::OnDataReceived(const char* data, int length) { return; } + if (session_->flow_control()) + DecreaseRecvWindowSize(length); + // Track our bandwidth. metrics_.RecordBytes(length); recv_bytes_ += length; diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 91c6a36..14580ff 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -109,6 +109,11 @@ class SpdyStream : public base::RefCounted { send_window_size_ = window_size; } + int recv_window_size() const { return recv_window_size_; } + void set_recv_window_size(int window_size) { + recv_window_size_ = window_size; + } + void set_stalled_by_flow_control(bool stalled) { stalled_by_flow_control_ = stalled; } @@ -121,6 +126,16 @@ class SpdyStream : public base::RefCounted { // Decreases |send_window_size_| by the given number of bytes. void DecreaseSendWindowSize(int delta_window_size); + // Increases |recv_window_size_| by the given number of bytes, also sends + // a WINDOW_UPDATE frame. + void IncreaseRecvWindowSize(int delta_window_size); + + // Decreases |recv_window_size_| by the given number of bytes, called + // whenever data is read. May also send a RST_STREAM and remove the + // stream from the session if the resultant |recv_window_size_| is + // negative, since that would be a flow control violation. + void DecreaseRecvWindowSize(int delta_window_size); + const BoundNetLog& net_log() const { return net_log_; } void set_net_log(const BoundNetLog& log) { net_log_ = log; } @@ -229,6 +244,7 @@ class SpdyStream : public base::RefCounted { // Flow control variables. bool stalled_by_flow_control_; int send_window_size_; + int recv_window_size_; const bool pushed_; ScopedBandwidthMetrics metrics_; -- cgit v1.1