summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/spdy/spdy_http_stream.cc2
-rw-r--r--net/spdy/spdy_http_stream_unittest.cc24
-rw-r--r--net/spdy/spdy_network_transaction_unittest.cc14
-rw-r--r--net/spdy/spdy_proxy_client_socket_unittest.cc6
-rw-r--r--net/spdy/spdy_session.cc449
-rw-r--r--net/spdy/spdy_session.h93
-rw-r--r--net/spdy/spdy_session_unittest.cc102
-rw-r--r--net/spdy/spdy_test_util_common.cc3
-rw-r--r--net/spdy/spdy_write_queue.cc8
-rw-r--r--net/spdy/spdy_write_queue.h4
10 files changed, 401 insertions, 304 deletions
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc
index 1e7afc7..4a44c23 100644
--- a/net/spdy/spdy_http_stream.cc
+++ b/net/spdy/spdy_http_stream.cc
@@ -54,8 +54,6 @@ int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
DCHECK(!stream_.get());
- if (spdy_session_->IsClosed())
- return ERR_CONNECTION_CLOSED;
request_info_ = request_info;
if (request_info_->method == "GET") {
diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc
index f91594e..565de15 100644
--- a/net/spdy/spdy_http_stream_unittest.cc
+++ b/net/spdy/spdy_http_stream_unittest.cc
@@ -254,14 +254,6 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
scoped_ptr<SpdyHttpStream> http_stream1(
new SpdyHttpStream(session_.get(), true));
- ASSERT_EQ(OK,
- http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY,
- BoundNetLog(),
- CompletionCallback()));
- EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1,
- callback1.callback()));
- EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
-
HttpRequestInfo request2;
request2.method = "GET";
request2.url = GURL("http://www.google.com/");
@@ -271,15 +263,15 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
scoped_ptr<SpdyHttpStream> http_stream2(
new SpdyHttpStream(session_.get(), true));
+ // First write.
ASSERT_EQ(OK,
- http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY,
+ http_stream1->InitializeStream(&request1, DEFAULT_PRIORITY,
BoundNetLog(),
CompletionCallback()));
- EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2,
- callback2.callback()));
+ EXPECT_EQ(ERR_IO_PENDING, http_stream1->SendRequest(headers1, &response1,
+ callback1.callback()));
EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
- // First write.
deterministic_data()->RunFor(1);
EXPECT_LE(0, callback1.WaitForResult());
@@ -290,6 +282,14 @@ TEST_P(SpdyHttpStreamTest, LoadTimingTwoRequests) {
EXPECT_FALSE(http_stream2->GetLoadTimingInfo(&load_timing_info2));
// Second write.
+ ASSERT_EQ(OK,
+ http_stream2->InitializeStream(&request2, DEFAULT_PRIORITY,
+ BoundNetLog(),
+ CompletionCallback()));
+ EXPECT_EQ(ERR_IO_PENDING, http_stream2->SendRequest(headers2, &response2,
+ callback2.callback()));
+ EXPECT_TRUE(HasSpdySession(http_session_->spdy_session_pool(), key));
+
deterministic_data()->RunFor(1);
EXPECT_LE(0, callback2.WaitForResult());
TestLoadTimingReused(*http_stream2);
diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc
index db1dc7d..36d80c6 100644
--- a/net/spdy/spdy_network_transaction_unittest.cc
+++ b/net/spdy/spdy_network_transaction_unittest.cc
@@ -2016,19 +2016,21 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(
spdy_util_.ConstructSpdyPostSynReply(NULL, 0));
- scoped_ptr<SpdyFrame> stream_body(spdy_util_.ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
CreateMockRead(*stream_reply, 1),
- MockRead(ASYNC, 0, 3) // EOF
+ MockRead(ASYNC, 0, 4) // EOF
};
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyPost(
kRequestUrl, 1, kUploadDataSize, LOWEST, NULL, 0));
scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true));
+ scoped_ptr<SpdyFrame> rst(
+ spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR));
MockWrite writes[] = {
CreateMockWrite(*req, 0),
CreateMockWrite(*body, 2),
+ CreateMockWrite(*rst, 3)
};
DeterministicSocketData data(reads, arraysize(reads),
@@ -2045,7 +2047,7 @@ TEST_P(SpdyNetworkTransactionTest, PostWithEarlySynReply) {
&CreatePostRequest(), callback.callback(), BoundNetLog());
EXPECT_EQ(ERR_IO_PENDING, rv);
- data.RunFor(2);
+ data.RunFor(4);
rv = callback.WaitForResult();
EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
data.RunFor(1);
@@ -3531,7 +3533,11 @@ TEST_P(SpdyNetworkTransactionTest, CorruptFrameSessionError) {
for (size_t i = 0; i < ARRAYSIZE_UNSAFE(test_cases); ++i) {
scoped_ptr<SpdyFrame> req(
spdy_util_.ConstructSpdyGet(NULL, 0, false, 1, LOWEST, true));
- MockWrite writes[] = { CreateMockWrite(*req), MockWrite(ASYNC, 0, 0) // EOF
+ scoped_ptr<SpdyFrame> rst(
+ spdy_util_.ConstructSpdyRstStream(1, RST_STREAM_PROTOCOL_ERROR));
+ MockWrite writes[] = {
+ CreateMockWrite(*req),
+ CreateMockWrite(*rst),
};
scoped_ptr<SpdyFrame> body(spdy_util_.ConstructSpdyBodyFrame(1, true));
diff --git a/net/spdy/spdy_proxy_client_socket_unittest.cc b/net/spdy/spdy_proxy_client_socket_unittest.cc
index 5af767b..95ca4b5 100644
--- a/net/spdy/spdy_proxy_client_socket_unittest.cc
+++ b/net/spdy/spdy_proxy_client_socket_unittest.cc
@@ -1176,7 +1176,7 @@ TEST_P(SpdyProxyClientSocketTest, WritePendingOnClose) {
scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame());
MockWrite writes[] = {
CreateMockWrite(*conn, 0, SYNCHRONOUS),
- MockWrite(ASYNC, ERR_IO_PENDING, 2),
+ MockWrite(ASYNC, ERR_ABORTED, 2),
};
scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame());
@@ -1267,7 +1267,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePending) {
scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame());
MockWrite writes[] = {
CreateMockWrite(*conn, 0, SYNCHRONOUS),
- MockWrite(ASYNC, ERR_IO_PENDING, 2),
+ MockWrite(ASYNC, ERR_ABORTED, 2),
};
scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame());
@@ -1390,7 +1390,7 @@ TEST_P(SpdyProxyClientSocketTest, RstWithReadAndWritePendingDelete) {
scoped_ptr<SpdyFrame> conn(ConstructConnectRequestFrame());
MockWrite writes[] = {
CreateMockWrite(*conn, 0, SYNCHRONOUS),
- MockWrite(ASYNC, ERR_IO_PENDING, 2),
+ MockWrite(ASYNC, ERR_ABORTED, 2),
};
scoped_ptr<SpdyFrame> resp(ConstructConnectReplyFrame());
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index b607796..879d645 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -9,6 +9,7 @@
#include "base/basictypes.h"
#include "base/bind.h"
+#include "base/bind_helpers.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -352,14 +353,14 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
http_server_properties_(http_server_properties),
read_buffer_(new IOBuffer(kReadBufferSize)),
stream_hi_water_mark_(kFirstStreamId),
- write_pending_(false),
in_flight_write_frame_type_(DATA),
in_flight_write_frame_size_(0),
- delayed_write_pending_(false),
is_secure_(false),
certificate_error_code_(OK),
- error_(OK),
- state_(STATE_IDLE),
+ availability_state_(STATE_AVAILABLE),
+ read_state_(READ_STATE_DO_READ),
+ write_state_(WRITE_STATE_DO_WRITE),
+ error_on_close_(OK),
max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
kInitialMaxConcurrentStreams :
initial_max_concurrent_streams),
@@ -371,7 +372,6 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
streams_pushed_and_claimed_count_(0),
streams_abandoned_count_(0),
total_bytes_received_(0),
- bytes_read_(0),
sent_settings_(false),
received_settings_(false),
stalled_streams_(0),
@@ -412,8 +412,9 @@ SpdySession::SpdySession(const SpdySessionKey& spdy_session_key,
}
SpdySession::~SpdySession() {
- if (state_ != STATE_CLOSED) {
- state_ = STATE_CLOSED;
+ if (availability_state_ != STATE_CLOSED) {
+ availability_state_ = STATE_CLOSED;
+ error_on_close_ = ERR_ABORTED;
// Cleanup all the streams.
CloseAllStreams(ERR_ABORTED);
@@ -451,7 +452,10 @@ Error SpdySession::InitializeWithSocket(
base::StatsCounter spdy_sessions("spdy.sessions");
spdy_sessions.Increment();
- state_ = STATE_DO_READ;
+ DCHECK_EQ(availability_state_, STATE_AVAILABLE);
+ DCHECK_EQ(read_state_, READ_STATE_DO_READ);
+ DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE);
+
connection_ = connection.Pass();
is_secure_ = is_secure;
certificate_error_code_ = certificate_error_code;
@@ -500,15 +504,12 @@ Error SpdySession::InitializeWithSocket(
NetLog::TYPE_SPDY_SESSION_INITIALIZED,
connection_->socket()->NetLog().source().ToEventParametersCallback());
- int error = DoLoop(OK);
+ int error = DoReadLoop(READ_STATE_DO_READ, OK);
if (error == ERR_IO_PENDING)
error = OK;
if (error == OK) {
connection_->AddLayeredPool(this);
SendInitialSettings();
- // Write out any data that we might have to send, such as the
- // settings frame.
- WriteSocketLater();
spdy_session_pool_ = spdy_session_pool;
}
return static_cast<Error>(error);
@@ -518,7 +519,7 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
if (!verify_domain_authentication_)
return true;
- if (!IsConnected())
+ if (availability_state_ == STATE_CLOSED)
return false;
SSLInfo ssl_info;
@@ -538,7 +539,8 @@ int SpdySession::GetPushStream(
const GURL& url,
base::WeakPtr<SpdyStream>* stream,
const BoundNetLog& stream_net_log) {
- CHECK_NE(state_, STATE_CLOSED);
+ if (availability_state_ == STATE_CLOSED)
+ return ERR_CONNECTION_CLOSED;
stream->reset();
@@ -565,6 +567,11 @@ int SpdySession::GetPushStream(
int SpdySession::TryCreateStream(SpdyStreamRequest* request,
base::WeakPtr<SpdyStream>* stream) {
+ // TODO(akalin): Also refuse to create the stream when
+ // |availability_state_| == STATE_GOING_AWAY.
+ if (availability_state_ == STATE_CLOSED)
+ return ERR_CONNECTION_CLOSED;
+
if (!max_concurrent_streams_ ||
(active_streams_.size() + created_streams_.size() <
max_concurrent_streams_)) {
@@ -582,6 +589,11 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request,
DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
DCHECK_LT(request.priority(), NUM_PRIORITIES);
+ // TODO(akalin): Also refuse to create the stream when
+ // |availability_state_| == STATE_GOING_AWAY.
+ if (availability_state_ == STATE_CLOSED)
+ return ERR_CONNECTION_CLOSED;
+
// Make sure that we don't try to send https/wss over an unauthenticated, but
// encrypted SSL socket.
if (is_secure_ && certificate_error_code_ != OK &&
@@ -956,26 +968,11 @@ bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
}
LoadState SpdySession::GetLoadState() const {
- // NOTE: The application only queries the LoadState via the
- // SpdyNetworkTransaction, and details are only needed when
- // we're in the process of connecting.
-
- // If we're connecting, defer to the connection to give us the actual
- // LoadState.
- if (state_ == STATE_CONNECTING)
- return connection_->GetLoadState();
-
// Just report that we're idle since the session could be doing
// many things concurrently.
return LOAD_STATE_IDLE;
}
-void SpdySession::OnReadComplete(int bytes_read) {
- DCHECK_NE(state_, STATE_DO_READ);
- DoLoop(bytes_read);
-}
-
-
void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
int status) {
// TODO(mbelshe): We should send a RST_STREAM control frame here
@@ -1043,13 +1040,13 @@ void SpdySession::SendResetStreamFrame(SpdyStreamId stream_id,
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
}
-void SpdySession::StartRead() {
- DCHECK_NE(state_, STATE_DO_READ_COMPLETE);
- DoLoop(OK);
-}
+int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
+ DCHECK_EQ(read_state_, expected_read_state);
-int SpdySession::DoLoop(int result) {
- bytes_read_ = 0;
+ if (availability_state_ == STATE_CLOSED) {
+ DCHECK_LT(error_on_close_, ERR_IO_PENDING);
+ return error_on_close_;
+ }
// The SpdyFramer will use callbacks onto |this| as it parses frames.
// When errors occur, those callbacks can lead to teardown of all references
@@ -1057,106 +1054,221 @@ int SpdySession::DoLoop(int result) {
// cleanup.
scoped_refptr<SpdySession> self(this);
- do {
- switch (state_) {
- case STATE_DO_READ:
+ int bytes_read_without_yielding = 0;
+
+ // Loop until the session is closed, the read becomes blocked, or
+ // the read limit is exceeded.
+ while (true) {
+ switch (read_state_) {
+ case READ_STATE_DO_READ:
DCHECK_EQ(result, OK);
result = DoRead();
break;
- case STATE_DO_READ_COMPLETE:
+ case READ_STATE_DO_READ_COMPLETE:
+ if (result > 0)
+ bytes_read_without_yielding += result;
result = DoReadComplete(result);
break;
- case STATE_CLOSED:
- result = ERR_CONNECTION_CLOSED;
- break;
default:
- NOTREACHED() << "state_: " << state_;
+ NOTREACHED() << "read_state_: " << read_state_;
break;
}
- } while (result != ERR_IO_PENDING && state_ != STATE_CLOSED);
- DCHECK(result == ERR_IO_PENDING || result == ERR_CONNECTION_CLOSED);
- return result;
+ if (availability_state_ == STATE_CLOSED) {
+ DCHECK_EQ(result, error_on_close_);
+ DCHECK_LT(result, ERR_IO_PENDING);
+ return result;
+ }
+
+ if (result == ERR_IO_PENDING)
+ return ERR_IO_PENDING;
+
+ if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
+ read_state_ = READ_STATE_DO_READ;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop),
+ weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
+ return ERR_IO_PENDING;
+ }
+ }
}
int SpdySession::DoRead() {
- if (bytes_read_ > kMaxReadBytes) {
- state_ = STATE_DO_READ;
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&SpdySession::StartRead,
- weak_factory_.GetWeakPtr()));
- return ERR_IO_PENDING;
- }
+ DCHECK_NE(availability_state_, STATE_CLOSED);
CHECK(connection_);
CHECK(connection_->socket());
- state_ = STATE_DO_READ_COMPLETE;
+ read_state_ = READ_STATE_DO_READ_COMPLETE;
return connection_->socket()->Read(
read_buffer_.get(),
kReadBufferSize,
- base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr()));
+ base::Bind(base::IgnoreResult(&SpdySession::DoReadLoop),
+ weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
}
int SpdySession::DoReadComplete(int result) {
+ DCHECK_NE(availability_state_, STATE_CLOSED);
+
// Parse a frame. For now this code requires that the frame fit into our
- // buffer (32KB).
+ // buffer (kReadBufferSize).
// TODO(mbelshe): support arbitrarily large frames!
- if (result <= 0) {
- // Session is tearing down.
- Error error = static_cast<Error>(result);
- if (result == 0) {
- UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
- total_bytes_received_, 1, 100000000, 50);
- error = ERR_CONNECTION_CLOSED;
- }
- CloseSessionOnError(error, "result is <= 0.");
+ if (result == 0) {
+ UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
+ total_bytes_received_, 1, 100000000, 50);
+ CloseSessionOnError(ERR_CONNECTION_CLOSED, "Connection closed");
+ DCHECK_EQ(availability_state_, STATE_CLOSED);
+ DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
return ERR_CONNECTION_CLOSED;
}
+ if (result < 0) {
+ CloseSessionOnError(static_cast<Error>(result), "result is < 0.");
+ DCHECK_EQ(availability_state_, STATE_CLOSED);
+ DCHECK_EQ(error_on_close_, result);
+ return result;
+ }
+
total_bytes_received_ += result;
- bytes_read_ += result;
last_activity_time_ = base::TimeTicks::Now();
DCHECK(buffered_spdy_framer_.get());
char* data = read_buffer_->data();
- while (result &&
- buffered_spdy_framer_->error_code() ==
- SpdyFramer::SPDY_NO_ERROR) {
- uint32 bytes_processed =
- buffered_spdy_framer_->ProcessInput(data, result);
+ while (result > 0) {
+ uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
result -= bytes_processed;
data += bytes_processed;
- }
- if (!IsConnected())
- return ERR_CONNECTION_CLOSED;
+ if (availability_state_ == STATE_CLOSED) {
+ DCHECK_LT(error_on_close_, ERR_IO_PENDING);
+ return error_on_close_;
+ }
- state_ = STATE_DO_READ;
+ DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
+ }
+
+ read_state_ = READ_STATE_DO_READ;
return OK;
}
-void SpdySession::OnWriteComplete(int result) {
- // Releasing the in-flight write can have a side-effect of dropping
- // the last reference to |this|. Hold a reference through this
- // function.
+int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
+ DCHECK_EQ(write_state_, expected_write_state);
+
+ if (availability_state_ == STATE_CLOSED) {
+ DCHECK_LT(error_on_close_, ERR_IO_PENDING);
+ return error_on_close_;
+ }
+
+ // Releasing the in-flight write in DoWriteComplete() can have a
+ // side-effect of dropping the last reference to |this|. Hold a
+ // reference through this function.
scoped_refptr<SpdySession> self(this);
- DCHECK(write_pending_);
+ // Loop until the session is closed or the write becomes blocked.
+ while (true) {
+ switch (write_state_) {
+ case WRITE_STATE_DO_WRITE:
+ DCHECK_EQ(result, OK);
+ result = DoWrite();
+ break;
+ case WRITE_STATE_DO_WRITE_COMPLETE:
+ result = DoWriteComplete(result);
+ break;
+ default:
+ NOTREACHED() << "write_state_: " << write_state_;
+ break;
+ }
+
+ if (availability_state_ == STATE_CLOSED) {
+ DCHECK_EQ(result, error_on_close_);
+ DCHECK_LT(result, ERR_IO_PENDING);
+ return result;
+ }
+
+ if (result == ERR_IO_PENDING)
+ return ERR_IO_PENDING;
+ }
+}
+
+int SpdySession::DoWrite() {
+ DCHECK_NE(availability_state_, STATE_CLOSED);
+
+ DCHECK(buffered_spdy_framer_);
+ if (in_flight_write_) {
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
+ } else {
+ // Grab the next frame to send.
+ SpdyFrameType frame_type = DATA;
+ scoped_ptr<SpdyBufferProducer> producer;
+ base::WeakPtr<SpdyStream> stream;
+ if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
+ DCHECK_EQ(write_state_, WRITE_STATE_DO_WRITE);
+ return ERR_IO_PENDING;
+ }
+
+ if (stream.get())
+ DCHECK(!stream->IsClosed());
+
+ // Activate the stream only when sending the SYN_STREAM frame to
+ // guarantee monotonically-increasing stream IDs.
+ if (frame_type == SYN_STREAM) {
+ if (stream.get() && stream->stream_id() == 0) {
+ scoped_ptr<SpdyStream> owned_stream =
+ ActivateCreatedStream(stream.get());
+ InsertActivatedStream(owned_stream.Pass());
+ } else {
+ NOTREACHED();
+ return ERR_UNEXPECTED;
+ }
+ }
+
+ in_flight_write_ = producer->ProduceBuffer();
+ if (!in_flight_write_) {
+ NOTREACHED();
+ return ERR_UNEXPECTED;
+ }
+ in_flight_write_frame_type_ = frame_type;
+ in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
+ DCHECK_GE(in_flight_write_frame_size_,
+ buffered_spdy_framer_->GetFrameMinimumSize());
+ in_flight_write_stream_ = stream;
+ }
+
+ write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
+
+ // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
+ // with Socket implementations that don't store their IOBuffer
+ // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
+ scoped_refptr<IOBuffer> write_io_buffer =
+ in_flight_write_->GetIOBufferForRemainingData();
+ // We keep |in_flight_write_| alive until OnWriteComplete(), so it's
+ // okay to pass in |write_io_buffer| since the socket won't use it
+ // past OnWriteComplete().
+ return connection_->socket()->Write(
+ write_io_buffer.get(),
+ in_flight_write_->GetRemainingSize(),
+ base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop),
+ weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
+}
+
+int SpdySession::DoWriteComplete(int result) {
+ DCHECK_NE(availability_state_, STATE_CLOSED);
+
DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
last_activity_time_ = base::TimeTicks::Now();
- write_pending_ = false;
if (result < 0) {
+ DCHECK_NE(result, ERR_IO_PENDING);
in_flight_write_.reset();
in_flight_write_frame_type_ = DATA;
in_flight_write_frame_size_ = 0;
in_flight_write_stream_.reset();
CloseSessionOnError(static_cast<Error>(result), "Write error");
- return;
+ DCHECK_EQ(error_on_close_, result);
+ return result;
}
// It should not be possible to have written more bytes than our
@@ -1186,108 +1298,11 @@ void SpdySession::OnWriteComplete(int result) {
}
}
- // Write more data. We're already in a continuation, so we can go
- // ahead and write it immediately (without going back to the message
- // loop).
- WriteSocketLater();
-}
-
-void SpdySession::WriteSocketLater() {
- if (delayed_write_pending_)
- return;
-
- if (!IsConnected())
- return;
-
- delayed_write_pending_ = true;
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&SpdySession::WriteSocket, weak_factory_.GetWeakPtr()));
-}
-
-void SpdySession::WriteSocket() {
- // This function should only be called via WriteSocketLater.
- DCHECK(delayed_write_pending_);
- delayed_write_pending_ = false;
-
- // If the socket isn't connected yet, just wait; we'll get called
- // again when the socket connection completes. If the socket is
- // closed, just return.
- if (!IsConnected())
- return;
-
- if (write_pending_) // Another write is in progress still.
- return;
-
- // Loop sending frames until we've sent everything or until the write
- // returns error (or ERR_IO_PENDING).
- DCHECK(buffered_spdy_framer_.get());
- while (true) {
- if (in_flight_write_) {
- DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
- } else {
- // Grab the next frame to send.
- SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyBufferProducer> producer;
- base::WeakPtr<SpdyStream> stream;
- if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
- break;
-
- if (stream.get())
- DCHECK(!stream->IsClosed());
-
- // Activate the stream only when sending the SYN_STREAM frame to
- // guarantee monotonically-increasing stream IDs.
- if (frame_type == SYN_STREAM) {
- if (stream.get() && stream->stream_id() == 0) {
- scoped_ptr<SpdyStream> owned_stream =
- ActivateCreatedStream(stream.get());
- InsertActivatedStream(owned_stream.Pass());
- } else {
- NOTREACHED();
- continue;
- }
- }
-
- in_flight_write_ = producer->ProduceBuffer();
- if (!in_flight_write_) {
- NOTREACHED();
- continue;
- }
- in_flight_write_frame_type_ = frame_type;
- in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
- DCHECK_GE(in_flight_write_frame_size_,
- buffered_spdy_framer_->GetFrameMinimumSize());
- in_flight_write_stream_ = stream;
- }
-
- write_pending_ = true;
- // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
- // with Socket implementations that don't store their IOBuffer
- // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
- scoped_refptr<IOBuffer> write_io_buffer =
- in_flight_write_->GetIOBufferForRemainingData();
- // We keep |in_flight_write_| alive until OnWriteComplete(), so
- // it's okay to use GetIOBufferForRemainingData() since the socket
- // doesn't use the IOBuffer past OnWriteComplete().
- int rv = connection_->socket()->Write(
- write_io_buffer.get(),
- in_flight_write_->GetRemainingSize(),
- base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
- // Avoid persisting |write_io_buffer| past |in_flight_write_|'s
- // lifetime (which will end if OnWriteComplete() is called below).
- write_io_buffer = NULL;
- if (rv == ERR_IO_PENDING)
- break;
-
- // We sent the frame successfully.
- OnWriteComplete(rv);
-
- // TODO(mbelshe): Test this error case. Maybe we should mark the socket
- // as in an error state.
- if (rv < 0)
- break;
- }
+ // This has to go after calling OnFrameWriteComplete() so that if
+ // that ends up calling EnqueueWrite(), it properly avoids posting
+ // another DoWriteLoop task.
+ write_state_ = WRITE_STATE_DO_WRITE;
+ return OK;
}
void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id,
@@ -1359,7 +1374,7 @@ void SpdySession::CloseSessionOnError(Error err,
// to |this|. Hold a reference through this function.
scoped_refptr<SpdySession> self(this);
- DCHECK_LT(err, OK);
+ DCHECK_LT(err, ERR_IO_PENDING);
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_CLOSE,
base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
@@ -1367,12 +1382,12 @@ void SpdySession::CloseSessionOnError(Error err,
// Don't close twice. This can occur because we can have both
// a read and a write outstanding, and each can complete with
// an error.
- if (!IsClosed()) {
+ if (availability_state_ != STATE_CLOSED) {
UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
total_bytes_received_, 1, 100000000, 50);
- state_ = STATE_CLOSED;
- error_ = err;
+ availability_state_ = STATE_CLOSED;
+ error_on_close_ = err;
// TODO(akalin): Move this after CloseAllStreams() once we're
// owned by the pool.
RemoveFromPool();
@@ -1409,7 +1424,7 @@ base::Value* SpdySession::GetInfoAsValue() const {
SSLClientSocket::NextProtoToString(
connection_->socket()->GetNegotiatedProtocol()));
- dict->SetInteger("error", error_);
+ dict->SetInteger("error", error_on_close_);
dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
dict->SetInteger("streams_initiated_count", streams_initiated_count_);
@@ -1483,8 +1498,19 @@ void SpdySession::EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer,
const base::WeakPtr<SpdyStream>& stream) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
+ bool was_idle = write_queue_.IsEmpty();
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
- WriteSocketLater();
+ // We only want to post a task when the queue was just empty *and*
+ // we're not being called from a callback from DoWriteComplete().
+ if (was_idle && write_state_ == WRITE_STATE_DO_WRITE) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(base::IgnoreResult(&SpdySession::DoWriteLoop),
+ weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
+ }
}
void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
@@ -1588,6 +1614,9 @@ ServerBoundCertService* SpdySession::GetServerBoundCertService() const {
}
void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(error_code));
std::string description = base::StringPrintf(
@@ -1597,6 +1626,9 @@ void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
void SpdySession::OnStreamError(SpdyStreamId stream_id,
const std::string& description) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
ActiveStreamMap::iterator it = active_streams_.find(stream_id);
if (it == active_streams_.end()) {
// We still want to send a frame to reset the stream even if we
@@ -1613,6 +1645,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
const char* data,
size_t len,
bool fin) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
DCHECK_LT(len, 1u << 24);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
@@ -1654,6 +1689,9 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
}
void SpdySession::OnSettings(bool clear_persisted) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
if (clear_persisted)
http_server_properties_->ClearSpdySettings(host_port_pair());
@@ -1668,6 +1706,9 @@ void SpdySession::OnSettings(bool clear_persisted) {
void SpdySession::OnSetting(SpdySettingsIds id,
uint8 flags,
uint32 value) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
HandleSetting(id, value);
http_server_properties_->SetSpdySetting(
host_port_pair(),
@@ -1730,6 +1771,12 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id,
bool fin,
bool unidirectional,
const SpdyHeaderBlock& headers) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
+ // TODO(akalin): Reset the stream when |availability_state_| ==
+ // STATE_GOING_AWAY.
+
base::Time response_time = base::Time::Now();
base::TimeTicks recv_first_byte_time = base::TimeTicks::Now();
@@ -1891,6 +1938,9 @@ void SpdySession::DeleteExpiredPushedStreams() {
void SpdySession::OnSynReply(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
base::Time response_time = base::Time::Now();
base::TimeTicks recv_first_byte_time = base::TimeTicks::Now();
@@ -1927,6 +1977,9 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id,
void SpdySession::OnHeaders(SpdyStreamId stream_id,
bool fin,
const SpdyHeaderBlock& headers) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
@@ -1954,6 +2007,9 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id,
void SpdySession::OnRstStream(SpdyStreamId stream_id,
SpdyRstStreamStatus status) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
std::string description;
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_RST_STREAM,
@@ -1987,6 +2043,9 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id,
void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
SpdyGoAwayStatus status) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
base::Bind(&NetLogSpdyGoAwayCallback,
last_accepted_stream_id,
@@ -1999,6 +2058,9 @@ void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
}
void SpdySession::OnPing(uint32 unique_id) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_PING,
base::Bind(&NetLogSpdyPingCallback, unique_id, "received"));
@@ -2027,6 +2089,9 @@ void SpdySession::OnPing(uint32 unique_id) {
void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
uint32 delta_window_size) {
+ if (availability_state_ == STATE_CLOSED)
+ return;
+
DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
net_log_.AddEvent(
NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
@@ -2580,7 +2645,7 @@ void SpdySession::ResumeSendStalledStreams() {
// have to worry about streams being closed, as well as ourselves
// being closed.
- while (!IsClosed() && !IsSendStalled()) {
+ while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
size_t old_size = 0;
if (DCHECK_IS_ON())
old_size = GetTotalSize(stream_send_unstall_queue_);
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index c2c6cfd..0cca912 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -51,9 +51,9 @@ const int kMaxSpdyFrameChunkSize = (2 * kMss) - 8;
// Specifies the maxiumum concurrent streams server could send (via push).
const int kMaxConcurrentPushedStreams = 1000;
-// Specifies the number of bytes read synchronously (without yielding) if the
-// data is available.
-const int kMaxReadBytes = 32 * 1024;
+// Specifies the maximum number of bytes to read synchronously before
+// yielding.
+const int kMaxReadBytesWithoutYielding = 32 * 1024;
// The initial receive window size for both streams and sessions.
const int32 kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB
@@ -336,8 +336,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
void SendStreamWindowUpdate(SpdyStreamId stream_id,
uint32 delta_window_size);
- // If session is closed, no new streams/transactions should be created.
- bool IsClosed() const { return state_ == STATE_CLOSED; }
+ // Whether the stream is closed, i.e. it has stopped processing data
+ // and is about to be destroyed.
+ bool IsClosed() const { return availability_state_ == STATE_CLOSED; }
// Closes this session. This will close all active streams and mark
// the session as permanently closed.
@@ -386,7 +387,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
}
size_t num_created_streams() const { return created_streams_.size(); }
- size_t pending_create_stream_queue_size(int priority) const {
+ size_t pending_create_stream_queue_size(RequestPriority priority) const {
DCHECK_LT(priority, NUM_PRIORITIES);
return pending_create_stream_queues_[priority].size();
}
@@ -501,14 +502,28 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
typedef std::set<SpdyStream*> CreatedStreamSet;
- enum State {
- STATE_IDLE,
- STATE_CONNECTING,
- STATE_DO_READ,
- STATE_DO_READ_COMPLETE,
+ enum AvailabilityState {
+ // The session is available in its socket pool and can be used
+ // freely.
+ STATE_AVAILABLE,
+ // The session can process data on existing streams but will
+ // refuse to create new ones.
+ STATE_GOING_AWAY,
+ // The session has been closed, is waiting to be deleted, and will
+ // refuse to process any more data.
STATE_CLOSED
};
+ enum ReadState {
+ READ_STATE_DO_READ,
+ READ_STATE_DO_READ_COMPLETE,
+ };
+
+ enum WriteState {
+ WRITE_STATE_DO_WRITE,
+ WRITE_STATE_DO_WRITE_COMPLETE,
+ };
+
virtual ~SpdySession();
// Called by SpdyStreamRequest to start a request to create a
@@ -554,24 +569,19 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
SpdyRstStreamStatus status,
const std::string& description);
- // Start the DoLoop to read data from socket.
- void StartRead();
-
- // Try to make progress by reading and processing data.
- int DoLoop(int result);
- // The implementations of STATE_DO_READ/STATE_DO_READ_COMPLETE state changes
- // of the state machine.
+ // Advance the ReadState state machine. |expected_read_state| is the
+ // expected starting read state.
+ int DoReadLoop(ReadState expected_read_state, int result);
+ // The implementations of the states of the ReadState state machine.
int DoRead();
- int DoReadComplete(int bytes_read);
-
- // Check if session is connected or not.
- bool IsConnected() const {
- return state_ == STATE_DO_READ || state_ == STATE_DO_READ_COMPLETE;
- }
+ int DoReadComplete(int result);
- // IO Callbacks
- void OnReadComplete(int result);
- void OnWriteComplete(int result);
+ // Advance the WriteState state machine. |expected_write_state| is
+ // the expected starting write state.
+ int DoWriteLoop(WriteState expected_write_state, int result);
+ // The implementations of the states of the WriteState state machine.
+ int DoWrite();
+ int DoWriteComplete(int result);
// Send relevant SETTINGS. This is generally called on connection setup.
void SendInitialSettings();
@@ -607,10 +617,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// haven't received any data in |kHungInterval| time period.
void CheckPingStatus(base::TimeTicks last_check_time);
- // Write current data to the socket.
- void WriteSocketLater();
- void WriteSocket();
-
// Get a new stream id.
int GetNewStreamId();
@@ -884,8 +890,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
SpdyWriteQueue write_queue_;
// Data for the frame we are currently sending.
- // Whether we have a socket write pending completion.
- bool write_pending_;
+
// The buffer we're currently writing.
scoped_ptr<SpdyBuffer> in_flight_write_;
// The type of the frame in |in_flight_write_|.
@@ -896,9 +901,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// the socket completely.
base::WeakPtr<SpdyStream> in_flight_write_stream_;
- // Flag if we have a pending message scheduled for WriteSocket.
- bool delayed_write_pending_;
-
// Flag if we're using an SSL connection for this SpdySession.
bool is_secure_;
@@ -908,11 +910,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Spdy Frame state.
scoped_ptr<BufferedSpdyFramer> buffered_spdy_framer_;
- // If an error has occurred on the session, the session is effectively
- // dead. Record this error here. When no error has occurred, |error_| will
- // be OK.
- Error error_;
- State state_;
+ // The state variables.
+ AvailabilityState availability_state_;
+ ReadState read_state_;
+ WriteState write_state_;
+
+ // If the session was closed (i.e., |availability_state_| is
+ // STATE_CLOSED), then |error_on_close_| holds the error with which
+ // it was closed, which is < ERR_IO_PENDING. Otherwise, it is set to
+ // OK.
+ Error error_on_close_;
// Limits
size_t max_concurrent_streams_; // 0 if no limit
@@ -928,10 +935,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// SpdySession. It is used by the |Net.SpdySettingsCwnd...| histograms.
int total_bytes_received_;
- // |bytes_read_| keeps track of number of bytes read continously in the
- // DoLoop() without yielding.
- int bytes_read_;
-
bool sent_settings_; // Did this session send settings when it started.
bool received_settings_; // Did this session receive at least one settings
// frame.
diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc
index 1b414681..1f6b931 100644
--- a/net/spdy/spdy_session_unittest.cc
+++ b/net/spdy/spdy_session_unittest.cc
@@ -357,7 +357,7 @@ TEST_P(SpdySessionTest, ServerPing) {
test::StreamDelegateSendImmediate delegate(spdy_stream1, NULL);
spdy_stream1->SetDelegate(&delegate);
- // Flush the SpdySession::OnReadComplete() task.
+ // Flush the read completion task.
base::MessageLoop::current()->RunUntilIdle();
EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_));
@@ -546,7 +546,7 @@ TEST_P(SpdySessionTest, OnSettings) {
stream_releaser.MakeCallback(&request)));
session = NULL;
- EXPECT_EQ(OK, stream_releaser.WaitForResult());
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult());
}
// Start with max concurrent streams set to 1 (that is persisted). Receive a
@@ -610,7 +610,7 @@ TEST_P(SpdySessionTest, ClearSettings) {
BoundNetLog(),
stream_releaser.MakeCallback(&request)));
- EXPECT_EQ(OK, stream_releaser.WaitForResult());
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, stream_releaser.WaitForResult());
// Make sure that persisted data is cleared.
EXPECT_EQ(0u, spdy_session_pool_->http_server_properties()->GetSpdySettings(
@@ -823,7 +823,7 @@ TEST_P(SpdySessionTest, Initialize) {
CreateInsecureSpdySession(http_session_, key_, log.bound());
EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_));
- // Flush the SpdySession::OnReadComplete() task.
+ // Flush the read completion task.
base::MessageLoop::current()->RunUntilIdle();
net::CapturingNetLog::CapturedEntryList entries;
@@ -871,7 +871,7 @@ TEST_P(SpdySessionTest, CloseSessionOnError) {
CreateInsecureSpdySession(http_session_, key_, log.bound());
EXPECT_TRUE(HasSpdySession(spdy_session_pool_, key_));
- // Flush the SpdySession::OnReadComplete() task.
+ // Flush the read completion task.
base::MessageLoop::current()->RunUntilIdle();
EXPECT_FALSE(HasSpdySession(spdy_session_pool_, key_));
@@ -1705,15 +1705,16 @@ TEST_P(SpdySessionTest, NeedsCredentials) {
EXPECT_EQ(spdy_util_.spdy_version() >= SPDY3, session->NeedsCredentials());
- // Flush the SpdySession::OnReadComplete() task.
+ // Flush the read completion task.
base::MessageLoop::current()->RunUntilIdle();
session->CloseSessionOnError(ERR_ABORTED, std::string());
}
-// Test that SpdySession::DoRead reads data from the socket without yielding.
-// This test makes 32k - 1 bytes of data available on the socket for reading. It
-// then verifies that it has read all the available data without yielding.
+// Test that SpdySession::DoReadLoop reads data from the socket
+// without yielding. This test makes 32k - 1 bytes of data available
+// on the socket for reading. It then verifies that it has read all
+// the available data without yielding.
TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
MockConnect connect_data(SYNCHRONOUS, OK);
BufferedSpdyFramer framer(spdy_util_.spdy_version(), false);
@@ -1724,10 +1725,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytes);
+ // Build buffer of size kMaxReadBytesWithoutYielding / 4
+ // (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
const int kPayloadSize =
- kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
TestDataStream test_stream;
scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize));
char* payload_data = payload->data();
@@ -1780,8 +1782,9 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrl());
- // Set up the TaskObserver to verify SpdySession::DoRead doesn't post a task.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
+ // Set up the TaskObserver to verify SpdySession::DoReadLoop doesn't
+ // post a task.
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
@@ -1789,7 +1792,8 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(0u, observer.executed_count());
- // Read all the data and verify SpdySession::DoRead has not posted a task.
+ // Read all the data and verify SpdySession::DoReadLoop has not
+ // posted a task.
data.RunFor(4);
EXPECT_EQ(NULL, spdy_stream1.get());
@@ -1800,10 +1804,11 @@ TEST_P(SpdySessionTest, ReadDataWithoutYielding) {
EXPECT_TRUE(data.at_read_eof());
}
-// Test that SpdySession::DoRead yields while reading the data. This test makes
-// 32k + 1 bytes of data available on the socket for reading. It then verifies
-// that DoRead has yielded even though there is data available for it to read
-// (i.e, socket()->Read didn't return ERR_IO_PENDING during socket reads).
+// Test that SpdySession::DoReadLoop yields while reading the
+// data. This test makes 32k + 1 bytes of data available on the socket
+// for reading. It then verifies that DoRead has yielded even though
+// there is data available for it to read (i.e, socket()->Read didn't
+// return ERR_IO_PENDING during socket reads).
TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
MockConnect connect_data(SYNCHRONOUS, OK);
BufferedSpdyFramer framer(spdy_util_.spdy_version(), false);
@@ -1814,10 +1819,11 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytes);
+ // Build buffer of size kMaxReadBytesWithoutYielding / 4
+ // (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
const int kPayloadSize =
- kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
TestDataStream test_stream;
scoped_refptr<net::IOBuffer> payload(new net::IOBuffer(kPayloadSize));
char* payload_data = payload->data();
@@ -1870,8 +1876,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrl());
- // Set up the TaskObserver to verify SpdySession::DoRead posts a task.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
+ // Set up the TaskObserver to verify SpdySession::DoReadLoop posts a
+ // task.
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
@@ -1879,7 +1886,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(0u, observer.executed_count());
- // Read all the data and verify SpdySession::DoRead has posted a task.
+ // Read all the data and verify SpdySession::DoReadLoop has posted a
+ // task.
data.RunFor(6);
EXPECT_EQ(NULL, spdy_stream1.get());
@@ -1891,17 +1899,17 @@ TEST_P(SpdySessionTest, TestYieldingDuringReadData) {
EXPECT_TRUE(data.at_read_eof());
}
-// Test that SpdySession::DoRead() tests interactions of yielding + async,
-// by doing the following MockReads.
+// Test that SpdySession::DoReadLoop() tests interactions of yielding
+// + async, by doing the following MockReads.
//
// MockRead of SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K
// ASYNC 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 8K, SYNCHRONOUS 2K.
//
-// The above reads 26K synchronously. Since that is less that 32K, we will
-// attempt to read again. However, that DoRead() will return ERR_IO_PENDING
-// (because of async read), so DoRead() will yield. When we come back, DoRead()
-// will read the results from the async read, and rest of the data
-// synchronously.
+// The above reads 26K synchronously. Since that is less that 32K, we
+// will attempt to read again. However, that DoRead() will return
+// ERR_IO_PENDING (because of async read), so DoReadLoop() will
+// yield. When we come back, DoRead() will read the results from the
+// async read, and rest of the data synchronously.
TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
MockConnect connect_data(SYNCHRONOUS, OK);
BufferedSpdyFramer framer(spdy_util_.spdy_version(), false);
@@ -1912,11 +1920,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
CreateMockWrite(*req1, 0),
};
- // Build buffer of size kMaxReadBytes / 4 (-spdy_data_frame_size).
- ASSERT_EQ(32 * 1024, kMaxReadBytes);
+ // Build buffer of size kMaxReadBytesWithoutYielding / 4
+ // (-spdy_data_frame_size).
+ ASSERT_EQ(32 * 1024, kMaxReadBytesWithoutYielding);
TestDataStream test_stream;
const int kEightKPayloadSize =
- kMaxReadBytes / 4 - framer.GetControlFrameHeaderSize();
+ kMaxReadBytesWithoutYielding / 4 - framer.GetControlFrameHeaderSize();
scoped_refptr<net::IOBuffer> eightk_payload(
new net::IOBuffer(kEightKPayloadSize));
char* eightk_payload_data = eightk_payload->data();
@@ -1983,8 +1992,9 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
spdy_stream1->SendRequestHeaders(headers1.Pass(), NO_MORE_DATA_TO_SEND);
EXPECT_TRUE(spdy_stream1->HasUrl());
- // Set up the TaskObserver to monitor SpdySession::DoRead posting of tasks.
- SpdySessionTestTaskObserver observer("spdy_session.cc", "DoRead");
+ // Set up the TaskObserver to monitor SpdySession::DoReadLoop
+ // posting of tasks.
+ SpdySessionTestTaskObserver observer("spdy_session.cc", "DoReadLoop");
// Run until 1st read.
EXPECT_EQ(0u, delegate1.stream_id());
@@ -1992,7 +2002,8 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
EXPECT_EQ(1u, delegate1.stream_id());
EXPECT_EQ(0u, observer.executed_count());
- // Read all the data and verify SpdySession::DoRead has posted a task.
+ // Read all the data and verify SpdySession::DoReadLoop has posted a
+ // task.
data.RunFor(12);
EXPECT_EQ(NULL, spdy_stream1.get());
@@ -2004,11 +2015,12 @@ TEST_P(SpdySessionTest, TestYieldingDuringAsyncReadData) {
EXPECT_TRUE(data.at_read_eof());
}
-// Send a GoAway frame when SpdySession is in DoLoop. If scoped_refptr to
-// <SpdySession> is deleted from SpdySession::DoLoop(), we get a crash because
-// GoAway could delete the SpdySession from the SpdySessionPool and the last
+// Send a GoAway frame when SpdySession is in DoReadLoop. If
+// scoped_refptr to <SpdySession> is deleted from
+// SpdySession::DoReadLoop(), we get a crash because GoAway could
+// delete the SpdySession from the SpdySessionPool and the last
// reference to SpdySession.
-TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) {
+TEST_P(SpdySessionTest, GoAwayWhileInDoReadLoop) {
MockConnect connect_data(SYNCHRONOUS, OK);
BufferedSpdyFramer framer(spdy_util_.spdy_version(), false);
@@ -2064,8 +2076,8 @@ TEST_P(SpdySessionTest, GoAwayWhileInDoLoop) {
data.RunFor(1);
EXPECT_EQ(1u, spdy_stream1->stream_id());
- // Only references to SpdySession are held by DoLoop and
- // SpdySessionPool. If DoLoop doesn't hold the reference, we get a
+ // Only references to SpdySession are held by DoReadLoop and
+ // SpdySessionPool. If DoReadLoop doesn't hold the reference, we get a
// crash if SpdySession is deleted from the SpdySessionPool.
// Run until GoAway.
@@ -2466,7 +2478,7 @@ TEST_P(SpdySessionTest, SendCredentials) {
CreateSecureSpdySession(http_session_, key, BoundNetLog());
EXPECT_TRUE(session->NeedsCredentials());
- // Flush the SpdySession::OnReadComplete() task.
+ // Flush the read completion task.
base::MessageLoop::current()->RunUntilIdle();
session->CloseSessionOnError(ERR_ABORTED, std::string());
diff --git a/net/spdy/spdy_test_util_common.cc b/net/spdy/spdy_test_util_common.cc
index ea333c9..ba32a6f 100644
--- a/net/spdy/spdy_test_util_common.cc
+++ b/net/spdy/spdy_test_util_common.cc
@@ -293,7 +293,8 @@ CompletionCallback StreamReleaserCallback::MakeCallback(
void StreamReleaserCallback::OnComplete(
SpdyStreamRequest* request, int result) {
- request->ReleaseStream()->Cancel();
+ if (result == OK)
+ request->ReleaseStream()->Cancel();
SetResult(result);
}
diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc
index c793aab..2ac4241 100644
--- a/net/spdy/spdy_write_queue.cc
+++ b/net/spdy/spdy_write_queue.cc
@@ -32,6 +32,14 @@ SpdyWriteQueue::~SpdyWriteQueue() {
Clear();
}
+bool SpdyWriteQueue::IsEmpty() const {
+ for (int i = 0; i < NUM_PRIORITIES; i++) {
+ if (!queue_[i].empty())
+ return false;
+ }
+ return true;
+}
+
void SpdyWriteQueue::Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> frame_producer,
diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h
index fa194ef..3bceb29 100644
--- a/net/spdy/spdy_write_queue.h
+++ b/net/spdy/spdy_write_queue.h
@@ -27,6 +27,10 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
SpdyWriteQueue();
~SpdyWriteQueue();
+ // Returns whether there is anything in the write queue,
+ // i.e. whether the next call to Dequeue will return true.
+ bool IsEmpty() const;
+
// Enqueues the given frame producer of the given type at the given
// priority associated with the given stream, which may be NULL if
// the frame producer is not associated with a stream. If |stream|