summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorsimonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-10-19 20:14:29 +0000
committersimonjam@chromium.org <simonjam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-10-19 20:14:29 +0000
commit5a60c8bb002e4573dd0809f4a78d56b4c9720add (patch)
tree4be91c33213b0e7ab66dfed7ced978e8a16b3cdc /net
parent8053408224c37ade07452972e01de6126d074d27 (diff)
downloadchromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.zip
chromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.tar.gz
chromium_src-5a60c8bb002e4573dd0809f4a78d56b4c9720add.tar.bz2
Basic HTTP pipelining support.
This must be enabled in about:flags. It is naive and assumes all servers correctly implement pipelining. Proxies are not supported. Immediate future work: - Integration tests. - Additional NetLog logging. - Refactor HttpPipelinedConnectionImpl. Long term: - Detect broken transparent proxies. - Detect and/or mitigate broken servers. - Buffer HttpPipelinedStream. - Optimize number of pipelines and their depth. - Support proxies. BUG=8991 TEST=net_unittests Review URL: http://codereview.chromium.org/7289006 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@106364 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/base/net_error_list.h3
-rw-r--r--net/base/net_log_event_type_list.h7
-rw-r--r--net/http/http_basic_stream.cc7
-rw-r--r--net/http/http_basic_stream.h2
-rw-r--r--net/http/http_network_transaction.cc19
-rw-r--r--net/http/http_network_transaction_unittest.cc63
-rw-r--r--net/http/http_pipelined_connection.h74
-rw-r--r--net/http/http_pipelined_connection_impl.cc612
-rw-r--r--net/http/http_pipelined_connection_impl.h271
-rw-r--r--net/http/http_pipelined_connection_impl_unittest.cc1067
-rw-r--r--net/http/http_pipelined_host.cc108
-rw-r--r--net/http/http_pipelined_host.h93
-rw-r--r--net/http/http_pipelined_host_pool.cc80
-rw-r--r--net/http/http_pipelined_host_pool.h66
-rw-r--r--net/http/http_pipelined_host_unittest.cc201
-rw-r--r--net/http/http_pipelined_stream.cc142
-rw-r--r--net/http/http_pipelined_stream.h112
-rw-r--r--net/http/http_response_body_drainer_unittest.cc2
-rw-r--r--net/http/http_stream.h7
-rw-r--r--net/http/http_stream_factory.cc2
-rw-r--r--net/http/http_stream_factory.h7
-rw-r--r--net/http/http_stream_factory_impl.cc23
-rw-r--r--net/http/http_stream_factory_impl.h13
-rw-r--r--net/http/http_stream_factory_impl_job.cc105
-rw-r--r--net/http/http_stream_factory_impl_job.h8
-rw-r--r--net/http/http_stream_factory_impl_request.cc35
-rw-r--r--net/http/http_stream_factory_impl_request.h12
-rw-r--r--net/http/http_stream_parser.cc1
-rw-r--r--net/net.gyp11
-rw-r--r--net/spdy/spdy_http_stream.cc5
-rw-r--r--net/spdy/spdy_http_stream.h1
31 files changed, 3079 insertions, 80 deletions
diff --git a/net/base/net_error_list.h b/net/base/net_error_list.h
index 3097d31..ca63445 100644
--- a/net/base/net_error_list.h
+++ b/net/base/net_error_list.h
@@ -637,3 +637,6 @@ NET_ERROR(DNS_TIMED_OUT, -803)
// The entry was not found in cache, for cache-only lookups.
NET_ERROR(DNS_CACHE_MISS, -804)
+
+// FIXME: Take the next number.
+NET_ERROR(PIPELINE_EVICTION, -900)
diff --git a/net/base/net_log_event_type_list.h b/net/base/net_log_event_type_list.h
index 1547bc8..8ebca16 100644
--- a/net/base/net_log_event_type_list.h
+++ b/net/base/net_log_event_type_list.h
@@ -845,6 +845,13 @@ EVENT_TYPE(HTTP_TRANSACTION_READ_BODY)
// restarting for authentication, on keep alive connections.
EVENT_TYPE(HTTP_TRANSACTION_DRAIN_BODY_FOR_AUTH_RESTART)
+// This event is sent when we try to restart a transaction after an error.
+// The following parameters are attached:
+// {
+// "net_error": <The net error code integer for the failure>,
+// }
+EVENT_TYPE(HTTP_TRANSACTION_RESTART_AFTER_ERROR)
+
// ------------------------------------------------------------------------
// SpdySession
// ------------------------------------------------------------------------
diff --git a/net/http/http_basic_stream.cc b/net/http/http_basic_stream.cc
index eb5ecc2..43b141a 100644
--- a/net/http/http_basic_stream.cc
+++ b/net/http/http_basic_stream.cc
@@ -11,6 +11,7 @@
#include "net/base/net_errors.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_request_info.h"
+#include "net/http/http_response_body_drainer.h"
#include "net/http/http_stream_parser.h"
#include "net/http/http_util.h"
#include "net/socket/client_socket_handle.h"
@@ -130,4 +131,10 @@ void HttpBasicStream::LogNumRttVsBytesMetrics() const {
// Log rtt metrics here.
}
+void HttpBasicStream::Drain(HttpNetworkSession* session) {
+ HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(this);
+ drainer->Start(session);
+ // |drainer| will delete itself.
+}
+
} // namespace net
diff --git a/net/http/http_basic_stream.h b/net/http/http_basic_stream.h
index 3b38b93..8024576 100644
--- a/net/http/http_basic_stream.h
+++ b/net/http/http_basic_stream.h
@@ -83,6 +83,8 @@ class HttpBasicStream : public HttpStream {
virtual void LogNumRttVsBytesMetrics() const OVERRIDE;
+ virtual void Drain(HttpNetworkSession* session) OVERRIDE;
+
private:
scoped_refptr<GrowableIOBuffer> read_buf_;
diff --git a/net/http/http_network_transaction.cc b/net/http/http_network_transaction.cc
index 84df782..c66f0f3 100644
--- a/net/http/http_network_transaction.cc
+++ b/net/http/http_network_transaction.cc
@@ -39,7 +39,6 @@
#include "net/http/http_proxy_client_socket_pool.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_request_info.h"
-#include "net/http/http_response_body_drainer.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_response_info.h"
#include "net/http/http_server_properties.h"
@@ -137,15 +136,8 @@ HttpNetworkTransaction::~HttpNetworkTransaction() {
stream_->Close(true /* not reusable */);
} else {
// Otherwise, we try to drain the response body.
- // TODO(willchan): Consider moving this response body draining to the
- // stream implementation. For SPDY, there's clearly no point. For
- // HTTP, it can vary depending on whether or not we're pipelining. It's
- // stream dependent, so the different subtypes should be implementing
- // their solutions.
- HttpResponseBodyDrainer* drainer =
- new HttpResponseBodyDrainer(stream_.release());
- drainer->Start(session_);
- // |drainer| will delete itself.
+ HttpStream* stream = stream_.release();
+ stream->Drain(session_);
}
}
}
@@ -1198,12 +1190,19 @@ int HttpNetworkTransaction::HandleIOError(int error) {
case ERR_CONNECTION_CLOSED:
case ERR_CONNECTION_ABORTED:
if (ShouldResendRequest(error)) {
+ net_log_.AddEvent(
+ NetLog::TYPE_HTTP_TRANSACTION_RESTART_AFTER_ERROR,
+ make_scoped_refptr(new NetLogIntegerParameter("net_error", error)));
ResetConnectionAndRequestForResend();
error = OK;
}
break;
+ case ERR_PIPELINE_EVICTION:
case ERR_SPDY_PING_FAILED:
case ERR_SPDY_SERVER_REFUSED_STREAM:
+ net_log_.AddEvent(
+ NetLog::TYPE_HTTP_TRANSACTION_RESTART_AFTER_ERROR,
+ make_scoped_refptr(new NetLogIntegerParameter("net_error", error)));
ResetConnectionAndRequestForResend();
error = OK;
break;
diff --git a/net/http/http_network_transaction_unittest.cc b/net/http/http_network_transaction_unittest.cc
index 1de21e7..546935e 100644
--- a/net/http/http_network_transaction_unittest.cc
+++ b/net/http/http_network_transaction_unittest.cc
@@ -174,8 +174,8 @@ class HttpNetworkTransactionTest : public PlatformTest {
void KeepAliveConnectionResendRequestTest(const MockRead& read_failure);
- SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[],
- size_t reads_count) {
+ SimpleGetHelperResult SimpleGetHelperForData(StaticSocketDataProvider* data[],
+ size_t data_count) {
SimpleGetHelperResult out;
HttpRequestInfo request;
@@ -187,8 +187,9 @@ class HttpNetworkTransactionTest : public PlatformTest {
scoped_ptr<HttpTransaction> trans(
new HttpNetworkTransaction(CreateSession(&session_deps)));
- StaticSocketDataProvider data(data_reads, reads_count, NULL, 0);
- session_deps.socket_factory.AddSocketDataProvider(&data);
+ for (size_t i = 0; i < data_count; ++i) {
+ session_deps.socket_factory.AddSocketDataProvider(data[i]);
+ }
TestOldCompletionCallback callback;
@@ -237,6 +238,13 @@ class HttpNetworkTransactionTest : public PlatformTest {
return out;
}
+ SimpleGetHelperResult SimpleGetHelper(MockRead data_reads[],
+ size_t reads_count) {
+ StaticSocketDataProvider reads(data_reads, reads_count, NULL, 0);
+ StaticSocketDataProvider* data[] = { &reads };
+ return SimpleGetHelperForData(data, 1);
+ }
+
void ConnectStatusHelperWithExpectedStatus(const MockRead& status,
int expected_status);
@@ -9245,4 +9253,51 @@ TEST_F(HttpNetworkTransactionTest,
HttpStreamFactory::set_use_alternate_protocols(false);
}
+TEST_F(HttpNetworkTransactionTest, ReadPipelineEvictionFallback) {
+ MockRead data_reads1[] = {
+ MockRead(false, ERR_PIPELINE_EVICTION),
+ };
+ MockRead data_reads2[] = {
+ MockRead("HTTP/1.0 200 OK\r\n\r\n"),
+ MockRead("hello world"),
+ MockRead(false, OK),
+ };
+ StaticSocketDataProvider data1(data_reads1, arraysize(data_reads1), NULL, 0);
+ StaticSocketDataProvider data2(data_reads2, arraysize(data_reads2), NULL, 0);
+ StaticSocketDataProvider* data[] = { &data1, &data2 };
+
+ SimpleGetHelperResult out = SimpleGetHelperForData(data, arraysize(data));
+
+ EXPECT_EQ(OK, out.rv);
+ EXPECT_EQ("HTTP/1.0 200 OK", out.status_line);
+ EXPECT_EQ("hello world", out.response_data);
+}
+
+TEST_F(HttpNetworkTransactionTest, SendPipelineEvictionFallback) {
+ MockWrite data_writes1[] = {
+ MockWrite(false, ERR_PIPELINE_EVICTION),
+ };
+ MockWrite data_writes2[] = {
+ MockWrite("GET / HTTP/1.1\r\n"
+ "Host: www.google.com\r\n"
+ "Connection: keep-alive\r\n\r\n"),
+ };
+ MockRead data_reads2[] = {
+ MockRead("HTTP/1.0 200 OK\r\n\r\n"),
+ MockRead("hello world"),
+ MockRead(false, OK),
+ };
+ StaticSocketDataProvider data1(NULL, 0,
+ data_writes1, arraysize(data_writes1));
+ StaticSocketDataProvider data2(data_reads2, arraysize(data_reads2),
+ data_writes2, arraysize(data_writes2));
+ StaticSocketDataProvider* data[] = { &data1, &data2 };
+
+ SimpleGetHelperResult out = SimpleGetHelperForData(data, arraysize(data));
+
+ EXPECT_EQ(OK, out.rv);
+ EXPECT_EQ("HTTP/1.0 200 OK", out.status_line);
+ EXPECT_EQ("hello world", out.response_data);
+}
+
} // namespace net
diff --git a/net/http/http_pipelined_connection.h b/net/http/http_pipelined_connection.h
new file mode 100644
index 0000000..59e61d6
--- /dev/null
+++ b/net/http/http_pipelined_connection.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_HTTP_HTTP_PIPELINED_CONNECTION_H_
+#define NET_HTTP_HTTP_PIPELINED_CONNECTION_H_
+#pragma once
+
+#include "net/base/net_export.h"
+#include "net/base/net_log.h"
+
+namespace net {
+
+class BoundNetLog;
+class ClientSocketHandle;
+class HttpPipelinedStream;
+class ProxyInfo;
+struct SSLConfig;
+
+class NET_EXPORT_PRIVATE HttpPipelinedConnection {
+ public:
+ class Delegate {
+ public:
+ // Called when a pipeline has newly available capacity. This may be because
+ // the first request has been sent and the pipeline is now active. Or, it
+ // may be because a request successfully completed.
+ virtual void OnPipelineHasCapacity(HttpPipelinedConnection* pipeline) = 0;
+ };
+
+ class Factory {
+ public:
+ virtual ~Factory() {}
+
+ virtual HttpPipelinedConnection* CreateNewPipeline(
+ ClientSocketHandle* connection,
+ Delegate* delegate,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated) = 0;
+ };
+
+ virtual ~HttpPipelinedConnection() {}
+
+ // Returns a new stream that uses this pipeline.
+ virtual HttpPipelinedStream* CreateNewStream() = 0;
+
+ // The number of streams currently associated with this pipeline.
+ virtual int depth() const = 0;
+
+ // True if this pipeline can accept new HTTP requests. False if a fatal error
+ // has occurred.
+ virtual bool usable() const = 0;
+
+ // True if this pipeline has bound one request and is ready for additional
+ // requests.
+ virtual bool active() const = 0;
+
+ // The SSLConfig used to establish this connection.
+ virtual const SSLConfig& used_ssl_config() const = 0;
+
+ // The ProxyInfo used to establish this connection.
+ virtual const ProxyInfo& used_proxy_info() const = 0;
+
+ // The source of this pipelined connection.
+ virtual const NetLog::Source& source() const = 0;
+
+ // True if this connection was NPN negotiated.
+ virtual bool was_npn_negotiated() const = 0;
+};
+
+} // namespace net
+
+#endif // NET_HTTP_HTTP_PIPELINED_CONNECTION_H_
diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc
new file mode 100644
index 0000000..33f2ef6
--- /dev/null
+++ b/net/http/http_pipelined_connection_impl.cc
@@ -0,0 +1,612 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_connection_impl.h"
+
+#include "base/message_loop.h"
+#include "base/stl_util.h"
+#include "net/base/io_buffer.h"
+#include "net/http/http_pipelined_stream.h"
+#include "net/http/http_request_info.h"
+#include "net/http/http_stream_parser.h"
+#include "net/socket/client_socket_handle.h"
+
+namespace net {
+
+HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
+ ClientSocketHandle* connection,
+ HttpPipelinedConnection::Delegate* delegate,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated)
+ : delegate_(delegate),
+ connection_(connection),
+ used_ssl_config_(used_ssl_config),
+ used_proxy_info_(used_proxy_info),
+ net_log_(net_log),
+ was_npn_negotiated_(was_npn_negotiated),
+ read_buf_(new GrowableIOBuffer()),
+ next_pipeline_id_(1),
+ active_(false),
+ usable_(true),
+ completed_one_request_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
+ send_next_state_(SEND_STATE_NONE),
+ ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
+ this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
+ send_user_callback_(NULL),
+ read_next_state_(READ_STATE_NONE),
+ ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
+ this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
+ read_user_callback_(NULL) {
+ CHECK(connection_.get());
+}
+
+HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
+ CHECK_EQ(depth(), 0);
+ CHECK(stream_info_map_.empty());
+ CHECK(deferred_request_queue_.empty());
+ CHECK(request_order_.empty());
+ CHECK_EQ(send_next_state_, SEND_STATE_NONE);
+ CHECK_EQ(read_next_state_, READ_STATE_NONE);
+ CHECK(!send_user_callback_);
+ CHECK(!read_user_callback_);
+ if (!usable_) {
+ connection_->socket()->Disconnect();
+ }
+ connection_->Reset();
+}
+
+HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
+ int pipeline_id = next_pipeline_id_++;
+ CHECK(pipeline_id);
+ HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
+ stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo()));
+ return stream;
+}
+
+void HttpPipelinedConnectionImpl::InitializeParser(
+ int pipeline_id,
+ const HttpRequestInfo* request,
+ const BoundNetLog& net_log) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(!stream_info_map_[pipeline_id].parser.get());
+ stream_info_map_[pipeline_id].state = STREAM_BOUND;
+ stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser(
+ connection_.get(), request, read_buf_.get(), net_log));
+
+ // In case our first stream doesn't SendRequest() immediately, we should still
+ // allow others to use this pipeline.
+ if (pipeline_id == 1) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::ActivatePipeline));
+ }
+}
+
+void HttpPipelinedConnectionImpl::ActivatePipeline() {
+ if (!active_) {
+ active_ = true;
+ delegate_->OnPipelineHasCapacity(this);
+ }
+}
+
+void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ Close(pipeline_id, false);
+
+ if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
+ stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
+ CHECK(stream_info_map_[pipeline_id].parser.get());
+ stream_info_map_[pipeline_id].parser.reset();
+ }
+ CHECK(!stream_info_map_[pipeline_id].parser.get());
+ CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
+ stream_info_map_.erase(pipeline_id);
+
+ delegate_->OnPipelineHasCapacity(this);
+}
+
+int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
+ const std::string& request_line,
+ const HttpRequestHeaders& headers,
+ UploadDataStream* request_body,
+ HttpResponseInfo* response,
+ OldCompletionCallback* callback) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
+ if (!usable_) {
+ return ERR_PIPELINE_EVICTION;
+ }
+
+ DeferredSendRequest deferred_request;
+ deferred_request.pipeline_id = pipeline_id;
+ deferred_request.request_line = request_line;
+ deferred_request.headers = headers;
+ deferred_request.request_body = request_body;
+ deferred_request.response = response;
+ deferred_request.callback = callback;
+ deferred_request_queue_.push(deferred_request);
+
+ int rv;
+ if (send_next_state_ == SEND_STATE_NONE) {
+ send_next_state_ = SEND_STATE_NEXT_REQUEST;
+ rv = DoSendRequestLoop(OK);
+ } else {
+ rv = ERR_IO_PENDING;
+ }
+ ActivatePipeline();
+ return rv;
+}
+
+int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
+ int rv = result;
+ do {
+ SendRequestState state = send_next_state_;
+ send_next_state_ = SEND_STATE_NONE;
+ switch (state) {
+ case SEND_STATE_NEXT_REQUEST:
+ rv = DoSendNextRequest(rv);
+ break;
+ case SEND_STATE_COMPLETE:
+ rv = DoSendComplete(rv);
+ break;
+ case SEND_STATE_UNUSABLE:
+ rv = DoEvictPendingSendRequests(rv);
+ break;
+ default:
+ NOTREACHED() << "bad send state: " << state;
+ rv = ERR_FAILED;
+ break;
+ }
+ } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
+ return rv;
+}
+
+void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
+ CHECK(send_user_callback_);
+ DoSendRequestLoop(result);
+}
+
+int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
+ CHECK(!deferred_request_queue_.empty());
+ const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
+ CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id));
+ if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) {
+ deferred_request_queue_.pop();
+ if (deferred_request_queue_.empty()) {
+ send_next_state_ = SEND_STATE_NONE;
+ } else {
+ send_next_state_ = SEND_STATE_NEXT_REQUEST;
+ }
+ return OK;
+ }
+ CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get());
+ int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest(
+ deferred_request.request_line,
+ deferred_request.headers,
+ deferred_request.request_body,
+ deferred_request.response,
+ &send_io_callback_);
+ // |result| == ERR_IO_PENDING means this function was *not* called on the same
+ // stack as SendRequest(). That means we returned ERR_IO_PENDING to
+ // SendRequest() earlier and will need to invoke its callback.
+ if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
+ send_user_callback_ = deferred_request.callback;
+ }
+ stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING;
+ send_next_state_ = SEND_STATE_COMPLETE;
+ return rv;
+}
+
+int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
+ CHECK(!deferred_request_queue_.empty());
+ const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
+ CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state,
+ STREAM_SENDING);
+ request_order_.push(deferred_request.pipeline_id);
+ stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT;
+ deferred_request_queue_.pop();
+ if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
+ result = ERR_PIPELINE_EVICTION;
+ }
+ if (result < OK) {
+ send_next_state_ = SEND_STATE_UNUSABLE;
+ usable_ = false;
+ }
+ if (send_user_callback_) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ send_user_callback_,
+ result));
+ send_user_callback_ = NULL;
+ }
+ if (result < OK) {
+ return result;
+ }
+ if (deferred_request_queue_.empty()) {
+ send_next_state_ = SEND_STATE_NONE;
+ return OK;
+ }
+ send_next_state_ = SEND_STATE_NEXT_REQUEST;
+ return OK;
+}
+
+int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
+ send_next_state_ = SEND_STATE_NONE;
+ while (!deferred_request_queue_.empty()) {
+ const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
+ if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
+ evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
+ }
+ deferred_request_queue_.pop();
+ }
+ return result;
+}
+
+int HttpPipelinedConnectionImpl::ReadResponseHeaders(
+ int pipeline_id,
+ OldCompletionCallback* callback) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT);
+ CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
+ if (!usable_) {
+ return ERR_PIPELINE_EVICTION;
+ }
+ stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
+ stream_info_map_[pipeline_id].read_headers_callback = callback;
+ if (read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_NEXT_HEADERS;
+ return DoReadHeadersLoop(OK);
+ } else {
+ return ERR_IO_PENDING;
+ }
+}
+
+int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
+ int rv = result;
+ do {
+ ReadHeadersState state = read_next_state_;
+ read_next_state_ = READ_STATE_NONE;
+ switch (state) {
+ case READ_STATE_NEXT_HEADERS:
+ rv = DoReadNextHeaders(rv);
+ break;
+ case READ_STATE_COMPLETE:
+ rv = DoReadHeadersComplete(rv);
+ break;
+ case READ_STATE_WAITING_FOR_CLOSE:
+ rv = DoReadWaitingForClose(rv);
+ return rv;
+ case READ_STATE_STREAM_CLOSED:
+ rv = DoReadStreamClosed();
+ break;
+ case READ_STATE_UNUSABLE:
+ rv = DoEvictPendingReadHeaders(rv);
+ break;
+ case READ_STATE_NONE:
+ break;
+ default:
+ NOTREACHED() << "bad read state";
+ rv = ERR_FAILED;
+ break;
+ }
+ } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
+ return rv;
+}
+
+void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
+ CHECK(read_user_callback_);
+ DoReadHeadersLoop(result);
+}
+
+int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
+ CHECK(!request_order_.empty());
+ int pipeline_id = request_order_.front();
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) {
+ // Since nobody will read whatever data is on the pipeline associated with
+ // this closed request, we must shut down the rest of the pipeline.
+ read_next_state_ = READ_STATE_UNUSABLE;
+ return OK;
+ }
+ if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
+ return ERR_IO_PENDING;
+ }
+ CHECK(stream_info_map_[pipeline_id].parser.get());
+
+ if (result == ERR_IO_PENDING) {
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE);
+ } else {
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING);
+ stream_info_map_[pipeline_id].state = STREAM_ACTIVE;
+ }
+
+ int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders(
+ &read_io_callback_);
+ if (rv == ERR_IO_PENDING) {
+ read_next_state_ = READ_STATE_COMPLETE;
+ read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
+ } else if (rv < OK) {
+ read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
+ if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
+ rv = ERR_PIPELINE_EVICTION;
+ } else {
+ CHECK_LE(OK, rv);
+ read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
+ }
+
+ // |result| == ERR_IO_PENDING means this function was *not* called on the same
+ // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
+ // ReadResponseHeaders() earlier and now need to invoke its callback.
+ if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
+ read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
+ read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ read_user_callback_,
+ rv));
+ }
+ return rv;
+}
+
+int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
+ read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
+ if (read_user_callback_) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ read_user_callback_,
+ result));
+ read_user_callback_ = NULL;
+ }
+ return result;
+}
+
+int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
+ read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
+ return result;
+}
+
+int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
+ CHECK(!request_order_.empty());
+ int pipeline_id = request_order_.front();
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
+ CHECK(stream_info_map_[pipeline_id].read_headers_callback);
+ stream_info_map_[pipeline_id].read_headers_callback = NULL;
+ request_order_.pop();
+ if (!usable_) {
+ read_next_state_ = READ_STATE_UNUSABLE;
+ return OK;
+ } else {
+ completed_one_request_ = true;
+ if (!request_order_.empty()) {
+ int next_pipeline_id = request_order_.front();
+ CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
+ if (stream_info_map_[next_pipeline_id].read_headers_callback) {
+ stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
+ read_next_state_ = READ_STATE_NEXT_HEADERS;
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
+ ERR_IO_PENDING));
+ return ERR_IO_PENDING; // Wait for the task to fire.
+ }
+ }
+ read_next_state_ = READ_STATE_NONE;
+ return OK;
+ }
+}
+
+int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
+ while (!request_order_.empty()) {
+ int evicted_id = request_order_.front();
+ request_order_.pop();
+ if (!ContainsKey(stream_info_map_, evicted_id) ||
+ (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
+ continue;
+ }
+ if (stream_info_map_[evicted_id].state != STREAM_CLOSED) {
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ method_factory_.NewRunnableMethod(
+ &HttpPipelinedConnectionImpl::FireUserCallback,
+ stream_info_map_[evicted_id].read_headers_callback,
+ ERR_PIPELINE_EVICTION));
+ }
+ stream_info_map_[evicted_id].read_headers_callback = NULL;
+ }
+ read_next_state_ = READ_STATE_NONE;
+ return result;
+}
+
+void HttpPipelinedConnectionImpl::Close(int pipeline_id,
+ bool not_reusable) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ switch (stream_info_map_[pipeline_id].state) {
+ case STREAM_CREATED:
+ stream_info_map_[pipeline_id].state = STREAM_UNUSED;
+ break;
+
+ case STREAM_BOUND:
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ break;
+
+ case STREAM_SENDING:
+ usable_ = false;
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ send_user_callback_ = NULL;
+ send_next_state_ = SEND_STATE_UNUSABLE;
+ DoSendRequestLoop(OK);
+ break;
+
+ case STREAM_SENT:
+ case STREAM_READ_PENDING:
+ usable_ = false;
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ stream_info_map_[pipeline_id].read_headers_callback = NULL;
+ if (read_next_state_ == READ_STATE_NONE) {
+ read_next_state_ = READ_STATE_UNUSABLE;
+ DoReadHeadersLoop(OK);
+ }
+ break;
+
+ case STREAM_ACTIVE:
+ stream_info_map_[pipeline_id].state = STREAM_CLOSED;
+ if (not_reusable) {
+ usable_ = false;
+ }
+ read_next_state_ = READ_STATE_STREAM_CLOSED;
+ read_user_callback_ = NULL;
+ DoReadHeadersLoop(OK);
+ break;
+
+ case STREAM_CLOSED:
+ case STREAM_UNUSED:
+ // TODO(simonjam): Why is Close() sometimes called twice?
+ break;
+
+ default:
+ NOTREACHED();
+ break;
+ }
+}
+
+int HttpPipelinedConnectionImpl::ReadResponseBody(
+ int pipeline_id,
+ IOBuffer* buf,
+ int buf_len,
+ OldCompletionCallback* callback) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(!request_order_.empty());
+ CHECK_EQ(pipeline_id, request_order_.front());
+ CHECK(stream_info_map_[pipeline_id].parser.get());
+ return stream_info_map_[pipeline_id].parser->ReadResponseBody(
+ buf, buf_len, callback);
+}
+
+uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
+ return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
+}
+
+HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
+ int pipeline_id) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
+ return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo();
+}
+
+bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
+ int pipeline_id) const {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
+ return stream_info_map_.find(pipeline_id)->second.parser->
+ IsResponseBodyComplete();
+}
+
+bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
+ return stream_info_map_.find(pipeline_id)->second.parser->
+ CanFindEndOfResponse();
+}
+
+bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ return read_buf_->offset() != 0;
+}
+
+bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ if (pipeline_id > 1) {
+ return true;
+ }
+ ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
+ return connection_->is_reused() ||
+ reuse_type == ClientSocketHandle::UNUSED_IDLE;
+}
+
+void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ connection_->set_is_reused(true);
+}
+
+void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
+ SSLInfo* ssl_info) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_[pipeline_id].parser.get());
+ return stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info);
+}
+
+void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
+ int pipeline_id,
+ SSLCertRequestInfo* cert_request_info) {
+ CHECK(ContainsKey(stream_info_map_, pipeline_id));
+ CHECK(stream_info_map_[pipeline_id].parser.get());
+ return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
+ cert_request_info);
+}
+
+void HttpPipelinedConnectionImpl::FireUserCallback(
+ OldCompletionCallback* callback,
+ int result) {
+ CHECK(callback);
+ callback->Run(result);
+}
+
+int HttpPipelinedConnectionImpl::depth() const {
+ return stream_info_map_.size();
+}
+
+bool HttpPipelinedConnectionImpl::usable() const {
+ return usable_;
+}
+
+bool HttpPipelinedConnectionImpl::active() const {
+ return active_;
+}
+
+const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const {
+ return used_ssl_config_;
+}
+
+const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const {
+ return used_proxy_info_;
+}
+
+const NetLog::Source& HttpPipelinedConnectionImpl::source() const {
+ return net_log_.source();
+}
+
+bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
+ return was_npn_negotiated_;
+}
+
+HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() {
+}
+
+HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() {
+}
+
+HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
+ : read_headers_callback(NULL),
+ state(STREAM_CREATED) {
+}
+
+HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_connection_impl.h b/net/http/http_pipelined_connection_impl.h
new file mode 100644
index 0000000..bf39e62
--- /dev/null
+++ b/net/http/http_pipelined_connection_impl.h
@@ -0,0 +1,271 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_
+#define NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_
+#pragma once
+
+#include <map>
+#include <queue>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/memory/linked_ptr.h"
+#include "base/task.h"
+#include "net/base/completion_callback.h"
+#include "net/base/net_export.h"
+#include "net/base/net_log.h"
+#include "net/base/ssl_config_service.h"
+#include "net/base/upload_data_stream.h"
+#include "net/http/http_pipelined_connection.h"
+#include "net/http/http_request_info.h"
+#include "net/http/http_stream_parser.h"
+#include "net/proxy/proxy_info.h"
+
+namespace net {
+
+class ClientSocketHandle;
+class GrowableIOBuffer;
+class HttpRequestHeaders;
+class HttpResponseInfo;
+class IOBuffer;
+class SSLCertRequestInfo;
+class SSLInfo;
+
+// This class manages all of the state for a single pipelined connection. It
+// tracks the order that HTTP requests are sent and enforces that the
+// subsequent reads occur in the appropriate order.
+//
+// If an error occurs related to pipelining, ERR_PIPELINE_EVICTION will be
+// returned to the client. This indicates the client should retry the request
+// without pipelining.
+class NET_EXPORT_PRIVATE HttpPipelinedConnectionImpl
+ : public HttpPipelinedConnection {
+ public:
+ HttpPipelinedConnectionImpl(ClientSocketHandle* connection,
+ Delegate* delegate,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated);
+ virtual ~HttpPipelinedConnectionImpl();
+
+ // HttpPipelinedConnection interface.
+
+ // Used by HttpStreamFactoryImpl and friends.
+ virtual HttpPipelinedStream* CreateNewStream() OVERRIDE;
+
+ // Used by HttpPipelinedHost.
+ virtual int depth() const OVERRIDE;
+ virtual bool usable() const OVERRIDE;
+ virtual bool active() const OVERRIDE;
+
+ // Used by HttpStreamFactoryImpl.
+ virtual const SSLConfig& used_ssl_config() const OVERRIDE;
+ virtual const ProxyInfo& used_proxy_info() const OVERRIDE;
+ virtual const NetLog::Source& source() const OVERRIDE;
+ virtual bool was_npn_negotiated() const OVERRIDE;
+
+ // Used by HttpPipelinedStream.
+
+ // Notifies this pipeline that a stream is no longer using it.
+ void OnStreamDeleted(int pipeline_id);
+
+ // Effective implementation of HttpStream. Note that we don't directly
+ // implement that interface. Instead, these functions will be called by the
+ // pass-through methods in HttpPipelinedStream.
+ void InitializeParser(int pipeline_id,
+ const HttpRequestInfo* request,
+ const BoundNetLog& net_log);
+
+ int SendRequest(int pipeline_id,
+ const std::string& request_line,
+ const HttpRequestHeaders& headers,
+ UploadDataStream* request_body,
+ HttpResponseInfo* response,
+ OldCompletionCallback* callback);
+
+ int ReadResponseHeaders(int pipeline_id,
+ OldCompletionCallback* callback);
+
+ int ReadResponseBody(int pipeline_id,
+ IOBuffer* buf, int buf_len,
+ OldCompletionCallback* callback);
+
+ void Close(int pipeline_id,
+ bool not_reusable);
+
+ uint64 GetUploadProgress(int pipeline_id) const;
+
+ HttpResponseInfo* GetResponseInfo(int pipeline_id);
+
+ bool IsResponseBodyComplete(int pipeline_id) const;
+
+ bool CanFindEndOfResponse(int pipeline_id) const;
+
+ bool IsMoreDataBuffered(int pipeline_id) const;
+
+ bool IsConnectionReused(int pipeline_id) const;
+
+ void SetConnectionReused(int pipeline_id);
+
+ void GetSSLInfo(int pipeline_id,
+ SSLInfo* ssl_info);
+
+ void GetSSLCertRequestInfo(int pipeline_id,
+ SSLCertRequestInfo* cert_request_info);
+
+ private:
+ enum StreamState {
+ STREAM_CREATED,
+ STREAM_BOUND,
+ STREAM_SENDING,
+ STREAM_SENT,
+ STREAM_READ_PENDING,
+ STREAM_ACTIVE,
+ STREAM_CLOSED,
+ STREAM_UNUSED,
+ };
+ enum SendRequestState {
+ SEND_STATE_NEXT_REQUEST,
+ SEND_STATE_COMPLETE,
+ SEND_STATE_NONE,
+ SEND_STATE_UNUSABLE,
+ };
+ enum ReadHeadersState {
+ READ_STATE_NEXT_HEADERS,
+ READ_STATE_COMPLETE,
+ READ_STATE_WAITING_FOR_CLOSE,
+ READ_STATE_STREAM_CLOSED,
+ READ_STATE_NONE,
+ READ_STATE_UNUSABLE,
+ };
+
+ struct DeferredSendRequest {
+ DeferredSendRequest();
+ ~DeferredSendRequest();
+
+ int pipeline_id;
+ std::string request_line;
+ HttpRequestHeaders headers;
+ UploadDataStream* request_body;
+ HttpResponseInfo* response;
+ OldCompletionCallback* callback;
+ };
+
+ struct StreamInfo {
+ StreamInfo();
+ ~StreamInfo();
+
+ linked_ptr<HttpStreamParser> parser;
+ OldCompletionCallback* read_headers_callback;
+ StreamState state;
+ };
+
+ typedef std::map<int, StreamInfo> StreamInfoMap;
+
+ // Called after the first request is sent or in a task sometime after the
+ // first stream is added to this pipeline. This gives the first request
+ // priority to send, but doesn't hold up other requests if it doesn't.
+ // When called the first time, notifies the |delegate_| that we can accept new
+ // requests.
+ void ActivatePipeline();
+
+ // Responsible for sending one request at a time and waiting until each
+ // comepletes.
+ int DoSendRequestLoop(int result);
+
+ // Called when an asynchronous Send() completes.
+ void OnSendIOCallback(int result);
+
+ // Sends the next deferred request. This may be called immediately after
+ // SendRequest(), or it may be in a new task after a prior send completes in
+ // DoSendComplete().
+ int DoSendNextRequest(int result);
+
+ // Notifies the user that the send has completed. This may be called directly
+ // after SendRequest() for a synchronous request, or it may be called in
+ // response to OnSendIOCallback for an asynchronous request.
+ int DoSendComplete(int result);
+
+ // Evicts all unsent deferred requests. This is called if there is a Send()
+ // error or one of our streams informs us the connection is no longer
+ // reusable.
+ int DoEvictPendingSendRequests(int result);
+
+ // Ensures that only the active request's HttpPipelinedSocket can read from
+ // the underlying socket until it completes. A HttpPipelinedSocket informs us
+ // that it's done by calling Close().
+ int DoReadHeadersLoop(int result);
+
+ // Called when the pending asynchronous ReadResponseHeaders() completes.
+ void OnReadIOCallback(int result);
+
+ // Determines if the next response in the pipeline is ready to be read.
+ // If it's ready, then we call ReadResponseHeaders() on the underlying parser.
+ // HttpPipelinedSocket indicates its readiness by calling
+ // ReadResponseHeaders(). This function may be called immediately after
+ // ReadResponseHeaders(), or it may be called in a new task after a previous
+ // HttpPipelinedSocket finishes its work.
+ int DoReadNextHeaders(int result);
+
+ // Notifies the user that reading the headers has completed. This may happen
+ // directly after DoReadNextHeaders() if the response is already available.
+ // Otherwise, it is called in response to OnReadIOCallback().
+ int DoReadHeadersComplete(int result);
+
+ // This is a holding state. It does not do anything, except exit the
+ // DoReadHeadersLoop(). It is called after DoReadHeadersComplete().
+ int DoReadWaitingForClose(int result);
+
+ // Cleans up the state associated with the active request. Invokes
+ // DoReadNextHeaders() in a new task to start the next response. This is
+ // called after the active request's HttpPipelinedSocket calls Close().
+ int DoReadStreamClosed();
+
+ // Removes all pending ReadResponseHeaders() requests from the queue. This may
+ // happen if there is an error with the pipeline or one of our
+ // HttpPipelinedSockets indicates the connection was suddenly closed.
+ int DoEvictPendingReadHeaders(int result);
+
+ // Invokes the user's callback in response to SendRequest() or
+ // ReadResponseHeaders() completing on an underlying parser. This might be
+ // invoked in response to our own IO callbacks, or it may be invoked if the
+ // underlying parser completes SendRequest() or ReadResponseHeaders()
+ // synchronously, but we've already returned ERR_IO_PENDING to the user's
+ // SendRequest() or ReadResponseHeaders() call into us.
+ void FireUserCallback(OldCompletionCallback* callback, int result);
+
+ Delegate* delegate_;
+ scoped_ptr<ClientSocketHandle> connection_;
+ SSLConfig used_ssl_config_;
+ ProxyInfo used_proxy_info_;
+ BoundNetLog net_log_;
+ bool was_npn_negotiated_;
+ scoped_refptr<GrowableIOBuffer> read_buf_;
+ int next_pipeline_id_;
+ bool active_;
+ bool usable_;
+ bool completed_one_request_;
+ ScopedRunnableMethodFactory<HttpPipelinedConnectionImpl> method_factory_;
+
+ StreamInfoMap stream_info_map_;
+
+ std::queue<int> request_order_;
+
+ std::queue<DeferredSendRequest> deferred_request_queue_;
+ SendRequestState send_next_state_;
+ OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> send_io_callback_;
+ OldCompletionCallback* send_user_callback_;
+
+ ReadHeadersState read_next_state_;
+ OldCompletionCallbackImpl<HttpPipelinedConnectionImpl> read_io_callback_;
+ OldCompletionCallback* read_user_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(HttpPipelinedConnectionImpl);
+};
+
+} // namespace net
+
+#endif // NET_HTTP_HTTP_PIPELINED_CONNECTION_IMPL_H_
diff --git a/net/http/http_pipelined_connection_impl_unittest.cc b/net/http/http_pipelined_connection_impl_unittest.cc
new file mode 100644
index 0000000..420d8dc
--- /dev/null
+++ b/net/http/http_pipelined_connection_impl_unittest.cc
@@ -0,0 +1,1067 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_connection_impl.h"
+
+#include <string>
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_vector.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/request_priority.h"
+#include "net/http/http_pipelined_stream.h"
+#include "net/socket/client_socket_handle.h"
+#include "net/socket/client_socket_pool_histograms.h"
+#include "net/socket/socket_test_util.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::NiceMock;
+using testing::StrEq;
+
+namespace net {
+
+class DummySocketParams : public base::RefCounted<DummySocketParams> {
+ private:
+ friend class base::RefCounted<DummySocketParams>;
+};
+
+REGISTER_SOCKET_PARAMS_FOR_POOL(MockTransportClientSocketPool,
+ DummySocketParams);
+
+class MockPipelineDelegate : public HttpPipelinedConnectionImpl::Delegate {
+ public:
+ MOCK_METHOD1(OnPipelineHasCapacity, void(HttpPipelinedConnection* pipeline));
+};
+
+class SuddenCloseObserver : public MessageLoop::TaskObserver {
+ public:
+ SuddenCloseObserver(HttpStream* stream, int close_before_task)
+ : stream_(stream),
+ close_before_task_(close_before_task),
+ current_task_(0) { }
+
+ virtual void WillProcessTask(base::TimeTicks) OVERRIDE {
+ ++current_task_;
+ if (current_task_ == close_before_task_) {
+ stream_->Close(false);
+ MessageLoop::current()->RemoveTaskObserver(this);
+ }
+ }
+
+ virtual void DidProcessTask(base::TimeTicks) OVERRIDE { }
+
+ private:
+ HttpStream* stream_;
+ int close_before_task_;
+ int current_task_;
+};
+
+class HttpPipelinedConnectionImplTest : public testing::Test {
+ public:
+ HttpPipelinedConnectionImplTest()
+ : histograms_("a"),
+ pool_(1, 1, &histograms_, &factory_) {
+ }
+
+ void TearDown() {
+ MessageLoop::current()->RunAllPending();
+ }
+
+ void Initialize(MockRead* reads, size_t reads_count,
+ MockWrite* writes, size_t writes_count) {
+ data_ = new DeterministicSocketData(reads, reads_count,
+ writes, writes_count);
+ data_->set_connect_data(MockConnect(false, 0));
+ if (reads_count || writes_count) {
+ data_->StopAfter(reads_count + writes_count);
+ }
+ factory_.AddSocketDataProvider(data_.get());
+ scoped_refptr<DummySocketParams> params;
+ ClientSocketHandle* connection = new ClientSocketHandle;
+ connection->Init("a", params, MEDIUM, NULL, &pool_, BoundNetLog());
+ pipeline_.reset(new HttpPipelinedConnectionImpl(connection, &delegate_,
+ ssl_config_, proxy_info_,
+ BoundNetLog(), false));
+ }
+
+ HttpRequestInfo* GetRequestInfo(const std::string& filename) {
+ HttpRequestInfo* request_info = new HttpRequestInfo;
+ request_info->url = GURL("http://localhost/" + filename);
+ request_info->method = "GET";
+ request_info_vector_.push_back(request_info);
+ return request_info;
+ }
+
+ HttpStream* NewTestStream(const std::string& filename) {
+ HttpStream* stream = pipeline_->CreateNewStream();
+ HttpRequestInfo* request_info = GetRequestInfo(filename);
+ int rv = stream->InitializeStream(request_info, BoundNetLog(), NULL);
+ DCHECK_EQ(OK, rv);
+ return stream;
+ }
+
+ void ExpectResponse(const std::string& expected,
+ scoped_ptr<HttpStream>& stream, bool async) {
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(expected.size()));
+
+ if (async) {
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream->ReadResponseBody(buffer.get(), expected.size(),
+ &callback_));
+ data_->RunFor(1);
+ EXPECT_EQ(static_cast<int>(expected.size()), callback_.WaitForResult());
+ } else {
+ EXPECT_EQ(static_cast<int>(expected.size()),
+ stream->ReadResponseBody(buffer.get(), expected.size(),
+ &callback_));
+ }
+ std::string actual(buffer->data(), expected.size());
+ EXPECT_THAT(actual, StrEq(expected));
+ }
+
+ void TestSyncRequest(scoped_ptr<HttpStream>& stream,
+ const std::string& filename) {
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, stream->ReadResponseHeaders(&callback_));
+ ExpectResponse(filename, stream, false);
+
+ stream->Close(false);
+ }
+
+ DeterministicMockClientSocketFactory factory_;
+ ClientSocketPoolHistograms histograms_;
+ MockTransportClientSocketPool pool_;
+ scoped_refptr<DeterministicSocketData> data_;
+
+ SSLConfig ssl_config_;
+ ProxyInfo proxy_info_;
+ NiceMock<MockPipelineDelegate> delegate_;
+ TestOldCompletionCallback callback_;
+ scoped_ptr<HttpPipelinedConnectionImpl> pipeline_;
+ ScopedVector<HttpRequestInfo> request_info_vector_;
+};
+
+TEST_F(HttpPipelinedConnectionImplTest, PipelineNotUsed) {
+ Initialize(NULL, 0, NULL, 0);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, StreamNotUsed) {
+ Initialize(NULL, 0, NULL, 0);
+
+ scoped_ptr<HttpStream> stream(pipeline_->CreateNewStream());
+
+ stream->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, StreamBoundButNotUsed) {
+ Initialize(NULL, 0, NULL, 0);
+
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+
+ stream->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, SyncSingleRequest) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 3, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+ TestSyncRequest(stream, "ok.html");
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, AsyncSingleRequest) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(true, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(true, 3, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream->SendRequest(headers, NULL, &response, &callback_));
+ data_->RunFor(1);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(&callback_));
+ data_->RunFor(2);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ ExpectResponse("ok.html", stream, true);
+
+ stream->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, LockStepAsyncRequests) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(true, 1, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(true, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(true, 4, "ok.html"),
+ MockRead(true, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(true, 7, "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ HttpRequestHeaders headers1;
+ HttpResponseInfo response1;
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream1->SendRequest(headers1, NULL, &response1, &callback_));
+
+ HttpRequestHeaders headers2;
+ HttpResponseInfo response2;
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream2->SendRequest(headers2, NULL, &response2, &callback_));
+
+ data_->RunFor(1);
+ EXPECT_LE(OK, callback_.WaitForResult());
+ data_->RunFor(1);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream1->ReadResponseHeaders(&callback_));
+ EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback_));
+
+ data_->RunFor(2);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ ExpectResponse("ok.html", stream1, true);
+
+ stream1->Close(false);
+
+ data_->RunFor(2);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ ExpectResponse("ko.html", stream2, true);
+
+ stream2->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, TwoResponsesInOnePacket) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2,
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Length: 7\r\n\r\n"
+ "ok.html"
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Length: 7\r\n\r\n"
+ "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ HttpRequestHeaders headers1;
+ HttpResponseInfo response1;
+ EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_));
+ HttpRequestHeaders headers2;
+ HttpResponseInfo response2;
+ EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_));
+
+ EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", stream1, false);
+ stream1->Close(false);
+
+ EXPECT_EQ(OK, stream2->ReadResponseHeaders(&callback_));
+ ExpectResponse("ko.html", stream2, false);
+ stream2->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, SendOrderSwapped) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 4, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 3, "ko.html"),
+ MockRead(false, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 7, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ TestSyncRequest(stream2, "ko.html");
+ TestSyncRequest(stream1, "ok.html");
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, ReadOrderSwapped) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 7, "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ HttpRequestHeaders headers1;
+ HttpResponseInfo response1;
+ EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_));
+
+ HttpRequestHeaders headers2;
+ HttpResponseInfo response2;
+ EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_));
+
+ EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback_));
+
+ EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", stream1, false);
+
+ stream1->Close(false);
+
+ EXPECT_LE(OK, callback_.WaitForResult());
+ ExpectResponse("ko.html", stream2, false);
+
+ stream2->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, SendWhileReading) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 3, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 7, "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ HttpRequestHeaders headers1;
+ HttpResponseInfo response1;
+ EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_));
+ EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_));
+
+ HttpRequestHeaders headers2;
+ HttpResponseInfo response2;
+ EXPECT_EQ(OK, stream2->SendRequest(headers2, NULL, &response2, &callback_));
+
+ ExpectResponse("ok.html", stream1, false);
+ stream1->Close(false);
+
+ EXPECT_EQ(OK, stream2->ReadResponseHeaders(&callback_));
+ ExpectResponse("ko.html", stream2, false);
+ stream2->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, AsyncSendWhileAsyncReadBlocked) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(true, 3, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(true, 4, "ok.html"),
+ MockRead(false, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 7, "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream1(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> stream2(NewTestStream("ko.html"));
+
+ HttpRequestHeaders headers1;
+ HttpResponseInfo response1;
+ EXPECT_EQ(OK, stream1->SendRequest(headers1, NULL, &response1, &callback_));
+ EXPECT_EQ(OK, stream1->ReadResponseHeaders(&callback_));
+ TestOldCompletionCallback callback1;
+ std::string expected = "ok.html";
+ scoped_refptr<IOBuffer> buffer(new IOBuffer(expected.size()));
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream1->ReadResponseBody(buffer.get(), expected.size(),
+ &callback1));
+
+ HttpRequestHeaders headers2;
+ HttpResponseInfo response2;
+ TestOldCompletionCallback callback2;
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream2->SendRequest(headers2, NULL, &response2, &callback2));
+
+ data_->RunFor(1);
+ EXPECT_LE(OK, callback2.WaitForResult());
+ EXPECT_EQ(ERR_IO_PENDING, stream2->ReadResponseHeaders(&callback2));
+
+ data_->RunFor(1);
+ EXPECT_EQ(static_cast<int>(expected.size()), callback1.WaitForResult());
+ std::string actual(buffer->data(), expected.size());
+ EXPECT_THAT(actual, StrEq(expected));
+ stream1->Close(false);
+
+ data_->StopAfter(8);
+ EXPECT_LE(OK, callback2.WaitForResult());
+ ExpectResponse("ko.html", stream2, false);
+ stream2->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, UnusedStreamAllowsLaterUse) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 3, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> unused_stream(NewTestStream("unused.html"));
+ unused_stream->Close(false);
+
+ scoped_ptr<HttpStream> later_stream(NewTestStream("ok.html"));
+ TestSyncRequest(later_stream, "ok.html");
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, UnsentStreamAllowsLaterUse) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 4, "GET /ko.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(true, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 2, "Content-Length: 7\r\n\r\n"),
+ MockRead(true, 3, "ok.html"),
+ MockRead(false, 5, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 6, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 7, "ko.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(ERR_IO_PENDING,
+ stream->SendRequest(headers, NULL, &response, &callback_));
+
+ scoped_ptr<HttpStream> unsent_stream(NewTestStream("unsent.html"));
+ HttpRequestHeaders unsent_headers;
+ HttpResponseInfo unsent_response;
+ EXPECT_EQ(ERR_IO_PENDING,
+ unsent_stream->SendRequest(unsent_headers, NULL, &unsent_response,
+ &callback_));
+ unsent_stream->Close(false);
+
+ data_->RunFor(1);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(&callback_));
+ data_->RunFor(2);
+ EXPECT_LE(OK, callback_.WaitForResult());
+
+ ExpectResponse("ok.html", stream, true);
+
+ stream->Close(false);
+
+ data_->StopAfter(8);
+ scoped_ptr<HttpStream> later_stream(NewTestStream("ko.html"));
+ TestSyncRequest(later_stream, "ko.html");
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, FailedSend) {
+ MockWrite writes[] = {
+ MockWrite(true, ERR_FAILED),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> failed_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+ scoped_ptr<HttpStream> closed_stream(NewTestStream("closed.html"));
+ scoped_ptr<HttpStream> rejected_stream(NewTestStream("rejected.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ TestOldCompletionCallback failed_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ failed_stream->SendRequest(headers, NULL, &response,
+ &failed_callback));
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->SendRequest(headers, NULL, &response,
+ &evicted_callback));
+ EXPECT_EQ(ERR_IO_PENDING,
+ closed_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ closed_stream->Close(false);
+
+ data_->RunFor(1);
+ EXPECT_EQ(ERR_FAILED, failed_callback.WaitForResult());
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ rejected_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+
+ failed_stream->Close(true);
+ evicted_stream->Close(true);
+ rejected_stream->Close(true);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, ConnectionSuddenlyClosedAfterResponse) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /read_evicted.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 2, "GET /read_rejected.html HTTP/1.1\r\n\r\n"),
+ MockWrite(true, ERR_SOCKET_NOT_CONNECTED, 5),
+ };
+ MockRead reads[] = {
+ MockRead(false, 3, "HTTP/1.1 200 OK\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> closed_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> read_evicted_stream(
+ NewTestStream("read_evicted.html"));
+ scoped_ptr<HttpStream> read_rejected_stream(
+ NewTestStream("read_rejected.html"));
+ scoped_ptr<HttpStream> send_closed_stream(
+ NewTestStream("send_closed.html"));
+ scoped_ptr<HttpStream> send_evicted_stream(
+ NewTestStream("send_evicted.html"));
+ scoped_ptr<HttpStream> send_rejected_stream(
+ NewTestStream("send_rejected.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, closed_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ EXPECT_EQ(OK, read_evicted_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ EXPECT_EQ(OK, read_rejected_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ TestOldCompletionCallback send_closed_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ send_closed_stream->SendRequest(headers, NULL, &response,
+ &send_closed_callback));
+ TestOldCompletionCallback send_evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ send_evicted_stream->SendRequest(headers, NULL, &response,
+ &send_evicted_callback));
+
+ TestOldCompletionCallback read_evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ read_evicted_stream->ReadResponseHeaders(&read_evicted_callback));
+
+ EXPECT_EQ(OK, closed_stream->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", closed_stream, false);
+ closed_stream->Close(true);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, read_evicted_callback.WaitForResult());
+ read_evicted_stream->Close(true);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ read_rejected_stream->ReadResponseHeaders(&callback_));
+ read_rejected_stream->Close(true);
+
+ data_->RunFor(1);
+ EXPECT_EQ(ERR_SOCKET_NOT_CONNECTED, send_closed_callback.WaitForResult());
+ send_closed_stream->Close(true);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, send_evicted_callback.WaitForResult());
+ send_evicted_stream->Close(true);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ send_rejected_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ send_rejected_stream->Close(true);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, AbortWhileSending) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /aborts.html HTTP/1.1\r\n\r\n"),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ TestOldCompletionCallback aborted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ aborted_stream->SendRequest(headers, NULL, &response,
+ &aborted_callback));
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->SendRequest(headers, NULL, &response,
+ &evicted_callback));
+
+ aborted_stream->Close(true);
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ evicted_stream->Close(true);
+ EXPECT_FALSE(aborted_callback.have_result());
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, AbortWhileSendingSecondRequest) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(true, 1, "GET /aborts.html HTTP/1.1\r\n\r\n"),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ TestOldCompletionCallback ok_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ ok_stream->SendRequest(headers, NULL, &response,
+ &ok_callback));
+ TestOldCompletionCallback aborted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ aborted_stream->SendRequest(headers, NULL, &response,
+ &aborted_callback));
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->SendRequest(headers, NULL, &response,
+ &evicted_callback));
+
+ data_->RunFor(1);
+ EXPECT_LE(OK, ok_callback.WaitForResult());
+ MessageLoop::current()->RunAllPending();
+ aborted_stream->Close(true);
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ evicted_stream->Close(true);
+ EXPECT_FALSE(aborted_callback.have_result());
+ ok_stream->Close(true);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, AbortWhileReadingHeaders) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /aborts.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(true, ERR_FAILED, 2),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> aborted_stream(NewTestStream("aborts.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+ scoped_ptr<HttpStream> rejected_stream(NewTestStream("rejected.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, aborted_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ EXPECT_EQ(OK, evicted_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+
+ EXPECT_EQ(ERR_IO_PENDING, aborted_stream->ReadResponseHeaders(&callback_));
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(&evicted_callback));
+
+ aborted_stream->Close(true);
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ evicted_stream->Close(true);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ rejected_stream->SendRequest(headers, NULL, &response, &callback_));
+ rejected_stream->Close(true);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, PendingResponseAbandoned) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /abandoned.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 2, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 3, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 4, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 5, "ok.html"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> abandoned_stream(NewTestStream("abandoned.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, abandoned_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+ EXPECT_EQ(OK, evicted_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+
+ EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_));
+ TestOldCompletionCallback abandoned_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ abandoned_stream->ReadResponseHeaders(&abandoned_callback));
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(&evicted_callback));
+
+ abandoned_stream->Close(false);
+
+ ExpectResponse("ok.html", ok_stream, false);
+ ok_stream->Close(false);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ evicted_stream->Close(true);
+ EXPECT_FALSE(evicted_stream->IsConnectionReusable());
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, DisconnectedAfterOneRequestRecovery) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /rejected.html HTTP/1.1\r\n\r\n"),
+ MockWrite(true, ERR_SOCKET_NOT_CONNECTED, 5),
+ MockWrite(false, ERR_SOCKET_NOT_CONNECTED, 7),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, ERR_SOCKET_NOT_CONNECTED, 6),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> rejected_read_stream(NewTestStream("rejected.html"));
+ scoped_ptr<HttpStream> evicted_send_stream(NewTestStream("evicted.html"));
+ scoped_ptr<HttpStream> rejected_send_stream(NewTestStream("rejected.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, rejected_read_stream->SendRequest(
+ headers, NULL, &response, &callback_));
+
+ EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", ok_stream, false);
+ ok_stream->Close(false);
+
+ TestOldCompletionCallback read_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_send_stream->SendRequest(headers, NULL, &response,
+ &read_callback));
+ data_->RunFor(1);
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, read_callback.WaitForResult());
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ rejected_read_stream->ReadResponseHeaders(&callback_));
+ EXPECT_EQ(ERR_PIPELINE_EVICTION,
+ rejected_send_stream->SendRequest(headers, NULL, &response,
+ &callback_));
+
+ rejected_read_stream->Close(true);
+ rejected_send_stream->Close(true);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, DisconnectedPendingReadRecovery) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, evicted_stream->SendRequest(
+ headers, NULL, &response, &callback_));
+
+ EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", ok_stream, false);
+
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(&evicted_callback));
+
+ ok_stream->Close(false);
+
+ EXPECT_EQ(ERR_PIPELINE_EVICTION, evicted_callback.WaitForResult());
+ evicted_stream->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, CloseCalledBeforeNextReadLoop) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, evicted_stream->SendRequest(
+ headers, NULL, &response, &callback_));
+
+ EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", ok_stream, false);
+
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(&evicted_callback));
+
+ ok_stream->Close(false);
+ evicted_stream->Close(false);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, CloseCalledBeforeReadCallback) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(false, 3, "Content-Length: 7\r\n\r\n"),
+ MockRead(false, 4, "ok.html"),
+ MockRead(false, ERR_SOCKET_NOT_CONNECTED, 5),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> ok_stream(NewTestStream("ok.html"));
+ scoped_ptr<HttpStream> evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, ok_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK, evicted_stream->SendRequest(
+ headers, NULL, &response, &callback_));
+
+ EXPECT_EQ(OK, ok_stream->ReadResponseHeaders(&callback_));
+ ExpectResponse("ok.html", ok_stream, false);
+
+ TestOldCompletionCallback evicted_callback;
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(&evicted_callback));
+
+ ok_stream->Close(false);
+
+ // The posted tasks should be:
+ // 1. DoReadHeadersLoop, which will post:
+ // 2. InvokeUserCallback
+ SuddenCloseObserver observer(evicted_stream.get(), 2);
+ MessageLoop::current()->AddTaskObserver(&observer);
+ MessageLoop::current()->RunAllPending();
+ EXPECT_FALSE(evicted_callback.have_result());
+}
+
+class StreamDeleter {
+ public:
+ StreamDeleter(HttpStream* stream) :
+ stream_(stream),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ callback_(this, &StreamDeleter::OnIOComplete)) {
+ }
+
+ OldCompletionCallbackImpl<StreamDeleter>* callback() { return &callback_; }
+
+ private:
+ void OnIOComplete(int result) {
+ delete stream_;
+ }
+
+ HttpStream* stream_;
+ OldCompletionCallbackImpl<StreamDeleter> callback_;
+};
+
+TEST_F(HttpPipelinedConnectionImplTest, CloseCalledDuringSendCallback) {
+ MockWrite writes[] = {
+ MockWrite(true, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ HttpStream* stream(NewTestStream("ok.html"));
+
+ StreamDeleter deleter(stream);
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(headers, NULL, &response,
+ deleter.callback()));
+ data_->RunFor(1);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, CloseCalledDuringReadCallback) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 1, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 2, "Content-Length: 7\r\n\r\n"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ HttpStream* stream(NewTestStream("ok.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_));
+
+ StreamDeleter deleter(stream);
+ EXPECT_EQ(ERR_IO_PENDING, stream->ReadResponseHeaders(deleter.callback()));
+ data_->RunFor(1);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest,
+ CloseCalledDuringReadCallbackWithPendingRead) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /failed.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /evicted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 3, "Content-Length: 7\r\n\r\n"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ HttpStream* failed_stream(NewTestStream("failed.html"));
+ HttpStream* evicted_stream(NewTestStream("evicted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK,
+ failed_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK,
+ evicted_stream->SendRequest(headers, NULL, &response, &callback_));
+
+ StreamDeleter failed_deleter(failed_stream);
+ EXPECT_EQ(ERR_IO_PENDING,
+ failed_stream->ReadResponseHeaders(failed_deleter.callback()));
+ StreamDeleter evicted_deleter(evicted_stream);
+ EXPECT_EQ(ERR_IO_PENDING,
+ evicted_stream->ReadResponseHeaders(evicted_deleter.callback()));
+ data_->RunFor(1);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, CloseOtherDuringReadCallback) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /deleter.html HTTP/1.1\r\n\r\n"),
+ MockWrite(false, 1, "GET /deleted.html HTTP/1.1\r\n\r\n"),
+ };
+ MockRead reads[] = {
+ MockRead(false, 2, "HTTP/1.1 200 OK\r\n"),
+ MockRead(true, 3, "Content-Length: 7\r\n\r\n"),
+ };
+ Initialize(reads, arraysize(reads), writes, arraysize(writes));
+
+ scoped_ptr<HttpStream> deleter_stream(NewTestStream("deleter.html"));
+ HttpStream* deleted_stream(NewTestStream("deleted.html"));
+
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK,
+ deleter_stream->SendRequest(headers, NULL, &response, &callback_));
+ EXPECT_EQ(OK,
+ deleted_stream->SendRequest(headers, NULL, &response, &callback_));
+
+ StreamDeleter deleter(deleted_stream);
+ EXPECT_EQ(ERR_IO_PENDING,
+ deleter_stream->ReadResponseHeaders(deleter.callback()));
+ EXPECT_EQ(ERR_IO_PENDING, deleted_stream->ReadResponseHeaders(&callback_));
+ data_->RunFor(1);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, OnPipelineHasCapacity) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0);
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1);
+ HttpRequestHeaders headers;
+ HttpResponseInfo response;
+ EXPECT_EQ(OK, stream->SendRequest(headers, NULL, &response, &callback_));
+
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0);
+ MessageLoop::current()->RunAllPending();
+
+ stream->Close(false);
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1);
+ stream.reset(NULL);
+}
+
+TEST_F(HttpPipelinedConnectionImplTest, OnPipelineHasCapacityWithoutSend) {
+ MockWrite writes[] = {
+ MockWrite(false, 0, "GET /ok.html HTTP/1.1\r\n\r\n"),
+ };
+ Initialize(NULL, 0, writes, arraysize(writes));
+
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(0);
+ scoped_ptr<HttpStream> stream(NewTestStream("ok.html"));
+
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1);
+ MessageLoop::current()->RunAllPending();
+
+ stream->Close(false);
+ EXPECT_CALL(delegate_, OnPipelineHasCapacity(pipeline_.get())).Times(1);
+ stream.reset(NULL);
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_host.cc b/net/http/http_pipelined_host.cc
new file mode 100644
index 0000000..32d023d
--- /dev/null
+++ b/net/http/http_pipelined_host.cc
@@ -0,0 +1,108 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_host.h"
+
+#include "base/stl_util.h"
+#include "net/http/http_pipelined_connection_impl.h"
+#include "net/http/http_pipelined_stream.h"
+
+namespace net {
+
+class HttpPipelinedConnectionImplFactory :
+ public HttpPipelinedConnection::Factory {
+ public:
+ HttpPipelinedConnection* CreateNewPipeline(
+ ClientSocketHandle* connection,
+ HttpPipelinedConnection::Delegate* delegate,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated) OVERRIDE {
+ return new HttpPipelinedConnectionImpl(connection, delegate,
+ used_ssl_config, used_proxy_info,
+ net_log, was_npn_negotiated);
+ }
+};
+
+HttpPipelinedHost::HttpPipelinedHost(
+ HttpPipelinedHost::Delegate* delegate,
+ const HostPortPair& origin,
+ HttpPipelinedConnection::Factory* factory)
+ : delegate_(delegate),
+ origin_(origin),
+ factory_(factory) {
+ if (!factory) {
+ factory_.reset(new HttpPipelinedConnectionImplFactory());
+ }
+}
+
+HttpPipelinedHost::~HttpPipelinedHost() {
+ CHECK(pipelines_.empty());
+}
+
+HttpPipelinedStream* HttpPipelinedHost::CreateStreamOnNewPipeline(
+ ClientSocketHandle* connection,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated) {
+ HttpPipelinedConnection* pipeline = factory_->CreateNewPipeline(
+ connection, this, used_ssl_config, used_proxy_info, net_log,
+ was_npn_negotiated);
+ pipelines_.insert(pipeline);
+ return pipeline->CreateNewStream();
+}
+
+HttpPipelinedStream* HttpPipelinedHost::CreateStreamOnExistingPipeline() {
+ HttpPipelinedConnection* available_pipeline = NULL;
+ for (std::set<HttpPipelinedConnection*>::iterator it = pipelines_.begin();
+ it != pipelines_.end(); ++it) {
+ if ((*it)->usable() &&
+ (*it)->active() &&
+ (*it)->depth() < max_pipeline_depth() &&
+ (!available_pipeline || (*it)->depth() < available_pipeline->depth())) {
+ available_pipeline = *it;
+ }
+ }
+ if (!available_pipeline) {
+ return NULL;
+ }
+ return available_pipeline->CreateNewStream();
+}
+
+bool HttpPipelinedHost::IsExistingPipelineAvailable() {
+ for (std::set<HttpPipelinedConnection*>::iterator it = pipelines_.begin();
+ it != pipelines_.end(); ++it) {
+ if ((*it)->usable() &&
+ (*it)->active() &&
+ (*it)->depth() < max_pipeline_depth()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+void HttpPipelinedHost::OnPipelineEmpty(HttpPipelinedConnection* pipeline) {
+ CHECK(ContainsKey(pipelines_, pipeline));
+ pipelines_.erase(pipeline);
+ delete pipeline;
+ if (pipelines_.empty()) {
+ delegate_->OnHostIdle(this);
+ // WARNING: We'll probably be deleted here.
+ }
+}
+
+void HttpPipelinedHost::OnPipelineHasCapacity(
+ HttpPipelinedConnection* pipeline) {
+ if (pipeline->usable() && pipeline->depth() < max_pipeline_depth()) {
+ delegate_->OnHostHasAdditionalCapacity(this);
+ }
+ if (!pipeline->depth()) {
+ OnPipelineEmpty(pipeline);
+ // WARNING: We might be deleted here.
+ }
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_host.h b/net/http/http_pipelined_host.h
new file mode 100644
index 0000000..ae3128d
--- /dev/null
+++ b/net/http/http_pipelined_host.h
@@ -0,0 +1,93 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_HTTP_HTTP_PIPELINED_HOST_H_
+#define NET_HTTP_HTTP_PIPELINED_HOST_H_
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/host_port_pair.h"
+#include "net/base/net_export.h"
+#include "net/http/http_pipelined_connection.h"
+
+namespace net {
+
+class BoundNetLog;
+class ClientSocketHandle;
+class HttpPipelinedStream;
+class ProxyInfo;
+struct SSLConfig;
+
+// Manages all of the pipelining state for specific host with active pipelined
+// HTTP requests. Manages connection jobs, constructs pipelined streams, and
+// assigns requests to the least loaded pipelined connection.
+class NET_EXPORT_PRIVATE HttpPipelinedHost
+ : public HttpPipelinedConnection::Delegate {
+ public:
+ class Delegate {
+ public:
+ // Called when a pipelined host has no outstanding requests on any of its
+ // pipelined connections.
+ virtual void OnHostIdle(HttpPipelinedHost* host) = 0;
+
+ // Called when a pipelined host has newly available pipeline capacity, like
+ // when a request completes.
+ virtual void OnHostHasAdditionalCapacity(HttpPipelinedHost* host) = 0;
+ };
+
+ HttpPipelinedHost(Delegate* delegate, const HostPortPair& origin,
+ HttpPipelinedConnection::Factory* factory);
+ virtual ~HttpPipelinedHost();
+
+ // Constructs a new pipeline on |connection| and returns a new
+ // HttpPipelinedStream that uses it.
+ HttpPipelinedStream* CreateStreamOnNewPipeline(
+ ClientSocketHandle* connection,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated);
+
+ // Tries to find an existing pipeline with capacity for a new request. If
+ // successful, returns a new stream on that pipeline. Otherwise, returns NULL.
+ HttpPipelinedStream* CreateStreamOnExistingPipeline();
+
+ // Returns true if we have a pipelined connection that can accept new
+ // requests.
+ bool IsExistingPipelineAvailable();
+
+ // Callbacks for HttpPipelinedConnection.
+
+ // Called when a pipelined connection completes a request. Adds a pending
+ // request to the pipeline if the pipeline is still usable.
+ virtual void OnPipelineHasCapacity(
+ HttpPipelinedConnection* pipeline) OVERRIDE;
+
+ const HostPortPair& origin() const { return origin_; }
+
+ private:
+ // Called when a pipeline is empty and there are no pending requests. Closes
+ // the connection.
+ void OnPipelineEmpty(HttpPipelinedConnection* pipeline);
+
+ // Adds the next pending request to the pipeline if it's still usuable.
+ void AddRequestToPipeline(HttpPipelinedConnection* connection);
+
+ int max_pipeline_depth() const { return 3; }
+
+ Delegate* delegate_;
+ const HostPortPair origin_;
+ std::set<HttpPipelinedConnection*> pipelines_;
+ scoped_ptr<HttpPipelinedConnection::Factory> factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(HttpPipelinedHost);
+};
+
+} // namespace net
+
+#endif // NET_HTTP_HTTP_PIPELINED_HOST_H_
diff --git a/net/http/http_pipelined_host_pool.cc b/net/http/http_pipelined_host_pool.cc
new file mode 100644
index 0000000..bd238d8
--- /dev/null
+++ b/net/http/http_pipelined_host_pool.cc
@@ -0,0 +1,80 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_host_pool.h"
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "net/http/http_pipelined_host.h"
+#include "net/http/http_stream_factory_impl.h"
+
+namespace net {
+
+HttpPipelinedHostPool::HttpPipelinedHostPool(HttpStreamFactoryImpl* factory)
+ : factory_(factory) {
+}
+
+HttpPipelinedHostPool::~HttpPipelinedHostPool() {
+ CHECK(host_map_.empty());
+}
+
+HttpPipelinedStream* HttpPipelinedHostPool::CreateStreamOnNewPipeline(
+ const HostPortPair& origin,
+ ClientSocketHandle* connection,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated) {
+ HttpPipelinedHost* host = GetPipelinedHost(origin, true);
+ return host->CreateStreamOnNewPipeline(connection, used_ssl_config,
+ used_proxy_info, net_log,
+ was_npn_negotiated);
+}
+
+HttpPipelinedStream* HttpPipelinedHostPool::CreateStreamOnExistingPipeline(
+ const HostPortPair& origin) {
+ HttpPipelinedHost* host = GetPipelinedHost(origin, false);
+ if (!host) {
+ return NULL;
+ }
+ return host->CreateStreamOnExistingPipeline();
+}
+
+bool HttpPipelinedHostPool::IsExistingPipelineAvailableForOrigin(
+ const HostPortPair& origin) {
+ HttpPipelinedHost* host = GetPipelinedHost(origin, false);
+ if (!host) {
+ return false;
+ }
+ return host->IsExistingPipelineAvailable();
+}
+
+HttpPipelinedHost* HttpPipelinedHostPool::GetPipelinedHost(
+ const HostPortPair& origin, bool create_if_not_found) {
+ HostMap::iterator it = host_map_.find(origin);
+ if (it != host_map_.end()) {
+ CHECK(it->second);
+ return it->second;
+ } else if (!create_if_not_found) {
+ return NULL;
+ }
+ HttpPipelinedHost* host = new HttpPipelinedHost(this, origin, NULL);
+ host_map_[origin] = host;
+ return host;
+}
+
+void HttpPipelinedHostPool::OnHostIdle(HttpPipelinedHost* host) {
+ const HostPortPair& origin = host->origin();
+ CHECK(ContainsKey(host_map_, origin));
+ // TODO(simonjam): We should remember the pipeline state for each host.
+ host_map_.erase(origin);
+ delete host;
+}
+
+void HttpPipelinedHostPool::OnHostHasAdditionalCapacity(
+ HttpPipelinedHost* host) {
+ factory_->OnHttpPipelinedHostHasAdditionalCapacity(host->origin());
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_host_pool.h b/net/http/http_pipelined_host_pool.h
new file mode 100644
index 0000000..5e7c146
--- /dev/null
+++ b/net/http/http_pipelined_host_pool.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_
+#define NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_
+#pragma once
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "net/http/http_pipelined_host.h"
+
+namespace net {
+
+class HostPortPair;
+class HttpPipelinedStream;
+class HttpStreamFactoryImpl;
+
+// Manages all of the pipelining state for specific host with active pipelined
+// HTTP requests. Manages connection jobs, constructs pipelined streams, and
+// assigns requests to the least loaded pipelined connection.
+class HttpPipelinedHostPool : public HttpPipelinedHost::Delegate {
+ public:
+ explicit HttpPipelinedHostPool(HttpStreamFactoryImpl* factory);
+ ~HttpPipelinedHostPool();
+
+ // Constructs a new pipeline on |connection| and returns a new
+ // HttpPipelinedStream that uses it.
+ HttpPipelinedStream* CreateStreamOnNewPipeline(
+ const HostPortPair& origin,
+ ClientSocketHandle* connection,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated);
+
+ // Tries to find an existing pipeline with capacity for a new request. If
+ // successful, returns a new stream on that pipeline. Otherwise, returns NULL.
+ HttpPipelinedStream* CreateStreamOnExistingPipeline(
+ const HostPortPair& origin);
+
+ // Returns true if a pipelined connection already exists for this origin and
+ // can accept new requests.
+ bool IsExistingPipelineAvailableForOrigin(const HostPortPair& origin);
+
+ // Callbacks for HttpPipelinedHost.
+ virtual void OnHostIdle(HttpPipelinedHost* host) OVERRIDE;
+
+ virtual void OnHostHasAdditionalCapacity(HttpPipelinedHost* host) OVERRIDE;
+
+ private:
+ typedef std::map<const HostPortPair, HttpPipelinedHost*> HostMap;
+
+ HttpPipelinedHost* GetPipelinedHost(const HostPortPair& origin,
+ bool create_if_not_found);
+
+ HttpStreamFactoryImpl* factory_;
+ HostMap host_map_;
+
+ DISALLOW_COPY_AND_ASSIGN(HttpPipelinedHostPool);
+};
+
+} // namespace net
+
+#endif // NET_HTTP_HTTP_PIPELINED_HOST_POOL_H_
diff --git a/net/http/http_pipelined_host_unittest.cc b/net/http/http_pipelined_host_unittest.cc
new file mode 100644
index 0000000..de78e9e
--- /dev/null
+++ b/net/http/http_pipelined_host_unittest.cc
@@ -0,0 +1,201 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_host.h"
+
+#include "net/base/ssl_config_service.h"
+#include "net/http/http_pipelined_connection.h"
+#include "net/proxy/proxy_info.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::NiceMock;
+using testing::Ref;
+using testing::Return;
+using testing::ReturnNull;
+
+static const int kMaxCapacity = 3;
+
+namespace net {
+
+static ClientSocketHandle* kDummyConnection =
+ reinterpret_cast<ClientSocketHandle*>(84);
+static HttpPipelinedStream* kDummyStream =
+ reinterpret_cast<HttpPipelinedStream*>(42);
+
+class MockHostDelegate : public HttpPipelinedHost::Delegate {
+ public:
+ MOCK_METHOD1(OnHostIdle, void(HttpPipelinedHost* host));
+ MOCK_METHOD1(OnHostHasAdditionalCapacity, void(HttpPipelinedHost* host));
+};
+
+class MockPipelineFactory : public HttpPipelinedConnection::Factory {
+ public:
+ MOCK_METHOD6(CreateNewPipeline, HttpPipelinedConnection*(
+ ClientSocketHandle* connection,
+ HttpPipelinedConnection::Delegate* delegate,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ const BoundNetLog& net_log,
+ bool was_npn_negotiated));
+};
+
+class MockPipeline : public HttpPipelinedConnection {
+ public:
+ MockPipeline(int depth, bool usable, bool active)
+ : depth_(depth),
+ usable_(usable),
+ active_(active) {
+ }
+
+ void SetState(int depth, bool usable, bool active) {
+ depth_ = depth;
+ usable_ = usable;
+ active_ = active;
+ }
+
+ virtual int depth() const OVERRIDE { return depth_; }
+ virtual bool usable() const OVERRIDE { return usable_; }
+ virtual bool active() const OVERRIDE { return active_; }
+
+ MOCK_METHOD0(CreateNewStream, HttpPipelinedStream*());
+ MOCK_METHOD1(OnStreamDeleted, void(int pipeline_id));
+ MOCK_CONST_METHOD0(used_ssl_config, const SSLConfig&());
+ MOCK_CONST_METHOD0(used_proxy_info, const ProxyInfo&());
+ MOCK_CONST_METHOD0(source, const NetLog::Source&());
+ MOCK_CONST_METHOD0(was_npn_negotiated, bool());
+
+ private:
+ int depth_;
+ bool usable_;
+ bool active_;
+};
+
+class HttpPipelinedHostTest : public testing::Test {
+ public:
+ HttpPipelinedHostTest()
+ : origin_("host", 123),
+ factory_(new MockPipelineFactory), // Owned by host_.
+ host_(&delegate_, origin_, factory_) {
+ }
+
+ MockPipeline* AddTestPipeline(int depth, bool usable, bool active) {
+ MockPipeline* pipeline = new MockPipeline(depth, usable, active);
+ EXPECT_CALL(*factory_, CreateNewPipeline(kDummyConnection, &host_,
+ Ref(ssl_config_), Ref(proxy_info_),
+ Ref(net_log_), true))
+ .Times(1)
+ .WillOnce(Return(pipeline));
+ EXPECT_CALL(*pipeline, CreateNewStream())
+ .Times(1)
+ .WillOnce(Return(kDummyStream));
+ EXPECT_EQ(kDummyStream, host_.CreateStreamOnNewPipeline(
+ kDummyConnection, ssl_config_, proxy_info_, net_log_, true));
+ return pipeline;
+ }
+
+ void ClearTestPipeline(MockPipeline* pipeline) {
+ pipeline->SetState(0, true, true);
+ host_.OnPipelineHasCapacity(pipeline);
+ }
+
+ NiceMock<MockHostDelegate> delegate_;
+ HostPortPair origin_;
+ MockPipelineFactory* factory_;
+ HttpPipelinedHost host_;
+
+ SSLConfig ssl_config_;
+ ProxyInfo proxy_info_;
+ BoundNetLog net_log_;
+};
+
+TEST_F(HttpPipelinedHostTest, Delegate) {
+ EXPECT_TRUE(origin_.Equals(host_.origin()));
+}
+
+TEST_F(HttpPipelinedHostTest, OnHostIdle) {
+ MockPipeline* pipeline = AddTestPipeline(0, false, true);
+
+ EXPECT_CALL(delegate_, OnHostHasAdditionalCapacity(&host_))
+ .Times(0);
+ EXPECT_CALL(delegate_, OnHostIdle(&host_))
+ .Times(1);
+ host_.OnPipelineHasCapacity(pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, OnHostHasAdditionalCapacity) {
+ MockPipeline* pipeline = AddTestPipeline(1, true, true);
+
+ EXPECT_CALL(delegate_, OnHostHasAdditionalCapacity(&host_))
+ .Times(2);
+ EXPECT_CALL(delegate_, OnHostIdle(&host_))
+ .Times(0);
+
+ host_.OnPipelineHasCapacity(pipeline);
+
+ EXPECT_CALL(delegate_, OnHostIdle(&host_))
+ .Times(1);
+ ClearTestPipeline(pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, IgnoresUnusablePipeline) {
+ MockPipeline* pipeline = AddTestPipeline(1, false, true);
+
+ EXPECT_FALSE(host_.IsExistingPipelineAvailable());
+ EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline());
+
+ ClearTestPipeline(pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, IgnoresInactivePipeline) {
+ MockPipeline* pipeline = AddTestPipeline(1, true, false);
+
+ EXPECT_FALSE(host_.IsExistingPipelineAvailable());
+ EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline());
+
+ ClearTestPipeline(pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, IgnoresFullPipeline) {
+ MockPipeline* pipeline = AddTestPipeline(kMaxCapacity, true, true);
+
+ EXPECT_FALSE(host_.IsExistingPipelineAvailable());
+ EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline());
+
+ ClearTestPipeline(pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, PicksLeastLoadedPipeline) {
+ MockPipeline* full_pipeline = AddTestPipeline(kMaxCapacity, true, true);
+ MockPipeline* usable_pipeline = AddTestPipeline(kMaxCapacity - 1, true, true);
+ MockPipeline* empty_pipeline = AddTestPipeline(0, true, true);
+
+ EXPECT_TRUE(host_.IsExistingPipelineAvailable());
+ EXPECT_CALL(*empty_pipeline, CreateNewStream())
+ .Times(1)
+ .WillOnce(ReturnNull());
+ EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline());
+
+ ClearTestPipeline(full_pipeline);
+ ClearTestPipeline(usable_pipeline);
+ ClearTestPipeline(empty_pipeline);
+}
+
+TEST_F(HttpPipelinedHostTest, EmptyPipelineIsRemoved) {
+ MockPipeline* empty_pipeline = AddTestPipeline(0, true, true);
+
+ EXPECT_TRUE(host_.IsExistingPipelineAvailable());
+ EXPECT_CALL(*empty_pipeline, CreateNewStream())
+ .Times(1)
+ .WillOnce(Return(kDummyStream));
+ EXPECT_EQ(kDummyStream, host_.CreateStreamOnExistingPipeline());
+
+ ClearTestPipeline(empty_pipeline);
+
+ EXPECT_FALSE(host_.IsExistingPipelineAvailable());
+ EXPECT_EQ(NULL, host_.CreateStreamOnExistingPipeline());
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_stream.cc b/net/http/http_pipelined_stream.cc
new file mode 100644
index 0000000..2183c52
--- /dev/null
+++ b/net/http/http_pipelined_stream.cc
@@ -0,0 +1,142 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_pipelined_stream.h"
+
+#include "base/logging.h"
+#include "base/stringprintf.h"
+#include "net/base/net_errors.h"
+#include "net/http/http_pipelined_connection_impl.h"
+#include "net/http/http_request_headers.h"
+#include "net/http/http_request_info.h"
+#include "net/http/http_util.h"
+
+namespace net {
+
+HttpPipelinedStream::HttpPipelinedStream(HttpPipelinedConnectionImpl* pipeline,
+ int pipeline_id)
+ : pipeline_(pipeline),
+ pipeline_id_(pipeline_id),
+ request_info_(NULL) {
+}
+
+HttpPipelinedStream::~HttpPipelinedStream() {
+ pipeline_->OnStreamDeleted(pipeline_id_);
+}
+
+int HttpPipelinedStream::InitializeStream(const HttpRequestInfo* request_info,
+ const BoundNetLog& net_log,
+ OldCompletionCallback* callback) {
+ request_info_ = request_info;
+ pipeline_->InitializeParser(pipeline_id_, request_info, net_log);
+ return OK;
+}
+
+
+int HttpPipelinedStream::SendRequest(const HttpRequestHeaders& headers,
+ UploadDataStream* request_body,
+ HttpResponseInfo* response,
+ OldCompletionCallback* callback) {
+ CHECK(pipeline_id_);
+ CHECK(request_info_);
+ // TODO(simonjam): Proxy support will be needed here.
+ const std::string path = HttpUtil::PathForRequest(request_info_->url);
+ std::string request_line_ = base::StringPrintf("%s %s HTTP/1.1\r\n",
+ request_info_->method.c_str(),
+ path.c_str());
+ return pipeline_->SendRequest(pipeline_id_, request_line_, headers,
+ request_body, response, callback);
+}
+
+uint64 HttpPipelinedStream::GetUploadProgress() const {
+ return pipeline_->GetUploadProgress(pipeline_id_);
+}
+
+int HttpPipelinedStream::ReadResponseHeaders(OldCompletionCallback* callback) {
+ return pipeline_->ReadResponseHeaders(pipeline_id_, callback);
+}
+
+const HttpResponseInfo* HttpPipelinedStream::GetResponseInfo() const {
+ return pipeline_->GetResponseInfo(pipeline_id_);
+}
+
+int HttpPipelinedStream::ReadResponseBody(IOBuffer* buf, int buf_len,
+ OldCompletionCallback* callback) {
+ return pipeline_->ReadResponseBody(pipeline_id_, buf, buf_len, callback);
+}
+
+void HttpPipelinedStream::Close(bool not_reusable) {
+ pipeline_->Close(pipeline_id_, not_reusable);
+}
+
+HttpStream* HttpPipelinedStream::RenewStreamForAuth() {
+ // FIXME: What does this mean on a pipeline? Is it for proxies?
+ return new HttpPipelinedStream(pipeline_, pipeline_id_);
+}
+
+bool HttpPipelinedStream::IsResponseBodyComplete() const {
+ return pipeline_->IsResponseBodyComplete(pipeline_id_);
+}
+
+bool HttpPipelinedStream::CanFindEndOfResponse() const {
+ return pipeline_->CanFindEndOfResponse(pipeline_id_);
+}
+
+bool HttpPipelinedStream::IsMoreDataBuffered() const {
+ return pipeline_->IsMoreDataBuffered(pipeline_id_);
+}
+
+bool HttpPipelinedStream::IsConnectionReused() const {
+ return pipeline_->IsConnectionReused(pipeline_id_);
+}
+
+void HttpPipelinedStream::SetConnectionReused() {
+ pipeline_->SetConnectionReused(pipeline_id_);
+}
+
+bool HttpPipelinedStream::IsConnectionReusable() const {
+ return pipeline_->usable();
+}
+
+void HttpPipelinedStream::GetSSLInfo(SSLInfo* ssl_info) {
+ pipeline_->GetSSLInfo(pipeline_id_, ssl_info);
+}
+
+void HttpPipelinedStream::GetSSLCertRequestInfo(
+ SSLCertRequestInfo* cert_request_info) {
+ pipeline_->GetSSLCertRequestInfo(pipeline_id_, cert_request_info);
+}
+
+bool HttpPipelinedStream::IsSpdyHttpStream() const {
+ return false;
+}
+
+void HttpPipelinedStream::LogNumRttVsBytesMetrics() const {
+ // TODO(simonjam): I don't want to copy & paste this from http_basic_stream.
+}
+
+void HttpPipelinedStream::Drain(HttpNetworkSession*) {
+ // On errors, we already evict everything from the pipeline and close it.
+ // TODO(simonjam): Consider trying to drain the pipeline in the same way that
+ // HttpBasicStream does.
+ delete this;
+}
+
+const SSLConfig& HttpPipelinedStream::used_ssl_config() const {
+ return pipeline_->used_ssl_config();
+}
+
+const ProxyInfo& HttpPipelinedStream::used_proxy_info() const {
+ return pipeline_->used_proxy_info();
+}
+
+const NetLog::Source& HttpPipelinedStream::source() const {
+ return pipeline_->source();
+}
+
+bool HttpPipelinedStream::was_npn_negotiated() const {
+ return pipeline_->was_npn_negotiated();
+}
+
+} // namespace net
diff --git a/net/http/http_pipelined_stream.h b/net/http/http_pipelined_stream.h
new file mode 100644
index 0000000..dfafe69
--- /dev/null
+++ b/net/http/http_pipelined_stream.h
@@ -0,0 +1,112 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_HTTP_HTTP_PIPELINED_STREAM_H_
+#define NET_HTTP_HTTP_PIPELINED_STREAM_H_
+#pragma once
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/net_log.h"
+#include "net/http/http_stream.h"
+
+namespace net {
+
+class BoundNetLog;
+class HttpPipelinedConnectionImpl;
+class HttpResponseInfo;
+class HttpRequestHeaders;
+struct HttpRequestInfo;
+class HttpStreamParser;
+class IOBuffer;
+class ProxyInfo;
+struct SSLConfig;
+class UploadDataStream;
+
+// HttpPipelinedStream is the pipelined implementation of HttpStream. It has
+// very little code in it. Instead, it serves as the client's interface to the
+// pipelined connection, where all the work happens.
+//
+// In the case of pipelining failures, these functions may return
+// ERR_PIPELINE_EVICTION. In that case, the client should retry the HTTP
+// request without pipelining.
+class HttpPipelinedStream : public HttpStream {
+ public:
+ HttpPipelinedStream(HttpPipelinedConnectionImpl* pipeline,
+ int pipeline_id);
+ virtual ~HttpPipelinedStream();
+
+ // HttpStream methods:
+ virtual int InitializeStream(const HttpRequestInfo* request_info,
+ const BoundNetLog& net_log,
+ OldCompletionCallback* callback) OVERRIDE;
+
+ virtual int SendRequest(const HttpRequestHeaders& headers,
+ UploadDataStream* request_body,
+ HttpResponseInfo* response,
+ OldCompletionCallback* callback) OVERRIDE;
+
+ virtual uint64 GetUploadProgress() const OVERRIDE;
+
+ virtual int ReadResponseHeaders(OldCompletionCallback* callback) OVERRIDE;
+
+ virtual const HttpResponseInfo* GetResponseInfo() const OVERRIDE;
+
+ virtual int ReadResponseBody(IOBuffer* buf, int buf_len,
+ OldCompletionCallback* callback) OVERRIDE;
+
+ virtual void Close(bool not_reusable) OVERRIDE;
+
+ virtual HttpStream* RenewStreamForAuth() OVERRIDE;
+
+ virtual bool IsResponseBodyComplete() const OVERRIDE;
+
+ virtual bool CanFindEndOfResponse() const OVERRIDE;
+
+ virtual bool IsMoreDataBuffered() const OVERRIDE;
+
+ virtual bool IsConnectionReused() const OVERRIDE;
+
+ virtual void SetConnectionReused() OVERRIDE;
+
+ virtual bool IsConnectionReusable() const OVERRIDE;
+
+ virtual void GetSSLInfo(SSLInfo* ssl_info) OVERRIDE;
+
+ virtual void GetSSLCertRequestInfo(
+ SSLCertRequestInfo* cert_request_info) OVERRIDE;
+
+ virtual bool IsSpdyHttpStream() const OVERRIDE;
+
+ virtual void LogNumRttVsBytesMetrics() const OVERRIDE;
+
+ virtual void Drain(HttpNetworkSession* session) OVERRIDE;
+
+ // The SSLConfig used to establish this stream's pipeline.
+ const SSLConfig& used_ssl_config() const;
+
+ // The ProxyInfo used to establish this this stream's pipeline.
+ const ProxyInfo& used_proxy_info() const;
+
+ // The source of this stream's pipelined connection.
+ const NetLog::Source& source() const;
+
+ // True if this stream's pipeline was NPN negotiated.
+ bool was_npn_negotiated() const;
+
+ private:
+ HttpPipelinedConnectionImpl* pipeline_;
+
+ int pipeline_id_;
+
+ const HttpRequestInfo* request_info_;
+
+ DISALLOW_COPY_AND_ASSIGN(HttpPipelinedStream);
+};
+
+} // namespace net
+
+#endif // NET_HTTP_HTTP_PIPELINED_STREAM_H_
diff --git a/net/http/http_response_body_drainer_unittest.cc b/net/http/http_response_body_drainer_unittest.cc
index 9f774db..9db5ec0 100644
--- a/net/http/http_response_body_drainer_unittest.cc
+++ b/net/http/http_response_body_drainer_unittest.cc
@@ -120,6 +120,8 @@ class MockHttpStream : public HttpStream {
virtual void LogNumRttVsBytesMetrics() const OVERRIDE {}
+ virtual void Drain(HttpNetworkSession*) OVERRIDE {}
+
// Methods to tweak/observer mock behavior:
void StallReadsForever() { stall_reads_forever_ = true; }
diff --git a/net/http/http_stream.h b/net/http/http_stream.h
index 7460fb7..2d6402f 100644
--- a/net/http/http_stream.h
+++ b/net/http/http_stream.h
@@ -23,6 +23,7 @@
namespace net {
class BoundNetLog;
+class HttpNetworkSession;
class HttpRequestHeaders;
struct HttpRequestInfo;
class HttpResponseInfo;
@@ -139,6 +140,12 @@ class NET_EXPORT_PRIVATE HttpStream {
// response body vs bytes transferred.
virtual void LogNumRttVsBytesMetrics() const = 0;
+ // In the case of an HTTP error or redirect, flush the response body (usually
+ // a simple error or "this page has moved") so that we can re-use the
+ // underlying connection. This stream is responsible for deleting itself when
+ // draining is complete.
+ virtual void Drain(HttpNetworkSession* session) = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(HttpStream);
};
diff --git a/net/http/http_stream_factory.cc b/net/http/http_stream_factory.cc
index 70a4940..7e81d61 100644
--- a/net/http/http_stream_factory.cc
+++ b/net/http/http_stream_factory.cc
@@ -33,6 +33,8 @@ bool HttpStreamFactory::force_spdy_always_ = false;
std::list<HostPortPair>* HttpStreamFactory::forced_spdy_exclusions_ = NULL;
// static
bool HttpStreamFactory::ignore_certificate_errors_ = false;
+// static
+bool HttpStreamFactory::http_pipelining_enabled_ = false;
HttpStreamFactory::~HttpStreamFactory() {}
diff --git a/net/http/http_stream_factory.h b/net/http/http_stream_factory.h
index 3cbad96..31e0cc1 100644
--- a/net/http/http_stream_factory.h
+++ b/net/http/http_stream_factory.h
@@ -24,6 +24,7 @@ class HostMappingRules;
class HostPortPair;
class HttpAuthController;
class HttpNetworkSession;
+class HttpPipelinedHost;
class HttpResponseInfo;
class HttpServerProperties;
class HttpStream;
@@ -239,6 +240,11 @@ class NET_EXPORT HttpStreamFactory {
static void SetHostMappingRules(const std::string& rules);
+ static void set_http_pipelining_enabled(bool value) {
+ http_pipelining_enabled_ = value;
+ }
+ static bool http_pipelining_enabled() { return http_pipelining_enabled_; }
+
protected:
HttpStreamFactory();
@@ -253,6 +259,7 @@ class NET_EXPORT HttpStreamFactory {
static bool force_spdy_always_;
static std::list<HostPortPair>* forced_spdy_exclusions_;
static bool ignore_certificate_errors_;
+ static bool http_pipelining_enabled_;
DISALLOW_COPY_AND_ASSIGN(HttpStreamFactory);
};
diff --git a/net/http/http_stream_factory_impl.cc b/net/http/http_stream_factory_impl.cc
index 364d202..9593dc4 100644
--- a/net/http/http_stream_factory_impl.cc
+++ b/net/http/http_stream_factory_impl.cc
@@ -10,6 +10,9 @@
#include "net/base/net_log.h"
#include "net/base/net_util.h"
#include "net/http/http_network_session.h"
+#include "net/http/http_pipelined_connection.h"
+#include "net/http/http_pipelined_host.h"
+#include "net/http/http_pipelined_stream.h"
#include "net/http/http_server_properties.h"
#include "net/http/http_stream_factory_impl_job.h"
#include "net/http/http_stream_factory_impl_request.h"
@@ -33,7 +36,8 @@ GURL UpgradeUrlToHttps(const GURL& original_url, int port) {
} // namespace
HttpStreamFactoryImpl::HttpStreamFactoryImpl(HttpNetworkSession* session)
- : session_(session) {}
+ : session_(session),
+ http_pipelined_host_pool_(this) {}
HttpStreamFactoryImpl::~HttpStreamFactoryImpl() {
DCHECK(request_map_.empty());
@@ -221,4 +225,21 @@ void HttpStreamFactoryImpl::OnPreconnectsComplete(const Job* job) {
OnPreconnectsCompleteInternal();
}
+void HttpStreamFactoryImpl::OnHttpPipelinedHostHasAdditionalCapacity(
+ const HostPortPair& origin) {
+ HttpPipelinedStream* stream;
+ while (ContainsKey(http_pipelining_request_map_, origin) &&
+ (stream = http_pipelined_host_pool_.CreateStreamOnExistingPipeline(
+ origin))) {
+ Request* request = *http_pipelining_request_map_[origin].begin();
+ request->Complete(stream->was_npn_negotiated(),
+ false, // not using_spdy
+ stream->source());
+ request->OnStreamReady(NULL,
+ stream->used_ssl_config(),
+ stream->used_proxy_info(),
+ stream);
+ }
+}
+
} // namespace net
diff --git a/net/http/http_stream_factory_impl.h b/net/http/http_stream_factory_impl.h
index b84f158..9c74e23 100644
--- a/net/http/http_stream_factory_impl.h
+++ b/net/http/http_stream_factory_impl.h
@@ -10,13 +10,15 @@
#include "base/memory/ref_counted.h"
#include "net/base/host_port_pair.h"
-#include "net/http/http_stream_factory.h"
#include "net/base/net_log.h"
+#include "net/http/http_pipelined_host_pool.h"
+#include "net/http/http_stream_factory.h"
#include "net/proxy/proxy_server.h"
namespace net {
class HttpNetworkSession;
+class HttpPipelinedHost;
class SpdySession;
class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory {
@@ -40,12 +42,18 @@ class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory {
virtual void AddTLSIntolerantServer(const HostPortPair& server);
virtual bool IsTLSIntolerantServer(const HostPortPair& server) const;
+ // Called when a HttpPipelinedHost has new capacity. Attempts to allocate any
+ // pending pipeline-capable requests to pipelines.
+ virtual void OnHttpPipelinedHostHasAdditionalCapacity(
+ const HostPortPair& origin);
+
private:
class Request;
class Job;
typedef std::set<Request*> RequestSet;
typedef std::map<HostPortProxyPair, RequestSet> SpdySessionRequestMap;
+ typedef std::map<HostPortPair, RequestSet> HttpPipeliningRequestMap;
bool GetAlternateProtocolRequestFor(const GURL& original_url,
GURL* alternate_url) const;
@@ -88,6 +96,9 @@ class NET_EXPORT_PRIVATE HttpStreamFactoryImpl : public HttpStreamFactory {
std::map<const Job*, Request*> request_map_;
SpdySessionRequestMap spdy_session_request_map_;
+ HttpPipeliningRequestMap http_pipelining_request_map_;
+
+ HttpPipelinedHostPool http_pipelined_host_pool_;
// These jobs correspond to jobs orphaned by Requests and now owned by
// HttpStreamFactoryImpl. Since they are no longer tied to Requests, they will
diff --git a/net/http/http_stream_factory_impl_job.cc b/net/http/http_stream_factory_impl_job.cc
index 74a7895..d858be1 100644
--- a/net/http/http_stream_factory_impl_job.cc
+++ b/net/http/http_stream_factory_impl_job.cc
@@ -16,10 +16,15 @@
#include "net/base/ssl_cert_request_info.h"
#include "net/http/http_basic_stream.h"
#include "net/http/http_network_session.h"
+#include "net/http/http_pipelined_connection.h"
+#include "net/http/http_pipelined_host.h"
+#include "net/http/http_pipelined_host_pool.h"
+#include "net/http/http_pipelined_stream.h"
#include "net/http/http_proxy_client_socket.h"
#include "net/http/http_proxy_client_socket_pool.h"
#include "net/http/http_request_info.h"
#include "net/http/http_server_properties.h"
+#include "net/http/http_stream_factory.h"
#include "net/http/http_stream_factory_impl_request.h"
#include "net/socket/client_socket_handle.h"
#include "net/socket/client_socket_pool.h"
@@ -88,6 +93,7 @@ HttpStreamFactoryImpl::Job::Job(HttpStreamFactoryImpl* stream_factory,
was_npn_negotiated_(false),
num_streams_(0),
spdy_session_direct_(false),
+ existing_available_pipeline_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)) {
DCHECK(stream_factory);
DCHECK(session);
@@ -598,6 +604,17 @@ int HttpStreamFactoryImpl::Job::DoInitConnection() {
} else if (request_ && (using_ssl_ || ShouldForceSpdyWithoutSSL())) {
// Update the spdy session key for the request that launched this job.
request_->SetSpdySessionKey(spdy_session_key);
+ } else if (IsRequestEligibleForPipelining()) {
+ // TODO(simonjam): With pipelining, we might be better off using fewer
+ // connections and thus should make fewer preconnections. Explore
+ // preconnecting fewer than the requested num_connections.
+ existing_available_pipeline_ = stream_factory_->http_pipelined_host_pool_.
+ IsExistingPipelineAvailableForOrigin(origin_);
+ if (existing_available_pipeline_) {
+ return OK;
+ } else {
+ request_->SetHttpPipeliningKey(origin_);
+ }
}
// OK, there's no available SPDY session. Let |dependent_job_| resume if it's
@@ -756,10 +773,6 @@ int HttpStreamFactoryImpl::Job::DoInitConnectionComplete(int result) {
return result;
}
- if (!connection_->socket()) {
- HACKCrashHereToDebug80095();
- }
-
next_state_ = STATE_CREATE_STREAM;
return OK;
}
@@ -774,8 +787,8 @@ int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
}
int HttpStreamFactoryImpl::Job::DoCreateStream() {
- if (!connection_->socket() && !existing_spdy_session_)
- HACKCrashHereToDebug80095();
+ DCHECK(connection_->socket() || existing_spdy_session_ ||
+ existing_available_pipeline_);
next_state_ = STATE_CREATE_STREAM_COMPLETE;
@@ -790,16 +803,30 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() {
if (!using_spdy_) {
bool using_proxy = (proxy_info_.is_http() || proxy_info_.is_https()) &&
request_info_.url.SchemeIs("http");
- stream_.reset(new HttpBasicStream(connection_.release(), NULL,
- using_proxy));
+ // TODO(simonjam): Support proxies.
+ if (existing_available_pipeline_) {
+ stream_.reset(stream_factory_->http_pipelined_host_pool_.
+ CreateStreamOnExistingPipeline(origin_));
+ CHECK(stream_.get());
+ } else if (!using_proxy && IsRequestEligibleForPipelining()) {
+ stream_.reset(
+ stream_factory_->http_pipelined_host_pool_.CreateStreamOnNewPipeline(
+ origin_,
+ connection_.release(),
+ server_ssl_config_,
+ proxy_info_,
+ net_log_,
+ was_npn_negotiated_));
+ CHECK(stream_.get());
+ } else {
+ stream_.reset(new HttpBasicStream(connection_.release(), NULL,
+ using_proxy));
+ }
return OK;
}
CHECK(!stream_.get());
- if (!connection_->socket() && !existing_spdy_session_)
- HACKCrashHereToDebug80095();
-
bool direct = true;
HostPortProxyPair pair(origin_, proxy_server);
if (IsHttpsProxyAndHttpUrl()) {
@@ -821,12 +848,6 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() {
SpdySessionPool* spdy_pool = session_->spdy_session_pool();
spdy_session = spdy_pool->GetIfExists(pair, net_log_);
if (!spdy_session) {
- // SPDY can be negotiated using the TLS next protocol negotiation (NPN)
- // extension, or just directly using SSL. Either way, |connection_| must
- // contain an SSLClientSocket.
- if (!connection_->socket())
- HACKCrashHereToDebug80095();
-
int error = spdy_pool->GetSpdySessionFromSocket(
pair, connection_.release(), net_log_, spdy_certificate_error_,
&new_spdy_session_, using_ssl_);
@@ -1102,47 +1123,17 @@ bool HttpStreamFactoryImpl::Job::IsOrphaned() const {
return !IsPreconnecting() && !request_;
}
-#if defined(OS_WIN)
-#pragma warning (disable: 4748)
-#pragma optimize( "", off )
-#endif
-
-void HttpStreamFactoryImpl::Job::HACKCrashHereToDebug80095() {
- // If we enter this code path, then we'll cause a crash later in
- // DoCreateStream(). Crash now and figure out what happened:
- // http://crbug.com/80095.
- GURL url = original_url_.get() ? *original_url_ : request_info_.url;
- bool using_ssl = using_ssl_;
- bool using_spdy = using_spdy_;
- char url_buf[512];
- base::strlcpy(url_buf, url.spec().data(), arraysize(url_buf));
-
- // Note that these local variables have their addresses referenced to
- // prevent the compiler from optimizing them away.
- if (using_spdy) {
- LOG(FATAL) << "Crashing here because willchan doesn't know why we're "
- << "crashing later. Sorry! I'll give you a cookie later. "
- << "Cheers mate!\n"
- << "url[" << &url << "]: " << url << "\n"
- << "using_ssl[" << &using_ssl << "]: "
- << (using_ssl ? "true\n" : "false\n")
- << "using_spdy[" << &using_spdy << "]: "
- << (using_spdy ? "true\n" : "false\n");
- } else {
- LOG(FATAL) << "Crashing here because willchan doesn't know why we're "
- << "crashing later. Sorry! I'll give you a cookie later. "
- << "Cheers mate!\n"
- << "url[" << &url << "]: " << url << "\n"
- << "using_ssl[" << &using_ssl << "]: "
- << (using_ssl ? "true\n" : "false\n")
- << "using_spdy[" << &using_spdy << "]: "
- << (using_spdy ? "true\n" : "false\n");
+bool HttpStreamFactoryImpl::Job::IsRequestEligibleForPipelining() const {
+ if (!HttpStreamFactory::http_pipelining_enabled()) {
+ return false;
+ }
+ if (IsPreconnecting() || !request_) {
+ return false;
}
+ if (using_ssl_) {
+ return false;
+ }
+ return request_info_.method == "GET" || request_info_.method == "HEAD";
}
-#if defined(OS_WIN)
-#pragma optimize( "", on )
-#pragma warning (default: 4748)
-#endif
-
} // namespace net
diff --git a/net/http/http_stream_factory_impl_job.h b/net/http/http_stream_factory_impl_job.h
index b3d4175..50da574 100644
--- a/net/http/http_stream_factory_impl_job.h
+++ b/net/http/http_stream_factory_impl_job.h
@@ -23,6 +23,7 @@ namespace net {
class ClientSocketHandle;
class HttpAuthController;
class HttpNetworkSession;
+class HttpPipelinedConnection;
class HttpProxySocketParams;
class HttpStream;
class SOCKSSocketParams;
@@ -197,11 +198,11 @@ class HttpStreamFactoryImpl::Job {
// Should we force SPDY to run without SSL for this stream request.
bool ShouldForceSpdyWithoutSSL() const;
+ bool IsRequestEligibleForPipelining() const;
+
// Record histograms of latency until Connect() completes.
static void LogHttpConnectedMetrics(const ClientSocketHandle& handle);
- void HACKCrashHereToDebug80095();
-
Request* request_;
const HttpRequestInfo request_info_;
@@ -276,6 +277,9 @@ class HttpStreamFactoryImpl::Job {
// Only used if |new_spdy_session_| is non-NULL.
bool spdy_session_direct_;
+ // True if an existing pipeline can handle this job's request.
+ bool existing_available_pipeline_;
+
ScopedRunnableMethodFactory<Job> method_factory_;
DISALLOW_COPY_AND_ASSIGN(Job);
diff --git a/net/http/http_stream_factory_impl_request.cc b/net/http/http_stream_factory_impl_request.cc
index 801638a..17c717e 100644
--- a/net/http/http_stream_factory_impl_request.cc
+++ b/net/http/http_stream_factory_impl_request.cc
@@ -40,9 +40,10 @@ HttpStreamFactoryImpl::Request::~Request() {
for (std::set<Job*>::iterator it = jobs_.begin(); it != jobs_.end(); ++it)
factory_->request_map_.erase(*it);
- STLDeleteElements(&jobs_);
-
RemoveRequestFromSpdySessionRequestMap();
+ RemoveRequestFromHttpPipeliningRequestMap();
+
+ STLDeleteElements(&jobs_);
}
void HttpStreamFactoryImpl::Request::SetSpdySessionKey(
@@ -55,6 +56,16 @@ void HttpStreamFactoryImpl::Request::SetSpdySessionKey(
request_set.insert(this);
}
+void HttpStreamFactoryImpl::Request::SetHttpPipeliningKey(
+ const HostPortPair& http_pipelining_key) {
+ DCHECK(!http_pipelining_key_.get());
+ http_pipelining_key_.reset(new HostPortPair(http_pipelining_key));
+ RequestSet& request_set =
+ factory_->http_pipelining_request_map_[http_pipelining_key];
+ DCHECK(!ContainsKey(request_set, this));
+ request_set.insert(this);
+}
+
void HttpStreamFactoryImpl::Request::AttachJob(Job* job) {
DCHECK(job);
jobs_.insert(job);
@@ -84,7 +95,8 @@ void HttpStreamFactoryImpl::Request::OnStreamReady(
DCHECK(completed_);
// |job| should only be NULL if we're being serviced by a late bound
- // SpdySession (one that was not created by a job in our |jobs_| set).
+ // SpdySession or HttpPipelinedConnection (one that was not created by a job
+ // in our |jobs_| set).
if (!job) {
DCHECK(!bound_job_.get());
DCHECK(!jobs_.empty());
@@ -225,6 +237,22 @@ HttpStreamFactoryImpl::Request::RemoveRequestFromSpdySessionRequestMap() {
}
}
+void
+HttpStreamFactoryImpl::Request::RemoveRequestFromHttpPipeliningRequestMap() {
+ if (http_pipelining_key_.get()) {
+ HttpPipeliningRequestMap& http_pipelining_request_map =
+ factory_->http_pipelining_request_map_;
+ DCHECK(ContainsKey(http_pipelining_request_map, *http_pipelining_key_));
+ RequestSet& request_set =
+ http_pipelining_request_map[*http_pipelining_key_];
+ DCHECK(ContainsKey(request_set, this));
+ request_set.erase(this);
+ if (request_set.empty())
+ http_pipelining_request_map.erase(*http_pipelining_key_);
+ http_pipelining_key_.reset();
+ }
+}
+
void HttpStreamFactoryImpl::Request::OnSpdySessionReady(
Job* job,
scoped_refptr<SpdySession> spdy_session,
@@ -278,6 +306,7 @@ void HttpStreamFactoryImpl::Request::OrphanJobsExcept(Job* job) {
void HttpStreamFactoryImpl::Request::OrphanJobs() {
RemoveRequestFromSpdySessionRequestMap();
+ RemoveRequestFromHttpPipeliningRequestMap();
std::set<Job*> tmp;
tmp.swap(jobs_);
diff --git a/net/http/http_stream_factory_impl_request.h b/net/http/http_stream_factory_impl_request.h
index 4a7ace4..da81b73 100644
--- a/net/http/http_stream_factory_impl_request.h
+++ b/net/http/http_stream_factory_impl_request.h
@@ -30,6 +30,11 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest {
// before knowing if SPDY is available.
void SetSpdySessionKey(const HostPortProxyPair& spdy_session_key);
+ // Called when the Job determines the appropriate |http_pipelining_key| for
+ // the Request. Registers this Request with the factory, so that if an
+ // existing pipeline becomes available, this Request can be late bound to it.
+ void SetHttpPipeliningKey(const HostPortPair& http_pipelining_key);
+
// Attaches |job| to this request. Does not mean that Request will use |job|,
// but Request will own |job|.
void AttachJob(HttpStreamFactoryImpl::Job* job);
@@ -41,10 +46,14 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest {
bool using_spdy,
const NetLog::Source& source);
- // If this Request has a spdy_session_key, remove this session from the
+ // If this Request has a |spdy_session_key_|, remove this session from the
// SpdySessionRequestMap.
void RemoveRequestFromSpdySessionRequestMap();
+ // If this Request has a |http_pipelining_key_|, remove this session from the
+ // HttpPipeliningRequestMap.
+ void RemoveRequestFromHttpPipeliningRequestMap();
+
// Called by an attached Job if it sets up a SpdySession.
void OnSpdySessionReady(Job* job,
scoped_refptr<SpdySession> spdy_session,
@@ -102,6 +111,7 @@ class HttpStreamFactoryImpl::Request : public HttpStreamRequest {
scoped_ptr<Job> bound_job_;
std::set<HttpStreamFactoryImpl::Job*> jobs_;
scoped_ptr<const HostPortProxyPair> spdy_session_key_;
+ scoped_ptr<const HostPortPair> http_pipelining_key_;
bool completed_;
bool was_npn_negotiated_;
diff --git a/net/http/http_stream_parser.cc b/net/http/http_stream_parser.cc
index 028669f..2a86220 100644
--- a/net/http/http_stream_parser.cc
+++ b/net/http/http_stream_parser.cc
@@ -85,7 +85,6 @@ HttpStreamParser::HttpStreamParser(ClientSocketHandle* connection,
chunk_length_(0),
chunk_length_without_encoding_(0),
sent_last_chunk_(false) {
- DCHECK_EQ(0, read_buffer->offset());
}
HttpStreamParser::~HttpStreamParser() {
diff --git a/net/net.gyp b/net/net.gyp
index a6223d8..6962b2e 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -414,6 +414,15 @@
'http/http_network_session_peer.h',
'http/http_network_transaction.cc',
'http/http_network_transaction.h',
+ 'http/http_pipelined_connection.h',
+ 'http/http_pipelined_connection_impl.cc',
+ 'http/http_pipelined_connection_impl.h',
+ 'http/http_pipelined_host.cc',
+ 'http/http_pipelined_host.h',
+ 'http/http_pipelined_host_pool.cc',
+ 'http/http_pipelined_host_pool.h',
+ 'http/http_pipelined_stream.cc',
+ 'http/http_pipelined_stream.h',
'http/http_proxy_client_socket.cc',
'http/http_proxy_client_socket.h',
'http/http_proxy_client_socket_pool.cc',
@@ -1034,6 +1043,8 @@
'http/http_mac_signature_unittest.cc',
'http/http_network_layer_unittest.cc',
'http/http_network_transaction_unittest.cc',
+ 'http/http_pipelined_connection_impl_unittest.cc',
+ 'http/http_pipelined_host_unittest.cc',
'http/http_proxy_client_socket_pool_unittest.cc',
'http/http_request_headers_unittest.cc',
'http/http_response_body_drainer_unittest.cc',
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc
index 474d497..175f827 100644
--- a/net/spdy/spdy_http_stream.cc
+++ b/net/spdy/spdy_http_stream.cc
@@ -472,4 +472,9 @@ bool SpdyHttpStream::IsSpdyHttpStream() const {
return true;
}
+void SpdyHttpStream::Drain(HttpNetworkSession* session) {
+ Close(false);
+ delete this;
+}
+
} // namespace net
diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h
index 46a4104..b025cd2 100644
--- a/net/spdy/spdy_http_stream.h
+++ b/net/spdy/spdy_http_stream.h
@@ -70,6 +70,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate,
SSLCertRequestInfo* cert_request_info) OVERRIDE;
virtual bool IsSpdyHttpStream() const OVERRIDE;
virtual void LogNumRttVsBytesMetrics() const OVERRIDE {}
+ virtual void Drain(HttpNetworkSession* session) OVERRIDE;
// SpdyStream::Delegate methods:
virtual bool OnSendHeadersComplete(int status) OVERRIDE;