summaryrefslogtreecommitdiffstats
path: root/base/threading
diff options
context:
space:
mode:
authorajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-07-26 18:25:16 +0000
committerajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-07-26 18:25:16 +0000
commit180c85e3e3691042ab617fd0755dcde6e75d5fbd (patch)
treeb9d4fd7a77f7f54dce4463960326ef7b0cd7a270 /base/threading
parent324ab8e0d77303333f8ad7de3b54d248587687db (diff)
downloadchromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.zip
chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.gz
chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.bz2
Support Closure in ALL the loops!
Add an overload for PostTask into MessageLoopProxy, and WorkerPool. BUG=35223 TEST=unittests. Review URL: http://codereview.chromium.org/7316015 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@94129 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/threading')
-rw-r--r--base/threading/worker_pool.h6
-rw-r--r--base/threading/worker_pool_posix.cc102
-rw-r--r--base/threading/worker_pool_posix.h34
-rw-r--r--base/threading/worker_pool_posix_unittest.cc34
-rw-r--r--base/threading/worker_pool_win.cc67
5 files changed, 181 insertions, 62 deletions
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h
index 12b50b4..58471b5 100644
--- a/base/threading/worker_pool.h
+++ b/base/threading/worker_pool.h
@@ -7,6 +7,7 @@
#pragma once
#include "base/base_api.h"
+#include "base/callback.h"
#include "base/tracked.h"
class Task;
@@ -27,8 +28,13 @@ class BASE_API WorkerPool {
// should be used for tasks that will take a long time to execute. Returns
// false if |task| could not be posted to a worker thread. Regardless of
// return value, ownership of |task| is transferred to the worker pool.
+ //
+ // TODO(ajwong): Remove the Task* based overload once we've finishsed the
+ // Task -> Closure migration.
static bool PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow);
+ static bool PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow);
};
} // namespace base
diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc
index bd6210f..c3a382c 100644
--- a/base/threading/worker_pool_posix.cc
+++ b/base/threading/worker_pool_posix.cc
@@ -4,6 +4,7 @@
#include "base/threading/worker_pool_posix.h"
+#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
@@ -11,6 +12,7 @@
#include "base/task.h"
#include "base/threading/platform_thread.h"
#include "base/threading/worker_pool.h"
+#include "base/tracked_objects.h"
namespace base {
@@ -28,6 +30,8 @@ class WorkerPoolImpl {
void PostTask(const tracked_objects::Location& from_here, Task* task,
bool task_is_slow);
+ void PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow);
private:
scoped_refptr<base::PosixDynamicThreadPool> pool_;
@@ -44,25 +48,27 @@ WorkerPoolImpl::~WorkerPoolImpl() {
void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow) {
- task->SetBirthPlace(from_here);
- pool_->PostTask(task);
+ pool_->PostTask(from_here, task);
+}
+
+void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ pool_->PostTask(from_here, task);
}
base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED);
class WorkerThread : public PlatformThread::Delegate {
public:
- WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit,
+ WorkerThread(const std::string& name_prefix,
base::PosixDynamicThreadPool* pool)
: name_prefix_(name_prefix),
- idle_seconds_before_exit_(idle_seconds_before_exit),
pool_(pool) {}
virtual void ThreadMain();
private:
const std::string name_prefix_;
- const int idle_seconds_before_exit_;
scoped_refptr<base::PosixDynamicThreadPool> pool_;
DISALLOW_COPY_AND_ASSIGN(WorkerThread);
@@ -74,11 +80,10 @@ void WorkerThread::ThreadMain() {
PlatformThread::SetName(name.c_str());
for (;;) {
- Task* task = pool_->WaitForTask();
- if (!task)
+ PosixDynamicThreadPool::PendingTask pending_task = pool_->WaitForTask();
+ if (pending_task.task.is_null())
break;
- task->Run();
- delete task;
+ pending_task.task.Run();
}
// The WorkerThread is non-joinable, so it deletes itself.
@@ -93,21 +98,35 @@ bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
return true;
}
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
+ return true;
+}
+
+PosixDynamicThreadPool::PendingTask::PendingTask(
+ const tracked_objects::Location& posted_from,
+ const base::Closure& task)
+ : task(task) {
+}
+
+PosixDynamicThreadPool::PendingTask::~PendingTask() {
+}
+
PosixDynamicThreadPool::PosixDynamicThreadPool(
const std::string& name_prefix,
int idle_seconds_before_exit)
: name_prefix_(name_prefix),
idle_seconds_before_exit_(idle_seconds_before_exit),
- tasks_available_cv_(&lock_),
+ pending_tasks_available_cv_(&lock_),
num_idle_threads_(0),
terminated_(false),
num_idle_threads_cv_(NULL) {}
PosixDynamicThreadPool::~PosixDynamicThreadPool() {
- while (!tasks_.empty()) {
- Task* task = tasks_.front();
- tasks_.pop();
- delete task;
+ while (!pending_tasks_.empty()) {
+ PendingTask pending_task = pending_tasks_.front();
+ pending_tasks_.pop();
}
}
@@ -117,53 +136,76 @@ void PosixDynamicThreadPool::Terminate() {
DCHECK(!terminated_) << "Thread pool is already terminated.";
terminated_ = true;
}
- tasks_available_cv_.Broadcast();
+ pending_tasks_available_cv_.Broadcast();
+}
+
+void PosixDynamicThreadPool::PostTask(
+ const tracked_objects::Location& from_here,
+ Task* task) {
+ PendingTask pending_task(from_here,
+ base::Bind(&subtle::TaskClosureAdapter::Run,
+ new subtle::TaskClosureAdapter(task)));
+ // |pending_task| and AddTask() work in conjunction here to ensure that after
+ // a successful AddTask(), the TaskClosureAdapter object is deleted on the
+ // worker thread. In AddTask(), the reference |pending_task.task| is handed
+ // off in a destructive manner to ensure that the local copy of
+ // |pending_task| doesn't keep a ref on the Closure causing the
+ // TaskClosureAdapter to be deleted on the wrong thread.
+ AddTask(&pending_task);
+}
+
+void PosixDynamicThreadPool::PostTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ PendingTask pending_task(from_here, task);
+ AddTask(&pending_task);
}
-void PosixDynamicThreadPool::PostTask(Task* task) {
+void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
AutoLock locked(lock_);
DCHECK(!terminated_) <<
"This thread pool is already terminated. Do not post new tasks.";
- tasks_.push(task);
+ pending_tasks_.push(*pending_task);
+ pending_task->task.Reset();
// We have enough worker threads.
- if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) {
- tasks_available_cv_.Signal();
+ if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
+ pending_tasks_available_cv_.Signal();
} else {
// The new PlatformThread will take ownership of the WorkerThread object,
// which will delete itself on exit.
WorkerThread* worker =
- new WorkerThread(name_prefix_, idle_seconds_before_exit_, this);
+ new WorkerThread(name_prefix_, this);
PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker);
}
}
-Task* PosixDynamicThreadPool::WaitForTask() {
+PosixDynamicThreadPool::PendingTask PosixDynamicThreadPool::WaitForTask() {
AutoLock locked(lock_);
if (terminated_)
- return NULL;
+ return PendingTask(FROM_HERE, base::Closure());
- if (tasks_.empty()) { // No work available, wait for work.
+ if (pending_tasks_.empty()) { // No work available, wait for work.
num_idle_threads_++;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
- tasks_available_cv_.TimedWait(
- TimeDelta::FromSeconds(kIdleSecondsBeforeExit));
+ pending_tasks_available_cv_.TimedWait(
+ TimeDelta::FromSeconds(idle_seconds_before_exit_));
num_idle_threads_--;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
- if (tasks_.empty()) {
+ if (pending_tasks_.empty()) {
// We waited for work, but there's still no work. Return NULL to signal
// the thread to terminate.
- return NULL;
+ return PendingTask(FROM_HERE, base::Closure());
}
}
- Task* task = tasks_.front();
- tasks_.pop();
- return task;
+ PendingTask pending_task = pending_tasks_.front();
+ pending_tasks_.pop();
+ return pending_task;
}
} // namespace base
diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h
index 9bb9eda..bf75c78 100644
--- a/base/threading/worker_pool_posix.h
+++ b/base/threading/worker_pool_posix.h
@@ -29,11 +29,13 @@
#include <string>
#include "base/basictypes.h"
+#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
+#include "base/tracked.h"
class Task;
@@ -44,6 +46,19 @@ class BASE_API PosixDynamicThreadPool
public:
class PosixDynamicThreadPoolPeer;
+ struct PendingTask {
+ PendingTask(const tracked_objects::Location& posted_from,
+ const base::Closure& task);
+ ~PendingTask();
+ // TODO(ajwong): After we figure out why Mac's ~AtExitManager dies when
+ // destructing the lock, add in extra info so we can call
+ // tracked_objects::TallyADeathIfActive() and
+ // tracked_objects::TallyABirthIfActive correctly.
+
+ // The task to run.
+ base::Closure task;
+ };
+
// All worker threads will share the same |name_prefix|. They will exit after
// |idle_seconds_before_exit|.
PosixDynamicThreadPool(const std::string& name_prefix,
@@ -56,15 +71,26 @@ class BASE_API PosixDynamicThreadPool
// Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership
// of |task|.
- void PostTask(Task* task);
+ //
+ // TODO(ajwong): Remove this compatibility API once the Task -> Closure
+ // migration is finished.
+ void PostTask(const tracked_objects::Location& from_here, Task* task);
+
+ // Adds |task| to the thread pool.
+ void PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task);
// Worker thread method to wait for up to |idle_seconds_before_exit| for more
// work from the thread pool. Returns NULL if no work is available.
- Task* WaitForTask();
+ PendingTask WaitForTask();
private:
friend class PosixDynamicThreadPoolPeer;
+ // Adds pending_task to the thread pool. This function will clear
+ // |pending_task->task|.
+ void AddTask(PendingTask* pending_task);
+
const std::string name_prefix_;
const int idle_seconds_before_exit_;
@@ -73,9 +99,9 @@ class BASE_API PosixDynamicThreadPool
// Signal()s worker threads to let them know more tasks are available.
// Also used for Broadcast()'ing to worker threads to let them know the pool
// is being deleted and they can exit.
- ConditionVariable tasks_available_cv_;
+ ConditionVariable pending_tasks_available_cv_;
int num_idle_threads_;
- std::queue<Task*> tasks_;
+ std::queue<PendingTask> pending_tasks_;
bool terminated_;
// Only used for tests to ensure correct thread ordering. It will always be
// NULL in non-test code.
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
index c984ee3..01722f0 100644
--- a/base/threading/worker_pool_posix_unittest.cc
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -22,10 +22,12 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
: pool_(pool) {}
Lock* lock() { return &pool_->lock_; }
- ConditionVariable* tasks_available_cv() {
- return &pool_->tasks_available_cv_;
+ ConditionVariable* pending_tasks_available_cv() {
+ return &pool_->pending_tasks_available_cv_;
+ }
+ const std::queue<PendingTask>& pending_tasks() const {
+ return pool_->pending_tasks_;
}
- const std::queue<Task*>& tasks() const { return pool_->tasks_; }
int num_idle_threads() const { return pool_->num_idle_threads_; }
ConditionVariable* num_idle_threads_cv() {
return pool_->num_idle_threads_cv_.get();
@@ -180,10 +182,10 @@ class PosixDynamicThreadPoolTest : public testing::Test {
TEST_F(PosixDynamicThreadPoolTest, Basic) {
EXPECT_EQ(0, peer_.num_idle_threads());
EXPECT_EQ(0U, unique_threads_.size());
- EXPECT_EQ(0U, peer_.tasks().size());
+ EXPECT_EQ(0U, peer_.pending_tasks().size());
// Add one task and wait for it to be completed.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
@@ -195,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
// Add one task and wait for it to be completed.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add another 2 tasks. One should reuse the existing worker thread.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
@@ -214,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
// Add two blocking tasks.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
@@ -230,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
TEST_F(PosixDynamicThreadPoolTest, Complex) {
// Add two non blocking tasks and wait for them to finish.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add two blocking tasks, start them simultaneously, and wait for them to
// finish.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
@@ -251,13 +253,13 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
{
base::AutoLock locked(*peer_.lock());
while (peer_.num_idle_threads() > 0) {
- peer_.tasks_available_cv()->Signal();
+ peer_.pending_tasks_available_cv()->Signal();
peer_.num_idle_threads_cv()->Wait();
}
}
// Add another non blocking task. There are no threads to reuse.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
EXPECT_EQ(3U, unique_threads_.size());
diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc
index 2072e52..2aa423f 100644
--- a/base/threading/worker_pool_win.cc
+++ b/base/threading/worker_pool_win.cc
@@ -1,40 +1,83 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/threading/worker_pool.h"
+#include "base/bind.h"
#include "base/logging.h"
#include "base/task.h"
+#include "base/tracked_objects.h"
namespace base {
namespace {
+struct PendingTask {
+ PendingTask(
+ const tracked_objects::Location& posted_from,
+ const base::Closure& task)
+ : task(task) {
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from);
+ time_posted = TimeTicks::Now();
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+ }
+
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ // Counter for location where the Closure was posted from.
+ tracked_objects::Births* post_births;
+
+ // Time the task was posted.
+ TimeTicks time_posted;
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+
+ // The task to run.
+ base::Closure task;
+};
+
DWORD CALLBACK WorkItemCallback(void* param) {
- Task* task = static_cast<Task*>(param);
- task->Run();
- delete task;
+ PendingTask* pending_task = static_cast<PendingTask*>(param);
+ pending_task->task.Run();
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ tracked_objects::ThreadData::TallyADeathIfActive(
+ pending_task->post_births,
+ TimeTicks::Now() - pending_task->time_posted);
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+ delete pending_task;
return 0;
}
-} // namespace
-
-bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
- Task* task, bool task_is_slow) {
- task->SetBirthPlace(from_here);
-
+// Takes ownership of |pending_task|
+bool PostTaskInternal(PendingTask* pending_task, bool task_is_slow) {
ULONG flags = 0;
if (task_is_slow)
flags |= WT_EXECUTELONGFUNCTION;
- if (!QueueUserWorkItem(WorkItemCallback, task, flags)) {
+ if (!QueueUserWorkItem(WorkItemCallback, pending_task, flags)) {
DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError();
- delete task;
+ delete pending_task;
return false;
}
return true;
}
+} // namespace
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ PendingTask* pending_task =
+ new PendingTask(from_here,
+ base::Bind(&subtle::TaskClosureAdapter::Run,
+ new subtle::TaskClosureAdapter(task)));
+ return PostTaskInternal(pending_task, task_is_slow);
+}
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ PendingTask* pending_task = new PendingTask(from_here, task);
+ return PostTaskInternal(pending_task, task_is_slow);
+}
+
} // namespace base