// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/threading/sequenced_worker_pool.h" #include #include "base/bind.h" #include "base/compiler_specific.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop.h" #include "base/message_loop_proxy.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/test/sequenced_worker_pool_owner.h" #include "base/test/sequenced_task_runner_test_template.h" #include "base/test/task_runner_test_template.h" #include "base/threading/platform_thread.h" #include "base/tracked_objects.h" #include "testing/gtest/include/gtest/gtest.h" namespace base { // IMPORTANT NOTE: // // Many of these tests have failure modes where they'll hang forever. These // tests should not be flaky, and hangling indicates a type of failure. Do not // mark as flaky if they're hanging, it's likely an actual bug. namespace { const size_t kNumWorkerThreads = 3; // Allows a number of threads to all be blocked on the same event, and // provides a way to unblock a certain number of them. class ThreadBlocker { public: ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {} void Block() { { base::AutoLock lock(lock_); while (unblock_counter_ == 0) cond_var_.Wait(); unblock_counter_--; } cond_var_.Signal(); } void Unblock(size_t count) { { base::AutoLock lock(lock_); DCHECK(unblock_counter_ == 0); unblock_counter_ = count; } cond_var_.Signal(); } private: base::Lock lock_; base::ConditionVariable cond_var_; size_t unblock_counter_; }; class TestTracker : public base::RefCountedThreadSafe { public: TestTracker() : lock_(), cond_var_(&lock_), started_events_(0) { } // Each of these tasks appends the argument to the complete sequence vector // so calling code can see what order they finished in. void FastTask(int id) { SignalWorkerDone(id); } void SlowTask(int id) { base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); SignalWorkerDone(id); } void BlockTask(int id, ThreadBlocker* blocker) { // Note that this task has started and signal anybody waiting for that // to happen. { base::AutoLock lock(lock_); started_events_++; } cond_var_.Signal(); blocker->Block(); SignalWorkerDone(id); } // Waits until the given number of tasks have started executing. void WaitUntilTasksBlocked(size_t count) { { base::AutoLock lock(lock_); while (started_events_ < count) cond_var_.Wait(); } cond_var_.Signal(); } // Blocks the current thread until at least the given number of tasks are in // the completed vector, and then returns a copy. std::vector WaitUntilTasksComplete(size_t num_tasks) { std::vector ret; { base::AutoLock lock(lock_); while (complete_sequence_.size() < num_tasks) cond_var_.Wait(); ret = complete_sequence_; } cond_var_.Signal(); return ret; } void ClearCompleteSequence() { base::AutoLock lock(lock_); complete_sequence_.clear(); started_events_ = 0; } private: friend class base::RefCountedThreadSafe; ~TestTracker() {} void SignalWorkerDone(int id) { { base::AutoLock lock(lock_); complete_sequence_.push_back(id); } cond_var_.Signal(); } // Protects the complete_sequence. base::Lock lock_; base::ConditionVariable cond_var_; // Protected by lock_. std::vector complete_sequence_; // Counter of the number of "block" workers that have started. size_t started_events_; }; class SequencedWorkerPoolTest : public testing::Test { public: SequencedWorkerPoolTest() : pool_owner_(kNumWorkerThreads, "test"), tracker_(new TestTracker) { } virtual ~SequencedWorkerPoolTest() {} virtual void SetUp() OVERRIDE {} virtual void TearDown() OVERRIDE { pool()->Shutdown(); } const scoped_refptr& pool() { return pool_owner_.pool(); } TestTracker* tracker() { return tracker_.get(); } void SetWillWaitForShutdownCallback(const Closure& callback) { pool_owner_.SetWillWaitForShutdownCallback(callback); } // Ensures that the given number of worker threads is created by adding // tasks and waiting until they complete. Worker thread creation is // serialized, can happen on background threads asynchronously, and doesn't // happen any more at shutdown. This means that if a test posts a bunch of // tasks and calls shutdown, fewer workers will be created than the test may // expect. // // This function ensures that this condition can't happen so tests can make // assumptions about the number of workers active. See the comment in // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more // details. // // It will post tasks to the queue with id -1. It also assumes this is the // first thing called in a test since it will clear the complete_sequence_. void EnsureAllWorkersCreated() { // Create a bunch of threads, all waiting. This will cause that may // workers to be created. ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), -1, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Now wake them up and wait until they're done. blocker.Unblock(kNumWorkerThreads); tracker()->WaitUntilTasksComplete(kNumWorkerThreads); // Clean up the task IDs we added. tracker()->ClearCompleteSequence(); } int has_work_call_count() const { return pool_owner_.has_work_call_count(); } private: MessageLoop message_loop_; SequencedWorkerPoolOwner pool_owner_; const scoped_refptr tracker_; }; // Checks that the given number of entries are in the tasks to complete of // the given tracker, and then signals the given event the given number of // times. This is used to wakt up blocked background threads before blocking // on shutdown. void EnsureTasksToCompleteCountAndUnblock(scoped_refptr tracker, size_t expected_tasks_to_complete, ThreadBlocker* blocker, size_t threads_to_awake) { EXPECT_EQ( expected_tasks_to_complete, tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); blocker->Unblock(threads_to_awake); } // Tests that same-named tokens have the same ID. TEST_F(SequencedWorkerPoolTest, NamedTokens) { const std::string name1("hello"); SequencedWorkerPool::SequenceToken token1 = pool()->GetNamedSequenceToken(name1); SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); const std::string name3("goodbye"); SequencedWorkerPool::SequenceToken token3 = pool()->GetNamedSequenceToken(name3); // All 3 tokens should be different. EXPECT_FALSE(token1.Equals(token2)); EXPECT_FALSE(token1.Equals(token3)); EXPECT_FALSE(token2.Equals(token3)); // Requesting the same name again should give the same value. SequencedWorkerPool::SequenceToken token1again = pool()->GetNamedSequenceToken(name1); EXPECT_TRUE(token1.Equals(token1again)); SequencedWorkerPool::SequenceToken token3again = pool()->GetNamedSequenceToken(name3); EXPECT_TRUE(token3.Equals(token3again)); } // Tests that posting a bunch of tasks (many more than the number of worker // threads) runs them all. TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::SlowTask, tracker(), 0)); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), i)); } std::vector result = tracker()->WaitUntilTasksComplete(kNumTasks); EXPECT_EQ(kNumTasks, result.size()); } // Tests that posting a bunch of tasks (many more than the number of // worker threads) to two pools simultaneously runs them all twice. // This test is meant to shake out any concurrency issues between // pools (like histograms). TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0); pool1.pool()->PostWorkerTask(FROM_HERE, slow_task); pool2.pool()->PostWorkerTask(FROM_HERE, slow_task); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { base::Closure fast_task = base::Bind(&TestTracker::FastTask, tracker(), i); pool1.pool()->PostWorkerTask(FROM_HERE, fast_task); pool2.pool()->PostWorkerTask(FROM_HERE, fast_task); } std::vector result = tracker()->WaitUntilTasksComplete(2*kNumTasks); EXPECT_EQ(2*kNumTasks, result.size()); pool2.pool()->Shutdown(); pool1.pool()->Shutdown(); } // Test that tasks with the same sequence token are executed in order but don't // affect other tasks. TEST_F(SequencedWorkerPoolTest, Sequence) { // Fill all the worker threads except one. const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; ThreadBlocker background_blocker; for (size_t i = 0; i < kNumBackgroundTasks; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &background_blocker)); } tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); // Create two tasks with the same sequence token, one that will block on the // event, and one which will just complete quickly when it's run. Since there // is one worker thread free, the first task will start and then block, and // the second task should be waiting. ThreadBlocker blocker; SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Create another two tasks as above with a different token. These will be // blocked since there are no slots to run. SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 200)); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 201)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Let one background task complete. This should then let both tasks of // token2 run to completion in order. The second task of token1 should still // be blocked. background_blocker.Unblock(1); std::vector result = tracker()->WaitUntilTasksComplete(3); ASSERT_EQ(3u, result.size()); EXPECT_EQ(200, result[1]); EXPECT_EQ(201, result[2]); // Finish the rest of the background tasks. This should leave some workers // free with the second token1 task still blocked on the first. background_blocker.Unblock(kNumBackgroundTasks - 1); EXPECT_EQ(kNumBackgroundTasks + 2, tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); // Allow the first task of token1 to complete. This should run the second. blocker.Unblock(1); result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); EXPECT_EQ(100, result[result.size() - 2]); EXPECT_EQ(101, result[result.size() - 1]); } // Tests that unrun tasks are discarded properly according to their shutdown // mode. TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { // Start tasks to take all the threads and block them. EnsureAllWorkersCreated(); ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool()->PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Create some tasks with different shutdown modes. pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 100), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101), SequencedWorkerPool::SKIP_ON_SHUTDOWN); pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 102), SequencedWorkerPool::BLOCK_SHUTDOWN); // Shutdown the worker pool. This should discard all non-blocking tasks. SetWillWaitForShutdownCallback( base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumWorkerThreads)); pool()->Shutdown(); std::vector result = tracker()->WaitUntilTasksComplete(4); // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN // one, in no particular order. ASSERT_EQ(4u, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) { EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast(i)) != result.end()); } EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); } // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { scoped_refptr runner(pool()->GetTaskRunnerWithShutdownBehavior( SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); scoped_refptr sequenced_runner( pool()->GetSequencedTaskRunnerWithShutdownBehavior( pool()->GetSequenceToken(), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EnsureAllWorkersCreated(); ThreadBlocker blocker; pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 0, &blocker), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); runner->PostTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 1, &blocker)); sequenced_runner->PostTask( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 2, &blocker)); tracker()->WaitUntilTasksBlocked(3); // This should not block. If this test hangs, it means it failed. pool()->Shutdown(); // The task should not have completed yet. EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Posting more tasks should fail. EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); EXPECT_FALSE(runner->PostTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); EXPECT_FALSE(sequenced_runner->PostTask( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); // Continue the background thread and make sure the tasks can complete. blocker.Unblock(3); std::vector result = tracker()->WaitUntilTasksComplete(3); EXPECT_EQ(3u, result.size()); } // Ensure all worker threads are created, and then trigger a spurious // work signal. This shouldn't cause any other work signals to be // triggered. This is a regression test for http://crbug.com/117469. TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { EnsureAllWorkersCreated(); int old_has_work_call_count = has_work_call_count(); pool()->SignalHasWorkForTesting(); // This is inherently racy, but can only produce false positives. base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count()); } void IsRunningOnCurrentThreadTask( SequencedWorkerPool::SequenceToken test_positive_token, SequencedWorkerPool::SequenceToken test_negative_token, SequencedWorkerPool* pool, SequencedWorkerPool* unused_pool) { EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token)); EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token)); EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); EXPECT_FALSE( unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token)); EXPECT_FALSE( unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token)); } // Verify correctness of the IsRunningSequenceOnCurrentThread method. TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) { SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); SequencedWorkerPool::SequenceToken unsequenced_token; scoped_refptr unused_pool = new SequencedWorkerPool(2, "unused_pool"); EXPECT_TRUE(token1.Equals(unused_pool->GetSequenceToken())); EXPECT_TRUE(token2.Equals(unused_pool->GetSequenceToken())); EXPECT_FALSE(pool()->RunsTasksOnCurrentThread()); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1)); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2)); EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token)); EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1)); EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2)); EXPECT_FALSE( unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token)); pool()->PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, token1, token2, pool(), unused_pool)); pool()->PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, token2, unsequenced_token, pool(), unused_pool)); pool()->PostWorkerTask( FROM_HERE, base::Bind(&IsRunningOnCurrentThreadTask, unsequenced_token, token1, pool(), unused_pool)); pool()->Shutdown(); unused_pool->Shutdown(); } class SequencedWorkerPoolTaskRunnerTestDelegate { public: SequencedWorkerPoolTaskRunnerTestDelegate() {} ~SequencedWorkerPoolTaskRunnerTestDelegate() {} void StartTaskRunner() { pool_owner_.reset( new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); } scoped_refptr GetTaskRunner() { return pool_owner_->pool(); } void StopTaskRunner() { pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } bool TaskRunnerHandlesNonZeroDelays() const { // TODO(akalin): Set this to true once SequencedWorkerPool handles // non-zero delays. return false; } private: MessageLoop message_loop_; scoped_ptr pool_owner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPool, TaskRunnerTest, SequencedWorkerPoolTaskRunnerTestDelegate); class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate { public: SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {} ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() { } void StartTaskRunner() { pool_owner_.reset( new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( SequencedWorkerPool::BLOCK_SHUTDOWN); } scoped_refptr GetTaskRunner() { return task_runner_; } void StopTaskRunner() { pool_owner_->pool()->FlushForTesting(); pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } bool TaskRunnerHandlesNonZeroDelays() const { // TODO(akalin): Set this to true once SequencedWorkerPool handles // non-zero delays. return false; } private: MessageLoop message_loop_; scoped_ptr pool_owner_; scoped_refptr task_runner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolTaskRunner, TaskRunnerTest, SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); class SequencedWorkerPoolSequencedTaskRunnerTestDelegate { public: SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {} ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() { } void StartTaskRunner() { pool_owner_.reset(new SequencedWorkerPoolOwner( 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( pool_owner_->pool()->GetSequenceToken()); } scoped_refptr GetTaskRunner() { return task_runner_; } void StopTaskRunner() { pool_owner_->pool()->FlushForTesting(); pool_owner_->pool()->Shutdown(); // Don't reset |pool_owner_| here, as the test may still hold a // reference to the pool. } bool TaskRunnerHandlesNonZeroDelays() const { // TODO(akalin): Set this to true once SequencedWorkerPool handles // non-zero delays. return false; } private: MessageLoop message_loop_; scoped_ptr pool_owner_; scoped_refptr task_runner_; }; INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); INSTANTIATE_TYPED_TEST_CASE_P( SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, SequencedWorkerPoolSequencedTaskRunnerTestDelegate); } // namespace } // namespace base