summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-02-27 23:09:57 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-02-27 23:09:57 +0000
commite3f7624b8ccca961bce6c326e3986af3961c37d2 (patch)
tree842ef15c1645931277c9d5f2e1858232a5a95d9c /base
parent26438189c5d6b75579d410a89529a48dd7b029af (diff)
downloadchromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.zip
chromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.tar.gz
chromium_src-e3f7624b8ccca961bce6c326e3986af3961c37d2.tar.bz2
Fix up SequencedWorkerPool in preparation for making it a TaskRunner
Make SequencedWorkerPool ref-counted, merge it with ::Inner, and make its destructor private. Make users hold a scoped_refptr. Fix bug where SequencedWorkerPool::Worker wasn't taking a reference to the worker pool. Rename SequencedWorkerPool::PostTask to PostTaskHelper. Clean up includes and use forward declarations when possible. Make SequencedWorkerPool::Shutdown completely thread-safe by merging the terminating_ and shutdown_called_ flag. (Now that it's ref-counted, it can be passed around multiple threads.) Clean up includes and params in webkit/dom_storage a bit. BUG=114329,114330 TEST= Review URL: https://chromiumcodereview.appspot.com/9347056 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@123823 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/threading/sequenced_worker_pool.cc234
-rw-r--r--base/threading/sequenced_worker_pool.h44
-rw-r--r--base/threading/sequenced_worker_pool_unittest.cc75
3 files changed, 193 insertions, 160 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 6d47068..7679c15 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -1,61 +1,77 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/threading/sequenced_worker_pool.h"
-#include <deque>
+#include <list>
+#include <map>
#include <set>
+#include <vector>
#include "base/atomicops.h"
-#include "base/bind.h"
-#include "base/memory/scoped_ptr.h"
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/memory/linked_ptr.h"
#include "base/metrics/histogram.h"
#include "base/stringprintf.h"
#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
#include "base/threading/simple_thread.h"
-#include "base/threading/thread.h"
+#include "base/time.h"
+#include "base/tracked_objects.h"
namespace base {
namespace {
struct SequencedTask {
+ SequencedTask()
+ : sequence_token_id(0),
+ shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
+
+ ~SequencedTask() {}
+
int sequence_token_id;
SequencedWorkerPool::WorkerShutdown shutdown_behavior;
tracked_objects::Location location;
- base::Closure task;
+ Closure task;
};
} // namespace
// Worker ---------------------------------------------------------------------
-class SequencedWorkerPool::Worker : public base::SimpleThread {
+class SequencedWorkerPool::Worker : public SimpleThread {
public:
- Worker(SequencedWorkerPool::Inner* inner,
+ // Hold a ref to |worker_pool|, since we want to keep it around even
+ // if it doesn't join our thread. Note that this (deliberately)
+ // leaks on shutdown.
+ Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
int thread_number,
const std::string& thread_name_prefix);
- ~Worker();
+ virtual ~Worker();
// SimpleThread implementation. This actually runs the background thread.
- virtual void Run();
+ virtual void Run() OVERRIDE;
private:
- SequencedWorkerPool::Inner* inner_;
- SequencedWorkerPool::WorkerShutdown current_shutdown_mode_;
+ const scoped_refptr<SequencedWorkerPool> worker_pool_;
DISALLOW_COPY_AND_ASSIGN(Worker);
};
-
// Inner ----------------------------------------------------------------------
-class SequencedWorkerPool::Inner
- : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> {
+class SequencedWorkerPool::Inner {
public:
- Inner(size_t max_threads, const std::string& thread_name_prefix);
- virtual ~Inner();
+ // Take a raw pointer to |worker| to avoid cycles (since we're owned
+ // by it).
+ Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
+ const std::string& thread_name_prefix);
+
+ ~Inner();
SequenceToken GetSequenceToken();
@@ -65,16 +81,16 @@ class SequencedWorkerPool::Inner
// token ID is used. This allows us to implement the optional name lookup
// from a single function without having to enter the lock a separate time.
bool PostTask(const std::string* optional_token_name,
- int sequence_token_id,
- SequencedWorkerPool::WorkerShutdown shutdown_behavior,
+ SequenceToken sequence_token,
+ WorkerShutdown shutdown_behavior,
const tracked_objects::Location& from_here,
- const base::Closure& task);
+ const Closure& task);
- void Flush();
+ void FlushForTesting();
void Shutdown();
- void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
+ void SetTestingObserver(TestingObserver* observer);
// Runs the worker loop on the background thread.
void ThreadLoop(Worker* this_worker);
@@ -88,7 +104,7 @@ class SequencedWorkerPool::Inner
// vector the next time the lock is released. See the implementation for
// a more detailed description.
bool GetWork(SequencedTask* task,
- std::vector<base::Closure>* delete_these_outside_lock);
+ std::vector<Closure>* delete_these_outside_lock);
// Peforms init and cleanup around running the given task. WillRun...
// returns the value from PrepareToStartAdditionalThreadIfNecessary.
@@ -125,23 +141,25 @@ class SequencedWorkerPool::Inner
// called inside the lock.
bool CanShutdown() const;
+ SequencedWorkerPool* const worker_pool_;
+
// The last sequence number used. Managed by GetSequenceToken, since this
// only does threadsafe increment operations, you do not need to hold the
// lock.
- volatile base::subtle::Atomic32 last_sequence_number_;
+ volatile subtle::Atomic32 last_sequence_number_;
// This lock protects |everything in this class|. Do not read or modify
// anything without holding this lock. Do not block while holding this
// lock.
- base::Lock lock_;
+ Lock lock_;
// Condition variable used to wake up worker threads when a task is runnable.
- base::ConditionVariable cond_var_;
+ ConditionVariable cond_var_;
// The maximum number of worker threads we'll create.
- size_t max_threads_;
+ const size_t max_threads_;
- std::string thread_name_prefix_;
+ const std::string thread_name_prefix_;
// Associates all known sequence token names with their IDs.
std::map<std::string, int> named_sequence_tokens_;
@@ -177,23 +195,24 @@ class SequencedWorkerPool::Inner
// Lists all sequence tokens currently executing.
std::set<int> current_sequences_;
- // Set when the app is terminating and no further tasks should be allowed,
- // though we may still be running existing tasks.
- bool terminating_;
-
- // Set when Shutdown is called to do some assertions.
+ // Set when Shutdown is called and no further tasks should be
+ // allowed, though we may still be running existing tasks.
bool shutdown_called_;
- SequencedWorkerPool::TestingObserver* testing_observer_;
+ TestingObserver* testing_observer_;
+
+ DISALLOW_COPY_AND_ASSIGN(Inner);
};
-SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner,
- int thread_number,
- const std::string& prefix)
- : base::SimpleThread(
+// Worker definitions ---------------------------------------------------------
+
+SequencedWorkerPool::Worker::Worker(
+ const scoped_refptr<SequencedWorkerPool>& worker_pool,
+ int thread_number,
+ const std::string& prefix)
+ : SimpleThread(
prefix + StringPrintf("Worker%d", thread_number).c_str()),
- inner_(inner),
- current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) {
+ worker_pool_(worker_pool) {
Start();
}
@@ -206,12 +225,17 @@ void SequencedWorkerPool::Worker::Run() {
// using DelegateSimpleThread and have Inner implement the Delegate to avoid
// having these worker objects at all, but that method lacks the ability to
// send thread-specific information easily to the thread loop.
- inner_->ThreadLoop(this);
+ worker_pool_->inner_->ThreadLoop(this);
}
-SequencedWorkerPool::Inner::Inner(size_t max_threads,
- const std::string& thread_name_prefix)
- : last_sequence_number_(0),
+// Inner definitions ---------------------------------------------------------
+
+SequencedWorkerPool::Inner::Inner(
+ SequencedWorkerPool* worker_pool,
+ size_t max_threads,
+ const std::string& thread_name_prefix)
+ : worker_pool_(worker_pool),
+ last_sequence_number_(0),
lock_(),
cond_var_(&lock_),
max_threads_(max_threads),
@@ -221,10 +245,8 @@ SequencedWorkerPool::Inner::Inner(size_t max_threads,
blocking_shutdown_thread_count_(0),
pending_task_count_(0),
blocking_shutdown_pending_task_count_(0),
- terminating_(false),
shutdown_called_(false),
- testing_observer_(NULL) {
-}
+ testing_observer_(NULL) {}
SequencedWorkerPool::Inner::~Inner() {
// You must call Shutdown() before destroying the pool.
@@ -239,34 +261,33 @@ SequencedWorkerPool::Inner::~Inner() {
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::Inner::GetSequenceToken() {
- base::subtle::Atomic32 result =
- base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
+ subtle::Atomic32 result =
+ subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
return SequenceToken(static_cast<int>(result));
}
SequencedWorkerPool::SequenceToken
-SequencedWorkerPool::Inner::GetNamedSequenceToken(
- const std::string& name) {
- base::AutoLock lock(lock_);
+SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
+ AutoLock lock(lock_);
return SequenceToken(LockedGetNamedTokenID(name));
}
bool SequencedWorkerPool::Inner::PostTask(
const std::string* optional_token_name,
- int sequence_token_id,
- SequencedWorkerPool::WorkerShutdown shutdown_behavior,
+ SequenceToken sequence_token,
+ WorkerShutdown shutdown_behavior,
const tracked_objects::Location& from_here,
- const base::Closure& task) {
+ const Closure& task) {
SequencedTask sequenced;
- sequenced.sequence_token_id = sequence_token_id;
+ sequenced.sequence_token_id = sequence_token.id_;
sequenced.shutdown_behavior = shutdown_behavior;
sequenced.location = from_here;
sequenced.task = task;
int create_thread_id = 0;
{
- base::AutoLock lock(lock_);
- if (terminating_)
+ AutoLock lock(lock_);
+ if (shutdown_called_)
return false;
// Now that we have the lock, apply the named token rules.
@@ -291,9 +312,9 @@ bool SequencedWorkerPool::Inner::PostTask(
return true;
}
-void SequencedWorkerPool::Inner::Flush() {
+void SequencedWorkerPool::Inner::FlushForTesting() {
{
- base::AutoLock lock(lock_);
+ AutoLock lock(lock_);
while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
cond_var_.Wait();
}
@@ -301,18 +322,16 @@ void SequencedWorkerPool::Inner::Flush() {
}
void SequencedWorkerPool::Inner::Shutdown() {
- if (shutdown_called_)
- return;
- shutdown_called_ = true;
-
// Mark us as terminated and go through and drop all tasks that aren't
// required to run on shutdown. Since no new tasks will get posted once the
// terminated flag is set, this ensures that all remaining tasks are required
// for shutdown whenever the termianted_ flag is set.
{
- base::AutoLock lock(lock_);
- DCHECK(!terminating_);
- terminating_ = true;
+ AutoLock lock(lock_);
+
+ if (shutdown_called_)
+ return;
+ shutdown_called_ = true;
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
@@ -330,27 +349,27 @@ void SequencedWorkerPool::Inner::Shutdown() {
if (testing_observer_)
testing_observer_->WillWaitForShutdown();
- base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now();
+ TimeTicks shutdown_wait_begin = TimeTicks::Now();
// Wait for no more tasks.
{
- base::AutoLock lock(lock_);
+ AutoLock lock(lock_);
while (!CanShutdown())
cond_var_.Wait();
}
UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
- base::TimeTicks::Now() - shutdown_wait_begin);
+ TimeTicks::Now() - shutdown_wait_begin);
}
void SequencedWorkerPool::Inner::SetTestingObserver(
- SequencedWorkerPool::TestingObserver* observer) {
- base::AutoLock lock(lock_);
+ TestingObserver* observer) {
+ AutoLock lock(lock_);
testing_observer_ = observer;
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
{
- base::AutoLock lock(lock_);
+ AutoLock lock(lock_);
DCHECK(thread_being_created_);
thread_being_created_ = false;
threads_.push_back(linked_ptr<Worker>(this_worker));
@@ -358,11 +377,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
while (true) {
// See GetWork for what delete_these_outside_lock is doing.
SequencedTask task;
- std::vector<base::Closure> delete_these_outside_lock;
+ std::vector<Closure> delete_these_outside_lock;
if (GetWork(&task, &delete_these_outside_lock)) {
int new_thread_id = WillRunWorkerTask(task);
{
- base::AutoUnlock unlock(lock_);
+ AutoUnlock unlock(lock_);
cond_var_.Signal();
delete_these_outside_lock.clear();
@@ -374,15 +393,16 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
// Make sure our task is erased outside the lock for the same reason
// we do this with delete_these_oustide_lock.
- task.task = base::Closure();
+ task.task = Closure();
}
DidRunWorkerTask(task); // Must be done inside the lock.
} else {
- // When we're terminating and there's no more work, we can shut down.
- // You can't get more tasks posted once terminating_ is set. There may
- // be some tasks stuck behind running ones with the same sequence
- // token, but additional threads won't help this case.
- if (terminating_)
+ // When we're terminating and there's no more work, we can
+ // shut down. You can't get more tasks posted once
+ // shutdown_called_ is set. There may be some tasks stuck
+ // behind running ones with the same sequence token, but
+ // additional threads won't help this case.
+ if (shutdown_called_)
break;
waiting_thread_count_++;
cond_var_.Signal(); // For Flush() that may be waiting on the
@@ -416,7 +436,7 @@ int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
bool SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
- std::vector<base::Closure>* delete_these_outside_lock) {
+ std::vector<Closure>* delete_these_outside_lock) {
lock_.AssertAcquired();
DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
@@ -455,7 +475,7 @@ bool SequencedWorkerPool::Inner::GetWork(
continue;
}
- if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
+ if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
// We're shutting down and the task we just found isn't blocking
// shutdown. Delete it and get more work.
//
@@ -502,7 +522,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
if (task.sequence_token_id)
current_sequences_.insert(task.sequence_token_id);
- if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN)
+ if (task.shutdown_behavior == BLOCK_SHUTDOWN)
blocking_shutdown_thread_count_++;
// We just picked up a task. Since StartAdditionalThreadIfHelpful only
@@ -526,7 +546,7 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
lock_.AssertAcquired();
- if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) {
+ if (task.shutdown_behavior == BLOCK_SHUTDOWN) {
DCHECK_GT(blocking_shutdown_thread_count_, 0u);
blocking_shutdown_thread_count_--;
}
@@ -544,6 +564,7 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
+ lock_.AssertAcquired();
// How thread creation works:
//
// We'de like to avoid creating threads with the lock held. However, we
@@ -567,13 +588,13 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
// 2. The first task post causes us to start a worker. Other tasks do not
// cause a worker to start since one is pending.
// 3. Main thread initiates shutdown.
- // 4. No more threads are created since the terminating_ flag is set.
+ // 4. No more threads are created since the shutdown_called_ flag is set.
//
// The result is that one may expect that max_threads_ workers to be created
// given the workload, but in reality fewer may be created because the
// sequence of thread creation on the background threads is racing with the
// shutdown call.
- if (!terminating_ &&
+ if (!shutdown_called_ &&
!thread_being_created_ &&
threads_.size() < max_threads_ &&
waiting_thread_count_ == 0) {
@@ -597,7 +618,7 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
// The worker is assigned to the list when the thread actually starts, which
// will manage the memory of the pointer.
- new Worker(this, thread_number, thread_name_prefix_);
+ new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
@@ -610,13 +631,13 @@ bool SequencedWorkerPool::Inner::CanShutdown() const {
// SequencedWorkerPool --------------------------------------------------------
-SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
- const std::string& thread_name_prefix)
- : inner_(new Inner(max_threads, thread_name_prefix)) {
-}
+SequencedWorkerPool::SequencedWorkerPool(
+ size_t max_threads,
+ const std::string& thread_name_prefix)
+ : inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
+ max_threads, thread_name_prefix)) {}
-SequencedWorkerPool::~SequencedWorkerPool() {
-}
+SequencedWorkerPool::~SequencedWorkerPool() {}
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
return inner_->GetSequenceToken();
@@ -629,44 +650,47 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
bool SequencedWorkerPool::PostWorkerTask(
const tracked_objects::Location& from_here,
- const base::Closure& task) {
- return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task);
+ const Closure& task) {
+ return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
+ from_here, task);
}
bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
const tracked_objects::Location& from_here,
- const base::Closure& task,
+ const Closure& task,
WorkerShutdown shutdown_behavior) {
- return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task);
+ return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
+ from_here, task);
}
bool SequencedWorkerPool::PostSequencedWorkerTask(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
- const base::Closure& task) {
- return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN,
+ const Closure& task) {
+ return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
from_here, task);
}
bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
const std::string& token_name,
const tracked_objects::Location& from_here,
- const base::Closure& task) {
+ const Closure& task) {
DCHECK(!token_name.empty());
- return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task);
+ return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
+ from_here, task);
}
bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
- const base::Closure& task,
+ const Closure& task,
WorkerShutdown shutdown_behavior) {
- return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior,
+ return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
from_here, task);
}
void SequencedWorkerPool::FlushForTesting() {
- inner_->Flush();
+ inner_->FlushForTesting();
}
void SequencedWorkerPool::Shutdown() {
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index f5770a7..c840f07 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -6,13 +6,18 @@
#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
#pragma once
+#include <cstddef>
#include <string>
-#include "base/callback.h"
-#include "base/memory/linked_ptr.h"
-#include "base/memory/ref_counted.h"
-#include "base/tracked_objects.h"
#include "base/base_export.h"
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+
+namespace tracked_objects {
+class Location;
+} // namespace tracked_objects
namespace base {
@@ -46,7 +51,8 @@ namespace base {
// not enforce shutdown semantics or allow us to specify how many worker
// threads to run. For the typical use case of random background work, we don't
// necessarily want to be super aggressive about creating threads.
-class BASE_EXPORT SequencedWorkerPool {
+class BASE_EXPORT SequencedWorkerPool
+ : public RefCountedThreadSafe<SequencedWorkerPool> {
public:
// Defines what should happen to a task posted to the worker pool on shutdown.
enum WorkerShutdown {
@@ -87,10 +93,10 @@ class BASE_EXPORT SequencedWorkerPool {
};
// Opaque identifier that defines sequencing of tasks posted to the worker
- // pool. See NewSequenceToken().
+ // pool.
class SequenceToken {
public:
- explicit SequenceToken() : id_(0) {}
+ SequenceToken() : id_(0) {}
~SequenceToken() {}
bool Equals(const SequenceToken& other) const {
@@ -100,7 +106,7 @@ class BASE_EXPORT SequencedWorkerPool {
private:
friend class SequencedWorkerPool;
- SequenceToken(int id) : id_(id) {}
+ explicit SequenceToken(int id) : id_(id) {}
int id_;
};
@@ -116,7 +122,6 @@ class BASE_EXPORT SequencedWorkerPool {
// and a prefix for the thread name to ad in debugging.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix);
- ~SequencedWorkerPool();
// Returns a unique token that can be used to sequence tasks posted to
// PostSequencedWorkerTask(). Valid tokens are alwys nonzero.
@@ -149,12 +154,12 @@ class BASE_EXPORT SequencedWorkerPool {
// Returns true if the task was posted successfully. This may fail during
// shutdown regardless of the specified ShutdownBehavior.
bool PostWorkerTask(const tracked_objects::Location& from_here,
- const base::Closure& task);
+ const Closure& task);
// Same as PostWorkerTask but allows specification of the shutdown behavior.
bool PostWorkerTaskWithShutdownBehavior(
const tracked_objects::Location& from_here,
- const base::Closure& task,
+ const Closure& task,
WorkerShutdown shutdown_behavior);
// Like PostWorkerTask above, but provides sequencing semantics. This means
@@ -170,20 +175,20 @@ class BASE_EXPORT SequencedWorkerPool {
// shutdown regardless of the specified ShutdownBehavior.
bool PostSequencedWorkerTask(SequenceToken sequence_token,
const tracked_objects::Location& from_here,
- const base::Closure& task);
+ const Closure& task);
// Like PostSequencedWorkerTask above, but allows you to specify a named
// token, which saves an extra call to GetNamedSequenceToken.
bool PostNamedSequencedWorkerTask(const std::string& token_name,
const tracked_objects::Location& from_here,
- const base::Closure& task);
+ const Closure& task);
// Same as PostSequencedWorkerTask but allows specification of the shutdown
// behavior.
bool PostSequencedWorkerTaskWithShutdownBehavior(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
- const base::Closure& task,
+ const Closure& task,
WorkerShutdown shutdown_behavior);
// Blocks until all pending tasks are complete. This should only be called in
@@ -206,13 +211,16 @@ class BASE_EXPORT SequencedWorkerPool {
void SetTestingObserver(TestingObserver* observer);
private:
+ friend class RefCountedThreadSafe<SequencedWorkerPool>;
+
class Inner;
class Worker;
- friend class Inner;
- friend class Worker;
+ ~SequencedWorkerPool();
- scoped_refptr<Inner> inner_;
+ // Avoid pulling in too many headers by putting everything into
+ // |inner_|.
+ const scoped_ptr<Inner> inner_;
DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
};
diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc
index 25736d0..a7e9e93 100644
--- a/base/threading/sequenced_worker_pool_unittest.cc
+++ b/base/threading/sequenced_worker_pool_unittest.cc
@@ -10,6 +10,7 @@
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
#include "base/threading/sequenced_worker_pool.h"
+#include "base/tracked_objects.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
@@ -143,9 +144,9 @@ class SequencedWorkerPoolTest : public testing::Test,
public SequencedWorkerPool::TestingObserver {
public:
SequencedWorkerPoolTest()
- : pool_(kNumWorkerThreads, "test"),
+ : pool_(new SequencedWorkerPool(kNumWorkerThreads, "test")),
tracker_(new TestTracker) {
- pool_.SetTestingObserver(this);
+ pool_->SetTestingObserver(this);
}
~SequencedWorkerPoolTest() {
}
@@ -153,10 +154,10 @@ class SequencedWorkerPoolTest : public testing::Test,
virtual void SetUp() {
}
virtual void TearDown() {
- pool_.Shutdown();
+ pool_->Shutdown();
}
- SequencedWorkerPool& pool() { return pool_; }
+ const scoped_refptr<SequencedWorkerPool>& pool() { return pool_; }
TestTracker* tracker() { return tracker_.get(); }
// Ensures that the given number of worker threads is created by adding
@@ -178,9 +179,9 @@ class SequencedWorkerPoolTest : public testing::Test,
// workers to be created.
ThreadBlocker blocker;
for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool().PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), -1, &blocker));
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockTask,
+ tracker(), -1, &blocker));
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
@@ -203,8 +204,8 @@ class SequencedWorkerPoolTest : public testing::Test,
before_wait_for_shutdown_.Run();
}
- SequencedWorkerPool pool_;
- scoped_refptr<TestTracker> tracker_;
+ const scoped_refptr<SequencedWorkerPool> pool_;
+ const scoped_refptr<TestTracker> tracker_;
};
// Checks that the given number of entries are in the tasks to complete of
@@ -228,13 +229,13 @@ void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
TEST_F(SequencedWorkerPoolTest, NamedTokens) {
const std::string name1("hello");
SequencedWorkerPool::SequenceToken token1 =
- pool().GetNamedSequenceToken(name1);
+ pool()->GetNamedSequenceToken(name1);
- SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
+ SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
const std::string name3("goodbye");
SequencedWorkerPool::SequenceToken token3 =
- pool().GetNamedSequenceToken(name3);
+ pool()->GetNamedSequenceToken(name3);
// All 3 tokens should be different.
EXPECT_FALSE(token1.Equals(token2));
@@ -243,24 +244,24 @@ TEST_F(SequencedWorkerPoolTest, NamedTokens) {
// Requesting the same name again should give the same value.
SequencedWorkerPool::SequenceToken token1again =
- pool().GetNamedSequenceToken(name1);
+ pool()->GetNamedSequenceToken(name1);
EXPECT_TRUE(token1.Equals(token1again));
SequencedWorkerPool::SequenceToken token3again =
- pool().GetNamedSequenceToken(name3);
+ pool()->GetNamedSequenceToken(name3);
EXPECT_TRUE(token3.Equals(token3again));
}
// Tests that posting a bunch of tasks (many more than the number of worker
// threads) runs them all.
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
- pool().PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::SlowTask, tracker(), 0));
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::SlowTask, tracker(), 0));
const size_t kNumTasks = 20;
for (size_t i = 1; i < kNumTasks; i++) {
- pool().PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), i));
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), i));
}
std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
@@ -274,9 +275,9 @@ TEST_F(SequencedWorkerPoolTest, Sequence) {
const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
ThreadBlocker background_blocker;
for (size_t i = 0; i < kNumBackgroundTasks; i++) {
- pool().PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), i, &background_blocker));
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockTask,
+ tracker(), i, &background_blocker));
}
tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
@@ -285,22 +286,22 @@ TEST_F(SequencedWorkerPoolTest, Sequence) {
// is one worker thread free, the first task will start and then block, and
// the second task should be waiting.
ThreadBlocker blocker;
- SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken();
- pool().PostSequencedWorkerTask(
+ SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
+ pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
- pool().PostSequencedWorkerTask(
+ pool()->PostSequencedWorkerTask(
token1, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 101));
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
// Create another two tasks as above with a different token. These will be
// blocked since there are no slots to run.
- SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
- pool().PostSequencedWorkerTask(
+ SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
+ pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 200));
- pool().PostSequencedWorkerTask(
+ pool()->PostSequencedWorkerTask(
token2, FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 201));
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
@@ -335,22 +336,22 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
EnsureAllWorkersCreated();
ThreadBlocker blocker;
for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool().PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), i, &blocker));
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockTask,
+ tracker(), i, &blocker));
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
// Create some tasks with different shutdown modes.
- pool().PostWorkerTaskWithShutdownBehavior(
+ pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 100),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
- pool().PostWorkerTaskWithShutdownBehavior(
+ pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 101),
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
- pool().PostWorkerTaskWithShutdownBehavior(
+ pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::FastTask, tracker(), 102),
SequencedWorkerPool::BLOCK_SHUTDOWN);
@@ -360,7 +361,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()), 0,
&blocker, kNumWorkerThreads);
- pool().Shutdown();
+ pool()->Shutdown();
std::vector<int> result = tracker()->WaitUntilTasksComplete(4);
@@ -378,7 +379,7 @@ TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
EnsureAllWorkersCreated();
ThreadBlocker blocker;
- pool().PostWorkerTaskWithShutdownBehavior(
+ pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE,
base::Bind(&TestTracker::BlockTask,
tracker(), 0, &blocker),
@@ -386,13 +387,13 @@ TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
tracker()->WaitUntilTasksBlocked(1);
// This should not block. If this test hangs, it means it failed.
- pool().Shutdown();
+ pool()->Shutdown();
// The task should not have completed yet.
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
// Posting more tasks should fail.
- EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior(
+ EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));