diff options
author | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-28 02:21:41 +0000 |
---|---|---|
committer | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-28 02:21:41 +0000 |
commit | d5a94433ff8d7324510adae18ab1b9c9ba1394d5 (patch) | |
tree | d88e1d7108c12a87d433c9ded99be60a198f431c | |
parent | a7266a9dc36aa68143d670eb8646f9d67237fa61 (diff) | |
download | chromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.zip chromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.tar.gz chromium_src-d5a94433ff8d7324510adae18ab1b9c9ba1394d5.tar.bz2 |
Instead of enqueueing SPDY frames, instead enqueue SPDY streams that are ready to produce data. This allows us to lazily allocate a stream id.
BUG=111708
Review URL: https://chromiumcodereview.appspot.com/10448083
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@144649 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/http/http_network_transaction_spdy2_unittest.cc | 4 | ||||
-rw-r--r-- | net/http/http_network_transaction_spdy3_unittest.cc | 4 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.cc | 15 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.h | 10 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy2_unittest.cc | 43 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy3_unittest.cc | 45 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 251 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 88 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy2_unittest.cc | 20 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 35 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 108 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 16 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.h | 2 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy2_unittest.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy3_unittest.cc | 10 |
17 files changed, 459 insertions, 212 deletions
diff --git a/net/http/http_network_transaction_spdy2_unittest.cc b/net/http/http_network_transaction_spdy2_unittest.cc index 72d4bf8..2c830bd 100644 --- a/net/http/http_network_transaction_spdy2_unittest.cc +++ b/net/http/http_network_transaction_spdy2_unittest.cc @@ -4944,8 +4944,8 @@ TEST_F(HttpNetworkTransactionSpdy2Test, BasicAuthSpdyProxy) { MockWrite spdy_writes[] = { CreateMockWrite(*req, 0, ASYNC), - CreateMockWrite(*connect2, 2), - CreateMockWrite(*rst, 3, ASYNC), + CreateMockWrite(*rst, 2, ASYNC), + CreateMockWrite(*connect2, 3), CreateMockWrite(*wrapped_get, 5), }; diff --git a/net/http/http_network_transaction_spdy3_unittest.cc b/net/http/http_network_transaction_spdy3_unittest.cc index 331555c..c5d4c74 100644 --- a/net/http/http_network_transaction_spdy3_unittest.cc +++ b/net/http/http_network_transaction_spdy3_unittest.cc @@ -4944,8 +4944,8 @@ TEST_F(HttpNetworkTransactionSpdy3Test, BasicAuthSpdyProxy) { MockWrite spdy_writes[] = { CreateMockWrite(*req, 0, ASYNC), - CreateMockWrite(*connect2, 2), - CreateMockWrite(*rst, 3, ASYNC), + CreateMockWrite(*rst, 2, ASYNC), + CreateMockWrite(*connect2, 3), CreateMockWrite(*wrapped_get, 5) }; diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc index 5720f7d..f0c14b1 100644 --- a/net/spdy/spdy_io_buffer.cc +++ b/net/spdy/spdy_io_buffer.cc @@ -20,8 +20,23 @@ SpdyIOBuffer::SpdyIOBuffer( SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) { } +SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) { + buffer_ = rhs.buffer_; + priority_ = rhs.priority_; + position_ = rhs.position_; + stream_ = rhs.stream_; +} + SpdyIOBuffer::~SpdyIOBuffer() {} +SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) { + buffer_ = rhs.buffer_; + priority_ = rhs.priority_; + position_ = rhs.position_; + stream_ = rhs.stream_; + return *this; +} + void SpdyIOBuffer::release() { buffer_ = NULL; stream_ = NULL; diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h index 0aaced3..05ec59c 100644 --- a/net/spdy/spdy_io_buffer.h +++ b/net/spdy/spdy_io_buffer.h @@ -9,10 +9,12 @@ #include "base/memory/ref_counted.h" #include "net/base/io_buffer.h" #include "net/base/net_export.h" -#include "net/spdy/spdy_stream.h" +#include "net/base/request_priority.h" namespace net { +class SpdyStream; + // A class for managing SPDY IO buffers. These buffers need to be prioritized // so that the SpdySession sends them in the right order. Further, they need // to track the SpdyStream which they are associated with so that incremental @@ -26,8 +28,14 @@ class NET_EXPORT_PRIVATE SpdyIOBuffer { // |stream| is a pointer to the stream which is managing this buffer. SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority, SpdyStream* stream); + // Declare this instead of using the default so that we avoid needing to + // include spdy_stream.h. + SpdyIOBuffer(const SpdyIOBuffer& rhs); SpdyIOBuffer(); ~SpdyIOBuffer(); + // Declare this instead of using the default so that we avoid needing to + // include spdy_stream.h. + SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs); // Accessors. DrainableIOBuffer* buffer() const { return buffer_; } diff --git a/net/spdy/spdy_network_transaction_spdy2_unittest.cc b/net/spdy/spdy_network_transaction_spdy2_unittest.cc index a52859f..a4c8864 100644 --- a/net/spdy/spdy_network_transaction_spdy2_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy2_unittest.cc @@ -1736,22 +1736,36 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 2), - CreateMockRead(*stream_body, 3), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*stream_reply, 1), + MockRead(ASYNC, 0, 3) // EOF }; - scoped_ptr<DelayedSocketData> data( - new DelayedSocketData(0, reads, arraysize(reads), NULL, 0)); - NormalSpdyTransactionHelper helper(request, + scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); + scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); + MockRead writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*body, 2), + }; + + scoped_refptr<DeterministicSocketData> data( + new DeterministicSocketData(reads, arraysize(reads), + writes, arraysize(writes))); + NormalSpdyTransactionHelper helper(CreatePostRequest(), BoundNetLog(), GetParam(), NULL); + helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddData(data.get()); - helper.RunDefaultTest(); - helper.VerifyDataConsumed(); + helper.AddDeterministicData(data.get()); + HttpNetworkTransaction* trans = helper.trans(); - TransactionHelperResult out = helper.output(); - EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); + TestCompletionCallback callback; + int rv = trans->Start( + &CreatePostRequest(), callback.callback(), BoundNetLog()); + EXPECT_EQ(ERR_IO_PENDING, rv); + + data->RunFor(2); + rv = callback.WaitForResult(); + EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); + data->RunFor(1); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -5565,10 +5579,11 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be re-numbered. + // start before the second, causing the second to be numbered differently + // than the order they were created. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc index 4eca577..349e37c 100644 --- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc @@ -1745,22 +1745,36 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 2), - CreateMockRead(*stream_body, 3), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*stream_reply, 1), + MockRead(ASYNC, 0, 3) // EOF }; - scoped_ptr<DelayedSocketData> data( - new DelayedSocketData(0, reads, arraysize(reads), NULL, 0)); - NormalSpdyTransactionHelper helper(request, + scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); + scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); + MockRead writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*body, 2), + }; + + scoped_refptr<DeterministicSocketData> data( + new DeterministicSocketData(reads, arraysize(reads), + writes, arraysize(writes))); + NormalSpdyTransactionHelper helper(CreatePostRequest(), BoundNetLog(), GetParam(), NULL); + helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddData(data.get()); - helper.RunDefaultTest(); - helper.VerifyDataConsumed(); + helper.AddDeterministicData(data.get()); + HttpNetworkTransaction* trans = helper.trans(); - TransactionHelperResult out = helper.output(); - EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); + TestCompletionCallback callback; + int rv = trans->Start( + &CreatePostRequest(), callback.callback(), BoundNetLog()); + EXPECT_EQ(ERR_IO_PENDING, rv); + + data->RunFor(2); + rv = callback.WaitForResult(); + EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); + data->RunFor(1); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -2063,7 +2077,7 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, WindowUpdateOverflow) { // WINDOW_UPDATE while sending a request and will send a RST_STREAM frame. MockWrite writes[] = { CreateMockWrite(*req), - CreateMockWrite(*body), + //CreateMockWrite(*body), CreateMockWrite(*rst), }; @@ -6148,10 +6162,11 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be re-numbered. + // start before the second, causing the second to be numbered differently + // than they order they were created. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 613d242..2b7ab3a 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -184,6 +184,28 @@ bool g_enable_ping_based_connection_checking = true; } // namespace // static +void SpdySession::SpdyIOBufferProducer::ActivateStream( + SpdySession* spdy_session, + SpdyStream* spdy_stream) { + spdy_session->ActivateStream(spdy_stream); +} + +// static +SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + SpdyFrame* frame, + RequestPriority priority, + SpdyStream* stream) { + size_t size = frame->length() + SpdyFrame::kHeaderSize; + DCHECK_GT(size, 0u); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), frame->data(), size); + + return new SpdyIOBuffer(buffer, size, priority, stream); +} + +// static void SpdySession::set_default_protocol(NextProto default_protocol) { g_default_protocol = default_protocol; } @@ -360,6 +382,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { ssl_info.cert->VerifyNameMatch(domain); } +void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, + SpdyIOBufferProducer* producer) { + write_queue_.push(producer); + stream_producers_[producer] = stream; + WriteSocketLater(); +} + int SpdySession::GetPushStream( const GURL& url, scoped_refptr<SpdyStream>* stream, @@ -398,7 +427,8 @@ int SpdySession::CreateStream( const BoundNetLog& stream_net_log, const CompletionCallback& callback) { if (!max_concurrent_streams_ || - active_streams_.size() < max_concurrent_streams_) { + (active_streams_.size() + created_streams_.size() < + max_concurrent_streams_)) { return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); } @@ -488,10 +518,7 @@ int SpdySession::CreateStreamImpl( const std::string& path = url.PathForRequest(); - const SpdyStreamId stream_id = GetNewStreamId(); - *spdy_stream = new SpdyStream(this, - stream_id, false, stream_net_log); const scoped_refptr<SpdyStream>& stream = *spdy_stream; @@ -500,14 +527,13 @@ int SpdySession::CreateStreamImpl( stream->set_path(path); stream->set_send_window_size(initial_send_window_size_); stream->set_recv_window_size(initial_recv_window_size_); - ActivateStream(stream); + created_streams_.insert(stream); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", static_cast<int>(priority), 0, 10, 11); // TODO(mbelshe): Optimize memory allocations - DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); return OK; } @@ -529,15 +555,13 @@ int SpdySession::GetProtocolVersion() const { return buffered_spdy_framer_->protocol_version(); } -int SpdySession::WriteSynStream( +SpdySynStreamControlFrame* SpdySession::CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, SpdyControlFlags flags, const linked_ptr<SpdyHeaderBlock>& headers) { - // Find our stream - if (!IsStreamActive(stream_id)) - return ERR_INVALID_SPDY_STREAM; + CHECK(IsStreamActive(stream_id)); const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; CHECK_EQ(stream->stream_id(), stream_id); @@ -548,11 +572,7 @@ int SpdySession::WriteSynStream( buffered_spdy_framer_->CreateSynStream( stream_id, 0, ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), - credential_slot, flags, false, headers.get())); - // We enqueue all SYN_STREAM frames at the same priority to ensure - // that we do not send them out-of-order. - // http://crbug.com/111708 - QueueFrame(syn_frame.get(), HIGHEST, stream); + credential_slot, flags, true, headers.get())); base::StatsCounter spdy_requests("spdy.requests"); spdy_requests.Increment(); @@ -564,14 +584,15 @@ int SpdySession::WriteSynStream( base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0)); } - return ERR_IO_PENDING; + return syn_frame.release(); } -int SpdySession::WriteCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority) { +SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( + const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority) { DCHECK(is_secure_); unsigned char secret[32]; // 32 bytes from the spec GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", @@ -613,24 +634,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyCredentialControlFrame> credential_frame( buffered_spdy_framer_->CreateCredentialFrame(credential)); - // We enqueue all SYN_STREAM frames at the same priority to ensure - // that we do not send them out-of-order, which means that we need - // to enqueue all CREDENTIAL frames at this priority to ensure that - // they are sent *before* the SYN_STREAM that references them. - // http://crbug.com/111708 - QueueFrame(credential_frame.get(), HIGHEST, NULL); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); } - return ERR_IO_PENDING; + return credential_frame.release(); } -int SpdySession::WriteStreamData(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags) { +SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, int len, + SpdyDataFlags flags) { // Find our stream CHECK(IsStreamActive(stream_id)); scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; @@ -654,7 +669,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return ERR_IO_PENDING; + return NULL; } int new_len = std::min(len, stream->send_window_size()); if (new_len < len) { @@ -679,18 +694,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, scoped_ptr<SpdyDataFrame> frame( buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), len, flags)); - QueueFrame(frame.get(), stream->priority(), stream); - return ERR_IO_PENDING; + return frame.release(); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { + DCHECK_NE(0u, stream_id); // TODO(mbelshe): We should send a RST_STREAM control frame here // so that the server can cancel a large send. DeleteStream(stream_id, status); } +void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) { + DCHECK_EQ(0u, stream->stream_id()); + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); +} + void SpdySession::ResetStream(SpdyStreamId stream_id, SpdyStatusCodes status, const std::string& description) { @@ -708,7 +728,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; priority = stream->priority(); } - QueueFrame(rst_frame.get(), priority, NULL); + QueueFrame(rst_frame.release(), priority); RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); @@ -896,46 +916,22 @@ void SpdySession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). DCHECK(buffered_spdy_framer_.get()); - while (in_flight_write_.buffer() || !queue_.empty()) { + while (in_flight_write_.buffer() || !write_queue_.empty()) { if (!in_flight_write_.buffer()) { - // Grab the next SpdyFrame to send. - SpdyIOBuffer next_buffer = queue_.top(); - queue_.pop(); - - // We've deferred compression until just before we write it to the socket, - // which is now. At this time, we don't compress our data frames. - SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); - size_t size; - if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) { - DCHECK(uncompressed_frame.is_control_frame()); - const SpdyControlFrame* uncompressed_control_frame = - reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame); - scoped_ptr<SpdyFrame> compressed_frame( - buffered_spdy_framer_->CompressControlFrame( - *uncompressed_control_frame)); - if (!compressed_frame.get()) { - RecordProtocolErrorHistogram( - PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE); - CloseSessionOnError( - net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure."); - return; - } - - size = compressed_frame->length() + SpdyFrame::kHeaderSize; - - DCHECK_GT(size, 0u); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), compressed_frame->data(), size); - - // Attempt to send the frame. - in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST, - next_buffer.stream()); - } else { - size = uncompressed_frame.length() + SpdyFrame::kHeaderSize; - in_flight_write_ = next_buffer; - } + // Grab the next SpdyBuffer to send. + scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); + scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); + stream_producers_.erase(producer.get()); + write_queue_.pop(); + // It is possible that a stream had data to write, but a + // WINDOW_UPDATE frame has been received which made that + // stream no longer writable. + // TODO(rch): consider handling that case by removing the + // stream from the writable queue? + if (buffer == NULL) + continue; + + in_flight_write_ = *buffer; } else { DCHECK(in_flight_write_.buffer()->BytesRemaining()); } @@ -982,16 +978,33 @@ void SpdySession::CloseAllStreams(net::Error status) { while (!active_streams_.empty()) { ActiveStreamMap::iterator it = active_streams_.begin(); const scoped_refptr<SpdyStream>& stream = it->second; - DCHECK(stream); - std::string description = base::StringPrintf( - "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); - stream->LogStreamError(status, description); + LogAbandonedStream(stream, status); DeleteStream(stream->stream_id(), status); } + while (!created_streams_.empty()) { + CreatedStreamSet::iterator it = created_streams_.begin(); + const scoped_refptr<SpdyStream>& stream = *it; + LogAbandonedStream(stream, status); + stream->OnClose(status); + created_streams_.erase(it); + } + // We also need to drain the queue. - while (queue_.size()) - queue_.pop(); + while (!write_queue_.empty()) { + SpdyIOBufferProducer* producer = write_queue_.top(); + stream_producers_.erase(producer); + delete producer; + write_queue_.pop(); + } +} + +void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, + net::Error status) { + DCHECK(stream); + std::string description = base::StringPrintf( + "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); + stream->LogStreamError(status, description); } int SpdySession::GetNewStreamId() { @@ -1002,17 +1015,6 @@ int SpdySession::GetNewStreamId() { return id; } -void SpdySession::QueueFrame(SpdyFrame* frame, - RequestPriority priority, - SpdyStream* stream) { - int length = SpdyFrame::kHeaderSize + frame->length(); - IOBuffer* buffer = new IOBuffer(length); - memcpy(buffer->data(), frame->data(), length); - queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); - - WriteSocketLater(); -} - void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool, const std::string& description) { @@ -1099,7 +1101,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { return connection_->socket()->GetLocalAddress(address); } +class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { + public: + SimpleSpdyIOBufferProducer(SpdyFrame* frame, + RequestPriority priority) + : frame_(frame), + priority_(priority) { + } + + virtual RequestPriority GetPriority() const OVERRIDE { + return priority_; + } + + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) { + return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + frame_, priority_, NULL); + } + + private: + SpdyFrame* frame_; + RequestPriority priority_; +}; + +void SpdySession::QueueFrame(SpdyFrame* frame, + RequestPriority priority) { + SimpleSpdyIOBufferProducer* producer = + new SimpleSpdyIOBufferProducer(frame, priority); + write_queue_.push(producer); + WriteSocketLater(); +} + void SpdySession::ActivateStream(SpdyStream* stream) { + if (stream->stream_id() == 0) { + stream->set_stream_id(GetNewStreamId()); + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); + } const SpdyStreamId id = stream->stream_id(); DCHECK(!IsStreamActive(id)); @@ -1127,6 +1163,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { if (it2 == active_streams_.end()) return; + // Possibly remove from the write queue. + WriteQueue old = write_queue_; + write_queue_ = WriteQueue(); + while (!old.empty()) { + SpdyIOBufferProducer* producer = old.top(); + StreamProducerMap::iterator it = stream_producers_.find(producer); + if (it == stream_producers_.end() || it->second->stream_id() != id) + write_queue_.push(producer); + else + delete producer; + old.pop(); + } + // If this is an active stream, call the callback. const scoped_refptr<SpdyStream> stream(it2->second); active_streams_.erase(it2); @@ -1350,8 +1399,8 @@ void SpdySession::OnSynStream( return; } - scoped_refptr<SpdyStream> stream( - new SpdyStream(this, stream_id, true, net_log_)); + scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_)); + stream->set_stream_id(stream_id); stream->set_path(gurl.PathForRequest()); stream->set_send_window_size(initial_send_window_size_); @@ -1373,7 +1422,6 @@ void SpdySession::OnSynStream( void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame, const linked_ptr<SpdyHeaderBlock>& headers) { SpdyStreamId stream_id = frame.stream_id(); - if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_SYN_REPLY, @@ -1557,7 +1605,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); - QueueFrame(window_update_frame.get(), stream->priority(), NULL); + QueueFrame(window_update_frame.release(), stream->priority()); } // Given a cwnd that we would have sent to the server, modify it based on the @@ -1596,7 +1644,6 @@ void SpdySession::SendInitialSettings() { settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_); } - sent_settings_ = true; SendSettings(settings_map); } @@ -1629,7 +1676,6 @@ void SpdySession::SendInitialSettings() { HandleSetting(new_id, new_val); } - sent_settings_ = true; SendSettings(settings_map_new); } @@ -1643,7 +1689,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdySettingsControlFrame> settings_frame( buffered_spdy_framer_->CreateSettings(settings)); - QueueFrame(settings_frame.get(), HIGHEST, NULL); + sent_settings_ = true; + QueueFrame(settings_frame.release(), HIGHEST); } void SpdySession::HandleSetting(uint32 id, uint32 value) { @@ -1678,6 +1725,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { DCHECK(stream); stream->AdjustSendWindowSize(delta_window_size); } + + CreatedStreamSet::iterator i; + for (i = created_streams_.begin(); i != created_streams_.end(); i++) { + const scoped_refptr<SpdyStream>& stream = *i; + stream->AdjustSendWindowSize(delta_window_size); + } } void SpdySession::SendPrefacePingIfNoneInFlight() { @@ -1698,7 +1751,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyPingControlFrame> ping_frame( buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); - QueueFrame(ping_frame.get(), HIGHEST, NULL); + QueueFrame(ping_frame.release(), HIGHEST); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 844d37b..ceab965 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -20,6 +20,7 @@ #include "net/base/load_states.h" #include "net/base/net_errors.h" #include "net/base/request_priority.h" +#include "net/base/ssl_client_cert_type.h" #include "net/base/ssl_config_service.h" #include "net/base/upload_data_stream.h" #include "net/socket/client_socket_handle.h" @@ -94,6 +95,32 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING == class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, public BufferedSpdyFramerVisitorInterface { public: + // Defines an interface for producing SpdyIOBuffers. + class NET_EXPORT_PRIVATE SpdyIOBufferProducer { + public: + SpdyIOBufferProducer() {} + + // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL + // if not buffer is ready to be produced. + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0; + + virtual RequestPriority GetPriority() const = 0; + + virtual ~SpdyIOBufferProducer() {} + + protected: + // Activates |spdy_stream| in |spdy_session|. + static void ActivateStream(SpdySession* spdy_session, + SpdyStream* spdy_stream); + + static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame, + RequestPriority priority, + SpdyStream* spdy_stream); + + private: + DISALLOW_COPY_AND_ASSIGN(SpdyIOBufferProducer); + }; + // Create a new SpdySession. // |host_port_proxy_pair| is the host/port that this session connects to, and // the proxy configuration settings that it's using. @@ -158,9 +185,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // authentication now. bool VerifyDomainAuthentication(const std::string& domain); + // Records that |stream| has a write available from |producer|. + // |producer| will be owned by this SpdySession. + void SetStreamHasWriteAvailable(SpdyStream* stream, + SpdyIOBufferProducer* producer); + // Send the SYN frame for |stream_id|. This also sends PING message to check // the status of the connection. - int WriteSynStream( + SpdySynStreamControlFrame* CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, @@ -168,21 +200,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, const linked_ptr<SpdyHeaderBlock>& headers); // Write a CREDENTIAL frame to the session. - int WriteCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority); + SpdyCredentialControlFrame* CreateCredentialFrame(const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority); // Write a data frame to the stream. // Used to create and queue a data frame for the given stream. - int WriteStreamData(SpdyStreamId stream_id, net::IOBuffer* data, - int len, - SpdyDataFlags flags); + SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, int len, + SpdyDataFlags flags); // Close a stream. void CloseStream(SpdyStreamId stream_id, int status); + // Close a stream that has been created but is not yet active. + void CloseCreatedStream(SpdyStream* stream, int status); + // Reset a stream by sending a RST_STREAM frame with given status code. // Also closes the stream. Was not piggybacked to CloseStream since not // all of the calls to CloseStream necessitate sending a RST_STREAM. @@ -269,7 +304,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Returns true if session is not currently active bool is_active() const { - return !active_streams_.empty(); + return !active_streams_.empty() || !created_streams_.empty(); } // Access to the number of active and pending streams. These are primarily @@ -278,6 +313,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, size_t num_unclaimed_pushed_streams() const { return unclaimed_pushed_streams_.size(); } + size_t num_created_streams() const { return created_streams_.size(); } // Returns true if flow control is enabled for the session. bool is_flow_control_enabled() const { @@ -353,7 +389,18 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap; // Only HTTP push a stream. typedef std::map<std::string, scoped_refptr<SpdyStream> > PushedStreamMap; - typedef std::priority_queue<SpdyIOBuffer> OutputQueue; + typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; + typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap; + class SpdyIOBufferProducerCompare { + public: + bool operator() (const SpdyIOBufferProducer* lhs, + const SpdyIOBufferProducer* rhs) const { + return lhs->GetPriority() < rhs->GetPriority(); + } + }; + typedef std::priority_queue<SpdyIOBufferProducer*, + std::vector<SpdyIOBufferProducer*>, + SpdyIOBufferProducerCompare> WriteQueue; struct CallbackResultPair { CallbackResultPair(const CompletionCallback& callback_in, int result_in) @@ -431,9 +478,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Queue a frame for sending. // |frame| is the frame to send. // |priority| is the priority for insertion into the queue. - // |stream| is the stream which this IO is associated with (or NULL). - void QueueFrame(SpdyFrame* frame, RequestPriority priority, - SpdyStream* stream); + void QueueFrame(SpdyFrame* frame, RequestPriority priority); // Track active streams in the active stream list. void ActivateStream(SpdyStream* stream); @@ -459,6 +504,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Closes all streams. Used as part of shutdown. void CloseAllStreams(net::Error status); + void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, + net::Error status); + // Invokes a user callback for stream creation. We provide this method so it // can be deferred to the MessageLoop, so we avoid re-entrancy problems. void InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream>* stream); @@ -563,8 +611,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // server, but do not have consumers yet. PushedStreamMap unclaimed_pushed_streams_; - // As we gather data to be sent, we put it into the output queue. - OutputQueue queue_; + // Set of all created streams but that have not yet sent any frames. + CreatedStreamSet created_streams_; + + // As streams have data to be sent, we put them into the write queue. + WriteQueue write_queue_; + + // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream + // so that when a stream is destroyed, we can remove the corresponding + // producer from |write_queue_|. + StreamProducerMap stream_producers_; // The packet we are currently sending. bool write_pending_; // Will be true when a write is in progress. diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index 6ee4257..2cbb283 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy2Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams()); + EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseStream(spdy_stream1->stream_id(), OK); - session3->CloseStream(spdy_stream3->stream_id(), OK); + session1->CloseCreatedStream(spdy_stream1, OK); + session3->CloseCreatedStream(spdy_stream3, OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseStream(spdy_stream2->stream_id(), OK); + session2->CloseCreatedStream(spdy_stream2, OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1109,8 +1109,8 @@ TEST_F(SpdySessionSpdy2Test, CloseSessionOnError) { TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1174,14 +1174,14 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(0u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(0u, spdy_stream2->stream_id()); linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)["method"] = "GET"; @@ -1199,8 +1199,8 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(1u, spdy_stream1->stream_id()); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 299186e..8862889 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy3Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams()); + EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseStream(spdy_stream1->stream_id(), OK); - session3->CloseStream(spdy_stream3->stream_id(), OK); + session1->CloseCreatedStream(spdy_stream1, OK); + session3->CloseCreatedStream(spdy_stream3, OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseStream(spdy_stream2->stream_id(), OK); + session2->CloseCreatedStream(spdy_stream2, OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1193,22 +1193,24 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { MockConnect connect_data(SYNCHRONOUS, OK); scoped_ptr<SpdyFrame> settings_frame(ConstructSpdySettings(new_settings)); MockRead reads[] = { - CreateMockRead(*settings_frame), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*settings_frame, 0), + MockRead(ASYNC, 0, 2) // EOF }; SpdySessionDependencies session_deps; + session_deps.host_resolver->set_synchronous_mode(true); - StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); - data.set_connect_data(connect_data); - session_deps.socket_factory->AddSocketDataProvider(&data); + scoped_refptr<DeterministicSocketData> data = + new DeterministicSocketData(reads, arraysize(reads), NULL, 0); + data->set_connect_data(connect_data); + session_deps.deterministic_socket_factory->AddSocketDataProvider(data); SSLSocketDataProvider ssl(SYNCHRONOUS, OK); session_deps.socket_factory->AddSSLSocketDataProvider(&ssl); scoped_refptr<HttpNetworkSession> http_session( - SpdySessionDependencies::SpdyCreateSession(&session_deps)); + SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps)); const std::string kTestHost("www.foo.com"); const int kTestPort = 80; @@ -1248,6 +1250,7 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { callback1.callback())); EXPECT_NE(spdy_stream1->send_window_size(), window_size); + data->RunFor(1); // Process the SETTINGS frame, but not the EOF MessageLoop::current()->RunAllPending(); EXPECT_EQ(session->initial_send_window_size(), window_size); EXPECT_EQ(spdy_stream1->send_window_size(), window_size); @@ -1272,8 +1275,8 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1337,14 +1340,14 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(0u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(0u, spdy_stream2->stream_id()); linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)[":method"] = "GET"; @@ -1362,8 +1365,8 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(1u, spdy_stream1->stream_id()); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 09a184b..3ad7544 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -50,11 +50,10 @@ bool ContainsUpperAscii(const std::string& str) { } // namespace SpdyStream::SpdyStream(SpdySession* session, - SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log) : continue_buffering_data_(true), - stream_id_(stream_id), + stream_id_(0), priority_(HIGHEST), slot_(0), stalled_by_flow_control_(false), @@ -77,8 +76,77 @@ SpdyStream::SpdyStream(SpdySession* session, domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { } +class SpdyStream::SpdyStreamIOBufferProducer + : public SpdySession::SpdyIOBufferProducer { + public: + SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {} + + // SpdyFrameProducer + virtual RequestPriority GetPriority() const OVERRIDE { + return stream_->priority(); + } + + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { + if (stream_->stream_id() == 0) + SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_); + SpdyFrame* frame = stream_->ProduceNextFrame(); + return frame == NULL ? NULL : + SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + frame, GetPriority(), stream_); + } + + private: + scoped_refptr<SpdyStream> stream_; +}; + +void SpdyStream::SetHasWriteAvailable() { + session_->SetStreamHasWriteAvailable(this, + new SpdyStreamIOBufferProducer(this)); +} + +SpdyFrame* SpdyStream::ProduceNextFrame() { + if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) { + CHECK(request_.get()); + CHECK_GT(stream_id_, 0u); + + std::string origin = GetUrl().GetOrigin().spec(); + DCHECK(origin[origin.length() - 1] == '/'); + origin.erase(origin.length() - 1); // Trim trailing slash. + SpdyCredentialControlFrame* frame = session_->CreateCredentialFrame( + origin, domain_bound_cert_type_, domain_bound_private_key_, + domain_bound_cert_, priority_); + return frame; + } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) { + CHECK(request_.get()); + CHECK_GT(stream_id_, 0u); + + SpdyControlFlags flags = + has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; + SpdySynStreamControlFrame* frame = session_->CreateSynStream( + stream_id_, priority_, slot_, flags, request_); + send_time_ = base::TimeTicks::Now(); + return frame; + } else { + CHECK(!cancelled()); + // We must need to write stream data. + // Until the headers have been completely sent, we can not be sure + // that our stream_id is correct. + DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); + DCHECK_GT(stream_id_, 0u); + DCHECK(!pending_data_frames_.empty()); + SpdyFrame* frame = pending_data_frames_.front(); + pending_data_frames_.pop_front(); + return frame; + } +} + SpdyStream::~SpdyStream() { UpdateHistograms(); + while (!pending_data_frames_.empty()) { + SpdyFrame* frame = pending_data_frames_.back(); + pending_data_frames_.pop_back(); + delete frame; + } } void SpdyStream::SetDelegate(Delegate* delegate) { @@ -472,7 +540,10 @@ void SpdyStream::Cancel() { } void SpdyStream::Close() { - session_->CloseStream(stream_id_, net::OK); + if (stream_id_ != 0) + session_->CloseStream(stream_id_, net::OK); + else + session_->CloseCreatedStream(this, OK); } int SpdyStream::SendRequest(bool has_upload_data) { @@ -499,7 +570,13 @@ int SpdyStream::WriteStreamData(IOBuffer* data, int length, // Until the headers have been completely sent, we can not be sure // that our stream_id is correct. DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - return session_->WriteStreamData(stream_id_, data, length, flags); + CHECK_GT(stream_id_, 0u); + + pending_data_frames_.push_back( + session_->CreateDataFrame(stream_id_, data, length, flags)); + + SetHasWriteAvailable(); + return ERR_IO_PENDING; } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -643,14 +720,8 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { int SpdyStream::DoSendDomainBoundCert() { io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; CHECK(request_.get()); - std::string origin = GetUrl().GetOrigin().spec(); - origin.erase(origin.length() - 1); // trim trailing slash - int rv = session_->WriteCredentialFrame( - origin, domain_bound_cert_type_, domain_bound_private_key_, - domain_bound_cert_, priority_); - if (rv != ERR_IO_PENDING) - return rv; - return OK; + SetHasWriteAvailable(); + return ERR_IO_PENDING; } int SpdyStream::DoSendDomainBoundCertComplete(int result) { @@ -664,18 +735,7 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) { int SpdyStream::DoSendHeaders() { CHECK(!cancelled_); - SpdyControlFlags flags = CONTROL_FLAG_NONE; - if (!has_upload_data_) - flags = CONTROL_FLAG_FIN; - - CHECK(request_.get()); - int result = session_->WriteSynStream( - stream_id_, priority_, slot_, flags, - request_); - if (result != ERR_IO_PENDING) - return result; - - send_time_ = base::TimeTicks::Now(); + SetHasWriteAvailable(); io_state_ = STATE_SEND_HEADERS_COMPLETE; return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index dddc0e7..ea85ce4 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -6,6 +6,7 @@ #define NET_SPDY_SPDY_STREAM_H_ #pragma once +#include <list> #include <string> #include <vector> @@ -24,12 +25,12 @@ #include "net/socket/ssl_client_socket.h" #include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_session.h" namespace net { class AddressList; class IPEndPoint; -class SpdySession; class SSLCertRequestInfo; class SSLInfo; @@ -95,7 +96,6 @@ class NET_EXPORT_PRIVATE SpdyStream // SpdyStream constructor SpdyStream(SpdySession* session, - SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log); @@ -260,6 +260,8 @@ class NET_EXPORT_PRIVATE SpdyStream int GetProtocolVersion() const; private: + class SpdyStreamIOBufferProducer; + enum State { STATE_NONE, STATE_GET_DOMAIN_BOUND_CERT, @@ -308,6 +310,14 @@ class NET_EXPORT_PRIVATE SpdyStream // the MessageLoop to replay all the data that the server has already sent. void PushedStreamReplayData(); + // Informs the SpdySession that this stream has a write available. + void SetHasWriteAvailable(); + + // Returns a newly created SPDY frame owned by the called that contains + // the next frame to be sent by this frame. May return NULL if this + // stream has become stalled on flow control. + SpdyFrame* ProduceNextFrame(); + // There is a small period of time between when a server pushed stream is // first created, and the pushed data is replayed. Any data received during // this time should continue to be buffered. @@ -343,6 +353,8 @@ class NET_EXPORT_PRIVATE SpdyStream linked_ptr<SpdyHeaderBlock> response_; base::Time response_time_; + std::list<SpdyFrame*> pending_data_frames_; + State io_state_; // Since we buffer the response, we also buffer the response status. diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 4d7fd48..99ede76 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -277,9 +277,9 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, - 2, true, net_log); + stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -408,10 +408,10 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - const SpdyStreamId stream_id = stream->stream_id(); - EXPECT_EQ(OK, callback.WaitForResult()); + const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())["status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())["version"]); diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index b7db4e2..0dbaf2e 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -278,9 +278,9 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, - 2, true, net_log); + stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -413,10 +413,10 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - const SpdyStreamId stream_id = stream->stream_id(); - EXPECT_EQ(OK, callback.WaitForResult()); + const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())[":status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())[":version"]); diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h index d8d85da..4a208b9 100644 --- a/net/spdy/spdy_websocket_stream.h +++ b/net/spdy/spdy_websocket_stream.h @@ -84,6 +84,8 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream virtual void set_chunk_callback(ChunkCallback* callback) OVERRIDE; private: + friend class SpdyWebSocketStreamSpdy2Test; + friend class SpdyWebSocketStreamSpdy3Test; FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy2Test, Basic); FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy3Test, Basic); diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc index 5f468c9..d6e75a6 100644 --- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc @@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { + // Record the actual stream_id. + created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; + SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); - EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); + EXPECT_EQ(stream_id_, created_stream_id_); + websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy2Test, IOPending) { - Prepare(3); + Prepare(1); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc index 5a21e66..fcc5ccb 100644 --- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc @@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { + // Record the actual stream_id. + created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; + SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); - EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); + EXPECT_EQ(stream_id_, created_stream_id_); + websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { - Prepare(3); + Prepare(1); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { @@ -576,7 +580,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream( url, HIGHEST, net_log)); - // Delete the fist stream to allow create the second stream. + // Delete the first stream to allow create the second stream. block_stream.reset(); ASSERT_EQ(OK, sync_callback_.WaitForResult()); |