author	simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>	2011-11-15 20:36:51 +0000
committer	simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>	2011-11-15 20:36:51 +0000
commit	2c49511423700ea206402a5d2a55daa65cb1bd3d (patch)
tree	6f62dbf082214dba60518490116ac9b00100bf80 /net/http/http_pipelined_connection_impl.cc
parent	1de476696d65c188b3d661316cc2a0dc11e2adcc (diff)
Refactor state machines in HttpPipelinedConnectionImpl.
Hopefully you agree that this makes the code a whole lot easier to understand.

BUG=None
TEST=net_unittests

Review URL: http://codereview.chromium.org/8515020

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110159 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/http/http_pipelined_connection_impl.cc')
-rw-r--r--	net/http/http_pipelined_connection_impl.cc	439
1 file changed, 249 insertions, 190 deletions
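
Before diving into the hunks, it helps to have the shape of the two refactored state machines in mind. The state names below are the ones this patch introduces (replacing the old SEND_STATE_NEXT_REQUEST/SEND_STATE_UNUSABLE and READ_STATE_NEXT_HEADERS/READ_STATE_COMPLETE/READ_STATE_UNUSABLE values); the enum declarations themselves live in http_pipelined_connection_impl.h, which is outside this diff, so the ordering and the comments here are a reconstruction for orientation rather than the actual header.

// Reconstruction of the refactored state machines, for orientation only; the
// real enum declarations are in http_pipelined_connection_impl.h, not shown
// in this diff.
enum SendRequestState {
  SEND_STATE_START_IMMEDIATELY,            // SendRequest() with no send already in progress.
  SEND_STATE_START_NEXT_DEFERRED_REQUEST,  // Pop the next queued PendingSendRequest.
  SEND_STATE_SEND_ACTIVE_REQUEST,          // Hand active_send_request_ to the stream's parser.
  SEND_STATE_COMPLETE,                     // The parser finished sending; report the result.
  SEND_STATE_EVICT_PENDING_REQUESTS,       // Pipeline unusable; fail queued sends.
  SEND_STATE_NONE,
};

enum ReadHeadersState {
  READ_STATE_START_IMMEDIATELY,            // ReadResponseHeaders() for the front of request_order_.
  READ_STATE_START_NEXT_DEFERRED_READ,     // Posted by StartNextDeferredRead() after a stream closes.
  READ_STATE_READ_HEADERS,                 // Issue parser->ReadResponseHeaders() for active_read_id_.
  READ_STATE_READ_HEADERS_COMPLETE,        // Deliver the headers result to the caller.
  READ_STATE_WAITING_FOR_CLOSE,            // Holding state until the stream calls Close().
  READ_STATE_STREAM_CLOSED,                // Advance past the closed stream, kick the next read.
  READ_STATE_EVICT_PENDING_READS,          // Pipeline unusable; fail remaining pending reads.
  READ_STATE_NONE,
};

Sends walk START_IMMEDIATELY or START_NEXT_DEFERRED_REQUEST, then SEND_ACTIVE_REQUEST, then COMPLETE, falling into EVICT_PENDING_REQUESTS when the pipeline becomes unusable; reads walk START_IMMEDIATELY or START_NEXT_DEFERRED_READ, then READ_HEADERS, READ_HEADERS_COMPLETE, WAITING_FOR_CLOSE, and finally STREAM_CLOSED.
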
diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc
index aa3d178..82c788f 100644
--- a/net/http/http_pipelined_connection_impl.cc
+++ b/net/http/http_pipelined_connection_impl.cc
@@ -34,25 +34,26 @@ HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
completed_one_request_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
send_next_state_(SEND_STATE_NONE),
+ send_still_on_call_stack_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
- send_user_callback_(NULL),
read_next_state_(READ_STATE_NONE),
+ active_read_id_(0),
+ read_still_on_call_stack_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
- this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
- read_user_callback_(NULL) {
+ this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
CHECK(connection_.get());
}
HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
CHECK_EQ(depth(), 0);
CHECK(stream_info_map_.empty());
- CHECK(deferred_request_queue_.empty());
+ CHECK(pending_send_request_queue_.empty());
CHECK(request_order_.empty());
CHECK_EQ(send_next_state_, SEND_STATE_NONE);
CHECK_EQ(read_next_state_, READ_STATE_NONE);
- CHECK(!send_user_callback_);
- CHECK(!read_user_callback_);
+ CHECK(!active_send_request_.get());
+ CHECK(!active_read_id_);
if (!usable_) {
connection_->socket()->Disconnect();
}
@@ -105,7 +106,6 @@ void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
stream_info_map_[pipeline_id].parser.reset();
}
CHECK(!stream_info_map_[pipeline_id].parser.get());
- CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
stream_info_map_.erase(pipeline_id);
delegate_->OnPipelineHasCapacity(this);
@@ -123,18 +123,18 @@ int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
return ERR_PIPELINE_EVICTION;
}
- DeferredSendRequest deferred_request;
- deferred_request.pipeline_id = pipeline_id;
- deferred_request.request_line = request_line;
- deferred_request.headers = headers;
- deferred_request.request_body = request_body;
- deferred_request.response = response;
- deferred_request.callback = callback;
- deferred_request_queue_.push(deferred_request);
+ PendingSendRequest* send_request = new PendingSendRequest;
+ send_request->pipeline_id = pipeline_id;
+ send_request->request_line = request_line;
+ send_request->headers = headers;
+ send_request->request_body = request_body;
+ send_request->response = response;
+ send_request->callback = callback;
+ pending_send_request_queue_.push(send_request);
int rv;
if (send_next_state_ == SEND_STATE_NONE) {
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
+ send_next_state_ = SEND_STATE_START_IMMEDIATELY;
rv = DoSendRequestLoop(OK);
} else {
rv = ERR_IO_PENDING;
@@ -149,13 +149,19 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
SendRequestState state = send_next_state_;
send_next_state_ = SEND_STATE_NONE;
switch (state) {
- case SEND_STATE_NEXT_REQUEST:
- rv = DoSendNextRequest(rv);
+ case SEND_STATE_START_IMMEDIATELY:
+ rv = DoStartRequestImmediately(rv);
+ break;
+ case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
+ rv = DoStartNextDeferredRequest(rv);
+ break;
+ case SEND_STATE_SEND_ACTIVE_REQUEST:
+ rv = DoSendActiveRequest(rv);
break;
case SEND_STATE_COMPLETE:
rv = DoSendComplete(rv);
break;
- case SEND_STATE_UNUSABLE:
+ case SEND_STATE_EVICT_PENDING_REQUESTS:
rv = DoEvictPendingSendRequests(rv);
break;
default:
@@ -164,91 +170,107 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
break;
}
} while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
+ send_still_on_call_stack_ = false;
return rv;
}
void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
- CHECK(send_user_callback_);
+ CHECK(active_send_request_.get());
DoSendRequestLoop(result);
}
-int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
- CHECK(!deferred_request_queue_.empty());
- const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
- CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id));
- if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) {
- deferred_request_queue_.pop();
- if (deferred_request_queue_.empty()) {
- send_next_state_ = SEND_STATE_NONE;
- } else {
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
+int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
+ CHECK(!active_send_request_.get());
+ CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
+ // If SendRequest() completes synchronously, then we need to return the value
+ // directly to the caller. |send_still_on_call_stack_| will track this.
+ // Otherwise, asynchronous completions will notify the caller via callback.
+ send_still_on_call_stack_ = true;
+ active_send_request_.reset(pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
+ CHECK(!send_still_on_call_stack_);
+ CHECK(!active_send_request_.get());
+
+ while (!pending_send_request_queue_.empty()) {
+ scoped_ptr<PendingSendRequest> next_request(
+ pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
+ if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
+ active_send_request_.reset(next_request.release());
+ send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
+ return OK;
}
- return OK;
- }
- CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get());
- int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest(
- deferred_request.request_line,
- deferred_request.headers,
- deferred_request.request_body,
- deferred_request.response,
- &send_io_callback_);
- // |result| == ERR_IO_PENDING means this function was *not* called on the same
- // stack as SendRequest(). That means we returned ERR_IO_PENDING to
- // SendRequest() earlier and will need to invoke its callback.
- if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
- send_user_callback_ = deferred_request.callback;
}
- stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING;
+
+ send_next_state_ = SEND_STATE_NONE;
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
+ CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
+ int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
+ SendRequest(active_send_request_->request_line,
+ active_send_request_->headers,
+ active_send_request_->request_body,
+ active_send_request_->response,
+ &send_io_callback_);
+ stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
send_next_state_ = SEND_STATE_COMPLETE;
return rv;
}
int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
- CHECK(!deferred_request_queue_.empty());
- const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
- CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state,
- STREAM_SENDING);
- request_order_.push(deferred_request.pipeline_id);
- stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT;
- deferred_request_queue_.pop();
+ CHECK(active_send_request_.get());
+ CHECK_EQ(STREAM_SENDING,
+ stream_info_map_[active_send_request_->pipeline_id].state);
+
+ request_order_.push(active_send_request_->pipeline_id);
+ stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
+
if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
result = ERR_PIPELINE_EVICTION;
}
if (result < OK) {
- send_next_state_ = SEND_STATE_UNUSABLE;
usable_ = false;
}
- if (send_user_callback_) {
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- deferred_request.pipeline_id,
- result));
- stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
- send_user_callback_;
- send_user_callback_ = NULL;
- }
- if (result < OK) {
- return result;
+
+ if (!send_still_on_call_stack_) {
+ QueueUserCallback(active_send_request_->pipeline_id,
+ active_send_request_->callback, result, FROM_HERE);
}
- if (deferred_request_queue_.empty()) {
+
+ active_send_request_.reset();
+
+ if (send_still_on_call_stack_) {
+ // It should be impossible for another request to appear on the queue while
+ // this send was on the call stack.
+ CHECK(pending_send_request_queue_.empty());
send_next_state_ = SEND_STATE_NONE;
- return OK;
+ } else if (!usable_) {
+ send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
+ } else {
+ send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
}
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
- return OK;
+
+ return result;
}
int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
- send_next_state_ = SEND_STATE_NONE;
- while (!deferred_request_queue_.empty()) {
- const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
- if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
- evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
+ while (!pending_send_request_queue_.empty()) {
+ scoped_ptr<PendingSendRequest> evicted_send(
+ pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
+ evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
}
- deferred_request_queue_.pop();
}
+ send_next_state_ = SEND_STATE_NONE;
return result;
}
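
The hunk above introduces |send_still_on_call_stack_| (the read side gets a matching |read_still_on_call_stack_|): while DoSendRequestLoop() is still running inside the caller's original SendRequest() invocation, the result can simply be returned, but once the loop is re-entered from OnSendIOCallback() the caller has already received ERR_IO_PENDING, so DoSendComplete() must queue the user callback instead. A standalone sketch of that pattern, using plain std::function and a toy task queue in place of Chromium's OldCompletionCallback and MessageLoop (names such as SendLoopSketch are illustrative only, not Chromium code):

#include <functional>
#include <iostream>
#include <queue>

constexpr int ERR_IO_PENDING_SKETCH = -1;

class SendLoopSketch {
 public:
  explicit SendLoopSketch(bool completes_synchronously)
      : completes_synchronously_(completes_synchronously) {}

  // Analogous to SendRequest(): the caller gets the result as a return value
  // only if the whole loop finishes before this call returns.
  int Send(std::function<void(int)> callback) {
    callback_ = std::move(callback);
    still_on_call_stack_ = true;
    int rv = DoLoop(0 /* OK */);
    still_on_call_stack_ = false;
    return rv;
  }

  // Analogous to OnSendIOCallback(): re-enters the loop after async I/O.
  void OnIOCallback(int result) { DoLoop(result); }

  void RunPostedTasks() {
    while (!posted_tasks_.empty()) {
      posted_tasks_.front()();
      posted_tasks_.pop();
    }
  }

 private:
  int DoLoop(int result) {
    if (!sent_) {
      sent_ = true;
      if (!completes_synchronously_)
        return ERR_IO_PENDING_SKETCH;  // OnIOCallback() will resume us later.
      result = 0;
    }
    // "Send complete" step: only report through the callback if the original
    // Send() call has already returned, i.e. we are not on its stack.
    if (!still_on_call_stack_) {
      auto cb = callback_;
      posted_tasks_.push([cb, result] { cb(result); });
    }
    return result;  // Reaches the caller directly only in the synchronous case.
  }

  const bool completes_synchronously_;
  bool still_on_call_stack_ = false;
  bool sent_ = false;
  std::function<void(int)> callback_;
  std::queue<std::function<void()>> posted_tasks_;
};

int main() {
  SendLoopSketch sync_case(true);
  std::cout << "sync return: " << sync_case.Send([](int) {}) << "\n";  // 0

  SendLoopSketch async_case(false);
  std::cout << "async return: "
            << async_case.Send([](int r) { std::cout << "callback: " << r << "\n"; })
            << "\n";                    // -1, i.e. pending
  async_case.OnIOCallback(0);           // I/O finished later.
  async_case.RunPostedTasks();          // Prints "callback: 0".
}
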
@@ -256,18 +278,27 @@ int HttpPipelinedConnectionImpl::ReadResponseHeaders(
int pipeline_id,
OldCompletionCallback* callback) {
CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT);
+ CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
+
if (!usable_) {
return ERR_PIPELINE_EVICTION;
}
+
stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
stream_info_map_[pipeline_id].read_headers_callback = callback;
- if (read_next_state_ == READ_STATE_NONE) {
- read_next_state_ = READ_STATE_NEXT_HEADERS;
+ if (read_next_state_ == READ_STATE_NONE &&
+ pipeline_id == request_order_.front()) {
+ read_next_state_ = READ_STATE_START_IMMEDIATELY;
return DoReadHeadersLoop(OK);
- } else {
- return ERR_IO_PENDING;
+ }
+ return ERR_IO_PENDING;
+}
+
+void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
+ if (read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
+ DoReadHeadersLoop(OK);
}
}
@@ -277,19 +308,28 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
ReadHeadersState state = read_next_state_;
read_next_state_ = READ_STATE_NONE;
switch (state) {
- case READ_STATE_NEXT_HEADERS:
- rv = DoReadNextHeaders(rv);
+ case READ_STATE_START_IMMEDIATELY:
+ rv = DoStartReadImmediately(rv);
+ break;
+ case READ_STATE_START_NEXT_DEFERRED_READ:
+ rv = DoStartNextDeferredRead(rv);
break;
- case READ_STATE_COMPLETE:
+ case READ_STATE_READ_HEADERS:
+ rv = DoReadHeaders(rv);
+ break;
+ case READ_STATE_READ_HEADERS_COMPLETE:
rv = DoReadHeadersComplete(rv);
break;
case READ_STATE_WAITING_FOR_CLOSE:
- rv = DoReadWaitingForClose(rv);
+      // This is a holding state. We return instead of continuing to run the
+ // loop. The state will advance when the stream calls Close().
+ rv = DoReadWaitForClose(rv);
+ read_still_on_call_stack_ = false;
return rv;
case READ_STATE_STREAM_CLOSED:
rv = DoReadStreamClosed();
break;
- case READ_STATE_UNUSABLE:
+ case READ_STATE_EVICT_PENDING_READS:
rv = DoEvictPendingReadHeaders(rv);
break;
case READ_STATE_NONE:
@@ -300,139 +340,135 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
break;
}
} while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
+ read_still_on_call_stack_ = false;
return rv;
}
void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
- CHECK(read_user_callback_);
DoReadHeadersLoop(result);
}
-int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
+int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
+ CHECK(!active_read_id_);
+ CHECK(!read_still_on_call_stack_);
CHECK(!request_order_.empty());
- int pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, pipeline_id));
- if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) {
- // Since nobody will read whatever data is on the pipeline associated with
- // this closed request, we must shut down the rest of the pipeline.
- read_next_state_ = READ_STATE_UNUSABLE;
+ // If ReadResponseHeaders() completes synchronously, then we need to return
+ // the value directly to the caller. |read_still_on_call_stack_| will track
+ // this. Otherwise, asynchronous completions will notify the caller via
+ // callback.
+ read_still_on_call_stack_ = true;
+ read_next_state_ = READ_STATE_READ_HEADERS;
+ active_read_id_ = request_order_.front();
+ request_order_.pop();
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
+ CHECK(!active_read_id_);
+ CHECK(!read_still_on_call_stack_);
+
+ if (request_order_.empty()) {
+ read_next_state_ = READ_STATE_NONE;
return OK;
}
- if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
- return ERR_IO_PENDING;
- }
- CHECK(stream_info_map_[pipeline_id].parser.get());
- if (result == ERR_IO_PENDING) {
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE);
- } else {
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING);
- stream_info_map_[pipeline_id].state = STREAM_ACTIVE;
- }
+ int next_id = request_order_.front();
+ CHECK(ContainsKey(stream_info_map_, next_id));
+ switch (stream_info_map_[next_id].state) {
+ case STREAM_READ_PENDING:
+ read_next_state_ = READ_STATE_READ_HEADERS;
+ active_read_id_ = next_id;
+ request_order_.pop();
+ break;
- int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders(
- &read_io_callback_);
- if (rv == ERR_IO_PENDING) {
- read_next_state_ = READ_STATE_COMPLETE;
- read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
- } else if (rv < OK) {
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
- rv = ERR_PIPELINE_EVICTION;
- } else {
- CHECK_LE(OK, rv);
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- }
+ case STREAM_CLOSED:
+ // Since nobody will read whatever data is on the pipeline associated with
+ // this closed request, we must shut down the rest of the pipeline.
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
+ break;
- // |result| == ERR_IO_PENDING means this function was *not* called on the same
- // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
- // ReadResponseHeaders() earlier and now need to invoke its callback.
- if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
- stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- pipeline_id,
- rv));
+ case STREAM_SENT:
+ read_next_state_ = READ_STATE_NONE;
+ break;
+
+ default:
+ NOTREACHED() << "Unexpected read state: "
+ << stream_info_map_[next_id].state;
}
+
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
+ stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
+ int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
+ &read_io_callback_);
+ read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
return rv;
}
int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
+
read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- if (read_user_callback_) {
- int pipeline_id = request_order_.front();
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- pipeline_id,
- result));
- stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
- read_user_callback_ = NULL;
+ if (result < OK) {
+ if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
+ result = ERR_PIPELINE_EVICTION;
+ }
+ usable_ = false;
+ }
+
+ if (!read_still_on_call_stack_) {
+ QueueUserCallback(active_read_id_,
+ stream_info_map_[active_read_id_].read_headers_callback,
+ result, FROM_HERE);
}
+
return result;
}
-int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
+int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
return result;
}
int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
- CHECK(!request_order_.empty());
- int pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
- CHECK(stream_info_map_[pipeline_id].read_headers_callback);
- stream_info_map_[pipeline_id].read_headers_callback = NULL;
- request_order_.pop();
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
+ active_read_id_ = 0;
if (!usable_) {
- read_next_state_ = READ_STATE_UNUSABLE;
- return OK;
- } else {
- completed_one_request_ = true;
- if (!request_order_.empty()) {
- int next_pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
- if (stream_info_map_[next_pipeline_id].read_headers_callback) {
- stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
- read_next_state_ = READ_STATE_NEXT_HEADERS;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
- ERR_IO_PENDING));
- return ERR_IO_PENDING; // Wait for the task to fire.
- }
- }
- read_next_state_ = READ_STATE_NONE;
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
return OK;
}
+ completed_one_request_ = true;
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::StartNextDeferredRead));
+ read_next_state_ = READ_STATE_NONE;
+ return OK;
}
int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
while (!request_order_.empty()) {
int evicted_id = request_order_.front();
request_order_.pop();
- if (!ContainsKey(stream_info_map_, evicted_id) ||
- (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
+ if (!ContainsKey(stream_info_map_, evicted_id)) {
continue;
}
- if (stream_info_map_[evicted_id].state != STREAM_CLOSED) {
- stream_info_map_[evicted_id].pending_user_callback =
- stream_info_map_[evicted_id].read_headers_callback;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- evicted_id,
- ERR_PIPELINE_EVICTION));
+ if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
+ stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
+ QueueUserCallback(evicted_id,
+ stream_info_map_[evicted_id].read_headers_callback,
+ ERR_PIPELINE_EVICTION,
+ FROM_HERE);
}
- stream_info_map_[evicted_id].read_headers_callback = NULL;
}
read_next_state_ = READ_STATE_NONE;
return result;
@@ -453,8 +489,8 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
case STREAM_SENDING:
usable_ = false;
stream_info_map_[pipeline_id].state = STREAM_CLOSED;
- send_user_callback_ = NULL;
- send_next_state_ = SEND_STATE_UNUSABLE;
+ active_send_request_.reset();
+ send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
DoSendRequestLoop(OK);
break;
@@ -462,9 +498,10 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
case STREAM_READ_PENDING:
usable_ = false;
stream_info_map_[pipeline_id].state = STREAM_CLOSED;
- stream_info_map_[pipeline_id].read_headers_callback = NULL;
- if (read_next_state_ == READ_STATE_NONE) {
- read_next_state_ = READ_STATE_UNUSABLE;
+ if (!request_order_.empty() &&
+ pipeline_id == request_order_.front() &&
+ read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
DoReadHeadersLoop(OK);
}
break;
@@ -475,10 +512,13 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
usable_ = false;
}
read_next_state_ = READ_STATE_STREAM_CLOSED;
- read_user_callback_ = NULL;
DoReadHeadersLoop(OK);
break;
+ case STREAM_READ_EVICTED:
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ break;
+
case STREAM_CLOSED:
case STREAM_UNUSED:
// TODO(simonjam): Why is Close() sometimes called twice?
@@ -496,8 +536,7 @@ int HttpPipelinedConnectionImpl::ReadResponseBody(
int buf_len,
OldCompletionCallback* callback) {
CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK(!request_order_.empty());
- CHECK_EQ(pipeline_id, request_order_.front());
+ CHECK_EQ(active_read_id_, pipeline_id);
CHECK(stream_info_map_[pipeline_id].parser.get());
return stream_info_map_[pipeline_id].parser->ReadResponseBody(
buf, buf_len, callback);
@@ -567,10 +606,29 @@ void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
cert_request_info);
}
+void HttpPipelinedConnectionImpl::QueueUserCallback(
+ int pipeline_id,
+ OldCompletionCallback* callback,
+ int rv,
+ const tracked_objects::Location& from_here) {
+ CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
+ stream_info_map_[pipeline_id].pending_user_callback = callback;
+ MessageLoop::current()->PostTask(
+ from_here,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ pipeline_id,
+ rv));
+}
+
void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
int result) {
if (ContainsKey(stream_info_map_, pipeline_id)) {
- stream_info_map_[pipeline_id].pending_user_callback->Run(result);
+ CHECK(stream_info_map_[pipeline_id].pending_user_callback);
+ OldCompletionCallback* callback =
+ stream_info_map_[pipeline_id].pending_user_callback;
+ stream_info_map_[pipeline_id].pending_user_callback = NULL;
+ callback->Run(result);
}
}
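
The new QueueUserCallback() helper above centralizes what the old code repeated at each completion site: stash the callback in the stream's pending_user_callback slot, post a task, and let FireUserCallback() run it later only if the stream still exists, clearing the slot first (presumably so a callback that re-enters and closes the stream cannot be fired twice). A minimal standalone sketch of that hand-off, with std::map and a toy task queue standing in for stream_info_map_ and MessageLoop (class and member names here are illustrative only):

#include <functional>
#include <iostream>
#include <map>
#include <queue>

struct StreamInfoSketch {
  std::function<void(int)> pending_user_callback;  // empty until queued
};

class CallbackQueueSketch {
 public:
  void QueueUserCallback(int pipeline_id, std::function<void(int)> cb, int rv) {
    // Record the callback so later code can see it is pending, then defer the
    // actual invocation to a posted task.
    streams_[pipeline_id].pending_user_callback = cb;
    posted_tasks_.push([this, pipeline_id, rv] { FireUserCallback(pipeline_id, rv); });
  }

  void CloseStream(int pipeline_id) { streams_.erase(pipeline_id); }

  void RunPostedTasks() {
    while (!posted_tasks_.empty()) {
      posted_tasks_.front()();
      posted_tasks_.pop();
    }
  }

 private:
  void FireUserCallback(int pipeline_id, int result) {
    auto it = streams_.find(pipeline_id);
    if (it == streams_.end())
      return;  // Stream was deleted before the task ran; drop the callback.
    // Clear the pending slot before running so the callback may safely
    // re-enter (e.g. close the stream) without being fired again.
    auto cb = it->second.pending_user_callback;
    it->second.pending_user_callback = nullptr;
    cb(result);
  }

  std::map<int, StreamInfoSketch> streams_;
  std::queue<std::function<void()>> posted_tasks_;
};

int main() {
  CallbackQueueSketch q;
  q.QueueUserCallback(1, [](int rv) { std::cout << "stream 1 got " << rv << "\n"; }, 0);
  q.QueueUserCallback(2, [](int rv) { std::cout << "stream 2 got " << rv << "\n"; }, 0);
  q.CloseStream(2);    // Stream 2 goes away before its task runs.
  q.RunPostedTasks();  // Only stream 1's callback fires.
}
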
@@ -602,14 +660,15 @@ bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
return was_npn_negotiated_;
}
-HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() {
+HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
}
-HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() {
+HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
}
HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
: read_headers_callback(NULL),
+ pending_user_callback(NULL),
state(STREAM_CREATED) {
}