diff options
author | simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-10-19 20:14:29 +0000 |
---|---|---|
committer | simonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-10-19 20:14:29 +0000 |
commit | 5a60c8bb002e4573dd0809f4a78d56b4c9720add (patch) | |
tree | 4be91c33213b0e7ab66dfed7ced978e8a16b3cdc /net | |
parent | 8053408224c37ade07452972e01de6126d074d27 (diff) | |
download | chromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.zip chromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.tar.gz chromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.tar.bz2 |
Basic HTTP pipelining support.
This must be enabled in about:flags. It is naive and assumes all servers correctly implement pipelining. Proxies are not supported.
Immediate future work:
- Integration tests.
- Additional NetLog logging.
- Refactor HttpPipelinedConnectionImpl.
Long term:
- Detect broken transparent proxies.
- Detect and/or mitigate broken servers.
- Buffer HttpPipelinedStream.
- Optimize number of pipelines and their depth.
- Support proxies.
BUG=8991
TEST=net_unittests
Review URL: http://codereview.chromium.org/7289006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@106364 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
31 files changed, 3079 insertions, 80 deletions
diff --git a/net/base/net_error_list.h b/net/base/net_error_list.h index 3097d31..ca63445 100644 --- a/net/base/net_error_list.h +++ b/net/base/net_error_list.h @@ -637,3 +637,6 @@ NET_ERROR(DNS_TIMED_OUT, -803) // The entry was not found in cache, for cache-only lookups. NET_ERROR(DNS_CACHE_MISS, -804) + +// FIXME: Take the next number. +NET_ERROR(PIPELINE_EVICTION, -900) diff --git a/net/base/net_log_event_type_list.h b/net/base/net_log_event_type_list.h index 1547bc8..8ebca16 100644 --- a/net/base/net_log_event_type_list.h +++ b/net/base/net_log_event_type_list.h @@ -845,6 +845,13 @@ EVENT_TYPE(HTTP_TRANSACTION_READ_BODY) // restarting for authentication, on keep alive connections. EVENT_TYPE(HTTP_TRANSACTION_DRAIN_BODY_FOR_AUTH_RESTART) +// This event is sent when we try to restart a transaction after an error. +// The following parameters are attached: +// { +// "net_error": <The net error code integer for the failure>, +// } +EVENT_TYPE(HTTP_TRANSACTION_RESTART_AFTER_ERROR) + // ------------------------------------------------------------------------ // SpdySession // ------------------------------------------------------------------------ diff --git a/net/http/http_basic_stream.cc b/net/http/http_basic_stream.cc index eb5ecc2..43b141a 100644 --- a/net/http/http_basic_stream.cc +++ b/net/http/http_basic_stream.cc @@ -11,6 +11,7 @@ #include "net/base/net_errors.h" #include "net/http/http_request_headers.h" #include "net/http/http_request_info.h" +#include "net/http/http_response_body_drainer.h" #include "net/http/http_stream_parser.h" #include "net/http/http_util.h" #include "net/socket/client_socket_handle.h" @@ -130,4 +131,10 @@ void HttpBasicStream::LogNumRttVsBytesMetrics() const { // Log rtt metrics here. } +void HttpBasicStream::Drain(HttpNetworkSession* session) { + HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(this); + drainer->Start(session); + // |drainer| will delete itself. +} + } // namespace net diff --git a/net/http/http_basic_stream.h b/net/http/http_basic_stream.h index 3b38b93..8024576 100644 --- a/net/http/http_basic_stream.h +++ b/net/http/http_basic_stream.h @@ -83,6 +83,8 @@ class HttpBasicStream : public HttpStream { virtual void LogNumRttVsBytesMetrics() const OVERRIDE; + virtual void Drain(HttpNetworkSession* session) OVERRIDE; + private: scoped_refptr<GrowableIOBuffer> read_buf_; diff --git a/net/http/http_network_transaction.cc b/net/http/http_network_transaction.cc index 84df782..c66f0f3 100644 --- a/net/http/http_network_transaction.cc +++ b/net/http/http_network_transaction.cc @@ -39,7 +39,6 @@ #include "net/http/http_proxy_client_socket_pool.h" #include "net/http/http_request_headers.h" #include "net/http/http_request_info.h" -#include "net/http/http_response_body_drainer.h" #include "net/http/http_response_headers.h" #include "net/http/http_response_info.h" #include "net/http/http_server_properties.h" @@ -137,15 +136,8 @@ HttpNetworkTransaction::~HttpNetworkTransaction() { stream_->Close(true /* not reusable */); } else { // Otherwise, we try to drain the response body. - // TODO(willchan): Consider moving this response body draining to the - // stream implementation. For SPDY, there's clearly no point. For - // HTTP, it can vary depending on whether or not we're pipelining. It's - // stream dependent, so the different subtypes should be implementing - // their solutions. - HttpResponseBodyDrainer* drainer = - new HttpResponseBodyDrainer(stream_.release()); - drainer->Start(session_); - // |drainer| will delete itself. + HttpStream* stream = stream_.release(); + stream->Drain(session_); } } } @@ -1198,12 +1190,19 @@ int HttpNetworkTransaction::HandleIOError(int error) { case ERR_CONNECTION_CLOSED: case ERR_CONNECTION_ABORTED: if (ShouldResendRequest(error)) { + net_log_.AddEvent( + NetLog::TYPE_HTTP_TRANSACTION_RESTART_AFTER_ERROR, + make_scoped_refptr(new NetLogIntegerParameter("net_error", error))); ResetConnectionAndRequestForResend(); error = OK; } break; + case ERR_PIPELINE_EVICTION: case ERR_SPDY_PING_FAILED: case ERR_SPDY_SERVER_REFUSED_STREAM: + net_log_.AddEvent( + NetLog::TYPE_HTTP_TRANSACTION_RESTART_AFTER_ERROR, + make_scoped_refptr(new NetLogIntegerParameter("net_error", error))); ResetConnectionAndRequestForResend(); error = OK; break; diff --git a/net/http/http_network_transaction_unittest.cc b/net/http/http_network_transaction_unittest.cc index 1de21e7..546935e 100644 --- a/net/http/http_network_transaction_unittest.cc +++ b/net/http/http_network_transaction_unittest.cc @@ -174,8 +174,8 @@ class HttpNetworkTransactionTest : public PlatformTest { void KeepAliveConnectionResendRequestTest(const MockRead& read_failure); - SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[], - size_t reads_count) { + SimpleGetHelperResult SimpleGetHelperForData(StaticSocketDataProvider* data[], + size_t data_count) { SimpleGetHelperResult out; HttpRequestInfo request; @@ -187,8 +187,9 @@ class HttpNetworkTransactionTest : public PlatformTest { scoped_ptr<HttpTransaction> trans( new HttpNetworkTransaction(CreateSession(&session_deps))); - StaticSocketDataProvider data(data_reads, reads_count, NULL, 0); - session_deps.socket_factory.AddSocketDataProvider(&data); + for (size_t i = 0; i < data_count; ++i) { + session_deps.socket_factory.AddSocketDataProvider(data[i]); + } TestOldCompletionCallback callback; @@ -237,6 +238,13 @@ class HttpNetworkTransactionTest : public PlatformTest { return out; } + SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[], + size_t reads_count) { + StaticSocketDataProvider reads(data_reads, reads_count, NULL, 0); + StaticSocketDataProvider* data[] = { &reads }; + return SimpleGetHelperForData(data, 1); + } + void ConnectStatusHelperWithExpectedStatus(const MockRead& status, int expected_status); @@ -9245,4 +9253,51 @@ TEST_F(HttpNetworkTransactionTest, HttpStreamFactory::set_use_alternate_protocols(false); } +TEST_F(HttpNetworkTransactionTest, ReadPipelineEvictionFallback) { + MockRead data_reads1[] = { + MockRead(false, ERR_PIPELINE_EVICTION), + }; + MockRead data_reads2[] = { + MockRead("HTTP/1.0 200 OK\r\n\r\n"), + MockRead("hello world"), + MockRead(false, OK), + }; + StaticSocketDataProvider data1(data_reads1, arraysize(data_reads1), NULL, 0); + StaticSocketDataProvider data2(data_reads2, arraysize(data_reads2), NULL, 0); + StaticSocketDataProvider* data[] = { &data1, &data2 }; + + SimpleGetHelperResult out = SimpleGetHelperForData(data, arraysize(data)); + + EXPECT_EQ(OK, out.rv); + EXPECT_EQ("HTTP/1.0 200 OK", out.status_line); + EXPECT_EQ("hello world", out.response_data); +} + +TEST_F(HttpNetworkTransactionTest, SendPipelineEvictionFallback) { + MockWrite data_writes1[] = { + MockWrite(false, ERR_PIPELINE_EVICTION), + }; + MockWrite data_writes2[] = { + MockWrite("GET / HTTP/1.1\r\n" + "Host: www.google.com\r\n" + "Connection: keep-alive\r\n\r\n"), + }; + MockRead data_reads2[] = { + MockRead("HTTP/1.0 200 OK\r\n\r\n"), + MockRead("hello world"), + MockRead(false, OK), + }; + StaticSocketDataProvider data1(NULL, 0, + data_writes1, arraysize(data_writes1)); + StaticSocketDataProvider data2(data_reads2, arraysize(data_reads2), + data_writes2, arraysize(data_writes2)); + StaticSocketDataProvider* data[] = { &data1, &data2 }; + + SimpleGetHelperResult out = SimpleGetHelperForData(data, arraysize(data)); + + EXPECT_EQ(OK, out.rv); + EXPECT_EQ("HTTP/1.0 200 OK", out.status_line); + EXPECT_EQ("hello world", out.response_data); +} + } // namespace net diff --git a/net/http/http_pipelined_connection.h b/net/http/http_pipelined_connection.h new file mode 100644 index 0000000..59e61d6 --- /dev/null +++ b/net/http/http_pipelined_connection.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_HTTP_HTTP_PIPELINED_CONNECTION_H_ +#define NET_HTTP_HTTP_PIPELINED_CONNECTION_H_ +#pragma once + +#include "net/base/net_export.h" +#include "net/base/net_log.h" + +namespace net { + +class BoundNetLog; +class ClientSocketHandle; +class HttpPipelinedStream; +class ProxyInfo; +struct SSLConfig; + +class NET_EXPORT_PRIVATE HttpPipelinedConnection { + public: + class Delegate { + public: + // Called when a pipeline has newly available capacity. This may be because + // the first request has been sent and the pipeline is now active. Or, it + // may be because a request successfully completed. + virtual void OnPipelineHasCapacity(HttpPipelinedConnection* pipeline) = 0; + }; + + class Factory { + public: + virtual ~Factory() {} + + virtual HttpPipelinedConnection* CreateNewPipeline( + ClientSocketHandle* connection, + Delegate* delegate, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated) = 0; + }; + + virtual ~HttpPipelinedConnection() {} + + // Returns a new stream that uses this pipeline. + virtual HttpPipelinedStream* CreateNewStream() = 0; + + // The number of streams currently associated with this pipeline. + virtual int depth() const = 0; + + // True if this pipeline can accept new HTTP requests. False if a fatal error + // has occurred. + virtual bool usable() const = 0; + + // True if this pipeline has bound one request and is ready for additional + // requests. + virtual bool active() const = 0; + + // The SSLConfig used to establish this connection. + virtual const SSLConfig& used_ssl_config() const = 0; + + // The ProxyInfo used to establish this connection. + virtual const ProxyInfo& used_proxy_info() const = 0; + + // The source of this pipelined connection. + virtual const NetLog::Source& source() const = 0; + + // True if this connection was NPN negotiated. + virtual bool was_npn_negotiated() const = 0; +}; + +} // namespace net + +#endif // NET_HTTP_HTTP_PIPELINED_CONNECTION_H_ diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc new file mode 100644 index 0000000..33f2ef6 --- /dev/null +++ b/net/http/http_pipelined_connection_impl.cc @@ -0,0 +1,612 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_connection_impl.h" + +#include "base/message_loop.h" +#include "base/stl_util.h" +#include "net/base/io_buffer.h" +#include "net/http/http_pipelined_stream.h" +#include "net/http/http_request_info.h" +#include "net/http/http_stream_parser.h" +#include "net/socket/client_socket_handle.h" + +namespace net { + +HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl( + ClientSocketHandle* connection, + HttpPipelinedConnection::Delegate* delegate, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated) + : delegate_(delegate), + connection_(connection), + used_ssl_config_(used_ssl_config), + used_proxy_info_(used_proxy_info), + net_log_(net_log), + was_npn_negotiated_(was_npn_negotiated), + read_buf_(new GrowableIOBuffer()), + next_pipeline_id_(1), + active_(false), + usable_(true), + completed_one_request_(false), + ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), + send_next_state_(SEND_STATE_NONE), + ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( + this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), + send_user_callback_(NULL), + read_next_state_(READ_STATE_NONE), + ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( + this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), + read_user_callback_(NULL) { + CHECK(connection_.get()); +} + +HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { + CHECK_EQ(depth(), 0); + CHECK(stream_info_map_.empty()); + CHECK(deferred_request_queue_.empty()); + CHECK(request_order_.empty()); + CHECK_EQ(send_next_state_, SEND_STATE_NONE); + CHECK_EQ(read_next_state_, READ_STATE_NONE); + CHECK(!send_user_callback_); + CHECK(!read_user_callback_); + if (!usable_) { + connection_->socket()->Disconnect(); + } + connection_->Reset(); +} + +HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() { + int pipeline_id = next_pipeline_id_++; + CHECK(pipeline_id); + HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); + stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo())); + return stream; +} + +void HttpPipelinedConnectionImpl::InitializeParser( + int pipeline_id, + const HttpRequestInfo* request, + const BoundNetLog& net_log) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(!stream_info_map_[pipeline_id].parser.get()); + stream_info_map_[pipeline_id].state = STREAM_BOUND; + stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser( + connection_.get(), request, read_buf_.get(), net_log)); + + // In case our first stream doesn't SendRequest() immediately, we should still + // allow others to use this pipeline. + if (pipeline_id == 1) { + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::ActivatePipeline)); + } +} + +void HttpPipelinedConnectionImpl::ActivatePipeline() { + if (!active_) { + active_ = true; + delegate_->OnPipelineHasCapacity(this); + } +} + +void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + Close(pipeline_id, false); + + if (stream_info_map_[pipeline_id].state != STREAM_CREATED && + stream_info_map_[pipeline_id].state != STREAM_UNUSED) { + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); + CHECK(stream_info_map_[pipeline_id].parser.get()); + stream_info_map_[pipeline_id].parser.reset(); + } + CHECK(!stream_info_map_[pipeline_id].parser.get()); + CHECK(!stream_info_map_[pipeline_id].read_headers_callback); + stream_info_map_.erase(pipeline_id); + + delegate_->OnPipelineHasCapacity(this); +} + +int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, + const std::string& request_line, + const HttpRequestHeaders& headers, + UploadDataStream* request_body, + HttpResponseInfo* response, + OldCompletionCallback* callback) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND); + if (!usable_) { + return ERR_PIPELINE_EVICTION; + } + + DeferredSendRequest deferred_request; + deferred_request.pipeline_id = pipeline_id; + deferred_request.request_line = request_line; + deferred_request.headers = headers; + deferred_request.request_body = request_body; + deferred_request.response = response; + deferred_request.callback = callback; + deferred_request_queue_.push(deferred_request); + + int rv; + if (send_next_state_ == SEND_STATE_NONE) { + send_next_state_ = SEND_STATE_NEXT_REQUEST; + rv = DoSendRequestLoop(OK); + } else { + rv = ERR_IO_PENDING; + } + ActivatePipeline(); + return rv; +} + +int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { + int rv = result; + do { + SendRequestState state = send_next_state_; + send_next_state_ = SEND_STATE_NONE; + switch (state) { + case SEND_STATE_NEXT_REQUEST: + rv = DoSendNextRequest(rv); + break; + case SEND_STATE_COMPLETE: + rv = DoSendComplete(rv); + break; + case SEND_STATE_UNUSABLE: + rv = DoEvictPendingSendRequests(rv); + break; + default: + NOTREACHED() << "bad send state: " << state; + rv = ERR_FAILED; + break; + } + } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); + return rv; +} + +void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { + CHECK(send_user_callback_); + DoSendRequestLoop(result); +} + +int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { + CHECK(!deferred_request_queue_.empty()); + const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); + CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id)); + if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) { + deferred_request_queue_.pop(); + if (deferred_request_queue_.empty()) { + send_next_state_ = SEND_STATE_NONE; + } else { + send_next_state_ = SEND_STATE_NEXT_REQUEST; + } + return OK; + } + CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get()); + int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest( + deferred_request.request_line, + deferred_request.headers, + deferred_request.request_body, + deferred_request.response, + &send_io_callback_); + // |result| == ERR_IO_PENDING means this function was *not* called on the same + // stack as SendRequest(). That means we returned ERR_IO_PENDING to + // SendRequest() earlier and will need to invoke its callback. + if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { + send_user_callback_ = deferred_request.callback; + } + stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING; + send_next_state_ = SEND_STATE_COMPLETE; + return rv; +} + +int HttpPipelinedConnectionImpl::DoSendComplete(int result) { + CHECK(!deferred_request_queue_.empty()); + const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); + CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state, + STREAM_SENDING); + request_order_.push(deferred_request.pipeline_id); + stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT; + deferred_request_queue_.pop(); + if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { + result = ERR_PIPELINE_EVICTION; + } + if (result < OK) { + send_next_state_ = SEND_STATE_UNUSABLE; + usable_ = false; + } + if (send_user_callback_) { + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::FireUserCallback, + send_user_callback_, + result)); + send_user_callback_ = NULL; + } + if (result < OK) { + return result; + } + if (deferred_request_queue_.empty()) { + send_next_state_ = SEND_STATE_NONE; + return OK; + } + send_next_state_ = SEND_STATE_NEXT_REQUEST; + return OK; +} + +int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { + send_next_state_ = SEND_STATE_NONE; + while (!deferred_request_queue_.empty()) { + const DeferredSendRequest& evicted_send = deferred_request_queue_.front(); + if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) { + evicted_send.callback->Run(ERR_PIPELINE_EVICTION); + } + deferred_request_queue_.pop(); + } + return result; +} + +int HttpPipelinedConnectionImpl::ReadResponseHeaders( + int pipeline_id, + OldCompletionCallback* callback) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT); + CHECK(!stream_info_map_[pipeline_id].read_headers_callback); + if (!usable_) { + return ERR_PIPELINE_EVICTION; + } + stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; + stream_info_map_[pipeline_id].read_headers_callback = callback; + if (read_next_state_ == READ_STATE_NONE) { + read_next_state_ = READ_STATE_NEXT_HEADERS; + return DoReadHeadersLoop(OK); + } else { + return ERR_IO_PENDING; + } +} + +int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { + int rv = result; + do { + ReadHeadersState state = read_next_state_; + read_next_state_ = READ_STATE_NONE; + switch (state) { + case READ_STATE_NEXT_HEADERS: + rv = DoReadNextHeaders(rv); + break; + case READ_STATE_COMPLETE: + rv = DoReadHeadersComplete(rv); + break; + case READ_STATE_WAITING_FOR_CLOSE: + rv = DoReadWaitingForClose(rv); + return rv; + case READ_STATE_STREAM_CLOSED: + rv = DoReadStreamClosed(); + break; + case READ_STATE_UNUSABLE: + rv = DoEvictPendingReadHeaders(rv); + break; + case READ_STATE_NONE: + break; + default: + NOTREACHED() << "bad read state"; + rv = ERR_FAILED; + break; + } + } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); + return rv; +} + +void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { + CHECK(read_user_callback_); + DoReadHeadersLoop(result); +} + +int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { + CHECK(!request_order_.empty()); + int pipeline_id = request_order_.front(); + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) { + // Since nobody will read whatever data is on the pipeline associated with + // this closed request, we must shut down the rest of the pipeline. + read_next_state_ = READ_STATE_UNUSABLE; + return OK; + } + if (stream_info_map_[pipeline_id].read_headers_callback == NULL) { + return ERR_IO_PENDING; + } + CHECK(stream_info_map_[pipeline_id].parser.get()); + + if (result == ERR_IO_PENDING) { + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE); + } else { + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING); + stream_info_map_[pipeline_id].state = STREAM_ACTIVE; + } + + int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders( + &read_io_callback_); + if (rv == ERR_IO_PENDING) { + read_next_state_ = READ_STATE_COMPLETE; + read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback; + } else if (rv < OK) { + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; + if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) + rv = ERR_PIPELINE_EVICTION; + } else { + CHECK_LE(OK, rv); + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; + } + + // |result| == ERR_IO_PENDING means this function was *not* called on the same + // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to + // ReadResponseHeaders() earlier and now need to invoke its callback. + if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) { + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; + read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback; + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::FireUserCallback, + read_user_callback_, + rv)); + } + return rv; +} + +int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; + if (read_user_callback_) { + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::FireUserCallback, + read_user_callback_, + result)); + read_user_callback_ = NULL; + } + return result; +} + +int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; + return result; +} + +int HttpPipelinedConnectionImpl::DoReadStreamClosed() { + CHECK(!request_order_.empty()); + int pipeline_id = request_order_.front(); + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); + CHECK(stream_info_map_[pipeline_id].read_headers_callback); + stream_info_map_[pipeline_id].read_headers_callback = NULL; + request_order_.pop(); + if (!usable_) { + read_next_state_ = READ_STATE_UNUSABLE; + return OK; + } else { + completed_one_request_ = true; + if (!request_order_.empty()) { + int next_pipeline_id = request_order_.front(); + CHECK(ContainsKey(stream_info_map_, next_pipeline_id)); + if (stream_info_map_[next_pipeline_id].read_headers_callback) { + stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE; + read_next_state_ = READ_STATE_NEXT_HEADERS; + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::DoReadHeadersLoop, + ERR_IO_PENDING)); + return ERR_IO_PENDING; // Wait for the task to fire. + } + } + read_next_state_ = READ_STATE_NONE; + return OK; + } +} + +int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { + while (!request_order_.empty()) { + int evicted_id = request_order_.front(); + request_order_.pop(); + if (!ContainsKey(stream_info_map_, evicted_id) || + (stream_info_map_[evicted_id].read_headers_callback == NULL)) { + continue; + } + if (stream_info_map_[evicted_id].state != STREAM_CLOSED) { + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &HttpPipelinedConnectionImpl::FireUserCallback, + stream_info_map_[evicted_id].read_headers_callback, + ERR_PIPELINE_EVICTION)); + } + stream_info_map_[evicted_id].read_headers_callback = NULL; + } + read_next_state_ = READ_STATE_NONE; + return result; +} + +void HttpPipelinedConnectionImpl::Close(int pipeline_id, + bool not_reusable) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + switch (stream_info_map_[pipeline_id].state) { + case STREAM_CREATED: + stream_info_map_[pipeline_id].state = STREAM_UNUSED; + break; + + case STREAM_BOUND: + stream_info_map_[pipeline_id].state = STREAM_CLOSED; + break; + + case STREAM_SENDING: + usable_ = false; + stream_info_map_[pipeline_id].state = STREAM_CLOSED; + send_user_callback_ = NULL; + send_next_state_ = SEND_STATE_UNUSABLE; + DoSendRequestLoop(OK); + break; + + case STREAM_SENT: + case STREAM_READ_PENDING: + usable_ = false; + stream_info_map_[pipeline_id].state = STREAM_CLOSED; + stream_info_map_[pipeline_id].read_headers_callback = NULL; + if (read_next_state_ == READ_STATE_NONE) { + read_next_state_ = READ_STATE_UNUSABLE; + DoReadHeadersLoop(OK); + } + break; + + case STREAM_ACTIVE: + stream_info_map_[pipeline_id].state = STREAM_CLOSED; + if (not_reusable) { + usable_ = false; + } + read_next_state_ = READ_STATE_STREAM_CLOSED; + read_user_callback_ = NULL; + DoReadHeadersLoop(OK); + break; + + case STREAM_CLOSED: + case STREAM_UNUSED: + // TODO(simonjam): Why is Close() sometimes called twice? + break; + + default: + NOTREACHED(); + break; + } +} + +int HttpPipelinedConnectionImpl::ReadResponseBody( + int pipeline_id, + IOBuffer* buf, + int buf_len, + OldCompletionCallback* callback) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(!request_order_.empty()); + CHECK_EQ(pipeline_id, request_order_.front()); + CHECK(stream_info_map_[pipeline_id].parser.get()); + return stream_info_map_[pipeline_id].parser->ReadResponseBody( + buf, buf_len, callback); +} + +uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); + return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress(); +} + +HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo( + int pipeline_id) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); + return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo(); +} + +bool HttpPipelinedConnectionImpl::IsResponseBodyComplete( + int pipeline_id) const { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); + return stream_info_map_.find(pipeline_id)->second.parser-> + IsResponseBodyComplete(); +} + +bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); + return stream_info_map_.find(pipeline_id)->second.parser-> + CanFindEndOfResponse(); +} + +bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + return read_buf_->offset() != 0; +} + +bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + if (pipeline_id > 1) { + return true; + } + ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type(); + return connection_->is_reused() || + reuse_type == ClientSocketHandle::UNUSED_IDLE; +} + +void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + connection_->set_is_reused(true); +} + +void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id, + SSLInfo* ssl_info) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_[pipeline_id].parser.get()); + return stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info); +} + +void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( + int pipeline_id, + SSLCertRequestInfo* cert_request_info) { + CHECK(ContainsKey(stream_info_map_, pipeline_id)); + CHECK(stream_info_map_[pipeline_id].parser.get()); + return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo( + cert_request_info); +} + +void HttpPipelinedConnectionImpl::FireUserCallback( + OldCompletionCallback* callback, + int result) { + CHECK(callback); + callback->Run(result); +} + +int HttpPipelinedConnectionImpl::depth() const { + return stream_info_map_.size(); +} + +bool HttpPipelinedConnectionImpl::usable() const { + return usable_; +} + +bool HttpPipelinedConnectionImpl::active() const { + return active_; +} + +const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const { + return used_ssl_config_; +} + +const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const { + return used_proxy_info_; +} + +const NetLog::Source& HttpPipelinedConnectionImpl::source() const { + return net_log_.source(); +} + +bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { + return was_npn_negotiated_; +} + +HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() { +} + +HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() { +} + +HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() + : read_headers_callback(NULL), + state(STREAM_CREATED) { +} + +HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() { +} + +} // namespace net diff --git a/net/http/http_pipelined_connection_impl.h b/net/http/http_pipelined_connection_impl.h new file mode 100644 index 0000000..bf39e62 --- /dev/null +++ b/net/http/http_pipelined_connection_impl.h @@ -0,0 +1,271 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_ +#define NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_ +#pragma once + +#include <map> +#include <queue> +#include <string> + +#include "base/basictypes.h" +#include "base/memory/linked_ptr.h" +#include "base/task.h" +#include "net/base/completion_callback.h" +#include "net/base/net_export.h" +#include "net/base/net_log.h" +#include "net/base/ssl_config_service.h" +#include "net/base/upload_data_stream.h" +#include "net/http/http_pipelined_connection.h" +#include "net/http/http_request_info.h" +#include "net/http/http_stream_parser.h" +#include "net/proxy/proxy_info.h" + +namespace net { + +class ClientSocketHandle; +class GrowableIOBuffer; +class HttpRequestHeaders; +class HttpResponseInfo; +class IOBuffer; +class SSLCertRequestInfo; +class SSLInfo; + +// This class manages all of the state for a single pipelined connection. It +// tracks the order that HTTP requests are sent and enforces that the +// subsequent reads occur in the appropriate order. +// +// If an error occurs related to pipelining, ERR_PIPELINE_EVICTION will be +// returned to the client. This indicates the client should retry the request +// without pipelining. +class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl + : public HttpPipelinedConnection { + public: + HttpPipelinedConnectionImpl(ClientSocketHandle* connection, + Delegate* delegate, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated); + virtual ~HttpPipelinedConnectionImpl(); + + // HttpPipelinedConnection interface. + + // Used by HttpStreamFactoryImpl and friends. + virtual HttpPipelinedStream* CreateNewStream() OVERRIDE; + + // Used by HttpPipelinedHost. + virtual int depth() const OVERRIDE; + virtual bool usable() const OVERRIDE; + virtual bool active() const OVERRIDE; + + // Used by HttpStreamFactoryImpl. + virtual const SSLConfig& used_ssl_config() const OVERRIDE; + virtual const ProxyInfo& used_proxy_info() const OVERRIDE; + virtual const NetLog::Source& source() const OVERRIDE; + virtual bool was_npn_negotiated() const OVERRIDE; + + // Used by HttpPipelinedStream. + + // Notifies this pipeline that a stream is no longer using it. + void OnStreamDeleted(int pipeline_id); + + // Effective implementation of HttpStream. Note that we don't directly + // implement that interface. Instead, these functions will be called by the + // pass-through methods in HttpPipelinedStream. + void InitializeParser(int pipeline_id, + const HttpRequestInfo* request, + const BoundNetLog& net_log); + + int SendRequest(int pipeline_id, + const std::string& request_line, + const HttpRequestHeaders& headers, + UploadDataStream* request_body, + HttpResponseInfo* response, + OldCompletionCallback* callback); + + int ReadResponseHeaders(int pipeline_id, + OldCompletionCallback* callback); + + int ReadResponseBody(int pipeline_id, + IOBuffer* buf, int buf_len, + OldCompletionCallback* callback); + + void Close(int pipeline_id, + bool not_reusable); + + uint64 GetUploadProgress(int pipeline_id) const; + + HttpResponseInfo* GetResponseInfo(int pipeline_id); + + bool IsResponseBodyComplete(int pipeline_id) const; + + bool CanFindEndOfResponse(int pipeline_id) const; + + bool IsMoreDataBuffered(int pipeline_id) const; + + bool IsConnectionReused(int pipeline_id) const; + + void SetConnectionReused(int pipeline_id); + + void GetSSLInfo(int pipeline_id, + SSLInfo* ssl_info); + + void GetSSLCertRequestInfo(int pipeline_id, + SSLCertRequestInfo* cert_request_info); + + private: + enum StreamState { + STREAM_CREATED, + STREAM_BOUND, + STREAM_SENDING, + STREAM_SENT, + STREAM_READ_PENDING, + STREAM_ACTIVE, + STREAM_CLOSED, + STREAM_UNUSED, + }; + enum SendRequestState { + SEND_STATE_NEXT_REQUEST, + SEND_STATE_COMPLETE, + SEND_STATE_NONE, + SEND_STATE_UNUSABLE, + }; + enum ReadHeadersState { + READ_STATE_NEXT_HEADERS, + READ_STATE_COMPLETE, + READ_STATE_WAITING_FOR_CLOSE, + READ_STATE_STREAM_CLOSED, + READ_STATE_NONE, + READ_STATE_UNUSABLE, + }; + + struct DeferredSendRequest { + DeferredSendRequest(); + ~DeferredSendRequest(); + + int pipeline_id; + std::string request_line; + HttpRequestHeaders headers; + UploadDataStream* request_body; + HttpResponseInfo* response; + OldCompletionCallback* callback; + }; + + struct StreamInfo { + StreamInfo(); + ~StreamInfo(); + + linked_ptr<HttpStreamParser> parser; + OldCompletionCallback* read_headers_callback; + StreamState state; + }; + + typedef std::map<int, StreamInfo> StreamInfoMap; + + // Called after the first request is sent or in a task sometime after the + // first stream is added to this pipeline. This gives the first request + // priority to send, but doesn't hold up other requests if it doesn't. + // When called the first time, notifies the |delegate_| that we can accept new + // requests. + void ActivatePipeline(); + + // Responsible for sending one request at a time and waiting until each + // comepletes. + int DoSendRequestLoop(int result); + + // Called when an asynchronous Send() completes. + void OnSendIOCallback(int result); + + // Sends the next deferred request. This may be called immediately after + // SendRequest(), or it may be in a new task after a prior send completes in + // DoSendComplete(). + int DoSendNextRequest(int result); + + // Notifies the user that the send has completed. This may be called directly + // after SendRequest() for a synchronous request, or it may be called in + // response to OnSendIOCallback for an asynchronous request. + int DoSendComplete(int result); + + // Evicts all unsent deferred requests. This is called if there is a Send() + // error or one of our streams informs us the connection is no longer + // reusable. + int DoEvictPendingSendRequests(int result); + + // Ensures that only the active request's HttpPipelinedSocket can read from + // the underlying socket until it completes. A HttpPipelinedSocket informs us + // that it's done by calling Close(). + int DoReadHeadersLoop(int result); + + // Called when the pending asynchronous ReadResponseHeaders() completes. + void OnReadIOCallback(int result); + + // Determines if the next response in the pipeline is ready to be read. + // If it's ready, then we call ReadResponseHeaders() on the underlying parser. + // HttpPipelinedSocket indicates its readiness by calling + // ReadResponseHeaders(). This function may be called immediately after + // ReadResponseHeaders(), or it may be called in a new task after a previous + // HttpPipelinedSocket finishes its work. + int DoReadNextHeaders(int result); + + // Notifies the user that reading the headers has completed. This may happen + // directly after DoReadNextHeaders() if the response is already available. + // Otherwise, it is called in response to OnReadIOCallback(). + int DoReadHeadersComplete(int result); + + // This is a holding state. It does not do anything, except exit the + // DoReadHeadersLoop(). It is called after DoReadHeadersComplete(). + int DoReadWaitingForClose(int result); + + // Cleans up the state associated with the active request. Invokes + // DoReadNextHeaders() in a new task to start the next response. This is + // called after the active request's HttpPipelinedSocket calls Close(). + int DoReadStreamClosed(); + + // Removes all pending ReadResponseHeaders() requests from the queue. This may + // happen if there is an error with the pipeline or one of our + // HttpPipelinedSockets indicates the connection was suddenly closed. + int DoEvictPendingReadHeaders(int result); + + // Invokes the user's callback in response to SendRequest() or + // ReadResponseHeaders() completing on an underlying parser. This might be + // invoked in response to our own IO callbacks, or it may be invoked if the + // underlying parser completes SendRequest() or ReadResponseHeaders() + // synchronously, but we've already returned ERR_IO_PENDING to the user's + // SendRequest() or ReadResponseHeaders() call into us. + void FireUserCallback(OldCompletionCallback* callback, int result); + + Delegate* delegate_; + scoped_ptr<ClientSocketHandle> connection_; + SSLConfig used_ssl_config_; + ProxyInfo used_proxy_info_; + BoundNetLog net_log_; + bool was_npn_negotiated_; + scoped_refptr<GrowableIOBuffer> read_buf_; + int next_pipeline_id_; + bool active_; + bool usable_; + bool completed_one_request_; + ScopedRunnableMethodFactory<HttpPipelinedConnectionImpl> method_factory_; + + StreamInfoMap stream_info_map_; + + std::queue<int> request_order_; + + std::queue<DeferredSendRequest> deferred_request_queue_; + SendRequestState send_next_state_; + OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> send_io_callback_; + OldCompletionCallback* send_user_callback_; + + ReadHeadersState read_next_state_; + OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> read_io_callback_; + OldCompletionCallback* read_user_callback_; + + DISALLOW_COPY_AND_ASSIGN(HttpPipelinedConnectionImpl); +}; + +} // namespace net + +#endif // NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_ diff --git a/net/http/http_pipelined_connection_impl_unittest.cc b/net/http/http_pipelined_connection_impl_unittest.cc new file mode 100644 index 0000000..420d8dc --- /dev/null +++ b/net/http/http_pipelined_connection_impl_unittest.cc @@ -0,0 +1,1067 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_connection_impl.h" + +#include <string> + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_vector.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/request_priority.h" +#include "net/http/http_pipelined_stream.h" +#include "net/socket/client_socket_handle.h" +#include "net/socket/client_socket_pool_histograms.h" +#include "net/socket/socket_test_util.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::NiceMock; +using testing::StrEq; + +namespace net { + +class DummySocketParams : public base::RefCounted<DummySocketParams> { + private: + friend class base::RefCounted<DummySocketParams>; +}; + +REGISTER_SOCKET_PARAMS_FOR_POOL(MockTransportClientSocketPool, + DummySocketParams); + +class MockPipelineDelegate : public HttpPipelinedConnectionImpl::Delegate { + public: + MOCK_METHOD1(OnPipelineHasCapacity, void(HttpPipelinedConnection* pipeline)); +}; + +class SuddenCloseObserver : public MessageLoop::TaskObserver { + public: + SuddenCloseObserver(HttpStream* stream, int close_before_task) + : stream_(stream), + close_before_task_(close_before_task), + current_task_(0) { } + + virtual void WillProcessTask(base::TimeTicks) OVERRIDE { + ++current_task_; + if (current_task_ == close_before_task_) { + stream_->Close(false); + MessageLoop::current()->RemoveTaskObserver(this); + } + } + + virtual void DidProcessTask(base::TimeTicks) OVERRIDE { } + + private: + HttpStream* stream_; + int close_before_task_; + int current_task_; +}; + +class HttpPipelinedConnectionImplTest : public testing::Test { + public: + HttpPipelinedConnectionImplTest() + : histograms_("a"), + pool_(1, 1, &histograms_, &factory_) { + } + + void TearDown() { + MessageLoop::current()->RunAllPending(); + } + + void Initialize(MockRead* reads, size_t reads_count, + MockWrite* writes, size_t writes_count) { + data_ = new DeterministicSocketData(reads, reads_count, + writes, writes_count); + data_->set_connect_data(MockConnect(false, 0)); + if (reads_count || writes_count) { + data_->StopAfter(reads_count + writes_count); + } + factory_.AddSocketDataProvider(data_.get()); + scoped_refptr<DummySocketParams> params; + ClientSocketHandle* connection = new ClientSocketHandle; + connection->Init("a", params, MEDIUM, NULL, &pool_, BoundNetLog()); + pipeline_.reset(new HttpPipelinedConnectionImpl(connection, &delegate_, + ssl_config_, proxy_info_, + BoundNetLog(), false)); + } + + HttpRequestInfo* GetRequestInfo(const std::string& filename) { + HttpRequestInfo* request_info = new HttpRequestInfo; + request_info->url = GURL("http://localhost/" + filename); + request_info->method = "GET"; + request_info_vector_.push_back(request_info); + return request_info; + } + + HttpStream* NewTestStream(const std::string& filename) { + HttpStream* stream = pipeline_->CreateNewStream(); + HttpRequestInfo* request_info = GetRequestInfo(filename); + int rv = stream->InitializeStream(request_info, BoundNetLog(), NULL); + DCHECK_EQ(OK, rv); + return stream; + } + + void ExpectResponse(const std::string& expected, + scoped_ptr<HttpStream>& stream, bool async) { + scoped_refptr<IOBuffer> buffer(new IOBuffer(expected.size())); + + if (async) { + EXPECT_EQ(ERR_IO_PENDING, + stream->ReadResponseBody(buffer.get(), expected.size(), + &callback_)); + data_->RunFor(1); + EXPECT_EQ(static_cast<int>(expected.size()), callback_.WaitForResult()); + } else { + EXPECT_EQ(static_cast<int>(expected.size()), + stream->ReadResponseBody(buffer.get(), expected.size(), + &callback_)); + } + std::string actual(buffer->data(), expected.size()); + EXPECT_THAT(actual, StrEq(expected)); + } + + void TestSyncRequest(scoped_ptr<HttpStream>& stream, + const std::string& filename) { + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, stream->ReadResponseHeaders(&callback_)); + ExpectResponse(filename, stream, false); + + stream->Close(false); + } + + DeterministicMockClientSocketFactory factory_; + ClientSocketPoolHistograms histograms_; + MockTransportClientSocketPool pool_; + scoped_refptr<DeterministicSocketData> data_; + + SSLConfig ssl_config_; + ProxyInfo proxy_info_; + NiceMock<MockPipelineDelegate> delegate_; + TestOldCompletionCallback callback_; + scoped_ptr<HttpPipelinedConnectionImpl> pipeline_; + ScopedVector<HttpRequestInfo> request_info_vector_; +}; + +TEST_F(HttpPipelinedConnectionImplTest, PipelineNotUsed) { + Initialize(NULL, 0, NULL, 0); +} + +TEST_F(HttpPipelinedConnectionImplTest, StreamNotUsed) { + Initialize(NULL, 0, NULL, 0); + + scoped_ptr<HttpStream> stream(pipeline_->CreateNewStream()); + + stream->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, StreamBoundButNotUsed) { + Initialize(NULL, 0, NULL, 0); + + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + + stream->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, SyncSingleRequest) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 2, "Content-Length: 7\r\n\r\n"), + MockRead(false, 3, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + TestSyncRequest(stream, "ok.html"); +} + +TEST_F(HttpPipelinedConnectionImplTest, AsyncSingleRequest) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(true, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 2, "Content-Length: 7\r\n\r\n"), + MockRead(true, 3, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(ERR_IO_PENDING, + stream->SendRequest(headers, NULL, &response, &callback_)); + data_->RunFor(1); + EXPECT_LE(OK, callback_.WaitForResult()); + + EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(&callback_)); + data_->RunFor(2); + EXPECT_LE(OK, callback_.WaitForResult()); + + ExpectResponse("ok.html", stream, true); + + stream->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, LockStepAsyncRequests) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(true, 1, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(true, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 3, "Content-Length: 7\r\n\r\n"), + MockRead(true, 4, "ok.html"), + MockRead(true, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 6, "Content-Length: 7\r\n\r\n"), + MockRead(true, 7, "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + HttpRequestHeaders headers1; + HttpResponseInfo response1; + EXPECT_EQ(ERR_IO_PENDING, + stream1->SendRequest(headers1, NULL, &response1, &callback_)); + + HttpRequestHeaders headers2; + HttpResponseInfo response2; + EXPECT_EQ(ERR_IO_PENDING, + stream2->SendRequest(headers2, NULL, &response2, &callback_)); + + data_->RunFor(1); + EXPECT_LE(OK, callback_.WaitForResult()); + data_->RunFor(1); + EXPECT_LE(OK, callback_.WaitForResult()); + + EXPECT_EQ(ERR_IO_PENDING, stream1->ReadResponseHeaders(&callback_)); + EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback_)); + + data_->RunFor(2); + EXPECT_LE(OK, callback_.WaitForResult()); + + ExpectResponse("ok.html", stream1, true); + + stream1->Close(false); + + data_->RunFor(2); + EXPECT_LE(OK, callback_.WaitForResult()); + + ExpectResponse("ko.html", stream2, true); + + stream2->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, TwoResponsesInOnePacket) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, + "HTTP/1.1 200 OK\r\n" + "Content-Length: 7\r\n\r\n" + "ok.html" + "HTTP/1.1 200 OK\r\n" + "Content-Length: 7\r\n\r\n" + "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + HttpRequestHeaders headers1; + HttpResponseInfo response1; + EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_)); + HttpRequestHeaders headers2; + HttpResponseInfo response2; + EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_)); + + EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", stream1, false); + stream1->Close(false); + + EXPECT_EQ(OK, stream2->ReadResponseHeaders(&callback_)); + ExpectResponse("ko.html", stream2, false); + stream2->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, SendOrderSwapped) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ko.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 4, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 2, "Content-Length: 7\r\n\r\n"), + MockRead(false, 3, "ko.html"), + MockRead(false, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 6, "Content-Length: 7\r\n\r\n"), + MockRead(false, 7, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + TestSyncRequest(stream2, "ko.html"); + TestSyncRequest(stream1, "ok.html"); +} + +TEST_F(HttpPipelinedConnectionImplTest, ReadOrderSwapped) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 3, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 6, "Content-Length: 7\r\n\r\n"), + MockRead(false, 7, "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + HttpRequestHeaders headers1; + HttpResponseInfo response1; + EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_)); + + HttpRequestHeaders headers2; + HttpResponseInfo response2; + EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_)); + + EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback_)); + + EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", stream1, false); + + stream1->Close(false); + + EXPECT_LE(OK, callback_.WaitForResult()); + ExpectResponse("ko.html", stream2, false); + + stream2->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, SendWhileReading) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 3, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 2, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 6, "Content-Length: 7\r\n\r\n"), + MockRead(false, 7, "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + HttpRequestHeaders headers1; + HttpResponseInfo response1; + EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_)); + EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_)); + + HttpRequestHeaders headers2; + HttpResponseInfo response2; + EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_)); + + ExpectResponse("ok.html", stream1, false); + stream1->Close(false); + + EXPECT_EQ(OK, stream2->ReadResponseHeaders(&callback_)); + ExpectResponse("ko.html", stream2, false); + stream2->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, AsyncSendWhileAsyncReadBlocked) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(true, 3, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 2, "Content-Length: 7\r\n\r\n"), + MockRead(true, 4, "ok.html"), + MockRead(false, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 6, "Content-Length: 7\r\n\r\n"), + MockRead(false, 7, "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream1(NewTestStream("ok.html")); + scoped_ptr<HttpStream> stream2(NewTestStream("ko.html")); + + HttpRequestHeaders headers1; + HttpResponseInfo response1; + EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_)); + EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_)); + TestOldCompletionCallback callback1; + std::string expected = "ok.html"; + scoped_refptr<IOBuffer> buffer(new IOBuffer(expected.size())); + EXPECT_EQ(ERR_IO_PENDING, + stream1->ReadResponseBody(buffer.get(), expected.size(), + &callback1)); + + HttpRequestHeaders headers2; + HttpResponseInfo response2; + TestOldCompletionCallback callback2; + EXPECT_EQ(ERR_IO_PENDING, + stream2->SendRequest(headers2, NULL, &response2, &callback2)); + + data_->RunFor(1); + EXPECT_LE(OK, callback2.WaitForResult()); + EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback2)); + + data_->RunFor(1); + EXPECT_EQ(static_cast<int>(expected.size()), callback1.WaitForResult()); + std::string actual(buffer->data(), expected.size()); + EXPECT_THAT(actual, StrEq(expected)); + stream1->Close(false); + + data_->StopAfter(8); + EXPECT_LE(OK, callback2.WaitForResult()); + ExpectResponse("ko.html", stream2, false); + stream2->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, UnusedStreamAllowsLaterUse) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 2, "Content-Length: 7\r\n\r\n"), + MockRead(false, 3, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> unused_stream(NewTestStream("unused.html")); + unused_stream->Close(false); + + scoped_ptr<HttpStream> later_stream(NewTestStream("ok.html")); + TestSyncRequest(later_stream, "ok.html"); +} + +TEST_F(HttpPipelinedConnectionImplTest, UnsentStreamAllowsLaterUse) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 4, "GET /ko.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(true, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 2, "Content-Length: 7\r\n\r\n"), + MockRead(true, 3, "ok.html"), + MockRead(false, 5, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 6, "Content-Length: 7\r\n\r\n"), + MockRead(false, 7, "ko.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(ERR_IO_PENDING, + stream->SendRequest(headers, NULL, &response, &callback_)); + + scoped_ptr<HttpStream> unsent_stream(NewTestStream("unsent.html")); + HttpRequestHeaders unsent_headers; + HttpResponseInfo unsent_response; + EXPECT_EQ(ERR_IO_PENDING, + unsent_stream->SendRequest(unsent_headers, NULL, &unsent_response, + &callback_)); + unsent_stream->Close(false); + + data_->RunFor(1); + EXPECT_LE(OK, callback_.WaitForResult()); + + EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(&callback_)); + data_->RunFor(2); + EXPECT_LE(OK, callback_.WaitForResult()); + + ExpectResponse("ok.html", stream, true); + + stream->Close(false); + + data_->StopAfter(8); + scoped_ptr<HttpStream> later_stream(NewTestStream("ko.html")); + TestSyncRequest(later_stream, "ko.html"); +} + +TEST_F(HttpPipelinedConnectionImplTest, FailedSend) { + MockWrite writes[] = { + MockWrite(true, ERR_FAILED), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + scoped_ptr<HttpStream> failed_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + scoped_ptr<HttpStream> closed_stream(NewTestStream("closed.html")); + scoped_ptr<HttpStream> rejected_stream(NewTestStream("rejected.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + TestOldCompletionCallback failed_callback; + EXPECT_EQ(ERR_IO_PENDING, + failed_stream->SendRequest(headers, NULL, &response, + &failed_callback)); + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->SendRequest(headers, NULL, &response, + &evicted_callback)); + EXPECT_EQ(ERR_IO_PENDING, + closed_stream->SendRequest(headers, NULL, &response, + &callback_)); + closed_stream->Close(false); + + data_->RunFor(1); + EXPECT_EQ(ERR_FAILED, failed_callback.WaitForResult()); + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + EXPECT_EQ(ERR_PIPELINE_EVICTION, + rejected_stream->SendRequest(headers, NULL, &response, + &callback_)); + + failed_stream->Close(true); + evicted_stream->Close(true); + rejected_stream->Close(true); +} + +TEST_F(HttpPipelinedConnectionImplTest, ConnectionSuddenlyClosedAfterResponse) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /read_evicted.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 2, "GET /read_rejected.html HTTP/1.1\r\n\r\n"), + MockWrite(true, ERR_SOCKET_NOT_CONNECTED, 5), + }; + MockRead reads[] = { + MockRead(false, 3, "HTTP/1.1 200 OK\r\n\r\n"), + MockRead(false, 4, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> closed_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> read_evicted_stream( + NewTestStream("read_evicted.html")); + scoped_ptr<HttpStream> read_rejected_stream( + NewTestStream("read_rejected.html")); + scoped_ptr<HttpStream> send_closed_stream( + NewTestStream("send_closed.html")); + scoped_ptr<HttpStream> send_evicted_stream( + NewTestStream("send_evicted.html")); + scoped_ptr<HttpStream> send_rejected_stream( + NewTestStream("send_rejected.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, closed_stream->SendRequest(headers, NULL, &response, + &callback_)); + EXPECT_EQ(OK, read_evicted_stream->SendRequest(headers, NULL, &response, + &callback_)); + EXPECT_EQ(OK, read_rejected_stream->SendRequest(headers, NULL, &response, + &callback_)); + TestOldCompletionCallback send_closed_callback; + EXPECT_EQ(ERR_IO_PENDING, + send_closed_stream->SendRequest(headers, NULL, &response, + &send_closed_callback)); + TestOldCompletionCallback send_evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + send_evicted_stream->SendRequest(headers, NULL, &response, + &send_evicted_callback)); + + TestOldCompletionCallback read_evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + read_evicted_stream->ReadResponseHeaders(&read_evicted_callback)); + + EXPECT_EQ(OK, closed_stream->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", closed_stream, false); + closed_stream->Close(true); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, read_evicted_callback.WaitForResult()); + read_evicted_stream->Close(true); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, + read_rejected_stream->ReadResponseHeaders(&callback_)); + read_rejected_stream->Close(true); + + data_->RunFor(1); + EXPECT_EQ(ERR_SOCKET_NOT_CONNECTED, send_closed_callback.WaitForResult()); + send_closed_stream->Close(true); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, send_evicted_callback.WaitForResult()); + send_evicted_stream->Close(true); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, + send_rejected_stream->SendRequest(headers, NULL, &response, + &callback_)); + send_rejected_stream->Close(true); +} + +TEST_F(HttpPipelinedConnectionImplTest, AbortWhileSending) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /aborts.html HTTP/1.1\r\n\r\n"), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + TestOldCompletionCallback aborted_callback; + EXPECT_EQ(ERR_IO_PENDING, + aborted_stream->SendRequest(headers, NULL, &response, + &aborted_callback)); + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->SendRequest(headers, NULL, &response, + &evicted_callback)); + + aborted_stream->Close(true); + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + evicted_stream->Close(true); + EXPECT_FALSE(aborted_callback.have_result()); +} + +TEST_F(HttpPipelinedConnectionImplTest, AbortWhileSendingSecondRequest) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(true, 1, "GET /aborts.html HTTP/1.1\r\n\r\n"), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + TestOldCompletionCallback ok_callback; + EXPECT_EQ(ERR_IO_PENDING, + ok_stream->SendRequest(headers, NULL, &response, + &ok_callback)); + TestOldCompletionCallback aborted_callback; + EXPECT_EQ(ERR_IO_PENDING, + aborted_stream->SendRequest(headers, NULL, &response, + &aborted_callback)); + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->SendRequest(headers, NULL, &response, + &evicted_callback)); + + data_->RunFor(1); + EXPECT_LE(OK, ok_callback.WaitForResult()); + MessageLoop::current()->RunAllPending(); + aborted_stream->Close(true); + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + evicted_stream->Close(true); + EXPECT_FALSE(aborted_callback.have_result()); + ok_stream->Close(true); +} + +TEST_F(HttpPipelinedConnectionImplTest, AbortWhileReadingHeaders) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /aborts.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(true, ERR_FAILED, 2), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + scoped_ptr<HttpStream> rejected_stream(NewTestStream("rejected.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, aborted_stream->SendRequest(headers, NULL, &response, + &callback_)); + EXPECT_EQ(OK, evicted_stream->SendRequest(headers, NULL, &response, + &callback_)); + + EXPECT_EQ(ERR_IO_PENDING, aborted_stream->ReadResponseHeaders(&callback_)); + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(&evicted_callback)); + + aborted_stream->Close(true); + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + evicted_stream->Close(true); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, + rejected_stream->SendRequest(headers, NULL, &response, &callback_)); + rejected_stream->Close(true); +} + +TEST_F(HttpPipelinedConnectionImplTest, PendingResponseAbandoned) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /abandoned.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 2, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 3, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 4, "Content-Length: 7\r\n\r\n"), + MockRead(false, 5, "ok.html"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> abandoned_stream(NewTestStream("abandoned.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, abandoned_stream->SendRequest(headers, NULL, &response, + &callback_)); + EXPECT_EQ(OK, evicted_stream->SendRequest(headers, NULL, &response, + &callback_)); + + EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_)); + TestOldCompletionCallback abandoned_callback; + EXPECT_EQ(ERR_IO_PENDING, + abandoned_stream->ReadResponseHeaders(&abandoned_callback)); + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(&evicted_callback)); + + abandoned_stream->Close(false); + + ExpectResponse("ok.html", ok_stream, false); + ok_stream->Close(false); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + evicted_stream->Close(true); + EXPECT_FALSE(evicted_stream->IsConnectionReusable()); +} + +TEST_F(HttpPipelinedConnectionImplTest, DisconnectedAfterOneRequestRecovery) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /rejected.html HTTP/1.1\r\n\r\n"), + MockWrite(true, ERR_SOCKET_NOT_CONNECTED, 5), + MockWrite(false, ERR_SOCKET_NOT_CONNECTED, 7), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 3, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, ERR_SOCKET_NOT_CONNECTED, 6), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> rejected_read_stream(NewTestStream("rejected.html")); + scoped_ptr<HttpStream> evicted_send_stream(NewTestStream("evicted.html")); + scoped_ptr<HttpStream> rejected_send_stream(NewTestStream("rejected.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, rejected_read_stream->SendRequest( + headers, NULL, &response, &callback_)); + + EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", ok_stream, false); + ok_stream->Close(false); + + TestOldCompletionCallback read_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_send_stream->SendRequest(headers, NULL, &response, + &read_callback)); + data_->RunFor(1); + EXPECT_EQ(ERR_PIPELINE_EVICTION, read_callback.WaitForResult()); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, + rejected_read_stream->ReadResponseHeaders(&callback_)); + EXPECT_EQ(ERR_PIPELINE_EVICTION, + rejected_send_stream->SendRequest(headers, NULL, &response, + &callback_)); + + rejected_read_stream->Close(true); + rejected_send_stream->Close(true); +} + +TEST_F(HttpPipelinedConnectionImplTest, DisconnectedPendingReadRecovery) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 3, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, evicted_stream->SendRequest( + headers, NULL, &response, &callback_)); + + EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", ok_stream, false); + + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(&evicted_callback)); + + ok_stream->Close(false); + + EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult()); + evicted_stream->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, CloseCalledBeforeNextReadLoop) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 3, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, evicted_stream->SendRequest( + headers, NULL, &response, &callback_)); + + EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", ok_stream, false); + + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(&evicted_callback)); + + ok_stream->Close(false); + evicted_stream->Close(false); +} + +TEST_F(HttpPipelinedConnectionImplTest, CloseCalledBeforeReadCallback) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(false, 3, "Content-Length: 7\r\n\r\n"), + MockRead(false, 4, "ok.html"), + MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html")); + scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, evicted_stream->SendRequest( + headers, NULL, &response, &callback_)); + + EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_)); + ExpectResponse("ok.html", ok_stream, false); + + TestOldCompletionCallback evicted_callback; + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(&evicted_callback)); + + ok_stream->Close(false); + + // The posted tasks should be: + // 1. DoReadHeadersLoop, which will post: + // 2. InvokeUserCallback + SuddenCloseObserver observer(evicted_stream.get(), 2); + MessageLoop::current()->AddTaskObserver(&observer); + MessageLoop::current()->RunAllPending(); + EXPECT_FALSE(evicted_callback.have_result()); +} + +class StreamDeleter { + public: + StreamDeleter(HttpStream* stream) : + stream_(stream), + ALLOW_THIS_IN_INITIALIZER_LIST( + callback_(this, &StreamDeleter::OnIOComplete)) { + } + + OldCompletionCallbackImpl<StreamDeleter>* callback() { return &callback_; } + + private: + void OnIOComplete(int result) { + delete stream_; + } + + HttpStream* stream_; + OldCompletionCallbackImpl<StreamDeleter> callback_; +}; + +TEST_F(HttpPipelinedConnectionImplTest, CloseCalledDuringSendCallback) { + MockWrite writes[] = { + MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + HttpStream* stream(NewTestStream("ok.html")); + + StreamDeleter deleter(stream); + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(headers, NULL, &response, + deleter.callback())); + data_->RunFor(1); +} + +TEST_F(HttpPipelinedConnectionImplTest, CloseCalledDuringReadCallback) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 1, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 2, "Content-Length: 7\r\n\r\n"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + HttpStream* stream(NewTestStream("ok.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_)); + + StreamDeleter deleter(stream); + EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(deleter.callback())); + data_->RunFor(1); +} + +TEST_F(HttpPipelinedConnectionImplTest, + CloseCalledDuringReadCallbackWithPendingRead) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /failed.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 3, "Content-Length: 7\r\n\r\n"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + HttpStream* failed_stream(NewTestStream("failed.html")); + HttpStream* evicted_stream(NewTestStream("evicted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, + failed_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, + evicted_stream->SendRequest(headers, NULL, &response, &callback_)); + + StreamDeleter failed_deleter(failed_stream); + EXPECT_EQ(ERR_IO_PENDING, + failed_stream->ReadResponseHeaders(failed_deleter.callback())); + StreamDeleter evicted_deleter(evicted_stream); + EXPECT_EQ(ERR_IO_PENDING, + evicted_stream->ReadResponseHeaders(evicted_deleter.callback())); + data_->RunFor(1); +} + +TEST_F(HttpPipelinedConnectionImplTest, CloseOtherDuringReadCallback) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /deleter.html HTTP/1.1\r\n\r\n"), + MockWrite(false, 1, "GET /deleted.html HTTP/1.1\r\n\r\n"), + }; + MockRead reads[] = { + MockRead(false, 2, "HTTP/1.1 200 OK\r\n"), + MockRead(true, 3, "Content-Length: 7\r\n\r\n"), + }; + Initialize(reads, arraysize(reads), writes, arraysize(writes)); + + scoped_ptr<HttpStream> deleter_stream(NewTestStream("deleter.html")); + HttpStream* deleted_stream(NewTestStream("deleted.html")); + + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, + deleter_stream->SendRequest(headers, NULL, &response, &callback_)); + EXPECT_EQ(OK, + deleted_stream->SendRequest(headers, NULL, &response, &callback_)); + + StreamDeleter deleter(deleted_stream); + EXPECT_EQ(ERR_IO_PENDING, + deleter_stream->ReadResponseHeaders(deleter.callback())); + EXPECT_EQ(ERR_IO_PENDING, deleted_stream->ReadResponseHeaders(&callback_)); + data_->RunFor(1); +} + +TEST_F(HttpPipelinedConnectionImplTest, OnPipelineHasCapacity) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0); + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1); + HttpRequestHeaders headers; + HttpResponseInfo response; + EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_)); + + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0); + MessageLoop::current()->RunAllPending(); + + stream->Close(false); + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1); + stream.reset(NULL); +} + +TEST_F(HttpPipelinedConnectionImplTest, OnPipelineHasCapacityWithoutSend) { + MockWrite writes[] = { + MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"), + }; + Initialize(NULL, 0, writes, arraysize(writes)); + + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0); + scoped_ptr<HttpStream> stream(NewTestStream("ok.html")); + + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1); + MessageLoop::current()->RunAllPending(); + + stream->Close(false); + EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1); + stream.reset(NULL); +} + +} // namespace net diff --git a/net/http/http_pipelined_host.cc b/net/http/http_pipelined_host.cc new file mode 100644 index 0000000..32d023d --- /dev/null +++ b/net/http/http_pipelined_host.cc @@ -0,0 +1,108 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_host.h" + +#include "base/stl_util.h" +#include "net/http/http_pipelined_connection_impl.h" +#include "net/http/http_pipelined_stream.h" + +namespace net { + +class HttpPipelinedConnectionImplFactory : + public HttpPipelinedConnection::Factory { + public: + HttpPipelinedConnection* CreateNewPipeline( + ClientSocketHandle* connection, + HttpPipelinedConnection::Delegate* delegate, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated) OVERRIDE { + return new HttpPipelinedConnectionImpl(connection, delegate, + used_ssl_config, used_proxy_info, + net_log, was_npn_negotiated); + } +}; + +HttpPipelinedHost::HttpPipelinedHost( + HttpPipelinedHost::Delegate* delegate, + const HostPortPair& origin, + HttpPipelinedConnection::Factory* factory) + : delegate_(delegate), + origin_(origin), + factory_(factory) { + if (!factory) { + factory_.reset(new HttpPipelinedConnectionImplFactory()); + } +} + +HttpPipelinedHost::~HttpPipelinedHost() { + CHECK(pipelines_.empty()); +} + +HttpPipelinedStream* HttpPipelinedHost::CreateStreamOnNewPipeline( + ClientSocketHandle* connection, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated) { + HttpPipelinedConnection* pipeline = factory_->CreateNewPipeline( + connection, this, used_ssl_config, used_proxy_info, net_log, + was_npn_negotiated); + pipelines_.insert(pipeline); + return pipeline->CreateNewStream(); +} + +HttpPipelinedStream* HttpPipelinedHost::CreateStreamOnExistingPipeline() { + HttpPipelinedConnection* available_pipeline = NULL; + for (std::set<HttpPipelinedConnection*>::iterator it = pipelines_.begin(); + it != pipelines_.end(); ++it) { + if ((*it)->usable() && + (*it)->active() && + (*it)->depth() < max_pipeline_depth() && + (!available_pipeline || (*it)->depth() < available_pipeline->depth())) { + available_pipeline = *it; + } + } + if (!available_pipeline) { + return NULL; + } + return available_pipeline->CreateNewStream(); +} + +bool HttpPipelinedHost::IsExistingPipelineAvailable() { + for (std::set<HttpPipelinedConnection*>::iterator it = pipelines_.begin(); + it != pipelines_.end(); ++it) { + if ((*it)->usable() && + (*it)->active() && + (*it)->depth() < max_pipeline_depth()) { + return true; + } + } + return false; +} + +void HttpPipelinedHost::OnPipelineEmpty(HttpPipelinedConnection* pipeline) { + CHECK(ContainsKey(pipelines_, pipeline)); + pipelines_.erase(pipeline); + delete pipeline; + if (pipelines_.empty()) { + delegate_->OnHostIdle(this); + // WARNING: We'll probably be deleted here. + } +} + +void HttpPipelinedHost::OnPipelineHasCapacity( + HttpPipelinedConnection* pipeline) { + if (pipeline->usable() && pipeline->depth() < max_pipeline_depth()) { + delegate_->OnHostHasAdditionalCapacity(this); + } + if (!pipeline->depth()) { + OnPipelineEmpty(pipeline); + // WARNING: We might be deleted here. + } +} + +} // namespace net diff --git a/net/http/http_pipelined_host.h b/net/http/http_pipelined_host.h new file mode 100644 index 0000000..ae3128d --- /dev/null +++ b/net/http/http_pipelined_host.h @@ -0,0 +1,93 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_HTTP_HTTP_PIPELINED_HOST_H_ +#define NET_HTTP_HTTP_PIPELINED_HOST_H_ +#pragma once + +#include <set> +#include <string> + +#include "base/basictypes.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/host_port_pair.h" +#include "net/base/net_export.h" +#include "net/http/http_pipelined_connection.h" + +namespace net { + +class BoundNetLog; +class ClientSocketHandle; +class HttpPipelinedStream; +class ProxyInfo; +struct SSLConfig; + +// Manages all of the pipelining state for specific host with active pipelined +// HTTP requests. Manages connection jobs, constructs pipelined streams, and +// assigns requests to the least loaded pipelined connection. +class NET_EXPORT_PRIVATE HttpPipelinedHost + : public HttpPipelinedConnection::Delegate { + public: + class Delegate { + public: + // Called when a pipelined host has no outstanding requests on any of its + // pipelined connections. + virtual void OnHostIdle(HttpPipelinedHost* host) = 0; + + // Called when a pipelined host has newly available pipeline capacity, like + // when a request completes. + virtual void OnHostHasAdditionalCapacity(HttpPipelinedHost* host) = 0; + }; + + HttpPipelinedHost(Delegate* delegate, const HostPortPair& origin, + HttpPipelinedConnection::Factory* factory); + virtual ~HttpPipelinedHost(); + + // Constructs a new pipeline on |connection| and returns a new + // HttpPipelinedStream that uses it. + HttpPipelinedStream* CreateStreamOnNewPipeline( + ClientSocketHandle* connection, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated); + + // Tries to find an existing pipeline with capacity for a new request. If + // successful, returns a new stream on that pipeline. Otherwise, returns NULL. + HttpPipelinedStream* CreateStreamOnExistingPipeline(); + + // Returns true if we have a pipelined connection that can accept new + // requests. + bool IsExistingPipelineAvailable(); + + // Callbacks for HttpPipelinedConnection. + + // Called when a pipelined connection completes a request. Adds a pending + // request to the pipeline if the pipeline is still usable. + virtual void OnPipelineHasCapacity( + HttpPipelinedConnection* pipeline) OVERRIDE; + + const HostPortPair& origin() const { return origin_; } + + private: + // Called when a pipeline is empty and there are no pending requests. Closes + // the connection. + void OnPipelineEmpty(HttpPipelinedConnection* pipeline); + + // Adds the next pending request to the pipeline if it's still usuable. + void AddRequestToPipeline(HttpPipelinedConnection* connection); + + int max_pipeline_depth() const { return 3; } + + Delegate* delegate_; + const HostPortPair origin_; + std::set<HttpPipelinedConnection*> pipelines_; + scoped_ptr<HttpPipelinedConnection::Factory> factory_; + + DISALLOW_COPY_AND_ASSIGN(HttpPipelinedHost); +}; + +} // namespace net + +#endif // NET_HTTP_HTTP_PIPELINED_HOST_H_ diff --git a/net/http/http_pipelined_host_pool.cc b/net/http/http_pipelined_host_pool.cc new file mode 100644 index 0000000..bd238d8 --- /dev/null +++ b/net/http/http_pipelined_host_pool.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_host_pool.h" + +#include "base/logging.h" +#include "base/stl_util.h" +#include "net/http/http_pipelined_host.h" +#include "net/http/http_stream_factory_impl.h" + +namespace net { + +HttpPipelinedHostPool::HttpPipelinedHostPool(HttpStreamFactoryImpl* factory) + : factory_(factory) { +} + +HttpPipelinedHostPool::~HttpPipelinedHostPool() { + CHECK(host_map_.empty()); +} + +HttpPipelinedStream* HttpPipelinedHostPool::CreateStreamOnNewPipeline( + const HostPortPair& origin, + ClientSocketHandle* connection, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated) { + HttpPipelinedHost* host = GetPipelinedHost(origin, true); + return host->CreateStreamOnNewPipeline(connection, used_ssl_config, + used_proxy_info, net_log, + was_npn_negotiated); +} + +HttpPipelinedStream* HttpPipelinedHostPool::CreateStreamOnExistingPipeline( + const HostPortPair& origin) { + HttpPipelinedHost* host = GetPipelinedHost(origin, false); + if (!host) { + return NULL; + } + return host->CreateStreamOnExistingPipeline(); +} + +bool HttpPipelinedHostPool::IsExistingPipelineAvailableForOrigin( + const HostPortPair& origin) { + HttpPipelinedHost* host = GetPipelinedHost(origin, false); + if (!host) { + return false; + } + return host->IsExistingPipelineAvailable(); +} + +HttpPipelinedHost* HttpPipelinedHostPool::GetPipelinedHost( + const HostPortPair& origin, bool create_if_not_found) { + HostMap::iterator it = host_map_.find(origin); + if (it != host_map_.end()) { + CHECK(it->second); + return it->second; + } else if (!create_if_not_found) { + return NULL; + } + HttpPipelinedHost* host = new HttpPipelinedHost(this, origin, NULL); + host_map_[origin] = host; + return host; +} + +void HttpPipelinedHostPool::OnHostIdle(HttpPipelinedHost* host) { + const HostPortPair& origin = host->origin(); + CHECK(ContainsKey(host_map_, origin)); + // TODO(simonjam): We should remember the pipeline state for each host. + host_map_.erase(origin); + delete host; +} + +void HttpPipelinedHostPool::OnHostHasAdditionalCapacity( + HttpPipelinedHost* host) { + factory_->OnHttpPipelinedHostHasAdditionalCapacity(host->origin()); +} + +} // namespace net diff --git a/net/http/http_pipelined_host_pool.h b/net/http/http_pipelined_host_pool.h new file mode 100644 index 0000000..5e7c146 --- /dev/null +++ b/net/http/http_pipelined_host_pool.h @@ -0,0 +1,66 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_ +#define NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_ +#pragma once + +#include <map> + +#include "base/basictypes.h" +#include "net/http/http_pipelined_host.h" + +namespace net { + +class HostPortPair; +class HttpPipelinedStream; +class HttpStreamFactoryImpl; + +// Manages all of the pipelining state for specific host with active pipelined +// HTTP requests. Manages connection jobs, constructs pipelined streams, and +// assigns requests to the least loaded pipelined connection. +class HttpPipelinedHostPool : public HttpPipelinedHost::Delegate { + public: + explicit HttpPipelinedHostPool(HttpStreamFactoryImpl* factory); + ~HttpPipelinedHostPool(); + + // Constructs a new pipeline on |connection| and returns a new + // HttpPipelinedStream that uses it. + HttpPipelinedStream* CreateStreamOnNewPipeline( + const HostPortPair& origin, + ClientSocketHandle* connection, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated); + + // Tries to find an existing pipeline with capacity for a new request. If + // successful, returns a new stream on that pipeline. Otherwise, returns NULL. + HttpPipelinedStream* CreateStreamOnExistingPipeline( + const HostPortPair& origin); + + // Returns true if a pipelined connection already exists for this origin and + // can accept new requests. + bool IsExistingPipelineAvailableForOrigin(const HostPortPair& origin); + + // Callbacks for HttpPipelinedHost. + virtual void OnHostIdle(HttpPipelinedHost* host) OVERRIDE; + + virtual void OnHostHasAdditionalCapacity(HttpPipelinedHost* host) OVERRIDE; + + private: + typedef std::map<const HostPortPair, HttpPipelinedHost*> HostMap; + + HttpPipelinedHost* GetPipelinedHost(const HostPortPair& origin, + bool create_if_not_found); + + HttpStreamFactoryImpl* factory_; + HostMap host_map_; + + DISALLOW_COPY_AND_ASSIGN(HttpPipelinedHostPool); +}; + +} // namespace net + +#endif // NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_ diff --git a/net/http/http_pipelined_host_unittest.cc b/net/http/http_pipelined_host_unittest.cc new file mode 100644 index 0000000..de78e9e --- /dev/null +++ b/net/http/http_pipelined_host_unittest.cc @@ -0,0 +1,201 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_host.h" + +#include "net/base/ssl_config_service.h" +#include "net/http/http_pipelined_connection.h" +#include "net/proxy/proxy_info.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::NiceMock; +using testing::Ref; +using testing::Return; +using testing::ReturnNull; + +static const int kMaxCapacity = 3; + +namespace net { + +static ClientSocketHandle* kDummyConnection = + reinterpret_cast<ClientSocketHandle*>(84); +static HttpPipelinedStream* kDummyStream = + reinterpret_cast<HttpPipelinedStream*>(42); + +class MockHostDelegate : public HttpPipelinedHost::Delegate { + public: + MOCK_METHOD1(OnHostIdle, void(HttpPipelinedHost* host)); + MOCK_METHOD1(OnHostHasAdditionalCapacity, void(HttpPipelinedHost* host)); +}; + +class MockPipelineFactory : public HttpPipelinedConnection::Factory { + public: + MOCK_METHOD6(CreateNewPipeline, HttpPipelinedConnection*( + ClientSocketHandle* connection, + HttpPipelinedConnection::Delegate* delegate, + const SSLConfig& used_ssl_config, + const ProxyInfo& used_proxy_info, + const BoundNetLog& net_log, + bool was_npn_negotiated)); +}; + +class MockPipeline : public HttpPipelinedConnection { + public: + MockPipeline(int depth, bool usable, bool active) + : depth_(depth), + usable_(usable), + active_(active) { + } + + void SetState(int depth, bool usable, bool active) { + depth_ = depth; + usable_ = usable; + active_ = active; + } + + virtual int depth() const OVERRIDE { return depth_; } + virtual bool usable() const OVERRIDE { return usable_; } + virtual bool active() const OVERRIDE { return active_; } + + MOCK_METHOD0(CreateNewStream, HttpPipelinedStream*()); + MOCK_METHOD1(OnStreamDeleted, void(int pipeline_id)); + MOCK_CONST_METHOD0(used_ssl_config, const SSLConfig&()); + MOCK_CONST_METHOD0(used_proxy_info, const ProxyInfo&()); + MOCK_CONST_METHOD0(source, const NetLog::Source&()); + MOCK_CONST_METHOD0(was_npn_negotiated, bool()); + + private: + int depth_; + bool usable_; + bool active_; +}; + +class HttpPipelinedHostTest : public testing::Test { + public: + HttpPipelinedHostTest() + : origin_("host", 123), + factory_(new MockPipelineFactory), // Owned by host_. + host_(&delegate_, origin_, factory_) { + } + + MockPipeline* AddTestPipeline(int depth, bool usable, bool active) { + MockPipeline* pipeline = new MockPipeline(depth, usable, active); + EXPECT_CALL(*factory_, CreateNewPipeline(kDummyConnection, &host_, + Ref(ssl_config_), Ref(proxy_info_), + Ref(net_log_), true)) + .Times(1) + .WillOnce(Return(pipeline)); + EXPECT_CALL(*pipeline, CreateNewStream()) + .Times(1) + .WillOnce(Return(kDummyStream)); + EXPECT_EQ(kDummyStream, host_.CreateStreamOnNewPipeline( + kDummyConnection, ssl_config_, proxy_info_, net_log_, true)); + return pipeline; + } + + void ClearTestPipeline(MockPipeline* pipeline) { + pipeline->SetState(0, true, true); + host_.OnPipelineHasCapacity(pipeline); + } + + NiceMock<MockHostDelegate> delegate_; + HostPortPair origin_; + MockPipelineFactory* factory_; + HttpPipelinedHost host_; + + SSLConfig ssl_config_; + ProxyInfo proxy_info_; + BoundNetLog net_log_; +}; + +TEST_F(HttpPipelinedHostTest, Delegate) { + EXPECT_TRUE(origin_.Equals(host_.origin())); +} + +TEST_F(HttpPipelinedHostTest, OnHostIdle) { + MockPipeline* pipeline = AddTestPipeline(0, false, true); + + EXPECT_CALL(delegate_, OnHostHasAdditionalCapacity(&host_)) + .Times(0); + EXPECT_CALL(delegate_, OnHostIdle(&host_)) + .Times(1); + host_.OnPipelineHasCapacity(pipeline); +} + +TEST_F(HttpPipelinedHostTest, OnHostHasAdditionalCapacity) { + MockPipeline* pipeline = AddTestPipeline(1, true, true); + + EXPECT_CALL(delegate_, OnHostHasAdditionalCapacity(&host_)) + .Times(2); + EXPECT_CALL(delegate_, OnHostIdle(&host_)) + .Times(0); + + host_.OnPipelineHasCapacity(pipeline); + + EXPECT_CALL(delegate_, OnHostIdle(&host_)) + .Times(1); + ClearTestPipeline(pipeline); +} + +TEST_F(HttpPipelinedHostTest, IgnoresUnusablePipeline) { + MockPipeline* pipeline = AddTestPipeline(1, false, true); + + EXPECT_FALSE(host_.IsExistingPipelineAvailable()); + EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline()); + + ClearTestPipeline(pipeline); +} + +TEST_F(HttpPipelinedHostTest, IgnoresInactivePipeline) { + MockPipeline* pipeline = AddTestPipeline(1, true, false); + + EXPECT_FALSE(host_.IsExistingPipelineAvailable()); + EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline()); + + ClearTestPipeline(pipeline); +} + +TEST_F(HttpPipelinedHostTest, IgnoresFullPipeline) { + MockPipeline* pipeline = AddTestPipeline(kMaxCapacity, true, true); + + EXPECT_FALSE(host_.IsExistingPipelineAvailable()); + EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline()); + + ClearTestPipeline(pipeline); +} + +TEST_F(HttpPipelinedHostTest, PicksLeastLoadedPipeline) { + MockPipeline* full_pipeline = AddTestPipeline(kMaxCapacity, true, true); + MockPipeline* usable_pipeline = AddTestPipeline(kMaxCapacity - 1, true, true); + MockPipeline* empty_pipeline = AddTestPipeline(0, true, true); + + EXPECT_TRUE(host_.IsExistingPipelineAvailable()); + EXPECT_CALL(*empty_pipeline, CreateNewStream()) + .Times(1) + .WillOnce(ReturnNull()); + EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline()); + + ClearTestPipeline(full_pipeline); + ClearTestPipeline(usable_pipeline); + ClearTestPipeline(empty_pipeline); +} + +TEST_F(HttpPipelinedHostTest, EmptyPipelineIsRemoved) { + MockPipeline* empty_pipeline = AddTestPipeline(0, true, true); + + EXPECT_TRUE(host_.IsExistingPipelineAvailable()); + EXPECT_CALL(*empty_pipeline, CreateNewStream()) + .Times(1) + .WillOnce(Return(kDummyStream)); + EXPECT_EQ(kDummyStream, host_.CreateStreamOnExistingPipeline()); + + ClearTestPipeline(empty_pipeline); + + EXPECT_FALSE(host_.IsExistingPipelineAvailable()); + EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline()); +} + +} // namespace net diff --git a/net/http/http_pipelined_stream.cc b/net/http/http_pipelined_stream.cc new file mode 100644 index 0000000..2183c52 --- /dev/null +++ b/net/http/http_pipelined_stream.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/http/http_pipelined_stream.h" + +#include "base/logging.h" +#include "base/stringprintf.h" +#include "net/base/net_errors.h" +#include "net/http/http_pipelined_connection_impl.h" +#include "net/http/http_request_headers.h" +#include "net/http/http_request_info.h" +#include "net/http/http_util.h" + +namespace net { + +HttpPipelinedStream::HttpPipelinedStream(HttpPipelinedConnectionImpl* pipeline, + int pipeline_id) + : pipeline_(pipeline), + pipeline_id_(pipeline_id), + request_info_(NULL) { +} + +HttpPipelinedStream::~HttpPipelinedStream() { + pipeline_->OnStreamDeleted(pipeline_id_); +} + +int HttpPipelinedStream::InitializeStream(const HttpRequestInfo* request_info, + const BoundNetLog& net_log, + OldCompletionCallback* callback) { + request_info_ = request_info; + pipeline_->InitializeParser(pipeline_id_, request_info, net_log); + return OK; +} + + +int HttpPipelinedStream::SendRequest(const HttpRequestHeaders& headers, + UploadDataStream* request_body, + HttpResponseInfo* response, + OldCompletionCallback* callback) { + CHECK(pipeline_id_); + CHECK(request_info_); + // TODO(simonjam): Proxy support will be needed here. + const std::string path = HttpUtil::PathForRequest(request_info_->url); + std::string request_line_ = base::StringPrintf("%s %s HTTP/1.1\r\n", + request_info_->method.c_str(), + path.c_str()); + return pipeline_->SendRequest(pipeline_id_, request_line_, headers, + request_body, response, callback); +} + +uint64 HttpPipelinedStream::GetUploadProgress() const { + return pipeline_->GetUploadProgress(pipeline_id_); +} + +int HttpPipelinedStream::ReadResponseHeaders(OldCompletionCallback* callback) { + return pipeline_->ReadResponseHeaders(pipeline_id_, callback); +} + +const HttpResponseInfo* HttpPipelinedStream::GetResponseInfo() const { + return pipeline_->GetResponseInfo(pipeline_id_); +} + +int HttpPipelinedStream::ReadResponseBody(IOBuffer* buf, int buf_len, + OldCompletionCallback* callback) { + return pipeline_->ReadResponseBody(pipeline_id_, buf, buf_len, callback); +} + +void HttpPipelinedStream::Close(bool not_reusable) { + pipeline_->Close(pipeline_id_, not_reusable); +} + +HttpStream* HttpPipelinedStream::RenewStreamForAuth() { + // FIXME: What does this mean on a pipeline? Is it for proxies? + return new HttpPipelinedStream(pipeline_, pipeline_id_); +} + +bool HttpPipelinedStream::IsResponseBodyComplete() const { + return pipeline_->IsResponseBodyComplete(pipeline_id_); +} + +bool HttpPipelinedStream::CanFindEndOfResponse() const { + return pipeline_->CanFindEndOfResponse(pipeline_id_); +} + +bool HttpPipelinedStream::IsMoreDataBuffered() const { + return pipeline_->IsMoreDataBuffered(pipeline_id_); +} + +bool HttpPipelinedStream::IsConnectionReused() const { + return pipeline_->IsConnectionReused(pipeline_id_); +} + +void HttpPipelinedStream::SetConnectionReused() { + pipeline_->SetConnectionReused(pipeline_id_); +} + +bool HttpPipelinedStream::IsConnectionReusable() const { + return pipeline_->usable(); +} + +void HttpPipelinedStream::GetSSLInfo(SSLInfo* ssl_info) { + pipeline_->GetSSLInfo(pipeline_id_, ssl_info); +} + +void HttpPipelinedStream::GetSSLCertRequestInfo( + SSLCertRequestInfo* cert_request_info) { + pipeline_->GetSSLCertRequestInfo(pipeline_id_, cert_request_info); +} + +bool HttpPipelinedStream::IsSpdyHttpStream() const { + return false; +} + +void HttpPipelinedStream::LogNumRttVsBytesMetrics() const { + // TODO(simonjam): I don't want to copy & paste this from http_basic_stream. +} + +void HttpPipelinedStream::Drain(HttpNetworkSession*) { + // On errors, we already evict everything from the pipeline and close it. + // TODO(simonjam): Consider trying to drain the pipeline in the same way that + // HttpBasicStream does. + delete this; +} + +const SSLConfig& HttpPipelinedStream::used_ssl_config() const { + return pipeline_->used_ssl_config(); +} + +const ProxyInfo& HttpPipelinedStream::used_proxy_info() const { + return pipeline_->used_proxy_info(); +} + +const NetLog::Source& HttpPipelinedStream::source() const { + return pipeline_->source(); +} + +bool HttpPipelinedStream::was_npn_negotiated() const { + return pipeline_->was_npn_negotiated(); +} + +} // namespace net diff --git a/net/http/http_pipelined_stream.h b/net/http/http_pipelined_stream.h new file mode 100644 index 0000000..dfafe69 --- /dev/null +++ b/net/http/http_pipelined_stream.h @@ -0,0 +1,112 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_HTTP_HTTP_PIPELINED_STREAM_H_ +#define NET_HTTP_HTTP_PIPELINED_STREAM_H_ +#pragma once + +#include <string> + +#include "base/basictypes.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_log.h" +#include "net/http/http_stream.h" + +namespace net { + +class BoundNetLog; +class HttpPipelinedConnectionImpl; +class HttpResponseInfo; +class HttpRequestHeaders; +struct HttpRequestInfo; +class HttpStreamParser; +class IOBuffer; +class ProxyInfo; +struct SSLConfig; +class UploadDataStream; + +// HttpPipelinedStream is the pipelined implementation of HttpStream. It has +// very little code in it. Instead, it serves as the client's interface to the +// pipelined connection, where all the work happens. +// +// In the case of pipelining failures, these functions may return +// ERR_PIPELINE_EVICTION. In that case, the client should retry the HTTP +// request without pipelining. +class HttpPipelinedStream : public HttpStream { + public: + HttpPipelinedStream(HttpPipelinedConnectionImpl* pipeline, + int pipeline_id); + virtual ~HttpPipelinedStream(); + + // HttpStream methods: + virtual int InitializeStream(const HttpRequestInfo* request_info, + const BoundNetLog& net_log, + OldCompletionCallback* callback) OVERRIDE; + + virtual int SendRequest(const HttpRequestHeaders& headers, + UploadDataStream* request_body, + HttpResponseInfo* response, + OldCompletionCallback* callback) OVERRIDE; + + virtual uint64 GetUploadProgress() const OVERRIDE; + + virtual int ReadResponseHeaders(OldCompletionCallback* callback) OVERRIDE; + + virtual const HttpResponseInfo* GetResponseInfo() const OVERRIDE; + + virtual int ReadResponseBody(IOBuffer* buf, int buf_len, + OldCompletionCallback* callback) OVERRIDE; + + virtual void Close(bool not_reusable) OVERRIDE; + + virtual HttpStream* RenewStreamForAuth() OVERRIDE; + + virtual bool IsResponseBodyComplete() const OVERRIDE; + + virtual bool CanFindEndOfResponse() const OVERRIDE; + + virtual bool IsMoreDataBuffered() const OVERRIDE; + + virtual bool IsConnectionReused() const OVERRIDE; + + virtual void SetConnectionReused() OVERRIDE; + + virtual bool IsConnectionReusable() const OVERRIDE; + + virtual void GetSSLInfo(SSLInfo* ssl_info) OVERRIDE; + + virtual void GetSSLCertRequestInfo( + SSLCertRequestInfo* cert_request_info) OVERRIDE; + + virtual bool IsSpdyHttpStream() const OVERRIDE; + + virtual void LogNumRttVsBytesMetrics() const OVERRIDE; + + virtual void Drain(HttpNetworkSession* session) OVERRIDE; + + // The SSLConfig used to establish this stream's pipeline. + const SSLConfig& used_ssl_config() const; + + // The ProxyInfo used to establish this this stream's pipeline. + const ProxyInfo& used_proxy_info() const; + + // The source of this stream's pipelined connection. + const NetLog::Source& source() const; + + // True if this stream's pipeline was NPN negotiated. + bool was_npn_negotiated() const; + + private: + HttpPipelinedConnectionImpl* pipeline_; + + int pipeline_id_; + + const HttpRequestInfo* request_info_; + + DISALLOW_COPY_AND_ASSIGN(HttpPipelinedStream); +}; + +} // namespace net + +#endif // NET_HTTP_HTTP_PIPELINED_STREAM_H_ diff --git a/net/http/http_response_body_drainer_unittest.cc b/net/http/http_response_body_drainer_unittest.cc index 9f774db..9db5ec0 100644 --- a/net/http/http_response_body_drainer_unittest.cc +++ b/net/http/http_response_body_drainer_unittest.cc @@ -120,6 +120,8 @@ class MockHttpStream : public HttpStream { virtual void LogNumRttVsBytesMetrics() const OVERRIDE {} + virtual void Drain(HttpNetworkSession*) OVERRIDE {} + // Methods to tweak/observer mock behavior: void StallReadsForever() { stall_reads_forever_ = true; } diff --git a/net/http/http_stream.h b/net/http/http_stream.h index 7460fb7..2d6402f 100644 --- a/net/http/http_stream.h +++ b/net/http/http_stream.h @@ -23,6 +23,7 @@ namespace net { class BoundNetLog; +class HttpNetworkSession; class HttpRequestHeaders; struct HttpRequestInfo; class HttpResponseInfo; @@ -139,6 +140,12 @@ class NET_EXPORT_PRIVATE HttpStream { // response body vs bytes transferred. virtual void LogNumRttVsBytesMetrics() const = 0; + // In the case of an HTTP error or redirect, flush the response body (usually + // a simple error or "this page has moved") so that we can re-use the + // underlying connection. This stream is responsible for deleting itself when + // draining is complete. + virtual void Drain(HttpNetworkSession* session) = 0; + private: DISALLOW_COPY_AND_ASSIGN(HttpStream); }; diff --git a/net/http/http_stream_factory.cc b/net/http/http_stream_factory.cc index 70a4940..7e81d61 100644 --- a/net/http/http_stream_factory.cc +++ b/net/http/http_stream_factory.cc @@ -33,6 +33,8 @@ bool HttpStreamFactory::force_spdy_always_ = false; std::list<HostPortPair>* HttpStreamFactory::forced_spdy_exclusions_ = NULL; // static bool HttpStreamFactory::ignore_certificate_errors_ = false; +// static +bool HttpStreamFactory::http_pipelining_enabled_ = false; HttpStreamFactory::~HttpStreamFactory() {} diff --git a/net/http/http_stream_factory.h b/net/http/http_stream_factory.h index 3cbad96..31e0cc1 100644 --- a/net/http/http_stream_factory.h +++ b/net/http/http_stream_factory.h @@ -24,6 +24,7 @@ class HostMappingRules; class HostPortPair; class HttpAuthController; class HttpNetworkSession; +class HttpPipelinedHost; class HttpResponseInfo; class HttpServerProperties; class HttpStream; @@ -239,6 +240,11 @@ class NET_EXPORT HttpStreamFactory { static void SetHostMappingRules(const std::string& rules); + static void set_http_pipelining_enabled(bool value) { + http_pipelining_enabled_ = value; + } + static bool http_pipelining_enabled() { return http_pipelining_enabled_; } + protected: HttpStreamFactory(); @@ -253,6 +259,7 @@ class NET_EXPORT HttpStreamFactory { static bool force_spdy_always_; static std::list<HostPortPair>* forced_spdy_exclusions_; static bool ignore_certificate_errors_; + static bool http_pipelining_enabled_; DISALLOW_COPY_AND_ASSIGN(HttpStreamFactory); }; diff --git a/net/http/http_stream_factory_impl.cc b/net/http/http_stream_factory_impl.cc index 364d202..9593dc4 100644 --- a/net/http/http_stream_factory_impl.cc +++ b/net/http/http_stream_factory_impl.cc @@ -10,6 +10,9 @@ #include "net/base/net_log.h" #include "net/base/net_util.h" #include "net/http/http_network_session.h" +#include "net/http/http_pipelined_connection.h" +#include "net/http/http_pipelined_host.h" +#include "net/http/http_pipelined_stream.h" #include "net/http/http_server_properties.h" #include "net/http/http_stream_factory_impl_job.h" #include "net/http/http_stream_factory_impl_request.h" @@ -33,7 +36,8 @@ GURL UpgradeUrlToHttps(const GURL& original_url, int port) { } // namespace HttpStreamFactoryImpl::HttpStreamFactoryImpl(HttpNetworkSession* session) - : session_(session) {} + : session_(session), + http_pipelined_host_pool_(this) {} HttpStreamFactoryImpl::~HttpStreamFactoryImpl() { DCHECK(request_map_.empty()); @@ -221,4 +225,21 @@ void HttpStreamFactoryImpl::OnPreconnectsComplete(const Job* job) { OnPreconnectsCompleteInternal(); } +void HttpStreamFactoryImpl::OnHttpPipelinedHostHasAdditionalCapacity( + const HostPortPair& origin) { + HttpPipelinedStream* stream; + while (ContainsKey(http_pipelining_request_map_, origin) && + (stream = http_pipelined_host_pool_.CreateStreamOnExistingPipeline( + origin))) { + Request* request = *http_pipelining_request_map_[origin].begin(); + request->Complete(stream->was_npn_negotiated(), + false, // not using_spdy + stream->source()); + request->OnStreamReady(NULL, + stream->used_ssl_config(), + stream->used_proxy_info(), + stream); + } +} + } // namespace net diff --git a/net/http/http_stream_factory_impl.h b/net/http/http_stream_factory_impl.h index b84f158..9c74e23 100644 --- a/net/http/http_stream_factory_impl.h +++ b/net/http/http_stream_factory_impl.h @@ -10,13 +10,15 @@ #include "base/memory/ref_counted.h" #include "net/base/host_port_pair.h" -#include "net/http/http_stream_factory.h" #include "net/base/net_log.h" +#include "net/http/http_pipelined_host_pool.h" +#include "net/http/http_stream_factory.h" #include "net/proxy/proxy_server.h" namespace net { class HttpNetworkSession; +class HttpPipelinedHost; class SpdySession; class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory { @@ -40,12 +42,18 @@ class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory { virtual void AddTLSIntolerantServer(const HostPortPair& server); virtual bool IsTLSIntolerantServer(const HostPortPair& server) const; + // Called when a HttpPipelinedHost has new capacity. Attempts to allocate any + // pending pipeline-capable requests to pipelines. + virtual void OnHttpPipelinedHostHasAdditionalCapacity( + const HostPortPair& origin); + private: class Request; class Job; typedef std::set<Request*> RequestSet; typedef std::map<HostPortProxyPair, RequestSet> SpdySessionRequestMap; + typedef std::map<HostPortPair, RequestSet> HttpPipeliningRequestMap; bool GetAlternateProtocolRequestFor(const GURL& original_url, GURL* alternate_url) const; @@ -88,6 +96,9 @@ class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory { std::map<const Job*, Request*> request_map_; SpdySessionRequestMap spdy_session_request_map_; + HttpPipeliningRequestMap http_pipelining_request_map_; + + HttpPipelinedHostPool http_pipelined_host_pool_; // These jobs correspond to jobs orphaned by Requests and now owned by // HttpStreamFactoryImpl. Since they are no longer tied to Requests, they will diff --git a/net/http/http_stream_factory_impl_job.cc b/net/http/http_stream_factory_impl_job.cc index 74a7895..d858be1 100644 --- a/net/http/http_stream_factory_impl_job.cc +++ b/net/http/http_stream_factory_impl_job.cc @@ -16,10 +16,15 @@ #include "net/base/ssl_cert_request_info.h" #include "net/http/http_basic_stream.h" #include "net/http/http_network_session.h" +#include "net/http/http_pipelined_connection.h" +#include "net/http/http_pipelined_host.h" +#include "net/http/http_pipelined_host_pool.h" +#include "net/http/http_pipelined_stream.h" #include "net/http/http_proxy_client_socket.h" #include "net/http/http_proxy_client_socket_pool.h" #include "net/http/http_request_info.h" #include "net/http/http_server_properties.h" +#include "net/http/http_stream_factory.h" #include "net/http/http_stream_factory_impl_request.h" #include "net/socket/client_socket_handle.h" #include "net/socket/client_socket_pool.h" @@ -88,6 +93,7 @@ HttpStreamFactoryImpl::Job::Job(HttpStreamFactoryImpl* stream_factory, was_npn_negotiated_(false), num_streams_(0), spdy_session_direct_(false), + existing_available_pipeline_(false), ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)) { DCHECK(stream_factory); DCHECK(session); @@ -598,6 +604,17 @@ int HttpStreamFactoryImpl::Job::DoInitConnection() { } else if (request_ && (using_ssl_ || ShouldForceSpdyWithoutSSL())) { // Update the spdy session key for the request that launched this job. request_->SetSpdySessionKey(spdy_session_key); + } else if (IsRequestEligibleForPipelining()) { + // TODO(simonjam): With pipelining, we might be better off using fewer + // connections and thus should make fewer preconnections. Explore + // preconnecting fewer than the requested num_connections. + existing_available_pipeline_ = stream_factory_->http_pipelined_host_pool_. + IsExistingPipelineAvailableForOrigin(origin_); + if (existing_available_pipeline_) { + return OK; + } else { + request_->SetHttpPipeliningKey(origin_); + } } // OK, there's no available SPDY session. Let |dependent_job_| resume if it's @@ -756,10 +773,6 @@ int HttpStreamFactoryImpl::Job::DoInitConnectionComplete(int result) { return result; } - if (!connection_->socket()) { - HACKCrashHereToDebug80095(); - } - next_state_ = STATE_CREATE_STREAM; return OK; } @@ -774,8 +787,8 @@ int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) { } int HttpStreamFactoryImpl::Job::DoCreateStream() { - if (!connection_->socket() && !existing_spdy_session_) - HACKCrashHereToDebug80095(); + DCHECK(connection_->socket() || existing_spdy_session_ || + existing_available_pipeline_); next_state_ = STATE_CREATE_STREAM_COMPLETE; @@ -790,16 +803,30 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() { if (!using_spdy_) { bool using_proxy = (proxy_info_.is_http() || proxy_info_.is_https()) && request_info_.url.SchemeIs("http"); - stream_.reset(new HttpBasicStream(connection_.release(), NULL, - using_proxy)); + // TODO(simonjam): Support proxies. + if (existing_available_pipeline_) { + stream_.reset(stream_factory_->http_pipelined_host_pool_. + CreateStreamOnExistingPipeline(origin_)); + CHECK(stream_.get()); + } else if (!using_proxy && IsRequestEligibleForPipelining()) { + stream_.reset( + stream_factory_->http_pipelined_host_pool_.CreateStreamOnNewPipeline( + origin_, + connection_.release(), + server_ssl_config_, + proxy_info_, + net_log_, + was_npn_negotiated_)); + CHECK(stream_.get()); + } else { + stream_.reset(new HttpBasicStream(connection_.release(), NULL, + using_proxy)); + } return OK; } CHECK(!stream_.get()); - if (!connection_->socket() && !existing_spdy_session_) - HACKCrashHereToDebug80095(); - bool direct = true; HostPortProxyPair pair(origin_, proxy_server); if (IsHttpsProxyAndHttpUrl()) { @@ -821,12 +848,6 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() { SpdySessionPool* spdy_pool = session_->spdy_session_pool(); spdy_session = spdy_pool->GetIfExists(pair, net_log_); if (!spdy_session) { - // SPDY can be negotiated using the TLS next protocol negotiation (NPN) - // extension, or just directly using SSL. Either way, |connection_| must - // contain an SSLClientSocket. - if (!connection_->socket()) - HACKCrashHereToDebug80095(); - int error = spdy_pool->GetSpdySessionFromSocket( pair, connection_.release(), net_log_, spdy_certificate_error_, &new_spdy_session_, using_ssl_); @@ -1102,47 +1123,17 @@ bool HttpStreamFactoryImpl::Job::IsOrphaned() const { return !IsPreconnecting() && !request_; } -#if defined(OS_WIN) -#pragma warning (disable: 4748) -#pragma optimize( "", off ) -#endif - -void HttpStreamFactoryImpl::Job::HACKCrashHereToDebug80095() { - // If we enter this code path, then we'll cause a crash later in - // DoCreateStream(). Crash now and figure out what happened: - // http://crbug.com/80095. - GURL url = original_url_.get() ? *original_url_ : request_info_.url; - bool using_ssl = using_ssl_; - bool using_spdy = using_spdy_; - char url_buf[512]; - base::strlcpy(url_buf, url.spec().data(), arraysize(url_buf)); - - // Note that these local variables have their addresses referenced to - // prevent the compiler from optimizing them away. - if (using_spdy) { - LOG(FATAL) << "Crashing here because willchan doesn't know why we're " - << "crashing later. Sorry! I'll give you a cookie later. " - << "Cheers mate!\n" - << "url[" << &url << "]: " << url << "\n" - << "using_ssl[" << &using_ssl << "]: " - << (using_ssl ? "true\n" : "false\n") - << "using_spdy[" << &using_spdy << "]: " - << (using_spdy ? "true\n" : "false\n"); - } else { - LOG(FATAL) << "Crashing here because willchan doesn't know why we're " - << "crashing later. Sorry! I'll give you a cookie later. " - << "Cheers mate!\n" - << "url[" << &url << "]: " << url << "\n" - << "using_ssl[" << &using_ssl << "]: " - << (using_ssl ? "true\n" : "false\n") - << "using_spdy[" << &using_spdy << "]: " - << (using_spdy ? "true\n" : "false\n"); +bool HttpStreamFactoryImpl::Job::IsRequestEligibleForPipelining() const { + if (!HttpStreamFactory::http_pipelining_enabled()) { + return false; + } + if (IsPreconnecting() || !request_) { + return false; } + if (using_ssl_) { + return false; + } + return request_info_.method == "GET" || request_info_.method == "HEAD"; } -#if defined(OS_WIN) -#pragma optimize( "", on ) -#pragma warning (default: 4748) -#endif - } // namespace net diff --git a/net/http/http_stream_factory_impl_job.h b/net/http/http_stream_factory_impl_job.h index b3d4175..50da574 100644 --- a/net/http/http_stream_factory_impl_job.h +++ b/net/http/http_stream_factory_impl_job.h @@ -23,6 +23,7 @@ namespace net { class ClientSocketHandle; class HttpAuthController; class HttpNetworkSession; +class HttpPipelinedConnection; class HttpProxySocketParams; class HttpStream; class SOCKSSocketParams; @@ -197,11 +198,11 @@ class HttpStreamFactoryImpl::Job { // Should we force SPDY to run without SSL for this stream request. bool ShouldForceSpdyWithoutSSL() const; + bool IsRequestEligibleForPipelining() const; + // Record histograms of latency until Connect() completes. static void LogHttpConnectedMetrics(const ClientSocketHandle& handle); - void HACKCrashHereToDebug80095(); - Request* request_; const HttpRequestInfo request_info_; @@ -276,6 +277,9 @@ class HttpStreamFactoryImpl::Job { // Only used if |new_spdy_session_| is non-NULL. bool spdy_session_direct_; + // True if an existing pipeline can handle this job's request. + bool existing_available_pipeline_; + ScopedRunnableMethodFactory<Job> method_factory_; DISALLOW_COPY_AND_ASSIGN(Job); diff --git a/net/http/http_stream_factory_impl_request.cc b/net/http/http_stream_factory_impl_request.cc index 801638a..17c717e 100644 --- a/net/http/http_stream_factory_impl_request.cc +++ b/net/http/http_stream_factory_impl_request.cc @@ -40,9 +40,10 @@ HttpStreamFactoryImpl::Request::~Request() { for (std::set<Job*>::iterator it = jobs_.begin(); it != jobs_.end(); ++it) factory_->request_map_.erase(*it); - STLDeleteElements(&jobs_); - RemoveRequestFromSpdySessionRequestMap(); + RemoveRequestFromHttpPipeliningRequestMap(); + + STLDeleteElements(&jobs_); } void HttpStreamFactoryImpl::Request::SetSpdySessionKey( @@ -55,6 +56,16 @@ void HttpStreamFactoryImpl::Request::SetSpdySessionKey( request_set.insert(this); } +void HttpStreamFactoryImpl::Request::SetHttpPipeliningKey( + const HostPortPair& http_pipelining_key) { + DCHECK(!http_pipelining_key_.get()); + http_pipelining_key_.reset(new HostPortPair(http_pipelining_key)); + RequestSet& request_set = + factory_->http_pipelining_request_map_[http_pipelining_key]; + DCHECK(!ContainsKey(request_set, this)); + request_set.insert(this); +} + void HttpStreamFactoryImpl::Request::AttachJob(Job* job) { DCHECK(job); jobs_.insert(job); @@ -84,7 +95,8 @@ void HttpStreamFactoryImpl::Request::OnStreamReady( DCHECK(completed_); // |job| should only be NULL if we're being serviced by a late bound - // SpdySession (one that was not created by a job in our |jobs_| set). + // SpdySession or HttpPipelinedConnection (one that was not created by a job + // in our |jobs_| set). if (!job) { DCHECK(!bound_job_.get()); DCHECK(!jobs_.empty()); @@ -225,6 +237,22 @@ HttpStreamFactoryImpl::Request::RemoveRequestFromSpdySessionRequestMap() { } } +void +HttpStreamFactoryImpl::Request::RemoveRequestFromHttpPipeliningRequestMap() { + if (http_pipelining_key_.get()) { + HttpPipeliningRequestMap& http_pipelining_request_map = + factory_->http_pipelining_request_map_; + DCHECK(ContainsKey(http_pipelining_request_map, *http_pipelining_key_)); + RequestSet& request_set = + http_pipelining_request_map[*http_pipelining_key_]; + DCHECK(ContainsKey(request_set, this)); + request_set.erase(this); + if (request_set.empty()) + http_pipelining_request_map.erase(*http_pipelining_key_); + http_pipelining_key_.reset(); + } +} + void HttpStreamFactoryImpl::Request::OnSpdySessionReady( Job* job, scoped_refptr<SpdySession> spdy_session, @@ -278,6 +306,7 @@ void HttpStreamFactoryImpl::Request::OrphanJobsExcept(Job* job) { void HttpStreamFactoryImpl::Request::OrphanJobs() { RemoveRequestFromSpdySessionRequestMap(); + RemoveRequestFromHttpPipeliningRequestMap(); std::set<Job*> tmp; tmp.swap(jobs_); diff --git a/net/http/http_stream_factory_impl_request.h b/net/http/http_stream_factory_impl_request.h index 4a7ace4..da81b73 100644 --- a/net/http/http_stream_factory_impl_request.h +++ b/net/http/http_stream_factory_impl_request.h @@ -30,6 +30,11 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest { // before knowing if SPDY is available. void SetSpdySessionKey(const HostPortProxyPair& spdy_session_key); + // Called when the Job determines the appropriate |http_pipelining_key| for + // the Request. Registers this Request with the factory, so that if an + // existing pipeline becomes available, this Request can be late bound to it. + void SetHttpPipeliningKey(const HostPortPair& http_pipelining_key); + // Attaches |job| to this request. Does not mean that Request will use |job|, // but Request will own |job|. void AttachJob(HttpStreamFactoryImpl::Job* job); @@ -41,10 +46,14 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest { bool using_spdy, const NetLog::Source& source); - // If this Request has a spdy_session_key, remove this session from the + // If this Request has a |spdy_session_key_|, remove this session from the // SpdySessionRequestMap. void RemoveRequestFromSpdySessionRequestMap(); + // If this Request has a |http_pipelining_key_|, remove this session from the + // HttpPipeliningRequestMap. + void RemoveRequestFromHttpPipeliningRequestMap(); + // Called by an attached Job if it sets up a SpdySession. void OnSpdySessionReady(Job* job, scoped_refptr<SpdySession> spdy_session, @@ -102,6 +111,7 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest { scoped_ptr<Job> bound_job_; std::set<HttpStreamFactoryImpl::Job*> jobs_; scoped_ptr<const HostPortProxyPair> spdy_session_key_; + scoped_ptr<const HostPortPair> http_pipelining_key_; bool completed_; bool was_npn_negotiated_; diff --git a/net/http/http_stream_parser.cc b/net/http/http_stream_parser.cc index 028669f..2a86220 100644 --- a/net/http/http_stream_parser.cc +++ b/net/http/http_stream_parser.cc @@ -85,7 +85,6 @@ HttpStreamParser::HttpStreamParser(ClientSocketHandle* connection, chunk_length_(0), chunk_length_without_encoding_(0), sent_last_chunk_(false) { - DCHECK_EQ(0, read_buffer->offset()); } HttpStreamParser::~HttpStreamParser() { diff --git a/net/net.gyp b/net/net.gyp index a6223d8..6962b2e 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -414,6 +414,15 @@ 'http/http_network_session_peer.h', 'http/http_network_transaction.cc', 'http/http_network_transaction.h', + 'http/http_pipelined_connection.h', + 'http/http_pipelined_connection_impl.cc', + 'http/http_pipelined_connection_impl.h', + 'http/http_pipelined_host.cc', + 'http/http_pipelined_host.h', + 'http/http_pipelined_host_pool.cc', + 'http/http_pipelined_host_pool.h', + 'http/http_pipelined_stream.cc', + 'http/http_pipelined_stream.h', 'http/http_proxy_client_socket.cc', 'http/http_proxy_client_socket.h', 'http/http_proxy_client_socket_pool.cc', @@ -1034,6 +1043,8 @@ 'http/http_mac_signature_unittest.cc', 'http/http_network_layer_unittest.cc', 'http/http_network_transaction_unittest.cc', + 'http/http_pipelined_connection_impl_unittest.cc', + 'http/http_pipelined_host_unittest.cc', 'http/http_proxy_client_socket_pool_unittest.cc', 'http/http_request_headers_unittest.cc', 'http/http_response_body_drainer_unittest.cc', diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 474d497..175f827 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -472,4 +472,9 @@ bool SpdyHttpStream::IsSpdyHttpStream() const { return true; } +void SpdyHttpStream::Drain(HttpNetworkSession* session) { + Close(false); + delete this; +} + } // namespace net diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h index 46a4104..b025cd2 100644 --- a/net/spdy/spdy_http_stream.h +++ b/net/spdy/spdy_http_stream.h @@ -70,6 +70,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate, SSLCertRequestInfo* cert_request_info) OVERRIDE; virtual bool IsSpdyHttpStream() const OVERRIDE; virtual void LogNumRttVsBytesMetrics() const OVERRIDE {} + virtual void Drain(HttpNetworkSession* session) OVERRIDE; // SpdyStream::Delegate methods: virtual bool OnSendHeadersComplete(int status) OVERRIDE; |