author     michaeln@chromium.org <michaeln@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>    2012-12-19 01:56:37 +0000
committer  michaeln@chromium.org <michaeln@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>    2012-12-19 01:56:37 +0000
commit     270f2cf4ed76055f185ca9c59d3e4b9316b758fc (patch)
tree       86d19d18d92918b34262647a617f4a68b7adb0cf
parent     5558281c413d933567b16e3aa35429fd769b8c87 (diff)
SequencedWorkerPool::Shutdown(int max_new_blocking_tasks)
BUG=158934,163096

Review URL: https://chromiumcodereview.appspot.com/11415246

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@173829 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--  base/threading/sequenced_worker_pool.cc           77
-rw-r--r--  base/threading/sequenced_worker_pool.h             11
-rw-r--r--  base/threading/sequenced_worker_pool_unittest.cc   69
-rw-r--r--  chrome/service/service_process.cc                   7
-rw-r--r--  content/browser/browser_thread_impl.cc              7
5 files changed, 145 insertions, 26 deletions
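
The patch below adds an overload SequencedWorkerPool::Shutdown(int) that, once shutdown has started, still accepts up to that many new BLOCK_SHUTDOWN tasks, provided they are posted from worker threads whose currently running task is not CONTINUE_ON_SHUTDOWN. A minimal caller-side sketch, assuming the base library at this revision; the pool pointer and FlushCaches() are hypothetical and not part of the patch:

#include "base/threading/sequenced_worker_pool.h"

// Hypothetical shutdown-time work item; not part of this patch.
void FlushCaches();

void ShutdownBlockingPool(base::SequencedWorkerPool* pool) {
  // Must run on the thread that constructed |pool|. Allows up to 100 new
  // BLOCK_SHUTDOWN tasks to be posted by tasks still running while the pool
  // drains; all other post attempts fail once shutdown has been called.
  const int kMaxNewBlockingTasksAfterShutdown = 100;
  pool->Shutdown(kMaxNewBlockingTasksAfterShutdown);
}
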
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 56f908b..5b73618 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -229,17 +229,24 @@ class SequencedWorkerPool::Worker : public SimpleThread {
// SimpleThread implementation. This actually runs the background thread.
virtual void Run() OVERRIDE;
- void set_running_sequence(SequenceToken token) {
+ void set_running_task_info(SequenceToken token,
+ WorkerShutdown shutdown_behavior) {
running_sequence_ = token;
+ running_shutdown_behavior_ = shutdown_behavior;
}
SequenceToken running_sequence() const {
return running_sequence_;
}
+ WorkerShutdown running_shutdown_behavior() const {
+ return running_shutdown_behavior_;
+ }
+
private:
scoped_refptr<SequencedWorkerPool> worker_pool_;
SequenceToken running_sequence_;
+ WorkerShutdown running_shutdown_behavior_;
DISALLOW_COPY_AND_ASSIGN(Worker);
};
@@ -280,7 +287,7 @@ class SequencedWorkerPool::Inner {
int GetWorkSignalCountForTesting() const;
- void Shutdown();
+ void Shutdown(int max_blocking_tasks_after_shutdown);
// Runs the worker loop on the background thread.
void ThreadLoop(Worker* this_worker);
@@ -303,6 +310,11 @@ class SequencedWorkerPool::Inner {
// Called from within the lock, this returns the next sequence task number.
int64 LockedGetNextSequenceTaskNumber();
+ // Called from within the lock, returns the shutdown behavior of the task
+ // running on the currently executing worker thread. If invoked from a thread
+ // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
+ WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
+
// Gets new task. There are 3 cases depending on the return value:
//
// 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
@@ -434,6 +446,10 @@ class SequencedWorkerPool::Inner {
// allowed, though we may still be running existing tasks.
bool shutdown_called_;
+ // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
+ // has been called.
+ int max_blocking_tasks_after_shutdown_;
+
TestingObserver* const testing_observer_;
DISALLOW_COPY_AND_ASSIGN(Inner);
@@ -447,7 +463,8 @@ SequencedWorkerPool::Worker::Worker(
const std::string& prefix)
: SimpleThread(
prefix + StringPrintf("Worker%d", thread_number).c_str()),
- worker_pool_(worker_pool) {
+ worker_pool_(worker_pool),
+ running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
Start();
}
@@ -487,6 +504,7 @@ SequencedWorkerPool::Inner::Inner(
blocking_shutdown_pending_task_count_(0),
trace_id_(0),
shutdown_called_(false),
+ max_blocking_tasks_after_shutdown_(0),
testing_observer_(observer) {}
SequencedWorkerPool::Inner::~Inner() {
@@ -536,8 +554,17 @@ bool SequencedWorkerPool::Inner::PostTask(
int create_thread_id = 0;
{
AutoLock lock(lock_);
- if (shutdown_called_)
- return false;
+ if (shutdown_called_) {
+ if (shutdown_behavior != BLOCK_SHUTDOWN ||
+ LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
+ return false;
+ }
+ if (max_blocking_tasks_after_shutdown_ <= 0) {
+ DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
+ return false;
+ }
+ max_blocking_tasks_after_shutdown_ -= 1;
+ }
// The trace_id is used for identifying the task in about:tracing.
sequenced.trace_id = trace_id_++;
@@ -592,17 +619,16 @@ void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
SignalHasWork();
}
-void SequencedWorkerPool::Inner::Shutdown() {
- // Mark us as terminated and go through and drop all tasks that aren't
- // required to run on shutdown. Since no new tasks will get posted once the
- // terminated flag is set, this ensures that all remaining tasks are required
- // for shutdown whenever the termianted_ flag is set.
+void SequencedWorkerPool::Inner::Shutdown(
+ int max_new_blocking_tasks_after_shutdown) {
+ DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
{
AutoLock lock(lock_);
if (shutdown_called_)
return;
shutdown_called_ = true;
+ max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
@@ -672,8 +698,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
if (new_thread_id)
FinishStartingAdditionalThread(new_thread_id);
- this_worker->set_running_sequence(
- SequenceToken(task.sequence_token_id));
+ this_worker->set_running_task_info(
+ SequenceToken(task.sequence_token_id), task.shutdown_behavior);
tracked_objects::TrackedTime start_time =
tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);
@@ -683,7 +709,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
start_time, tracked_objects::ThreadData::NowForEndOfRun());
- this_worker->set_running_sequence(SequenceToken());
+ this_worker->set_running_task_info(
+ SequenceToken(), CONTINUE_ON_SHUTDOWN);
// Make sure our task is erased outside the lock for the same reason
// we do this with delete_these_oustide_lock.
@@ -692,10 +719,13 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
DidRunWorkerTask(task); // Must be done inside the lock.
} else {
// When we're terminating and there's no more work, we can
- // shut down. You can't get more tasks posted once
- // shutdown_called_ is set. There may be some tasks stuck
- // behind running ones with the same sequence token, but
- // additional threads won't help this case.
+ // shut down; other workers can complete any pending or new tasks.
+ // We can get additional tasks posted after shutdown_called_ is set
+ // but only worker threads are allowed to post tasks at that time, and
+ // the workers responsible for posting those tasks will be available
+ // to run them. Also, there may be some tasks stuck behind running
+ // ones with the same sequence token, but additional threads won't
+ // help this case.
if (shutdown_called_ &&
blocking_shutdown_pending_task_count_ == 0)
break;
@@ -754,6 +784,15 @@ int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
return next_sequence_task_number_++;
}
+SequencedWorkerPool::WorkerShutdown
+SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
+ lock_.AssertAcquired();
+ ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
+ if (found == threads_.end())
+ return CONTINUE_ON_SHUTDOWN;
+ return found->second->running_shutdown_behavior();
+}
+
SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
@@ -1114,9 +1153,9 @@ void SequencedWorkerPool::SignalHasWorkForTesting() {
inner_->SignalHasWorkForTesting();
}
-void SequencedWorkerPool::Shutdown() {
+void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
DCHECK(constructor_message_loop_->BelongsToCurrentThread());
- inner_->Shutdown();
+ inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
} // namespace base
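
To summarize the new gate in Inner::PostTask() above: after Shutdown(n), a post is accepted only if the task is BLOCK_SHUTDOWN, the posting thread is one of the pool's workers whose running task is not CONTINUE_ON_SHUTDOWN (LockedCurrentThreadShutdownBehavior() treats non-worker threads as CONTINUE_ON_SHUTDOWN), and the budget of n has not been spent. The following is a standalone, simplified restatement of that rule for illustration only; the names are invented and it is not Chromium code:

#include <iostream>

enum WorkerShutdown { CONTINUE_ON_SHUTDOWN, SKIP_ON_SHUTDOWN, BLOCK_SHUTDOWN };

// Simplified mirror of the post-shutdown check in Inner::PostTask().
// |poster_behavior| stands in for LockedCurrentThreadShutdownBehavior();
// a non-worker posting thread is reported as CONTINUE_ON_SHUTDOWN.
bool AcceptPostAfterShutdown(WorkerShutdown new_task_behavior,
                             WorkerShutdown poster_behavior,
                             int* blocking_budget) {
  if (new_task_behavior != BLOCK_SHUTDOWN ||
      poster_behavior == CONTINUE_ON_SHUTDOWN)
    return false;               // only BLOCK_SHUTDOWN posts from real workers
  if (*blocking_budget <= 0)
    return false;               // the Shutdown(n) budget is exhausted
  --*blocking_budget;
  return true;
}

int main() {
  int budget = 2;  // as if Shutdown(2) had been called
  std::cout << AcceptPostAfterShutdown(BLOCK_SHUTDOWN, BLOCK_SHUTDOWN, &budget)
            << AcceptPostAfterShutdown(SKIP_ON_SHUTDOWN, BLOCK_SHUTDOWN, &budget)
            << AcceptPostAfterShutdown(BLOCK_SHUTDOWN, CONTINUE_ON_SHUTDOWN, &budget)
            << AcceptPostAfterShutdown(BLOCK_SHUTDOWN, BLOCK_SHUTDOWN, &budget)
            << AcceptPostAfterShutdown(BLOCK_SHUTDOWN, BLOCK_SHUTDOWN, &budget)
            << std::endl;  // prints "10010": two accepted, then budget spent
}
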
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index 17361b2..9c7108e 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -304,7 +304,16 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// After this call, subsequent calls to post tasks will fail.
//
// Must be called from the same thread this object was constructed on.
- void Shutdown();
+ void Shutdown() { Shutdown(0); }
+
+ // A variant that allows a specified maximum number of new blocking tasks to
+ // be posted during shutdown from within tasks that execute during shutdown.
+ // Only tasks designated as BLOCK_SHUTDOWN will be allowed, and only if
+ // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
+ // the limit is reached, subsequent calls to post tasks fail in all cases.
+ //
+ // Must be called from the same thread this object was constructed on.
+ void Shutdown(int max_new_blocking_tasks_after_shutdown);
protected:
virtual ~SequencedWorkerPool();
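
The header comment above describes which posts survive shutdown. A hedged sketch of the intended use from inside an already-running, shutdown-blocking task, assuming the base library at this revision; WriteFinalRecord() and FinishPendingWrites() are hypothetical and not part of the patch:

#include "base/bind.h"
#include "base/location.h"
#include "base/threading/sequenced_worker_pool.h"

// Hypothetical follow-up work; not part of this patch.
void WriteFinalRecord();

// Body of a BLOCK_SHUTDOWN task that is running while Shutdown(n) drains
// the pool.
void FinishPendingWrites(base::SequencedWorkerPool* pool) {
  // Succeeds while the Shutdown(n) budget lasts; returns false when posted
  // from a non-worker thread, from a CONTINUE_ON_SHUTDOWN task, or after the
  // budget is exhausted.
  bool accepted = pool->PostWorkerTaskWithShutdownBehavior(
      FROM_HERE,
      base::Bind(&WriteFinalRecord),
      base::SequencedWorkerPool::BLOCK_SHUTDOWN);
  if (!accepted)
    WriteFinalRecord();  // too late to post; do the work inline instead
}
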
diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc
index 2dcda60..79f08c8 100644
--- a/base/threading/sequenced_worker_pool_unittest.cc
+++ b/base/threading/sequenced_worker_pool_unittest.cc
@@ -97,6 +97,22 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
SignalWorkerDone(id);
}
+ void PostAdditionalTasks(int id, SequencedWorkerPool* pool) {
+ Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
+ EXPECT_FALSE(
+ pool->PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE, fast_task,
+ SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
+ EXPECT_FALSE(
+ pool->PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE, fast_task,
+ SequencedWorkerPool::SKIP_ON_SHUTDOWN));
+ pool->PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE, fast_task,
+ SequencedWorkerPool::BLOCK_SHUTDOWN);
+ SignalWorkerDone(id);
+ }
+
// Waits until the given number of tasks have started executing.
void WaitUntilTasksBlocked(size_t count) {
{
@@ -377,20 +393,21 @@ TEST_F(SequencedWorkerPoolTest, IgnoresAfterShutdown) {
}
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
- // Shutdown the worker pool. This should discard all non-blocking tasks.
SetWillWaitForShutdownCallback(
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
scoped_refptr<TestTracker>(tracker()), 0,
&blocker, kNumWorkerThreads));
- pool()->Shutdown();
+
+ // Shutdown the worker pool. This should discard all non-blocking tasks.
+ const int kMaxNewBlockingTasksAfterShutdown = 100;
+ pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
int old_has_work_call_count = has_work_call_count();
std::vector<int> result =
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
- // The kNumWorkerThread items should have completed, in no particular
- // order.
+ // The kNumWorkerThread items should have completed, in no particular order.
ASSERT_EQ(kNumWorkerThreads, result.size());
for (size_t i = 0; i < kNumWorkerThreads; i++) {
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
@@ -414,6 +431,50 @@ TEST_F(SequencedWorkerPoolTest, IgnoresAfterShutdown) {
ASSERT_EQ(old_has_work_call_count, has_work_call_count());
}
+TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
+ // Test that <n> new blocking tasks are allowed provided they're posted
+ // by running tasks.
+ EnsureAllWorkersCreated();
+ ThreadBlocker blocker;
+
+ // Start tasks to take all the threads and block them.
+ const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
+ for (int i = 0; i < kNumBlockTasks; ++i) {
+ EXPECT_TRUE(pool()->PostWorkerTask(
+ FROM_HERE,
+ base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
+ }
+ tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
+
+ // Queue up shutdown-blocking tasks behind those already running. When run,
+ // each one invokes PostAdditionalTasks, which attempts to post three new
+ // FastTasks, one for each shutdown_behavior.
+ const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
+ for (int i = 0; i < kNumQueuedTasks; ++i) {
+ EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE,
+ base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool()),
+ SequencedWorkerPool::BLOCK_SHUTDOWN));
+ }
+
+ // Set up to open the floodgates from within Shutdown().
+ SetWillWaitForShutdownCallback(
+ base::Bind(&EnsureTasksToCompleteCountAndUnblock,
+ scoped_refptr<TestTracker>(tracker()),
+ 0, &blocker, kNumBlockTasks));
+
+ // Allow half of the additional blocking tasks through.
+ const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
+ pool()->Shutdown(kNumNewBlockingTasksToAllow);
+
+ // Ensure that the correct number of tasks actually got run.
+ tracker()->WaitUntilTasksComplete(static_cast<size_t>(
+ kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
+
+ // Clean up the task IDs we added and go home.
+ tracker()->ClearCompleteSequence();
+}
+
// Tests that unrun tasks are discarded properly according to their shutdown
// mode.
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
diff --git a/chrome/service/service_process.cc b/chrome/service/service_process.cc
index f7aa291..8af5ddb 100644
--- a/chrome/service/service_process.cc
+++ b/chrome/service/service_process.cc
@@ -224,7 +224,12 @@ bool ServiceProcess::Teardown() {
file_thread_.reset();
if (blocking_pool_.get()) {
- blocking_pool_->Shutdown();
+ // The goal is to make it impossible for Chrome to 'infinite loop' during
+ // shutdown, but to reasonably expect that all BLOCK_SHUTDOWN tasks
+ // queued during shutdown get run. There's nothing particularly scientific
+ // about the number chosen.
+ const int kMaxNewShutdownBlockingTasks = 1000;
+ blocking_pool_->Shutdown(kMaxNewShutdownBlockingTasks);
blocking_pool_ = NULL;
}
diff --git a/content/browser/browser_thread_impl.cc b/content/browser/browser_thread_impl.cc
index fff9625..f61c0a2 100644
--- a/content/browser/browser_thread_impl.cc
+++ b/content/browser/browser_thread_impl.cc
@@ -78,8 +78,13 @@ BrowserThreadImpl::BrowserThreadImpl(ID identifier,
// static
void BrowserThreadImpl::ShutdownThreadPool() {
+ // The goal is to make it impossible for Chrome to 'infinite loop' during
+ // shutdown, but to reasonably expect that all BLOCK_SHUTDOWN tasks queued
+ // during shutdown get run. There's nothing particularly scientific about the
+ // number chosen.
+ const int kMaxNewShutdownBlockingTasks = 1000;
BrowserThreadGlobals& globals = g_globals.Get();
- globals.blocking_pool->Shutdown();
+ globals.blocking_pool->Shutdown(kMaxNewShutdownBlockingTasks);
}
void BrowserThreadImpl::Init() {