summaryrefslogtreecommitdiffstats
path: root/base/threading
diff options
context:
space:
mode:
authoryzshen <yzshen@chromium.org>2015-07-20 12:59:53 -0700
committerCommit bot <commit-bot@chromium.org>2015-07-20 20:02:55 +0000
commitda8f981cd01fc15b878c6bfd692adbd3c151855a (patch)
treeadf209f088b2b000366d6ef6cc3a1c1fc7e2370e /base/threading
parent7b86b940888b925afe52396edf9551ce62bb2660 (diff)
downloadchromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.zip
chromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.tar.gz
chromium_src-da8f981cd01fc15b878c6bfd692adbd3c151855a.tar.bz2
WorkerPool: Support clean shutdown on POSIX.
BUG=498986 Review URL: https://codereview.chromium.org/1190363002 Cr-Commit-Position: refs/heads/master@{#339493}
Diffstat (limited to 'base/threading')
-rw-r--r--base/threading/worker_pool.h17
-rw-r--r--base/threading/worker_pool_posix.cc195
-rw-r--r--base/threading/worker_pool_posix.h58
-rw-r--r--base/threading/worker_pool_posix_unittest.cc180
-rw-r--r--base/threading/worker_pool_win.cc6
5 files changed, 332 insertions, 124 deletions
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h
index a52a414..f8c6235 100644
--- a/base/threading/worker_pool.h
+++ b/base/threading/worker_pool.h
@@ -22,11 +22,11 @@ class TaskRunner;
// This is a facility that runs tasks that don't require a specific thread or
// a message loop.
//
-// WARNING: This shouldn't be used unless absolutely necessary. We don't wait
-// for the worker pool threads to finish on shutdown, so the tasks running
-// inside the pool must be extremely careful about other objects they access
-// (MessageLoops, Singletons, etc). During shutdown these object may no longer
-// exist.
+// WARNING: This shouldn't be used unless absolutely necessary. Typically
+// (without calling ShutDownCleanly()), we don't wait for the worker pool
+// threads to finish on shutdown, so the tasks running inside the pool must be
+// extremely careful about other objects they access (MessageLoops, Singletons,
+// etc). During shutdown these object may no longer exist.
class BASE_EXPORT WorkerPool {
public:
// This function posts |task| to run on a worker thread. |task_is_slow|
@@ -53,6 +53,13 @@ class BASE_EXPORT WorkerPool {
// Get a TaskRunner wrapper which posts to the WorkerPool using the given
// |task_is_slow| behavior.
static const scoped_refptr<TaskRunner>& GetTaskRunner(bool task_is_slow);
+
+ // Blocks until all worker threads quit cleanly. Please note that it ensures
+ // that no worker threads are running after the method returns, but it doesn't
+ // guarantee to process all queued pending tasks. This method may take a long
+ // time. Please don't use it unless absolutely necessary, e.g., when we want
+ // to unload the library containing the worker pool before process shutdown.
+ static void ShutDownCleanly();
};
} // namespace base
diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc
index 349b5d7..231aa68 100644
--- a/base/threading/worker_pool_posix.cc
+++ b/base/threading/worker_pool_posix.cc
@@ -22,10 +22,10 @@ namespace base {
namespace {
-base::LazyInstance<ThreadLocalBoolean>::Leaky
- g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER;
+LazyInstance<ThreadLocalBoolean>::Leaky g_worker_pool_running_on_this_thread =
+ LAZY_INSTANCE_INITIALIZER;
-const int kIdleSecondsBeforeExit = 10 * 60;
+const int64 kIdleSecondsBeforeExit = 10 * 60;
class WorkerPoolImpl {
public:
@@ -33,49 +33,55 @@ class WorkerPoolImpl {
~WorkerPoolImpl();
void PostTask(const tracked_objects::Location& from_here,
- const base::Closure& task, bool task_is_slow);
+ const Closure& task,
+ bool task_is_slow);
+
+ void ShutDownCleanly();
private:
- scoped_refptr<base::PosixDynamicThreadPool> pool_;
+ scoped_refptr<PosixDynamicThreadPool> pool_;
};
WorkerPoolImpl::WorkerPoolImpl()
- : pool_(new base::PosixDynamicThreadPool("WorkerPool",
- kIdleSecondsBeforeExit)) {
+ : pool_(new PosixDynamicThreadPool(
+ "WorkerPool",
+ TimeDelta::FromSeconds(kIdleSecondsBeforeExit))) {
}
WorkerPoolImpl::~WorkerPoolImpl() {
- pool_->Terminate();
+ pool_->Terminate(false);
}
void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
- const base::Closure& task, bool task_is_slow) {
+ const Closure& task,
+ bool task_is_slow) {
pool_->PostTask(from_here, task);
}
-base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool =
- LAZY_INSTANCE_INITIALIZER;
+void WorkerPoolImpl::ShutDownCleanly() {
+ pool_->Terminate(true);
+}
+
+LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = LAZY_INSTANCE_INITIALIZER;
class WorkerThread : public PlatformThread::Delegate {
public:
- WorkerThread(const std::string& name_prefix,
- base::PosixDynamicThreadPool* pool)
- : name_prefix_(name_prefix),
- pool_(pool) {}
+ WorkerThread(const std::string& name_prefix, PosixDynamicThreadPool* pool)
+ : name_prefix_(name_prefix), pool_(pool) {}
void ThreadMain() override;
private:
const std::string name_prefix_;
- scoped_refptr<base::PosixDynamicThreadPool> pool_;
+ scoped_refptr<PosixDynamicThreadPool> pool_;
DISALLOW_COPY_AND_ASSIGN(WorkerThread);
};
void WorkerThread::ThreadMain() {
g_worker_pool_running_on_this_thread.Get().Set(true);
- const std::string name = base::StringPrintf(
- "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId());
+ const std::string name =
+ StringPrintf("%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId());
// Note |name.c_str()| must remain valid for for the whole life of the thread.
PlatformThread::SetName(name);
@@ -96,7 +102,7 @@ void WorkerThread::ThreadMain() {
pending_task.birth_tally, pending_task.time_posted, stopwatch);
}
- // The WorkerThread is non-joinable, so it deletes itself.
+ pool_->NotifyWorkerIsGoingAway(PlatformThread::CurrentHandle());
delete this;
}
@@ -104,7 +110,8 @@ void WorkerThread::ThreadMain() {
// static
bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
- const base::Closure& task, bool task_is_slow) {
+ const Closure& task,
+ bool task_is_slow) {
g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
return true;
}
@@ -114,74 +121,82 @@ bool WorkerPool::RunsTasksOnCurrentThread() {
return g_worker_pool_running_on_this_thread.Get().Get();
}
+// static
+void WorkerPool::ShutDownCleanly() {
+ g_lazy_worker_pool.Pointer()->ShutDownCleanly();
+}
+
PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix,
- int idle_seconds_before_exit)
+ TimeDelta idle_time_before_exit)
: name_prefix_(name_prefix),
- idle_seconds_before_exit_(idle_seconds_before_exit),
+ idle_time_before_exit_(idle_time_before_exit),
pending_tasks_available_cv_(&lock_),
num_idle_threads_(0),
- terminated_(false) {}
+ has_pending_cleanup_task_(false),
+ terminated_(false) {
+}
PosixDynamicThreadPool::~PosixDynamicThreadPool() {
while (!pending_tasks_.empty())
pending_tasks_.pop();
}
-void PosixDynamicThreadPool::Terminate() {
+void PosixDynamicThreadPool::Terminate(bool blocking) {
+ std::vector<PlatformThreadHandle> threads_to_cleanup;
+ std::vector<PlatformThreadHandle> worker_threads;
{
AutoLock locked(lock_);
- DCHECK(!terminated_) << "Thread pool is already terminated.";
+ if (terminated_)
+ return;
terminated_ = true;
+
+ threads_to_cleanup.swap(threads_to_cleanup_);
+ worker_threads.swap(worker_threads_);
}
pending_tasks_available_cv_.Broadcast();
+
+ if (blocking) {
+ for (const auto& item : threads_to_cleanup)
+ PlatformThread::Join(item);
+
+ for (const auto& item : worker_threads)
+ PlatformThread::Join(item);
+
+ // No need to take the lock. No one else should be accessing these members.
+ DCHECK_EQ(0u, num_idle_threads_);
+ // The following members should not have new elements added after
+ // |terminated_| is set to true.
+ DCHECK(threads_to_cleanup_.empty());
+ DCHECK(worker_threads_.empty());
+ }
}
void PosixDynamicThreadPool::PostTask(
const tracked_objects::Location& from_here,
- const base::Closure& task) {
+ const Closure& task) {
PendingTask pending_task(from_here, task);
- AddTask(&pending_task);
-}
-
-void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
AutoLock locked(lock_);
- DCHECK(!terminated_) <<
- "This thread pool is already terminated. Do not post new tasks.";
-
- pending_tasks_.push(*pending_task);
- pending_task->task.Reset();
-
- // We have enough worker threads.
- if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
- pending_tasks_available_cv_.Signal();
- } else {
- // The new PlatformThread will take ownership of the WorkerThread object,
- // which will delete itself on exit.
- WorkerThread* worker =
- new WorkerThread(name_prefix_, this);
- PlatformThread::CreateNonJoinable(0, worker);
- }
+ AddTaskNoLock(&pending_task);
}
PendingTask PosixDynamicThreadPool::WaitForTask() {
AutoLock locked(lock_);
if (terminated_)
- return PendingTask(FROM_HERE, base::Closure());
+ return PendingTask(FROM_HERE, Closure());
if (pending_tasks_.empty()) { // No work available, wait for work.
num_idle_threads_++;
- if (num_idle_threads_cv_.get())
- num_idle_threads_cv_->Signal();
- pending_tasks_available_cv_.TimedWait(
- TimeDelta::FromSeconds(idle_seconds_before_exit_));
+ if (num_threads_cv_)
+ num_threads_cv_->Broadcast();
+ pending_tasks_available_cv_.TimedWait(idle_time_before_exit_);
num_idle_threads_--;
- if (num_idle_threads_cv_.get())
- num_idle_threads_cv_->Signal();
+ if (num_threads_cv_)
+ num_threads_cv_->Broadcast();
if (pending_tasks_.empty()) {
- // We waited for work, but there's still no work. Return NULL to signal
- // the thread to terminate.
- return PendingTask(FROM_HERE, base::Closure());
+ // We waited for work, but there's still no work. Return an empty task to
+ // signal the thread to terminate.
+ return PendingTask(FROM_HERE, Closure());
}
}
@@ -190,4 +205,72 @@ PendingTask PosixDynamicThreadPool::WaitForTask() {
return pending_task;
}
+void PosixDynamicThreadPool::NotifyWorkerIsGoingAway(
+ PlatformThreadHandle worker) {
+ AutoLock locked(lock_);
+ if (terminated_)
+ return;
+
+ auto new_end = std::remove_if(worker_threads_.begin(), worker_threads_.end(),
+ [worker](PlatformThreadHandle handle) {
+ return handle.is_equal(worker);
+ });
+ DCHECK_EQ(1, worker_threads_.end() - new_end);
+ worker_threads_.erase(new_end, worker_threads_.end());
+
+ threads_to_cleanup_.push_back(worker);
+
+ if (num_threads_cv_)
+ num_threads_cv_->Broadcast();
+
+ if (!has_pending_cleanup_task_) {
+ has_pending_cleanup_task_ = true;
+ PendingTask pending_task(
+ FROM_HERE,
+ base::Bind(&PosixDynamicThreadPool::CleanUpThreads, Unretained(this)));
+ AddTaskNoLock(&pending_task);
+ }
+}
+
+void PosixDynamicThreadPool::AddTaskNoLock(PendingTask* pending_task) {
+ lock_.AssertAcquired();
+
+ if (terminated_) {
+ LOG(WARNING)
+ << "This thread pool is already terminated. Do not post new tasks.";
+ return;
+ }
+
+ pending_tasks_.push(*pending_task);
+ pending_task->task.Reset();
+
+ // We have enough worker threads.
+ if (num_idle_threads_ >=
+ pending_tasks_.size() - (has_pending_cleanup_task_ ? 1 : 0)) {
+ pending_tasks_available_cv_.Signal();
+ } else {
+ // The new PlatformThread will take ownership of the WorkerThread object,
+ // which will delete itself on exit.
+ WorkerThread* worker = new WorkerThread(name_prefix_, this);
+ PlatformThreadHandle handle;
+ PlatformThread::Create(0, worker, &handle);
+ worker_threads_.push_back(handle);
+
+ if (num_threads_cv_)
+ num_threads_cv_->Broadcast();
+ }
+}
+
+void PosixDynamicThreadPool::CleanUpThreads() {
+ std::vector<PlatformThreadHandle> threads_to_cleanup;
+ {
+ AutoLock locked(lock_);
+ DCHECK(has_pending_cleanup_task_);
+ has_pending_cleanup_task_ = false;
+ threads_to_cleanup.swap(threads_to_cleanup_);
+ }
+ for (const auto& item : threads_to_cleanup)
+ PlatformThread::Join(item);
+}
+
} // namespace base
diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h
index dd0ffb6..d3c4a8f 100644
--- a/base/threading/worker_pool_posix.h
+++ b/base/threading/worker_pool_posix.h
@@ -5,12 +5,12 @@
// The thread pool used in the POSIX implementation of WorkerPool dynamically
// adds threads as necessary to handle all tasks. It keeps old threads around
// for a period of time to allow them to be reused. After this waiting period,
-// the threads exit. This thread pool uses non-joinable threads, therefore
-// worker threads are not joined during process shutdown. This means that
-// potentially long running tasks (such as DNS lookup) do not block process
-// shutdown, but also means that process shutdown may "leak" objects. Note that
-// although PosixDynamicThreadPool spawns the worker threads and manages the
-// task queue, it does not own the worker threads. The worker threads ask the
+// the threads exit. Unless blocking termination is requested, worker threads
+// are not joined during process shutdown. This means that potentially long
+// running tasks (such as DNS lookup) do not block process shutdown, but also
+// means that process shutdown may "leak" objects. Note that although
+// PosixDynamicThreadPool spawns the worker threads and manages the task queue,
+// it does not own the worker threads. The worker threads ask the
// PosixDynamicThreadPool for work and eventually clean themselves up. The
// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool
// instance, which prevents PosixDynamicThreadPool from disappearing before all
@@ -26,6 +26,7 @@
#include <queue>
#include <string>
+#include <vector>
#include "base/basictypes.h"
#include "base/callback_forward.h"
@@ -36,6 +37,7 @@
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
#include "base/tracked_objects.h"
class Task;
@@ -48,34 +50,44 @@ class BASE_EXPORT PosixDynamicThreadPool
class PosixDynamicThreadPoolPeer;
// All worker threads will share the same |name_prefix|. They will exit after
- // |idle_seconds_before_exit|.
+ // |idle_time_before_exit|.
PosixDynamicThreadPool(const std::string& name_prefix,
- int idle_seconds_before_exit);
+ TimeDelta idle_time_before_exit);
// Indicates that the thread pool is going away. Stops handing out tasks to
- // worker threads. Wakes up all the idle threads to let them exit.
- void Terminate();
+ // worker threads. Wakes up all the idle threads to let them exit. If
+ // |blocking| is set to true, the call returns after all worker threads have
+ // quit.
+ // The second and subsequent calls to this method are ignored, regardless of
+ // the value of |blocking|.
+ void Terminate(bool blocking);
// Adds |task| to the thread pool.
void PostTask(const tracked_objects::Location& from_here,
const Closure& task);
- // Worker thread method to wait for up to |idle_seconds_before_exit| for more
- // work from the thread pool. Returns NULL if no work is available.
+ // Worker thread method to wait for up to |idle_time_before_exit| for more
+ // work from the thread pool. Returns an empty task if no work is available.
PendingTask WaitForTask();
+ // Marks |worker| as dead and enqueues a cleanup task to join dead worker
+ // threads. Unlike tasks enqueued by PostTask(), cleanup tasks never cause new
+ // worker threads to be created.
+ void NotifyWorkerIsGoingAway(PlatformThreadHandle worker);
+
private:
friend class RefCountedThreadSafe<PosixDynamicThreadPool>;
- friend class PosixDynamicThreadPoolPeer;
~PosixDynamicThreadPool();
// Adds pending_task to the thread pool. This function will clear
// |pending_task->task|.
- void AddTask(PendingTask* pending_task);
+ void AddTaskNoLock(PendingTask* pending_task);
+
+ void CleanUpThreads();
const std::string name_prefix_;
- const int idle_seconds_before_exit_;
+ const TimeDelta idle_time_before_exit_;
Lock lock_; // Protects all the variables below.
@@ -83,12 +95,20 @@ class BASE_EXPORT PosixDynamicThreadPool
// Also used for Broadcast()'ing to worker threads to let them know the pool
// is being deleted and they can exit.
ConditionVariable pending_tasks_available_cv_;
- int num_idle_threads_;
- TaskQueue pending_tasks_;
+ size_t num_idle_threads_;
+ bool has_pending_cleanup_task_;
+ std::queue<PendingTask> pending_tasks_;
bool terminated_;
- // Only used for tests to ensure correct thread ordering. It will always be
+
+ std::vector<PlatformThreadHandle> threads_to_cleanup_;
+ std::vector<PlatformThreadHandle> worker_threads_;
+
+ // Signaled when idle thread count or living thread count is changed. Please
+ // note that it won't be signaled when Terminate() is called.
+ //
+ // Only used for tests to ensure correct thread ordering. It will always be
// NULL in non-test code.
- scoped_ptr<ConditionVariable> num_idle_threads_cv_;
+ scoped_ptr<ConditionVariable> num_threads_cv_;
DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool);
};
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
index 354a99c..8d2368f 100644
--- a/base/threading/worker_pool_posix_unittest.cc
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -10,8 +10,9 @@
#include "base/callback.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
-#include "base/threading/platform_thread.h"
#include "base/synchronization/waitable_event.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
@@ -26,15 +27,17 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
ConditionVariable* pending_tasks_available_cv() {
return &pool_->pending_tasks_available_cv_;
}
- const std::queue<PendingTask>& pending_tasks() const {
- return pool_->pending_tasks_;
+ size_t num_pending_tasks() const { return pool_->pending_tasks_.size(); }
+ size_t num_idle_threads() const { return pool_->num_idle_threads_; }
+ ConditionVariable* num_threads_cv() { return pool_->num_threads_cv_.get(); }
+ void set_num_threads_cv(ConditionVariable* cv) {
+ pool_->num_threads_cv_.reset(cv);
}
- int num_idle_threads() const { return pool_->num_idle_threads_; }
- ConditionVariable* num_idle_threads_cv() {
- return pool_->num_idle_threads_cv_.get();
+ const std::vector<PlatformThreadHandle>& threads_to_cleanup() const {
+ return pool_->threads_to_cleanup_;
}
- void set_num_idle_threads_cv(ConditionVariable* cv) {
- pool_->num_idle_threads_cv_.reset(cv);
+ const std::vector<PlatformThreadHandle>& worker_threads() const {
+ return pool_->worker_threads_;
}
private:
@@ -45,6 +48,8 @@ class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
namespace {
+const int64 kDefaultIdleSecondsBeforeExit = 60 * 60;
+
// IncrementingTask's main purpose is to increment a counter. It also updates a
// set of unique thread ids, and signals a ConditionVariable on completion.
// Note that since it does not block, there is no way to control the number of
@@ -56,10 +61,10 @@ void IncrementingTask(Lock* counter_lock,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads) {
{
- base::AutoLock locked(*unique_threads_lock);
+ AutoLock locked(*unique_threads_lock);
unique_threads->insert(PlatformThread::CurrentId());
}
- base::AutoLock locked(*counter_lock);
+ AutoLock locked(*counter_lock);
(*counter)++;
}
@@ -73,12 +78,12 @@ struct BlockingIncrementingTaskArgs {
Lock* num_waiting_to_start_lock;
int* num_waiting_to_start;
ConditionVariable* num_waiting_to_start_cv;
- base::WaitableEvent* start;
+ WaitableEvent* start;
};
void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
{
- base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
+ AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
(*args.num_waiting_to_start)++;
}
args.num_waiting_to_start_cv->Signal();
@@ -90,52 +95,62 @@ void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
class PosixDynamicThreadPoolTest : public testing::Test {
protected:
PosixDynamicThreadPoolTest()
- : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
- peer_(pool_.get()),
- counter_(0),
+ : counter_(0),
num_waiting_to_start_(0),
num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
start_(true, false) {}
- void SetUp() override {
- peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
- }
-
void TearDown() override {
// Wake up the idle threads so they can terminate.
- if (pool_.get()) pool_->Terminate();
+ if (pool_.get())
+ pool_->Terminate(false);
+ }
+
+ void Initialize(TimeDelta idle_time_before_exit) {
+ pool_ = new PosixDynamicThreadPool("dynamic_pool", idle_time_before_exit);
+ peer_.reset(
+ new PosixDynamicThreadPool::PosixDynamicThreadPoolPeer(pool_.get()));
+ peer_->set_num_threads_cv(new ConditionVariable(peer_->lock()));
}
void WaitForTasksToStart(int num_tasks) {
- base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
+ AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
while (num_waiting_to_start_ < num_tasks) {
num_waiting_to_start_cv_.Wait();
}
}
- void WaitForIdleThreads(int num_idle_threads) {
- base::AutoLock pool_locked(*peer_.lock());
- while (peer_.num_idle_threads() < num_idle_threads) {
- peer_.num_idle_threads_cv()->Wait();
+ void WaitForIdleThreads(size_t num_idle_threads) {
+ AutoLock pool_locked(*peer_->lock());
+ while (peer_->num_idle_threads() != num_idle_threads) {
+ peer_->num_threads_cv()->Wait();
+ }
+ }
+
+ void WaitForLivingThreads(int num_living_threads) {
+ AutoLock pool_locked(*peer_->lock());
+ while (static_cast<int>(peer_->worker_threads().size()) !=
+ num_living_threads) {
+ peer_->num_threads_cv()->Wait();
}
}
- base::Closure CreateNewIncrementingTaskCallback() {
- return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
- &unique_threads_lock_, &unique_threads_);
+ Closure CreateNewIncrementingTaskCallback() {
+ return Bind(&IncrementingTask, &counter_lock_, &counter_,
+ &unique_threads_lock_, &unique_threads_);
}
- base::Closure CreateNewBlockingIncrementingTaskCallback() {
+ Closure CreateNewBlockingIncrementingTaskCallback() {
BlockingIncrementingTaskArgs args = {
&counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
&num_waiting_to_start_lock_, &num_waiting_to_start_,
&num_waiting_to_start_cv_, &start_
};
- return base::Bind(&BlockingIncrementingTask, args);
+ return Bind(&BlockingIncrementingTask, args);
}
- scoped_refptr<base::PosixDynamicThreadPool> pool_;
- base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
+ scoped_refptr<PosixDynamicThreadPool> pool_;
+ scoped_ptr<PosixDynamicThreadPool::PosixDynamicThreadPoolPeer> peer_;
Lock counter_lock_;
int counter_;
Lock unique_threads_lock_;
@@ -143,15 +158,17 @@ class PosixDynamicThreadPoolTest : public testing::Test {
Lock num_waiting_to_start_lock_;
int num_waiting_to_start_;
ConditionVariable num_waiting_to_start_cv_;
- base::WaitableEvent start_;
+ WaitableEvent start_;
};
} // namespace
TEST_F(PosixDynamicThreadPoolTest, Basic) {
- EXPECT_EQ(0, peer_.num_idle_threads());
+ Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit));
+
+ EXPECT_EQ(0U, peer_->num_idle_threads());
EXPECT_EQ(0U, unique_threads_.size());
- EXPECT_EQ(0U, peer_.pending_tasks().size());
+ EXPECT_EQ(0U, peer_->num_pending_tasks());
// Add one task and wait for it to be completed.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
@@ -164,6 +181,8 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
}
TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
+ Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit));
+
// Add one task and wait for it to be completed.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
@@ -178,11 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
WaitForIdleThreads(2);
EXPECT_EQ(2U, unique_threads_.size());
- EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(2U, peer_->num_idle_threads());
EXPECT_EQ(3, counter_);
}
TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
+ Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit));
+
// Add two blocking tasks.
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
@@ -194,12 +215,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
WaitForIdleThreads(2);
EXPECT_EQ(2U, unique_threads_.size());
- EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
+ EXPECT_EQ(2U, peer_->num_idle_threads()) << "Existing threads are now idle.";
EXPECT_EQ(2, counter_);
}
TEST_F(PosixDynamicThreadPoolTest, Complex) {
- // Add two non blocking tasks and wait for them to finish.
+ Initialize(TimeDelta::FromSeconds(kDefaultIdleSecondsBeforeExit));
+
+ // Add one non blocking tasks and wait for it to finish.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
WaitForIdleThreads(1);
@@ -214,15 +237,15 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
WaitForIdleThreads(2);
EXPECT_EQ(3, counter_);
- EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(2U, peer_->num_idle_threads());
EXPECT_EQ(2U, unique_threads_.size());
// Wake up all idle threads so they can exit.
{
- base::AutoLock locked(*peer_.lock());
- while (peer_.num_idle_threads() > 0) {
- peer_.pending_tasks_available_cv()->Signal();
- peer_.num_idle_threads_cv()->Wait();
+ AutoLock locked(*peer_->lock());
+ while (peer_->worker_threads().size() > 0) {
+ peer_->pending_tasks_available_cv()->Signal();
+ peer_->num_threads_cv()->Wait();
}
}
@@ -246,8 +269,77 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
// be either 2 or 3 unique thread IDs in the set at this stage in the test.
EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
<< "unique_threads_.size() = " << unique_threads_.size();
- EXPECT_EQ(1, peer_.num_idle_threads());
+ EXPECT_EQ(1U, peer_->num_idle_threads());
EXPECT_EQ(4, counter_);
}
+TEST_F(PosixDynamicThreadPoolTest, NoNewThreadForCleanup) {
+ // Let worker threads quit quickly after they are idle.
+ Initialize(TimeDelta::FromMilliseconds(1));
+
+ for (size_t i = 0; i < 2; ++i) {
+ // This will create a worker thread.
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
+
+ WaitForTasksToStart(1);
+
+ PlatformThreadHandle worker;
+ {
+ AutoLock locked(*peer_->lock());
+ ASSERT_EQ(1u, peer_->worker_threads().size());
+ worker = peer_->worker_threads()[0];
+ }
+
+ start_.Signal();
+
+ // Wait for the worker thread to quit.
+ WaitForLivingThreads(0);
+
+ {
+ AutoLock locked(*peer_->lock());
+ // The thread that just quit is recorded for cleanup. But we don't create
+ // a worker thread just for doing that.
+ ASSERT_EQ(1u, peer_->threads_to_cleanup().size());
+ EXPECT_TRUE(worker.is_equal(peer_->threads_to_cleanup()[0]));
+ EXPECT_TRUE(peer_->worker_threads().empty());
+ }
+ }
+
+ pool_->Terminate(true);
+
+ {
+ AutoLock locked(*peer_->lock());
+ EXPECT_TRUE(peer_->threads_to_cleanup().empty());
+ EXPECT_TRUE(peer_->worker_threads().empty());
+ }
+}
+
+TEST_F(PosixDynamicThreadPoolTest, BlockingTerminate) {
+ // Let worker threads quit quickly after they are idle.
+ Initialize(TimeDelta::FromMilliseconds(3));
+
+ for (size_t i = 0; i < 5; ++i) {
+ PlatformThread::Sleep(TimeDelta::FromMilliseconds(i));
+ for (size_t j = 0; j < 50; ++j)
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
+ }
+
+ pool_->Terminate(true);
+
+ {
+ AutoLock locked(*peer_->lock());
+ EXPECT_TRUE(peer_->threads_to_cleanup().empty());
+ EXPECT_TRUE(peer_->worker_threads().empty());
+ }
+
+ int counter = counter_;
+ EXPECT_GE(5 * 50, counter);
+ EXPECT_GE(5 * 50u, unique_threads_.size());
+
+ // Make sure that no threads are still running and trying to modify
+ // |counter_|.
+ PlatformThread::Sleep(TimeDelta::FromMilliseconds(10));
+ EXPECT_EQ(counter, counter_);
+}
+
} // namespace base
diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc
index 1b0ade5..563702b 100644
--- a/base/threading/worker_pool_win.cc
+++ b/base/threading/worker_pool_win.cc
@@ -70,4 +70,10 @@ bool WorkerPool::RunsTasksOnCurrentThread() {
return g_worker_pool_running_on_this_thread.Get().Get();
}
+// static
+void WorkerPool::ShutDownCleanly() {
+ // TODO(yzshen): implement it.
+ NOTIMPLEMENTED();
+}
+
} // namespace base