diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-02-27 23:09:57 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-02-27 23:09:57 +0000 |
commit | e3f7624b8ccca961bce6c326e3986af3961c37d2 (patch) | |
tree | 842ef15c1645931277c9d5f2e1858232a5a95d9c /base | |
parent | 26438189c5d6b75579d410a89529a48dd7b029af (diff) | |
download | chromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.zip chromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.tar.gz chromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.tar.bz2 |
Fix up SequencedWorkerPool in preparation for making it a TaskRunner
Make SequencedWorkerPool ref-counted, merge it with ::Inner, and
make its destructor private. Make users hold a scoped_refptr.
Fix bug where SequencedWorkerPool::Worker wasn't taking a reference
to the worker pool.
Rename SequencedWorkerPool::PostTask to PostTaskHelper.
Clean up includes and use forward declarations when possible.
Make SequencedWorkerPool::Shutdown completely thread-safe by merging
the terminating_ and shutdown_called_ flag. (Now that it's ref-counted,
it can be passed around multiple threads.)
Clean up includes and params in webkit/dom_storage a bit.
BUG=114329,114330
TEST=
Review URL: https://chromiumcodereview.appspot.com/9347056
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@123823 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 234 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 44 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 75 |
3 files changed, 193 insertions, 160 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 6d47068..7679c15 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -1,61 +1,77 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/threading/sequenced_worker_pool.h" -#include <deque> +#include <list> +#include <map> #include <set> +#include <vector> #include "base/atomicops.h" -#include "base/bind.h" -#include "base/memory/scoped_ptr.h" +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/memory/linked_ptr.h" #include "base/metrics/histogram.h" #include "base/stringprintf.h" #include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" #include "base/threading/simple_thread.h" -#include "base/threading/thread.h" +#include "base/time.h" +#include "base/tracked_objects.h" namespace base { namespace { struct SequencedTask { + SequencedTask() + : sequence_token_id(0), + shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} + + ~SequencedTask() {} + int sequence_token_id; SequencedWorkerPool::WorkerShutdown shutdown_behavior; tracked_objects::Location location; - base::Closure task; + Closure task; }; } // namespace // Worker --------------------------------------------------------------------- -class SequencedWorkerPool::Worker : public base::SimpleThread { +class SequencedWorkerPool::Worker : public SimpleThread { public: - Worker(SequencedWorkerPool::Inner* inner, + // Hold a ref to |worker_pool|, since we want to keep it around even + // if it doesn't join our thread. Note that this (deliberately) + // leaks on shutdown. + Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, int thread_number, const std::string& thread_name_prefix); - ~Worker(); + virtual ~Worker(); // SimpleThread implementation. This actually runs the background thread. - virtual void Run(); + virtual void Run() OVERRIDE; private: - SequencedWorkerPool::Inner* inner_; - SequencedWorkerPool::WorkerShutdown current_shutdown_mode_; + const scoped_refptr<SequencedWorkerPool> worker_pool_; DISALLOW_COPY_AND_ASSIGN(Worker); }; - // Inner ---------------------------------------------------------------------- -class SequencedWorkerPool::Inner - : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { +class SequencedWorkerPool::Inner { public: - Inner(size_t max_threads, const std::string& thread_name_prefix); - virtual ~Inner(); + // Take a raw pointer to |worker| to avoid cycles (since we're owned + // by it). + Inner(SequencedWorkerPool* worker_pool, size_t max_threads, + const std::string& thread_name_prefix); + + ~Inner(); SequenceToken GetSequenceToken(); @@ -65,16 +81,16 @@ class SequencedWorkerPool::Inner // token ID is used. This allows us to implement the optional name lookup // from a single function without having to enter the lock a separate time. bool PostTask(const std::string* optional_token_name, - int sequence_token_id, - SequencedWorkerPool::WorkerShutdown shutdown_behavior, + SequenceToken sequence_token, + WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, - const base::Closure& task); + const Closure& task); - void Flush(); + void FlushForTesting(); void Shutdown(); - void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); + void SetTestingObserver(TestingObserver* observer); // Runs the worker loop on the background thread. void ThreadLoop(Worker* this_worker); @@ -88,7 +104,7 @@ class SequencedWorkerPool::Inner // vector the next time the lock is released. See the implementation for // a more detailed description. bool GetWork(SequencedTask* task, - std::vector<base::Closure>* delete_these_outside_lock); + std::vector<Closure>* delete_these_outside_lock); // Peforms init and cleanup around running the given task. WillRun... // returns the value from PrepareToStartAdditionalThreadIfNecessary. @@ -125,23 +141,25 @@ class SequencedWorkerPool::Inner // called inside the lock. bool CanShutdown() const; + SequencedWorkerPool* const worker_pool_; + // The last sequence number used. Managed by GetSequenceToken, since this // only does threadsafe increment operations, you do not need to hold the // lock. - volatile base::subtle::Atomic32 last_sequence_number_; + volatile subtle::Atomic32 last_sequence_number_; // This lock protects |everything in this class|. Do not read or modify // anything without holding this lock. Do not block while holding this // lock. - base::Lock lock_; + Lock lock_; // Condition variable used to wake up worker threads when a task is runnable. - base::ConditionVariable cond_var_; + ConditionVariable cond_var_; // The maximum number of worker threads we'll create. - size_t max_threads_; + const size_t max_threads_; - std::string thread_name_prefix_; + const std::string thread_name_prefix_; // Associates all known sequence token names with their IDs. std::map<std::string, int> named_sequence_tokens_; @@ -177,23 +195,24 @@ class SequencedWorkerPool::Inner // Lists all sequence tokens currently executing. std::set<int> current_sequences_; - // Set when the app is terminating and no further tasks should be allowed, - // though we may still be running existing tasks. - bool terminating_; - - // Set when Shutdown is called to do some assertions. + // Set when Shutdown is called and no further tasks should be + // allowed, though we may still be running existing tasks. bool shutdown_called_; - SequencedWorkerPool::TestingObserver* testing_observer_; + TestingObserver* testing_observer_; + + DISALLOW_COPY_AND_ASSIGN(Inner); }; -SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner, - int thread_number, - const std::string& prefix) - : base::SimpleThread( +// Worker definitions --------------------------------------------------------- + +SequencedWorkerPool::Worker::Worker( + const scoped_refptr<SequencedWorkerPool>& worker_pool, + int thread_number, + const std::string& prefix) + : SimpleThread( prefix + StringPrintf("Worker%d", thread_number).c_str()), - inner_(inner), - current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) { + worker_pool_(worker_pool) { Start(); } @@ -206,12 +225,17 @@ void SequencedWorkerPool::Worker::Run() { // using DelegateSimpleThread and have Inner implement the Delegate to avoid // having these worker objects at all, but that method lacks the ability to // send thread-specific information easily to the thread loop. - inner_->ThreadLoop(this); + worker_pool_->inner_->ThreadLoop(this); } -SequencedWorkerPool::Inner::Inner(size_t max_threads, - const std::string& thread_name_prefix) - : last_sequence_number_(0), +// Inner definitions --------------------------------------------------------- + +SequencedWorkerPool::Inner::Inner( + SequencedWorkerPool* worker_pool, + size_t max_threads, + const std::string& thread_name_prefix) + : worker_pool_(worker_pool), + last_sequence_number_(0), lock_(), cond_var_(&lock_), max_threads_(max_threads), @@ -221,10 +245,8 @@ SequencedWorkerPool::Inner::Inner(size_t max_threads, blocking_shutdown_thread_count_(0), pending_task_count_(0), blocking_shutdown_pending_task_count_(0), - terminating_(false), shutdown_called_(false), - testing_observer_(NULL) { -} + testing_observer_(NULL) {} SequencedWorkerPool::Inner::~Inner() { // You must call Shutdown() before destroying the pool. @@ -239,34 +261,33 @@ SequencedWorkerPool::Inner::~Inner() { SequencedWorkerPool::SequenceToken SequencedWorkerPool::Inner::GetSequenceToken() { - base::subtle::Atomic32 result = - base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); + subtle::Atomic32 result = + subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); return SequenceToken(static_cast<int>(result)); } SequencedWorkerPool::SequenceToken -SequencedWorkerPool::Inner::GetNamedSequenceToken( - const std::string& name) { - base::AutoLock lock(lock_); +SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { + AutoLock lock(lock_); return SequenceToken(LockedGetNamedTokenID(name)); } bool SequencedWorkerPool::Inner::PostTask( const std::string* optional_token_name, - int sequence_token_id, - SequencedWorkerPool::WorkerShutdown shutdown_behavior, + SequenceToken sequence_token, + WorkerShutdown shutdown_behavior, const tracked_objects::Location& from_here, - const base::Closure& task) { + const Closure& task) { SequencedTask sequenced; - sequenced.sequence_token_id = sequence_token_id; + sequenced.sequence_token_id = sequence_token.id_; sequenced.shutdown_behavior = shutdown_behavior; sequenced.location = from_here; sequenced.task = task; int create_thread_id = 0; { - base::AutoLock lock(lock_); - if (terminating_) + AutoLock lock(lock_); + if (shutdown_called_) return false; // Now that we have the lock, apply the named token rules. @@ -291,9 +312,9 @@ bool SequencedWorkerPool::Inner::PostTask( return true; } -void SequencedWorkerPool::Inner::Flush() { +void SequencedWorkerPool::Inner::FlushForTesting() { { - base::AutoLock lock(lock_); + AutoLock lock(lock_); while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) cond_var_.Wait(); } @@ -301,18 +322,16 @@ void SequencedWorkerPool::Inner::Flush() { } void SequencedWorkerPool::Inner::Shutdown() { - if (shutdown_called_) - return; - shutdown_called_ = true; - // Mark us as terminated and go through and drop all tasks that aren't // required to run on shutdown. Since no new tasks will get posted once the // terminated flag is set, this ensures that all remaining tasks are required // for shutdown whenever the termianted_ flag is set. { - base::AutoLock lock(lock_); - DCHECK(!terminating_); - terminating_ = true; + AutoLock lock(lock_); + + if (shutdown_called_) + return; + shutdown_called_ = true; // Tickle the threads. This will wake up a waiting one so it will know that // it can exit, which in turn will wake up any other waiting ones. @@ -330,27 +349,27 @@ void SequencedWorkerPool::Inner::Shutdown() { if (testing_observer_) testing_observer_->WillWaitForShutdown(); - base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now(); + TimeTicks shutdown_wait_begin = TimeTicks::Now(); // Wait for no more tasks. { - base::AutoLock lock(lock_); + AutoLock lock(lock_); while (!CanShutdown()) cond_var_.Wait(); } UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", - base::TimeTicks::Now() - shutdown_wait_begin); + TimeTicks::Now() - shutdown_wait_begin); } void SequencedWorkerPool::Inner::SetTestingObserver( - SequencedWorkerPool::TestingObserver* observer) { - base::AutoLock lock(lock_); + TestingObserver* observer) { + AutoLock lock(lock_); testing_observer_ = observer; } void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { { - base::AutoLock lock(lock_); + AutoLock lock(lock_); DCHECK(thread_being_created_); thread_being_created_ = false; threads_.push_back(linked_ptr<Worker>(this_worker)); @@ -358,11 +377,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { while (true) { // See GetWork for what delete_these_outside_lock is doing. SequencedTask task; - std::vector<base::Closure> delete_these_outside_lock; + std::vector<Closure> delete_these_outside_lock; if (GetWork(&task, &delete_these_outside_lock)) { int new_thread_id = WillRunWorkerTask(task); { - base::AutoUnlock unlock(lock_); + AutoUnlock unlock(lock_); cond_var_.Signal(); delete_these_outside_lock.clear(); @@ -374,15 +393,16 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // Make sure our task is erased outside the lock for the same reason // we do this with delete_these_oustide_lock. - task.task = base::Closure(); + task.task = Closure(); } DidRunWorkerTask(task); // Must be done inside the lock. } else { - // When we're terminating and there's no more work, we can shut down. - // You can't get more tasks posted once terminating_ is set. There may - // be some tasks stuck behind running ones with the same sequence - // token, but additional threads won't help this case. - if (terminating_) + // When we're terminating and there's no more work, we can + // shut down. You can't get more tasks posted once + // shutdown_called_ is set. There may be some tasks stuck + // behind running ones with the same sequence token, but + // additional threads won't help this case. + if (shutdown_called_) break; waiting_thread_count_++; cond_var_.Signal(); // For Flush() that may be waiting on the @@ -416,7 +436,7 @@ int SequencedWorkerPool::Inner::LockedGetNamedTokenID( bool SequencedWorkerPool::Inner::GetWork( SequencedTask* task, - std::vector<base::Closure>* delete_these_outside_lock) { + std::vector<Closure>* delete_these_outside_lock) { lock_.AssertAcquired(); DCHECK_EQ(pending_tasks_.size(), pending_task_count_); @@ -455,7 +475,7 @@ bool SequencedWorkerPool::Inner::GetWork( continue; } - if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { + if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { // We're shutting down and the task we just found isn't blocking // shutdown. Delete it and get more work. // @@ -502,7 +522,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { if (task.sequence_token_id) current_sequences_.insert(task.sequence_token_id); - if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) + if (task.shutdown_behavior == BLOCK_SHUTDOWN) blocking_shutdown_thread_count_++; // We just picked up a task. Since StartAdditionalThreadIfHelpful only @@ -526,7 +546,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { lock_.AssertAcquired(); - if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) { + if (task.shutdown_behavior == BLOCK_SHUTDOWN) { DCHECK_GT(blocking_shutdown_thread_count_, 0u); blocking_shutdown_thread_count_--; } @@ -544,6 +564,7 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( } int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { + lock_.AssertAcquired(); // How thread creation works: // // We'de like to avoid creating threads with the lock held. However, we @@ -567,13 +588,13 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { // 2. The first task post causes us to start a worker. Other tasks do not // cause a worker to start since one is pending. // 3. Main thread initiates shutdown. - // 4. No more threads are created since the terminating_ flag is set. + // 4. No more threads are created since the shutdown_called_ flag is set. // // The result is that one may expect that max_threads_ workers to be created // given the workload, but in reality fewer may be created because the // sequence of thread creation on the background threads is racing with the // shutdown call. - if (!terminating_ && + if (!shutdown_called_ && !thread_being_created_ && threads_.size() < max_threads_ && waiting_thread_count_ == 0) { @@ -597,7 +618,7 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( // The worker is assigned to the list when the thread actually starts, which // will manage the memory of the pointer. - new Worker(this, thread_number, thread_name_prefix_); + new Worker(worker_pool_, thread_number, thread_name_prefix_); } bool SequencedWorkerPool::Inner::CanShutdown() const { @@ -610,13 +631,13 @@ bool SequencedWorkerPool::Inner::CanShutdown() const { // SequencedWorkerPool -------------------------------------------------------- -SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, - const std::string& thread_name_prefix) - : inner_(new Inner(max_threads, thread_name_prefix)) { -} +SequencedWorkerPool::SequencedWorkerPool( + size_t max_threads, + const std::string& thread_name_prefix) + : inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), + max_threads, thread_name_prefix)) {} -SequencedWorkerPool::~SequencedWorkerPool() { -} +SequencedWorkerPool::~SequencedWorkerPool() {} SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { return inner_->GetSequenceToken(); @@ -629,44 +650,47 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( bool SequencedWorkerPool::PostWorkerTask( const tracked_objects::Location& from_here, - const base::Closure& task) { - return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); + const Closure& task) { + return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, + from_here, task); } bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( const tracked_objects::Location& from_here, - const base::Closure& task, + const Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); + return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, + from_here, task); } bool SequencedWorkerPool::PostSequencedWorkerTask( SequenceToken sequence_token, const tracked_objects::Location& from_here, - const base::Closure& task) { - return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, + const Closure& task) { + return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, from_here, task); } bool SequencedWorkerPool::PostNamedSequencedWorkerTask( const std::string& token_name, const tracked_objects::Location& from_here, - const base::Closure& task) { + const Closure& task) { DCHECK(!token_name.empty()); - return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); + return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, + from_here, task); } bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( SequenceToken sequence_token, const tracked_objects::Location& from_here, - const base::Closure& task, + const Closure& task, WorkerShutdown shutdown_behavior) { - return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, + return inner_->PostTask(NULL, sequence_token, shutdown_behavior, from_here, task); } void SequencedWorkerPool::FlushForTesting() { - inner_->Flush(); + inner_->FlushForTesting(); } void SequencedWorkerPool::Shutdown() { diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index f5770a7..c840f07 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -6,13 +6,18 @@ #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ #pragma once +#include <cstddef> #include <string> -#include "base/callback.h" -#include "base/memory/linked_ptr.h" -#include "base/memory/ref_counted.h" -#include "base/tracked_objects.h" #include "base/base_export.h" +#include "base/basictypes.h" +#include "base/callback_forward.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" + +namespace tracked_objects { +class Location; +} // namespace tracked_objects namespace base { @@ -46,7 +51,8 @@ namespace base { // not enforce shutdown semantics or allow us to specify how many worker // threads to run. For the typical use case of random background work, we don't // necessarily want to be super aggressive about creating threads. -class BASE_EXPORT SequencedWorkerPool { +class BASE_EXPORT SequencedWorkerPool + : public RefCountedThreadSafe<SequencedWorkerPool> { public: // Defines what should happen to a task posted to the worker pool on shutdown. enum WorkerShutdown { @@ -87,10 +93,10 @@ class BASE_EXPORT SequencedWorkerPool { }; // Opaque identifier that defines sequencing of tasks posted to the worker - // pool. See NewSequenceToken(). + // pool. class SequenceToken { public: - explicit SequenceToken() : id_(0) {} + SequenceToken() : id_(0) {} ~SequenceToken() {} bool Equals(const SequenceToken& other) const { @@ -100,7 +106,7 @@ class BASE_EXPORT SequencedWorkerPool { private: friend class SequencedWorkerPool; - SequenceToken(int id) : id_(id) {} + explicit SequenceToken(int id) : id_(id) {} int id_; }; @@ -116,7 +122,6 @@ class BASE_EXPORT SequencedWorkerPool { // and a prefix for the thread name to ad in debugging. SequencedWorkerPool(size_t max_threads, const std::string& thread_name_prefix); - ~SequencedWorkerPool(); // Returns a unique token that can be used to sequence tasks posted to // PostSequencedWorkerTask(). Valid tokens are alwys nonzero. @@ -149,12 +154,12 @@ class BASE_EXPORT SequencedWorkerPool { // Returns true if the task was posted successfully. This may fail during // shutdown regardless of the specified ShutdownBehavior. bool PostWorkerTask(const tracked_objects::Location& from_here, - const base::Closure& task); + const Closure& task); // Same as PostWorkerTask but allows specification of the shutdown behavior. bool PostWorkerTaskWithShutdownBehavior( const tracked_objects::Location& from_here, - const base::Closure& task, + const Closure& task, WorkerShutdown shutdown_behavior); // Like PostWorkerTask above, but provides sequencing semantics. This means @@ -170,20 +175,20 @@ class BASE_EXPORT SequencedWorkerPool { // shutdown regardless of the specified ShutdownBehavior. bool PostSequencedWorkerTask(SequenceToken sequence_token, const tracked_objects::Location& from_here, - const base::Closure& task); + const Closure& task); // Like PostSequencedWorkerTask above, but allows you to specify a named // token, which saves an extra call to GetNamedSequenceToken. bool PostNamedSequencedWorkerTask(const std::string& token_name, const tracked_objects::Location& from_here, - const base::Closure& task); + const Closure& task); // Same as PostSequencedWorkerTask but allows specification of the shutdown // behavior. bool PostSequencedWorkerTaskWithShutdownBehavior( SequenceToken sequence_token, const tracked_objects::Location& from_here, - const base::Closure& task, + const Closure& task, WorkerShutdown shutdown_behavior); // Blocks until all pending tasks are complete. This should only be called in @@ -206,13 +211,16 @@ class BASE_EXPORT SequencedWorkerPool { void SetTestingObserver(TestingObserver* observer); private: + friend class RefCountedThreadSafe<SequencedWorkerPool>; + class Inner; class Worker; - friend class Inner; - friend class Worker; + ~SequencedWorkerPool(); - scoped_refptr<Inner> inner_; + // Avoid pulling in too many headers by putting everything into + // |inner_|. + const scoped_ptr<Inner> inner_; DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); }; diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc index 25736d0..a7e9e93 100644 --- a/base/threading/sequenced_worker_pool_unittest.cc +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -10,6 +10,7 @@ #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" #include "base/threading/sequenced_worker_pool.h" +#include "base/tracked_objects.h" #include "testing/gtest/include/gtest/gtest.h" namespace base { @@ -143,9 +144,9 @@ class SequencedWorkerPoolTest : public testing::Test, public SequencedWorkerPool::TestingObserver { public: SequencedWorkerPoolTest() - : pool_(kNumWorkerThreads, "test"), + : pool_(new SequencedWorkerPool(kNumWorkerThreads, "test")), tracker_(new TestTracker) { - pool_.SetTestingObserver(this); + pool_->SetTestingObserver(this); } ~SequencedWorkerPoolTest() { } @@ -153,10 +154,10 @@ class SequencedWorkerPoolTest : public testing::Test, virtual void SetUp() { } virtual void TearDown() { - pool_.Shutdown(); + pool_->Shutdown(); } - SequencedWorkerPool& pool() { return pool_; } + const scoped_refptr<SequencedWorkerPool>& pool() { return pool_; } TestTracker* tracker() { return tracker_.get(); } // Ensures that the given number of worker threads is created by adding @@ -178,9 +179,9 @@ class SequencedWorkerPoolTest : public testing::Test, // workers to be created. ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { - pool().PostWorkerTask(FROM_HERE, - base::Bind(&TestTracker::BlockTask, - tracker(), -1, &blocker)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), -1, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); @@ -203,8 +204,8 @@ class SequencedWorkerPoolTest : public testing::Test, before_wait_for_shutdown_.Run(); } - SequencedWorkerPool pool_; - scoped_refptr<TestTracker> tracker_; + const scoped_refptr<SequencedWorkerPool> pool_; + const scoped_refptr<TestTracker> tracker_; }; // Checks that the given number of entries are in the tasks to complete of @@ -228,13 +229,13 @@ void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, TEST_F(SequencedWorkerPoolTest, NamedTokens) { const std::string name1("hello"); SequencedWorkerPool::SequenceToken token1 = - pool().GetNamedSequenceToken(name1); + pool()->GetNamedSequenceToken(name1); - SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); const std::string name3("goodbye"); SequencedWorkerPool::SequenceToken token3 = - pool().GetNamedSequenceToken(name3); + pool()->GetNamedSequenceToken(name3); // All 3 tokens should be different. EXPECT_FALSE(token1.Equals(token2)); @@ -243,24 +244,24 @@ TEST_F(SequencedWorkerPoolTest, NamedTokens) { // Requesting the same name again should give the same value. SequencedWorkerPool::SequenceToken token1again = - pool().GetNamedSequenceToken(name1); + pool()->GetNamedSequenceToken(name1); EXPECT_TRUE(token1.Equals(token1again)); SequencedWorkerPool::SequenceToken token3again = - pool().GetNamedSequenceToken(name3); + pool()->GetNamedSequenceToken(name3); EXPECT_TRUE(token3.Equals(token3again)); } // Tests that posting a bunch of tasks (many more than the number of worker // threads) runs them all. TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { - pool().PostWorkerTask(FROM_HERE, - base::Bind(&TestTracker::SlowTask, tracker(), 0)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::SlowTask, tracker(), 0)); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { - pool().PostWorkerTask(FROM_HERE, - base::Bind(&TestTracker::FastTask, tracker(), i)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), i)); } std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); @@ -274,9 +275,9 @@ TEST_F(SequencedWorkerPoolTest, Sequence) { const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; ThreadBlocker background_blocker; for (size_t i = 0; i < kNumBackgroundTasks; i++) { - pool().PostWorkerTask(FROM_HERE, - base::Bind(&TestTracker::BlockTask, - tracker(), i, &background_blocker)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &background_blocker)); } tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); @@ -285,22 +286,22 @@ TEST_F(SequencedWorkerPoolTest, Sequence) { // is one worker thread free, the first task will start and then block, and // the second task should be waiting. ThreadBlocker blocker; - SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); - pool().PostSequencedWorkerTask( + SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); + pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); - pool().PostSequencedWorkerTask( + pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Create another two tasks as above with a different token. These will be // blocked since there are no slots to run. - SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); - pool().PostSequencedWorkerTask( + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); + pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 200)); - pool().PostSequencedWorkerTask( + pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 201)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); @@ -335,22 +336,22 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { EnsureAllWorkersCreated(); ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { - pool().PostWorkerTask(FROM_HERE, - base::Bind(&TestTracker::BlockTask, - tracker(), i, &blocker)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Create some tasks with different shutdown modes. - pool().PostWorkerTaskWithShutdownBehavior( + pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 100), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); - pool().PostWorkerTaskWithShutdownBehavior( + pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101), SequencedWorkerPool::SKIP_ON_SHUTDOWN); - pool().PostWorkerTaskWithShutdownBehavior( + pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 102), SequencedWorkerPool::BLOCK_SHUTDOWN); @@ -360,7 +361,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr<TestTracker>(tracker()), 0, &blocker, kNumWorkerThreads); - pool().Shutdown(); + pool()->Shutdown(); std::vector<int> result = tracker()->WaitUntilTasksComplete(4); @@ -378,7 +379,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { EnsureAllWorkersCreated(); ThreadBlocker blocker; - pool().PostWorkerTaskWithShutdownBehavior( + pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 0, &blocker), @@ -386,13 +387,13 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { tracker()->WaitUntilTasksBlocked(1); // This should not block. If this test hangs, it means it failed. - pool().Shutdown(); + pool()->Shutdown(); // The task should not have completed yet. EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Posting more tasks should fail. - EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( + EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |