diff options
author | erikchen@google.com <erikchen@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-15 23:10:36 +0000 |
---|---|---|
committer | erikchen@google.com <erikchen@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-15 23:10:36 +0000 |
commit | 48599ca76c53dfeff62302135e7435f2ee3d1bee (patch) | |
tree | 3ee2083d62b50bd0a66eb72d48aea2fd90d048b9 | |
parent | c774bfa1edf43f64b881f9e0061c6c179e0c5e63 (diff) | |
download | chromium_src-48599ca76c53dfeff62302135e7435f2ee3d1bee.zip chromium_src-48599ca76c53dfeff62302135e7435f2ee3d1bee.tar.gz chromium_src-48599ca76c53dfeff62302135e7435f2ee3d1bee.tar.bz2 |
cleaned up the way that streams close so that all of them use DeleteStream
BUG=none
TEST=none
Review URL: http://codereview.chromium.org/2764005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@49856 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/spdy/spdy_http_stream.h | 5 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 94 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 10 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 117 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 18 |
5 files changed, 52 insertions, 192 deletions
diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h index d5131e8..11c00e0 100644 --- a/net/spdy/spdy_http_stream.h +++ b/net/spdy/spdy_http_stream.h @@ -48,10 +48,7 @@ class SpdyHttpStream : public SpdyStream { int ReadResponseBody( IOBuffer* buf, int buf_len, CompletionCallback* callback); - // Cancels the stream. Note that this does not immediately cause deletion of - // the stream. This function is used to cancel any callbacks from being - // invoked. TODO(willchan): It should also free up any memory associated with - // the stream, such as IOBuffers. + // Cancels any callbacks from being invoked and deletes the stream. void Cancel(); // Returns the number of bytes uploaded. diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 6cb33f8..7fdd395 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -481,20 +481,12 @@ int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id, return ERR_IO_PENDING; } -bool SpdySession::CancelStream(spdy::SpdyStreamId stream_id) { - LOG(INFO) << "Cancelling stream " << stream_id; - if (!IsStreamActive(stream_id)) - return false; - +void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) { + LOG(INFO) << "Closing stream " << stream_id << " with status " << status; // TODO(mbelshe): We should send a RST_STREAM control frame here // so that the server can cancel a large send. - // TODO(mbelshe): Write a method for tearing down a stream - // that cleans it out of the active list, the pending list, - // etc. - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - DeactivateStream(stream_id); - return true; + DeleteStream(stream_id, status); } bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const { @@ -789,41 +781,38 @@ void SpdySession::WriteSocket() { } } -void SpdySession::CloseAllStreams(net::Error code) { +void SpdySession::CloseAllStreams(net::Error status) { LOG(INFO) << "Closing all SPDY Streams for " << host_port_pair().ToString(); static StatsCounter abandoned_streams("spdy.abandoned_streams"); static StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); - if (active_streams_.size()) { + if (!active_streams_.empty()) abandoned_streams.Add(active_streams_.size()); + if (!pushed_streams_.empty()) { + streams_abandoned_count_ += pushed_streams_.size(); + abandoned_push_streams.Add(pushed_streams_.size()); + } - // Create a copy of the list, since aborting streams can invalidate - // our list. - SpdyStream** list = new SpdyStream*[active_streams_.size()]; - ActiveStreamMap::const_iterator it; - int index = 0; - for (it = active_streams_.begin(); it != active_streams_.end(); ++it) - list[index++] = it->second; - - // Issue the aborts. - for (--index; index >= 0; index--) { - LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id() - << "): " << list[index]->path(); - list[index]->OnClose(code); - } - - // Clear out anything pending. - active_streams_.clear(); - - delete[] list; + while (!active_streams_.empty()) { + ActiveStreamMap::iterator it = active_streams_.begin(); + const scoped_refptr<SpdyStream>& stream = it->second; + DCHECK(stream); + LOG(ERROR) << "ABANDONED (stream_id=" << stream->stream_id() + << "): " << stream->path(); + DeleteStream(stream->stream_id(), status); } - if (pushed_streams_.size()) { - streams_abandoned_count_ += pushed_streams_.size(); - abandoned_push_streams.Add(pushed_streams_.size()); - pushed_streams_.clear(); + // TODO(erikchen): ideally stream->OnClose() is only ever called by + // DeleteStream, but pending streams fall into their own category for now. + PendingStreamMap::iterator it; + for (it = pending_streams_.begin(); it != pending_streams_.end(); ++it) + { + const scoped_refptr<SpdyStream>& stream = it->second; + if (stream) + stream->OnClose(ERR_ABORTED); } + pending_streams_.clear(); // We also need to drain the queue. while (queue_.size()) @@ -876,10 +865,8 @@ void SpdySession::ActivateStream(SpdyStream* stream) { active_streams_[id] = stream; } -void SpdySession::DeactivateStream(spdy::SpdyStreamId id) { - DCHECK(IsStreamActive(id)); - - // Verify it is not on the pushed_streams_ list. +void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) { + // Remove the stream from pushed_streams_ and active_streams_. ActivePushedStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { scoped_refptr<SpdyHttpStream> curr = *it; @@ -889,7 +876,15 @@ void SpdySession::DeactivateStream(spdy::SpdyStreamId id) { } } - active_streams_.erase(id); + // The stream should still exist. + ActiveStreamMap::iterator it2 = active_streams_.find(id); + DCHECK(it2 != active_streams_.end()); + + // If this is an active stream, call the callback. + const scoped_refptr<SpdyStream>& stream = it2->second; + if (stream) + stream->OnClose(status); + active_streams_.erase(it2); } void SpdySession::RemoveFromPool() { @@ -948,15 +943,7 @@ void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id, } scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - - // Note that calling OnDataReceived() on |stream| will potentially invoke a - // user callback, after which the state of |stream| and |this| may be altered. - // http://crbug.com/44800 had a bug where the stream gets deactivated in this - // callback. - bool success = stream->OnDataReceived(data, len); - // |len| == 0 implies a closed stream. - if ((!success || !len) && IsStreamActive(stream_id)) - DeactivateStream(stream_id); + stream->OnDataReceived(data, len); } bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers, @@ -987,8 +974,7 @@ bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers, if (rv < 0) { DCHECK_NE(rv, ERR_IO_PENDING); const spdy::SpdyStreamId stream_id = stream->stream_id(); - stream->OnClose(rv); - DeactivateStream(stream_id); + DeleteStream(stream_id, rv); return false; } return true; @@ -1210,10 +1196,8 @@ void SpdySession::OnFin(const spdy::SpdyRstStreamControlFrame& frame) { LOG(ERROR) << "Spdy stream closed: " << frame.status(); // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. // For now, it doesn't matter much - it is a protocol error. - stream->OnClose(ERR_FAILED); + DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); } - - DeactivateStream(stream_id); } void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) { diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 4772c58..4d70d22 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -78,8 +78,8 @@ class SpdySession : public base::RefCounted<SpdySession>, int WriteStreamData(spdy::SpdyStreamId stream_id, net::IOBuffer* data, int len); - // Cancel a stream. - bool CancelStream(spdy::SpdyStreamId stream_id); + // Close a stream. + void CloseStream(spdy::SpdyStreamId stream_id, int status); // Check if a stream is active. bool IsStreamActive(spdy::SpdyStreamId stream_id) const; @@ -88,8 +88,8 @@ class SpdySession : public base::RefCounted<SpdySession>, // status, such as "resolving host", "connecting", etc. LoadState GetLoadState() const; - // Closes all open streams. Used as part of shutdown. - void CloseAllStreams(net::Error code); + // Closes all streams. Used as part of shutdown. + void CloseAllStreams(net::Error status); // Enable or disable SSL. static void SetSSLMode(bool enable) { use_ssl_ = enable; } @@ -165,7 +165,7 @@ class SpdySession : public base::RefCounted<SpdySession>, // Track active streams in the active stream list. void ActivateStream(SpdyStream* stream); - void DeactivateStream(spdy::SpdyStreamId id); + void DeleteStream(spdy::SpdyStreamId id, int status); // Removes this session from the session pool. void RemoveFromPool(); diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index 6b443e1..1cefb49 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -160,123 +160,6 @@ TEST_F(SpdySessionTest, GoAway) { session2 = NULL; } -// StreamCanceler is used for SpdySessionTest.CancelStreamOnClose. It is used -// for two callbacks: the stream's SendRequest() and ReadResponseBody() -// callbacks. -class StreamCanceler { - public: - enum State { - WAITING_FOR_CONNECT, - WAITING_FOR_RESPONSE, - STATE_DONE - }; - - explicit StreamCanceler(const scoped_refptr<SpdyHttpStream>& stream) - : stream_(stream), - ALLOW_THIS_IN_INITIALIZER_LIST( - callback_(this, &StreamCanceler::OnIOComplete)), - buf_(new IOBufferWithSize(64)), - state_(WAITING_FOR_CONNECT) {} - - CompletionCallback* callback() { return &callback_; } - - State state() const { return state_; } - - private: - void OnIOComplete(int result) { - MessageLoop::current()->Quit(); - switch (state_) { - case WAITING_FOR_CONNECT: - // After receiving this callback, start the ReadResponseBody() request. - // We need to do this here rather than elsewhere, since the MessageLoop - // will keep processing the pending read tasks until there aren't any - // more, so we won't get a chance to get an asynchronous callback for - // ReadResponseBody() unless we call it here in the callback for - // SendRequest(). - EXPECT_EQ(OK, result); - state_ = WAITING_FOR_RESPONSE; - EXPECT_EQ(ERR_IO_PENDING, stream_->ReadResponseBody( - buf_.get(), buf_->size(), &callback_)); - break; - case WAITING_FOR_RESPONSE: - // The result should be the 6 bytes of the body. The next read will - // succeed synchronously, indicating the stream is closed. We cancel - // the stream during the callback for the first ReadResponseBody() call - // which will deactivate the stream. The code should handle this case. - EXPECT_EQ(6, result); - EXPECT_EQ(OK, - stream_->ReadResponseBody( - buf_.get(), buf_->size(), &callback_)); - stream_->Cancel(); - state_ = STATE_DONE; - break; - default: - NOTREACHED(); - break; - } - } - - const scoped_refptr<SpdyHttpStream> stream_; - CompletionCallbackImpl<StreamCanceler> callback_; - scoped_refptr<IOBufferWithSize> buf_; - State state_; - - DISALLOW_COPY_AND_ASSIGN(StreamCanceler); -}; - -TEST_F(SpdySessionTest, CancelStreamOnClose) { - SpdySessionTest::TurnOffCompression(); - SessionDependencies session_deps; - session_deps.host_resolver->set_synchronous_mode(true); - - MockConnect connect_data(false, OK); - MockRead reads[] = { - MockRead(true, reinterpret_cast<const char*>(kGetSynReply), - arraysize(kGetSynReply)), - MockRead(true, reinterpret_cast<const char*>(kGetBodyFrame), - arraysize(kGetBodyFrame)), - MockRead(true, 0, 0) // EOF - }; - StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0); - data.set_connect_data(connect_data); - session_deps.socket_factory.AddSocketDataProvider(&data); - - SSLSocketDataProvider ssl(false, OK); - session_deps.socket_factory.AddSSLSocketDataProvider(&ssl); - - scoped_refptr<HttpNetworkSession> http_session(CreateSession(&session_deps)); - - const std::string kTestHost("www.google.com"); - const int kTestPort = 80; - HostPortPair test_host_port_pair; - test_host_port_pair.host = kTestHost; - test_host_port_pair.port = kTestPort; - - scoped_refptr<SpdySessionPool> spdy_session_pool( - http_session->spdy_session_pool()); - EXPECT_FALSE(spdy_session_pool->HasSession(test_host_port_pair)); - scoped_refptr<SpdySession> session = - spdy_session_pool->Get( - test_host_port_pair, http_session.get(), BoundNetLog()); - - HttpRequestInfo request; - request.url = GURL("http://www.google.com"); - - scoped_refptr<SpdyHttpStream> stream = - session->GetOrCreateStream(request, NULL, BoundNetLog()); - TCPSocketParams tcp_params(kTestHost, kTestPort, MEDIUM, GURL(), false); - int rv = session->Connect(kTestHost, tcp_params, MEDIUM); - ASSERT_EQ(OK, rv); - - HttpResponseInfo response; - TestCompletionCallback callback; - StreamCanceler canceler(stream); - rv = stream->SendRequest(NULL, &response, canceler.callback()); - ASSERT_EQ(ERR_IO_PENDING, rv); - while (canceler.state() != StreamCanceler::STATE_DONE) - MessageLoop::current()->Run(); -} - // kPush is a server-issued SYN_STREAM with stream id 2, and // associated stream id 1. It also includes 3 headers of path, // status, and HTTP version. diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 359217e0..baff2b9 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -33,15 +33,10 @@ SpdyStream::SpdyStream( SpdyStream::~SpdyStream() { DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_; - // TODO(willchan): We're still calling CancelStream() too many times, because - // inactive pending/pushed streams will still have stream_id_ set. - if (stream_id_) { - session_->CancelStream(stream_id_); - } else { - // When the stream_id_ is 0, we expect that it is because - // we've cancelled or closed the stream and set the stream_id to 0. + // When the stream_id_ is 0, we expect that it is because + // we've cancelled or closed the stream and set the stream_id to 0. + if (!stream_id_) DCHECK(response_complete_); - } } const HttpResponseInfo* SpdyStream::GetResponseInfo() const { @@ -121,14 +116,15 @@ bool SpdyStream::DoOnDataReceived(const char* data, int length) { // We cannot pass data up to the caller unless the reply headers have been // received. if (!response_->headers) { - OnClose(ERR_SYN_REPLY_NOT_RECEIVED); + session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); return false; } // A zero-length read means that the stream is being closed. if (!length) { metrics_.StopStream(); - OnClose(net::OK); + scoped_refptr<SpdyStream> self(this); + session_->CloseStream(stream_id_, net::OK); UpdateHistograms(); return true; } @@ -163,7 +159,7 @@ void SpdyStream::DoOnClose(int status) { void SpdyStream::DoCancel() { cancelled_ = true; - session_->CancelStream(stream_id_); + session_->CloseStream(stream_id_, ERR_ABORTED); } int SpdyStream::DoSendRequest(UploadDataStream* upload_data, |