diff options
author | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-15 17:09:57 +0000 |
---|---|---|
committer | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-15 17:09:57 +0000 |
commit | 5fc08e35ad744f5bbccbfee38fa0941fd746c427 (patch) | |
tree | ae66e63a8d6d898aa15e0cf3a1032d5c7b4694c0 /net | |
parent | 8d090ca8c25795d526761bcae9fb301b51b2ee43 (diff) | |
download | chromium_src-5fc08e35ad744f5bbccbfee38fa0941fd746c427.zip chromium_src-5fc08e35ad744f5bbccbfee38fa0941fd746c427.tar.gz chromium_src-5fc08e35ad744f5bbccbfee38fa0941fd746c427.tar.bz2 |
Add support for late binding of sockets.
There are tests for this, but the code is not activated yet. The old behavior is maintained. I will follow up this patch with a change to enable an A/B test for this optimization. Credit to jar&mbelshe for the optimization idea.
BUG=13289
TEST=Covered by new unit tests. Not activatable yet.
Review URL: http://codereview.chromium.org/151190
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@20735 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/socket/client_socket_pool_base.cc | 154 | ||||
-rw-r--r-- | net/socket/client_socket_pool_base.h | 39 | ||||
-rw-r--r-- | net/socket/client_socket_pool_base_unittest.cc | 545 |
3 files changed, 694 insertions, 44 deletions
diff --git a/net/socket/client_socket_pool_base.cc b/net/socket/client_socket_pool_base.cc index 0afe53e..313e046 100644 --- a/net/socket/client_socket_pool_base.cc +++ b/net/socket/client_socket_pool_base.cc @@ -30,6 +30,8 @@ const int kIdleTimeout = 300; // 5 minutes. namespace net { +bool ClientSocketPoolBase::g_late_binding = false; + ConnectJob::ConnectJob(const std::string& group_name, const ClientSocketHandle* key_handle, Delegate* delegate) @@ -52,6 +54,8 @@ ClientSocketPoolBase::ClientSocketPoolBase( connect_job_factory_(connect_job_factory) {} ClientSocketPoolBase::~ClientSocketPoolBase() { + if (g_late_binding) + CancelAllConnectJobs(); // Clean up any idle sockets. Assert that we have no remaining active // sockets or pending requests. They should have all been cleaned up prior // to the manager being destroyed. @@ -98,7 +102,7 @@ int ClientSocketPoolBase::RequestSocket( DecrementIdleCount(); if (idle_socket.socket->IsConnectedAndIdle()) { // We found one we can reuse! - HandOutSocket(idle_socket.socket, true /* reuse */, handle, &group); + HandOutSocket(idle_socket.socket, idle_socket.used, handle, &group); return OK; } delete idle_socket.socket; @@ -116,9 +120,16 @@ int ClientSocketPoolBase::RequestSocket( HandOutSocket(connect_job->ReleaseSocket(), false /* not reused */, handle, &group); } else if (rv == ERR_IO_PENDING) { - group.connecting_requests[handle] = r; - CHECK(!ContainsKey(connect_job_map_, handle)); - connect_job_map_[handle] = connect_job.release(); + ConnectJob* job = connect_job.release(); + if (g_late_binding) { + CHECK(!ContainsKey(connect_job_map_, handle)); + InsertRequestIntoQueue(r, &group.pending_requests); + } else { + group.connecting_requests[handle] = r; + CHECK(!ContainsKey(connect_job_map_, handle)); + connect_job_map_[handle] = job; + } + group.jobs.insert(job); } else { if (group.IsEmpty()) group_map_.erase(group_name); @@ -142,14 +153,15 @@ void ClientSocketPoolBase::CancelRequest(const std::string& group_name, } } - // It's invalid to cancel a non-existent request. - CHECK(ContainsKey(group.connecting_requests, handle)); + if (!g_late_binding) { + // It's invalid to cancel a non-existent request. + CHECK(ContainsKey(group.connecting_requests, handle)); - RequestMap::iterator map_it = group.connecting_requests.find(handle); - if (map_it != group.connecting_requests.end()) { - RemoveConnectJob(handle); - group.connecting_requests.erase(map_it); - OnAvailableSocketSlot(group_name, &group); + RequestMap::iterator map_it = group.connecting_requests.find(handle); + if (map_it != group.connecting_requests.end()) { + RemoveConnectJob(handle, NULL, &group); + OnAvailableSocketSlot(group_name, &group); + } } } @@ -199,11 +211,20 @@ LoadState ClientSocketPoolBase::GetLoadState( // Search pending_requests for matching handle. RequestQueue::const_iterator it = group.pending_requests.begin(); - for (; it != group.pending_requests.end(); ++it) { + for (size_t i = 0; it != group.pending_requests.end(); ++it, ++i) { if (it->handle == handle) { - // TODO(wtc): Add a state for being on the wait list. - // See http://www.crbug.com/5077. - return LOAD_STATE_IDLE; + if (g_late_binding && i < group.jobs.size()) { + LoadState max_state = LOAD_STATE_IDLE; + for (ConnectJobSet::const_iterator job_it = group.jobs.begin(); + job_it != group.jobs.end(); ++job_it) { + max_state = std::max(max_state, (*job_it)->load_state()); + } + return max_state; + } else { + // TODO(wtc): Add a state for being on the wait list. + // See http://www.crbug.com/5077. + return LOAD_STATE_IDLE; + } } } @@ -215,7 +236,8 @@ bool ClientSocketPoolBase::IdleSocket::ShouldCleanup( base::TimeTicks now) const { bool timed_out = (now - start_time) >= base::TimeDelta::FromSeconds(kIdleTimeout); - return timed_out || !socket->IsConnectedAndIdle(); + return timed_out || + !(used ? socket->IsConnectedAndIdle() : socket->IsConnected()); } void ClientSocketPoolBase::CleanupIdleSockets(bool force) { @@ -274,12 +296,7 @@ void ClientSocketPoolBase::DoReleaseSocket(const std::string& group_name, const bool can_reuse = socket->IsConnectedAndIdle(); if (can_reuse) { - IdleSocket idle_socket; - idle_socket.socket = socket; - idle_socket.start_time = base::TimeTicks::Now(); - - group.idle_sockets.push_back(idle_socket); - IncrementIdleCount(); + AddIdleSocket(socket, true /* used socket */, &group); } else { delete socket; } @@ -294,17 +311,44 @@ void ClientSocketPoolBase::OnConnectJobComplete(int result, ConnectJob* job) { CHECK(group_it != group_map_.end()); Group& group = group_it->second; - RequestMap* request_map = &group.connecting_requests; + const ClientSocketHandle* const key_handle = job->key_handle(); + scoped_ptr<ClientSocket> socket(job->ReleaseSocket()); + + if (g_late_binding) { + RemoveConnectJob(key_handle, job, &group); + + if (result == OK) { + DCHECK(socket.get()); + if (!group.pending_requests.empty()) { + Request r = group.pending_requests.front(); + group.pending_requests.pop_front(); + HandOutSocket( + socket.release(), false /* unused socket */, r.handle, &group); + r.callback->Run(result); + } else { + AddIdleSocket(socket.release(), false /* unused socket */, &group); + OnAvailableSocketSlot(group_name, &group); + } + } else { + DCHECK(!socket.get()); + if (!group.pending_requests.empty()) { + Request r = group.pending_requests.front(); + group.pending_requests.pop_front(); + r.callback->Run(result); + } + MaybeOnAvailableSocketSlot(group_name); + } + + return; + } - RequestMap::iterator it = request_map->find(job->key_handle()); + RequestMap* request_map = &group.connecting_requests; + RequestMap::iterator it = request_map->find(key_handle); CHECK(it != request_map->end()); ClientSocketHandle* const handle = it->second.handle; CompletionCallback* const callback = it->second.callback; - request_map->erase(it); - DCHECK_EQ(handle, job->key_handle()); - scoped_ptr<ClientSocket> socket(job->ReleaseSocket()); - RemoveConnectJob(job->key_handle()); + RemoveConnectJob(key_handle, job, &group); if (result != OK) { DCHECK(!socket.get()); @@ -313,17 +357,34 @@ void ClientSocketPoolBase::OnConnectJobComplete(int result, ConnectJob* job) { // |group_map_| again. MaybeOnAvailableSocketSlot(group_name); } else { + DCHECK(socket.get()); HandOutSocket(socket.release(), false /* not reused */, handle, &group); callback->Run(result); } } +void ClientSocketPoolBase::EnableLateBindingOfSockets(bool enabled) { + g_late_binding = enabled; +} + void ClientSocketPoolBase::RemoveConnectJob( - const ClientSocketHandle* handle) { - ConnectJobMap::iterator it = connect_job_map_.find(handle); - CHECK(it != connect_job_map_.end()); - delete it->second; - connect_job_map_.erase(it); + const ClientSocketHandle* handle, ConnectJob *job, Group* group) { + if (g_late_binding) { + DCHECK(job); + delete job; + } else { + ConnectJobMap::iterator it = connect_job_map_.find(handle); + CHECK(it != connect_job_map_.end()); + job = it->second; + delete job; + connect_job_map_.erase(it); + group->connecting_requests.erase(handle); + } + + if (group) { + DCHECK(ContainsKey(group->jobs, job)); + group->jobs.erase(job); + } } void ClientSocketPoolBase::MaybeOnAvailableSocketSlot( @@ -377,4 +438,31 @@ void ClientSocketPoolBase::HandOutSocket( group->active_socket_count++; } +void ClientSocketPoolBase::AddIdleSocket( + ClientSocket* socket, bool used, Group* group) { + DCHECK(socket); + IdleSocket idle_socket; + idle_socket.socket = socket; + idle_socket.start_time = base::TimeTicks::Now(); + idle_socket.used = used; + + group->idle_sockets.push_back(idle_socket); + IncrementIdleCount(); +} + +void ClientSocketPoolBase::CancelAllConnectJobs() { + for (GroupMap::iterator i = group_map_.begin(); i != group_map_.end();) { + Group& group = i->second; + STLDeleteElements(&group.jobs); + + // Delete group if no longer needed. + if (group.IsEmpty()) { + CHECK(group.pending_requests.empty()); + group_map_.erase(i++); + } else { + ++i; + } + } +} + } // namespace net diff --git a/net/socket/client_socket_pool_base.h b/net/socket/client_socket_pool_base.h index 5476e1a..fc3582d 100644 --- a/net/socket/client_socket_pool_base.h +++ b/net/socket/client_socket_pool_base.h @@ -7,6 +7,7 @@ #include <deque> #include <map> +#include <set> #include <string> #include "base/basictypes.h" @@ -160,11 +161,21 @@ class ClientSocketPoolBase virtual void OnConnectJobComplete(int result, ConnectJob* job); + // Enables late binding of sockets. In this mode, socket requests are + // decoupled from socket connection jobs. A socket request may initiate a + // socket connection job, but there is no guarantee that that socket + // connection will service the request (for example, a released socket may + // service the request sooner, or a higher priority request may come in + // afterward and receive the socket from the job). + static void EnableLateBindingOfSockets(bool enabled); + private: // Entry for a persistent socket which became idle at time |start_time|. struct IdleSocket { + IdleSocket() : socket(NULL), used(false) {} ClientSocket* socket; base::TimeTicks start_time; + bool used; // Indicates whether or not the socket has been used yet. // An idle socket should be removed if it can't be reused, or has been idle // for too long. |now| is the current time value (TimeTicks::Now()). @@ -185,17 +196,16 @@ class ClientSocketPoolBase Group() : active_socket_count(0) {} bool IsEmpty() const { - return active_socket_count == 0 && idle_sockets.empty() && - connecting_requests.empty(); + return active_socket_count == 0 && idle_sockets.empty() && jobs.empty(); } bool HasAvailableSocketSlot(int max_sockets_per_group) const { - return active_socket_count + - static_cast<int>(connecting_requests.size()) < + return active_socket_count + static_cast<int>(jobs.size()) < max_sockets_per_group; } std::deque<IdleSocket> idle_sockets; + std::set<const ConnectJob*> jobs; RequestQueue pending_requests; RequestMap connecting_requests; int active_socket_count; // number of active sockets used by clients @@ -204,6 +214,7 @@ class ClientSocketPoolBase typedef std::map<std::string, Group> GroupMap; typedef std::map<const ClientSocketHandle*, ConnectJob*> ConnectJobMap; + typedef std::set<const ConnectJob*> ConnectJobSet; static void InsertRequestIntoQueue(const Request& r, RequestQueue* pending_requests); @@ -226,8 +237,12 @@ class ClientSocketPoolBase } // Removes the ConnectJob corresponding to |handle| from the - // |connect_job_map_|. - void RemoveConnectJob(const ClientSocketHandle* handle); + // |connect_job_map_| or |connect_job_set_| depending on whether or not late + // binding is enabled. |job| must be non-NULL when late binding is + // enabled. Also updates |group| if non-NULL. + void RemoveConnectJob(const ClientSocketHandle* handle, + ConnectJob* job, + Group* group); // Same as OnAvailableSocketSlot except it looks up the Group first to see if // it's there. @@ -245,6 +260,15 @@ class ClientSocketPoolBase ClientSocketHandle* handle, Group* group); + // Adds |socket| to the list of idle sockets for |group|. |used| indicates + // whether or not the socket has previously been used. + void AddIdleSocket(ClientSocket* socket, bool used, Group* group); + + // Iterates through |connect_job_map_|, canceling all ConnectJobs. + // Afterwards, it iterates through all groups and deletes them if they are no + // longer needed. + void CancelAllConnectJobs(); + GroupMap group_map_; ConnectJobMap connect_job_map_; @@ -261,6 +285,9 @@ class ClientSocketPoolBase const scoped_ptr<ConnectJobFactory> connect_job_factory_; + // Controls whether or not we use late binding of sockets. + static bool g_late_binding; + DISALLOW_COPY_AND_ASSIGN(ClientSocketPoolBase); }; diff --git a/net/socket/client_socket_pool_base_unittest.cc b/net/socket/client_socket_pool_base_unittest.cc index d3c7397..8874686 100644 --- a/net/socket/client_socket_pool_base_unittest.cc +++ b/net/socket/client_socket_pool_base_unittest.cc @@ -62,6 +62,8 @@ class MockClientSocket : public ClientSocket { DISALLOW_COPY_AND_ASSIGN(MockClientSocket); }; +class TestConnectJob; + class MockClientSocketFactory : public ClientSocketFactory { public: MockClientSocketFactory() : allocation_count_(0) {} @@ -79,10 +81,14 @@ class MockClientSocketFactory : public ClientSocketFactory { return NULL; } + void WaitForSignal(TestConnectJob* job) { waiting_jobs_.push_back(job); } + void SignalJobs(); + int allocation_count() const { return allocation_count_; } private: int allocation_count_; + std::vector<TestConnectJob*> waiting_jobs_; }; class TestSocketRequest : public CallbackRunner< Tuple1<int> > { @@ -120,13 +126,15 @@ class TestConnectJob : public ConnectJob { kMockFailingJob, kMockPendingJob, kMockPendingFailingJob, + kMockWaitingJob, + kMockAdvancingLoadStateJob, }; TestConnectJob(JobType job_type, const std::string& group_name, const ClientSocketPoolBase::Request& request, ConnectJob::Delegate* delegate, - ClientSocketFactory* client_socket_factory) + MockClientSocketFactory* client_socket_factory) : ConnectJob(group_name, request.handle, delegate), job_type_(job_type), client_socket_factory_(client_socket_factory), @@ -143,6 +151,7 @@ class TestConnectJob : public ConnectJob { case kMockFailingJob: return DoConnect(false /* error */, false /* sync */); case kMockPendingJob: + set_load_state(LOAD_STATE_CONNECTING); MessageLoop::current()->PostTask( FROM_HERE, method_factory_.NewRunnableMethod( @@ -151,6 +160,7 @@ class TestConnectJob : public ConnectJob { true /* async */)); return ERR_IO_PENDING; case kMockPendingFailingJob: + set_load_state(LOAD_STATE_CONNECTING); MessageLoop::current()->PostTask( FROM_HERE, method_factory_.NewRunnableMethod( @@ -158,12 +168,26 @@ class TestConnectJob : public ConnectJob { false /* error */, true /* async */)); return ERR_IO_PENDING; + case kMockWaitingJob: + client_socket_factory_->WaitForSignal(this); + waiting_success_ = true; + return ERR_IO_PENDING; + case kMockAdvancingLoadStateJob: + MessageLoop::current()->PostTask( + FROM_HERE, + method_factory_.NewRunnableMethod( + &TestConnectJob::AdvanceLoadState, load_state())); + return ERR_IO_PENDING; default: NOTREACHED(); return ERR_FAILED; } } + void Signal() { + DoConnect(waiting_success_, true /* async */); + } + private: int DoConnect(bool succeed, bool was_async) { int result = ERR_CONNECTION_FAILED; @@ -178,8 +202,22 @@ class TestConnectJob : public ConnectJob { return result; } + void AdvanceLoadState(LoadState state) { + int tmp = state; + tmp++; + state = static_cast<LoadState>(tmp); + set_load_state(state); + // Post a delayed task so RunAllPending() won't run it. + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + method_factory_.NewRunnableMethod(&TestConnectJob::AdvanceLoadState, + state), + 1 /* 1ms delay */); + } + + bool waiting_success_; const JobType job_type_; - ClientSocketFactory* const client_socket_factory_; + MockClientSocketFactory* const client_socket_factory_; ScopedRunnableMethodFactory<TestConnectJob> method_factory_; DISALLOW_COPY_AND_ASSIGN(TestConnectJob); @@ -187,7 +225,7 @@ class TestConnectJob : public ConnectJob { class TestConnectJobFactory : public ClientSocketPoolBase::ConnectJobFactory { public: - explicit TestConnectJobFactory(ClientSocketFactory* client_socket_factory) + explicit TestConnectJobFactory(MockClientSocketFactory* client_socket_factory) : job_type_(TestConnectJob::kMockJob), client_socket_factory_(client_socket_factory) {} @@ -210,7 +248,7 @@ class TestConnectJobFactory : public ClientSocketPoolBase::ConnectJobFactory { private: TestConnectJob::JobType job_type_; - ClientSocketFactory* const client_socket_factory_; + MockClientSocketFactory* const client_socket_factory_; DISALLOW_COPY_AND_ASSIGN(TestConnectJobFactory); }; @@ -270,6 +308,14 @@ class TestClientSocketPool : public ClientSocketPool { DISALLOW_COPY_AND_ASSIGN(TestClientSocketPool); }; +void MockClientSocketFactory::SignalJobs() { + for (std::vector<TestConnectJob*>::iterator it = waiting_jobs_.begin(); + it != waiting_jobs_.end(); ++it) { + (*it)->Signal(); + } + waiting_jobs_.clear(); +} + class ClientSocketPoolBaseTest : public testing::Test { protected: ClientSocketPoolBaseTest() @@ -291,6 +337,10 @@ class ClientSocketPoolBaseTest : public testing::Test { // The tests often call Reset() on handles at the end which may post // DoReleaseSocket() tasks. MessageLoop::current()->RunAllPending(); + // Need to delete |pool_| before we turn late binding back off. + // TODO(willchan): Remove this line when late binding becomes the default. + pool_ = NULL; + ClientSocketPoolBase::EnableLateBindingOfSockets(false); } int StartRequest(const std::string& group_name, int priority) { @@ -356,7 +406,7 @@ const int ClientSocketPoolBaseTest::kIndexOutOfBounds = -1; // static const int ClientSocketPoolBaseTest::kRequestNotFound = -2; -TEST_F(ClientSocketPoolBaseTest, Basic) { +TEST_F(ClientSocketPoolBaseTest, BasicSynchronous) { CreatePool(kDefaultMaxSocketsPerGroup); TestCompletionCallback callback; @@ -368,6 +418,20 @@ TEST_F(ClientSocketPoolBaseTest, Basic) { handle.Reset(); } +TEST_F(ClientSocketPoolBaseTest, BasicAsynchronous) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + TestSocketRequest req(pool_.get(), &request_order_); + int rv = req.handle.Init("a", ignored_request_info_, 0, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(LOAD_STATE_CONNECTING, pool_->GetLoadState("a", &req.handle)); + EXPECT_EQ(OK, req.WaitForResult()); + EXPECT_TRUE(req.handle.is_initialized()); + EXPECT_TRUE(req.handle.socket()); + req.handle.Reset(); +} + TEST_F(ClientSocketPoolBaseTest, InitConnectionFailure) { CreatePool(kDefaultMaxSocketsPerGroup); @@ -378,6 +442,17 @@ TEST_F(ClientSocketPoolBaseTest, InitConnectionFailure) { kDefaultPriority, &req)); } +TEST_F(ClientSocketPoolBaseTest, InitConnectionAsynchronousFailure) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingFailingJob); + TestSocketRequest req(pool_.get(), &request_order_); + EXPECT_EQ(ERR_IO_PENDING, + req.handle.Init("a", ignored_request_info_, 5, &req)); + EXPECT_EQ(LOAD_STATE_CONNECTING, pool_->GetLoadState("a", &req.handle)); + EXPECT_EQ(ERR_CONNECTION_FAILED, req.WaitForResult()); +} + TEST_F(ClientSocketPoolBaseTest, PendingRequests) { CreatePool(kDefaultMaxSocketsPerGroup); @@ -647,6 +722,28 @@ TEST_F(ClientSocketPoolBaseTest, FailingActiveRequestWithPendingRequests) { EXPECT_EQ(ERR_CONNECTION_FAILED, reqs[i]->WaitForResult()); } +TEST_F(ClientSocketPoolBaseTest, CancelActiveRequestThenRequestSocket) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + + TestSocketRequest req(pool_.get(), &request_order_); + int rv = req.handle.Init( + "a", ignored_request_info_, kDefaultPriority, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + + // Cancel the active request. + req.handle.Reset(); + + rv = req.handle.Init("a", ignored_request_info_, kDefaultPriority, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(OK, req.WaitForResult()); + + EXPECT_FALSE(req.handle.is_reused()); + EXPECT_EQ(1U, TestSocketRequest::completion_count); + EXPECT_EQ(2, client_socket_factory_.allocation_count()); +} + // A pending asynchronous job completes, which will free up a socket slot. The // next job finishes synchronously. The callback for the asynchronous job // should be first though. @@ -689,6 +786,444 @@ TEST_F(ClientSocketPoolBaseTest, PendingJobCompletionOrder) { EXPECT_EQ(&req3, request_order_[1]); } +// When a ConnectJob is coupled to a request, even if a free socket becomes +// available, the request will be serviced by the ConnectJob. +TEST_F(ClientSocketPoolBaseTest, ReleaseSockets) { + CreatePool(kDefaultMaxSocketsPerGroup); + ClientSocketPoolBase::EnableLateBindingOfSockets(false); + + // Start job 1 (async OK) + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + + TestSocketRequest req1(pool_.get(), &request_order_); + int rv = req1.handle.Init("a", ignored_request_info_, 5, &req1); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(OK, req1.WaitForResult()); + + // Job 1 finished OK. Start job 2 (also async OK). Release socket 1. + connect_job_factory_->set_job_type(TestConnectJob::kMockWaitingJob); + + TestSocketRequest req2(pool_.get(), &request_order_); + rv = req2.handle.Init("a", ignored_request_info_, 5, &req2); + EXPECT_EQ(ERR_IO_PENDING, rv); + req1.handle.Reset(); + MessageLoop::current()->RunAllPending(); // Run the DoReleaseSocket() + + // Job 2 is pending. Start request 3 (which has no associated job since it + // will use the idle socket). + + TestSocketRequest req3(pool_.get(), &request_order_); + rv = req3.handle.Init("a", ignored_request_info_, 5, &req3); + EXPECT_EQ(OK, rv); + + EXPECT_FALSE(req2.handle.socket()); + client_socket_factory_.SignalJobs(); + EXPECT_EQ(OK, req2.WaitForResult()); + + ASSERT_EQ(2U, request_order_.size()); + EXPECT_EQ(&req1, request_order_[0]); + EXPECT_EQ(&req2, request_order_[1]); + EXPECT_EQ(0, pool_->IdleSocketCountInGroup("a")); +} + +class ClientSocketPoolBaseTest_LateBinding : public ClientSocketPoolBaseTest { + protected: + virtual void SetUp() { + ClientSocketPoolBaseTest::SetUp(); + ClientSocketPoolBase::EnableLateBindingOfSockets(true); + } +}; + +TEST_F(ClientSocketPoolBaseTest_LateBinding, BasicSynchronous) { + CreatePool(kDefaultMaxSocketsPerGroup); + + TestCompletionCallback callback; + ClientSocketHandle handle(pool_.get()); + EXPECT_EQ(OK, handle.Init("a", ignored_request_info_, kDefaultPriority, + &callback)); + EXPECT_TRUE(handle.is_initialized()); + EXPECT_TRUE(handle.socket()); + handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, BasicAsynchronous) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + TestSocketRequest req(pool_.get(), &request_order_); + int rv = req.handle.Init("a", ignored_request_info_, 0, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(LOAD_STATE_CONNECTING, pool_->GetLoadState("a", &req.handle)); + EXPECT_EQ(OK, req.WaitForResult()); + EXPECT_TRUE(req.handle.is_initialized()); + EXPECT_TRUE(req.handle.socket()); + req.handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, InitConnectionFailure) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockFailingJob); + TestSocketRequest req(pool_.get(), &request_order_); + EXPECT_EQ(ERR_CONNECTION_FAILED, + req.handle.Init("a", ignored_request_info_, + kDefaultPriority, &req)); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, + InitConnectionAsynchronousFailure) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingFailingJob); + TestSocketRequest req(pool_.get(), &request_order_); + EXPECT_EQ(ERR_IO_PENDING, + req.handle.Init("a", ignored_request_info_, 5, &req)); + EXPECT_EQ(LOAD_STATE_CONNECTING, pool_->GetLoadState("a", &req.handle)); + EXPECT_EQ(ERR_CONNECTION_FAILED, req.WaitForResult()); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, PendingRequests) { + CreatePool(kDefaultMaxSocketsPerGroup); + + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 3)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 4)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 2)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + + ReleaseAllConnections(KEEP_ALIVE); + + EXPECT_EQ(kDefaultMaxSocketsPerGroup, + client_socket_factory_.allocation_count()); + EXPECT_EQ(requests_.size() - kDefaultMaxSocketsPerGroup, + TestSocketRequest::completion_count); + + EXPECT_EQ(1, GetOrderOfRequest(1)); + EXPECT_EQ(2, GetOrderOfRequest(2)); + EXPECT_EQ(6, GetOrderOfRequest(3)); + EXPECT_EQ(4, GetOrderOfRequest(4)); + EXPECT_EQ(3, GetOrderOfRequest(5)); + EXPECT_EQ(5, GetOrderOfRequest(6)); + EXPECT_EQ(7, GetOrderOfRequest(7)); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, PendingRequests_NoKeepAlive) { + CreatePool(kDefaultMaxSocketsPerGroup); + + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 3)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 4)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 2)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + + ReleaseAllConnections(NO_KEEP_ALIVE); + + for (size_t i = kDefaultMaxSocketsPerGroup; i < requests_.size(); ++i) + EXPECT_EQ(OK, requests_[i]->WaitForResult()); + + EXPECT_EQ(static_cast<int>(requests_.size()), + client_socket_factory_.allocation_count()); + EXPECT_EQ(requests_.size() - kDefaultMaxSocketsPerGroup, + TestSocketRequest::completion_count); +} + +// This test will start up a RequestSocket() and then immediately Cancel() it. +// The pending connect job will be cancelled and should not call back into +// ClientSocketPoolBase. +TEST_F(ClientSocketPoolBaseTest_LateBinding, CancelRequestClearGroup) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + TestSocketRequest req(pool_.get(), &request_order_); + EXPECT_EQ(ERR_IO_PENDING, + req.handle.Init("a", ignored_request_info_, + kDefaultPriority, &req)); + req.handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, TwoRequestsCancelOne) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + TestSocketRequest req(pool_.get(), &request_order_); + TestSocketRequest req2(pool_.get(), &request_order_); + + EXPECT_EQ(ERR_IO_PENDING, + req.handle.Init("a", ignored_request_info_, + kDefaultPriority, &req)); + EXPECT_EQ(ERR_IO_PENDING, + req2.handle.Init("a", ignored_request_info_, + kDefaultPriority, &req2)); + + req.handle.Reset(); + + EXPECT_EQ(OK, req2.WaitForResult()); + req2.handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, ConnectCancelConnect) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + ClientSocketHandle handle(pool_.get()); + TestCompletionCallback callback; + TestSocketRequest req(pool_.get(), &request_order_); + + EXPECT_EQ(ERR_IO_PENDING, + handle.Init("a", ignored_request_info_, + kDefaultPriority, &callback)); + + handle.Reset(); + + TestCompletionCallback callback2; + EXPECT_EQ(ERR_IO_PENDING, + handle.Init("a", ignored_request_info_, + kDefaultPriority, &callback2)); + + EXPECT_EQ(OK, callback2.WaitForResult()); + EXPECT_FALSE(callback.have_result()); + + handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, CancelRequest) { + CreatePool(kDefaultMaxSocketsPerGroup); + + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(OK, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 3)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 4)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 2)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", 1)); + + // Cancel a request. + size_t index_to_cancel = kDefaultMaxSocketsPerGroup + 2; + EXPECT_FALSE(requests_[index_to_cancel]->handle.is_initialized()); + requests_[index_to_cancel]->handle.Reset(); + + ReleaseAllConnections(KEEP_ALIVE); + + EXPECT_EQ(kDefaultMaxSocketsPerGroup, + client_socket_factory_.allocation_count()); + EXPECT_EQ(requests_.size() - kDefaultMaxSocketsPerGroup - 1, + TestSocketRequest::completion_count); + + EXPECT_EQ(1, GetOrderOfRequest(1)); + EXPECT_EQ(2, GetOrderOfRequest(2)); + EXPECT_EQ(5, GetOrderOfRequest(3)); + EXPECT_EQ(3, GetOrderOfRequest(4)); + EXPECT_EQ(kRequestNotFound, GetOrderOfRequest(5)); // Canceled request. + EXPECT_EQ(4, GetOrderOfRequest(6)); + EXPECT_EQ(6, GetOrderOfRequest(7)); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, RequestPendingJobTwice) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + ClientSocketHandle handle(pool_.get()); + RequestSocketCallback callback( + &handle, connect_job_factory_, TestConnectJob::kMockPendingJob); + int rv = handle.Init( + "a", ignored_request_info_, kDefaultPriority, &callback); + ASSERT_EQ(ERR_IO_PENDING, rv); + + EXPECT_EQ(OK, callback.WaitForResult()); + handle.Reset(); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, RequestPendingJobThenSynchronous) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + ClientSocketHandle handle(pool_.get()); + RequestSocketCallback callback( + &handle, connect_job_factory_, TestConnectJob::kMockJob); + int rv = handle.Init( + "a", ignored_request_info_, kDefaultPriority, &callback); + ASSERT_EQ(ERR_IO_PENDING, rv); + + EXPECT_EQ(OK, callback.WaitForResult()); + handle.Reset(); +} + +// Make sure that pending requests get serviced after active requests get +// cancelled. +TEST_F(ClientSocketPoolBaseTest_LateBinding, + CancelActiveRequestWithPendingRequests) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + EXPECT_EQ(ERR_IO_PENDING, StartRequest("a", kDefaultPriority)); + + // Now, kDefaultMaxSocketsPerGroup requests should be active. + // Let's cancel them. + for (int i = 0; i < kDefaultMaxSocketsPerGroup; ++i) { + ASSERT_FALSE(requests_[i]->handle.is_initialized()); + requests_[i]->handle.Reset(); + } + + // Let's wait for the rest to complete now. + for (size_t i = kDefaultMaxSocketsPerGroup; i < requests_.size(); ++i) { + EXPECT_EQ(OK, requests_[i]->WaitForResult()); + requests_[i]->handle.Reset(); + } + + EXPECT_EQ(requests_.size() - kDefaultMaxSocketsPerGroup, + TestSocketRequest::completion_count); +} + +// Make sure that pending requests get serviced after active requests fail. +TEST_F(ClientSocketPoolBaseTest_LateBinding, + FailingActiveRequestWithPendingRequests) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingFailingJob); + + scoped_ptr<TestSocketRequest> reqs[kDefaultMaxSocketsPerGroup * 2 + 1]; + + // Queue up all the requests + for (size_t i = 0; i < arraysize(reqs); ++i) { + reqs[i].reset(new TestSocketRequest(pool_.get(), &request_order_)); + int rv = reqs[i]->handle.Init("a", ignored_request_info_, + kDefaultPriority, reqs[i].get()); + EXPECT_EQ(ERR_IO_PENDING, rv); + } + + for (size_t i = 0; i < arraysize(reqs); ++i) + EXPECT_EQ(ERR_CONNECTION_FAILED, reqs[i]->WaitForResult()); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, + CancelActiveRequestThenRequestSocket) { + CreatePool(kDefaultMaxSocketsPerGroup); + + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + + TestSocketRequest req(pool_.get(), &request_order_); + int rv = req.handle.Init( + "a", ignored_request_info_, kDefaultPriority, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + + // Cancel the active request. + req.handle.Reset(); + + rv = req.handle.Init("a", ignored_request_info_, kDefaultPriority, &req); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(OK, req.WaitForResult()); + + EXPECT_FALSE(req.handle.is_reused()); + EXPECT_EQ(1U, TestSocketRequest::completion_count); + EXPECT_EQ(2, client_socket_factory_.allocation_count()); +} + +// When requests and ConnectJobs are not coupled, the request will get serviced +// by whatever comes first. +TEST_F(ClientSocketPoolBaseTest_LateBinding, ReleaseSockets) { + CreatePool(kDefaultMaxSocketsPerGroup); + + // Start job 1 (async OK) + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingJob); + + TestSocketRequest req1(pool_.get(), &request_order_); + int rv = req1.handle.Init("a", ignored_request_info_, 5, &req1); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(OK, req1.WaitForResult()); + + // Job 1 finished OK. Start job 2 (also async OK). Request 3 is pending + // without a job. + connect_job_factory_->set_job_type(TestConnectJob::kMockWaitingJob); + + TestSocketRequest req2(pool_.get(), &request_order_); + rv = req2.handle.Init("a", ignored_request_info_, 5, &req2); + EXPECT_EQ(ERR_IO_PENDING, rv); + TestSocketRequest req3(pool_.get(), &request_order_); + rv = req3.handle.Init("a", ignored_request_info_, 5, &req3); + EXPECT_EQ(ERR_IO_PENDING, rv); + + // Both Requests 2 and 3 are pending. We release socket 1 which should + // service request 2. Request 3 should still be waiting. + req1.handle.Reset(); + MessageLoop::current()->RunAllPending(); // Run the DoReleaseSocket() + ASSERT_TRUE(req2.handle.socket()); + EXPECT_EQ(OK, req2.WaitForResult()); + EXPECT_FALSE(req3.handle.socket()); + + // Signal job 2, which should service request 3. + + client_socket_factory_.SignalJobs(); + EXPECT_EQ(OK, req3.WaitForResult()); + + ASSERT_EQ(3U, request_order_.size()); + EXPECT_EQ(&req1, request_order_[0]); + EXPECT_EQ(&req2, request_order_[1]); + EXPECT_EQ(&req3, request_order_[2]); + EXPECT_EQ(0, pool_->IdleSocketCountInGroup("a")); +} + +// The requests are not coupled to the jobs. So, the requests should finish in +// their priority / insertion order. +TEST_F(ClientSocketPoolBaseTest_LateBinding, PendingJobCompletionOrder) { + CreatePool(kDefaultMaxSocketsPerGroup); + // First two jobs are async. + connect_job_factory_->set_job_type(TestConnectJob::kMockPendingFailingJob); + + TestSocketRequest req1(pool_.get(), &request_order_); + int rv = req1.handle.Init("a", ignored_request_info_, 5, &req1); + EXPECT_EQ(ERR_IO_PENDING, rv); + + TestSocketRequest req2(pool_.get(), &request_order_); + rv = req2.handle.Init("a", ignored_request_info_, 5, &req2); + EXPECT_EQ(ERR_IO_PENDING, rv); + + // The pending job is sync. + connect_job_factory_->set_job_type(TestConnectJob::kMockJob); + + TestSocketRequest req3(pool_.get(), &request_order_); + rv = req3.handle.Init("a", ignored_request_info_, 5, &req3); + EXPECT_EQ(ERR_IO_PENDING, rv); + + EXPECT_EQ(ERR_CONNECTION_FAILED, req1.WaitForResult()); + EXPECT_EQ(OK, req2.WaitForResult()); + EXPECT_EQ(ERR_CONNECTION_FAILED, req3.WaitForResult()); + + ASSERT_EQ(3U, request_order_.size()); + EXPECT_EQ(&req1, request_order_[0]); + EXPECT_EQ(&req2, request_order_[1]); + EXPECT_EQ(&req3, request_order_[2]); +} + +TEST_F(ClientSocketPoolBaseTest_LateBinding, LoadState) { + CreatePool(kDefaultMaxSocketsPerGroup); + connect_job_factory_->set_job_type( + TestConnectJob::kMockAdvancingLoadStateJob); + + TestSocketRequest req1(pool_.get(), &request_order_); + int rv = req1.handle.Init("a", ignored_request_info_, 5, &req1); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(LOAD_STATE_IDLE, req1.handle.GetLoadState()); + + MessageLoop::current()->RunAllPending(); + + TestSocketRequest req2(pool_.get(), &request_order_); + rv = req2.handle.Init("a", ignored_request_info_, 5, &req2); + EXPECT_EQ(ERR_IO_PENDING, rv); + EXPECT_EQ(LOAD_STATE_WAITING_FOR_CACHE, req1.handle.GetLoadState()); + EXPECT_EQ(LOAD_STATE_WAITING_FOR_CACHE, req2.handle.GetLoadState()); +} + } // namespace } // namespace net |