diff options
author | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-20 19:08:01 +0000 |
---|---|---|
committer | rch@chromium.org <rch@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-20 19:08:01 +0000 |
commit | f7fe31b2e7f5985792d8abc04ae360190f2b790b (patch) | |
tree | 07106c9676edfb24d569d5330bccc55a8b540ec9 /net/spdy | |
parent | da44ca3b603807ffb10a54bcd3b6a9f2330f39b4 (diff) | |
download | chromium_src-f7fe31b2e7f5985792d8abc04ae360190f2b790b.zip chromium_src-f7fe31b2e7f5985792d8abc04ae360190f2b790b.tar.gz chromium_src-f7fe31b2e7f5985792d8abc04ae360190f2b790b.tar.bz2 |
Instead of enqueueing SPDY frames, instead enqueue SPDY streams that are ready to produce data. This allows us to lazily allocate a stream id.
The CL was reverted because of memory leaks. Both SpdyIOBufferProducers leaked the SpdyFrame they owned. This version of the CL fixes this defect.
Attempting to re-land 144649
Revert 144655 - Revert 144649
BUG=111708
TEST=
Review URL: https://chromiumcodereview.appspot.com/10803041
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@147692 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/spdy')
-rw-r--r-- | net/spdy/spdy_io_buffer.cc | 15 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.h | 10 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy2_unittest.cc | 41 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy3_unittest.cc | 41 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 250 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 88 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy2_unittest.cc | 22 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 37 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 109 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 16 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.h | 2 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy2_unittest.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy3_unittest.cc | 10 |
15 files changed, 456 insertions, 205 deletions
diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc index 5720f7d..f0c14b1 100644 --- a/net/spdy/spdy_io_buffer.cc +++ b/net/spdy/spdy_io_buffer.cc @@ -20,8 +20,23 @@ SpdyIOBuffer::SpdyIOBuffer( SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) { } +SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) { + buffer_ = rhs.buffer_; + priority_ = rhs.priority_; + position_ = rhs.position_; + stream_ = rhs.stream_; +} + SpdyIOBuffer::~SpdyIOBuffer() {} +SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) { + buffer_ = rhs.buffer_; + priority_ = rhs.priority_; + position_ = rhs.position_; + stream_ = rhs.stream_; + return *this; +} + void SpdyIOBuffer::release() { buffer_ = NULL; stream_ = NULL; diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h index e4033c2..9bbee16 100644 --- a/net/spdy/spdy_io_buffer.h +++ b/net/spdy/spdy_io_buffer.h @@ -8,10 +8,12 @@ #include "base/memory/ref_counted.h" #include "net/base/io_buffer.h" #include "net/base/net_export.h" -#include "net/spdy/spdy_stream.h" +#include "net/base/request_priority.h" namespace net { +class SpdyStream; + // A class for managing SPDY IO buffers. These buffers need to be prioritized // so that the SpdySession sends them in the right order. Further, they need // to track the SpdyStream which they are associated with so that incremental @@ -25,8 +27,14 @@ class NET_EXPORT_PRIVATE SpdyIOBuffer { // |stream| is a pointer to the stream which is managing this buffer. SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority, SpdyStream* stream); + // Declare this instead of using the default so that we avoid needing to + // include spdy_stream.h. + SpdyIOBuffer(const SpdyIOBuffer& rhs); SpdyIOBuffer(); ~SpdyIOBuffer(); + // Declare this instead of using the default so that we avoid needing to + // include spdy_stream.h. + SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs); // Accessors. DrainableIOBuffer* buffer() const { return buffer_; } diff --git a/net/spdy/spdy_network_transaction_spdy2_unittest.cc b/net/spdy/spdy_network_transaction_spdy2_unittest.cc index 639c818..bb3d6d0 100644 --- a/net/spdy/spdy_network_transaction_spdy2_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy2_unittest.cc @@ -1709,21 +1709,35 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 2), - CreateMockRead(*stream_body, 3), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*stream_reply, 1), + MockRead(ASYNC, 0, 3) // EOF }; - DelayedSocketData data(0, reads, arraysize(reads), NULL, 0); - NormalSpdyTransactionHelper helper(request, + scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); + scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); + MockRead writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*body, 2), + }; + + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + NormalSpdyTransactionHelper helper(CreatePostRequest(), BoundNetLog(), GetParam(), NULL); + helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddData(&data); - helper.RunDefaultTest(); - helper.VerifyDataConsumed(); + helper.AddDeterministicData(&data); + HttpNetworkTransaction* trans = helper.trans(); - TransactionHelperResult out = helper.output(); - EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); + TestCompletionCallback callback; + int rv = trans->Start( + &CreatePostRequest(), callback.callback(), BoundNetLog()); + EXPECT_EQ(ERR_IO_PENDING, rv); + + data.RunFor(2); + rv = callback.WaitForResult(); + EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); + data.RunFor(1); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -5451,10 +5465,11 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be re-numbered. + // start before the second, causing the second to be numbered differently + // than the order they were created. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc index ff9437e..ef264bb 100644 --- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc @@ -1715,21 +1715,35 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 2), - CreateMockRead(*stream_body, 3), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*stream_reply, 1), + MockRead(ASYNC, 0, 3) // EOF }; - DelayedSocketData data(0, reads, arraysize(reads), NULL, 0); - NormalSpdyTransactionHelper helper(request, + scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); + scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); + MockRead writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*body, 2), + }; + + DeterministicSocketData data(reads, arraysize(reads), + writes, arraysize(writes)); + NormalSpdyTransactionHelper helper(CreatePostRequest(), BoundNetLog(), GetParam(), NULL); + helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddData(&data); - helper.RunDefaultTest(); - helper.VerifyDataConsumed(); + helper.AddDeterministicData(&data); + HttpNetworkTransaction* trans = helper.trans(); - TransactionHelperResult out = helper.output(); - EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); + TestCompletionCallback callback; + int rv = trans->Start( + &CreatePostRequest(), callback.callback(), BoundNetLog()); + EXPECT_EQ(ERR_IO_PENDING, rv); + + data.RunFor(2); + rv = callback.WaitForResult(); + EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); + data.RunFor(1); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -6010,10 +6024,11 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be re-numbered. + // start before the second, causing the second to be numbered differently + // than they order they were created. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index d0190df..2741d8ac 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -192,6 +192,28 @@ static ExternalTimeFunc g_time_func = base::TimeTicks::Now; } // namespace // static +void SpdySession::SpdyIOBufferProducer::ActivateStream( + SpdySession* spdy_session, + SpdyStream* spdy_stream) { + spdy_session->ActivateStream(spdy_stream); +} + +// static +SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + SpdyFrame* frame, + RequestPriority priority, + SpdyStream* stream) { + size_t size = frame->length() + SpdyFrame::kHeaderSize; + DCHECK_GT(size, 0u); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), frame->data(), size); + + return new SpdyIOBuffer(buffer, size, priority, stream); +} + +// static void SpdySession::set_default_protocol(NextProto default_protocol) { g_default_protocol = default_protocol; } @@ -387,6 +409,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { ssl_info.cert->VerifyNameMatch(domain); } +void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, + SpdyIOBufferProducer* producer) { + write_queue_.push(producer); + stream_producers_[producer] = stream; + WriteSocketLater(); +} + int SpdySession::GetPushStream( const GURL& url, scoped_refptr<SpdyStream>* stream, @@ -425,7 +454,8 @@ int SpdySession::CreateStream( const BoundNetLog& stream_net_log, const CompletionCallback& callback) { if (!max_concurrent_streams_ || - active_streams_.size() < max_concurrent_streams_) { + (active_streams_.size() + created_streams_.size() < + max_concurrent_streams_)) { return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); } @@ -515,10 +545,7 @@ int SpdySession::CreateStreamImpl( const std::string& path = url.PathForRequest(); - const SpdyStreamId stream_id = GetNewStreamId(); - *spdy_stream = new SpdyStream(this, - stream_id, false, stream_net_log); const scoped_refptr<SpdyStream>& stream = *spdy_stream; @@ -527,14 +554,13 @@ int SpdySession::CreateStreamImpl( stream->set_path(path); stream->set_send_window_size(initial_send_window_size_); stream->set_recv_window_size(initial_recv_window_size_); - ActivateStream(stream); + created_streams_.insert(stream); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", static_cast<int>(priority), 0, 10, 11); // TODO(mbelshe): Optimize memory allocations - DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); return OK; } @@ -556,15 +582,13 @@ int SpdySession::GetProtocolVersion() const { return buffered_spdy_framer_->protocol_version(); } -int SpdySession::WriteSynStream( +SpdySynStreamControlFrame* SpdySession::CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, SpdyControlFlags flags, const SpdyHeaderBlock& headers) { - // Find our stream - if (!IsStreamActive(stream_id)) - return ERR_INVALID_SPDY_STREAM; + CHECK(IsStreamActive(stream_id)); const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; CHECK_EQ(stream->stream_id(), stream_id); @@ -575,11 +599,7 @@ int SpdySession::WriteSynStream( buffered_spdy_framer_->CreateSynStream( stream_id, 0, ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), - credential_slot, flags, false, &headers)); - // We enqueue all SYN_STREAM frames at the same priority to ensure - // that we do not send them out-of-order. - // http://crbug.com/111708 - QueueFrame(syn_frame.get(), HIGHEST, stream); + credential_slot, flags, true, &headers)); base::StatsCounter spdy_requests("spdy.requests"); spdy_requests.Increment(); @@ -594,14 +614,15 @@ int SpdySession::WriteSynStream( stream_id, 0)); } - return ERR_IO_PENDING; + return syn_frame.release(); } -int SpdySession::WriteCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority) { +SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( + const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority) { DCHECK(is_secure_); unsigned char secret[32]; // 32 bytes from the spec GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", @@ -643,24 +664,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyCredentialControlFrame> credential_frame( buffered_spdy_framer_->CreateCredentialFrame(credential)); - // We enqueue all SYN_STREAM frames at the same priority to ensure - // that we do not send them out-of-order, which means that we need - // to enqueue all CREDENTIAL frames at this priority to ensure that - // they are sent *before* the SYN_STREAM that references them. - // http://crbug.com/111708 - QueueFrame(credential_frame.get(), HIGHEST, NULL); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); } - return ERR_IO_PENDING; + return credential_frame.release(); } -int SpdySession::WriteStreamData(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags) { +SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, int len, + SpdyDataFlags flags) { // Find our stream CHECK(IsStreamActive(stream_id)); scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; @@ -684,7 +699,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return ERR_IO_PENDING; + return NULL; } int new_len = std::min(len, stream->send_window_size()); if (new_len < len) { @@ -709,18 +724,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, scoped_ptr<SpdyDataFrame> frame( buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), len, flags)); - QueueFrame(frame.get(), stream->priority(), stream); - return ERR_IO_PENDING; + return frame.release(); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { + DCHECK_NE(0u, stream_id); // TODO(mbelshe): We should send a RST_STREAM control frame here // so that the server can cancel a large send. DeleteStream(stream_id, status); } +void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) { + DCHECK_EQ(0u, stream->stream_id()); + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); +} + void SpdySession::ResetStream(SpdyStreamId stream_id, SpdyStatusCodes status, const std::string& description) { @@ -738,7 +758,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; priority = stream->priority(); } - QueueFrame(rst_frame.get(), priority, NULL); + QueueFrame(rst_frame.release(), priority); RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); @@ -926,46 +946,22 @@ void SpdySession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). DCHECK(buffered_spdy_framer_.get()); - while (in_flight_write_.buffer() || !queue_.empty()) { + while (in_flight_write_.buffer() || !write_queue_.empty()) { if (!in_flight_write_.buffer()) { - // Grab the next SpdyFrame to send. - SpdyIOBuffer next_buffer = queue_.top(); - queue_.pop(); - - // We've deferred compression until just before we write it to the socket, - // which is now. At this time, we don't compress our data frames. - SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); - size_t size; - if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) { - DCHECK(uncompressed_frame.is_control_frame()); - const SpdyControlFrame* uncompressed_control_frame = - reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame); - scoped_ptr<SpdyFrame> compressed_frame( - buffered_spdy_framer_->CompressControlFrame( - *uncompressed_control_frame)); - if (!compressed_frame.get()) { - RecordProtocolErrorHistogram( - PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE); - CloseSessionOnError( - net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure."); - return; - } - - size = compressed_frame->length() + SpdyFrame::kHeaderSize; - - DCHECK_GT(size, 0u); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), compressed_frame->data(), size); - - // Attempt to send the frame. - in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST, - next_buffer.stream()); - } else { - size = uncompressed_frame.length() + SpdyFrame::kHeaderSize; - in_flight_write_ = next_buffer; - } + // Grab the next SpdyBuffer to send. + scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); + scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); + stream_producers_.erase(producer.get()); + write_queue_.pop(); + // It is possible that a stream had data to write, but a + // WINDOW_UPDATE frame has been received which made that + // stream no longer writable. + // TODO(rch): consider handling that case by removing the + // stream from the writable queue? + if (buffer == NULL) + continue; + + in_flight_write_ = *buffer; } else { DCHECK(in_flight_write_.buffer()->BytesRemaining()); } @@ -1012,16 +1008,33 @@ void SpdySession::CloseAllStreams(net::Error status) { while (!active_streams_.empty()) { ActiveStreamMap::iterator it = active_streams_.begin(); const scoped_refptr<SpdyStream>& stream = it->second; - DCHECK(stream); - std::string description = base::StringPrintf( - "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); - stream->LogStreamError(status, description); + LogAbandonedStream(stream, status); DeleteStream(stream->stream_id(), status); } + while (!created_streams_.empty()) { + CreatedStreamSet::iterator it = created_streams_.begin(); + const scoped_refptr<SpdyStream>& stream = *it; + LogAbandonedStream(stream, status); + stream->OnClose(status); + created_streams_.erase(it); + } + // We also need to drain the queue. - while (queue_.size()) - queue_.pop(); + while (!write_queue_.empty()) { + SpdyIOBufferProducer* producer = write_queue_.top(); + stream_producers_.erase(producer); + delete producer; + write_queue_.pop(); + } +} + +void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, + net::Error status) { + DCHECK(stream); + std::string description = base::StringPrintf( + "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); + stream->LogStreamError(status, description); } int SpdySession::GetNewStreamId() { @@ -1032,17 +1045,6 @@ int SpdySession::GetNewStreamId() { return id; } -void SpdySession::QueueFrame(SpdyFrame* frame, - RequestPriority priority, - SpdyStream* stream) { - int length = SpdyFrame::kHeaderSize + frame->length(); - IOBuffer* buffer = new IOBuffer(length); - memcpy(buffer->data(), frame->data(), length); - queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); - - WriteSocketLater(); -} - void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool, const std::string& description) { @@ -1129,7 +1131,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { return connection_->socket()->GetLocalAddress(address); } +class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { + public: + SimpleSpdyIOBufferProducer(SpdyFrame* frame, + RequestPriority priority) + : frame_(frame), + priority_(priority) { + } + + virtual RequestPriority GetPriority() const OVERRIDE { + return priority_; + } + + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) { + return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + frame_.get(), priority_, NULL); + } + + private: + scoped_ptr<SpdyFrame> frame_; + RequestPriority priority_; +}; + +void SpdySession::QueueFrame(SpdyFrame* frame, + RequestPriority priority) { + SimpleSpdyIOBufferProducer* producer = + new SimpleSpdyIOBufferProducer(frame, priority); + write_queue_.push(producer); + WriteSocketLater(); +} + void SpdySession::ActivateStream(SpdyStream* stream) { + if (stream->stream_id() == 0) { + stream->set_stream_id(GetNewStreamId()); + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); + } const SpdyStreamId id = stream->stream_id(); DCHECK(!IsStreamActive(id)); @@ -1157,6 +1193,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { if (it2 == active_streams_.end()) return; + // Possibly remove from the write queue. + WriteQueue old = write_queue_; + write_queue_ = WriteQueue(); + while (!old.empty()) { + SpdyIOBufferProducer* producer = old.top(); + StreamProducerMap::iterator it = stream_producers_.find(producer); + if (it == stream_producers_.end() || it->second->stream_id() != id) + write_queue_.push(producer); + else + delete producer; + old.pop(); + } + // If this is an active stream, call the callback. const scoped_refptr<SpdyStream> stream(it2->second); active_streams_.erase(it2); @@ -1377,8 +1426,8 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, return; } - scoped_refptr<SpdyStream> stream( - new SpdyStream(this, stream_id, true, net_log_)); + scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_)); + stream->set_stream_id(stream_id); stream->set_path(gurl.PathForRequest()); stream->set_send_window_size(initial_send_window_size_); @@ -1615,7 +1664,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); - QueueFrame(window_update_frame.get(), stream->priority(), NULL); + QueueFrame(window_update_frame.release(), stream->priority()); } // Given a cwnd that we would have sent to the server, modify it based on the @@ -1654,7 +1703,6 @@ void SpdySession::SendInitialSettings() { settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_); } - sent_settings_ = true; SendSettings(settings_map); } @@ -1687,7 +1735,6 @@ void SpdySession::SendInitialSettings() { HandleSetting(new_id, new_val); } - sent_settings_ = true; SendSettings(settings_map_new); } @@ -1701,7 +1748,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdySettingsControlFrame> settings_frame( buffered_spdy_framer_->CreateSettings(settings)); - QueueFrame(settings_frame.get(), HIGHEST, NULL); + sent_settings_ = true; + QueueFrame(settings_frame.release(), HIGHEST); } void SpdySession::HandleSetting(uint32 id, uint32 value) { @@ -1736,6 +1784,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { DCHECK(stream); stream->AdjustSendWindowSize(delta_window_size); } + + CreatedStreamSet::iterator i; + for (i = created_streams_.begin(); i != created_streams_.end(); i++) { + const scoped_refptr<SpdyStream>& stream = *i; + stream->AdjustSendWindowSize(delta_window_size); + } } void SpdySession::SendPrefacePingIfNoneInFlight() { @@ -1756,7 +1810,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyPingControlFrame> ping_frame( buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); - QueueFrame(ping_frame.get(), HIGHEST, NULL); + QueueFrame(ping_frame.release(), HIGHEST); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 2975203..3d95101 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -19,6 +19,7 @@ #include "net/base/load_states.h" #include "net/base/net_errors.h" #include "net/base/request_priority.h" +#include "net/base/ssl_client_cert_type.h" #include "net/base/ssl_config_service.h" #include "net/base/upload_data_stream.h" #include "net/socket/client_socket_handle.h" @@ -93,6 +94,32 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING == class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, public BufferedSpdyFramerVisitorInterface { public: + // Defines an interface for producing SpdyIOBuffers. + class NET_EXPORT_PRIVATE SpdyIOBufferProducer { + public: + SpdyIOBufferProducer() {} + + // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL + // if not buffer is ready to be produced. + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0; + + virtual RequestPriority GetPriority() const = 0; + + virtual ~SpdyIOBufferProducer() {} + + protected: + // Activates |spdy_stream| in |spdy_session|. + static void ActivateStream(SpdySession* spdy_session, + SpdyStream* spdy_stream); + + static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame, + RequestPriority priority, + SpdyStream* spdy_stream); + + private: + DISALLOW_COPY_AND_ASSIGN(SpdyIOBufferProducer); + }; + // Create a new SpdySession. // |host_port_proxy_pair| is the host/port that this session connects to, and // the proxy configuration settings that it's using. @@ -157,9 +184,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // authentication now. bool VerifyDomainAuthentication(const std::string& domain); + // Records that |stream| has a write available from |producer|. + // |producer| will be owned by this SpdySession. + void SetStreamHasWriteAvailable(SpdyStream* stream, + SpdyIOBufferProducer* producer); + // Send the SYN frame for |stream_id|. This also sends PING message to check // the status of the connection. - int WriteSynStream( + SpdySynStreamControlFrame* CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, @@ -167,21 +199,24 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, const SpdyHeaderBlock& headers); // Write a CREDENTIAL frame to the session. - int WriteCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority); + SpdyCredentialControlFrame* CreateCredentialFrame(const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority); // Write a data frame to the stream. // Used to create and queue a data frame for the given stream. - int WriteStreamData(SpdyStreamId stream_id, net::IOBuffer* data, - int len, - SpdyDataFlags flags); + SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, int len, + SpdyDataFlags flags); // Close a stream. void CloseStream(SpdyStreamId stream_id, int status); + // Close a stream that has been created but is not yet active. + void CloseCreatedStream(SpdyStream* stream, int status); + // Reset a stream by sending a RST_STREAM frame with given status code. // Also closes the stream. Was not piggybacked to CloseStream since not // all of the calls to CloseStream necessitate sending a RST_STREAM. @@ -268,7 +303,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Returns true if session is not currently active bool is_active() const { - return !active_streams_.empty(); + return !active_streams_.empty() || !created_streams_.empty(); } // Access to the number of active and pending streams. These are primarily @@ -277,6 +312,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, size_t num_unclaimed_pushed_streams() const { return unclaimed_pushed_streams_.size(); } + size_t num_created_streams() const { return created_streams_.size(); } // Returns true if flow control is enabled for the session. bool is_flow_control_enabled() const { @@ -356,6 +392,19 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap; typedef std::priority_queue<SpdyIOBuffer> OutputQueue; + typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; + typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap; + class SpdyIOBufferProducerCompare { + public: + bool operator() (const SpdyIOBufferProducer* lhs, + const SpdyIOBufferProducer* rhs) const { + return lhs->GetPriority() < rhs->GetPriority(); + } + }; + typedef std::priority_queue<SpdyIOBufferProducer*, + std::vector<SpdyIOBufferProducer*>, + SpdyIOBufferProducerCompare> WriteQueue; + struct CallbackResultPair { CallbackResultPair(const CompletionCallback& callback_in, int result_in) : callback(callback_in), result(result_in) {} @@ -434,9 +483,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Queue a frame for sending. // |frame| is the frame to send. // |priority| is the priority for insertion into the queue. - // |stream| is the stream which this IO is associated with (or NULL). - void QueueFrame(SpdyFrame* frame, RequestPriority priority, - SpdyStream* stream); + void QueueFrame(SpdyFrame* frame, RequestPriority priority); // Track active streams in the active stream list. void ActivateStream(SpdyStream* stream); @@ -462,6 +509,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Closes all streams. Used as part of shutdown. void CloseAllStreams(net::Error status); + void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, + net::Error status); + // Invokes a user callback for stream creation. We provide this method so it // can be deferred to the MessageLoop, so we avoid re-entrancy problems. void InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream>* stream); @@ -580,8 +630,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // server, but do not have consumers yet. PushedStreamMap unclaimed_pushed_streams_; - // As we gather data to be sent, we put it into the output queue. - OutputQueue queue_; + // Set of all created streams but that have not yet sent any frames. + CreatedStreamSet created_streams_; + + // As streams have data to be sent, we put them into the write queue. + WriteQueue write_queue_; + + // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream + // so that when a stream is destroyed, we can remove the corresponding + // producer from |write_queue_|. + StreamProducerMap stream_producers_; // The packet we are currently sending. bool write_pending_; // Will be true when a write is in progress. diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index cd971d2..9597289 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -291,7 +291,7 @@ TEST_F(SpdySessionSpdy2Test, DeleteExpiredPushStreams) { (*request_headers)["url"] = "/"; scoped_refptr<SpdyStream> stream( - new SpdyStream(session, 1, false, session->net_log_)); + new SpdyStream(session, false, session->net_log_)); stream->set_spdy_headers(request_headers.Pass()); session->ActivateStream(stream); @@ -398,7 +398,7 @@ TEST_F(SpdySessionSpdy2Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams()); + EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -518,8 +518,8 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseStream(spdy_stream1->stream_id(), OK); - session3->CloseStream(spdy_stream3->stream_id(), OK); + session1->CloseCreatedStream(spdy_stream1, OK); + session3->CloseCreatedStream(spdy_stream3, OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -542,7 +542,7 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseStream(spdy_stream2->stream_id(), OK); + session2->CloseCreatedStream(spdy_stream2, OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1177,8 +1177,8 @@ TEST_F(SpdySessionSpdy2Test, CloseSessionOnError) { TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1242,14 +1242,14 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(0u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(0u, spdy_stream2->stream_id()); scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)["method"] = "GET"; @@ -1270,8 +1270,8 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(1u, spdy_stream1->stream_id()); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 12fc829..08d56a3 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -291,7 +291,7 @@ TEST_F(SpdySessionSpdy3Test, DeleteExpiredPushStreams) { (*request_headers)[":path"] = "/"; scoped_refptr<SpdyStream> stream( - new SpdyStream(session, 1, false, session->net_log_)); + new SpdyStream(session, false, session->net_log_)); stream->set_spdy_headers(request_headers.Pass()); session->ActivateStream(stream); @@ -402,7 +402,7 @@ TEST_F(SpdySessionSpdy3Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams()); + EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -522,8 +522,8 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseStream(spdy_stream1->stream_id(), OK); - session3->CloseStream(spdy_stream3->stream_id(), OK); + session1->CloseCreatedStream(spdy_stream1, OK); + session3->CloseCreatedStream(spdy_stream3, OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -546,7 +546,7 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseStream(spdy_stream2->stream_id(), OK); + session2->CloseCreatedStream(spdy_stream2, OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1265,22 +1265,24 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { MockConnect connect_data(SYNCHRONOUS, OK); scoped_ptr<SpdyFrame> settings_frame(ConstructSpdySettings(new_settings)); MockRead reads[] = { - CreateMockRead(*settings_frame), - MockRead(SYNCHRONOUS, 0, 0) // EOF + CreateMockRead(*settings_frame, 0), + MockRead(ASYNC, 0, 2) // EOF }; SpdySessionDependencies session_deps; + session_deps.host_resolver->set_synchronous_mode(true); - StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); - data.set_connect_data(connect_data); - session_deps.socket_factory->AddSocketDataProvider(&data); + scoped_ptr<DeterministicSocketData> data( + new DeterministicSocketData(reads, arraysize(reads), NULL, 0)); + data->set_connect_data(connect_data); + session_deps.deterministic_socket_factory->AddSocketDataProvider(data.get()); SSLSocketDataProvider ssl(SYNCHRONOUS, OK); session_deps.socket_factory->AddSSLSocketDataProvider(&ssl); scoped_refptr<HttpNetworkSession> http_session( - SpdySessionDependencies::SpdyCreateSession(&session_deps)); + SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps)); const std::string kTestHost("www.foo.com"); const int kTestPort = 80; @@ -1320,6 +1322,7 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { callback1.callback())); EXPECT_NE(spdy_stream1->send_window_size(), window_size); + data->RunFor(1); // Process the SETTINGS frame, but not the EOF MessageLoop::current()->RunAllPending(); EXPECT_EQ(session->initial_send_window_size(), window_size); EXPECT_EQ(spdy_stream1->send_window_size(), window_size); @@ -1344,8 +1347,8 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1409,14 +1412,14 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(0u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(0u, spdy_stream2->stream_id()); scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)[":method"] = "GET"; @@ -1437,8 +1440,8 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(1u, spdy_stream1->stream_id()); - EXPECT_EQ(3u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 4fe3436..f3828d6 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -50,11 +50,10 @@ bool ContainsUpperAscii(const std::string& str) { } // namespace SpdyStream::SpdyStream(SpdySession* session, - SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log) : continue_buffering_data_(true), - stream_id_(stream_id), + stream_id_(0), priority_(HIGHEST), slot_(0), stalled_by_flow_control_(false), @@ -77,8 +76,78 @@ SpdyStream::SpdyStream(SpdySession* session, domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { } +class SpdyStream::SpdyStreamIOBufferProducer + : public SpdySession::SpdyIOBufferProducer { + public: + SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {} + + // SpdyFrameProducer + virtual RequestPriority GetPriority() const OVERRIDE { + return stream_->priority(); + } + + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { + if (stream_->stream_id() == 0) + SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_); + frame_.reset(stream_->ProduceNextFrame()); + return frame_ == NULL ? NULL : + SpdySession::SpdyIOBufferProducer::CreateIOBuffer( + frame_.get(), GetPriority(), stream_); + } + + private: + scoped_refptr<SpdyStream> stream_; + scoped_ptr<SpdyFrame> frame_; +}; + +void SpdyStream::SetHasWriteAvailable() { + session_->SetStreamHasWriteAvailable(this, + new SpdyStreamIOBufferProducer(this)); +} + +SpdyFrame* SpdyStream::ProduceNextFrame() { + if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) { + CHECK(request_.get()); + CHECK_GT(stream_id_, 0u); + + std::string origin = GetUrl().GetOrigin().spec(); + DCHECK(origin[origin.length() - 1] == '/'); + origin.erase(origin.length() - 1); // Trim trailing slash. + SpdyCredentialControlFrame* frame = session_->CreateCredentialFrame( + origin, domain_bound_cert_type_, domain_bound_private_key_, + domain_bound_cert_, priority_); + return frame; + } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) { + CHECK(request_.get()); + CHECK_GT(stream_id_, 0u); + + SpdyControlFlags flags = + has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; + SpdySynStreamControlFrame* frame = session_->CreateSynStream( + stream_id_, priority_, slot_, flags, *request_); + send_time_ = base::TimeTicks::Now(); + return frame; + } else { + CHECK(!cancelled()); + // We must need to write stream data. + // Until the headers have been completely sent, we can not be sure + // that our stream_id is correct. + DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); + DCHECK_GT(stream_id_, 0u); + DCHECK(!pending_data_frames_.empty()); + SpdyFrame* frame = pending_data_frames_.front(); + pending_data_frames_.pop_front(); + return frame; + } +} + SpdyStream::~SpdyStream() { UpdateHistograms(); + while (!pending_data_frames_.empty()) { + SpdyFrame* frame = pending_data_frames_.back(); + pending_data_frames_.pop_back(); + delete frame; + } } void SpdyStream::SetDelegate(Delegate* delegate) { @@ -460,7 +529,10 @@ void SpdyStream::Cancel() { } void SpdyStream::Close() { - session_->CloseStream(stream_id_, net::OK); + if (stream_id_ != 0) + session_->CloseStream(stream_id_, net::OK); + else + session_->CloseCreatedStream(this, OK); } int SpdyStream::SendRequest(bool has_upload_data) { @@ -484,7 +556,13 @@ int SpdyStream::WriteStreamData(IOBuffer* data, int length, // Until the headers have been completely sent, we can not be sure // that our stream_id is correct. DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - return session_->WriteStreamData(stream_id_, data, length, flags); + CHECK_GT(stream_id_, 0u); + + pending_data_frames_.push_back( + session_->CreateDataFrame(stream_id_, data, length, flags)); + + SetHasWriteAvailable(); + return ERR_IO_PENDING; } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -628,14 +706,8 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { int SpdyStream::DoSendDomainBoundCert() { io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; CHECK(request_.get()); - std::string origin = GetUrl().GetOrigin().spec(); - origin.erase(origin.length() - 1); // trim trailing slash - int rv = session_->WriteCredentialFrame( - origin, domain_bound_cert_type_, domain_bound_private_key_, - domain_bound_cert_, priority_); - if (rv != ERR_IO_PENDING) - return rv; - return OK; + SetHasWriteAvailable(); + return ERR_IO_PENDING; } int SpdyStream::DoSendDomainBoundCertComplete(int result) { @@ -649,18 +721,7 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) { int SpdyStream::DoSendHeaders() { CHECK(!cancelled_); - SpdyControlFlags flags = CONTROL_FLAG_NONE; - if (!has_upload_data_) - flags = CONTROL_FLAG_FIN; - - CHECK(request_.get()); - int result = session_->WriteSynStream( - stream_id_, priority_, slot_, flags, - *request_); - if (result != ERR_IO_PENDING) - return result; - - send_time_ = base::TimeTicks::Now(); + SetHasWriteAvailable(); io_state_ = STATE_SEND_HEADERS_COMPLETE; return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 40e436a..8653d9f 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -5,6 +5,7 @@ #ifndef NET_SPDY_SPDY_STREAM_H_ #define NET_SPDY_SPDY_STREAM_H_ +#include <list> #include <string> #include <vector> @@ -22,12 +23,12 @@ #include "net/socket/ssl_client_socket.h" #include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_session.h" namespace net { class AddressList; class IPEndPoint; -class SpdySession; class SSLCertRequestInfo; class SSLInfo; @@ -89,7 +90,6 @@ class NET_EXPORT_PRIVATE SpdyStream // SpdyStream constructor SpdyStream(SpdySession* session, - SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log); @@ -251,6 +251,8 @@ class NET_EXPORT_PRIVATE SpdyStream int GetProtocolVersion() const; private: + class SpdyStreamIOBufferProducer; + enum State { STATE_NONE, STATE_GET_DOMAIN_BOUND_CERT, @@ -300,6 +302,14 @@ class NET_EXPORT_PRIVATE SpdyStream // the MessageLoop to replay all the data that the server has already sent. void PushedStreamReplayData(); + // Informs the SpdySession that this stream has a write available. + void SetHasWriteAvailable(); + + // Returns a newly created SPDY frame owned by the called that contains + // the next frame to be sent by this frame. May return NULL if this + // stream has become stalled on flow control. + SpdyFrame* ProduceNextFrame(); + // There is a small period of time between when a server pushed stream is // first created, and the pushed data is replayed. Any data received during // this time should continue to be buffered. @@ -335,6 +345,8 @@ class NET_EXPORT_PRIVATE SpdyStream scoped_ptr<SpdyHeaderBlock> response_; base::Time response_time_; + std::list<SpdyFrame*> pending_data_frames_; + State io_state_; // Since we buffer the response, we also buffer the response status. diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 7ffe646..5974e0a 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -206,9 +206,9 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, - 2, true, net_log); + stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -336,10 +336,10 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - const SpdyStreamId stream_id = stream->stream_id(); - EXPECT_EQ(OK, callback.WaitForResult()); + const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())["status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())["version"]); diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index c23c6df..58d93e9 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -207,9 +207,9 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, - 2, true, net_log); + stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -341,10 +341,10 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - const SpdyStreamId stream_id = stream->stream_id(); - EXPECT_EQ(OK, callback.WaitForResult()); + const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())[":status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())[":version"]); diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h index 4adb37c..900232b 100644 --- a/net/spdy/spdy_websocket_stream.h +++ b/net/spdy/spdy_websocket_stream.h @@ -82,6 +82,8 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream virtual void OnClose(int status) OVERRIDE; private: + friend class SpdyWebSocketStreamSpdy2Test; + friend class SpdyWebSocketStreamSpdy3Test; FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy2Test, Basic); FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy3Test, Basic); diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc index 99264e1..f5d5b58 100644 --- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc @@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { + // Record the actual stream_id. + created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; + SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); - EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); + EXPECT_EQ(stream_id_, created_stream_id_); + websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy2Test, IOPending) { - Prepare(3); + Prepare(1); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc index c6e1717..167d6af 100644 --- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc @@ -165,6 +165,8 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { + // Record the actual stream_id. + created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -299,6 +301,7 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; + SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -358,12 +361,13 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); - EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); + EXPECT_EQ(stream_id_, created_stream_id_); + websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -523,7 +527,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { - Prepare(3); + Prepare(1); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { @@ -576,7 +580,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream( url, HIGHEST, net_log)); - // Delete the fist stream to allow create the second stream. + // Delete the first stream to allow create the second stream. block_stream.reset(); ASSERT_EQ(OK, sync_callback_.WaitForResult()); |