diff options
author | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-26 18:25:16 +0000 |
---|---|---|
committer | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-26 18:25:16 +0000 |
commit | 180c85e3e3691042ab617fd0755dcde6e75d5fbd (patch) | |
tree | b9d4fd7a77f7f54dce4463960326ef7b0cd7a270 /base | |
parent | 324ab8e0d77303333f8ad7de3b54d248587687db (diff) | |
download | chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.zip chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.gz chromium_src-180c85e3e3691042ab617fd0755dcde6e75d5fbd.tar.bz2 |
Support Closure in ALL the loops!
Add an overload for PostTask into MessageLoopProxy, and WorkerPool.
BUG=35223
TEST=unittests.
Review URL: http://codereview.chromium.org/7316015
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@94129 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/message_loop.cc | 71 | ||||
-rw-r--r-- | base/message_loop_proxy.h | 24 | ||||
-rw-r--r-- | base/message_loop_proxy_impl.cc | 42 | ||||
-rw-r--r-- | base/message_loop_proxy_impl.h | 24 | ||||
-rw-r--r-- | base/message_loop_proxy_impl_unittest.cc | 48 | ||||
-rw-r--r-- | base/task.cc | 29 | ||||
-rw-r--r-- | base/task.h | 38 | ||||
-rw-r--r-- | base/threading/worker_pool.h | 6 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.cc | 102 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.h | 34 | ||||
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 34 | ||||
-rw-r--r-- | base/threading/worker_pool_win.cc | 67 | ||||
-rw-r--r-- | base/tracked_objects.cc | 20 | ||||
-rw-r--r-- | base/tracked_objects.h | 7 |
14 files changed, 420 insertions, 126 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 08985a4..e3f6ea3 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -85,40 +85,6 @@ const base::LinearHistogram::DescriptionPair event_descriptions_[] = { bool enable_histogrammer_ = false; -// TODO(ajwong): This is one use case for having a Owned() tag that behaves -// like a "Unique" pointer. If we had that, and Tasks were always safe to -// delete on MessageLoop shutdown, this class could just be a function. -class TaskClosureAdapter : public base::RefCounted<TaskClosureAdapter> { - public: - // |should_leak_task| points to a flag variable that can be used to determine - // if this class should leak the Task on destruction. This is important - // at MessageLoop shutdown since not all tasks can be safely deleted without - // running. See MessageLoop::DeletePendingTasks() for the exact behavior - // of when a Task should be deleted. It is subtle. - TaskClosureAdapter(Task* task, bool* should_leak_task) - : task_(task), - should_leak_task_(should_leak_task) { - } - - void Run() { - task_->Run(); - delete task_; - task_ = NULL; - } - - private: - friend class base::RefCounted<TaskClosureAdapter>; - - ~TaskClosureAdapter() { - if (!*should_leak_task_) { - delete task_; - } - } - - Task* task_; - bool* should_leak_task_; -}; - } // namespace //------------------------------------------------------------------------------ @@ -271,8 +237,9 @@ void MessageLoop::PostTask( const tracked_objects::Location& from_here, Task* task) { CHECK(task); PendingTask pending_task( - base::Bind(&TaskClosureAdapter::Run, - new TaskClosureAdapter(task, &should_leak_tasks_)), + base::Bind( + &base::subtle::TaskClosureAdapter::Run, + new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)), from_here, CalculateDelayedRuntime(0), true); AddToIncomingQueue(&pending_task); @@ -282,8 +249,9 @@ void MessageLoop::PostDelayedTask( const tracked_objects::Location& from_here, Task* task, int64 delay_ms) { CHECK(task); PendingTask pending_task( - base::Bind(&TaskClosureAdapter::Run, - new TaskClosureAdapter(task, &should_leak_tasks_)), + base::Bind( + &base::subtle::TaskClosureAdapter::Run, + new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)), from_here, CalculateDelayedRuntime(delay_ms), true); AddToIncomingQueue(&pending_task); @@ -293,8 +261,9 @@ void MessageLoop::PostNonNestableTask( const tracked_objects::Location& from_here, Task* task) { CHECK(task); PendingTask pending_task( - base::Bind(&TaskClosureAdapter::Run, - new TaskClosureAdapter(task, &should_leak_tasks_)), + base::Bind( + &base::subtle::TaskClosureAdapter::Run, + new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)), from_here, CalculateDelayedRuntime(0), false); AddToIncomingQueue(&pending_task); @@ -304,8 +273,9 @@ void MessageLoop::PostNonNestableDelayedTask( const tracked_objects::Location& from_here, Task* task, int64 delay_ms) { CHECK(task); PendingTask pending_task( - base::Bind(&TaskClosureAdapter::Run, - new TaskClosureAdapter(task, &should_leak_tasks_)), + base::Bind( + &base::subtle::TaskClosureAdapter::Run, + new base::subtle::TaskClosureAdapter(task, &should_leak_tasks_)), from_here, CalculateDelayedRuntime(delay_ms), false); AddToIncomingQueue(&pending_task); @@ -486,11 +456,9 @@ void MessageLoop::RunTask(const PendingTask& pending_task) { DidProcessTask(pending_task.time_posted)); #if defined(TRACK_ALL_TASK_OBJECTS) - if (tracked_objects::ThreadData::IsActive() && pending_task.post_births) { - tracked_objects::ThreadData::current()->TallyADeath( - *pending_task.post_births, - TimeTicks::Now() - pending_task.time_posted); - } + tracked_objects::ThreadData::TallyADeathIfActive( + pending_task.post_births, + TimeTicks::Now() - pending_task.time_posted); #endif // defined(TRACK_ALL_TASK_OBJECTS) nestable_tasks_allowed_ = true; @@ -780,14 +748,7 @@ MessageLoop::PendingTask::PendingTask( nestable(nestable), birth_program_counter(posted_from.program_counter()) { #if defined(TRACK_ALL_TASK_OBJECTS) - post_births = NULL; - if (tracked_objects::ThreadData::IsActive()) { - tracked_objects::ThreadData* current_thread_data = - tracked_objects::ThreadData::current(); - if (current_thread_data) { - post_births = current_thread_data->TallyABirth(posted_from); - } - } + post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from); #endif // defined(TRACK_ALL_TASK_OBJECTS) } diff --git a/base/message_loop_proxy.h b/base/message_loop_proxy.h index 07bca64..d6955ba 100644 --- a/base/message_loop_proxy.h +++ b/base/message_loop_proxy.h @@ -8,6 +8,7 @@ #include "base/base_api.h" #include "base/basictypes.h" +#include "base/callback.h" #include "base/memory/ref_counted.h" #include "base/task.h" @@ -34,13 +35,34 @@ class BASE_API MessageLoopProxy virtual bool PostTask(const tracked_objects::Location& from_here, Task* task) = 0; virtual bool PostDelayedTask(const tracked_objects::Location& from_here, - Task* task, int64 delay_ms) = 0; + Task* task, + int64 delay_ms) = 0; virtual bool PostNonNestableTask(const tracked_objects::Location& from_here, Task* task) = 0; virtual bool PostNonNestableDelayedTask( const tracked_objects::Location& from_here, Task* task, int64 delay_ms) = 0; + + // TODO(ajwong): Remove the functions above once the Task -> Closure migration + // is complete. + // + // There are 2 sets of Post*Task functions, one which takes the older Task* + // function object representation, and one that takes the newer base::Closure. + // We have this overload to allow a staged transition between the two systems. + // Once the transition is done, the functions above should be deleted. + virtual bool PostTask(const tracked_objects::Location& from_here, + const base::Closure& task) = 0; + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms) = 0; + virtual bool PostNonNestableTask(const tracked_objects::Location& from_here, + const base::Closure& task) = 0; + virtual bool PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms) = 0; + // A method which checks if the caller is currently running in the thread that // this proxy represents. virtual bool BelongsToCurrentThread() = 0; diff --git a/base/message_loop_proxy_impl.cc b/base/message_loop_proxy_impl.cc index b47c934..af0d214 100644 --- a/base/message_loop_proxy_impl.cc +++ b/base/message_loop_proxy_impl.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -40,6 +40,30 @@ bool MessageLoopProxyImpl::PostNonNestableDelayedTask( return PostTaskHelper(from_here, task, delay_ms, false); } +bool MessageLoopProxyImpl::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task) { + return PostTaskHelper(from_here, task, 0, true); +} + +bool MessageLoopProxyImpl::PostDelayedTask( + const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms) { + return PostTaskHelper(from_here, task, delay_ms, true); +} + +bool MessageLoopProxyImpl::PostNonNestableTask( + const tracked_objects::Location& from_here, const base::Closure& task) { + return PostTaskHelper(from_here, task, 0, false); +} + +bool MessageLoopProxyImpl::PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms) { + return PostTaskHelper(from_here, task, delay_ms, false); +} + bool MessageLoopProxyImpl::BelongsToCurrentThread() { // We shouldn't use MessageLoop::current() since it uses LazyInstance which // may be deleted by ~AtExitManager when a WorkerPool thread calls this @@ -102,6 +126,22 @@ bool MessageLoopProxyImpl::PostTaskHelper( return ret; } +bool MessageLoopProxyImpl::PostTaskHelper( + const tracked_objects::Location& from_here, const base::Closure& task, + int64 delay_ms, bool nestable) { + AutoLock lock(message_loop_lock_); + if (target_message_loop_) { + if (nestable) { + target_message_loop_->PostDelayedTask(from_here, task, delay_ms); + } else { + target_message_loop_->PostNonNestableDelayedTask(from_here, task, + delay_ms); + } + return true; + } + return false; +} + scoped_refptr<MessageLoopProxy> MessageLoopProxy::CreateForCurrentThread() { scoped_refptr<MessageLoopProxy> ret(new MessageLoopProxyImpl()); diff --git a/base/message_loop_proxy_impl.h b/base/message_loop_proxy_impl.h index 80d9a26..4c04148 100644 --- a/base/message_loop_proxy_impl.h +++ b/base/message_loop_proxy_impl.h @@ -25,13 +25,25 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy, virtual bool PostTask(const tracked_objects::Location& from_here, Task* task); virtual bool PostDelayedTask(const tracked_objects::Location& from_here, - Task* task, int64 delay_ms); + Task* task, + int64 delay_ms); virtual bool PostNonNestableTask(const tracked_objects::Location& from_here, Task* task); virtual bool PostNonNestableDelayedTask( const tracked_objects::Location& from_here, Task* task, int64 delay_ms); + virtual bool PostTask(const tracked_objects::Location& from_here, + const base::Closure& task); + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms); + virtual bool PostNonNestableTask(const tracked_objects::Location& from_here, + const base::Closure& task); + virtual bool PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms); virtual bool BelongsToCurrentThread(); // MessageLoop::DestructionObserver implementation @@ -44,8 +56,15 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy, private: MessageLoopProxyImpl(); + // TODO(ajwong): Remove this after we've fully migrated to base::Closure. bool PostTaskHelper(const tracked_objects::Location& from_here, - Task* task, int64 delay_ms, bool nestable); + Task* task, + int64 delay_ms, + bool nestable); + bool PostTaskHelper(const tracked_objects::Location& from_here, + const base::Closure& task, + int64 delay_ms, + bool nestable); // For the factory method to work friend class MessageLoopProxy; @@ -60,4 +79,3 @@ class BASE_API MessageLoopProxyImpl : public MessageLoopProxy, } // namespace base #endif // BASE_MESSAGE_LOOP_PROXY_IMPL_H_ - diff --git a/base/message_loop_proxy_impl_unittest.cc b/base/message_loop_proxy_impl_unittest.cc index 558cd932..d635acb 100644 --- a/base/message_loop_proxy_impl_unittest.cc +++ b/base/message_loop_proxy_impl_unittest.cc @@ -2,9 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/message_loop_proxy_impl.h" + +#include "base/bind.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop.h" -#include "base/message_loop_proxy_impl.h" #include "base/threading/thread.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" @@ -47,6 +49,10 @@ class MessageLoopProxyImplTest : public testing::Test { test->Quit(); } + static void AssertNotRun() { + FAIL() << "Callback Should not get executed."; + } + class DummyTask : public Task { public: explicit DummyTask(bool* deleted) : deleted_(deleted) { } @@ -83,7 +89,7 @@ class MessageLoopProxyImplTest : public testing::Test { }; -TEST_F(MessageLoopProxyImplTest, PostTask) { +TEST_F(MessageLoopProxyImplTest, LegacyPostTask) { EXPECT_TRUE(file_thread_->message_loop_proxy()->PostTask( FROM_HERE, NewRunnableFunction(&BasicFunction, this))); MessageLoop::current()->Run(); @@ -101,7 +107,7 @@ TEST_F(MessageLoopProxyImplTest, Delete) { MessageLoop::current()->Run(); } -TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) { +TEST_F(MessageLoopProxyImplTest, LegacyPostTaskAfterThreadExits) { scoped_ptr<base::Thread> test_thread( new base::Thread("MessageLoopProxyImplTest_Dummy")); test_thread->Start(); @@ -116,7 +122,7 @@ TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) { EXPECT_TRUE(deleted); } -TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) { +TEST_F(MessageLoopProxyImplTest, LegacyPostTaskAfterThreadIsDeleted) { scoped_refptr<base::MessageLoopProxy> message_loop_proxy; { scoped_ptr<base::Thread> test_thread( @@ -130,3 +136,37 @@ TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) { EXPECT_TRUE(deleted); } +TEST_F(MessageLoopProxyImplTest, PostTask) { + EXPECT_TRUE(file_thread_->message_loop_proxy()->PostTask( + FROM_HERE, base::Bind(&MessageLoopProxyImplTest::BasicFunction, + base::Unretained(this)))); + MessageLoop::current()->Run(); +} + +TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadExits) { + scoped_ptr<base::Thread> test_thread( + new base::Thread("MessageLoopProxyImplTest_Dummy")); + test_thread->Start(); + scoped_refptr<base::MessageLoopProxy> message_loop_proxy = + test_thread->message_loop_proxy(); + test_thread->Stop(); + + bool ret = message_loop_proxy->PostTask( + FROM_HERE, + base::Bind(&MessageLoopProxyImplTest::AssertNotRun)); + EXPECT_FALSE(ret); +} + +TEST_F(MessageLoopProxyImplTest, PostTaskAfterThreadIsDeleted) { + scoped_refptr<base::MessageLoopProxy> message_loop_proxy; + { + scoped_ptr<base::Thread> test_thread( + new base::Thread("MessageLoopProxyImplTest_Dummy")); + test_thread->Start(); + message_loop_proxy = test_thread->message_loop_proxy(); + } + bool ret = message_loop_proxy->PostTask( + FROM_HERE, + base::Bind(&MessageLoopProxyImplTest::AssertNotRun)); + EXPECT_FALSE(ret); +} diff --git a/base/task.cc b/base/task.cc index e4da547..8c61473 100644 --- a/base/task.cc +++ b/base/task.cc @@ -34,4 +34,33 @@ Task* ScopedTaskRunner::Release() { return tmp; } +namespace subtle { + +TaskClosureAdapter::TaskClosureAdapter(Task* task) + : task_(task), + should_leak_task_(&kTaskLeakingDefault) { +} + +TaskClosureAdapter::TaskClosureAdapter(Task* task, bool* should_leak_task) + : task_(task), + should_leak_task_(should_leak_task) { +} + +TaskClosureAdapter::~TaskClosureAdapter() { + if (!*should_leak_task_) { + delete task_; + } +} + +void TaskClosureAdapter::Run() { + task_->Run(); + delete task_; + task_ = NULL; +} + +// Don't leak tasks by default. +bool TaskClosureAdapter::kTaskLeakingDefault = false; + +} // namespace subtle + } // namespace base diff --git a/base/task.h b/base/task.h index ae47f32..4bee08c 100644 --- a/base/task.h +++ b/base/task.h @@ -564,6 +564,44 @@ class BASE_API ScopedTaskRunner { DISALLOW_IMPLICIT_CONSTRUCTORS(ScopedTaskRunner); }; +namespace subtle { + +// This class is meant for use in the implementation of MessageLoop classes +// such as MessageLoop, MessageLoopProxy, BrowserThread, and WorkerPool to +// implement the compatibility APIs while we are transitioning from Task to +// Callback. +// +// It should NOT be used anywhere else! +// +// In particular, notice that this is RefCounted instead of +// RefCountedThreadSafe. We rely on the fact that users of this class are +// careful to ensure that a lock is taken during transfer of ownership for +// objects from this class to ensure the refcount is not corrupted. +class TaskClosureAdapter : public RefCounted<TaskClosureAdapter> { + public: + explicit TaskClosureAdapter(Task* task); + + // |should_leak_task| points to a flag variable that can be used to determine + // if this class should leak the Task on destruction. This is important + // at MessageLoop shutdown since not all tasks can be safely deleted without + // running. See MessageLoop::DeletePendingTasks() for the exact behavior + // of when a Task should be deleted. It is subtle. + TaskClosureAdapter(Task* task, bool* should_leak_task); + + void Run(); + + private: + friend class base::RefCounted<TaskClosureAdapter>; + + ~TaskClosureAdapter(); + + Task* task_; + bool* should_leak_task_; + static bool kTaskLeakingDefault; +}; + +} // namespace subtle + } // namespace base #endif // BASE_TASK_H_ diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h index 12b50b4..58471b5 100644 --- a/base/threading/worker_pool.h +++ b/base/threading/worker_pool.h @@ -7,6 +7,7 @@ #pragma once #include "base/base_api.h" +#include "base/callback.h" #include "base/tracked.h" class Task; @@ -27,8 +28,13 @@ class BASE_API WorkerPool { // should be used for tasks that will take a long time to execute. Returns // false if |task| could not be posted to a worker thread. Regardless of // return value, ownership of |task| is transferred to the worker pool. + // + // TODO(ajwong): Remove the Task* based overload once we've finishsed the + // Task -> Closure migration. static bool PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow); + static bool PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); }; } // namespace base diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc index bd6210f..c3a382c 100644 --- a/base/threading/worker_pool_posix.cc +++ b/base/threading/worker_pool_posix.cc @@ -4,6 +4,7 @@ #include "base/threading/worker_pool_posix.h" +#include "base/bind.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/memory/ref_counted.h" @@ -11,6 +12,7 @@ #include "base/task.h" #include "base/threading/platform_thread.h" #include "base/threading/worker_pool.h" +#include "base/tracked_objects.h" namespace base { @@ -28,6 +30,8 @@ class WorkerPoolImpl { void PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow); + void PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); private: scoped_refptr<base::PosixDynamicThreadPool> pool_; @@ -44,25 +48,27 @@ WorkerPoolImpl::~WorkerPoolImpl() { void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - pool_->PostTask(task); + pool_->PostTask(from_here, task); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + pool_->PostTask(from_here, task); } base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); class WorkerThread : public PlatformThread::Delegate { public: - WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, + WorkerThread(const std::string& name_prefix, base::PosixDynamicThreadPool* pool) : name_prefix_(name_prefix), - idle_seconds_before_exit_(idle_seconds_before_exit), pool_(pool) {} virtual void ThreadMain(); private: const std::string name_prefix_; - const int idle_seconds_before_exit_; scoped_refptr<base::PosixDynamicThreadPool> pool_; DISALLOW_COPY_AND_ASSIGN(WorkerThread); @@ -74,11 +80,10 @@ void WorkerThread::ThreadMain() { PlatformThread::SetName(name.c_str()); for (;;) { - Task* task = pool_->WaitForTask(); - if (!task) + PosixDynamicThreadPool::PendingTask pending_task = pool_->WaitForTask(); + if (pending_task.task.is_null()) break; - task->Run(); - delete task; + pending_task.task.Run(); } // The WorkerThread is non-joinable, so it deletes itself. @@ -93,21 +98,35 @@ bool WorkerPool::PostTask(const tracked_objects::Location& from_here, return true; } +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} + +PosixDynamicThreadPool::PendingTask::PendingTask( + const tracked_objects::Location& posted_from, + const base::Closure& task) + : task(task) { +} + +PosixDynamicThreadPool::PendingTask::~PendingTask() { +} + PosixDynamicThreadPool::PosixDynamicThreadPool( const std::string& name_prefix, int idle_seconds_before_exit) : name_prefix_(name_prefix), idle_seconds_before_exit_(idle_seconds_before_exit), - tasks_available_cv_(&lock_), + pending_tasks_available_cv_(&lock_), num_idle_threads_(0), terminated_(false), num_idle_threads_cv_(NULL) {} PosixDynamicThreadPool::~PosixDynamicThreadPool() { - while (!tasks_.empty()) { - Task* task = tasks_.front(); - tasks_.pop(); - delete task; + while (!pending_tasks_.empty()) { + PendingTask pending_task = pending_tasks_.front(); + pending_tasks_.pop(); } } @@ -117,53 +136,76 @@ void PosixDynamicThreadPool::Terminate() { DCHECK(!terminated_) << "Thread pool is already terminated."; terminated_ = true; } - tasks_available_cv_.Broadcast(); + pending_tasks_available_cv_.Broadcast(); +} + +void PosixDynamicThreadPool::PostTask( + const tracked_objects::Location& from_here, + Task* task) { + PendingTask pending_task(from_here, + base::Bind(&subtle::TaskClosureAdapter::Run, + new subtle::TaskClosureAdapter(task))); + // |pending_task| and AddTask() work in conjunction here to ensure that after + // a successful AddTask(), the TaskClosureAdapter object is deleted on the + // worker thread. In AddTask(), the reference |pending_task.task| is handed + // off in a destructive manner to ensure that the local copy of + // |pending_task| doesn't keep a ref on the Closure causing the + // TaskClosureAdapter to be deleted on the wrong thread. + AddTask(&pending_task); +} + +void PosixDynamicThreadPool::PostTask( + const tracked_objects::Location& from_here, + const base::Closure& task) { + PendingTask pending_task(from_here, task); + AddTask(&pending_task); } -void PosixDynamicThreadPool::PostTask(Task* task) { +void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { AutoLock locked(lock_); DCHECK(!terminated_) << "This thread pool is already terminated. Do not post new tasks."; - tasks_.push(task); + pending_tasks_.push(*pending_task); + pending_task->task.Reset(); // We have enough worker threads. - if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { - tasks_available_cv_.Signal(); + if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { + pending_tasks_available_cv_.Signal(); } else { // The new PlatformThread will take ownership of the WorkerThread object, // which will delete itself on exit. WorkerThread* worker = - new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); + new WorkerThread(name_prefix_, this); PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); } } -Task* PosixDynamicThreadPool::WaitForTask() { +PosixDynamicThreadPool::PendingTask PosixDynamicThreadPool::WaitForTask() { AutoLock locked(lock_); if (terminated_) - return NULL; + return PendingTask(FROM_HERE, base::Closure()); - if (tasks_.empty()) { // No work available, wait for work. + if (pending_tasks_.empty()) { // No work available, wait for work. num_idle_threads_++; if (num_idle_threads_cv_.get()) num_idle_threads_cv_->Signal(); - tasks_available_cv_.TimedWait( - TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); + pending_tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(idle_seconds_before_exit_)); num_idle_threads_--; if (num_idle_threads_cv_.get()) num_idle_threads_cv_->Signal(); - if (tasks_.empty()) { + if (pending_tasks_.empty()) { // We waited for work, but there's still no work. Return NULL to signal // the thread to terminate. - return NULL; + return PendingTask(FROM_HERE, base::Closure()); } } - Task* task = tasks_.front(); - tasks_.pop(); - return task; + PendingTask pending_task = pending_tasks_.front(); + pending_tasks_.pop(); + return pending_task; } } // namespace base diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h index 9bb9eda..bf75c78 100644 --- a/base/threading/worker_pool_posix.h +++ b/base/threading/worker_pool_posix.h @@ -29,11 +29,13 @@ #include <string> #include "base/basictypes.h" +#include "base/callback.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" +#include "base/tracked.h" class Task; @@ -44,6 +46,19 @@ class BASE_API PosixDynamicThreadPool public: class PosixDynamicThreadPoolPeer; + struct PendingTask { + PendingTask(const tracked_objects::Location& posted_from, + const base::Closure& task); + ~PendingTask(); + // TODO(ajwong): After we figure out why Mac's ~AtExitManager dies when + // destructing the lock, add in extra info so we can call + // tracked_objects::TallyADeathIfActive() and + // tracked_objects::TallyABirthIfActive correctly. + + // The task to run. + base::Closure task; + }; + // All worker threads will share the same |name_prefix|. They will exit after // |idle_seconds_before_exit|. PosixDynamicThreadPool(const std::string& name_prefix, @@ -56,15 +71,26 @@ class BASE_API PosixDynamicThreadPool // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership // of |task|. - void PostTask(Task* task); + // + // TODO(ajwong): Remove this compatibility API once the Task -> Closure + // migration is finished. + void PostTask(const tracked_objects::Location& from_here, Task* task); + + // Adds |task| to the thread pool. + void PostTask(const tracked_objects::Location& from_here, + const base::Closure& task); // Worker thread method to wait for up to |idle_seconds_before_exit| for more // work from the thread pool. Returns NULL if no work is available. - Task* WaitForTask(); + PendingTask WaitForTask(); private: friend class PosixDynamicThreadPoolPeer; + // Adds pending_task to the thread pool. This function will clear + // |pending_task->task|. + void AddTask(PendingTask* pending_task); + const std::string name_prefix_; const int idle_seconds_before_exit_; @@ -73,9 +99,9 @@ class BASE_API PosixDynamicThreadPool // Signal()s worker threads to let them know more tasks are available. // Also used for Broadcast()'ing to worker threads to let them know the pool // is being deleted and they can exit. - ConditionVariable tasks_available_cv_; + ConditionVariable pending_tasks_available_cv_; int num_idle_threads_; - std::queue<Task*> tasks_; + std::queue<PendingTask> pending_tasks_; bool terminated_; // Only used for tests to ensure correct thread ordering. It will always be // NULL in non-test code. diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index c984ee3..01722f0 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -22,10 +22,12 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { : pool_(pool) {} Lock* lock() { return &pool_->lock_; } - ConditionVariable* tasks_available_cv() { - return &pool_->tasks_available_cv_; + ConditionVariable* pending_tasks_available_cv() { + return &pool_->pending_tasks_available_cv_; + } + const std::queue<PendingTask>& pending_tasks() const { + return pool_->pending_tasks_; } - const std::queue<Task*>& tasks() const { return pool_->tasks_; } int num_idle_threads() const { return pool_->num_idle_threads_; } ConditionVariable* num_idle_threads_cv() { return pool_->num_idle_threads_cv_.get(); @@ -180,10 +182,10 @@ class PosixDynamicThreadPoolTest : public testing::Test { TEST_F(PosixDynamicThreadPoolTest, Basic) { EXPECT_EQ(0, peer_.num_idle_threads()); EXPECT_EQ(0U, unique_threads_.size()); - EXPECT_EQ(0U, peer_.tasks().size()); + EXPECT_EQ(0U, peer_.pending_tasks().size()); // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); @@ -195,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add another 2 tasks. One should reuse the existing worker thread. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -214,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { // Add two blocking tasks. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; @@ -230,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { TEST_F(PosixDynamicThreadPoolTest, Complex) { // Add two non blocking tasks and wait for them to finish. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add two blocking tasks, start them simultaneously, and wait for them to // finish. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -251,13 +253,13 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { { base::AutoLock locked(*peer_.lock()); while (peer_.num_idle_threads() > 0) { - peer_.tasks_available_cv()->Signal(); + peer_.pending_tasks_available_cv()->Signal(); peer_.num_idle_threads_cv()->Wait(); } } // Add another non blocking task. There are no threads to reuse. - pool_->PostTask(CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); EXPECT_EQ(3U, unique_threads_.size()); diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc index 2072e52..2aa423f 100644 --- a/base/threading/worker_pool_win.cc +++ b/base/threading/worker_pool_win.cc @@ -1,40 +1,83 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/threading/worker_pool.h" +#include "base/bind.h" #include "base/logging.h" #include "base/task.h" +#include "base/tracked_objects.h" namespace base { namespace { +struct PendingTask { + PendingTask( + const tracked_objects::Location& posted_from, + const base::Closure& task) + : task(task) { +#if defined(TRACK_ALL_TASK_OBJECTS) + post_births = tracked_objects::ThreadData::TallyABirthIfActive(posted_from); + time_posted = TimeTicks::Now(); +#endif // defined(TRACK_ALL_TASK_OBJECTS) + } + +#if defined(TRACK_ALL_TASK_OBJECTS) + // Counter for location where the Closure was posted from. + tracked_objects::Births* post_births; + + // Time the task was posted. + TimeTicks time_posted; +#endif // defined(TRACK_ALL_TASK_OBJECTS) + + // The task to run. + base::Closure task; +}; + DWORD CALLBACK WorkItemCallback(void* param) { - Task* task = static_cast<Task*>(param); - task->Run(); - delete task; + PendingTask* pending_task = static_cast<PendingTask*>(param); + pending_task->task.Run(); +#if defined(TRACK_ALL_TASK_OBJECTS) + tracked_objects::ThreadData::TallyADeathIfActive( + pending_task->post_births, + TimeTicks::Now() - pending_task->time_posted); +#endif // defined(TRACK_ALL_TASK_OBJECTS) + delete pending_task; return 0; } -} // namespace - -bool WorkerPool::PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - +// Takes ownership of |pending_task| +bool PostTaskInternal(PendingTask* pending_task, bool task_is_slow) { ULONG flags = 0; if (task_is_slow) flags |= WT_EXECUTELONGFUNCTION; - if (!QueueUserWorkItem(WorkItemCallback, task, flags)) { + if (!QueueUserWorkItem(WorkItemCallback, pending_task, flags)) { DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError(); - delete task; + delete pending_task; return false; } return true; } +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + PendingTask* pending_task = + new PendingTask(from_here, + base::Bind(&subtle::TaskClosureAdapter::Run, + new subtle::TaskClosureAdapter(task))); + return PostTaskInternal(pending_task, task_is_slow); +} + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + PendingTask* pending_task = new PendingTask(from_here, task); + return PostTaskInternal(pending_task, task_is_slow); +} + } // namespace base diff --git a/base/tracked_objects.cc b/base/tracked_objects.cc index f7fd0c2..21ac0fc 100644 --- a/base/tracked_objects.cc +++ b/base/tracked_objects.cc @@ -277,6 +277,26 @@ void ThreadData::TallyADeath(const Births& lifetimes, } // static +Births* ThreadData::TallyABirthIfActive(const Location& location) { + if (IsActive()) { + ThreadData* current_thread_data = current(); + if (current_thread_data) { + return current_thread_data->TallyABirth(location); + } + } + + return NULL; +} + +// static +void ThreadData::TallyADeathIfActive(const Births* the_birth, + const base::TimeDelta& duration) { + if (IsActive() && the_birth) { + current()->TallyADeath(*the_birth, duration); + } +} + +// static ThreadData* ThreadData::first() { base::AutoLock lock(list_lock_); return first_; diff --git a/base/tracked_objects.h b/base/tracked_objects.h index 5954f16..b6ab345 100644 --- a/base/tracked_objects.h +++ b/base/tracked_objects.h @@ -496,6 +496,13 @@ class BASE_API ThreadData { // Find a place to record a death on this thread. void TallyADeath(const Births& lifetimes, const base::TimeDelta& duration); + // Helper methods to only tally if the current thread has tracking active. + // + // TallyABirthIfActive will returns NULL if the birth cannot be tallied. + static Births* TallyABirthIfActive(const Location& location); + static void TallyADeathIfActive(const Births* lifetimes, + const base::TimeDelta& duration); + // (Thread safe) Get start of list of instances. static ThreadData* first(); // Iterate through the null terminated list of instances. |