diff options
author | rtenneti@chromium.org <rtenneti@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-29 01:28:07 +0000 |
---|---|---|
committer | rtenneti@chromium.org <rtenneti@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-29 01:28:07 +0000 |
commit | 8876310adb585c1639c64b50432793f2ac35a513 (patch) | |
tree | 55f31e85d8fa74245318bf6435945759213697f6 /cc/base | |
parent | a649b5e509c33907d8cb538f2820fbc963210fea (diff) | |
download | chromium_src-8876310adb585c1639c64b50432793f2ac35a513.zip chromium_src-8876310adb585c1639c64b50432793f2ac35a513.tar.gz chromium_src-8876310adb585c1639c64b50432793f2ac35a513.tar.bz2 |
Revert 202363 "cc: Cancel and re-prioritize worker pool tasks."
> cc: Cancel and re-prioritize worker pool tasks.
>
> This adds a task graph interface to the worker pool and
> implements a simple queue instance of this interface for
> use by the tile manager.
>
> The task graph interface can be used describe more
> complicated task dependencies in the future and
> provides the immediate benefit of seamlessly being
> able to cancel and re-prioritize tasks.
>
> BUG=178974
>
> Review URL: https://chromiumcodereview.appspot.com/14689004
TBR=reveman@chromium.org
Review URL: https://codereview.chromium.org/16178002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@202736 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'cc/base')
-rw-r--r-- | cc/base/scoped_ptr_hash_map.h | 2 | ||||
-rw-r--r-- | cc/base/worker_pool.cc | 480 | ||||
-rw-r--r-- | cc/base/worker_pool.h | 73 | ||||
-rw-r--r-- | cc/base/worker_pool_perftest.cc | 234 | ||||
-rw-r--r-- | cc/base/worker_pool_unittest.cc | 182 |
5 files changed, 189 insertions, 782 deletions
diff --git a/cc/base/scoped_ptr_hash_map.h b/cc/base/scoped_ptr_hash_map.h index af792ec..970984a 100644 --- a/cc/base/scoped_ptr_hash_map.h +++ b/cc/base/scoped_ptr_hash_map.h @@ -31,7 +31,7 @@ class ScopedPtrHashMap { ~ScopedPtrHashMap() { clear(); } - void swap(ScopedPtrHashMap<Key, Value>& other) { + void swap(ScopedPtrHashMap<Key, Value*>& other) { data_.swap(other.data_); } diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc index b2fb185..59dc828 100644 --- a/cc/base/worker_pool.cc +++ b/cc/base/worker_pool.cc @@ -4,93 +4,46 @@ #include "cc/base/worker_pool.h" -#if defined(OS_ANDROID) -// TODO(epenner): Move thread priorities to base. (crbug.com/170549) -#include <sys/resource.h> -#endif - -#include <map> +#include <algorithm> #include "base/bind.h" #include "base/debug/trace_event.h" -#include "base/hash_tables.h" #include "base/stringprintf.h" +#include "base/synchronization/condition_variable.h" #include "base/threading/simple_thread.h" #include "base/threading/thread_restrictions.h" -#include "cc/base/scoped_ptr_deque.h" -#include "cc/base/scoped_ptr_hash_map.h" - -#if defined(COMPILER_GCC) -namespace BASE_HASH_NAMESPACE { -template <> struct hash<cc::internal::WorkerPoolTask*> { - size_t operator()(cc::internal::WorkerPoolTask* ptr) const { - return hash<size_t>()(reinterpret_cast<size_t>(ptr)); - } -}; -} // namespace BASE_HASH_NAMESPACE -#endif // COMPILER namespace cc { -namespace internal { +namespace { -WorkerPoolTask::WorkerPoolTask() - : did_schedule_(false), - did_run_(false), - did_complete_(false) { -} - -WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) - : did_schedule_(false), - did_run_(false), - did_complete_(false) { - dependencies_.swap(*dependencies); -} - -WorkerPoolTask::~WorkerPoolTask() { - DCHECK_EQ(did_schedule_, did_complete_); - DCHECK(!did_run_ || did_schedule_); - DCHECK(!did_run_ || did_complete_); -} +class WorkerPoolTaskImpl : public internal::WorkerPoolTask { + public: + WorkerPoolTaskImpl(const WorkerPool::Callback& task, + const base::Closure& reply) + : internal::WorkerPoolTask(reply), + task_(task) {} -void WorkerPoolTask::DidSchedule() { - DCHECK(!did_complete_); - did_schedule_ = true; -} + virtual void RunOnThread(unsigned thread_index) OVERRIDE { + task_.Run(); + } -void WorkerPoolTask::WillRun() { - DCHECK(did_schedule_); - DCHECK(!did_complete_); - DCHECK(!did_run_); -} + private: + WorkerPool::Callback task_; +}; -void WorkerPoolTask::DidRun() { - did_run_ = true; -} +} // namespace -void WorkerPoolTask::DidComplete() { - DCHECK(did_schedule_); - DCHECK(!did_complete_); - did_complete_ = true; -} +namespace internal { -bool WorkerPoolTask::IsReadyToRun() const { - // TODO(reveman): Use counter to improve performance. - for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); - it != dependencies_.rend(); ++it) { - WorkerPoolTask* dependency = *it; - if (!dependency->HasFinishedRunning()) - return false; - } - return true; +WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { } -bool WorkerPoolTask::HasFinishedRunning() const { - return did_run_; +WorkerPoolTask::~WorkerPoolTask() { } -bool WorkerPoolTask::HasCompleted() const { - return did_complete_; +void WorkerPoolTask::DidComplete() { + reply_.Run(); } } // namespace internal @@ -107,51 +60,17 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { void Shutdown(); - // Schedule running of |root| task and all its dependencies. Tasks - // previously scheduled but no longer needed to run |root| will be - // canceled unless already running. Canceled tasks are moved to - // |completed_tasks_| without being run. The result is that once - // scheduled, a task is guaranteed to end up in the |completed_tasks_| - // queue even if they later get canceled by another call to - // ScheduleTasks(). - void ScheduleTasks(internal::WorkerPoolTask* root); + void PostTask(scoped_ptr<internal::WorkerPoolTask> task); - // Collect all completed tasks in |completed_tasks|. Returns true if idle. - bool CollectCompletedTasks(TaskDeque* completed_tasks); + // Appends all completed tasks to worker pool's completed tasks queue + // and returns true if idle. + bool CollectCompletedTasks(); private: - class ScheduledTask { - public: - ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) - : priority_(priority) { - if (dependent) - dependents_.push_back(dependent); - } - - internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } - unsigned priority() const { return priority_; } - - private: - internal::WorkerPoolTask::TaskVector dependents_; - unsigned priority_; - }; - typedef internal::WorkerPoolTask* ScheduledTaskMapKey; - typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> - ScheduledTaskMap; - - // This builds a ScheduledTaskMap from a root task. - static unsigned BuildScheduledTaskMapRecursive( - internal::WorkerPoolTask* task, - internal::WorkerPoolTask* dependent, - unsigned priority, - ScheduledTaskMap* scheduled_tasks); - static void BuildScheduledTaskMap( - internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); - - // Collect all completed tasks by swapping the contents of - // |completed_tasks| and |completed_tasks_|. Lock must be acquired - // before calling this function. Returns true if idle. - bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); + // Appends all completed tasks to |completed_tasks|. Lock must + // already be acquired before calling this function. + bool AppendCompletedTasksWithLockAcquired( + ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); // Schedule an OnIdleOnOriginThread callback if not already pending. // Lock must already be acquired before calling this function. @@ -171,8 +90,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { mutable base::Lock lock_; // Condition variable that is waited on by worker threads until new - // tasks are ready to run or shutdown starts. - base::ConditionVariable has_ready_to_run_tasks_cv_; + // tasks are posted or shutdown starts. + base::ConditionVariable has_pending_tasks_cv_; // Target message loop used for posting callbacks. scoped_refptr<base::MessageLoopProxy> origin_loop_; @@ -187,25 +106,15 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { // loop index is 0. unsigned next_thread_index_; + // Number of tasks currently running. + unsigned running_task_count_; + // Set during shutdown. Tells workers to exit when no more tasks // are pending. bool shutdown_; - // The root task that is a dependent of all other tasks. - scoped_refptr<internal::WorkerPoolTask> root_; - - // This set contains all pending tasks. - ScheduledTaskMap pending_tasks_; - - // Ordered set of tasks that are ready to run. - // TODO(reveman): priority_queue might be more efficient. - typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; - TaskMap ready_to_run_tasks_; - - // This set contains all currently running tasks. - ScheduledTaskMap running_tasks_; - - // Completed tasks not yet collected by origin thread. + typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; + TaskDeque pending_tasks_; TaskDeque completed_tasks_; ScopedPtrDeque<base::DelegateSimpleThread> workers_; @@ -218,24 +127,25 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, const std::string& thread_name_prefix) : worker_pool_on_origin_thread_(worker_pool), lock_(), - has_ready_to_run_tasks_cv_(&lock_), + has_pending_tasks_cv_(&lock_), origin_loop_(base::MessageLoopProxy::current()), weak_ptr_factory_(this), on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, weak_ptr_factory_.GetWeakPtr())), on_idle_pending_(false), next_thread_index_(0), + running_task_count_(0), shutdown_(false) { base::AutoLock lock(lock_); while (workers_.size() < num_threads) { scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( new base::DelegateSimpleThread( - this, - thread_name_prefix + - base::StringPrintf( - "Worker%u", - static_cast<unsigned>(workers_.size() + 1)).c_str())); + this, + thread_name_prefix + + base::StringPrintf( + "Worker%u", + static_cast<unsigned>(workers_.size() + 1)).c_str())); worker->Start(); workers_.push_back(worker.Pass()); } @@ -246,10 +156,12 @@ WorkerPool::Inner::~Inner() { DCHECK(shutdown_); + // Cancel all pending callbacks. + weak_ptr_factory_.InvalidateWeakPtrs(); + DCHECK_EQ(0u, pending_tasks_.size()); - DCHECK_EQ(0u, ready_to_run_tasks_.size()); - DCHECK_EQ(0u, running_tasks_.size()); DCHECK_EQ(0u, completed_tasks_.size()); + DCHECK_EQ(0u, running_task_count_); } void WorkerPool::Inner::Shutdown() { @@ -261,7 +173,7 @@ void WorkerPool::Inner::Shutdown() { // Wake up a worker so it knows it should exit. This will cause all workers // to exit as each will wake up another worker before exiting. - has_ready_to_run_tasks_cv_.Signal(); + has_pending_tasks_cv_.Signal(); } while (workers_.size()) { @@ -271,100 +183,32 @@ void WorkerPool::Inner::Shutdown() { base::ThreadRestrictions::ScopedAllowIO allow_io; worker->Join(); } - - // Cancel any pending OnIdle callback. - weak_ptr_factory_.InvalidateWeakPtrs(); } -void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { - // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. - DCHECK(!root || !shutdown_); - - scoped_refptr<internal::WorkerPoolTask> new_root(root); - - ScheduledTaskMap new_pending_tasks; - ScheduledTaskMap new_running_tasks; - TaskMap new_ready_to_run_tasks; - - // Build scheduled task map before acquiring |lock_|. - if (root) - BuildScheduledTaskMap(root, &new_pending_tasks); - - { - base::AutoLock lock(lock_); - - // First remove all completed tasks from |new_pending_tasks|. - for (TaskDeque::iterator it = completed_tasks_.begin(); - it != completed_tasks_.end(); ++it) { - internal::WorkerPoolTask* task = *it; - new_pending_tasks.take_and_erase(task); - } - - // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. - for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); - it != pending_tasks_.end(); ++it) { - internal::WorkerPoolTask* task = it->first; - - // Task has completed if not present in |new_pending_tasks|. - if (!new_pending_tasks.contains(task)) - completed_tasks_.push_back(task); - } - - // Build new running task set. - for (ScheduledTaskMap::iterator it = running_tasks_.begin(); - it != running_tasks_.end(); ++it) { - internal::WorkerPoolTask* task = it->first; - // Transfer scheduled task value from |new_pending_tasks| to - // |new_running_tasks| if currently running. Value must be set to - // NULL if |new_pending_tasks| doesn't contain task. This does - // the right in both cases. - new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); - } - - // Build new "ready to run" tasks queue. - for (ScheduledTaskMap::iterator it = new_pending_tasks.begin(); - it != new_pending_tasks.end(); ++it) { - internal::WorkerPoolTask* task = it->first; - - // Completed tasks should not exist in |new_pending_tasks_|. - DCHECK(!task->HasFinishedRunning()); - - // Call DidSchedule() to indicate that this task has been scheduled. - // Note: This is only for debugging purposes. - task->DidSchedule(); - - DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority())); - if (task->IsReadyToRun()) - new_ready_to_run_tasks[it->second->priority()] = task; - } +void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { + base::AutoLock lock(lock_); - // Swap root taskand task sets. - // Note: old tasks are intentionally destroyed after releasing |lock_|. - root_.swap(new_root); - pending_tasks_.swap(new_pending_tasks); - running_tasks_.swap(new_running_tasks); - ready_to_run_tasks_.swap(new_ready_to_run_tasks); + pending_tasks_.push_back(task.Pass()); - // If there is more work available, wake up worker thread. - if (!ready_to_run_tasks_.empty()) - has_ready_to_run_tasks_cv_.Signal(); - } + // There is more work available, so wake up worker thread. + has_pending_tasks_cv_.Signal(); } -bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { +bool WorkerPool::Inner::CollectCompletedTasks() { base::AutoLock lock(lock_); - return CollectCompletedTasksWithLockAcquired(completed_tasks); + return AppendCompletedTasksWithLockAcquired( + &worker_pool_on_origin_thread_->completed_tasks_); } -bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( - TaskDeque* completed_tasks) { +bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( + ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { lock_.AssertAcquired(); - DCHECK_EQ(0u, completed_tasks->size()); - completed_tasks->swap(completed_tasks_); + while (completed_tasks_.size()) + completed_tasks->push_back(completed_tasks_.take_front().Pass()); - return running_tasks_.empty() && pending_tasks_.empty(); + return !running_task_count_ && pending_tasks_.empty(); } void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { @@ -377,8 +221,6 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { } void WorkerPool::Inner::OnIdleOnOriginThread() { - TaskDeque completed_tasks; - { base::AutoLock lock(lock_); @@ -386,13 +228,14 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { on_idle_pending_ = false; // Early out if no longer idle. - if (!running_tasks_.empty() || !pending_tasks_.empty()) + if (running_task_count_ || !pending_tasks_.empty()) return; - CollectCompletedTasksWithLockAcquired(&completed_tasks); + AppendCompletedTasksWithLockAcquired( + &worker_pool_on_origin_thread_->completed_tasks_); } - worker_pool_on_origin_thread_->OnIdle(&completed_tasks); + worker_pool_on_origin_thread_->OnIdle(); } void WorkerPool::Inner::Run() { @@ -408,37 +251,29 @@ void WorkerPool::Inner::Run() { int thread_index = next_thread_index_++; while (true) { - if (ready_to_run_tasks_.empty()) { - if (pending_tasks_.empty()) { - // Exit when shutdown is set and no more tasks are pending. - if (shutdown_) - break; - - // Schedule an idle callback if no tasks are running. - if (running_tasks_.empty()) - ScheduleOnIdleWithLockAcquired(); - } - - // Wait for more tasks. - has_ready_to_run_tasks_cv_.Wait(); + if (pending_tasks_.empty()) { + // Exit when shutdown is set and no more tasks are pending. + if (shutdown_) + break; + + // Schedule an idle callback if requested and not pending. + if (!running_task_count_) + ScheduleOnIdleWithLockAcquired(); + + // Wait for new pending tasks. + has_pending_tasks_cv_.Wait(); continue; } - // Take top priority task from |ready_to_run_tasks_|. - scoped_refptr<internal::WorkerPoolTask> task( - ready_to_run_tasks_.begin()->second); - ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); - - // Move task from |pending_tasks_| to |running_tasks_|. - DCHECK(pending_tasks_.contains(task)); - DCHECK(!running_tasks_.contains(task)); - running_tasks_.set(task, pending_tasks_.take_and_erase(task)); + // Get next task. + scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); - // There may be more work available, so wake up another worker thread. - has_ready_to_run_tasks_cv_.Signal(); + // Increment |running_task_count_| before starting to run task. + running_task_count_++; - // Call WillRun() before releasing |lock_| and running task. - task->WillRun(); + // There may be more work available, so wake up another + // worker thread. + has_pending_tasks_cv_.Signal(); { base::AutoUnlock unlock(lock_); @@ -446,95 +281,15 @@ void WorkerPool::Inner::Run() { task->RunOnThread(thread_index); } - // This will mark task as finished running. - task->DidRun(); - - // Now iterate over all dependents to check if they are ready to run. - scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( - task); - if (scheduled_task) { - typedef internal::WorkerPoolTask::TaskVector TaskVector; - for (TaskVector::iterator it = scheduled_task->dependents().begin(); - it != scheduled_task->dependents().end(); ++it) { - internal::WorkerPoolTask* dependent = *it; - if (!dependent->IsReadyToRun()) - continue; - - // Task is ready. Add it to |ready_to_run_tasks_|. - DCHECK(pending_tasks_.contains(dependent)); - unsigned priority = pending_tasks_.get(dependent)->priority(); - DCHECK(!ready_to_run_tasks_.count(priority) || - ready_to_run_tasks_[priority] == dependent); - ready_to_run_tasks_[priority] = dependent; - } - } + completed_tasks_.push_back(task.Pass()); - // Finally add task to |completed_tasks_|. - completed_tasks_.push_back(task); + // Decrement |running_task_count_| now that we are done running task. + running_task_count_--; } // We noticed we should exit. Wake up the next worker so it knows it should // exit as well (because the Shutdown() code only signals once). - has_ready_to_run_tasks_cv_.Signal(); -} - -// BuildScheduledTaskMap() takes a task tree as input and constructs -// a unique set of tasks with edges between dependencies pointing in -// the direction of the dependents. Each task is given a unique priority -// which is currently the same as the DFS traversal order. -// -// Input: Output: -// -// root task4 Task | Priority (lower is better) -// / \ / \ -------+--------------------------- -// task1 task2 task3 task2 root | 4 -// | | | | task1 | 2 -// task3 | task1 | task2 | 3 -// | | \ / task3 | 1 -// task4 task4 root task4 | 0 -// -// The output can be used to efficiently maintain a queue of -// "ready to run" tasks. - -// static -unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( - internal::WorkerPoolTask* task, - internal::WorkerPoolTask* dependent, - unsigned priority, - ScheduledTaskMap* scheduled_tasks) { - // Skip sub-tree if task has already completed. - if (task->HasCompleted()) - return priority; - - ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); - if (scheduled_it != scheduled_tasks->end()) { - DCHECK(dependent); - scheduled_it->second->dependents().push_back(dependent); - return priority; - } - - typedef internal::WorkerPoolTask::TaskVector TaskVector; - for (TaskVector::iterator it = task->dependencies().begin(); - it != task->dependencies().end(); ++it) { - internal::WorkerPoolTask* dependency = *it; - priority = BuildScheduledTaskMapRecursive( - dependency, task, priority, scheduled_tasks); - } - - scheduled_tasks->set(task, - make_scoped_ptr(new ScheduledTask(dependent, - priority))); - - return priority + 1; -} - -// static -void WorkerPool::Inner::BuildScheduledTaskMap( - internal::WorkerPoolTask* root, - ScheduledTaskMap* scheduled_tasks) { - const unsigned kBasePriority = 0u; - DCHECK(root); - BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); + has_pending_tasks_cv_.Signal(); } WorkerPool::WorkerPool(size_t num_threads, @@ -542,6 +297,7 @@ WorkerPool::WorkerPool(size_t num_threads, const std::string& thread_name_prefix) : client_(NULL), origin_loop_(base::MessageLoopProxy::current()), + weak_ptr_factory_(this), check_for_completed_tasks_delay_(check_for_completed_tasks_delay), check_for_completed_tasks_pending_(false), inner_(make_scoped_ptr(new Inner(this, @@ -550,80 +306,74 @@ WorkerPool::WorkerPool(size_t num_threads, } WorkerPool::~WorkerPool() { + // Cancel all pending callbacks. + weak_ptr_factory_.InvalidateWeakPtrs(); + + DCHECK_EQ(0u, completed_tasks_.size()); } void WorkerPool::Shutdown() { inner_->Shutdown(); + inner_->CollectCompletedTasks(); + DispatchCompletionCallbacks(); +} - TaskDeque completed_tasks; - inner_->CollectCompletedTasks(&completed_tasks); - DispatchCompletionCallbacks(&completed_tasks); +void WorkerPool::PostTaskAndReply( + const Callback& task, const base::Closure& reply) { + PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( + task, + reply)).PassAs<internal::WorkerPoolTask>()); } -void WorkerPool::OnIdle(TaskDeque* completed_tasks) { +void WorkerPool::OnIdle() { TRACE_EVENT0("cc", "WorkerPool::OnIdle"); - DispatchCompletionCallbacks(completed_tasks); - - // Cancel any pending check for completed tasks. - check_for_completed_tasks_callback_.Cancel(); - check_for_completed_tasks_pending_ = false; + DispatchCompletionCallbacks(); } void WorkerPool::ScheduleCheckForCompletedTasks() { if (check_for_completed_tasks_pending_) return; - check_for_completed_tasks_callback_.Reset( - base::Bind(&WorkerPool::CheckForCompletedTasks, - base::Unretained(this))); origin_loop_->PostDelayedTask( FROM_HERE, - check_for_completed_tasks_callback_.callback(), + base::Bind(&WorkerPool::CheckForCompletedTasks, + weak_ptr_factory_.GetWeakPtr()), check_for_completed_tasks_delay_); check_for_completed_tasks_pending_ = true; } void WorkerPool::CheckForCompletedTasks() { TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); - check_for_completed_tasks_callback_.Cancel(); + DCHECK(check_for_completed_tasks_pending_); check_for_completed_tasks_pending_ = false; - TaskDeque completed_tasks; - // Schedule another check for completed tasks if not idle. - if (!inner_->CollectCompletedTasks(&completed_tasks)) + if (!inner_->CollectCompletedTasks()) ScheduleCheckForCompletedTasks(); - DispatchCompletionCallbacks(&completed_tasks); + DispatchCompletionCallbacks(); } -void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { +void WorkerPool::DispatchCompletionCallbacks() { TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); - // Early out when |completed_tasks| is empty to prevent unnecessary - // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). - if (completed_tasks->empty()) + if (completed_tasks_.empty()) return; - while (!completed_tasks->empty()) { - scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); - completed_tasks->pop_front(); + while (completed_tasks_.size()) { + scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); task->DidComplete(); - task->DispatchCompletionCallback(); } DCHECK(client_); client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); } -void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { - TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); - - // Schedule check for completed tasks. - if (root) - ScheduleCheckForCompletedTasks(); +void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { + // Schedule check for completed tasks if not pending. + ScheduleCheckForCompletedTasks(); - inner_->ScheduleTasks(root); + inner_->PostTask(task.Pass()); } } // namespace cc diff --git a/cc/base/worker_pool.h b/cc/base/worker_pool.h index 627e636..aa49ec9 100644 --- a/cc/base/worker_pool.h +++ b/cc/base/worker_pool.h @@ -5,52 +5,31 @@ #ifndef CC_BASE_WORKER_POOL_H_ #define CC_BASE_WORKER_POOL_H_ -#include <deque> #include <string> -#include <vector> #include "base/cancelable_callback.h" -#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/message_loop.h" #include "cc/base/cc_export.h" +#include "cc/base/scoped_ptr_deque.h" namespace cc { namespace internal { -class CC_EXPORT WorkerPoolTask - : public base::RefCountedThreadSafe<WorkerPoolTask> { +class WorkerPoolTask { public: - typedef std::vector<scoped_refptr<WorkerPoolTask> > TaskVector; + virtual ~WorkerPoolTask(); virtual void RunOnThread(unsigned thread_index) = 0; - virtual void DispatchCompletionCallback() = 0; - void DidSchedule(); - void WillRun(); - void DidRun(); void DidComplete(); - bool IsReadyToRun() const; - bool HasFinishedRunning() const; - bool HasCompleted() const; - - TaskVector& dependencies() { return dependencies_; } - protected: - friend class base::RefCountedThreadSafe<WorkerPoolTask>; - - WorkerPoolTask(); - explicit WorkerPoolTask(TaskVector* dependencies); - virtual ~WorkerPoolTask(); + explicit WorkerPoolTask(const base::Closure& reply); - private: - bool did_schedule_; - bool did_run_; - bool did_complete_; - TaskVector dependencies_; + const base::Closure reply_; }; } // namespace internal @@ -63,49 +42,67 @@ class CC_EXPORT WorkerPoolClient { virtual ~WorkerPoolClient() {} }; -// A worker thread pool that runs tasks provided by task graph and -// guarantees completion of all pending tasks at shutdown. +// A worker thread pool that runs rendering tasks and guarantees completion +// of all pending tasks at shutdown. class CC_EXPORT WorkerPool { public: + typedef base::Callback<void()> Callback; + virtual ~WorkerPool(); + static scoped_ptr<WorkerPool> Create( + size_t num_threads, + base::TimeDelta check_for_completed_tasks_delay, + const std::string& thread_name_prefix) { + return make_scoped_ptr(new WorkerPool(num_threads, + check_for_completed_tasks_delay, + thread_name_prefix)); + } + // Tells the worker pool to shutdown and returns once all pending tasks have // completed. - virtual void Shutdown(); + void Shutdown(); + + // Posts |task| to worker pool. On completion, |reply| + // is posted to the thread that called PostTaskAndReply(). + void PostTaskAndReply(const Callback& task, const base::Closure& reply); // Set a new client. void SetClient(WorkerPoolClient* client) { client_ = client; } - // Force a check for completed tasks. - void CheckForCompletedTasks(); - protected: WorkerPool(size_t num_threads, base::TimeDelta check_for_completed_tasks_delay, const std::string& thread_name_prefix); - void ScheduleTasks(internal::WorkerPoolTask* root); + void PostTask(scoped_ptr<internal::WorkerPoolTask> task); private: class Inner; friend class Inner; - typedef std::deque<scoped_refptr<internal::WorkerPoolTask> > TaskDeque; - - void OnIdle(TaskDeque* completed_tasks); + void OnTaskCompleted(); + void OnIdle(); void ScheduleCheckForCompletedTasks(); - void DispatchCompletionCallbacks(TaskDeque* completed_tasks); + void CheckForCompletedTasks(); + void DispatchCompletionCallbacks(); WorkerPoolClient* client_; scoped_refptr<base::MessageLoopProxy> origin_loop_; - base::CancelableClosure check_for_completed_tasks_callback_; + base::WeakPtrFactory<WorkerPool> weak_ptr_factory_; base::TimeDelta check_for_completed_tasks_delay_; bool check_for_completed_tasks_pending_; + // Holds all completed tasks for which we have not yet dispatched + // reply callbacks. + ScopedPtrDeque<internal::WorkerPoolTask> completed_tasks_; + // Hide the gory details of the worker pool in |inner_|. const scoped_ptr<Inner> inner_; + + DISALLOW_COPY_AND_ASSIGN(WorkerPool); }; } // namespace cc diff --git a/cc/base/worker_pool_perftest.cc b/cc/base/worker_pool_perftest.cc deleted file mode 100644 index c03bcc8..0000000 --- a/cc/base/worker_pool_perftest.cc +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2013 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "cc/base/worker_pool.h" - -#include "base/time.h" -#include "cc/base/completion_event.h" -#include "testing/gtest/include/gtest/gtest.h" - -namespace cc { - -namespace { - -static const int kTimeLimitMillis = 2000; -static const int kWarmupRuns = 5; -static const int kTimeCheckInterval = 10; - -class PerfTaskImpl : public internal::WorkerPoolTask { - public: - explicit PerfTaskImpl(internal::WorkerPoolTask::TaskVector* dependencies) - : internal::WorkerPoolTask(dependencies) {} - - // Overridden from internal::WorkerPoolTask: - virtual void RunOnThread(unsigned thread_index) OVERRIDE {} - virtual void DispatchCompletionCallback() OVERRIDE {} - - private: - virtual ~PerfTaskImpl() {} -}; - -class PerfControlTaskImpl : public internal::WorkerPoolTask { - public: - explicit PerfControlTaskImpl( - internal::WorkerPoolTask::TaskVector* dependencies) - : internal::WorkerPoolTask(dependencies), - did_start_(new CompletionEvent), - can_finish_(new CompletionEvent) {} - - // Overridden from internal::WorkerPoolTask: - virtual void RunOnThread(unsigned thread_index) OVERRIDE { - did_start_->Signal(); - can_finish_->Wait(); - } - virtual void DispatchCompletionCallback() OVERRIDE {} - - void WaitForTaskToStartRunning() { - did_start_->Wait(); - } - - void AllowTaskToFinish() { - can_finish_->Signal(); - } - - private: - virtual ~PerfControlTaskImpl() {} - - scoped_ptr<CompletionEvent> did_start_; - scoped_ptr<CompletionEvent> can_finish_; -}; - -class PerfWorkerPool : public WorkerPool { - public: - PerfWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {} - virtual ~PerfWorkerPool() {} - - static scoped_ptr<PerfWorkerPool> Create() { - return make_scoped_ptr(new PerfWorkerPool); - } - - void ScheduleTasks(internal::WorkerPoolTask* root) { - WorkerPool::ScheduleTasks(root); - } -}; - -class WorkerPoolPerfTest : public testing::Test, - public WorkerPoolClient { - public: - WorkerPoolPerfTest() : num_runs_(0) {} - - // Overridden from testing::Test: - virtual void SetUp() OVERRIDE { - worker_pool_ = PerfWorkerPool::Create(); - worker_pool_->SetClient(this); - } - virtual void TearDown() OVERRIDE { - worker_pool_->Shutdown(); - } - - // Overridden from WorkerPoolClient: - virtual void DidFinishDispatchingWorkerPoolCompletionCallbacks() OVERRIDE {} - - void EndTest() { - elapsed_ = base::TimeTicks::HighResNow() - start_time_; - } - - void AfterTest(const std::string test_name) { - // Format matches chrome/test/perf/perf_test.h:PrintResult - printf("*RESULT %s: %.2f runs/s\n", - test_name.c_str(), - num_runs_ / elapsed_.InSecondsF()); - } - - void BuildTaskGraph(internal::WorkerPoolTask::TaskVector* dependencies, - unsigned current_depth, - unsigned max_depth, - unsigned num_children_per_node) { - internal::WorkerPoolTask::TaskVector children; - if (current_depth < max_depth) { - for (unsigned i = 0; i < num_children_per_node; ++i) { - BuildTaskGraph(&children, - current_depth + 1, - max_depth, - num_children_per_node); - } - } else if (leaf_task_) { - children.push_back(leaf_task_); - } - dependencies->push_back(make_scoped_refptr(new PerfTaskImpl(&children))); - } - - bool DidRun() { - ++num_runs_; - if (num_runs_ == kWarmupRuns) - start_time_ = base::TimeTicks::HighResNow(); - - if (!start_time_.is_null() && (num_runs_ % kTimeCheckInterval) == 0) { - base::TimeDelta elapsed = base::TimeTicks::HighResNow() - start_time_; - if (elapsed >= base::TimeDelta::FromMilliseconds(kTimeLimitMillis)) { - elapsed_ = elapsed; - return false; - } - } - - return true; - } - - void RunBuildTaskGraphTest(const std::string test_name, - unsigned max_depth, - unsigned num_children_per_node) { - start_time_ = base::TimeTicks(); - num_runs_ = 0; - do { - internal::WorkerPoolTask::TaskVector children; - BuildTaskGraph(&children, 0, max_depth, num_children_per_node); - } while (DidRun()); - - AfterTest(test_name); - } - - void RunScheduleTasksTest(const std::string test_name, - unsigned max_depth, - unsigned num_children_per_node) { - start_time_ = base::TimeTicks(); - num_runs_ = 0; - do { - internal::WorkerPoolTask::TaskVector empty; - leaf_task_ = make_scoped_refptr(new PerfControlTaskImpl(&empty)); - internal::WorkerPoolTask::TaskVector children; - BuildTaskGraph(&children, 0, max_depth, num_children_per_node); - scoped_refptr<PerfTaskImpl> root_task( - make_scoped_refptr(new PerfTaskImpl(&children))); - - worker_pool_->ScheduleTasks(root_task); - leaf_task_->WaitForTaskToStartRunning(); - worker_pool_->ScheduleTasks(NULL); - worker_pool_->CheckForCompletedTasks(); - leaf_task_->AllowTaskToFinish(); - } while (DidRun()); - - AfterTest(test_name); - } - - void RunExecuteTasksTest(const std::string test_name, - unsigned max_depth, - unsigned num_children_per_node) { - start_time_ = base::TimeTicks(); - num_runs_ = 0; - do { - internal::WorkerPoolTask::TaskVector children; - BuildTaskGraph(&children, 0, max_depth, num_children_per_node); - scoped_refptr<PerfControlTaskImpl> root_task( - make_scoped_refptr(new PerfControlTaskImpl(&children))); - - worker_pool_->ScheduleTasks(root_task); - root_task->WaitForTaskToStartRunning(); - root_task->AllowTaskToFinish(); - worker_pool_->CheckForCompletedTasks(); - } while (DidRun()); - - AfterTest(test_name); - } - - protected: - scoped_ptr<PerfWorkerPool> worker_pool_; - scoped_refptr<PerfControlTaskImpl> leaf_task_; - base::TimeTicks start_time_; - base::TimeDelta elapsed_; - int num_runs_; -}; - -TEST_F(WorkerPoolPerfTest, BuildTaskGraph) { - RunBuildTaskGraphTest("build_task_graph_1_10", 1, 10); - RunBuildTaskGraphTest("build_task_graph_1_1000", 1, 1000); - RunBuildTaskGraphTest("build_task_graph_2_10", 2, 10); - RunBuildTaskGraphTest("build_task_graph_5_5", 5, 5); - RunBuildTaskGraphTest("build_task_graph_10_2", 10, 2); - RunBuildTaskGraphTest("build_task_graph_1000_1", 1000, 1); - RunBuildTaskGraphTest("build_task_graph_10_1", 10, 1); -} - -TEST_F(WorkerPoolPerfTest, ScheduleTasks) { - RunScheduleTasksTest("schedule_tasks_1_10", 1, 10); - RunScheduleTasksTest("schedule_tasks_1_1000", 1, 1000); - RunScheduleTasksTest("schedule_tasks_2_10", 2, 10); - RunScheduleTasksTest("schedule_tasks_5_5", 5, 5); - RunScheduleTasksTest("schedule_tasks_10_2", 10, 2); - RunScheduleTasksTest("schedule_tasks_1000_1", 1000, 1); - RunScheduleTasksTest("schedule_tasks_10_1", 10, 1); -} - -TEST_F(WorkerPoolPerfTest, ExecuteTasks) { - RunExecuteTasksTest("execute_tasks_1_10", 1, 10); - RunExecuteTasksTest("execute_tasks_1_1000", 1, 1000); - RunExecuteTasksTest("execute_tasks_2_10", 2, 10); - RunExecuteTasksTest("execute_tasks_5_5", 5, 5); - RunExecuteTasksTest("execute_tasks_10_2", 10, 2); - RunExecuteTasksTest("execute_tasks_1000_1", 1000, 1); - RunExecuteTasksTest("execute_tasks_10_1", 10, 1); -} - -} // namespace - -} // namespace cc diff --git a/cc/base/worker_pool_unittest.cc b/cc/base/worker_pool_unittest.cc index de904b1..c032e2c 100644 --- a/cc/base/worker_pool_unittest.cc +++ b/cc/base/worker_pool_unittest.cc @@ -4,101 +4,27 @@ #include "cc/base/worker_pool.h" -#include <vector> - -#include "cc/base/completion_event.h" #include "testing/gtest/include/gtest/gtest.h" namespace cc { namespace { -class FakeTaskImpl : public internal::WorkerPoolTask { - public: - FakeTaskImpl(const base::Closure& callback, - const base::Closure& reply, - internal::WorkerPoolTask::TaskVector* dependencies) - : internal::WorkerPoolTask(dependencies), - callback_(callback), - reply_(reply) { - } - FakeTaskImpl(const base::Closure& callback, const base::Closure& reply) - : callback_(callback), - reply_(reply) { - } - - // Overridden from internal::WorkerPoolTask: - virtual void RunOnThread(unsigned thread_index) OVERRIDE { - if (!callback_.is_null()) - callback_.Run(); - } - virtual void DispatchCompletionCallback() OVERRIDE { - if (!reply_.is_null()) - reply_.Run(); - } - - private: - virtual ~FakeTaskImpl() {} - - const base::Closure callback_; - const base::Closure reply_; -}; - -class FakeWorkerPool : public WorkerPool { - public: - FakeWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {} - virtual ~FakeWorkerPool() {} - - static scoped_ptr<FakeWorkerPool> Create() { - return make_scoped_ptr(new FakeWorkerPool); - } - - void ScheduleTasks(const base::Closure& callback, - const base::Closure& reply, - const base::Closure& dependency, - int count) { - scoped_refptr<FakeTaskImpl> dependency_task( - new FakeTaskImpl(dependency, base::Closure())); - - internal::WorkerPoolTask::TaskVector tasks; - for (int i = 0; i < count; ++i) { - internal::WorkerPoolTask::TaskVector dependencies(1, dependency_task); - tasks.push_back(new FakeTaskImpl(callback, reply, &dependencies)); - } - scoped_refptr<FakeTaskImpl> completion_task( - new FakeTaskImpl(base::Bind(&FakeWorkerPool::OnTasksCompleted, - base::Unretained(this)), - base::Closure(), - &tasks)); - - scheduled_tasks_completion_.reset(new CompletionEvent); - WorkerPool::ScheduleTasks(completion_task); - } - - void WaitForTasksToComplete() { - DCHECK(scheduled_tasks_completion_); - scheduled_tasks_completion_->Wait(); - } - - private: - void OnTasksCompleted() { - DCHECK(scheduled_tasks_completion_); - scheduled_tasks_completion_->Signal(); - } - - scoped_ptr<CompletionEvent> scheduled_tasks_completion_; -}; - class WorkerPoolTest : public testing::Test, public WorkerPoolClient { public: - WorkerPoolTest() : finish_dispatching_completion_callbacks_count_(0) {} - virtual ~WorkerPoolTest() {} + WorkerPoolTest() + : run_task_count_(0), + on_task_completed_count_(0), + finish_dispatching_completion_callbacks_count_(0) { + } + virtual ~WorkerPoolTest() { + } - // Overridden from testing::Test: virtual void SetUp() OVERRIDE { Reset(); } + virtual void TearDown() OVERRIDE { worker_pool_->Shutdown(); } @@ -109,34 +35,35 @@ class WorkerPoolTest : public testing::Test, } void Reset() { - worker_pool_ = FakeWorkerPool::Create(); + worker_pool_ = WorkerPool::Create(1, + base::TimeDelta::FromDays(1024), + "test"); worker_pool_->SetClient(this); } void RunAllTasksAndReset() { - worker_pool_->WaitForTasksToComplete(); worker_pool_->Shutdown(); Reset(); } - FakeWorkerPool* worker_pool() { + WorkerPool* worker_pool() { return worker_pool_.get(); } - void RunTask(unsigned id) { - run_task_ids_.push_back(id); + void RunTask() { + ++run_task_count_; } - void OnTaskCompleted(unsigned id) { - on_task_completed_ids_.push_back(id); + void OnTaskCompleted() { + ++on_task_completed_count_; } - const std::vector<unsigned>& run_task_ids() { - return run_task_ids_; + unsigned run_task_count() { + return run_task_count_; } - const std::vector<unsigned>& on_task_completed_ids() { - return on_task_completed_ids_; + unsigned on_task_completed_count() { + return on_task_completed_count_; } unsigned finish_dispatching_completion_callbacks_count() { @@ -144,72 +71,39 @@ class WorkerPoolTest : public testing::Test, } private: - scoped_ptr<FakeWorkerPool> worker_pool_; - std::vector<unsigned> run_task_ids_; - std::vector<unsigned> on_task_completed_ids_; + scoped_ptr<WorkerPool> worker_pool_; + unsigned run_task_count_; + unsigned on_task_completed_count_; unsigned finish_dispatching_completion_callbacks_count_; }; TEST_F(WorkerPoolTest, Basic) { - EXPECT_EQ(0u, run_task_ids().size()); - EXPECT_EQ(0u, on_task_completed_ids().size()); + EXPECT_EQ(0u, run_task_count()); + EXPECT_EQ(0u, on_task_completed_count()); EXPECT_EQ(0u, finish_dispatching_completion_callbacks_count()); - worker_pool()->ScheduleTasks( - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u), - base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u), - base::Closure(), - 1); + worker_pool()->PostTaskAndReply( + base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)), + base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this))); RunAllTasksAndReset(); - EXPECT_EQ(1u, run_task_ids().size()); - EXPECT_EQ(1u, on_task_completed_ids().size()); + EXPECT_EQ(1u, run_task_count()); + EXPECT_EQ(1u, on_task_completed_count()); EXPECT_EQ(1u, finish_dispatching_completion_callbacks_count()); - worker_pool()->ScheduleTasks( - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u), - base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u), - base::Closure(), - 2); + worker_pool()->PostTaskAndReply( + base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)), + base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this))); + worker_pool()->PostTaskAndReply( + base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)), + base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this))); RunAllTasksAndReset(); - EXPECT_EQ(3u, run_task_ids().size()); - EXPECT_EQ(3u, on_task_completed_ids().size()); + EXPECT_EQ(3u, run_task_count()); + EXPECT_EQ(3u, on_task_completed_count()); EXPECT_EQ(2u, finish_dispatching_completion_callbacks_count()); } -TEST_F(WorkerPoolTest, Dependencies) { - worker_pool()->ScheduleTasks( - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u), - base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u), - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u), - 1); - RunAllTasksAndReset(); - - // Check if dependency ran before task. - ASSERT_EQ(2u, run_task_ids().size()); - EXPECT_EQ(0u, run_task_ids()[0]); - EXPECT_EQ(1u, run_task_ids()[1]); - ASSERT_EQ(1u, on_task_completed_ids().size()); - EXPECT_EQ(1u, on_task_completed_ids()[0]); - - worker_pool()->ScheduleTasks( - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u), - base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u), - base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u), - 2); - RunAllTasksAndReset(); - - // Dependency should only run once. - ASSERT_EQ(5u, run_task_ids().size()); - EXPECT_EQ(0u, run_task_ids()[2]); - EXPECT_EQ(1u, run_task_ids()[3]); - EXPECT_EQ(1u, run_task_ids()[4]); - ASSERT_EQ(3u, on_task_completed_ids().size()); - EXPECT_EQ(1u, on_task_completed_ids()[1]); - EXPECT_EQ(1u, on_task_completed_ids()[2]); -} - } // namespace } // namespace cc |