diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-10 01:29:35 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-10 01:29:35 +0000 |
commit | 1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61 (patch) | |
tree | 5eb3649aaf282c64d97dc72e29feecbe9aeb3be1 /net | |
parent | 82c4d3b8fd037c0ee2c7b391a6f97f0992ecb4a5 (diff) | |
download | chromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.zip chromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.tar.gz chromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.tar.bz2 |
Revert 210694 "fix loop"
> fix loop
>
> fix
>
> remove
>
> rename
>
> fix yielding
>
> add another expected
>
> add expected
>
> rem another
>
> remove dcheck
>
> rewrite state machine
>
> rem unused
>
> rem startread loop
>
> read loop
>
> fix test
>
> use read
>
> fix test
>
> fix test
>
> fix another staet
>
> fix test
>
> rem bool
>
> initial write loop
>
> fix var
>
> bail out if closed
>
> rem dcheck
>
> fix onreadcomplete
>
> fix loop
>
> check state
>
> remove availability state
>
> clean up states
>
> remove unused states
TBR=akalin@chromium.org
Review URL: https://codereview.chromium.org/18822006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@210696 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_http_stream_unittest.cc | 24 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 400 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 82 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 33 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.h | 2 |
7 files changed, 255 insertions, 308 deletions
diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc index 565de15..f91594e 100644 --- a/net/spdy/spdy_http_stream_unittest.cc +++ b/net/spdy/spdy_http_stream_unittest.cc @@ -254,6 +254,14 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream1( new SpdyHttpStream(session_.get(), true)); + ASSERT_EQ(OK, + http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, + BoundNetLog(), + CompletionCallback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, + callback1.callback())); + EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); + HttpRequestInfo request2; request2.method = "GET"; request2.url = GURL("http://www.google.com/"); @@ -263,15 +271,15 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { scoped_ptr<SpdyHttpStream> http_stream2( new SpdyHttpStream(session_.get(), true)); - // First write. ASSERT_EQ(OK, - http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY, + http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, BoundNetLog(), CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1, - callback1.callback())); + EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, + callback2.callback())); EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); + // First write. deterministic_data()->RunFor(1); EXPECT_LE(0, callback1.WaitForResult()); @@ -282,14 +290,6 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) { EXPECT_FALSE(http_stream2->GetLoadTimingInfo(&load_timing_info2)); // Second write. - ASSERT_EQ(OK, - http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY, - BoundNetLog(), - CompletionCallback())); - EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2, - callback2.callback())); - EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key)); - deterministic_data()->RunFor(1); EXPECT_LE(0, callback2.WaitForResult()); TestLoadTimingReused(*http_stream2); diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index 3d31648..e0e9cfc 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -2014,21 +2014,19 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply( spdy_util_.ConstructSpdyPostSynReply(NULL, 0)); + scoped_ptr<SpdyFrame> stream_body(spdy_util_.ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { CreateMockRead(*stream_reply, 1), - MockRead(ASYNC, 0, 4) // EOF + MockRead(ASYNC, 0, 3) // EOF }; scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyPost( kRequestUrl, 1, kUploadDataSize, LOWEST, NULL, 0)); scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); - scoped_ptr<SpdyFrame> rst( - spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); MockWrite writes[] = { CreateMockWrite(*req, 0), CreateMockWrite(*body, 2), - CreateMockWrite(*rst, 3) }; DeterministicSocketData data(reads, arraysize(reads), @@ -2045,7 +2043,7 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) { &CreatePostRequest(), callback.callback(), BoundNetLog()); EXPECT_EQ(ERR_IO_PENDING, rv); - data.RunFor(4); + data.RunFor(2); rv = callback.WaitForResult(); EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); data.RunFor(1); @@ -3531,11 +3529,7 @@ TEST_P(SpdyNetworkTransactionTest, CorruptFrameSessionError) { for (size_t i = 0; i < ARRAYSIZE_UNSAFE(test_cases); ++i) { scoped_ptr<SpdyFrame> req( spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true)); - scoped_ptr<SpdyFrame> rst( - spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR)); - MockWrite writes[] = { - CreateMockWrite(*req), - CreateMockWrite(*rst), + MockWrite writes[] = { CreateMockWrite(*req), MockWrite(ASYNC, 0, 0) // EOF }; scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true)); diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 65aa43c..ae995e6 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -9,7 +9,6 @@ #include "base/basictypes.h" #include "base/bind.h" -#include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/message_loop.h" @@ -353,14 +352,14 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, http_server_properties_(http_server_properties), read_buffer_(new IOBuffer(kReadBufferSize)), stream_hi_water_mark_(kFirstStreamId), + write_pending_(false), in_flight_write_frame_type_(DATA), in_flight_write_frame_size_(0), + delayed_write_pending_(false), is_secure_(false), certificate_error_code_(OK), - availability_state_(STATE_AVAILABLE), - read_state_(READ_STATE_DO_READ), - write_state_(WRITE_STATE_IDLE), - error_on_close_(OK), + error_(OK), + state_(STATE_IDLE), max_concurrent_streams_(initial_max_concurrent_streams == 0 ? kInitialMaxConcurrentStreams : initial_max_concurrent_streams), @@ -372,6 +371,7 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, streams_pushed_and_claimed_count_(0), streams_abandoned_count_(0), total_bytes_received_(0), + bytes_read_(0), sent_settings_(false), received_settings_(false), stalled_streams_(0), @@ -412,8 +412,8 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key, } SpdySession::~SpdySession() { - if (availability_state_ != STATE_CLOSED) { - availability_state_ = STATE_CLOSED; + if (state_ != STATE_CLOSED) { + state_ = STATE_CLOSED; // Cleanup all the streams. CloseAllStreams(ERR_ABORTED); @@ -450,7 +450,7 @@ Error SpdySession::InitializeWithSocket( base::StatsCounter spdy_sessions("spdy.sessions"); spdy_sessions.Increment(); - read_state_ = READ_STATE_DO_READ; + state_ = STATE_DO_READ; connection_ = connection.Pass(); is_secure_ = is_secure; certificate_error_code_ = certificate_error_code; @@ -493,12 +493,15 @@ Error SpdySession::InitializeWithSocket( NetLog::TYPE_SPDY_SESSION_INITIALIZED, connection_->socket()->NetLog().source().ToEventParametersCallback()); - int error = DoReadLoop(READ_STATE_DO_READ, OK); + int error = DoLoop(OK); if (error == ERR_IO_PENDING) error = OK; if (error == OK) { connection_->AddLayeredPool(this); SendInitialSettings(); + // Write out any data that we might have to send, such as the + // settings frame. + WriteSocketLater(); spdy_session_pool_ = spdy_session_pool; } return static_cast<Error>(error); @@ -508,7 +511,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { if (!verify_domain_authentication_) return true; - if (availability_state_ == STATE_CLOSED) + if (!IsConnected()) return false; SSLInfo ssl_info; @@ -528,7 +531,7 @@ int SpdySession::GetPushStream( const GURL& url, base::WeakPtr<SpdyStream>* stream, const BoundNetLog& stream_net_log) { - CHECK_NE(availability_state_, STATE_CLOSED); + CHECK_NE(state_, STATE_CLOSED); stream->reset(); @@ -946,11 +949,26 @@ bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { } LoadState SpdySession::GetLoadState() const { + // NOTE: The application only queries the LoadState via the + // SpdyNetworkTransaction, and details are only needed when + // we're in the process of connecting. + + // If we're connecting, defer to the connection to give us the actual + // LoadState. + if (state_ == STATE_CONNECTING) + return connection_->GetLoadState(); + // Just report that we're idle since the session could be doing // many things concurrently. return LOAD_STATE_IDLE; } +void SpdySession::OnReadComplete(int bytes_read) { + DCHECK_NE(state_, STATE_DO_READ); + DoLoop(bytes_read); +} + + void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, int status) { // TODO(mbelshe): We should send a RST_STREAM control frame here @@ -1018,11 +1036,13 @@ void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id, static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); } -int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { - DCHECK_EQ(read_state_, expected_read_state); +void SpdySession::StartRead() { + DCHECK_NE(state_, STATE_DO_READ_COMPLETE); + DoLoop(OK); +} - if (availability_state_ == STATE_CLOSED) - return ERR_CONNECTION_CLOSED; +int SpdySession::DoLoop(int result) { + bytes_read_ = 0; // The SpdyFramer will use callbacks onto |this| as it parses frames. // When errors occur, those callbacks can lead to teardown of all references @@ -1030,200 +1050,98 @@ int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { // cleanup. scoped_refptr<SpdySession> self(this); - int bytes_read_without_yielding = 0; - - while (true) { - switch (read_state_) { - case READ_STATE_DO_READ: + do { + switch (state_) { + case STATE_DO_READ: DCHECK_EQ(result, OK); result = DoRead(); break; - case READ_STATE_DO_READ_COMPLETE: - if (result > 0) - bytes_read_without_yielding += result; + case STATE_DO_READ_COMPLETE: result = DoReadComplete(result); break; + case STATE_CLOSED: + result = ERR_CONNECTION_CLOSED; + break; default: - NOTREACHED() << "read_state_: " << read_state_; + NOTREACHED() << "state_: " << state_; break; } + } while (result != ERR_IO_PENDING && state_ != STATE_CLOSED); + DCHECK(result == ERR_IO_PENDING || result == ERR_CONNECTION_CLOSED); - if (availability_state_ == STATE_CLOSED) - DCHECK_LT(result, ERR_IO_PENDING); - - if (result < ERR_IO_PENDING) - return result; - - if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { - read_state_ = READ_STATE_DO_READ; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), - weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); - return ERR_IO_PENDING; - } - } + return result; } int SpdySession::DoRead() { - DCHECK_NE(availability_state_, STATE_CLOSED); + if (bytes_read_ > kMaxReadBytes) { + state_ = STATE_DO_READ; + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&SpdySession::StartRead, + weak_factory_.GetWeakPtr())); + return ERR_IO_PENDING; + } CHECK(connection_); CHECK(connection_->socket()); - read_state_ = READ_STATE_DO_READ_COMPLETE; + state_ = STATE_DO_READ_COMPLETE; return connection_->socket()->Read( read_buffer_.get(), kReadBufferSize, - base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop), - weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); + base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr())); } int SpdySession::DoReadComplete(int result) { - DCHECK_NE(availability_state_, STATE_CLOSED); // Parse a frame. For now this code requires that the frame fit into our - // buffer (kReadBufferSize). + // buffer (32KB). // TODO(mbelshe): support arbitrarily large frames! - if (result == 0) { - UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", - total_bytes_received_, 1, 100000000, 50); - CloseSessionOnError(ERR_CONNECTION_CLOSED, "Connection closed"); + if (result <= 0) { + // Session is tearing down. + Error error = static_cast<Error>(result); + if (result == 0) { + UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", + total_bytes_received_, 1, 100000000, 50); + error = ERR_CONNECTION_CLOSED; + } + CloseSessionOnError(error, "result is <= 0."); return ERR_CONNECTION_CLOSED; } - if (result < 0) { - CloseSessionOnError(static_cast<Error>(result), "result is <= 0."); - return result; - } - total_bytes_received_ += result; + bytes_read_ += result; last_activity_time_ = base::TimeTicks::Now(); DCHECK(buffered_spdy_framer_.get()); char* data = read_buffer_->data(); - while (result > 0) { + while (result && + buffered_spdy_framer_->error_code() == + SpdyFramer::SPDY_NO_ERROR) { uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); result -= bytes_processed; data += bytes_processed; - - if (availability_state_ == STATE_CLOSED) { - DCHECK_LT(error_on_close_, ERR_IO_PENDING); - return error_on_close_; - } - - DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); } - read_state_ = READ_STATE_DO_READ; - return OK; -} - -int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { - DCHECK_EQ(write_state_, expected_write_state); - - if (availability_state_ == STATE_CLOSED) + if (!IsConnected()) return ERR_CONNECTION_CLOSED; - scoped_refptr<SpdySession> self(this); - - do { - switch (write_state_) { - case WRITE_STATE_DO_WRITE: - DCHECK_EQ(result, OK); - result = DoWrite(); - break; - case WRITE_STATE_DO_WRITE_COMPLETE: - result = DoWriteComplete(result); - break; - case WRITE_STATE_IDLE: - default: - NOTREACHED() << "write_state_: " << write_state_; - break; - } - } while (write_state_ != WRITE_STATE_IDLE && - result != ERR_IO_PENDING && availability_state_ != STATE_CLOSED); - - return result; -} - -int SpdySession::DoWrite() { - DCHECK_NE(availability_state_, STATE_CLOSED); - - // Loop sending frames until we've sent everything or until the write - // returns error (or ERR_IO_PENDING). - DCHECK(buffered_spdy_framer_); - if (in_flight_write_) { - DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); - } else { - // Grab the next frame to send. - SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyBufferProducer> producer; - base::WeakPtr<SpdyStream> stream; - if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { - write_state_ = WRITE_STATE_IDLE; - return OK; - } - - if (stream.get()) - DCHECK(!stream->IsClosed()); - - // Activate the stream only when sending the SYN_STREAM frame to - // guarantee monotonically-increasing stream IDs. - if (frame_type == SYN_STREAM) { - if (stream.get() && stream->stream_id() == 0) { - scoped_ptr<SpdyStream> owned_stream = - ActivateCreatedStream(stream.get()); - InsertActivatedStream(owned_stream.Pass()); - } else { - NOTREACHED(); - return ERR_UNEXPECTED; - } - } - - in_flight_write_ = producer->ProduceBuffer(); - if (!in_flight_write_) { - NOTREACHED(); - return ERR_UNEXPECTED; - } - in_flight_write_frame_type_ = frame_type; - in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); - DCHECK_GE(in_flight_write_frame_size_, - buffered_spdy_framer_->GetFrameMinimumSize()); - in_flight_write_stream_ = stream; - } - - write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; - - // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems - // with Socket implementations that don't store their IOBuffer - // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). - scoped_refptr<IOBuffer> write_io_buffer = - in_flight_write_->GetIOBufferForRemainingData(); - // We keep |in_flight_write_| alive until OnWriteComplete(), so - // it's okay to use GetIOBufferForRemainingData() since the socket - // doesn't use the IOBuffer past OnWriteComplete(). - int rv = connection_->socket()->Write( - write_io_buffer.get(), - in_flight_write_->GetRemainingSize(), - base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), - weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); - - return rv; + state_ = STATE_DO_READ; + return OK; } -int SpdySession::DoWriteComplete(int result) { - DCHECK_NE(availability_state_, STATE_CLOSED); - +void SpdySession::OnWriteComplete(int result) { // Releasing the in-flight write can have a side-effect of dropping // the last reference to |this|. Hold a reference through this // function. scoped_refptr<SpdySession> self(this); + DCHECK(write_pending_); DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); last_activity_time_ = base::TimeTicks::Now(); + write_pending_ = false; if (result < 0) { in_flight_write_.reset(); @@ -1231,7 +1149,7 @@ int SpdySession::DoWriteComplete(int result) { in_flight_write_frame_size_ = 0; in_flight_write_stream_.reset(); CloseSessionOnError(static_cast<Error>(result), "Write error"); - return result; + return; } // It should not be possible to have written more bytes than our @@ -1261,8 +1179,108 @@ int SpdySession::DoWriteComplete(int result) { } } - write_state_ = WRITE_STATE_DO_WRITE; - return OK; + // Write more data. We're already in a continuation, so we can go + // ahead and write it immediately (without going back to the message + // loop). + WriteSocketLater(); +} + +void SpdySession::WriteSocketLater() { + if (delayed_write_pending_) + return; + + if (!IsConnected()) + return; + + delayed_write_pending_ = true; + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr())); +} + +void SpdySession::WriteSocket() { + // This function should only be called via WriteSocketLater. + DCHECK(delayed_write_pending_); + delayed_write_pending_ = false; + + // If the socket isn't connected yet, just wait; we'll get called + // again when the socket connection completes. If the socket is + // closed, just return. + if (!IsConnected()) + return; + + if (write_pending_) // Another write is in progress still. + return; + + // Loop sending frames until we've sent everything or until the write + // returns error (or ERR_IO_PENDING). + DCHECK(buffered_spdy_framer_.get()); + while (true) { + if (in_flight_write_) { + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); + } else { + // Grab the next frame to send. + SpdyFrameType frame_type = DATA; + scoped_ptr<SpdyBufferProducer> producer; + base::WeakPtr<SpdyStream> stream; + if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) + break; + + if (stream.get()) + DCHECK(!stream->IsClosed()); + + // Activate the stream only when sending the SYN_STREAM frame to + // guarantee monotonically-increasing stream IDs. + if (frame_type == SYN_STREAM) { + if (stream.get() && stream->stream_id() == 0) { + scoped_ptr<SpdyStream> owned_stream = + ActivateCreatedStream(stream.get()); + InsertActivatedStream(owned_stream.Pass()); + } else { + NOTREACHED(); + continue; + } + } + + in_flight_write_ = producer->ProduceBuffer(); + if (!in_flight_write_) { + NOTREACHED(); + continue; + } + in_flight_write_frame_type_ = frame_type; + in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); + DCHECK_GE(in_flight_write_frame_size_, + buffered_spdy_framer_->GetFrameMinimumSize()); + in_flight_write_stream_ = stream; + } + + write_pending_ = true; + // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems + // with Socket implementations that don't store their IOBuffer + // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). + scoped_refptr<IOBuffer> write_io_buffer = + in_flight_write_->GetIOBufferForRemainingData(); + // We keep |in_flight_write_| alive until OnWriteComplete(), so + // it's okay to use GetIOBufferForRemainingData() since the socket + // doesn't use the IOBuffer past OnWriteComplete(). + int rv = connection_->socket()->Write( + write_io_buffer.get(), + in_flight_write_->GetRemainingSize(), + base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); + // Avoid persisting |write_io_buffer| past |in_flight_write_|'s + // lifetime (which will end if OnWriteComplete() is called below). + write_io_buffer = NULL; + if (rv == ERR_IO_PENDING) + break; + + // We sent the frame successfully. + OnWriteComplete(rv); + + // TODO(mbelshe): Test this error case. Maybe we should mark the socket + // as in an error state. + if (rv < 0) + break; + } } void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id, @@ -1342,12 +1360,12 @@ void SpdySession::CloseSessionOnError(Error err, // Don't close twice. This can occur because we can have both // a read and a write outstanding, and each can complete with // an error. - if (availability_state_ != STATE_CLOSED) { + if (!IsClosed()) { UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", total_bytes_received_, 1, 100000000, 50); - availability_state_ = STATE_CLOSED; - error_on_close_ = err; + state_ = STATE_CLOSED; + error_ = err; // TODO(akalin): Move this after CloseAllStreams() once we're // owned by the pool. RemoveFromPool(); @@ -1384,7 +1402,7 @@ base::Value* SpdySession::GetInfoAsValue() const { SSLClientSocket::NextProtoToString( connection_->socket()->GetNegotiatedProtocol())); - dict->SetInteger("error", error_on_close_); + dict->SetInteger("error", error_); dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); dict->SetInteger("streams_initiated_count", streams_initiated_count_); @@ -1458,17 +1476,8 @@ void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, const base::WeakPtr<SpdyStream>& stream) { - if (availability_state_ == STATE_CLOSED) - return; - write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); - if (write_state_ == WRITE_STATE_IDLE) { - write_state_ = WRITE_STATE_DO_WRITE; - base::MessageLoop::current()->PostTask( - FROM_HERE, - base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop), - weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); - } + WriteSocketLater(); } void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { @@ -1572,9 +1581,6 @@ ServerBoundCertService* SpdySession::GetServerBoundCertService() const { } void SpdySession::OnError(SpdyFramer::SpdyError error_code) { - if (availability_state_ == STATE_CLOSED) - return; - RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(error_code)); std::string description = base::StringPrintf( @@ -1584,9 +1590,6 @@ void SpdySession::OnError(SpdyFramer::SpdyError error_code) { void SpdySession::OnStreamError(SpdyStreamId stream_id, const std::string& description) { - if (availability_state_ == STATE_CLOSED) - return; - ActiveStreamMap::iterator it = active_streams_.find(stream_id); if (it == active_streams_.end()) { // We still want to send a frame to reset the stream even if we @@ -1603,9 +1606,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, const char* data, size_t len, bool fin) { - if (availability_state_ == STATE_CLOSED) - return; - DCHECK_LT(len, 1u << 24); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( @@ -1647,9 +1647,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, } void SpdySession::OnSettings(bool clear_persisted) { - if (availability_state_ == STATE_CLOSED) - return; - if (clear_persisted) http_server_properties_->ClearSpdySettings(host_port_pair()); @@ -1664,9 +1661,6 @@ void SpdySession::OnSettings(bool clear_persisted) { void SpdySession::OnSetting(SpdySettingsIds id, uint8 flags, uint32 value) { - if (availability_state_ == STATE_CLOSED) - return; - HandleSetting(id, value); http_server_properties_->SetSpdySetting( host_port_pair(), @@ -1685,9 +1679,6 @@ void SpdySession::OnSetting(SpdySettingsIds id, void SpdySession::OnSynStreamCompressed( size_t uncompressed_size, size_t compressed_size) { - if (availability_state_ == STATE_CLOSED) - return; - // Make sure we avoid early decimal truncation. int compression_pct = 100 - (100 * compressed_size) / uncompressed_size; UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", @@ -1722,9 +1713,6 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, bool fin, bool unidirectional, const SpdyHeaderBlock& headers) { - if (availability_state_ == STATE_CLOSED) - return; - base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1886,9 +1874,6 @@ void SpdySession::DeleteExpiredPushedStreams() { void SpdySession::OnSynReply(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { - if (availability_state_ == STATE_CLOSED) - return; - base::Time response_time = base::Time::Now(); base::TimeTicks recv_first_byte_time = base::TimeTicks::Now(); @@ -1925,9 +1910,6 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id, void SpdySession::OnHeaders(SpdyStreamId stream_id, bool fin, const SpdyHeaderBlock& headers) { - if (availability_state_ == STATE_CLOSED) - return; - if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, @@ -1955,9 +1937,6 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id, void SpdySession::OnRstStream(SpdyStreamId stream_id, SpdyRstStreamStatus status) { - if (availability_state_ == STATE_CLOSED) - return; - std::string description; net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_RST_STREAM, @@ -1991,9 +1970,6 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, SpdyGoAwayStatus status) { - if (availability_state_ == STATE_CLOSED) - return; - net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, base::Bind(&NetLogSpdyGoAwayCallback, last_accepted_stream_id, @@ -2006,9 +1982,6 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, } void SpdySession::OnPing(uint32 unique_id) { - if (availability_state_ == STATE_CLOSED) - return; - net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_PING, base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); @@ -2037,9 +2010,6 @@ void SpdySession::OnPing(uint32 unique_id) { void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { - if (availability_state_ == STATE_CLOSED) - return; - DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); net_log_.AddEvent( NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, @@ -2588,7 +2558,7 @@ void SpdySession::ResumeSendStalledStreams() { // have to worry about streams being closed, as well as ourselves // being closed. - while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { + while (!IsClosed() && !IsSendStalled()) { size_t old_size = 0; if (DCHECK_IS_ON()) old_size = GetTotalSize(stream_send_unstall_queue_); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index f2ab6de..ed81c6c 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -50,9 +50,9 @@ const int kMaxSpdyFrameChunkSize = (2 * kMss) - 8; // Specifies the maxiumum concurrent streams server could send (via push). const int kMaxConcurrentPushedStreams = 1000; -// Specifies the maximum number of bytes to read synchronously before -// yielding. -const int kMaxReadBytesWithoutYielding = 32 * 1024; +// Specifies the number of bytes read synchronously (without yielding) if the +// data is available. +const int kMaxReadBytes = 32 * 1024; // The initial receive window size for both streams and sessions. const int32 kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB @@ -335,7 +335,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, uint32 delta_window_size); // If session is closed, no new streams/transactions should be created. - bool IsClosed() const { return availability_state_ == STATE_CLOSED; } + bool IsClosed() const { return state_ == STATE_CLOSED; } // Closes this session. This will close all active streams and mark // the session as permanently closed. @@ -499,30 +499,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::set<SpdyStream*> CreatedStreamSet; - enum AvailabilityState { - // The session is available in its socket pool and can be used - // freely. - STATE_AVAILABLE, - // The session can process data on existing streams but will - // refuse to create new ones. - STATE_GOING_AWAY, - // The session has been closed, is waiting to be deleted, and will - // refuse to process any more data. + enum State { + STATE_IDLE, + STATE_CONNECTING, + STATE_DO_READ, + STATE_DO_READ_COMPLETE, STATE_CLOSED }; - enum ReadState { - READ_STATE_DO_READ, - READ_STATE_DO_READ_COMPLETE, - }; - - enum WriteState { - // This state means the session's write queue is empty. - WRITE_STATE_IDLE, - WRITE_STATE_DO_WRITE, - WRITE_STATE_DO_WRITE_COMPLETE, - }; - virtual ~SpdySession(); // Called by SpdyStreamRequest to start a request to create a @@ -568,17 +552,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyRstStreamStatus status, const std::string& description); - // Advance the ReadState state machine. - int DoReadLoop(ReadState expected_read_state, int result); - // The implementations of the states of the ReadState state machine. + // Start the DoLoop to read data from socket. + void StartRead(); + + // Try to make progress by reading and processing data. + int DoLoop(int result); + // The implementations of STATE_DO_READ/STATE_DO_READ_COMPLETE state changes + // of the state machine. int DoRead(); - int DoReadComplete(int result); + int DoReadComplete(int bytes_read); + + // Check if session is connected or not. + bool IsConnected() const { + return state_ == STATE_DO_READ || state_ == STATE_DO_READ_COMPLETE; + } - // Advance the WriteState state machine. - int DoWriteLoop(WriteState expected_write_state, int result); - // The implementations of the states of the WriteState state machine. - int DoWrite(); - int DoWriteComplete(int result); + // IO Callbacks + void OnReadComplete(int result); + void OnWriteComplete(int result); // Send relevant SETTINGS. This is generally called on connection setup. void SendInitialSettings(); @@ -614,6 +605,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // haven't received any data in |kHungInterval| time period. void CheckPingStatus(base::TimeTicks last_check_time); + // Write current data to the socket. + void WriteSocketLater(); + void WriteSocket(); + // Get a new stream id. int GetNewStreamId(); @@ -877,7 +872,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, SpdyWriteQueue write_queue_; // Data for the frame we are currently sending. - + // Whether we have a socket write pending completion. + bool write_pending_; // The buffer we're currently writing. scoped_ptr<SpdyBuffer> in_flight_write_; // The type of the frame in |in_flight_write_|. @@ -888,6 +884,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // the socket completely. base::WeakPtr<SpdyStream> in_flight_write_stream_; + // Flag if we have a pending message scheduled for WriteSocket. + bool delayed_write_pending_; + // Flag if we're using an SSL connection for this SpdySession. bool is_secure_; @@ -897,16 +896,11 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Spdy Frame state. scoped_ptr<BufferedSpdyFramer> buffered_spdy_framer_; - AvailabilityState availability_state_; - - ReadState read_state_; - - WriteState write_state_; - // If an error has occurred on the session, the session is effectively // dead. Record this error here. When no error has occurred, |error_| will // be OK. - Error error_on_close_; + Error error_; + State state_; // Limits size_t max_concurrent_streams_; // 0 if no limit @@ -922,6 +916,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // SpdySession. It is used by the |Net.SpdySettingsCwnd...| histograms. int total_bytes_received_; + // |bytes_read_| keeps track of number of bytes read continously in the + // DoLoop() without yielding. + int bytes_read_; + bool sent_settings_; // Did this session send settings when it started. bool received_settings_; // Did this session receive at least one settings // frame. diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index d194e87..be14048 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -1813,11 +1813,10 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytesWithoutYielding / 4 - // (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); + // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytes); const int kPayloadSize = - kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1870,9 +1869,8 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to verify SpdySession::DoReadLoop doesn't - // post a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); + // Set up the TaskObserver to verify SpdySession::DoRead doesn't post a task. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -1905,11 +1903,10 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytesWithoutYielding / 4 - // (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); + // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytes); const int kPayloadSize = - kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); TestDataStream test_stream; scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize)); char* payload_data = payload->data(); @@ -1963,7 +1960,7 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) { EXPECT_TRUE(spdy_stream1->HasUrl()); // Set up the TaskObserver to verify SpdySession::DoRead posts a task. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); @@ -2004,12 +2001,11 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { CreateMockWrite(*req1, 0), }; - // Build buffer of size kMaxReadBytesWithoutYielding / 4 - // (-spdy_data_frame_size). - ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding); + // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size). + ASSERT_EQ(32 * 1024, kMaxReadBytes); TestDataStream test_stream; const int kEightKPayloadSize = - kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize(); + kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize(); scoped_refptr<net::IOBuffer> eightk_payload( new net::IOBuffer(kEightKPayloadSize)); char* eightk_payload_data = eightk_payload->data(); @@ -2076,9 +2072,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) { spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND); EXPECT_TRUE(spdy_stream1->HasUrl()); - // Set up the TaskObserver to monitor SpdySession::DoReadLoop - // posting of tasks. - SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop"); + // Set up the TaskObserver to monitor SpdySession::DoRead posting of tasks. + SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead"); // Run until 1st read. EXPECT_EQ(0u, delegate1.stream_id()); diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc index 8d7854f..c793aab 100644 --- a/net/spdy/spdy_write_queue.cc +++ b/net/spdy/spdy_write_queue.cc @@ -32,14 +32,6 @@ SpdyWriteQueue::~SpdyWriteQueue() { Clear(); } -bool SpdyWriteQueue::IsEmpty() const { - for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { - if (!queue_[i].empty()) - return false; - } - return true; -} - void SpdyWriteQueue::Enqueue(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> frame_producer, diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h index 60d93b4..fa194ef 100644 --- a/net/spdy/spdy_write_queue.h +++ b/net/spdy/spdy_write_queue.h @@ -27,8 +27,6 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { SpdyWriteQueue(); ~SpdyWriteQueue(); - bool IsEmpty() const; - // Enqueues the given frame producer of the given type at the given // priority associated with the given stream, which may be NULL if // the frame producer is not associated with a stream. If |stream| |