diff options
author | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-26 18:25:16 +0000 |
---|---|---|
committer | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-26 18:25:16 +0000 |
commit | 180c85e3e3691042ab617fd0755dcde6e75d5fbd (patch) | |
tree | b9d4fd7a77f7f54dce4463960326ef7b0cd7a270 /base/threading | |
parent | 324ab8e0d77303333f8ad7de3b54d248587687db (diff) | |
download | chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.zip chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.gz chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.bz2 |
Support Closure in ALL the loops!
Add an overload for PostTask into MessageLoopProxy, and WorkerPool.
BUG=35223
TEST=unittests.
Review URL: http://codereview.chromium.org/7316015
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@94129 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/threading')
-rw-r--r-- | base/threading/worker_pool.h | 6 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.cc | 102 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.h | 34 | ||||
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 34 | ||||
-rw-r--r-- | base/threading/worker_pool_win.cc | 67 |
5 files changed, 181 insertions, 62 deletions
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h index 12b50b4..58471b5 100644 --- a/base/threading/worker_pool.h +++ b/base/threading/worker_pool.h @@ -7,6 +7,7 @@ #pragma once #include "base/base_api.h" +#include "base/callback.h" #include "base/tracked.h" class Task; @@ -27,8 +28,13 @@ class BASE_API WorkerPool { // should be used for tasks that will take a long time to execute. Returns // false if |task| could not be posted to a worker thread. Regardless of // return value, ownership of |task| is transferred to the worker pool. + // + // TODO(ajwong): Remove the Task* based overload once we've finishsed the + // Task -> Closure migration. static bool PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow); + static bool PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); }; } // namespace base diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc index bd6210f..c3a382c 100644 --- a/base/threading/worker_pool_posix.cc +++ b/base/threading/worker_pool_posix.cc @@ -4,6 +4,7 @@ #include "base/threading/worker_pool_posix.h" +#include "base/bind.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/memory/ref_counted.h" @@ -11,6 +12,7 @@ #include "base/task.h" #include "base/threading/platform_thread.h" #include "base/threading/worker_pool.h" +#include "base/tracked_objects.h" namespace base { @@ -28,6 +30,8 @@ class WorkerPoolImpl { void PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow); + void PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); private: scoped_refptr<base::PosixDynamicThreadPool> pool_; @@ -44,25 +48,27 @@ WorkerPoolImpl::~WorkerPoolImpl() { void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - pool_->PostTask(task); + pool_->PostTask(from_here, task); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + pool_->PostTask(from_here, task); } base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); class WorkerThread : public PlatformThread::Delegate { public: - WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, + WorkerThread(const std::string& name_prefix, base::PosixDynamicThreadPool* pool) : name_prefix_(name_prefix), - idle_seconds_before_exit_(idle_seconds_before_exit), pool_(pool) {} virtual void ThreadMain(); private: const std::string name_prefix_; - const int idle_seconds_before_exit_; scoped_refptr<base::PosixDynamicThreadPool> pool_; DISALLOW_COPY_AND_ASSIGN(WorkerThread); @@ -74,11 +80,10 @@ void WorkerThread::ThreadMain() { PlatformThread::SetName(name.c_str()); for (;;) { - Task* task = pool_->WaitForTask(); - if (!task) + PosixDynamicThreadPool::PendingTask pending_task = pool_->WaitForTask(); + if (pending_task.task.is_null()) break; - task->Run(); - delete task; + pending_task.task.Run(); } // The WorkerThread is non-joinable, so it deletes itself. @@ -93,21 +98,35 @@ bool WorkerPool::PostTask(const tracked_objects::Location& from_here, return true; } +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} + +PosixDynamicThreadPool::PendingTask::PendingTask( + const tracked_objects::Location& posted_from, + const base::Closure& task) + : task(task) { +} + +PosixDynamicThreadPool::PendingTask::~PendingTask() { +} + PosixDynamicThreadPool::PosixDynamicThreadPool( const std::string& name_prefix, int idle_seconds_before_exit) : name_prefix_(name_prefix), idle_seconds_before_exit_(idle_seconds_before_exit), - tasks_available_cv_(&lock_), + pending_tasks_available_cv_(&lock_), num_idle_threads_(0), terminated_(false), num_idle_threads_cv_(NULL) {} PosixDynamicThreadPool::~PosixDynamicThreadPool() { - while (!tasks_.empty()) { - Task* task = tasks_.front(); - tasks_.pop(); - delete task; + while (!pending_tasks_.empty()) { + PendingTask pending_task = pending_tasks_.front(); + pending_tasks_.pop(); } } @@ -117,53 +136,76 @@ void PosixDynamicThreadPool::Terminate() { DCHECK(!terminated_) << "Thread pool is already terminated."; terminated_ = true; } - tasks_available_cv_.Broadcast(); + pending_tasks_available_cv_.Broadcast(); +} + +void PosixDynamicThreadPool::PostTask( + const tracked_objects::Location& from_here, + Task* task) { + PendingTask pending_task(from_here, + base::Bind(&subtle::TaskClosureAdapter::Run, + new subtle::TaskClosureAdapter(task))); + // |pending_task| and AddTask() work in conjunction here to ensure that after + // a successful AddTask(), the TaskClosureAdapter object is deleted on the + // worker thread. In AddTask(), the reference |pending_task.task| is handed + // off in a destructive manner to ensure that the local copy of + // |pending_task| doesn't keep a ref on the Closure causing the + // TaskClosureAdapter to be deleted on the wrong thread. + AddTask(&pending_task); +} + +void PosixDynamicThreadPool::PostTask( + const tracked_objects::Location& from_here, + const base::Closure& task) { + PendingTask pending_task(from_here, task); + AddTask(&pending_task); } -void PosixDynamicThreadPool::PostTask(Task* task) { +void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { AutoLock locked(lock_); DCHECK(!terminated_) << "This thread pool is already terminated. Do not post new tasks."; - tasks_.push(task); + pending_tasks_.push(*pending_task); + pending_task->task.Reset(); // We have enough worker threads. - if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { - tasks_available_cv_.Signal(); + if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { + pending_tasks_available_cv_.Signal(); } else { // The new PlatformThread will take ownership of the WorkerThread object, // which will delete itself on exit. WorkerThread* worker = - new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); + new WorkerThread(name_prefix_, this); PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); } } -Task* PosixDynamicThreadPool::WaitForTask() { +PosixDynamicThreadPool::PendingTask PosixDynamicThreadPool::WaitForTask() { AutoLock locked(lock_); if (terminated_) - return NULL; + return PendingTask(FROM_HERE, base::Closure()); - if (tasks_.empty()) { // No work available, wait for work. + if (pending_tasks_.empty()) { // No work available, wait for work. num_idle_threads_++; if (num_idle_threads_cv_.get()) num_idle_threads_cv_->Signal(); - tasks_available_cv_.TimedWait( - TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); + pending_tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(idle_seconds_before_exit_)); num_idle_threads_--; if (num_idle_threads_cv_.get()) num_idle_threads_cv_->Signal(); - if (tasks_.empty()) { + if (pending_tasks_.empty()) { // We waited for work, but there's still no work. Return NULL to signal // the thread to terminate. - return NULL; + return PendingTask(FROM_HERE, base::Closure()); } } - Task* task = tasks_.front(); - tasks_.pop(); - return task; + PendingTask pending_task = pending_tasks_.front(); + pending_tasks_.pop(); + return pending_task; } } // namespace base diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h index 9bb9eda..bf75c78 100644 --- a/base/threading/worker_pool_posix.h +++ b/base/threading/worker_pool_posix.h @@ -29,11 +29,13 @@ #include <string> #include "base/basictypes.h" +#include "base/callback.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" +#include "base/tracked.h" class Task; @@ -44,6 +46,19 @@ class BASE_API PosixDynamicThreadPool public: class PosixDynamicThreadPoolPeer; + struct PendingTask { + PendingTask(const tracked_objects::Location& posted_from, + const base::Closure& task); + ~PendingTask(); + // TODO(ajwong): After we figure out why Mac's ~AtExitManager dies when + // destructing the lock, add in extra info so we can call + // tracked_objects::TallyADeathIfActive() and + // tracked_objects::TallyABirthIfActive correctly. + + // The task to run. + base::Closure task; + }; + // All worker threads will share the same |name_prefix|. They will exit after // |idle_seconds_before_exit|. PosixDynamicThreadPool(const std::string& name_prefix, @@ -56,15 +71,26 @@ class BASE_API PosixDynamicThreadPool // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership // of |task|. - void PostTask(Task* task); + // + // TODO(ajwong): Remove this compatibility API once the Task -> Closure + // migration is finished. + void PostTask(const tracked_objects::Location& from_here, Task* task); + + // Adds |task| to the thread pool. + void PostTask(const tracked_objects::Location& from_here, + const base::Closure& task); // Worker thread method to wait for up to |idle_seconds_before_exit| for more // work from the thread pool. Returns NULL if no work is available. - Task* WaitForTask(); + PendingTask WaitForTask(); private: friend class PosixDynamicThreadPoolPeer; + // Adds pending_task to the thread pool. This function will clear + // |pending_task->task|. + void AddTask(PendingTask* pending_task); + const std::string name_prefix_; const int idle_seconds_before_exit_; @@ -73,9 +99,9 @@ class BASE_API PosixDynamicThreadPool // Signal()s worker threads to let them know more tasks are available. // Also used for Broadcast()'ing to worker threads to let them know the pool // is being deleted and they can exit. - ConditionVariable tasks_available_cv_; + ConditionVariable pending_tasks_available_cv_; int num_idle_threads_; - std::queue<Task*> tasks_; + std::queue<PendingTask> pending_tasks_; bool terminated_; // Only used for tests to ensure correct thread ordering. It will always be // NULL in non-test code. diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index c984ee3..01722f0 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -22,10 +22,12 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { : pool_(pool) {} Lock* lock() { return &pool_->lock_; } - ConditionVariable* tasks_available_cv() { - return &pool_->tasks_available_cv_; + ConditionVariable* pending_tasks_available_cv() { + return &pool_->pending_tasks_available_cv_; + } + const std::queue<PendingTask>& pending_tasks() const { + return pool_->pending_tasks_; } - const std::queue<Task*>& tasks() const { return pool_->tasks_; } int num_idle_threads() const { return pool_->num_idle_threads_; } ConditionVariable* num_idle_threads_cv() { return pool_->num_idle_threads_cv_.get(); @@ -180,10 +182,10 @@ class PosixDynamicThreadPoolTest : public testing::Test { TEST_F(PosixDynamicThreadPoolTest, Basic) { EXPECT_EQ(0, peer_.num_idle_threads()); EXPECT_EQ(0U, unique_threads_.size()); - EXPECT_EQ(0U, peer_.tasks().size()); + EXPECT_EQ(0U, peer_.pending_tasks().size()); // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); @@ -195,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add another 2 tasks. One should reuse the existing worker thread. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -214,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { // Add two blocking tasks. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; @@ -230,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { TEST_F(PosixDynamicThreadPoolTest, Complex) { // Add two non blocking tasks and wait for them to finish. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add two blocking tasks, start them simultaneously, and wait for them to // finish. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -251,13 +253,13 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { { base::AutoLock locked(*peer_.lock()); while (peer_.num_idle_threads() > 0) { - peer_.tasks_available_cv()->Signal(); + peer_.pending_tasks_available_cv()->Signal(); peer_.num_idle_threads_cv()->Wait(); } } // Add another non blocking task. There are no threads to reuse. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); EXPECT_EQ(3U, unique_threads_.size()); diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc index 2072e52..2aa423f 100644 --- a/base/threading/worker_pool_win.cc +++ b/base/threading/worker_pool_win.cc @@ -1,40 +1,83 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/threading/worker_pool.h" +#include "base/bind.h" #include "base/logging.h" #include "base/task.h" +#include "base/tracked_objects.h" namespace base { namespace { +struct PendingTask { + PendingTask( + const tracked_objects::Location& posted_from, + const base::Closure& task) + : task(task) { +#if defined(TRACK_ALL_TASK_OBJECTS) + post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from); + time_posted = TimeTicks::Now(); +#endif // defined(TRACK_ALL_TASK_OBJECTS) + } + +#if defined(TRACK_ALL_TASK_OBJECTS) + // Counter for location where the Closure was posted from. + tracked_objects::Births* post_births; + + // Time the task was posted. + TimeTicks time_posted; +#endif // defined(TRACK_ALL_TASK_OBJECTS) + + // The task to run. + base::Closure task; +}; + DWORD CALLBACK WorkItemCallback(void* param) { - Task* task = static_cast<Task*>(param); - task->Run(); - delete task; + PendingTask* pending_task = static_cast<PendingTask*>(param); + pending_task->task.Run(); +#if defined(TRACK_ALL_TASK_OBJECTS) + tracked_objects::ThreadData::TallyADeathIfActive( + pending_task->post_births, + TimeTicks::Now() - pending_task->time_posted); +#endif // defined(TRACK_ALL_TASK_OBJECTS) + delete pending_task; return 0; } -} // namespace - -bool WorkerPool::PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - +// Takes ownership of |pending_task| +bool PostTaskInternal(PendingTask* pending_task, bool task_is_slow) { ULONG flags = 0; if (task_is_slow) flags |= WT_EXECUTELONGFUNCTION; - if (!QueueUserWorkItem(WorkItemCallback, task, flags)) { + if (!QueueUserWorkItem(WorkItemCallback, pending_task, flags)) { DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError(); - delete task; + delete pending_task; return false; } return true; } +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + PendingTask* pending_task = + new PendingTask(from_here, + base::Bind(&subtle::TaskClosureAdapter::Run, + new subtle::TaskClosureAdapter(task))); + return PostTaskInternal(pending_task, task_is_slow); +} + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + PendingTask* pending_task = new PendingTask(from_here, task); + return PostTaskInternal(pending_task, task_is_slow); +} + } // namespace base |