diff options
author | mattm@chromium.org <mattm@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-28 03:27:03 +0000 |
---|---|---|
committer | mattm@chromium.org <mattm@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-28 03:27:03 +0000 |
commit | c1dd24fcb3aa8c2c29a29841ca434998098941b3 (patch) | |
tree | 629d7d5180757f81189c110aa2e1550487894a6d /net/spdy | |
parent | 7e8a3ffaed19ac3c6c187464abc22558bf78a556 (diff) | |
download | chromium_src-c1dd24fcb3aa8c2c29a29841ca434998098941b3.zip chromium_src-c1dd24fcb3aa8c2c29a29841ca434998098941b3.tar.gz chromium_src-c1dd24fcb3aa8c2c29a29841ca434998098941b3.tar.bz2 |
Revert 144649 - Instead of enqueueing SPDY frames, instead enqueue SPDY streams that are ready to produce data. This allows us to lazily allocate a stream id.
BUG=111708
Review URL: https://chromiumcodereview.appspot.com/10448083
TBR=rch@chromium.org
Review URL: https://chromiumcodereview.appspot.com/10710008
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@144655 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/spdy')
-rw-r--r-- | net/spdy/spdy_io_buffer.cc | 15 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.h | 10 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy2_unittest.cc | 43 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_spdy3_unittest.cc | 45 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 251 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 88 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy2_unittest.cc | 20 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 35 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 108 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 16 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.h | 2 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy2_unittest.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_spdy3_unittest.cc | 10 |
15 files changed, 208 insertions, 455 deletions
diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc index f0c14b1..5720f7d 100644 --- a/net/spdy/spdy_io_buffer.cc +++ b/net/spdy/spdy_io_buffer.cc @@ -20,23 +20,8 @@ SpdyIOBuffer::SpdyIOBuffer( SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) { } -SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) { - buffer_ = rhs.buffer_; - priority_ = rhs.priority_; - position_ = rhs.position_; - stream_ = rhs.stream_; -} - SpdyIOBuffer::~SpdyIOBuffer() {} -SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) { - buffer_ = rhs.buffer_; - priority_ = rhs.priority_; - position_ = rhs.position_; - stream_ = rhs.stream_; - return *this; -} - void SpdyIOBuffer::release() { buffer_ = NULL; stream_ = NULL; diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h index 05ec59c..0aaced3 100644 --- a/net/spdy/spdy_io_buffer.h +++ b/net/spdy/spdy_io_buffer.h @@ -9,12 +9,10 @@ #include "base/memory/ref_counted.h" #include "net/base/io_buffer.h" #include "net/base/net_export.h" -#include "net/base/request_priority.h" +#include "net/spdy/spdy_stream.h" namespace net { -class SpdyStream; - // A class for managing SPDY IO buffers. These buffers need to be prioritized // so that the SpdySession sends them in the right order. Further, they need // to track the SpdyStream which they are associated with so that incremental @@ -28,14 +26,8 @@ class NET_EXPORT_PRIVATE SpdyIOBuffer { // |stream| is a pointer to the stream which is managing this buffer. SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority, SpdyStream* stream); - // Declare this instead of using the default so that we avoid needing to - // include spdy_stream.h. - SpdyIOBuffer(const SpdyIOBuffer& rhs); SpdyIOBuffer(); ~SpdyIOBuffer(); - // Declare this instead of using the default so that we avoid needing to - // include spdy_stream.h. - SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs); // Accessors. DrainableIOBuffer* buffer() const { return buffer_; } diff --git a/net/spdy/spdy_network_transaction_spdy2_unittest.cc b/net/spdy/spdy_network_transaction_spdy2_unittest.cc index a4c8864..a52859f 100644 --- a/net/spdy/spdy_network_transaction_spdy2_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy2_unittest.cc @@ -1736,36 +1736,22 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 1), - MockRead(ASYNC, 0, 3) // EOF - }; - - scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); - scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); - MockRead writes[] = { - CreateMockWrite(*req, 0), - CreateMockWrite(*body, 2), + CreateMockRead(*stream_reply, 2), + CreateMockRead(*stream_body, 3), + MockRead(SYNCHRONOUS, 0, 0) // EOF }; - scoped_refptr<DeterministicSocketData> data( - new DeterministicSocketData(reads, arraysize(reads), - writes, arraysize(writes))); - NormalSpdyTransactionHelper helper(CreatePostRequest(), + scoped_ptr<DelayedSocketData> data( + new DelayedSocketData(0, reads, arraysize(reads), NULL, 0)); + NormalSpdyTransactionHelper helper(request, BoundNetLog(), GetParam(), NULL); - helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddDeterministicData(data.get()); - HttpNetworkTransaction* trans = helper.trans(); - - TestCompletionCallback callback; - int rv = trans->Start( - &CreatePostRequest(), callback.callback(), BoundNetLog()); - EXPECT_EQ(ERR_IO_PENDING, rv); + helper.AddData(data.get()); + helper.RunDefaultTest(); + helper.VerifyDataConsumed(); - data->RunFor(2); - rv = callback.WaitForResult(); - EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); - data->RunFor(1); + TransactionHelperResult out = helper.output(); + EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -5579,11 +5565,10 @@ TEST_P(SpdyNetworkTransactionSpdy2Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be numbered differently - // than the order they were created. + // start before the second, causing the second to be re-numbered. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_network_transaction_spdy3_unittest.cc b/net/spdy/spdy_network_transaction_spdy3_unittest.cc index 349e37c..4eca577 100644 --- a/net/spdy/spdy_network_transaction_spdy3_unittest.cc +++ b/net/spdy/spdy_network_transaction_spdy3_unittest.cc @@ -1745,36 +1745,22 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, PostWithEarlySynReply) { scoped_ptr<SpdyFrame> stream_reply(ConstructSpdyPostSynReply(NULL, 0)); scoped_ptr<SpdyFrame> stream_body(ConstructSpdyBodyFrame(1, true)); MockRead reads[] = { - CreateMockRead(*stream_reply, 1), - MockRead(ASYNC, 0, 3) // EOF - }; - - scoped_ptr<SpdyFrame> req(ConstructSpdyPost(kUploadDataSize, NULL, 0)); - scoped_ptr<SpdyFrame> body(ConstructSpdyBodyFrame(1, true)); - MockRead writes[] = { - CreateMockWrite(*req, 0), - CreateMockWrite(*body, 2), + CreateMockRead(*stream_reply, 2), + CreateMockRead(*stream_body, 3), + MockRead(SYNCHRONOUS, 0, 0) // EOF }; - scoped_refptr<DeterministicSocketData> data( - new DeterministicSocketData(reads, arraysize(reads), - writes, arraysize(writes))); - NormalSpdyTransactionHelper helper(CreatePostRequest(), + scoped_ptr<DelayedSocketData> data( + new DelayedSocketData(0, reads, arraysize(reads), NULL, 0)); + NormalSpdyTransactionHelper helper(request, BoundNetLog(), GetParam(), NULL); - helper.SetDeterministic(); helper.RunPreTestSetup(); - helper.AddDeterministicData(data.get()); - HttpNetworkTransaction* trans = helper.trans(); - - TestCompletionCallback callback; - int rv = trans->Start( - &CreatePostRequest(), callback.callback(), BoundNetLog()); - EXPECT_EQ(ERR_IO_PENDING, rv); + helper.AddData(data.get()); + helper.RunDefaultTest(); + helper.VerifyDataConsumed(); - data->RunFor(2); - rv = callback.WaitForResult(); - EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, rv); - data->RunFor(1); + TransactionHelperResult out = helper.output(); + EXPECT_EQ(ERR_SYN_REPLY_NOT_RECEIVED, out.rv); } // The client upon cancellation tries to send a RST_STREAM frame. The mock @@ -2077,7 +2063,7 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, WindowUpdateOverflow) { // WINDOW_UPDATE while sending a request and will send a RST_STREAM frame. MockWrite writes[] = { CreateMockWrite(*req), - //CreateMockWrite(*body), + CreateMockWrite(*body), CreateMockWrite(*rst), }; @@ -6162,11 +6148,10 @@ TEST_P(SpdyNetworkTransactionSpdy3Test, OutOfOrderSynStream) { // This first request will start to establish the SpdySession. // Then we will start the second (MEDIUM priority) and then third // (HIGHEST priority) request in such a way that the third will actually - // start before the second, causing the second to be numbered differently - // than they order they were created. + // start before the second, causing the second to be re-numbered. scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); - scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, MEDIUM)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, MEDIUM)); + scoped_ptr<SpdyFrame> req3(ConstructSpdyGet(NULL, 0, false, 5, HIGHEST)); MockWrite writes[] = { CreateMockWrite(*req1, 0), CreateMockWrite(*req2, 3), diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 2b7ab3a..613d242 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -184,28 +184,6 @@ bool g_enable_ping_based_connection_checking = true; } // namespace // static -void SpdySession::SpdyIOBufferProducer::ActivateStream( - SpdySession* spdy_session, - SpdyStream* spdy_stream) { - spdy_session->ActivateStream(spdy_stream); -} - -// static -SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - SpdyFrame* frame, - RequestPriority priority, - SpdyStream* stream) { - size_t size = frame->length() + SpdyFrame::kHeaderSize; - DCHECK_GT(size, 0u); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), frame->data(), size); - - return new SpdyIOBuffer(buffer, size, priority, stream); -} - -// static void SpdySession::set_default_protocol(NextProto default_protocol) { g_default_protocol = default_protocol; } @@ -382,13 +360,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { ssl_info.cert->VerifyNameMatch(domain); } -void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, - SpdyIOBufferProducer* producer) { - write_queue_.push(producer); - stream_producers_[producer] = stream; - WriteSocketLater(); -} - int SpdySession::GetPushStream( const GURL& url, scoped_refptr<SpdyStream>* stream, @@ -427,8 +398,7 @@ int SpdySession::CreateStream( const BoundNetLog& stream_net_log, const CompletionCallback& callback) { if (!max_concurrent_streams_ || - (active_streams_.size() + created_streams_.size() < - max_concurrent_streams_)) { + active_streams_.size() < max_concurrent_streams_) { return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); } @@ -518,7 +488,10 @@ int SpdySession::CreateStreamImpl( const std::string& path = url.PathForRequest(); + const SpdyStreamId stream_id = GetNewStreamId(); + *spdy_stream = new SpdyStream(this, + stream_id, false, stream_net_log); const scoped_refptr<SpdyStream>& stream = *spdy_stream; @@ -527,13 +500,14 @@ int SpdySession::CreateStreamImpl( stream->set_path(path); stream->set_send_window_size(initial_send_window_size_); stream->set_recv_window_size(initial_recv_window_size_); - created_streams_.insert(stream); + ActivateStream(stream); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", static_cast<int>(priority), 0, 10, 11); // TODO(mbelshe): Optimize memory allocations + DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); return OK; } @@ -555,13 +529,15 @@ int SpdySession::GetProtocolVersion() const { return buffered_spdy_framer_->protocol_version(); } -SpdySynStreamControlFrame* SpdySession::CreateSynStream( +int SpdySession::WriteSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, SpdyControlFlags flags, const linked_ptr<SpdyHeaderBlock>& headers) { - CHECK(IsStreamActive(stream_id)); + // Find our stream + if (!IsStreamActive(stream_id)) + return ERR_INVALID_SPDY_STREAM; const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; CHECK_EQ(stream->stream_id(), stream_id); @@ -572,7 +548,11 @@ SpdySynStreamControlFrame* SpdySession::CreateSynStream( buffered_spdy_framer_->CreateSynStream( stream_id, 0, ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), - credential_slot, flags, true, headers.get())); + credential_slot, flags, false, headers.get())); + // We enqueue all SYN_STREAM frames at the same priority to ensure + // that we do not send them out-of-order. + // http://crbug.com/111708 + QueueFrame(syn_frame.get(), HIGHEST, stream); base::StatsCounter spdy_requests("spdy.requests"); spdy_requests.Increment(); @@ -584,15 +564,14 @@ SpdySynStreamControlFrame* SpdySession::CreateSynStream( base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0)); } - return syn_frame.release(); + return ERR_IO_PENDING; } -SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( - const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority) { +int SpdySession::WriteCredentialFrame(const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority) { DCHECK(is_secure_); unsigned char secret[32]; // 32 bytes from the spec GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", @@ -634,18 +613,24 @@ SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyCredentialControlFrame> credential_frame( buffered_spdy_framer_->CreateCredentialFrame(credential)); + // We enqueue all SYN_STREAM frames at the same priority to ensure + // that we do not send them out-of-order, which means that we need + // to enqueue all CREDENTIAL frames at this priority to ensure that + // they are sent *before* the SYN_STREAM that references them. + // http://crbug.com/111708 + QueueFrame(credential_frame.get(), HIGHEST, NULL); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); } - return credential_frame.release(); + return ERR_IO_PENDING; } -SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags) { +int SpdySession::WriteStreamData(SpdyStreamId stream_id, + net::IOBuffer* data, int len, + SpdyDataFlags flags) { // Find our stream CHECK(IsStreamActive(stream_id)); scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; @@ -669,7 +654,7 @@ SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return NULL; + return ERR_IO_PENDING; } int new_len = std::min(len, stream->send_window_size()); if (new_len < len) { @@ -694,23 +679,18 @@ SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, scoped_ptr<SpdyDataFrame> frame( buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), len, flags)); + QueueFrame(frame.get(), stream->priority(), stream); - return frame.release(); + return ERR_IO_PENDING; } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { - DCHECK_NE(0u, stream_id); // TODO(mbelshe): We should send a RST_STREAM control frame here // so that the server can cancel a large send. DeleteStream(stream_id, status); } -void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) { - DCHECK_EQ(0u, stream->stream_id()); - created_streams_.erase(scoped_refptr<SpdyStream>(stream)); -} - void SpdySession::ResetStream(SpdyStreamId stream_id, SpdyStatusCodes status, const std::string& description) { @@ -728,7 +708,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; priority = stream->priority(); } - QueueFrame(rst_frame.release(), priority); + QueueFrame(rst_frame.get(), priority, NULL); RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); @@ -916,22 +896,46 @@ void SpdySession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). DCHECK(buffered_spdy_framer_.get()); - while (in_flight_write_.buffer() || !write_queue_.empty()) { + while (in_flight_write_.buffer() || !queue_.empty()) { if (!in_flight_write_.buffer()) { - // Grab the next SpdyBuffer to send. - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); - scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); - stream_producers_.erase(producer.get()); - write_queue_.pop(); - // It is possible that a stream had data to write, but a - // WINDOW_UPDATE frame has been received which made that - // stream no longer writable. - // TODO(rch): consider handling that case by removing the - // stream from the writable queue? - if (buffer == NULL) - continue; - - in_flight_write_ = *buffer; + // Grab the next SpdyFrame to send. + SpdyIOBuffer next_buffer = queue_.top(); + queue_.pop(); + + // We've deferred compression until just before we write it to the socket, + // which is now. At this time, we don't compress our data frames. + SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); + size_t size; + if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) { + DCHECK(uncompressed_frame.is_control_frame()); + const SpdyControlFrame* uncompressed_control_frame = + reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame); + scoped_ptr<SpdyFrame> compressed_frame( + buffered_spdy_framer_->CompressControlFrame( + *uncompressed_control_frame)); + if (!compressed_frame.get()) { + RecordProtocolErrorHistogram( + PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE); + CloseSessionOnError( + net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure."); + return; + } + + size = compressed_frame->length() + SpdyFrame::kHeaderSize; + + DCHECK_GT(size, 0u); + + // TODO(mbelshe): We have too much copying of data here. + IOBufferWithSize* buffer = new IOBufferWithSize(size); + memcpy(buffer->data(), compressed_frame->data(), size); + + // Attempt to send the frame. + in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST, + next_buffer.stream()); + } else { + size = uncompressed_frame.length() + SpdyFrame::kHeaderSize; + in_flight_write_ = next_buffer; + } } else { DCHECK(in_flight_write_.buffer()->BytesRemaining()); } @@ -978,33 +982,16 @@ void SpdySession::CloseAllStreams(net::Error status) { while (!active_streams_.empty()) { ActiveStreamMap::iterator it = active_streams_.begin(); const scoped_refptr<SpdyStream>& stream = it->second; - LogAbandonedStream(stream, status); + DCHECK(stream); + std::string description = base::StringPrintf( + "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); + stream->LogStreamError(status, description); DeleteStream(stream->stream_id(), status); } - while (!created_streams_.empty()) { - CreatedStreamSet::iterator it = created_streams_.begin(); - const scoped_refptr<SpdyStream>& stream = *it; - LogAbandonedStream(stream, status); - stream->OnClose(status); - created_streams_.erase(it); - } - // We also need to drain the queue. - while (!write_queue_.empty()) { - SpdyIOBufferProducer* producer = write_queue_.top(); - stream_producers_.erase(producer); - delete producer; - write_queue_.pop(); - } -} - -void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, - net::Error status) { - DCHECK(stream); - std::string description = base::StringPrintf( - "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); - stream->LogStreamError(status, description); + while (queue_.size()) + queue_.pop(); } int SpdySession::GetNewStreamId() { @@ -1015,6 +1002,17 @@ int SpdySession::GetNewStreamId() { return id; } +void SpdySession::QueueFrame(SpdyFrame* frame, + RequestPriority priority, + SpdyStream* stream) { + int length = SpdyFrame::kHeaderSize + frame->length(); + IOBuffer* buffer = new IOBuffer(length); + memcpy(buffer->data(), frame->data(), length); + queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); + + WriteSocketLater(); +} + void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool, const std::string& description) { @@ -1101,41 +1099,7 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { return connection_->socket()->GetLocalAddress(address); } -class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { - public: - SimpleSpdyIOBufferProducer(SpdyFrame* frame, - RequestPriority priority) - : frame_(frame), - priority_(priority) { - } - - virtual RequestPriority GetPriority() const OVERRIDE { - return priority_; - } - - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) { - return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - frame_, priority_, NULL); - } - - private: - SpdyFrame* frame_; - RequestPriority priority_; -}; - -void SpdySession::QueueFrame(SpdyFrame* frame, - RequestPriority priority) { - SimpleSpdyIOBufferProducer* producer = - new SimpleSpdyIOBufferProducer(frame, priority); - write_queue_.push(producer); - WriteSocketLater(); -} - void SpdySession::ActivateStream(SpdyStream* stream) { - if (stream->stream_id() == 0) { - stream->set_stream_id(GetNewStreamId()); - created_streams_.erase(scoped_refptr<SpdyStream>(stream)); - } const SpdyStreamId id = stream->stream_id(); DCHECK(!IsStreamActive(id)); @@ -1163,19 +1127,6 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { if (it2 == active_streams_.end()) return; - // Possibly remove from the write queue. - WriteQueue old = write_queue_; - write_queue_ = WriteQueue(); - while (!old.empty()) { - SpdyIOBufferProducer* producer = old.top(); - StreamProducerMap::iterator it = stream_producers_.find(producer); - if (it == stream_producers_.end() || it->second->stream_id() != id) - write_queue_.push(producer); - else - delete producer; - old.pop(); - } - // If this is an active stream, call the callback. const scoped_refptr<SpdyStream> stream(it2->second); active_streams_.erase(it2); @@ -1399,8 +1350,8 @@ void SpdySession::OnSynStream( return; } - scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_)); - stream->set_stream_id(stream_id); + scoped_refptr<SpdyStream> stream( + new SpdyStream(this, stream_id, true, net_log_)); stream->set_path(gurl.PathForRequest()); stream->set_send_window_size(initial_send_window_size_); @@ -1422,6 +1373,7 @@ void SpdySession::OnSynStream( void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame, const linked_ptr<SpdyHeaderBlock>& headers) { SpdyStreamId stream_id = frame.stream_id(); + if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_SYN_REPLY, @@ -1605,7 +1557,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); - QueueFrame(window_update_frame.release(), stream->priority()); + QueueFrame(window_update_frame.get(), stream->priority(), NULL); } // Given a cwnd that we would have sent to the server, modify it based on the @@ -1644,6 +1596,7 @@ void SpdySession::SendInitialSettings() { settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_); } + sent_settings_ = true; SendSettings(settings_map); } @@ -1676,6 +1629,7 @@ void SpdySession::SendInitialSettings() { HandleSetting(new_id, new_val); } + sent_settings_ = true; SendSettings(settings_map_new); } @@ -1689,8 +1643,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdySettingsControlFrame> settings_frame( buffered_spdy_framer_->CreateSettings(settings)); - sent_settings_ = true; - QueueFrame(settings_frame.release(), HIGHEST); + QueueFrame(settings_frame.get(), HIGHEST, NULL); } void SpdySession::HandleSetting(uint32 id, uint32 value) { @@ -1725,12 +1678,6 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { DCHECK(stream); stream->AdjustSendWindowSize(delta_window_size); } - - CreatedStreamSet::iterator i; - for (i = created_streams_.begin(); i != created_streams_.end(); i++) { - const scoped_refptr<SpdyStream>& stream = *i; - stream->AdjustSendWindowSize(delta_window_size); - } } void SpdySession::SendPrefacePingIfNoneInFlight() { @@ -1751,7 +1698,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyPingControlFrame> ping_frame( buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); - QueueFrame(ping_frame.release(), HIGHEST); + QueueFrame(ping_frame.get(), HIGHEST, NULL); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index ceab965..844d37b 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -20,7 +20,6 @@ #include "net/base/load_states.h" #include "net/base/net_errors.h" #include "net/base/request_priority.h" -#include "net/base/ssl_client_cert_type.h" #include "net/base/ssl_config_service.h" #include "net/base/upload_data_stream.h" #include "net/socket/client_socket_handle.h" @@ -95,32 +94,6 @@ COMPILE_ASSERT(PROTOCOL_ERROR_UNEXPECTED_PING == class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, public BufferedSpdyFramerVisitorInterface { public: - // Defines an interface for producing SpdyIOBuffers. - class NET_EXPORT_PRIVATE SpdyIOBufferProducer { - public: - SpdyIOBufferProducer() {} - - // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL - // if not buffer is ready to be produced. - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0; - - virtual RequestPriority GetPriority() const = 0; - - virtual ~SpdyIOBufferProducer() {} - - protected: - // Activates |spdy_stream| in |spdy_session|. - static void ActivateStream(SpdySession* spdy_session, - SpdyStream* spdy_stream); - - static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame, - RequestPriority priority, - SpdyStream* spdy_stream); - - private: - DISALLOW_COPY_AND_ASSIGN(SpdyIOBufferProducer); - }; - // Create a new SpdySession. // |host_port_proxy_pair| is the host/port that this session connects to, and // the proxy configuration settings that it's using. @@ -185,14 +158,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // authentication now. bool VerifyDomainAuthentication(const std::string& domain); - // Records that |stream| has a write available from |producer|. - // |producer| will be owned by this SpdySession. - void SetStreamHasWriteAvailable(SpdyStream* stream, - SpdyIOBufferProducer* producer); - // Send the SYN frame for |stream_id|. This also sends PING message to check // the status of the connection. - SpdySynStreamControlFrame* CreateSynStream( + int WriteSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, @@ -200,24 +168,21 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, const linked_ptr<SpdyHeaderBlock>& headers); // Write a CREDENTIAL frame to the session. - SpdyCredentialControlFrame* CreateCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority); + int WriteCredentialFrame(const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority); // Write a data frame to the stream. // Used to create and queue a data frame for the given stream. - SpdyDataFrame* CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags); + int WriteStreamData(SpdyStreamId stream_id, net::IOBuffer* data, + int len, + SpdyDataFlags flags); // Close a stream. void CloseStream(SpdyStreamId stream_id, int status); - // Close a stream that has been created but is not yet active. - void CloseCreatedStream(SpdyStream* stream, int status); - // Reset a stream by sending a RST_STREAM frame with given status code. // Also closes the stream. Was not piggybacked to CloseStream since not // all of the calls to CloseStream necessitate sending a RST_STREAM. @@ -304,7 +269,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Returns true if session is not currently active bool is_active() const { - return !active_streams_.empty() || !created_streams_.empty(); + return !active_streams_.empty(); } // Access to the number of active and pending streams. These are primarily @@ -313,7 +278,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, size_t num_unclaimed_pushed_streams() const { return unclaimed_pushed_streams_.size(); } - size_t num_created_streams() const { return created_streams_.size(); } // Returns true if flow control is enabled for the session. bool is_flow_control_enabled() const { @@ -389,18 +353,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap; // Only HTTP push a stream. typedef std::map<std::string, scoped_refptr<SpdyStream> > PushedStreamMap; - typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; - typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap; - class SpdyIOBufferProducerCompare { - public: - bool operator() (const SpdyIOBufferProducer* lhs, - const SpdyIOBufferProducer* rhs) const { - return lhs->GetPriority() < rhs->GetPriority(); - } - }; - typedef std::priority_queue<SpdyIOBufferProducer*, - std::vector<SpdyIOBufferProducer*>, - SpdyIOBufferProducerCompare> WriteQueue; + typedef std::priority_queue<SpdyIOBuffer> OutputQueue; struct CallbackResultPair { CallbackResultPair(const CompletionCallback& callback_in, int result_in) @@ -478,7 +431,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Queue a frame for sending. // |frame| is the frame to send. // |priority| is the priority for insertion into the queue. - void QueueFrame(SpdyFrame* frame, RequestPriority priority); + // |stream| is the stream which this IO is associated with (or NULL). + void QueueFrame(SpdyFrame* frame, RequestPriority priority, + SpdyStream* stream); // Track active streams in the active stream list. void ActivateStream(SpdyStream* stream); @@ -504,9 +459,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Closes all streams. Used as part of shutdown. void CloseAllStreams(net::Error status); - void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, - net::Error status); - // Invokes a user callback for stream creation. We provide this method so it // can be deferred to the MessageLoop, so we avoid re-entrancy problems. void InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream>* stream); @@ -611,16 +563,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // server, but do not have consumers yet. PushedStreamMap unclaimed_pushed_streams_; - // Set of all created streams but that have not yet sent any frames. - CreatedStreamSet created_streams_; - - // As streams have data to be sent, we put them into the write queue. - WriteQueue write_queue_; - - // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream - // so that when a stream is destroyed, we can remove the corresponding - // producer from |write_queue_|. - StreamProducerMap stream_producers_; + // As we gather data to be sent, we put it into the output queue. + OutputQueue queue_; // The packet we are currently sending. bool write_pending_; // Will be true when a write is in progress. diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index 2cbb283..6ee4257 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy2Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); + EXPECT_LT(0u, session->num_active_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseCreatedStream(spdy_stream1, OK); - session3->CloseCreatedStream(spdy_stream3, OK); + session1->CloseStream(spdy_stream1->stream_id(), OK); + session3->CloseStream(spdy_stream3->stream_id(), OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy2Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseCreatedStream(spdy_stream2, OK); + session2->CloseStream(spdy_stream2->stream_id(), OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1109,8 +1109,8 @@ TEST_F(SpdySessionSpdy2Test, CloseSessionOnError) { TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1174,14 +1174,14 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(0u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(0u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream2->stream_id()); linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)["method"] = "GET"; @@ -1199,8 +1199,8 @@ TEST_F(SpdySessionSpdy2Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(3u, spdy_stream1->stream_id()); - EXPECT_EQ(1u, spdy_stream2->stream_id()); + EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(3u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 8862889..299186e 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -330,7 +330,7 @@ TEST_F(SpdySessionSpdy3Test, FailedPing) { // Assert session is not closed. EXPECT_FALSE(session->IsClosed()); - EXPECT_LT(0u, session->num_active_streams() + session->num_created_streams()); + EXPECT_LT(0u, session->num_active_streams()); EXPECT_TRUE(spdy_session_pool->HasSession(pair)); // We set last time we have received any data in 1 sec less than now. @@ -450,8 +450,8 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { // Make sessions 1 and 3 inactive, but keep them open. // Session 2 still open and active - session1->CloseCreatedStream(spdy_stream1, OK); - session3->CloseCreatedStream(spdy_stream3, OK); + session1->CloseStream(spdy_stream1->stream_id(), OK); + session3->CloseStream(spdy_stream3->stream_id(), OK); EXPECT_FALSE(session1->is_active()); EXPECT_FALSE(session1->IsClosed()); EXPECT_TRUE(session2->is_active()); @@ -474,7 +474,7 @@ TEST_F(SpdySessionSpdy3Test, CloseIdleSessions) { EXPECT_FALSE(session2->IsClosed()); // Make 2 not active - session2->CloseCreatedStream(spdy_stream2, OK); + session2->CloseStream(spdy_stream2->stream_id(), OK); EXPECT_FALSE(session2->is_active()); EXPECT_FALSE(session2->IsClosed()); @@ -1193,24 +1193,22 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { MockConnect connect_data(SYNCHRONOUS, OK); scoped_ptr<SpdyFrame> settings_frame(ConstructSpdySettings(new_settings)); MockRead reads[] = { - CreateMockRead(*settings_frame, 0), - MockRead(ASYNC, 0, 2) // EOF + CreateMockRead(*settings_frame), + MockRead(SYNCHRONOUS, 0, 0) // EOF }; SpdySessionDependencies session_deps; - session_deps.host_resolver->set_synchronous_mode(true); - scoped_refptr<DeterministicSocketData> data = - new DeterministicSocketData(reads, arraysize(reads), NULL, 0); - data->set_connect_data(connect_data); - session_deps.deterministic_socket_factory->AddSocketDataProvider(data); + StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); + data.set_connect_data(connect_data); + session_deps.socket_factory->AddSocketDataProvider(&data); SSLSocketDataProvider ssl(SYNCHRONOUS, OK); session_deps.socket_factory->AddSSLSocketDataProvider(&ssl); scoped_refptr<HttpNetworkSession> http_session( - SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps)); + SpdySessionDependencies::SpdyCreateSession(&session_deps)); const std::string kTestHost("www.foo.com"); const int kTestPort = 80; @@ -1250,7 +1248,6 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { callback1.callback())); EXPECT_NE(spdy_stream1->send_window_size(), window_size); - data->RunFor(1); // Process the SETTINGS frame, but not the EOF MessageLoop::current()->RunAllPending(); EXPECT_EQ(session->initial_send_window_size(), window_size); EXPECT_EQ(spdy_stream1->send_window_size(), window_size); @@ -1275,8 +1272,8 @@ TEST_F(SpdySessionSpdy3Test, UpdateStreamsSendWindowSize) { TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { // Construct the request. MockConnect connect_data(SYNCHRONOUS, OK); - scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, HIGHEST)); - scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, LOWEST)); + scoped_ptr<SpdyFrame> req1(ConstructSpdyGet(NULL, 0, false, 1, LOWEST)); + scoped_ptr<SpdyFrame> req2(ConstructSpdyGet(NULL, 0, false, 3, HIGHEST)); MockWrite writes[] = { CreateMockWrite(*req1, 2), CreateMockWrite(*req2, 1), @@ -1340,14 +1337,14 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { GURL url1("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url1, LOWEST, &spdy_stream1, BoundNetLog(), callback1.callback())); - EXPECT_EQ(0u, spdy_stream1->stream_id()); + EXPECT_EQ(1u, spdy_stream1->stream_id()); scoped_refptr<SpdyStream> spdy_stream2; TestCompletionCallback callback2; GURL url2("http://www.google.com"); EXPECT_EQ(OK, session->CreateStream(url2, HIGHEST, &spdy_stream2, BoundNetLog(), callback2.callback())); - EXPECT_EQ(0u, spdy_stream2->stream_id()); + EXPECT_EQ(3u, spdy_stream2->stream_id()); linked_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); (*headers)[":method"] = "GET"; @@ -1365,8 +1362,8 @@ TEST_F(SpdySessionSpdy3Test, OutOfOrderSynStreams) { spdy_stream2->SendRequest(false); MessageLoop::current()->RunAllPending(); - EXPECT_EQ(3u, spdy_stream1->stream_id()); - EXPECT_EQ(1u, spdy_stream2->stream_id()); + EXPECT_EQ(1u, spdy_stream1->stream_id()); + EXPECT_EQ(3u, spdy_stream2->stream_id()); spdy_stream1->Cancel(); spdy_stream1 = NULL; diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 3ad7544..09a184b 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -50,10 +50,11 @@ bool ContainsUpperAscii(const std::string& str) { } // namespace SpdyStream::SpdyStream(SpdySession* session, + SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log) : continue_buffering_data_(true), - stream_id_(0), + stream_id_(stream_id), priority_(HIGHEST), slot_(0), stalled_by_flow_control_(false), @@ -76,77 +77,8 @@ SpdyStream::SpdyStream(SpdySession* session, domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { } -class SpdyStream::SpdyStreamIOBufferProducer - : public SpdySession::SpdyIOBufferProducer { - public: - SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {} - - // SpdyFrameProducer - virtual RequestPriority GetPriority() const OVERRIDE { - return stream_->priority(); - } - - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { - if (stream_->stream_id() == 0) - SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_); - SpdyFrame* frame = stream_->ProduceNextFrame(); - return frame == NULL ? NULL : - SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - frame, GetPriority(), stream_); - } - - private: - scoped_refptr<SpdyStream> stream_; -}; - -void SpdyStream::SetHasWriteAvailable() { - session_->SetStreamHasWriteAvailable(this, - new SpdyStreamIOBufferProducer(this)); -} - -SpdyFrame* SpdyStream::ProduceNextFrame() { - if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) { - CHECK(request_.get()); - CHECK_GT(stream_id_, 0u); - - std::string origin = GetUrl().GetOrigin().spec(); - DCHECK(origin[origin.length() - 1] == '/'); - origin.erase(origin.length() - 1); // Trim trailing slash. - SpdyCredentialControlFrame* frame = session_->CreateCredentialFrame( - origin, domain_bound_cert_type_, domain_bound_private_key_, - domain_bound_cert_, priority_); - return frame; - } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) { - CHECK(request_.get()); - CHECK_GT(stream_id_, 0u); - - SpdyControlFlags flags = - has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; - SpdySynStreamControlFrame* frame = session_->CreateSynStream( - stream_id_, priority_, slot_, flags, request_); - send_time_ = base::TimeTicks::Now(); - return frame; - } else { - CHECK(!cancelled()); - // We must need to write stream data. - // Until the headers have been completely sent, we can not be sure - // that our stream_id is correct. - DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - DCHECK_GT(stream_id_, 0u); - DCHECK(!pending_data_frames_.empty()); - SpdyFrame* frame = pending_data_frames_.front(); - pending_data_frames_.pop_front(); - return frame; - } -} - SpdyStream::~SpdyStream() { UpdateHistograms(); - while (!pending_data_frames_.empty()) { - SpdyFrame* frame = pending_data_frames_.back(); - pending_data_frames_.pop_back(); - delete frame; - } } void SpdyStream::SetDelegate(Delegate* delegate) { @@ -540,10 +472,7 @@ void SpdyStream::Cancel() { } void SpdyStream::Close() { - if (stream_id_ != 0) - session_->CloseStream(stream_id_, net::OK); - else - session_->CloseCreatedStream(this, OK); + session_->CloseStream(stream_id_, net::OK); } int SpdyStream::SendRequest(bool has_upload_data) { @@ -570,13 +499,7 @@ int SpdyStream::WriteStreamData(IOBuffer* data, int length, // Until the headers have been completely sent, we can not be sure // that our stream_id is correct. DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - CHECK_GT(stream_id_, 0u); - - pending_data_frames_.push_back( - session_->CreateDataFrame(stream_id_, data, length, flags)); - - SetHasWriteAvailable(); - return ERR_IO_PENDING; + return session_->WriteStreamData(stream_id_, data, length, flags); } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -720,8 +643,14 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { int SpdyStream::DoSendDomainBoundCert() { io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; CHECK(request_.get()); - SetHasWriteAvailable(); - return ERR_IO_PENDING; + std::string origin = GetUrl().GetOrigin().spec(); + origin.erase(origin.length() - 1); // trim trailing slash + int rv = session_->WriteCredentialFrame( + origin, domain_bound_cert_type_, domain_bound_private_key_, + domain_bound_cert_, priority_); + if (rv != ERR_IO_PENDING) + return rv; + return OK; } int SpdyStream::DoSendDomainBoundCertComplete(int result) { @@ -735,7 +664,18 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) { int SpdyStream::DoSendHeaders() { CHECK(!cancelled_); - SetHasWriteAvailable(); + SpdyControlFlags flags = CONTROL_FLAG_NONE; + if (!has_upload_data_) + flags = CONTROL_FLAG_FIN; + + CHECK(request_.get()); + int result = session_->WriteSynStream( + stream_id_, priority_, slot_, flags, + request_); + if (result != ERR_IO_PENDING) + return result; + + send_time_ = base::TimeTicks::Now(); io_state_ = STATE_SEND_HEADERS_COMPLETE; return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index ea85ce4..dddc0e7 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -6,7 +6,6 @@ #define NET_SPDY_SPDY_STREAM_H_ #pragma once -#include <list> #include <string> #include <vector> @@ -25,12 +24,12 @@ #include "net/socket/ssl_client_socket.h" #include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" -#include "net/spdy/spdy_session.h" namespace net { class AddressList; class IPEndPoint; +class SpdySession; class SSLCertRequestInfo; class SSLInfo; @@ -96,6 +95,7 @@ class NET_EXPORT_PRIVATE SpdyStream // SpdyStream constructor SpdyStream(SpdySession* session, + SpdyStreamId stream_id, bool pushed, const BoundNetLog& net_log); @@ -260,8 +260,6 @@ class NET_EXPORT_PRIVATE SpdyStream int GetProtocolVersion() const; private: - class SpdyStreamIOBufferProducer; - enum State { STATE_NONE, STATE_GET_DOMAIN_BOUND_CERT, @@ -310,14 +308,6 @@ class NET_EXPORT_PRIVATE SpdyStream // the MessageLoop to replay all the data that the server has already sent. void PushedStreamReplayData(); - // Informs the SpdySession that this stream has a write available. - void SetHasWriteAvailable(); - - // Returns a newly created SPDY frame owned by the called that contains - // the next frame to be sent by this frame. May return NULL if this - // stream has become stalled on flow control. - SpdyFrame* ProduceNextFrame(); - // There is a small period of time between when a server pushed stream is // first created, and the pushed data is replayed. Any data received during // this time should continue to be buffered. @@ -353,8 +343,6 @@ class NET_EXPORT_PRIVATE SpdyStream linked_ptr<SpdyHeaderBlock> response_; base::Time response_time_; - std::list<SpdyFrame*> pending_data_frames_; - State io_state_; // Since we buffer the response, we also buffer the response status. diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 99ede76..4d7fd48 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -277,9 +277,9 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, + 2, true, net_log); - stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -408,10 +408,10 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - EXPECT_EQ(OK, callback.WaitForResult()); - const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_EQ(OK, callback.WaitForResult()); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())["status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())["version"]); diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index 0dbaf2e..b7db4e2 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -278,9 +278,9 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) { // Conjure up a stream. scoped_refptr<SpdyStream> stream = new SpdyStream(spdy_session, + 2, true, net_log); - stream->set_stream_id(2); EXPECT_FALSE(stream->response_received()); EXPECT_FALSE(stream->HasUrl()); @@ -413,10 +413,10 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); - EXPECT_EQ(OK, callback.WaitForResult()); - const SpdyStreamId stream_id = stream->stream_id(); + EXPECT_EQ(OK, callback.WaitForResult()); + EXPECT_TRUE(delegate->send_headers_completed()); EXPECT_EQ("200", (*delegate->response())[":status"]); EXPECT_EQ("HTTP/1.1", (*delegate->response())[":version"]); diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h index 4a208b9..d8d85da 100644 --- a/net/spdy/spdy_websocket_stream.h +++ b/net/spdy/spdy_websocket_stream.h @@ -84,8 +84,6 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream virtual void set_chunk_callback(ChunkCallback* callback) OVERRIDE; private: - friend class SpdyWebSocketStreamSpdy2Test; - friend class SpdyWebSocketStreamSpdy3Test; FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy2Test, Basic); FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamSpdy3Test, Basic); diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc index d6e75a6..5f468c9 100644 --- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc @@ -165,8 +165,6 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { - // Record the actual stream_id. - created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -301,7 +299,6 @@ class SpdyWebSocketStreamSpdy2Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; - SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -361,13 +358,12 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); + EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); - EXPECT_EQ(stream_id_, created_stream_id_); - websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -527,7 +523,7 @@ TEST_F(SpdyWebSocketStreamSpdy2Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy2Test, IOPending) { - Prepare(1); + Prepare(3); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc index fcc5ccb..5a21e66 100644 --- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc @@ -165,8 +165,6 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { OrderedSocketData* data() { return data_.get(); } void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { - // Record the actual stream_id. - created_stream_id_ = websocket_stream_->stream_->stream_id(); websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); } @@ -301,7 +299,6 @@ class SpdyWebSocketStreamSpdy3Test : public testing::Test { scoped_refptr<TransportSocketParams> transport_params_; scoped_ptr<SpdyWebSocketStream> websocket_stream_; SpdyStreamId stream_id_; - SpdyStreamId created_stream_id_; scoped_ptr<SpdyFrame> request_frame_; scoped_ptr<SpdyFrame> response_frame_; scoped_ptr<SpdyFrame> message_frame_; @@ -361,13 +358,12 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, Basic) { ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); ASSERT_TRUE(websocket_stream_->stream_); + EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); SendRequest(); completion_callback_.WaitForResult(); - EXPECT_EQ(stream_id_, created_stream_id_); - websocket_stream_.reset(); const std::vector<SpdyWebSocketStreamEvent>& events = @@ -527,7 +523,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, DestructionAfterExplicitClose) { } TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { - Prepare(1); + Prepare(3); scoped_ptr<SpdyFrame> settings_frame( ConstructSpdySettings(spdy_settings_to_send_)); MockWrite writes[] = { @@ -580,7 +576,7 @@ TEST_F(SpdyWebSocketStreamSpdy3Test, IOPending) { ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream( url, HIGHEST, net_log)); - // Delete the first stream to allow create the second stream. + // Delete the fist stream to allow create the second stream. block_stream.reset(); ASSERT_EQ(OK, sync_callback_.WaitForResult()); |