diff options
author | michaeln@chromium.org <michaeln@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-12-19 01:56:37 +0000 |
---|---|---|
committer | michaeln@chromium.org <michaeln@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-12-19 01:56:37 +0000 |
commit | 270f2cf4ed76055f185ca9c59d3e4b9316b758fc (patch) | |
tree | 86d19d18d92918b34262647a617f4a68b7adb0cf | |
parent | 5558281c413d933567b16e3aa35429fd769b8c87 (diff) | |
download | chromium_src-270f2cf4ed76055f185ca9c59d3e4b9316b758fc.zip chromium_src-270f2cf4ed76055f185ca9c59d3e4b9316b758fc.tar.gz chromium_src-270f2cf4ed76055f185ca9c59d3e4b9316b758fc.tar.bz2 |
SharedWorkerPool.Shutdown(int max_new_blocking_tasks)
BUG=158934,163096
Review URL: https://chromiumcodereview.appspot.com/11415246
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@173829 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 77 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 11 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 69 | ||||
-rw-r--r-- | chrome/service/service_process.cc | 7 | ||||
-rw-r--r-- | content/browser/browser_thread_impl.cc | 7 |
5 files changed, 145 insertions, 26 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 56f908b..5b73618 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -229,17 +229,24 @@ class SequencedWorkerPool::Worker : public SimpleThread { // SimpleThread implementation. This actually runs the background thread. virtual void Run() OVERRIDE; - void set_running_sequence(SequenceToken token) { + void set_running_task_info(SequenceToken token, + WorkerShutdown shutdown_behavior) { running_sequence_ = token; + running_shutdown_behavior_ = shutdown_behavior; } SequenceToken running_sequence() const { return running_sequence_; } + WorkerShutdown running_shutdown_behavior() const { + return running_shutdown_behavior_; + } + private: scoped_refptr<SequencedWorkerPool> worker_pool_; SequenceToken running_sequence_; + WorkerShutdown running_shutdown_behavior_; DISALLOW_COPY_AND_ASSIGN(Worker); }; @@ -280,7 +287,7 @@ class SequencedWorkerPool::Inner { int GetWorkSignalCountForTesting() const; - void Shutdown(); + void Shutdown(int max_blocking_tasks_after_shutdown); // Runs the worker loop on the background thread. void ThreadLoop(Worker* this_worker); @@ -303,6 +310,11 @@ class SequencedWorkerPool::Inner { // Called from within the lock, this returns the next sequence task number. int64 LockedGetNextSequenceTaskNumber(); + // Called from within the lock, returns the shutdown behavior of the task + // running on the currently executing worker thread. If invoked from a thread + // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. + WorkerShutdown LockedCurrentThreadShutdownBehavior() const; + // Gets new task. There are 3 cases depending on the return value: // // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should @@ -434,6 +446,10 @@ class SequencedWorkerPool::Inner { // allowed, though we may still be running existing tasks. bool shutdown_called_; + // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() + // has been called. + int max_blocking_tasks_after_shutdown_; + TestingObserver* const testing_observer_; DISALLOW_COPY_AND_ASSIGN(Inner); @@ -447,7 +463,8 @@ SequencedWorkerPool::Worker::Worker( const std::string& prefix) : SimpleThread( prefix + StringPrintf("Worker%d", thread_number).c_str()), - worker_pool_(worker_pool) { + worker_pool_(worker_pool), + running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { Start(); } @@ -487,6 +504,7 @@ SequencedWorkerPool::Inner::Inner( blocking_shutdown_pending_task_count_(0), trace_id_(0), shutdown_called_(false), + max_blocking_tasks_after_shutdown_(0), testing_observer_(observer) {} SequencedWorkerPool::Inner::~Inner() { @@ -536,8 +554,17 @@ bool SequencedWorkerPool::Inner::PostTask( int create_thread_id = 0; { AutoLock lock(lock_); - if (shutdown_called_) - return false; + if (shutdown_called_) { + if (shutdown_behavior != BLOCK_SHUTDOWN || + LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { + return false; + } + if (max_blocking_tasks_after_shutdown_ <= 0) { + DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; + return false; + } + max_blocking_tasks_after_shutdown_ -= 1; + } // The trace_id is used for identifying the task in about:tracing. sequenced.trace_id = trace_id_++; @@ -592,17 +619,16 @@ void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { SignalHasWork(); } -void SequencedWorkerPool::Inner::Shutdown() { - // Mark us as terminated and go through and drop all tasks that aren't - // required to run on shutdown. Since no new tasks will get posted once the - // terminated flag is set, this ensures that all remaining tasks are required - // for shutdown whenever the termianted_ flag is set. +void SequencedWorkerPool::Inner::Shutdown( + int max_new_blocking_tasks_after_shutdown) { + DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); { AutoLock lock(lock_); if (shutdown_called_) return; shutdown_called_ = true; + max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; // Tickle the threads. This will wake up a waiting one so it will know that // it can exit, which in turn will wake up any other waiting ones. @@ -672,8 +698,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { if (new_thread_id) FinishStartingAdditionalThread(new_thread_id); - this_worker->set_running_sequence( - SequenceToken(task.sequence_token_id)); + this_worker->set_running_task_info( + SequenceToken(task.sequence_token_id), task.shutdown_behavior); tracked_objects::TrackedTime start_time = tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); @@ -683,7 +709,8 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, start_time, tracked_objects::ThreadData::NowForEndOfRun()); - this_worker->set_running_sequence(SequenceToken()); + this_worker->set_running_task_info( + SequenceToken(), CONTINUE_ON_SHUTDOWN); // Make sure our task is erased outside the lock for the same reason // we do this with delete_these_oustide_lock. @@ -692,10 +719,13 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { DidRunWorkerTask(task); // Must be done inside the lock. } else { // When we're terminating and there's no more work, we can - // shut down. You can't get more tasks posted once - // shutdown_called_ is set. There may be some tasks stuck - // behind running ones with the same sequence token, but - // additional threads won't help this case. + // shut down, other workers can complete any pending or new tasks. + // We can get additional tasks posted after shutdown_called_ is set + // but only worker threads are allowed to post tasks at that time, and + // the workers responsible for posting those tasks will be available + // to run them. Also, there may be some tasks stuck behind running + // ones with the same sequence token, but additional threads won't + // help this case. if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) break; @@ -754,6 +784,15 @@ int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { return next_sequence_task_number_++; } +SequencedWorkerPool::WorkerShutdown +SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { + lock_.AssertAcquired(); + ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); + if (found == threads_.end()) + return CONTINUE_ON_SHUTDOWN; + return found->second->running_shutdown_behavior(); +} + SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( SequencedTask* task, TimeDelta* wait_time, @@ -1114,9 +1153,9 @@ void SequencedWorkerPool::SignalHasWorkForTesting() { inner_->SignalHasWorkForTesting(); } -void SequencedWorkerPool::Shutdown() { +void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { DCHECK(constructor_message_loop_->BelongsToCurrentThread()); - inner_->Shutdown(); + inner_->Shutdown(max_new_blocking_tasks_after_shutdown); } } // namespace base diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index 17361b2..9c7108e 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -304,7 +304,16 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // After this call, subsequent calls to post tasks will fail. // // Must be called from the same thread this object was constructed on. - void Shutdown(); + void Shutdown() { Shutdown(0); } + + // A variant that allows an arbitrary number of new blocking tasks to + // be posted during shutdown from within tasks that execute during shutdown. + // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if + // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once + // the limit is reached, subsequent calls to post task fail in all cases. + // + // Must be called from the same thread this object was constructed on. + void Shutdown(int max_new_blocking_tasks_after_shutdown); protected: virtual ~SequencedWorkerPool(); diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc index 2dcda60..79f08c8 100644 --- a/base/threading/sequenced_worker_pool_unittest.cc +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -97,6 +97,22 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> { SignalWorkerDone(id); } + void PostAdditionalTasks(int id, SequencedWorkerPool* pool) { + Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); + EXPECT_FALSE( + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + EXPECT_FALSE( + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::SKIP_ON_SHUTDOWN)); + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::BLOCK_SHUTDOWN); + SignalWorkerDone(id); + } + // Waits until the given number of tasks have started executing. void WaitUntilTasksBlocked(size_t count) { { @@ -377,20 +393,21 @@ TEST_F(SequencedWorkerPoolTest, IgnoresAfterShutdown) { } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); - // Shutdown the worker pool. This should discard all non-blocking tasks. SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr<TestTracker>(tracker()), 0, &blocker, kNumWorkerThreads)); - pool()->Shutdown(); + + // Shutdown the worker pool. This should discard all non-blocking tasks. + const int kMaxNewBlockingTasksAfterShutdown = 100; + pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown); int old_has_work_call_count = has_work_call_count(); std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumWorkerThreads); - // The kNumWorkerThread items should have completed, in no particular - // order. + // The kNumWorkerThread items should have completed, in no particular order. ASSERT_EQ(kNumWorkerThreads, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) { EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != @@ -414,6 +431,50 @@ TEST_F(SequencedWorkerPoolTest, IgnoresAfterShutdown) { ASSERT_EQ(old_has_work_call_count, has_work_call_count()); } +TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { + // Test that <n> new blocking tasks are allowed provided they're posted + // by a running tasks. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + + // Start tasks to take all the threads and block them. + const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads); + for (int i = 0; i < kNumBlockTasks; ++i) { + EXPECT_TRUE(pool()->PostWorkerTask( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Queue up shutdown blocking tasks behind those which will attempt to post + // additional tasks when run, PostAdditionalTasks attemtps to post 3 + // new FastTasks, one for each shutdown_behavior. + const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); + for (int i = 0; i < kNumQueuedTasks; ++i) { + EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool()), + SequencedWorkerPool::BLOCK_SHUTDOWN)); + } + + // Setup to open the floodgates from within Shutdown(). + SetWillWaitForShutdownCallback( + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), + 0, &blocker, kNumBlockTasks)); + + // Allow half of the additional blocking tasks thru. + const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2; + pool()->Shutdown(kNumNewBlockingTasksToAllow); + + // Ensure that the correct number of tasks actually got run. + tracker()->WaitUntilTasksComplete(static_cast<size_t>( + kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow)); + + // Clean up the task IDs we added and go home. + tracker()->ClearCompleteSequence(); +} + // Tests that unrun tasks are discarded properly according to their shutdown // mode. TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { diff --git a/chrome/service/service_process.cc b/chrome/service/service_process.cc index f7aa291..8af5ddb 100644 --- a/chrome/service/service_process.cc +++ b/chrome/service/service_process.cc @@ -224,7 +224,12 @@ bool ServiceProcess::Teardown() { file_thread_.reset(); if (blocking_pool_.get()) { - blocking_pool_->Shutdown(); + // The goal is to make it impossible for chrome to 'infinite loop' during + // shutdown, but to reasonably expect that all BLOCKING_SHUTDOWN tasks + // queued during shutdown get run. There's nothing particularly scientific + // about the number chosen. + const int kMaxNewShutdownBlockingTasks = 1000; + blocking_pool_->Shutdown(kMaxNewShutdownBlockingTasks); blocking_pool_ = NULL; } diff --git a/content/browser/browser_thread_impl.cc b/content/browser/browser_thread_impl.cc index fff9625..f61c0a2 100644 --- a/content/browser/browser_thread_impl.cc +++ b/content/browser/browser_thread_impl.cc @@ -78,8 +78,13 @@ BrowserThreadImpl::BrowserThreadImpl(ID identifier, // static void BrowserThreadImpl::ShutdownThreadPool() { + // The goal is to make it impossible for chrome to 'infinite loop' during + // shutdown, but to reasonably expect that all BLOCKING_SHUTDOWN tasks queued + // during shutdown get run. There's nothing particularly scientific about the + // number chosen. + const int kMaxNewShutdownBlockingTasks = 1000; BrowserThreadGlobals& globals = g_globals.Get(); - globals.blocking_pool->Shutdown(); + globals.blocking_pool->Shutdown(kMaxNewShutdownBlockingTasks); } void BrowserThreadImpl::Init() { |