summaryrefslogtreecommitdiffstats
path: root/net/http
diff options
context:
space:
mode:
authorsimonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-11-15 20:36:51 +0000
committersimonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-11-15 20:36:51 +0000
commit2c49511423700ea206402a5d2a55daa65cb1bd3d (patch)
tree6f62dbf082214dba60518490116ac9b00100bf80 /net/http
parent1de476696d65c188b3d661316cc2a0dc11e2adcc (diff)
downloadchromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.zip
chromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.tar.gz
chromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.tar.bz2
Refactor state machines in HttpPipelinedConnectionImpl.
Hopefully you agree that this makes the code a whole lot easier to understand. BUG=None TEST=net_unittests Review URL: http://codereview.chromium.org/8515020 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110159 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/http')
-rw-r--r--net/http/http_pipelined_connection_impl.cc439
-rw-r--r--net/http/http_pipelined_connection_impl.h81
2 files changed, 304 insertions, 216 deletions
diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc
index aa3d178..82c788f 100644
--- a/net/http/http_pipelined_connection_impl.cc
+++ b/net/http/http_pipelined_connection_impl.cc
@@ -34,25 +34,26 @@ HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
completed_one_request_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
send_next_state_(SEND_STATE_NONE),
+ send_still_on_call_stack_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
- send_user_callback_(NULL),
read_next_state_(READ_STATE_NONE),
+ active_read_id_(0),
+ read_still_on_call_stack_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
- this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
- read_user_callback_(NULL) {
+ this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
CHECK(connection_.get());
}
HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
CHECK_EQ(depth(), 0);
CHECK(stream_info_map_.empty());
- CHECK(deferred_request_queue_.empty());
+ CHECK(pending_send_request_queue_.empty());
CHECK(request_order_.empty());
CHECK_EQ(send_next_state_, SEND_STATE_NONE);
CHECK_EQ(read_next_state_, READ_STATE_NONE);
- CHECK(!send_user_callback_);
- CHECK(!read_user_callback_);
+ CHECK(!active_send_request_.get());
+ CHECK(!active_read_id_);
if (!usable_) {
connection_->socket()->Disconnect();
}
@@ -105,7 +106,6 @@ void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
stream_info_map_[pipeline_id].parser.reset();
}
CHECK(!stream_info_map_[pipeline_id].parser.get());
- CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
stream_info_map_.erase(pipeline_id);
delegate_->OnPipelineHasCapacity(this);
@@ -123,18 +123,18 @@ int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
return ERR_PIPELINE_EVICTION;
}
- DeferredSendRequest deferred_request;
- deferred_request.pipeline_id = pipeline_id;
- deferred_request.request_line = request_line;
- deferred_request.headers = headers;
- deferred_request.request_body = request_body;
- deferred_request.response = response;
- deferred_request.callback = callback;
- deferred_request_queue_.push(deferred_request);
+ PendingSendRequest* send_request = new PendingSendRequest;
+ send_request->pipeline_id = pipeline_id;
+ send_request->request_line = request_line;
+ send_request->headers = headers;
+ send_request->request_body = request_body;
+ send_request->response = response;
+ send_request->callback = callback;
+ pending_send_request_queue_.push(send_request);
int rv;
if (send_next_state_ == SEND_STATE_NONE) {
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
+ send_next_state_ = SEND_STATE_START_IMMEDIATELY;
rv = DoSendRequestLoop(OK);
} else {
rv = ERR_IO_PENDING;
@@ -149,13 +149,19 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
SendRequestState state = send_next_state_;
send_next_state_ = SEND_STATE_NONE;
switch (state) {
- case SEND_STATE_NEXT_REQUEST:
- rv = DoSendNextRequest(rv);
+ case SEND_STATE_START_IMMEDIATELY:
+ rv = DoStartRequestImmediately(rv);
+ break;
+ case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
+ rv = DoStartNextDeferredRequest(rv);
+ break;
+ case SEND_STATE_SEND_ACTIVE_REQUEST:
+ rv = DoSendActiveRequest(rv);
break;
case SEND_STATE_COMPLETE:
rv = DoSendComplete(rv);
break;
- case SEND_STATE_UNUSABLE:
+ case SEND_STATE_EVICT_PENDING_REQUESTS:
rv = DoEvictPendingSendRequests(rv);
break;
default:
@@ -164,91 +170,107 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
break;
}
} while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
+ send_still_on_call_stack_ = false;
return rv;
}
void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
- CHECK(send_user_callback_);
+ CHECK(active_send_request_.get());
DoSendRequestLoop(result);
}
-int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
- CHECK(!deferred_request_queue_.empty());
- const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
- CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id));
- if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) {
- deferred_request_queue_.pop();
- if (deferred_request_queue_.empty()) {
- send_next_state_ = SEND_STATE_NONE;
- } else {
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
+int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
+ CHECK(!active_send_request_.get());
+ CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
+ // If SendRequest() completes synchronously, then we need to return the value
+ // directly to the caller. |send_still_on_call_stack_| will track this.
+ // Otherwise, asynchronous completions will notify the caller via callback.
+ send_still_on_call_stack_ = true;
+ active_send_request_.reset(pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
+ CHECK(!send_still_on_call_stack_);
+ CHECK(!active_send_request_.get());
+
+ while (!pending_send_request_queue_.empty()) {
+ scoped_ptr<PendingSendRequest> next_request(
+ pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
+ if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
+ active_send_request_.reset(next_request.release());
+ send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
+ return OK;
}
- return OK;
- }
- CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get());
- int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest(
- deferred_request.request_line,
- deferred_request.headers,
- deferred_request.request_body,
- deferred_request.response,
- &send_io_callback_);
- // |result| == ERR_IO_PENDING means this function was *not* called on the same
- // stack as SendRequest(). That means we returned ERR_IO_PENDING to
- // SendRequest() earlier and will need to invoke its callback.
- if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
- send_user_callback_ = deferred_request.callback;
}
- stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING;
+
+ send_next_state_ = SEND_STATE_NONE;
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
+ CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
+ int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
+ SendRequest(active_send_request_->request_line,
+ active_send_request_->headers,
+ active_send_request_->request_body,
+ active_send_request_->response,
+ &send_io_callback_);
+ stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
send_next_state_ = SEND_STATE_COMPLETE;
return rv;
}
int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
- CHECK(!deferred_request_queue_.empty());
- const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
- CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state,
- STREAM_SENDING);
- request_order_.push(deferred_request.pipeline_id);
- stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT;
- deferred_request_queue_.pop();
+ CHECK(active_send_request_.get());
+ CHECK_EQ(STREAM_SENDING,
+ stream_info_map_[active_send_request_->pipeline_id].state);
+
+ request_order_.push(active_send_request_->pipeline_id);
+ stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
+
if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
result = ERR_PIPELINE_EVICTION;
}
if (result < OK) {
- send_next_state_ = SEND_STATE_UNUSABLE;
usable_ = false;
}
- if (send_user_callback_) {
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- deferred_request.pipeline_id,
- result));
- stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
- send_user_callback_;
- send_user_callback_ = NULL;
- }
- if (result < OK) {
- return result;
+
+ if (!send_still_on_call_stack_) {
+ QueueUserCallback(active_send_request_->pipeline_id,
+ active_send_request_->callback, result, FROM_HERE);
}
- if (deferred_request_queue_.empty()) {
+
+ active_send_request_.reset();
+
+ if (send_still_on_call_stack_) {
+ // It should be impossible for another request to appear on the queue while
+ // this send was on the call stack.
+ CHECK(pending_send_request_queue_.empty());
send_next_state_ = SEND_STATE_NONE;
- return OK;
+ } else if (!usable_) {
+ send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
+ } else {
+ send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
}
- send_next_state_ = SEND_STATE_NEXT_REQUEST;
- return OK;
+
+ return result;
}
int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
- send_next_state_ = SEND_STATE_NONE;
- while (!deferred_request_queue_.empty()) {
- const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
- if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
- evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
+ while (!pending_send_request_queue_.empty()) {
+ scoped_ptr<PendingSendRequest> evicted_send(
+ pending_send_request_queue_.front());
+ pending_send_request_queue_.pop();
+ if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
+ evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
}
- deferred_request_queue_.pop();
}
+ send_next_state_ = SEND_STATE_NONE;
return result;
}
@@ -256,18 +278,27 @@ int HttpPipelinedConnectionImpl::ReadResponseHeaders(
int pipeline_id,
OldCompletionCallback* callback) {
CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT);
+ CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
+
if (!usable_) {
return ERR_PIPELINE_EVICTION;
}
+
stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
stream_info_map_[pipeline_id].read_headers_callback = callback;
- if (read_next_state_ == READ_STATE_NONE) {
- read_next_state_ = READ_STATE_NEXT_HEADERS;
+ if (read_next_state_ == READ_STATE_NONE &&
+ pipeline_id == request_order_.front()) {
+ read_next_state_ = READ_STATE_START_IMMEDIATELY;
return DoReadHeadersLoop(OK);
- } else {
- return ERR_IO_PENDING;
+ }
+ return ERR_IO_PENDING;
+}
+
+void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
+ if (read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
+ DoReadHeadersLoop(OK);
}
}
@@ -277,19 +308,28 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
ReadHeadersState state = read_next_state_;
read_next_state_ = READ_STATE_NONE;
switch (state) {
- case READ_STATE_NEXT_HEADERS:
- rv = DoReadNextHeaders(rv);
+ case READ_STATE_START_IMMEDIATELY:
+ rv = DoStartReadImmediately(rv);
+ break;
+ case READ_STATE_START_NEXT_DEFERRED_READ:
+ rv = DoStartNextDeferredRead(rv);
break;
- case READ_STATE_COMPLETE:
+ case READ_STATE_READ_HEADERS:
+ rv = DoReadHeaders(rv);
+ break;
+ case READ_STATE_READ_HEADERS_COMPLETE:
rv = DoReadHeadersComplete(rv);
break;
case READ_STATE_WAITING_FOR_CLOSE:
- rv = DoReadWaitingForClose(rv);
+      // This is a holding state. We return instead of continuing to run the
+ // loop. The state will advance when the stream calls Close().
+ rv = DoReadWaitForClose(rv);
+ read_still_on_call_stack_ = false;
return rv;
case READ_STATE_STREAM_CLOSED:
rv = DoReadStreamClosed();
break;
- case READ_STATE_UNUSABLE:
+ case READ_STATE_EVICT_PENDING_READS:
rv = DoEvictPendingReadHeaders(rv);
break;
case READ_STATE_NONE:
@@ -300,139 +340,135 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
break;
}
} while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
+ read_still_on_call_stack_ = false;
return rv;
}
void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
- CHECK(read_user_callback_);
DoReadHeadersLoop(result);
}
-int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
+int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
+ CHECK(!active_read_id_);
+ CHECK(!read_still_on_call_stack_);
CHECK(!request_order_.empty());
- int pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, pipeline_id));
- if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) {
- // Since nobody will read whatever data is on the pipeline associated with
- // this closed request, we must shut down the rest of the pipeline.
- read_next_state_ = READ_STATE_UNUSABLE;
+ // If ReadResponseHeaders() completes synchronously, then we need to return
+ // the value directly to the caller. |read_still_on_call_stack_| will track
+ // this. Otherwise, asynchronous completions will notify the caller via
+ // callback.
+ read_still_on_call_stack_ = true;
+ read_next_state_ = READ_STATE_READ_HEADERS;
+ active_read_id_ = request_order_.front();
+ request_order_.pop();
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
+ CHECK(!active_read_id_);
+ CHECK(!read_still_on_call_stack_);
+
+ if (request_order_.empty()) {
+ read_next_state_ = READ_STATE_NONE;
return OK;
}
- if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
- return ERR_IO_PENDING;
- }
- CHECK(stream_info_map_[pipeline_id].parser.get());
- if (result == ERR_IO_PENDING) {
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE);
- } else {
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING);
- stream_info_map_[pipeline_id].state = STREAM_ACTIVE;
- }
+ int next_id = request_order_.front();
+ CHECK(ContainsKey(stream_info_map_, next_id));
+ switch (stream_info_map_[next_id].state) {
+ case STREAM_READ_PENDING:
+ read_next_state_ = READ_STATE_READ_HEADERS;
+ active_read_id_ = next_id;
+ request_order_.pop();
+ break;
- int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders(
- &read_io_callback_);
- if (rv == ERR_IO_PENDING) {
- read_next_state_ = READ_STATE_COMPLETE;
- read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
- } else if (rv < OK) {
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
- rv = ERR_PIPELINE_EVICTION;
- } else {
- CHECK_LE(OK, rv);
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- }
+ case STREAM_CLOSED:
+ // Since nobody will read whatever data is on the pipeline associated with
+ // this closed request, we must shut down the rest of the pipeline.
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
+ break;
- // |result| == ERR_IO_PENDING means this function was *not* called on the same
- // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
- // ReadResponseHeaders() earlier and now need to invoke its callback.
- if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
- read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
- stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- pipeline_id,
- rv));
+ case STREAM_SENT:
+ read_next_state_ = READ_STATE_NONE;
+ break;
+
+ default:
+ NOTREACHED() << "Unexpected read state: "
+ << stream_info_map_[next_id].state;
}
+
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
+ stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
+ int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
+ &read_io_callback_);
+ read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
return rv;
}
int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
+
read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
- if (read_user_callback_) {
- int pipeline_id = request_order_.front();
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- pipeline_id,
- result));
- stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
- read_user_callback_ = NULL;
+ if (result < OK) {
+ if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
+ result = ERR_PIPELINE_EVICTION;
+ }
+ usable_ = false;
+ }
+
+ if (!read_still_on_call_stack_) {
+ QueueUserCallback(active_read_id_,
+ stream_info_map_[active_read_id_].read_headers_callback,
+ result, FROM_HERE);
}
+
return result;
}
-int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
+int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
return result;
}
int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
- CHECK(!request_order_.empty());
- int pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
- CHECK(stream_info_map_[pipeline_id].read_headers_callback);
- stream_info_map_[pipeline_id].read_headers_callback = NULL;
- request_order_.pop();
+ CHECK(active_read_id_);
+ CHECK(ContainsKey(stream_info_map_, active_read_id_));
+ CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
+ active_read_id_ = 0;
if (!usable_) {
- read_next_state_ = READ_STATE_UNUSABLE;
- return OK;
- } else {
- completed_one_request_ = true;
- if (!request_order_.empty()) {
- int next_pipeline_id = request_order_.front();
- CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
- if (stream_info_map_[next_pipeline_id].read_headers_callback) {
- stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
- read_next_state_ = READ_STATE_NEXT_HEADERS;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
- ERR_IO_PENDING));
- return ERR_IO_PENDING; // Wait for the task to fire.
- }
- }
- read_next_state_ = READ_STATE_NONE;
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
return OK;
}
+ completed_one_request_ = true;
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::StartNextDeferredRead));
+ read_next_state_ = READ_STATE_NONE;
+ return OK;
}
int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
while (!request_order_.empty()) {
int evicted_id = request_order_.front();
request_order_.pop();
- if (!ContainsKey(stream_info_map_, evicted_id) ||
- (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
+ if (!ContainsKey(stream_info_map_, evicted_id)) {
continue;
}
- if (stream_info_map_[evicted_id].state != STREAM_CLOSED) {
- stream_info_map_[evicted_id].pending_user_callback =
- stream_info_map_[evicted_id].read_headers_callback;
- MessageLoop::current()->PostTask(
- FROM_HERE,
- method_factory_.NewRunnableMethod(
- &HttpPipelinedConnectionImpl::FireUserCallback,
- evicted_id,
- ERR_PIPELINE_EVICTION));
+ if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
+ stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
+ QueueUserCallback(evicted_id,
+ stream_info_map_[evicted_id].read_headers_callback,
+ ERR_PIPELINE_EVICTION,
+ FROM_HERE);
}
- stream_info_map_[evicted_id].read_headers_callback = NULL;
}
read_next_state_ = READ_STATE_NONE;
return result;
@@ -453,8 +489,8 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
case STREAM_SENDING:
usable_ = false;
stream_info_map_[pipeline_id].state = STREAM_CLOSED;
- send_user_callback_ = NULL;
- send_next_state_ = SEND_STATE_UNUSABLE;
+ active_send_request_.reset();
+ send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
DoSendRequestLoop(OK);
break;
@@ -462,9 +498,10 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
case STREAM_READ_PENDING:
usable_ = false;
stream_info_map_[pipeline_id].state = STREAM_CLOSED;
- stream_info_map_[pipeline_id].read_headers_callback = NULL;
- if (read_next_state_ == READ_STATE_NONE) {
- read_next_state_ = READ_STATE_UNUSABLE;
+ if (!request_order_.empty() &&
+ pipeline_id == request_order_.front() &&
+ read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_EVICT_PENDING_READS;
DoReadHeadersLoop(OK);
}
break;
@@ -475,10 +512,13 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
usable_ = false;
}
read_next_state_ = READ_STATE_STREAM_CLOSED;
- read_user_callback_ = NULL;
DoReadHeadersLoop(OK);
break;
+ case STREAM_READ_EVICTED:
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ break;
+
case STREAM_CLOSED:
case STREAM_UNUSED:
// TODO(simonjam): Why is Close() sometimes called twice?
@@ -496,8 +536,7 @@ int HttpPipelinedConnectionImpl::ReadResponseBody(
int buf_len,
OldCompletionCallback* callback) {
CHECK(ContainsKey(stream_info_map_, pipeline_id));
- CHECK(!request_order_.empty());
- CHECK_EQ(pipeline_id, request_order_.front());
+ CHECK_EQ(active_read_id_, pipeline_id);
CHECK(stream_info_map_[pipeline_id].parser.get());
return stream_info_map_[pipeline_id].parser->ReadResponseBody(
buf, buf_len, callback);
@@ -567,10 +606,29 @@ void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
cert_request_info);
}
+void HttpPipelinedConnectionImpl::QueueUserCallback(
+ int pipeline_id,
+ OldCompletionCallback* callback,
+ int rv,
+ const tracked_objects::Location& from_here) {
+ CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
+ stream_info_map_[pipeline_id].pending_user_callback = callback;
+ MessageLoop::current()->PostTask(
+ from_here,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ pipeline_id,
+ rv));
+}
+
void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
int result) {
if (ContainsKey(stream_info_map_, pipeline_id)) {
- stream_info_map_[pipeline_id].pending_user_callback->Run(result);
+ CHECK(stream_info_map_[pipeline_id].pending_user_callback);
+ OldCompletionCallback* callback =
+ stream_info_map_[pipeline_id].pending_user_callback;
+ stream_info_map_[pipeline_id].pending_user_callback = NULL;
+ callback->Run(result);
}
}
@@ -602,14 +660,15 @@ bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
return was_npn_negotiated_;
}
-HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() {
+HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
}
-HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() {
+HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
}
HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
: read_headers_callback(NULL),
+ pending_user_callback(NULL),
state(STREAM_CREATED) {
}
diff --git a/net/http/http_pipelined_connection_impl.h b/net/http/http_pipelined_connection_impl.h
index d77f9ae..6a7f385 100644
--- a/net/http/http_pipelined_connection_impl.h
+++ b/net/http/http_pipelined_connection_impl.h
@@ -11,6 +11,7 @@
#include <string>
#include "base/basictypes.h"
+#include "base/location.h"
#include "base/memory/linked_ptr.h"
#include "base/task.h"
#include "net/base/completion_callback.h"
@@ -125,26 +126,31 @@ class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
STREAM_READ_PENDING,
STREAM_ACTIVE,
STREAM_CLOSED,
+ STREAM_READ_EVICTED,
STREAM_UNUSED,
};
enum SendRequestState {
- SEND_STATE_NEXT_REQUEST,
+ SEND_STATE_START_IMMEDIATELY,
+ SEND_STATE_START_NEXT_DEFERRED_REQUEST,
+ SEND_STATE_SEND_ACTIVE_REQUEST,
SEND_STATE_COMPLETE,
+ SEND_STATE_EVICT_PENDING_REQUESTS,
SEND_STATE_NONE,
- SEND_STATE_UNUSABLE,
};
enum ReadHeadersState {
- READ_STATE_NEXT_HEADERS,
- READ_STATE_COMPLETE,
+ READ_STATE_START_IMMEDIATELY,
+ READ_STATE_START_NEXT_DEFERRED_READ,
+ READ_STATE_READ_HEADERS,
+ READ_STATE_READ_HEADERS_COMPLETE,
READ_STATE_WAITING_FOR_CLOSE,
READ_STATE_STREAM_CLOSED,
READ_STATE_NONE,
- READ_STATE_UNUSABLE,
+ READ_STATE_EVICT_PENDING_READS,
};
- struct DeferredSendRequest {
- DeferredSendRequest();
- ~DeferredSendRequest();
+ struct PendingSendRequest {
+ PendingSendRequest();
+ ~PendingSendRequest();
int pipeline_id;
std::string request_line;
@@ -180,10 +186,17 @@ class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
// Called when an asynchronous Send() completes.
void OnSendIOCallback(int result);
- // Sends the next deferred request. This may be called immediately after
- // SendRequest(), or it may be in a new task after a prior send completes in
- // DoSendComplete().
- int DoSendNextRequest(int result);
+ // Activates the only request in |pending_send_request_queue_|. This should
+ // only be called via SendRequest() when the send loop is idle.
+ int DoStartRequestImmediately(int result);
+
+ // Activates the first request in |pending_send_request_queue_| that hasn't
+ // been closed, if any. This is called via DoSendComplete() after a prior
+  // request completes.
+ int DoStartNextDeferredRequest(int result);
+
+ // Sends the active request.
+ int DoSendActiveRequest(int result);
// Notifies the user that the send has completed. This may be called directly
// after SendRequest() for a synchronous request, or it may be called in
@@ -203,22 +216,30 @@ class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
// Called when the pending asynchronous ReadResponseHeaders() completes.
void OnReadIOCallback(int result);
- // Determines if the next response in the pipeline is ready to be read.
- // If it's ready, then we call ReadResponseHeaders() on the underlying parser.
- // HttpPipelinedSocket indicates its readiness by calling
- // ReadResponseHeaders(). This function may be called immediately after
- // ReadResponseHeaders(), or it may be called in a new task after a previous
- // HttpPipelinedSocket finishes its work.
- int DoReadNextHeaders(int result);
+ // Invokes DoStartNextDeferredRead() if the read loop is idle. This is called
+ // via a task queued when the previous |active_read_id_| closes its stream
+  // after a successful response.
+ void StartNextDeferredRead();
+
+ // Activates the next read request immediately. This is called via
+ // ReadResponseHeaders() if that stream is at the front of |request_order_|
+ // and the read loop is idle.
+ int DoStartReadImmediately(int result);
+
+ // Activates the next read request in |request_order_| if it's ready to go.
+ // This is called via StartNextDeferredRead().
+ int DoStartNextDeferredRead(int result);
+
+ // Calls ReadResponseHeaders() on the active request's parser.
+ int DoReadHeaders(int result);
// Notifies the user that reading the headers has completed. This may happen
// directly after DoReadNextHeaders() if the response is already available.
// Otherwise, it is called in response to OnReadIOCallback().
int DoReadHeadersComplete(int result);
- // This is a holding state. It does not do anything, except exit the
- // DoReadHeadersLoop(). It is called after DoReadHeadersComplete().
- int DoReadWaitingForClose(int result);
+ // Halts the read loop until Close() is called by the active stream.
+ int DoReadWaitForClose(int result);
// Cleans up the state associated with the active request. Invokes
// DoReadNextHeaders() in a new task to start the next response. This is
@@ -230,12 +251,18 @@ class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
// HttpPipelinedSockets indicates the connection was suddenly closed.
int DoEvictPendingReadHeaders(int result);
- // Invokes the user's callback in response to SendRequest() or
+ // Posts a task to fire the user's callback in response to SendRequest() or
// ReadResponseHeaders() completing on an underlying parser. This might be
// invoked in response to our own IO callbacks, or it may be invoked if the
// underlying parser completes SendRequest() or ReadResponseHeaders()
// synchronously, but we've already returned ERR_IO_PENDING to the user's
// SendRequest() or ReadResponseHeaders() call into us.
+ void QueueUserCallback(int pipeline_id,
+ OldCompletionCallback* callback,
+ int rv,
+ const tracked_objects::Location& from_here);
+
+ // Invokes the callback queued in QueueUserCallback().
void FireUserCallback(int pipeline_id, int result);
Delegate* delegate_;
@@ -255,14 +282,16 @@ class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
std::queue<int> request_order_;
- std::queue<DeferredSendRequest> deferred_request_queue_;
+ std::queue<PendingSendRequest*> pending_send_request_queue_;
+ scoped_ptr<PendingSendRequest> active_send_request_;
SendRequestState send_next_state_;
+ bool send_still_on_call_stack_;
OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> send_io_callback_;
- OldCompletionCallback* send_user_callback_;
ReadHeadersState read_next_state_;
+ int active_read_id_;
+ bool read_still_on_call_stack_;
OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> read_io_callback_;
- OldCompletionCallback* read_user_callback_;
DISALLOW_COPY_AND_ASSIGN(HttpPipelinedConnectionImpl);
};