diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-13 12:18:16 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-13 12:18:16 +0000 |
commit | c78ac09acfa96b97e11492a13b4d9b379eaea4ea (patch) | |
tree | a3de5b7b592781c619ba8faaf43ff4254b3f4c7f | |
parent | d649b0dea66ce082a202a8e3f9b27b10141d61d0 (diff) | |
download | chromium_src-c78ac09acfa96b97e11492a13b4d9b379eaea4ea.zip chromium_src-c78ac09acfa96b97e11492a13b4d9b379eaea4ea.tar.gz chromium_src-c78ac09acfa96b97e11492a13b4d9b379eaea4ea.tar.bz2 |
[SPDY] Refactor SpdySession's write queue
This is in preparation for replacing the various IOBuffers used for reads/writes
with a single SpdyBuffer class.
Replace the priority queue of SpdyIOBufferProducers with a SpdyWriteQueue object,
which is an an array of FIFO queues binned by priority.
The priority queue was looking only at priority and so was not guaranteeing
FIFO behavior among producers with the same priority.
Remove the frame queue in SpdyStream and instead have it use the session's
write queue directly.
Remove unused fields from SpdyIOBuffer and clean it up.
Propagate and handle errors from SpdyCredentialBuilder::Build.
Rename SpdyIOBufferProducer to SpdyFrameProducer, have it return a SpdyFrame,
clean up its interface, and move the stream-activating logic out of it.
Replace uses of std::list with std::deque.
Steamline logic in SpdySession that deals with the write queue.
Convert some raw pointers to scoped_ptr<>.
Convert a use of Unretained() in SpdySession to use the weak pointer factory.
BUG=176582
Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=192975
Review URL: https://chromiumcodereview.appspot.com/13009012
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194102 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/net.gyp | 5 | ||||
-rw-r--r-- | net/spdy/spdy_frame_producer.cc | 26 | ||||
-rw-r--r-- | net/spdy/spdy_frame_producer.h | 50 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.cc | 25 | ||||
-rw-r--r-- | net/spdy/spdy_io_buffer.h | 39 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 280 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 116 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy2_unittest.cc | 33 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 33 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 239 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 42 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.cc | 95 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.h | 72 |
13 files changed, 585 insertions, 470 deletions
diff --git a/net/net.gyp b/net/net.gyp index 3f96ec1..f60d159 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -858,6 +858,8 @@ 'spdy/spdy_credential_state.h', 'spdy/spdy_frame_builder.cc', 'spdy/spdy_frame_builder.h', + 'spdy/spdy_frame_producer.cc', + 'spdy/spdy_frame_producer.h', 'spdy/spdy_frame_reader.cc', 'spdy/spdy_frame_reader.h', 'spdy/spdy_framer.cc', @@ -883,6 +885,8 @@ 'spdy/spdy_stream.h', 'spdy/spdy_websocket_stream.cc', 'spdy/spdy_websocket_stream.h', + 'spdy/spdy_write_queue.cc', + 'spdy/spdy_write_queue.h', 'ssl/client_cert_store.h', 'ssl/client_cert_store_impl.h', 'ssl/client_cert_store_impl_mac.cc', @@ -1694,6 +1698,7 @@ 'spdy/spdy_websocket_test_util_spdy2.h', 'spdy/spdy_websocket_test_util_spdy3.cc', 'spdy/spdy_websocket_test_util_spdy3.h', + 'spdy/spdy_write_queue_unittest.cc', 'ssl/client_cert_store_impl_unittest.cc', 'ssl/default_server_bound_cert_store_unittest.cc', 'ssl/openssl_client_key_store_unittest.cc', diff --git a/net/spdy/spdy_frame_producer.cc b/net/spdy/spdy_frame_producer.cc new file mode 100644 index 0000000..dc16744 --- /dev/null +++ b/net/spdy/spdy_frame_producer.cc @@ -0,0 +1,26 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_frame_producer.h" + +#include "base/logging.h" +#include "net/spdy/spdy_protocol.h" + +namespace net { + +SpdyFrameProducer::SpdyFrameProducer() {} + +SpdyFrameProducer::~SpdyFrameProducer() {} + +SimpleFrameProducer::SimpleFrameProducer(scoped_ptr<SpdyFrame> frame) + : frame_(frame.Pass()) {} + +SimpleFrameProducer::~SimpleFrameProducer() {} + +scoped_ptr<SpdyFrame> SimpleFrameProducer::ProduceFrame() { + DCHECK(frame_); + return frame_.Pass(); +} + +} // namespace net diff --git a/net/spdy/spdy_frame_producer.h b/net/spdy/spdy_frame_producer.h new file mode 100644 index 0000000..9db20ce --- /dev/null +++ b/net/spdy/spdy_frame_producer.h @@ -0,0 +1,50 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_FRAME_PRODUCER_H_ +#define NET_SPDY_SPDY_FRAME_PRODUCER_H_ + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_export.h" + +namespace net { + +class SpdyFrame; + +// An object which provides a SpdyFrame for writing. We pass these +// around instead of SpdyFrames since some frames have to be generated +// "just in time". +class NET_EXPORT_PRIVATE SpdyFrameProducer { + public: + SpdyFrameProducer(); + + // Produces the frame to be written. Will be called at most once. + virtual scoped_ptr<SpdyFrame> ProduceFrame() = 0; + + virtual ~SpdyFrameProducer(); + + private: + DISALLOW_COPY_AND_ASSIGN(SpdyFrameProducer); +}; + +// A simple wrapper around a single SpdyFrame. +class NET_EXPORT_PRIVATE SimpleFrameProducer : public SpdyFrameProducer { + public: + explicit SimpleFrameProducer(scoped_ptr<SpdyFrame> frame); + + virtual ~SimpleFrameProducer(); + + virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE; + + private: + scoped_ptr<SpdyFrame> frame_; + + DISALLOW_COPY_AND_ASSIGN(SimpleFrameProducer); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_FRAME_PRODUCER_H_ diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc index f0c14b1..441b033 100644 --- a/net/spdy/spdy_io_buffer.cc +++ b/net/spdy/spdy_io_buffer.cc @@ -7,23 +7,13 @@ namespace net { -// static -uint64 SpdyIOBuffer::order_ = 0; +SpdyIOBuffer::SpdyIOBuffer() {} -SpdyIOBuffer::SpdyIOBuffer( - IOBuffer* buffer, int size, RequestPriority priority, SpdyStream* stream) - : buffer_(new DrainableIOBuffer(buffer, size)), - priority_(priority), - position_(++order_), - stream_(stream) {} - -SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) { -} +SpdyIOBuffer::SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream) + : buffer_(new DrainableIOBuffer(buffer, size)), stream_(stream) {} SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) { buffer_ = rhs.buffer_; - priority_ = rhs.priority_; - position_ = rhs.position_; stream_ = rhs.stream_; } @@ -31,13 +21,16 @@ SpdyIOBuffer::~SpdyIOBuffer() {} SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) { buffer_ = rhs.buffer_; - priority_ = rhs.priority_; - position_ = rhs.position_; stream_ = rhs.stream_; return *this; } -void SpdyIOBuffer::release() { +void SpdyIOBuffer::Swap(SpdyIOBuffer* other) { + buffer_.swap(other->buffer_); + stream_.swap(other->stream_); +} + +void SpdyIOBuffer::Release() { buffer_ = NULL; stream_ = NULL; } diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h index 9bbee16..ed95e70 100644 --- a/net/spdy/spdy_io_buffer.h +++ b/net/spdy/spdy_io_buffer.h @@ -8,54 +8,47 @@ #include "base/memory/ref_counted.h" #include "net/base/io_buffer.h" #include "net/base/net_export.h" -#include "net/base/request_priority.h" namespace net { class SpdyStream; -// A class for managing SPDY IO buffers. These buffers need to be prioritized -// so that the SpdySession sends them in the right order. Further, they need -// to track the SpdyStream which they are associated with so that incremental -// completion of the IO can notify the appropriate stream of completion. +// A class for managing SPDY write buffers. These write buffers need +// to track the SpdyStream which they are associated with so that the +// session can activate the stream lazily and also notify the stream +// on completion of the write. class NET_EXPORT_PRIVATE SpdyIOBuffer { public: + SpdyIOBuffer(); + // Constructor // |buffer| is the actual data buffer. // |size| is the size of the data buffer. - // |priority| is the priority of this buffer. - // |stream| is a pointer to the stream which is managing this buffer. - SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority, - SpdyStream* stream); + // |stream| is a pointer to the stream which is managing this buffer + // (can be NULL if the write is for the session itself). + SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream); + // Declare this instead of using the default so that we avoid needing to // include spdy_stream.h. SpdyIOBuffer(const SpdyIOBuffer& rhs); - SpdyIOBuffer(); + ~SpdyIOBuffer(); + // Declare this instead of using the default so that we avoid needing to // include spdy_stream.h. SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs); + void Swap(SpdyIOBuffer* other); + + void Release(); + // Accessors. DrainableIOBuffer* buffer() const { return buffer_; } - size_t size() const { return buffer_->size(); } - void release(); - RequestPriority priority() const { return priority_; } const scoped_refptr<SpdyStream>& stream() const { return stream_; } - // Comparison operator to support sorting. - bool operator<(const SpdyIOBuffer& other) const { - if (priority_ != other.priority_) - return priority_ < other.priority_; - return position_ > other.position_; - } - private: scoped_refptr<DrainableIOBuffer> buffer_; - RequestPriority priority_; - uint64 position_; scoped_refptr<SpdyStream> stream_; - static uint64 order_; // Maintains a FIFO order for equal priorities. }; } // namespace net diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 9943190..e34c16f 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -32,6 +32,7 @@ #include "net/http/http_server_properties.h" #include "net/spdy/spdy_credential_builder.h" #include "net/spdy/spdy_frame_builder.h" +#include "net/spdy/spdy_frame_producer.h" #include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_session_pool.h" @@ -286,28 +287,6 @@ void SpdyStreamRequest::Reset() { callback_.Reset(); } -// static -void SpdySession::SpdyIOBufferProducer::ActivateStream( - SpdySession* spdy_session, - SpdyStream* spdy_stream) { - spdy_session->ActivateStream(spdy_stream); -} - -// static -SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - SpdyFrame* frame, - RequestPriority priority, - SpdyStream* stream) { - size_t size = frame->size(); - DCHECK_GT(size, 0u); - - // TODO(mbelshe): We have too much copying of data here. - IOBufferWithSize* buffer = new IOBufferWithSize(size); - memcpy(buffer->data(), frame->data(), size); - - return new SpdyIOBuffer(buffer, size, priority, stream); -} - SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, SpdySessionPool* spdy_session_pool, HttpServerProperties* http_server_properties, @@ -404,7 +383,7 @@ SpdySession::~SpdySession() { DCHECK_EQ(0u, num_active_streams()); DCHECK_EQ(0u, num_unclaimed_pushed_streams()); - for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { + for (int i = 0; i < NUM_PRIORITIES; ++i) { DCHECK(pending_create_stream_queues_[i].empty()); } DCHECK(pending_stream_request_completions_.empty()); @@ -499,13 +478,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { ssl_info.cert->VerifyNameMatch(domain); } -void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, - SpdyIOBufferProducer* producer) { - write_queue_.push(producer); - stream_producers_[producer] = stream; - WriteSocketLater(); -} - int SpdySession::GetPushStream( const GURL& url, scoped_refptr<SpdyStream>* stream, @@ -659,7 +631,13 @@ int SpdySession::GetProtocolVersion() const { return buffered_spdy_framer_->protocol_version(); } -SpdyFrame* SpdySession::CreateSynStream( +void SpdySession::EnqueueStreamWrite( + SpdyStream* stream, + scoped_ptr<SpdyFrameProducer> producer) { + EnqueueWrite(stream->priority(), producer.Pass(), stream); +} + +scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, @@ -691,15 +669,16 @@ SpdyFrame* SpdySession::CreateSynStream( stream_id, 0)); } - return syn_frame.release(); + return syn_frame.Pass(); } -SpdyFrame* SpdySession::CreateCredentialFrame( +int SpdySession::CreateCredentialFrame( const std::string& origin, SSLClientCertType type, const std::string& key, const std::string& cert, - RequestPriority priority) { + RequestPriority priority, + scoped_ptr<SpdyFrame>* credential_frame) { DCHECK(is_secure_); SSLClientSocket* ssl_socket = GetSSLClientSocket(); DCHECK(ssl_socket); @@ -711,12 +690,12 @@ SpdyFrame* SpdySession::CreateCredentialFrame( size_t slot = credential_state_.SetHasCredential(GURL(origin)); int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot, &credential); - DCHECK_EQ(OK, rv); + DCHECK_NE(rv, ERR_IO_PENDING); if (rv != OK) - return NULL; + return rv; DCHECK(buffered_spdy_framer_.get()); - scoped_ptr<SpdyFrame> credential_frame( + credential_frame->reset( buffered_spdy_framer_->CreateCredentialFrame(credential)); if (net_log().IsLoggingAllEvents()) { @@ -724,10 +703,10 @@ SpdyFrame* SpdySession::CreateCredentialFrame( NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); } - return credential_frame.release(); + return OK; } -SpdyFrame* SpdySession::CreateHeadersFrame( +scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( SpdyStreamId stream_id, const SpdyHeaderBlock& headers, SpdyControlFlags flags) { @@ -749,12 +728,13 @@ SpdyFrame* SpdySession::CreateHeadersFrame( &headers, fin, /*unidirectional=*/false, stream_id, 0)); } - return frame.release(); + return frame.Pass(); } -SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags) { +scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, + int len, + SpdyDataFlags flags) { // Find our stream CHECK(IsStreamActive(stream_id)); scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; @@ -762,7 +742,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, if (len < 0) { NOTREACHED(); - return NULL; + return scoped_ptr<SpdyFrame>(); } if (len > kMaxSpdyFrameChunkSize) { @@ -785,7 +765,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return NULL; + return scoped_ptr<SpdyFrame>(); } if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { effective_window_size = @@ -797,7 +777,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return NULL; + return scoped_ptr<SpdyFrame>(); } } @@ -828,7 +808,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), static_cast<uint32>(len), flags)); - return frame.release(); + return frame.Pass(); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { @@ -863,7 +843,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; priority = stream->priority(); } - QueueFrame(rst_frame.release(), priority); + EnqueueSessionWrite(priority, rst_frame.Pass()); RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); @@ -930,57 +910,59 @@ void SpdySession::OnReadComplete(int bytes_read) { } void SpdySession::OnWriteComplete(int result) { + // Releasing the in-flight write can have a side-effect of dropping + // the last reference to |this|. Hold a reference through this + // function. + scoped_refptr<SpdySession> self(this); + DCHECK(write_pending_); - DCHECK(in_flight_write_.size()); + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); last_activity_time_ = base::TimeTicks::Now(); write_pending_ = false; - scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); - - if (result >= 0) { - // It should not be possible to have written more bytes than our - // in_flight_write_. - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); - - in_flight_write_.buffer()->DidConsume(result); - - // We only notify the stream when we've fully written the pending frame. - if (!in_flight_write_.buffer()->BytesRemaining()) { - if (stream) { - // Report the number of bytes written to the caller, but exclude the - // frame size overhead. NOTE: if this frame was compressed the - // reported bytes written is the compressed size, not the original - // size. - if (result > 0) { - result = in_flight_write_.buffer()->size(); - DCHECK_GE(result, - static_cast<int>( - buffered_spdy_framer_->GetControlFrameHeaderSize())); - result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); - } - - // It is possible that the stream was cancelled while we were writing - // to the socket. - if (!stream->cancelled()) - stream->OnWriteComplete(result); - } + if (result < 0) { + in_flight_write_.Release(); + CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); + return; + } - // Cleanup the write which just completed. - in_flight_write_.release(); - } + // It should not be possible to have written more bytes than our + // in_flight_write_. + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); - // Write more data. We're already in a continuation, so we can - // go ahead and write it immediately (without going back to the - // message loop). - WriteSocketLater(); - } else { - in_flight_write_.release(); + in_flight_write_.buffer()->DidConsume(result); - // The stream is now errored. Close it down. - CloseSessionOnError( - static_cast<net::Error>(result), true, "The stream has errored."); + // We only notify the stream when we've fully written the pending frame. + if (in_flight_write_.buffer()->BytesRemaining() == 0) { + DCHECK_GT(result, 0); + + scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); + + // It is possible that the stream was cancelled while we were writing + // to the socket. + if (stream && !stream->cancelled()) { + // Report the number of bytes written to the caller, but exclude the + // frame size overhead. NOTE: if this frame was compressed the + // reported bytes written is the compressed size, not the original + // size. + result = in_flight_write_.buffer()->size(); + DCHECK_GE(result, + static_cast<int>( + buffered_spdy_framer_->GetControlFrameHeaderSize())); + result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); + + stream->OnWriteComplete(result); + } + + // Cleanup the write which just completed. + in_flight_write_.Release(); } + + // Write more data. We're already in a continuation, so we can go + // ahead and write it immediately (without going back to the message + // loop). + WriteSocketLater(); } net::Error SpdySession::ReadSocket() { @@ -997,7 +979,7 @@ net::Error SpdySession::ReadSocket() { int bytes_read = connection_->socket()->Read( read_buffer_.get(), kReadBufferSize, - base::Bind(&SpdySession::OnReadComplete, base::Unretained(this))); + base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr())); switch (bytes_read) { case 0: // Socket is closed! @@ -1051,24 +1033,39 @@ void SpdySession::WriteSocket() { // Loop sending frames until we've sent everything or until the write // returns error (or ERR_IO_PENDING). DCHECK(buffered_spdy_framer_.get()); - while (in_flight_write_.buffer() || !write_queue_.empty()) { - if (!in_flight_write_.buffer()) { - // Grab the next SpdyBuffer to send. - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); - write_queue_.pop(); - scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); - stream_producers_.erase(producer.get()); + while (true) { + if (in_flight_write_.buffer()) { + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); + } else { + // Grab the next frame to send. + scoped_ptr<SpdyFrameProducer> producer; + scoped_refptr<SpdyStream> stream; + if (!write_queue_.Dequeue(&producer, &stream)) + break; + // It is possible that a stream had data to write, but a // WINDOW_UPDATE frame has been received which made that // stream no longer writable. // TODO(rch): consider handling that case by removing the // stream from the writable queue? - if (buffer == NULL) + if (stream.get() && stream->cancelled()) continue; - in_flight_write_ = *buffer; - } else { - DCHECK(in_flight_write_.buffer()->BytesRemaining()); + if (stream.get() && stream->stream_id() == 0) + ActivateStream(stream); + + scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); + if (!frame) { + NOTREACHED(); + continue; + } + DCHECK_GT(frame->size(), 0u); + + // TODO(mbelshe): We have too much copying of data here. + scoped_refptr<IOBufferWithSize> buffer = + new IOBufferWithSize(frame->size()); + memcpy(buffer->data(), frame->data(), frame->size()); + in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); } write_pending_ = true; @@ -1126,12 +1123,7 @@ void SpdySession::CloseAllStreams(net::Error status) { stream->OnClose(status); } - // We also need to drain the queue. - while (!write_queue_.empty()) { - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); - write_queue_.pop(); - stream_producers_.erase(producer.get()); - } + write_queue_.Clear(); } void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, @@ -1257,33 +1249,18 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { return rv; } -class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { - public: - SimpleSpdyIOBufferProducer(SpdyFrame* frame, - RequestPriority priority) - : frame_(frame), - priority_(priority) { - } - - virtual RequestPriority GetPriority() const OVERRIDE { - return priority_; - } - - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { - return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - frame_.get(), priority_, NULL); - } - - private: - scoped_ptr<SpdyFrame> frame_; - RequestPriority priority_; -}; +void SpdySession::EnqueueSessionWrite(RequestPriority priority, + scoped_ptr<SpdyFrame> frame) { + EnqueueWrite( + priority, + scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), + NULL); +} -void SpdySession::QueueFrame(SpdyFrame* frame, - RequestPriority priority) { - SimpleSpdyIOBufferProducer* producer = - new SimpleSpdyIOBufferProducer(frame, priority); - write_queue_.push(producer); +void SpdySession::EnqueueWrite(RequestPriority priority, + scoped_ptr<SpdyFrameProducer> producer, + const scoped_refptr<SpdyStream>& stream) { + write_queue_.Enqueue(priority, producer.Pass(), stream); WriteSocketLater(); } @@ -1303,8 +1280,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { // the stream in the unclaimed_pushed_streams_ list. However, if // the stream is errored out, clean it up entirely. if (status != OK) { - PushedStreamMap::iterator it; - for (it = unclaimed_pushed_streams_.begin(); + for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); it != unclaimed_pushed_streams_.end(); ++it) { scoped_refptr<SpdyStream> curr = it->second.first; if (id == curr->stream_id()) { @@ -1315,29 +1291,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { } // The stream might have been deleted. - ActiveStreamMap::iterator it2 = active_streams_.find(id); - if (it2 == active_streams_.end()) + ActiveStreamMap::iterator it = active_streams_.find(id); + if (it == active_streams_.end()) return; - // Possibly remove from the write queue. - WriteQueue old = write_queue_; - write_queue_ = WriteQueue(); - while (!old.empty()) { - scoped_ptr<SpdyIOBufferProducer> producer(old.top()); - old.pop(); - StreamProducerMap::iterator it = stream_producers_.find(producer.get()); - if (it == stream_producers_.end() || it->second->stream_id() != id) { - write_queue_.push(producer.release()); - } else { - stream_producers_.erase(producer.get()); - producer.reset(NULL); - } - } + const scoped_refptr<SpdyStream> stream(it->second); + active_streams_.erase(it); + DCHECK(stream); + + write_queue_.RemovePendingWritesForStream(stream); // If this is an active stream, call the callback. - const scoped_refptr<SpdyStream> stream(it2->second); - active_streams_.erase(it2); - DCHECK(stream); stream->OnClose(status); ProcessPendingStreamRequests(); } @@ -1921,7 +1885,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) { scoped_ptr<SpdyFrame> settings_frame( buffered_spdy_framer_->CreateSettings(settings)); sent_settings_ = true; - QueueFrame(settings_frame.release(), HIGHEST); + EnqueueSessionWrite(HIGHEST, settings_frame.Pass()); } void SpdySession::HandleSetting(uint32 id, uint32 value) { @@ -2009,14 +1973,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyFrame> window_update_frame( buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); - QueueFrame(window_update_frame.release(), priority); + EnqueueSessionWrite(priority, window_update_frame.Pass()); } void SpdySession::WritePingFrame(uint32 unique_id) { DCHECK(buffered_spdy_framer_.get()); scoped_ptr<SpdyFrame> ping_frame( buffered_spdy_framer_->CreatePingFrame(unique_id)); - QueueFrame(ping_frame.release(), HIGHEST); + EnqueueSessionWrite(HIGHEST, ping_frame.Pass()); if (net_log().IsLoggingAllEvents()) { net_log().AddEvent( diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 8849683..85b623e 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -6,20 +6,20 @@ #define NET_SPDY_SPDY_SESSION_H_ #include <deque> -#include <list> #include <map> -#include <queue> #include <set> #include <string> #include "base/basictypes.h" #include "base/gtest_prod_util.h" #include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/time.h" #include "net/base/io_buffer.h" #include "net/base/load_states.h" #include "net/base/net_errors.h" +#include "net/base/net_export.h" #include "net/base/request_priority.h" #include "net/socket/client_socket_handle.h" #include "net/socket/next_proto.h" @@ -31,6 +31,7 @@ #include "net/spdy/spdy_io_buffer.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_session_pool.h" +#include "net/spdy/spdy_write_queue.h" #include "net/ssl/ssl_client_cert_type.h" #include "net/ssl/ssl_config_service.h" @@ -174,29 +175,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, FLOW_CONTROL_STREAM_AND_SESSION }; - // Defines an interface for producing SpdyIOBuffers. - class NET_EXPORT_PRIVATE SpdyIOBufferProducer { - public: - SpdyIOBufferProducer() {} - - // Returns a newly created SpdyIOBuffer, owned by the caller, or NULL - // if not buffer is ready to be produced. - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0; - - virtual RequestPriority GetPriority() const = 0; - - virtual ~SpdyIOBufferProducer() {} - - protected: - // Activates |spdy_stream| in |spdy_session|. - static void ActivateStream(SpdySession* spdy_session, - SpdyStream* spdy_stream); - - static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame, - RequestPriority priority, - SpdyStream* spdy_stream); - }; - // Create a new SpdySession. // |host_port_proxy_pair| is the host/port that this session connects to, and // the proxy configuration settings that it's using. @@ -259,37 +237,41 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // authentication now. bool VerifyDomainAuthentication(const std::string& domain); - // Records that |stream| has a write available from |producer|. - // |producer| will be owned by this SpdySession. - void SetStreamHasWriteAvailable(SpdyStream* stream, - SpdyIOBufferProducer* producer); + // Pushes the given producer into the write queue for + // |stream|. |stream| is guaranteed to be activated before the + // producer is used to produce its frame. + void EnqueueStreamWrite(SpdyStream* stream, + scoped_ptr<SpdyFrameProducer> producer); - // Send the SYN frame for |stream_id|. This also sends PING message to check - // the status of the connection. - SpdyFrame* CreateSynStream( + // Creates and returns a SYN frame for |stream_id|. + scoped_ptr<SpdyFrame> CreateSynStream( SpdyStreamId stream_id, RequestPriority priority, uint8 credential_slot, SpdyControlFlags flags, const SpdyHeaderBlock& headers); - // Write a CREDENTIAL frame to the session. - SpdyFrame* CreateCredentialFrame(const std::string& origin, - SSLClientCertType type, - const std::string& key, - const std::string& cert, - RequestPriority priority); - - // Write a HEADERS frame to the stream. - SpdyFrame* CreateHeadersFrame(SpdyStreamId stream_id, - const SpdyHeaderBlock& headers, - SpdyControlFlags flags); - - // Write a data frame to the stream. - // Used to create and queue a data frame for the given stream. - SpdyFrame* CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, int len, - SpdyDataFlags flags); + // Tries to create a CREDENTIAL frame. If successful, fills in + // |credential_frame| and returns OK. Returns the error (guaranteed + // to not be ERR_IO_PENDING) otherwise. + int CreateCredentialFrame(const std::string& origin, + SSLClientCertType type, + const std::string& key, + const std::string& cert, + RequestPriority priority, + scoped_ptr<SpdyFrame>* credential_frame); + + // Creates and returns a HEADERS frame. + scoped_ptr<SpdyFrame> CreateHeadersFrame(SpdyStreamId stream_id, + const SpdyHeaderBlock& headers, + SpdyControlFlags flags); + + // Creates and returns a data frame. May return NULL if stalled by + // flow control. + scoped_ptr<SpdyFrame> CreateDataFrame(SpdyStreamId stream_id, + net::IOBuffer* data, + int len, + SpdyDataFlags flags); // Close a stream. void CloseStream(SpdyStreamId stream_id, int status); @@ -473,19 +455,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap; typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; - typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap; - - class SpdyIOBufferProducerCompare { - public: - bool operator() (const SpdyIOBufferProducer* lhs, - const SpdyIOBufferProducer* rhs) const { - return lhs->GetPriority() < rhs->GetPriority(); - } - }; - - typedef std::priority_queue<SpdyIOBufferProducer*, - std::vector<SpdyIOBufferProducer*>, - SpdyIOBufferProducerCompare> WriteQueue; enum State { IDLE, @@ -568,10 +537,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Get a new stream id. int GetNewStreamId(); - // Queue a frame for sending. - // |frame| is the frame to send. - // |priority| is the priority for insertion into the queue. - void QueueFrame(SpdyFrame* frame, RequestPriority priority); + // Pushes the given frame with the given priority into the write + // queue for the session. + void EnqueueSessionWrite(RequestPriority priority, + scoped_ptr<SpdyFrame> frame); + + // Puts |producer| associated with |stream| onto the write queue + // with the given priority. + void EnqueueWrite(RequestPriority priority, + scoped_ptr<SpdyFrameProducer> producer, + const scoped_refptr<SpdyStream>& stream); // Track active streams in the active stream list. void ActivateStream(SpdyStream* stream); @@ -755,13 +730,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Set of all created streams but that have not yet sent any frames. CreatedStreamSet created_streams_; - // As streams have data to be sent, we put them into the write queue. - WriteQueue write_queue_; - - // Mapping from SpdyIOBufferProducers to their corresponding SpdyStream - // so that when a stream is destroyed, we can remove the corresponding - // producer from |write_queue_|. - StreamProducerMap stream_producers_; + // The write queue. + SpdyWriteQueue write_queue_; // The packet we are currently sending. bool write_pending_; // Will be true when a write is in progress. diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index c76eef3..e58f736 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -103,39 +103,6 @@ class SpdySessionSpdy2Test : public PlatformTest { HostPortProxyPair pair_; }; -// Test the SpdyIOBuffer class. -TEST_F(SpdySessionSpdy2Test, SpdyIOBuffer) { - std::priority_queue<SpdyIOBuffer> queue_; - const size_t kQueueSize = 100; - - // Insert items with random priority and increasing buffer size. - for (size_t index = 0; index < kQueueSize; ++index) { - queue_.push(SpdyIOBuffer( - new IOBufferWithSize(index + 1), - index + 1, - static_cast<RequestPriority>(rand() % NUM_PRIORITIES), - NULL)); - } - - EXPECT_EQ(kQueueSize, queue_.size()); - - // Verify items come out with decreasing priority or FIFO order. - RequestPriority last_priority = NUM_PRIORITIES; - size_t last_size = 0; - for (size_t index = 0; index < kQueueSize; ++index) { - SpdyIOBuffer buffer = queue_.top(); - EXPECT_LE(buffer.priority(), last_priority); - if (buffer.priority() < last_priority) - last_size = 0; - EXPECT_LT(last_size, buffer.size()); - last_priority = buffer.priority(); - last_size = buffer.size(); - queue_.pop(); - } - - EXPECT_EQ(0u, queue_.size()); -} - TEST_F(SpdySessionSpdy2Test, GoAway) { session_deps_.host_resolver->set_synchronous_mode(true); diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 519f1d3..a2a5dba 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -120,39 +120,6 @@ class SpdySessionSpdy3Test : public PlatformTest { HostPortProxyPair pair_; }; -// Test the SpdyIOBuffer class. -TEST_F(SpdySessionSpdy3Test, SpdyIOBuffer) { - std::priority_queue<SpdyIOBuffer> queue_; - const size_t kQueueSize = 100; - - // Insert items with random priority and increasing buffer size. - for (size_t index = 0; index < kQueueSize; ++index) { - queue_.push(SpdyIOBuffer( - new IOBufferWithSize(index + 1), - index + 1, - static_cast<RequestPriority>(rand() % NUM_PRIORITIES), - NULL)); - } - - EXPECT_EQ(kQueueSize, queue_.size()); - - // Verify items come out with decreasing priority or FIFO order. - RequestPriority last_priority = NUM_PRIORITIES; - size_t last_size = 0; - for (size_t index = 0; index < kQueueSize; ++index) { - SpdyIOBuffer buffer = queue_.top(); - EXPECT_LE(buffer.priority(), last_priority); - if (buffer.priority() < last_priority) - last_size = 0; - EXPECT_LT(last_size, buffer.size()); - last_priority = buffer.priority(); - last_size = buffer.size(); - queue_.pop(); - } - - EXPECT_EQ(0u, queue_.size()); -} - TEST_F(SpdySessionSpdy3Test, GoAway) { session_deps_.host_resolver->set_synchronous_mode(true); diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 809c35bb1..70412dd 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -10,6 +10,7 @@ #include "base/message_loop.h" #include "base/stringprintf.h" #include "base/values.h" +#include "net/spdy/spdy_frame_producer.h" #include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_session.h" @@ -50,6 +51,57 @@ bool ContainsUpperAscii(const std::string& str) { } // namespace +// A wrapper around a stream that calls into ProduceSynStreamFrame(). +class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { + public: + SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream) + : stream_(stream) { + DCHECK(stream_); + } + + virtual ~SynStreamFrameProducer() {} + + virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { + if (!stream_) { + NOTREACHED(); + return scoped_ptr<SpdyFrame>(); + } + DCHECK_GT(stream_->stream_id(), 0u); + return stream_->ProduceSynStreamFrame(); + } + + private: + const base::WeakPtr<SpdyStream> stream_; +}; + +// A wrapper around a stream that calls into ProduceHeaderFrame() with +// a given header block. +class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { + public: + HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream, + scoped_ptr<SpdyHeaderBlock> headers) + : stream_(stream), + headers_(headers.Pass()) { + DCHECK(stream_); + DCHECK(headers_); + } + + virtual ~HeaderFrameProducer() {} + + virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { + if (!stream_) { + NOTREACHED(); + return scoped_ptr<SpdyFrame>(); + } + DCHECK_GT(stream_->stream_id(), 0u); + return stream_->ProduceHeaderFrame(headers_.Pass()); + } + + private: + const base::WeakPtr<SpdyStream> stream_; + scoped_ptr<SpdyHeaderBlock> headers_; +}; + SpdyStream::SpdyStream(SpdySession* session, const std::string& path, RequestPriority priority, @@ -83,99 +135,8 @@ SpdyStream::SpdyStream(SpdySession* session, domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { } -class SpdyStream::SpdyStreamIOBufferProducer - : public SpdySession::SpdyIOBufferProducer { - public: - SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {} - - // SpdyFrameProducer - virtual RequestPriority GetPriority() const OVERRIDE { - return stream_->priority(); - } - - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { - if (stream_->cancelled()) - return NULL; - if (stream_->stream_id() == 0) - SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_); - frame_ = stream_->ProduceNextFrame(); - return frame_ == NULL ? NULL : - SpdySession::SpdyIOBufferProducer::CreateIOBuffer( - frame_.get(), GetPriority(), stream_); - } - - private: - scoped_refptr<SpdyStream> stream_; - scoped_ptr<SpdyFrame> frame_; -}; - -void SpdyStream::SetHasWriteAvailable() { - session_->SetStreamHasWriteAvailable(this, - new SpdyStreamIOBufferProducer(this)); -} - -scoped_ptr<SpdyFrame> SpdyStream::ProduceNextFrame() { - if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) { - CHECK(request_.get()); - CHECK_GT(stream_id_, 0u); - - std::string origin = GetUrl().GetOrigin().spec(); - DCHECK(origin[origin.length() - 1] == '/'); - origin.erase(origin.length() - 1); // Trim trailing slash. - scoped_ptr<SpdyFrame> frame(session_->CreateCredentialFrame( - origin, domain_bound_cert_type_, domain_bound_private_key_, - domain_bound_cert_, priority_)); - return frame.Pass(); - } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) { - CHECK(request_.get()); - CHECK_GT(stream_id_, 0u); - - SpdyControlFlags flags = - has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; - scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( - stream_id_, priority_, slot_, flags, *request_)); - send_time_ = base::TimeTicks::Now(); - return frame.Pass(); - } else { - CHECK(!cancelled()); - // We must need to write stream data. - // Until the headers have been completely sent, we can not be sure - // that our stream_id is correct. - DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - DCHECK_GT(stream_id_, 0u); - DCHECK(!pending_frames_.empty()); - - PendingFrame frame = pending_frames_.front(); - pending_frames_.pop_front(); - - waiting_completions_.push_back(frame.type); - - if (frame.type == TYPE_DATA) { - // Send queued data frame. - return scoped_ptr<SpdyFrame>(frame.data_frame); - } else { - DCHECK(frame.type == TYPE_HEADERS); - // Create actual HEADERS frame just in time because it depends on - // compression context and should not be reordered after the creation. - scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame( - stream_id_, *frame.header_block, SpdyControlFlags())); - delete frame.header_block; - return header_frame.Pass(); - } - } - NOTREACHED(); -} - SpdyStream::~SpdyStream() { UpdateHistograms(); - while (!pending_frames_.empty()) { - PendingFrame frame = pending_frames_.back(); - pending_frames_.pop_back(); - if (frame.type == TYPE_DATA) - delete frame.data_frame; - else - delete frame.header_block; - } } void SpdyStream::SetDelegate(Delegate* delegate) { @@ -228,6 +189,35 @@ void SpdyStream::PushedStreamReplayData() { } } +scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { + CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE); + CHECK(request_.get()); + CHECK_GT(stream_id_, 0u); + + SpdyControlFlags flags = + has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; + scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( + stream_id_, priority_, slot_, flags, *request_)); + send_time_ = base::TimeTicks::Now(); + return frame.Pass(); +} + +scoped_ptr<SpdyFrame> SpdyStream::ProduceHeaderFrame( + scoped_ptr<SpdyHeaderBlock> header_block) { + CHECK(!cancelled()); + // We must need to write stream data. + // Until the headers have been completely sent, we can not be sure + // that our stream_id is correct. + DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); + DCHECK_GT(stream_id_, 0u); + + // Create actual HEADERS frame just in time because it depends on + // compression context and should not be reordered after the creation. + scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame( + stream_id_, *header_block, SpdyControlFlags())); + return header_frame.Pass(); +} + void SpdyStream::DetachDelegate() { delegate_ = NULL; if (!closed()) @@ -578,14 +568,13 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); CHECK_GT(stream_id_, 0u); - PendingFrame frame; - frame.type = TYPE_HEADERS; - // |frame.header_block| is deleted by either ProduceNextFrame() or - // the destructor. - frame.header_block = headers.release(); - pending_frames_.push_back(frame); + waiting_completions_.push_back(TYPE_HEADERS); - SetHasWriteAvailable(); + session_->EnqueueStreamWrite( + this, + scoped_ptr<SpdyFrameProducer>( + new HeaderFrameProducer( + weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); } void SpdyStream::QueueStreamData(IOBuffer* data, @@ -595,20 +584,19 @@ void SpdyStream::QueueStreamData(IOBuffer* data, // that our stream_id is correct. DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); CHECK_GT(stream_id_, 0u); + CHECK(!cancelled()); scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( stream_id_, data, length, flags)); if (!data_frame) return; - PendingFrame frame; - frame.type = TYPE_DATA; - // |frame.data_frame| is either returned by ProduceNextFrame() or - // deleted in the destructor. - frame.data_frame = data_frame.release(); - pending_frames_.push_back(frame); + waiting_completions_.push_back(TYPE_DATA); - SetHasWriteAvailable(); + session_->EnqueueStreamWrite( + this, + scoped_ptr<SpdyFrameProducer>( + new SimpleFrameProducer(data_frame.Pass()))); } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -693,7 +681,7 @@ int SpdyStream::DoLoop(int result) { break; // State machine 2: connection is established. // In STATE_OPEN, OnResponseReceived has already been called. - // OnDataReceived, OnClose and OnWriteCompelte can be called. + // OnDataReceived, OnClose and OnWriteComplete can be called. // Only OnWriteComplete calls DoLoop((). // // For HTTP streams, no data is sent from the client while in the OPEN @@ -763,7 +751,37 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { int SpdyStream::DoSendDomainBoundCert() { io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; CHECK(request_.get()); - SetHasWriteAvailable(); + + std::string origin = GetUrl().GetOrigin().spec(); + DCHECK(origin[origin.length() - 1] == '/'); + origin.erase(origin.length() - 1); // Trim trailing slash. + scoped_ptr<SpdyFrame> frame; + int rv = session_->CreateCredentialFrame( + origin, domain_bound_cert_type_, domain_bound_private_key_, + domain_bound_cert_, priority_, &frame); + if (rv != OK) { + DCHECK_NE(rv, ERR_IO_PENDING); + return rv; + } + + DCHECK(frame); + // TODO(akalin): Fix a couple of race conditions: + // + // 1) Since this counts as a write for this stream, the stream will + // be activated (and hence allocated a stream ID) before this frame + // is sent, even though the ID should only be activated for the + // SYN_STREAM frame. This can be solved by signalling to the session + // when we're sending a SYN_STREAM frame, and have it only activate + // the stream then. + // + // 2) Since this is decoupled from sending the SYN_STREAM frame, it + // is possible that other domain-bound cert frames will clobber ours + // before our SYN_STREAM frame gets sent. This can be solved by + // immediately enqueueing the SYN_STREAM frame here and adjusting + // the state machine appropriately. + session_->EnqueueStreamWrite( + this, + scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass()))); return ERR_IO_PENDING; } @@ -777,9 +795,12 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) { int SpdyStream::DoSendHeaders() { CHECK(!cancelled_); - - SetHasWriteAvailable(); io_state_ = STATE_SEND_HEADERS_COMPLETE; + + session_->EnqueueStreamWrite( + this, + scoped_ptr<SpdyFrameProducer>( + new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr()))); return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 3ad34f9..293bc20 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -5,7 +5,7 @@ #ifndef NET_SPDY_SPDY_STREAM_H_ #define NET_SPDY_SPDY_STREAM_H_ -#include <list> +#include <deque> #include <string> #include <vector> @@ -102,15 +102,6 @@ class NET_EXPORT_PRIVATE SpdyStream TYPE_DATA }; - // Structure to contains pending frame information. - typedef struct { - FrameType type; - union { - SpdyHeaderBlock* header_block; - SpdyFrame* data_frame; - }; - } PendingFrame; - // SpdyStream constructor SpdyStream(SpdySession* session, const std::string& path, @@ -299,7 +290,8 @@ class NET_EXPORT_PRIVATE SpdyStream int GetProtocolVersion() const; private: - class SpdyStreamIOBufferProducer; + class SynStreamFrameProducer; + class HeaderFrameProducer; enum State { STATE_NONE, @@ -346,13 +338,14 @@ class NET_EXPORT_PRIVATE SpdyStream // the MessageLoop to replay all the data that the server has already sent. void PushedStreamReplayData(); - // Informs the SpdySession that this stream has a write available. - void SetHasWriteAvailable(); + // Produces the SYN_STREAM frame for the stream. The stream must + // already be activated. + scoped_ptr<SpdyFrame> ProduceSynStreamFrame(); - // Returns a newly created SPDY frame owned by the called that contains - // the next frame to be sent by this frame. May return NULL if this - // stream has become stalled on flow control. - scoped_ptr<SpdyFrame> ProduceNextFrame(); + // Produce the initial HEADER frame for the stream with the given + // block. The stream must already be activated. + scoped_ptr<SpdyFrame> ProduceHeaderFrame( + scoped_ptr<SpdyHeaderBlock> header_block); // If the stream is active and stream flow control is turned on, // called by OnDataReceived (which is in turn called by the session) @@ -398,14 +391,13 @@ class NET_EXPORT_PRIVATE SpdyStream scoped_ptr<SpdyHeaderBlock> response_; base::Time response_time_; - // An in order list of pending frame data that are going to be sent. HEADERS - // frames are queued as SpdyHeaderBlock structures because these must be - // compressed just before sending. Data frames are queued as SpdyDataFrame. - std::list<PendingFrame> pending_frames_; - - // An in order list of sending frame types. It will be used to know which type - // of frame is sent and which callback should be invoked in OnOpen(). - std::list<FrameType> waiting_completions_; + // An in order list of sending frame types. Used communicate to the + // delegate which type of frame was sent in DoOpen(). + // + // TODO(akalin): We can remove the need for this queue if we add an + // OnFrameSent() callback to SpdyFrameProducer and have the session + // call that instead of SpdyStream::OnWriteComplete(). + std::deque<FrameType> waiting_completions_; State io_state_; diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc new file mode 100644 index 0000000..d1f7fb6 --- /dev/null +++ b/net/spdy/spdy_write_queue.cc @@ -0,0 +1,95 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_write_queue.h" + +#include <cstddef> + +#include "base/logging.h" +#include "net/spdy/spdy_frame_producer.h" +#include "net/spdy/spdy_stream.h" + +namespace net { + +SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {} + +SpdyWriteQueue::PendingWrite::PendingWrite( + SpdyFrameProducer* frame_producer, + const scoped_refptr<SpdyStream>& stream) + : frame_producer(frame_producer), + stream(stream) {} + +SpdyWriteQueue::PendingWrite::~PendingWrite() {} + +SpdyWriteQueue::SpdyWriteQueue() {} + +SpdyWriteQueue::~SpdyWriteQueue() { + Clear(); +} + +void SpdyWriteQueue::Enqueue(RequestPriority priority, + scoped_ptr<SpdyFrameProducer> frame_producer, + const scoped_refptr<SpdyStream>& stream) { + if (stream.get()) { + DCHECK_EQ(stream->priority(), priority); + } + queue_[priority].push_back(PendingWrite(frame_producer.release(), stream)); +} + +bool SpdyWriteQueue::Dequeue(scoped_ptr<SpdyFrameProducer>* frame_producer, + scoped_refptr<SpdyStream>* stream) { + for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { + if (!queue_[i].empty()) { + PendingWrite pending_write = queue_[i].front(); + queue_[i].pop_front(); + frame_producer->reset(pending_write.frame_producer); + *stream = pending_write.stream; + return true; + } + } + return false; +} + +void SpdyWriteQueue::RemovePendingWritesForStream( + const scoped_refptr<SpdyStream>& stream) { + DCHECK(stream.get()); + if (DCHECK_IS_ON()) { + // |stream| should not have pending writes in a queue not matching + // its priority. + for (int i = 0; i < NUM_PRIORITIES; ++i) { + if (stream->priority() == i) + continue; + for (std::deque<PendingWrite>::const_iterator it = queue_[i].begin(); + it != queue_[i].end(); ++it) { + DCHECK_NE(it->stream, stream); + } + } + } + + // Do the actual deletion and removal, preserving FIFO-ness. + std::deque<PendingWrite>* queue = &queue_[stream->priority()]; + std::deque<PendingWrite>::iterator out_it = queue->begin(); + for (std::deque<PendingWrite>::const_iterator it = queue->begin(); + it != queue->end(); ++it) { + if (it->stream == stream) { + delete it->frame_producer; + } else { + *out_it = *it; + ++out_it; + } + } + queue->erase(out_it, queue->end()); +} + +void SpdyWriteQueue::Clear() { + for (int i = 0; i < NUM_PRIORITIES; ++i) { + for (std::deque<PendingWrite>::iterator it = queue_[i].begin(); + it != queue_[i].end(); ++it) { + delete it->frame_producer; + } + queue_[i].clear(); + } +} + +} // namespace net diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h new file mode 100644 index 0000000..7f5451ad --- /dev/null +++ b/net/spdy/spdy_write_queue.h @@ -0,0 +1,72 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_WRITE_QUEUE_H_ +#define NET_SPDY_SPDY_WRITE_QUEUE_H_ + +#include <deque> + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_export.h" +#include "net/base/request_priority.h" + +namespace net { + +class SpdyFrameProducer; +class SpdyStream; + +// A queue of SpdyFrameProducers to produce frames to write. Ordered +// by priority, and then FIFO. +class NET_EXPORT_PRIVATE SpdyWriteQueue { + public: + SpdyWriteQueue(); + ~SpdyWriteQueue(); + + // Enqueues the given frame producer at the given priority + // associated with the given stream, which may be NULL if the frame + // producer is not associated with a stream. If |stream| is + // non-NULL, its priority must be equal to |priority|. + void Enqueue(RequestPriority priority, + scoped_ptr<SpdyFrameProducer> frame_producer, + const scoped_refptr<SpdyStream>& stream); + + // Dequeues the frame producer with the highest priority that was + // enqueued the earliest and its associated stream. Returns true and + // fills in |frame_producer| and |stream| if successful -- + // otherwise, just returns false. + bool Dequeue(scoped_ptr<SpdyFrameProducer>* frame_producer, + scoped_refptr<SpdyStream>* stream); + + // Removes all pending writes for the given stream, which must be + // non-NULL. + void RemovePendingWritesForStream(const scoped_refptr<SpdyStream>& stream); + + // Removes all pending writes. + void Clear(); + + private: + // A struct holding a frame producer and its associated stream. + struct PendingWrite { + // This has to be a raw pointer since we store this in an STL + // container. + SpdyFrameProducer* frame_producer; + scoped_refptr<SpdyStream> stream; + + PendingWrite(); + PendingWrite(SpdyFrameProducer* frame_producer, + const scoped_refptr<SpdyStream>& stream); + ~PendingWrite(); + }; + + // The actual write queue, binned by priority. + std::deque<PendingWrite> queue_[NUM_PRIORITIES]; + + DISALLOW_COPY_AND_ASSIGN(SpdyWriteQueue); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_WRITE_QUEUE_H_ |