summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordeanm@google.com <deanm@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-02 11:15:48 +0000
committerdeanm@google.com <deanm@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-02 11:15:48 +0000
commita89d97fdbea8bf1e246f2adf33819342f684d944 (patch)
tree8f46b5c03c99276e6f35521127424134f0a79e17
parent5f6eee53a4ba6995b68a8ec6f22d3281529060c8 (diff)
downloadchromium_src-a89d97fdbea8bf1e246f2adf33819342f684d944.zip
chromium_src-a89d97fdbea8bf1e246f2adf33819342f684d944.tar.gz
chromium_src-a89d97fdbea8bf1e246f2adf33819342f684d944.tar.bz2
Add a simple thread pool to SimpleThread.
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@1634 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/simple_thread.cc66
-rw-r--r--base/simple_thread.h50
-rw-r--r--base/simple_thread_unittest.cc68
3 files changed, 183 insertions, 1 deletions
diff --git a/base/simple_thread.cc b/base/simple_thread.cc
index fa9a723..f919562 100644
--- a/base/simple_thread.cc
+++ b/base/simple_thread.cc
@@ -49,4 +49,70 @@ void DelegateSimpleThread::Run() {
delegate_ = NULL;
}
+DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
+ DCHECK(threads_.empty());
+ DCHECK(delegates_.empty());
+ DCHECK(!dry_.IsSignaled());
+}
+
+void DelegateSimpleThreadPool::Start() {
+ DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
+ for (int i = 0; i < num_threads_; ++i) {
+ DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
+ thread->Start();
+ threads_.push_back(thread);
+ }
+}
+
+void DelegateSimpleThreadPool::JoinAll() {
+ DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
+
+ // Tell all our threads to quit their worker loop.
+ AddWork(NULL, num_threads_);
+
+ // Join and destroy all the worker threads.
+ for (int i = 0; i < num_threads_; ++i) {
+ threads_[i]->Join();
+ delete threads_[i];
+ }
+ threads_.clear();
+ DCHECK(delegates_.empty());
+}
+
+void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
+ AutoLock locked(lock_);
+ for (int i = 0; i < repeat_count; ++i)
+ delegates_.push(delegate);
+ // If we were empty, signal that we have work now.
+ if (!dry_.IsSignaled())
+ dry_.Signal();
+}
+
+void DelegateSimpleThreadPool::Run() {
+ Delegate* work;
+
+ while (true) {
+ dry_.Wait();
+ {
+ AutoLock locked(lock_);
+ if (!dry_.IsSignaled())
+ continue;
+
+ DCHECK(!delegates_.empty());
+ work = delegates_.front();
+ delegates_.pop();
+
+ // Signal to any other threads that we're currently out of work.
+ if (delegates_.empty())
+ dry_.Reset();
+ }
+
+ // A NULL delegate pointer signals us to quit.
+ if (!work)
+ break;
+
+ work->Run();
+ }
+}
+
} // namespace base
diff --git a/base/simple_thread.h b/base/simple_thread.h
index 1c7a82b..edcbfbe 100644
--- a/base/simple_thread.h
+++ b/base/simple_thread.h
@@ -41,8 +41,11 @@
#define BASE_SIMPLE_THREAD_H_
#include <string>
+#include <queue>
+#include <vector>
#include "base/basictypes.h"
+#include "base/lock.h"
#include "base/waitable_event.h"
#include "base/platform_thread.h"
@@ -70,7 +73,7 @@ class SimpleThread : public PlatformThread::Delegate {
// configuration involving the thread creation and management.
// Every thread has a name, in the form of |name_prefix|/TID, for example
// "my_thread/321". The thread will not be created until Start() is called.
- SimpleThread(const std::string& name_prefix)
+ explicit SimpleThread(const std::string& name_prefix)
: name_prefix_(name_prefix), name_(name_prefix),
thread_(), event_(true, false), tid_(0), joined_(false) { }
SimpleThread(const std::string& name_prefix, const Options& options)
@@ -136,6 +139,51 @@ class DelegateSimpleThread : public SimpleThread {
Delegate* delegate_;
};
+// DelegateSimpleThreadPool allows you to start up a fixed number of threads,
+// and then add jobs which will be dispatched to the threads. This is
+// convenient when you have a lot of small work that you want done
+// multi-threaded, but don't want to spawn a thread for each small bit of work.
+//
+// You just call AddWork() to add a delegate to the list of work to be done.
+// JoinAll() will make sure that all outstanding work is processed, and wait
+// for everything to finish. You can reuse a pool, so you can call Start()
+// again after you've called JoinAll().
+class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate {
+ public:
+ typedef DelegateSimpleThread::Delegate Delegate;
+
+ DelegateSimpleThreadPool(const std::string name_prefix, int num_threads)
+ : name_prefix_(name_prefix), num_threads_(num_threads),
+ dry_(true, false) { }
+ ~DelegateSimpleThreadPool();
+
+ // Start up all of the underlying threads, and start processing work if we
+ // have any.
+ void Start();
+
+ // Make sure all outstanding work is finished, and wait for and destroy all
+ // of the underlying threads in the pool.
+ void JoinAll();
+
+ // It is safe to AddWork() any time, before or after Start().
+ // Delegate* should always be a valid pointer, NULL is reserved internally.
+ void AddWork(Delegate* work, int repeat_count);
+ void AddWork(Delegate* work) {
+ AddWork(work, 1);
+ }
+
+ // We implement the Delegate interface, for running our internal threads.
+ virtual void Run();
+
+ private:
+ const std::string name_prefix_;
+ int num_threads_;
+ std::vector<DelegateSimpleThread*> threads_;
+ std::queue<Delegate*> delegates_;
+ Lock lock_; // Locks delegates_
+ WaitableEvent dry_; // Not signaled when there is no work to do.
+};
+
} // namespace base
#endif // BASE_SIMPLE_THREAD_H_
diff --git a/base/simple_thread_unittest.cc b/base/simple_thread_unittest.cc
index 3e55858..639eeae 100644
--- a/base/simple_thread_unittest.cc
+++ b/base/simple_thread_unittest.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/atomic_sequence_num.h"
+#include "base/lock.h"
#include "base/simple_thread.h"
#include "base/string_util.h"
#include "base/waitable_event.h"
@@ -37,6 +39,40 @@ class WaitEventRunner : public base::DelegateSimpleThread::Delegate {
base::WaitableEvent* event_;
};
+class SeqRunner : public base::DelegateSimpleThread::Delegate {
+ public:
+ SeqRunner(base::AtomicSequenceNumber* seq) : seq_(seq) { }
+ virtual void Run() {
+ seq_->GetNext();
+ }
+
+ private:
+ base::AtomicSequenceNumber* seq_;
+};
+
+// We count up on a sequence number, firing on the event when we've hit our
+// expected amount, otherwise we wait on the event. This will ensure that we
+// have all threads outstanding until we hit our expected thread pool size.
+class VerifyPoolRunner : public base::DelegateSimpleThread::Delegate {
+ public:
+ VerifyPoolRunner(base::AtomicSequenceNumber* seq,
+ int total, base::WaitableEvent* event)
+ : seq_(seq), total_(total), event_(event) { }
+
+ virtual void Run() {
+ if (seq_->GetNext() == total_) {
+ event_->Signal();
+ } else {
+ event_->Wait();
+ }
+ }
+
+ private:
+ base::AtomicSequenceNumber* seq_;
+ int total_;
+ base::WaitableEvent* event_;
+};
+
} // namespace
TEST(SimpleThreadTest, CreateAndJoin) {
@@ -97,3 +133,35 @@ TEST(SimpleThreadTest, NamedWithOptions) {
EXPECT_EQ(thread.name(), std::string("event_waiter/") +
IntToString(thread.tid()));
}
+
+TEST(SimpleThreadTest, ThreadPool) {
+ base::AtomicSequenceNumber seq;
+ SeqRunner runner(&seq);
+ base::DelegateSimpleThreadPool pool("seq_runner", 10);
+
+ // Add work before we're running.
+ pool.AddWork(&runner, 300);
+
+ EXPECT_EQ(seq.GetNext(), 0);
+ pool.Start();
+
+ // Add work while we're running.
+ pool.AddWork(&runner, 300);
+
+ pool.JoinAll();
+
+ EXPECT_EQ(seq.GetNext(), 601);
+
+ // We can reuse our pool. Verify that all 10 threads can actually run in
+ // parallel, so this test will only pass if there are actually 10 threads.
+ base::AtomicSequenceNumber seq2;
+ base::WaitableEvent event(true, false);
+ // Changing 9 to 10, for example, would cause us JoinAll() to never return.
+ VerifyPoolRunner verifier(&seq2, 9, &event);
+ pool.Start();
+
+ pool.AddWork(&verifier, 10);
+
+ pool.JoinAll();
+ EXPECT_EQ(seq2.GetNext(), 10);
+}