diff options
author | yzshen <yzshen@chromium.org> | 2015-07-20 12:59:53 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-07-20 20:02:55 +0000 |
commit | da8f981cd01fc15b878c6bfd692adbd3c151855a (patch) | |
tree | adf209f088b2b000366d6ef6cc3a1c1fc7e2370e /base/threading | |
parent | 7b86b940888b925afe52396edf9551ce62bb2660 (diff) | |
download | chromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.zip chromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.tar.gz chromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.tar.bz2 |
WorkerPool: Support clean shutdown on POSIX.
BUG=498986
Review URL: https://codereview.chromium.org/1190363002
Cr-Commit-Position: refs/heads/master@{#339493}
Diffstat (limited to 'base/threading')
-rw-r--r-- | base/threading/worker_pool.h | 17 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.cc | 195 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.h | 58 | ||||
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 180 | ||||
-rw-r--r-- | base/threading/worker_pool_win.cc | 6 |
5 files changed, 332 insertions, 124 deletions
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h index a52a414..f8c6235 100644 --- a/base/threading/worker_pool.h +++ b/base/threading/worker_pool.h @@ -22,11 +22,11 @@ class TaskRunner; // This is a facility that runs tasks that don't require a specific thread or // a message loop. // -// WARNING: This shouldn't be used unless absolutely necessary. We don't wait -// for the worker pool threads to finish on shutdown, so the tasks running -// inside the pool must be extremely careful about other objects they access -// (MessageLoops, Singletons, etc). During shutdown these object may no longer -// exist. +// WARNING: This shouldn't be used unless absolutely necessary. Typically +// (without calling ShutDownCleanly()), we don't wait for the worker pool +// threads to finish on shutdown, so the tasks running inside the pool must be +// extremely careful about other objects they access (MessageLoops, Singletons, +// etc). During shutdown these object may no longer exist. class BASE_EXPORT WorkerPool { public: // This function posts |task| to run on a worker thread. |task_is_slow| @@ -53,6 +53,13 @@ class BASE_EXPORT WorkerPool { // Get a TaskRunner wrapper which posts to the WorkerPool using the given // |task_is_slow| behavior. static const scoped_refptr<TaskRunner>& GetTaskRunner(bool task_is_slow); + + // Blocks until all worker threads quit cleanly. Please note that it ensures + // that no worker threads are running after the method returns, but it doesn't + // guarantee to process all queued pending tasks. This method may take a long + // time. Please don't use it unless absolutely necessary, e.g., when we want + // to unload the library containing the worker pool before process shutdown. + static void ShutDownCleanly(); }; } // namespace base diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc index 349b5d7..231aa68 100644 --- a/base/threading/worker_pool_posix.cc +++ b/base/threading/worker_pool_posix.cc @@ -22,10 +22,10 @@ namespace base { namespace { -base::LazyInstance<ThreadLocalBoolean>::Leaky - g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; +LazyInstance<ThreadLocalBoolean>::Leaky g_worker_pool_running_on_this_thread = + LAZY_INSTANCE_INITIALIZER; -const int kIdleSecondsBeforeExit = 10 * 60; +const int64 kIdleSecondsBeforeExit = 10 * 60; class WorkerPoolImpl { public: @@ -33,49 +33,55 @@ class WorkerPoolImpl { ~WorkerPoolImpl(); void PostTask(const tracked_objects::Location& from_here, - const base::Closure& task, bool task_is_slow); + const Closure& task, + bool task_is_slow); + + void ShutDownCleanly(); private: - scoped_refptr<base::PosixDynamicThreadPool> pool_; + scoped_refptr<PosixDynamicThreadPool> pool_; }; WorkerPoolImpl::WorkerPoolImpl() - : pool_(new base::PosixDynamicThreadPool("WorkerPool", - kIdleSecondsBeforeExit)) { + : pool_(new PosixDynamicThreadPool( + "WorkerPool", + TimeDelta::FromSeconds(kIdleSecondsBeforeExit))) { } WorkerPoolImpl::~WorkerPoolImpl() { - pool_->Terminate(); + pool_->Terminate(false); } void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, - const base::Closure& task, bool task_is_slow) { + const Closure& task, + bool task_is_slow) { pool_->PostTask(from_here, task); } -base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = - LAZY_INSTANCE_INITIALIZER; +void WorkerPoolImpl::ShutDownCleanly() { + pool_->Terminate(true); +} + +LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = LAZY_INSTANCE_INITIALIZER; class WorkerThread : public PlatformThread::Delegate { public: - WorkerThread(const std::string& name_prefix, - base::PosixDynamicThreadPool* pool) - : name_prefix_(name_prefix), - pool_(pool) {} + WorkerThread(const std::string& name_prefix, PosixDynamicThreadPool* pool) + : name_prefix_(name_prefix), pool_(pool) {} void ThreadMain() override; private: const std::string name_prefix_; - scoped_refptr<base::PosixDynamicThreadPool> pool_; + scoped_refptr<PosixDynamicThreadPool> pool_; DISALLOW_COPY_AND_ASSIGN(WorkerThread); }; void WorkerThread::ThreadMain() { g_worker_pool_running_on_this_thread.Get().Set(true); - const std::string name = base::StringPrintf( - "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); + const std::string name = + StringPrintf("%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); // Note |name.c_str()| must remain valid for for the whole life of the thread. PlatformThread::SetName(name); @@ -96,7 +102,7 @@ void WorkerThread::ThreadMain() { pending_task.birth_tally, pending_task.time_posted, stopwatch); } - // The WorkerThread is non-joinable, so it deletes itself. + pool_->NotifyWorkerIsGoingAway(PlatformThread::CurrentHandle()); delete this; } @@ -104,7 +110,8 @@ void WorkerThread::ThreadMain() { // static bool WorkerPool::PostTask(const tracked_objects::Location& from_here, - const base::Closure& task, bool task_is_slow) { + const Closure& task, + bool task_is_slow) { g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); return true; } @@ -114,74 +121,82 @@ bool WorkerPool::RunsTasksOnCurrentThread() { return g_worker_pool_running_on_this_thread.Get().Get(); } +// static +void WorkerPool::ShutDownCleanly() { + g_lazy_worker_pool.Pointer()->ShutDownCleanly(); +} + PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, - int idle_seconds_before_exit) + TimeDelta idle_time_before_exit) : name_prefix_(name_prefix), - idle_seconds_before_exit_(idle_seconds_before_exit), + idle_time_before_exit_(idle_time_before_exit), pending_tasks_available_cv_(&lock_), num_idle_threads_(0), - terminated_(false) {} + has_pending_cleanup_task_(false), + terminated_(false) { +} PosixDynamicThreadPool::~PosixDynamicThreadPool() { while (!pending_tasks_.empty()) pending_tasks_.pop(); } -void PosixDynamicThreadPool::Terminate() { +void PosixDynamicThreadPool::Terminate(bool blocking) { + std::vector<PlatformThreadHandle> threads_to_cleanup; + std::vector<PlatformThreadHandle> worker_threads; { AutoLock locked(lock_); - DCHECK(!terminated_) << "Thread pool is already terminated."; + if (terminated_) + return; terminated_ = true; + + threads_to_cleanup.swap(threads_to_cleanup_); + worker_threads.swap(worker_threads_); } pending_tasks_available_cv_.Broadcast(); + + if (blocking) { + for (const auto& item : threads_to_cleanup) + PlatformThread::Join(item); + + for (const auto& item : worker_threads) + PlatformThread::Join(item); + + // No need to take the lock. No one else should be accessing these members. + DCHECK_EQ(0u, num_idle_threads_); + // The following members should not have new elements added after + // |terminated_| is set to true. + DCHECK(threads_to_cleanup_.empty()); + DCHECK(worker_threads_.empty()); + } } void PosixDynamicThreadPool::PostTask( const tracked_objects::Location& from_here, - const base::Closure& task) { + const Closure& task) { PendingTask pending_task(from_here, task); - AddTask(&pending_task); -} - -void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { AutoLock locked(lock_); - DCHECK(!terminated_) << - "This thread pool is already terminated. Do not post new tasks."; - - pending_tasks_.push(*pending_task); - pending_task->task.Reset(); - - // We have enough worker threads. - if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { - pending_tasks_available_cv_.Signal(); - } else { - // The new PlatformThread will take ownership of the WorkerThread object, - // which will delete itself on exit. - WorkerThread* worker = - new WorkerThread(name_prefix_, this); - PlatformThread::CreateNonJoinable(0, worker); - } + AddTaskNoLock(&pending_task); } PendingTask PosixDynamicThreadPool::WaitForTask() { AutoLock locked(lock_); if (terminated_) - return PendingTask(FROM_HERE, base::Closure()); + return PendingTask(FROM_HERE, Closure()); if (pending_tasks_.empty()) { // No work available, wait for work. num_idle_threads_++; - if (num_idle_threads_cv_.get()) - num_idle_threads_cv_->Signal(); - pending_tasks_available_cv_.TimedWait( - TimeDelta::FromSeconds(idle_seconds_before_exit_)); + if (num_threads_cv_) + num_threads_cv_->Broadcast(); + pending_tasks_available_cv_.TimedWait(idle_time_before_exit_); num_idle_threads_--; - if (num_idle_threads_cv_.get()) - num_idle_threads_cv_->Signal(); + if (num_threads_cv_) + num_threads_cv_->Broadcast(); if (pending_tasks_.empty()) { - // We waited for work, but there's still no work. Return NULL to signal - // the thread to terminate. - return PendingTask(FROM_HERE, base::Closure()); + // We waited for work, but there's still no work. Return an empty task to + // signal the thread to terminate. + return PendingTask(FROM_HERE, Closure()); } } @@ -190,4 +205,72 @@ PendingTask PosixDynamicThreadPool::WaitForTask() { return pending_task; } +void PosixDynamicThreadPool::NotifyWorkerIsGoingAway( + PlatformThreadHandle worker) { + AutoLock locked(lock_); + if (terminated_) + return; + + auto new_end = std::remove_if(worker_threads_.begin(), worker_threads_.end(), + [worker](PlatformThreadHandle handle) { + return handle.is_equal(worker); + }); + DCHECK_EQ(1, worker_threads_.end() - new_end); + worker_threads_.erase(new_end, worker_threads_.end()); + + threads_to_cleanup_.push_back(worker); + + if (num_threads_cv_) + num_threads_cv_->Broadcast(); + + if (!has_pending_cleanup_task_) { + has_pending_cleanup_task_ = true; + PendingTask pending_task( + FROM_HERE, + base::Bind(&PosixDynamicThreadPool::CleanUpThreads, Unretained(this))); + AddTaskNoLock(&pending_task); + } +} + +void PosixDynamicThreadPool::AddTaskNoLock(PendingTask* pending_task) { + lock_.AssertAcquired(); + + if (terminated_) { + LOG(WARNING) + << "This thread pool is already terminated. Do not post new tasks."; + return; + } + + pending_tasks_.push(*pending_task); + pending_task->task.Reset(); + + // We have enough worker threads. + if (num_idle_threads_ >= + pending_tasks_.size() - (has_pending_cleanup_task_ ? 1 : 0)) { + pending_tasks_available_cv_.Signal(); + } else { + // The new PlatformThread will take ownership of the WorkerThread object, + // which will delete itself on exit. + WorkerThread* worker = new WorkerThread(name_prefix_, this); + PlatformThreadHandle handle; + PlatformThread::Create(0, worker, &handle); + worker_threads_.push_back(handle); + + if (num_threads_cv_) + num_threads_cv_->Broadcast(); + } +} + +void PosixDynamicThreadPool::CleanUpThreads() { + std::vector<PlatformThreadHandle> threads_to_cleanup; + { + AutoLock locked(lock_); + DCHECK(has_pending_cleanup_task_); + has_pending_cleanup_task_ = false; + threads_to_cleanup.swap(threads_to_cleanup_); + } + for (const auto& item : threads_to_cleanup) + PlatformThread::Join(item); +} + } // namespace base diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h index dd0ffb6..d3c4a8f 100644 --- a/base/threading/worker_pool_posix.h +++ b/base/threading/worker_pool_posix.h @@ -5,12 +5,12 @@ // The thread pool used in the POSIX implementation of WorkerPool dynamically // adds threads as necessary to handle all tasks. It keeps old threads around // for a period of time to allow them to be reused. After this waiting period, -// the threads exit. This thread pool uses non-joinable threads, therefore -// worker threads are not joined during process shutdown. This means that -// potentially long running tasks (such as DNS lookup) do not block process -// shutdown, but also means that process shutdown may "leak" objects. Note that -// although PosixDynamicThreadPool spawns the worker threads and manages the -// task queue, it does not own the worker threads. The worker threads ask the +// the threads exit. Unless blocking termination is requested, worker threads +// are not joined during process shutdown. This means that potentially long +// running tasks (such as DNS lookup) do not block process shutdown, but also +// means that process shutdown may "leak" objects. Note that although +// PosixDynamicThreadPool spawns the worker threads and manages the task queue, +// it does not own the worker threads. The worker threads ask the // PosixDynamicThreadPool for work and eventually clean themselves up. The // worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool // instance, which prevents PosixDynamicThreadPool from disappearing before all @@ -26,6 +26,7 @@ #include <queue> #include <string> +#include <vector> #include "base/basictypes.h" #include "base/callback_forward.h" @@ -36,6 +37,7 @@ #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" +#include "base/time/time.h" #include "base/tracked_objects.h" class Task; @@ -48,34 +50,44 @@ class BASE_EXPORT PosixDynamicThreadPool class PosixDynamicThreadPoolPeer; // All worker threads will share the same |name_prefix|. They will exit after - // |idle_seconds_before_exit|. + // |idle_time_before_exit|. PosixDynamicThreadPool(const std::string& name_prefix, - int idle_seconds_before_exit); + TimeDelta idle_time_before_exit); // Indicates that the thread pool is going away. Stops handing out tasks to - // worker threads. Wakes up all the idle threads to let them exit. - void Terminate(); + // worker threads. Wakes up all the idle threads to let them exit. If + // |blocking| is set to true, the call returns after all worker threads have + // quit. + // The second and subsequent calls to this method are ignored, regardless of + // the value of |blocking|. + void Terminate(bool blocking); // Adds |task| to the thread pool. void PostTask(const tracked_objects::Location& from_here, const Closure& task); - // Worker thread method to wait for up to |idle_seconds_before_exit| for more - // work from the thread pool. Returns NULL if no work is available. + // Worker thread method to wait for up to |idle_time_before_exit| for more + // work from the thread pool. Returns an empty task if no work is available. PendingTask WaitForTask(); + // Marks |worker| as dead and enqueues a cleanup task to join dead worker + // threads. Unlike tasks enqueued by PostTask(), cleanup tasks never cause new + // worker threads to be created. + void NotifyWorkerIsGoingAway(PlatformThreadHandle worker); + private: friend class RefCountedThreadSafe<PosixDynamicThreadPool>; - friend class PosixDynamicThreadPoolPeer; ~PosixDynamicThreadPool(); // Adds pending_task to the thread pool. This function will clear // |pending_task->task|. - void AddTask(PendingTask* pending_task); + void AddTaskNoLock(PendingTask* pending_task); + + void CleanUpThreads(); const std::string name_prefix_; - const int idle_seconds_before_exit_; + const TimeDelta idle_time_before_exit_; Lock lock_; // Protects all the variables below. @@ -83,12 +95,20 @@ class BASE_EXPORT PosixDynamicThreadPool // Also used for Broadcast()'ing to worker threads to let them know the pool // is being deleted and they can exit. ConditionVariable pending_tasks_available_cv_; - int num_idle_threads_; - TaskQueue pending_tasks_; + size_t num_idle_threads_; + bool has_pending_cleanup_task_; + std::queue<PendingTask> pending_tasks_; bool terminated_; - // Only used for tests to ensure correct thread ordering. It will always be + + std::vector<PlatformThreadHandle> threads_to_cleanup_; + std::vector<PlatformThreadHandle> worker_threads_; + + // Signaled when idle thread count or living thread count is changed. Please + // note that it won't be signaled when Terminate() is called. + // + // Only used for tests to ensure correct thread ordering. It will always be // NULL in non-test code. - scoped_ptr<ConditionVariable> num_idle_threads_cv_; + scoped_ptr<ConditionVariable> num_threads_cv_; DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool); }; diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index 354a99c..8d2368f 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -10,8 +10,9 @@ #include "base/callback.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" -#include "base/threading/platform_thread.h" #include "base/synchronization/waitable_event.h" +#include "base/threading/platform_thread.h" +#include "base/time/time.h" #include "testing/gtest/include/gtest/gtest.h" namespace base { @@ -26,15 +27,17 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { ConditionVariable* pending_tasks_available_cv() { return &pool_->pending_tasks_available_cv_; } - const std::queue<PendingTask>& pending_tasks() const { - return pool_->pending_tasks_; + size_t num_pending_tasks() const { return pool_->pending_tasks_.size(); } + size_t num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_threads_cv() { return pool_->num_threads_cv_.get(); } + void set_num_threads_cv(ConditionVariable* cv) { + pool_->num_threads_cv_.reset(cv); } - int num_idle_threads() const { return pool_->num_idle_threads_; } - ConditionVariable* num_idle_threads_cv() { - return pool_->num_idle_threads_cv_.get(); + const std::vector<PlatformThreadHandle>& threads_to_cleanup() const { + return pool_->threads_to_cleanup_; } - void set_num_idle_threads_cv(ConditionVariable* cv) { - pool_->num_idle_threads_cv_.reset(cv); + const std::vector<PlatformThreadHandle>& worker_threads() const { + return pool_->worker_threads_; } private: @@ -45,6 +48,8 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { namespace { +const int64 kDefaultIdleSecondsBeforeExit = 60 * 60; + // IncrementingTask's main purpose is to increment a counter. It also updates a // set of unique thread ids, and signals a ConditionVariable on completion. // Note that since it does not block, there is no way to control the number of @@ -56,10 +61,10 @@ void IncrementingTask(Lock* counter_lock, Lock* unique_threads_lock, std::set<PlatformThreadId>* unique_threads) { { - base::AutoLock locked(*unique_threads_lock); + AutoLock locked(*unique_threads_lock); unique_threads->insert(PlatformThread::CurrentId()); } - base::AutoLock locked(*counter_lock); + AutoLock locked(*counter_lock); (*counter)++; } @@ -73,12 +78,12 @@ struct BlockingIncrementingTaskArgs { Lock* num_waiting_to_start_lock; int* num_waiting_to_start; ConditionVariable* num_waiting_to_start_cv; - base::WaitableEvent* start; + WaitableEvent* start; }; void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) { { - base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock); + AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock); (*args.num_waiting_to_start)++; } args.num_waiting_to_start_cv->Signal(); @@ -90,52 +95,62 @@ void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) { class PosixDynamicThreadPoolTest : public testing::Test { protected: PosixDynamicThreadPoolTest() - : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), - peer_(pool_.get()), - counter_(0), + : counter_(0), num_waiting_to_start_(0), num_waiting_to_start_cv_(&num_waiting_to_start_lock_), start_(true, false) {} - void SetUp() override { - peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); - } - void TearDown() override { // Wake up the idle threads so they can terminate. - if (pool_.get()) pool_->Terminate(); + if (pool_.get()) + pool_->Terminate(false); + } + + void Initialize(TimeDelta idle_time_before_exit) { + pool_ = new PosixDynamicThreadPool("dynamic_pool", idle_time_before_exit); + peer_.reset( + new PosixDynamicThreadPool::PosixDynamicThreadPoolPeer(pool_.get())); + peer_->set_num_threads_cv(new ConditionVariable(peer_->lock())); } void WaitForTasksToStart(int num_tasks) { - base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); while (num_waiting_to_start_ < num_tasks) { num_waiting_to_start_cv_.Wait(); } } - void WaitForIdleThreads(int num_idle_threads) { - base::AutoLock pool_locked(*peer_.lock()); - while (peer_.num_idle_threads() < num_idle_threads) { - peer_.num_idle_threads_cv()->Wait(); + void WaitForIdleThreads(size_t num_idle_threads) { + AutoLock pool_locked(*peer_->lock()); + while (peer_->num_idle_threads() != num_idle_threads) { + peer_->num_threads_cv()->Wait(); + } + } + + void WaitForLivingThreads(int num_living_threads) { + AutoLock pool_locked(*peer_->lock()); + while (static_cast<int>(peer_->worker_threads().size()) != + num_living_threads) { + peer_->num_threads_cv()->Wait(); } } - base::Closure CreateNewIncrementingTaskCallback() { - return base::Bind(&IncrementingTask, &counter_lock_, &counter_, - &unique_threads_lock_, &unique_threads_); + Closure CreateNewIncrementingTaskCallback() { + return Bind(&IncrementingTask, &counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); } - base::Closure CreateNewBlockingIncrementingTaskCallback() { + Closure CreateNewBlockingIncrementingTaskCallback() { BlockingIncrementingTaskArgs args = { &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, &num_waiting_to_start_lock_, &num_waiting_to_start_, &num_waiting_to_start_cv_, &start_ }; - return base::Bind(&BlockingIncrementingTask, args); + return Bind(&BlockingIncrementingTask, args); } - scoped_refptr<base::PosixDynamicThreadPool> pool_; - base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; + scoped_refptr<PosixDynamicThreadPool> pool_; + scoped_ptr<PosixDynamicThreadPool::PosixDynamicThreadPoolPeer> peer_; Lock counter_lock_; int counter_; Lock unique_threads_lock_; @@ -143,15 +158,17 @@ class PosixDynamicThreadPoolTest : public testing::Test { Lock num_waiting_to_start_lock_; int num_waiting_to_start_; ConditionVariable num_waiting_to_start_cv_; - base::WaitableEvent start_; + WaitableEvent start_; }; } // namespace TEST_F(PosixDynamicThreadPoolTest, Basic) { - EXPECT_EQ(0, peer_.num_idle_threads()); + Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit)); + + EXPECT_EQ(0U, peer_->num_idle_threads()); EXPECT_EQ(0U, unique_threads_.size()); - EXPECT_EQ(0U, peer_.pending_tasks().size()); + EXPECT_EQ(0U, peer_->num_pending_tasks()); // Add one task and wait for it to be completed. pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); @@ -164,6 +181,8 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { } TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { + Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit)); + // Add one task and wait for it to be completed. pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); @@ -178,11 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { WaitForIdleThreads(2); EXPECT_EQ(2U, unique_threads_.size()); - EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, peer_->num_idle_threads()); EXPECT_EQ(3, counter_); } TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { + Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit)); + // Add two blocking tasks. pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); @@ -194,12 +215,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { WaitForIdleThreads(2); EXPECT_EQ(2U, unique_threads_.size()); - EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2U, peer_->num_idle_threads()) << "Existing threads are now idle."; EXPECT_EQ(2, counter_); } TEST_F(PosixDynamicThreadPoolTest, Complex) { - // Add two non blocking tasks and wait for them to finish. + Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit)); + + // Add one non blocking tasks and wait for it to finish. pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); WaitForIdleThreads(1); @@ -214,15 +237,15 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { WaitForIdleThreads(2); EXPECT_EQ(3, counter_); - EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, peer_->num_idle_threads()); EXPECT_EQ(2U, unique_threads_.size()); // Wake up all idle threads so they can exit. { - base::AutoLock locked(*peer_.lock()); - while (peer_.num_idle_threads() > 0) { - peer_.pending_tasks_available_cv()->Signal(); - peer_.num_idle_threads_cv()->Wait(); + AutoLock locked(*peer_->lock()); + while (peer_->worker_threads().size() > 0) { + peer_->pending_tasks_available_cv()->Signal(); + peer_->num_threads_cv()->Wait(); } } @@ -246,8 +269,77 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { // be either 2 or 3 unique thread IDs in the set at this stage in the test. EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3) << "unique_threads_.size() = " << unique_threads_.size(); - EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1U, peer_->num_idle_threads()); EXPECT_EQ(4, counter_); } +TEST_F(PosixDynamicThreadPoolTest, NoNewThreadForCleanup) { + // Let worker threads quit quickly after they are idle. + Initialize(TimeDelta::FromMilliseconds(1)); + + for (size_t i = 0; i < 2; ++i) { + // This will create a worker thread. + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + + WaitForTasksToStart(1); + + PlatformThreadHandle worker; + { + AutoLock locked(*peer_->lock()); + ASSERT_EQ(1u, peer_->worker_threads().size()); + worker = peer_->worker_threads()[0]; + } + + start_.Signal(); + + // Wait for the worker thread to quit. + WaitForLivingThreads(0); + + { + AutoLock locked(*peer_->lock()); + // The thread that just quit is recorded for cleanup. But we don't create + // a worker thread just for doing that. + ASSERT_EQ(1u, peer_->threads_to_cleanup().size()); + EXPECT_TRUE(worker.is_equal(peer_->threads_to_cleanup()[0])); + EXPECT_TRUE(peer_->worker_threads().empty()); + } + } + + pool_->Terminate(true); + + { + AutoLock locked(*peer_->lock()); + EXPECT_TRUE(peer_->threads_to_cleanup().empty()); + EXPECT_TRUE(peer_->worker_threads().empty()); + } +} + +TEST_F(PosixDynamicThreadPoolTest, BlockingTerminate) { + // Let worker threads quit quickly after they are idle. + Initialize(TimeDelta::FromMilliseconds(3)); + + for (size_t i = 0; i < 5; ++i) { + PlatformThread::Sleep(TimeDelta::FromMilliseconds(i)); + for (size_t j = 0; j < 50; ++j) + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + } + + pool_->Terminate(true); + + { + AutoLock locked(*peer_->lock()); + EXPECT_TRUE(peer_->threads_to_cleanup().empty()); + EXPECT_TRUE(peer_->worker_threads().empty()); + } + + int counter = counter_; + EXPECT_GE(5 * 50, counter); + EXPECT_GE(5 * 50u, unique_threads_.size()); + + // Make sure that no threads are still running and trying to modify + // |counter_|. + PlatformThread::Sleep(TimeDelta::FromMilliseconds(10)); + EXPECT_EQ(counter, counter_); +} + } // namespace base diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc index 1b0ade5..563702b 100644 --- a/base/threading/worker_pool_win.cc +++ b/base/threading/worker_pool_win.cc @@ -70,4 +70,10 @@ bool WorkerPool::RunsTasksOnCurrentThread() { return g_worker_pool_running_on_this_thread.Get().Get(); } +// static +void WorkerPool::ShutDownCleanly() { + // TODO(yzshen): implement it. + NOTIMPLEMENTED(); +} + } // namespace base |