summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-07-26 18:25:16 +0000
committerajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-07-26 18:25:16 +0000
commit180c85e3e3691042ab617fd0755dcde6e75d5fbd (patch)
treeb9d4fd7a77f7f54dce4463960326ef7b0cd7a270 /base
parent324ab8e0d77303333f8ad7de3b54d248587687db (diff)
downloadchromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.zip
chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.gz
chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.bz2
Support Closure in ALL the loops!
Add an overload for PostTask into MessageLoopProxy, and WorkerPool. BUG=35223 TEST=unittests. Review URL: http://codereview.chromium.org/7316015 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@94129 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/message_loop.cc71
-rw-r--r--base/message_loop_proxy.h24
-rw-r--r--base/message_loop_proxy_impl.cc42
-rw-r--r--base/message_loop_proxy_impl.h24
-rw-r--r--base/message_loop_proxy_impl_unittest.cc48
-rw-r--r--base/task.cc29
-rw-r--r--base/task.h38
-rw-r--r--base/threading/worker_pool.h6
-rw-r--r--base/threading/worker_pool_posix.cc102
-rw-r--r--base/threading/worker_pool_posix.h34
-rw-r--r--base/threading/worker_pool_posix_unittest.cc34
-rw-r--r--base/threading/worker_pool_win.cc67
-rw-r--r--base/tracked_objects.cc20
-rw-r--r--base/tracked_objects.h7
14 files changed, 420 insertions, 126 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 08985a4..e3f6ea3 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -85,40 +85,6 @@ const base::LinearHistogram::DescriptionPair event_descriptions_[] = {
bool enable_histogrammer_ = false;
-// TODO(ajwong): This is one use case for having a Owned() tag that behaves
-// like a "Unique" pointer. If we had that, and Tasks were always safe to
-// delete on MessageLoop shutdown, this class could just be a function.
-class TaskClosureAdapter : public base::RefCounted<TaskClosureAdapter> {
- public:
- // |should_leak_task| points to a flag variable that can be used to determine
- // if this class should leak the Task on destruction. This is important
- // at MessageLoop shutdown since not all tasks can be safely deleted without
- // running. See MessageLoop::DeletePendingTasks() for the exact behavior
- // of when a Task should be deleted. It is subtle.
- TaskClosureAdapter(Task* task, bool* should_leak_task)
- : task_(task),
- should_leak_task_(should_leak_task) {
- }
-
- void Run() {
- task_->Run();
- delete task_;
- task_ = NULL;
- }
-
- private:
- friend class base::RefCounted<TaskClosureAdapter>;
-
- ~TaskClosureAdapter() {
- if (!*should_leak_task_) {
- delete task_;
- }
- }
-
- Task* task_;
- bool* should_leak_task_;
-};
-
} // namespace
//------------------------------------------------------------------------------
@@ -271,8 +237,9 @@ void MessageLoop::PostTask(
const tracked_objects::Location& from_here, Task* task) {
CHECK(task);
PendingTask pending_task(
- base::Bind(&TaskClosureAdapter::Run,
- new TaskClosureAdapter(task, &should_leak_tasks_)),
+ base::Bind(
+ &base::subtle::TaskClosureAdapter::Run,
+ new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)),
from_here,
CalculateDelayedRuntime(0), true);
AddToIncomingQueue(&pending_task);
@@ -282,8 +249,9 @@ void MessageLoop::PostDelayedTask(
const tracked_objects::Location& from_here, Task* task, int64 delay_ms) {
CHECK(task);
PendingTask pending_task(
- base::Bind(&TaskClosureAdapter::Run,
- new TaskClosureAdapter(task, &should_leak_tasks_)),
+ base::Bind(
+ &base::subtle::TaskClosureAdapter::Run,
+ new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)),
from_here,
CalculateDelayedRuntime(delay_ms), true);
AddToIncomingQueue(&pending_task);
@@ -293,8 +261,9 @@ void MessageLoop::PostNonNestableTask(
const tracked_objects::Location& from_here, Task* task) {
CHECK(task);
PendingTask pending_task(
- base::Bind(&TaskClosureAdapter::Run,
- new TaskClosureAdapter(task, &should_leak_tasks_)),
+ base::Bind(
+ &base::subtle::TaskClosureAdapter::Run,
+ new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)),
from_here,
CalculateDelayedRuntime(0), false);
AddToIncomingQueue(&pending_task);
@@ -304,8 +273,9 @@ void MessageLoop::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here, Task* task, int64 delay_ms) {
CHECK(task);
PendingTask pending_task(
- base::Bind(&TaskClosureAdapter::Run,
- new TaskClosureAdapter(task, &should_leak_tasks_)),
+ base::Bind(
+ &base::subtle::TaskClosureAdapter::Run,
+ new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)),
from_here,
CalculateDelayedRuntime(delay_ms), false);
AddToIncomingQueue(&pending_task);
@@ -486,11 +456,9 @@ void MessageLoop::RunTask(const PendingTask& pending_task) {
DidProcessTask(pending_task.time_posted));
#if defined(TRACK_ALL_TASK_OBJECTS)
- if (tracked_objects::ThreadData::IsActive() && pending_task.post_births) {
- tracked_objects::ThreadData::current()->TallyADeath(
- *pending_task.post_births,
- TimeTicks::Now() - pending_task.time_posted);
- }
+ tracked_objects::ThreadData::TallyADeathIfActive(
+ pending_task.post_births,
+ TimeTicks::Now() - pending_task.time_posted);
#endif // defined(TRACK_ALL_TASK_OBJECTS)
nestable_tasks_allowed_ = true;
@@ -780,14 +748,7 @@ MessageLoop::PendingTask::PendingTask(
nestable(nestable),
birth_program_counter(posted_from.program_counter()) {
#if defined(TRACK_ALL_TASK_OBJECTS)
- post_births = NULL;
- if (tracked_objects::ThreadData::IsActive()) {
- tracked_objects::ThreadData* current_thread_data =
- tracked_objects::ThreadData::current();
- if (current_thread_data) {
- post_births = current_thread_data->TallyABirth(posted_from);
- }
- }
+ post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from);
#endif // defined(TRACK_ALL_TASK_OBJECTS)
}
diff --git a/base/message_loop_proxy.h b/base/message_loop_proxy.h
index 07bca64..d6955ba 100644
--- a/base/message_loop_proxy.h
+++ b/base/message_loop_proxy.h
@@ -8,6 +8,7 @@
#include "base/base_api.h"
#include "base/basictypes.h"
+#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/task.h"
@@ -34,13 +35,34 @@ class BASE_API MessageLoopProxy
virtual bool PostTask(const tracked_objects::Location& from_here,
Task* task) = 0;
virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
- Task* task, int64 delay_ms) = 0;
+ Task* task,
+ int64 delay_ms) = 0;
virtual bool PostNonNestableTask(const tracked_objects::Location& from_here,
Task* task) = 0;
virtual bool PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
Task* task,
int64 delay_ms) = 0;
+
+ // TODO(ajwong): Remove the functions above once the Task -> Closure migration
+ // is complete.
+ //
+ // There are 2 sets of Post*Task functions, one which takes the older Task*
+ // function object representation, and one that takes the newer base::Closure.
+ // We have this overload to allow a staged transition between the two systems.
+ // Once the transition is done, the functions above should be deleted.
+ virtual bool PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task) = 0;
+ virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms) = 0;
+ virtual bool PostNonNestableTask(const tracked_objects::Location& from_here,
+ const base::Closure& task) = 0;
+ virtual bool PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms) = 0;
+
// A method which checks if the caller is currently running in the thread that
// this proxy represents.
virtual bool BelongsToCurrentThread() = 0;
diff --git a/base/message_loop_proxy_impl.cc b/base/message_loop_proxy_impl.cc
index b47c934..af0d214 100644
--- a/base/message_loop_proxy_impl.cc
+++ b/base/message_loop_proxy_impl.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -40,6 +40,30 @@ bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
return PostTaskHelper(from_here, task, delay_ms, false);
}
+bool MessageLoopProxyImpl::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ return PostTaskHelper(from_here, task, 0, true);
+}
+
+bool MessageLoopProxyImpl::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms) {
+ return PostTaskHelper(from_here, task, delay_ms, true);
+}
+
+bool MessageLoopProxyImpl::PostNonNestableTask(
+ const tracked_objects::Location& from_here, const base::Closure& task) {
+ return PostTaskHelper(from_here, task, 0, false);
+}
+
+bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms) {
+ return PostTaskHelper(from_here, task, delay_ms, false);
+}
+
bool MessageLoopProxyImpl::BelongsToCurrentThread() {
// We shouldn't use MessageLoop::current() since it uses LazyInstance which
// may be deleted by ~AtExitManager when a WorkerPool thread calls this
@@ -102,6 +126,22 @@ bool MessageLoopProxyImpl::PostTaskHelper(
return ret;
}
+bool MessageLoopProxyImpl::PostTaskHelper(
+ const tracked_objects::Location& from_here, const base::Closure& task,
+ int64 delay_ms, bool nestable) {
+ AutoLock lock(message_loop_lock_);
+ if (target_message_loop_) {
+ if (nestable) {
+ target_message_loop_->PostDelayedTask(from_here, task, delay_ms);
+ } else {
+ target_message_loop_->PostNonNestableDelayedTask(from_here, task,
+ delay_ms);
+ }
+ return true;
+ }
+ return false;
+}
+
scoped_refptr<MessageLoopProxy>
MessageLoopProxy::CreateForCurrentThread() {
scoped_refptr<MessageLoopProxy> ret(new MessageLoopProxyImpl());
diff --git a/base/message_loop_proxy_impl.h b/base/message_loop_proxy_impl.h
index 80d9a26..4c04148 100644
--- a/base/message_loop_proxy_impl.h
+++ b/base/message_loop_proxy_impl.h
@@ -25,13 +25,25 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy,
virtual bool PostTask(const tracked_objects::Location& from_here,
Task* task);
virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
- Task* task, int64 delay_ms);
+ Task* task,
+ int64 delay_ms);
virtual bool PostNonNestableTask(const tracked_objects::Location& from_here,
Task* task);
virtual bool PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
Task* task,
int64 delay_ms);
+ virtual bool PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task);
+ virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms);
+ virtual bool PostNonNestableTask(const tracked_objects::Location& from_here,
+ const base::Closure& task);
+ virtual bool PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms);
virtual bool BelongsToCurrentThread();
// MessageLoop::DestructionObserver implementation
@@ -44,8 +56,15 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy,
private:
MessageLoopProxyImpl();
+ // TODO(ajwong): Remove this after we've fully migrated to base::Closure.
bool PostTaskHelper(const tracked_objects::Location& from_here,
- Task* task, int64 delay_ms, bool nestable);
+ Task* task,
+ int64 delay_ms,
+ bool nestable);
+ bool PostTaskHelper(const tracked_objects::Location& from_here,
+ const base::Closure& task,
+ int64 delay_ms,
+ bool nestable);
// For the factory method to work
friend class MessageLoopProxy;
@@ -60,4 +79,3 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy,
} // namespace base
#endif // BASE_MESSAGE_LOOP_PROXY_IMPL_H_
-
diff --git a/base/message_loop_proxy_impl_unittest.cc b/base/message_loop_proxy_impl_unittest.cc
index 558cd932..d635acb 100644
--- a/base/message_loop_proxy_impl_unittest.cc
+++ b/base/message_loop_proxy_impl_unittest.cc
@@ -2,9 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/message_loop_proxy_impl.h"
+
+#include "base/bind.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop.h"
-#include "base/message_loop_proxy_impl.h"
#include "base/threading/thread.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
@@ -47,6 +49,10 @@ class MessageLoopProxyImplTest : public testing::Test {
test->Quit();
}
+ static void AssertNotRun() {
+ FAIL() << "Callback Should not get executed.";
+ }
+
class DummyTask : public Task {
public:
explicit DummyTask(bool* deleted) : deleted_(deleted) { }
@@ -83,7 +89,7 @@ class MessageLoopProxyImplTest : public testing::Test {
};
-TEST_F(MessageLoopProxyImplTest, PostTask) {
+TEST_F(MessageLoopProxyImplTest, LegacyPostTask) {
EXPECT_TRUE(file_thread_->message_loop_proxy()->PostTask(
FROM_HERE, NewRunnableFunction(&BasicFunction, this)));
MessageLoop::current()->Run();
@@ -101,7 +107,7 @@ TEST_F(MessageLoopProxyImplTest, Delete) {
MessageLoop::current()->Run();
}
-TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) {
+TEST_F(MessageLoopProxyImplTest, LegacyPostTaskAfterThreadExits) {
scoped_ptr<base::Thread> test_thread(
new base::Thread("MessageLoopProxyImplTest_Dummy"));
test_thread->Start();
@@ -116,7 +122,7 @@ TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) {
EXPECT_TRUE(deleted);
}
-TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) {
+TEST_F(MessageLoopProxyImplTest, LegacyPostTaskAfterThreadIsDeleted) {
scoped_refptr<base::MessageLoopProxy> message_loop_proxy;
{
scoped_ptr<base::Thread> test_thread(
@@ -130,3 +136,37 @@ TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) {
EXPECT_TRUE(deleted);
}
+TEST_F(MessageLoopProxyImplTest, PostTask) {
+ EXPECT_TRUE(file_thread_->message_loop_proxy()->PostTask(
+ FROM_HERE, base::Bind(&MessageLoopProxyImplTest::BasicFunction,
+ base::Unretained(this))));
+ MessageLoop::current()->Run();
+}
+
+TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) {
+ scoped_ptr<base::Thread> test_thread(
+ new base::Thread("MessageLoopProxyImplTest_Dummy"));
+ test_thread->Start();
+ scoped_refptr<base::MessageLoopProxy> message_loop_proxy =
+ test_thread->message_loop_proxy();
+ test_thread->Stop();
+
+ bool ret = message_loop_proxy->PostTask(
+ FROM_HERE,
+ base::Bind(&MessageLoopProxyImplTest::AssertNotRun));
+ EXPECT_FALSE(ret);
+}
+
+TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) {
+ scoped_refptr<base::MessageLoopProxy> message_loop_proxy;
+ {
+ scoped_ptr<base::Thread> test_thread(
+ new base::Thread("MessageLoopProxyImplTest_Dummy"));
+ test_thread->Start();
+ message_loop_proxy = test_thread->message_loop_proxy();
+ }
+ bool ret = message_loop_proxy->PostTask(
+ FROM_HERE,
+ base::Bind(&MessageLoopProxyImplTest::AssertNotRun));
+ EXPECT_FALSE(ret);
+}
diff --git a/base/task.cc b/base/task.cc
index e4da547..8c61473 100644
--- a/base/task.cc
+++ b/base/task.cc
@@ -34,4 +34,33 @@ Task* ScopedTaskRunner::Release() {
return tmp;
}
+namespace subtle {
+
+TaskClosureAdapter::TaskClosureAdapter(Task* task)
+ : task_(task),
+ should_leak_task_(&kTaskLeakingDefault) {
+}
+
+TaskClosureAdapter::TaskClosureAdapter(Task* task, bool* should_leak_task)
+ : task_(task),
+ should_leak_task_(should_leak_task) {
+}
+
+TaskClosureAdapter::~TaskClosureAdapter() {
+ if (!*should_leak_task_) {
+ delete task_;
+ }
+}
+
+void TaskClosureAdapter::Run() {
+ task_->Run();
+ delete task_;
+ task_ = NULL;
+}
+
+// Don't leak tasks by default.
+bool TaskClosureAdapter::kTaskLeakingDefault = false;
+
+} // namespace subtle
+
} // namespace base
diff --git a/base/task.h b/base/task.h
index ae47f32..4bee08c 100644
--- a/base/task.h
+++ b/base/task.h
@@ -564,6 +564,44 @@ class BASE_API ScopedTaskRunner {
DISALLOW_IMPLICIT_CONSTRUCTORS(ScopedTaskRunner);
};
+namespace subtle {
+
+// This class is meant for use in the implementation of MessageLoop classes
+// such as MessageLoop, MessageLoopProxy, BrowserThread, and WorkerPool to
+// implement the compatibility APIs while we are transitioning from Task to
+// Callback.
+//
+// It should NOT be used anywhere else!
+//
+// In particular, notice that this is RefCounted instead of
+// RefCountedThreadSafe. We rely on the fact that users of this class are
+// careful to ensure that a lock is taken during transfer of ownership for
+// objects from this class to ensure the refcount is not corrupted.
+class TaskClosureAdapter : public RefCounted<TaskClosureAdapter> {
+ public:
+ explicit TaskClosureAdapter(Task* task);
+
+ // |should_leak_task| points to a flag variable that can be used to determine
+ // if this class should leak the Task on destruction. This is important
+ // at MessageLoop shutdown since not all tasks can be safely deleted without
+ // running. See MessageLoop::DeletePendingTasks() for the exact behavior
+ // of when a Task should be deleted. It is subtle.
+ TaskClosureAdapter(Task* task, bool* should_leak_task);
+
+ void Run();
+
+ private:
+ friend class base::RefCounted<TaskClosureAdapter>;
+
+ ~TaskClosureAdapter();
+
+ Task* task_;
+ bool* should_leak_task_;
+ static bool kTaskLeakingDefault;
+};
+
+} // namespace subtle
+
} // namespace base
#endif // BASE_TASK_H_
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h
index 12b50b4..58471b5 100644
--- a/base/threading/worker_pool.h
+++ b/base/threading/worker_pool.h
@@ -7,6 +7,7 @@
#pragma once
#include "base/base_api.h"
+#include "base/callback.h"
#include "base/tracked.h"
class Task;
@@ -27,8 +28,13 @@ class BASE_API WorkerPool {
// should be used for tasks that will take a long time to execute. Returns
// false if |task| could not be posted to a worker thread. Regardless of
// return value, ownership of |task| is transferred to the worker pool.
+ //
+ // TODO(ajwong): Remove the Task* based overload once we've finishsed the
+ // Task -> Closure migration.
static bool PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow);
+ static bool PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow);
};
} // namespace base
diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc
index bd6210f..c3a382c 100644
--- a/base/threading/worker_pool_posix.cc
+++ b/base/threading/worker_pool_posix.cc
@@ -4,6 +4,7 @@
#include "base/threading/worker_pool_posix.h"
+#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
@@ -11,6 +12,7 @@
#include "base/task.h"
#include "base/threading/platform_thread.h"
#include "base/threading/worker_pool.h"
+#include "base/tracked_objects.h"
namespace base {
@@ -28,6 +30,8 @@ class WorkerPoolImpl {
void PostTask(const tracked_objects::Location& from_here, Task* task,
bool task_is_slow);
+ void PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow);
private:
scoped_refptr<base::PosixDynamicThreadPool> pool_;
@@ -44,25 +48,27 @@ WorkerPoolImpl::~WorkerPoolImpl() {
void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow) {
- task->SetBirthPlace(from_here);
- pool_->PostTask(task);
+ pool_->PostTask(from_here, task);
+}
+
+void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ pool_->PostTask(from_here, task);
}
base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED);
class WorkerThread : public PlatformThread::Delegate {
public:
- WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit,
+ WorkerThread(const std::string& name_prefix,
base::PosixDynamicThreadPool* pool)
: name_prefix_(name_prefix),
- idle_seconds_before_exit_(idle_seconds_before_exit),
pool_(pool) {}
virtual void ThreadMain();
private:
const std::string name_prefix_;
- const int idle_seconds_before_exit_;
scoped_refptr<base::PosixDynamicThreadPool> pool_;
DISALLOW_COPY_AND_ASSIGN(WorkerThread);
@@ -74,11 +80,10 @@ void WorkerThread::ThreadMain() {
PlatformThread::SetName(name.c_str());
for (;;) {
- Task* task = pool_->WaitForTask();
- if (!task)
+ PosixDynamicThreadPool::PendingTask pending_task = pool_->WaitForTask();
+ if (pending_task.task.is_null())
break;
- task->Run();
- delete task;
+ pending_task.task.Run();
}
// The WorkerThread is non-joinable, so it deletes itself.
@@ -93,21 +98,35 @@ bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
return true;
}
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
+ return true;
+}
+
+PosixDynamicThreadPool::PendingTask::PendingTask(
+ const tracked_objects::Location& posted_from,
+ const base::Closure& task)
+ : task(task) {
+}
+
+PosixDynamicThreadPool::PendingTask::~PendingTask() {
+}
+
PosixDynamicThreadPool::PosixDynamicThreadPool(
const std::string& name_prefix,
int idle_seconds_before_exit)
: name_prefix_(name_prefix),
idle_seconds_before_exit_(idle_seconds_before_exit),
- tasks_available_cv_(&lock_),
+ pending_tasks_available_cv_(&lock_),
num_idle_threads_(0),
terminated_(false),
num_idle_threads_cv_(NULL) {}
PosixDynamicThreadPool::~PosixDynamicThreadPool() {
- while (!tasks_.empty()) {
- Task* task = tasks_.front();
- tasks_.pop();
- delete task;
+ while (!pending_tasks_.empty()) {
+ PendingTask pending_task = pending_tasks_.front();
+ pending_tasks_.pop();
}
}
@@ -117,53 +136,76 @@ void PosixDynamicThreadPool::Terminate() {
DCHECK(!terminated_) << "Thread pool is already terminated.";
terminated_ = true;
}
- tasks_available_cv_.Broadcast();
+ pending_tasks_available_cv_.Broadcast();
+}
+
+void PosixDynamicThreadPool::PostTask(
+ const tracked_objects::Location& from_here,
+ Task* task) {
+ PendingTask pending_task(from_here,
+ base::Bind(&subtle::TaskClosureAdapter::Run,
+ new subtle::TaskClosureAdapter(task)));
+ // |pending_task| and AddTask() work in conjunction here to ensure that after
+ // a successful AddTask(), the TaskClosureAdapter object is deleted on the
+ // worker thread. In AddTask(), the reference |pending_task.task| is handed
+ // off in a destructive manner to ensure that the local copy of
+ // |pending_task| doesn't keep a ref on the Closure causing the
+ // TaskClosureAdapter to be deleted on the wrong thread.
+ AddTask(&pending_task);
+}
+
+void PosixDynamicThreadPool::PostTask(
+ const tracked_objects::Location& from_here,
+ const base::Closure& task) {
+ PendingTask pending_task(from_here, task);
+ AddTask(&pending_task);
}
-void PosixDynamicThreadPool::PostTask(Task* task) {
+void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
AutoLock locked(lock_);
DCHECK(!terminated_) <<
"This thread pool is already terminated. Do not post new tasks.";
- tasks_.push(task);
+ pending_tasks_.push(*pending_task);
+ pending_task->task.Reset();
// We have enough worker threads.
- if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) {
- tasks_available_cv_.Signal();
+ if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
+ pending_tasks_available_cv_.Signal();
} else {
// The new PlatformThread will take ownership of the WorkerThread object,
// which will delete itself on exit.
WorkerThread* worker =
- new WorkerThread(name_prefix_, idle_seconds_before_exit_, this);
+ new WorkerThread(name_prefix_, this);
PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker);
}
}
-Task* PosixDynamicThreadPool::WaitForTask() {
+PosixDynamicThreadPool::PendingTask PosixDynamicThreadPool::WaitForTask() {
AutoLock locked(lock_);
if (terminated_)
- return NULL;
+ return PendingTask(FROM_HERE, base::Closure());
- if (tasks_.empty()) { // No work available, wait for work.
+ if (pending_tasks_.empty()) { // No work available, wait for work.
num_idle_threads_++;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
- tasks_available_cv_.TimedWait(
- TimeDelta::FromSeconds(kIdleSecondsBeforeExit));
+ pending_tasks_available_cv_.TimedWait(
+ TimeDelta::FromSeconds(idle_seconds_before_exit_));
num_idle_threads_--;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
- if (tasks_.empty()) {
+ if (pending_tasks_.empty()) {
// We waited for work, but there's still no work. Return NULL to signal
// the thread to terminate.
- return NULL;
+ return PendingTask(FROM_HERE, base::Closure());
}
}
- Task* task = tasks_.front();
- tasks_.pop();
- return task;
+ PendingTask pending_task = pending_tasks_.front();
+ pending_tasks_.pop();
+ return pending_task;
}
} // namespace base
diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h
index 9bb9eda..bf75c78 100644
--- a/base/threading/worker_pool_posix.h
+++ b/base/threading/worker_pool_posix.h
@@ -29,11 +29,13 @@
#include <string>
#include "base/basictypes.h"
+#include "base/callback.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
+#include "base/tracked.h"
class Task;
@@ -44,6 +46,19 @@ class BASE_API PosixDynamicThreadPool
public:
class PosixDynamicThreadPoolPeer;
+ struct PendingTask {
+ PendingTask(const tracked_objects::Location& posted_from,
+ const base::Closure& task);
+ ~PendingTask();
+ // TODO(ajwong): After we figure out why Mac's ~AtExitManager dies when
+ // destructing the lock, add in extra info so we can call
+ // tracked_objects::TallyADeathIfActive() and
+ // tracked_objects::TallyABirthIfActive correctly.
+
+ // The task to run.
+ base::Closure task;
+ };
+
// All worker threads will share the same |name_prefix|. They will exit after
// |idle_seconds_before_exit|.
PosixDynamicThreadPool(const std::string& name_prefix,
@@ -56,15 +71,26 @@ class BASE_API PosixDynamicThreadPool
// Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership
// of |task|.
- void PostTask(Task* task);
+ //
+ // TODO(ajwong): Remove this compatibility API once the Task -> Closure
+ // migration is finished.
+ void PostTask(const tracked_objects::Location& from_here, Task* task);
+
+ // Adds |task| to the thread pool.
+ void PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task);
// Worker thread method to wait for up to |idle_seconds_before_exit| for more
// work from the thread pool. Returns NULL if no work is available.
- Task* WaitForTask();
+ PendingTask WaitForTask();
private:
friend class PosixDynamicThreadPoolPeer;
+ // Adds pending_task to the thread pool. This function will clear
+ // |pending_task->task|.
+ void AddTask(PendingTask* pending_task);
+
const std::string name_prefix_;
const int idle_seconds_before_exit_;
@@ -73,9 +99,9 @@ class BASE_API PosixDynamicThreadPool
// Signal()s worker threads to let them know more tasks are available.
// Also used for Broadcast()'ing to worker threads to let them know the pool
// is being deleted and they can exit.
- ConditionVariable tasks_available_cv_;
+ ConditionVariable pending_tasks_available_cv_;
int num_idle_threads_;
- std::queue<Task*> tasks_;
+ std::queue<PendingTask> pending_tasks_;
bool terminated_;
// Only used for tests to ensure correct thread ordering. It will always be
// NULL in non-test code.
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
index c984ee3..01722f0 100644
--- a/base/threading/worker_pool_posix_unittest.cc
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -22,10 +22,12 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
: pool_(pool) {}
Lock* lock() { return &pool_->lock_; }
- ConditionVariable* tasks_available_cv() {
- return &pool_->tasks_available_cv_;
+ ConditionVariable* pending_tasks_available_cv() {
+ return &pool_->pending_tasks_available_cv_;
+ }
+ const std::queue<PendingTask>& pending_tasks() const {
+ return pool_->pending_tasks_;
}
- const std::queue<Task*>& tasks() const { return pool_->tasks_; }
int num_idle_threads() const { return pool_->num_idle_threads_; }
ConditionVariable* num_idle_threads_cv() {
return pool_->num_idle_threads_cv_.get();
@@ -180,10 +182,10 @@ class PosixDynamicThreadPoolTest : public testing::Test {
TEST_F(PosixDynamicThreadPoolTest, Basic) {
EXPECT_EQ(0, peer_.num_idle_threads());
EXPECT_EQ(0U, unique_threads_.size());
- EXPECT_EQ(0U, peer_.tasks().size());
+ EXPECT_EQ(0U, peer_.pending_tasks().size());
// Add one task and wait for it to be completed.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
@@ -195,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
// Add one task and wait for it to be completed.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add another 2 tasks. One should reuse the existing worker thread.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
@@ -214,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
// Add two blocking tasks.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
@@ -230,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
TEST_F(PosixDynamicThreadPoolTest, Complex) {
// Add two non blocking tasks and wait for them to finish.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add two blocking tasks, start them simultaneously, and wait for them to
// finish.
- pool_->PostTask(CreateNewBlockingIncrementingTask());
- pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
@@ -251,13 +253,13 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
{
base::AutoLock locked(*peer_.lock());
while (peer_.num_idle_threads() > 0) {
- peer_.tasks_available_cv()->Signal();
+ peer_.pending_tasks_available_cv()->Signal();
peer_.num_idle_threads_cv()->Wait();
}
}
// Add another non blocking task. There are no threads to reuse.
- pool_->PostTask(CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);
EXPECT_EQ(3U, unique_threads_.size());
diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc
index 2072e52..2aa423f 100644
--- a/base/threading/worker_pool_win.cc
+++ b/base/threading/worker_pool_win.cc
@@ -1,40 +1,83 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/threading/worker_pool.h"
+#include "base/bind.h"
#include "base/logging.h"
#include "base/task.h"
+#include "base/tracked_objects.h"
namespace base {
namespace {
+struct PendingTask {
+ PendingTask(
+ const tracked_objects::Location& posted_from,
+ const base::Closure& task)
+ : task(task) {
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from);
+ time_posted = TimeTicks::Now();
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+ }
+
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ // Counter for location where the Closure was posted from.
+ tracked_objects::Births* post_births;
+
+ // Time the task was posted.
+ TimeTicks time_posted;
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+
+ // The task to run.
+ base::Closure task;
+};
+
DWORD CALLBACK WorkItemCallback(void* param) {
- Task* task = static_cast<Task*>(param);
- task->Run();
- delete task;
+ PendingTask* pending_task = static_cast<PendingTask*>(param);
+ pending_task->task.Run();
+#if defined(TRACK_ALL_TASK_OBJECTS)
+ tracked_objects::ThreadData::TallyADeathIfActive(
+ pending_task->post_births,
+ TimeTicks::Now() - pending_task->time_posted);
+#endif // defined(TRACK_ALL_TASK_OBJECTS)
+ delete pending_task;
return 0;
}
-} // namespace
-
-bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
- Task* task, bool task_is_slow) {
- task->SetBirthPlace(from_here);
-
+// Takes ownership of |pending_task|
+bool PostTaskInternal(PendingTask* pending_task, bool task_is_slow) {
ULONG flags = 0;
if (task_is_slow)
flags |= WT_EXECUTELONGFUNCTION;
- if (!QueueUserWorkItem(WorkItemCallback, task, flags)) {
+ if (!QueueUserWorkItem(WorkItemCallback, pending_task, flags)) {
DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError();
- delete task;
+ delete pending_task;
return false;
}
return true;
}
+} // namespace
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ PendingTask* pending_task =
+ new PendingTask(from_here,
+ base::Bind(&subtle::TaskClosureAdapter::Run,
+ new subtle::TaskClosureAdapter(task)));
+ return PostTaskInternal(pending_task, task_is_slow);
+}
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ const base::Closure& task, bool task_is_slow) {
+ PendingTask* pending_task = new PendingTask(from_here, task);
+ return PostTaskInternal(pending_task, task_is_slow);
+}
+
} // namespace base
diff --git a/base/tracked_objects.cc b/base/tracked_objects.cc
index f7fd0c2..21ac0fc 100644
--- a/base/tracked_objects.cc
+++ b/base/tracked_objects.cc
@@ -277,6 +277,26 @@ void ThreadData::TallyADeath(const Births& lifetimes,
}
// static
+Births* ThreadData::TallyABirthIfActive(const Location& location) {
+ if (IsActive()) {
+ ThreadData* current_thread_data = current();
+ if (current_thread_data) {
+ return current_thread_data->TallyABirth(location);
+ }
+ }
+
+ return NULL;
+}
+
+// static
+void ThreadData::TallyADeathIfActive(const Births* the_birth,
+ const base::TimeDelta& duration) {
+ if (IsActive() && the_birth) {
+ current()->TallyADeath(*the_birth, duration);
+ }
+}
+
+// static
ThreadData* ThreadData::first() {
base::AutoLock lock(list_lock_);
return first_;
diff --git a/base/tracked_objects.h b/base/tracked_objects.h
index 5954f16..b6ab345 100644
--- a/base/tracked_objects.h
+++ b/base/tracked_objects.h
@@ -496,6 +496,13 @@ class BASE_API ThreadData {
// Find a place to record a death on this thread.
void TallyADeath(const Births& lifetimes, const base::TimeDelta& duration);
+ // Helper methods to only tally if the current thread has tracking active.
+ //
+ // TallyABirthIfActive will returns NULL if the birth cannot be tallied.
+ static Births* TallyABirthIfActive(const Location& location);
+ static void TallyADeathIfActive(const Births* lifetimes,
+ const base::TimeDelta& duration);
+
// (Thread safe) Get start of list of instances.
static ThreadData* first();
// Iterate through the null terminated list of instances.