summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-24 23:41:53 +0000
committerwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-24 23:41:53 +0000
commit250103f6249e1f19ab14ff69f4356fee56833af8 (patch)
treee009911c7f0f087b77ff04cfd524a794d6fd2c9e
parent98733819a022a57f401f1b6cf71b0764451bb8ab (diff)
downloadchromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.zip
chromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.tar.gz
chromium_src-250103f6249e1f19ab14ff69f4356fee56833af8.tar.bz2
Added a thread pool to WorkerPool for Linux that dynamically adds threads as needed.
Review URL: http://codereview.chromium.org/39102 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12414 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/base.gyp2
-rw-r--r--base/base.scons1
-rw-r--r--base/base_unittests.scons2
-rw-r--r--base/worker_pool_linux.cc162
-rw-r--r--base/worker_pool_linux.h88
-rw-r--r--base/worker_pool_linux_unittest.cc268
6 files changed, 504 insertions, 19 deletions
diff --git a/base/base.gyp b/base/base.gyp
index 73b9d9c..4b4b417 100644
--- a/base/base.gyp
+++ b/base/base.gyp
@@ -303,6 +303,7 @@
'word_iterator.h',
'worker_pool.h',
'worker_pool_linux.cc',
+ 'worker_pool_linux.h',
'worker_pool_mac.mm',
'worker_pool_win.cc',
],
@@ -602,6 +603,7 @@
# Linux has an implementation of idle_timer, but it's unclear
# if we want it yet, so leave it 'unported' for now.
'idletimer_unittest.cc',
+ 'worker_pool_linux_unittest.cc',
],
'dependencies': [
'../build/linux/system.gyp:gtk',
diff --git a/base/base.scons b/base/base.scons
index e7d72a3..44d2730 100644
--- a/base/base.scons
+++ b/base/base.scons
@@ -402,6 +402,7 @@ if env.Bit('linux'):
'sys_string_conversions_linux.cc',
'test_file_util_linux.cc',
'worker_pool_linux.cc',
+ 'worker_pool_linux.h',
])
linux_version = env.Command('$BASE_DIR/file_version_info_linux.h',
diff --git a/base/base_unittests.scons b/base/base_unittests.scons
index 08709f1..c4cb2f7 100644
--- a/base/base_unittests.scons
+++ b/base/base_unittests.scons
@@ -119,6 +119,7 @@ input_files = ChromeFileList([
'win_util_unittest.cc',
'wmi_util_unittest.cc',
'word_iterator_unittest.cc',
+ 'worker_pool_unittest.cc'
]),
MSVSFilter('gfx_tests', [
@@ -141,6 +142,7 @@ if env.Bit('posix'):
if env.Bit('linux'):
input_files.Append(
'data_pack_unittest.cc',
+ 'worker_pool_linux_unittest.cc'
)
if env.Bit('mac'):
diff --git a/base/worker_pool_linux.cc b/base/worker_pool_linux.cc
index 95023f7..a650b3b 100644
--- a/base/worker_pool_linux.cc
+++ b/base/worker_pool_linux.cc
@@ -3,41 +3,165 @@
// found in the LICENSE file.
#include "base/worker_pool.h"
+#include "base/worker_pool_linux.h"
+#include "base/lazy_instance.h"
#include "base/logging.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
+#include "base/string_util.h"
#include "base/task.h"
namespace {
-void* PThreadCallback(void* param) {
- Task* task = static_cast<Task*>(param);
- task->Run();
- delete task;
- return 0;
+const int kIdleSecondsBeforeExit = 10 * 60;
+const int kWorkerThreadStackSize = 64 * 1024;
+
+class WorkerPoolImpl {
+ public:
+ WorkerPoolImpl();
+ ~WorkerPoolImpl();
+
+ void PostTask(const tracked_objects::Location& from_here, Task* task,
+ bool task_is_slow);
+
+ private:
+ scoped_refptr<base::LinuxDynamicThreadPool> pool_;
+};
+
+WorkerPoolImpl::WorkerPoolImpl()
+ : pool_(new base::LinuxDynamicThreadPool(
+ "WorkerPool", kIdleSecondsBeforeExit)) {}
+
+WorkerPoolImpl::~WorkerPoolImpl() {
+ pool_->Terminate();
+}
+
+void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ task->SetBirthPlace(from_here);
+ pool_->PostTask(task);
+}
+
+base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED);
+
+class WorkerThread : public PlatformThread::Delegate {
+ public:
+ WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit,
+ base::LinuxDynamicThreadPool* pool)
+ : name_prefix_(name_prefix),
+ idle_seconds_before_exit_(idle_seconds_before_exit),
+ pool_(pool) {}
+
+ virtual void ThreadMain();
+
+ private:
+ const std::string name_prefix_;
+ const int idle_seconds_before_exit_;
+ scoped_refptr<base::LinuxDynamicThreadPool> pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(WorkerThread);
+};
+
+void WorkerThread::ThreadMain() {
+ const std::string name =
+ StringPrintf("%s/%d", name_prefix_.c_str(),
+ IntToString(PlatformThread::CurrentId()).c_str());
+ PlatformThread::SetName(name.c_str());
+
+ for (;;) {
+ Task* task = pool_->WaitForTask();
+ if (!task)
+ break;
+ task->Run();
+ delete task;
+ }
+
+ // The WorkerThread is non-joinable, so it deletes itself.
+ delete this;
}
} // namespace
bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow) {
- task->SetBirthPlace(from_here);
+ g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
+ return true;
+}
- pthread_t thread;
- pthread_attr_t attr;
+namespace base {
- // POSIX does not have a worker thread pool implementation. For now we just
- // create a thread for each task, and ignore |task_is_slow|.
- // TODO(dsh): Implement thread reuse.
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+LinuxDynamicThreadPool::LinuxDynamicThreadPool(
+ const std::string& name_prefix,
+ int idle_seconds_before_exit)
+ : name_prefix_(name_prefix),
+ idle_seconds_before_exit_(idle_seconds_before_exit),
+ tasks_available_cv_(&lock_),
+ num_idle_threads_(0),
+ terminated_(false),
+ num_idle_threads_cv_(NULL) {}
- int err = pthread_create(&thread, &attr, PThreadCallback, task);
- pthread_attr_destroy(&attr);
- if (err) {
- DLOG(ERROR) << "pthread_create failed: " << err;
+LinuxDynamicThreadPool::~LinuxDynamicThreadPool() {
+ while (!tasks_.empty()) {
+ Task* task = tasks_.front();
+ tasks_.pop();
delete task;
- return false;
}
+}
- return true;
+void LinuxDynamicThreadPool::Terminate() {
+ {
+ AutoLock locked(lock_);
+ DCHECK(!terminated_) << "Thread pool is already terminated.";
+ terminated_ = true;
+ }
+ tasks_available_cv_.Broadcast();
+}
+
+void LinuxDynamicThreadPool::PostTask(Task* task) {
+ AutoLock locked(lock_);
+ DCHECK(!terminated_) <<
+ "This thread pool is already terminated. Do not post new tasks.";
+
+ tasks_.push(task);
+
+ // We have enough worker threads.
+ if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) {
+ tasks_available_cv_.Signal();
+ } else {
+ // The new PlatformThread will take ownership of the WorkerThread object,
+ // which will delete itself on exit.
+ WorkerThread* worker =
+ new WorkerThread(name_prefix_, idle_seconds_before_exit_, this);
+ PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker);
+ }
}
+
+Task* LinuxDynamicThreadPool::WaitForTask() {
+ AutoLock locked(lock_);
+
+ if (terminated_)
+ return NULL;
+
+ if (tasks_.empty()) { // No work available, wait for work.
+ num_idle_threads_++;
+ if (num_idle_threads_cv_.get())
+ num_idle_threads_cv_->Signal();
+ tasks_available_cv_.TimedWait(
+ TimeDelta::FromSeconds(kIdleSecondsBeforeExit));
+ num_idle_threads_--;
+ if (num_idle_threads_cv_.get())
+ num_idle_threads_cv_->Signal();
+ if (tasks_.empty()) {
+ // We waited for work, but there's still no work. Return NULL to signal
+ // the thread to terminate.
+ return NULL;
+ }
+ }
+
+ Task* task = tasks_.front();
+ tasks_.pop();
+ return task;
+}
+
+} // namespace base
diff --git a/base/worker_pool_linux.h b/base/worker_pool_linux.h
new file mode 100644
index 0000000..a9cd894
--- /dev/null
+++ b/base/worker_pool_linux.h
@@ -0,0 +1,88 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// The thread pool used in the Linux implementation of WorkerPool dynamically
+// adds threads as necessary to handle all tasks. It keeps old threads around
+// for a period of time to allow them to be reused. After this waiting period,
+// the threads exit. This thread pool uses non-joinable threads, therefore
+// worker threads are not joined during process shutdown. This means that
+// potentially long running tasks (such as DNS lookup) do not block process
+// shutdown, but also means that process shutdown may "leak" objects. Note that
+// although LinuxDynamicThreadPool spawns the worker threads and manages the
+// task queue, it does not own the worker threads. The worker threads ask the
+// LinuxDynamicThreadPool for work and eventually clean themselves up. The
+// worker threads all maintain scoped_refptrs to the LinuxDynamicThreadPool
+// instance, which prevents LinuxDynamicThreadPool from disappearing before all
+// worker threads exit. The owner of LinuxDynamicThreadPool should likewise
+// maintain a scoped_refptr to the LinuxDynamicThreadPool instance.
+//
+// NOTE: The classes defined in this file are only meant for use by the Linux
+// implementation of WorkerPool. No one else should be using these classes.
+// These symbols are exported in a header purely for testing purposes.
+
+#ifndef BASE_WORKER_POOL_LINUX_H_
+#define BASE_WORKER_POOL_LINUX_H_
+
+#include <queue>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/condition_variable.h"
+#include "base/lock.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+
+class Task;
+
+namespace base {
+
+class LinuxDynamicThreadPool
+ : public RefCountedThreadSafe<LinuxDynamicThreadPool> {
+ public:
+ class LinuxDynamicThreadPoolPeer;
+
+ // All worker threads will share the same |name_prefix|. They will exit after
+ // |idle_seconds_before_exit|.
+ LinuxDynamicThreadPool(const std::string& name_prefix,
+ int idle_seconds_before_exit);
+ ~LinuxDynamicThreadPool();
+
+ // Indicates that the thread pool is going away. Stops handing out tasks to
+ // worker threads. Wakes up all the idle threads to let them exit.
+ void Terminate();
+
+ // Adds |task| to the thread pool. LinuxDynamicThreadPool assumes ownership
+ // of |task|.
+ void PostTask(Task* task);
+
+ // Worker thread method to wait for up to |idle_seconds_before_exit| for more
+ // work from the thread pool. Returns NULL if no work is available.
+ Task* WaitForTask();
+
+ private:
+ friend class LinuxDynamicThreadPoolPeer;
+
+ const std::string name_prefix_;
+ const int idle_seconds_before_exit_;
+
+ Lock lock_; // Protects all the variables below.
+
+ // Signal()s worker threads to let them know more tasks are available.
+ // Also used for Broadcast()'ing to worker threads to let them know the pool
+ // is being deleted and they can exit.
+ ConditionVariable tasks_available_cv_;
+ int num_idle_threads_;
+ std::queue<Task*> tasks_;
+ bool terminated_;
+ // Only used for tests to ensure correct thread ordering. It will always be
+ // NULL in non-test code.
+ scoped_ptr<ConditionVariable> num_idle_threads_cv_;
+
+ DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPool);
+};
+
+} // namespace base
+
+#endif // BASE_WORKER_POOL_LINUX_H_
diff --git a/base/worker_pool_linux_unittest.cc b/base/worker_pool_linux_unittest.cc
new file mode 100644
index 0000000..f98f37a
--- /dev/null
+++ b/base/worker_pool_linux_unittest.cc
@@ -0,0 +1,268 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/worker_pool_linux.h"
+
+#include <set>
+
+#include "base/condition_variable.h"
+#include "base/lock.h"
+#include "base/platform_thread.h"
+#include "base/task.h"
+#include "base/waitable_event.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+// Peer class to provide passthrough access to LinuxDynamicThreadPool internals.
+class LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer {
+ public:
+ explicit LinuxDynamicThreadPoolPeer(LinuxDynamicThreadPool* pool)
+ : pool_(pool) {}
+
+ Lock* lock() { return &pool_->lock_; }
+ ConditionVariable* tasks_available_cv() {
+ return &pool_->tasks_available_cv_;
+ }
+ const std::queue<Task*>& tasks() const { return pool_->tasks_; }
+ int num_idle_threads() const { return pool_->num_idle_threads_; }
+ ConditionVariable* num_idle_threads_cv() {
+ return pool_->num_idle_threads_cv_.get();
+ }
+ void set_num_idle_threads_cv(ConditionVariable* cv) {
+ pool_->num_idle_threads_cv_.reset(cv);
+ }
+
+ private:
+ LinuxDynamicThreadPool* pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPoolPeer);
+};
+
+} // namespace base
+
+namespace {
+
+// IncrementingTask's main purpose is to increment a counter. It also updates a
+// set of unique thread ids, and signals a ConditionVariable on completion.
+// Note that since it does not block, there is no way to control the number of
+// threads used if more than one IncrementingTask is consecutively posted to the
+// thread pool, since the first one might finish executing before the subsequent
+// PostTask() calls get invoked.
+class IncrementingTask : public Task {
+ public:
+ IncrementingTask(Lock* counter_lock,
+ int* counter,
+ Lock* unique_threads_lock,
+ std::set<PlatformThreadId>* unique_threads)
+ : counter_lock_(counter_lock),
+ unique_threads_lock_(unique_threads_lock),
+ unique_threads_(unique_threads),
+ counter_(counter) {}
+
+ virtual void Run() {
+ AddSelfToUniqueThreadSet();
+ AutoLock locked(*counter_lock_);
+ (*counter_)++;
+ }
+
+ void AddSelfToUniqueThreadSet() {
+ AutoLock locked(*unique_threads_lock_);
+ unique_threads_->insert(PlatformThread::CurrentId());
+ }
+
+ private:
+ Lock* counter_lock_;
+ Lock* unique_threads_lock_;
+ std::set<PlatformThreadId>* unique_threads_;
+ int* counter_;
+
+ DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
+};
+
+// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
+// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
+class BlockingIncrementingTask : public Task {
+ public:
+ BlockingIncrementingTask(Lock* counter_lock,
+ int* counter,
+ Lock* unique_threads_lock,
+ std::set<PlatformThreadId>* unique_threads,
+ Lock* num_waiting_to_start_lock,
+ int* num_waiting_to_start,
+ ConditionVariable* num_waiting_to_start_cv,
+ base::WaitableEvent* start)
+ : incrementer_(
+ counter_lock, counter, unique_threads_lock, unique_threads),
+ num_waiting_to_start_lock_(num_waiting_to_start_lock),
+ num_waiting_to_start_(num_waiting_to_start),
+ num_waiting_to_start_cv_(num_waiting_to_start_cv),
+ start_(start) {}
+
+ virtual void Run() {
+ {
+ AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
+ (*num_waiting_to_start_)++;
+ }
+ num_waiting_to_start_cv_->Signal();
+ CHECK(start_->Wait());
+ incrementer_.Run();
+ }
+
+ private:
+ IncrementingTask incrementer_;
+ Lock* num_waiting_to_start_lock_;
+ int* num_waiting_to_start_;
+ ConditionVariable* num_waiting_to_start_cv_;
+ base::WaitableEvent* start_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
+};
+
+class LinuxDynamicThreadPoolTest : public testing::Test {
+ protected:
+ LinuxDynamicThreadPoolTest()
+ : pool_(new base::LinuxDynamicThreadPool("dynamic_pool", 60*60)),
+ peer_(pool_.get()),
+ counter_(0),
+ num_waiting_to_start_(0),
+ num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
+ start_(true, false) {}
+
+ virtual void SetUp() {
+ peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
+ }
+
+ virtual void TearDown() {
+ // Wake up the idle threads so they can terminate.
+ if (pool_.get()) pool_->Terminate();
+ }
+
+ void WaitForTasksToStart(int num_tasks) {
+ AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
+ while (num_waiting_to_start_ < num_tasks) {
+ num_waiting_to_start_cv_.Wait();
+ }
+ }
+
+ void WaitForIdleThreads(int num_idle_threads) {
+ AutoLock pool_locked(*peer_.lock());
+ while (peer_.num_idle_threads() < num_idle_threads) {
+ peer_.num_idle_threads_cv()->Wait();
+ }
+ }
+
+ Task* CreateNewIncrementingTask() {
+ return new IncrementingTask(&counter_lock_, &counter_,
+ &unique_threads_lock_, &unique_threads_);
+ }
+
+ Task* CreateNewBlockingIncrementingTask() {
+ return new BlockingIncrementingTask(
+ &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
+ &num_waiting_to_start_lock_, &num_waiting_to_start_,
+ &num_waiting_to_start_cv_, &start_);
+ }
+
+ scoped_refptr<base::LinuxDynamicThreadPool> pool_;
+ base::LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer peer_;
+ Lock counter_lock_;
+ int counter_;
+ Lock unique_threads_lock_;
+ std::set<PlatformThreadId> unique_threads_;
+ Lock num_waiting_to_start_lock_;
+ int num_waiting_to_start_;
+ ConditionVariable num_waiting_to_start_cv_;
+ base::WaitableEvent start_;
+};
+
+TEST_F(LinuxDynamicThreadPoolTest, Basic) {
+ EXPECT_EQ(0, peer_.num_idle_threads());
+ EXPECT_EQ(0U, unique_threads_.size());
+ EXPECT_EQ(0U, peer_.tasks().size());
+
+ // Add one task and wait for it to be completed.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ EXPECT_EQ(1U, unique_threads_.size()) <<
+ "There should be only one thread allocated for one task.";
+ EXPECT_EQ(1, peer_.num_idle_threads());
+ EXPECT_EQ(1, counter_);
+}
+
+TEST_F(LinuxDynamicThreadPoolTest, ReuseIdle) {
+ // Add one task and wait for it to be completed.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ // Add another 2 tasks. One should reuse the existing worker thread.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(2U, unique_threads_.size());
+ EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(3, counter_);
+}
+
+TEST_F(LinuxDynamicThreadPoolTest, TwoActiveTasks) {
+ // Add two blocking tasks.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(2U, unique_threads_.size());
+ EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
+ EXPECT_EQ(2, counter_);
+}
+
+TEST_F(LinuxDynamicThreadPoolTest, Complex) {
+ // Add two non blocking tasks and wait for them to finish.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ // Add two blocking tasks, start them simultaneously, and wait for them to
+ // finish.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(3, counter_);
+ EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(2U, unique_threads_.size());
+
+ // Wake up all idle threads so they can exit.
+ {
+ AutoLock locked(*peer_.lock());
+ while (peer_.num_idle_threads() > 0) {
+ peer_.tasks_available_cv()->Signal();
+ peer_.num_idle_threads_cv()->Wait();
+ }
+ }
+
+ // Add another non blocking task. There are no threads to reuse.
+ pool_->PostTask(CreateNewIncrementingTask());
+ WaitForIdleThreads(1);
+
+ EXPECT_EQ(3U, unique_threads_.size());
+ EXPECT_EQ(1, peer_.num_idle_threads());
+ EXPECT_EQ(4, counter_);
+}
+
+} // namespace