diff options
author | erikchen <erikchen@chromium.org> | 2015-03-30 18:06:46 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-03-31 01:07:28 +0000 |
commit | 8982d900e70f452d048fcc1bd7ea1b8cf5453e41 (patch) | |
tree | 8bd2b5d77f7d0696ae28f5f8b2d6a47389c31abc /base/threading | |
parent | 80d99738f74feb34aed36c5ec1c750d3418680b0 (diff) | |
download | chromium_src-8982d900e70f452d048fcc1bd7ea1b8cf5453e41.zip chromium_src-8982d900e70f452d048fcc1bd7ea1b8cf5453e41.tar.gz chromium_src-8982d900e70f452d048fcc1bd7ea1b8cf5453e41.tar.bz2 |
Base: Change task posting logic to SequencedWorkerPool during shutdown.
This CL allows consumers to post blocking tasks to SequencedWorkerPool during
shutdown. Previously, tasks could only be posted from within the context of an
already running task whose shutdown behavior was BLOCKING_SHUTDOWN.
The previous logic didn't fit the use case of all consumers of
SequencedWorkerPools. To give one example: the SQLitePersistentCookieStore
needs to post some cleanup to the task queue during shutdown.
BUG=
Review URL: https://codereview.chromium.org/1006423006
Cr-Commit-Position: refs/heads/master@{#322936}
Diffstat (limited to 'base/threading')
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 73 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 8 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 40 |
3 files changed, 85 insertions, 36 deletions
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 52b178b..6f4a248 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -239,24 +239,41 @@ class SequencedWorkerPool::Worker : public SimpleThread { // SimpleThread implementation. This actually runs the background thread. void Run() override; + // Indicates that a task is about to be run. The parameters provide + // additional metainformation about the task being run. void set_running_task_info(SequenceToken token, WorkerShutdown shutdown_behavior) { - running_sequence_ = token; - running_shutdown_behavior_ = shutdown_behavior; + is_processing_task_ = true; + task_sequence_token_ = token; + task_shutdown_behavior_ = shutdown_behavior; } - SequenceToken running_sequence() const { - return running_sequence_; + // Indicates that the task has finished running. + void reset_running_task_info() { is_processing_task_ = false; } + + // Whether the worker is processing a task. + bool is_processing_task() { return is_processing_task_; } + + SequenceToken task_sequence_token() const { + DCHECK(is_processing_task_); + return task_sequence_token_; } - WorkerShutdown running_shutdown_behavior() const { - return running_shutdown_behavior_; + WorkerShutdown task_shutdown_behavior() const { + DCHECK(is_processing_task_); + return task_shutdown_behavior_; } private: scoped_refptr<SequencedWorkerPool> worker_pool_; - SequenceToken running_sequence_; - WorkerShutdown running_shutdown_behavior_; + // The sequence token of the task being processed. Only valid when + // is_processing_task_ is true. + SequenceToken task_sequence_token_; + // The shutdown behavior of the task being processed. Only valid when + // is_processing_task_ is true. + WorkerShutdown task_shutdown_behavior_; + // Whether the Worker is processing a task. + bool is_processing_task_; DISALLOW_COPY_AND_ASSIGN(Worker); }; @@ -326,11 +343,6 @@ class SequencedWorkerPool::Inner { // Called from within the lock, this returns the next sequence task number. int64 LockedGetNextSequenceTaskNumber(); - // Called from within the lock, returns the shutdown behavior of the task - // running on the currently executing worker thread. If invoked from a thread - // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. - WorkerShutdown LockedCurrentThreadShutdownBehavior() const; - // Gets new task. There are 3 cases depending on the return value: // // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should @@ -483,7 +495,8 @@ SequencedWorkerPool::Worker::Worker( const std::string& prefix) : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), worker_pool_(worker_pool), - running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { + task_shutdown_behavior_(BLOCK_SHUTDOWN), + is_processing_task_(false) { Start(); } @@ -497,7 +510,7 @@ void SequencedWorkerPool::Worker::Run() { // Store a pointer to the running sequence in thread local storage for // static function access. - g_lazy_tls_ptr.Get().Set(&running_sequence_); + g_lazy_tls_ptr.Get().Set(&task_sequence_token_); // Just jump back to the Inner object to run the thread, since it has all the // tracking information and queues. It might be more natural to implement @@ -583,10 +596,19 @@ bool SequencedWorkerPool::Inner::PostTask( { AutoLock lock(lock_); if (shutdown_called_) { - if (shutdown_behavior != BLOCK_SHUTDOWN || - LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { + // Don't allow a new task to be posted if it doesn't block shutdown. + if (shutdown_behavior != BLOCK_SHUTDOWN) + return false; + + // If the current thread is running a task, and that task doesn't block + // shutdown, then it shouldn't be allowed to post any more tasks. + ThreadMap::const_iterator found = + threads_.find(PlatformThread::CurrentId()); + if (found != threads_.end() && found->second->is_processing_task() && + found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) { return false; } + if (max_blocking_tasks_after_shutdown_ <= 0) { DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; return false; @@ -635,7 +657,8 @@ bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); if (found == threads_.end()) return false; - return sequence_token.Equals(found->second->running_sequence()); + return found->second->is_processing_task() && + sequence_token.Equals(found->second->task_sequence_token()); } // See https://code.google.com/p/chromium/issues/detail?id=168415 @@ -765,13 +788,12 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // Make sure our task is erased outside the lock for the // same reason we do this with delete_these_oustide_lock. - // Also, do it before calling set_running_task_info() so + // Also, do it before calling reset_running_task_info() so // that sequence-checking from within the task's destructor // still works. task.task = Closure(); - this_worker->set_running_task_info( - SequenceToken(), CONTINUE_ON_SHUTDOWN); + this_worker->reset_running_task_info(); } DidRunWorkerTask(task); // Must be done inside the lock. } else if (cleanup_state_ == CLEANUP_RUNNING) { @@ -904,15 +926,6 @@ int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { return next_sequence_task_number_++; } -SequencedWorkerPool::WorkerShutdown -SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { - lock_.AssertAcquired(); - ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); - if (found == threads_.end()) - return CONTINUE_ON_SHUTDOWN; - return found->second->running_shutdown_behavior(); -} - SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( SequencedTask* task, TimeDelta* wait_time, diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index 63c6204..0b6c5f9 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -320,12 +320,10 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // Must be called from the same thread this object was constructed on. void Shutdown() { Shutdown(0); } - // A variant that allows an arbitrary number of new blocking tasks to - // be posted during shutdown from within tasks that execute during shutdown. - // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if - // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once + // A variant that allows an arbitrary number of new blocking tasks to be + // posted during shutdown. The tasks cannot be posted within the execution + // context of tasks whose shutdown behavior is not BLOCKING_SHUTDOWN. Once // the limit is reached, subsequent calls to post task fail in all cases. - // // Must be called from the same thread this object was constructed on. void Shutdown(int max_new_blocking_tasks_after_shutdown); diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc index 5d0880c..c12156e 100644 --- a/base/threading/sequenced_worker_pool_unittest.cc +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -159,6 +159,16 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> { FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN); } + void PostBlockingTaskThenUnblockThreads( + const scoped_refptr<SequencedWorkerPool>& pool, + ThreadBlocker* blocker, + size_t threads_to_wake) { + Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0); + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN); + blocker->Unblock(threads_to_wake); + } + // Waits until the given number of tasks have started executing. void WaitUntilTasksBlocked(size_t count) { { @@ -287,7 +297,7 @@ class SequencedWorkerPoolTest : public testing::Test { // Checks that the given number of entries are in the tasks to complete of // the given tracker, and then signals the given event the given number of -// times. This is used to wakt up blocked background threads before blocking +// times. This is used to wake up blocked background threads before blocking // on shutdown. void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, size_t expected_tasks_to_complete, @@ -583,6 +593,34 @@ TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { tracker()->ClearCompleteSequence(); } +// Tests that blocking tasks can still be posted during shutdown, as long as +// the task is not being posted within the context of a running task. +TEST_F(SequencedWorkerPoolTest, + AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) { + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + + // Start tasks to take all the threads and block them. + const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads); + for (int i = 0; i < kNumBlockTasks; ++i) { + EXPECT_TRUE(pool()->PostWorkerTask( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Setup to open the floodgates from within Shutdown(). + SetWillWaitForShutdownCallback( + base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads, + scoped_refptr<TestTracker>(tracker()), pool(), &blocker, + kNumWorkerThreads)); + pool()->Shutdown(kNumWorkerThreads + 1); + + // Ensure that the correct number of tasks actually got run. + tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads + 1)); + tracker()->ClearCompleteSequence(); +} + // Tests that unrun tasks are discarded properly according to their shutdown // mode. TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { |