summaryrefslogtreecommitdiffstats
path: root/net/spdy/spdy_session.cc
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-02 01:00:45 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-02 01:00:45 +0000
commite3861ca766137c03543b2aa9b115bc7ef5af57ab (patch)
tree1a023e95d58571d60231c68fdc936a8c808da6b7 /net/spdy/spdy_session.cc
parent7ab2d1bd8d58531d60eaafca1160b1426a451f89 (diff)
downloadchromium_src-e3861ca766137c03543b2aa9b115bc7ef5af57ab.zip
chromium_src-e3861ca766137c03543b2aa9b115bc7ef5af57ab.tar.gz
chromium_src-e3861ca766137c03543b2aa9b115bc7ef5af57ab.tar.bz2
[SPDY] Create SpdyStreamRequest class to help manage delayed creation of streams
This avoids passing around pointers to memory locations that are asynchronously filled, which is scary. Rework how SpdySession manages its pending stream requests. In particular, avoid leaking streams if a pending request is cancelled before the created stream is handed off to it. Also, replace a list-based queue with an easier to manage deque. Move StreamReleaserCallback to spdy_test_util_common.{h,cc}. BUG=178943 Review URL: https://codereview.chromium.org/12380005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@185640 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/spdy/spdy_session.cc')
-rw-r--r--net/spdy/spdy_session.cc293
1 files changed, 166 insertions, 127 deletions
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 714b4f0..a2b5da4 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -4,6 +4,7 @@
#include "net/spdy/spdy_session.h"
+#include <algorithm>
#include <map>
#include "base/basictypes.h"
@@ -201,6 +202,74 @@ const size_t kDefaultInitialRecvWindowSize = 10 * 1024 * 1024; // 10MB
} // namespace
+SpdyStreamRequest::SpdyStreamRequest() {
+ Reset();
+}
+
+SpdyStreamRequest::~SpdyStreamRequest() {
+ CancelRequest();
+}
+
+int SpdyStreamRequest::StartRequest(
+ const scoped_refptr<SpdySession>& session,
+ const GURL& url,
+ RequestPriority priority,
+ const BoundNetLog& net_log,
+ const CompletionCallback& callback) {
+ DCHECK(session);
+ DCHECK(!session_);
+ DCHECK(!stream_);
+ DCHECK(callback_.is_null());
+
+ session_ = session;
+ url_ = url;
+ priority_ = priority;
+ net_log_ = net_log;
+ callback_ = callback;
+
+ scoped_refptr<SpdyStream> stream;
+ int rv = session->TryCreateStream(this, &stream);
+ if (rv == OK) {
+ Reset();
+ stream_ = stream;
+ }
+ return rv;
+}
+
+void SpdyStreamRequest::CancelRequest() {
+ if (session_)
+ session_->CancelStreamRequest(this);
+ Reset();
+}
+
+scoped_refptr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
+ DCHECK(!session_.get());
+ scoped_refptr<SpdyStream> stream = stream_;
+ DCHECK(stream.get());
+ Reset();
+ return stream;
+}
+
+void SpdyStreamRequest::OnRequestComplete(
+ const scoped_refptr<SpdyStream>& stream,
+ int rv) {
+ DCHECK(session_.get());
+ DCHECK(!callback_.is_null());
+ CompletionCallback callback = callback_;
+ Reset();
+ stream_ = stream;
+ callback.Run(rv);
+}
+
+void SpdyStreamRequest::Reset() {
+ session_ = NULL;
+ stream_ = NULL;
+ url_ = GURL();
+ priority_ = MINIMUM_PRIORITY;
+ net_log_ = BoundNetLog();
+ callback_.Reset();
+}
+
// static
void SpdySession::SpdyIOBufferProducer::ActivateStream(
SpdySession* spdy_session,
@@ -302,28 +371,6 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
// TODO(mbelshe): consider randomization of the stream_hi_water_mark.
}
-SpdySession::PendingCreateStream::PendingCreateStream(
- const GURL& url, RequestPriority priority,
- scoped_refptr<SpdyStream>* spdy_stream,
- const BoundNetLog& stream_net_log,
- const CompletionCallback& callback)
- : url(&url),
- priority(priority),
- spdy_stream(spdy_stream),
- stream_net_log(&stream_net_log),
- callback(callback) {
-}
-
-SpdySession::PendingCreateStream::~PendingCreateStream() {}
-
-SpdySession::CallbackResultPair::CallbackResultPair(
- const CompletionCallback& callback_in, int result_in)
- : callback(callback_in),
- result(result_in) {
-}
-
-SpdySession::CallbackResultPair::~CallbackResultPair() {}
-
SpdySession::~SpdySession() {
if (state_ != STATE_CLOSED) {
state_ = STATE_CLOSED;
@@ -341,7 +388,10 @@ SpdySession::~SpdySession() {
DCHECK_EQ(0u, num_active_streams());
DCHECK_EQ(0u, num_unclaimed_pushed_streams());
- DCHECK(pending_callback_map_.empty());
+ for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
+ DCHECK(pending_create_stream_queues_[i].empty());
+ }
+ DCHECK(pending_stream_request_completions_.empty());
RecordHistograms();
@@ -460,95 +510,29 @@ int SpdySession::GetPushStream(
return 0;
}
-int SpdySession::CreateStream(
- const GURL& url,
- RequestPriority priority,
- scoped_refptr<SpdyStream>* spdy_stream,
- const BoundNetLog& stream_net_log,
- const CompletionCallback& callback) {
+int SpdySession::TryCreateStream(SpdyStreamRequest* request,
+ scoped_refptr<SpdyStream>* stream) {
if (!max_concurrent_streams_ ||
(active_streams_.size() + created_streams_.size() <
max_concurrent_streams_)) {
- return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
+ return CreateStream(*request, stream);
}
stalled_streams_++;
net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
- pending_create_stream_queues_[priority].push(
- PendingCreateStream(url, priority, spdy_stream,
- stream_net_log, callback));
+ pending_create_stream_queues_[request->priority()].push_back(request);
return ERR_IO_PENDING;
}
-void SpdySession::ProcessPendingCreateStreams() {
- while (!max_concurrent_streams_ ||
- (active_streams_.size() + created_streams_.size() <
- max_concurrent_streams_)) {
- bool no_pending_create_streams = true;
- for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
- if (!pending_create_stream_queues_[i].empty()) {
- PendingCreateStream pending_create =
- pending_create_stream_queues_[i].front();
- pending_create_stream_queues_[i].pop();
- no_pending_create_streams = false;
- int error = CreateStreamImpl(*pending_create.url,
- pending_create.priority,
- pending_create.spdy_stream,
- *pending_create.stream_net_log);
- scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
- DCHECK(!ContainsKey(pending_callback_map_, stream));
- pending_callback_map_.insert(std::make_pair(stream,
- CallbackResultPair(pending_create.callback, error)));
- MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&SpdySession::InvokeUserStreamCreationCallback,
- weak_factory_.GetWeakPtr(), stream));
- break;
- }
- }
- if (no_pending_create_streams)
- return; // there were no streams in any queue
- }
-}
-
-void SpdySession::CancelPendingCreateStreams(
- const scoped_refptr<SpdyStream>* spdy_stream) {
- PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
- if (it != pending_callback_map_.end()) {
- pending_callback_map_.erase(it);
- return;
- }
-
- for (int i = 0; i < NUM_PRIORITIES; ++i) {
- PendingCreateStreamQueue tmp;
- // Make a copy removing this trans
- while (!pending_create_stream_queues_[i].empty()) {
- PendingCreateStream pending_create =
- pending_create_stream_queues_[i].front();
- pending_create_stream_queues_[i].pop();
- if (pending_create.spdy_stream != spdy_stream)
- tmp.push(pending_create);
- }
- // Now copy it back
- while (!tmp.empty()) {
- pending_create_stream_queues_[i].push(tmp.front());
- tmp.pop();
- }
- }
-}
-
-int SpdySession::CreateStreamImpl(
- const GURL& url,
- RequestPriority priority,
- scoped_refptr<SpdyStream>* spdy_stream,
- const BoundNetLog& stream_net_log) {
- DCHECK_GE(priority, MINIMUM_PRIORITY);
- DCHECK_LT(priority, NUM_PRIORITIES);
+int SpdySession::CreateStream(const SpdyStreamRequest& request,
+ scoped_refptr<SpdyStream>* stream) {
+ DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
+ DCHECK_LT(request.priority(), NUM_PRIORITIES);
// Make sure that we don't try to send https/wss over an unauthenticated, but
// encrypted SSL socket.
if (is_secure_ && certificate_error_code_ != OK &&
- (url.SchemeIs("https") || url.SchemeIs("wss"))) {
+ (request.url().SchemeIs("https") || request.url().SchemeIs("wss"))) {
RecordProtocolErrorHistogram(
PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
CloseSessionOnError(
@@ -559,27 +543,81 @@ int SpdySession::CreateStreamImpl(
return ERR_SPDY_PROTOCOL_ERROR;
}
- const std::string& path = url.PathForRequest();
+ const std::string& path = request.url().PathForRequest();
- *spdy_stream = new SpdyStream(this,
- false,
- stream_net_log);
- const scoped_refptr<SpdyStream>& stream = *spdy_stream;
+ *stream = new SpdyStream(this, false, request.net_log());
- stream->set_priority(priority);
- stream->set_path(path);
- stream->set_send_window_size(stream_initial_send_window_size_);
- stream->set_recv_window_size(stream_initial_recv_window_size_);
- created_streams_.insert(stream);
+ (*stream)->set_priority(request.priority());
+ (*stream)->set_path(path);
+ (*stream)->set_send_window_size(stream_initial_send_window_size_);
+ (*stream)->set_recv_window_size(stream_initial_recv_window_size_);
+ created_streams_.insert(*stream);
- UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
- static_cast<int>(priority), 0, 10, 11);
+ UMA_HISTOGRAM_CUSTOM_COUNTS(
+ "Net.SpdyPriorityCount",
+ static_cast<int>(request.priority()), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
return OK;
}
+void SpdySession::CancelStreamRequest(SpdyStreamRequest* request) {
+ if (DCHECK_IS_ON()) {
+ // |request| should not be in a queue not matching its priority.
+ for (int i = 0; i < NUM_PRIORITIES; ++i) {
+ if (request->priority() == i)
+ continue;
+ PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
+ DCHECK(std::find(queue->begin(), queue->end(), request) == queue->end());
+ }
+ }
+
+ PendingStreamRequestQueue* queue =
+ &pending_create_stream_queues_[request->priority()];
+ // Remove |request| from |queue| while preserving the order of the
+ // other elements.
+ PendingStreamRequestQueue::iterator it =
+ std::find(queue->begin(), queue->end(), request);
+ if (it != queue->end()) {
+ it = queue->erase(it);
+ // |request| should be in the queue at most once, and if it is
+ // present, should not be pending completion.
+ DCHECK(std::find(it, queue->end(), request) == queue->end());
+ DCHECK(!ContainsKey(pending_stream_request_completions_,
+ request));
+ return;
+ }
+
+ pending_stream_request_completions_.erase(request);
+}
+
+void SpdySession::ProcessPendingStreamRequests() {
+ while (!max_concurrent_streams_ ||
+ (active_streams_.size() + created_streams_.size() <
+ max_concurrent_streams_)) {
+ bool no_pending_create_streams = true;
+ for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
+ if (!pending_create_stream_queues_[i].empty()) {
+ SpdyStreamRequest* pending_request =
+ pending_create_stream_queues_[i].front();
+ pending_create_stream_queues_[i].pop_front();
+ no_pending_create_streams = false;
+ DCHECK(!ContainsKey(pending_stream_request_completions_,
+ pending_request));
+ pending_stream_request_completions_.insert(pending_request);
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&SpdySession::CompleteStreamRequest,
+ weak_factory_.GetWeakPtr(), pending_request));
+ break;
+ }
+ }
+ if (no_pending_create_streams)
+ return; // there were no streams in any queue
+ }
+}
+
bool SpdySession::NeedsCredentials() const {
if (!is_secure_)
return false;
@@ -778,7 +816,7 @@ void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
DCHECK_EQ(0u, stream->stream_id());
created_streams_.erase(scoped_refptr<SpdyStream>(stream));
- ProcessPendingCreateStreams();
+ ProcessPendingStreamRequests();
}
void SpdySession::ResetStream(SpdyStreamId stream_id,
@@ -1053,11 +1091,11 @@ void SpdySession::CloseAllStreams(net::Error status) {
}
for (int i = 0; i < NUM_PRIORITIES; ++i) {
- while (!pending_create_stream_queues_[i].empty()) {
- PendingCreateStream pending_create =
- pending_create_stream_queues_[i].front();
- pending_create_stream_queues_[i].pop();
- pending_create.callback.Run(ERR_ABORTED);
+ PendingStreamRequestQueue queue;
+ queue.swap(pending_create_stream_queues_[i]);
+ for (PendingStreamRequestQueue::const_iterator it = queue.begin();
+ it != queue.end(); ++it) {
+ (*it)->OnRequestComplete(NULL, ERR_ABORTED);
}
}
@@ -1274,7 +1312,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
active_streams_.erase(it2);
if (stream)
stream->OnClose(status);
- ProcessPendingCreateStreams();
+ ProcessPendingStreamRequests();
}
void SpdySession::RemoveFromPool() {
@@ -1861,7 +1899,7 @@ void SpdySession::HandleSetting(uint32 id, uint32 value) {
case SETTINGS_MAX_CONCURRENT_STREAMS:
max_concurrent_streams_ = std::min(static_cast<size_t>(value),
kMaxConcurrentStreamLimit);
- ProcessPendingCreateStreams();
+ ProcessPendingStreamRequests();
break;
case SETTINGS_INITIAL_WINDOW_SIZE: {
if (flow_control_state_ < FLOW_CONTROL_STREAM) {
@@ -2087,18 +2125,19 @@ void SpdySession::RecordHistograms() {
}
}
-void SpdySession::InvokeUserStreamCreationCallback(
- scoped_refptr<SpdyStream>* stream) {
- PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
+void SpdySession::CompleteStreamRequest(SpdyStreamRequest* pending_request) {
+ PendingStreamRequestCompletionSet::iterator it =
+ pending_stream_request_completions_.find(pending_request);
- // Exit if the request has already been cancelled.
- if (it == pending_callback_map_.end())
+ // Abort if the request has already been cancelled.
+ if (it == pending_stream_request_completions_.end())
return;
- CompletionCallback callback = it->second.callback;
- int result = it->second.result;
- pending_callback_map_.erase(it);
- callback.Run(result);
+ scoped_refptr<SpdyStream> stream;
+ int rv = CreateStream(*pending_request, &stream);
+ pending_stream_request_completions_.erase(it);
+
+ pending_request->OnRequestComplete(stream, rv);
}
SSLClientSocket* SpdySession::GetSSLClientSocket() const {