diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-16 21:34:00 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-16 21:34:00 +0000 |
commit | 63cf10d7652388b9dd9d45f5648cbfaaa63f2232 (patch) | |
tree | 3d747f622ea8322ddecbe3eb99af9948a76c2e26 | |
parent | 3699af9fae872b8b9deee04cc7ccd862b82d464e (diff) | |
download | chromium_src-63cf10d7652388b9dd9d45f5648cbfaaa63f2232.zip chromium_src-63cf10d7652388b9dd9d45f5648cbfaaa63f2232.tar.gz chromium_src-63cf10d7652388b9dd9d45f5648cbfaaa63f2232.tar.bz2 |
[SPDY] Refactor SpdySession state machine
Represent a SpdySession's state by three variables of type:
AvailabilityState, ReadState (the old State type), and WriteState.
Make some important functions, including the BufferedSpdyWriter
callbacks, do nothing if availability_state_ == STATE_CLOSED.
Rename the current state machine from DoLoop etc. to DoReadLoop.
Refactor the write state machine to be parallel to the read state machine.
Keep track of the number of bytes read without yielding in DoReadLoop
itself. This fixes a slight bug where the counter isn't reset if the loop
yields due to ERR_IO_PENDING being returned.
The new write state machine (DoWriteLoop, etc.) has almost the same
behavior as the old one, except if a write completes asynchronously
the write loop is immediately re-entered instead of via a posted task.
Fix tests to match new write loop behavior.
BUG=255701
R=rch@chromium.org, rtenneti@chromium.org
Review URL: https://codereview.chromium.org/18143005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@211852 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 2 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream_unittest.cc | 24 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 449 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 93 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 102 | ||||
-rw-r--r-- | net/spdy/spdy_test_util_common.cc | 3 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.h | 4 |
10 files changed, 401 insertions, 304 deletions
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 1e7afc7..4a44c23 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -54,8 +54,6 @@ int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info, const BoundNetLog& stream_net_log, const CompletionCallback& callback) { DCHECK(!stream_.get()); - if (spdy_session_->IsClosed()) - return ERR_CONNECTION_CLOSED; request_info_ = request_info; if (request_info_->method == "GET") { diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc index f91594e..565de15 100644 --- a/net/spdy/spdy_http_stream_unittest.cc +++ b/net/spdy/spdy_http_stream_unittest.cc @@ -254,14 +254,6 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream1( new SpdyHttpStream(session_.get(), true)); - ASSERT_EQ(OK, - http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, - BoundNetLog(), - CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, - callback1.callback())); - EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); - HttpRequestInfo request2; request2.method = "GET"; request2.url = GURL("http://www.google.com/"); @@ -271,15 +263,15 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream2( new SpdyHttpStream(session_.get(), true)); + // First write. ASSERT_EQ(OK, - http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, + http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, BoundNetLog(), CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, - callback2.callback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, + callback1.callback())); EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); - // First write. deterministic_data()->RunFor(1); EXPECT_LE(0, callback1.WaitForResult()); @@ -290,6 +282,14 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { EXPECT_FALSE(http_stream2->GetLoadTimingInfo(&load_timing_info2)); // Second write. + ASSERT_EQ(OK, + http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, + BoundNetLog(), + CompletionCallback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, + callback2.callback())); + EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); + deterministic_data()->RunFor(1); EXPECT_LE(0, callback2.WaitForResult()); TestLoadTimingReused(*http_stream2); diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index db1dc7d..36d80c6 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -2016,19 +2016,21 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply( spdy_util_.ConstructSpdyPostSynReply(NULL, 0)); - scoped_ptr<SpdyFrame> stream_body(spdy_util_.ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { CreateMockRead(*stream_reply, 1), - MockRead(ASYNC, 0, 3) // EOF + MockRead(ASYNC, 0, 4) // EOF }; scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyPost( kRequestUrl, 1, kUploadDataSize, LOWEST, NULL, 0)); scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); MockWrite writes[] = { CreateMockWrite(*req, 0), CreateMockWrite(*body, 2), + CreateMockWrite(*rst, 3) }; DeterministicSocketData data(reads, arraysize(reads), @@ -2045,7 +2047,7 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { &CreatePostRequest(), callback.callback(), BoundNetLog()); EXPECT_EQ(ERR_IO_PENDING, rv); - data.RunFor(2); + data.RunFor(4); rv = callback.WaitForResult(); EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); data.RunFor(1); @@ -3531,7 +3533,11 @@ TEST_P(SpdyNetworkTransactionTest, CorruptFrameSessionError) { for (size_t i = 0; i < ARRAYSIZE_UNSAFE(test_cases); ++i) { scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true)); - MockWrite writes[] = { CreateMockWrite(*req), MockWrite(ASYNC, 0, 0) // EOF + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); + MockWrite writes[] = { + CreateMockWrite(*req), + CreateMockWrite(*rst), }; scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); diff --git a/net/spdy/spdy_proxy_client_socket_unittest.cc b/net/spdy/spdy_proxy_client_socket_unittest.cc index 5af767b..95ca4b5 100644 --- a/net/spdy/spdy_proxy_client_socket_unittest.cc +++ b/net/spdy/spdy_proxy_client_socket_unittest.cc @@ -1176,7 +1176,7 @@ TEST_P(SpdyProxyClientSocketTest, WritePendingOnClose) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); @@ -1267,7 +1267,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePending) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); @@ -1390,7 +1390,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePendingDelete) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index b607796..879d645 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -9,6 +9,7 @@ #include "base/basictypes.h" #include "base/bind.h" +#include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/message_loop.h" @@ -352,14 +353,14 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, http_server_properties_(http_server_properties), read_buffer_(new IOBuffer(kReadBufferSize)), stream_hi_water_mark_(kFirstStreamId), - write_pending_(false), in_flight_write_frame_type_(DATA), in_flight_write_frame_size_(0), - delayed_write_pending_(false), is_secure_(false), certificate_error_code_(OK), - error_(OK), - state_(STATE_IDLE), + availability_state_(STATE_AVAILABLE), + read_state_(READ_STATE_DO_READ), + write_state_(WRITE_STATE_DO_WRITE), + error_on_close_(OK), max_concurrent_streams_(initial_max_concurrent_streams == 0 ? kInitialMaxConcurrentStreams : initial_max_concurrent_streams), @@ -371,7 +372,6 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, streams_pushed_and_claimed_count_(0), streams_abandoned_count_(0), total_bytes_received_(0), - bytes_read_(0), sent_settings_(false), received_settings_(false), stalled_streams_(0), @@ -412,8 +412,9 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, } SpdySession::~SpdySession() { - if (state_ != STATE_CLOSED) { - state_ = STATE_CLOSED; + if (availability_state_ != STATE_CLOSED) { + availability_state_ = STATE_CLOSED; + error_on_close_ = ERR_ABORTED; // Cleanup all the streams. CloseAllStreams(ERR_ABORTED); @@ -451,7 +452,10 @@ Error SpdySession::InitializeWithSocket( base::StatsCounter spdy_sessions("spdy.sessions"); spdy_sessions.Increment(); - state_ = STATE_DO_READ; + DCHECK_EQ(availability_state_, STATE_AVAILABLE); + DCHECK_EQ(read_state_, READ_STATE_DO_READ); + DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE); + connection_ = connection.Pass(); is_secure_ = is_secure; certificate_error_code_ = certificate_error_code; @@ -500,15 +504,12 @@ Error SpdySession::InitializeWithSocket( NetLog::TYPE_SPDY_SESSION_INITIALIZED, connection_->socket()->NetLog().source().ToEventParametersCallback()); - int error = DoLoop(OK); + int error = DoReadLoop(READ_STATE_DO_READ, OK); if (error == ERR_IO_PENDING) error = OK; if (error == OK) { connection_->AddLayeredPool(this); SendInitialSettings(); - // Write out any data that we might have to send, such as the - // settings frame. - WriteSocketLater(); spdy_session_pool_ = spdy_session_pool; } return static_cast<Error>(error); @@ -518,7 +519,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { if (!verify_domain_authentication_) return true; - if (!IsConnected()) + if (availability_state_ == STATE_CLOSED) return false; SSLInfo ssl_info; @@ -538,7 +539,8 @@ int SpdySession::GetPushStream( const GURL& url, base::WeakPtr<SpdyStream>* stream, const BoundNetLog& stream_net_log) { - CHECK_NE(state_, STATE_CLOSED); + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; stream->reset(); @@ -565,6 +567,11 @@ int SpdySession::GetPushStream( int SpdySession::TryCreateStream(SpdyStreamRequest* request, base::WeakPtr<SpdyStream>* stream) { + // TODO(akalin): Also refuse to create the stream when + // |availability_state_| == STATE_GOING_AWAY. + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; + if (!max_concurrent_streams_ || (active_streams_.size() + created_streams_.size() < max_concurrent_streams_)) { @@ -582,6 +589,11 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request, DCHECK_GE(request.priority(), MINIMUM_PRIORITY); DCHECK_LT(request.priority(), NUM_PRIORITIES); + // TODO(akalin): Also refuse to create the stream when + // |availability_state_| == STATE_GOING_AWAY. + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; + // Make sure that we don't try to send https/wss over an unauthenticated, but // encrypted SSL socket. if (is_secure_ && certificate_error_code_ != OK && @@ -956,26 +968,11 @@ bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { } LoadState SpdySession::GetLoadState() const { - // NOTE: The application only queries the LoadState via the - // SpdyNetworkTransaction, and details are only needed when - // we're in the process of connecting. - - // If we're connecting, defer to the connection to give us the actual - // LoadState. - if (state_ == STATE_CONNECTING) - return connection_->GetLoadState(); - // Just report that we're idle since the session could be doing // many things concurrently. return LOAD_STATE_IDLE; } -void SpdySession::OnReadComplete(int bytes_read) { - DCHECK_NE(state_, STATE_DO_READ); - DoLoop(bytes_read); -} - - void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, int status) { // TODO(mbelshe): We should send a RST_STREAM control frame here @@ -1043,13 +1040,13 @@ void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id, static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); } -void SpdySession::StartRead() { - DCHECK_NE(state_, STATE_DO_READ_COMPLETE); - DoLoop(OK); -} +int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { + DCHECK_EQ(read_state_, expected_read_state); -int SpdySession::DoLoop(int result) { - bytes_read_ = 0; + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } // The SpdyFramer will use callbacks onto |this| as it parses frames. // When errors occur, those callbacks can lead to teardown of all references @@ -1057,106 +1054,221 @@ int SpdySession::DoLoop(int result) { // cleanup. scoped_refptr<SpdySession> self(this); - do { - switch (state_) { - case STATE_DO_READ: + int bytes_read_without_yielding = 0; + + // Loop until the session is closed, the read becomes blocked, or + // the read limit is exceeded. + while (true) { + switch (read_state_) { + case READ_STATE_DO_READ: DCHECK_EQ(result, OK); result = DoRead(); break; - case STATE_DO_READ_COMPLETE: + case READ_STATE_DO_READ_COMPLETE: + if (result > 0) + bytes_read_without_yielding += result; result = DoReadComplete(result); break; - case STATE_CLOSED: - result = ERR_CONNECTION_CLOSED; - break; default: - NOTREACHED() << "state_: " << state_; + NOTREACHED() << "read_state_: " << read_state_; break; } - } while (result != ERR_IO_PENDING && state_ != STATE_CLOSED); - DCHECK(result == ERR_IO_PENDING || result == ERR_CONNECTION_CLOSED); - return result; + if (availability_state_ == STATE_CLOSED) { + DCHECK_EQ(result, error_on_close_); + DCHECK_LT(result, ERR_IO_PENDING); + return result; + } + + if (result == ERR_IO_PENDING) + return ERR_IO_PENDING; + + if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { + read_state_ = READ_STATE_DO_READ; + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), + weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); + return ERR_IO_PENDING; + } + } } int SpdySession::DoRead() { - if (bytes_read_ > kMaxReadBytes) { - state_ = STATE_DO_READ; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(&SpdySession::StartRead, - weak_factory_.GetWeakPtr())); - return ERR_IO_PENDING; - } + DCHECK_NE(availability_state_, STATE_CLOSED); CHECK(connection_); CHECK(connection_->socket()); - state_ = STATE_DO_READ_COMPLETE; + read_state_ = READ_STATE_DO_READ_COMPLETE; return connection_->socket()->Read( read_buffer_.get(), kReadBufferSize, - base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr())); + base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), + weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); } int SpdySession::DoReadComplete(int result) { + DCHECK_NE(availability_state_, STATE_CLOSED); + // Parse a frame. For now this code requires that the frame fit into our - // buffer (32KB). + // buffer (kReadBufferSize). // TODO(mbelshe): support arbitrarily large frames! - if (result <= 0) { - // Session is tearing down. - Error error = static_cast<Error>(result); - if (result == 0) { - UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", - total_bytes_received_, 1, 100000000, 50); - error = ERR_CONNECTION_CLOSED; - } - CloseSessionOnError(error, "result is <= 0."); + if (result == 0) { + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", + total_bytes_received_, 1, 100000000, 50); + CloseSessionOnError(ERR_CONNECTION_CLOSED, "Connection closed"); + DCHECK_EQ(availability_state_, STATE_CLOSED); + DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED); return ERR_CONNECTION_CLOSED; } + if (result < 0) { + CloseSessionOnError(static_cast<Error>(result), "result is < 0."); + DCHECK_EQ(availability_state_, STATE_CLOSED); + DCHECK_EQ(error_on_close_, result); + return result; + } + total_bytes_received_ += result; - bytes_read_ += result; last_activity_time_ = base::TimeTicks::Now(); DCHECK(buffered_spdy_framer_.get()); char* data = read_buffer_->data(); - while (result && - buffered_spdy_framer_->error_code() == - SpdyFramer::SPDY_NO_ERROR) { - uint32 bytes_processed = - buffered_spdy_framer_->ProcessInput(data, result); + while (result > 0) { + uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); result -= bytes_processed; data += bytes_processed; - } - if (!IsConnected()) - return ERR_CONNECTION_CLOSED; + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } - state_ = STATE_DO_READ; + DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); + } + + read_state_ = READ_STATE_DO_READ; return OK; } -void SpdySession::OnWriteComplete(int result) { - // Releasing the in-flight write can have a side-effect of dropping - // the last reference to |this|. Hold a reference through this - // function. +int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { + DCHECK_EQ(write_state_, expected_write_state); + + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } + + // Releasing the in-flight write in DoWriteComplete() can have a + // side-effect of dropping the last reference to |this|. Hold a + // reference through this function. scoped_refptr<SpdySession> self(this); - DCHECK(write_pending_); + // Loop until the session is closed or the write becomes blocked. + while (true) { + switch (write_state_) { + case WRITE_STATE_DO_WRITE: + DCHECK_EQ(result, OK); + result = DoWrite(); + break; + case WRITE_STATE_DO_WRITE_COMPLETE: + result = DoWriteComplete(result); + break; + default: + NOTREACHED() << "write_state_: " << write_state_; + break; + } + + if (availability_state_ == STATE_CLOSED) { + DCHECK_EQ(result, error_on_close_); + DCHECK_LT(result, ERR_IO_PENDING); + return result; + } + + if (result == ERR_IO_PENDING) + return ERR_IO_PENDING; + } +} + +int SpdySession::DoWrite() { + DCHECK_NE(availability_state_, STATE_CLOSED); + + DCHECK(buffered_spdy_framer_); + if (in_flight_write_) { + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); + } else { + // Grab the next frame to send. + SpdyFrameType frame_type = DATA; + scoped_ptr<SpdyBufferProducer> producer; + base::WeakPtr<SpdyStream> stream; + if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { + DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE); + return ERR_IO_PENDING; + } + + if (stream.get()) + DCHECK(!stream->IsClosed()); + + // Activate the stream only when sending the SYN_STREAM frame to + // guarantee monotonically-increasing stream IDs. + if (frame_type == SYN_STREAM) { + if (stream.get() && stream->stream_id() == 0) { + scoped_ptr<SpdyStream> owned_stream = + ActivateCreatedStream(stream.get()); + InsertActivatedStream(owned_stream.Pass()); + } else { + NOTREACHED(); + return ERR_UNEXPECTED; + } + } + + in_flight_write_ = producer->ProduceBuffer(); + if (!in_flight_write_) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + in_flight_write_frame_type_ = frame_type; + in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); + DCHECK_GE(in_flight_write_frame_size_, + buffered_spdy_framer_->GetFrameMinimumSize()); + in_flight_write_stream_ = stream; + } + + write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; + + // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems + // with Socket implementations that don't store their IOBuffer + // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). + scoped_refptr<IOBuffer> write_io_buffer = + in_flight_write_->GetIOBufferForRemainingData(); + // We keep |in_flight_write_| alive until OnWriteComplete(), so it's + // okay to pass in |write_io_buffer| since the socket won't use it + // past OnWriteComplete(). + return connection_->socket()->Write( + write_io_buffer.get(), + in_flight_write_->GetRemainingSize(), + base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), + weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); +} + +int SpdySession::DoWriteComplete(int result) { + DCHECK_NE(availability_state_, STATE_CLOSED); + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); last_activity_time_ = base::TimeTicks::Now(); - write_pending_ = false; if (result < 0) { + DCHECK_NE(result, ERR_IO_PENDING); in_flight_write_.reset(); in_flight_write_frame_type_ = DATA; in_flight_write_frame_size_ = 0; in_flight_write_stream_.reset(); CloseSessionOnError(static_cast<Error>(result), "Write error"); - return; + DCHECK_EQ(error_on_close_, result); + return result; } // It should not be possible to have written more bytes than our @@ -1186,108 +1298,11 @@ void SpdySession::OnWriteComplete(int result) { } } - // Write more data. We're already in a continuation, so we can go - // ahead and write it immediately (without going back to the message - // loop). - WriteSocketLater(); -} - -void SpdySession::WriteSocketLater() { - if (delayed_write_pending_) - return; - - if (!IsConnected()) - return; - - delayed_write_pending_ = true; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr())); -} - -void SpdySession::WriteSocket() { - // This function should only be called via WriteSocketLater. - DCHECK(delayed_write_pending_); - delayed_write_pending_ = false; - - // If the socket isn't connected yet, just wait; we'll get called - // again when the socket connection completes. If the socket is - // closed, just return. - if (!IsConnected()) - return; - - if (write_pending_) // Another write is in progress still. - return; - - // Loop sending frames until we've sent everything or until the write - // returns error (or ERR_IO_PENDING). - DCHECK(buffered_spdy_framer_.get()); - while (true) { - if (in_flight_write_) { - DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); - } else { - // Grab the next frame to send. - SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyBufferProducer> producer; - base::WeakPtr<SpdyStream> stream; - if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) - break; - - if (stream.get()) - DCHECK(!stream->IsClosed()); - - // Activate the stream only when sending the SYN_STREAM frame to - // guarantee monotonically-increasing stream IDs. - if (frame_type == SYN_STREAM) { - if (stream.get() && stream->stream_id() == 0) { - scoped_ptr<SpdyStream> owned_stream = - ActivateCreatedStream(stream.get()); - InsertActivatedStream(owned_stream.Pass()); - } else { - NOTREACHED(); - continue; - } - } - - in_flight_write_ = producer->ProduceBuffer(); - if (!in_flight_write_) { - NOTREACHED(); - continue; - } - in_flight_write_frame_type_ = frame_type; - in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); - DCHECK_GE(in_flight_write_frame_size_, - buffered_spdy_framer_->GetFrameMinimumSize()); - in_flight_write_stream_ = stream; - } - - write_pending_ = true; - // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems - // with Socket implementations that don't store their IOBuffer - // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). - scoped_refptr<IOBuffer> write_io_buffer = - in_flight_write_->GetIOBufferForRemainingData(); - // We keep |in_flight_write_| alive until OnWriteComplete(), so - // it's okay to use GetIOBufferForRemainingData() since the socket - // doesn't use the IOBuffer past OnWriteComplete(). - int rv = connection_->socket()->Write( - write_io_buffer.get(), - in_flight_write_->GetRemainingSize(), - base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); - // Avoid persisting |write_io_buffer| past |in_flight_write_|'s - // lifetime (which will end if OnWriteComplete() is called below). - write_io_buffer = NULL; - if (rv == ERR_IO_PENDING) - break; - - // We sent the frame successfully. - OnWriteComplete(rv); - - // TODO(mbelshe): Test this error case. Maybe we should mark the socket - // as in an error state. - if (rv < 0) - break; - } + // This has to go after calling OnFrameWriteComplete() so that if + // that ends up calling EnqueueWrite(), it properly avoids posting + // another DoWriteLoop task. + write_state_ = WRITE_STATE_DO_WRITE; + return OK; } void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id, @@ -1359,7 +1374,7 @@ void SpdySession::CloseSessionOnError(Error err, // to |this|. Hold a reference through this function. scoped_refptr<SpdySession> self(this); - DCHECK_LT(err, OK); + DCHECK_LT(err, ERR_IO_PENDING); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_CLOSE, base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); @@ -1367,12 +1382,12 @@ void SpdySession::CloseSessionOnError(Error err, // Don't close twice. This can occur because we can have both // a read and a write outstanding, and each can complete with // an error. - if (!IsClosed()) { + if (availability_state_ != STATE_CLOSED) { UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", total_bytes_received_, 1, 100000000, 50); - state_ = STATE_CLOSED; - error_ = err; + availability_state_ = STATE_CLOSED; + error_on_close_ = err; // TODO(akalin): Move this after CloseAllStreams() once we're // owned by the pool. RemoveFromPool(); @@ -1409,7 +1424,7 @@ base::Value* SpdySession::GetInfoAsValue() const { SSLClientSocket::NextProtoToString( connection_->socket()->GetNegotiatedProtocol())); - dict->SetInteger("error", error_); + dict->SetInteger("error", error_on_close_); dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); dict->SetInteger("streams_initiated_count", streams_initiated_count_); @@ -1483,8 +1498,19 @@ void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, const base::WeakPtr<SpdyStream>& stream) { + if (availability_state_ == STATE_CLOSED) + return; + + bool was_idle = write_queue_.IsEmpty(); write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); - WriteSocketLater(); + // We only want to post a task when the queue was just empty *and* + // we're not being called from a callback from DoWriteComplete(). + if (was_idle && write_state_ == WRITE_STATE_DO_WRITE) { + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), + weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); + } } void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { @@ -1588,6 +1614,9 @@ ServerBoundCertService* SpdySession::GetServerBoundCertService() const { } void SpdySession::OnError(SpdyFramer::SpdyError error_code) { + if (availability_state_ == STATE_CLOSED) + return; + RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(error_code)); std::string description = base::StringPrintf( @@ -1597,6 +1626,9 @@ void SpdySession::OnError(SpdyFramer::SpdyError error_code) { void SpdySession::OnStreamError(SpdyStreamId stream_id, const std::string& description) { + if (availability_state_ == STATE_CLOSED) + return; + ActiveStreamMap::iterator it = active_streams_.find(stream_id); if (it == active_streams_.end()) { // We still want to send a frame to reset the stream even if we @@ -1613,6 +1645,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, const char* data, size_t len, bool fin) { + if (availability_state_ == STATE_CLOSED) + return; + DCHECK_LT(len, 1u << 24); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( @@ -1654,6 +1689,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, } void SpdySession::OnSettings(bool clear_persisted) { + if (availability_state_ == STATE_CLOSED) + return; + if (clear_persisted) http_server_properties_->ClearSpdySettings(host_port_pair()); @@ -1668,6 +1706,9 @@ void SpdySession::OnSettings(bool clear_persisted) { void SpdySession::OnSetting(SpdySettingsIds id, uint8 flags, uint32 value) { + if (availability_state_ == STATE_CLOSED) + return; + HandleSetting(id, value); http_server_properties_->SetSpdySetting( host_port_pair(), @@ -1730,6 +1771,12 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, bool fin, bool unidirectional, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + + // TODO(akalin): Reset the stream when |availability_state_| == + // STATE_GOING_AWAY. + base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1891,6 +1938,9 @@ void SpdySession::DeleteExpiredPushedStreams() { void SpdySession::OnSynReply(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1927,6 +1977,9 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id, void SpdySession::OnHeaders(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, @@ -1954,6 +2007,9 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id, void SpdySession::OnRstStream(SpdyStreamId stream_id, SpdyRstStreamStatus status) { + if (availability_state_ == STATE_CLOSED) + return; + std::string description; net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RST_STREAM, @@ -1987,6 +2043,9 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, SpdyGoAwayStatus status) { + if (availability_state_ == STATE_CLOSED) + return; + net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, base::Bind(&NetLogSpdyGoAwayCallback, last_accepted_stream_id, @@ -1999,6 +2058,9 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, } void SpdySession::OnPing(uint32 unique_id) { + if (availability_state_ == STATE_CLOSED) + return; + net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_PING, base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); @@ -2027,6 +2089,9 @@ void SpdySession::OnPing(uint32 unique_id) { void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { + if (availability_state_ == STATE_CLOSED) + return; + DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, @@ -2580,7 +2645,7 @@ void SpdySession::ResumeSendStalledStreams() { // have to worry about streams being closed, as well as ourselves // being closed. - while (!IsClosed() && !IsSendStalled()) { + while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { size_t old_size = 0; if (DCHECK_IS_ON()) old_size = GetTotalSize(stream_send_unstall_queue_); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index c2c6cfd..0cca912 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -51,9 +51,9 @@ const int kMaxSpdyFrameChunkSize = (2 * kMss) - 8; // Specifies the maxiumum concurrent streams server could send (via push). const int kMaxConcurrentPushedStreams = 1000; -// Specifies the number of bytes read synchronously (without yielding) if the -// data is available. -const int kMaxReadBytes = 32 * 1024; +// Specifies the maximum number of bytes to read synchronously before +// yielding. +const int kMaxReadBytesWithoutYielding = 32 * 1024; // The initial receive window size for both streams and sessions. const int32 kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB @@ -336,8 +336,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, void SendStreamWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size); - // If session is closed, no new streams/transactions should be created. - bool IsClosed() const { return state_ == STATE_CLOSED; } + // Whether the stream is closed, i.e. it has stopped processing data + // and is about to be destroyed. + bool IsClosed() const { return availability_state_ == STATE_CLOSED; } // Closes this session. This will close all active streams and mark // the session as permanently closed. @@ -386,7 +387,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, } size_t num_created_streams() const { return created_streams_.size(); } - size_t pending_create_stream_queue_size(int priority) const { + size_t pending_create_stream_queue_size(RequestPriority priority) const { DCHECK_LT(priority, NUM_PRIORITIES); return pending_create_stream_queues_[priority].size(); } @@ -501,14 +502,28 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::set<SpdyStream*> CreatedStreamSet; - enum State { - STATE_IDLE, - STATE_CONNECTING, - STATE_DO_READ, - STATE_DO_READ_COMPLETE, + enum AvailabilityState { + // The session is available in its socket pool and can be used + // freely. + STATE_AVAILABLE, + // The session can process data on existing streams but will + // refuse to create new ones. + STATE_GOING_AWAY, + // The session has been closed, is waiting to be deleted, and will + // refuse to process any more data. STATE_CLOSED }; + enum ReadState { + READ_STATE_DO_READ, + READ_STATE_DO_READ_COMPLETE, + }; + + enum WriteState { + WRITE_STATE_DO_WRITE, + WRITE_STATE_DO_WRITE_COMPLETE, + }; + virtual ~SpdySession(); // Called by SpdyStreamRequest to start a request to create a @@ -554,24 +569,19 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyRstStreamStatus status, const std::string& description); - // Start the DoLoop to read data from socket. - void StartRead(); - - // Try to make progress by reading and processing data. - int DoLoop(int result); - // The implementations of STATE_DO_READ/STATE_DO_READ_COMPLETE state changes - // of the state machine. + // Advance the ReadState state machine. |expected_read_state| is the + // expected starting read state. + int DoReadLoop(ReadState expected_read_state, int result); + // The implementations of the states of the ReadState state machine. int DoRead(); - int DoReadComplete(int bytes_read); - - // Check if session is connected or not. - bool IsConnected() const { - return state_ == STATE_DO_READ || state_ == STATE_DO_READ_COMPLETE; - } + int DoReadComplete(int result); - // IO Callbacks - void OnReadComplete(int result); - void OnWriteComplete(int result); + // Advance the WriteState state machine. |expected_write_state| is + // the expected starting write state. + int DoWriteLoop(WriteState expected_write_state, int result); + // The implementations of the states of the WriteState state machine. + int DoWrite(); + int DoWriteComplete(int result); // Send relevant SETTINGS. This is generally called on connection setup. void SendInitialSettings(); @@ -607,10 +617,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // haven't received any data in |kHungInterval| time period. void CheckPingStatus(base::TimeTicks last_check_time); - // Write current data to the socket. - void WriteSocketLater(); - void WriteSocket(); - // Get a new stream id. int GetNewStreamId(); @@ -884,8 +890,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyWriteQueue write_queue_; // Data for the frame we are currently sending. - // Whether we have a socket write pending completion. - bool write_pending_; + // The buffer we're currently writing. scoped_ptr<SpdyBuffer> in_flight_write_; // The type of the frame in |in_flight_write_|. @@ -896,9 +901,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // the socket completely. base::WeakPtr<SpdyStream> in_flight_write_stream_; - // Flag if we have a pending message scheduled for WriteSocket. - bool delayed_write_pending_; - // Flag if we're using an SSL connection for this SpdySession. bool is_secure_; @@ -908,11 +910,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Spdy Frame state. scoped_ptr<BufferedSpdyFramer> buffered_spdy_framer_; - // If an error has occurred on the session, the session is effectively - // dead. Record this error here. When no error has occurred, |error_| will - // be OK. - Error error_; - State state_; + // The state variables. + AvailabilityState availability_state_; + ReadState read_state_; + WriteState write_state_; + + // If the session was closed (i.e., |availability_state_| is + // STATE_CLOSED), then |error_on_close_| holds the error with which + // it was closed, which is < ERR_IO_PENDING. Otherwise, it is set to + // OK. + Error error_on_close_; // Limits size_t max_concurrent_streams_; // 0 if no limit @@ -928,10 +935,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // SpdySession. It is used by the |Net.SpdySettingsCwnd...| histograms. int total_bytes_received_; - // |bytes_read_| keeps track of number of bytes read continously in the - // DoLoop() without yielding. - int bytes_read_; - bool sent_settings_; // Did this session send settings when it started. bool received_settings_; // Did this session receive at least one settings // frame. diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index 1b414681..1f6b931 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -357,7 +357,7 @@ TEST_P(SpdySessionTest, ServerPing) { test::StreamDelegateSendImmediate delegate(spdy_stream1, NULL); spdy_stream1->SetDelegate(&delegate); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); @@ -546,7 +546,7 @@ TEST_P(SpdySessionTest, OnSettings) { stream_releaser.MakeCallback(&request))); session = NULL; - EXPECT_EQ(OK, stream_releaser.WaitForResult()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult()); } // Start with max concurrent streams set to 1 (that is persisted). Receive a @@ -610,7 +610,7 @@ TEST_P(SpdySessionTest, ClearSettings) { BoundNetLog(), stream_releaser.MakeCallback(&request))); - EXPECT_EQ(OK, stream_releaser.WaitForResult()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult()); // Make sure that persisted data is cleared. EXPECT_EQ(0u, spdy_session_pool_->http_server_properties()->GetSpdySettings( @@ -823,7 +823,7 @@ TEST_P(SpdySessionTest, Initialize) { CreateInsecureSpdySession(http_session_, key_, log.bound()); EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); net::CapturingNetLog::CapturedEntryList entries; @@ -871,7 +871,7 @@ TEST_P(SpdySessionTest, CloseSessionOnError) { CreateInsecureSpdySession(http_session_, key_, log.bound()); EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); @@ -1705,15 +1705,16 @@ TEST_P(SpdySessionTest, NeedsCredentials) { EXPECT_EQ(spdy_util_.spdy_version() >= SPDY3, session->NeedsCredentials()); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); session->CloseSessionOnError(ERR_ABORTED, std::string()); } -// Test that SpdySession::DoRead reads data from the socket without yielding. -// This test makes 32k - 1 bytes of data available on the socket for reading. It -// then verifies that it has read all the available data without yielding. +// Test that SpdySession::DoReadLoop reads data from the socket +// without yielding. This test makes 32k - 1 bytes of data available +// on the socket for reading. It then verifies that it has read all +// the available data without yielding. TEST_P(SpdySessionTest, ReadDataWithoutYielding) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1724,10 +1725,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); const int kPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1780,8 +1782,9 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to verify SpdySession::DoRead doesn't post a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to verify SpdySession::DoReadLoop doesn't + // post a task. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1789,7 +1792,8 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has not posted a task. + // Read all the data and verify SpdySession::DoReadLoop has not + // posted a task. data.RunFor(4); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -1800,10 +1804,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { EXPECT_TRUE(data.at_read_eof()); } -// Test that SpdySession::DoRead yields while reading the data. This test makes -// 32k + 1 bytes of data available on the socket for reading. It then verifies -// that DoRead has yielded even though there is data available for it to read -// (i.e, socket()->Read didn't return ERR_IO_PENDING during socket reads). +// Test that SpdySession::DoReadLoop yields while reading the +// data. This test makes 32k + 1 bytes of data available on the socket +// for reading. It then verifies that DoRead has yielded even though +// there is data available for it to read (i.e, socket()->Read didn't +// return ERR_IO_PENDING during socket reads). TEST_P(SpdySessionTest, TestYieldingDuringReadData) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1814,10 +1819,11 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); const int kPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1870,8 +1876,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to verify SpdySession::DoRead posts a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to verify SpdySession::DoReadLoop posts a + // task. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1879,7 +1886,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has posted a task. + // Read all the data and verify SpdySession::DoReadLoop has posted a + // task. data.RunFor(6); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -1891,17 +1899,17 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { EXPECT_TRUE(data.at_read_eof()); } -// Test that SpdySession::DoRead() tests interactions of yielding + async, -// by doing the following MockReads. +// Test that SpdySession::DoReadLoop() tests interactions of yielding +// + async, by doing the following MockReads. // // MockRead of SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K // ASYNC 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K. // -// The above reads 26K synchronously. Since that is less that 32K, we will -// attempt to read again. However, that DoRead() will return ERR_IO_PENDING -// (because of async read), so DoRead() will yield. When we come back, DoRead() -// will read the results from the async read, and rest of the data -// synchronously. +// The above reads 26K synchronously. Since that is less that 32K, we +// will attempt to read again. However, that DoRead() will return +// ERR_IO_PENDING (because of async read), so DoReadLoop() will +// yield. When we come back, DoRead() will read the results from the +// async read, and rest of the data synchronously. TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1912,11 +1920,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); TestDataStream test_stream; const int kEightKPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); scoped_refptr<net::IOBuffer> eightk_payload( new net::IOBuffer(kEightKPayloadSize)); char* eightk_payload_data = eightk_payload->data(); @@ -1983,8 +1992,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to monitor SpdySession::DoRead posting of tasks. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to monitor SpdySession::DoReadLoop + // posting of tasks. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1992,7 +2002,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has posted a task. + // Read all the data and verify SpdySession::DoReadLoop has posted a + // task. data.RunFor(12); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -2004,11 +2015,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { EXPECT_TRUE(data.at_read_eof()); } -// Send a GoAway frame when SpdySession is in DoLoop. If scoped_refptr to -// <SpdySession> is deleted from SpdySession::DoLoop(), we get a crash because -// GoAway could delete the SpdySession from the SpdySessionPool and the last +// Send a GoAway frame when SpdySession is in DoReadLoop. If +// scoped_refptr to <SpdySession> is deleted from +// SpdySession::DoReadLoop(), we get a crash because GoAway could +// delete the SpdySession from the SpdySessionPool and the last // reference to SpdySession. -TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) { +TEST_P(SpdySessionTest, GoAwayWhileInDoReadLoop) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -2064,8 +2076,8 @@ TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) { data.RunFor(1); EXPECT_EQ(1u, spdy_stream1->stream_id()); - // Only references to SpdySession are held by DoLoop and - // SpdySessionPool. If DoLoop doesn't hold the reference, we get a + // Only references to SpdySession are held by DoReadLoop and + // SpdySessionPool. If DoReadLoop doesn't hold the reference, we get a // crash if SpdySession is deleted from the SpdySessionPool. // Run until GoAway. @@ -2466,7 +2478,7 @@ TEST_P(SpdySessionTest, SendCredentials) { CreateSecureSpdySession(http_session_, key, BoundNetLog()); EXPECT_TRUE(session->NeedsCredentials()); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); session->CloseSessionOnError(ERR_ABORTED, std::string()); diff --git a/net/spdy/spdy_test_util_common.cc b/net/spdy/spdy_test_util_common.cc index ea333c9..ba32a6f 100644 --- a/net/spdy/spdy_test_util_common.cc +++ b/net/spdy/spdy_test_util_common.cc @@ -293,7 +293,8 @@ CompletionCallback StreamReleaserCallback::MakeCallback( void StreamReleaserCallback::OnComplete( SpdyStreamRequest* request, int result) { - request->ReleaseStream()->Cancel(); + if (result == OK) + request->ReleaseStream()->Cancel(); SetResult(result); } diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc index c793aab..2ac4241 100644 --- a/net/spdy/spdy_write_queue.cc +++ b/net/spdy/spdy_write_queue.cc @@ -32,6 +32,14 @@ SpdyWriteQueue::~SpdyWriteQueue() { Clear(); } +bool SpdyWriteQueue::IsEmpty() const { + for (int i = 0; i < NUM_PRIORITIES; i++) { + if (!queue_[i].empty()) + return false; + } + return true; +} + void SpdyWriteQueue::Enqueue(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> frame_producer, diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h index fa194ef..3bceb29 100644 --- a/net/spdy/spdy_write_queue.h +++ b/net/spdy/spdy_write_queue.h @@ -27,6 +27,10 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { SpdyWriteQueue(); ~SpdyWriteQueue(); + // Returns whether there is anything in the write queue, + // i.e. whether the next call to Dequeue will return true. + bool IsEmpty() const; + // Enqueues the given frame producer of the given type at the given // priority associated with the given stream, which may be NULL if // the frame producer is not associated with a stream. If |stream| |