diff options
Diffstat (limited to 'base/threading/worker_pool_posix_unittest.cc')
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc new file mode 100644 index 0000000..48df16e --- /dev/null +++ b/base/threading/worker_pool_posix_unittest.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include <set> + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/task.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Peer class to provide passthrough access to PosixDynamicThreadPool internals. +class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { + public: + explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) + : pool_(pool) {} + + Lock* lock() { return &pool_->lock_; } + ConditionVariable* tasks_available_cv() { + return &pool_->tasks_available_cv_; + } + const std::queue<Task*>& tasks() const { return pool_->tasks_; } + int num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_idle_threads_cv() { + return pool_->num_idle_threads_cv_.get(); + } + void set_num_idle_threads_cv(ConditionVariable* cv) { + pool_->num_idle_threads_cv_.reset(cv); + } + + private: + PosixDynamicThreadPool* pool_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); +}; + +namespace { + +// IncrementingTask's main purpose is to increment a counter. It also updates a +// set of unique thread ids, and signals a ConditionVariable on completion. +// Note that since it does not block, there is no way to control the number of +// threads used if more than one IncrementingTask is consecutively posted to the +// thread pool, since the first one might finish executing before the subsequent +// PostTask() calls get invoked. +class IncrementingTask : public Task { + public: + IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads) + : counter_lock_(counter_lock), + unique_threads_lock_(unique_threads_lock), + unique_threads_(unique_threads), + counter_(counter) {} + + virtual void Run() { + AddSelfToUniqueThreadSet(); + AutoLock locked(*counter_lock_); + (*counter_)++; + } + + void AddSelfToUniqueThreadSet() { + AutoLock locked(*unique_threads_lock_); + unique_threads_->insert(PlatformThread::CurrentId()); + } + + private: + Lock* counter_lock_; + Lock* unique_threads_lock_; + std::set<PlatformThreadId>* unique_threads_; + int* counter_; + + DISALLOW_COPY_AND_ASSIGN(IncrementingTask); +}; + +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +class BlockingIncrementingTask : public Task { + public: + BlockingIncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads, + Lock* num_waiting_to_start_lock, + int* num_waiting_to_start, + ConditionVariable* num_waiting_to_start_cv, + base::WaitableEvent* start) + : incrementer_( + counter_lock, counter, unique_threads_lock, unique_threads), + num_waiting_to_start_lock_(num_waiting_to_start_lock), + num_waiting_to_start_(num_waiting_to_start), + num_waiting_to_start_cv_(num_waiting_to_start_cv), + start_(start) {} + + virtual void Run() { + { + AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); + (*num_waiting_to_start_)++; + } + num_waiting_to_start_cv_->Signal(); + CHECK(start_->Wait()); + incrementer_.Run(); + } + + private: + IncrementingTask incrementer_; + Lock* num_waiting_to_start_lock_; + int* num_waiting_to_start_; + ConditionVariable* num_waiting_to_start_cv_; + base::WaitableEvent* start_; + + DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +}; + +class PosixDynamicThreadPoolTest : public testing::Test { + protected: + PosixDynamicThreadPoolTest() + : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), + peer_(pool_.get()), + counter_(0), + num_waiting_to_start_(0), + num_waiting_to_start_cv_(&num_waiting_to_start_lock_), + start_(true, false) {} + + virtual void SetUp() { + peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); + } + + virtual void TearDown() { + // Wake up the idle threads so they can terminate. + if (pool_.get()) pool_->Terminate(); + } + + void WaitForTasksToStart(int num_tasks) { + AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + while (num_waiting_to_start_ < num_tasks) { + num_waiting_to_start_cv_.Wait(); + } + } + + void WaitForIdleThreads(int num_idle_threads) { + AutoLock pool_locked(*peer_.lock()); + while (peer_.num_idle_threads() < num_idle_threads) { + peer_.num_idle_threads_cv()->Wait(); + } + } + + Task* CreateNewIncrementingTask() { + return new IncrementingTask(&counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); + } + + Task* CreateNewBlockingIncrementingTask() { + return new BlockingIncrementingTask( + &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, + &num_waiting_to_start_lock_, &num_waiting_to_start_, + &num_waiting_to_start_cv_, &start_); + } + + scoped_refptr<base::PosixDynamicThreadPool> pool_; + base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; + Lock counter_lock_; + int counter_; + Lock unique_threads_lock_; + std::set<PlatformThreadId> unique_threads_; + Lock num_waiting_to_start_lock_; + int num_waiting_to_start_; + ConditionVariable num_waiting_to_start_cv_; + base::WaitableEvent start_; +}; + +} // namespace + +TEST_F(PosixDynamicThreadPoolTest, Basic) { + EXPECT_EQ(0, peer_.num_idle_threads()); + EXPECT_EQ(0U, unique_threads_.size()); + EXPECT_EQ(0U, peer_.tasks().size()); + + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + EXPECT_EQ(1U, unique_threads_.size()) << + "There should be only one thread allocated for one task."; + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add another 2 tasks. One should reuse the existing worker thread. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(3, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { + // Add two blocking tasks. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, Complex) { + // Add two non blocking tasks and wait for them to finish. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add two blocking tasks, start them simultaneously, and wait for them to + // finish. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(3, counter_); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, unique_threads_.size()); + + // Wake up all idle threads so they can exit. + { + AutoLock locked(*peer_.lock()); + while (peer_.num_idle_threads() > 0) { + peer_.tasks_available_cv()->Signal(); + peer_.num_idle_threads_cv()->Wait(); + } + } + + // Add another non blocking task. There are no threads to reuse. + pool_->PostTask(CreateNewIncrementingTask()); + WaitForIdleThreads(1); + + EXPECT_EQ(3U, unique_threads_.size()); + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(4, counter_); +} + +} // namespace base |