diff options
author | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-01 05:42:34 +0000 |
---|---|---|
committer | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-01 05:42:34 +0000 |
commit | 098def22390dd628396e0d962235fc06e935282e (patch) | |
tree | b5fedd4b011fe5eabf61bc638c0287dddf074fb4 /base/threading | |
parent | ed4c57531d269151dfad2f3aa3378ab43db562d0 (diff) | |
download | chromium_src-098def22390dd628396e0d962235fc06e935282e.zip chromium_src-098def22390dd628396e0d962235fc06e935282e.tar.gz chromium_src-098def22390dd628396e0d962235fc06e935282e.tar.bz2 |
Add a sequenced worker pool.
This allows tasks to be put in the worker pool with optional sequencing semantics for consumers that must run a bunch of stuff in order on a background thread, but don't particularly care about which thread.
Review URL: http://codereview.chromium.org/8416019
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@116078 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/threading')
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 630 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 206 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 405 |
3 files changed, 1241 insertions, 0 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc new file mode 100644 index 0000000..806d3b7 --- /dev/null +++ b/base/threading/sequenced_worker_pool.cc @@ -0,0 +1,630 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/sequenced_worker_pool.h" + +#include <deque> +#include <set> + +#include "base/atomicops.h" +#include "base/bind.h" +#include "base/memory/scoped_ptr.h" +#include "base/metrics/histogram.h" +#include "base/stringprintf.h" +#include "base/synchronization/condition_variable.h" +#include "base/threading/simple_thread.h" +#include "base/threading/thread.h" + +namespace base { + +namespace { + +struct SequencedTask { + int sequence_token_id; + SequencedWorkerPool::WorkerShutdown shutdown_behavior; + tracked_objects::Location location; + base::Closure task; +}; + +} // namespace + +// Worker --------------------------------------------------------------------- + +class SequencedWorkerPool::Worker : public base::SimpleThread { + public: + Worker(SequencedWorkerPool::Inner* inner, + int thread_number, + const std::string& thread_name_prefix); + ~Worker(); + + // SimpleThread implementation. This actually runs the background thread. + virtual void Run(); + + private: + SequencedWorkerPool::Inner* inner_; + SequencedWorkerPool::WorkerShutdown current_shutdown_mode_; + + DISALLOW_COPY_AND_ASSIGN(Worker); +}; + + +// Inner ---------------------------------------------------------------------- + +class SequencedWorkerPool::Inner + : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { + public: + Inner(size_t max_threads, const std::string& thread_name_prefix); + virtual ~Inner(); + + // Backends for SequenceWorkerPool. + SequenceToken GetSequenceToken(); + SequenceToken GetNamedSequenceToken(const std::string& name); + bool PostTask(int sequence_token_id, + SequencedWorkerPool::WorkerShutdown shutdown_behavior, + const tracked_objects::Location& from_here, + const base::Closure& task); + void Shutdown(); + void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); + + // Runs the worker loop on the background thread. + void ThreadLoop(Worker* this_worker); + + private: + // The calling code should clear the given delete_these_oustide_lock + // vector the next time the lock is released. See the implementation for + // a more detailed description. + bool GetWork(SequencedTask* task, + std::vector<base::Closure>* delete_these_outside_lock); + + // Peforms init and cleanup around running the given task. WillRun... + // returns the value from PrepareToStartAdditionalThreadIfNecessary. + // The calling code should call FinishStartingAdditionalThread once the + // lock is released if the return values is nonzero. + int WillRunWorkerTask(const SequencedTask& task); + void DidRunWorkerTask(const SequencedTask& task); + + // Returns true if there are no threads currently running the given + // sequence token. + bool IsSequenceTokenRunnable(int sequence_token_id) const; + + // Checks if all threads are busy and the addition of one more could run an + // additional task waiting in the queue. This must be called from within + // the lock. + // + // If another thread is helpful, this will mark the thread as being in the + // process of starting and returns the index of the new thread which will be + // 0 or more. The caller should then call FinishStartingAdditionalThread to + // complete initialization once the lock is released. + // + // If another thread is not necessary, returne 0; + // + // See the implementedion for more. + int PrepareToStartAdditionalThreadIfHelpful(); + + // The second part of thread creation after + // PrepareToStartAdditionalThreadIfHelpful with the thread number it + // generated. This actually creates the thread and should be called outside + // the lock to avoid blocking important work starting a thread in the lock. + void FinishStartingAdditionalThread(int thread_number); + + // Checks whether there is work left that's blocking shutdown. Must be + // called inside the lock. + bool CanShutdown() const; + + // The last sequence number used. Managed by GetSequenceToken, since this + // only does threadsafe increment operations, you do not need to hold the + // lock. + volatile base::subtle::Atomic32 last_sequence_number_; + + // This lock protects |everything in this class|. Do not read or modify + // anything without holding this lock. Do not block while holding this + // lock. + base::Lock lock_; + + // Condition variable used to wake up worker threads when a task is runnable. + base::ConditionVariable cond_var_; + + // The maximum number of worker threads we'll create. + size_t max_threads_; + + std::string thread_name_prefix_; + + // Associates all known sequence token names with their IDs. + std::map<std::string, int> named_sequence_tokens_; + + // Owning pointers to all threads we've created so far. Since we lazily + // create threads, this may be less than max_threads_ and will be initially + // empty. + std::vector<linked_ptr<Worker> > threads_; + + // Set to true when we're in the process of creating another thread. + // See PrepareToStartAdditionalThreadIfHelpful for more. + bool thread_being_created_; + + // Number of threads currently waiting for work. + size_t waiting_thread_count_; + + // Number of threads currently running tasks that have the BLOCK_SHUTDOWN + // flag set. + size_t blocking_shutdown_thread_count_; + + // In-order list of all pending tasks. These are tasks waiting for a thread + // to run on or that are blocked on a previous task in their sequence. + // + // We maintain the pending_task_count_ separately for metrics because + // list.size() can be linear time. + std::list<SequencedTask> pending_tasks_; + size_t pending_task_count_; + + // Number of tasks in the pending_tasks_ list that are marked as blocking + // shutdown. + size_t blocking_shutdown_pending_task_count_; + + // Lists all sequence tokens currently executing. + std::set<int> current_sequences_; + + // Set when the app is terminating and no further tasks should be allowed, + // though we may still be running existing tasks. + bool terminating_; + + // Set when Shutdown is called to do some assertions. + bool shutdown_called_; + + SequencedWorkerPool::TestingObserver* testing_observer_; +}; + +SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner, + int thread_number, + const std::string& prefix) + : base::SimpleThread( + prefix + StringPrintf("Worker%d", thread_number).c_str()), + inner_(inner), + current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) { + Start(); +} + +SequencedWorkerPool::Worker::~Worker() { +} + +void SequencedWorkerPool::Worker::Run() { + // Just jump back to the Inner object to run the thread, since it has all the + // tracking information and queues. It might be more natural to implement + // using DelegateSimpleThread and have Inner implement the Delegate to avoid + // having these worker objects at all, but that method lacks the ability to + // send thread-specific information easily to the thread loop. + inner_->ThreadLoop(this); +} + +SequencedWorkerPool::Inner::Inner(size_t max_threads, + const std::string& thread_name_prefix) + : last_sequence_number_(0), + lock_(), + cond_var_(&lock_), + max_threads_(max_threads), + thread_name_prefix_(thread_name_prefix), + thread_being_created_(false), + waiting_thread_count_(0), + blocking_shutdown_thread_count_(0), + pending_task_count_(0), + blocking_shutdown_pending_task_count_(0), + terminating_(false), + shutdown_called_(false) { +} + +SequencedWorkerPool::Inner::~Inner() { + // You must call Shutdown() before destroying the pool. + DCHECK(shutdown_called_); + + // Need to explicitly join with the threads before they're destroyed or else + // they will be running when our object is half torn down. + for (size_t i = 0; i < threads_.size(); i++) + threads_[i]->Join(); + threads_.clear(); +} + +SequencedWorkerPool::SequenceToken +SequencedWorkerPool::Inner::GetSequenceToken() { + base::subtle::Atomic32 result = + base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); + return SequenceToken(static_cast<int>(result)); +} + +SequencedWorkerPool::SequenceToken +SequencedWorkerPool::Inner::GetNamedSequenceToken( + const std::string& name) { + base::AutoLock lock(lock_); + std::map<std::string, int>::const_iterator found = + named_sequence_tokens_.find(name); + if (found != named_sequence_tokens_.end()) + return SequenceToken(found->second); // Got an existing one. + + // Create a new one for this name. + SequenceToken result = GetSequenceToken(); + named_sequence_tokens_.insert(std::make_pair(name, result.id_)); + return result; +} + +bool SequencedWorkerPool::Inner::PostTask( + int sequence_token_id, + SequencedWorkerPool::WorkerShutdown shutdown_behavior, + const tracked_objects::Location& from_here, + const base::Closure& task) { + SequencedTask sequenced; + sequenced.sequence_token_id = sequence_token_id; + sequenced.shutdown_behavior = shutdown_behavior; + sequenced.location = from_here; + sequenced.task = task; + + int create_thread_id = 0; + { + base::AutoLock lock(lock_); + if (terminating_) + return false; + + pending_tasks_.push_back(sequenced); + pending_task_count_++; + if (shutdown_behavior == BLOCK_SHUTDOWN) + blocking_shutdown_pending_task_count_++; + + create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); + } + + // Actually start the additional thread or signal an existing one now that + // we're outside the lock. + if (create_thread_id) + FinishStartingAdditionalThread(create_thread_id); + else + cond_var_.Signal(); + + return true; +} + +void SequencedWorkerPool::Inner::Shutdown() { + if (shutdown_called_) + return; + shutdown_called_ = true; + + // Mark us as terminated and go through and drop all tasks that aren't + // required to run on shutdown. Since no new tasks will get posted once the + // terminated flag is set, this ensures that all remaining tasks are required + // for shutdown whenever the termianted_ flag is set. + { + base::AutoLock lock(lock_); + DCHECK(!terminating_); + terminating_ = true; + + // Tickle the threads. This will wake up a waiting one so it will know that + // it can exit, which in turn will wake up any other waiting ones. + cond_var_.Signal(); + + // There are no pending or running tasks blocking shutdown, we're done. + if (CanShutdown()) + return; + } + + // If we get here, we know we're either waiting on a blocking task that's + // currently running, waiting on a blocking task that hasn't been scheduled + // yet, or both. Block on the "queue empty" event to know when all tasks are + // complete. This must be done outside the lock. + if (testing_observer_) + testing_observer_->WillWaitForShutdown(); + + base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now(); + + // Wait for no more tasks. + { + base::AutoLock lock(lock_); + while (!CanShutdown()) + cond_var_.Wait(); + } + UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", + base::TimeTicks::Now() - shutdown_wait_begin); +} + +void SequencedWorkerPool::Inner::SetTestingObserver( + SequencedWorkerPool::TestingObserver* observer) { + base::AutoLock lock(lock_); + testing_observer_ = observer; +} + +void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { + { + base::AutoLock lock(lock_); + DCHECK(thread_being_created_); + thread_being_created_ = false; + threads_.push_back(linked_ptr<Worker>(this_worker)); + + while (true) { + // See GetWork for what delete_these_outside_lock is doing. + SequencedTask task; + std::vector<base::Closure> delete_these_outside_lock; + if (GetWork(&task, &delete_these_outside_lock)) { + int new_thread_id = WillRunWorkerTask(task); + { + base::AutoUnlock unlock(lock_); + cond_var_.Signal(); + delete_these_outside_lock.clear(); + + // Complete thread creation outside the lock if necessary. + if (new_thread_id) + FinishStartingAdditionalThread(new_thread_id); + + task.task.Run(); + + // Make sure our task is erased outside the lock for the same reason + // we do this with delete_these_oustide_lock. + task.task = base::Closure(); + } + DidRunWorkerTask(task); // Must be done inside the lock. + } else { + // When we're terminating and there's no more work, we can shut down. + // You can't get more tasks posted once terminating_ is set. There may + // be some tasks stuck behind running ones with the same sequence + // token, but additional threads won't help this case. + if (terminating_) + break; + waiting_thread_count_++; + cond_var_.Wait(); + waiting_thread_count_--; + } + } + } + + // We noticed we should exit. Wake up the next worker so it knows it should + // exit as well (because the Shutdown() code only signals once). + cond_var_.Signal(); +} + +bool SequencedWorkerPool::Inner::GetWork( + SequencedTask* task, + std::vector<base::Closure>* delete_these_outside_lock) { + lock_.AssertAcquired(); + + DCHECK_EQ(pending_tasks_.size(), pending_task_count_); + UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", + static_cast<int>(pending_task_count_)); + + // Find the next task with a sequence token that's not currently in use. + // If the token is in use, that means another thread is running something + // in that sequence, and we can't run it without going out-of-order. + // + // This algorithm is simple and fair, but inefficient in some cases. For + // example, say somebody schedules 1000 slow tasks with the same sequence + // number. We'll have to go through all those tasks each time we feel like + // there might be work to schedule. If this proves to be a problem, we + // should make this more efficient. + // + // One possible enhancement would be to keep a map from sequence ID to a + // list of pending but currently blocked SequencedTasks for that ID. + // When a worker finishes a task of one sequence token, it can pick up the + // next one from that token right away. + // + // This may lead to starvation if there are sufficient numbers of sequences + // in use. To alleviate this, we could add an incrementing priority counter + // to each SequencedTask. Then maintain a priority_queue of all runnable + // tasks, sorted by priority counter. When a sequenced task is completed + // we would pop the head element off of that tasks pending list and add it + // to the priority queue. Then we would run the first item in the priority + // queue. + bool found_task = false; + int unrunnable_tasks = 0; + std::list<SequencedTask>::iterator i = pending_tasks_.begin(); + while (i != pending_tasks_.end()) { + if (!IsSequenceTokenRunnable(i->sequence_token_id)) { + unrunnable_tasks++; + ++i; + continue; + } + + if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { + // We're shutting down and the task we just found isn't blocking + // shutdown. Delete it and get more work. + // + // Note that we do not want to delete unrunnable tasks. Deleting a task + // can have side effects (like freeing some objects) and deleting a + // task that's supposed to run after one that's currently running could + // cause an obscure crash. + // + // We really want to delete these tasks outside the lock in case the + // closures are holding refs to objects that want to post work from + // their destructorss (which would deadlock). The closures are + // internally refcounted, so we just need to keep a copy of them alive + // until the lock is exited. The calling code can just clear() the + // vector they passed to us once the lock is exited to make this + // happen. + delete_these_outside_lock->push_back(i->task); + i = pending_tasks_.erase(i); + pending_task_count_--; + } else { + // Found a runnable task. + *task = *i; + i = pending_tasks_.erase(i); + pending_task_count_--; + if (task->shutdown_behavior == BLOCK_SHUTDOWN) + blocking_shutdown_pending_task_count_--; + + found_task = true; + break; + } + } + + // Track the number of tasks we had to skip over to see if we should be + // making this more efficient. If this number ever becomes large or is + // frequently "some", we should consider the optimization above. + UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", + unrunnable_tasks); + return found_task; +} + +int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { + lock_.AssertAcquired(); + + // Mark the task's sequence number as in use. + if (task.sequence_token_id) + current_sequences_.insert(task.sequence_token_id); + + if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) + blocking_shutdown_thread_count_++; + + // We just picked up a task. Since StartAdditionalThreadIfHelpful only + // creates a new thread if there is no free one, there is a race when posting + // tasks that many tasks could have been posted before a thread started + // running them, so only one thread would have been created. So we also check + // whether we should create more threads after removing our task from the + // queue, which also has the nice side effect of creating the workers from + // background threads rather than the main thread of the app. + // + // If another thread wasn't created, we want to wake up an existing thread + // if there is one waiting to pick up the next task. + // + // Note that we really need to do this *before* running the task, not + // after. Otherwise, if more than one task is posted, the creation of the + // second thread (since we only create one at a time) will be blocked by + // the execution of the first task, which could be arbitrarily long. + return PrepareToStartAdditionalThreadIfHelpful(); +} + +void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { + lock_.AssertAcquired(); + + if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) { + DCHECK_GT(blocking_shutdown_thread_count_, 0u); + blocking_shutdown_thread_count_--; + } + + if (task.sequence_token_id) + current_sequences_.erase(task.sequence_token_id); +} + +bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( + int sequence_token_id) const { + lock_.AssertAcquired(); + return !sequence_token_id || + current_sequences_.find(sequence_token_id) == + current_sequences_.end(); +} + +int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { + // How thread creation works: + // + // We'de like to avoid creating threads with the lock held. However, we + // need to be sure that we have an accurate accounting of the threads for + // proper Joining and deltion on shutdown. + // + // We need to figure out if we need another thread with the lock held, which + // is what this function does. It then marks us as in the process of creating + // a thread. When we do shutdown, we wait until the thread_being_created_ + // flag is cleared, which ensures that the new thread is properly added to + // all the data structures and we can't leak it. Once shutdown starts, we'll + // refuse to create more threads or they would be leaked. + // + // Note that this creates a mostly benign race condition on shutdown that + // will cause fewer workers to be created than one would expect. It isn't + // much of an issue in real life, but affects some tests. Since we only spawn + // one worker at a time, the following sequence of events can happen: + // + // 1. Main thread posts a bunch of unrelated tasks that would normally be + // run on separate threads. + // 2. The first task post causes us to start a worker. Other tasks do not + // cause a worker to start since one is pending. + // 3. Main thread initiates shutdown. + // 4. No more threads are created since the terminating_ flag is set. + // + // The result is that one may expect that max_threads_ workers to be created + // given the workload, but in reality fewer may be created because the + // sequence of thread creation on the background threads is racing with the + // shutdown call. + if (!terminating_ && + !thread_being_created_ && + threads_.size() < max_threads_ && + waiting_thread_count_ == 0) { + // We could use an additional thread if there's work to be done. + for (std::list<SequencedTask>::iterator i = pending_tasks_.begin(); + i != pending_tasks_.end(); ++i) { + if (IsSequenceTokenRunnable(i->sequence_token_id)) { + // Found a runnable task, mark the thread as being started. + thread_being_created_ = true; + return static_cast<int>(threads_.size() + 1); + } + } + } + return 0; +} + +void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( + int thread_number) { + // Called outside of the lock. + DCHECK(thread_number > 0); + + // The worker is assigned to the list when the thread actually starts, which + // will manage the memory of the pointer. + new Worker(this, thread_number, thread_name_prefix_); +} + +bool SequencedWorkerPool::Inner::CanShutdown() const { + lock_.AssertAcquired(); + // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. + return !thread_being_created_ && + blocking_shutdown_thread_count_ == 0 && + blocking_shutdown_pending_task_count_ == 0; +} + +// SequencedWorkerPool -------------------------------------------------------- + +SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, + const std::string& thread_name_prefix) + : inner_(new Inner(max_threads, thread_name_prefix)) { +} + +SequencedWorkerPool::~SequencedWorkerPool() { +} + +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { + return inner_->GetSequenceToken(); +} + +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( + const std::string& name) { + return inner_->GetNamedSequenceToken(name); +} + +bool SequencedWorkerPool::PostWorkerTask( + const tracked_objects::Location& from_here, + const base::Closure& task) { + return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); +} + +bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( + const tracked_objects::Location& from_here, + const base::Closure& task, + WorkerShutdown shutdown_behavior) { + return inner_->PostTask(0, shutdown_behavior, from_here, task); +} + +bool SequencedWorkerPool::PostSequencedWorkerTask( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const base::Closure& task) { + return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, + from_here, task); +} + +bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const base::Closure& task, + WorkerShutdown shutdown_behavior) { + return inner_->PostTask(sequence_token.id_, shutdown_behavior, + from_here, task); +} + +void SequencedWorkerPool::Shutdown() { + inner_->Shutdown(); +} + +void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { + inner_->SetTestingObserver(observer); +} + +} // namespace base diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h new file mode 100644 index 0000000..c6e0560 --- /dev/null +++ b/base/threading/sequenced_worker_pool.h @@ -0,0 +1,206 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ +#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ +#pragma once + +#include <string> + +#include "base/callback.h" +#include "base/memory/linked_ptr.h" +#include "base/memory/ref_counted.h" +#include "base/tracked_objects.h" +#include "base/base_export.h" + +namespace base { + +// A worker thread pool that enforces ordering between sets of tasks. It also +// allows you to specify what should happen to your tasks on shutdown. +// +// To enforce ordering, get a unique sequence token from the pool and post all +// tasks you want to order with the token. All tasks with the same token are +// guaranteed to execute serially, though not necessarily on the same thread. +// +// Example: +// SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); +// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, +// FROM_HERE, base::Bind(...)); +// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, +// FROM_HERE, base::Bind(...)); +// +// You can make named sequence tokens to make it easier to share a token +// across different components. +// +// You can also post tasks to the pool without ordering using PostWorkerTask. +// These will be executed in an unspecified order. The order of execution +// between tasks with different sequence tokens is also unspecified. +// +// This class is designed to be leaked on shutdown to allow the +// CONTINUE_ON_SHUTDOWN behavior to be implemented. To enforce the +// BLOCK_SHUTDOWN behavior, you must call Shutdown() which will wait until +// the necessary tasks have completed. +// +// Implementation note: This does not use a base::WorkerPool since that does +// not enforce shutdown semantics or allow us to specify how many worker +// threads to run. For the typical use case of random background work, we don't +// necessarily want to be super aggressive about creating threads. +class BASE_EXPORT SequencedWorkerPool { + public: + // Defines what should happen to a task posted to the worker pool on shutdown. + enum WorkerShutdown { + // Tasks posted with this mode which have not run at shutdown will be + // deleted rather than run, and any tasks with this mode running at + // shutdown will be ignored (the worker thread will not be joined). + // + // This option provides a nice way to post stuff you don't want blocking + // shutdown. For example, you might be doing a slow DNS lookup and if it's + // blocked on the OS, you may not want to stop shutdown, since the result + // doesn't really matter at that point. + // + // However, you need to be very careful what you do in your callback when + // you use this option. Since the thread will continue to run until the OS + // terminates the process, the app can be in the process of tearing down + // when you're running. This means any singletons or global objects you + // use may suddenly become invalid out from under you. For this reason, + // it's best to use this only for slow but simple operations like the DNS + // example. + CONTINUE_ON_SHUTDOWN, + + // Tasks posted with this mode that have not started executing at shutdown + // will be deleted rather than executed. However, tasks already in progress + // will be completed. + SKIP_ON_SHUTDOWN, + + // Tasks posted with this mode will block browser shutdown until they're + // executed. Since this can have significant performance implications, use + // sparingly. + // + // Generally, this should be used only for user data, for example, a task + // writing a preference file. + // + // If a task is posted during shutdown, it will not get run since the + // workers may already be stopped. In this case, the post operation will + // fail (return false) and the task will be deleted. + BLOCK_SHUTDOWN, + }; + + // Opaque identifier that defines sequencing of tasks posted to the worker + // pool. See NewSequenceToken(). + class SequenceToken { + public: + explicit SequenceToken() : id_(0) {} + ~SequenceToken() {} + + bool Equals(const SequenceToken& other) const { + return id_ == other.id_; + } + + private: + friend class SequencedWorkerPool; + + SequenceToken(int id) : id_(id) {} + + int id_; + }; + + // Allows tests to perform certain actions. + class TestingObserver { + public: + virtual ~TestingObserver() {} + virtual void WillWaitForShutdown() = 0; + }; + + // Pass the maximum number of threads (they will be lazily created as needed) + // and a prefix for the thread name to ad in debugging. + SequencedWorkerPool(size_t max_threads, + const std::string& thread_name_prefix); + ~SequencedWorkerPool(); + + // Returns a unique token that can be used to sequence tasks posted to + // PostSequencedWorkerTask(). Valid tokens are alwys nonzero. + SequenceToken GetSequenceToken(); + + // Returns the sequence token associated with the given name. Calling this + // function multiple times with the same string will always produce the + // same sequence token. If the name has not been used before, a new token + // will be created. + SequenceToken GetNamedSequenceToken(const std::string& name); + + // Posts the given task for execution in the worker pool. Tasks posted with + // this function will execute in an unspecified order on a background thread. + // Returns true if the task was posted. If your tasks have ordering + // requirements, see PostSequencedWorkerTask(). + // + // This class will attempt to delete tasks that aren't run + // (non-block-shutdown semantics) but can't guarantee that this happens. If + // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there + // will be no workers available to delete these tasks. And there may be + // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN + // tasks. Deleting those tasks before the previous one has completed could + // cause nondeterministic crashes because the task could be keeping some + // objects alive which do work in their destructor, which could voilate the + // assumptions of the running task. + // + // The task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + // + // Returns true if the task was posted successfully. This may fail during + // shutdown regardless of the specified ShutdownBehavior. + bool PostWorkerTask(const tracked_objects::Location& from_here, + const base::Closure& task); + + // Same as PostWorkerTask but allows specification of the shutdown behavior. + bool PostWorkerTaskWithShutdownBehavior( + const tracked_objects::Location& from_here, + const base::Closure& task, + WorkerShutdown shutdown_behavior); + + // Like PostWorkerTask above, but provides sequencing semantics. This means + // that tasks posted with the same sequence token (see GetSequenceToken()) + // are guaranteed to execute in order. This is useful in cases where you're + // doing operations that may depend on previous ones, like appending to a + // file. + // + // The task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + // + // Returns true if the task was posted successfully. This may fail during + // shutdown regardless of the specified ShutdownBehavior. + bool PostSequencedWorkerTask(SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const base::Closure& task); + + // Same as PostSequencedWorkerTask but allows specification of the shutdown + // behavior. + bool PostSequencedWorkerTaskWithShutdownBehavior( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const base::Closure& task, + WorkerShutdown shutdown_behavior); + + // Implements the worker pool shutdown. This should be called during app + // shutdown, and will discard/join with appropriate tasks before returning. + // After this call, subsequent calls to post tasks will fail. + void Shutdown(); + + // Called by tests to set the testing observer. This is NULL by default + // and ownership of the pointer is kept with the caller. + void SetTestingObserver(TestingObserver* observer); + + private: + class Inner; + class Worker; + + friend class Inner; + friend class Worker; + + scoped_refptr<Inner> inner_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); +}; + +} // namespace base + +#endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc new file mode 100644 index 0000000..a3f26ec --- /dev/null +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -0,0 +1,405 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <algorithm> + +#include "base/bind.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/threading/sequenced_worker_pool.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// IMPORTANT NOTE: +// +// Many of these tests have failure modes where they'll hang forever. These +// tests should not be flaky, and hangling indicates a type of failure. Do not +// mark as flaky if they're hanging, it's likely an actual bug. + +namespace { + +const size_t kNumWorkerThreads = 3; + +// Allows a number of threads to all be blocked on the same event, and +// provides a way to unblock a certain number of them. +class ThreadBlocker { + public: + ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) { + } + + void Block() { + { + base::AutoLock lock(lock_); + while (unblock_counter_ == 0) + cond_var_.Wait(); + unblock_counter_--; + } + cond_var_.Signal(); + } + + void Unblock(size_t count) { + { + base::AutoLock lock(lock_); + DCHECK(unblock_counter_ == 0); + unblock_counter_ = count; + } + cond_var_.Signal(); + } + + private: + base::Lock lock_; + base::ConditionVariable cond_var_; + + size_t unblock_counter_; +}; + +class TestTracker : public base::RefCountedThreadSafe<TestTracker> { + public: + TestTracker() + : lock_(), + cond_var_(&lock_), + started_events_(0) { + } + + // Each of these tasks appends the argument to the complete sequence vector + // so calling code can see what order they finished in. + void FastTask(int id) { + SignalWorkerDone(id); + } + void SlowTask(int id) { + base::PlatformThread::Sleep(1000); + SignalWorkerDone(id); + } + + void BlockTask(int id, ThreadBlocker* blocker) { + // Note that this task has started and signal anybody waiting for that + // to happen. + { + base::AutoLock lock(lock_); + started_events_++; + } + cond_var_.Signal(); + + blocker->Block(); + SignalWorkerDone(id); + } + + // Waits until the given number of tasks have started executing. + void WaitUntilTasksBlocked(size_t count) { + { + base::AutoLock lock(lock_); + while (started_events_ < count) + cond_var_.Wait(); + } + cond_var_.Signal(); + } + + // Blocks the current thread until at least the given number of tasks are in + // the completed vector, and then returns a copy. + std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { + std::vector<int> ret; + { + base::AutoLock lock(lock_); + while (complete_sequence_.size() < num_tasks) + cond_var_.Wait(); + ret = complete_sequence_; + } + cond_var_.Signal(); + return ret; + } + + void ClearCompleteSequence() { + base::AutoLock lock(lock_); + complete_sequence_.clear(); + started_events_ = 0; + } + + private: + void SignalWorkerDone(int id) { + { + base::AutoLock lock(lock_); + complete_sequence_.push_back(id); + } + cond_var_.Signal(); + } + + // Protects the complete_sequence. + base::Lock lock_; + + base::ConditionVariable cond_var_; + + // Protected by lock_. + std::vector<int> complete_sequence_; + + // Counter of the number of "block" workers that have started. + size_t started_events_; +}; + +class SequencedWorkerPoolTest : public testing::Test, + public SequencedWorkerPool::TestingObserver { + public: + SequencedWorkerPoolTest() + : pool_(kNumWorkerThreads, "test"), + tracker_(new TestTracker) { + pool_.SetTestingObserver(this); + } + ~SequencedWorkerPoolTest() { + } + + virtual void SetUp() { + } + virtual void TearDown() { + pool_.Shutdown(); + } + + SequencedWorkerPool& pool() { return pool_; } + TestTracker* tracker() { return tracker_.get(); } + + // Ensures that the given number of worker threads is created by adding + // tasks and waiting until they complete. Worker thread creation is + // serialized, can happen on background threads asynchronously, and doesn't + // happen any more at shutdown. This means that if a test posts a bunch of + // tasks and calls shutdown, fewer workers will be created than the test may + // expect. + // + // This function ensures that this condition can't happen so tests can make + // assumptions about the number of workers active. See the comment in + // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more + // details. + // + // It will post tasks to the queue with id -1. It also assumes this is the + // first thing called in a test since it will clear the complete_sequence_. + void EnsureAllWorkersCreated() { + // Create a bunch of threads, all waiting. This will cause that may + // workers to be created. + ThreadBlocker blocker; + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool().PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), -1, &blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Now wake them up and wait until they're done. + blocker.Unblock(kNumWorkerThreads); + tracker()->WaitUntilTasksComplete(kNumWorkerThreads); + + // Clean up the task IDs we added. + tracker()->ClearCompleteSequence(); + } + + protected: + // This closure will be executed right before the pool blocks on shutdown. + base::Closure before_wait_for_shutdown_; + + private: + // SequencedWorkerPool::TestingObserver implementation. + virtual void WillWaitForShutdown() { + if (!before_wait_for_shutdown_.is_null()) + before_wait_for_shutdown_.Run(); + } + + SequencedWorkerPool pool_; + scoped_refptr<TestTracker> tracker_; +}; + +// Checks that the given number of entries are in the tasks to complete of +// the given tracker, and then signals the given event the given number of +// times. This is used to wakt up blocked background threads before blocking +// on shutdown. +void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, + size_t expected_tasks_to_complete, + ThreadBlocker* blocker, + size_t threads_to_awake) { + EXPECT_EQ( + expected_tasks_to_complete, + tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); + + blocker->Unblock(threads_to_awake); +} + +} // namespace + +// Tests that same-named tokens have the same ID. +TEST_F(SequencedWorkerPoolTest, NamedTokens) { + const std::string name1("hello"); + SequencedWorkerPool::SequenceToken token1 = + pool().GetNamedSequenceToken(name1); + + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); + + const std::string name3("goodbye"); + SequencedWorkerPool::SequenceToken token3 = + pool().GetNamedSequenceToken(name3); + + // All 3 tokens should be different. + EXPECT_FALSE(token1.Equals(token2)); + EXPECT_FALSE(token1.Equals(token3)); + EXPECT_FALSE(token2.Equals(token3)); + + // Requesting the same name again should give the same value. + SequencedWorkerPool::SequenceToken token1again = + pool().GetNamedSequenceToken(name1); + EXPECT_TRUE(token1.Equals(token1again)); + + SequencedWorkerPool::SequenceToken token3again = + pool().GetNamedSequenceToken(name3); + EXPECT_TRUE(token3.Equals(token3again)); +} + +// Tests that posting a bunch of tasks (many more than the number of worker +// threads) runs them all. +TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { + pool().PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::SlowTask, tracker(), 0)); + + const size_t kNumTasks = 20; + for (size_t i = 1; i < kNumTasks; i++) { + pool().PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), i)); + } + + std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); + EXPECT_EQ(kNumTasks, result.size()); +} + +// Test that tasks with the same sequence token are executed in order but don't +// affect other tasks. +TEST_F(SequencedWorkerPoolTest, Sequence) { + // Fill all the worker threads except one. + const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; + ThreadBlocker background_blocker; + for (size_t i = 0; i < kNumBackgroundTasks; i++) { + pool().PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &background_blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); + + // Create two tasks with the same sequence token, one that will block on the + // event, and one which will just complete quickly when it's run. Since there + // is one worker thread free, the first task will start and then block, and + // the second task should be waiting. + ThreadBlocker blocker; + SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); + pool().PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); + pool().PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 101)); + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Create another two tasks as above with a different token. These will be + // blocked since there are no slots to run. + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); + pool().PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 200)); + pool().PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 201)); + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Let one background task complete. This should then let both tasks of + // token2 run to completion in order. The second task of token1 should still + // be blocked. + background_blocker.Unblock(1); + std::vector<int> result = tracker()->WaitUntilTasksComplete(3); + ASSERT_EQ(3u, result.size()); + EXPECT_EQ(200, result[1]); + EXPECT_EQ(201, result[2]); + + // Finish the rest of the background tasks. This should leave some workers + // free with the second token1 task still blocked on the first. + background_blocker.Unblock(kNumBackgroundTasks - 1); + EXPECT_EQ(kNumBackgroundTasks + 2, + tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); + + // Allow the first task of token1 to complete. This should run the second. + blocker.Unblock(1); + result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); + ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); + EXPECT_EQ(100, result[result.size() - 2]); + EXPECT_EQ(101, result[result.size() - 1]); +} + +// Tests that unrun tasks are discarded properly according to their shutdown +// mode. +TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { + // Start tasks to take all the threads and block them. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool().PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Create some tasks with different shutdown modes. + pool().PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 100), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); + pool().PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 101), + SequencedWorkerPool::SKIP_ON_SHUTDOWN); + pool().PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 102), + SequencedWorkerPool::BLOCK_SHUTDOWN); + + // Shutdown the worker pool. This should discard all non-blocking tasks. + before_wait_for_shutdown_ = + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), 0, + &blocker, kNumWorkerThreads); + pool().Shutdown(); + + std::vector<int> result = tracker()->WaitUntilTasksComplete(4); + + // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN + // one, in no particular order. + ASSERT_EQ(4u, result.size()); + for (size_t i = 0; i < kNumWorkerThreads; i++) { + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != + result.end()); + } + EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); +} + +// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. +TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + pool().PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), 0, &blocker), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); + tracker()->WaitUntilTasksBlocked(1); + + // This should not block. If this test hangs, it means it failed. + pool().Shutdown(); + + // The task should not have completed yet. + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Posting more tasks should fail. + EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + + // Continue the background thread and make sure the task can complete. + blocker.Unblock(1); + std::vector<int> result = tracker()->WaitUntilTasksComplete(1); + EXPECT_EQ(1u, result.size()); +} + +} // namespace base |