diff options
author | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 23:41:53 +0000 |
---|---|---|
committer | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 23:41:53 +0000 |
commit | 250103f6249e1f19ab14ff69f4356fee56833af8 (patch) | |
tree | e009911c7f0f087b77ff04cfd524a794d6fd2c9e | |
parent | 98733819a022a57f401f1b6cf71b0764451bb8ab (diff) | |
download | chromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.zip chromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.tar.gz chromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.tar.bz2 |
Added a thread pool to WorkerPool for Linux that dynamically adds threads as needed.
Review URL: http://codereview.chromium.org/39102
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12414 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/base.gyp | 2 | ||||
-rw-r--r-- | base/base.scons | 1 | ||||
-rw-r--r-- | base/base_unittests.scons | 2 | ||||
-rw-r--r-- | base/worker_pool_linux.cc | 162 | ||||
-rw-r--r-- | base/worker_pool_linux.h | 88 | ||||
-rw-r--r-- | base/worker_pool_linux_unittest.cc | 268 |
6 files changed, 504 insertions, 19 deletions
diff --git a/base/base.gyp b/base/base.gyp index 73b9d9c..4b4b417 100644 --- a/base/base.gyp +++ b/base/base.gyp @@ -303,6 +303,7 @@ 'word_iterator.h', 'worker_pool.h', 'worker_pool_linux.cc', + 'worker_pool_linux.h', 'worker_pool_mac.mm', 'worker_pool_win.cc', ], @@ -602,6 +603,7 @@ # Linux has an implementation of idle_timer, but it's unclear # if we want it yet, so leave it 'unported' for now. 'idletimer_unittest.cc', + 'worker_pool_linux_unittest.cc', ], 'dependencies': [ '../build/linux/system.gyp:gtk', diff --git a/base/base.scons b/base/base.scons index e7d72a3..44d2730 100644 --- a/base/base.scons +++ b/base/base.scons @@ -402,6 +402,7 @@ if env.Bit('linux'): 'sys_string_conversions_linux.cc', 'test_file_util_linux.cc', 'worker_pool_linux.cc', + 'worker_pool_linux.h', ]) linux_version = env.Command('$BASE_DIR/file_version_info_linux.h', diff --git a/base/base_unittests.scons b/base/base_unittests.scons index 08709f1..c4cb2f7 100644 --- a/base/base_unittests.scons +++ b/base/base_unittests.scons @@ -119,6 +119,7 @@ input_files = ChromeFileList([ 'win_util_unittest.cc', 'wmi_util_unittest.cc', 'word_iterator_unittest.cc', + 'worker_pool_unittest.cc' ]), MSVSFilter('gfx_tests', [ @@ -141,6 +142,7 @@ if env.Bit('posix'): if env.Bit('linux'): input_files.Append( 'data_pack_unittest.cc', + 'worker_pool_linux_unittest.cc' ) if env.Bit('mac'): diff --git a/base/worker_pool_linux.cc b/base/worker_pool_linux.cc index 95023f7..a650b3b 100644 --- a/base/worker_pool_linux.cc +++ b/base/worker_pool_linux.cc @@ -3,41 +3,165 @@ // found in the LICENSE file. #include "base/worker_pool.h" +#include "base/worker_pool_linux.h" +#include "base/lazy_instance.h" #include "base/logging.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/string_util.h" #include "base/task.h" namespace { -void* PThreadCallback(void* param) { - Task* task = static_cast<Task*>(param); - task->Run(); - delete task; - return 0; +const int kIdleSecondsBeforeExit = 10 * 60; +const int kWorkerThreadStackSize = 64 * 1024; + +class WorkerPoolImpl { + public: + WorkerPoolImpl(); + ~WorkerPoolImpl(); + + void PostTask(const tracked_objects::Location& from_here, Task* task, + bool task_is_slow); + + private: + scoped_refptr<base::LinuxDynamicThreadPool> pool_; +}; + +WorkerPoolImpl::WorkerPoolImpl() + : pool_(new base::LinuxDynamicThreadPool( + "WorkerPool", kIdleSecondsBeforeExit)) {} + +WorkerPoolImpl::~WorkerPoolImpl() { + pool_->Terminate(); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + task->SetBirthPlace(from_here); + pool_->PostTask(task); +} + +base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); + +class WorkerThread : public PlatformThread::Delegate { + public: + WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, + base::LinuxDynamicThreadPool* pool) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + pool_(pool) {} + + virtual void ThreadMain(); + + private: + const std::string name_prefix_; + const int idle_seconds_before_exit_; + scoped_refptr<base::LinuxDynamicThreadPool> pool_; + + DISALLOW_COPY_AND_ASSIGN(WorkerThread); +}; + +void WorkerThread::ThreadMain() { + const std::string name = + StringPrintf("%s/%d", name_prefix_.c_str(), + IntToString(PlatformThread::CurrentId()).c_str()); + PlatformThread::SetName(name.c_str()); + + for (;;) { + Task* task = pool_->WaitForTask(); + if (!task) + break; + task->Run(); + delete task; + } + + // The WorkerThread is non-joinable, so it deletes itself. + delete this; } } // namespace bool WorkerPool::PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} - pthread_t thread; - pthread_attr_t attr; +namespace base { - // POSIX does not have a worker thread pool implementation. For now we just - // create a thread for each task, and ignore |task_is_slow|. - // TODO(dsh): Implement thread reuse. - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); +LinuxDynamicThreadPool::LinuxDynamicThreadPool( + const std::string& name_prefix, + int idle_seconds_before_exit) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + tasks_available_cv_(&lock_), + num_idle_threads_(0), + terminated_(false), + num_idle_threads_cv_(NULL) {} - int err = pthread_create(&thread, &attr, PThreadCallback, task); - pthread_attr_destroy(&attr); - if (err) { - DLOG(ERROR) << "pthread_create failed: " << err; +LinuxDynamicThreadPool::~LinuxDynamicThreadPool() { + while (!tasks_.empty()) { + Task* task = tasks_.front(); + tasks_.pop(); delete task; - return false; } +} - return true; +void LinuxDynamicThreadPool::Terminate() { + { + AutoLock locked(lock_); + DCHECK(!terminated_) << "Thread pool is already terminated."; + terminated_ = true; + } + tasks_available_cv_.Broadcast(); +} + +void LinuxDynamicThreadPool::PostTask(Task* task) { + AutoLock locked(lock_); + DCHECK(!terminated_) << + "This thread pool is already terminated. Do not post new tasks."; + + tasks_.push(task); + + // We have enough worker threads. + if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { + tasks_available_cv_.Signal(); + } else { + // The new PlatformThread will take ownership of the WorkerThread object, + // which will delete itself on exit. + WorkerThread* worker = + new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); + PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); + } } + +Task* LinuxDynamicThreadPool::WaitForTask() { + AutoLock locked(lock_); + + if (terminated_) + return NULL; + + if (tasks_.empty()) { // No work available, wait for work. + num_idle_threads_++; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); + num_idle_threads_--; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + if (tasks_.empty()) { + // We waited for work, but there's still no work. Return NULL to signal + // the thread to terminate. + return NULL; + } + } + + Task* task = tasks_.front(); + tasks_.pop(); + return task; +} + +} // namespace base diff --git a/base/worker_pool_linux.h b/base/worker_pool_linux.h new file mode 100644 index 0000000..a9cd894 --- /dev/null +++ b/base/worker_pool_linux.h @@ -0,0 +1,88 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The thread pool used in the Linux implementation of WorkerPool dynamically +// adds threads as necessary to handle all tasks. It keeps old threads around +// for a period of time to allow them to be reused. After this waiting period, +// the threads exit. This thread pool uses non-joinable threads, therefore +// worker threads are not joined during process shutdown. This means that +// potentially long running tasks (such as DNS lookup) do not block process +// shutdown, but also means that process shutdown may "leak" objects. Note that +// although LinuxDynamicThreadPool spawns the worker threads and manages the +// task queue, it does not own the worker threads. The worker threads ask the +// LinuxDynamicThreadPool for work and eventually clean themselves up. The +// worker threads all maintain scoped_refptrs to the LinuxDynamicThreadPool +// instance, which prevents LinuxDynamicThreadPool from disappearing before all +// worker threads exit. The owner of LinuxDynamicThreadPool should likewise +// maintain a scoped_refptr to the LinuxDynamicThreadPool instance. +// +// NOTE: The classes defined in this file are only meant for use by the Linux +// implementation of WorkerPool. No one else should be using these classes. +// These symbols are exported in a header purely for testing purposes. + +#ifndef BASE_WORKER_POOL_LINUX_H_ +#define BASE_WORKER_POOL_LINUX_H_ + +#include <queue> +#include <string> + +#include "base/basictypes.h" +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" + +class Task; + +namespace base { + +class LinuxDynamicThreadPool + : public RefCountedThreadSafe<LinuxDynamicThreadPool> { + public: + class LinuxDynamicThreadPoolPeer; + + // All worker threads will share the same |name_prefix|. They will exit after + // |idle_seconds_before_exit|. + LinuxDynamicThreadPool(const std::string& name_prefix, + int idle_seconds_before_exit); + ~LinuxDynamicThreadPool(); + + // Indicates that the thread pool is going away. Stops handing out tasks to + // worker threads. Wakes up all the idle threads to let them exit. + void Terminate(); + + // Adds |task| to the thread pool. LinuxDynamicThreadPool assumes ownership + // of |task|. + void PostTask(Task* task); + + // Worker thread method to wait for up to |idle_seconds_before_exit| for more + // work from the thread pool. Returns NULL if no work is available. + Task* WaitForTask(); + + private: + friend class LinuxDynamicThreadPoolPeer; + + const std::string name_prefix_; + const int idle_seconds_before_exit_; + + Lock lock_; // Protects all the variables below. + + // Signal()s worker threads to let them know more tasks are available. + // Also used for Broadcast()'ing to worker threads to let them know the pool + // is being deleted and they can exit. + ConditionVariable tasks_available_cv_; + int num_idle_threads_; + std::queue<Task*> tasks_; + bool terminated_; + // Only used for tests to ensure correct thread ordering. It will always be + // NULL in non-test code. + scoped_ptr<ConditionVariable> num_idle_threads_cv_; + + DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPool); +}; + +} // namespace base + +#endif // BASE_WORKER_POOL_LINUX_H_ diff --git a/base/worker_pool_linux_unittest.cc b/base/worker_pool_linux_unittest.cc new file mode 100644 index 0000000..f98f37a --- /dev/null +++ b/base/worker_pool_linux_unittest.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/worker_pool_linux.h" + +#include <set> + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/task.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Peer class to provide passthrough access to LinuxDynamicThreadPool internals. +class LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer { + public: + explicit LinuxDynamicThreadPoolPeer(LinuxDynamicThreadPool* pool) + : pool_(pool) {} + + Lock* lock() { return &pool_->lock_; } + ConditionVariable* tasks_available_cv() { + return &pool_->tasks_available_cv_; + } + const std::queue<Task*>& tasks() const { return pool_->tasks_; } + int num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_idle_threads_cv() { + return pool_->num_idle_threads_cv_.get(); + } + void set_num_idle_threads_cv(ConditionVariable* cv) { + pool_->num_idle_threads_cv_.reset(cv); + } + + private: + LinuxDynamicThreadPool* pool_; + + DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPoolPeer); +}; + +} // namespace base + +namespace { + +// IncrementingTask's main purpose is to increment a counter. It also updates a +// set of unique thread ids, and signals a ConditionVariable on completion. +// Note that since it does not block, there is no way to control the number of +// threads used if more than one IncrementingTask is consecutively posted to the +// thread pool, since the first one might finish executing before the subsequent +// PostTask() calls get invoked. +class IncrementingTask : public Task { + public: + IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads) + : counter_lock_(counter_lock), + unique_threads_lock_(unique_threads_lock), + unique_threads_(unique_threads), + counter_(counter) {} + + virtual void Run() { + AddSelfToUniqueThreadSet(); + AutoLock locked(*counter_lock_); + (*counter_)++; + } + + void AddSelfToUniqueThreadSet() { + AutoLock locked(*unique_threads_lock_); + unique_threads_->insert(PlatformThread::CurrentId()); + } + + private: + Lock* counter_lock_; + Lock* unique_threads_lock_; + std::set<PlatformThreadId>* unique_threads_; + int* counter_; + + DISALLOW_COPY_AND_ASSIGN(IncrementingTask); +}; + +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +class BlockingIncrementingTask : public Task { + public: + BlockingIncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads, + Lock* num_waiting_to_start_lock, + int* num_waiting_to_start, + ConditionVariable* num_waiting_to_start_cv, + base::WaitableEvent* start) + : incrementer_( + counter_lock, counter, unique_threads_lock, unique_threads), + num_waiting_to_start_lock_(num_waiting_to_start_lock), + num_waiting_to_start_(num_waiting_to_start), + num_waiting_to_start_cv_(num_waiting_to_start_cv), + start_(start) {} + + virtual void Run() { + { + AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); + (*num_waiting_to_start_)++; + } + num_waiting_to_start_cv_->Signal(); + CHECK(start_->Wait()); + incrementer_.Run(); + } + + private: + IncrementingTask incrementer_; + Lock* num_waiting_to_start_lock_; + int* num_waiting_to_start_; + ConditionVariable* num_waiting_to_start_cv_; + base::WaitableEvent* start_; + + DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +}; + +class LinuxDynamicThreadPoolTest : public testing::Test { + protected: + LinuxDynamicThreadPoolTest() + : pool_(new base::LinuxDynamicThreadPool("dynamic_pool", 60*60)), + peer_(pool_.get()), + counter_(0), + num_waiting_to_start_(0), + num_waiting_to_start_cv_(&num_waiting_to_start_lock_), + start_(true, false) {} + + virtual void SetUp() { + peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); + } + + virtual void TearDown() { + // Wake up the idle threads so they can terminate. + if (pool_.get()) pool_->Terminate(); + } + + void WaitForTasksToStart(int num_tasks) { + AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + while (num_waiting_to_start_ < num_tasks) { + num_waiting_to_start_cv_.Wait(); + } + } + + void WaitForIdleThreads(int num_idle_threads) { + AutoLock pool_locked(*peer_.lock()); + while (peer_.num_idle_threads() < num_idle_threads) { + peer_.num_idle_threads_cv()->Wait(); + } + } + + Task* CreateNewIncrementingTask() { + return new IncrementingTask(&counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); + } + + Task* CreateNewBlockingIncrementingTask() { + return new BlockingIncrementingTask( + &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, + &num_waiting_to_start_lock_, &num_waiting_to_start_, + &num_waiting_to_start_cv_, &start_); + } + + scoped_refptr<base::LinuxDynamicThreadPool> pool_; + base::LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer peer_; + Lock counter_lock_; + int counter_; + Lock unique_threads_lock_; + std::set<PlatformThreadId> unique_threads_; + Lock num_waiting_to_start_lock_; + int num_waiting_to_start_; + ConditionVariable num_waiting_to_start_cv_; + base::WaitableEvent start_; +}; + +TEST_F(LinuxDynamicThreadPoolTest, Basic) { + EXPECT_EQ(0, peer_.num_idle_threads()); + EXPECT_EQ(0U, unique_threads_.size()); + EXPECT_EQ(0U, peer_.tasks().size()); + + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + EXPECT_EQ(1U, unique_threads_.size()) << + "There should be only one thread allocated for one task."; + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1, counter_); +} + +TEST_F(LinuxDynamicThreadPoolTest, ReuseIdle) { + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add another 2 tasks. One should reuse the existing worker thread. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(3, counter_); +} + +TEST_F(LinuxDynamicThreadPoolTest, TwoActiveTasks) { + // Add two blocking tasks. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2, counter_); +} + +TEST_F(LinuxDynamicThreadPoolTest, Complex) { + // Add two non blocking tasks and wait for them to finish. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add two blocking tasks, start them simultaneously, and wait for them to + // finish. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(3, counter_); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, unique_threads_.size()); + + // Wake up all idle threads so they can exit. + { + AutoLock locked(*peer_.lock()); + while (peer_.num_idle_threads() > 0) { + peer_.tasks_available_cv()->Signal(); + peer_.num_idle_threads_cv()->Wait(); + } + } + + // Add another non blocking task. There are no threads to reuse. + pool_->PostTask(CreateNewIncrementingTask()); + WaitForIdleThreads(1); + + EXPECT_EQ(3U, unique_threads_.size()); + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(4, counter_); +} + +} // namespace |