diff options
author | simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-11-15 20:36:51 +0000 |
---|---|---|
committer | simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-11-15 20:36:51 +0000 |
commit | 2c49511423700ea206402a5d2a55daa65cb1bd3d (patch) | |
tree | 6f62dbf082214dba60518490116ac9b00100bf80 /net/http/http_pipelined_connection_impl.cc | |
parent | 1de476696d65c188b3d661316cc2a0dc11e2adcc (diff) | |
download | chromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.zip chromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.tar.gz chromium_src-2c49511423700ea206402a5d2a55daa65cb1bd3d.tar.bz2 |
Refactor state machines in HttpPipelinedConnectionImpl.
Hopefully you agree that this makes the code a whole lot easier to understand.
BUG=None
TEST=net_unittests
Review URL: http://codereview.chromium.org/8515020
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110159 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/http/http_pipelined_connection_impl.cc')
-rw-r--r-- | net/http/http_pipelined_connection_impl.cc | 439 |
1 files changed, 249 insertions, 190 deletions
diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc index aa3d178..82c788f 100644 --- a/net/http/http_pipelined_connection_impl.cc +++ b/net/http/http_pipelined_connection_impl.cc @@ -34,25 +34,26 @@ HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl( completed_one_request_(false), ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), send_next_state_(SEND_STATE_NONE), + send_still_on_call_stack_(false), ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), - send_user_callback_(NULL), read_next_state_(READ_STATE_NONE), + active_read_id_(0), + read_still_on_call_stack_(false), ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( - this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), - read_user_callback_(NULL) { + this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) { CHECK(connection_.get()); } HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { CHECK_EQ(depth(), 0); CHECK(stream_info_map_.empty()); - CHECK(deferred_request_queue_.empty()); + CHECK(pending_send_request_queue_.empty()); CHECK(request_order_.empty()); CHECK_EQ(send_next_state_, SEND_STATE_NONE); CHECK_EQ(read_next_state_, READ_STATE_NONE); - CHECK(!send_user_callback_); - CHECK(!read_user_callback_); + CHECK(!active_send_request_.get()); + CHECK(!active_read_id_); if (!usable_) { connection_->socket()->Disconnect(); } @@ -105,7 +106,6 @@ void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) { stream_info_map_[pipeline_id].parser.reset(); } CHECK(!stream_info_map_[pipeline_id].parser.get()); - CHECK(!stream_info_map_[pipeline_id].read_headers_callback); stream_info_map_.erase(pipeline_id); delegate_->OnPipelineHasCapacity(this); @@ -123,18 +123,18 @@ int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, return ERR_PIPELINE_EVICTION; } - DeferredSendRequest deferred_request; - deferred_request.pipeline_id = pipeline_id; - deferred_request.request_line = request_line; - deferred_request.headers = headers; - deferred_request.request_body = request_body; - deferred_request.response = response; - deferred_request.callback = callback; - deferred_request_queue_.push(deferred_request); + PendingSendRequest* send_request = new PendingSendRequest; + send_request->pipeline_id = pipeline_id; + send_request->request_line = request_line; + send_request->headers = headers; + send_request->request_body = request_body; + send_request->response = response; + send_request->callback = callback; + pending_send_request_queue_.push(send_request); int rv; if (send_next_state_ == SEND_STATE_NONE) { - send_next_state_ = SEND_STATE_NEXT_REQUEST; + send_next_state_ = SEND_STATE_START_IMMEDIATELY; rv = DoSendRequestLoop(OK); } else { rv = ERR_IO_PENDING; @@ -149,13 +149,19 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { SendRequestState state = send_next_state_; send_next_state_ = SEND_STATE_NONE; switch (state) { - case SEND_STATE_NEXT_REQUEST: - rv = DoSendNextRequest(rv); + case SEND_STATE_START_IMMEDIATELY: + rv = DoStartRequestImmediately(rv); + break; + case SEND_STATE_START_NEXT_DEFERRED_REQUEST: + rv = DoStartNextDeferredRequest(rv); + break; + case SEND_STATE_SEND_ACTIVE_REQUEST: + rv = DoSendActiveRequest(rv); break; case SEND_STATE_COMPLETE: rv = DoSendComplete(rv); break; - case SEND_STATE_UNUSABLE: + case SEND_STATE_EVICT_PENDING_REQUESTS: rv = DoEvictPendingSendRequests(rv); break; default: @@ -164,91 +170,107 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { break; } } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); + send_still_on_call_stack_ = false; return rv; } void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { - CHECK(send_user_callback_); + CHECK(active_send_request_.get()); DoSendRequestLoop(result); } -int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { - CHECK(!deferred_request_queue_.empty()); - const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); - CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id)); - if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) { - deferred_request_queue_.pop(); - if (deferred_request_queue_.empty()) { - send_next_state_ = SEND_STATE_NONE; - } else { - send_next_state_ = SEND_STATE_NEXT_REQUEST; +int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) { + CHECK(!active_send_request_.get()); + CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size()); + // If SendRequest() completes synchronously, then we need to return the value + // directly to the caller. |send_still_on_call_stack_| will track this. + // Otherwise, asynchronous completions will notify the caller via callback. + send_still_on_call_stack_ = true; + active_send_request_.reset(pending_send_request_queue_.front()); + pending_send_request_queue_.pop(); + send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST; + return OK; +} + +int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) { + CHECK(!send_still_on_call_stack_); + CHECK(!active_send_request_.get()); + + while (!pending_send_request_queue_.empty()) { + scoped_ptr<PendingSendRequest> next_request( + pending_send_request_queue_.front()); + pending_send_request_queue_.pop(); + CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id)); + if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) { + active_send_request_.reset(next_request.release()); + send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST; + return OK; } - return OK; - } - CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get()); - int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest( - deferred_request.request_line, - deferred_request.headers, - deferred_request.request_body, - deferred_request.response, - &send_io_callback_); - // |result| == ERR_IO_PENDING means this function was *not* called on the same - // stack as SendRequest(). That means we returned ERR_IO_PENDING to - // SendRequest() earlier and will need to invoke its callback. - if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { - send_user_callback_ = deferred_request.callback; } - stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING; + + send_next_state_ = SEND_STATE_NONE; + return OK; +} + +int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) { + CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get()); + int rv = stream_info_map_[active_send_request_->pipeline_id].parser-> + SendRequest(active_send_request_->request_line, + active_send_request_->headers, + active_send_request_->request_body, + active_send_request_->response, + &send_io_callback_); + stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING; send_next_state_ = SEND_STATE_COMPLETE; return rv; } int HttpPipelinedConnectionImpl::DoSendComplete(int result) { - CHECK(!deferred_request_queue_.empty()); - const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); - CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state, - STREAM_SENDING); - request_order_.push(deferred_request.pipeline_id); - stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT; - deferred_request_queue_.pop(); + CHECK(active_send_request_.get()); + CHECK_EQ(STREAM_SENDING, + stream_info_map_[active_send_request_->pipeline_id].state); + + request_order_.push(active_send_request_->pipeline_id); + stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT; + if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { result = ERR_PIPELINE_EVICTION; } if (result < OK) { - send_next_state_ = SEND_STATE_UNUSABLE; usable_ = false; } - if (send_user_callback_) { - MessageLoop::current()->PostTask( - FROM_HERE, - method_factory_.NewRunnableMethod( - &HttpPipelinedConnectionImpl::FireUserCallback, - deferred_request.pipeline_id, - result)); - stream_info_map_[deferred_request.pipeline_id].pending_user_callback = - send_user_callback_; - send_user_callback_ = NULL; - } - if (result < OK) { - return result; + + if (!send_still_on_call_stack_) { + QueueUserCallback(active_send_request_->pipeline_id, + active_send_request_->callback, result, FROM_HERE); } - if (deferred_request_queue_.empty()) { + + active_send_request_.reset(); + + if (send_still_on_call_stack_) { + // It should be impossible for another request to appear on the queue while + // this send was on the call stack. + CHECK(pending_send_request_queue_.empty()); send_next_state_ = SEND_STATE_NONE; - return OK; + } else if (!usable_) { + send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS; + } else { + send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST; } - send_next_state_ = SEND_STATE_NEXT_REQUEST; - return OK; + + return result; } int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { - send_next_state_ = SEND_STATE_NONE; - while (!deferred_request_queue_.empty()) { - const DeferredSendRequest& evicted_send = deferred_request_queue_.front(); - if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) { - evicted_send.callback->Run(ERR_PIPELINE_EVICTION); + while (!pending_send_request_queue_.empty()) { + scoped_ptr<PendingSendRequest> evicted_send( + pending_send_request_queue_.front()); + pending_send_request_queue_.pop(); + if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) { + evicted_send->callback->Run(ERR_PIPELINE_EVICTION); } - deferred_request_queue_.pop(); } + send_next_state_ = SEND_STATE_NONE; return result; } @@ -256,18 +278,27 @@ int HttpPipelinedConnectionImpl::ReadResponseHeaders( int pipeline_id, OldCompletionCallback* callback) { CHECK(ContainsKey(stream_info_map_, pipeline_id)); - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT); + CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state); CHECK(!stream_info_map_[pipeline_id].read_headers_callback); + if (!usable_) { return ERR_PIPELINE_EVICTION; } + stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; stream_info_map_[pipeline_id].read_headers_callback = callback; - if (read_next_state_ == READ_STATE_NONE) { - read_next_state_ = READ_STATE_NEXT_HEADERS; + if (read_next_state_ == READ_STATE_NONE && + pipeline_id == request_order_.front()) { + read_next_state_ = READ_STATE_START_IMMEDIATELY; return DoReadHeadersLoop(OK); - } else { - return ERR_IO_PENDING; + } + return ERR_IO_PENDING; +} + +void HttpPipelinedConnectionImpl::StartNextDeferredRead() { + if (read_next_state_ == READ_STATE_NONE) { + read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ; + DoReadHeadersLoop(OK); } } @@ -277,19 +308,28 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { ReadHeadersState state = read_next_state_; read_next_state_ = READ_STATE_NONE; switch (state) { - case READ_STATE_NEXT_HEADERS: - rv = DoReadNextHeaders(rv); + case READ_STATE_START_IMMEDIATELY: + rv = DoStartReadImmediately(rv); + break; + case READ_STATE_START_NEXT_DEFERRED_READ: + rv = DoStartNextDeferredRead(rv); break; - case READ_STATE_COMPLETE: + case READ_STATE_READ_HEADERS: + rv = DoReadHeaders(rv); + break; + case READ_STATE_READ_HEADERS_COMPLETE: rv = DoReadHeadersComplete(rv); break; case READ_STATE_WAITING_FOR_CLOSE: - rv = DoReadWaitingForClose(rv); + // This is a holding state. We return instead of continuing to run hte + // loop. The state will advance when the stream calls Close(). + rv = DoReadWaitForClose(rv); + read_still_on_call_stack_ = false; return rv; case READ_STATE_STREAM_CLOSED: rv = DoReadStreamClosed(); break; - case READ_STATE_UNUSABLE: + case READ_STATE_EVICT_PENDING_READS: rv = DoEvictPendingReadHeaders(rv); break; case READ_STATE_NONE: @@ -300,139 +340,135 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { break; } } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); + read_still_on_call_stack_ = false; return rv; } void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { - CHECK(read_user_callback_); DoReadHeadersLoop(result); } -int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { +int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) { + CHECK(!active_read_id_); + CHECK(!read_still_on_call_stack_); CHECK(!request_order_.empty()); - int pipeline_id = request_order_.front(); - CHECK(ContainsKey(stream_info_map_, pipeline_id)); - if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) { - // Since nobody will read whatever data is on the pipeline associated with - // this closed request, we must shut down the rest of the pipeline. - read_next_state_ = READ_STATE_UNUSABLE; + // If ReadResponseHeaders() completes synchronously, then we need to return + // the value directly to the caller. |read_still_on_call_stack_| will track + // this. Otherwise, asynchronous completions will notify the caller via + // callback. + read_still_on_call_stack_ = true; + read_next_state_ = READ_STATE_READ_HEADERS; + active_read_id_ = request_order_.front(); + request_order_.pop(); + return OK; +} + +int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) { + CHECK(!active_read_id_); + CHECK(!read_still_on_call_stack_); + + if (request_order_.empty()) { + read_next_state_ = READ_STATE_NONE; return OK; } - if (stream_info_map_[pipeline_id].read_headers_callback == NULL) { - return ERR_IO_PENDING; - } - CHECK(stream_info_map_[pipeline_id].parser.get()); - if (result == ERR_IO_PENDING) { - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE); - } else { - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING); - stream_info_map_[pipeline_id].state = STREAM_ACTIVE; - } + int next_id = request_order_.front(); + CHECK(ContainsKey(stream_info_map_, next_id)); + switch (stream_info_map_[next_id].state) { + case STREAM_READ_PENDING: + read_next_state_ = READ_STATE_READ_HEADERS; + active_read_id_ = next_id; + request_order_.pop(); + break; - int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders( - &read_io_callback_); - if (rv == ERR_IO_PENDING) { - read_next_state_ = READ_STATE_COMPLETE; - read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback; - } else if (rv < OK) { - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; - if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) - rv = ERR_PIPELINE_EVICTION; - } else { - CHECK_LE(OK, rv); - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; - } + case STREAM_CLOSED: + // Since nobody will read whatever data is on the pipeline associated with + // this closed request, we must shut down the rest of the pipeline. + read_next_state_ = READ_STATE_EVICT_PENDING_READS; + break; - // |result| == ERR_IO_PENDING means this function was *not* called on the same - // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to - // ReadResponseHeaders() earlier and now need to invoke its callback. - if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) { - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; - read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback; - stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_; - MessageLoop::current()->PostTask( - FROM_HERE, - method_factory_.NewRunnableMethod( - &HttpPipelinedConnectionImpl::FireUserCallback, - pipeline_id, - rv)); + case STREAM_SENT: + read_next_state_ = READ_STATE_NONE; + break; + + default: + NOTREACHED() << "Unexpected read state: " + << stream_info_map_[next_id].state; } + + return OK; +} + +int HttpPipelinedConnectionImpl::DoReadHeaders(int result) { + CHECK(active_read_id_); + CHECK(ContainsKey(stream_info_map_, active_read_id_)); + CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state); + stream_info_map_[active_read_id_].state = STREAM_ACTIVE; + int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders( + &read_io_callback_); + read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE; return rv; } int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { + CHECK(active_read_id_); + CHECK(ContainsKey(stream_info_map_, active_read_id_)); + CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state); + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; - if (read_user_callback_) { - int pipeline_id = request_order_.front(); - MessageLoop::current()->PostTask( - FROM_HERE, - method_factory_.NewRunnableMethod( - &HttpPipelinedConnectionImpl::FireUserCallback, - pipeline_id, - result)); - stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_; - read_user_callback_ = NULL; + if (result < OK) { + if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { + result = ERR_PIPELINE_EVICTION; + } + usable_ = false; + } + + if (!read_still_on_call_stack_) { + QueueUserCallback(active_read_id_, + stream_info_map_[active_read_id_].read_headers_callback, + result, FROM_HERE); } + return result; } -int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { +int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) { read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; return result; } int HttpPipelinedConnectionImpl::DoReadStreamClosed() { - CHECK(!request_order_.empty()); - int pipeline_id = request_order_.front(); - CHECK(ContainsKey(stream_info_map_, pipeline_id)); - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); - CHECK(stream_info_map_[pipeline_id].read_headers_callback); - stream_info_map_[pipeline_id].read_headers_callback = NULL; - request_order_.pop(); + CHECK(active_read_id_); + CHECK(ContainsKey(stream_info_map_, active_read_id_)); + CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED); + active_read_id_ = 0; if (!usable_) { - read_next_state_ = READ_STATE_UNUSABLE; - return OK; - } else { - completed_one_request_ = true; - if (!request_order_.empty()) { - int next_pipeline_id = request_order_.front(); - CHECK(ContainsKey(stream_info_map_, next_pipeline_id)); - if (stream_info_map_[next_pipeline_id].read_headers_callback) { - stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE; - read_next_state_ = READ_STATE_NEXT_HEADERS; - MessageLoop::current()->PostTask( - FROM_HERE, - method_factory_.NewRunnableMethod( - &HttpPipelinedConnectionImpl::DoReadHeadersLoop, - ERR_IO_PENDING)); - return ERR_IO_PENDING; // Wait for the task to fire. - } - } - read_next_state_ = READ_STATE_NONE; + read_next_state_ = READ_STATE_EVICT_PENDING_READS; return OK; } + completed_one_request_ = true; + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::StartNextDeferredRead)); + read_next_state_ = READ_STATE_NONE; + return OK; } int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { while (!request_order_.empty()) { int evicted_id = request_order_.front(); request_order_.pop(); - if (!ContainsKey(stream_info_map_, evicted_id) || - (stream_info_map_[evicted_id].read_headers_callback == NULL)) { + if (!ContainsKey(stream_info_map_, evicted_id)) { continue; } - if (stream_info_map_[evicted_id].state != STREAM_CLOSED) { - stream_info_map_[evicted_id].pending_user_callback = - stream_info_map_[evicted_id].read_headers_callback; - MessageLoop::current()->PostTask( - FROM_HERE, - method_factory_.NewRunnableMethod( - &HttpPipelinedConnectionImpl::FireUserCallback, - evicted_id, - ERR_PIPELINE_EVICTION)); + if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) { + stream_info_map_[evicted_id].state = STREAM_READ_EVICTED; + QueueUserCallback(evicted_id, + stream_info_map_[evicted_id].read_headers_callback, + ERR_PIPELINE_EVICTION, + FROM_HERE); } - stream_info_map_[evicted_id].read_headers_callback = NULL; } read_next_state_ = READ_STATE_NONE; return result; @@ -453,8 +489,8 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id, case STREAM_SENDING: usable_ = false; stream_info_map_[pipeline_id].state = STREAM_CLOSED; - send_user_callback_ = NULL; - send_next_state_ = SEND_STATE_UNUSABLE; + active_send_request_.reset(); + send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS; DoSendRequestLoop(OK); break; @@ -462,9 +498,10 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id, case STREAM_READ_PENDING: usable_ = false; stream_info_map_[pipeline_id].state = STREAM_CLOSED; - stream_info_map_[pipeline_id].read_headers_callback = NULL; - if (read_next_state_ == READ_STATE_NONE) { - read_next_state_ = READ_STATE_UNUSABLE; + if (!request_order_.empty() && + pipeline_id == request_order_.front() && + read_next_state_ == READ_STATE_NONE) { + read_next_state_ = READ_STATE_EVICT_PENDING_READS; DoReadHeadersLoop(OK); } break; @@ -475,10 +512,13 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id, usable_ = false; } read_next_state_ = READ_STATE_STREAM_CLOSED; - read_user_callback_ = NULL; DoReadHeadersLoop(OK); break; + case STREAM_READ_EVICTED: + stream_info_map_[pipeline_id].state = STREAM_CLOSED; + break; + case STREAM_CLOSED: case STREAM_UNUSED: // TODO(simonjam): Why is Close() sometimes called twice? @@ -496,8 +536,7 @@ int HttpPipelinedConnectionImpl::ReadResponseBody( int buf_len, OldCompletionCallback* callback) { CHECK(ContainsKey(stream_info_map_, pipeline_id)); - CHECK(!request_order_.empty()); - CHECK_EQ(pipeline_id, request_order_.front()); + CHECK_EQ(active_read_id_, pipeline_id); CHECK(stream_info_map_[pipeline_id].parser.get()); return stream_info_map_[pipeline_id].parser->ReadResponseBody( buf, buf_len, callback); @@ -567,10 +606,29 @@ void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( cert_request_info); } +void HttpPipelinedConnectionImpl::QueueUserCallback( + int pipeline_id, + OldCompletionCallback* callback, + int rv, + const tracked_objects::Location& from_here) { + CHECK(!stream_info_map_[pipeline_id].pending_user_callback); + stream_info_map_[pipeline_id].pending_user_callback = callback; + MessageLoop::current()->PostTask( + from_here, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::FireUserCallback, + pipeline_id, + rv)); +} + void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id, int result) { if (ContainsKey(stream_info_map_, pipeline_id)) { - stream_info_map_[pipeline_id].pending_user_callback->Run(result); + CHECK(stream_info_map_[pipeline_id].pending_user_callback); + OldCompletionCallback* callback = + stream_info_map_[pipeline_id].pending_user_callback; + stream_info_map_[pipeline_id].pending_user_callback = NULL; + callback->Run(result); } } @@ -602,14 +660,15 @@ bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { return was_npn_negotiated_; } -HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() { +HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() { } -HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() { +HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() { } HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() : read_headers_callback(NULL), + pending_user_callback(NULL), state(STREAM_CREATED) { } |