diff options
author | jgraettinger@chromium.org <jgraettinger@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-06-05 03:36:24 +0000 |
---|---|---|
committer | jgraettinger@chromium.org <jgraettinger@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-06-05 03:36:24 +0000 |
commit | 975da41aab312ba7cc270462d02931452f51d953 (patch) | |
tree | d40126c5d42b09caded96a6de8ebeeb042071f3e | |
parent | da0c65087c4ed0811fbdb00bacc62bc8429b27e7 (diff) | |
download | chromium_src-975da41aab312ba7cc270462d02931452f51d953.zip chromium_src-975da41aab312ba7cc270462d02931452f51d953.tar.gz chromium_src-975da41aab312ba7cc270462d02931452f51d953.tar.bz2 |
Defer SpdySession destruction to support closing writes
Replace the STATE_CLOSED availability state with STATE_DRAINING, where:
* The read-pump will not run, and no frames can be read.
* No new frames may be queued in the sessions's write_queue.
* However, the write-pump will continue to run.
SpdySession's write-pump will destroy the session iff it's draining, the
write queue is empty and no in-flight writes remain. As this (and
~SpdySessionPool()) are now the only locations where SpdySessions may be
destroyed, session lifetime is more know-able and some WeakPtrs are
removed.
Some tests have been updated to include closing RST_STREAMS sent by the
session. This reflects current behavior. A future CL will add GOAWAY
frames upon session errors.
BUG=375033,379469
Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=273680
Review URL: https://codereview.chromium.org/305823003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@275005 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/http/http_stream_factory_impl_job.cc | 3 | ||||
-rw-r--r-- | net/http/http_stream_factory_impl_request.cc | 9 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 4 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket.cc | 1 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket_unittest.cc | 81 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 299 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 77 | ||||
-rw-r--r-- | net/spdy/spdy_session_pool.cc | 11 | ||||
-rw-r--r-- | net/spdy/spdy_session_pool_unittest.cc | 136 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 178 |
10 files changed, 450 insertions, 349 deletions
diff --git a/net/http/http_stream_factory_impl_job.cc b/net/http/http_stream_factory_impl_job.cc index 42ed0c0..93b9ef2 100644 --- a/net/http/http_stream_factory_impl_job.cc +++ b/net/http/http_stream_factory_impl_job.cc @@ -339,6 +339,9 @@ void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { // NULL at this point if the SpdySession closed immediately after creation. base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; new_spdy_session_.reset(); + + // TODO(jgraettinger): Notify the factory, and let that notify |request_|, + // rather than notifying |request_| directly. if (IsOrphaned()) { if (spdy_session) { stream_factory_->OnNewSpdySessionReady( diff --git a/net/http/http_stream_factory_impl_request.cc b/net/http/http_stream_factory_impl_request.cc index 9c49433..c8924d2 100644 --- a/net/http/http_stream_factory_impl_request.cc +++ b/net/http/http_stream_factory_impl_request.cc @@ -250,6 +250,13 @@ HttpStreamFactoryImpl::Request::RemoveRequestFromSpdySessionRequestMap() { } } +// TODO(jgraettinger): Currently, HttpStreamFactoryImpl::Job notifies a +// Request that the session is ready, which in turn notifies it's delegate, +// and then it notifies HttpStreamFactoryImpl so that /other/ requests may +// be woken, but only if the spdy_session is still okay. This is tough to grok. +// Instead, see if Job can notify HttpStreamFactoryImpl only, and have one +// path for notifying any requests waiting for the session (including the +// request which spawned it). void HttpStreamFactoryImpl::Request::OnNewSpdySessionReady( Job* job, scoped_ptr<HttpStream> stream, @@ -292,7 +299,7 @@ void HttpStreamFactoryImpl::Request::OnNewSpdySessionReady( stream.release()); } // |this| may be deleted after this point. - if (spdy_session) { + if (spdy_session && spdy_session->IsAvailable()) { factory->OnNewSpdySessionReady(spdy_session, direct, used_ssl_config, diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index 3993a56..46f0500 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -4277,7 +4277,9 @@ TEST_P(SpdyNetworkTransactionTest, BufferedCancelled) { scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true)); - MockWrite writes[] = { CreateMockWrite(*req) }; + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); + MockWrite writes[] = {CreateMockWrite(*req), CreateMockWrite(*rst)}; // NOTE: We don't FIN the stream. scoped_ptr<SpdyFrame> data_frame( diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc index 01c30b9..6a87b54 100644 --- a/net/spdy/spdy_proxy_client_socket.cc +++ b/net/spdy/spdy_proxy_client_socket.cc @@ -410,6 +410,7 @@ int SpdyProxyClientSocket::DoReadReplyComplete(int result) { if (SanitizeProxyRedirect(&response_, request_.url)) { redirect_has_load_timing_info_ = spdy_stream_->GetLoadTimingInfo(&redirect_load_timing_info_); + // Note that this triggers a RST_STREAM_CANCEL. spdy_stream_->DetachDelegate(); next_state_ = STATE_DISCONNECTED; return ERR_HTTPS_PROXY_TUNNEL_RESPONSE; diff --git a/net/spdy/spdy_proxy_client_socket_unittest.cc b/net/spdy/spdy_proxy_client_socket_unittest.cc index 7ae59f5..1f9747d 100644 --- a/net/spdy/spdy_proxy_client_socket_unittest.cc +++ b/net/spdy/spdy_proxy_client_socket_unittest.cc @@ -164,7 +164,6 @@ SpdyProxyClientSocketTest::SpdyProxyClientSocketTest() } void SpdyProxyClientSocketTest::TearDown() { - sock_.reset(NULL); if (session_.get() != NULL) session_->spdy_session_pool()->CloseAllSessions(); @@ -448,14 +447,15 @@ TEST_P(SpdyProxyClientSocketTest, ConnectWithAuthCredentials) { TEST_P(SpdyProxyClientSocketTest, ConnectRedirects) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectRedirectReplyFrame()); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 2), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 3), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -473,6 +473,9 @@ TEST_P(SpdyProxyClientSocketTest, ConnectRedirects) { std::string location; ASSERT_TRUE(headers->IsRedirect(&location)); ASSERT_EQ(location, kRedirectUrl); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } TEST_P(SpdyProxyClientSocketTest, ConnectFails) { @@ -499,14 +502,15 @@ TEST_P(SpdyProxyClientSocketTest, ConnectFails) { TEST_P(SpdyProxyClientSocketTest, WasEverUsedReturnsCorrectValues) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 2), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 3), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -516,6 +520,9 @@ TEST_P(SpdyProxyClientSocketTest, WasEverUsedReturnsCorrectValues) { EXPECT_TRUE(sock_->WasEverUsed()); sock_->Disconnect(); EXPECT_TRUE(sock_->WasEverUsed()); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // ----------- GetPeerAddress @@ -987,14 +994,15 @@ TEST_P(SpdyProxyClientSocketTest, PendingReadOnCloseReturnsZero) { TEST_P(SpdyProxyClientSocketTest, ReadOnDisconnectSocketReturnsNotConnected) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 2), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 3), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -1005,6 +1013,9 @@ TEST_P(SpdyProxyClientSocketTest, ASSERT_EQ(ERR_SOCKET_NOT_CONNECTED, sock_->Read(NULL, 1, CompletionCallback())); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // Reading buffered data from an already closed socket should return @@ -1068,15 +1079,16 @@ TEST_P(SpdyProxyClientSocketTest, WriteOnClosedStream) { // Calling Write() on a disconnected socket is an error TEST_P(SpdyProxyClientSocketTest, WriteOnDisconnectedSocket) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); scoped_ptr<SpdyFrame> msg1(ConstructBodyFrame(kMsg1, kLen1)); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 2), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 3), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -1088,6 +1100,9 @@ TEST_P(SpdyProxyClientSocketTest, WriteOnDisconnectedSocket) { scoped_refptr<IOBufferWithSize> buf(CreateBuffer(kMsg1, kLen1)); EXPECT_EQ(ERR_SOCKET_NOT_CONNECTED, sock_->Write(buf.get(), buf->size(), CompletionCallback())); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // If the socket is closed with a pending Write(), the callback @@ -1124,15 +1139,16 @@ TEST_P(SpdyProxyClientSocketTest, WritePendingOnClose) { // should not be called. TEST_P(SpdyProxyClientSocketTest, DisconnectWithWritePending) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(SYNCHRONOUS, 0, 2), // EOF + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), + MockWrite(SYNCHRONOUS, 0, 3), // EOF }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 3), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 4), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -1149,20 +1165,24 @@ TEST_P(SpdyProxyClientSocketTest, DisconnectWithWritePending) { EXPECT_FALSE(sock_->IsConnected()); EXPECT_FALSE(write_callback_.have_result()); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // If the socket is Disconnected with a pending Read(), the callback // should not be called. TEST_P(SpdyProxyClientSocketTest, DisconnectWithReadPending) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - MockRead(ASYNC, 0, 2), // EOF + CreateMockRead(*resp, 1, ASYNC), MockRead(ASYNC, 0, 3), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -1179,6 +1199,9 @@ TEST_P(SpdyProxyClientSocketTest, DisconnectWithReadPending) { EXPECT_FALSE(sock_->IsConnected()); EXPECT_FALSE(read_callback_.have_result()); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // If the socket is Reset when both a read and write are pending, @@ -1220,22 +1243,26 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePending) { EXPECT_TRUE(sock_.get()); EXPECT_TRUE(read_callback_.have_result()); EXPECT_TRUE(write_callback_.have_result()); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // Makes sure the proxy client socket's source gets the expected NetLog events // and only the expected NetLog events (No SpdySession events). TEST_P(SpdyProxyClientSocketTest, NetLog) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*conn, 0, SYNCHRONOUS), + CreateMockWrite(*conn, 0, SYNCHRONOUS), CreateMockWrite(*rst, 3), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); scoped_ptr<SpdyFrame> msg1(ConstructBodyFrame(kMsg1, kLen1)); MockRead reads[] = { - CreateMockRead(*resp, 1, ASYNC), - CreateMockRead(*msg1, 2, ASYNC), - MockRead(ASYNC, 0, 3), // EOF + CreateMockRead(*resp, 1, ASYNC), CreateMockRead(*msg1, 2, ASYNC), + MockRead(ASYNC, 0, 4), // EOF }; Initialize(reads, arraysize(reads), writes, arraysize(writes)); @@ -1275,6 +1302,9 @@ TEST_P(SpdyProxyClientSocketTest, NetLog) { NetLog::TYPE_SOCKET_BYTES_RECEIVED, NetLog::PHASE_NONE)); EXPECT_TRUE(LogContainsEndEvent(entry_list, 9, NetLog::TYPE_SOCKET_ALIVE)); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } // CompletionCallback that causes the SpdyProxyClientSocket to be @@ -1346,6 +1376,9 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePendingDelete) { EXPECT_FALSE(sock_.get()); EXPECT_TRUE(read_callback.have_result()); EXPECT_FALSE(write_callback_.have_result()); + + // Let the RST_STREAM write while |rst| is in-scope. + base::MessageLoop::current()->RunUntilIdle(); } } // namespace net diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index c1c598e..983df0a 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -527,8 +527,7 @@ SpdySession::SpdySession( SpdySession::~SpdySession() { CHECK(!in_io_loop_); - DCHECK(!pool_); - DcheckClosed(); + DcheckDraining(); // TODO(akalin): Check connection->is_initialized() instead. This // requires re-working CreateFakeSpdySession(), though. @@ -602,7 +601,7 @@ void SpdySession::InitializeWithSocket( NetLog::TYPE_SPDY_SESSION_INITIALIZED, connection_->socket()->NetLog().source().ToEventParametersCallback()); - DCHECK_NE(availability_state_, STATE_CLOSED); + DCHECK_EQ(availability_state_, STATE_AVAILABLE); connection_->AddHigherLayeredPool(this); if (enable_sending_initial_data_) SendInitialData(); @@ -619,7 +618,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { if (!verify_domain_authentication_) return true; - if (availability_state_ == STATE_CLOSED) + if (availability_state_ == STATE_DRAINING) return false; SSLInfo ssl_info; @@ -645,8 +644,7 @@ int SpdySession::GetPushStream( stream->reset(); - // TODO(akalin): Add unit test exercising this code path. - if (availability_state_ == STATE_CLOSED) + if (availability_state_ == STATE_DRAINING) return ERR_CONNECTION_CLOSED; Error err = TryAccessStream(url); @@ -666,17 +664,14 @@ int SpdySession::GetPushStream( // another being closed due to received data. Error SpdySession::TryAccessStream(const GURL& url) { - DCHECK_NE(availability_state_, STATE_CLOSED); - if (is_secure_ && certificate_error_code_ != OK && (url.SchemeIs("https") || url.SchemeIs("wss"))) { RecordProtocolErrorHistogram( PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); - CloseSessionResult result = DoCloseSession( + DoDrainSession( static_cast<Error>(certificate_error_code_), "Tried to get SPDY stream for secure content over an unauthenticated " "session."); - DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); return ERR_SPDY_PROTOCOL_ERROR; } return OK; @@ -690,8 +685,7 @@ int SpdySession::TryCreateStream( if (availability_state_ == STATE_GOING_AWAY) return ERR_FAILED; - // TODO(akalin): Add unit test exercising this code path. - if (availability_state_ == STATE_CLOSED) + if (availability_state_ == STATE_DRAINING) return ERR_CONNECTION_CLOSED; Error err = TryAccessStream(request->url()); @@ -721,8 +715,7 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request, if (availability_state_ == STATE_GOING_AWAY) return ERR_FAILED; - // TODO(akalin): Add unit test exercising this code path. - if (availability_state_ == STATE_CLOSED) + if (availability_state_ == STATE_DRAINING) return ERR_CONNECTION_CLOSED; Error err = TryAccessStream(request.url()); @@ -738,10 +731,9 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request, UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", connection_->socket()->IsConnected()); if (!connection_->socket()->IsConnected()) { - CloseSessionResult result = DoCloseSession( + DoDrainSession( ERR_CONNECTION_CLOSED, "Tried to create SPDY stream for a closed socket connection."); - DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); return ERR_CONNECTION_CLOSED; } } @@ -882,17 +874,12 @@ base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { bool SpdySession::CloseOneIdleConnection() { CHECK(!in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK(pool_); - if (!active_streams_.empty()) - return false; - CloseSessionResult result = - DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection."); - if (result != SESSION_CLOSED_AND_REMOVED) { - NOTREACHED(); - return false; + if (active_streams_.empty()) { + DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); } - return true; + // Return false as the socket wasn't immediately closed. + return false; } void SpdySession::EnqueueStreamWrite( @@ -945,8 +932,7 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, IOBuffer* data, int len, SpdyDataFlags flags) { - if (availability_state_ == STATE_CLOSED) { - NOTREACHED(); + if (availability_state_ == STATE_DRAINING) { return scoped_ptr<SpdyBuffer>(); } @@ -1135,22 +1121,13 @@ void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, if (owned_stream->type() == SPDY_PUSH_STREAM) unclaimed_pushed_streams_.erase(owned_stream->url()); - base::WeakPtr<SpdySession> weak_this = GetWeakPtr(); - DeleteStream(owned_stream.Pass(), status); - - if (!weak_this) - return; - - if (availability_state_ == STATE_CLOSED) - return; + MaybeFinishGoingAway(); // If there are no active streams and the socket pool is stalled, close the // session to free up a socket slot. if (active_streams_.empty() && connection_->IsPoolStalled()) { - CloseSessionResult result = - DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); - CHECK_NE(result, SESSION_ALREADY_CLOSED); + DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); } } @@ -1195,31 +1172,21 @@ void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id, void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { CHECK(!in_io_loop_); - CHECK_NE(availability_state_, STATE_CLOSED); - CHECK_EQ(read_state_, expected_read_state); - - result = DoReadLoop(expected_read_state, result); - - if (availability_state_ == STATE_CLOSED) { - CHECK_EQ(result, error_on_close_); - CHECK_LT(error_on_close_, ERR_IO_PENDING); - RemoveFromPool(); + if (availability_state_ == STATE_DRAINING) { return; } - - CHECK(result == OK || result == ERR_IO_PENDING); + ignore_result(DoReadLoop(expected_read_state, result)); } int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { CHECK(!in_io_loop_); - CHECK_NE(availability_state_, STATE_CLOSED); CHECK_EQ(read_state_, expected_read_state); in_io_loop_ = true; int bytes_read_without_yielding = 0; - // Loop until the session is closed, the read becomes blocked, or + // Loop until the session is draining, the read becomes blocked, or // the read limit is exceeded. while (true) { switch (read_state_) { @@ -1237,11 +1204,8 @@ int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { break; } - if (availability_state_ == STATE_CLOSED) { - CHECK_EQ(result, error_on_close_); - CHECK_LT(result, ERR_IO_PENDING); + if (availability_state_ == STATE_DRAINING) break; - } if (result == ERR_IO_PENDING) break; @@ -1265,7 +1229,6 @@ int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { int SpdySession::DoRead() { CHECK(in_io_loop_); - CHECK_NE(availability_state_, STATE_CLOSED); CHECK(connection_); CHECK(connection_->socket()); @@ -1279,7 +1242,6 @@ int SpdySession::DoRead() { int SpdySession::DoReadComplete(int result) { CHECK(in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); // Parse a frame. For now this code requires that the frame fit into our // buffer (kReadBufferSize). @@ -1288,20 +1250,13 @@ int SpdySession::DoReadComplete(int result) { if (result == 0) { UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", total_bytes_received_, 1, 100000000, 50); - CloseSessionResult close_session_result = - DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed"); - DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); - DCHECK_EQ(availability_state_, STATE_CLOSED); - DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED); + DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed"); + return ERR_CONNECTION_CLOSED; } if (result < 0) { - CloseSessionResult close_session_result = - DoCloseSession(static_cast<Error>(result), "result is < 0."); - DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); - DCHECK_EQ(availability_state_, STATE_CLOSED); - DCHECK_EQ(error_on_close_, result); + DoDrainSession(static_cast<Error>(result), "result is < 0."); return result; } CHECK_LE(result, kReadBufferSize); @@ -1316,9 +1271,8 @@ int SpdySession::DoReadComplete(int result) { result -= bytes_processed; data += bytes_processed; - if (availability_state_ == STATE_CLOSED) { - DCHECK_LT(error_on_close_, ERR_IO_PENDING); - return error_on_close_; + if (availability_state_ == STATE_DRAINING) { + return ERR_CONNECTION_CLOSED; } DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); @@ -1330,24 +1284,19 @@ int SpdySession::DoReadComplete(int result) { void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { CHECK(!in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_EQ(write_state_, expected_write_state); - result = DoWriteLoop(expected_write_state, result); + DoWriteLoop(expected_write_state, result); - if (availability_state_ == STATE_CLOSED) { - DCHECK_EQ(result, error_on_close_); - DCHECK_LT(error_on_close_, ERR_IO_PENDING); - RemoveFromPool(); + if (availability_state_ == STATE_DRAINING && !in_flight_write_ && + write_queue_.IsEmpty()) { + pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|. return; } - - DCHECK(result == OK || result == ERR_IO_PENDING); } int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { CHECK(!in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_NE(write_state_, WRITE_STATE_IDLE); DCHECK_EQ(write_state_, expected_write_state); @@ -1369,12 +1318,6 @@ int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { break; } - if (availability_state_ == STATE_CLOSED) { - DCHECK_EQ(result, error_on_close_); - DCHECK_LT(result, ERR_IO_PENDING); - break; - } - if (write_state_ == WRITE_STATE_IDLE) { DCHECK_EQ(result, ERR_IO_PENDING); break; @@ -1392,7 +1335,6 @@ int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { int SpdySession::DoWrite() { CHECK(in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK(buffered_spdy_framer_); if (in_flight_write_) { @@ -1456,7 +1398,6 @@ int SpdySession::DoWrite() { int SpdySession::DoWriteComplete(int result) { CHECK(in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_NE(result, ERR_IO_PENDING); DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); @@ -1468,12 +1409,9 @@ int SpdySession::DoWriteComplete(int result) { in_flight_write_frame_type_ = DATA; in_flight_write_frame_size_ = 0; in_flight_write_stream_.reset(); - CloseSessionResult close_session_result = - DoCloseSession(static_cast<Error>(result), "Write error"); - DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); - DCHECK_EQ(availability_state_, STATE_CLOSED); - DCHECK_EQ(error_on_close_, result); - return result; + write_state_ = WRITE_STATE_DO_WRITE; + DoDrainSession(static_cast<Error>(result), "Write error"); + return OK; } // It should not be possible to have written more bytes than our @@ -1517,13 +1455,11 @@ void SpdySession::DcheckGoingAway() const { #endif } -void SpdySession::DcheckClosed() const { +void SpdySession::DcheckDraining() const { DcheckGoingAway(); - DCHECK_EQ(availability_state_, STATE_CLOSED); - DCHECK_LT(error_on_close_, ERR_IO_PENDING); + DCHECK_EQ(availability_state_, STATE_DRAINING); DCHECK(active_streams_.empty()); DCHECK(unclaimed_pushed_streams_.empty()); - DCHECK(write_queue_.IsEmpty()); } void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, @@ -1573,21 +1509,21 @@ void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, } void SpdySession::MaybeFinishGoingAway() { - DcheckGoingAway(); - if (active_streams_.empty() && availability_state_ != STATE_CLOSED) { - CloseSessionResult result = - DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away"); - CHECK_NE(result, SESSION_ALREADY_CLOSED); + if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) { + DoDrainSession(OK, "Finished going away"); } } -SpdySession::CloseSessionResult SpdySession::DoCloseSession( - Error err, - const std::string& description) { - CHECK_LT(err, ERR_IO_PENDING); +void SpdySession::DoDrainSession(Error err, const std::string& description) { + if (availability_state_ == STATE_DRAINING) { + return; + } + MakeUnavailable(); - if (availability_state_ == STATE_CLOSED) - return SESSION_ALREADY_CLOSED; + // TODO(jgraettinger): If draining with an |err|, enqueue a GOAWAY frame here. + + availability_state_ = STATE_DRAINING; + error_on_close_ = err; net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_CLOSE, @@ -1597,32 +1533,9 @@ SpdySession::CloseSessionResult SpdySession::DoCloseSession( UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", total_bytes_received_, 1, 100000000, 50); - CHECK(pool_); - if (availability_state_ != STATE_GOING_AWAY) - pool_->MakeSessionUnavailable(GetWeakPtr()); - - availability_state_ = STATE_CLOSED; - error_on_close_ = err; - StartGoingAway(0, err); - write_queue_.Clear(); - - DcheckClosed(); - - if (in_io_loop_) - return SESSION_CLOSED_BUT_NOT_REMOVED; - - RemoveFromPool(); - return SESSION_CLOSED_AND_REMOVED; -} - -void SpdySession::RemoveFromPool() { - DcheckClosed(); - CHECK(pool_); - - SpdySessionPool* pool = pool_; - pool_ = NULL; - pool->RemoveUnavailableSession(GetWeakPtr()); + DcheckDraining(); + MaybePostWriteLoop(); } void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { @@ -1661,15 +1574,12 @@ SpdyStreamId SpdySession::GetNewStreamId() { void SpdySession::CloseSessionOnError(Error err, const std::string& description) { - // We may be called from anywhere, so we can't expect a particular - // return value. - ignore_result(DoCloseSession(err, description)); + DoDrainSession(err, description); } void SpdySession::MakeUnavailable() { - if (availability_state_ < STATE_GOING_AWAY) { + if (availability_state_ == STATE_AVAILABLE) { availability_state_ = STATE_GOING_AWAY; - DCHECK(pool_); pool_->MakeSessionUnavailable(GetWeakPtr()); } } @@ -1778,14 +1688,16 @@ void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, const base::WeakPtr<SpdyStream>& stream) { - if (availability_state_ == STATE_CLOSED) + if (availability_state_ == STATE_DRAINING) return; - bool was_idle = write_queue_.IsEmpty(); write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); + MaybePostWriteLoop(); +} + +void SpdySession::MaybePostWriteLoop() { if (write_state_ == WRITE_STATE_IDLE) { - DCHECK(was_idle); - DCHECK(!in_flight_write_); + CHECK(!in_flight_write_); write_state_ = WRITE_STATE_DO_WRITE; base::MessageLoop::current()->PostTask( FROM_HERE, @@ -1829,26 +1741,10 @@ void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { } write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); - - // |stream->OnClose()| may end up closing |this|, so detect that. - base::WeakPtr<SpdySession> weak_this = GetWeakPtr(); - stream->OnClose(status); - if (!weak_this) - return; - - switch (availability_state_) { - case STATE_AVAILABLE: - ProcessPendingStreamRequests(); - break; - case STATE_GOING_AWAY: - DcheckGoingAway(); - MaybeFinishGoingAway(); - break; - case STATE_CLOSED: - // Do nothing. - break; + if (availability_state_ == STATE_AVAILABLE) { + ProcessPendingStreamRequests(); } } @@ -1892,24 +1788,16 @@ bool SpdySession::GetSSLCertRequestInfo( void SpdySession::OnError(SpdyFramer::SpdyError error_code) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code)); std::string description = base::StringPrintf( "SPDY_ERROR error_code: %d.", error_code); - CloseSessionResult result = - DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description); - DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); + DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, description); } void SpdySession::OnStreamError(SpdyStreamId stream_id, const std::string& description) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - ActiveStreamMap::iterator it = active_streams_.find(stream_id); if (it == active_streams_.end()) { // We still want to send a frame to reset the stream even if we @@ -1927,9 +1815,6 @@ void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, bool fin) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - ActiveStreamMap::iterator it = active_streams_.find(stream_id); // By the time data comes in, the stream may already be inactive. @@ -1950,9 +1835,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, bool fin) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - if (data == NULL && len != 0) { // This is notification of consumed data padding. // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames. @@ -2012,9 +1894,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, void SpdySession::OnSettings(bool clear_persisted) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - if (clear_persisted) http_server_properties_->ClearSpdySettings(host_port_pair()); @@ -2042,9 +1921,6 @@ void SpdySession::OnSetting(SpdySettingsIds id, uint32 value) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - HandleSetting(id, value); http_server_properties_->SetSpdySetting( host_port_pair(), @@ -2112,9 +1988,6 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, const SpdyHeaderBlock& headers) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = time_func_(); @@ -2300,9 +2173,6 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id, const SpdyHeaderBlock& headers) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = time_func_(); @@ -2350,9 +2220,6 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id, const SpdyHeaderBlock& headers) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - if (net_log().IsLogging()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, @@ -2400,9 +2267,6 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, SpdyRstStreamStatus status) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - std::string description; net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RST_STREAM, @@ -2438,9 +2302,6 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, SpdyGoAwayStatus status) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, base::Bind(&NetLogSpdyGoAwayCallback, last_accepted_stream_id, @@ -2459,9 +2320,6 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_PING, base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received")); @@ -2476,9 +2334,7 @@ void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { --pings_in_flight_; if (pings_in_flight_ < 0) { RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); - CloseSessionResult result = - DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); - DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); + DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); pings_in_flight_ = 0; return; } @@ -2495,9 +2351,6 @@ void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { CHECK(in_io_loop_); - if (availability_state_ == STATE_CLOSED) - return; - DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, @@ -2515,11 +2368,10 @@ void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, if (delta_window_size < 1u) { RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); - CloseSessionResult result = DoCloseSession( + DoDrainSession( ERR_SPDY_PROTOCOL_ERROR, "Received WINDOW_UPDATE with an invalid delta_window_size " + - base::UintToString(delta_window_size)); - DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); + base::UintToString(delta_window_size)); return; } @@ -2577,7 +2429,6 @@ void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, void SpdySession::SendInitialData() { DCHECK(enable_sending_initial_data_); - DCHECK_NE(availability_state_, STATE_CLOSED); if (send_connection_header_prefix_) { DCHECK_EQ(protocol_, kProtoSPDY4); @@ -2643,8 +2494,6 @@ void SpdySession::SendInitialData() { void SpdySession::SendSettings(const SettingsMap& settings) { - DCHECK_NE(availability_state_, STATE_CLOSED); - net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, base::Bind(&NetLogSpdySendSettingsCallback, &settings)); @@ -2773,7 +2622,6 @@ void SpdySession::PlanToCheckPingStatus() { void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { CHECK(!in_io_loop_); - DCHECK_NE(availability_state_, STATE_CLOSED); // Check if we got a response back for all PINGs we had sent. if (pings_in_flight_ == 0) { @@ -2789,9 +2637,7 @@ void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { // Track all failed PING messages in a separate bucket. RecordPingRTTHistogram(base::TimeDelta::Max()); - CloseSessionResult result = - DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping."); - DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); + DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping."); return; } @@ -2924,9 +2770,6 @@ void SpdySession::OnWriteBufferConsumed( // We can be called with |in_io_loop_| set if a write SpdyBuffer is // deleted (e.g., a stream is closed due to incoming data). - if (availability_state_ == STATE_CLOSED) - return; - DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); if (consume_source == SpdyBuffer::DISCARD) { @@ -2946,7 +2789,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { // We can be called with |in_io_loop_| set if a SpdyBuffer is // deleted (e.g., a stream is closed due to incoming data). - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(delta_window_size, 1); @@ -2954,13 +2796,12 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { int32 max_delta_window_size = kint32max - session_send_window_size_; if (delta_window_size > max_delta_window_size) { RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); - CloseSessionResult result = DoCloseSession( + DoDrainSession( ERR_SPDY_PROTOCOL_ERROR, "Received WINDOW_UPDATE [delta: " + - base::IntToString(delta_window_size) + - "] for session overflows session_send_window_size_ [current: " + - base::IntToString(session_send_window_size_) + "]"); - DCHECK_NE(result, SESSION_ALREADY_CLOSED); + base::IntToString(delta_window_size) + + "] for session overflows session_send_window_size_ [current: " + + base::IntToString(session_send_window_size_) + "]"); return; } @@ -2976,7 +2817,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { } void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); // We only call this method when sending a frame. Therefore, @@ -3002,9 +2842,6 @@ void SpdySession::OnReadBufferConsumed( // We can be called with |in_io_loop_| set if a read SpdyBuffer is // deleted (e.g., discarded by a SpdyReadQueue). - if (availability_state_ == STATE_CLOSED) - return; - DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(consume_size, 1u); DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); @@ -3013,7 +2850,6 @@ void SpdySession::OnReadBufferConsumed( } void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { - DCHECK_NE(availability_state_, STATE_CLOSED); DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); DCHECK_GE(session_unacked_recv_window_bytes_, 0); DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); @@ -3046,12 +2882,11 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { // negative. If we do, the receive window isn't being respected. if (delta_window_size > session_recv_window_size_) { RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); - CloseSessionResult result = DoCloseSession( + DoDrainSession( ERR_SPDY_PROTOCOL_ERROR, "delta_window_size is " + base::IntToString(delta_window_size) + " in DecreaseRecvWindowSize, which is larger than the receive " + "window size of " + base::IntToString(session_recv_window_size_)); - DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); return; } @@ -3078,7 +2913,7 @@ void SpdySession::ResumeSendStalledStreams() { // have to worry about streams being closed, as well as ourselves // being closed. - while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { + while (!IsSendStalled()) { size_t old_size = 0; #if DCHECK_IS_ON old_size = GetTotalSize(stream_send_unstall_queue_); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 27e6049..d65ef46 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -344,12 +344,10 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, void SendStreamWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size); - // Whether the stream is closed, i.e. it has stopped processing data - // and is about to be destroyed. - // - // TODO(akalin): This is only used in tests. Remove this function - // and have tests test the WeakPtr instead. - bool IsClosed() const { return availability_state_ == STATE_CLOSED; } + // Accessors for the session's availability state. + bool IsAvailable() const { return availability_state_ == STATE_AVAILABLE; } + bool IsGoingAway() const { return availability_state_ == STATE_GOING_AWAY; } + bool IsDraining() const { return availability_state_ == STATE_DRAINING; } // Closes this session. This will close all active streams and mark // the session as permanently closed. Callers must assume that the @@ -533,9 +531,11 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, // The session can process data on existing streams but will // refuse to create new ones. STATE_GOING_AWAY, - // The session has been closed, is waiting to be deleted, and will - // refuse to process any more data. - STATE_CLOSED + // The session is draining its write queue in preparation of closing. + // Further writes will not be queued, and further reads will not be issued + // (though the remainder of a current read may be processed). The session + // will be destroyed by its write loop once the write queue is drained. + STATE_DRAINING, }; enum ReadState { @@ -550,18 +550,6 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, WRITE_STATE_DO_WRITE_COMPLETE, }; - // The return value of DoCloseSession() describing what was done. - enum CloseSessionResult { - // The session was already closed so nothing was done. - SESSION_ALREADY_CLOSED, - // The session was moved into the closed state but was not removed - // from |pool_| (because we're in an IO loop). - SESSION_CLOSED_BUT_NOT_REMOVED, - // The session was moved into the closed state and removed from - // |pool_|. - SESSION_CLOSED_AND_REMOVED, - }; - // Checks whether a stream for the given |url| can be created or // retrieved from the set of unclaimed push streams. Returns OK if // so. Otherwise, the session is closed and an error < @@ -619,11 +607,8 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, SpdyRstStreamStatus status, const std::string& description); - // Calls DoReadLoop and then if |availability_state_| is - // STATE_CLOSED, calls RemoveFromPool(). - // - // Use this function instead of DoReadLoop when posting a task to - // pump the read loop. + // Calls DoReadLoop. Use this function instead of DoReadLoop when + // posting a task to pump the read loop. void PumpReadLoop(ReadState expected_read_state, int result); // Advance the ReadState state machine. |expected_read_state| is the @@ -635,13 +620,18 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, int DoRead(); int DoReadComplete(int result); - // Calls DoWriteLoop and then if |availability_state_| is - // STATE_CLOSED, calls RemoveFromPool(). + // Calls DoWriteLoop. If |availability_state_| is STATE_DRAINING and no + // writes remain, the session is removed from the session pool and + // destroyed. // // Use this function instead of DoWriteLoop when posting a task to // pump the write loop. void PumpWriteLoop(WriteState expected_write_state, int result); + // Iff the write loop is not currently active, posts a callback into + // PumpWriteLoop(). + void MaybePostWriteLoop(); + // Advance the WriteState state machine. |expected_write_state| is // the expected starting write state. // @@ -744,10 +734,9 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, void DcheckGoingAway() const; // Calls DcheckGoingAway(), then DCHECKs that |availability_state_| - // == STATE_CLOSED, |error_on_close_| has a valid value, that there - // are no active streams or unclaimed pushed streams, and that the - // write queue is empty. - void DcheckClosed() const; + // == STATE_DRAINING, |error_on_close_| has a valid value, and that there + // are no active streams or unclaimed pushed streams. + void DcheckDraining() const; // Closes all active streams with stream id's greater than // |last_good_stream_id|, as well as any created or pending @@ -761,19 +750,9 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, // isn't closed yet, close it. void MaybeFinishGoingAway(); - // If the stream is already closed, does nothing. Otherwise, moves - // the session to a closed state. Then, if we're in an IO loop, - // returns (as the IO loop will do the pool removal itself when its - // done). Otherwise, also removes |this| from |pool_|. The returned - // result describes what was done. - CloseSessionResult DoCloseSession(Error err, const std::string& description); - - // Remove this session from its pool, which must exist. Must be - // called only when the session is closed. - // - // Must be called only via Pump{Read,Write}Loop() or - // DoCloseSession(). - void RemoveFromPool(); + // If the session is already draining, does nothing. Otherwise, moves + // the session to the draining state. + void DoDrainSession(Error err, const std::string& description); // Called right before closing a (possibly-inactive) stream for a // reason other than being requested to by the stream. @@ -1013,10 +992,10 @@ class NET_EXPORT SpdySession : public BufferedSpdyFramerVisitorInterface, ReadState read_state_; WriteState write_state_; - // If the session was closed (i.e., |availability_state_| is - // STATE_CLOSED), then |error_on_close_| holds the error with which - // it was closed, which is < ERR_IO_PENDING. Otherwise, it is set to - // OK. + // If the session is closing (i.e., |availability_state_| is STATE_DRAINING), + // then |error_on_close_| holds the error with which it was closed, which + // may be OK (upon a polite GOAWAY) or an error < ERR_IO_PENDING otherwise. + // Initialized to OK. Error error_on_close_; // Limits diff --git a/net/spdy/spdy_session_pool.cc b/net/spdy/spdy_session_pool.cc index 4747f05..66d907d 100644 --- a/net/spdy/spdy_session_pool.cc +++ b/net/spdy/spdy_session_pool.cc @@ -71,6 +71,12 @@ SpdySessionPool::SpdySessionPool( SpdySessionPool::~SpdySessionPool() { CloseAllSessions(); + while (!sessions_.empty()) { + // Destroy sessions to enforce that lifetime is scoped to SpdySessionPool. + // Write callbacks queued upon session drain are not invoked. + RemoveUnavailableSession((*sessions_.begin())->GetWeakPtr()); + } + if (ssl_config_service_.get()) ssl_config_service_->RemoveObserver(this); NetworkChangeNotifier::RemoveIPAddressObserver(this); @@ -244,7 +250,7 @@ void SpdySessionPool::CloseCurrentIdleSessions() { } void SpdySessionPool::CloseAllSessions() { - while (!sessions_.empty()) { + while (!available_sessions_.empty()) { CloseCurrentSessionsHelper(ERR_ABORTED, "Closing all sessions.", false /* idle_only */); } @@ -280,7 +286,7 @@ void SpdySessionPool::OnIPAddressChanged() { #else (*it)->CloseSessionOnError(ERR_NETWORK_CHANGED, "Closing current sessions."); - DCHECK(!*it); + DCHECK((*it)->IsDraining()); #endif // defined(OS_ANDROID) || defined(OS_WIN) || defined(OS_IOS) DCHECK(!IsSessionAvailable(*it)); } @@ -389,7 +395,6 @@ void SpdySessionPool::CloseCurrentSessionsHelper( (*it)->CloseSessionOnError(error, description); DCHECK(!IsSessionAvailable(*it)); - DCHECK(!*it); } } diff --git a/net/spdy/spdy_session_pool_unittest.cc b/net/spdy/spdy_session_pool_unittest.cc index 58002dc..1a29cc7 100644 --- a/net/spdy/spdy_session_pool_unittest.cc +++ b/net/spdy/spdy_session_pool_unittest.cc @@ -14,6 +14,7 @@ #include "net/socket/client_socket_handle.h" #include "net/socket/transport_client_socket_pool.h" #include "net/spdy/spdy_session.h" +#include "net/spdy/spdy_stream_test_util.h" #include "net/spdy/spdy_test_util_common.h" #include "testing/gtest/include/gtest/gtest.h" @@ -138,7 +139,7 @@ TEST_P(SpdySessionPoolTest, CloseCurrentSessions) { TEST_P(SpdySessionPoolTest, CloseCurrentIdleSessions) { MockConnect connect_data(SYNCHRONOUS, OK); MockRead reads[] = { - MockRead(ASYNC, 0, 0) // EOF + MockRead(SYNCHRONOUS, ERR_IO_PENDING) // Stall forever. }; session_deps_.host_resolver->set_synchronous_mode(true); @@ -195,20 +196,20 @@ TEST_P(SpdySessionPoolTest, CloseCurrentIdleSessions) { // All sessions are active and not closed EXPECT_TRUE(session1->is_active()); - EXPECT_FALSE(session1->IsClosed()); + EXPECT_TRUE(session1->IsAvailable()); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); EXPECT_TRUE(session3->is_active()); - EXPECT_FALSE(session3->IsClosed()); + EXPECT_TRUE(session3->IsAvailable()); // Should not do anything, all are active spdy_session_pool_->CloseCurrentIdleSessions(); EXPECT_TRUE(session1->is_active()); - EXPECT_FALSE(session1->IsClosed()); + EXPECT_TRUE(session1->IsAvailable()); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); EXPECT_TRUE(session3->is_active()); - EXPECT_FALSE(session3->IsClosed()); + EXPECT_TRUE(session3->IsAvailable()); // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active @@ -217,32 +218,40 @@ TEST_P(SpdySessionPoolTest, CloseCurrentIdleSessions) { session3->CloseCreatedStream(spdy_stream3, OK); EXPECT_EQ(NULL, spdy_stream3.get()); EXPECT_FALSE(session1->is_active()); - EXPECT_FALSE(session1->IsClosed()); + EXPECT_TRUE(session1->IsAvailable()); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); EXPECT_FALSE(session3->is_active()); - EXPECT_FALSE(session3->IsClosed()); + EXPECT_TRUE(session3->IsAvailable()); // Should close session 1 and 3, 2 should be left open spdy_session_pool_->CloseCurrentIdleSessions(); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(session1 == NULL); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); EXPECT_TRUE(session3 == NULL); // Should not do anything spdy_session_pool_->CloseCurrentIdleSessions(); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); // Make 2 not active session2->CloseCreatedStream(spdy_stream2, OK); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_EQ(NULL, spdy_stream2.get()); EXPECT_FALSE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); // This should close session 2 spdy_session_pool_->CloseCurrentIdleSessions(); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(session2 == NULL); } @@ -420,8 +429,9 @@ void SpdySessionPoolTest::RunIPPoolingTest( switch (close_sessions_type) { case SPDY_POOL_CLOSE_SESSIONS_MANUALLY: session->CloseSessionOnError(ERR_ABORTED, std::string()); - EXPECT_TRUE(session == NULL); session2->CloseSessionOnError(ERR_ABORTED, std::string()); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(session == NULL); EXPECT_TRUE(session2 == NULL); break; case SPDY_POOL_CLOSE_CURRENT_SESSIONS: @@ -449,27 +459,30 @@ void SpdySessionPoolTest::RunIPPoolingTest( // Check spdy_session and spdy_session1 are not closed. EXPECT_FALSE(session->is_active()); - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsAvailable()); EXPECT_FALSE(session1->is_active()); - EXPECT_FALSE(session1->IsClosed()); + EXPECT_TRUE(session1->IsAvailable()); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); // Test that calling CloseIdleSessions, does not cause a crash. // http://crbug.com/181400 spdy_session_pool_->CloseCurrentIdleSessions(); + base::MessageLoop::current()->RunUntilIdle(); // Verify spdy_session and spdy_session1 are closed. EXPECT_TRUE(session == NULL); EXPECT_TRUE(session1 == NULL); EXPECT_TRUE(session2->is_active()); - EXPECT_FALSE(session2->IsClosed()); + EXPECT_TRUE(session2->IsAvailable()); spdy_stream2->Cancel(); EXPECT_EQ(NULL, spdy_stream.get()); EXPECT_EQ(NULL, spdy_stream1.get()); EXPECT_EQ(NULL, spdy_stream2.get()); + session2->CloseSessionOnError(ERR_ABORTED, std::string()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session2 == NULL); break; } @@ -492,6 +505,93 @@ TEST_P(SpdySessionPoolTest, IPPoolingCloseIdleSessions) { RunIPPoolingTest(SPDY_POOL_CLOSE_IDLE_SESSIONS); } +// Construct a Pool with SpdySessions in various availability states. Simulate +// an IP address change. Ensure sessions gracefully shut down. Regression test +// for crbug.com/379469. +TEST_P(SpdySessionPoolTest, IPAddressChanged) { + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, ERR_IO_PENDING) // Stall forever. + }; + session_deps_.host_resolver->set_synchronous_mode(true); + + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + SSLSocketDataProvider ssl(SYNCHRONOUS, OK); + session_deps_.socket_factory->AddSSLSocketDataProvider(&ssl); + + CreateNetworkSession(); + + // Set up session 1: Available, but idle. + const std::string kTestHost1("http://www.a.com"); + HostPortPair test_host_port_pair1(kTestHost1, 80); + SpdySessionKey key1( + test_host_port_pair1, ProxyServer::Direct(), PRIVACY_MODE_DISABLED); + base::WeakPtr<SpdySession> session1 = + CreateInsecureSpdySession(http_session_, key1, BoundNetLog()); + EXPECT_TRUE(session1->IsAvailable()); + + // Set up session 2: Going away, but with an active stream. + session_deps_.socket_factory->AddSocketDataProvider(&data); + const std::string kTestHost2("http://www.b.com"); + HostPortPair test_host_port_pair2(kTestHost2, 80); + SpdySessionKey key2( + test_host_port_pair2, ProxyServer::Direct(), PRIVACY_MODE_DISABLED); + base::WeakPtr<SpdySession> session2 = + CreateInsecureSpdySession(http_session_, key2, BoundNetLog()); + GURL url2(kTestHost2); + base::WeakPtr<SpdyStream> spdy_stream2 = CreateStreamSynchronously( + SPDY_BIDIRECTIONAL_STREAM, session2, url2, MEDIUM, BoundNetLog()); + test::StreamDelegateDoNothing delegate2(spdy_stream2); + spdy_stream2->SetDelegate(&delegate2); + + scoped_ptr<SpdyHeaderBlock> headers( + SpdyTestUtil(GetParam()).ConstructGetHeaderBlock(url2.spec())); + spdy_stream2->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND); + EXPECT_TRUE(spdy_stream2->HasUrlFromHeaders()); + + session2->MakeUnavailable(); + EXPECT_TRUE(session2->IsGoingAway()); + + // Set up session 3: Draining. + session_deps_.socket_factory->AddSocketDataProvider(&data); + const std::string kTestHost3("http://www.c.com"); + HostPortPair test_host_port_pair3(kTestHost3, 80); + SpdySessionKey key3( + test_host_port_pair3, ProxyServer::Direct(), PRIVACY_MODE_DISABLED); + base::WeakPtr<SpdySession> session3 = + CreateInsecureSpdySession(http_session_, key3, BoundNetLog()); + + session3->CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Error!"); + EXPECT_TRUE(session3->IsDraining()); + + spdy_session_pool_->OnIPAddressChanged(); + +#if defined(OS_ANDROID) || defined(OS_WIN) || defined(OS_IOS) + // TODO(jgraettinger): This should be draining when crbug.com/324653 is fixed. + EXPECT_TRUE(session1->IsGoingAway()); + EXPECT_TRUE(session2->IsGoingAway()); + EXPECT_TRUE(session3->IsDraining()); + + EXPECT_FALSE(delegate2.StreamIsClosed()); + + session1->CloseSessionOnError(ERR_ABORTED, "Closing"); + session2->CloseSessionOnError(ERR_ABORTED, "Closing"); + + EXPECT_TRUE(delegate2.StreamIsClosed()); + EXPECT_EQ(ERR_ABORTED, delegate2.WaitForClose()); +#else + EXPECT_TRUE(session1->IsDraining()); + EXPECT_TRUE(session2->IsDraining()); + EXPECT_TRUE(session3->IsDraining()); + + EXPECT_TRUE(delegate2.StreamIsClosed()); + EXPECT_EQ(ERR_NETWORK_CHANGED, delegate2.WaitForClose()); +#endif // defined(OS_ANDROID) || defined(OS_WIN) || defined(OS_IOS) +} + } // namespace } // namespace net diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index 1f7ca60..dd86791 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -282,8 +282,7 @@ TEST_P(SpdySessionTest, PendingStreamCancellingAnother) { EXPECT_EQ(ERR_ABORTED, callback1.WaitForResult()); } -// A session receiving a GOAWAY frame with no active streams should -// immediately close. +// A session receiving a GOAWAY frame with no active streams should close. TEST_P(SpdySessionTest, GoAwayWithNoActiveStreams) { session_deps_.host_resolver->set_synchronous_mode(true); @@ -307,9 +306,8 @@ TEST_P(SpdySessionTest, GoAwayWithNoActiveStreams) { // Read and process the GOAWAY frame. data.RunFor(1); - EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); - + base::RunLoop().RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -409,12 +407,13 @@ TEST_P(SpdySessionTest, GoAwayWithActiveStreams) { EXPECT_EQ(NULL, spdy_stream2.get()); EXPECT_TRUE(session->IsStreamActive(1)); - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsGoingAway()); // Should close the session. spdy_stream1->Close(); EXPECT_EQ(NULL, spdy_stream1.get()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -489,13 +488,12 @@ TEST_P(SpdySessionTest, GoAwayTwice) { EXPECT_FALSE(session->IsStreamActive(3)); EXPECT_EQ(NULL, spdy_stream2.get()); EXPECT_TRUE(session->IsStreamActive(1)); - - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsGoingAway()); // Read and process the second GOAWAY frame, which should close the // session. data.RunFor(1); - + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -568,12 +566,79 @@ TEST_P(SpdySessionTest, GoAwayWithActiveStreamsThenClose) { EXPECT_FALSE(session->IsStreamActive(3)); EXPECT_EQ(NULL, spdy_stream2.get()); EXPECT_TRUE(session->IsStreamActive(1)); - - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsGoingAway()); session->CloseSessionOnError(ERR_ABORTED, "Aborting session"); - EXPECT_EQ(NULL, spdy_stream1.get()); + + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_TRUE(session == NULL); +} + +// Process a joint read buffer which causes the session to begin draining, and +// then processes a GOAWAY. The session should gracefully drain. Regression test +// for crbug.com/379469 +TEST_P(SpdySessionTest, GoAwayWhileDraining) { + session_deps_.host_resolver->set_synchronous_mode(true); + + scoped_ptr<SpdyFrame> req( + spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, MEDIUM, true)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + }; + + scoped_ptr<SpdyFrame> resp(spdy_util_.ConstructSpdyGetSynReply(NULL, 0, 1)); + scoped_ptr<SpdyFrame> goaway(spdy_util_.ConstructSpdyGoAway(1)); + scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); + size_t joint_size = goaway->size() * 2 + body->size(); + + // Compose interleaved |goaway| and |body| frames into a single read. + scoped_ptr<char[]> buffer(new char[joint_size]); + { + size_t out = 0; + memcpy(&buffer[out], goaway->data(), goaway->size()); + out += goaway->size(); + memcpy(&buffer[out], body->data(), body->size()); + out += body->size(); + memcpy(&buffer[out], goaway->data(), goaway->size()); + out += goaway->size(); + ASSERT_EQ(out, joint_size); + } + SpdyFrame joint_frames(buffer.get(), joint_size, false); + + MockRead reads[] = { + CreateMockRead(*resp, 1), CreateMockRead(joint_frames, 2), + MockRead(ASYNC, 0, 3) // EOF + }; + + MockConnect connect_data(SYNCHRONOUS, OK); + DeterministicSocketData data( + reads, arraysize(reads), writes, arraysize(writes)); + data.set_connect_data(connect_data); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + + CreateDeterministicNetworkSession(); + base::WeakPtr<SpdySession> session = + CreateInsecureSpdySession(http_session_, key_, BoundNetLog()); + + GURL url(kDefaultURL); + base::WeakPtr<SpdyStream> spdy_stream = CreateStreamSynchronously( + SPDY_REQUEST_RESPONSE_STREAM, session, url, MEDIUM, BoundNetLog()); + test::StreamDelegateDoNothing delegate(spdy_stream); + spdy_stream->SetDelegate(&delegate); + + scoped_ptr<SpdyHeaderBlock> headers( + spdy_util_.ConstructGetHeaderBlock(url.spec())); + spdy_stream->SendRequestHeaders(headers.Pass(), NO_MORE_DATA_TO_SEND); + EXPECT_TRUE(spdy_stream->HasUrlFromHeaders()); + + data.RunFor(3); + base::MessageLoop::current()->RunUntilIdle(); + + // Stream and session closed gracefully. + EXPECT_TRUE(delegate.StreamIsClosed()); + EXPECT_EQ(OK, delegate.WaitForClose()); + EXPECT_EQ(kUploadData, delegate.TakeReceivedData()); EXPECT_TRUE(session == NULL); } @@ -702,7 +767,7 @@ TEST_P(SpdySessionTest, SynStreamAfterGoAway) { // Read and process the SYN_STREAM frame, the subsequent RST_STREAM, // and EOF. data.RunFor(3); - + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -761,7 +826,7 @@ TEST_P(SpdySessionTest, NetworkChangeWithActiveStreams) { // pre-existing stream is still active. EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsGoingAway()); EXPECT_TRUE(session->IsStreamActive(1)); @@ -770,6 +835,7 @@ TEST_P(SpdySessionTest, NetworkChangeWithActiveStreams) { #endif EXPECT_EQ(NULL, spdy_stream.get()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -1042,6 +1108,7 @@ TEST_P(SpdySessionTest, StreamIdSpaceExhausted) { EXPECT_EQ(kUploadData, delegate2.TakeReceivedData()); // Session was destroyed. + base::MessageLoop::current()->RunUntilIdle(); EXPECT_FALSE(session.get()); } @@ -1195,6 +1262,7 @@ TEST_P(SpdySessionTest, DeleteExpiredPushStreams) { // Read and process EOF. data.RunFor(1); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -1203,14 +1271,18 @@ TEST_P(SpdySessionTest, FailedPing) { MockConnect connect_data(SYNCHRONOUS, OK); MockRead reads[] = { - MockRead(ASYNC, 0, 0, 0) // EOF + MockRead(SYNCHRONOUS, ERR_IO_PENDING) // Stall forever. }; scoped_ptr<SpdyFrame> write_ping(spdy_util_.ConstructSpdyPing(1, false)); - DeterministicSocketData data(reads, arraysize(reads), NULL, 0); + MockWrite writes[] = { + CreateMockWrite(*write_ping), + }; + StaticSocketDataProvider data( + reads, arraysize(reads), writes, arraysize(writes)); data.set_connect_data(connect_data); - session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); + session_deps_.socket_factory->AddSocketDataProvider(&data); - CreateDeterministicNetworkSession(); + CreateNetworkSession(); base::WeakPtr<SpdySession> session = CreateInsecureSpdySession(http_session_, key_, BoundNetLog()); @@ -1232,7 +1304,7 @@ TEST_P(SpdySessionTest, FailedPing) { EXPECT_TRUE(session->check_ping_status_pending()); // Assert session is not closed. - EXPECT_FALSE(session->IsClosed()); + EXPECT_TRUE(session->IsAvailable()); EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); @@ -1241,6 +1313,7 @@ TEST_P(SpdySessionTest, FailedPing) { base::TimeTicks now = base::TimeTicks::Now(); session->last_activity_time_ = now - base::TimeDelta::FromSeconds(1); session->CheckPingStatus(now); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); @@ -1304,6 +1377,11 @@ TEST_P(SpdySessionTest, OnSettings) { EXPECT_EQ(OK, stream_releaser.WaitForResult()); data.RunFor(1); + if (spdy_util_.spdy_version() >= SPDY4) { + // Allow the SETTINGS+ACK to write, so the session finishes draining. + data.RunFor(1); + } + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -1583,7 +1661,7 @@ TEST_P(SpdySessionTest, Initialize) { EXPECT_NE(log.bound().source().id, socket_source.id); } -TEST_P(SpdySessionTest, CloseSessionOnError) { +TEST_P(SpdySessionTest, NetLogOnSessionGoaway) { session_deps_.host_resolver->set_synchronous_mode(true); MockConnect connect_data(SYNCHRONOUS, OK); @@ -1625,6 +1703,53 @@ TEST_P(SpdySessionTest, CloseSessionOnError) { CapturingNetLog::CapturedEntry entry = entries[pos]; int error_code = 0; ASSERT_TRUE(entry.GetNetErrorCode(&error_code)); + EXPECT_EQ(OK, error_code); + } else { + ADD_FAILURE(); + } +} + +TEST_P(SpdySessionTest, NetLogOnSessionEOF) { + session_deps_.host_resolver->set_synchronous_mode(true); + + MockConnect connect_data(SYNCHRONOUS, OK); + MockRead reads[] = { + MockRead(SYNCHRONOUS, 0, 0) // EOF + }; + + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps_.socket_factory->AddSocketDataProvider(&data); + + CreateNetworkSession(); + + CapturingBoundNetLog log; + base::WeakPtr<SpdySession> session = + CreateInsecureSpdySession(http_session_, key_, log.bound()); + EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); + + // Flush the read completion task. + base::MessageLoop::current()->RunUntilIdle(); + + EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); + EXPECT_TRUE(session == NULL); + + // Check that the NetLog was filled reasonably. + net::CapturingNetLog::CapturedEntryList entries; + log.GetEntries(&entries); + EXPECT_LT(0u, entries.size()); + + // Check that we logged SPDY_SESSION_CLOSE correctly. + int pos = + net::ExpectLogContainsSomewhere(entries, + 0, + net::NetLog::TYPE_SPDY_SESSION_CLOSE, + net::NetLog::PHASE_NONE); + + if (pos < static_cast<int>(entries.size())) { + CapturingNetLog::CapturedEntry entry = entries[pos]; + int error_code = 0; + ASSERT_TRUE(entry.GetNetErrorCode(&error_code)); EXPECT_EQ(ERR_CONNECTION_CLOSED, error_code); } else { ADD_FAILURE(); @@ -1857,6 +1982,7 @@ TEST_P(SpdySessionTest, CloseSessionWithTwoCreatedSelfClosingStreams) { EXPECT_TRUE(delegate1.StreamIsClosed()); EXPECT_TRUE(delegate2.StreamIsClosed()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -1930,6 +2056,7 @@ TEST_P(SpdySessionTest, CloseSessionWithTwoCreatedMutuallyClosingStreams) { EXPECT_TRUE(delegate1.StreamIsClosed()); EXPECT_TRUE(delegate2.StreamIsClosed()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -2011,6 +2138,7 @@ TEST_P(SpdySessionTest, CloseSessionWithTwoActivatedSelfClosingStreams) { EXPECT_TRUE(delegate1.StreamIsClosed()); EXPECT_TRUE(delegate2.StreamIsClosed()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -2094,6 +2222,7 @@ TEST_P(SpdySessionTest, CloseSessionWithTwoActivatedMutuallyClosingStreams) { EXPECT_TRUE(delegate1.StreamIsClosed()); EXPECT_TRUE(delegate2.StreamIsClosed()); + base::MessageLoop::current()->RunUntilIdle(); EXPECT_TRUE(session == NULL); } @@ -2124,12 +2253,14 @@ TEST_P(SpdySessionTest, CloseActivatedStreamThatClosesSession) { scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, MEDIUM, true)); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_CANCEL)); MockWrite writes[] = { - CreateMockWrite(*req, 0), + CreateMockWrite(*req, 0), CreateMockWrite(*rst, 1), }; MockRead reads[] = { - MockRead(ASYNC, 0, 1) // EOF + MockRead(ASYNC, 0, 2) // EOF }; DeterministicSocketData data(reads, arraysize(reads), writes, arraysize(writes)); @@ -2166,8 +2297,12 @@ TEST_P(SpdySessionTest, CloseActivatedStreamThatClosesSession) { // session). spdy_stream->Cancel(); + data.RunFor(1); + base::MessageLoop::current()->RunUntilIdle(); + EXPECT_EQ(NULL, spdy_stream.get()); EXPECT_TRUE(delegate.StreamIsClosed()); + EXPECT_TRUE(session == NULL); } @@ -4308,6 +4443,7 @@ TEST_P(SpdySessionTest, SendWindowSizeIncreaseWithDeletedSession) { // Close the session (since we can't do it from within the delegate // method, since it's in the stream's loop). session->CloseSessionOnError(ERR_CONNECTION_CLOSED, "Closing session"); + base::RunLoop().RunUntilIdle(); EXPECT_TRUE(session == NULL); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); |