summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-06-28 02:21:41 +0000
committerrch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-06-28 02:21:41 +0000
commitd5a94433ff8d7324510adae18ab1b9c9ba1394d5 (patch)
treed88e1d7108c12a87d433c9ded99be60a198f431c
parenta7266a9dc36aa68143d670eb8646f9d67237fa61 (diff)
downloadchromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.zip
chromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.tar.gz
chromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.tar.bz2
Instead of enqueueing SPDY frames, instead enqueue SPDY streams that are ready to produce data. This allows us to lazily allocate a stream id.
BUG=111708 Review URL: https://chromiumcodereview.appspot.com/10448083 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@144649 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--net/http/http_network_transaction_spdy2_unittest.cc4
-rw-r--r--net/http/http_network_transaction_spdy3_unittest.cc4
-rw-r--r--net/spdy/spdy_io_buffer.cc15
-rw-r--r--net/spdy/spdy_io_buffer.h10
-rw-r--r--net/spdy/spdy_network_transaction_spdy2_unittest.cc43
-rw-r--r--net/spdy/spdy_network_transaction_spdy3_unittest.cc45
-rw-r--r--net/spdy/spdy_session.cc251
-rw-r--r--net/spdy/spdy_session.h88
-rw-r--r--net/spdy/spdy_session_spdy2_unittest.cc20
-rw-r--r--net/spdy/spdy_session_spdy3_unittest.cc35
-rw-r--r--net/spdy/spdy_stream.cc108
-rw-r--r--net/spdy/spdy_stream.h16
-rw-r--r--net/spdy/spdy_stream_spdy2_unittest.cc6
-rw-r--r--net/spdy/spdy_stream_spdy3_unittest.cc6
-rw-r--r--net/spdy/spdy_websocket_stream.h2
-rw-r--r--net/spdy/spdy_websocket_stream_spdy2_unittest.cc8
-rw-r--r--net/spdy/spdy_websocket_stream_spdy3_unittest.cc10
17 files changed, 459 insertions, 212 deletions
diff --git a/net/http/http_network_transaction_spdy2_unittest.cc b/net/http/http_network_transaction_spdy2_unittest.cc
index 72d4bf8..2c830bd 100644
--- a/net/http/http_network_transaction_spdy2_unittest.cc
+++ b/net/http/http_network_transaction_spdy2_unittest.cc
@@ -4944,8 +4944,8 @@ TEST_F(HttpNetworkTransactionSpdy2Test, BasicAuthSpdyProxy) {
MockWrite spdy_writes[] = {
CreateMockWrite(*req, 0, ASYNC),
- CreateMockWrite(*connect2, 2),
- CreateMockWrite(*rst, 3, ASYNC),
+ CreateMockWrite(*rst, 2, ASYNC),
+ CreateMockWrite(*connect2, 3),
CreateMockWrite(*wrapped_get, 5),
};
diff --git a/net/http/http_network_transaction_spdy3_unittest.cc b/net/http/http_network_transaction_spdy3_unittest.cc
index 331555c..c5d4c74 100644
--- a/net/http/http_network_transaction_spdy3_unittest.cc
+++ b/net/http/http_network_transaction_spdy3_unittest.cc
@@ -4944,8 +4944,8 @@ TEST_F(HttpNetworkTransactionSpdy3Test, BasicAuthSpdyProxy) {
MockWrite spdy_writes[] = {
CreateMockWrite(*req, 0, ASYNC),
- CreateMockWrite(*connect2, 2),
- CreateMockWrite(*rst, 3, ASYNC),
+ CreateMockWrite(*rst, 2, ASYNC),
+ CreateMockWrite(*connect2, 3),
CreateMockWrite(*wrapped_get, 5)
};
diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc
index 5720f7d..f0c14b1 100644
--- a/net/spdy/spdy_io_buffer.cc
+++ b/net/spdy/spdy_io_buffer.cc
@@ -20,8 +20,23 @@ SpdyIOBuffer::SpdyIOBuffer(
SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) {
}
+SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) {
+ buffer_ = rhs.buffer_;
+ priority_ = rhs.priority_;
+ position_ = rhs.position_;
+ stream_ = rhs.stream_;
+}
+
SpdyIOBuffer::~SpdyIOBuffer() {}
+SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) {
+ buffer_ = rhs.buffer_;
+ priority_ = rhs.priority_;
+ position_ = rhs.position_;
+ stream_ = rhs.stream_;
+ return *this;
+}
+
void SpdyIOBuffer::release() {
buffer_ = NULL;
stream_ = NULL;
diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h
index 0aaced3..05ec59c 100644
--- a/net/spdy/spdy_io_buffer.h
+++ b/net/spdy/spdy_io_buffer.h
@@ -9,10 +9,12 @@
#include "base/memory/ref_counted.h"
#include "net/base/io_buffer.h"
#include "net/base/net_export.h"
-#include "net/spdy/spdy_stream.h"
+#include "net/base/request_priority.h"
namespace net {
+class SpdyStream;
+
// A class for managing SPDY IO buffers. These buffers need to be prioritized
// so that the SpdySession sends them in the right order. Further, they need
// to track the SpdyStream which they are associated with so that incremental
@@ -26,8 +28,14 @@ class NET_EXPORT_PRIVATE SpdyIOBuffer {
// |stream| is a pointer to the stream which is managing this buffer.
SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority,
SpdyStream* stream);
+ // Declare this instead of using the default so that we avoid needing to
+ // include spdy_stream.h.
+ SpdyIOBuffer(const SpdyIOBuffer& rhs);
SpdyIOBuffer();
~SpdyIOBuffer();
+ // Declare this instead of using the default so that we avoid needing to
+ // include spdy_stream.h.
+ SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs);
// Accessors.
DrainableIOBuffer* buffer() const { return buffer_; }
diff --git a/net/spdy/spdy_network_transaction_spdy2_unittest.cc b/net/spdy/spdy_network_transaction_spdy2_unittest.cc
index a52859f..a4c8864 100644
--- a/net/spdy/spdy_network_transaction_spdy2_unittest.cc
+++ b/net/spdy/spdy_network_transaction_spdy2_unittest.cc
@@ -1736,22 +1736,36 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0));
scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
- CreateMockRead(*stream_reply, 2),
- CreateMockRead(*stream_body, 3),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*stream_reply, 1),
+ MockRead(ASYNC, 0, 3) // EOF
};
- scoped_ptr<DelayedSocketData> data(
- new DelayedSocketData(0, reads, arraysize(reads), NULL, 0));
- NormalSpdyTransactionHelper helper(request,
+ scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0));
+ scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true));
+ MockRead writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*body, 2),
+ };
+
+ scoped_refptr<DeterministicSocketData> data(
+ new DeterministicSocketData(reads, arraysize(reads),
+ writes, arraysize(writes)));
+ NormalSpdyTransactionHelper helper(CreatePostRequest(),
BoundNetLog(), GetParam(), NULL);
+ helper.SetDeterministic();
helper.RunPreTestSetup();
- helper.AddData(data.get());
- helper.RunDefaultTest();
- helper.VerifyDataConsumed();
+ helper.AddDeterministicData(data.get());
+ HttpNetworkTransaction* trans = helper.trans();
- TransactionHelperResult out = helper.output();
- EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+ TestCompletionCallback callback;
+ int rv = trans->Start(
+ &CreatePostRequest(), callback.callback(), BoundNetLog());
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ data->RunFor(2);
+ rv = callback.WaitForResult();
+ EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
+ data->RunFor(1);
}
// The client upon cancellation tries to send a RST_STREAM frame. The mock
@@ -5565,10 +5579,11 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, OutOfOrderSynStream) {
// This first request will start to establish the SpdySession.
// Then we will start the second (MEDIUM priority) and then third
// (HIGHEST priority) request in such a way that the third will actually
- // start before the second, causing the second to be re-numbered.
+ // start before the second, causing the second to be numbered differently
+ // than the order they were created.
scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM));
- scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM));
MockWrite writes[] = {
CreateMockWrite(*req1, 0),
CreateMockWrite(*req2, 3),
diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc
index 4eca577..349e37c 100644
--- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc
+++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc
@@ -1745,22 +1745,36 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0));
scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
- CreateMockRead(*stream_reply, 2),
- CreateMockRead(*stream_body, 3),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*stream_reply, 1),
+ MockRead(ASYNC, 0, 3) // EOF
};
- scoped_ptr<DelayedSocketData> data(
- new DelayedSocketData(0, reads, arraysize(reads), NULL, 0));
- NormalSpdyTransactionHelper helper(request,
+ scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0));
+ scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true));
+ MockRead writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*body, 2),
+ };
+
+ scoped_refptr<DeterministicSocketData> data(
+ new DeterministicSocketData(reads, arraysize(reads),
+ writes, arraysize(writes)));
+ NormalSpdyTransactionHelper helper(CreatePostRequest(),
BoundNetLog(), GetParam(), NULL);
+ helper.SetDeterministic();
helper.RunPreTestSetup();
- helper.AddData(data.get());
- helper.RunDefaultTest();
- helper.VerifyDataConsumed();
+ helper.AddDeterministicData(data.get());
+ HttpNetworkTransaction* trans = helper.trans();
- TransactionHelperResult out = helper.output();
- EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+ TestCompletionCallback callback;
+ int rv = trans->Start(
+ &CreatePostRequest(), callback.callback(), BoundNetLog());
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ data->RunFor(2);
+ rv = callback.WaitForResult();
+ EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
+ data->RunFor(1);
}
// The client upon cancellation tries to send a RST_STREAM frame. The mock
@@ -2063,7 +2077,7 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, WindowUpdateOverflow) {
// WINDOW_UPDATE while sending a request and will send a RST_STREAM frame.
MockWrite writes[] = {
CreateMockWrite(*req),
- CreateMockWrite(*body),
+ //CreateMockWrite(*body),
CreateMockWrite(*rst),
};
@@ -6148,10 +6162,11 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, OutOfOrderSynStream) {
// This first request will start to establish the SpdySession.
// Then we will start the second (MEDIUM priority) and then third
// (HIGHEST priority) request in such a way that the third will actually
- // start before the second, causing the second to be re-numbered.
+ // start before the second, causing the second to be numbered differently
+ // than they order they were created.
scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM));
- scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM));
MockWrite writes[] = {
CreateMockWrite(*req1, 0),
CreateMockWrite(*req2, 3),
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 613d242..2b7ab3a 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -184,6 +184,28 @@ bool g_enable_ping_based_connection_checking = true;
} // namespace
// static
+void SpdySession::SpdyIOBufferProducer::ActivateStream(
+ SpdySession* spdy_session,
+ SpdyStream* spdy_stream) {
+ spdy_session->ActivateStream(spdy_stream);
+}
+
+// static
+SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* stream) {
+ size_t size = frame->length() + SpdyFrame::kHeaderSize;
+ DCHECK_GT(size, 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), frame->data(), size);
+
+ return new SpdyIOBuffer(buffer, size, priority, stream);
+}
+
+// static
void SpdySession::set_default_protocol(NextProto default_protocol) {
g_default_protocol = default_protocol;
}
@@ -360,6 +382,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
ssl_info.cert->VerifyNameMatch(domain);
}
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer) {
+ write_queue_.push(producer);
+ stream_producers_[producer] = stream;
+ WriteSocketLater();
+}
+
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -398,7 +427,8 @@ int SpdySession::CreateStream(
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
- active_streams_.size() < max_concurrent_streams_) {
+ (active_streams_.size() + created_streams_.size() <
+ max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
@@ -488,10 +518,7 @@ int SpdySession::CreateStreamImpl(
const std::string& path = url.PathForRequest();
- const SpdyStreamId stream_id = GetNewStreamId();
-
*spdy_stream = new SpdyStream(this,
- stream_id,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
@@ -500,14 +527,13 @@ int SpdySession::CreateStreamImpl(
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
- ActivateStream(stream);
+ created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
return OK;
}
@@ -529,15 +555,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-int SpdySession::WriteSynStream(
+SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const linked_ptr<SpdyHeaderBlock>& headers) {
- // Find our stream
- if (!IsStreamActive(stream_id))
- return ERR_INVALID_SPDY_STREAM;
+ CHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
@@ -548,11 +572,7 @@ int SpdySession::WriteSynStream(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
- credential_slot, flags, false, headers.get()));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order.
- // http://crbug.com/111708
- QueueFrame(syn_frame.get(), HIGHEST, stream);
+ credential_slot, flags, true, headers.get()));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
@@ -564,14 +584,15 @@ int SpdySession::WriteSynStream(
base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0));
}
- return ERR_IO_PENDING;
+ return syn_frame.release();
}
-int SpdySession::WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority) {
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
+ const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority) {
DCHECK(is_secure_);
unsigned char secret[32]; // 32 bytes from the spec
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
@@ -613,24 +634,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order, which means that we need
- // to enqueue all CREDENTIAL frames at this priority to ensure that
- // they are sent *before* the SYN_STREAM that references them.
- // http://crbug.com/111708
- QueueFrame(credential_frame.get(), HIGHEST, NULL);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
- return ERR_IO_PENDING;
+ return credential_frame.release();
}
-int SpdySession::WriteStreamData(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -654,7 +669,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return ERR_IO_PENDING;
+ return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
@@ -679,18 +694,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
- QueueFrame(frame.get(), stream->priority(), stream);
- return ERR_IO_PENDING;
+ return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
+ DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
+ DCHECK_EQ(0u, stream->stream_id());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+}
+
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
@@ -708,7 +728,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.get(), priority, NULL);
+ QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -896,46 +916,22 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !queue_.empty()) {
+ while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
- // Grab the next SpdyFrame to send.
- SpdyIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
- DCHECK(uncompressed_frame.is_control_frame());
- const SpdyControlFrame* uncompressed_control_frame =
- reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame);
- scoped_ptr<SpdyFrame> compressed_frame(
- buffered_spdy_framer_->CompressControlFrame(
- *uncompressed_control_frame));
- if (!compressed_frame.get()) {
- RecordProtocolErrorHistogram(
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
- CloseSessionOnError(
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
- return;
- }
-
- size = compressed_frame->length() + SpdyFrame::kHeaderSize;
-
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
- next_buffer.stream());
- } else {
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
- in_flight_write_ = next_buffer;
- }
+ // Grab the next SpdyBuffer to send.
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
+ stream_producers_.erase(producer.get());
+ write_queue_.pop();
+ // It is possible that a stream had data to write, but a
+ // WINDOW_UPDATE frame has been received which made that
+ // stream no longer writable.
+ // TODO(rch): consider handling that case by removing the
+ // stream from the writable queue?
+ if (buffer == NULL)
+ continue;
+
+ in_flight_write_ = *buffer;
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
@@ -982,16 +978,33 @@ void SpdySession::CloseAllStreams(net::Error status) {
while (!active_streams_.empty()) {
ActiveStreamMap::iterator it = active_streams_.begin();
const scoped_refptr<SpdyStream>& stream = it->second;
- DCHECK(stream);
- std::string description = base::StringPrintf(
- "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
- stream->LogStreamError(status, description);
+ LogAbandonedStream(stream, status);
DeleteStream(stream->stream_id(), status);
}
+ while (!created_streams_.empty()) {
+ CreatedStreamSet::iterator it = created_streams_.begin();
+ const scoped_refptr<SpdyStream>& stream = *it;
+ LogAbandonedStream(stream, status);
+ stream->OnClose(status);
+ created_streams_.erase(it);
+ }
+
// We also need to drain the queue.
- while (queue_.size())
- queue_.pop();
+ while (!write_queue_.empty()) {
+ SpdyIOBufferProducer* producer = write_queue_.top();
+ stream_producers_.erase(producer);
+ delete producer;
+ write_queue_.pop();
+ }
+}
+
+void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status) {
+ DCHECK(stream);
+ std::string description = base::StringPrintf(
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
+ stream->LogStreamError(status, description);
}
int SpdySession::GetNewStreamId() {
@@ -1002,17 +1015,6 @@ int SpdySession::GetNewStreamId() {
return id;
}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- int length = SpdyFrame::kHeaderSize + frame->length();
- IOBuffer* buffer = new IOBuffer(length);
- memcpy(buffer->data(), frame->data(), length);
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
-
- WriteSocketLater();
-}
-
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
@@ -1099,7 +1101,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
+class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SimpleSpdyIOBufferProducer(SpdyFrame* frame,
+ RequestPriority priority)
+ : frame_(frame),
+ priority_(priority) {
+ }
+
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return priority_;
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) {
+ return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame_, priority_, NULL);
+ }
+
+ private:
+ SpdyFrame* frame_;
+ RequestPriority priority_;
+};
+
+void SpdySession::QueueFrame(SpdyFrame* frame,
+ RequestPriority priority) {
+ SimpleSpdyIOBufferProducer* producer =
+ new SimpleSpdyIOBufferProducer(frame, priority);
+ write_queue_.push(producer);
+ WriteSocketLater();
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
+ if (stream->stream_id() == 0) {
+ stream->set_stream_id(GetNewStreamId());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ }
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
@@ -1127,6 +1163,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
if (it2 == active_streams_.end())
return;
+ // Possibly remove from the write queue.
+ WriteQueue old = write_queue_;
+ write_queue_ = WriteQueue();
+ while (!old.empty()) {
+ SpdyIOBufferProducer* producer = old.top();
+ StreamProducerMap::iterator it = stream_producers_.find(producer);
+ if (it == stream_producers_.end() || it->second->stream_id() != id)
+ write_queue_.push(producer);
+ else
+ delete producer;
+ old.pop();
+ }
+
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
@@ -1350,8 +1399,8 @@ void SpdySession::OnSynStream(
return;
}
- scoped_refptr<SpdyStream> stream(
- new SpdyStream(this, stream_id, true, net_log_));
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
+ stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
@@ -1373,7 +1422,6 @@ void SpdySession::OnSynStream(
void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame,
const linked_ptr<SpdyHeaderBlock>& headers) {
SpdyStreamId stream_id = frame.stream_id();
-
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
@@ -1557,7 +1605,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.get(), stream->priority(), NULL);
+ QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
@@ -1596,7 +1644,6 @@ void SpdySession::SendInitialSettings() {
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
}
- sent_settings_ = true;
SendSettings(settings_map);
}
@@ -1629,7 +1676,6 @@ void SpdySession::SendInitialSettings() {
HandleSetting(new_id, new_val);
}
- sent_settings_ = true;
SendSettings(settings_map_new);
}
@@ -1643,7 +1689,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
- QueueFrame(settings_frame.get(), HIGHEST, NULL);
+ sent_settings_ = true;
+ QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -1678,6 +1725,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
+
+ CreatedStreamSet::iterator i;
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
+ const scoped_refptr<SpdyStream>& stream = *i;
+ stream->AdjustSendWindowSize(delta_window_size);
+ }
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
@@ -1698,7 +1751,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
- QueueFrame(ping_frame.get(), HIGHEST, NULL);
+ QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index 844d37b..ceab965 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -20,6 +20,7 @@
#include "net/base/load_states.h"
#include "net/base/net_errors.h"
#include "net/base/request_priority.h"
+#include "net/base/ssl_client_cert_type.h"
#include "net/base/ssl_config_service.h"
#include "net/base/upload_data_stream.h"
#include "net/socket/client_socket_handle.h"
@@ -94,6 +95,32 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING ==
class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
public BufferedSpdyFramerVisitorInterface {
public:
+ // Defines an interface for producing SpdyIOBuffers.
+ class NET_EXPORT_PRIVATE SpdyIOBufferProducer {
+ public:
+ SpdyIOBufferProducer() {}
+
+ // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL
+ // if not buffer is ready to be produced.
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0;
+
+ virtual RequestPriority GetPriority() const = 0;
+
+ virtual ~SpdyIOBufferProducer() {}
+
+ protected:
+ // Activates |spdy_stream| in |spdy_session|.
+ static void ActivateStream(SpdySession* spdy_session,
+ SpdyStream* spdy_stream);
+
+ static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* spdy_stream);
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SpdyIOBufferProducer);
+ };
+
// Create a new SpdySession.
// |host_port_proxy_pair| is the host/port that this session connects to, and
// the proxy configuration settings that it's using.
@@ -158,9 +185,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// authentication now.
bool VerifyDomainAuthentication(const std::string& domain);
+ // Records that |stream| has a write available from |producer|.
+ // |producer| will be owned by this SpdySession.
+ void SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer);
+
// Send the SYN frame for |stream_id|. This also sends PING message to check
// the status of the connection.
- int WriteSynStream(
+ SpdySynStreamControlFrame* CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
@@ -168,21 +200,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
const linked_ptr<SpdyHeaderBlock>& headers);
// Write a CREDENTIAL frame to the session.
- int WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority);
+ SpdyCredentialControlFrame* CreateCredentialFrame(const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority);
// Write a data frame to the stream.
// Used to create and queue a data frame for the given stream.
- int WriteStreamData(SpdyStreamId stream_id, net::IOBuffer* data,
- int len,
- SpdyDataFlags flags);
+ SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags);
// Close a stream.
void CloseStream(SpdyStreamId stream_id, int status);
+ // Close a stream that has been created but is not yet active.
+ void CloseCreatedStream(SpdyStream* stream, int status);
+
// Reset a stream by sending a RST_STREAM frame with given status code.
// Also closes the stream. Was not piggybacked to CloseStream since not
// all of the calls to CloseStream necessitate sending a RST_STREAM.
@@ -269,7 +304,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Returns true if session is not currently active
bool is_active() const {
- return !active_streams_.empty();
+ return !active_streams_.empty() || !created_streams_.empty();
}
// Access to the number of active and pending streams. These are primarily
@@ -278,6 +313,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
size_t num_unclaimed_pushed_streams() const {
return unclaimed_pushed_streams_.size();
}
+ size_t num_created_streams() const { return created_streams_.size(); }
// Returns true if flow control is enabled for the session.
bool is_flow_control_enabled() const {
@@ -353,7 +389,18 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap;
// Only HTTP push a stream.
typedef std::map<std::string, scoped_refptr<SpdyStream> > PushedStreamMap;
- typedef std::priority_queue<SpdyIOBuffer> OutputQueue;
+ typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet;
+ typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap;
+ class SpdyIOBufferProducerCompare {
+ public:
+ bool operator() (const SpdyIOBufferProducer* lhs,
+ const SpdyIOBufferProducer* rhs) const {
+ return lhs->GetPriority() < rhs->GetPriority();
+ }
+ };
+ typedef std::priority_queue<SpdyIOBufferProducer*,
+ std::vector<SpdyIOBufferProducer*>,
+ SpdyIOBufferProducerCompare> WriteQueue;
struct CallbackResultPair {
CallbackResultPair(const CompletionCallback& callback_in, int result_in)
@@ -431,9 +478,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Queue a frame for sending.
// |frame| is the frame to send.
// |priority| is the priority for insertion into the queue.
- // |stream| is the stream which this IO is associated with (or NULL).
- void QueueFrame(SpdyFrame* frame, RequestPriority priority,
- SpdyStream* stream);
+ void QueueFrame(SpdyFrame* frame, RequestPriority priority);
// Track active streams in the active stream list.
void ActivateStream(SpdyStream* stream);
@@ -459,6 +504,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Closes all streams. Used as part of shutdown.
void CloseAllStreams(net::Error status);
+ void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status);
+
// Invokes a user callback for stream creation. We provide this method so it
// can be deferred to the MessageLoop, so we avoid re-entrancy problems.
void InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream>* stream);
@@ -563,8 +611,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// server, but do not have consumers yet.
PushedStreamMap unclaimed_pushed_streams_;
- // As we gather data to be sent, we put it into the output queue.
- OutputQueue queue_;
+ // Set of all created streams but that have not yet sent any frames.
+ CreatedStreamSet created_streams_;
+
+ // As streams have data to be sent, we put them into the write queue.
+ WriteQueue write_queue_;
+
+ // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream
+ // so that when a stream is destroyed, we can remove the corresponding
+ // producer from |write_queue_|.
+ StreamProducerMap stream_producers_;
// The packet we are currently sending.
bool write_pending_; // Will be true when a write is in progress.
diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc
index 6ee4257..2cbb283 100644
--- a/net/spdy/spdy_session_spdy2_unittest.cc
+++ b/net/spdy/spdy_session_spdy2_unittest.cc
@@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy2Test, FailedPing) {
// Assert session is not closed.
EXPECT_FALSE(session->IsClosed());
- EXPECT_LT(0u, session->num_active_streams());
+ EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams());
EXPECT_TRUE(spdy_session_pool->HasSession(pair));
// We set last time we have received any data in 1 sec less than now.
@@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) {
// Make sessions 1 and 3 inactive, but keep them open.
// Session 2 still open and active
- session1->CloseStream(spdy_stream1->stream_id(), OK);
- session3->CloseStream(spdy_stream3->stream_id(), OK);
+ session1->CloseCreatedStream(spdy_stream1, OK);
+ session3->CloseCreatedStream(spdy_stream3, OK);
EXPECT_FALSE(session1->is_active());
EXPECT_FALSE(session1->IsClosed());
EXPECT_TRUE(session2->is_active());
@@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) {
EXPECT_FALSE(session2->IsClosed());
// Make 2 not active
- session2->CloseStream(spdy_stream2->stream_id(), OK);
+ session2->CloseCreatedStream(spdy_stream2, OK);
EXPECT_FALSE(session2->is_active());
EXPECT_FALSE(session2->IsClosed());
@@ -1109,8 +1109,8 @@ TEST_F(SpdySessionSpdy2Test, CloseSessionOnError) {
TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
// Construct the request.
MockConnect connect_data(SYNCHRONOUS, OK);
- scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST));
MockWrite writes[] = {
CreateMockWrite(*req1, 2),
CreateMockWrite(*req2, 1),
@@ -1174,14 +1174,14 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
GURL url1("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1,
BoundNetLog(), callback1.callback()));
- EXPECT_EQ(1u, spdy_stream1->stream_id());
+ EXPECT_EQ(0u, spdy_stream1->stream_id());
scoped_refptr<SpdyStream> spdy_stream2;
TestCompletionCallback callback2;
GURL url2("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2,
BoundNetLog(), callback2.callback()));
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(0u, spdy_stream2->stream_id());
linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
(*headers)["method"] = "GET";
@@ -1199,8 +1199,8 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
spdy_stream2->SendRequest(false);
MessageLoop::current()->RunAllPending();
- EXPECT_EQ(1u, spdy_stream1->stream_id());
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(3u, spdy_stream1->stream_id());
+ EXPECT_EQ(1u, spdy_stream2->stream_id());
spdy_stream1->Cancel();
spdy_stream1 = NULL;
diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc
index 299186e..8862889 100644
--- a/net/spdy/spdy_session_spdy3_unittest.cc
+++ b/net/spdy/spdy_session_spdy3_unittest.cc
@@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy3Test, FailedPing) {
// Assert session is not closed.
EXPECT_FALSE(session->IsClosed());
- EXPECT_LT(0u, session->num_active_streams());
+ EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams());
EXPECT_TRUE(spdy_session_pool->HasSession(pair));
// We set last time we have received any data in 1 sec less than now.
@@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) {
// Make sessions 1 and 3 inactive, but keep them open.
// Session 2 still open and active
- session1->CloseStream(spdy_stream1->stream_id(), OK);
- session3->CloseStream(spdy_stream3->stream_id(), OK);
+ session1->CloseCreatedStream(spdy_stream1, OK);
+ session3->CloseCreatedStream(spdy_stream3, OK);
EXPECT_FALSE(session1->is_active());
EXPECT_FALSE(session1->IsClosed());
EXPECT_TRUE(session2->is_active());
@@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) {
EXPECT_FALSE(session2->IsClosed());
// Make 2 not active
- session2->CloseStream(spdy_stream2->stream_id(), OK);
+ session2->CloseCreatedStream(spdy_stream2, OK);
EXPECT_FALSE(session2->is_active());
EXPECT_FALSE(session2->IsClosed());
@@ -1193,22 +1193,24 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
MockConnect connect_data(SYNCHRONOUS, OK);
scoped_ptr<SpdyFrame> settings_frame(ConstructSpdySettings(new_settings));
MockRead reads[] = {
- CreateMockRead(*settings_frame),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*settings_frame, 0),
+ MockRead(ASYNC, 0, 2) // EOF
};
SpdySessionDependencies session_deps;
+
session_deps.host_resolver->set_synchronous_mode(true);
- StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0);
- data.set_connect_data(connect_data);
- session_deps.socket_factory->AddSocketDataProvider(&data);
+ scoped_refptr<DeterministicSocketData> data =
+ new DeterministicSocketData(reads, arraysize(reads), NULL, 0);
+ data->set_connect_data(connect_data);
+ session_deps.deterministic_socket_factory->AddSocketDataProvider(data);
SSLSocketDataProvider ssl(SYNCHRONOUS, OK);
session_deps.socket_factory->AddSSLSocketDataProvider(&ssl);
scoped_refptr<HttpNetworkSession> http_session(
- SpdySessionDependencies::SpdyCreateSession(&session_deps));
+ SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps));
const std::string kTestHost("www.foo.com");
const int kTestPort = 80;
@@ -1248,6 +1250,7 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
callback1.callback()));
EXPECT_NE(spdy_stream1->send_window_size(), window_size);
+ data->RunFor(1); // Process the SETTINGS frame, but not the EOF
MessageLoop::current()->RunAllPending();
EXPECT_EQ(session->initial_send_window_size(), window_size);
EXPECT_EQ(spdy_stream1->send_window_size(), window_size);
@@ -1272,8 +1275,8 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
// Construct the request.
MockConnect connect_data(SYNCHRONOUS, OK);
- scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST));
MockWrite writes[] = {
CreateMockWrite(*req1, 2),
CreateMockWrite(*req2, 1),
@@ -1337,14 +1340,14 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
GURL url1("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1,
BoundNetLog(), callback1.callback()));
- EXPECT_EQ(1u, spdy_stream1->stream_id());
+ EXPECT_EQ(0u, spdy_stream1->stream_id());
scoped_refptr<SpdyStream> spdy_stream2;
TestCompletionCallback callback2;
GURL url2("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2,
BoundNetLog(), callback2.callback()));
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(0u, spdy_stream2->stream_id());
linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
(*headers)[":method"] = "GET";
@@ -1362,8 +1365,8 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
spdy_stream2->SendRequest(false);
MessageLoop::current()->RunAllPending();
- EXPECT_EQ(1u, spdy_stream1->stream_id());
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(3u, spdy_stream1->stream_id());
+ EXPECT_EQ(1u, spdy_stream2->stream_id());
spdy_stream1->Cancel();
spdy_stream1 = NULL;
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 09a184b..3ad7544 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -50,11 +50,10 @@ bool ContainsUpperAscii(const std::string& str) {
} // namespace
SpdyStream::SpdyStream(SpdySession* session,
- SpdyStreamId stream_id,
bool pushed,
const BoundNetLog& net_log)
: continue_buffering_data_(true),
- stream_id_(stream_id),
+ stream_id_(0),
priority_(HIGHEST),
slot_(0),
stalled_by_flow_control_(false),
@@ -77,8 +76,77 @@ SpdyStream::SpdyStream(SpdySession* session,
domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) {
}
+class SpdyStream::SpdyStreamIOBufferProducer
+ : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {}
+
+ // SpdyFrameProducer
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return stream_->priority();
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
+ if (stream_->stream_id() == 0)
+ SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_);
+ SpdyFrame* frame = stream_->ProduceNextFrame();
+ return frame == NULL ? NULL :
+ SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame, GetPriority(), stream_);
+ }
+
+ private:
+ scoped_refptr<SpdyStream> stream_;
+};
+
+void SpdyStream::SetHasWriteAvailable() {
+ session_->SetStreamHasWriteAvailable(this,
+ new SpdyStreamIOBufferProducer(this));
+}
+
+SpdyFrame* SpdyStream::ProduceNextFrame() {
+ if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) {
+ CHECK(request_.get());
+ CHECK_GT(stream_id_, 0u);
+
+ std::string origin = GetUrl().GetOrigin().spec();
+ DCHECK(origin[origin.length() - 1] == '/');
+ origin.erase(origin.length() - 1); // Trim trailing slash.
+ SpdyCredentialControlFrame* frame = session_->CreateCredentialFrame(
+ origin, domain_bound_cert_type_, domain_bound_private_key_,
+ domain_bound_cert_, priority_);
+ return frame;
+ } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) {
+ CHECK(request_.get());
+ CHECK_GT(stream_id_, 0u);
+
+ SpdyControlFlags flags =
+ has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN;
+ SpdySynStreamControlFrame* frame = session_->CreateSynStream(
+ stream_id_, priority_, slot_, flags, request_);
+ send_time_ = base::TimeTicks::Now();
+ return frame;
+ } else {
+ CHECK(!cancelled());
+ // We must need to write stream data.
+ // Until the headers have been completely sent, we can not be sure
+ // that our stream_id is correct.
+ DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
+ DCHECK_GT(stream_id_, 0u);
+ DCHECK(!pending_data_frames_.empty());
+ SpdyFrame* frame = pending_data_frames_.front();
+ pending_data_frames_.pop_front();
+ return frame;
+ }
+}
+
SpdyStream::~SpdyStream() {
UpdateHistograms();
+ while (!pending_data_frames_.empty()) {
+ SpdyFrame* frame = pending_data_frames_.back();
+ pending_data_frames_.pop_back();
+ delete frame;
+ }
}
void SpdyStream::SetDelegate(Delegate* delegate) {
@@ -472,7 +540,10 @@ void SpdyStream::Cancel() {
}
void SpdyStream::Close() {
- session_->CloseStream(stream_id_, net::OK);
+ if (stream_id_ != 0)
+ session_->CloseStream(stream_id_, net::OK);
+ else
+ session_->CloseCreatedStream(this, OK);
}
int SpdyStream::SendRequest(bool has_upload_data) {
@@ -499,7 +570,13 @@ int SpdyStream::WriteStreamData(IOBuffer* data, int length,
// Until the headers have been completely sent, we can not be sure
// that our stream_id is correct.
DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
- return session_->WriteStreamData(stream_id_, data, length, flags);
+ CHECK_GT(stream_id_, 0u);
+
+ pending_data_frames_.push_back(
+ session_->CreateDataFrame(stream_id_, data, length, flags));
+
+ SetHasWriteAvailable();
+ return ERR_IO_PENDING;
}
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
@@ -643,14 +720,8 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) {
int SpdyStream::DoSendDomainBoundCert() {
io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;
CHECK(request_.get());
- std::string origin = GetUrl().GetOrigin().spec();
- origin.erase(origin.length() - 1); // trim trailing slash
- int rv = session_->WriteCredentialFrame(
- origin, domain_bound_cert_type_, domain_bound_private_key_,
- domain_bound_cert_, priority_);
- if (rv != ERR_IO_PENDING)
- return rv;
- return OK;
+ SetHasWriteAvailable();
+ return ERR_IO_PENDING;
}
int SpdyStream::DoSendDomainBoundCertComplete(int result) {
@@ -664,18 +735,7 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) {
int SpdyStream::DoSendHeaders() {
CHECK(!cancelled_);
- SpdyControlFlags flags = CONTROL_FLAG_NONE;
- if (!has_upload_data_)
- flags = CONTROL_FLAG_FIN;
-
- CHECK(request_.get());
- int result = session_->WriteSynStream(
- stream_id_, priority_, slot_, flags,
- request_);
- if (result != ERR_IO_PENDING)
- return result;
-
- send_time_ = base::TimeTicks::Now();
+ SetHasWriteAvailable();
io_state_ = STATE_SEND_HEADERS_COMPLETE;
return ERR_IO_PENDING;
}
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index dddc0e7..ea85ce4 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -6,6 +6,7 @@
#define NET_SPDY_SPDY_STREAM_H_
#pragma once
+#include <list>
#include <string>
#include <vector>
@@ -24,12 +25,12 @@
#include "net/socket/ssl_client_socket.h"
#include "net/spdy/spdy_framer.h"
#include "net/spdy/spdy_protocol.h"
+#include "net/spdy/spdy_session.h"
namespace net {
class AddressList;
class IPEndPoint;
-class SpdySession;
class SSLCertRequestInfo;
class SSLInfo;
@@ -95,7 +96,6 @@ class NET_EXPORT_PRIVATE SpdyStream
// SpdyStream constructor
SpdyStream(SpdySession* session,
- SpdyStreamId stream_id,
bool pushed,
const BoundNetLog& net_log);
@@ -260,6 +260,8 @@ class NET_EXPORT_PRIVATE SpdyStream
int GetProtocolVersion() const;
private:
+ class SpdyStreamIOBufferProducer;
+
enum State {
STATE_NONE,
STATE_GET_DOMAIN_BOUND_CERT,
@@ -308,6 +310,14 @@ class NET_EXPORT_PRIVATE SpdyStream
// the MessageLoop to replay all the data that the server has already sent.
void PushedStreamReplayData();
+ // Informs the SpdySession that this stream has a write available.
+ void SetHasWriteAvailable();
+
+ // Returns a newly created SPDY frame owned by the called that contains
+ // the next frame to be sent by this frame. May return NULL if this
+ // stream has become stalled on flow control.
+ SpdyFrame* ProduceNextFrame();
+
// There is a small period of time between when a server pushed stream is
// first created, and the pushed data is replayed. Any data received during
// this time should continue to be buffered.
@@ -343,6 +353,8 @@ class NET_EXPORT_PRIVATE SpdyStream
linked_ptr<SpdyHeaderBlock> response_;
base::Time response_time_;
+ std::list<SpdyFrame*> pending_data_frames_;
+
State io_state_;
// Since we buffer the response, we also buffer the response status.
diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc
index 4d7fd48..99ede76 100644
--- a/net/spdy/spdy_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_stream_spdy2_unittest.cc
@@ -277,9 +277,9 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) {
// Conjure up a stream.
scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session,
- 2,
true,
net_log);
+ stream->set_stream_id(2);
EXPECT_FALSE(stream->response_received());
EXPECT_FALSE(stream->HasUrl());
@@ -408,10 +408,10 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) {
EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
- const SpdyStreamId stream_id = stream->stream_id();
-
EXPECT_EQ(OK, callback.WaitForResult());
+ const SpdyStreamId stream_id = stream->stream_id();
+
EXPECT_TRUE(delegate->send_headers_completed());
EXPECT_EQ("200", (*delegate->response())["status"]);
EXPECT_EQ("HTTP/1.1", (*delegate->response())["version"]);
diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc
index b7db4e2..0dbaf2e 100644
--- a/net/spdy/spdy_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_stream_spdy3_unittest.cc
@@ -278,9 +278,9 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) {
// Conjure up a stream.
scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session,
- 2,
true,
net_log);
+ stream->set_stream_id(2);
EXPECT_FALSE(stream->response_received());
EXPECT_FALSE(stream->HasUrl());
@@ -413,10 +413,10 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) {
EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
- const SpdyStreamId stream_id = stream->stream_id();
-
EXPECT_EQ(OK, callback.WaitForResult());
+ const SpdyStreamId stream_id = stream->stream_id();
+
EXPECT_TRUE(delegate->send_headers_completed());
EXPECT_EQ("200", (*delegate->response())[":status"]);
EXPECT_EQ("HTTP/1.1", (*delegate->response())[":version"]);
diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h
index d8d85da..4a208b9 100644
--- a/net/spdy/spdy_websocket_stream.h
+++ b/net/spdy/spdy_websocket_stream.h
@@ -84,6 +84,8 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
virtual void set_chunk_callback(ChunkCallback* callback) OVERRIDE;
private:
+ friend class SpdyWebSocketStreamSpdy2Test;
+ friend class SpdyWebSocketStreamSpdy3Test;
FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy2Test, Basic);
FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy3Test, Basic);
diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
index 5f468c9..d6e75a6 100644
--- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
@@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test {
OrderedSocketData* data() { return data_.get(); }
void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) {
+ // Record the actual stream_id.
+ created_stream_id_ = websocket_stream_->stream_->stream_id();
websocket_stream_->SendData(kMessageFrame, kMessageFrameLength);
}
@@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test {
scoped_refptr<TransportSocketParams> transport_params_;
scoped_ptr<SpdyWebSocketStream> websocket_stream_;
SpdyStreamId stream_id_;
+ SpdyStreamId created_stream_id_;
scoped_ptr<SpdyFrame> request_frame_;
scoped_ptr<SpdyFrame> response_frame_;
scoped_ptr<SpdyFrame> message_frame_;
@@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, Basic) {
ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log));
ASSERT_TRUE(websocket_stream_->stream_);
- EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id());
SendRequest();
completion_callback_.WaitForResult();
+ EXPECT_EQ(stream_id_, created_stream_id_);
+
websocket_stream_.reset();
const std::vector<SpdyWebSocketStreamEvent>& events =
@@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, DestructionAfterExplicitClose) {
}
TEST_F(SpdyWebSocketStreamSpdy2Test, IOPending) {
- Prepare(3);
+ Prepare(1);
scoped_ptr<SpdyFrame> settings_frame(
ConstructSpdySettings(spdy_settings_to_send_));
MockWrite writes[] = {
diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
index 5a21e66..fcc5ccb 100644
--- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
@@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test {
OrderedSocketData* data() { return data_.get(); }
void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) {
+ // Record the actual stream_id.
+ created_stream_id_ = websocket_stream_->stream_->stream_id();
websocket_stream_->SendData(kMessageFrame, kMessageFrameLength);
}
@@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test {
scoped_refptr<TransportSocketParams> transport_params_;
scoped_ptr<SpdyWebSocketStream> websocket_stream_;
SpdyStreamId stream_id_;
+ SpdyStreamId created_stream_id_;
scoped_ptr<SpdyFrame> request_frame_;
scoped_ptr<SpdyFrame> response_frame_;
scoped_ptr<SpdyFrame> message_frame_;
@@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, Basic) {
ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log));
ASSERT_TRUE(websocket_stream_->stream_);
- EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id());
SendRequest();
completion_callback_.WaitForResult();
+ EXPECT_EQ(stream_id_, created_stream_id_);
+
websocket_stream_.reset();
const std::vector<SpdyWebSocketStreamEvent>& events =
@@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, DestructionAfterExplicitClose) {
}
TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) {
- Prepare(3);
+ Prepare(1);
scoped_ptr<SpdyFrame> settings_frame(
ConstructSpdySettings(spdy_settings_to_send_));
MockWrite writes[] = {
@@ -576,7 +580,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) {
ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream(
url, HIGHEST, net_log));
- // Delete the fist stream to allow create the second stream.
+ // Delete the first stream to allow create the second stream.
block_stream.reset();
ASSERT_EQ(OK, sync_callback_.WaitForResult());