diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-14 23:13:48 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-14 23:13:48 +0000 |
commit | 1c12c06d2e1c6f845bda544afb82319537b5ec36 (patch) | |
tree | e94ea9a62c71f46110910eba882d07106021ee1d /net | |
parent | 9ae7c4a88b1aa71396254a74a0cb54052188edd7 (diff) | |
download | chromium_src-1c12c06d2e1c6f845bda544afb82319537b5ec36.zip chromium_src-1c12c06d2e1c6f845bda544afb82319537b5ec36.tar.gz chromium_src-1c12c06d2e1c6f845bda544afb82319537b5ec36.tar.bz2 |
[SPDY] Make SpdyStream not ref-counted
Add a bunch of functions to SpdyStream to help manage
stream ownership.
Reduce the number of active_streams_ lookups in SpdyStream.
Make SpdyStream::~SpdyStream() non-virtual.
BUG=178943
R=rch@chromium.org
Review URL: https://codereview.chromium.org/15129003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@200101 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_session.cc | 221 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 64 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy2_unittest.cc | 8 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 10 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 18 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 13 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 33 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 69 | ||||
-rw-r--r-- | net/spdy/spdy_write_queue_unittest.cc | 36 |
9 files changed, 245 insertions, 227 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index f17db79..67cf065 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -584,21 +584,18 @@ int SpdySession::CreateStream(const SpdyStreamRequest& request, } const std::string& path = request.url().PathForRequest(); - scoped_refptr<SpdyStream> new_stream( + scoped_ptr<SpdyStream> new_stream( new SpdyStream(this, path, request.priority(), stream_initial_send_window_size_, stream_initial_recv_window_size_, false, request.net_log())); - created_streams_.insert(new_stream); - *stream = new_stream->GetWeakPtr(); + InsertCreatedStream(new_stream.Pass()); UMA_HISTOGRAM_CUSTOM_COUNTS( "Net.SpdyPriorityCount", static_cast<int>(request.priority()), 0, 10, 11); - // TODO(mbelshe): Optimize memory allocations - return OK; } @@ -704,9 +701,9 @@ scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( uint8 credential_slot, SpdyControlFlags flags, const SpdyHeaderBlock& headers) { - CHECK(IsStreamActive(stream_id)); - const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; - CHECK_EQ(stream->stream_id(), stream_id); + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + CHECK(it != active_streams_.end()); + CHECK_EQ(it->second->stream_id(), stream_id); SendPrefacePingIfNoneInFlight(); @@ -771,10 +768,9 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( SpdyStreamId stream_id, const SpdyHeaderBlock& headers, SpdyControlFlags flags) { - // Find our stream - CHECK(IsStreamActive(stream_id)); - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - CHECK_EQ(stream->stream_id(), stream_id); + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + CHECK(it != active_streams_.end()); + CHECK_EQ(it->second->stream_id(), stream_id); // Create a HEADER frame. scoped_ptr<SpdyFrame> frame( @@ -796,9 +792,9 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, IOBuffer* data, int len, SpdyDataFlags flags) { - // Find our stream. - CHECK(IsStreamActive(stream_id)); - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + CHECK(it != active_streams_.end()); + SpdyStream* stream = it->second; CHECK_EQ(stream->stream_id(), stream_id); if (len < 0) { @@ -851,7 +847,7 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, stream->set_send_stalled_by_flow_control(true); // Even though we're currently stalled only by the stream, we // might end up being stalled by the session also. - QueueSendStalledStream(stream); + QueueSendStalledStream(*stream); net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); @@ -866,7 +862,7 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { if (send_stalled_by_session) { stream->set_send_stalled_by_flow_control(true); - QueueSendStalledStream(stream); + QueueSendStalledStream(*stream); net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); @@ -914,21 +910,44 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, return data_buffer.Pass(); } -void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { +void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) { DCHECK_NE(0u, stream_id); + // TODO(mbelshe): We should send a RST_STREAM control frame here // so that the server can cancel a large send. - DeleteStream(stream_id, status); + // For push streams, if they are being deleted normally, we leave + // the stream in the unclaimed_pushed_streams_ list. However, if + // the stream is errored out, clean it up entirely. + if (status != OK) { + for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); + it != unclaimed_pushed_streams_.end(); ++it) { + if (stream_id == it->second.first->stream_id()) { + unclaimed_pushed_streams_.erase(it); + break; + } + } + } + + // The stream might have been deleted. + ActiveStreamMap::iterator it = active_streams_.find(stream_id); + if (it == active_streams_.end()) + return; + + scoped_ptr<SpdyStream> owned_stream(it->second); + active_streams_.erase(it); + + DeleteStream(owned_stream.Pass(), status); } void SpdySession::CloseCreatedStream( const base::WeakPtr<SpdyStream>& stream, int status) { DCHECK_EQ(0u, stream->stream_id()); - scoped_refptr<SpdyStream> last_ref(stream.get()); - created_streams_.erase(last_ref); - DeleteStreamRefs(&last_ref, status); + scoped_ptr<SpdyStream> owned_stream(stream.get()); + created_streams_.erase(stream); + + DeleteStream(owned_stream.Pass(), status); } void SpdySession::ResetStream(SpdyStreamId stream_id, @@ -943,12 +962,12 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, scoped_ptr<SpdyFrame> rst_frame( buffered_spdy_framer_->CreateRstStream(stream_id, status)); + // Removes any pending writes for |stream_id| except for possibly an + // in-flight one. + CloseActiveStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); RecordProtocolErrorHistogram( static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); - // Removes any pending writes for |stream_id| except for possibly an - // in-flight one. - DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); } bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { @@ -1086,7 +1105,7 @@ void SpdySession::OnWriteComplete(int result) { in_flight_write_.reset(); in_flight_write_frame_type_ = DATA; in_flight_write_frame_size_ = 0; - in_flight_write_stream_ = NULL; + in_flight_write_stream_.reset(); CloseSessionOnError(static_cast<Error>(result), true, "Write error"); return; } @@ -1114,7 +1133,7 @@ void SpdySession::OnWriteComplete(int result) { in_flight_write_.reset(); in_flight_write_frame_type_ = DATA; in_flight_write_frame_size_ = 0; - in_flight_write_stream_ = NULL; + in_flight_write_stream_.reset(); } } @@ -1172,7 +1191,9 @@ void SpdySession::WriteSocket() { // guarantee monotonically-increasing stream IDs. if (frame_type == SYN_STREAM) { if (stream && stream->stream_id() == 0) { - ActivateStream(scoped_refptr<SpdyStream>(stream.get())); + scoped_ptr<SpdyStream> owned_stream = + ActivateCreatedStream(stream.get()); + InsertActivatedStream(owned_stream.Pass()); } else { NOTREACHED(); continue; @@ -1238,7 +1259,7 @@ void SpdySession::CloseAllStreamsAfter(SpdyStreamId last_good_stream_id, streams_abandoned_count_++; LogAbandonedStream(it->second, status); ++it; - DeleteStream(stream_id, status); + CloseActiveStream(stream_id, status); } while (!created_streams_.empty()) { @@ -1265,8 +1286,7 @@ void SpdySession::CloseAllStreams(Error status) { write_queue_.Clear(); } -void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, - Error status) { +void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { DCHECK(stream); std::string description = base::StringPrintf( "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); @@ -1414,66 +1434,49 @@ void SpdySession::EnqueueWrite(RequestPriority priority, WriteSocketLater(); } -void SpdySession::ActivateStream(const scoped_refptr<SpdyStream>& stream) { - if (stream->stream_id() == 0) { - stream->set_stream_id(GetNewStreamId()); - created_streams_.erase(stream); - } - ActiveStreamMap::value_type entry(stream->stream_id(), stream); - ActiveStreamMap::iterator it = active_streams_.lower_bound(entry.first); - DCHECK(it == active_streams_.end() || it->second->stream_id() != entry.first); - ignore_result(active_streams_.insert(it, entry)); +void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { + DCHECK_EQ(stream->stream_id(), 0u); + DCHECK(created_streams_.find(stream.get()) == created_streams_.end()); + created_streams_.insert(stream.release()); } -void SpdySession::DeleteStream(SpdyStreamId id, int status) { - // For push streams, if they are being deleted normally, we leave - // the stream in the unclaimed_pushed_streams_ list. However, if - // the stream is errored out, clean it up entirely. - if (status != OK) { - for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); - it != unclaimed_pushed_streams_.end(); ++it) { - if (id == it->second.first->stream_id()) { - unclaimed_pushed_streams_.erase(it); - break; - } - } - } - - // The stream might have been deleted. - ActiveStreamMap::iterator it = active_streams_.find(id); - if (it == active_streams_.end()) - return; - - scoped_refptr<SpdyStream> last_ref; - last_ref.swap(it->second); - active_streams_.erase(it); - DCHECK(last_ref); +scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { + DCHECK_EQ(stream->stream_id(), 0u); + DCHECK(created_streams_.find(stream) != created_streams_.end()); + stream->set_stream_id(GetNewStreamId()); + scoped_ptr<SpdyStream> owned_stream(stream); + created_streams_.erase(stream); + return owned_stream.Pass(); +} - DeleteStreamRefs(&last_ref, status); +void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { + SpdyStreamId stream_id = stream->stream_id(); + DCHECK_NE(stream_id, 0u); + std::pair<ActiveStreamMap::iterator, bool> result = + active_streams_.insert(std::make_pair(stream_id, stream.get())); + if (result.second) { + ignore_result(stream.release()); + } else { + NOTREACHED(); + } } -void SpdySession::DeleteStreamRefs(scoped_refptr<SpdyStream>* last_ref, - int status) { - if (in_flight_write_stream_ == *last_ref) { +void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { + if (in_flight_write_stream_ == stream.get()) { // If we're deleting the stream for the in-flight write, we still // need to let the write complete, so we clear // |in_flight_write_stream_| and let the write finish on its own // without notifying |in_flight_write_stream_|. - in_flight_write_stream_ = NULL; + in_flight_write_stream_.reset(); } - write_queue_.RemovePendingWritesForStream((*last_ref)->GetWeakPtr()); + write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); - (*last_ref)->OnClose(status); + stream->OnClose(status); ProcessPendingStreamRequests(); - // Nothing else should be holding a reference to |stream| after this - // point. - CHECK((*last_ref)->HasOneRef()); - - // May release the last reference to |this|. - *last_ref = NULL; + // Deleting |stream| may release the last reference to |this|. } void SpdySession::RemoveFromPool() { @@ -1613,15 +1616,14 @@ void SpdySession::OnSynStreamCompressed( } -bool SpdySession::Respond(const SpdyHeaderBlock& headers, - const scoped_refptr<SpdyStream>& stream) { +bool SpdySession::Respond(const SpdyHeaderBlock& headers, SpdyStream* stream) { int rv = OK; SpdyStreamId stream_id = stream->stream_id(); // May invalidate |stream|. rv = stream->OnResponseReceived(headers); if (rv < 0) { DCHECK_NE(rv, ERR_IO_PENDING); - DeleteStream(stream_id, rv); + CloseActiveStream(stream_id, rv); return false; } return true; @@ -1718,7 +1720,7 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, return; } - scoped_refptr<SpdyStream> stream( + scoped_ptr<SpdyStream> stream( new SpdyStream(this, gurl.PathForRequest(), request_priority, stream_initial_send_window_size_, stream_initial_recv_window_size_, @@ -1727,12 +1729,10 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, DeleteExpiredPushedStreams(); unclaimed_pushed_streams_[url] = - std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> ( - stream, time_func_()); + std::pair<SpdyStream*, base::TimeTicks>(stream.get(), time_func_()); - ActivateStream(stream); stream->set_response_received(); - stream = NULL; + InsertActivatedStream(stream.Pass()); ActiveStreamMap::iterator it = active_streams_.find(stream_id); if (it == active_streams_.end()) { @@ -1762,9 +1762,10 @@ void SpdySession::DeleteExpiredPushedStreams() { PushedStreamMap::iterator it; for (it = unclaimed_pushed_streams_.begin(); it != unclaimed_pushed_streams_.end(); ) { - const scoped_refptr<SpdyStream>& stream = it->second.first; + SpdyStream* stream = it->second.first; base::TimeTicks creation_time = it->second.second; - // DeleteStream() will invalidate the current iterator, so move to next. + // CloseActiveStream() will invalidate the current iterator, so + // move to next. ++it; if (minimum_freshness > creation_time) { base::StatsCounter abandoned_push_streams( @@ -1773,7 +1774,7 @@ void SpdySession::DeleteExpiredPushedStreams() { abandoned_push_streams.Increment(); abandoned_streams.Increment(); streams_abandoned_count_++; - DeleteStream(stream->stream_id(), ERR_INVALID_SPDY_STREAM); + CloseActiveStream(stream->stream_id(), ERR_INVALID_SPDY_STREAM); } } next_unclaimed_push_stream_sweep_time_ = time_func_() + @@ -1797,14 +1798,14 @@ void SpdySession::OnSynReply(SpdyStreamId stream_id, return; } - const scoped_refptr<SpdyStream>& stream = it->second; + SpdyStream* stream = it->second; CHECK_EQ(stream->stream_id(), stream_id); if (stream->response_received()) { stream->LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Received duplicate SYN_REPLY for stream."); RecordProtocolErrorHistogram(PROTOCOL_ERROR_SYN_REPLY_NOT_RECEIVED); - CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR); + CloseActiveStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR); return; } stream->set_response_received(); @@ -1835,7 +1836,7 @@ void SpdySession::OnHeaders(SpdyStreamId stream_id, int rv = it->second->OnHeaders(headers); if (rv < 0) { DCHECK_NE(rv, ERR_IO_PENDING); - DeleteStream(stream_id, rv); + CloseActiveStream(stream_id, rv); } } @@ -1859,7 +1860,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, if (status == 0) { it->second->OnDataReceived(scoped_ptr<SpdyBuffer>()); } else if (status == RST_STREAM_REFUSED_STREAM) { - DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); + CloseActiveStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); } else { RecordProtocolErrorHistogram( PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); @@ -1868,7 +1869,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, base::StringPrintf("SPDY stream closed with status: %d", status)); // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. // For now, it doesn't matter much - it is a protocol error. - DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); + CloseActiveStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); } } @@ -1974,10 +1975,10 @@ void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, uint32 delta_window_size) { CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); - CHECK(IsStreamActive(stream_id)); - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - CHECK_EQ(stream->stream_id(), stream_id); - SendWindowUpdateFrame(stream_id, delta_window_size, stream->priority()); + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + CHECK(it != active_streams_.end()); + CHECK_EQ(it->second->stream_id(), stream_id); + SendWindowUpdateFrame(stream_id, delta_window_size, it->second->priority()); } void SpdySession::SendInitialSettings() { @@ -2074,17 +2075,14 @@ void SpdySession::HandleSetting(uint32 id, uint32 value) { void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); - ActiveStreamMap::iterator it; - for (it = active_streams_.begin(); it != active_streams_.end(); ++it) { - const scoped_refptr<SpdyStream>& stream = it->second; - DCHECK(stream); - stream->AdjustSendWindowSize(delta_window_size); + for (ActiveStreamMap::iterator it = active_streams_.begin(); + it != active_streams_.end(); ++it) { + it->second->AdjustSendWindowSize(delta_window_size); } - CreatedStreamSet::iterator i; - for (i = created_streams_.begin(); i != created_streams_.end(); i++) { - const scoped_refptr<SpdyStream>& stream = *i; - stream->AdjustSendWindowSize(delta_window_size); + for (CreatedStreamSet::const_iterator it = created_streams_.begin(); + it != created_streams_.end(); it++) { + (*it)->AdjustSendWindowSize(delta_window_size); } } @@ -2106,9 +2104,9 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, uint32 delta_window_size, RequestPriority priority) { CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); - if (IsStreamActive(stream_id)) { - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - CHECK_EQ(stream->stream_id(), stream_id); + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + if (it != active_streams_.end()) { + CHECK_EQ(it->second->stream_id(), stream_id); } else { CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); CHECK_EQ(stream_id, kSessionFlowControlStreamId); @@ -2422,10 +2420,9 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { -delta_window_size, session_recv_window_size_)); } -void SpdySession::QueueSendStalledStream( - const scoped_refptr<SpdyStream>& stream) { - DCHECK(stream->send_stalled_by_flow_control()); - stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id()); +void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { + DCHECK(stream.send_stalled_by_flow_control()); + stream_send_unstall_queue_[stream.priority()].push_back(stream.stream_id()); } namespace { diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index e571ddd..ba3a55c 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -287,12 +287,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, int len, SpdyDataFlags flags); - // If there is an active stream with the given ID, close it. There - // must be no external references to that active stream. - void CloseStream(SpdyStreamId stream_id, int status); - - // Close a stream that has been created but is not yet active. - // There must be no external references to that active stream. + // Close the active stream with the given ID (if it's not already + // deleted). Note that that stream may hold the last reference to + // the session. + void CloseActiveStream(SpdyStreamId stream_id, int status); + + // Close the given created stream, which must not yet be + // active. Note that |stream| may hold the last reference to the + // session. void CloseCreatedStream(const base::WeakPtr<SpdyStream>& stream, int status); // If there is an active stream with the given ID, reset it by by @@ -491,11 +493,11 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, typedef std::deque<SpdyStreamRequest*> PendingStreamRequestQueue; typedef std::set<SpdyStreamRequest*> PendingStreamRequestCompletionSet; - typedef std::map<SpdyStreamId, scoped_refptr<SpdyStream> > ActiveStreamMap; - typedef std::map<std::string, - std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap; + typedef std::map<SpdyStreamId, SpdyStream*> ActiveStreamMap; + typedef std::map<std::string, std::pair<SpdyStream*, base::TimeTicks> > + PushedStreamMap; - typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet; + typedef std::set<SpdyStream*> CreatedStreamSet; enum State { STATE_IDLE, @@ -603,20 +605,20 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, scoped_ptr<SpdyBufferProducer> producer, const base::WeakPtr<SpdyStream>& stream); - // Track active streams in the active stream list. - void ActivateStream(const scoped_refptr<SpdyStream>& stream); + // Inserts a newly-created stream into |created_streams_|. + void InsertCreatedStream(scoped_ptr<SpdyStream> stream); + + // Activates |stream| (which must be in |created_streams_|) by + // assigning it an ID and returns it. + scoped_ptr<SpdyStream> ActivateCreatedStream(SpdyStream* stream); - // If there is an active stream with the given ID, delete it. There - // must be no external references to that active stream. Also note - // that that active stream may hold the last reference to this - // object. - void DeleteStream(SpdyStreamId id, int status); + // Inserts a newly-activated stream into |active_streams_|. + void InsertActivatedStream(scoped_ptr<SpdyStream> stream); - // Remove all internal references to the stream pointed to by - // |last_ref|, which must be the last reference to that stream and - // is set to NULL. Also note that the given stream may hold the last - // reference to this object. - void DeleteStreamRefs(scoped_refptr<SpdyStream>* last_ref, int status); + // Remove all internal references to |stream|, call OnClose() on it, + // and process any pending stream requests before deleting it. Note + // that |stream| may hold the last reference to the session. + void DeleteStream(scoped_ptr<SpdyStream> stream, int status); // Removes this session from the session pool. void RemoveFromPool(); @@ -628,8 +630,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Calls OnResponseReceived(). // Returns true if successful. - bool Respond(const SpdyHeaderBlock& headers, - const scoped_refptr<SpdyStream>& stream); + bool Respond(const SpdyHeaderBlock& headers, SpdyStream* stream); void RecordPingRTTHistogram(base::TimeDelta duration); void RecordHistograms(); @@ -645,8 +646,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // shutdown. void CloseAllStreams(Error status); - void LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, - Error status); + void LogAbandonedStream(SpdyStream* stream, Error status); // Invokes a user callback for stream creation. We provide this method so it // can be deferred to the MessageLoop, so we avoid re-entrancy problems. @@ -747,7 +747,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Queue a send-stalled stream for possibly resuming once we're not // send-stalled anymore. - void QueueSendStalledStream(const scoped_refptr<SpdyStream>& stream); + void QueueSendStalledStream(const SpdyStream& stream); // Go through the queue of send-stalled streams and try to resume as // many as possible. @@ -824,6 +824,9 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // of whether or not there is currently any ongoing IO [might be waiting for // the server to start pushing the stream]) or there are still network events // incoming even though the consumer has already gone away (cancellation). + // + // |active_streams_| owns all its SpdyStream objects. + // // TODO(willchan): Perhaps we should separate out cancelled streams and move // them into a separate ActiveStreamMap, and not deliver network events to // them? @@ -831,9 +834,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // Map of all the streams that have already started to be pushed by the // server, but do not have consumers yet. + // + // |unclaimed_pushed_streams_| does not own any of its SpdyStream + // objects. PushedStreamMap unclaimed_pushed_streams_; // Set of all created streams but that have not yet sent any frames. + // + // |created_streams_| owns all its SpdyStream objects. CreatedStreamSet created_streams_; // The write queue. @@ -850,7 +858,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, size_t in_flight_write_frame_size_; // The stream to notify when |in_flight_write_| has been written to // the socket completely. - scoped_refptr<SpdyStream> in_flight_write_stream_; + base::WeakPtr<SpdyStream> in_flight_write_stream_; // Flag if we have a pending message scheduled for WriteSocket. bool delayed_write_pending_; diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index 0a36e6a..cd5f562 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -331,14 +331,16 @@ TEST_F(SpdySessionSpdy2Test, DeleteExpiredPushStreams) { (*request_headers)["host"] = "www.google.com"; (*request_headers)["url"] = "/"; - scoped_refptr<SpdyStream> stream( + scoped_ptr<SpdyStream> stream( new SpdyStream(session, std::string(), DEFAULT_PRIORITY, kSpdyStreamInitialWindowSize, kSpdyStreamInitialWindowSize, false, session->net_log_)); stream->set_spdy_headers(request_headers.Pass()); - session->ActivateStream(stream); - stream = NULL; + SpdyStream* stream_ptr = stream.get(); + session->InsertCreatedStream(stream.Pass()); + stream = session->ActivateCreatedStream(stream_ptr); + session->InsertActivatedStream(stream.Pass()); SpdyHeaderBlock headers; headers["url"] = "http://www.google.com/a.dat"; diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index f906b2f..723b473 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -413,14 +413,16 @@ TEST_F(SpdySessionSpdy3Test, DeleteExpiredPushStreams) { (*request_headers)[":host"] = "www.google.com"; (*request_headers)[":path"] = "/"; - scoped_refptr<SpdyStream> stream( + scoped_ptr<SpdyStream> stream( new SpdyStream(session, std::string(), DEFAULT_PRIORITY, kSpdyStreamInitialWindowSize, kSpdyStreamInitialWindowSize, false, session->net_log_)); stream->set_spdy_headers(request_headers.Pass()); - session->ActivateStream(stream); - stream = NULL; + SpdyStream* stream_ptr = stream.get(); + session->InsertCreatedStream(stream.Pass()); + stream = session->ActivateCreatedStream(stream_ptr); + session->InsertActivatedStream(stream.Pass()); SpdyHeaderBlock headers; headers[":scheme"] = "http"; @@ -3340,7 +3342,7 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { SpdyStreamId stream_id3 = stream3->stream_id(); // Close stream1 preemptively. - session->CloseStream(stream_id1, ERR_CONNECTION_CLOSED); + session->CloseActiveStream(stream_id1, ERR_CONNECTION_CLOSED); EXPECT_EQ(NULL, stream1.get()); EXPECT_FALSE(session->IsStreamActive(stream_id1)); diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index b83e167..08cb603 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -160,6 +160,8 @@ void SpdyStream::SetDelegate(Delegate* delegate) { } void SpdyStream::PushedStreamReplayData() { + DCHECK_NE(stream_id_, 0u); + if (!delegate_) return; @@ -173,7 +175,7 @@ void SpdyStream::PushedStreamReplayData() { if (pending_buffers_.size() != 0U) { LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "HEADERS incomplete headers, but pending data frames."); - session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); + session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); } return; } @@ -189,8 +191,8 @@ void SpdyStream::PushedStreamReplayData() { delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i])); } else { delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>()); - session_->CloseStream(stream_id_, OK); - // Note: |this| may be deleted after calling CloseStream. + session_->CloseActiveStream(stream_id_, OK); + // Note: |this| may be deleted after calling CloseActiveStream. DCHECK_EQ(buffers.size() - 1, i); } } @@ -502,7 +504,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { // received. if (!response_received()) { LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response."); - session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); + session_->CloseActiveStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); return; } @@ -524,8 +526,8 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (!buffer) { metrics_.StopStream(); - session_->CloseStream(stream_id_, OK); - // Note: |this| may be deleted after calling CloseStream. + session_->CloseActiveStream(stream_id_, OK); + // Note: |this| may be deleted after calling CloseActiveStream. return; } @@ -545,7 +547,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (delegate_->OnDataReceived(buffer.Pass()) != OK) { // |delegate_| rejected the data. LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data"); - session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); + session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); return; } } @@ -594,7 +596,7 @@ void SpdyStream::Cancel() { void SpdyStream::Close() { if (stream_id_ != 0) { - session_->CloseStream(stream_id_, OK); + session_->CloseActiveStream(stream_id_, OK); } else { session_->CloseCreatedStream(GetWeakPtr(), OK); } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 7e8c575..78cea20 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -49,8 +49,7 @@ enum SpdySendStatus { // a SpdyNetworkTransaction) will maintain a reference to the stream. When // initiated by the server, only the SpdySession will maintain any reference, // until such a time as a client object requests a stream for the path. -class NET_EXPORT_PRIVATE SpdyStream - : public base::RefCounted<SpdyStream> { +class NET_EXPORT_PRIVATE SpdyStream { public: // Delegate handles protocol specific behavior of spdy stream. class NET_EXPORT_PRIVATE Delegate { @@ -113,6 +112,8 @@ class NET_EXPORT_PRIVATE SpdyStream bool pushed, const BoundNetLog& net_log); + ~SpdyStream(); + // Set new |delegate|. |delegate| must not be NULL. // If it already received SYN_REPLY or data, OnResponseReceived() or // OnDataReceived() will be called. @@ -141,7 +142,9 @@ class NET_EXPORT_PRIVATE SpdyStream int32 recv_window_size() const { return recv_window_size_; } - bool send_stalled_by_flow_control() { return send_stalled_by_flow_control_; } + bool send_stalled_by_flow_control() const { + return send_stalled_by_flow_control_; + } void set_send_stalled_by_flow_control(bool stalled) { send_stalled_by_flow_control_ = stalled; @@ -348,10 +351,6 @@ class NET_EXPORT_PRIVATE SpdyStream STATE_DONE }; - friend class base::RefCounted<SpdyStream>; - - virtual ~SpdyStream(); - void OnGetDomainBoundCertComplete(int result); // Try to make progress sending/receiving the request/response. diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index 7d5c1ac..e66953e 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -238,33 +238,32 @@ TEST_F(SpdyStreamSpdy2Test, PushedStream) { BoundNetLog net_log; // Conjure up a stream. - scoped_refptr<SpdyStream> stream = - new SpdyStream(spdy_session, - std::string(), - DEFAULT_PRIORITY, - kSpdyStreamInitialWindowSize, - kSpdyStreamInitialWindowSize, - true, - net_log); - stream->set_stream_id(2); - EXPECT_FALSE(stream->response_received()); - EXPECT_FALSE(stream->HasUrl()); + SpdyStream stream(spdy_session, + std::string(), + DEFAULT_PRIORITY, + kSpdyStreamInitialWindowSize, + kSpdyStreamInitialWindowSize, + true, + net_log); + stream.set_stream_id(2); + EXPECT_FALSE(stream.response_received()); + EXPECT_FALSE(stream.HasUrl()); // Set a couple of headers. SpdyHeaderBlock response; response["url"] = kStreamUrl; - stream->OnResponseReceived(response); + stream.OnResponseReceived(response); // Send some basic headers. SpdyHeaderBlock headers; response["status"] = "200"; response["version"] = "OK"; - stream->OnHeaders(headers); + stream.OnHeaders(headers); - stream->set_response_received(); - EXPECT_TRUE(stream->response_received()); - EXPECT_TRUE(stream->HasUrl()); - EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + stream.set_response_received(); + EXPECT_TRUE(stream.response_received()); + EXPECT_TRUE(stream.HasUrl()); + EXPECT_EQ(kStreamUrl, stream.GetUrl().spec()); } TEST_F(SpdyStreamSpdy2Test, StreamError) { diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index cb08736..10c4a92 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -236,17 +236,16 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) { BoundNetLog net_log; // Conjure up a stream. - scoped_refptr<SpdyStream> stream = - new SpdyStream(spdy_session, - std::string(), - DEFAULT_PRIORITY, - kSpdyStreamInitialWindowSize, - kSpdyStreamInitialWindowSize, - true, - net_log); - stream->set_stream_id(2); - EXPECT_FALSE(stream->response_received()); - EXPECT_FALSE(stream->HasUrl()); + SpdyStream stream(spdy_session, + std::string(), + DEFAULT_PRIORITY, + kSpdyStreamInitialWindowSize, + kSpdyStreamInitialWindowSize, + true, + net_log); + stream.set_stream_id(2); + EXPECT_FALSE(stream.response_received()); + EXPECT_FALSE(stream.HasUrl()); // Set a couple of headers. SpdyHeaderBlock response; @@ -254,18 +253,18 @@ TEST_F(SpdyStreamSpdy3Test, PushedStream) { response[":host"] = url.host(); response[":scheme"] = url.scheme(); response[":path"] = url.path(); - stream->OnResponseReceived(response); + stream.OnResponseReceived(response); // Send some basic headers. SpdyHeaderBlock headers; response[":status"] = "200"; response[":version"] = "OK"; - stream->OnHeaders(headers); + stream.OnHeaders(headers); - stream->set_response_received(); - EXPECT_TRUE(stream->response_received()); - EXPECT_TRUE(stream->HasUrl()); - EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + stream.set_response_received(); + EXPECT_TRUE(stream.response_received()); + EXPECT_TRUE(stream.HasUrl()); + EXPECT_EQ(kStreamUrl, stream.GetUrl().spec()); } TEST_F(SpdyStreamSpdy3Test, StreamError) { @@ -357,28 +356,32 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { // to overflow an int32. The SpdyStream should handle that case // gracefully. TEST_F(SpdyStreamSpdy3Test, IncreaseSendWindowSizeOverflow) { - session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + session_ = + SpdySessionDependencies::SpdyCreateSessionDeterministic(&session_deps_); MockRead reads[] = { - MockRead(ASYNC, 0, 1), // EOF + MockRead(ASYNC, 0, 2), // EOF }; + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, 1, kPostBodyLength, LOWEST, NULL, 0)); // Triggered by the overflowing call to IncreaseSendWindowSize // below. scoped_ptr<SpdyFrame> rst( - ConstructSpdyRstStream(0, RST_STREAM_FLOW_CONTROL_ERROR)); + ConstructSpdyRstStream(1, RST_STREAM_FLOW_CONTROL_ERROR)); MockWrite writes[] = { - CreateMockWrite(*rst), + CreateMockWrite(*req, 0), + CreateMockWrite(*rst, 1), }; - writes[0].sequence_number = 0; CapturingBoundNetLog log; - OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes)); + DeterministicSocketData data( + reads, arraysize(reads), writes, arraysize(writes)); MockConnect connect_data(SYNCHRONOUS, OK); data.set_connect_data(connect_data); - session_deps_.socket_factory->AddSocketDataProvider(&data); + session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data); scoped_refptr<SpdySession> session(CreateSpdySession()); GURL url(kStreamUrl); @@ -388,22 +391,28 @@ TEST_F(SpdyStreamSpdy3Test, IncreaseSendWindowSizeOverflow) { base::WeakPtr<SpdyStream> stream = CreateStreamSynchronously(session, url, LOWEST, log.bound()); ASSERT_TRUE(stream.get() != NULL); - StreamDelegateSendImmediate delegate( stream, scoped_ptr<SpdyHeaderBlock>(), kPostBodyStringPiece); stream->SetDelegate(&delegate); - EXPECT_FALSE(stream->HasUrl()); - EXPECT_EQ(0u, stream->stream_id()); - EXPECT_FALSE(stream->closed()); + stream->set_spdy_headers( + spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + data.RunFor(1); int32 old_send_window_size = stream->send_window_size(); ASSERT_GT(old_send_window_size, 0); int32 delta_window_size = kint32max - old_send_window_size + 1; stream->IncreaseSendWindowSize(delta_window_size); - EXPECT_EQ(old_send_window_size, stream->send_window_size()); + EXPECT_EQ(NULL, stream.get()); - EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); + data.RunFor(2); + + EXPECT_EQ(ERR_SPDY_PROTOCOL_ERROR, delegate.WaitForClose()); } // Cause a send stall by reducing the flow control send window to diff --git a/net/spdy/spdy_write_queue_unittest.cc b/net/spdy/spdy_write_queue_unittest.cc index 9d85863..b327af9 100644 --- a/net/spdy/spdy_write_queue_unittest.cc +++ b/net/spdy/spdy_write_queue_unittest.cc @@ -75,8 +75,8 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM"); scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST"); - scoped_refptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM)); - scoped_refptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST)); + scoped_ptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM)); + scoped_ptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST)); // A NULL stream should still work. write_queue.Enqueue( @@ -117,9 +117,9 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { scoped_ptr<SpdyBufferProducer> producer2 = IntToProducer(2); scoped_ptr<SpdyBufferProducer> producer3 = IntToProducer(3); - scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); - scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); - scoped_refptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, producer1.Pass(), stream1->GetWeakPtr()); @@ -155,13 +155,13 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { SpdyWriteQueue write_queue; - scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); - scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); for (int i = 0; i < 100; ++i) { - scoped_refptr<SpdyStream> stream = ((i % 3) == 0) ? stream1 : stream2; - write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), - stream->GetWeakPtr()); + base::WeakPtr<SpdyStream> stream = + (((i % 3) == 0) ? stream1 : stream2)->GetWeakPtr(); + write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), stream); } write_queue.RemovePendingWritesForStream(stream2->GetWeakPtr()); @@ -189,22 +189,22 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStreamsAfter) { SpdyWriteQueue write_queue; - scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); stream1->set_stream_id(1); - scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); stream2->set_stream_id(3); - scoped_refptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); + scoped_ptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY)); stream3->set_stream_id(5); // No stream id assigned. - scoped_refptr<SpdyStream> stream4(MakeTestStream(DEFAULT_PRIORITY)); - scoped_refptr<SpdyStream> streams[] = { - stream1, stream2, stream3, stream4 + scoped_ptr<SpdyStream> stream4(MakeTestStream(DEFAULT_PRIORITY)); + base::WeakPtr<SpdyStream> streams[] = { + stream1->GetWeakPtr(), stream2->GetWeakPtr(), + stream3->GetWeakPtr(), stream4->GetWeakPtr() }; for (int i = 0; i < 100; ++i) { - scoped_refptr<SpdyStream> stream = streams[i % arraysize(streams)]; write_queue.Enqueue(DEFAULT_PRIORITY, SYN_STREAM, IntToProducer(i), - stream->GetWeakPtr()); + streams[i % arraysize(streams)]); } write_queue.RemovePendingWritesForStreamsAfter(stream1->stream_id()); |