summaryrefslogtreecommitdiffstats
path: root/cc/base
diff options
context:
space:
mode:
authorreveman@chromium.org <reveman@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-30 03:34:13 +0000
committerreveman@chromium.org <reveman@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-30 03:34:13 +0000
commit362c9d06bc4d5b0a6760b5141125829d986cf7e5 (patch)
tree7b54bf5e810579a0bc2f5c1bc82d83cb25b6dbd4 /cc/base
parent3d82df66cceb50b8088b818e8ca74015d372872e (diff)
downloadchromium_src-362c9d06bc4d5b0a6760b5141125829d986cf7e5.zip
chromium_src-362c9d06bc4d5b0a6760b5141125829d986cf7e5.tar.gz
chromium_src-362c9d06bc4d5b0a6760b5141125829d986cf7e5.tar.bz2
Re-land: cc: Cancel and re-prioritize worker pool tasks.
This adds a task graph interface to the worker pool and implements a simple queue instance of this interface for use by the tile manager. The task graph interface can be used describe more complicated task dependencies in the future and provides the immediate benefit of seamlessly being able to cancel and re-prioritize tasks. BUG=178974,244642 TEST=cc_unittests --gtest_filter=WorkerPoolTest.Dependencies Review URL: https://chromiumcodereview.appspot.com/14689004 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@203041 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'cc/base')
-rw-r--r--cc/base/scoped_ptr_hash_map.h2
-rw-r--r--cc/base/worker_pool.cc497
-rw-r--r--cc/base/worker_pool.h74
-rw-r--r--cc/base/worker_pool_perftest.cc234
-rw-r--r--cc/base/worker_pool_unittest.cc182
5 files changed, 800 insertions, 189 deletions
diff --git a/cc/base/scoped_ptr_hash_map.h b/cc/base/scoped_ptr_hash_map.h
index 970984a..af792ec 100644
--- a/cc/base/scoped_ptr_hash_map.h
+++ b/cc/base/scoped_ptr_hash_map.h
@@ -31,7 +31,7 @@ class ScopedPtrHashMap {
~ScopedPtrHashMap() { clear(); }
- void swap(ScopedPtrHashMap<Key, Value*>& other) {
+ void swap(ScopedPtrHashMap<Key, Value>& other) {
data_.swap(other.data_);
}
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
index 59dc828..bbf51c2 100644
--- a/cc/base/worker_pool.cc
+++ b/cc/base/worker_pool.cc
@@ -4,46 +4,93 @@
#include "cc/base/worker_pool.h"
-#include <algorithm>
+#if defined(OS_ANDROID)
+// TODO(epenner): Move thread priorities to base. (crbug.com/170549)
+#include <sys/resource.h>
+#endif
+
+#include <map>
#include "base/bind.h"
#include "base/debug/trace_event.h"
+#include "base/hash_tables.h"
#include "base/stringprintf.h"
-#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
+#include "cc/base/scoped_ptr_deque.h"
+#include "cc/base/scoped_ptr_hash_map.h"
+
+#if defined(COMPILER_GCC)
+namespace BASE_HASH_NAMESPACE {
+template <> struct hash<cc::internal::WorkerPoolTask*> {
+ size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
+ return hash<size_t>()(reinterpret_cast<size_t>(ptr));
+ }
+};
+} // namespace BASE_HASH_NAMESPACE
+#endif // COMPILER
namespace cc {
-namespace {
-
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
- public:
- WorkerPoolTaskImpl(const WorkerPool::Callback& task,
- const base::Closure& reply)
- : internal::WorkerPoolTask(reply),
- task_(task) {}
+namespace internal {
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- task_.Run();
- }
+WorkerPoolTask::WorkerPoolTask()
+ : did_schedule_(false),
+ did_run_(false),
+ did_complete_(false) {
+}
- private:
- WorkerPool::Callback task_;
-};
+WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
+ : did_schedule_(false),
+ did_run_(false),
+ did_complete_(false) {
+ dependencies_.swap(*dependencies);
+}
-} // namespace
+WorkerPoolTask::~WorkerPoolTask() {
+ DCHECK_EQ(did_schedule_, did_complete_);
+ DCHECK(!did_run_ || did_schedule_);
+ DCHECK(!did_run_ || did_complete_);
+}
-namespace internal {
+void WorkerPoolTask::DidSchedule() {
+ DCHECK(!did_complete_);
+ did_schedule_ = true;
+}
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
+void WorkerPoolTask::WillRun() {
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ DCHECK(!did_run_);
}
-WorkerPoolTask::~WorkerPoolTask() {
+void WorkerPoolTask::DidRun() {
+ did_run_ = true;
}
void WorkerPoolTask::DidComplete() {
- reply_.Run();
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ did_complete_ = true;
+}
+
+bool WorkerPoolTask::IsReadyToRun() const {
+ // TODO(reveman): Use counter to improve performance.
+ for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
+ it != dependencies_.rend(); ++it) {
+ WorkerPoolTask* dependency = *it;
+ if (!dependency->HasFinishedRunning())
+ return false;
+ }
+ return true;
+}
+
+bool WorkerPoolTask::HasFinishedRunning() const {
+ return did_run_;
+}
+
+bool WorkerPoolTask::HasCompleted() const {
+ return did_complete_;
}
} // namespace internal
@@ -60,17 +107,51 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
void Shutdown();
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
+ // Schedule running of |root| task and all its dependencies. Tasks
+ // previously scheduled but no longer needed to run |root| will be
+ // canceled unless already running. Canceled tasks are moved to
+ // |completed_tasks_| without being run. The result is that once
+ // scheduled, a task is guaranteed to end up in the |completed_tasks_|
+ // queue even if they later get canceled by another call to
+ // ScheduleTasks().
+ void ScheduleTasks(internal::WorkerPoolTask* root);
- // Appends all completed tasks to worker pool's completed tasks queue
- // and returns true if idle.
- bool CollectCompletedTasks();
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle.
+ bool CollectCompletedTasks(TaskDeque* completed_tasks);
private:
- // Appends all completed tasks to |completed_tasks|. Lock must
- // already be acquired before calling this function.
- bool AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
+ class ScheduledTask {
+ public:
+ ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
+ : priority_(priority) {
+ if (dependent)
+ dependents_.push_back(dependent);
+ }
+
+ internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
+ unsigned priority() const { return priority_; }
+
+ private:
+ internal::WorkerPoolTask::TaskVector dependents_;
+ unsigned priority_;
+ };
+ typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
+ typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
+ ScheduledTaskMap;
+
+ // This builds a ScheduledTaskMap from a root task.
+ static unsigned BuildScheduledTaskMapRecursive(
+ internal::WorkerPoolTask* task,
+ internal::WorkerPoolTask* dependent,
+ unsigned priority,
+ ScheduledTaskMap* scheduled_tasks);
+ static void BuildScheduledTaskMap(
+ internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
+
+ // Collect all completed tasks by swapping the contents of
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired
+ // before calling this function. Returns true if idle.
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
// Schedule an OnIdleOnOriginThread callback if not already pending.
// Lock must already be acquired before calling this function.
@@ -90,8 +171,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
mutable base::Lock lock_;
// Condition variable that is waited on by worker threads until new
- // tasks are posted or shutdown starts.
- base::ConditionVariable has_pending_tasks_cv_;
+ // tasks are ready to run or shutdown starts.
+ base::ConditionVariable has_ready_to_run_tasks_cv_;
// Target message loop used for posting callbacks.
scoped_refptr<base::MessageLoopProxy> origin_loop_;
@@ -106,15 +187,25 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
// loop index is 0.
unsigned next_thread_index_;
- // Number of tasks currently running.
- unsigned running_task_count_;
-
// Set during shutdown. Tells workers to exit when no more tasks
// are pending.
bool shutdown_;
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
- TaskDeque pending_tasks_;
+ // The root task that is a dependent of all other tasks.
+ scoped_refptr<internal::WorkerPoolTask> root_;
+
+ // This set contains all pending tasks.
+ ScheduledTaskMap pending_tasks_;
+
+ // Ordered set of tasks that are ready to run.
+ // TODO(reveman): priority_queue might be more efficient.
+ typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
+ TaskMap ready_to_run_tasks_;
+
+ // This set contains all currently running tasks.
+ ScheduledTaskMap running_tasks_;
+
+ // Completed tasks not yet collected by origin thread.
TaskDeque completed_tasks_;
ScopedPtrDeque<base::DelegateSimpleThread> workers_;
@@ -127,25 +218,24 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
const std::string& thread_name_prefix)
: worker_pool_on_origin_thread_(worker_pool),
lock_(),
- has_pending_tasks_cv_(&lock_),
+ has_ready_to_run_tasks_cv_(&lock_),
origin_loop_(base::MessageLoopProxy::current()),
weak_ptr_factory_(this),
on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
weak_ptr_factory_.GetWeakPtr())),
on_idle_pending_(false),
next_thread_index_(0),
- running_task_count_(0),
shutdown_(false) {
base::AutoLock lock(lock_);
while (workers_.size() < num_threads) {
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
new base::DelegateSimpleThread(
- this,
- thread_name_prefix +
- base::StringPrintf(
- "Worker%u",
- static_cast<unsigned>(workers_.size() + 1)).c_str()));
+ this,
+ thread_name_prefix +
+ base::StringPrintf(
+ "Worker%u",
+ static_cast<unsigned>(workers_.size() + 1)).c_str()));
worker->Start();
workers_.push_back(worker.Pass());
}
@@ -156,12 +246,10 @@ WorkerPool::Inner::~Inner() {
DCHECK(shutdown_);
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
DCHECK_EQ(0u, pending_tasks_.size());
+ DCHECK_EQ(0u, ready_to_run_tasks_.size());
+ DCHECK_EQ(0u, running_tasks_.size());
DCHECK_EQ(0u, completed_tasks_.size());
- DCHECK_EQ(0u, running_task_count_);
}
void WorkerPool::Inner::Shutdown() {
@@ -173,7 +261,7 @@ void WorkerPool::Inner::Shutdown() {
// Wake up a worker so it knows it should exit. This will cause all workers
// to exit as each will wake up another worker before exiting.
- has_pending_tasks_cv_.Signal();
+ has_ready_to_run_tasks_cv_.Signal();
}
while (workers_.size()) {
@@ -183,32 +271,100 @@ void WorkerPool::Inner::Shutdown() {
base::ThreadRestrictions::ScopedAllowIO allow_io;
worker->Join();
}
+
+ // Cancel any pending OnIdle callback.
+ weak_ptr_factory_.InvalidateWeakPtrs();
}
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- base::AutoLock lock(lock_);
+void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
+ // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
+ DCHECK(!root || !shutdown_);
+
+ scoped_refptr<internal::WorkerPoolTask> new_root(root);
+
+ ScheduledTaskMap new_pending_tasks;
+ ScheduledTaskMap new_running_tasks;
+ TaskMap new_ready_to_run_tasks;
+
+ // Build scheduled task map before acquiring |lock_|.
+ if (root)
+ BuildScheduledTaskMap(root, &new_pending_tasks);
+
+ {
+ base::AutoLock lock(lock_);
+
+ // First remove all completed tasks from |new_pending_tasks|.
+ for (TaskDeque::iterator it = completed_tasks_.begin();
+ it != completed_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = *it;
+ new_pending_tasks.take_and_erase(task);
+ }
- pending_tasks_.push_back(task.Pass());
+ // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
+ for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
+ it != pending_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
- // There is more work available, so wake up worker thread.
- has_pending_tasks_cv_.Signal();
+ // Task has completed if not present in |new_pending_tasks|.
+ if (!new_pending_tasks.contains(task))
+ completed_tasks_.push_back(task);
+ }
+
+ // Build new running task set.
+ for (ScheduledTaskMap::iterator it = running_tasks_.begin();
+ it != running_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
+ // Transfer scheduled task value from |new_pending_tasks| to
+ // |new_running_tasks| if currently running. Value must be set to
+ // NULL if |new_pending_tasks| doesn't contain task. This does
+ // the right thing in both cases.
+ new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
+ }
+
+ // Build new "ready to run" tasks queue.
+ for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
+ it != new_pending_tasks.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
+
+ // Completed tasks should not exist in |new_pending_tasks|.
+ DCHECK(!task->HasFinishedRunning());
+
+ // Call DidSchedule() to indicate that this task has been scheduled.
+ // Note: This is only for debugging purposes.
+ task->DidSchedule();
+
+ DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
+ if (task->IsReadyToRun())
+ new_ready_to_run_tasks[it->second->priority()] = task;
+ }
+
+ // Swap root task and task sets.
+ // Note: old tasks are intentionally destroyed after releasing |lock_|.
+ root_.swap(new_root);
+ pending_tasks_.swap(new_pending_tasks);
+ running_tasks_.swap(new_running_tasks);
+ ready_to_run_tasks_.swap(new_ready_to_run_tasks);
+
+ // If there is more work available, wake up worker thread.
+ if (!ready_to_run_tasks_.empty())
+ has_ready_to_run_tasks_cv_.Signal();
+ }
}
-bool WorkerPool::Inner::CollectCompletedTasks() {
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
base::AutoLock lock(lock_);
- return AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ return CollectCompletedTasksWithLockAcquired(completed_tasks);
}
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
+ TaskDeque* completed_tasks) {
lock_.AssertAcquired();
- while (completed_tasks_.size())
- completed_tasks->push_back(completed_tasks_.take_front().Pass());
+ DCHECK_EQ(0u, completed_tasks->size());
+ completed_tasks->swap(completed_tasks_);
- return !running_task_count_ && pending_tasks_.empty();
+ return running_tasks_.empty() && pending_tasks_.empty();
}
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
@@ -221,6 +377,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
}
void WorkerPool::Inner::OnIdleOnOriginThread() {
+ TaskDeque completed_tasks;
+
{
base::AutoLock lock(lock_);
@@ -228,14 +386,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
on_idle_pending_ = false;
// Early out if no longer idle.
- if (running_task_count_ || !pending_tasks_.empty())
+ if (!running_tasks_.empty() || !pending_tasks_.empty())
return;
- AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ CollectCompletedTasksWithLockAcquired(&completed_tasks);
}
- worker_pool_on_origin_thread_->OnIdle();
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
}
void WorkerPool::Inner::Run() {
@@ -251,29 +408,37 @@ void WorkerPool::Inner::Run() {
int thread_index = next_thread_index_++;
while (true) {
- if (pending_tasks_.empty()) {
- // Exit when shutdown is set and no more tasks are pending.
- if (shutdown_)
- break;
-
- // Schedule an idle callback if requested and not pending.
- if (!running_task_count_)
- ScheduleOnIdleWithLockAcquired();
-
- // Wait for new pending tasks.
- has_pending_tasks_cv_.Wait();
+ if (ready_to_run_tasks_.empty()) {
+ if (pending_tasks_.empty()) {
+ // Exit when shutdown is set and no more tasks are pending.
+ if (shutdown_)
+ break;
+
+ // Schedule an idle callback if no tasks are running.
+ if (running_tasks_.empty())
+ ScheduleOnIdleWithLockAcquired();
+ }
+
+ // Wait for more tasks.
+ has_ready_to_run_tasks_cv_.Wait();
continue;
}
- // Get next task.
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
+ // Take top priority task from |ready_to_run_tasks_|.
+ scoped_refptr<internal::WorkerPoolTask> task(
+ ready_to_run_tasks_.begin()->second);
+ ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
+
+ // Move task from |pending_tasks_| to |running_tasks_|.
+ DCHECK(pending_tasks_.contains(task));
+ DCHECK(!running_tasks_.contains(task));
+ running_tasks_.set(task, pending_tasks_.take_and_erase(task));
- // Increment |running_task_count_| before starting to run task.
- running_task_count_++;
+ // There may be more work available, so wake up another worker thread.
+ has_ready_to_run_tasks_cv_.Signal();
- // There may be more work available, so wake up another
- // worker thread.
- has_pending_tasks_cv_.Signal();
+ // Call WillRun() before releasing |lock_| and running task.
+ task->WillRun();
{
base::AutoUnlock unlock(lock_);
@@ -281,15 +446,95 @@ void WorkerPool::Inner::Run() {
task->RunOnThread(thread_index);
}
- completed_tasks_.push_back(task.Pass());
+ // This will mark task as finished running.
+ task->DidRun();
+
+ // Now iterate over all dependents to check if they are ready to run.
+ scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
+ task);
+ if (scheduled_task) {
+ typedef internal::WorkerPoolTask::TaskVector TaskVector;
+ for (TaskVector::iterator it = scheduled_task->dependents().begin();
+ it != scheduled_task->dependents().end(); ++it) {
+ internal::WorkerPoolTask* dependent = *it;
+ if (!dependent->IsReadyToRun())
+ continue;
+
+ // Task is ready. Add it to |ready_to_run_tasks_|.
+ DCHECK(pending_tasks_.contains(dependent));
+ unsigned priority = pending_tasks_.get(dependent)->priority();
+ DCHECK(!ready_to_run_tasks_.count(priority) ||
+ ready_to_run_tasks_[priority] == dependent);
+ ready_to_run_tasks_[priority] = dependent;
+ }
+ }
- // Decrement |running_task_count_| now that we are done running task.
- running_task_count_--;
+ // Finally add task to |completed_tasks_|.
+ completed_tasks_.push_back(task);
}
// We noticed we should exit. Wake up the next worker so it knows it should
// exit as well (because the Shutdown() code only signals once).
- has_pending_tasks_cv_.Signal();
+ has_ready_to_run_tasks_cv_.Signal();
+}
+
+// BuildScheduledTaskMap() takes a task tree as input and constructs
+// a unique set of tasks with edges between dependencies pointing in
+// the direction of the dependents. Each task is given a unique priority
+// which is currently the same as the DFS traversal order.
+//
+// Input: Output:
+//
+// root task4 Task | Priority (lower is better)
+// / \ / \ -------+---------------------------
+// task1 task2 task3 task2 root | 4
+// | | | | task1 | 2
+// task3 | task1 | task2 | 3
+// | | \ / task3 | 1
+// task4 task4 root task4 | 0
+//
+// The output can be used to efficiently maintain a queue of
+// "ready to run" tasks.
+
+// static
+unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
+ internal::WorkerPoolTask* task,
+ internal::WorkerPoolTask* dependent,
+ unsigned priority,
+ ScheduledTaskMap* scheduled_tasks) {
+ // Skip sub-tree if task has already completed.
+ if (task->HasCompleted())
+ return priority;
+
+ ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
+ if (scheduled_it != scheduled_tasks->end()) {
+ DCHECK(dependent);
+ scheduled_it->second->dependents().push_back(dependent);
+ return priority;
+ }
+
+ typedef internal::WorkerPoolTask::TaskVector TaskVector;
+ for (TaskVector::iterator it = task->dependencies().begin();
+ it != task->dependencies().end(); ++it) {
+ internal::WorkerPoolTask* dependency = *it;
+ priority = BuildScheduledTaskMapRecursive(
+ dependency, task, priority, scheduled_tasks);
+ }
+
+ scheduled_tasks->set(task,
+ make_scoped_ptr(new ScheduledTask(dependent,
+ priority)));
+
+ return priority + 1;
+}
+
+// static
+void WorkerPool::Inner::BuildScheduledTaskMap(
+ internal::WorkerPoolTask* root,
+ ScheduledTaskMap* scheduled_tasks) {
+ const unsigned kBasePriority = 0u;
+ DCHECK(root);
+ BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
}
WorkerPool::WorkerPool(size_t num_threads,
@@ -297,83 +542,105 @@ WorkerPool::WorkerPool(size_t num_threads,
const std::string& thread_name_prefix)
: client_(NULL),
origin_loop_(base::MessageLoopProxy::current()),
- weak_ptr_factory_(this),
check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
check_for_completed_tasks_pending_(false),
+ in_dispatch_completion_callbacks_(false),
inner_(make_scoped_ptr(new Inner(this,
num_threads,
thread_name_prefix))) {
}
WorkerPool::~WorkerPool() {
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- DCHECK_EQ(0u, completed_tasks_.size());
}
void WorkerPool::Shutdown() {
+ TRACE_EVENT0("cc", "WorkerPool::Shutdown");
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
inner_->Shutdown();
- inner_->CollectCompletedTasks();
- DispatchCompletionCallbacks();
-}
-void WorkerPool::PostTaskAndReply(
- const Callback& task, const base::Closure& reply) {
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
- task,
- reply)).PassAs<internal::WorkerPoolTask>());
+ TaskDeque completed_tasks;
+ inner_->CollectCompletedTasks(&completed_tasks);
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::OnIdle() {
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::OnIdle");
- DispatchCompletionCallbacks();
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ DispatchCompletionCallbacks(completed_tasks);
+
+ // Cancel any pending check for completed tasks.
+ check_for_completed_tasks_callback_.Cancel();
+ check_for_completed_tasks_pending_ = false;
}
void WorkerPool::ScheduleCheckForCompletedTasks() {
if (check_for_completed_tasks_pending_)
return;
+ check_for_completed_tasks_callback_.Reset(
+ base::Bind(&WorkerPool::CheckForCompletedTasks,
+ base::Unretained(this)));
origin_loop_->PostDelayedTask(
FROM_HERE,
- base::Bind(&WorkerPool::CheckForCompletedTasks,
- weak_ptr_factory_.GetWeakPtr()),
+ check_for_completed_tasks_callback_.callback(),
check_for_completed_tasks_delay_);
check_for_completed_tasks_pending_ = true;
}
void WorkerPool::CheckForCompletedTasks() {
TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
- DCHECK(check_for_completed_tasks_pending_);
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ check_for_completed_tasks_callback_.Cancel();
check_for_completed_tasks_pending_ = false;
+ TaskDeque completed_tasks;
+
// Schedule another check for completed tasks if not idle.
- if (!inner_->CollectCompletedTasks())
+ if (!inner_->CollectCompletedTasks(&completed_tasks))
ScheduleCheckForCompletedTasks();
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::DispatchCompletionCallbacks() {
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
- if (completed_tasks_.empty())
+ // Early out when |completed_tasks| is empty to prevent unnecessary
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
+ if (completed_tasks->empty())
return;
- while (completed_tasks_.size()) {
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
+ // Worker pool instance is not reentrant while processing completed tasks.
+ in_dispatch_completion_callbacks_ = true;
+
+ while (!completed_tasks->empty()) {
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
+ completed_tasks->pop_front();
task->DidComplete();
+ task->DispatchCompletionCallback();
}
+ in_dispatch_completion_callbacks_ = false;
+
DCHECK(client_);
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
}
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- // Schedule check for completed tasks if not pending.
- ScheduleCheckForCompletedTasks();
+void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
+ TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ // Schedule check for completed tasks.
+ if (root)
+ ScheduleCheckForCompletedTasks();
- inner_->PostTask(task.Pass());
+ inner_->ScheduleTasks(root);
}
} // namespace cc
diff --git a/cc/base/worker_pool.h b/cc/base/worker_pool.h
index aa49ec9..a27757a 100644
--- a/cc/base/worker_pool.h
+++ b/cc/base/worker_pool.h
@@ -5,31 +5,52 @@
#ifndef CC_BASE_WORKER_POOL_H_
#define CC_BASE_WORKER_POOL_H_
+#include <deque>
#include <string>
+#include <vector>
#include "base/cancelable_callback.h"
+#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
#include "cc/base/cc_export.h"
-#include "cc/base/scoped_ptr_deque.h"
namespace cc {
namespace internal {
-class WorkerPoolTask {
+class CC_EXPORT WorkerPoolTask
+ : public base::RefCountedThreadSafe<WorkerPoolTask> {
public:
- virtual ~WorkerPoolTask();
+ typedef std::vector<scoped_refptr<WorkerPoolTask> > TaskVector;
virtual void RunOnThread(unsigned thread_index) = 0;
+ virtual void DispatchCompletionCallback() = 0;
+ void DidSchedule();
+ void WillRun();
+ void DidRun();
void DidComplete();
+ bool IsReadyToRun() const;
+ bool HasFinishedRunning() const;
+ bool HasCompleted() const;
+
+ TaskVector& dependencies() { return dependencies_; }
+
protected:
- explicit WorkerPoolTask(const base::Closure& reply);
+ friend class base::RefCountedThreadSafe<WorkerPoolTask>;
+
+ WorkerPoolTask();
+ explicit WorkerPoolTask(TaskVector* dependencies);
+ virtual ~WorkerPoolTask();
- const base::Closure reply_;
+ private:
+ bool did_schedule_;
+ bool did_run_;
+ bool did_complete_;
+ TaskVector dependencies_;
};
} // namespace internal
@@ -42,67 +63,50 @@ class CC_EXPORT WorkerPoolClient {
virtual ~WorkerPoolClient() {}
};
-// A worker thread pool that runs rendering tasks and guarantees completion
-// of all pending tasks at shutdown.
+// A worker thread pool that runs tasks provided by task graph and
+// guarantees completion of all pending tasks at shutdown.
class CC_EXPORT WorkerPool {
public:
- typedef base::Callback<void()> Callback;
-
virtual ~WorkerPool();
- static scoped_ptr<WorkerPool> Create(
- size_t num_threads,
- base::TimeDelta check_for_completed_tasks_delay,
- const std::string& thread_name_prefix) {
- return make_scoped_ptr(new WorkerPool(num_threads,
- check_for_completed_tasks_delay,
- thread_name_prefix));
- }
-
// Tells the worker pool to shutdown and returns once all pending tasks have
// completed.
- void Shutdown();
-
- // Posts |task| to worker pool. On completion, |reply|
- // is posted to the thread that called PostTaskAndReply().
- void PostTaskAndReply(const Callback& task, const base::Closure& reply);
+ virtual void Shutdown();
// Set a new client.
void SetClient(WorkerPoolClient* client) {
client_ = client;
}
+ // Force a check for completed tasks.
+ void CheckForCompletedTasks();
+
protected:
WorkerPool(size_t num_threads,
base::TimeDelta check_for_completed_tasks_delay,
const std::string& thread_name_prefix);
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
+ void ScheduleTasks(internal::WorkerPoolTask* root);
private:
class Inner;
friend class Inner;
- void OnTaskCompleted();
- void OnIdle();
+ typedef std::deque<scoped_refptr<internal::WorkerPoolTask> > TaskDeque;
+
+ void OnIdle(TaskDeque* completed_tasks);
void ScheduleCheckForCompletedTasks();
- void CheckForCompletedTasks();
- void DispatchCompletionCallbacks();
+ void DispatchCompletionCallbacks(TaskDeque* completed_tasks);
WorkerPoolClient* client_;
scoped_refptr<base::MessageLoopProxy> origin_loop_;
- base::WeakPtrFactory<WorkerPool> weak_ptr_factory_;
+ base::CancelableClosure check_for_completed_tasks_callback_;
base::TimeDelta check_for_completed_tasks_delay_;
bool check_for_completed_tasks_pending_;
-
- // Holds all completed tasks for which we have not yet dispatched
- // reply callbacks.
- ScopedPtrDeque<internal::WorkerPoolTask> completed_tasks_;
+ bool in_dispatch_completion_callbacks_;
// Hide the gory details of the worker pool in |inner_|.
const scoped_ptr<Inner> inner_;
-
- DISALLOW_COPY_AND_ASSIGN(WorkerPool);
};
} // namespace cc
diff --git a/cc/base/worker_pool_perftest.cc b/cc/base/worker_pool_perftest.cc
new file mode 100644
index 0000000..c03bcc8
--- /dev/null
+++ b/cc/base/worker_pool_perftest.cc
@@ -0,0 +1,234 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "cc/base/worker_pool.h"
+
+#include "base/time.h"
+#include "cc/base/completion_event.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace cc {
+
+namespace {
+
+static const int kTimeLimitMillis = 2000;
+static const int kWarmupRuns = 5;
+static const int kTimeCheckInterval = 10;
+
+class PerfTaskImpl : public internal::WorkerPoolTask {
+ public:
+ explicit PerfTaskImpl(internal::WorkerPoolTask::TaskVector* dependencies)
+ : internal::WorkerPoolTask(dependencies) {}
+
+ // Overridden from internal::WorkerPoolTask:
+ virtual void RunOnThread(unsigned thread_index) OVERRIDE {}
+ virtual void DispatchCompletionCallback() OVERRIDE {}
+
+ private:
+ virtual ~PerfTaskImpl() {}
+};
+
+class PerfControlTaskImpl : public internal::WorkerPoolTask {
+ public:
+ explicit PerfControlTaskImpl(
+ internal::WorkerPoolTask::TaskVector* dependencies)
+ : internal::WorkerPoolTask(dependencies),
+ did_start_(new CompletionEvent),
+ can_finish_(new CompletionEvent) {}
+
+ // Overridden from internal::WorkerPoolTask:
+ virtual void RunOnThread(unsigned thread_index) OVERRIDE {
+ did_start_->Signal();
+ can_finish_->Wait();
+ }
+ virtual void DispatchCompletionCallback() OVERRIDE {}
+
+ void WaitForTaskToStartRunning() {
+ did_start_->Wait();
+ }
+
+ void AllowTaskToFinish() {
+ can_finish_->Signal();
+ }
+
+ private:
+ virtual ~PerfControlTaskImpl() {}
+
+ scoped_ptr<CompletionEvent> did_start_;
+ scoped_ptr<CompletionEvent> can_finish_;
+};
+
+class PerfWorkerPool : public WorkerPool {
+ public:
+ PerfWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {}
+ virtual ~PerfWorkerPool() {}
+
+ static scoped_ptr<PerfWorkerPool> Create() {
+ return make_scoped_ptr(new PerfWorkerPool);
+ }
+
+ void ScheduleTasks(internal::WorkerPoolTask* root) {
+ WorkerPool::ScheduleTasks(root);
+ }
+};
+
+class WorkerPoolPerfTest : public testing::Test,
+ public WorkerPoolClient {
+ public:
+ WorkerPoolPerfTest() : num_runs_(0) {}
+
+ // Overridden from testing::Test:
+ virtual void SetUp() OVERRIDE {
+ worker_pool_ = PerfWorkerPool::Create();
+ worker_pool_->SetClient(this);
+ }
+ virtual void TearDown() OVERRIDE {
+ worker_pool_->Shutdown();
+ }
+
+ // Overridden from WorkerPoolClient:
+ virtual void DidFinishDispatchingWorkerPoolCompletionCallbacks() OVERRIDE {}
+
+ void EndTest() {
+ elapsed_ = base::TimeTicks::HighResNow() - start_time_;
+ }
+
+ void AfterTest(const std::string test_name) {
+ // Format matches chrome/test/perf/perf_test.h:PrintResult
+ printf("*RESULT %s: %.2f runs/s\n",
+ test_name.c_str(),
+ num_runs_ / elapsed_.InSecondsF());
+ }
+
+ void BuildTaskGraph(internal::WorkerPoolTask::TaskVector* dependencies,
+ unsigned current_depth,
+ unsigned max_depth,
+ unsigned num_children_per_node) {
+ internal::WorkerPoolTask::TaskVector children;
+ if (current_depth < max_depth) {
+ for (unsigned i = 0; i < num_children_per_node; ++i) {
+ BuildTaskGraph(&children,
+ current_depth + 1,
+ max_depth,
+ num_children_per_node);
+ }
+ } else if (leaf_task_) {
+ children.push_back(leaf_task_);
+ }
+ dependencies->push_back(make_scoped_refptr(new PerfTaskImpl(&children)));
+ }
+
+ bool DidRun() {
+ ++num_runs_;
+ if (num_runs_ == kWarmupRuns)
+ start_time_ = base::TimeTicks::HighResNow();
+
+ if (!start_time_.is_null() && (num_runs_ % kTimeCheckInterval) == 0) {
+ base::TimeDelta elapsed = base::TimeTicks::HighResNow() - start_time_;
+ if (elapsed >= base::TimeDelta::FromMilliseconds(kTimeLimitMillis)) {
+ elapsed_ = elapsed;
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ void RunBuildTaskGraphTest(const std::string test_name,
+ unsigned max_depth,
+ unsigned num_children_per_node) {
+ start_time_ = base::TimeTicks();
+ num_runs_ = 0;
+ do {
+ internal::WorkerPoolTask::TaskVector children;
+ BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
+ } while (DidRun());
+
+ AfterTest(test_name);
+ }
+
+ void RunScheduleTasksTest(const std::string test_name,
+ unsigned max_depth,
+ unsigned num_children_per_node) {
+ start_time_ = base::TimeTicks();
+ num_runs_ = 0;
+ do {
+ internal::WorkerPoolTask::TaskVector empty;
+ leaf_task_ = make_scoped_refptr(new PerfControlTaskImpl(&empty));
+ internal::WorkerPoolTask::TaskVector children;
+ BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
+ scoped_refptr<PerfTaskImpl> root_task(
+ make_scoped_refptr(new PerfTaskImpl(&children)));
+
+ worker_pool_->ScheduleTasks(root_task);
+ leaf_task_->WaitForTaskToStartRunning();
+ worker_pool_->ScheduleTasks(NULL);
+ worker_pool_->CheckForCompletedTasks();
+ leaf_task_->AllowTaskToFinish();
+ } while (DidRun());
+
+ AfterTest(test_name);
+ }
+
+ void RunExecuteTasksTest(const std::string test_name,
+ unsigned max_depth,
+ unsigned num_children_per_node) {
+ start_time_ = base::TimeTicks();
+ num_runs_ = 0;
+ do {
+ internal::WorkerPoolTask::TaskVector children;
+ BuildTaskGraph(&children, 0, max_depth, num_children_per_node);
+ scoped_refptr<PerfControlTaskImpl> root_task(
+ make_scoped_refptr(new PerfControlTaskImpl(&children)));
+
+ worker_pool_->ScheduleTasks(root_task);
+ root_task->WaitForTaskToStartRunning();
+ root_task->AllowTaskToFinish();
+ worker_pool_->CheckForCompletedTasks();
+ } while (DidRun());
+
+ AfterTest(test_name);
+ }
+
+ protected:
+ scoped_ptr<PerfWorkerPool> worker_pool_;
+ scoped_refptr<PerfControlTaskImpl> leaf_task_;
+ base::TimeTicks start_time_;
+ base::TimeDelta elapsed_;
+ int num_runs_;
+};
+
+TEST_F(WorkerPoolPerfTest, BuildTaskGraph) {
+ RunBuildTaskGraphTest("build_task_graph_1_10", 1, 10);
+ RunBuildTaskGraphTest("build_task_graph_1_1000", 1, 1000);
+ RunBuildTaskGraphTest("build_task_graph_2_10", 2, 10);
+ RunBuildTaskGraphTest("build_task_graph_5_5", 5, 5);
+ RunBuildTaskGraphTest("build_task_graph_10_2", 10, 2);
+ RunBuildTaskGraphTest("build_task_graph_1000_1", 1000, 1);
+ RunBuildTaskGraphTest("build_task_graph_10_1", 10, 1);
+}
+
+TEST_F(WorkerPoolPerfTest, ScheduleTasks) {
+ RunScheduleTasksTest("schedule_tasks_1_10", 1, 10);
+ RunScheduleTasksTest("schedule_tasks_1_1000", 1, 1000);
+ RunScheduleTasksTest("schedule_tasks_2_10", 2, 10);
+ RunScheduleTasksTest("schedule_tasks_5_5", 5, 5);
+ RunScheduleTasksTest("schedule_tasks_10_2", 10, 2);
+ RunScheduleTasksTest("schedule_tasks_1000_1", 1000, 1);
+ RunScheduleTasksTest("schedule_tasks_10_1", 10, 1);
+}
+
+TEST_F(WorkerPoolPerfTest, ExecuteTasks) {
+ RunExecuteTasksTest("execute_tasks_1_10", 1, 10);
+ RunExecuteTasksTest("execute_tasks_1_1000", 1, 1000);
+ RunExecuteTasksTest("execute_tasks_2_10", 2, 10);
+ RunExecuteTasksTest("execute_tasks_5_5", 5, 5);
+ RunExecuteTasksTest("execute_tasks_10_2", 10, 2);
+ RunExecuteTasksTest("execute_tasks_1000_1", 1000, 1);
+ RunExecuteTasksTest("execute_tasks_10_1", 10, 1);
+}
+
+} // namespace
+
+} // namespace cc
diff --git a/cc/base/worker_pool_unittest.cc b/cc/base/worker_pool_unittest.cc
index c032e2c..de904b1 100644
--- a/cc/base/worker_pool_unittest.cc
+++ b/cc/base/worker_pool_unittest.cc
@@ -4,27 +4,101 @@
#include "cc/base/worker_pool.h"
+#include <vector>
+
+#include "cc/base/completion_event.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace cc {
namespace {
-class WorkerPoolTest : public testing::Test,
- public WorkerPoolClient {
+// Test double for internal::WorkerPoolTask: RunOnThread() invokes
+// |callback| on a worker thread and DispatchCompletionCallback() invokes
+// |reply|; either closure may be null, in which case that step is a no-op.
+class FakeTaskImpl : public internal::WorkerPoolTask {
 public:
-  WorkerPoolTest()
-      : run_task_count_(0),
-        on_task_completed_count_(0),
-        finish_dispatching_completion_callbacks_count_(0) {
+  // |dependencies| are tasks that must run before this task does (see the
+  // Dependencies test below); they are handed to the WorkerPoolTask base.
+  FakeTaskImpl(const base::Closure& callback,
+               const base::Closure& reply,
+               internal::WorkerPoolTask::TaskVector* dependencies)
+      : internal::WorkerPoolTask(dependencies),
+        callback_(callback),
+        reply_(reply) {
+  }
+  // Dependency-free variant.
+  FakeTaskImpl(const base::Closure& callback, const base::Closure& reply)
+      : callback_(callback),
+        reply_(reply) {
+  }
+
+  // Overridden from internal::WorkerPoolTask:
+  virtual void RunOnThread(unsigned thread_index) OVERRIDE {
+    if (!callback_.is_null())
+      callback_.Run();
+  }
+  virtual void DispatchCompletionCallback() OVERRIDE {
+    if (!reply_.is_null())
+      reply_.Run();
+  }
+
+ private:
+  // Ref-counted (held via scoped_refptr), hence the private destructor.
+  virtual ~FakeTaskImpl() {}
+
+  const base::Closure callback_;
+  const base::Closure reply_;
+};
+
+// WorkerPool fixture: one worker thread and an idle timeout long enough to
+// never fire during a test, plus a helper that schedules a small task graph
+// whose completion the test can block on.
+class FakeWorkerPool : public WorkerPool {
+ public:
+  FakeWorkerPool() : WorkerPool(1, base::TimeDelta::FromDays(1024), "test") {}
+  virtual ~FakeWorkerPool() {}
+
+  static scoped_ptr<FakeWorkerPool> Create() {
+    return make_scoped_ptr(new FakeWorkerPool);
+  }
+
+  // Schedules |count| tasks that each run |callback| and reply with |reply|.
+  // All of them depend on a single task running |dependency| (may be null).
+  void ScheduleTasks(const base::Closure& callback,
+                     const base::Closure& reply,
+                     const base::Closure& dependency,
+                     int count) {
+    // One shared dependency task; its |reply| is null, so it never gets a
+    // completion callback of its own.
+    scoped_refptr<FakeTaskImpl> dependency_task(
+        new FakeTaskImpl(dependency, base::Closure()));
+
+    internal::WorkerPoolTask::TaskVector tasks;
+    for (int i = 0; i < count; ++i) {
+      internal::WorkerPoolTask::TaskVector dependencies(1, dependency_task);
+      tasks.push_back(new FakeTaskImpl(callback, reply, &dependencies));
+    }
+    // Root of the graph: depends on all |count| tasks, and its RunOnThread
+    // callback signals |scheduled_tasks_completion_| from a worker thread.
+    scoped_refptr<FakeTaskImpl> completion_task(
+        new FakeTaskImpl(base::Bind(&FakeWorkerPool::OnTasksCompleted,
+                                    base::Unretained(this)),
+                         base::Closure(),
+                         &tasks));
+
+    // NOTE(review): re-arming the event here assumes any previous graph was
+    // already waited on via WaitForTasksToComplete() — true for these tests.
+    scheduled_tasks_completion_.reset(new CompletionEvent);
+    WorkerPool::ScheduleTasks(completion_task);
  }
-  virtual ~WorkerPoolTest() {
+
+  // Blocks until the completion task of the last scheduled graph has run.
+  void WaitForTasksToComplete() {
+    DCHECK(scheduled_tasks_completion_);
+    scheduled_tasks_completion_->Wait();
  }
+ private:
+  // Runs on a worker thread as the completion task's |callback|.
+  void OnTasksCompleted() {
+    DCHECK(scheduled_tasks_completion_);
+    scheduled_tasks_completion_->Signal();
+  }
+
+  scoped_ptr<CompletionEvent> scheduled_tasks_completion_;
+};
+
+class WorkerPoolTest : public testing::Test,
+ public WorkerPoolClient {
+ public:
+ WorkerPoolTest() : finish_dispatching_completion_callbacks_count_(0) {}
+ virtual ~WorkerPoolTest() {}
+
+ // Overridden from testing::Test:
virtual void SetUp() OVERRIDE {
Reset();
}
-
virtual void TearDown() OVERRIDE {
worker_pool_->Shutdown();
}
@@ -35,35 +109,34 @@ class WorkerPoolTest : public testing::Test,
}
void Reset() {
- worker_pool_ = WorkerPool::Create(1,
- base::TimeDelta::FromDays(1024),
- "test");
+ worker_pool_ = FakeWorkerPool::Create();
worker_pool_->SetClient(this);
}
void RunAllTasksAndReset() {
+ worker_pool_->WaitForTasksToComplete();
worker_pool_->Shutdown();
Reset();
}
- WorkerPool* worker_pool() {
+ FakeWorkerPool* worker_pool() {
return worker_pool_.get();
}
- void RunTask() {
- ++run_task_count_;
+ void RunTask(unsigned id) {
+ run_task_ids_.push_back(id);
}
- void OnTaskCompleted() {
- ++on_task_completed_count_;
+ void OnTaskCompleted(unsigned id) {
+ on_task_completed_ids_.push_back(id);
}
- unsigned run_task_count() {
- return run_task_count_;
+ const std::vector<unsigned>& run_task_ids() {
+ return run_task_ids_;
}
- unsigned on_task_completed_count() {
- return on_task_completed_count_;
+ const std::vector<unsigned>& on_task_completed_ids() {
+ return on_task_completed_ids_;
}
unsigned finish_dispatching_completion_callbacks_count() {
@@ -71,39 +144,72 @@ class WorkerPoolTest : public testing::Test,
}
private:
- scoped_ptr<WorkerPool> worker_pool_;
- unsigned run_task_count_;
- unsigned on_task_completed_count_;
+ scoped_ptr<FakeWorkerPool> worker_pool_;
+ std::vector<unsigned> run_task_ids_;
+ std::vector<unsigned> on_task_completed_ids_;
unsigned finish_dispatching_completion_callbacks_count_;
};
+// Sanity test: tasks run, completion callbacks fire, and each
+// ScheduleTasks/RunAllTasksAndReset round-trip dispatches exactly one
+// batch of completion callbacks.
TEST_F(WorkerPoolTest, Basic) {
-  EXPECT_EQ(0u, run_task_count());
-  EXPECT_EQ(0u, on_task_completed_count());
+  EXPECT_EQ(0u, run_task_ids().size());
+  EXPECT_EQ(0u, on_task_completed_ids().size());
  EXPECT_EQ(0u, finish_dispatching_completion_callbacks_count());
-  worker_pool()->PostTaskAndReply(
-      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
-      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
+  // One task, no dependency (null closure).
+  worker_pool()->ScheduleTasks(
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
+      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u),
+      base::Closure(),
+      1);
  RunAllTasksAndReset();
-  EXPECT_EQ(1u, run_task_count());
-  EXPECT_EQ(1u, on_task_completed_count());
+  EXPECT_EQ(1u, run_task_ids().size());
+  EXPECT_EQ(1u, on_task_completed_ids().size());
  EXPECT_EQ(1u, finish_dispatching_completion_callbacks_count());
-  worker_pool()->PostTaskAndReply(
-      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
-      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
-  worker_pool()->PostTaskAndReply(
-      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this)),
-      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this)));
+  // Two more tasks in a single batch; counts below are cumulative.
+  worker_pool()->ScheduleTasks(
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
+      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 0u),
+      base::Closure(),
+      2);
  RunAllTasksAndReset();
-  EXPECT_EQ(3u, run_task_count());
-  EXPECT_EQ(3u, on_task_completed_count());
+  EXPECT_EQ(3u, run_task_ids().size());
+  EXPECT_EQ(3u, on_task_completed_ids().size());
  EXPECT_EQ(2u, finish_dispatching_completion_callbacks_count());
}
+// Tasks with id 1 depend on a single dependency task with id 0. The
+// dependency must run before any dependent task, and — having been created
+// with a null |reply| in FakeWorkerPool::ScheduleTasks — it never gets a
+// completion callback of its own.
+TEST_F(WorkerPoolTest, Dependencies) {
+  worker_pool()->ScheduleTasks(
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u),
+      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u),
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
+      1);
+  RunAllTasksAndReset();
+
+  // Check that the dependency (id 0) ran before the task (id 1), and that
+  // only the task reported completion.
+  ASSERT_EQ(2u, run_task_ids().size());
+  EXPECT_EQ(0u, run_task_ids()[0]);
+  EXPECT_EQ(1u, run_task_ids()[1]);
+  ASSERT_EQ(1u, on_task_completed_ids().size());
+  EXPECT_EQ(1u, on_task_completed_ids()[0]);
+
+  // Two tasks sharing one dependency; all ids below are cumulative across
+  // both batches.
+  worker_pool()->ScheduleTasks(
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 1u),
+      base::Bind(&WorkerPoolTest::OnTaskCompleted, base::Unretained(this), 1u),
+      base::Bind(&WorkerPoolTest::RunTask, base::Unretained(this), 0u),
+      2);
+  RunAllTasksAndReset();
+
+  // The shared dependency should only run once.
+  ASSERT_EQ(5u, run_task_ids().size());
+  EXPECT_EQ(0u, run_task_ids()[2]);
+  EXPECT_EQ(1u, run_task_ids()[3]);
+  EXPECT_EQ(1u, run_task_ids()[4]);
+  ASSERT_EQ(3u, on_task_completed_ids().size());
+  EXPECT_EQ(1u, on_task_completed_ids()[1]);
+  EXPECT_EQ(1u, on_task_completed_ids()[2]);
+}
+
} // namespace
} // namespace cc