summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-14 05:51:44 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-14 05:51:44 +0000
commitd68d318078d07c351c926a4788873bdc7102e448 (patch)
tree5104c9009c626586cdd09bc33eb3f4007279fc99 /net
parentf4213ec4fcef066d281947901369b5ee88ca0735 (diff)
downloadchromium_src-d68d318078d07c351c926a4788873bdc7102e448.zip
chromium_src-d68d318078d07c351c926a4788873bdc7102e448.tar.gz
chromium_src-d68d318078d07c351c926a4788873bdc7102e448.tar.bz2
[SPDY] Make SpdyWriteQueue use WeakPtrs and not scoped_refptrs for SpdyStream
Add check that the SpdyStream remains non-NULL while its in the SpdyWriteQueue. Make ActivateStream() take a scoped_refptr<> and make it do a single map loop. Make SpdyStream functions use SpdyStream::GetWeakPtr(). BUG=178943 Review URL: https://chromiumcodereview.appspot.com/15102014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@199930 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/spdy/spdy_session.cc26
-rw-r--r--net/spdy/spdy_session.h6
-rw-r--r--net/spdy/spdy_stream.cc25
-rw-r--r--net/spdy/spdy_write_queue.cc18
-rw-r--r--net/spdy/spdy_write_queue.h17
-rw-r--r--net/spdy/spdy_write_queue_unittest.cc46
6 files changed, 72 insertions, 66 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 1b62517..f17db79 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -688,7 +688,7 @@ bool SpdySession::CloseOneIdleConnection() {
}
void SpdySession::EnqueueStreamWrite(
- SpdyStream* stream,
+ const base::WeakPtr<SpdyStream>& stream,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer) {
DCHECK(frame_type == HEADERS ||
@@ -1161,7 +1161,7 @@ void SpdySession::WriteSocket() {
// Grab the next frame to send.
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
break;
@@ -1172,7 +1172,7 @@ void SpdySession::WriteSocket() {
// guarantee monotonically-increasing stream IDs.
if (frame_type == SYN_STREAM) {
if (stream && stream->stream_id() == 0) {
- ActivateStream(stream.get());
+ ActivateStream(scoped_refptr<SpdyStream>(stream.get()));
} else {
NOTREACHED();
continue;
@@ -1403,26 +1403,26 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority,
scoped_ptr<SpdyBufferProducer>(
new SimpleBufferProducer(
scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
- NULL);
+ base::WeakPtr<SpdyStream>());
}
void SpdySession::EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer,
- const scoped_refptr<SpdyStream>& stream) {
+ const base::WeakPtr<SpdyStream>& stream) {
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
WriteSocketLater();
}
-void SpdySession::ActivateStream(SpdyStream* stream) {
+void SpdySession::ActivateStream(const scoped_refptr<SpdyStream>& stream) {
if (stream->stream_id() == 0) {
stream->set_stream_id(GetNewStreamId());
- created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ created_streams_.erase(stream);
}
- const SpdyStreamId id = stream->stream_id();
- DCHECK(!IsStreamActive(id));
-
- active_streams_[id] = stream;
+ ActiveStreamMap::value_type entry(stream->stream_id(), stream);
+ ActiveStreamMap::iterator it = active_streams_.lower_bound(entry.first);
+ DCHECK(it == active_streams_.end() || it->second->stream_id() != entry.first);
+ ignore_result(active_streams_.insert(it, entry));
}
void SpdySession::DeleteStream(SpdyStreamId id, int status) {
@@ -1462,7 +1462,7 @@ void SpdySession::DeleteStreamRefs(scoped_refptr<SpdyStream>* last_ref,
in_flight_write_stream_ = NULL;
}
- write_queue_.RemovePendingWritesForStream(*last_ref);
+ write_queue_.RemovePendingWritesForStream((*last_ref)->GetWeakPtr());
(*last_ref)->OnClose(status);
@@ -1730,7 +1730,7 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id,
std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> (
stream, time_func_());
- ActivateStream(stream.get());
+ ActivateStream(stream);
stream->set_response_received();
stream = NULL;
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index e838fa6..e571ddd 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -253,7 +253,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Pushes the given producer into the write queue for
// |stream|. |stream| is guaranteed to be activated before the
// producer is used to produce its frame.
- void EnqueueStreamWrite(SpdyStream* stream,
+ void EnqueueStreamWrite(const base::WeakPtr<SpdyStream>& stream,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer);
@@ -601,10 +601,10 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
void EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> producer,
- const scoped_refptr<SpdyStream>& stream);
+ const base::WeakPtr<SpdyStream>& stream);
// Track active streams in the active stream list.
- void ActivateStream(SpdyStream* stream);
+ void ActivateStream(const scoped_refptr<SpdyStream>& stream);
// If there is an active stream with the given ID, delete it. There
// must be no external references to that active stream. Also note
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index a4ba04c..b83e167 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -153,8 +153,7 @@ void SpdyStream::SetDelegate(Delegate* delegate) {
CHECK(response_received());
MessageLoop::current()->PostTask(
FROM_HERE,
- base::Bind(&SpdyStream::PushedStreamReplayData,
- weak_ptr_factory_.GetWeakPtr()));
+ base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr()));
} else {
continue_buffering_data_ = false;
}
@@ -535,8 +534,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
DecreaseRecvWindowSize(static_cast<int32>(length));
buffer->AddConsumeCallback(
- base::Bind(&SpdyStream::OnReadBufferConsumed,
- weak_ptr_factory_.GetWeakPtr()));
+ base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
}
// Track our bandwidth.
@@ -626,10 +624,9 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
CHECK_GT(stream_id_, 0u);
session_->EnqueueStreamWrite(
- this, HEADERS,
+ GetWeakPtr(), HEADERS,
scoped_ptr<SpdyBufferProducer>(
- new HeaderBufferProducer(
- weak_ptr_factory_.GetWeakPtr(), headers.Pass())));
+ new HeaderBufferProducer(GetWeakPtr(), headers.Pass())));
}
void SpdyStream::QueueStreamData(IOBuffer* data,
@@ -658,12 +655,11 @@ void SpdyStream::QueueStreamData(IOBuffer* data,
// here anyway just in case this changes.
data_buffer->AddConsumeCallback(
base::Bind(&SpdyStream::OnWriteBufferConsumed,
- weak_ptr_factory_.GetWeakPtr(),
- payload_size));
+ GetWeakPtr(), payload_size));
}
session_->EnqueueStreamWrite(
- this, DATA,
+ GetWeakPtr(), DATA,
scoped_ptr<SpdyBufferProducer>(
new SimpleBufferProducer(data_buffer.Pass())));
}
@@ -814,8 +810,7 @@ int SpdyStream::DoGetDomainBoundCert() {
int rv = sbc_service->GetDomainBoundCert(
url.GetOrigin().host(), requested_cert_types,
&domain_bound_cert_type_, &domain_bound_private_key_, &domain_bound_cert_,
- base::Bind(&SpdyStream::OnGetDomainBoundCertComplete,
- weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()),
&domain_bound_cert_request_handle_);
return rv;
}
@@ -854,7 +849,7 @@ int SpdyStream::DoSendDomainBoundCert() {
// immediately enqueueing the SYN_STREAM frame here and adjusting
// the state machine appropriately.
session_->EnqueueStreamWrite(
- this, CREDENTIAL,
+ GetWeakPtr(), CREDENTIAL,
scoped_ptr<SpdyBufferProducer>(
new SimpleBufferProducer(
scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
@@ -871,9 +866,9 @@ int SpdyStream::DoSendHeaders() {
io_state_ = STATE_SEND_HEADERS_COMPLETE;
session_->EnqueueStreamWrite(
- this, SYN_STREAM,
+ GetWeakPtr(), SYN_STREAM,
scoped_ptr<SpdyBufferProducer>(
- new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr())));
+ new SynStreamBufferProducer(GetWeakPtr())));
return ERR_IO_PENDING;
}
diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc
index e3691e20..b4e7376 100644
--- a/net/spdy/spdy_write_queue.cc
+++ b/net/spdy/spdy_write_queue.cc
@@ -18,10 +18,11 @@ SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
SpdyWriteQueue::PendingWrite::PendingWrite(
SpdyFrameType frame_type,
SpdyBufferProducer* frame_producer,
- const scoped_refptr<SpdyStream>& stream)
+ const base::WeakPtr<SpdyStream>& stream)
: frame_type(frame_type),
frame_producer(frame_producer),
- stream(stream) {}
+ stream(stream),
+ has_stream(stream != NULL) {}
SpdyWriteQueue::PendingWrite::~PendingWrite() {}
@@ -34,17 +35,16 @@ SpdyWriteQueue::~SpdyWriteQueue() {
void SpdyWriteQueue::Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> frame_producer,
- const scoped_refptr<SpdyStream>& stream) {
- if (stream.get()) {
+ const base::WeakPtr<SpdyStream>& stream) {
+ if (stream)
DCHECK_EQ(stream->priority(), priority);
- }
queue_[priority].push_back(
PendingWrite(frame_type, frame_producer.release(), stream));
}
bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
scoped_ptr<SpdyBufferProducer>* frame_producer,
- scoped_refptr<SpdyStream>* stream) {
+ base::WeakPtr<SpdyStream>* stream) {
for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
if (!queue_[i].empty()) {
PendingWrite pending_write = queue_[i].front();
@@ -52,6 +52,8 @@ bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
*frame_type = pending_write.frame_type;
frame_producer->reset(pending_write.frame_producer);
*stream = pending_write.stream;
+ if (pending_write.has_stream)
+ DCHECK(*stream);
return true;
}
}
@@ -59,8 +61,8 @@ bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
}
void SpdyWriteQueue::RemovePendingWritesForStream(
- const scoped_refptr<SpdyStream>& stream) {
- DCHECK(stream.get());
+ const base::WeakPtr<SpdyStream>& stream) {
+ DCHECK(stream);
if (DCHECK_IS_ON()) {
// |stream| should not have pending writes in a queue not matching
// its priority.
diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h
index 1d5556b..fa194ef 100644
--- a/net/spdy/spdy_write_queue.h
+++ b/net/spdy/spdy_write_queue.h
@@ -8,8 +8,8 @@
#include <deque>
#include "base/basictypes.h"
-#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
#include "net/base/net_export.h"
#include "net/base/request_priority.h"
#include "net/spdy/spdy_protocol.h"
@@ -30,11 +30,12 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
// Enqueues the given frame producer of the given type at the given
// priority associated with the given stream, which may be NULL if
// the frame producer is not associated with a stream. If |stream|
- // is non-NULL, its priority must be equal to |priority|.
+ // is non-NULL, its priority must be equal to |priority|, and it
+ // must remain non-NULL until the write is dequeued or removed.
void Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
scoped_ptr<SpdyBufferProducer> frame_producer,
- const scoped_refptr<SpdyStream>& stream);
+ const base::WeakPtr<SpdyStream>& stream);
// Dequeues the frame producer with the highest priority that was
// enqueued the earliest and its associated stream. Returns true and
@@ -42,11 +43,11 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
// successful -- otherwise, just returns false.
bool Dequeue(SpdyFrameType* frame_type,
scoped_ptr<SpdyBufferProducer>* frame_producer,
- scoped_refptr<SpdyStream>* stream);
+ base::WeakPtr<SpdyStream>* stream);
// Removes all pending writes for the given stream, which must be
// non-NULL.
- void RemovePendingWritesForStream(const scoped_refptr<SpdyStream>& stream);
+ void RemovePendingWritesForStream(const base::WeakPtr<SpdyStream>& stream);
// Removes all pending writes for streams after |last_good_stream_id|
// and streams with no stream id.
@@ -62,12 +63,14 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
// This has to be a raw pointer since we store this in an STL
// container.
SpdyBufferProducer* frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
+ // Whether |stream| was non-NULL when enqueued.
+ bool has_stream;
PendingWrite();
PendingWrite(SpdyFrameType frame_type,
SpdyBufferProducer* frame_producer,
- const scoped_refptr<SpdyStream>& stream);
+ const base::WeakPtr<SpdyStream>& stream);
~PendingWrite();
};
diff --git a/net/spdy/spdy_write_queue_unittest.cc b/net/spdy/spdy_write_queue_unittest.cc
index 564cdbe..9d85863 100644
--- a/net/spdy/spdy_write_queue_unittest.cc
+++ b/net/spdy/spdy_write_queue_unittest.cc
@@ -75,21 +75,21 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM");
scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST");
- // A NULL stream should still work.
- scoped_refptr<SpdyStream> stream_low(NULL);
scoped_refptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM));
scoped_refptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST));
+ // A NULL stream should still work.
write_queue.Enqueue(
- LOW, SYN_STREAM, producer_low.Pass(), stream_low);
+ LOW, SYN_STREAM, producer_low.Pass(), base::WeakPtr<SpdyStream>());
write_queue.Enqueue(
- MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium);
+ MEDIUM, SYN_REPLY, producer_medium.Pass(), stream_medium->GetWeakPtr());
write_queue.Enqueue(
- HIGHEST, RST_STREAM, producer_highest.Pass(), stream_highest);
+ HIGHEST, RST_STREAM, producer_highest.Pass(),
+ stream_highest->GetWeakPtr());
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(RST_STREAM, frame_type);
EXPECT_EQ("HIGHEST", ProducerToString(frame_producer.Pass()));
@@ -103,7 +103,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(SYN_STREAM, frame_type);
EXPECT_EQ("LOW", ProducerToString(frame_producer.Pass()));
- EXPECT_EQ(stream_low, stream);
+ EXPECT_EQ(NULL, stream.get());
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}
@@ -121,13 +121,16 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
scoped_refptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY));
- write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(), stream1);
- write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(), stream2);
- write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), stream3);
+ write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(),
+ stream1->GetWeakPtr());
+ write_queue.Enqueue(DEFAULT_PRIORITY, SYN_REPLY, producer2.Pass(),
+ stream2->GetWeakPtr());
+ write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(),
+ stream3->GetWeakPtr());
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(SYN_STREAM, frame_type);
EXPECT_EQ(1, ProducerToInt(frame_producer.Pass()));
@@ -157,15 +160,16 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
for (int i = 0; i < 100; ++i) {
scoped_refptr<SpdyStream> stream = ((i % 3) == 0) ? stream1 : stream2;
- write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream);
+ write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
+ stream->GetWeakPtr());
}
- write_queue.RemovePendingWritesForStream(stream2);
+ write_queue.RemovePendingWritesForStream(stream2->GetWeakPtr());
for (int i = 0; i < 100; i += 3) {
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(SYN_STREAM, frame_type);
EXPECT_EQ(i, ProducerToInt(frame_producer.Pass()));
@@ -174,7 +178,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}
@@ -199,7 +203,8 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) {
for (int i = 0; i < 100; ++i) {
scoped_refptr<SpdyStream> stream = streams[i % arraysize(streams)];
- write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream);
+ write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
+ stream->GetWeakPtr());
}
write_queue.RemovePendingWritesForStreamsAfter(stream1->stream_id());
@@ -207,7 +212,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) {
for (int i = 0; i < 100; i += arraysize(streams)) {
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream))
<< "Unable to Dequeue i: " << i;
EXPECT_EQ(SYN_STREAM, frame_type);
@@ -217,7 +222,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) {
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}
@@ -228,14 +233,15 @@ TEST_F(SpdyWriteQueueTest, Clear) {
SpdyWriteQueue write_queue;
for (int i = 0; i < 100; ++i) {
- write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), NULL);
+ write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i),
+ base::WeakPtr<SpdyStream>());
}
write_queue.Clear();
SpdyFrameType frame_type = DATA;
scoped_ptr<SpdyBufferProducer> frame_producer;
- scoped_refptr<SpdyStream> stream;
+ base::WeakPtr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}