diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-14 05:51:44 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-14 05:51:44 +0000 |
commit | d68d318078d07c351c926a4788873bdc7102e448 (patch) | |
tree | 5104c9009c626586cdd09bc33eb3f4007279fc99 /net | |
parent | f4213ec4fcef066d281947901369b5ee88ca0735 (diff) | |
download | chromium_src-d68d318078d07c351c926a4788873bdc7102e448.zip chromium_src-d68d318078d07c351c926a4788873bdc7102e448.tar.gz chromium_src-d68d318078d07c351c926a4788873bdc7102e448.tar.bz2 |
[SPDY] Make SpdyWriteQueue use WeakPtrs and not scoped_refptrs for SpdyStream
Add check that the SpdyStream remains non-NULL while its
in the SpdyWriteQueue.
Make ActivateStream() take a scoped_refptr<> and make it do
a single map loop.
Make SpdyStream functions use SpdyStream::GetWeakPtr().
BUG=178943
Review URL: https://chromiumcodereview.appspot.com/15102014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@199930 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_session.cc | 26 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 6 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 25 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.cc | 18 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue.h | 17 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue_unittest.cc | 46 |
6 files changed, 72 insertions, 66 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 1b62517..f17db79 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -688,7 +688,7 @@ bool SpdySession::CloseOneIdleConnection() { } void SpdySession::EnqueueStreamWrite( - SpdyStream* stream, + const base::WeakPtr<SpdyStream>& stream, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer) { DCHECK(frame_type == HEADERS || @@ -1161,7 +1161,7 @@ void SpdySession::WriteSocket() { // Grab the next frame to send. SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) break; @@ -1172,7 +1172,7 @@ void SpdySession::WriteSocket() { // guarantee monotonically-increasing stream IDs. if (frame_type == SYN_STREAM) { if (stream && stream->stream_id() == 0) { - ActivateStream(stream.get()); + ActivateStream(scoped_refptr<SpdyStream>(stream.get())); } else { NOTREACHED(); continue; @@ -1403,26 +1403,26 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority, scoped_ptr<SpdyBufferProducer>( new SimpleBufferProducer( scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), - NULL); + base::WeakPtr<SpdyStream>()); } void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, - const scoped_refptr<SpdyStream>& stream) { + const base::WeakPtr<SpdyStream>& stream) { write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); WriteSocketLater(); } -void SpdySession::ActivateStream(SpdyStream* stream) { +void SpdySession::ActivateStream(const scoped_refptr<SpdyStream>& stream) { if (stream->stream_id() == 0) { stream->set_stream_id(GetNewStreamId()); - created_streams_.erase(scoped_refptr<SpdyStream>(stream)); + created_streams_.erase(stream); } - const SpdyStreamId id = stream->stream_id(); - DCHECK(!IsStreamActive(id)); - - active_streams_[id] = stream; + ActiveStreamMap::value_type entry(stream->stream_id(), stream); + ActiveStreamMap::iterator it = active_streams_.lower_bound(entry.first); + DCHECK(it == active_streams_.end() || it->second->stream_id() != entry.first); + ignore_result(active_streams_.insert(it, entry)); } void SpdySession::DeleteStream(SpdyStreamId id, int status) { @@ -1462,7 +1462,7 @@ void SpdySession::DeleteStreamRefs(scoped_refptr<SpdyStream>* last_ref, in_flight_write_stream_ = NULL; } - write_queue_.RemovePendingWritesForStream(*last_ref); + write_queue_.RemovePendingWritesForStream((*last_ref)->GetWeakPtr()); (*last_ref)->OnClose(status); @@ -1730,7 +1730,7 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> ( stream, time_func_()); - ActivateStream(stream.get()); + ActivateStream(stream); stream->set_response_received(); stream = NULL; diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index e838fa6..e571ddd 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -253,7 +253,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Pushes the given producer into the write queue for // |stream|. |stream| is guaranteed to be activated before the // producer is used to produce its frame. - void EnqueueStreamWrite(SpdyStream* stream, + void EnqueueStreamWrite(const base::WeakPtr<SpdyStream>& stream, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer); @@ -601,10 +601,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, void EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> producer, - const scoped_refptr<SpdyStream>& stream); + const base::WeakPtr<SpdyStream>& stream); // Track active streams in the active stream list. - void ActivateStream(SpdyStream* stream); + void ActivateStream(const scoped_refptr<SpdyStream>& stream); // If there is an active stream with the given ID, delete it. There // must be no external references to that active stream. Also note diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index a4ba04c..b83e167 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -153,8 +153,7 @@ void SpdyStream::SetDelegate(Delegate* delegate) { CHECK(response_received()); MessageLoop::current()->PostTask( FROM_HERE, - base::Bind(&SpdyStream::PushedStreamReplayData, - weak_ptr_factory_.GetWeakPtr())); + base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr())); } else { continue_buffering_data_ = false; } @@ -535,8 +534,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { DecreaseRecvWindowSize(static_cast<int32>(length)); buffer->AddConsumeCallback( - base::Bind(&SpdyStream::OnReadBufferConsumed, - weak_ptr_factory_.GetWeakPtr())); + base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr())); } // Track our bandwidth. @@ -626,10 +624,9 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { CHECK_GT(stream_id_, 0u); session_->EnqueueStreamWrite( - this, HEADERS, + GetWeakPtr(), HEADERS, scoped_ptr<SpdyBufferProducer>( - new HeaderBufferProducer( - weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); + new HeaderBufferProducer(GetWeakPtr(), headers.Pass()))); } void SpdyStream::QueueStreamData(IOBuffer* data, @@ -658,12 +655,11 @@ void SpdyStream::QueueStreamData(IOBuffer* data, // here anyway just in case this changes. data_buffer->AddConsumeCallback( base::Bind(&SpdyStream::OnWriteBufferConsumed, - weak_ptr_factory_.GetWeakPtr(), - payload_size)); + GetWeakPtr(), payload_size)); } session_->EnqueueStreamWrite( - this, DATA, + GetWeakPtr(), DATA, scoped_ptr<SpdyBufferProducer>( new SimpleBufferProducer(data_buffer.Pass()))); } @@ -814,8 +810,7 @@ int SpdyStream::DoGetDomainBoundCert() { int rv = sbc_service->GetDomainBoundCert( url.GetOrigin().host(), requested_cert_types, &domain_bound_cert_type_, &domain_bound_private_key_, &domain_bound_cert_, - base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, - weak_ptr_factory_.GetWeakPtr()), + base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()), &domain_bound_cert_request_handle_); return rv; } @@ -854,7 +849,7 @@ int SpdyStream::DoSendDomainBoundCert() { // immediately enqueueing the SYN_STREAM frame here and adjusting // the state machine appropriately. session_->EnqueueStreamWrite( - this, CREDENTIAL, + GetWeakPtr(), CREDENTIAL, scoped_ptr<SpdyBufferProducer>( new SimpleBufferProducer( scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()))))); @@ -871,9 +866,9 @@ int SpdyStream::DoSendHeaders() { io_state_ = STATE_SEND_HEADERS_COMPLETE; session_->EnqueueStreamWrite( - this, SYN_STREAM, + GetWeakPtr(), SYN_STREAM, scoped_ptr<SpdyBufferProducer>( - new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr()))); + new SynStreamBufferProducer(GetWeakPtr()))); return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc index e3691e20..b4e7376 100644 --- a/net/spdy/spdy_write_queue.cc +++ b/net/spdy/spdy_write_queue.cc @@ -18,10 +18,11 @@ SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {} SpdyWriteQueue::PendingWrite::PendingWrite( SpdyFrameType frame_type, SpdyBufferProducer* frame_producer, - const scoped_refptr<SpdyStream>& stream) + const base::WeakPtr<SpdyStream>& stream) : frame_type(frame_type), frame_producer(frame_producer), - stream(stream) {} + stream(stream), + has_stream(stream != NULL) {} SpdyWriteQueue::PendingWrite::~PendingWrite() {} @@ -34,17 +35,16 @@ SpdyWriteQueue::~SpdyWriteQueue() { void SpdyWriteQueue::Enqueue(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> frame_producer, - const scoped_refptr<SpdyStream>& stream) { - if (stream.get()) { + const base::WeakPtr<SpdyStream>& stream) { + if (stream) DCHECK_EQ(stream->priority(), priority); - } queue_[priority].push_back( PendingWrite(frame_type, frame_producer.release(), stream)); } bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type, scoped_ptr<SpdyBufferProducer>* frame_producer, - scoped_refptr<SpdyStream>* stream) { + base::WeakPtr<SpdyStream>* stream) { for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { if (!queue_[i].empty()) { PendingWrite pending_write = queue_[i].front(); @@ -52,6 +52,8 @@ bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type, *frame_type = pending_write.frame_type; frame_producer->reset(pending_write.frame_producer); *stream = pending_write.stream; + if (pending_write.has_stream) + DCHECK(*stream); return true; } } @@ -59,8 +61,8 @@ bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type, } void SpdyWriteQueue::RemovePendingWritesForStream( - const scoped_refptr<SpdyStream>& stream) { - DCHECK(stream.get()); + const base::WeakPtr<SpdyStream>& stream) { + DCHECK(stream); if (DCHECK_IS_ON()) { // |stream| should not have pending writes in a queue not matching // its priority. diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h index 1d5556b..fa194ef 100644 --- a/net/spdy/spdy_write_queue.h +++ b/net/spdy/spdy_write_queue.h @@ -8,8 +8,8 @@ #include <deque> #include "base/basictypes.h" -#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "net/base/net_export.h" #include "net/base/request_priority.h" #include "net/spdy/spdy_protocol.h" @@ -30,11 +30,12 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { // Enqueues the given frame producer of the given type at the given // priority associated with the given stream, which may be NULL if // the frame producer is not associated with a stream. If |stream| - // is non-NULL, its priority must be equal to |priority|. + // is non-NULL, its priority must be equal to |priority|, and it + // must remain non-NULL until the write is dequeued or removed. void Enqueue(RequestPriority priority, SpdyFrameType frame_type, scoped_ptr<SpdyBufferProducer> frame_producer, - const scoped_refptr<SpdyStream>& stream); + const base::WeakPtr<SpdyStream>& stream); // Dequeues the frame producer with the highest priority that was // enqueued the earliest and its associated stream. Returns true and @@ -42,11 +43,11 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { // successful -- otherwise, just returns false. bool Dequeue(SpdyFrameType* frame_type, scoped_ptr<SpdyBufferProducer>* frame_producer, - scoped_refptr<SpdyStream>* stream); + base::WeakPtr<SpdyStream>* stream); // Removes all pending writes for the given stream, which must be // non-NULL. - void RemovePendingWritesForStream(const scoped_refptr<SpdyStream>& stream); + void RemovePendingWritesForStream(const base::WeakPtr<SpdyStream>& stream); // Removes all pending writes for streams after |last_good_stream_id| // and streams with no stream id. @@ -62,12 +63,14 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { // This has to be a raw pointer since we store this in an STL // container. SpdyBufferProducer* frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; + // Whether |stream| was non-NULL when enqueued. + bool has_stream; PendingWrite(); PendingWrite(SpdyFrameType frame_type, SpdyBufferProducer* frame_producer, - const scoped_refptr<SpdyStream>& stream); + const base::WeakPtr<SpdyStream>& stream); ~PendingWrite(); }; diff --git a/net/spdy/spdy_write_queue_unittest.cc b/net/spdy/spdy_write_queue_unittest.cc index 564cdbe..9d85863 100644 --- a/net/spdy/spdy_write_queue_unittest.cc +++ b/net/spdy/spdy_write_queue_unittest.cc @@ -75,21 +75,21 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM"); scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST"); - // A NULL stream should still work. - scoped_refptr<SpdyStream> stream_low(NULL); scoped_refptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM)); scoped_refptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST)); + // A NULL stream should still work. write_queue.Enqueue( - LOW, SYN_STREAM, producer_low.Pass(), stream_low); + LOW, SYN_STREAM, producer_low.Pass(), base::WeakPtr<SpdyStream>()); write_queue.Enqueue( - MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium); + MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium->GetWeakPtr()); write_queue.Enqueue( - HIGHEST, RST_STREAM, producer_highest.Pass(), stream_highest); + HIGHEST, RST_STREAM, producer_highest.Pass(), + stream_highest->GetWeakPtr()); SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(RST_STREAM, frame_type); EXPECT_EQ("HIGHEST", ProducerToString(frame_producer.Pass())); @@ -103,7 +103,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(SYN_STREAM, frame_type); EXPECT_EQ("LOW", ProducerToString(frame_producer.Pass())); - EXPECT_EQ(stream_low, stream); + EXPECT_EQ(NULL, stream.get()); EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } @@ -121,13 +121,16 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); scoped_refptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(), stream1); - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(), stream2); - write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), stream3); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(), + stream1->GetWeakPtr()); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(), + stream2->GetWeakPtr()); + write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), + stream3->GetWeakPtr()); SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(SYN_STREAM, frame_type); EXPECT_EQ(1, ProducerToInt(frame_producer.Pass())); @@ -157,15 +160,16 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { for (int i = 0; i < 100; ++i) { scoped_refptr<SpdyStream> stream = ((i % 3) == 0) ? stream1 : stream2; - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), + stream->GetWeakPtr()); } - write_queue.RemovePendingWritesForStream(stream2); + write_queue.RemovePendingWritesForStream(stream2->GetWeakPtr()); for (int i = 0; i < 100; i += 3) { SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(SYN_STREAM, frame_type); EXPECT_EQ(i, ProducerToInt(frame_producer.Pass())); @@ -174,7 +178,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } @@ -199,7 +203,8 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) { for (int i = 0; i < 100; ++i) { scoped_refptr<SpdyStream> stream = streams[i % arraysize(streams)]; - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), + stream->GetWeakPtr()); } write_queue.RemovePendingWritesForStreamsAfter(stream1->stream_id()); @@ -207,7 +212,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) { for (int i = 0; i < 100; i += arraysize(streams)) { SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)) << "Unable to Dequeue i: " << i; EXPECT_EQ(SYN_STREAM, frame_type); @@ -217,7 +222,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) { SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } @@ -228,14 +233,15 @@ TEST_F(SpdyWriteQueueTest, Clear) { SpdyWriteQueue write_queue; for (int i = 0; i < 100; ++i) { - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), NULL); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), + base::WeakPtr<SpdyStream>()); } write_queue.Clear(); SpdyFrameType frame_type = DATA; scoped_ptr<SpdyBufferProducer> frame_producer; - scoped_refptr<SpdyStream> stream; + base::WeakPtr<SpdyStream> stream; EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } |