diff options
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 2 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream_unittest.cc | 24 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 449 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 93 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 102 | ||||
-rw-r--r-- | net/spdy/spdy_test_util_common.cc | 3 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.h | 4 |
10 files changed, 401 insertions, 304 deletions
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 1e7afc7..4a44c23 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -54,8 +54,6 @@ int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info, const BoundNetLog& stream_net_log, const CompletionCallback& callback) { DCHECK(!stream_.get()); - if (spdy_session_->IsClosed()) - return ERR_CONNECTION_CLOSED; request_info_ = request_info; if (request_info_->method == "GET") { diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc index f91594e..565de15 100644 --- a/net/spdy/spdy_http_stream_unittest.cc +++ b/net/spdy/spdy_http_stream_unittest.cc @@ -254,14 +254,6 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream1( new SpdyHttpStream(session_.get(), true)); - ASSERT_EQ(OK, - http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, - BoundNetLog(), - CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, - callback1.callback())); - EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); - HttpRequestInfo request2; request2.method = "GET"; request2.url = GURL("http://www.google.com/"); @@ -271,15 +263,15 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream2( new SpdyHttpStream(session_.get(), true)); + // First write. ASSERT_EQ(OK, - http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, + http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, BoundNetLog(), CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, - callback2.callback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, + callback1.callback())); EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); - // First write. deterministic_data()->RunFor(1); EXPECT_LE(0, callback1.WaitForResult()); @@ -290,6 +282,14 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { EXPECT_FALSE(http_stream2->GetLoadTimingInfo(&load_timing_info2)); // Second write. + ASSERT_EQ(OK, + http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, + BoundNetLog(), + CompletionCallback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, + callback2.callback())); + EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); + deterministic_data()->RunFor(1); EXPECT_LE(0, callback2.WaitForResult()); TestLoadTimingReused(*http_stream2); diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index db1dc7d..36d80c6 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -2016,19 +2016,21 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply( spdy_util_.ConstructSpdyPostSynReply(NULL, 0)); - scoped_ptr<SpdyFrame> stream_body(spdy_util_.ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { CreateMockRead(*stream_reply, 1), - MockRead(ASYNC, 0, 3) // EOF + MockRead(ASYNC, 0, 4) // EOF }; scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyPost( kRequestUrl, 1, kUploadDataSize, LOWEST, NULL, 0)); scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); MockWrite writes[] = { CreateMockWrite(*req, 0), CreateMockWrite(*body, 2), + CreateMockWrite(*rst, 3) }; DeterministicSocketData data(reads, arraysize(reads), @@ -2045,7 +2047,7 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { &CreatePostRequest(), callback.callback(), BoundNetLog()); EXPECT_EQ(ERR_IO_PENDING, rv); - data.RunFor(2); + data.RunFor(4); rv = callback.WaitForResult(); EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); data.RunFor(1); @@ -3531,7 +3533,11 @@ TEST_P(SpdyNetworkTransactionTest, CorruptFrameSessionError) { for (size_t i = 0; i < ARRAYSIZE_UNSAFE(test_cases); ++i) { scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true)); - MockWrite writes[] = { CreateMockWrite(*req), MockWrite(ASYNC, 0, 0) // EOF + scoped_ptr<SpdyFrame> rst( + spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); + MockWrite writes[] = { + CreateMockWrite(*req), + CreateMockWrite(*rst), }; scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); diff --git a/net/spdy/spdy_proxy_client_socket_unittest.cc b/net/spdy/spdy_proxy_client_socket_unittest.cc index 5af767b..95ca4b5 100644 --- a/net/spdy/spdy_proxy_client_socket_unittest.cc +++ b/net/spdy/spdy_proxy_client_socket_unittest.cc @@ -1176,7 +1176,7 @@ TEST_P(SpdyProxyClientSocketTest, WritePendingOnClose) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); @@ -1267,7 +1267,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePending) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); @@ -1390,7 +1390,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePendingDelete) { scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame()); MockWrite writes[] = { CreateMockWrite(*conn, 0, SYNCHRONOUS), - MockWrite(ASYNC, ERR_IO_PENDING, 2), + MockWrite(ASYNC, ERR_ABORTED, 2), }; scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame()); diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index b607796..879d645 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -9,6 +9,7 @@ #include "base/basictypes.h" #include "base/bind.h" +#include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/message_loop.h" @@ -352,14 +353,14 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, http_server_properties_(http_server_properties), read_buffer_(new IOBuffer(kReadBufferSize)), stream_hi_water_mark_(kFirstStreamId), - write_pending_(false), in_flight_write_frame_type_(DATA), in_flight_write_frame_size_(0), - delayed_write_pending_(false), is_secure_(false), certificate_error_code_(OK), - error_(OK), - state_(STATE_IDLE), + availability_state_(STATE_AVAILABLE), + read_state_(READ_STATE_DO_READ), + write_state_(WRITE_STATE_DO_WRITE), + error_on_close_(OK), max_concurrent_streams_(initial_max_concurrent_streams == 0 ? kInitialMaxConcurrentStreams : initial_max_concurrent_streams), @@ -371,7 +372,6 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, streams_pushed_and_claimed_count_(0), streams_abandoned_count_(0), total_bytes_received_(0), - bytes_read_(0), sent_settings_(false), received_settings_(false), stalled_streams_(0), @@ -412,8 +412,9 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, } SpdySession::~SpdySession() { - if (state_ != STATE_CLOSED) { - state_ = STATE_CLOSED; + if (availability_state_ != STATE_CLOSED) { + availability_state_ = STATE_CLOSED; + error_on_close_ = ERR_ABORTED; // Cleanup all the streams. CloseAllStreams(ERR_ABORTED); @@ -451,7 +452,10 @@ Error SpdySession::InitializeWithSocket( base::StatsCounter spdy_sessions("spdy.sessions"); spdy_sessions.Increment(); - state_ = STATE_DO_READ; + DCHECK_EQ(availability_state_, STATE_AVAILABLE); + DCHECK_EQ(read_state_, READ_STATE_DO_READ); + DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE); + connection_ = connection.Pass(); is_secure_ = is_secure; certificate_error_code_ = certificate_error_code; @@ -500,15 +504,12 @@ Error SpdySession::InitializeWithSocket( NetLog::TYPE_SPDY_SESSION_INITIALIZED, connection_->socket()->NetLog().source().ToEventParametersCallback()); - int error = DoLoop(OK); + int error = DoReadLoop(READ_STATE_DO_READ, OK); if (error == ERR_IO_PENDING) error = OK; if (error == OK) { connection_->AddLayeredPool(this); SendInitialSettings(); - // Write out any data that we might have to send, such as the - // settings frame. - WriteSocketLater(); spdy_session_pool_ = spdy_session_pool; } return static_cast<Error>(error); @@ -518,7 +519,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { if (!verify_domain_authentication_) return true; - if (!IsConnected()) + if (availability_state_ == STATE_CLOSED) return false; SSLInfo ssl_info; @@ -538,7 +539,8 @@ int SpdySession::GetPushStream( const GURL& url, base::WeakPtr<SpdyStream>* stream, const BoundNetLog& stream_net_log) { - CHECK_NE(state_, STATE_CLOSED); + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; stream->reset(); @@ -565,6 +567,11 @@ int SpdySession::GetPushStream( int SpdySession::TryCreateStream(SpdyStreamRequest* request, base::WeakPtr<SpdyStream>* stream) { + // TODO(akalin): Also refuse to create the stream when + // |availability_state_| == STATE_GOING_AWAY. + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; + if (!max_concurrent_streams_ || (active_streams_.size() + created_streams_.size() < max_concurrent_streams_)) { @@ -582,6 +589,11 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request, DCHECK_GE(request.priority(), MINIMUM_PRIORITY); DCHECK_LT(request.priority(), NUM_PRIORITIES); + // TODO(akalin): Also refuse to create the stream when + // |availability_state_| == STATE_GOING_AWAY. + if (availability_state_ == STATE_CLOSED) + return ERR_CONNECTION_CLOSED; + // Make sure that we don't try to send https/wss over an unauthenticated, but // encrypted SSL socket. if (is_secure_ && certificate_error_code_ != OK && @@ -956,26 +968,11 @@ bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { } LoadState SpdySession::GetLoadState() const { - // NOTE: The application only queries the LoadState via the - // SpdyNetworkTransaction, and details are only needed when - // we're in the process of connecting. - - // If we're connecting, defer to the connection to give us the actual - // LoadState. - if (state_ == STATE_CONNECTING) - return connection_->GetLoadState(); - // Just report that we're idle since the session could be doing // many things concurrently. return LOAD_STATE_IDLE; } -void SpdySession::OnReadComplete(int bytes_read) { - DCHECK_NE(state_, STATE_DO_READ); - DoLoop(bytes_read); -} - - void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, int status) { // TODO(mbelshe): We should send a RST_STREAM control frame here @@ -1043,13 +1040,13 @@ void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id, static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); } -void SpdySession::StartRead() { - DCHECK_NE(state_, STATE_DO_READ_COMPLETE); - DoLoop(OK); -} +int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { + DCHECK_EQ(read_state_, expected_read_state); -int SpdySession::DoLoop(int result) { - bytes_read_ = 0; + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } // The SpdyFramer will use callbacks onto |this| as it parses frames. // When errors occur, those callbacks can lead to teardown of all references @@ -1057,106 +1054,221 @@ int SpdySession::DoLoop(int result) { // cleanup. scoped_refptr<SpdySession> self(this); - do { - switch (state_) { - case STATE_DO_READ: + int bytes_read_without_yielding = 0; + + // Loop until the session is closed, the read becomes blocked, or + // the read limit is exceeded. + while (true) { + switch (read_state_) { + case READ_STATE_DO_READ: DCHECK_EQ(result, OK); result = DoRead(); break; - case STATE_DO_READ_COMPLETE: + case READ_STATE_DO_READ_COMPLETE: + if (result > 0) + bytes_read_without_yielding += result; result = DoReadComplete(result); break; - case STATE_CLOSED: - result = ERR_CONNECTION_CLOSED; - break; default: - NOTREACHED() << "state_: " << state_; + NOTREACHED() << "read_state_: " << read_state_; break; } - } while (result != ERR_IO_PENDING && state_ != STATE_CLOSED); - DCHECK(result == ERR_IO_PENDING || result == ERR_CONNECTION_CLOSED); - return result; + if (availability_state_ == STATE_CLOSED) { + DCHECK_EQ(result, error_on_close_); + DCHECK_LT(result, ERR_IO_PENDING); + return result; + } + + if (result == ERR_IO_PENDING) + return ERR_IO_PENDING; + + if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { + read_state_ = READ_STATE_DO_READ; + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), + weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); + return ERR_IO_PENDING; + } + } } int SpdySession::DoRead() { - if (bytes_read_ > kMaxReadBytes) { - state_ = STATE_DO_READ; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(&SpdySession::StartRead, - weak_factory_.GetWeakPtr())); - return ERR_IO_PENDING; - } + DCHECK_NE(availability_state_, STATE_CLOSED); CHECK(connection_); CHECK(connection_->socket()); - state_ = STATE_DO_READ_COMPLETE; + read_state_ = READ_STATE_DO_READ_COMPLETE; return connection_->socket()->Read( read_buffer_.get(), kReadBufferSize, - base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr())); + base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), + weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); } int SpdySession::DoReadComplete(int result) { + DCHECK_NE(availability_state_, STATE_CLOSED); + // Parse a frame. For now this code requires that the frame fit into our - // buffer (32KB). + // buffer (kReadBufferSize). // TODO(mbelshe): support arbitrarily large frames! - if (result <= 0) { - // Session is tearing down. - Error error = static_cast<Error>(result); - if (result == 0) { - UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", - total_bytes_received_, 1, 100000000, 50); - error = ERR_CONNECTION_CLOSED; - } - CloseSessionOnError(error, "result is <= 0."); + if (result == 0) { + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", + total_bytes_received_, 1, 100000000, 50); + CloseSessionOnError(ERR_CONNECTION_CLOSED, "Connection closed"); + DCHECK_EQ(availability_state_, STATE_CLOSED); + DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED); return ERR_CONNECTION_CLOSED; } + if (result < 0) { + CloseSessionOnError(static_cast<Error>(result), "result is < 0."); + DCHECK_EQ(availability_state_, STATE_CLOSED); + DCHECK_EQ(error_on_close_, result); + return result; + } + total_bytes_received_ += result; - bytes_read_ += result; last_activity_time_ = base::TimeTicks::Now(); DCHECK(buffered_spdy_framer_.get()); char* data = read_buffer_->data(); - while (result && - buffered_spdy_framer_->error_code() == - SpdyFramer::SPDY_NO_ERROR) { - uint32 bytes_processed = - buffered_spdy_framer_->ProcessInput(data, result); + while (result > 0) { + uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); result -= bytes_processed; data += bytes_processed; - } - if (!IsConnected()) - return ERR_CONNECTION_CLOSED; + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } - state_ = STATE_DO_READ; + DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); + } + + read_state_ = READ_STATE_DO_READ; return OK; } -void SpdySession::OnWriteComplete(int result) { - // Releasing the in-flight write can have a side-effect of dropping - // the last reference to |this|. Hold a reference through this - // function. +int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { + DCHECK_EQ(write_state_, expected_write_state); + + if (availability_state_ == STATE_CLOSED) { + DCHECK_LT(error_on_close_, ERR_IO_PENDING); + return error_on_close_; + } + + // Releasing the in-flight write in DoWriteComplete() can have a + // side-effect of dropping the last reference to |this|. Hold a + // reference through this function. scoped_refptr<SpdySession> self(this); - DCHECK(write_pending_); + // Loop until the session is closed or the write becomes blocked. + while (true) { + switch (write_state_) { + case WRITE_STATE_DO_WRITE: + DCHECK_EQ(result, OK); + result = DoWrite(); + break; + case WRITE_STATE_DO_WRITE_COMPLETE: + result = DoWriteComplete(result); + break; + default: + NOTREACHED() << "write_state_: " << write_state_; + break; + } + + if (availability_state_ == STATE_CLOSED) { + DCHECK_EQ(result, error_on_close_); + DCHECK_LT(result, ERR_IO_PENDING); + return result; + } + + if (result == ERR_IO_PENDING) + return ERR_IO_PENDING; + } +} + +int SpdySession::DoWrite() { + DCHECK_NE(availability_state_, STATE_CLOSED); + + DCHECK(buffered_spdy_framer_); + if (in_flight_write_) { + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); + } else { + // Grab the next frame to send. + SpdyFrameType frame_type = DATA; + scoped_ptr<SpdyBufferProducer> producer; + base::WeakPtr<SpdyStream> stream; + if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { + DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE); + return ERR_IO_PENDING; + } + + if (stream.get()) + DCHECK(!stream->IsClosed()); + + // Activate the stream only when sending the SYN_STREAM frame to + // guarantee monotonically-increasing stream IDs. + if (frame_type == SYN_STREAM) { + if (stream.get() && stream->stream_id() == 0) { + scoped_ptr<SpdyStream> owned_stream = + ActivateCreatedStream(stream.get()); + InsertActivatedStream(owned_stream.Pass()); + } else { + NOTREACHED(); + return ERR_UNEXPECTED; + } + } + + in_flight_write_ = producer->ProduceBuffer(); + if (!in_flight_write_) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + in_flight_write_frame_type_ = frame_type; + in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); + DCHECK_GE(in_flight_write_frame_size_, + buffered_spdy_framer_->GetFrameMinimumSize()); + in_flight_write_stream_ = stream; + } + + write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; + + // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems + // with Socket implementations that don't store their IOBuffer + // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). + scoped_refptr<IOBuffer> write_io_buffer = + in_flight_write_->GetIOBufferForRemainingData(); + // We keep |in_flight_write_| alive until OnWriteComplete(), so it's + // okay to pass in |write_io_buffer| since the socket won't use it + // past OnWriteComplete(). + return connection_->socket()->Write( + write_io_buffer.get(), + in_flight_write_->GetRemainingSize(), + base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), + weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); +} + +int SpdySession::DoWriteComplete(int result) { + DCHECK_NE(availability_state_, STATE_CLOSED); + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); last_activity_time_ = base::TimeTicks::Now(); - write_pending_ = false; if (result < 0) { + DCHECK_NE(result, ERR_IO_PENDING); in_flight_write_.reset(); in_flight_write_frame_type_ = DATA; in_flight_write_frame_size_ = 0; in_flight_write_stream_.reset(); CloseSessionOnError(static_cast<Error>(result), "Write error"); - return; + DCHECK_EQ(error_on_close_, result); + return result; } // It should not be possible to have written more bytes than our @@ -1186,108 +1298,11 @@ void SpdySession::OnWriteComplete(int result) { } } - // Write more data. We're already in a continuation, so we can go - // ahead and write it immediately (without going back to the message - // loop). - WriteSocketLater(); -} - -void SpdySession::WriteSocketLater() { - if (delayed_write_pending_) - return; - - if (!IsConnected()) - return; - - delayed_write_pending_ = true; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr())); -} - -void SpdySession::WriteSocket() { - // This function should only be called via WriteSocketLater. - DCHECK(delayed_write_pending_); - delayed_write_pending_ = false; - - // If the socket isn't connected yet, just wait; we'll get called - // again when the socket connection completes. If the socket is - // closed, just return. - if (!IsConnected()) - return; - - if (write_pending_) // Another write is in progress still. - return; - - // Loop sending frames until we've sent everything or until the write - // returns error (or ERR_IO_PENDING). - DCHECK(buffered_spdy_framer_.get()); - while (true) { - if (in_flight_write_) { - DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); - } else { - // Grab the next frame to send. - SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyBufferProducer> producer; - base::WeakPtr<SpdyStream> stream; - if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) - break; - - if (stream.get()) - DCHECK(!stream->IsClosed()); - - // Activate the stream only when sending the SYN_STREAM frame to - // guarantee monotonically-increasing stream IDs. - if (frame_type == SYN_STREAM) { - if (stream.get() && stream->stream_id() == 0) { - scoped_ptr<SpdyStream> owned_stream = - ActivateCreatedStream(stream.get()); - InsertActivatedStream(owned_stream.Pass()); - } else { - NOTREACHED(); - continue; - } - } - - in_flight_write_ = producer->ProduceBuffer(); - if (!in_flight_write_) { - NOTREACHED(); - continue; - } - in_flight_write_frame_type_ = frame_type; - in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); - DCHECK_GE(in_flight_write_frame_size_, - buffered_spdy_framer_->GetFrameMinimumSize()); - in_flight_write_stream_ = stream; - } - - write_pending_ = true; - // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems - // with Socket implementations that don't store their IOBuffer - // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). - scoped_refptr<IOBuffer> write_io_buffer = - in_flight_write_->GetIOBufferForRemainingData(); - // We keep |in_flight_write_| alive until OnWriteComplete(), so - // it's okay to use GetIOBufferForRemainingData() since the socket - // doesn't use the IOBuffer past OnWriteComplete(). - int rv = connection_->socket()->Write( - write_io_buffer.get(), - in_flight_write_->GetRemainingSize(), - base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); - // Avoid persisting |write_io_buffer| past |in_flight_write_|'s - // lifetime (which will end if OnWriteComplete() is called below). - write_io_buffer = NULL; - if (rv == ERR_IO_PENDING) - break; - - // We sent the frame successfully. - OnWriteComplete(rv); - - // TODO(mbelshe): Test this error case. Maybe we should mark the socket - // as in an error state. - if (rv < 0) - break; - } + // This has to go after calling OnFrameWriteComplete() so that if + // that ends up calling EnqueueWrite(), it properly avoids posting + // another DoWriteLoop task. + write_state_ = WRITE_STATE_DO_WRITE; + return OK; } void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id, @@ -1359,7 +1374,7 @@ void SpdySession::CloseSessionOnError(Error err, // to |this|. Hold a reference through this function. scoped_refptr<SpdySession> self(this); - DCHECK_LT(err, OK); + DCHECK_LT(err, ERR_IO_PENDING); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_CLOSE, base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); @@ -1367,12 +1382,12 @@ void SpdySession::CloseSessionOnError(Error err, // Don't close twice. This can occur because we can have both // a read and a write outstanding, and each can complete with // an error. - if (!IsClosed()) { + if (availability_state_ != STATE_CLOSED) { UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", total_bytes_received_, 1, 100000000, 50); - state_ = STATE_CLOSED; - error_ = err; + availability_state_ = STATE_CLOSED; + error_on_close_ = err; // TODO(akalin): Move this after CloseAllStreams() once we're // owned by the pool. RemoveFromPool(); @@ -1409,7 +1424,7 @@ base::Value* SpdySession::GetInfoAsValue() const { SSLClientSocket::NextProtoToString( connection_->socket()->GetNegotiatedProtocol())); - dict->SetInteger("error", error_); + dict->SetInteger("error", error_on_close_); dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); dict->SetInteger("streams_initiated_count", streams_initiated_count_); @@ -1483,8 +1498,19 @@ void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, const base::WeakPtr<SpdyStream>& stream) { + if (availability_state_ == STATE_CLOSED) + return; + + bool was_idle = write_queue_.IsEmpty(); write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); - WriteSocketLater(); + // We only want to post a task when the queue was just empty *and* + // we're not being called from a callback from DoWriteComplete(). + if (was_idle && write_state_ == WRITE_STATE_DO_WRITE) { + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), + weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); + } } void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { @@ -1588,6 +1614,9 @@ ServerBoundCertService* SpdySession::GetServerBoundCertService() const { } void SpdySession::OnError(SpdyFramer::SpdyError error_code) { + if (availability_state_ == STATE_CLOSED) + return; + RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(error_code)); std::string description = base::StringPrintf( @@ -1597,6 +1626,9 @@ void SpdySession::OnError(SpdyFramer::SpdyError error_code) { void SpdySession::OnStreamError(SpdyStreamId stream_id, const std::string& description) { + if (availability_state_ == STATE_CLOSED) + return; + ActiveStreamMap::iterator it = active_streams_.find(stream_id); if (it == active_streams_.end()) { // We still want to send a frame to reset the stream even if we @@ -1613,6 +1645,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, const char* data, size_t len, bool fin) { + if (availability_state_ == STATE_CLOSED) + return; + DCHECK_LT(len, 1u << 24); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( @@ -1654,6 +1689,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, } void SpdySession::OnSettings(bool clear_persisted) { + if (availability_state_ == STATE_CLOSED) + return; + if (clear_persisted) http_server_properties_->ClearSpdySettings(host_port_pair()); @@ -1668,6 +1706,9 @@ void SpdySession::OnSettings(bool clear_persisted) { void SpdySession::OnSetting(SpdySettingsIds id, uint8 flags, uint32 value) { + if (availability_state_ == STATE_CLOSED) + return; + HandleSetting(id, value); http_server_properties_->SetSpdySetting( host_port_pair(), @@ -1730,6 +1771,12 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, bool fin, bool unidirectional, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + + // TODO(akalin): Reset the stream when |availability_state_| == + // STATE_GOING_AWAY. + base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1891,6 +1938,9 @@ void SpdySession::DeleteExpiredPushedStreams() { void SpdySession::OnSynReply(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1927,6 +1977,9 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id, void SpdySession::OnHeaders(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { + if (availability_state_ == STATE_CLOSED) + return; + if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, @@ -1954,6 +2007,9 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id, void SpdySession::OnRstStream(SpdyStreamId stream_id, SpdyRstStreamStatus status) { + if (availability_state_ == STATE_CLOSED) + return; + std::string description; net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RST_STREAM, @@ -1987,6 +2043,9 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, SpdyGoAwayStatus status) { + if (availability_state_ == STATE_CLOSED) + return; + net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, base::Bind(&NetLogSpdyGoAwayCallback, last_accepted_stream_id, @@ -1999,6 +2058,9 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, } void SpdySession::OnPing(uint32 unique_id) { + if (availability_state_ == STATE_CLOSED) + return; + net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_PING, base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); @@ -2027,6 +2089,9 @@ void SpdySession::OnPing(uint32 unique_id) { void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { + if (availability_state_ == STATE_CLOSED) + return; + DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, @@ -2580,7 +2645,7 @@ void SpdySession::ResumeSendStalledStreams() { // have to worry about streams being closed, as well as ourselves // being closed. - while (!IsClosed() && !IsSendStalled()) { + while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { size_t old_size = 0; if (DCHECK_IS_ON()) old_size = GetTotalSize(stream_send_unstall_queue_); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index c2c6cfd..0cca912 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -51,9 +51,9 @@ const int kMaxSpdyFrameChunkSize = (2 * kMss) - 8; // Specifies the maxiumum concurrent streams server could send (via push). const int kMaxConcurrentPushedStreams = 1000; -// Specifies the number of bytes read synchronously (without yielding) if the -// data is available. -const int kMaxReadBytes = 32 * 1024; +// Specifies the maximum number of bytes to read synchronously before +// yielding. +const int kMaxReadBytesWithoutYielding = 32 * 1024; // The initial receive window size for both streams and sessions. const int32 kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB @@ -336,8 +336,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, void SendStreamWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size); - // If session is closed, no new streams/transactions should be created. - bool IsClosed() const { return state_ == STATE_CLOSED; } + // Whether the stream is closed, i.e. it has stopped processing data + // and is about to be destroyed. + bool IsClosed() const { return availability_state_ == STATE_CLOSED; } // Closes this session. This will close all active streams and mark // the session as permanently closed. @@ -386,7 +387,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, } size_t num_created_streams() const { return created_streams_.size(); } - size_t pending_create_stream_queue_size(int priority) const { + size_t pending_create_stream_queue_size(RequestPriority priority) const { DCHECK_LT(priority, NUM_PRIORITIES); return pending_create_stream_queues_[priority].size(); } @@ -501,14 +502,28 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::set<SpdyStream*> CreatedStreamSet; - enum State { - STATE_IDLE, - STATE_CONNECTING, - STATE_DO_READ, - STATE_DO_READ_COMPLETE, + enum AvailabilityState { + // The session is available in its socket pool and can be used + // freely. + STATE_AVAILABLE, + // The session can process data on existing streams but will + // refuse to create new ones. + STATE_GOING_AWAY, + // The session has been closed, is waiting to be deleted, and will + // refuse to process any more data. STATE_CLOSED }; + enum ReadState { + READ_STATE_DO_READ, + READ_STATE_DO_READ_COMPLETE, + }; + + enum WriteState { + WRITE_STATE_DO_WRITE, + WRITE_STATE_DO_WRITE_COMPLETE, + }; + virtual ~SpdySession(); // Called by SpdyStreamRequest to start a request to create a @@ -554,24 +569,19 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyRstStreamStatus status, const std::string& description); - // Start the DoLoop to read data from socket. - void StartRead(); - - // Try to make progress by reading and processing data. - int DoLoop(int result); - // The implementations of STATE_DO_READ/STATE_DO_READ_COMPLETE state changes - // of the state machine. + // Advance the ReadState state machine. |expected_read_state| is the + // expected starting read state. + int DoReadLoop(ReadState expected_read_state, int result); + // The implementations of the states of the ReadState state machine. int DoRead(); - int DoReadComplete(int bytes_read); - - // Check if session is connected or not. - bool IsConnected() const { - return state_ == STATE_DO_READ || state_ == STATE_DO_READ_COMPLETE; - } + int DoReadComplete(int result); - // IO Callbacks - void OnReadComplete(int result); - void OnWriteComplete(int result); + // Advance the WriteState state machine. |expected_write_state| is + // the expected starting write state. + int DoWriteLoop(WriteState expected_write_state, int result); + // The implementations of the states of the WriteState state machine. + int DoWrite(); + int DoWriteComplete(int result); // Send relevant SETTINGS. This is generally called on connection setup. void SendInitialSettings(); @@ -607,10 +617,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // haven't received any data in |kHungInterval| time period. void CheckPingStatus(base::TimeTicks last_check_time); - // Write current data to the socket. - void WriteSocketLater(); - void WriteSocket(); - // Get a new stream id. int GetNewStreamId(); @@ -884,8 +890,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyWriteQueue write_queue_; // Data for the frame we are currently sending. - // Whether we have a socket write pending completion. - bool write_pending_; + // The buffer we're currently writing. scoped_ptr<SpdyBuffer> in_flight_write_; // The type of the frame in |in_flight_write_|. @@ -896,9 +901,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // the socket completely. base::WeakPtr<SpdyStream> in_flight_write_stream_; - // Flag if we have a pending message scheduled for WriteSocket. - bool delayed_write_pending_; - // Flag if we're using an SSL connection for this SpdySession. bool is_secure_; @@ -908,11 +910,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Spdy Frame state. scoped_ptr<BufferedSpdyFramer> buffered_spdy_framer_; - // If an error has occurred on the session, the session is effectively - // dead. Record this error here. When no error has occurred, |error_| will - // be OK. - Error error_; - State state_; + // The state variables. + AvailabilityState availability_state_; + ReadState read_state_; + WriteState write_state_; + + // If the session was closed (i.e., |availability_state_| is + // STATE_CLOSED), then |error_on_close_| holds the error with which + // it was closed, which is < ERR_IO_PENDING. Otherwise, it is set to + // OK. + Error error_on_close_; // Limits size_t max_concurrent_streams_; // 0 if no limit @@ -928,10 +935,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // SpdySession. It is used by the |Net.SpdySettingsCwnd...| histograms. int total_bytes_received_; - // |bytes_read_| keeps track of number of bytes read continously in the - // DoLoop() without yielding. - int bytes_read_; - bool sent_settings_; // Did this session send settings when it started. bool received_settings_; // Did this session receive at least one settings // frame. diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index 1b414681..1f6b931 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -357,7 +357,7 @@ TEST_P(SpdySessionTest, ServerPing) { test::StreamDelegateSendImmediate delegate(spdy_stream1, NULL); spdy_stream1->SetDelegate(&delegate); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); @@ -546,7 +546,7 @@ TEST_P(SpdySessionTest, OnSettings) { stream_releaser.MakeCallback(&request))); session = NULL; - EXPECT_EQ(OK, stream_releaser.WaitForResult()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult()); } // Start with max concurrent streams set to 1 (that is persisted). Receive a @@ -610,7 +610,7 @@ TEST_P(SpdySessionTest, ClearSettings) { BoundNetLog(), stream_releaser.MakeCallback(&request))); - EXPECT_EQ(OK, stream_releaser.WaitForResult()); + EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult()); // Make sure that persisted data is cleared. EXPECT_EQ(0u, spdy_session_pool_->http_server_properties()->GetSpdySettings( @@ -823,7 +823,7 @@ TEST_P(SpdySessionTest, Initialize) { CreateInsecureSpdySession(http_session_, key_, log.bound()); EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); net::CapturingNetLog::CapturedEntryList entries; @@ -871,7 +871,7 @@ TEST_P(SpdySessionTest, CloseSessionOnError) { CreateInsecureSpdySession(http_session_, key_, log.bound()); EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_)); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_)); @@ -1705,15 +1705,16 @@ TEST_P(SpdySessionTest, NeedsCredentials) { EXPECT_EQ(spdy_util_.spdy_version() >= SPDY3, session->NeedsCredentials()); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); session->CloseSessionOnError(ERR_ABORTED, std::string()); } -// Test that SpdySession::DoRead reads data from the socket without yielding. -// This test makes 32k - 1 bytes of data available on the socket for reading. It -// then verifies that it has read all the available data without yielding. +// Test that SpdySession::DoReadLoop reads data from the socket +// without yielding. This test makes 32k - 1 bytes of data available +// on the socket for reading. It then verifies that it has read all +// the available data without yielding. TEST_P(SpdySessionTest, ReadDataWithoutYielding) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1724,10 +1725,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); const int kPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1780,8 +1782,9 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to verify SpdySession::DoRead doesn't post a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to verify SpdySession::DoReadLoop doesn't + // post a task. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1789,7 +1792,8 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has not posted a task. + // Read all the data and verify SpdySession::DoReadLoop has not + // posted a task. data.RunFor(4); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -1800,10 +1804,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { EXPECT_TRUE(data.at_read_eof()); } -// Test that SpdySession::DoRead yields while reading the data. This test makes -// 32k + 1 bytes of data available on the socket for reading. It then verifies -// that DoRead has yielded even though there is data available for it to read -// (i.e, socket()->Read didn't return ERR_IO_PENDING during socket reads). +// Test that SpdySession::DoReadLoop yields while reading the +// data. This test makes 32k + 1 bytes of data available on the socket +// for reading. It then verifies that DoRead has yielded even though +// there is data available for it to read (i.e, socket()->Read didn't +// return ERR_IO_PENDING during socket reads). TEST_P(SpdySessionTest, TestYieldingDuringReadData) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1814,10 +1819,11 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); const int kPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1870,8 +1876,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to verify SpdySession::DoRead posts a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to verify SpdySession::DoReadLoop posts a + // task. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1879,7 +1886,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has posted a task. + // Read all the data and verify SpdySession::DoReadLoop has posted a + // task. data.RunFor(6); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -1891,17 +1899,17 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { EXPECT_TRUE(data.at_read_eof()); } -// Test that SpdySession::DoRead() tests interactions of yielding + async, -// by doing the following MockReads. +// Test that SpdySession::DoReadLoop() tests interactions of yielding +// + async, by doing the following MockReads. // // MockRead of SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K // ASYNC 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K. // -// The above reads 26K synchronously. Since that is less that 32K, we will -// attempt to read again. However, that DoRead() will return ERR_IO_PENDING -// (because of async read), so DoRead() will yield. When we come back, DoRead() -// will read the results from the async read, and rest of the data -// synchronously. +// The above reads 26K synchronously. Since that is less that 32K, we +// will attempt to read again. However, that DoRead() will return +// ERR_IO_PENDING (because of async read), so DoReadLoop() will +// yield. When we come back, DoRead() will read the results from the +// async read, and rest of the data synchronously. TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -1912,11 +1920,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytes); + // Build buffer of size kMaxReadBytesWithoutYielding / 4 + // (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); TestDataStream test_stream; const int kEightKPayloadSize = - kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); scoped_refptr<net::IOBuffer> eightk_payload( new net::IOBuffer(kEightKPayloadSize)); char* eightk_payload_data = eightk_payload->data(); @@ -1983,8 +1992,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to monitor SpdySession::DoRead posting of tasks. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); + // Set up the TaskObserver to monitor SpdySession::DoReadLoop + // posting of tasks. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1992,7 +2002,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { EXPECT_EQ(1u, delegate1.stream_id()); EXPECT_EQ(0u, observer.executed_count()); - // Read all the data and verify SpdySession::DoRead has posted a task. + // Read all the data and verify SpdySession::DoReadLoop has posted a + // task. data.RunFor(12); EXPECT_EQ(NULL, spdy_stream1.get()); @@ -2004,11 +2015,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { EXPECT_TRUE(data.at_read_eof()); } -// Send a GoAway frame when SpdySession is in DoLoop. If scoped_refptr to -// <SpdySession> is deleted from SpdySession::DoLoop(), we get a crash because -// GoAway could delete the SpdySession from the SpdySessionPool and the last +// Send a GoAway frame when SpdySession is in DoReadLoop. If +// scoped_refptr to <SpdySession> is deleted from +// SpdySession::DoReadLoop(), we get a crash because GoAway could +// delete the SpdySession from the SpdySessionPool and the last // reference to SpdySession. -TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) { +TEST_P(SpdySessionTest, GoAwayWhileInDoReadLoop) { MockConnect connect_data(SYNCHRONOUS, OK); BufferedSpdyFramer framer(spdy_util_.spdy_version(), false); @@ -2064,8 +2076,8 @@ TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) { data.RunFor(1); EXPECT_EQ(1u, spdy_stream1->stream_id()); - // Only references to SpdySession are held by DoLoop and - // SpdySessionPool. If DoLoop doesn't hold the reference, we get a + // Only references to SpdySession are held by DoReadLoop and + // SpdySessionPool. If DoReadLoop doesn't hold the reference, we get a // crash if SpdySession is deleted from the SpdySessionPool. // Run until GoAway. @@ -2466,7 +2478,7 @@ TEST_P(SpdySessionTest, SendCredentials) { CreateSecureSpdySession(http_session_, key, BoundNetLog()); EXPECT_TRUE(session->NeedsCredentials()); - // Flush the SpdySession::OnReadComplete() task. + // Flush the read completion task. base::MessageLoop::current()->RunUntilIdle(); session->CloseSessionOnError(ERR_ABORTED, std::string()); diff --git a/net/spdy/spdy_test_util_common.cc b/net/spdy/spdy_test_util_common.cc index ea333c9..ba32a6f 100644 --- a/net/spdy/spdy_test_util_common.cc +++ b/net/spdy/spdy_test_util_common.cc @@ -293,7 +293,8 @@ CompletionCallback StreamReleaserCallback::MakeCallback( void StreamReleaserCallback::OnComplete( SpdyStreamRequest* request, int result) { - request->ReleaseStream()->Cancel(); + if (result == OK) + request->ReleaseStream()->Cancel(); SetResult(result); } diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc index c793aab..2ac4241 100644 --- a/net/spdy/spdy_write_queue.cc +++ b/net/spdy/spdy_write_queue.cc @@ -32,6 +32,14 @@ SpdyWriteQueue::~SpdyWriteQueue() { Clear(); } +bool SpdyWriteQueue::IsEmpty() const { + for (int i = 0; i < NUM_PRIORITIES; i++) { + if (!queue_[i].empty()) + return false; + } + return true; +} + void SpdyWriteQueue::Enqueue(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> frame_producer, diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h index fa194ef..3bceb29 100644 --- a/net/spdy/spdy_write_queue.h +++ b/net/spdy/spdy_write_queue.h @@ -27,6 +27,10 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { SpdyWriteQueue(); ~SpdyWriteQueue(); + // Returns whether there is anything in the write queue, + // i.e. whether the next call to Dequeue will return true. + bool IsEmpty() const; + // Enqueues the given frame producer of the given type at the given // priority associated with the given stream, which may be NULL if // the frame producer is not associated with a stream. If |stream| |