// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/threading/sequenced_worker_pool.h" #include #include #include "base/bind.h" #include "base/compiler_specific.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop/message_loop.h" #include "base/sequence_checker_impl.h" #include "base/stl_util.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" #include "base/test/sequenced_task_runner_test_template.h" #include "base/test/sequenced_worker_pool_owner.h" #include "base/test/task_runner_test_template.h" #include "base/test/test_timeouts.h" #include "base/threading/platform_thread.h" #include "base/time/time.h" #include "base/tracked_objects.h" #include "testing/gtest/include/gtest/gtest.h" namespace base { // IMPORTANT NOTE: // // Many of these tests have failure modes where they'll hang forever. These // tests should not be flaky, and hanging indicates a type of failure. Do not // mark as flaky if they're hanging, it's likely an actual bug. namespace { const size_t kNumWorkerThreads = 3; // Allows a number of threads to all be blocked on the same event, and // provides a way to unblock a certain number of them. class ThreadBlocker { public: ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {} void Block() { { base::AutoLock lock(lock_); while (unblock_counter_ == 0) cond_var_.Wait(); unblock_counter_--; } cond_var_.Signal(); } void Unblock(size_t count) { { base::AutoLock lock(lock_); DCHECK_EQ(unblock_counter_, 0u); unblock_counter_ = count; } cond_var_.Signal(); } private: base::Lock lock_; base::ConditionVariable cond_var_; size_t unblock_counter_; }; class DestructionDeadlockChecker : public base::RefCountedThreadSafe { public: explicit DestructionDeadlockChecker(scoped_refptr pool) : pool_(std::move(pool)) {} protected: virtual ~DestructionDeadlockChecker() { // This method should not deadlock. pool_->RunsTasksOnCurrentThread(); } private: scoped_refptr pool_; friend class base::RefCountedThreadSafe; }; class TestTracker : public base::RefCountedThreadSafe { public: TestTracker() : lock_(), cond_var_(&lock_), started_events_(0) { } // Each of these tasks appends the argument to the complete sequence vector // so calling code can see what order they finished in. void FastTask(int id) { SignalWorkerDone(id); } void SlowTask(int id) { base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); SignalWorkerDone(id); } void BlockTask(int id, ThreadBlocker* blocker) { // Note that this task has started and signal anybody waiting for that // to happen. { base::AutoLock lock(lock_); started_events_++; } cond_var_.Signal(); blocker->Block(); SignalWorkerDone(id); } void PostAdditionalTasks( int id, SequencedWorkerPool* pool, bool expected_return_value) { Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); EXPECT_EQ(expected_return_value, pool->PostWorkerTaskWithShutdownBehavior( FROM_HERE, fast_task, SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EXPECT_EQ(expected_return_value, pool->PostWorkerTaskWithShutdownBehavior( FROM_HERE, fast_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN)); pool->PostWorkerTaskWithShutdownBehavior( FROM_HERE, fast_task, SequencedWorkerPool::BLOCK_SHUTDOWN); SignalWorkerDone(id); } // This task posts itself back onto the SequencedWorkerPool before it // finishes running. Each instance of the task maintains a strong reference // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only // destroyed when the task is destroyed without being run, which only happens // during destruction of the SequencedWorkerPool. void PostRepostingTask( const scoped_refptr& pool, const scoped_refptr& checker) { Closure reposting_task = base::Bind(&TestTracker::PostRepostingTask, this, pool, checker); pool->PostWorkerTaskWithShutdownBehavior( FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN); } // This task reposts itself back onto the SequencedWorkerPool before it // finishes running. void PostRepostingBlockingTask( const scoped_refptr& pool, const SequencedWorkerPool::SequenceToken& token) { Closure reposting_task = base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token); pool->PostSequencedWorkerTaskWithShutdownBehavior(token, FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN); } void PostBlockingTaskThenUnblockThreads( const scoped_refptr& pool, ThreadBlocker* blocker, size_t threads_to_wake) { Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0); pool->PostWorkerTaskWithShutdownBehavior( FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN); blocker->Unblock(threads_to_wake); } // Waits until the given number of tasks have started executing. void WaitUntilTasksBlocked(size_t count) { { base::AutoLock lock(lock_); while (started_events_ < count) cond_var_.Wait(); } cond_var_.Signal(); } // Blocks the current thread until at least the given number of tasks are in // the completed vector, and then returns a copy. std::vector WaitUntilTasksComplete(size_t num_tasks) { std::vector ret; { base::AutoLock lock(lock_); while (complete_sequence_.size() < num_tasks) cond_var_.Wait(); ret = complete_sequence_; } cond_var_.Signal(); return ret; } size_t GetTasksCompletedCount() { base::AutoLock lock(lock_); return complete_sequence_.size(); } void ClearCompleteSequence() { base::AutoLock lock(lock_); complete_sequence_.clear(); started_events_ = 0; } private: friend class base::RefCountedThreadSafe; ~TestTracker() {} void SignalWorkerDone(int id) { { base::AutoLock lock(lock_); complete_sequence_.push_back(id); } cond_var_.Signal(); } // Protects the complete_sequence. base::Lock lock_; base::ConditionVariable cond_var_; // Protected by lock_. std::vector complete_sequence_; // Counter of the number of "block" workers that have started. size_t started_events_; }; class SequencedWorkerPoolTest : public testing::Test { public: SequencedWorkerPoolTest() : tracker_(new TestTracker) { ResetPool(); } const scoped_refptr& pool() { return pool_owner_->pool(); } TestTracker* tracker() { return tracker_.get(); } // Destroys the SequencedWorkerPool instance, blocking until it is fully shut // down, and creates a new instance. void ResetPool() { pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test")); } void SetWillWaitForShutdownCallback(const Closure& callback) { pool_owner_->SetWillWaitForShutdownCallback(callback); } // Ensures that the given number of worker threads is created by adding // tasks and waiting until they complete. Worker thread creation is // serialized, can happen on background threads asynchronously, and doesn't // happen any more at shutdown. This means that if a test posts a bunch of // tasks and calls shutdown, fewer workers will be created than the test may // expect. // // This function ensures that this condition can't happen so tests can make // assumptions about the number of workers active. See the comment in // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more // details. // // It will post tasks to the queue with id -1. It also assumes this is the // first thing called in a test since it will clear the complete_sequence_. void EnsureAllWorkersCreated() { // Create a bunch of threads, all waiting. This will cause that may // workers to be created. ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), -1, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Now wake them up and wait until they're done. blocker.Unblock(kNumWorkerThreads); tracker()->WaitUntilTasksComplete(kNumWorkerThreads); // Clean up the task IDs we added. tracker()->ClearCompleteSequence(); } int has_work_call_count() const { return pool_owner_->has_work_call_count(); } private: MessageLoop message_loop_; scoped_ptr pool_owner_; const scoped_refptr tracker_; }; // Checks that the given number of entries are in the tasks to complete of // the given tracker, and then signals the given event the given number of // times. This is used to wake up blocked background threads before blocking // on shutdown. void EnsureTasksToCompleteCountAndUnblock(scoped_refptr tracker, size_t expected_tasks_to_complete, ThreadBlocker* blocker, size_t threads_to_awake) { EXPECT_EQ( expected_tasks_to_complete, tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); blocker->Unblock(threads_to_awake); } class DeletionHelper : public base::RefCountedThreadSafe { public: explicit DeletionHelper( const scoped_refptr >& deleted_flag) : deleted_flag_(deleted_flag) { } private: friend class base::RefCountedThreadSafe; virtual ~DeletionHelper() { deleted_flag_->data = true; } const scoped_refptr > deleted_flag_; DISALLOW_COPY_AND_ASSIGN(DeletionHelper); }; void HoldPoolReference(const scoped_refptr& pool, const scoped_refptr& helper) { ADD_FAILURE() << "Should never run"; } // Tests that delayed tasks are deleted upon shutdown of the pool. TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { // Post something to verify the pool is started up. EXPECT_TRUE(pool()->PostTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1))); scoped_refptr > deleted_flag( new base::RefCountedData(false)); base::Time posted_at(base::Time::Now()); // Post something that shouldn't run. EXPECT_TRUE(pool()->PostDelayedTask( FROM_HERE, base::Bind(&HoldPoolReference, pool(), make_scoped_refptr(new DeletionHelper(deleted_flag))), TestTimeouts::action_timeout())); std::vector completion_sequence = tracker()->WaitUntilTasksComplete(1); ASSERT_EQ(1u, completion_sequence.size()); ASSERT_EQ(1, completion_sequence[0]); // Shutdown is asynchronous, so use ResetPool() to block until the pool is // fully destroyed (and thus shut down). ResetPool(); // Verify that we didn't block until the task was due. ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout()); // Verify that the deferred task has not only not run, but has also been // destroyed. ASSERT_TRUE(deleted_flag->data); } // Tests that same-named tokens have the same ID. TEST_F(SequencedWorkerPoolTest, NamedTokens) { const std::string name1("hello"); SequencedWorkerPool::SequenceToken token1 = pool()->GetNamedSequenceToken(name1); SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); const std::string name3("goodbye"); SequencedWorkerPool::SequenceToken token3 = pool()->GetNamedSequenceToken(name3); // All 3 tokens should be different. EXPECT_FALSE(token1.Equals(token2)); EXPECT_FALSE(token1.Equals(token3)); EXPECT_FALSE(token2.Equals(token3)); // Requesting the same name again should give the same value. SequencedWorkerPool::SequenceToken token1again = pool()->GetNamedSequenceToken(name1); EXPECT_TRUE(token1.Equals(token1again)); SequencedWorkerPool::SequenceToken token3again = pool()->GetNamedSequenceToken(name3); EXPECT_TRUE(token3.Equals(token3again)); } // Tests that posting a bunch of tasks (many more than the number of worker // threads) runs them all. TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::SlowTask, tracker(), 0)); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), i)); } std::vector result = tracker()->WaitUntilTasksComplete(kNumTasks); EXPECT_EQ(kNumTasks, result.size()); } // Tests that posting a bunch of tasks (many more than the number of // worker threads) to two pools simultaneously runs them all twice. // This test is meant to shake out any concurrency issues between // pools (like histograms). TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0); pool1.pool()->PostWorkerTask(FROM_HERE, slow_task); pool2.pool()->PostWorkerTask(FROM_HERE, slow_task); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { base::Closure fast_task = base::Bind(&TestTracker::FastTask, tracker(), i); pool1.pool()->PostWorkerTask(FROM_HERE, fast_task); pool2.pool()->PostWorkerTask(FROM_HERE, fast_task); } std::vector result = tracker()->WaitUntilTasksComplete(2*kNumTasks); EXPECT_EQ(2 * kNumTasks, result.size()); } // Test that tasks with the same sequence token are executed in order but don't // affect other tasks. TEST_F(SequencedWorkerPoolTest, Sequence) { // Fill all the worker threads except one. const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; ThreadBlocker background_blocker; for (size_t i = 0; i < kNumBackgroundTasks; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &background_blocker)); } tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); // Create two tasks with the same sequence token, one that will block on the // event, and one which will just complete quickly when it's run. Since there // is one worker thread free, the first task will start and then block, and // the second task should be waiting. ThreadBlocker blocker; SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Create another two tasks as above with a different token. These will be // blocked since there are no slots to run. SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 200)); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 201)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Let one background task complete. This should then let both tasks of // token2 run to completion in order. The second task of token1 should still // be blocked. background_blocker.Unblock(1); std::vector result = tracker()->WaitUntilTasksComplete(3); ASSERT_EQ(3u, result.size()); EXPECT_EQ(200, result[1]); EXPECT_EQ(201, result[2]); // Finish the rest of the background tasks. This should leave some workers // free with the second token1 task still blocked on the first. background_blocker.Unblock(kNumBackgroundTasks - 1); EXPECT_EQ(kNumBackgroundTasks + 2, tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); // Allow the first task of token1 to complete. This should run the second. blocker.Unblock(1); result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); EXPECT_EQ(100, result[result.size() - 2]); EXPECT_EQ(101, result[result.size() - 1]); } // Tests that any tasks posted after Shutdown are ignored. // Disabled for flakiness. See http://crbug.com/166451. TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { // Start tasks to take all the threads and block them. EnsureAllWorkersCreated(); ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumWorkerThreads)); // Shutdown the worker pool. This should discard all non-blocking tasks. const int kMaxNewBlockingTasksAfterShutdown = 100; pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown); int old_has_work_call_count = has_work_call_count(); std::vector result = tracker()->WaitUntilTasksComplete(kNumWorkerThreads); // The kNumWorkerThread items should have completed, in no particular order. ASSERT_EQ(kNumWorkerThreads, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) EXPECT_TRUE(ContainsValue(result, static_cast(i))); // No further tasks, regardless of shutdown mode, should be allowed. EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 100), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101), SequencedWorkerPool::SKIP_ON_SHUTDOWN)); EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 102), SequencedWorkerPool::BLOCK_SHUTDOWN)); ASSERT_EQ(old_has_work_call_count, has_work_call_count()); } TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { // Test that new blocking tasks are allowed provided they're posted // by a running tasks. EnsureAllWorkersCreated(); ThreadBlocker blocker; // Start tasks to take all the threads and block them. const int kNumBlockTasks = static_cast(kNumWorkerThreads); for (int i = 0; i < kNumBlockTasks; ++i) { EXPECT_TRUE(pool()->PostWorkerTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Queue up shutdown blocking tasks behind those which will attempt to post // additional tasks when run, PostAdditionalTasks attemtps to post 3 // new FastTasks, one for each shutdown_behavior. const int kNumQueuedTasks = static_cast(kNumWorkerThreads); for (int i = 0; i < kNumQueuedTasks; ++i) { EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, base::RetainedRef(pool()), false), SequencedWorkerPool::BLOCK_SHUTDOWN)); } // Setup to open the floodgates from within Shutdown(). SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumBlockTasks)); // Allow half of the additional blocking tasks thru. const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2; pool()->Shutdown(kNumNewBlockingTasksToAllow); // Ensure that the correct number of tasks actually got run. tracker()->WaitUntilTasksComplete(static_cast( kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow)); // Clean up the task IDs we added and go home. tracker()->ClearCompleteSequence(); } // Tests that blocking tasks can still be posted during shutdown, as long as // the task is not being posted within the context of a running task. TEST_F(SequencedWorkerPoolTest, AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) { EnsureAllWorkersCreated(); ThreadBlocker blocker; // Start tasks to take all the threads and block them. const int kNumBlockTasks = static_cast(kNumWorkerThreads); for (int i = 0; i < kNumBlockTasks; ++i) { EXPECT_TRUE(pool()->PostWorkerTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Setup to open the floodgates from within Shutdown(). SetWillWaitForShutdownCallback( base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads, scoped_refptr(tracker()), pool(), &blocker, kNumWorkerThreads)); pool()->Shutdown(kNumWorkerThreads + 1); // Ensure that the correct number of tasks actually got run. tracker()->WaitUntilTasksComplete(static_cast(kNumWorkerThreads + 1)); tracker()->ClearCompleteSequence(); } // Tests that unrun tasks are discarded properly according to their shutdown // mode. TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { // Start tasks to take all the threads and block them. EnsureAllWorkersCreated(); ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Create some tasks with different shutdown modes. pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 100), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101), SequencedWorkerPool::SKIP_ON_SHUTDOWN); pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 102), SequencedWorkerPool::BLOCK_SHUTDOWN); // Shutdown the worker pool. This should discard all non-blocking tasks. SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumWorkerThreads)); pool()->Shutdown(); std::vector result = tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1); // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN // one, in no particular order. ASSERT_EQ(kNumWorkerThreads + 1, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) EXPECT_TRUE(ContainsValue(result, static_cast(i))); EXPECT_TRUE(ContainsValue(result, 102)); } // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { scoped_refptr runner(pool()->GetTaskRunnerWithShutdownBehavior( SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); scoped_refptr sequenced_runner( pool()->GetSequencedTaskRunnerWithShutdownBehavior( pool()->GetSequenceToken(), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EnsureAllWorkersCreated(); ThreadBlocker blocker; pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 0, &blocker), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); runner->PostTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 1, &blocker)); sequenced_runner->PostTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 2, &blocker)); tracker()->WaitUntilTasksBlocked(3); // This should not block. If this test hangs, it means it failed. pool()->Shutdown(); // The task should not have completed yet. EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Posting more tasks should fail. EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EXPECT_FALSE(runner->PostTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); EXPECT_FALSE(sequenced_runner->PostTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); // Continue the background thread and make sure the tasks can complete. blocker.Unblock(3); std::vector result = tracker()->WaitUntilTasksComplete(3); EXPECT_EQ(3u, result.size()); } // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown // until they stop, but tasks not yet started do not. TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { // Start tasks to take all the threads and block them. EnsureAllWorkersCreated(); ThreadBlocker blocker; // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not // return until these tasks have completed. for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker), SequencedWorkerPool::SKIP_ON_SHUTDOWN); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be // executed once Shutdown() has been called. pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 0, &blocker), SequencedWorkerPool::SKIP_ON_SHUTDOWN); // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have // been started block shutdown. SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumWorkerThreads)); // No tasks should have completed yet. EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // This should not block. If this test hangs, it means it failed. pool()->Shutdown(); // Shutdown should not return until all of the tasks have completed. std::vector result = tracker()->WaitUntilTasksComplete(kNumWorkerThreads); // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be // allowed to complete. No additional non-blocking tasks should have been // started. ASSERT_EQ(kNumWorkerThreads, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) EXPECT_TRUE(ContainsValue(result, static_cast(i))); } // Ensure all worker threads are created, and then trigger a spurious // work signal. This shouldn't cause any other work signals to be // triggered. This is a regression test for http://crbug.com/117469. TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { EnsureAllWorkersCreated(); int old_has_work_call_count = has_work_call_count(); pool()->SignalHasWorkForTesting(); // This is inherently racy, but can only produce false positives. base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count()); } void IsRunningOnCurrentThreadTask( SequencedWorkerPool::SequenceToken test_positive_token, SequencedWorkerPool::SequenceToken test_negative_token, SequencedWorkerPool* pool, SequencedWorkerPool* unused_pool) { EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token)); EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token)); EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); EXPECT_FALSE( unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token)); EXPECT_FALSE( unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token)); } // Verify correctness of the IsRunningSequenceOnCurrentThread method. TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) { SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); SequencedWorkerPool::SequenceToken unsequenced_token; SequencedWorkerPoolOwner unused_pool_owner(2, "unused_pool"); EXPECT_FALSE(pool()->RunsTasksOnCurrentThread()); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1)); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2)); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token)); EXPECT_FALSE(unused_pool_owner.pool()->RunsTasksOnCurrentThread()); EXPECT_FALSE( unused_pool_owner.pool()->IsRunningSequenceOnCurrentThread(token1)); EXPECT_FALSE( unused_pool_owner.pool()->IsRunningSequenceOnCurrentThread(token2)); EXPECT_FALSE(unused_pool_owner.pool()->IsRunningSequenceOnCurrentThread( unsequenced_token)); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, token1, token2, base::RetainedRef(pool()), base::RetainedRef(unused_pool_owner.pool()))); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, token2, unsequenced_token, base::RetainedRef(pool()), base::RetainedRef(unused_pool_owner.pool()))); pool()->PostWorkerTask( FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, unsequenced_token, token1, base::RetainedRef(pool()), base::RetainedRef(unused_pool_owner.pool()))); } // Checks that tasks are destroyed in the right context during shutdown. If a // task is destroyed while SequencedWorkerPool's global lock is held, // SequencedWorkerPool might deadlock. TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) { for (int i = 0; i < 4; ++i) { scoped_refptr checker( new DestructionDeadlockChecker(pool())); tracker()->PostRepostingTask(pool(), checker); } // Shutting down the pool should destroy the DestructionDeadlockCheckers, // which in turn should not deadlock in their destructors. pool()->Shutdown(); } // Similar to the test AvoidsDeadlockOnShutdown, but there are now also // sequenced, blocking tasks in the queue during shutdown. TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) { const std::string sequence_token_name("name"); for (int i = 0; i < 4; ++i) { scoped_refptr checker( new DestructionDeadlockChecker(pool())); tracker()->PostRepostingTask(pool(), checker); SequencedWorkerPool::SequenceToken token1 = pool()->GetNamedSequenceToken(sequence_token_name); tracker()->PostRepostingBlockingTask(pool(), token1); } // Shutting down the pool should destroy the DestructionDeadlockCheckers, // which in turn should not deadlock in their destructors. pool()->Shutdown(); } // Verify that FlushForTesting works as intended. TEST_F(SequencedWorkerPoolTest, FlushForTesting) { // Should be fine to call on a new instance. pool()->FlushForTesting(); // Queue up a bunch of work, including a long delayed task and // a task that produces additional tasks as an artifact. pool()->PostDelayedWorkerTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), TimeDelta::FromMinutes(5)); pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::SlowTask, tracker(), 0)); const size_t kNumFastTasks = 20; for (size_t i = 0; i < kNumFastTasks; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)); } pool()->PostWorkerTask( FROM_HERE, base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, base::RetainedRef(pool()), true)); // We expect all except the delayed task to have been run. We verify all // closures have been deleted by looking at the refcount of the // tracker. EXPECT_FALSE(tracker()->HasOneRef()); pool()->FlushForTesting(); EXPECT_TRUE(tracker()->HasOneRef()); EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); // Should be fine to call on an idle instance with all threads created, and // spamming the method shouldn't deadlock or confuse the class. pool()->FlushForTesting(); pool()->FlushForTesting(); // Should be fine to call after shutdown too. pool()->Shutdown(); pool()->FlushForTesting(); } // Helper method for VerifyCurrentSequencedTaskRunner() and // VerifyCurrentSequencedTaskRunnerForUnsequencedTask(). void VerifySequencedTaskRunnerRunsOnCurrentThread( SequencedTaskRunner* task_runner, bool should_run_on_current_thread, const Closure& callback) { EXPECT_EQ(should_run_on_current_thread, task_runner->RunsTasksOnCurrentThread()); callback.Run(); } void VerifyCurrentSequencedTaskRunner( SequencedTaskRunner* expected_task_runner, bool expected_equal, const Closure& callback) { scoped_refptr task_runner = SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); // SequencedTaskRunner does not allow directly checking for equality, but we // can post a task to one task runner and verify that the other task runner // is on the same sequence. task_runner->PostTask( FROM_HERE, Bind(&VerifySequencedTaskRunnerRunsOnCurrentThread, base::Unretained(expected_task_runner), expected_equal, callback)); } void VerifyCurrentSequencedTaskRunnerForUnsequencedTask( SequencedWorkerPool* pool, const Closure& callback) { EXPECT_FALSE( SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()); scoped_refptr task_runner = SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); EXPECT_TRUE(task_runner->RunsTasksOnCurrentThread()); scoped_refptr expected_task_runner = SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); // The pool should now be running a sequence. This also verifies that no other // thread will start running tasks with this sequence token. const SequencedWorkerPool::SequenceToken sequence_token = SequencedWorkerPool::GetSequenceTokenForCurrentThread(); ASSERT_TRUE(sequence_token.IsValid()); EXPECT_TRUE(pool->IsRunningSequence(sequence_token)); // The two sequenced task runners should be the same. See // VerifyCurrentSequencedTaskRunner() above for why the check is implemented // this way. const bool expected_equal = true; task_runner->PostTask(FROM_HERE, Bind(&VerifySequencedTaskRunnerRunsOnCurrentThread, RetainedRef(std::move(expected_task_runner)), expected_equal, callback)); } TEST_F(SequencedWorkerPoolTest, GetSequencedTaskRunnerForCurrentThread) { EnsureAllWorkersCreated(); // The current thread should not have a sequenced task runner from a // worker pool. scoped_refptr local_task_runner = SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); EXPECT_FALSE(local_task_runner); WaitableEvent event(false, false); Closure signal = Bind(&WaitableEvent::Signal, Unretained(&event)); scoped_refptr task_runner_1 = pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken()); scoped_refptr task_runner_2 = pool()->GetSequencedTaskRunner(SequencedWorkerPool::GetSequenceToken()); task_runner_1->PostTask( FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, base::Unretained(task_runner_1.get()), true, signal)); event.Wait(); task_runner_2->PostTask( FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, base::Unretained(task_runner_2.get()), true, signal)); event.Wait(); task_runner_1->PostTask( FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunner, base::Unretained(task_runner_2.get()), false, signal)); event.Wait(); pool()->PostWorkerTask( FROM_HERE, Bind(&VerifyCurrentSequencedTaskRunnerForUnsequencedTask, RetainedRef(pool()), signal)); event.Wait(); } class ChecksSequenceOnDestruction : public RefCountedThreadSafe { public: void DoNothing() {} private: friend class RefCountedThreadSafe; ~ChecksSequenceOnDestruction() { EXPECT_TRUE(sequence_checker_.CalledOnValidSequencedThread()); } SequenceCheckerImpl sequence_checker_; }; void VerifySequenceOnDestruction(const Closure& callback) { scoped_refptr task_runner = SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); scoped_refptr check_sequence( new ChecksSequenceOnDestruction); // Post a task to an empty method. This will keep the only reference to the // object, so it will be destroyed right after running the task. task_runner->PostTask(FROM_HERE, Bind(&ChecksSequenceOnDestruction::DoNothing, std::move(check_sequence))); // Post the callback afterwards, so we can be sure the first task completed. task_runner->PostTask(FROM_HERE, callback); } TEST_F(SequencedWorkerPoolTest, CheckSequenceOnDestruction) { EnsureAllWorkersCreated(); WaitableEvent event(false, false); Closure signal = Bind(&WaitableEvent::Signal, Unretained(&event)); pool()->PostWorkerTask(FROM_HERE, Bind(&VerifySequenceOnDestruction, signal)); event.Wait(); } TEST_F(SequencedWorkerPoolTest, ShutsDownCleanWithContinueOnShutdown) { scoped_refptr task_runner = pool()->GetSequencedTaskRunnerWithShutdownBehavior( pool()->GetSequenceToken(), base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); // Upon test exit, should shut down without hanging. pool()->Shutdown(); } class SequencedWorkerPoolTaskRunnerTestDelegate { public: SequencedWorkerPoolTaskRunnerTestDelegate() {} ~SequencedWorkerPoolTaskRunnerTestDelegate() {} void StartTaskRunner() { pool_owner_.reset( new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); } scoped_refptr GetTaskRunner() { return pool_owner_->pool(); } void StopTaskRunner() { // Make sure all tasks are run before shutting down. Delayed tasks are // not run, they're simply deleted. pool_owner_->pool()->FlushForTesting(); pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } private: MessageLoop message_loop_; scoped_ptr pool_owner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPool, TaskRunnerTest, SequencedWorkerPoolTaskRunnerTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P(SequencedWorkerPool, TaskRunnerAffinityTest, SequencedWorkerPoolTaskRunnerTestDelegate); class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate { public: SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {} ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() { } void StartTaskRunner() { pool_owner_.reset( new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( SequencedWorkerPool::BLOCK_SHUTDOWN); } scoped_refptr GetTaskRunner() { return task_runner_; } void StopTaskRunner() { // Make sure all tasks are run before shutting down. Delayed tasks are // not run, they're simply deleted. pool_owner_->pool()->FlushForTesting(); pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } private: MessageLoop message_loop_; scoped_ptr pool_owner_; scoped_refptr task_runner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolTaskRunner, TaskRunnerTest, SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolTaskRunner, TaskRunnerAffinityTest, SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); class SequencedWorkerPoolSequencedTaskRunnerTestDelegate { public: SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {} ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() { } void StartTaskRunner() { pool_owner_.reset(new SequencedWorkerPoolOwner( 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( pool_owner_->pool()->GetSequenceToken()); } scoped_refptr GetTaskRunner() { return task_runner_; } void StopTaskRunner() { // Make sure all tasks are run before shutting down. Delayed tasks are // not run, they're simply deleted. pool_owner_->pool()->FlushForTesting(); pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } private: MessageLoop message_loop_; scoped_ptr pool_owner_; scoped_refptr task_runner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, TaskRunnerAffinityTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerDelayedTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); } // namespace } // namespace base