// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include #include "base/bind.h" #include "base/memory/ref_counted.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" #include "base/threading/sequenced_worker_pool.h" #include "testing/gtest/include/gtest/gtest.h" namespace base { // IMPORTANT NOTE: // // Many of these tests have failure modes where they'll hang forever. These // tests should not be flaky, and hangling indicates a type of failure. Do not // mark as flaky if they're hanging, it's likely an actual bug. namespace { const size_t kNumWorkerThreads = 3; // Allows a number of threads to all be blocked on the same event, and // provides a way to unblock a certain number of them. class ThreadBlocker { public: ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) { } void Block() { { base::AutoLock lock(lock_); while (unblock_counter_ == 0) cond_var_.Wait(); unblock_counter_--; } cond_var_.Signal(); } void Unblock(size_t count) { { base::AutoLock lock(lock_); DCHECK(unblock_counter_ == 0); unblock_counter_ = count; } cond_var_.Signal(); } private: base::Lock lock_; base::ConditionVariable cond_var_; size_t unblock_counter_; }; class TestTracker : public base::RefCountedThreadSafe { public: TestTracker() : lock_(), cond_var_(&lock_), started_events_(0) { } // Each of these tasks appends the argument to the complete sequence vector // so calling code can see what order they finished in. void FastTask(int id) { SignalWorkerDone(id); } void SlowTask(int id) { base::PlatformThread::Sleep(1000); SignalWorkerDone(id); } void BlockTask(int id, ThreadBlocker* blocker) { // Note that this task has started and signal anybody waiting for that // to happen. { base::AutoLock lock(lock_); started_events_++; } cond_var_.Signal(); blocker->Block(); SignalWorkerDone(id); } // Waits until the given number of tasks have started executing. void WaitUntilTasksBlocked(size_t count) { { base::AutoLock lock(lock_); while (started_events_ < count) cond_var_.Wait(); } cond_var_.Signal(); } // Blocks the current thread until at least the given number of tasks are in // the completed vector, and then returns a copy. std::vector WaitUntilTasksComplete(size_t num_tasks) { std::vector ret; { base::AutoLock lock(lock_); while (complete_sequence_.size() < num_tasks) cond_var_.Wait(); ret = complete_sequence_; } cond_var_.Signal(); return ret; } void ClearCompleteSequence() { base::AutoLock lock(lock_); complete_sequence_.clear(); started_events_ = 0; } private: void SignalWorkerDone(int id) { { base::AutoLock lock(lock_); complete_sequence_.push_back(id); } cond_var_.Signal(); } // Protects the complete_sequence. base::Lock lock_; base::ConditionVariable cond_var_; // Protected by lock_. std::vector complete_sequence_; // Counter of the number of "block" workers that have started. size_t started_events_; }; class SequencedWorkerPoolTest : public testing::Test, public SequencedWorkerPool::TestingObserver { public: SequencedWorkerPoolTest() : pool_(kNumWorkerThreads, "test"), tracker_(new TestTracker) { pool_.SetTestingObserver(this); } ~SequencedWorkerPoolTest() { } virtual void SetUp() { } virtual void TearDown() { pool_.Shutdown(); } SequencedWorkerPool& pool() { return pool_; } TestTracker* tracker() { return tracker_.get(); } // Ensures that the given number of worker threads is created by adding // tasks and waiting until they complete. Worker thread creation is // serialized, can happen on background threads asynchronously, and doesn't // happen any more at shutdown. This means that if a test posts a bunch of // tasks and calls shutdown, fewer workers will be created than the test may // expect. // // This function ensures that this condition can't happen so tests can make // assumptions about the number of workers active. See the comment in // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more // details. // // It will post tasks to the queue with id -1. It also assumes this is the // first thing called in a test since it will clear the complete_sequence_. void EnsureAllWorkersCreated() { // Create a bunch of threads, all waiting. This will cause that may // workers to be created. ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool().PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), -1, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Now wake them up and wait until they're done. blocker.Unblock(kNumWorkerThreads); tracker()->WaitUntilTasksComplete(kNumWorkerThreads); // Clean up the task IDs we added. tracker()->ClearCompleteSequence(); } protected: // This closure will be executed right before the pool blocks on shutdown. base::Closure before_wait_for_shutdown_; private: // SequencedWorkerPool::TestingObserver implementation. virtual void WillWaitForShutdown() { if (!before_wait_for_shutdown_.is_null()) before_wait_for_shutdown_.Run(); } SequencedWorkerPool pool_; scoped_refptr tracker_; }; // Checks that the given number of entries are in the tasks to complete of // the given tracker, and then signals the given event the given number of // times. This is used to wakt up blocked background threads before blocking // on shutdown. void EnsureTasksToCompleteCountAndUnblock(scoped_refptr tracker, size_t expected_tasks_to_complete, ThreadBlocker* blocker, size_t threads_to_awake) { EXPECT_EQ( expected_tasks_to_complete, tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); blocker->Unblock(threads_to_awake); } } // namespace // Tests that same-named tokens have the same ID. TEST_F(SequencedWorkerPoolTest, NamedTokens) { const std::string name1("hello"); SequencedWorkerPool::SequenceToken token1 = pool().GetNamedSequenceToken(name1); SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); const std::string name3("goodbye"); SequencedWorkerPool::SequenceToken token3 = pool().GetNamedSequenceToken(name3); // All 3 tokens should be different. EXPECT_FALSE(token1.Equals(token2)); EXPECT_FALSE(token1.Equals(token3)); EXPECT_FALSE(token2.Equals(token3)); // Requesting the same name again should give the same value. SequencedWorkerPool::SequenceToken token1again = pool().GetNamedSequenceToken(name1); EXPECT_TRUE(token1.Equals(token1again)); SequencedWorkerPool::SequenceToken token3again = pool().GetNamedSequenceToken(name3); EXPECT_TRUE(token3.Equals(token3again)); } // Tests that posting a bunch of tasks (many more than the number of worker // threads) runs them all. TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { pool().PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::SlowTask, tracker(), 0)); const size_t kNumTasks = 20; for (size_t i = 1; i < kNumTasks; i++) { pool().PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), i)); } std::vector result = tracker()->WaitUntilTasksComplete(kNumTasks); EXPECT_EQ(kNumTasks, result.size()); } // Test that tasks with the same sequence token are executed in order but don't // affect other tasks. TEST_F(SequencedWorkerPoolTest, Sequence) { // Fill all the worker threads except one. const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; ThreadBlocker background_blocker; for (size_t i = 0; i < kNumBackgroundTasks; i++) { pool().PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &background_blocker)); } tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); // Create two tasks with the same sequence token, one that will block on the // event, and one which will just complete quickly when it's run. Since there // is one worker thread free, the first task will start and then block, and // the second task should be waiting. ThreadBlocker blocker; SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); pool().PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); pool().PostSequencedWorkerTask( token1, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Create another two tasks as above with a different token. These will be // blocked since there are no slots to run. SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); pool().PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 200)); pool().PostSequencedWorkerTask( token2, FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 201)); EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Let one background task complete. This should then let both tasks of // token2 run to completion in order. The second task of token1 should still // be blocked. background_blocker.Unblock(1); std::vector result = tracker()->WaitUntilTasksComplete(3); ASSERT_EQ(3u, result.size()); EXPECT_EQ(200, result[1]); EXPECT_EQ(201, result[2]); // Finish the rest of the background tasks. This should leave some workers // free with the second token1 task still blocked on the first. background_blocker.Unblock(kNumBackgroundTasks - 1); EXPECT_EQ(kNumBackgroundTasks + 2, tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); // Allow the first task of token1 to complete. This should run the second. blocker.Unblock(1); result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); EXPECT_EQ(100, result[result.size() - 2]); EXPECT_EQ(101, result[result.size() - 1]); } // Tests that unrun tasks are discarded properly according to their shutdown // mode. TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { // Start tasks to take all the threads and block them. EnsureAllWorkersCreated(); ThreadBlocker blocker; for (size_t i = 0; i < kNumWorkerThreads; i++) { pool().PostWorkerTask(FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)); } tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); // Create some tasks with different shutdown modes. pool().PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 100), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); pool().PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 101), SequencedWorkerPool::SKIP_ON_SHUTDOWN); pool().PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 102), SequencedWorkerPool::BLOCK_SHUTDOWN); // Shutdown the worker pool. This should discard all non-blocking tasks. before_wait_for_shutdown_ = base::Bind(&EnsureTasksToCompleteCountAndUnblock, scoped_refptr(tracker()), 0, &blocker, kNumWorkerThreads); pool().Shutdown(); std::vector result = tracker()->WaitUntilTasksComplete(4); // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN // one, in no particular order. ASSERT_EQ(4u, result.size()); for (size_t i = 0; i < kNumWorkerThreads; i++) { EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast(i)) != result.end()); } EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); } // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { EnsureAllWorkersCreated(); ThreadBlocker blocker; pool().PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::BlockTask, tracker(), 0, &blocker), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); tracker()->WaitUntilTasksBlocked(1); // This should not block. If this test hangs, it means it failed. pool().Shutdown(); // The task should not have completed yet. EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); // Posting more tasks should fail. EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); // Continue the background thread and make sure the task can complete. blocker.Unblock(1); std::vector result = tracker()->WaitUntilTasksComplete(1); EXPECT_EQ(1u, result.size()); } } // namespace base