diff options
-rw-r--r-- | base/simple_thread.cc | 66 | ||||
-rw-r--r-- | base/simple_thread.h | 50 | ||||
-rw-r--r-- | base/simple_thread_unittest.cc | 68 |
3 files changed, 183 insertions, 1 deletions
diff --git a/base/simple_thread.cc b/base/simple_thread.cc index fa9a723..f919562 100644 --- a/base/simple_thread.cc +++ b/base/simple_thread.cc @@ -49,4 +49,70 @@ void DelegateSimpleThread::Run() { delegate_ = NULL; } +DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { + DCHECK(threads_.empty()); + DCHECK(delegates_.empty()); + DCHECK(!dry_.IsSignaled()); +} + +void DelegateSimpleThreadPool::Start() { + DCHECK(threads_.empty()) << "Start() called with outstanding threads."; + for (int i = 0; i < num_threads_; ++i) { + DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); + thread->Start(); + threads_.push_back(thread); + } +} + +void DelegateSimpleThreadPool::JoinAll() { + DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; + + // Tell all our threads to quit their worker loop. + AddWork(NULL, num_threads_); + + // Join and destroy all the worker threads. + for (int i = 0; i < num_threads_; ++i) { + threads_[i]->Join(); + delete threads_[i]; + } + threads_.clear(); + DCHECK(delegates_.empty()); +} + +void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { + AutoLock locked(lock_); + for (int i = 0; i < repeat_count; ++i) + delegates_.push(delegate); + // If we were empty, signal that we have work now. + if (!dry_.IsSignaled()) + dry_.Signal(); +} + +void DelegateSimpleThreadPool::Run() { + Delegate* work; + + while (true) { + dry_.Wait(); + { + AutoLock locked(lock_); + if (!dry_.IsSignaled()) + continue; + + DCHECK(!delegates_.empty()); + work = delegates_.front(); + delegates_.pop(); + + // Signal to any other threads that we're currently out of work. + if (delegates_.empty()) + dry_.Reset(); + } + + // A NULL delegate pointer signals us to quit. + if (!work) + break; + + work->Run(); + } +} + } // namespace base diff --git a/base/simple_thread.h b/base/simple_thread.h index 1c7a82b..edcbfbe 100644 --- a/base/simple_thread.h +++ b/base/simple_thread.h @@ -41,8 +41,11 @@ #define BASE_SIMPLE_THREAD_H_ #include <string> +#include <queue> +#include <vector> #include "base/basictypes.h" +#include "base/lock.h" #include "base/waitable_event.h" #include "base/platform_thread.h" @@ -70,7 +73,7 @@ class SimpleThread : public PlatformThread::Delegate { // configuration involving the thread creation and management. // Every thread has a name, in the form of |name_prefix|/TID, for example // "my_thread/321". The thread will not be created until Start() is called. - SimpleThread(const std::string& name_prefix) + explicit SimpleThread(const std::string& name_prefix) : name_prefix_(name_prefix), name_(name_prefix), thread_(), event_(true, false), tid_(0), joined_(false) { } SimpleThread(const std::string& name_prefix, const Options& options) @@ -136,6 +139,51 @@ class DelegateSimpleThread : public SimpleThread { Delegate* delegate_; }; +// DelegateSimpleThreadPool allows you to start up a fixed number of threads, +// and then add jobs which will be dispatched to the threads. This is +// convenient when you have a lot of small work that you want done +// multi-threaded, but don't want to spawn a thread for each small bit of work. +// +// You just call AddWork() to add a delegate to the list of work to be done. +// JoinAll() will make sure that all outstanding work is processed, and wait +// for everything to finish. You can reuse a pool, so you can call Start() +// again after you've called JoinAll(). +class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate { + public: + typedef DelegateSimpleThread::Delegate Delegate; + + DelegateSimpleThreadPool(const std::string name_prefix, int num_threads) + : name_prefix_(name_prefix), num_threads_(num_threads), + dry_(true, false) { } + ~DelegateSimpleThreadPool(); + + // Start up all of the underlying threads, and start processing work if we + // have any. + void Start(); + + // Make sure all outstanding work is finished, and wait for and destroy all + // of the underlying threads in the pool. + void JoinAll(); + + // It is safe to AddWork() any time, before or after Start(). + // Delegate* should always be a valid pointer, NULL is reserved internally. + void AddWork(Delegate* work, int repeat_count); + void AddWork(Delegate* work) { + AddWork(work, 1); + } + + // We implement the Delegate interface, for running our internal threads. + virtual void Run(); + + private: + const std::string name_prefix_; + int num_threads_; + std::vector<DelegateSimpleThread*> threads_; + std::queue<Delegate*> delegates_; + Lock lock_; // Locks delegates_ + WaitableEvent dry_; // Not signaled when there is no work to do. +}; + } // namespace base #endif // BASE_SIMPLE_THREAD_H_ diff --git a/base/simple_thread_unittest.cc b/base/simple_thread_unittest.cc index 3e55858..639eeae 100644 --- a/base/simple_thread_unittest.cc +++ b/base/simple_thread_unittest.cc @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/atomic_sequence_num.h" +#include "base/lock.h" #include "base/simple_thread.h" #include "base/string_util.h" #include "base/waitable_event.h" @@ -37,6 +39,40 @@ class WaitEventRunner : public base::DelegateSimpleThread::Delegate { base::WaitableEvent* event_; }; +class SeqRunner : public base::DelegateSimpleThread::Delegate { + public: + SeqRunner(base::AtomicSequenceNumber* seq) : seq_(seq) { } + virtual void Run() { + seq_->GetNext(); + } + + private: + base::AtomicSequenceNumber* seq_; +}; + +// We count up on a sequence number, firing on the event when we've hit our +// expected amount, otherwise we wait on the event. This will ensure that we +// have all threads outstanding until we hit our expected thread pool size. +class VerifyPoolRunner : public base::DelegateSimpleThread::Delegate { + public: + VerifyPoolRunner(base::AtomicSequenceNumber* seq, + int total, base::WaitableEvent* event) + : seq_(seq), total_(total), event_(event) { } + + virtual void Run() { + if (seq_->GetNext() == total_) { + event_->Signal(); + } else { + event_->Wait(); + } + } + + private: + base::AtomicSequenceNumber* seq_; + int total_; + base::WaitableEvent* event_; +}; + } // namespace TEST(SimpleThreadTest, CreateAndJoin) { @@ -97,3 +133,35 @@ TEST(SimpleThreadTest, NamedWithOptions) { EXPECT_EQ(thread.name(), std::string("event_waiter/") + IntToString(thread.tid())); } + +TEST(SimpleThreadTest, ThreadPool) { + base::AtomicSequenceNumber seq; + SeqRunner runner(&seq); + base::DelegateSimpleThreadPool pool("seq_runner", 10); + + // Add work before we're running. + pool.AddWork(&runner, 300); + + EXPECT_EQ(seq.GetNext(), 0); + pool.Start(); + + // Add work while we're running. + pool.AddWork(&runner, 300); + + pool.JoinAll(); + + EXPECT_EQ(seq.GetNext(), 601); + + // We can reuse our pool. Verify that all 10 threads can actually run in + // parallel, so this test will only pass if there are actually 10 threads. + base::AtomicSequenceNumber seq2; + base::WaitableEvent event(true, false); + // Changing 9 to 10, for example, would cause us JoinAll() to never return. + VerifyPoolRunner verifier(&seq2, 9, &event); + pool.Start(); + + pool.AddWork(&verifier, 10); + + pool.JoinAll(); + EXPECT_EQ(seq2.GetNext(), 10); +} |