summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorrch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-26 23:53:21 +0000
committerrch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-26 23:53:21 +0000
commitc92f4b4541bbe727a5c9b333171e1a58bab89773 (patch)
treee3f86fa38f5453ab48db72a2eddfed23bad82285 /net
parent84d8904f31c411d102d50698bad7dfdfcc671978 (diff)
downloadchromium_src-c92f4b4541bbe727a5c9b333171e1a58bab89773.zip
chromium_src-c92f4b4541bbe727a5c9b333171e1a58bab89773.tar.gz
chromium_src-c92f4b4541bbe727a5c9b333171e1a58bab89773.tar.bz2
Instead of enqueueing SPDY frames, enqueue SPDY streams that are ready to produce data. This allows us to lazily allocate a stream id.
The second CL was reverted because of use-after-free problems. Producers were deleted before they were pop()'d from the WriteQueue, which turns out to be a no-no. This version fixes this defect. The initial CL was reverted because of memory leaks. Both SpdyIOBufferProducers leaked the SpdyFrame they owned. The second version of the CL fixes this defect. Attempting to re-land 144649 Revert 147692 - Revert 144655 - Revert 144649 BUG=111708 TEST=net_unittests Review URL: https://chromiumcodereview.appspot.com/10815074 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@148660 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/http/http_network_transaction_spdy2_unittest.cc4
-rw-r--r--net/http/http_network_transaction_spdy3_unittest.cc4
-rw-r--r--net/spdy/spdy_io_buffer.cc15
-rw-r--r--net/spdy/spdy_io_buffer.h10
-rw-r--r--net/spdy/spdy_network_transaction_spdy2_unittest.cc41
-rw-r--r--net/spdy/spdy_network_transaction_spdy3_unittest.cc41
-rw-r--r--net/spdy/spdy_session.cc251
-rw-r--r--net/spdy/spdy_session.h88
-rw-r--r--net/spdy/spdy_session_spdy2_unittest.cc22
-rw-r--r--net/spdy/spdy_session_spdy3_unittest.cc37
-rw-r--r--net/spdy/spdy_stream.cc109
-rw-r--r--net/spdy/spdy_stream.h16
-rw-r--r--net/spdy/spdy_stream_spdy2_unittest.cc6
-rw-r--r--net/spdy/spdy_stream_spdy3_unittest.cc6
-rw-r--r--net/spdy/spdy_websocket_stream.h2
-rw-r--r--net/spdy/spdy_websocket_stream_spdy2_unittest.cc8
-rw-r--r--net/spdy/spdy_websocket_stream_spdy3_unittest.cc10
17 files changed, 461 insertions, 209 deletions
diff --git a/net/http/http_network_transaction_spdy2_unittest.cc b/net/http/http_network_transaction_spdy2_unittest.cc
index 07e91425..7c10eb0 100644
--- a/net/http/http_network_transaction_spdy2_unittest.cc
+++ b/net/http/http_network_transaction_spdy2_unittest.cc
@@ -4939,8 +4939,8 @@ TEST_F(HttpNetworkTransactionSpdy2Test, BasicAuthSpdyProxy) {
MockWrite spdy_writes[] = {
CreateMockWrite(*req, 1, ASYNC),
- CreateMockWrite(*connect2, 4),
- CreateMockWrite(*rst, 5, ASYNC),
+ CreateMockWrite(*rst, 4, ASYNC),
+ CreateMockWrite(*connect2, 5),
CreateMockWrite(*wrapped_get, 8),
};
diff --git a/net/http/http_network_transaction_spdy3_unittest.cc b/net/http/http_network_transaction_spdy3_unittest.cc
index 0f665e8..7ccddeb 100644
--- a/net/http/http_network_transaction_spdy3_unittest.cc
+++ b/net/http/http_network_transaction_spdy3_unittest.cc
@@ -4939,8 +4939,8 @@ TEST_F(HttpNetworkTransactionSpdy3Test, BasicAuthSpdyProxy) {
MockWrite spdy_writes[] = {
CreateMockWrite(*req, 1, ASYNC),
- CreateMockWrite(*connect2, 4),
- CreateMockWrite(*rst, 5, ASYNC),
+ CreateMockWrite(*rst, 4, ASYNC),
+ CreateMockWrite(*connect2, 5),
CreateMockWrite(*wrapped_get, 8)
};
diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc
index 5720f7d..f0c14b1 100644
--- a/net/spdy/spdy_io_buffer.cc
+++ b/net/spdy/spdy_io_buffer.cc
@@ -20,8 +20,23 @@ SpdyIOBuffer::SpdyIOBuffer(
SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) {
}
+SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) {
+ buffer_ = rhs.buffer_;
+ priority_ = rhs.priority_;
+ position_ = rhs.position_;
+ stream_ = rhs.stream_;
+}
+
SpdyIOBuffer::~SpdyIOBuffer() {}
+SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) {
+ buffer_ = rhs.buffer_;
+ priority_ = rhs.priority_;
+ position_ = rhs.position_;
+ stream_ = rhs.stream_;
+ return *this;
+}
+
void SpdyIOBuffer::release() {
buffer_ = NULL;
stream_ = NULL;
diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h
index e4033c2..9bbee16 100644
--- a/net/spdy/spdy_io_buffer.h
+++ b/net/spdy/spdy_io_buffer.h
@@ -8,10 +8,12 @@
#include "base/memory/ref_counted.h"
#include "net/base/io_buffer.h"
#include "net/base/net_export.h"
-#include "net/spdy/spdy_stream.h"
+#include "net/base/request_priority.h"
namespace net {
+class SpdyStream;
+
// A class for managing SPDY IO buffers. These buffers need to be prioritized
// so that the SpdySession sends them in the right order. Further, they need
// to track the SpdyStream which they are associated with so that incremental
@@ -25,8 +27,14 @@ class NET_EXPORT_PRIVATE SpdyIOBuffer {
// |stream| is a pointer to the stream which is managing this buffer.
SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority,
SpdyStream* stream);
+ // Declare this instead of using the default so that we avoid needing to
+ // include spdy_stream.h.
+ SpdyIOBuffer(const SpdyIOBuffer& rhs);
SpdyIOBuffer();
~SpdyIOBuffer();
+ // Declare this instead of using the default so that we avoid needing to
+ // include spdy_stream.h.
+ SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs);
// Accessors.
DrainableIOBuffer* buffer() const { return buffer_; }
diff --git a/net/spdy/spdy_network_transaction_spdy2_unittest.cc b/net/spdy/spdy_network_transaction_spdy2_unittest.cc
index 95bfc87..2cff815 100644
--- a/net/spdy/spdy_network_transaction_spdy2_unittest.cc
+++ b/net/spdy/spdy_network_transaction_spdy2_unittest.cc
@@ -1709,21 +1709,35 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0));
scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
- CreateMockRead(*stream_reply, 2),
- CreateMockRead(*stream_body, 3),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*stream_reply, 1),
+ MockRead(ASYNC, 0, 3) // EOF
};
- DelayedSocketData data(0, reads, arraysize(reads), NULL, 0);
- NormalSpdyTransactionHelper helper(request,
+ scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0));
+ scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true));
+ MockRead writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*body, 2),
+ };
+
+ DeterministicSocketData data(reads, arraysize(reads),
+ writes, arraysize(writes));
+ NormalSpdyTransactionHelper helper(CreatePostRequest(),
BoundNetLog(), GetParam(), NULL);
+ helper.SetDeterministic();
helper.RunPreTestSetup();
- helper.AddData(&data);
- helper.RunDefaultTest();
- helper.VerifyDataConsumed();
+ helper.AddDeterministicData(&data);
+ HttpNetworkTransaction* trans = helper.trans();
- TransactionHelperResult out = helper.output();
- EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+ TestCompletionCallback callback;
+ int rv = trans->Start(
+ &CreatePostRequest(), callback.callback(), BoundNetLog());
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ data.RunFor(2);
+ rv = callback.WaitForResult();
+ EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
+ data.RunFor(1);
}
// The client upon cancellation tries to send a RST_STREAM frame. The mock
@@ -5457,10 +5471,11 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, OutOfOrderSynStream) {
// This first request will start to establish the SpdySession.
// Then we will start the second (MEDIUM priority) and then third
// (HIGHEST priority) request in such a way that the third will actually
- // start before the second, causing the second to be re-numbered.
+ // start before the second, causing the second to be numbered differently
+ // than the order they were created.
scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM));
- scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM));
MockWrite writes[] = {
CreateMockWrite(*req1, 0),
CreateMockWrite(*req2, 3),
diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc
index db7a5c4..e93733b 100644
--- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc
+++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc
@@ -1715,21 +1715,35 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, PostWithEarlySynReply) {
scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0));
scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true));
MockRead reads[] = {
- CreateMockRead(*stream_reply, 2),
- CreateMockRead(*stream_body, 3),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*stream_reply, 1),
+ MockRead(ASYNC, 0, 3) // EOF
};
- DelayedSocketData data(0, reads, arraysize(reads), NULL, 0);
- NormalSpdyTransactionHelper helper(request,
+ scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0));
+ scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true));
+ MockRead writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*body, 2),
+ };
+
+ DeterministicSocketData data(reads, arraysize(reads),
+ writes, arraysize(writes));
+ NormalSpdyTransactionHelper helper(CreatePostRequest(),
BoundNetLog(), GetParam(), NULL);
+ helper.SetDeterministic();
helper.RunPreTestSetup();
- helper.AddData(&data);
- helper.RunDefaultTest();
- helper.VerifyDataConsumed();
+ helper.AddDeterministicData(&data);
+ HttpNetworkTransaction* trans = helper.trans();
- TransactionHelperResult out = helper.output();
- EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv);
+ TestCompletionCallback callback;
+ int rv = trans->Start(
+ &CreatePostRequest(), callback.callback(), BoundNetLog());
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ data.RunFor(2);
+ rv = callback.WaitForResult();
+ EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv);
+ data.RunFor(1);
}
// The client upon cancellation tries to send a RST_STREAM frame. The mock
@@ -6016,10 +6030,11 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, OutOfOrderSynStream) {
// This first request will start to establish the SpdySession.
// Then we will start the second (MEDIUM priority) and then third
// (HIGHEST priority) request in such a way that the third will actually
- // start before the second, causing the second to be re-numbered.
+ // start before the second, causing the second to be numbered differently
+ // than they order they were created.
scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM));
- scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM));
MockWrite writes[] = {
CreateMockWrite(*req1, 0),
CreateMockWrite(*req2, 3),
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 1051cb9..c372b87 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -194,6 +194,28 @@ static ExternalTimeFunc g_time_func = base::TimeTicks::Now;
} // namespace
// static
+void SpdySession::SpdyIOBufferProducer::ActivateStream(
+ SpdySession* spdy_session,
+ SpdyStream* spdy_stream) {
+ spdy_session->ActivateStream(spdy_stream);
+}
+
+// static
+SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* stream) {
+ size_t size = frame->length() + SpdyFrame::kHeaderSize;
+ DCHECK_GT(size, 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), frame->data(), size);
+
+ return new SpdyIOBuffer(buffer, size, priority, stream);
+}
+
+// static
void SpdySession::set_default_protocol(NextProto default_protocol) {
g_default_protocol = default_protocol;
}
@@ -389,6 +411,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
ssl_info.cert->VerifyNameMatch(domain);
}
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer) {
+ write_queue_.push(producer);
+ stream_producers_[producer] = stream;
+ WriteSocketLater();
+}
+
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -427,7 +456,8 @@ int SpdySession::CreateStream(
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
- active_streams_.size() < max_concurrent_streams_) {
+ (active_streams_.size() + created_streams_.size() <
+ max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
@@ -517,10 +547,7 @@ int SpdySession::CreateStreamImpl(
const std::string& path = url.PathForRequest();
- const SpdyStreamId stream_id = GetNewStreamId();
-
*spdy_stream = new SpdyStream(this,
- stream_id,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
@@ -529,14 +556,13 @@ int SpdySession::CreateStreamImpl(
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
- ActivateStream(stream);
+ created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
return OK;
}
@@ -558,15 +584,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-int SpdySession::WriteSynStream(
+SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const SpdyHeaderBlock& headers) {
- // Find our stream
- if (!IsStreamActive(stream_id))
- return ERR_INVALID_SPDY_STREAM;
+ CHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
@@ -577,11 +601,7 @@ int SpdySession::WriteSynStream(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
- credential_slot, flags, false, &headers));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order.
- // http://crbug.com/111708
- QueueFrame(syn_frame.get(), HIGHEST, stream);
+ credential_slot, flags, true, &headers));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
@@ -596,14 +616,15 @@ int SpdySession::WriteSynStream(
stream_id, 0));
}
- return ERR_IO_PENDING;
+ return syn_frame.release();
}
-int SpdySession::WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority) {
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
+ const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority) {
DCHECK(is_secure_);
unsigned char secret[32]; // 32 bytes from the spec
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
@@ -645,24 +666,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order, which means that we need
- // to enqueue all CREDENTIAL frames at this priority to ensure that
- // they are sent *before* the SYN_STREAM that references them.
- // http://crbug.com/111708
- QueueFrame(credential_frame.get(), HIGHEST, NULL);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
- return ERR_IO_PENDING;
+ return credential_frame.release();
}
-int SpdySession::WriteStreamData(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -686,7 +701,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return ERR_IO_PENDING;
+ return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
@@ -711,18 +726,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
- QueueFrame(frame.get(), stream->priority(), stream);
- return ERR_IO_PENDING;
+ return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
+ DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
+ DCHECK_EQ(0u, stream->stream_id());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+}
+
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
@@ -740,7 +760,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.get(), priority, NULL);
+ QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -928,46 +948,22 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !queue_.empty()) {
+ while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
- // Grab the next SpdyFrame to send.
- SpdyIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
- DCHECK(uncompressed_frame.is_control_frame());
- const SpdyControlFrame* uncompressed_control_frame =
- reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame);
- scoped_ptr<SpdyFrame> compressed_frame(
- buffered_spdy_framer_->CompressControlFrame(
- *uncompressed_control_frame));
- if (!compressed_frame.get()) {
- RecordProtocolErrorHistogram(
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
- CloseSessionOnError(
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
- return;
- }
-
- size = compressed_frame->length() + SpdyFrame::kHeaderSize;
-
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
- next_buffer.stream());
- } else {
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
- in_flight_write_ = next_buffer;
- }
+ // Grab the next SpdyBuffer to send.
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ write_queue_.pop();
+ scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
+ stream_producers_.erase(producer.get());
+ // It is possible that a stream had data to write, but a
+ // WINDOW_UPDATE frame has been received which made that
+ // stream no longer writable.
+ // TODO(rch): consider handling that case by removing the
+ // stream from the writable queue?
+ if (buffer == NULL)
+ continue;
+
+ in_flight_write_ = *buffer;
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
@@ -1014,16 +1010,32 @@ void SpdySession::CloseAllStreams(net::Error status) {
while (!active_streams_.empty()) {
ActiveStreamMap::iterator it = active_streams_.begin();
const scoped_refptr<SpdyStream>& stream = it->second;
- DCHECK(stream);
- std::string description = base::StringPrintf(
- "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
- stream->LogStreamError(status, description);
+ LogAbandonedStream(stream, status);
DeleteStream(stream->stream_id(), status);
}
+ while (!created_streams_.empty()) {
+ CreatedStreamSet::iterator it = created_streams_.begin();
+ const scoped_refptr<SpdyStream>& stream = *it;
+ LogAbandonedStream(stream, status);
+ stream->OnClose(status);
+ created_streams_.erase(it);
+ }
+
// We also need to drain the queue.
- while (queue_.size())
- queue_.pop();
+ while (!write_queue_.empty()) {
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ write_queue_.pop();
+ stream_producers_.erase(producer.get());
+ }
+}
+
+void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status) {
+ DCHECK(stream);
+ std::string description = base::StringPrintf(
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
+ stream->LogStreamError(status, description);
}
int SpdySession::GetNewStreamId() {
@@ -1034,17 +1046,6 @@ int SpdySession::GetNewStreamId() {
return id;
}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- int length = SpdyFrame::kHeaderSize + frame->length();
- IOBuffer* buffer = new IOBuffer(length);
- memcpy(buffer->data(), frame->data(), length);
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
-
- WriteSocketLater();
-}
-
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
@@ -1131,7 +1132,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
+class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SimpleSpdyIOBufferProducer(SpdyFrame* frame,
+ RequestPriority priority)
+ : frame_(frame),
+ priority_(priority) {
+ }
+
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return priority_;
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) {
+ return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame_.get(), priority_, NULL);
+ }
+
+ private:
+ scoped_ptr<SpdyFrame> frame_;
+ RequestPriority priority_;
+};
+
+void SpdySession::QueueFrame(SpdyFrame* frame,
+ RequestPriority priority) {
+ SimpleSpdyIOBufferProducer* producer =
+ new SimpleSpdyIOBufferProducer(frame, priority);
+ write_queue_.push(producer);
+ WriteSocketLater();
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
+ if (stream->stream_id() == 0) {
+ stream->set_stream_id(GetNewStreamId());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ }
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
@@ -1159,6 +1194,21 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
if (it2 == active_streams_.end())
return;
+ // Possibly remove from the write queue.
+ WriteQueue old = write_queue_;
+ write_queue_ = WriteQueue();
+ while (!old.empty()) {
+ scoped_ptr<SpdyIOBufferProducer> producer(old.top());
+ old.pop();
+ StreamProducerMap::iterator it = stream_producers_.find(producer.get());
+ if (it == stream_producers_.end() || it->second->stream_id() != id) {
+ write_queue_.push(producer.release());
+ } else {
+ stream_producers_.erase(producer.get());
+ producer.reset(NULL);
+ }
+ }
+
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
@@ -1379,8 +1429,8 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id,
return;
}
- scoped_refptr<SpdyStream> stream(
- new SpdyStream(this, stream_id, true, net_log_));
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
+ stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
@@ -1618,7 +1668,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.get(), stream->priority(), NULL);
+ QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
@@ -1657,7 +1707,6 @@ void SpdySession::SendInitialSettings() {
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
}
- sent_settings_ = true;
SendSettings(settings_map);
}
@@ -1690,7 +1739,6 @@ void SpdySession::SendInitialSettings() {
HandleSetting(new_id, new_val);
}
- sent_settings_ = true;
SendSettings(settings_map_new);
}
@@ -1704,7 +1752,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
- QueueFrame(settings_frame.get(), HIGHEST, NULL);
+ sent_settings_ = true;
+ QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -1739,6 +1788,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
+
+ CreatedStreamSet::iterator i;
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
+ const scoped_refptr<SpdyStream>& stream = *i;
+ stream->AdjustSendWindowSize(delta_window_size);
+ }
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
@@ -1759,7 +1814,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
- QueueFrame(ping_frame.get(), HIGHEST, NULL);
+ QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index 2975203..3d95101 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -19,6 +19,7 @@
#include "net/base/load_states.h"
#include "net/base/net_errors.h"
#include "net/base/request_priority.h"
+#include "net/base/ssl_client_cert_type.h"
#include "net/base/ssl_config_service.h"
#include "net/base/upload_data_stream.h"
#include "net/socket/client_socket_handle.h"
@@ -93,6 +94,32 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING ==
class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
public BufferedSpdyFramerVisitorInterface {
public:
+ // Defines an interface for producing SpdyIOBuffers.
+ class NET_EXPORT_PRIVATE SpdyIOBufferProducer {
+ public:
+ SpdyIOBufferProducer() {}
+
+ // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL
+ // if not buffer is ready to be produced.
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0;
+
+ virtual RequestPriority GetPriority() const = 0;
+
+ virtual ~SpdyIOBufferProducer() {}
+
+ protected:
+ // Activates |spdy_stream| in |spdy_session|.
+ static void ActivateStream(SpdySession* spdy_session,
+ SpdyStream* spdy_stream);
+
+ static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* spdy_stream);
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SpdyIOBufferProducer);
+ };
+
// Create a new SpdySession.
// |host_port_proxy_pair| is the host/port that this session connects to, and
// the proxy configuration settings that it's using.
@@ -157,9 +184,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// authentication now.
bool VerifyDomainAuthentication(const std::string& domain);
+ // Records that |stream| has a write available from |producer|.
+ // |producer| will be owned by this SpdySession.
+ void SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer);
+
// Send the SYN frame for |stream_id|. This also sends PING message to check
// the status of the connection.
- int WriteSynStream(
+ SpdySynStreamControlFrame* CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
@@ -167,21 +199,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
const SpdyHeaderBlock& headers);
// Write a CREDENTIAL frame to the session.
- int WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority);
+ SpdyCredentialControlFrame* CreateCredentialFrame(const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority);
// Write a data frame to the stream.
// Used to create and queue a data frame for the given stream.
- int WriteStreamData(SpdyStreamId stream_id, net::IOBuffer* data,
- int len,
- SpdyDataFlags flags);
+ SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags);
// Close a stream.
void CloseStream(SpdyStreamId stream_id, int status);
+ // Close a stream that has been created but is not yet active.
+ void CloseCreatedStream(SpdyStream* stream, int status);
+
// Reset a stream by sending a RST_STREAM frame with given status code.
// Also closes the stream. Was not piggybacked to CloseStream since not
// all of the calls to CloseStream necessitate sending a RST_STREAM.
@@ -268,7 +303,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Returns true if session is not currently active
bool is_active() const {
- return !active_streams_.empty();
+ return !active_streams_.empty() || !created_streams_.empty();
}
// Access to the number of active and pending streams. These are primarily
@@ -277,6 +312,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
size_t num_unclaimed_pushed_streams() const {
return unclaimed_pushed_streams_.size();
}
+ size_t num_created_streams() const { return created_streams_.size(); }
// Returns true if flow control is enabled for the session.
bool is_flow_control_enabled() const {
@@ -356,6 +392,19 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap;
typedef std::priority_queue<SpdyIOBuffer> OutputQueue;
+ typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet;
+ typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap;
+ class SpdyIOBufferProducerCompare {
+ public:
+ bool operator() (const SpdyIOBufferProducer* lhs,
+ const SpdyIOBufferProducer* rhs) const {
+ return lhs->GetPriority() < rhs->GetPriority();
+ }
+ };
+ typedef std::priority_queue<SpdyIOBufferProducer*,
+ std::vector<SpdyIOBufferProducer*>,
+ SpdyIOBufferProducerCompare> WriteQueue;
+
struct CallbackResultPair {
CallbackResultPair(const CompletionCallback& callback_in, int result_in)
: callback(callback_in), result(result_in) {}
@@ -434,9 +483,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Queue a frame for sending.
// |frame| is the frame to send.
// |priority| is the priority for insertion into the queue.
- // |stream| is the stream which this IO is associated with (or NULL).
- void QueueFrame(SpdyFrame* frame, RequestPriority priority,
- SpdyStream* stream);
+ void QueueFrame(SpdyFrame* frame, RequestPriority priority);
// Track active streams in the active stream list.
void ActivateStream(SpdyStream* stream);
@@ -462,6 +509,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Closes all streams. Used as part of shutdown.
void CloseAllStreams(net::Error status);
+ void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status);
+
// Invokes a user callback for stream creation. We provide this method so it
// can be deferred to the MessageLoop, so we avoid re-entrancy problems.
void InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream>* stream);
@@ -580,8 +630,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// server, but do not have consumers yet.
PushedStreamMap unclaimed_pushed_streams_;
- // As we gather data to be sent, we put it into the output queue.
- OutputQueue queue_;
+ // Set of all created streams but that have not yet sent any frames.
+ CreatedStreamSet created_streams_;
+
+ // As streams have data to be sent, we put them into the write queue.
+ WriteQueue write_queue_;
+
+ // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream
+ // so that when a stream is destroyed, we can remove the corresponding
+ // producer from |write_queue_|.
+ StreamProducerMap stream_producers_;
// The packet we are currently sending.
bool write_pending_; // Will be true when a write is in progress.
diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc
index cd971d2..9597289 100644
--- a/net/spdy/spdy_session_spdy2_unittest.cc
+++ b/net/spdy/spdy_session_spdy2_unittest.cc
@@ -291,7 +291,7 @@ TEST_F(SpdySessionSpdy2Test, DeleteExpiredPushStreams) {
(*request_headers)["url"] = "/";
scoped_refptr<SpdyStream> stream(
- new SpdyStream(session, 1, false, session->net_log_));
+ new SpdyStream(session, false, session->net_log_));
stream->set_spdy_headers(request_headers.Pass());
session->ActivateStream(stream);
@@ -398,7 +398,7 @@ TEST_F(SpdySessionSpdy2Test, FailedPing) {
// Assert session is not closed.
EXPECT_FALSE(session->IsClosed());
- EXPECT_LT(0u, session->num_active_streams());
+ EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams());
EXPECT_TRUE(spdy_session_pool->HasSession(pair));
// We set last time we have received any data in 1 sec less than now.
@@ -518,8 +518,8 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) {
// Make sessions 1 and 3 inactive, but keep them open.
// Session 2 still open and active
- session1->CloseStream(spdy_stream1->stream_id(), OK);
- session3->CloseStream(spdy_stream3->stream_id(), OK);
+ session1->CloseCreatedStream(spdy_stream1, OK);
+ session3->CloseCreatedStream(spdy_stream3, OK);
EXPECT_FALSE(session1->is_active());
EXPECT_FALSE(session1->IsClosed());
EXPECT_TRUE(session2->is_active());
@@ -542,7 +542,7 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) {
EXPECT_FALSE(session2->IsClosed());
// Make 2 not active
- session2->CloseStream(spdy_stream2->stream_id(), OK);
+ session2->CloseCreatedStream(spdy_stream2, OK);
EXPECT_FALSE(session2->is_active());
EXPECT_FALSE(session2->IsClosed());
@@ -1177,8 +1177,8 @@ TEST_F(SpdySessionSpdy2Test, CloseSessionOnError) {
TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
// Construct the request.
MockConnect connect_data(SYNCHRONOUS, OK);
- scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST));
MockWrite writes[] = {
CreateMockWrite(*req1, 2),
CreateMockWrite(*req2, 1),
@@ -1242,14 +1242,14 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
GURL url1("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1,
BoundNetLog(), callback1.callback()));
- EXPECT_EQ(1u, spdy_stream1->stream_id());
+ EXPECT_EQ(0u, spdy_stream1->stream_id());
scoped_refptr<SpdyStream> spdy_stream2;
TestCompletionCallback callback2;
GURL url2("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2,
BoundNetLog(), callback2.callback()));
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(0u, spdy_stream2->stream_id());
scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
(*headers)["method"] = "GET";
@@ -1270,8 +1270,8 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) {
spdy_stream2->SendRequest(false);
MessageLoop::current()->RunAllPending();
- EXPECT_EQ(1u, spdy_stream1->stream_id());
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(3u, spdy_stream1->stream_id());
+ EXPECT_EQ(1u, spdy_stream2->stream_id());
spdy_stream1->Cancel();
spdy_stream1 = NULL;
diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc
index 12fc829..59b7382 100644
--- a/net/spdy/spdy_session_spdy3_unittest.cc
+++ b/net/spdy/spdy_session_spdy3_unittest.cc
@@ -291,7 +291,7 @@ TEST_F(SpdySessionSpdy3Test, DeleteExpiredPushStreams) {
(*request_headers)[":path"] = "/";
scoped_refptr<SpdyStream> stream(
- new SpdyStream(session, 1, false, session->net_log_));
+ new SpdyStream(session, false, session->net_log_));
stream->set_spdy_headers(request_headers.Pass());
session->ActivateStream(stream);
@@ -402,7 +402,7 @@ TEST_F(SpdySessionSpdy3Test, FailedPing) {
// Assert session is not closed.
EXPECT_FALSE(session->IsClosed());
- EXPECT_LT(0u, session->num_active_streams());
+ EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams());
EXPECT_TRUE(spdy_session_pool->HasSession(pair));
// We set last time we have received any data in 1 sec less than now.
@@ -522,8 +522,8 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) {
// Make sessions 1 and 3 inactive, but keep them open.
// Session 2 still open and active
- session1->CloseStream(spdy_stream1->stream_id(), OK);
- session3->CloseStream(spdy_stream3->stream_id(), OK);
+ session1->CloseCreatedStream(spdy_stream1, OK);
+ session3->CloseCreatedStream(spdy_stream3, OK);
EXPECT_FALSE(session1->is_active());
EXPECT_FALSE(session1->IsClosed());
EXPECT_TRUE(session2->is_active());
@@ -546,7 +546,7 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) {
EXPECT_FALSE(session2->IsClosed());
// Make 2 not active
- session2->CloseStream(spdy_stream2->stream_id(), OK);
+ session2->CloseCreatedStream(spdy_stream2, OK);
EXPECT_FALSE(session2->is_active());
EXPECT_FALSE(session2->IsClosed());
@@ -1265,22 +1265,24 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
MockConnect connect_data(SYNCHRONOUS, OK);
scoped_ptr<SpdyFrame> settings_frame(ConstructSpdySettings(new_settings));
MockRead reads[] = {
- CreateMockRead(*settings_frame),
- MockRead(SYNCHRONOUS, 0, 0) // EOF
+ CreateMockRead(*settings_frame, 0),
+ MockRead(ASYNC, 0, 1) // EOF
};
SpdySessionDependencies session_deps;
+
session_deps.host_resolver->set_synchronous_mode(true);
- StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0);
- data.set_connect_data(connect_data);
- session_deps.socket_factory->AddSocketDataProvider(&data);
+ scoped_ptr<DeterministicSocketData> data(
+ new DeterministicSocketData(reads, arraysize(reads), NULL, 0));
+ data->set_connect_data(connect_data);
+ session_deps.deterministic_socket_factory->AddSocketDataProvider(data.get());
SSLSocketDataProvider ssl(SYNCHRONOUS, OK);
session_deps.socket_factory->AddSSLSocketDataProvider(&ssl);
scoped_refptr<HttpNetworkSession> http_session(
- SpdySessionDependencies::SpdyCreateSession(&session_deps));
+ SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps));
const std::string kTestHost("www.foo.com");
const int kTestPort = 80;
@@ -1320,6 +1322,7 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
callback1.callback()));
EXPECT_NE(spdy_stream1->send_window_size(), window_size);
+ data->RunFor(1); // Process the SETTINGS frame, but not the EOF
MessageLoop::current()->RunAllPending();
EXPECT_EQ(session->initial_send_window_size(), window_size);
EXPECT_EQ(spdy_stream1->send_window_size(), window_size);
@@ -1344,8 +1347,8 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) {
TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
// Construct the request.
MockConnect connect_data(SYNCHRONOUS, OK);
- scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST));
- scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST));
+ scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST));
+ scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST));
MockWrite writes[] = {
CreateMockWrite(*req1, 2),
CreateMockWrite(*req2, 1),
@@ -1409,14 +1412,14 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
GURL url1("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1,
BoundNetLog(), callback1.callback()));
- EXPECT_EQ(1u, spdy_stream1->stream_id());
+ EXPECT_EQ(0u, spdy_stream1->stream_id());
scoped_refptr<SpdyStream> spdy_stream2;
TestCompletionCallback callback2;
GURL url2("http://www.google.com");
EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2,
BoundNetLog(), callback2.callback()));
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(0u, spdy_stream2->stream_id());
scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
(*headers)[":method"] = "GET";
@@ -1437,8 +1440,8 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) {
spdy_stream2->SendRequest(false);
MessageLoop::current()->RunAllPending();
- EXPECT_EQ(1u, spdy_stream1->stream_id());
- EXPECT_EQ(3u, spdy_stream2->stream_id());
+ EXPECT_EQ(3u, spdy_stream1->stream_id());
+ EXPECT_EQ(1u, spdy_stream2->stream_id());
spdy_stream1->Cancel();
spdy_stream1 = NULL;
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 4fe3436..f3828d6 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -50,11 +50,10 @@ bool ContainsUpperAscii(const std::string& str) {
} // namespace
SpdyStream::SpdyStream(SpdySession* session,
- SpdyStreamId stream_id,
bool pushed,
const BoundNetLog& net_log)
: continue_buffering_data_(true),
- stream_id_(stream_id),
+ stream_id_(0),
priority_(HIGHEST),
slot_(0),
stalled_by_flow_control_(false),
@@ -77,8 +76,78 @@ SpdyStream::SpdyStream(SpdySession* session,
domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) {
}
+class SpdyStream::SpdyStreamIOBufferProducer
+ : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {}
+
+ // SpdyFrameProducer
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return stream_->priority();
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
+ if (stream_->stream_id() == 0)
+ SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_);
+ frame_.reset(stream_->ProduceNextFrame());
+ return frame_ == NULL ? NULL :
+ SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame_.get(), GetPriority(), stream_);
+ }
+
+ private:
+ scoped_refptr<SpdyStream> stream_;
+ scoped_ptr<SpdyFrame> frame_;
+};
+
+void SpdyStream::SetHasWriteAvailable() {
+ session_->SetStreamHasWriteAvailable(this,
+ new SpdyStreamIOBufferProducer(this));
+}
+
+SpdyFrame* SpdyStream::ProduceNextFrame() {
+ if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) {
+ CHECK(request_.get());
+ CHECK_GT(stream_id_, 0u);
+
+ std::string origin = GetUrl().GetOrigin().spec();
+ DCHECK(origin[origin.length() - 1] == '/');
+ origin.erase(origin.length() - 1); // Trim trailing slash.
+ SpdyCredentialControlFrame* frame = session_->CreateCredentialFrame(
+ origin, domain_bound_cert_type_, domain_bound_private_key_,
+ domain_bound_cert_, priority_);
+ return frame;
+ } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) {
+ CHECK(request_.get());
+ CHECK_GT(stream_id_, 0u);
+
+ SpdyControlFlags flags =
+ has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN;
+ SpdySynStreamControlFrame* frame = session_->CreateSynStream(
+ stream_id_, priority_, slot_, flags, *request_);
+ send_time_ = base::TimeTicks::Now();
+ return frame;
+ } else {
+ CHECK(!cancelled());
+ // We must need to write stream data.
+ // Until the headers have been completely sent, we can not be sure
+ // that our stream_id is correct.
+ DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
+ DCHECK_GT(stream_id_, 0u);
+ DCHECK(!pending_data_frames_.empty());
+ SpdyFrame* frame = pending_data_frames_.front();
+ pending_data_frames_.pop_front();
+ return frame;
+ }
+}
+
SpdyStream::~SpdyStream() {
UpdateHistograms();
+ while (!pending_data_frames_.empty()) {
+ SpdyFrame* frame = pending_data_frames_.back();
+ pending_data_frames_.pop_back();
+ delete frame;
+ }
}
void SpdyStream::SetDelegate(Delegate* delegate) {
@@ -460,7 +529,10 @@ void SpdyStream::Cancel() {
}
void SpdyStream::Close() {
- session_->CloseStream(stream_id_, net::OK);
+ if (stream_id_ != 0)
+ session_->CloseStream(stream_id_, net::OK);
+ else
+ session_->CloseCreatedStream(this, OK);
}
int SpdyStream::SendRequest(bool has_upload_data) {
@@ -484,7 +556,13 @@ int SpdyStream::WriteStreamData(IOBuffer* data, int length,
// Until the headers have been completely sent, we can not be sure
// that our stream_id is correct.
DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
- return session_->WriteStreamData(stream_id_, data, length, flags);
+ CHECK_GT(stream_id_, 0u);
+
+ pending_data_frames_.push_back(
+ session_->CreateDataFrame(stream_id_, data, length, flags));
+
+ SetHasWriteAvailable();
+ return ERR_IO_PENDING;
}
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
@@ -628,14 +706,8 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) {
int SpdyStream::DoSendDomainBoundCert() {
io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;
CHECK(request_.get());
- std::string origin = GetUrl().GetOrigin().spec();
- origin.erase(origin.length() - 1); // trim trailing slash
- int rv = session_->WriteCredentialFrame(
- origin, domain_bound_cert_type_, domain_bound_private_key_,
- domain_bound_cert_, priority_);
- if (rv != ERR_IO_PENDING)
- return rv;
- return OK;
+ SetHasWriteAvailable();
+ return ERR_IO_PENDING;
}
int SpdyStream::DoSendDomainBoundCertComplete(int result) {
@@ -649,18 +721,7 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) {
int SpdyStream::DoSendHeaders() {
CHECK(!cancelled_);
- SpdyControlFlags flags = CONTROL_FLAG_NONE;
- if (!has_upload_data_)
- flags = CONTROL_FLAG_FIN;
-
- CHECK(request_.get());
- int result = session_->WriteSynStream(
- stream_id_, priority_, slot_, flags,
- *request_);
- if (result != ERR_IO_PENDING)
- return result;
-
- send_time_ = base::TimeTicks::Now();
+ SetHasWriteAvailable();
io_state_ = STATE_SEND_HEADERS_COMPLETE;
return ERR_IO_PENDING;
}
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index 40e436a..8653d9f 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -5,6 +5,7 @@
#ifndef NET_SPDY_SPDY_STREAM_H_
#define NET_SPDY_SPDY_STREAM_H_
+#include <list>
#include <string>
#include <vector>
@@ -22,12 +23,12 @@
#include "net/socket/ssl_client_socket.h"
#include "net/spdy/spdy_framer.h"
#include "net/spdy/spdy_protocol.h"
+#include "net/spdy/spdy_session.h"
namespace net {
class AddressList;
class IPEndPoint;
-class SpdySession;
class SSLCertRequestInfo;
class SSLInfo;
@@ -89,7 +90,6 @@ class NET_EXPORT_PRIVATE SpdyStream
// SpdyStream constructor
SpdyStream(SpdySession* session,
- SpdyStreamId stream_id,
bool pushed,
const BoundNetLog& net_log);
@@ -251,6 +251,8 @@ class NET_EXPORT_PRIVATE SpdyStream
int GetProtocolVersion() const;
private:
+ class SpdyStreamIOBufferProducer;
+
enum State {
STATE_NONE,
STATE_GET_DOMAIN_BOUND_CERT,
@@ -300,6 +302,14 @@ class NET_EXPORT_PRIVATE SpdyStream
// the MessageLoop to replay all the data that the server has already sent.
void PushedStreamReplayData();
+ // Informs the SpdySession that this stream has a write available.
+ void SetHasWriteAvailable();
+
+ // Returns a newly created SPDY frame owned by the called that contains
+ // the next frame to be sent by this frame. May return NULL if this
+ // stream has become stalled on flow control.
+ SpdyFrame* ProduceNextFrame();
+
// There is a small period of time between when a server pushed stream is
// first created, and the pushed data is replayed. Any data received during
// this time should continue to be buffered.
@@ -335,6 +345,8 @@ class NET_EXPORT_PRIVATE SpdyStream
scoped_ptr<SpdyHeaderBlock> response_;
base::Time response_time_;
+ std::list<SpdyFrame*> pending_data_frames_;
+
State io_state_;
// Since we buffer the response, we also buffer the response status.
diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc
index 7ffe646..5974e0a 100644
--- a/net/spdy/spdy_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_stream_spdy2_unittest.cc
@@ -206,9 +206,9 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) {
// Conjure up a stream.
scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session,
- 2,
true,
net_log);
+ stream->set_stream_id(2);
EXPECT_FALSE(stream->response_received());
EXPECT_FALSE(stream->HasUrl());
@@ -336,10 +336,10 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) {
EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
- const SpdyStreamId stream_id = stream->stream_id();
-
EXPECT_EQ(OK, callback.WaitForResult());
+ const SpdyStreamId stream_id = stream->stream_id();
+
EXPECT_TRUE(delegate->send_headers_completed());
EXPECT_EQ("200", (*delegate->response())["status"]);
EXPECT_EQ("HTTP/1.1", (*delegate->response())["version"]);
diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc
index c23c6df..58d93e9 100644
--- a/net/spdy/spdy_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_stream_spdy3_unittest.cc
@@ -207,9 +207,9 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) {
// Conjure up a stream.
scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session,
- 2,
true,
net_log);
+ stream->set_stream_id(2);
EXPECT_FALSE(stream->response_received());
EXPECT_FALSE(stream->HasUrl());
@@ -341,10 +341,10 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) {
EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
- const SpdyStreamId stream_id = stream->stream_id();
-
EXPECT_EQ(OK, callback.WaitForResult());
+ const SpdyStreamId stream_id = stream->stream_id();
+
EXPECT_TRUE(delegate->send_headers_completed());
EXPECT_EQ("200", (*delegate->response())[":status"]);
EXPECT_EQ("HTTP/1.1", (*delegate->response())[":version"]);
diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h
index 4adb37c..900232b 100644
--- a/net/spdy/spdy_websocket_stream.h
+++ b/net/spdy/spdy_websocket_stream.h
@@ -82,6 +82,8 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
virtual void OnClose(int status) OVERRIDE;
private:
+ friend class SpdyWebSocketStreamSpdy2Test;
+ friend class SpdyWebSocketStreamSpdy3Test;
FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy2Test, Basic);
FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy3Test, Basic);
diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
index 99264e1..f5d5b58 100644
--- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
@@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test {
OrderedSocketData* data() { return data_.get(); }
void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) {
+ // Record the actual stream_id.
+ created_stream_id_ = websocket_stream_->stream_->stream_id();
websocket_stream_->SendData(kMessageFrame, kMessageFrameLength);
}
@@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test {
scoped_refptr<TransportSocketParams> transport_params_;
scoped_ptr<SpdyWebSocketStream> websocket_stream_;
SpdyStreamId stream_id_;
+ SpdyStreamId created_stream_id_;
scoped_ptr<SpdyFrame> request_frame_;
scoped_ptr<SpdyFrame> response_frame_;
scoped_ptr<SpdyFrame> message_frame_;
@@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, Basic) {
ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log));
ASSERT_TRUE(websocket_stream_->stream_);
- EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id());
SendRequest();
completion_callback_.WaitForResult();
+ EXPECT_EQ(stream_id_, created_stream_id_);
+
websocket_stream_.reset();
const std::vector<SpdyWebSocketStreamEvent>& events =
@@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, DestructionAfterExplicitClose) {
}
TEST_F(SpdyWebSocketStreamSpdy2Test, IOPending) {
- Prepare(3);
+ Prepare(1);
scoped_ptr<SpdyFrame> settings_frame(
ConstructSpdySettings(spdy_settings_to_send_));
MockWrite writes[] = {
diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
index c6e1717..167d6af 100644
--- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
@@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test {
OrderedSocketData* data() { return data_.get(); }
void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) {
+ // Record the actual stream_id.
+ created_stream_id_ = websocket_stream_->stream_->stream_id();
websocket_stream_->SendData(kMessageFrame, kMessageFrameLength);
}
@@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test {
scoped_refptr<TransportSocketParams> transport_params_;
scoped_ptr<SpdyWebSocketStream> websocket_stream_;
SpdyStreamId stream_id_;
+ SpdyStreamId created_stream_id_;
scoped_ptr<SpdyFrame> request_frame_;
scoped_ptr<SpdyFrame> response_frame_;
scoped_ptr<SpdyFrame> message_frame_;
@@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, Basic) {
ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log));
ASSERT_TRUE(websocket_stream_->stream_);
- EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id());
SendRequest();
completion_callback_.WaitForResult();
+ EXPECT_EQ(stream_id_, created_stream_id_);
+
websocket_stream_.reset();
const std::vector<SpdyWebSocketStreamEvent>& events =
@@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, DestructionAfterExplicitClose) {
}
TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) {
- Prepare(3);
+ Prepare(1);
scoped_ptr<SpdyFrame> settings_frame(
ConstructSpdySettings(spdy_settings_to_send_));
MockWrite writes[] = {
@@ -576,7 +580,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) {
ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream(
url, HIGHEST, net_log));
- // Delete the fist stream to allow create the second stream.
+ // Delete the first stream to allow create the second stream.
block_stream.reset();
ASSERT_EQ(OK, sync_callback_.WaitForResult());