summaryrefslogtreecommitdiffstats
path: root/cc/base
diff options
context:
space:
mode:
authorrtenneti@chromium.org <rtenneti@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-29 01:28:07 +0000
committerrtenneti@chromium.org <rtenneti@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-29 01:28:07 +0000
commit8876310adb585c1639c64b50432793f2ac35a513 (patch)
tree55f31e85d8fa74245318bf6435945759213697f6 /cc/base
parenta649b5e509c33907d8cb538f2820fbc963210fea (diff)
downloadchromium_src-8876310adb585c1639c64b50432793f2ac35a513.zip
chromium_src-8876310adb585c1639c64b50432793f2ac35a513.tar.gz
chromium_src-8876310adb585c1639c64b50432793f2ac35a513.tar.bz2
Revert 202363 "cc: Cancel and re-prioritize worker pool tasks."
> cc: Cancel and re-prioritize worker pool tasks. > > This adds a task graph interface to the worker pool and > implements a simple queue instance of this interface for > use by the tile manager. > > The task graph interface can be used describe more > complicated task dependencies in the future and > provides the immediate benefit of seamlessly being > able to cancel and re-prioritize tasks. > > BUG=178974 > > Review URL: https://chromiumcodereview.appspot.com/14689004 TBR=reveman@chromium.org Review URL: https://codereview.chromium.org/16178002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@202736 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'cc/base')
-rw-r--r--cc/base/scoped_ptr_hash_map.h2
-rw-r--r--cc/base/worker_pool.cc480
-rw-r--r--cc/base/worker_pool.h73
-rw-r--r--cc/base/worker_pool_perftest.cc234
-rw-r--r--cc/base/worker_pool_unittest.cc182
5 files changed, 189 insertions, 782 deletions
diff --git a/cc/base/scoped_ptr_hash_map.h b/cc/base/scoped_ptr_hash_map.h
index af792ec..970984a 100644
--- a/cc/base/scoped_ptr_hash_map.h
+++ b/cc/base/scoped_ptr_hash_map.h
@@ -31,7 +31,7 @@ class ScopedPtrHashMap {
~ScopedPtrHashMap() { clear(); }
- void swap(ScopedPtrHashMap<Key, Value>& other) {
+ void swap(ScopedPtrHashMap<Key, Value*>& other) {
data_.swap(other.data_);
}
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
index b2fb185..59dc828 100644
--- a/cc/base/worker_pool.cc
+++ b/cc/base/worker_pool.cc
@@ -4,93 +4,46 @@
#include "cc/base/worker_pool.h"
-#if defined(OS_ANDROID)
-// TODO(epenner): Move thread priorities to base. (crbug.com/170549)
-#include <sys/resource.h>
-#endif
-
-#include <map>
+#include <algorithm>
#include "base/bind.h"
#include "base/debug/trace_event.h"
-#include "base/hash_tables.h"
#include "base/stringprintf.h"
+#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
-#include "cc/base/scoped_ptr_deque.h"
-#include "cc/base/scoped_ptr_hash_map.h"
-
-#if defined(COMPILER_GCC)
-namespace BASE_HASH_NAMESPACE {
-template <> struct hash<cc::internal::WorkerPoolTask*> {
- size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
- return hash<size_t>()(reinterpret_cast<size_t>(ptr));
- }
-};
-} // namespace BASE_HASH_NAMESPACE
-#endif // COMPILER
namespace cc {
-namespace internal {
+namespace {
-WorkerPoolTask::WorkerPoolTask()
- : did_schedule_(false),
- did_run_(false),
- did_complete_(false) {
-}
-
-WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
- : did_schedule_(false),
- did_run_(false),
- did_complete_(false) {
- dependencies_.swap(*dependencies);
-}
-
-WorkerPoolTask::~WorkerPoolTask() {
- DCHECK_EQ(did_schedule_, did_complete_);
- DCHECK(!did_run_ || did_schedule_);
- DCHECK(!did_run_ || did_complete_);
-}
+class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
+ public:
+ WorkerPoolTaskImpl(const WorkerPool::Callback& task,
+ const base::Closure& reply)
+ : internal::WorkerPoolTask(reply),
+ task_(task) {}
-void WorkerPoolTask::DidSchedule() {
- DCHECK(!did_complete_);
- did_schedule_ = true;
-}
+ virtual void RunOnThread(unsigned thread_index) OVERRIDE {
+ task_.Run();
+ }
-void WorkerPoolTask::WillRun() {
- DCHECK(did_schedule_);
- DCHECK(!did_complete_);
- DCHECK(!did_run_);
-}
+ private:
+ WorkerPool::Callback task_;
+};
-void WorkerPoolTask::DidRun() {
- did_run_ = true;
-}
+} // namespace
-void WorkerPoolTask::DidComplete() {
- DCHECK(did_schedule_);
- DCHECK(!did_complete_);
- did_complete_ = true;
-}
+namespace internal {
-bool WorkerPoolTask::IsReadyToRun() const {
- // TODO(reveman): Use counter to improve performance.
- for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
- it != dependencies_.rend(); ++it) {
- WorkerPoolTask* dependency = *it;
- if (!dependency->HasFinishedRunning())
- return false;
- }
- return true;
+WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
}
-bool WorkerPoolTask::HasFinishedRunning() const {
- return did_run_;
+WorkerPoolTask::~WorkerPoolTask() {
}
-bool WorkerPoolTask::HasCompleted() const {
- return did_complete_;
+void WorkerPoolTask::DidComplete() {
+ reply_.Run();
}
} // namespace internal
@@ -107,51 +60,17 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
void Shutdown();
- // Schedule running of |root| task and all its dependencies. Tasks
- // previously scheduled but no longer needed to run |root| will be
- // canceled unless already running. Canceled tasks are moved to
- // |completed_tasks_| without being run. The result is that once
- // scheduled, a task is guaranteed to end up in the |completed_tasks_|
- // queue even if they later get canceled by another call to
- // ScheduleTasks().
- void ScheduleTasks(internal::WorkerPoolTask* root);
+ void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
- // Collect all completed tasks in |completed_tasks|. Returns true if idle.
- bool CollectCompletedTasks(TaskDeque* completed_tasks);
+ // Appends all completed tasks to worker pool's completed tasks queue
+ // and returns true if idle.
+ bool CollectCompletedTasks();
private:
- class ScheduledTask {
- public:
- ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
- : priority_(priority) {
- if (dependent)
- dependents_.push_back(dependent);
- }
-
- internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
- unsigned priority() const { return priority_; }
-
- private:
- internal::WorkerPoolTask::TaskVector dependents_;
- unsigned priority_;
- };
- typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
- typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
- ScheduledTaskMap;
-
- // This builds a ScheduledTaskMap from a root task.
- static unsigned BuildScheduledTaskMapRecursive(
- internal::WorkerPoolTask* task,
- internal::WorkerPoolTask* dependent,
- unsigned priority,
- ScheduledTaskMap* scheduled_tasks);
- static void BuildScheduledTaskMap(
- internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
-
- // Collect all completed tasks by swapping the contents of
- // |completed_tasks| and |completed_tasks_|. Lock must be acquired
- // before calling this function. Returns true if idle.
- bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
+ // Appends all completed tasks to |completed_tasks|. Lock must
+ // already be acquired before calling this function.
+ bool AppendCompletedTasksWithLockAcquired(
+ ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
// Schedule an OnIdleOnOriginThread callback if not already pending.
// Lock must already be acquired before calling this function.
@@ -171,8 +90,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
mutable base::Lock lock_;
// Condition variable that is waited on by worker threads until new
- // tasks are ready to run or shutdown starts.
- base::ConditionVariable has_ready_to_run_tasks_cv_;
+ // tasks are posted or shutdown starts.
+ base::ConditionVariable has_pending_tasks_cv_;
// Target message loop used for posting callbacks.
scoped_refptr<base::MessageLoopProxy> origin_loop_;
@@ -187,25 +106,15 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
// loop index is 0.
unsigned next_thread_index_;
+ // Number of tasks currently running.
+ unsigned running_task_count_;
+
// Set during shutdown. Tells workers to exit when no more tasks
// are pending.
bool shutdown_;
- // The root task that is a dependent of all other tasks.
- scoped_refptr<internal::WorkerPoolTask> root_;
-
- // This set contains all pending tasks.
- ScheduledTaskMap pending_tasks_;
-
- // Ordered set of tasks that are ready to run.
- // TODO(reveman): priority_queue might be more efficient.
- typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
- TaskMap ready_to_run_tasks_;
-
- // This set contains all currently running tasks.
- ScheduledTaskMap running_tasks_;
-
- // Completed tasks not yet collected by origin thread.
+ typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
+ TaskDeque pending_tasks_;
TaskDeque completed_tasks_;
ScopedPtrDeque<base::DelegateSimpleThread> workers_;
@@ -218,24 +127,25 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
const std::string& thread_name_prefix)
: worker_pool_on_origin_thread_(worker_pool),
lock_(),
- has_ready_to_run_tasks_cv_(&lock_),
+ has_pending_tasks_cv_(&lock_),
origin_loop_(base::MessageLoopProxy::current()),
weak_ptr_factory_(this),
on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
weak_ptr_factory_.GetWeakPtr())),
on_idle_pending_(false),
next_thread_index_(0),
+ running_task_count_(0),
shutdown_(false) {
base::AutoLock lock(lock_);
while (workers_.size() < num_threads) {
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
new base::DelegateSimpleThread(
- this,
- thread_name_prefix +
- base::StringPrintf(
- "Worker%u",
- static_cast<unsigned>(workers_.size() + 1)).c_str()));
+ this,
+ thread_name_prefix +
+ base::StringPrintf(
+ "Worker%u",
+ static_cast<unsigned>(workers_.size() + 1)).c_str()));
worker->Start();
workers_.push_back(worker.Pass());
}
@@ -246,10 +156,12 @@ WorkerPool::Inner::~Inner() {
DCHECK(shutdown_);
+ // Cancel all pending callbacks.
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
DCHECK_EQ(0u, pending_tasks_.size());
- DCHECK_EQ(0u, ready_to_run_tasks_.size());
- DCHECK_EQ(0u, running_tasks_.size());
DCHECK_EQ(0u, completed_tasks_.size());
+ DCHECK_EQ(0u, running_task_count_);
}
void WorkerPool::Inner::Shutdown() {
@@ -261,7 +173,7 @@ void WorkerPool::Inner::Shutdown() {
// Wake up a worker so it knows it should exit. This will cause all workers
// to exit as each will wake up another worker before exiting.
- has_ready_to_run_tasks_cv_.Signal();
+ has_pending_tasks_cv_.Signal();
}
while (workers_.size()) {
@@ -271,100 +183,32 @@ void WorkerPool::Inner::Shutdown() {
base::ThreadRestrictions::ScopedAllowIO allow_io;
worker->Join();
}
-
- // Cancel any pending OnIdle callback.
- weak_ptr_factory_.InvalidateWeakPtrs();
}
-void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
- // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
- DCHECK(!root || !shutdown_);
-
- scoped_refptr<internal::WorkerPoolTask> new_root(root);
-
- ScheduledTaskMap new_pending_tasks;
- ScheduledTaskMap new_running_tasks;
- TaskMap new_ready_to_run_tasks;
-
- // Build scheduled task map before acquiring |lock_|.
- if (root)
- BuildScheduledTaskMap(root, &new_pending_tasks);
-
- {
- base::AutoLock lock(lock_);
-
- // First remove all completed tasks from |new_pending_tasks|.
- for (TaskDeque::iterator it = completed_tasks_.begin();
- it != completed_tasks_.end(); ++it) {
- internal::WorkerPoolTask* task = *it;
- new_pending_tasks.take_and_erase(task);
- }
-
- // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
- for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
- it != pending_tasks_.end(); ++it) {
- internal::WorkerPoolTask* task = it->first;
-
- // Task has completed if not present in |new_pending_tasks|.
- if (!new_pending_tasks.contains(task))
- completed_tasks_.push_back(task);
- }
-
- // Build new running task set.
- for (ScheduledTaskMap::iterator it = running_tasks_.begin();
- it != running_tasks_.end(); ++it) {
- internal::WorkerPoolTask* task = it->first;
- // Transfer scheduled task value from |new_pending_tasks| to
- // |new_running_tasks| if currently running. Value must be set to
- // NULL if |new_pending_tasks| doesn't contain task. This does
- // the right in both cases.
- new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
- }
-
- // Build new "ready to run" tasks queue.
- for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
- it != new_pending_tasks.end(); ++it) {
- internal::WorkerPoolTask* task = it->first;
-
- // Completed tasks should not exist in |new_pending_tasks_|.
- DCHECK(!task->HasFinishedRunning());
-
- // Call DidSchedule() to indicate that this task has been scheduled.
- // Note: This is only for debugging purposes.
- task->DidSchedule();
-
- DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
- if (task->IsReadyToRun())
- new_ready_to_run_tasks[it->second->priority()] = task;
- }
+void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
+ base::AutoLock lock(lock_);
- // Swap root taskand task sets.
- // Note: old tasks are intentionally destroyed after releasing |lock_|.
- root_.swap(new_root);
- pending_tasks_.swap(new_pending_tasks);
- running_tasks_.swap(new_running_tasks);
- ready_to_run_tasks_.swap(new_ready_to_run_tasks);
+ pending_tasks_.push_back(task.Pass());
- // If there is more work available, wake up worker thread.
- if (!ready_to_run_tasks_.empty())
- has_ready_to_run_tasks_cv_.Signal();
- }
+ // There is more work available, so wake up worker thread.
+ has_pending_tasks_cv_.Signal();
}
-bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
+bool WorkerPool::Inner::CollectCompletedTasks() {
base::AutoLock lock(lock_);
- return CollectCompletedTasksWithLockAcquired(completed_tasks);
+ return AppendCompletedTasksWithLockAcquired(
+ &worker_pool_on_origin_thread_->completed_tasks_);
}
-bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
- TaskDeque* completed_tasks) {
+bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
+ ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
lock_.AssertAcquired();
- DCHECK_EQ(0u, completed_tasks->size());
- completed_tasks->swap(completed_tasks_);
+ while (completed_tasks_.size())
+ completed_tasks->push_back(completed_tasks_.take_front().Pass());
- return running_tasks_.empty() && pending_tasks_.empty();
+ return !running_task_count_ && pending_tasks_.empty();
}
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
@@ -377,8 +221,6 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
}
void WorkerPool::Inner::OnIdleOnOriginThread() {
- TaskDeque completed_tasks;
-
{
base::AutoLock lock(lock_);
@@ -386,13 +228,14 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
on_idle_pending_ = false;
// Early out if no longer idle.
- if (!running_tasks_.empty() || !pending_tasks_.empty())
+ if (running_task_count_ || !pending_tasks_.empty())
return;
- CollectCompletedTasksWithLockAcquired(&completed_tasks);
+ AppendCompletedTasksWithLockAcquired(
+ &worker_pool_on_origin_thread_->completed_tasks_);
}
- worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
+ worker_pool_on_origin_thread_->OnIdle();
}
void WorkerPool::Inner::Run() {
@@ -408,37 +251,29 @@ void WorkerPool::Inner::Run() {
int thread_index = next_thread_index_++;
while (true) {
- if (ready_to_run_tasks_.empty()) {
- if (pending_tasks_.empty()) {
- // Exit when shutdown is set and no more tasks are pending.
- if (shutdown_)
- break;
-
- // Schedule an idle callback if no tasks are running.
- if (running_tasks_.empty())
- ScheduleOnIdleWithLockAcquired();
- }
-
- // Wait for more tasks.
- has_ready_to_run_tasks_cv_.Wait();
+ if (pending_tasks_.empty()) {
+ // Exit when shutdown is set and no more tasks are pending.
+ if (shutdown_)
+ break;
+
+ // Schedule an idle callback if requested and not pending.
+ if (!running_task_count_)
+ ScheduleOnIdleWithLockAcquired();
+
+ // Wait for new pending tasks.
+ has_pending_tasks_cv_.Wait();
continue;
}
- // Take top priority task from |ready_to_run_tasks_|.
- scoped_refptr<internal::WorkerPoolTask> task(
- ready_to_run_tasks_.begin()->second);
- ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
-
- // Move task from |pending_tasks_| to |running_tasks_|.
- DCHECK(pending_tasks_.contains(task));
- DCHECK(!running_tasks_.contains(task));
- running_tasks_.set(task, pending_tasks_.take_and_erase(task));
+ // Get next task.
+ scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
- // There may be more work available, so wake up another worker thread.
- has_ready_to_run_tasks_cv_.Signal();
+ // Increment |running_task_count_| before starting to run task.
+ running_task_count_++;
- // Call WillRun() before releasing |lock_| and running task.
- task->WillRun();
+ // There may be more work available, so wake up another
+ // worker thread.
+ has_pending_tasks_cv_.Signal();
{
base::AutoUnlock unlock(lock_);
@@ -446,95 +281,15 @@ void WorkerPool::Inner::Run() {
task->RunOnThread(thread_index);
}
- // This will mark task as finished running.
- task->DidRun();
-
- // Now iterate over all dependents to check if they are ready to run.
- scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
- task);
- if (scheduled_task) {
- typedef internal::WorkerPoolTask::TaskVector TaskVector;
- for (TaskVector::iterator it = scheduled_task->dependents().begin();
- it != scheduled_task->dependents().end(); ++it) {
- internal::WorkerPoolTask* dependent = *it;
- if (!dependent->IsReadyToRun())
- continue;
-
- // Task is ready. Add it to |ready_to_run_tasks_|.
- DCHECK(pending_tasks_.contains(dependent));
- unsigned priority = pending_tasks_.get(dependent)->priority();
- DCHECK(!ready_to_run_tasks_.count(priority) ||
- ready_to_run_tasks_[priority] == dependent);
- ready_to_run_tasks_[priority] = dependent;
- }
- }
+ completed_tasks_.push_back(task.Pass());
- // Finally add task to |completed_tasks_|.
- completed_tasks_.push_back(task);
+ // Decrement |running_task_count_| now that we are done running task.
+ running_task_count_--;
}
// We noticed we should exit. Wake up the next worker so it knows it should
// exit as well (because the Shutdown() code only signals once).
- has_ready_to_run_tasks_cv_.Signal();
-}
-
-// BuildScheduledTaskMap() takes a task tree as input and constructs
-// a unique set of tasks with edges between dependencies pointing in
-// the direction of the dependents. Each task is given a unique priority
-// which is currently the same as the DFS traversal order.
-//
-// Input: Output:
-//
-// root task4 Task | Priority (lower is better)
-// / \ / \ -------+---------------------------
-// task1 task2 task3 task2 root | 4
-// | | | | task1 | 2
-// task3 | task1 | task2 | 3
-// | | \ / task3 | 1
-// task4 task4 root task4 | 0
-//
-// The output can be used to efficiently maintain a queue of
-// "ready to run" tasks.
-
-// static
-unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
- internal::WorkerPoolTask* task,
- internal::WorkerPoolTask* dependent,
- unsigned priority,
- ScheduledTaskMap* scheduled_tasks) {
- // Skip sub-tree if task has already completed.
- if (task->HasCompleted())
- return priority;
-
- ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
- if (scheduled_it != scheduled_tasks->end()) {
- DCHECK(dependent);
- scheduled_it->second->dependents().push_back(dependent);
- return priority;
- }
-
- typedef internal::WorkerPoolTask::TaskVector TaskVector;
- for (TaskVector::iterator it = task->dependencies().begin();
- it != task->dependencies().end(); ++it) {
- internal::WorkerPoolTask* dependency = *it;
- priority = BuildScheduledTaskMapRecursive(
- dependency, task, priority, scheduled_tasks);
- }
-
- scheduled_tasks->set(task,
- make_scoped_ptr(new ScheduledTask(dependent,
- priority)));
-
- return priority + 1;
-}
-
-// static
-void WorkerPool::Inner::BuildScheduledTaskMap(
- internal::WorkerPoolTask* root,
- ScheduledTaskMap* scheduled_tasks) {
- const unsigned kBasePriority = 0u;
- DCHECK(root);
- BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
+ has_pending_tasks_cv_.Signal();
}
WorkerPool::WorkerPool(size_t num_threads,
@@ -542,6 +297,7 @@ WorkerPool::WorkerPool(size_t num_threads,
const std::string& thread_name_prefix)
: client_(NULL),
origin_loop_(base::MessageLoopProxy::current()),
+ weak_ptr_factory_(this),
check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
check_for_completed_tasks_pending_(false),
inner_(make_scoped_ptr(new Inner(this,
@@ -550,80 +306,74 @@ WorkerPool::WorkerPool(size_t num_threads,
}
WorkerPool::~WorkerPool() {
+ // Cancel all pending callbacks.
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
+ DCHECK_EQ(0u, completed_tasks_.size());
}
void WorkerPool::Shutdown() {
inner_->Shutdown();
+ inner_->CollectCompletedTasks();
+ DispatchCompletionCallbacks();
+}
- TaskDeque completed_tasks;
- inner_->CollectCompletedTasks(&completed_tasks);
- DispatchCompletionCallbacks(&completed_tasks);
+void WorkerPool::PostTaskAndReply(
+ const Callback& task, const base::Closure& reply) {
+ PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
+ task,
+ reply)).PassAs<internal::WorkerPoolTask>());
}
-void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
+void WorkerPool::OnIdle() {
TRACE_EVENT0("cc", "WorkerPool::OnIdle");
- DispatchCompletionCallbacks(completed_tasks);
-
- // Cancel any pending check for completed tasks.
- check_for_completed_tasks_callback_.Cancel();
- check_for_completed_tasks_pending_ = false;
+ DispatchCompletionCallbacks();
}
void WorkerPool::ScheduleCheckForCompletedTasks() {
if (check_for_completed_tasks_pending_)
return;
- check_for_completed_tasks_callback_.Reset(
- base::Bind(&WorkerPool::CheckForCompletedTasks,
- base::Unretained(this)));
origin_loop_->PostDelayedTask(
FROM_HERE,
- check_for_completed_tasks_callback_.callback(),
+ base::Bind(&WorkerPool::CheckForCompletedTasks,
+ weak_ptr_factory_.GetWeakPtr()),
check_for_completed_tasks_delay_);
check_for_completed_tasks_pending_ = true;
}
void WorkerPool::CheckForCompletedTasks() {
TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
- check_for_completed_tasks_callback_.Cancel();
+ DCHECK(check_for_completed_tasks_pending_);
check_for_completed_tasks_pending_ = false;
- TaskDeque completed_tasks;
-
// Schedule another check for completed tasks if not idle.
- if (!inner_->CollectCompletedTasks(&completed_tasks))
+ if (!inner_->CollectCompletedTasks())
ScheduleCheckForCompletedTasks();
- DispatchCompletionCallbacks(&completed_tasks);
+ DispatchCompletionCallbacks();
}
-void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
+void WorkerPool::DispatchCompletionCallbacks() {
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
- // Early out when |completed_tasks| is empty to prevent unnecessary
- // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
- if (completed_tasks->empty())
+ if (completed_tasks_.empty())
return;
- while (!completed_tasks->empty()) {
- scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
- completed_tasks->pop_front();
+ while (completed_tasks_.size()) {
+ scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
task->DidComplete();
- task->DispatchCompletionCallback();
}
DCHECK(client_);
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
}
-void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
- TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
-
- // Schedule check for completed tasks.
- if (root)
- ScheduleCheckForCompletedTasks();
+void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
+ // Schedule check for completed tasks if not pending.
+ ScheduleCheckForCompletedTasks();
- inner_->ScheduleTasks(root);
+ inner_->PostTask(task.Pass());
}
} // namespace cc
diff --git a/cc/base/worker_pool.h b/cc/base/worker_pool.h
index 627e636..aa49ec9 100644
--- a/cc/base/worker_pool.h
+++ b/cc/base/worker_pool.h
@@ -5,52 +5,31 @@
#ifndef CC_BASE_WORKER_POOL_H_
#define CC_BASE_WORKER_POOL_H_
-#include <deque>
#include <string>
-#include <vector>
#include "base/cancelable_callback.h"
-#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
#include "cc/base/cc_export.h"
+#include "cc/base/scoped_ptr_deque.h"
namespace cc {
namespace internal {
-class CC_EXPORT WorkerPoolTask
- : public base::RefCountedThreadSafe<WorkerPoolTask> {
+class WorkerPoolTask {
public:
- typedef std::vector<scoped_refptr<WorkerPoolTask> > TaskVector;
+ virtual ~WorkerPoolTask();
virtual void RunOnThread(unsigned thread_index) = 0;
- virtual void DispatchCompletionCallback() = 0;
- void DidSchedule();
- void WillRun();
- void DidRun();
void DidComplete();
- bool IsReadyToRun() const;
- bool HasFinishedRunning() const;
- bool HasCompleted() const;
-
- TaskVector& dependencies() { return dependencies_; }
-
protected:
- friend class base::RefCountedThreadSafe<WorkerPoolTask>;
-
- WorkerPoolTask();
- explicit WorkerPoolTask(TaskVector* dependencies);
- virtual ~WorkerPoolTask();
+ explicit WorkerPoolTask(const base::Closure& reply);
- private:
- bool did_schedule_;
- bool did_run_;
- bool did_complete_;
- TaskVector dependencies_;
+ const base::Closure reply_;
};
} // namespace internal
@@ -63,49 +42,67 @@ class CC_EXPORT WorkerPoolClient {
virtual ~WorkerPoolClient() {}
};
-// A worker thread pool that runs tasks provided by task graph and
-// guarantees completion of all pending tasks at shutdown.
+// A worker thread pool that runs rendering tasks and guarantees completion
+// of all pending tasks at shutdown.
class CC_EXPORT WorkerPool {
public:
+ typedef base::Callback<void()> Callback;
+
virtual ~WorkerPool();
+ static scoped_ptr<WorkerPool> Create(
+ size_t num_threads,
+ base::TimeDelta check_for_completed_tasks_delay,
+ const std::string& thread_name_prefix) {
+ return make_scoped_ptr(new WorkerPool(num_threads,
+ check_for_completed_tasks_delay,
+ thread_name_prefix));
+ }
+
// Tells the worker pool to shutdown and returns once all pending tasks have
// completed.
- virtual void Shutdown();
+ void Shutdown();
+
+ // Posts |task| to worker pool. On completion, |reply|
+ // is posted to the thread that called PostTaskAndReply().
+ void PostTaskAndReply(const Callback& task, const base::Closure& reply);
// Set a new client.
void SetClient(WorkerPoolClient* client) {
client_ = client;
}
- // Force a check for completed tasks.
- void CheckForCompletedTasks();
-
protected:
WorkerPool(size_t num_threads,
base::TimeDelta check_for_completed_tasks_delay,
const std::string& thread_name_prefix);
- void ScheduleTasks(internal::WorkerPoolTask* root);
+ void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
private:
class Inner;
friend class Inner;
- typedef std::deque<scoped_refptr<internal::WorkerPoolTask> > TaskDeque;
-
- void OnIdle(TaskDeque* completed_tasks);
+ void OnTaskCompleted();
+ void OnIdle();
void ScheduleCheckForCompletedTasks();
- void DispatchCompletionCallbacks(TaskDeque* completed_tasks);
+ void CheckForCompletedTasks();
+ void DispatchCompletionCallbacks();
WorkerPoolClient* client_;
scoped_refptr<base::MessageLoopProxy> origin_loop_;
- base::CancelableClosure check_for_completed_tasks_callback_;
+ base::WeakPtrFactory<WorkerPool> weak_ptr_factory_;
base::TimeDelta check_for_completed_tasks_delay_;
bool check_for_completed_tasks_pending_;
+ // Holds all completed tasks for which we have not yet dispatched
+ // reply callbacks.
+ ScopedPtrDeque<internal::WorkerPoolTask> completed_tasks_;
+
// Hide the gory details of the worker pool in |inner_|.
const scoped_ptr<Inner> inner_;
+
+ DISALLOW_COPY_AND_ASSIGN(WorkerPool);
};
} // namespace cc
diff --git a/cc/base/worker_pool_perftest.cc b/cc/base/worker_pool_perftest.cc
deleted file mode 100644
index c03bcc8..0000000
--- a/cc/base/worker_pool_perftest.cc
+++ /dev/null
@@ -1,234 +0,0 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "cc/base/worker_pool.h"
-
-#include "base/time.h"
-#include "cc/base/completion_event.h"
-#include "testing/gtest/include/gtest/gtest.h"
-
-namespace cc {
-
-namespace {
-
-static const int kTimeLimitMillis = 2000;
-static const int kWarmupRuns = 5;
-static const int kTimeCheckInterval = 10;
-
-class PerfTaskImpl : public internal::WorkerPoolTask {
- public:
- explicit PerfTaskImpl(internal::WorkerPoolTask::TaskVector* dependencies)
- : internal::WorkerPoolTask(dependencies) {}
-
- // Overridden from internal::WorkerPoolTask:
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {}
- virtual void DispatchCompletionCallback() OVERRIDE {}
-
- private:
- virtual ~PerfTaskImpl() {}
-};
-
-class PerfControlTaskImpl : public internal::WorkerPoolTask {
- public:
- explicit PerfControlTaskImpl(
- internal::WorkerPoolTask::TaskVector* dependencies)
- : internal::WorkerPoolTask(dependencies),
- did_start_(new CompletionEvent),
- can_finish_(new CompletionEvent) {}
-
- // Overridden from internal::WorkerPoolTask:
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- did_start_->Signal();
- can_finish_->Wait();
- }
- virtual void DispatchCompletionCallback() OVERRIDE {}
-
- void WaitForTaskToStartRunning() {
- did_start_->Wait();
- }
-
- void AllowTaskToFinish() {
- can_finish_->Signal();
- }
-
- private:
- virtual ~PerfControlTaskImpl() {}
-
- scoped_ptr<CompletionEvent> did_start_;
- scoped_ptr<CompletionEvent> can_finish_;
-};
-
-class PerfWorkerPool : public WorkerPool {
- public:
- PerfWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {}
- virtual ~PerfWorkerPool() {}
-
- static scoped_ptr<PerfWorkerPool> Create() {
- return make_scoped_ptr(new PerfWorkerPool);
- }
-
- void ScheduleTasks(internal::WorkerPoolTask* root) {
- WorkerPool::ScheduleTasks(root);
- }
-};
-
-class WorkerPoolPerfTest : public testing::Test,
- public WorkerPoolClient {
- public:
- WorkerPoolPerfTest() : num_runs_(0) {}
-
- // Overridden from testing::Test:
- virtual void SetUp() OVERRIDE {
- worker_pool_ = PerfWorkerPool::Create();
- worker_pool_->SetClient(this);
- }
- virtual void TearDown() OVERRIDE {
- worker_pool_->Shutdown();
- }
-
- // Overridden from WorkerPoolClient:
- virtual void DidFinishDispatchingWorkerPoolCompletionCallbacks() OVERRIDE {}
-
- void EndTest() {
- elapsed_ = base::TimeTicks::HighResNow() - start_time_;
- }
-
- void AfterTest(const std::string test_name) {
- // Format matches chrome/test/perf/perf_test.h:PrintResult
- printf("*RESULT %s: %.2f runs/s\n",
- test_name.c_str(),
- num_runs_ / elapsed_.InSecondsF());
- }
-
- void BuildTaskGraph(internal::WorkerPoolTask::TaskVector* dependencies,
- unsigned current_depth,
- unsigned max_depth,
- unsigned num_children_per_node) {
- internal::WorkerPoolTask::TaskVector children;
- if (current_depth < max_depth) {
- for (unsigned i = 0; i < num_children_per_node; ++i) {
- BuildTaskGraph(&children,
- current_depth + 1,
- max_depth,
- num_children_per_node);
- }
- } else if (leaf_task_) {
- children.push_back(leaf_task_);
- }
- dependencies->push_back(make_scoped_refptr(new PerfTaskImpl(&children)));
- }
-
- bool DidRun() {
- ++num_runs_;
- if (num_runs_ == kWarmupRuns)
- start_time_ = base::TimeTicks::HighResNow();
-
- if (!start_time_.is_null() && (num_runs_ % kTimeCheckInterval) == 0) {
- base::TimeDelta elapsed = base::TimeTicks::HighResNow() - start_time_;
- if (elapsed >= base::TimeDelta::FromMilliseconds(kTimeLimitMillis)) {
- elapsed_ = elapsed;
- return false;
- }
- }
-
- return true;
- }
-
- void RunBuildTaskGraphTest(const std::string test_name,
- unsigned max_depth,
- unsigned num_children_per_node) {
- start_time_ = base::TimeTicks();
- num_runs_ = 0;
- do {
- internal::WorkerPoolTask::TaskVector children;
- BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
- } while (DidRun());
-
- AfterTest(test_name);
- }
-
- void RunScheduleTasksTest(const std::string test_name,
- unsigned max_depth,
- unsigned num_children_per_node) {
- start_time_ = base::TimeTicks();
- num_runs_ = 0;
- do {
- internal::WorkerPoolTask::TaskVector empty;
- leaf_task_ = make_scoped_refptr(new PerfControlTaskImpl(&empty));
- internal::WorkerPoolTask::TaskVector children;
- BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
- scoped_refptr<PerfTaskImpl> root_task(
- make_scoped_refptr(new PerfTaskImpl(&children)));
-
- worker_pool_->ScheduleTasks(root_task);
- leaf_task_->WaitForTaskToStartRunning();
- worker_pool_->ScheduleTasks(NULL);
- worker_pool_->CheckForCompletedTasks();
- leaf_task_->AllowTaskToFinish();
- } while (DidRun());
-
- AfterTest(test_name);
- }
-
- void RunExecuteTasksTest(const std::string test_name,
- unsigned max_depth,
- unsigned num_children_per_node) {
- start_time_ = base::TimeTicks();
- num_runs_ = 0;
- do {
- internal::WorkerPoolTask::TaskVector children;
- BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
- scoped_refptr<PerfControlTaskImpl> root_task(
- make_scoped_refptr(new PerfControlTaskImpl(&children)));
-
- worker_pool_->ScheduleTasks(root_task);
- root_task->WaitForTaskToStartRunning();
- root_task->AllowTaskToFinish();
- worker_pool_->CheckForCompletedTasks();
- } while (DidRun());
-
- AfterTest(test_name);
- }
-
- protected:
- scoped_ptr<PerfWorkerPool> worker_pool_;
- scoped_refptr<PerfControlTaskImpl> leaf_task_;
- base::TimeTicks start_time_;
- base::TimeDelta elapsed_;
- int num_runs_;
-};
-
-TEST_F(WorkerPoolPerfTest, BuildTaskGraph) {
- RunBuildTaskGraphTest("build_task_graph_1_10", 1, 10);
- RunBuildTaskGraphTest("build_task_graph_1_1000", 1, 1000);
- RunBuildTaskGraphTest("build_task_graph_2_10", 2, 10);
- RunBuildTaskGraphTest("build_task_graph_5_5", 5, 5);
- RunBuildTaskGraphTest("build_task_graph_10_2", 10, 2);
- RunBuildTaskGraphTest("build_task_graph_1000_1", 1000, 1);
- RunBuildTaskGraphTest("build_task_graph_10_1", 10, 1);
-}
-
-TEST_F(WorkerPoolPerfTest, ScheduleTasks) {
- RunScheduleTasksTest("schedule_tasks_1_10", 1, 10);
- RunScheduleTasksTest("schedule_tasks_1_1000", 1, 1000);
- RunScheduleTasksTest("schedule_tasks_2_10", 2, 10);
- RunScheduleTasksTest("schedule_tasks_5_5", 5, 5);
- RunScheduleTasksTest("schedule_tasks_10_2", 10, 2);
- RunScheduleTasksTest("schedule_tasks_1000_1", 1000, 1);
- RunScheduleTasksTest("schedule_tasks_10_1", 10, 1);
-}
-
-TEST_F(WorkerPoolPerfTest, ExecuteTasks) {
- RunExecuteTasksTest("execute_tasks_1_10", 1, 10);
- RunExecuteTasksTest("execute_tasks_1_1000", 1, 1000);
- RunExecuteTasksTest("execute_tasks_2_10", 2, 10);
- RunExecuteTasksTest("execute_tasks_5_5", 5, 5);
- RunExecuteTasksTest("execute_tasks_10_2", 10, 2);
- RunExecuteTasksTest("execute_tasks_1000_1", 1000, 1);
- RunExecuteTasksTest("execute_tasks_10_1", 10, 1);
-}
-
-} // namespace
-
-} // namespace cc
diff --git a/cc/base/worker_pool_unittest.cc b/cc/base/worker_pool_unittest.cc
index de904b1..c032e2c 100644
--- a/cc/base/worker_pool_unittest.cc
+++ b/cc/base/worker_pool_unittest.cc
@@ -4,101 +4,27 @@
#include "cc/base/worker_pool.h"
-#include <vector>
-
-#include "cc/base/completion_event.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace cc {
namespace {
-class FakeTaskImpl : public internal::WorkerPoolTask {
- public:
- FakeTaskImpl(const base::Closure& callback,
- const base::Closure& reply,
- internal::WorkerPoolTask::TaskVector* dependencies)
- : internal::WorkerPoolTask(dependencies),
- callback_(callback),
- reply_(reply) {
- }
- FakeTaskImpl(const base::Closure& callback, const base::Closure& reply)
- : callback_(callback),
- reply_(reply) {
- }
-
- // Overridden from internal::WorkerPoolTask:
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- if (!callback_.is_null())
- callback_.Run();
- }
- virtual void DispatchCompletionCallback() OVERRIDE {
- if (!reply_.is_null())
- reply_.Run();
- }
-
- private:
- virtual ~FakeTaskImpl() {}
-
- const base::Closure callback_;
- const base::Closure reply_;
-};
-
-class FakeWorkerPool : public WorkerPool {
- public:
- FakeWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {}
- virtual ~FakeWorkerPool() {}
-
- static scoped_ptr<FakeWorkerPool> Create() {
- return make_scoped_ptr(new FakeWorkerPool);
- }
-
- void ScheduleTasks(const base::Closure& callback,
- const base::Closure& reply,
- const base::Closure& dependency,
- int count) {
- scoped_refptr<FakeTaskImpl> dependency_task(
- new FakeTaskImpl(dependency, base::Closure()));
-
- internal::WorkerPoolTask::TaskVector tasks;
- for (int i = 0; i < count; ++i) {
- internal::WorkerPoolTask::TaskVector dependencies(1, dependency_task);
- tasks.push_back(new FakeTaskImpl(callback, reply, &dependencies));
- }
- scoped_refptr<FakeTaskImpl> completion_task(
- new FakeTaskImpl(base::Bind(&FakeWorkerPool::OnTasksCompleted,
- base::Unretained(this)),
- base::Closure(),
- &tasks));
-
- scheduled_tasks_completion_.reset(new CompletionEvent);
- WorkerPool::ScheduleTasks(completion_task);
- }
-
- void WaitForTasksToComplete() {
- DCHECK(scheduled_tasks_completion_);
- scheduled_tasks_completion_->Wait();
- }
-
- private:
- void OnTasksCompleted() {
- DCHECK(scheduled_tasks_completion_);
- scheduled_tasks_completion_->Signal();
- }
-
- scoped_ptr<CompletionEvent> scheduled_tasks_completion_;
-};
-
class WorkerPoolTest : public testing::Test,
public WorkerPoolClient {
public:
- WorkerPoolTest() : finish_dispatching_completion_callbacks_count_(0) {}
- virtual ~WorkerPoolTest() {}
+ WorkerPoolTest()
+ : run_task_count_(0),
+ on_task_completed_count_(0),
+ finish_dispatching_completion_callbacks_count_(0) {
+ }
+ virtual ~WorkerPoolTest() {
+ }
- // Overridden from testing::Test:
virtual void SetUp() OVERRIDE {
Reset();
}
+
virtual void TearDown() OVERRIDE {
worker_pool_->Shutdown();
}
@@ -109,34 +35,35 @@ class WorkerPoolTest : public testing::Test,
}
void Reset() {
- worker_pool_ = FakeWorkerPool::Create();
+ worker_pool_ = WorkerPool::Create(1,
+ base::TimeDelta::FromDays(1024),
+ "test");
worker_pool_->SetClient(this);
}
void RunAllTasksAndReset() {
- worker_pool_->WaitForTasksToComplete();
worker_pool_->Shutdown();
Reset();
}
- FakeWorkerPool* worker_pool() {
+ WorkerPool* worker_pool() {
return worker_pool_.get();
}
- void RunTask(unsigned id) {
- run_task_ids_.push_back(id);
+ void RunTask() {
+ ++run_task_count_;
}
- void OnTaskCompleted(unsigned id) {
- on_task_completed_ids_.push_back(id);
+ void OnTaskCompleted() {
+ ++on_task_completed_count_;
}
- const std::vector<unsigned>& run_task_ids() {
- return run_task_ids_;
+ unsigned run_task_count() {
+ return run_task_count_;
}
- const std::vector<unsigned>& on_task_completed_ids() {
- return on_task_completed_ids_;
+ unsigned on_task_completed_count() {
+ return on_task_completed_count_;
}
unsigned finish_dispatching_completion_callbacks_count() {
@@ -144,72 +71,39 @@ class WorkerPoolTest : public testing::Test,
}
private:
- scoped_ptr<FakeWorkerPool> worker_pool_;
- std::vector<unsigned> run_task_ids_;
- std::vector<unsigned> on_task_completed_ids_;
+ scoped_ptr<WorkerPool> worker_pool_;
+ unsigned run_task_count_;
+ unsigned on_task_completed_count_;
unsigned finish_dispatching_completion_callbacks_count_;
};
TEST_F(WorkerPoolTest, Basic) {
- EXPECT_EQ(0u, run_task_ids().size());
- EXPECT_EQ(0u, on_task_completed_ids().size());
+ EXPECT_EQ(0u, run_task_count());
+ EXPECT_EQ(0u, on_task_completed_count());
EXPECT_EQ(0u, finish_dispatching_completion_callbacks_count());
- worker_pool()->ScheduleTasks(
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
- base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u),
- base::Closure(),
- 1);
+ worker_pool()->PostTaskAndReply(
+ base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
+ base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
RunAllTasksAndReset();
- EXPECT_EQ(1u, run_task_ids().size());
- EXPECT_EQ(1u, on_task_completed_ids().size());
+ EXPECT_EQ(1u, run_task_count());
+ EXPECT_EQ(1u, on_task_completed_count());
EXPECT_EQ(1u, finish_dispatching_completion_callbacks_count());
- worker_pool()->ScheduleTasks(
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
- base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u),
- base::Closure(),
- 2);
+ worker_pool()->PostTaskAndReply(
+ base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
+ base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
+ worker_pool()->PostTaskAndReply(
+ base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
+ base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
RunAllTasksAndReset();
- EXPECT_EQ(3u, run_task_ids().size());
- EXPECT_EQ(3u, on_task_completed_ids().size());
+ EXPECT_EQ(3u, run_task_count());
+ EXPECT_EQ(3u, on_task_completed_count());
EXPECT_EQ(2u, finish_dispatching_completion_callbacks_count());
}
-TEST_F(WorkerPoolTest, Dependencies) {
- worker_pool()->ScheduleTasks(
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u),
- base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u),
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
- 1);
- RunAllTasksAndReset();
-
- // Check if dependency ran before task.
- ASSERT_EQ(2u, run_task_ids().size());
- EXPECT_EQ(0u, run_task_ids()[0]);
- EXPECT_EQ(1u, run_task_ids()[1]);
- ASSERT_EQ(1u, on_task_completed_ids().size());
- EXPECT_EQ(1u, on_task_completed_ids()[0]);
-
- worker_pool()->ScheduleTasks(
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u),
- base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u),
- base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
- 2);
- RunAllTasksAndReset();
-
- // Dependency should only run once.
- ASSERT_EQ(5u, run_task_ids().size());
- EXPECT_EQ(0u, run_task_ids()[2]);
- EXPECT_EQ(1u, run_task_ids()[3]);
- EXPECT_EQ(1u, run_task_ids()[4]);
- ASSERT_EQ(3u, on_task_completed_ids().size());
- EXPECT_EQ(1u, on_task_completed_ids()[1]);
- EXPECT_EQ(1u, on_task_completed_ids()[2]);
-}
-
} // namespace
} // namespace cc