summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-07-10 01:29:35 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-07-10 01:29:35 +0000
commit1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61 (patch)
tree5eb3649aaf282c64d97dc72e29feecbe9aeb3be1 /net
parent82c4d3b8fd037c0ee2c7b391a6f97f0992ecb4a5 (diff)
downloadchromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.zip
chromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.tar.gz
chromium_src-1b02c6d99fa61ec7572cdd3c5b944a5ca694ea61.tar.bz2
Revert 210694 "fix loop"
> fix loop > > fix > > remove > > rename > > fix yielding > > add another expected > > add expected > > rem another > > remove dcheck > > rewrite state machine > > rem unused > > rem startread loop > > read loop > > fix test > > use read > > fix test > > fix test > > fix another staet > > fix test > > rem bool > > initial write loop > > fix var > > bail out if closed > > rem dcheck > > fix onreadcomplete > > fix loop > > check state > > remove availability state > > clean up states > > remove unused states TBR=akalin@chromium.org Review URL: https://codereview.chromium.org/18822006 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@210696 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/spdy/spdy_http_stream_unittest.cc24
-rw-r--r--net/spdy/spdy_network_transaction_unittest.cc14
-rw-r--r--net/spdy/spdy_session.cc400
-rw-r--r--net/spdy/spdy_session.h82
-rw-r--r--net/spdy/spdy_session_unittest.cc33
-rw-r--r--net/spdy/spdy_write_queue.cc8
-rw-r--r--net/spdy/spdy_write_queue.h2
7 files changed, 255 insertions, 308 deletions
diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc
index 565de15..f91594e 100644
--- a/net/spdy/spdy_http_stream_unittest.cc
+++ b/net/spdy/spdy_http_stream_unittest.cc
@@ -254,6 +254,14 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
scoped_ptr<SpdyHttpStream> http_stream1(
new SpdyHttpStream(session_.get(), true));
+ ASSERT_EQ(OK,
+ http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY,
+ BoundNetLog(),
+ CompletionCallback()));
+ EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1,
+ callback1.callback()));
+ EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
+
HttpRequestInfo request2;
request2.method = "GET";
request2.url = GURL("http://www.google.com/");
@@ -263,15 +271,15 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
scoped_ptr<SpdyHttpStream> http_stream2(
new SpdyHttpStream(session_.get(), true));
- // First write.
ASSERT_EQ(OK,
- http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY,
+ http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY,
BoundNetLog(),
CompletionCallback()));
- EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1,
- callback1.callback()));
+ EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2,
+ callback2.callback()));
EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
+ // First write.
deterministic_data()->RunFor(1);
EXPECT_LE(0, callback1.WaitForResult());
@@ -282,14 +290,6 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
EXPECT_FALSE(http_stream2->GetLoadTimingInfo(&load_timing_info2));
// Second write.
- ASSERT_EQ(OK,
- http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY,
- BoundNetLog(),
- CompletionCallback()));
- EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2,
- callback2.callback()));
- EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
-
deterministic_data()->RunFor(1);
EXPECT_LE(0, callback2.WaitForResult());
TestLoadTimingReused(*http_stream2);
diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc
index 3d31648..e0e9cfc 100644
--- a/net/spdy/spdy_network_transaction_unittest.cc
+++ b/net/spdy/spdy_network_transaction_unittest.cc
@@ -2014,21 +2014,19 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(
spdy_util_.ConstructSpdyPostSynReply(NULL, 0));
+ scoped_ptr<SpdyFrame> stream_body(spdy_util_.ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
CreateMockRead(*stream_reply, 1),
- MockRead(ASYNC, 0, 4) // EOF
+ MockRead(ASYNC, 0, 3) // EOF
};
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyPost(
kRequestUrl, 1, kUploadDataSize, LOWEST, NULL, 0));
scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true));
- scoped_ptr<SpdyFrame> rst(
- spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR));
MockWrite writes[] = {
CreateMockWrite(*req, 0),
CreateMockWrite(*body, 2),
- CreateMockWrite(*rst, 3)
};
DeterministicSocketData data(reads, arraysize(reads),
@@ -2045,7 +2043,7 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) {
&CreatePostRequest(), callback.callback(), BoundNetLog());
EXPECT_EQ(ERR_IO_PENDING, rv);
- data.RunFor(4);
+ data.RunFor(2);
rv = callback.WaitForResult();
EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
data.RunFor(1);
@@ -3531,11 +3529,7 @@ TEST_P(SpdyNetworkTransactionTest, CorruptFrameSessionError) {
for (size_t i = 0; i < ARRAYSIZE_UNSAFE(test_cases); ++i) {
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
- scoped_ptr<SpdyFrame> rst(
- spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR));
- MockWrite writes[] = {
- CreateMockWrite(*req),
- CreateMockWrite(*rst),
+ MockWrite writes[] = { CreateMockWrite(*req), MockWrite(ASYNC, 0, 0) // EOF
};
scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true));
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 65aa43c..ae995e6 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -9,7 +9,6 @@
#include "base/basictypes.h"
#include "base/bind.h"
-#include "base/bind_helpers.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -353,14 +352,14 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
http_server_properties_(http_server_properties),
read_buffer_(new IOBuffer(kReadBufferSize)),
stream_hi_water_mark_(kFirstStreamId),
+ write_pending_(false),
in_flight_write_frame_type_(DATA),
in_flight_write_frame_size_(0),
+ delayed_write_pending_(false),
is_secure_(false),
certificate_error_code_(OK),
- availability_state_(STATE_AVAILABLE),
- read_state_(READ_STATE_DO_READ),
- write_state_(WRITE_STATE_IDLE),
- error_on_close_(OK),
+ error_(OK),
+ state_(STATE_IDLE),
max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
kInitialMaxConcurrentStreams :
initial_max_concurrent_streams),
@@ -372,6 +371,7 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
streams_pushed_and_claimed_count_(0),
streams_abandoned_count_(0),
total_bytes_received_(0),
+ bytes_read_(0),
sent_settings_(false),
received_settings_(false),
stalled_streams_(0),
@@ -412,8 +412,8 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
}
SpdySession::~SpdySession() {
- if (availability_state_ != STATE_CLOSED) {
- availability_state_ = STATE_CLOSED;
+ if (state_ != STATE_CLOSED) {
+ state_ = STATE_CLOSED;
// Cleanup all the streams.
CloseAllStreams(ERR_ABORTED);
@@ -450,7 +450,7 @@ Error SpdySession::InitializeWithSocket(
base::StatsCounter spdy_sessions("spdy.sessions");
spdy_sessions.Increment();
- read_state_ = READ_STATE_DO_READ;
+ state_ = STATE_DO_READ;
connection_ = connection.Pass();
is_secure_ = is_secure;
certificate_error_code_ = certificate_error_code;
@@ -493,12 +493,15 @@ Error SpdySession::InitializeWithSocket(
NetLog::TYPE_SPDY_SESSION_INITIALIZED,
connection_->socket()->NetLog().source().ToEventParametersCallback());
- int error = DoReadLoop(READ_STATE_DO_READ, OK);
+ int error = DoLoop(OK);
if (error == ERR_IO_PENDING)
error = OK;
if (error == OK) {
connection_->AddLayeredPool(this);
SendInitialSettings();
+ // Write out any data that we might have to send, such as the
+ // settings frame.
+ WriteSocketLater();
spdy_session_pool_ = spdy_session_pool;
}
return static_cast<Error>(error);
@@ -508,7 +511,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
if (!verify_domain_authentication_)
return true;
- if (availability_state_ == STATE_CLOSED)
+ if (!IsConnected())
return false;
SSLInfo ssl_info;
@@ -528,7 +531,7 @@ int SpdySession::GetPushStream(
const GURL& url,
base::WeakPtr<SpdyStream>* stream,
const BoundNetLog& stream_net_log) {
- CHECK_NE(availability_state_, STATE_CLOSED);
+ CHECK_NE(state_, STATE_CLOSED);
stream->reset();
@@ -946,11 +949,26 @@ bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
}
LoadState SpdySession::GetLoadState() const {
+ // NOTE: The application only queries the LoadState via the
+ // SpdyNetworkTransaction, and details are only needed when
+ // we're in the process of connecting.
+
+ // If we're connecting, defer to the connection to give us the actual
+ // LoadState.
+ if (state_ == STATE_CONNECTING)
+ return connection_->GetLoadState();
+
// Just report that we're idle since the session could be doing
// many things concurrently.
return LOAD_STATE_IDLE;
}
+void SpdySession::OnReadComplete(int bytes_read) {
+ DCHECK_NE(state_, STATE_DO_READ);
+ DoLoop(bytes_read);
+}
+
+
void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
int status) {
// TODO(mbelshe): We should send a RST_STREAM control frame here
@@ -1018,11 +1036,13 @@ void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id,
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
}
-int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
- DCHECK_EQ(read_state_, expected_read_state);
+void SpdySession::StartRead() {
+ DCHECK_NE(state_, STATE_DO_READ_COMPLETE);
+ DoLoop(OK);
+}
- if (availability_state_ == STATE_CLOSED)
- return ERR_CONNECTION_CLOSED;
+int SpdySession::DoLoop(int result) {
+ bytes_read_ = 0;
// The SpdyFramer will use callbacks onto |this| as it parses frames.
// When errors occur, those callbacks can lead to teardown of all references
@@ -1030,200 +1050,98 @@ int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
// cleanup.
scoped_refptr<SpdySession> self(this);
- int bytes_read_without_yielding = 0;
-
- while (true) {
- switch (read_state_) {
- case READ_STATE_DO_READ:
+ do {
+ switch (state_) {
+ case STATE_DO_READ:
DCHECK_EQ(result, OK);
result = DoRead();
break;
- case READ_STATE_DO_READ_COMPLETE:
- if (result > 0)
- bytes_read_without_yielding += result;
+ case STATE_DO_READ_COMPLETE:
result = DoReadComplete(result);
break;
+ case STATE_CLOSED:
+ result = ERR_CONNECTION_CLOSED;
+ break;
default:
- NOTREACHED() << "read_state_: " << read_state_;
+ NOTREACHED() << "state_: " << state_;
break;
}
+ } while (result != ERR_IO_PENDING && state_ != STATE_CLOSED);
+ DCHECK(result == ERR_IO_PENDING || result == ERR_CONNECTION_CLOSED);
- if (availability_state_ == STATE_CLOSED)
- DCHECK_LT(result, ERR_IO_PENDING);
-
- if (result < ERR_IO_PENDING)
- return result;
-
- if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
- read_state_ = READ_STATE_DO_READ;
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop),
- weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
- return ERR_IO_PENDING;
- }
- }
+ return result;
}
int SpdySession::DoRead() {
- DCHECK_NE(availability_state_, STATE_CLOSED);
+ if (bytes_read_ > kMaxReadBytes) {
+ state_ = STATE_DO_READ;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&SpdySession::StartRead,
+ weak_factory_.GetWeakPtr()));
+ return ERR_IO_PENDING;
+ }
CHECK(connection_);
CHECK(connection_->socket());
- read_state_ = READ_STATE_DO_READ_COMPLETE;
+ state_ = STATE_DO_READ_COMPLETE;
return connection_->socket()->Read(
read_buffer_.get(),
kReadBufferSize,
- base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop),
- weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
+ base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr()));
}
int SpdySession::DoReadComplete(int result) {
- DCHECK_NE(availability_state_, STATE_CLOSED);
// Parse a frame. For now this code requires that the frame fit into our
- // buffer (kReadBufferSize).
+ // buffer (32KB).
// TODO(mbelshe): support arbitrarily large frames!
- if (result == 0) {
- UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
- total_bytes_received_, 1, 100000000, 50);
- CloseSessionOnError(ERR_CONNECTION_CLOSED, "Connection closed");
+ if (result <= 0) {
+ // Session is tearing down.
+ Error error = static_cast<Error>(result);
+ if (result == 0) {
+ UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
+ total_bytes_received_, 1, 100000000, 50);
+ error = ERR_CONNECTION_CLOSED;
+ }
+ CloseSessionOnError(error, "result is <= 0.");
return ERR_CONNECTION_CLOSED;
}
- if (result < 0) {
- CloseSessionOnError(static_cast<Error>(result), "result is <= 0.");
- return result;
- }
-
total_bytes_received_ += result;
+ bytes_read_ += result;
last_activity_time_ = base::TimeTicks::Now();
DCHECK(buffered_spdy_framer_.get());
char* data = read_buffer_->data();
- while (result > 0) {
+ while (result &&
+ buffered_spdy_framer_->error_code() ==
+ SpdyFramer::SPDY_NO_ERROR) {
uint32 bytes_processed =
buffered_spdy_framer_->ProcessInput(data, result);
result -= bytes_processed;
data += bytes_processed;
-
- if (availability_state_ == STATE_CLOSED) {
- DCHECK_LT(error_on_close_, ERR_IO_PENDING);
- return error_on_close_;
- }
-
- DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
}
- read_state_ = READ_STATE_DO_READ;
- return OK;
-}
-
-int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
- DCHECK_EQ(write_state_, expected_write_state);
-
- if (availability_state_ == STATE_CLOSED)
+ if (!IsConnected())
return ERR_CONNECTION_CLOSED;
- scoped_refptr<SpdySession> self(this);
-
- do {
- switch (write_state_) {
- case WRITE_STATE_DO_WRITE:
- DCHECK_EQ(result, OK);
- result = DoWrite();
- break;
- case WRITE_STATE_DO_WRITE_COMPLETE:
- result = DoWriteComplete(result);
- break;
- case WRITE_STATE_IDLE:
- default:
- NOTREACHED() << "write_state_: " << write_state_;
- break;
- }
- } while (write_state_ != WRITE_STATE_IDLE &&
- result != ERR_IO_PENDING && availability_state_ != STATE_CLOSED);
-
- return result;
-}
-
-int SpdySession::DoWrite() {
- DCHECK_NE(availability_state_, STATE_CLOSED);
-
- // Loop sending frames until we've sent everything or until the write
- // returns error (or ERR_IO_PENDING).
- DCHECK(buffered_spdy_framer_);
- if (in_flight_write_) {
- DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
- } else {
- // Grab the next frame to send.
- SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyBufferProducer> producer;
- base::WeakPtr<SpdyStream> stream;
- if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
- write_state_ = WRITE_STATE_IDLE;
- return OK;
- }
-
- if (stream.get())
- DCHECK(!stream->IsClosed());
-
- // Activate the stream only when sending the SYN_STREAM frame to
- // guarantee monotonically-increasing stream IDs.
- if (frame_type == SYN_STREAM) {
- if (stream.get() && stream->stream_id() == 0) {
- scoped_ptr<SpdyStream> owned_stream =
- ActivateCreatedStream(stream.get());
- InsertActivatedStream(owned_stream.Pass());
- } else {
- NOTREACHED();
- return ERR_UNEXPECTED;
- }
- }
-
- in_flight_write_ = producer->ProduceBuffer();
- if (!in_flight_write_) {
- NOTREACHED();
- return ERR_UNEXPECTED;
- }
- in_flight_write_frame_type_ = frame_type;
- in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
- DCHECK_GE(in_flight_write_frame_size_,
- buffered_spdy_framer_->GetFrameMinimumSize());
- in_flight_write_stream_ = stream;
- }
-
- write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
-
- // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
- // with Socket implementations that don't store their IOBuffer
- // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
- scoped_refptr<IOBuffer> write_io_buffer =
- in_flight_write_->GetIOBufferForRemainingData();
- // We keep |in_flight_write_| alive until OnWriteComplete(), so
- // it's okay to use GetIOBufferForRemainingData() since the socket
- // doesn't use the IOBuffer past OnWriteComplete().
- int rv = connection_->socket()->Write(
- write_io_buffer.get(),
- in_flight_write_->GetRemainingSize(),
- base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop),
- weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
-
- return rv;
+ state_ = STATE_DO_READ;
+ return OK;
}
-int SpdySession::DoWriteComplete(int result) {
- DCHECK_NE(availability_state_, STATE_CLOSED);
-
+void SpdySession::OnWriteComplete(int result) {
// Releasing the in-flight write can have a side-effect of dropping
// the last reference to |this|. Hold a reference through this
// function.
scoped_refptr<SpdySession> self(this);
+ DCHECK(write_pending_);
DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
last_activity_time_ = base::TimeTicks::Now();
+ write_pending_ = false;
if (result < 0) {
in_flight_write_.reset();
@@ -1231,7 +1149,7 @@ int SpdySession::DoWriteComplete(int result) {
in_flight_write_frame_size_ = 0;
in_flight_write_stream_.reset();
CloseSessionOnError(static_cast<Error>(result), "Write error");
- return result;
+ return;
}
// It should not be possible to have written more bytes than our
@@ -1261,8 +1179,108 @@ int SpdySession::DoWriteComplete(int result) {
}
}
- write_state_ = WRITE_STATE_DO_WRITE;
- return OK;
+ // Write more data. We're already in a continuation, so we can go
+ // ahead and write it immediately (without going back to the message
+ // loop).
+ WriteSocketLater();
+}
+
+void SpdySession::WriteSocketLater() {
+ if (delayed_write_pending_)
+ return;
+
+ if (!IsConnected())
+ return;
+
+ delayed_write_pending_ = true;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr()));
+}
+
+void SpdySession::WriteSocket() {
+ // This function should only be called via WriteSocketLater.
+ DCHECK(delayed_write_pending_);
+ delayed_write_pending_ = false;
+
+ // If the socket isn't connected yet, just wait; we'll get called
+ // again when the socket connection completes. If the socket is
+ // closed, just return.
+ if (!IsConnected())
+ return;
+
+ if (write_pending_) // Another write is in progress still.
+ return;
+
+ // Loop sending frames until we've sent everything or until the write
+ // returns error (or ERR_IO_PENDING).
+ DCHECK(buffered_spdy_framer_.get());
+ while (true) {
+ if (in_flight_write_) {
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
+ } else {
+ // Grab the next frame to send.
+ SpdyFrameType frame_type = DATA;
+ scoped_ptr<SpdyBufferProducer> producer;
+ base::WeakPtr<SpdyStream> stream;
+ if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
+ break;
+
+ if (stream.get())
+ DCHECK(!stream->IsClosed());
+
+ // Activate the stream only when sending the SYN_STREAM frame to
+ // guarantee monotonically-increasing stream IDs.
+ if (frame_type == SYN_STREAM) {
+ if (stream.get() && stream->stream_id() == 0) {
+ scoped_ptr<SpdyStream> owned_stream =
+ ActivateCreatedStream(stream.get());
+ InsertActivatedStream(owned_stream.Pass());
+ } else {
+ NOTREACHED();
+ continue;
+ }
+ }
+
+ in_flight_write_ = producer->ProduceBuffer();
+ if (!in_flight_write_) {
+ NOTREACHED();
+ continue;
+ }
+ in_flight_write_frame_type_ = frame_type;
+ in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
+ DCHECK_GE(in_flight_write_frame_size_,
+ buffered_spdy_framer_->GetFrameMinimumSize());
+ in_flight_write_stream_ = stream;
+ }
+
+ write_pending_ = true;
+ // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
+ // with Socket implementations that don't store their IOBuffer
+ // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
+ scoped_refptr<IOBuffer> write_io_buffer =
+ in_flight_write_->GetIOBufferForRemainingData();
+ // We keep |in_flight_write_| alive until OnWriteComplete(), so
+ // it's okay to use GetIOBufferForRemainingData() since the socket
+ // doesn't use the IOBuffer past OnWriteComplete().
+ int rv = connection_->socket()->Write(
+ write_io_buffer.get(),
+ in_flight_write_->GetRemainingSize(),
+ base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
+ // Avoid persisting |write_io_buffer| past |in_flight_write_|'s
+ // lifetime (which will end if OnWriteComplete() is called below).
+ write_io_buffer = NULL;
+ if (rv == ERR_IO_PENDING)
+ break;
+
+ // We sent the frame successfully.
+ OnWriteComplete(rv);
+
+ // TODO(mbelshe): Test this error case. Maybe we should mark the socket
+ // as in an error state.
+ if (rv < 0)
+ break;
+ }
}
void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id,
@@ -1342,12 +1360,12 @@ void SpdySession::CloseSessionOnError(Error err,
// Don't close twice. This can occur because we can have both
// a read and a write outstanding, and each can complete with
// an error.
- if (availability_state_ != STATE_CLOSED) {
+ if (!IsClosed()) {
UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
total_bytes_received_, 1, 100000000, 50);
- availability_state_ = STATE_CLOSED;
- error_on_close_ = err;
+ state_ = STATE_CLOSED;
+ error_ = err;
// TODO(akalin): Move this after CloseAllStreams() once we're
// owned by the pool.
RemoveFromPool();
@@ -1384,7 +1402,7 @@ base::Value* SpdySession::GetInfoAsValue() const {
SSLClientSocket::NextProtoToString(
connection_->socket()->GetNegotiatedProtocol()));
- dict->SetInteger("error", error_on_close_);
+ dict->SetInteger("error", error_);
dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
dict->SetInteger("streams_initiated_count", streams_initiated_count_);
@@ -1458,17 +1476,8 @@ void SpdySession::EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer,
const base::WeakPtr<SpdyStream>& stream) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
- if (write_state_ == WRITE_STATE_IDLE) {
- write_state_ = WRITE_STATE_DO_WRITE;
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop),
- weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
- }
+ WriteSocketLater();
}
void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
@@ -1572,9 +1581,6 @@ ServerBoundCertService* SpdySession::GetServerBoundCertService() const {
}
void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(error_code));
std::string description = base::StringPrintf(
@@ -1584,9 +1590,6 @@ void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
void SpdySession::OnStreamError(SpdyStreamId stream_id,
const std::string& description) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
ActiveStreamMap::iterator it = active_streams_.find(stream_id);
if (it == active_streams_.end()) {
// We still want to send a frame to reset the stream even if we
@@ -1603,9 +1606,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
const char* data,
size_t len,
bool fin) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
DCHECK_LT(len, 1u << 24);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
@@ -1647,9 +1647,6 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
}
void SpdySession::OnSettings(bool clear_persisted) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
if (clear_persisted)
http_server_properties_->ClearSpdySettings(host_port_pair());
@@ -1664,9 +1661,6 @@ void SpdySession::OnSettings(bool clear_persisted) {
void SpdySession::OnSetting(SpdySettingsIds id,
uint8 flags,
uint32 value) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
HandleSetting(id, value);
http_server_properties_->SetSpdySetting(
host_port_pair(),
@@ -1685,9 +1679,6 @@ void SpdySession::OnSetting(SpdySettingsIds id,
void SpdySession::OnSynStreamCompressed(
size_t uncompressed_size,
size_t compressed_size) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
// Make sure we avoid early decimal truncation.
int compression_pct = 100 - (100 * compressed_size) / uncompressed_size;
UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
@@ -1722,9 +1713,6 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id,
bool fin,
bool unidirectional,
const SpdyHeaderBlock& headers) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
base::Time response_time = base::Time::Now();
base::TimeTicks recv_first_byte_time = base::TimeTicks::Now();
@@ -1886,9 +1874,6 @@ void SpdySession::DeleteExpiredPushedStreams() {
void SpdySession::OnSynReply(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
base::Time response_time = base::Time::Now();
base::TimeTicks recv_first_byte_time = base::TimeTicks::Now();
@@ -1925,9 +1910,6 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id,
void SpdySession::OnHeaders(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
@@ -1955,9 +1937,6 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id,
void SpdySession::OnRstStream(SpdyStreamId stream_id,
SpdyRstStreamStatus status) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
std::string description;
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RST_STREAM,
@@ -1991,9 +1970,6 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id,
void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
SpdyGoAwayStatus status) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
base::Bind(&NetLogSpdyGoAwayCallback,
last_accepted_stream_id,
@@ -2006,9 +1982,6 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
}
void SpdySession::OnPing(uint32 unique_id) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_PING,
base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
@@ -2037,9 +2010,6 @@ void SpdySession::OnPing(uint32 unique_id) {
void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
uint32 delta_window_size) {
- if (availability_state_ == STATE_CLOSED)
- return;
-
DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
@@ -2588,7 +2558,7 @@ void SpdySession::ResumeSendStalledStreams() {
// have to worry about streams being closed, as well as ourselves
// being closed.
- while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
+ while (!IsClosed() && !IsSendStalled()) {
size_t old_size = 0;
if (DCHECK_IS_ON())
old_size = GetTotalSize(stream_send_unstall_queue_);
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index f2ab6de..ed81c6c 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -50,9 +50,9 @@ const int kMaxSpdyFrameChunkSize = (2 * kMss) - 8;
// Specifies the maxiumum concurrent streams server could send (via push).
const int kMaxConcurrentPushedStreams = 1000;
-// Specifies the maximum number of bytes to read synchronously before
-// yielding.
-const int kMaxReadBytesWithoutYielding = 32 * 1024;
+// Specifies the number of bytes read synchronously (without yielding) if the
+// data is available.
+const int kMaxReadBytes = 32 * 1024;
// The initial receive window size for both streams and sessions.
const int32 kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB
@@ -335,7 +335,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
uint32 delta_window_size);
// If session is closed, no new streams/transactions should be created.
- bool IsClosed() const { return availability_state_ == STATE_CLOSED; }
+ bool IsClosed() const { return state_ == STATE_CLOSED; }
// Closes this session. This will close all active streams and mark
// the session as permanently closed.
@@ -499,30 +499,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
typedef std::set<SpdyStream*> CreatedStreamSet;
- enum AvailabilityState {
- // The session is available in its socket pool and can be used
- // freely.
- STATE_AVAILABLE,
- // The session can process data on existing streams but will
- // refuse to create new ones.
- STATE_GOING_AWAY,
- // The session has been closed, is waiting to be deleted, and will
- // refuse to process any more data.
+ enum State {
+ STATE_IDLE,
+ STATE_CONNECTING,
+ STATE_DO_READ,
+ STATE_DO_READ_COMPLETE,
STATE_CLOSED
};
- enum ReadState {
- READ_STATE_DO_READ,
- READ_STATE_DO_READ_COMPLETE,
- };
-
- enum WriteState {
- // This state means the session's write queue is empty.
- WRITE_STATE_IDLE,
- WRITE_STATE_DO_WRITE,
- WRITE_STATE_DO_WRITE_COMPLETE,
- };
-
virtual ~SpdySession();
// Called by SpdyStreamRequest to start a request to create a
@@ -568,17 +552,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
SpdyRstStreamStatus status,
const std::string& description);
- // Advance the ReadState state machine.
- int DoReadLoop(ReadState expected_read_state, int result);
- // The implementations of the states of the ReadState state machine.
+ // Start the DoLoop to read data from socket.
+ void StartRead();
+
+ // Try to make progress by reading and processing data.
+ int DoLoop(int result);
+ // The implementations of STATE_DO_READ/STATE_DO_READ_COMPLETE state changes
+ // of the state machine.
int DoRead();
- int DoReadComplete(int result);
+ int DoReadComplete(int bytes_read);
+
+ // Check if session is connected or not.
+ bool IsConnected() const {
+ return state_ == STATE_DO_READ || state_ == STATE_DO_READ_COMPLETE;
+ }
- // Advance the WriteState state machine.
- int DoWriteLoop(WriteState expected_write_state, int result);
- // The implementations of the states of the WriteState state machine.
- int DoWrite();
- int DoWriteComplete(int result);
+ // IO Callbacks
+ void OnReadComplete(int result);
+ void OnWriteComplete(int result);
// Send relevant SETTINGS. This is generally called on connection setup.
void SendInitialSettings();
@@ -614,6 +605,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// haven't received any data in |kHungInterval| time period.
void CheckPingStatus(base::TimeTicks last_check_time);
+ // Write current data to the socket.
+ void WriteSocketLater();
+ void WriteSocket();
+
// Get a new stream id.
int GetNewStreamId();
@@ -877,7 +872,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
SpdyWriteQueue write_queue_;
// Data for the frame we are currently sending.
-
+ // Whether we have a socket write pending completion.
+ bool write_pending_;
// The buffer we're currently writing.
scoped_ptr<SpdyBuffer> in_flight_write_;
// The type of the frame in |in_flight_write_|.
@@ -888,6 +884,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// the socket completely.
base::WeakPtr<SpdyStream> in_flight_write_stream_;
+ // Flag if we have a pending message scheduled for WriteSocket.
+ bool delayed_write_pending_;
+
// Flag if we're using an SSL connection for this SpdySession.
bool is_secure_;
@@ -897,16 +896,11 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Spdy Frame state.
scoped_ptr<BufferedSpdyFramer> buffered_spdy_framer_;
- AvailabilityState availability_state_;
-
- ReadState read_state_;
-
- WriteState write_state_;
-
// If an error has occurred on the session, the session is effectively
// dead. Record this error here. When no error has occurred, |error_| will
// be OK.
- Error error_on_close_;
+ Error error_;
+ State state_;
// Limits
size_t max_concurrent_streams_; // 0 if no limit
@@ -922,6 +916,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// SpdySession. It is used by the |Net.SpdySettingsCwnd...| histograms.
int total_bytes_received_;
+ // |bytes_read_| keeps track of number of bytes read continously in the
+ // DoLoop() without yielding.
+ int bytes_read_;
+
bool sent_settings_; // Did this session send settings when it started.
bool received_settings_; // Did this session receive at least one settings
// frame.
diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc
index d194e87..be14048 100644
--- a/net/spdy/spdy_session_unittest.cc
+++ b/net/spdy/spdy_session_unittest.cc
@@ -1813,11 +1813,10 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytesWithoutYielding / 4
- // (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
+ // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytes);
const int kPayloadSize =
- kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
TestDataStream test_stream;
scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize));
char* payload_data = payload->data();
@@ -1870,9 +1869,8 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrl());
- // Set up the TaskObserver to verify SpdySession::DoReadLoop doesn't
- // post a task.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
+ // Set up the TaskObserver to verify SpdySession::DoRead doesn't post a task.
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
@@ -1905,11 +1903,10 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytesWithoutYielding / 4
- // (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
+ // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytes);
const int kPayloadSize =
- kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
TestDataStream test_stream;
scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize));
char* payload_data = payload->data();
@@ -1963,7 +1960,7 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
EXPECT_TRUE(spdy_stream1->HasUrl());
// Set up the TaskObserver to verify SpdySession::DoRead posts a task.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
@@ -2004,12 +2001,11 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytesWithoutYielding / 4
- // (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
+ // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytes);
TestDataStream test_stream;
const int kEightKPayloadSize =
- kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
scoped_refptr<net::IOBuffer> eightk_payload(
new net::IOBuffer(kEightKPayloadSize));
char* eightk_payload_data = eightk_payload->data();
@@ -2076,9 +2072,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrl());
- // Set up the TaskObserver to monitor SpdySession::DoReadLoop
- // posting of tasks.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
+ // Set up the TaskObserver to monitor SpdySession::DoRead posting of tasks.
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc
index 8d7854f..c793aab 100644
--- a/net/spdy/spdy_write_queue.cc
+++ b/net/spdy/spdy_write_queue.cc
@@ -32,14 +32,6 @@ SpdyWriteQueue::~SpdyWriteQueue() {
Clear();
}
-bool SpdyWriteQueue::IsEmpty() const {
- for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
- if (!queue_[i].empty())
- return false;
- }
- return true;
-}
-
void SpdyWriteQueue::Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> frame_producer,
diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h
index 60d93b4..fa194ef 100644
--- a/net/spdy/spdy_write_queue.h
+++ b/net/spdy/spdy_write_queue.h
@@ -27,8 +27,6 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
SpdyWriteQueue();
~SpdyWriteQueue();
- bool IsEmpty() const;
-
// Enqueues the given frame producer of the given type at the given
// priority associated with the given stream, which may be NULL if
// the frame producer is not associated with a stream. If |stream|