summaryrefslogtreecommitdiffstats
path: root/base/threading
diff options
context:
space:
mode:
authorbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-12-30 18:08:36 +0000
committerbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-12-30 18:08:36 +0000
commitac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3 (patch)
tree7118143b15fb701d5d91de270a1af7ea431eaa8a /base/threading
parentb8eeb3eeba2418d9a1a7bb8429ddd5ec592298c1 (diff)
downloadchromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.zip
chromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.tar.gz
chromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.tar.bz2
Move some misc thread-related stuff from base to base/thread and into the base
namespace. This does not move the "hard" thread stuff (thread.h). TEST=it compiles BUG=none Review URL: http://codereview.chromium.org/6079009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70315 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/threading')
-rw-r--r--base/threading/simple_thread.cc152
-rw-r--r--base/threading/simple_thread.h182
-rw-r--r--base/threading/simple_thread_unittest.cc170
-rw-r--r--base/threading/watchdog.cc144
-rw-r--r--base/threading/watchdog.h98
-rw-r--r--base/threading/watchdog_unittest.cc140
-rw-r--r--base/threading/worker_pool.h35
-rw-r--r--base/threading/worker_pool_posix.cc169
-rw-r--r--base/threading/worker_pool_posix.h89
-rw-r--r--base/threading/worker_pool_posix_unittest.cc268
-rw-r--r--base/threading/worker_pool_unittest.cc46
-rw-r--r--base/threading/worker_pool_win.cc40
12 files changed, 1533 insertions, 0 deletions
diff --git a/base/threading/simple_thread.cc b/base/threading/simple_thread.cc
new file mode 100644
index 0000000..df1953f
--- /dev/null
+++ b/base/threading/simple_thread.cc
@@ -0,0 +1,152 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/simple_thread.h"
+
+#include "base/logging.h"
+#include "base/platform_thread.h"
+#include "base/string_number_conversions.h"
+
+namespace base {
+
+void SimpleThread::Start() {
+ DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times.";
+ bool success = PlatformThread::Create(options_.stack_size(), this, &thread_);
+ CHECK(success);
+ event_.Wait(); // Wait for the thread to complete initialization.
+}
+
+void SimpleThread::Join() {
+ DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread.";
+ DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
+ PlatformThread::Join(thread_);
+ joined_ = true;
+}
+
+void SimpleThread::ThreadMain() {
+ tid_ = PlatformThread::CurrentId();
+ // Construct our full name of the form "name_prefix_/TID".
+ name_.push_back('/');
+ name_.append(IntToString(tid_));
+ PlatformThread::SetName(name_.c_str());
+
+ // We've initialized our new thread, signal that we're done to Start().
+ event_.Signal();
+
+ Run();
+}
+
+SimpleThread::SimpleThread(const std::string& name_prefix)
+ : name_prefix_(name_prefix), name_(name_prefix),
+ thread_(), event_(true, false), tid_(0), joined_(false) {
+}
+
+SimpleThread::SimpleThread(const std::string& name_prefix,
+ const Options& options)
+ : name_prefix_(name_prefix), name_(name_prefix), options_(options),
+ thread_(), event_(true, false), tid_(0), joined_(false) {
+}
+
+SimpleThread::~SimpleThread() {
+ DCHECK(HasBeenStarted()) << "SimpleThread was never started.";
+ DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed.";
+}
+
+DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
+ const std::string& name_prefix)
+ : SimpleThread(name_prefix),
+ delegate_(delegate) {
+}
+
+DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
+ const std::string& name_prefix,
+ const Options& options)
+ : SimpleThread(name_prefix, options),
+ delegate_(delegate) {
+}
+
+DelegateSimpleThread::~DelegateSimpleThread() {
+}
+
+void DelegateSimpleThread::Run() {
+ DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
+ delegate_->Run();
+ delegate_ = NULL;
+}
+
+DelegateSimpleThreadPool::DelegateSimpleThreadPool(
+ const std::string& name_prefix,
+ int num_threads)
+ : name_prefix_(name_prefix),
+ num_threads_(num_threads),
+ dry_(true, false) {
+}
+
+DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
+ DCHECK(threads_.empty());
+ DCHECK(delegates_.empty());
+ DCHECK(!dry_.IsSignaled());
+}
+
+void DelegateSimpleThreadPool::Start() {
+ DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
+ for (int i = 0; i < num_threads_; ++i) {
+ DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
+ thread->Start();
+ threads_.push_back(thread);
+ }
+}
+
+void DelegateSimpleThreadPool::JoinAll() {
+ DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
+
+ // Tell all our threads to quit their worker loop.
+ AddWork(NULL, num_threads_);
+
+ // Join and destroy all the worker threads.
+ for (int i = 0; i < num_threads_; ++i) {
+ threads_[i]->Join();
+ delete threads_[i];
+ }
+ threads_.clear();
+ DCHECK(delegates_.empty());
+}
+
+void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
+ AutoLock locked(lock_);
+ for (int i = 0; i < repeat_count; ++i)
+ delegates_.push(delegate);
+ // If we were empty, signal that we have work now.
+ if (!dry_.IsSignaled())
+ dry_.Signal();
+}
+
+void DelegateSimpleThreadPool::Run() {
+ Delegate* work = NULL;
+
+ while (true) {
+ dry_.Wait();
+ {
+ AutoLock locked(lock_);
+ if (!dry_.IsSignaled())
+ continue;
+
+ DCHECK(!delegates_.empty());
+ work = delegates_.front();
+ delegates_.pop();
+
+ // Signal to any other threads that we're currently out of work.
+ if (delegates_.empty())
+ dry_.Reset();
+ }
+
+ // A NULL delegate pointer signals us to quit.
+ if (!work)
+ break;
+
+ work->Run();
+ }
+}
+
+} // namespace base
diff --git a/base/threading/simple_thread.h b/base/threading/simple_thread.h
new file mode 100644
index 0000000..dbff3ae
--- /dev/null
+++ b/base/threading/simple_thread.h
@@ -0,0 +1,182 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// WARNING: You should probably be using Thread (thread.h) instead. Thread is
+// Chrome's message-loop based Thread abstraction, and if you are a
+// thread running in the browser, there will likely be assumptions
+// that your thread will have an associated message loop.
+//
+// This is a simple thread interface that backs to a native operating system
+// thread. You should use this only when you want a thread that does not have
+// an associated MessageLoop. Unittesting is the best example of this.
+//
+// The simplest interface to use is DelegateSimpleThread, which will create
+// a new thread, and execute the Delegate's virtual Run() in this new thread
+// until it has completed, exiting the thread.
+//
+// NOTE: You *MUST* call Join on the thread to clean up the underlying thread
+// resources. You are also responsible for destructing the SimpleThread object.
+// It is invalid to destroy a SimpleThread while it is running, or without
+// Start() having been called (and a thread never created). The Delegate
+// object should live as long as a DelegateSimpleThread.
+//
+// Thread Safety: A SimpleThread is not completely thread safe. It is safe to
+// access it from the creating thread or from the newly created thread. This
+// implies that the creator thread should be the thread that calls Join.
+//
+// Example:
+// class MyThreadRunner : public DelegateSimpleThread::Delegate { ... };
+// MyThreadRunner runner;
+// DelegateSimpleThread thread(&runner, "good_name_here");
+// thread.Start();
+// // Start will return after the Thread has been successfully started and
+// // initialized. The newly created thread will invoke runner->Run(), and
+// // run until it returns.
+// thread.Join(); // Wait until the thread has exited. You *MUST* Join!
+// // The SimpleThread object is still valid, however you may not call Join
+// // or Start again.
+
+#ifndef BASE_THREADING_SIMPLE_THREAD_H_
+#define BASE_THREADING_SIMPLE_THREAD_H_
+#pragma once
+
+#include <string>
+#include <queue>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/lock.h"
+#include "base/waitable_event.h"
+#include "base/platform_thread.h"
+
+namespace base {
+
+// This is the base SimpleThread. You can derive from it and implement the
+// virtual Run method, or you can use the DelegateSimpleThread interface.
+class SimpleThread : public PlatformThread::Delegate {
+ public:
+ class Options {
+ public:
+ Options() : stack_size_(0) { }
+ ~Options() { }
+
+ // We use the standard compiler-supplied copy constructor.
+
+ // A custom stack size, or 0 for the system default.
+ void set_stack_size(size_t size) { stack_size_ = size; }
+ size_t stack_size() const { return stack_size_; }
+ private:
+ size_t stack_size_;
+ };
+
+ // Create a SimpleThread. |options| should be used to manage any specific
+ // configuration involving the thread creation and management.
+ // Every thread has a name, in the form of |name_prefix|/TID, for example
+ // "my_thread/321". The thread will not be created until Start() is called.
+ explicit SimpleThread(const std::string& name_prefix);
+ SimpleThread(const std::string& name_prefix, const Options& options);
+
+ virtual ~SimpleThread();
+
+ virtual void Start();
+ virtual void Join();
+
+ // We follow the PlatformThread Delegate interface.
+ virtual void ThreadMain();
+
+ // Subclasses should override the Run method.
+ virtual void Run() = 0;
+
+ // Return the thread name prefix, or "unnamed" if none was supplied.
+ std::string name_prefix() { return name_prefix_; }
+
+ // Return the completed name including TID, only valid after Start().
+ std::string name() { return name_; }
+
+ // Return the thread id, only valid after Start().
+ PlatformThreadId tid() { return tid_; }
+
+ // Return True if Start() has ever been called.
+ bool HasBeenStarted() { return event_.IsSignaled(); }
+
+ // Return True if Join() has evern been called.
+ bool HasBeenJoined() { return joined_; }
+
+ private:
+ const std::string name_prefix_;
+ std::string name_;
+ const Options options_;
+ PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join!
+ WaitableEvent event_; // Signaled if Start() was ever called.
+ PlatformThreadId tid_; // The backing thread's id.
+ bool joined_; // True if Join has been called.
+};
+
+class DelegateSimpleThread : public SimpleThread {
+ public:
+ class Delegate {
+ public:
+ Delegate() { }
+ virtual ~Delegate() { }
+ virtual void Run() = 0;
+ };
+
+ DelegateSimpleThread(Delegate* delegate,
+ const std::string& name_prefix);
+ DelegateSimpleThread(Delegate* delegate,
+ const std::string& name_prefix,
+ const Options& options);
+
+ virtual ~DelegateSimpleThread();
+ virtual void Run();
+ private:
+ Delegate* delegate_;
+};
+
+// DelegateSimpleThreadPool allows you to start up a fixed number of threads,
+// and then add jobs which will be dispatched to the threads. This is
+// convenient when you have a lot of small work that you want done
+// multi-threaded, but don't want to spawn a thread for each small bit of work.
+//
+// You just call AddWork() to add a delegate to the list of work to be done.
+// JoinAll() will make sure that all outstanding work is processed, and wait
+// for everything to finish. You can reuse a pool, so you can call Start()
+// again after you've called JoinAll().
+class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate {
+ public:
+ typedef DelegateSimpleThread::Delegate Delegate;
+
+ DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads);
+ virtual ~DelegateSimpleThreadPool();
+
+ // Start up all of the underlying threads, and start processing work if we
+ // have any.
+ void Start();
+
+ // Make sure all outstanding work is finished, and wait for and destroy all
+ // of the underlying threads in the pool.
+ void JoinAll();
+
+ // It is safe to AddWork() any time, before or after Start().
+ // Delegate* should always be a valid pointer, NULL is reserved internally.
+ void AddWork(Delegate* work, int repeat_count);
+ void AddWork(Delegate* work) {
+ AddWork(work, 1);
+ }
+
+ // We implement the Delegate interface, for running our internal threads.
+ virtual void Run();
+
+ private:
+ const std::string name_prefix_;
+ int num_threads_;
+ std::vector<DelegateSimpleThread*> threads_;
+ std::queue<Delegate*> delegates_;
+ Lock lock_; // Locks delegates_
+ WaitableEvent dry_; // Not signaled when there is no work to do.
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_SIMPLE_THREAD_H_
diff --git a/base/threading/simple_thread_unittest.cc b/base/threading/simple_thread_unittest.cc
new file mode 100644
index 0000000..56aed6b
--- /dev/null
+++ b/base/threading/simple_thread_unittest.cc
@@ -0,0 +1,170 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/atomic_sequence_num.h"
+#include "base/string_number_conversions.h"
+#include "base/threading/simple_thread.h"
+#include "base/waitable_event.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+namespace {
+
+class SetIntRunner : public DelegateSimpleThread::Delegate {
+ public:
+ SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { }
+ ~SetIntRunner() { }
+
+ virtual void Run() {
+ *ptr_ = val_;
+ }
+
+ private:
+ int* ptr_;
+ int val_;
+};
+
+class WaitEventRunner : public DelegateSimpleThread::Delegate {
+ public:
+ explicit WaitEventRunner(WaitableEvent* event) : event_(event) { }
+ ~WaitEventRunner() { }
+
+ virtual void Run() {
+ EXPECT_FALSE(event_->IsSignaled());
+ event_->Signal();
+ EXPECT_TRUE(event_->IsSignaled());
+ }
+ private:
+ WaitableEvent* event_;
+};
+
+class SeqRunner : public DelegateSimpleThread::Delegate {
+ public:
+ explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { }
+ virtual void Run() {
+ seq_->GetNext();
+ }
+
+ private:
+ AtomicSequenceNumber* seq_;
+};
+
+// We count up on a sequence number, firing on the event when we've hit our
+// expected amount, otherwise we wait on the event. This will ensure that we
+// have all threads outstanding until we hit our expected thread pool size.
+class VerifyPoolRunner : public DelegateSimpleThread::Delegate {
+ public:
+ VerifyPoolRunner(AtomicSequenceNumber* seq,
+ int total, WaitableEvent* event)
+ : seq_(seq), total_(total), event_(event) { }
+
+ virtual void Run() {
+ if (seq_->GetNext() == total_) {
+ event_->Signal();
+ } else {
+ event_->Wait();
+ }
+ }
+
+ private:
+ AtomicSequenceNumber* seq_;
+ int total_;
+ WaitableEvent* event_;
+};
+
+} // namespace
+
+TEST(SimpleThreadTest, CreateAndJoin) {
+ int stack_int = 0;
+
+ SetIntRunner runner(&stack_int, 7);
+ EXPECT_EQ(0, stack_int);
+
+ DelegateSimpleThread thread(&runner, "int_setter");
+ EXPECT_FALSE(thread.HasBeenStarted());
+ EXPECT_FALSE(thread.HasBeenJoined());
+ EXPECT_EQ(0, stack_int);
+
+ thread.Start();
+ EXPECT_TRUE(thread.HasBeenStarted());
+ EXPECT_FALSE(thread.HasBeenJoined());
+
+ thread.Join();
+ EXPECT_TRUE(thread.HasBeenStarted());
+ EXPECT_TRUE(thread.HasBeenJoined());
+ EXPECT_EQ(7, stack_int);
+}
+
+TEST(SimpleThreadTest, WaitForEvent) {
+ // Create a thread, and wait for it to signal us.
+ WaitableEvent event(true, false);
+
+ WaitEventRunner runner(&event);
+ DelegateSimpleThread thread(&runner, "event_waiter");
+
+ EXPECT_FALSE(event.IsSignaled());
+ thread.Start();
+ event.Wait();
+ EXPECT_TRUE(event.IsSignaled());
+ thread.Join();
+}
+
+TEST(SimpleThreadTest, NamedWithOptions) {
+ WaitableEvent event(true, false);
+
+ WaitEventRunner runner(&event);
+ SimpleThread::Options options;
+ DelegateSimpleThread thread(&runner, "event_waiter", options);
+ EXPECT_EQ(thread.name_prefix(), "event_waiter");
+ EXPECT_FALSE(event.IsSignaled());
+
+ thread.Start();
+ EXPECT_EQ(thread.name_prefix(), "event_waiter");
+ EXPECT_EQ(thread.name(),
+ std::string("event_waiter/") + IntToString(thread.tid()));
+ event.Wait();
+
+ EXPECT_TRUE(event.IsSignaled());
+ thread.Join();
+
+ // We keep the name and tid, even after the thread is gone.
+ EXPECT_EQ(thread.name_prefix(), "event_waiter");
+ EXPECT_EQ(thread.name(),
+ std::string("event_waiter/") + IntToString(thread.tid()));
+}
+
+TEST(SimpleThreadTest, ThreadPool) {
+ AtomicSequenceNumber seq;
+ SeqRunner runner(&seq);
+ DelegateSimpleThreadPool pool("seq_runner", 10);
+
+ // Add work before we're running.
+ pool.AddWork(&runner, 300);
+
+ EXPECT_EQ(seq.GetNext(), 0);
+ pool.Start();
+
+ // Add work while we're running.
+ pool.AddWork(&runner, 300);
+
+ pool.JoinAll();
+
+ EXPECT_EQ(seq.GetNext(), 601);
+
+ // We can reuse our pool. Verify that all 10 threads can actually run in
+ // parallel, so this test will only pass if there are actually 10 threads.
+ AtomicSequenceNumber seq2;
+ WaitableEvent event(true, false);
+ // Changing 9 to 10, for example, would cause us JoinAll() to never return.
+ VerifyPoolRunner verifier(&seq2, 9, &event);
+ pool.Start();
+
+ pool.AddWork(&verifier, 10);
+
+ pool.JoinAll();
+ EXPECT_EQ(seq2.GetNext(), 10);
+}
+
+} // namespace base
diff --git a/base/threading/watchdog.cc b/base/threading/watchdog.cc
new file mode 100644
index 0000000..8474744
--- /dev/null
+++ b/base/threading/watchdog.cc
@@ -0,0 +1,144 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/watchdog.h"
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/platform_thread.h"
+
+namespace base {
+
+// Start thread running in a Disarmed state.
+Watchdog::Watchdog(const TimeDelta& duration,
+ const std::string& thread_watched_name,
+ bool enabled)
+ : init_successful_(false),
+ lock_(),
+ condition_variable_(&lock_),
+ state_(DISARMED),
+ duration_(duration),
+ thread_watched_name_(thread_watched_name),
+ ALLOW_THIS_IN_INITIALIZER_LIST(delegate_(this)) {
+ if (!enabled)
+ return; // Don't start thread, or doing anything really.
+ init_successful_ = PlatformThread::Create(0, // Default stack size.
+ &delegate_,
+ &handle_);
+ DCHECK(init_successful_);
+}
+
+// Notify watchdog thread, and wait for it to finish up.
+Watchdog::~Watchdog() {
+ if (!init_successful_)
+ return;
+ {
+ AutoLock lock(lock_);
+ state_ = SHUTDOWN;
+ }
+ condition_variable_.Signal();
+ PlatformThread::Join(handle_);
+}
+
+void Watchdog::Arm() {
+ ArmAtStartTime(TimeTicks::Now());
+}
+
+void Watchdog::ArmSomeTimeDeltaAgo(const TimeDelta& time_delta) {
+ ArmAtStartTime(TimeTicks::Now() - time_delta);
+}
+
+// Start clock for watchdog.
+void Watchdog::ArmAtStartTime(const TimeTicks start_time) {
+ {
+ AutoLock lock(lock_);
+ start_time_ = start_time;
+ state_ = ARMED;
+ }
+ // Force watchdog to wake up, and go to sleep with the timer ticking with the
+ // proper duration.
+ condition_variable_.Signal();
+}
+
+// Disable watchdog so that it won't do anything when time expires.
+void Watchdog::Disarm() {
+ AutoLock lock(lock_);
+ state_ = DISARMED;
+ // We don't need to signal, as the watchdog will eventually wake up, and it
+ // will check its state and time, and act accordingly.
+}
+
+void Watchdog::Alarm() {
+ DVLOG(1) << "Watchdog alarmed for " << thread_watched_name_;
+}
+
+//------------------------------------------------------------------------------
+// Internal private methods that the watchdog thread uses.
+
+void Watchdog::ThreadDelegate::ThreadMain() {
+ SetThreadName();
+ TimeDelta remaining_duration;
+ while (1) {
+ AutoLock lock(watchdog_->lock_);
+ while (DISARMED == watchdog_->state_)
+ watchdog_->condition_variable_.Wait();
+ if (SHUTDOWN == watchdog_->state_)
+ return;
+ DCHECK(ARMED == watchdog_->state_);
+ remaining_duration = watchdog_->duration_ -
+ (TimeTicks::Now() - watchdog_->start_time_);
+ if (remaining_duration.InMilliseconds() > 0) {
+ // Spurios wake? Timer drifts? Go back to sleep for remaining time.
+ watchdog_->condition_variable_.TimedWait(remaining_duration);
+ continue;
+ }
+ // We overslept, so this seems like a real alarm.
+ // Watch out for a user that stopped the debugger on a different alarm!
+ {
+ AutoLock static_lock(static_lock_);
+ if (last_debugged_alarm_time_ > watchdog_->start_time_) {
+ // False alarm: we started our clock before the debugger break (last
+ // alarm time).
+ watchdog_->start_time_ += last_debugged_alarm_delay_;
+ if (last_debugged_alarm_time_ > watchdog_->start_time_)
+ // Too many alarms must have taken place.
+ watchdog_->state_ = DISARMED;
+ continue;
+ }
+ }
+ watchdog_->state_ = DISARMED; // Only alarm at most once.
+ TimeTicks last_alarm_time = TimeTicks::Now();
+ watchdog_->Alarm(); // Set a break point here to debug on alarms.
+ TimeDelta last_alarm_delay = TimeTicks::Now() - last_alarm_time;
+ if (last_alarm_delay <= TimeDelta::FromMilliseconds(2))
+ continue;
+ // Ignore race of two alarms/breaks going off at roughly the same time.
+ AutoLock static_lock(static_lock_);
+ // This was a real debugger break.
+ last_debugged_alarm_time_ = last_alarm_time;
+ last_debugged_alarm_delay_ = last_alarm_delay;
+ }
+}
+
+void Watchdog::ThreadDelegate::SetThreadName() const {
+ std::string name = watchdog_->thread_watched_name_ + " Watchdog";
+ PlatformThread::SetName(name.c_str());
+ DVLOG(1) << "Watchdog active: " << name;
+}
+
+// static
+void Watchdog::ResetStaticData() {
+ AutoLock lock(static_lock_);
+ last_debugged_alarm_time_ = TimeTicks();
+ last_debugged_alarm_delay_ = TimeDelta();
+}
+
+// static
+Lock Watchdog::static_lock_; // Lock for access of static data...
+// static
+TimeTicks Watchdog::last_debugged_alarm_time_ = TimeTicks();
+// static
+TimeDelta Watchdog::last_debugged_alarm_delay_;
+
+} // namespace base
diff --git a/base/threading/watchdog.h b/base/threading/watchdog.h
new file mode 100644
index 0000000..025fe09
--- /dev/null
+++ b/base/threading/watchdog.h
@@ -0,0 +1,98 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// The Watchdog class creates a second thread that can Alarm if a specific
+// duration of time passes without proper attention. The duration of time is
+// specified at construction time. The Watchdog may be used many times by
+// simply calling Arm() (to start timing) and Disarm() (to reset the timer).
+// The Watchdog is typically used under a debugger, where the stack traces on
+// other threads can be examined if/when the Watchdog alarms.
+
+// Some watchdogs will be enabled or disabled via command line switches. To
+// facilitate such code, an "enabled" argument for the constuctor can be used
+// to permanently disable the watchdog. Disabled watchdogs don't even spawn
+// a second thread, and their methods call (Arm() and Disarm()) return very
+// quickly.
+
+#ifndef BASE_THREADING_WATCHDOG_H_
+#define BASE_THREADING_WATCHDOG_H_
+#pragma once
+
+#include <string>
+
+#include "base/condition_variable.h"
+#include "base/lock.h"
+#include "base/platform_thread.h"
+#include "base/time.h"
+
+namespace base {
+
+class Watchdog {
+ public:
+ // Constructor specifies how long the Watchdog will wait before alarming.
+ Watchdog(const base::TimeDelta& duration,
+ const std::string& thread_watched_name,
+ bool enabled);
+ virtual ~Watchdog();
+
+ // Start timing, and alarm when time expires (unless we're disarm()ed.)
+ void Arm(); // Arm starting now.
+ void ArmSomeTimeDeltaAgo(const base::TimeDelta& time_delta);
+ void ArmAtStartTime(const base::TimeTicks start_time);
+
+ // Reset time, and do not set off the alarm.
+ void Disarm();
+
+ // Alarm is called if the time expires after an Arm() without someone calling
+ // Disarm(). This method can be overridden to create testable classes.
+ virtual void Alarm();
+
+ // Reset static data to initial state. Useful for tests, to ensure
+ // they are independent.
+ static void ResetStaticData();
+
+ private:
+ class ThreadDelegate : public PlatformThread::Delegate {
+ public:
+ explicit ThreadDelegate(Watchdog* watchdog) : watchdog_(watchdog) {
+ }
+ virtual void ThreadMain();
+ private:
+ Watchdog* watchdog_;
+
+ void SetThreadName() const;
+ };
+
+ enum State {ARMED, DISARMED, SHUTDOWN };
+
+ bool init_successful_;
+
+ Lock lock_; // Mutex for state_.
+ ConditionVariable condition_variable_;
+ State state_;
+ const base::TimeDelta duration_; // How long after start_time_ do we alarm?
+ const std::string thread_watched_name_;
+ PlatformThreadHandle handle_;
+ ThreadDelegate delegate_; // Store it, because it must outlive the thread.
+
+ base::TimeTicks start_time_; // Start of epoch, and alarm after duration_.
+
+ // When the debugger breaks (when we alarm), all the other alarms that are
+ // armed will expire (also alarm). To diminish this effect, we track any
+ // delay due to debugger breaks, and we *try* to adjust the effective start
+ // time of other alarms to step past the debugging break.
+ // Without this safety net, any alarm will typically trigger a host of follow
+ // on alarms from callers that specify old times.
+ static Lock static_lock_; // Lock for access of static data...
+ // When did we last alarm and get stuck (for a while) in a debugger?
+ static base::TimeTicks last_debugged_alarm_time_;
+ // How long did we sit on a break in the debugger?
+ static base::TimeDelta last_debugged_alarm_delay_;
+
+ DISALLOW_COPY_AND_ASSIGN(Watchdog);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_WATCHDOG_H_
diff --git a/base/threading/watchdog_unittest.cc b/base/threading/watchdog_unittest.cc
new file mode 100644
index 0000000..347781e
--- /dev/null
+++ b/base/threading/watchdog_unittest.cc
@@ -0,0 +1,140 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/watchdog.h"
+
+#include "base/logging.h"
+#include "base/platform_thread.h"
+#include "base/spin_wait.h"
+#include "base/time.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+namespace {
+
+//------------------------------------------------------------------------------
+// Provide a derived class to facilitate testing.
+
+class WatchdogCounter : public Watchdog {
+ public:
+ WatchdogCounter(const TimeDelta& duration,
+ const std::string& thread_watched_name,
+ bool enabled)
+ : Watchdog(duration, thread_watched_name, enabled), alarm_counter_(0) {
+ }
+
+ virtual ~WatchdogCounter() {}
+
+ virtual void Alarm() {
+ alarm_counter_++;
+ Watchdog::Alarm();
+ }
+
+ int alarm_counter() { return alarm_counter_; }
+
+ private:
+ int alarm_counter_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatchdogCounter);
+};
+
+class WatchdogTest : public testing::Test {
+ public:
+ void SetUp() {
+ Watchdog::ResetStaticData();
+ }
+};
+
+} // namespace
+
+//------------------------------------------------------------------------------
+// Actual tests
+
+// Minimal constructor/destructor test.
+TEST_F(WatchdogTest, StartupShutdownTest) {
+ Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false);
+ Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true);
+}
+
+// Test ability to call Arm and Disarm repeatedly.
+TEST_F(WatchdogTest, ArmDisarmTest) {
+ Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false);
+ watchdog1.Arm();
+ watchdog1.Disarm();
+ watchdog1.Arm();
+ watchdog1.Disarm();
+
+ Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true);
+ watchdog2.Arm();
+ watchdog2.Disarm();
+ watchdog2.Arm();
+ watchdog2.Disarm();
+}
+
+// Make sure a basic alarm fires when the time has expired.
+TEST_F(WatchdogTest, AlarmTest) {
+ WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Enabled", true);
+ watchdog.Arm();
+ SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5),
+ watchdog.alarm_counter() > 0);
+ EXPECT_EQ(1, watchdog.alarm_counter());
+}
+
+// Make sure a basic alarm fires when the time has expired.
+TEST_F(WatchdogTest, AlarmPriorTimeTest) {
+ WatchdogCounter watchdog(TimeDelta(), "Enabled2", true);
+ // Set a time in the past.
+ watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(2));
+ // It should instantly go off, but certainly in less than 5 minutes.
+ SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5),
+ watchdog.alarm_counter() > 0);
+
+ EXPECT_EQ(1, watchdog.alarm_counter());
+}
+
+// Make sure a disable alarm does nothing, even if we arm it.
+TEST_F(WatchdogTest, ConstructorDisabledTest) {
+ WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Disabled", false);
+ watchdog.Arm();
+ // Alarm should not fire, as it was disabled.
+ PlatformThread::Sleep(500);
+ EXPECT_EQ(0, watchdog.alarm_counter());
+}
+
+// Make sure Disarming will prevent firing, even after Arming.
+TEST_F(WatchdogTest, DisarmTest) {
+ WatchdogCounter watchdog(TimeDelta::FromSeconds(1), "Enabled3", true);
+
+ TimeTicks start = TimeTicks::Now();
+ watchdog.Arm();
+ PlatformThread::Sleep(100); // Sleep a bit, but not past the alarm point.
+ watchdog.Disarm();
+ TimeTicks end = TimeTicks::Now();
+
+ if (end - start > TimeDelta::FromMilliseconds(500)) {
+ LOG(WARNING) << "100ms sleep took over 500ms, making the results of this "
+ << "timing-sensitive test suspicious. Aborting now.";
+ return;
+ }
+
+ // Alarm should not have fired before it was disarmed.
+ EXPECT_EQ(0, watchdog.alarm_counter());
+
+ // Sleep past the point where it would have fired if it wasn't disarmed,
+ // and verify that it didn't fire.
+ PlatformThread::Sleep(1000);
+ EXPECT_EQ(0, watchdog.alarm_counter());
+
+ // ...but even after disarming, we can still use the alarm...
+ // Set a time greater than the timeout into the past.
+ watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(10));
+ // It should almost instantly go off, but certainly in less than 5 minutes.
+ SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5),
+ watchdog.alarm_counter() > 0);
+
+ EXPECT_EQ(1, watchdog.alarm_counter());
+}
+
+} // namespace base
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h
new file mode 100644
index 0000000..9a02acc
--- /dev/null
+++ b/base/threading/worker_pool.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_THREADING_WORKER_POOL_H_
+#define BASE_THREADING_WORKER_POOL_H_
+#pragma once
+
+#include "base/tracked.h"
+
+class Task;
+
+namespace base {
+
+// This is a facility that runs tasks that don't require a specific thread or
+// a message loop.
+//
+// WARNING: This shouldn't be used unless absolutely necessary. We don't wait
+// for the worker pool threads to finish on shutdown, so the tasks running
+// inside the pool must be extremely careful about other objects they access
+// (MessageLoops, Singletons, etc). During shutdown these object may no longer
+// exist.
+class WorkerPool {
+ public:
+ // This function posts |task| to run on a worker thread. |task_is_slow|
+ // should be used for tasks that will take a long time to execute. Returns
+ // false if |task| could not be posted to a worker thread. Regardless of
+ // return value, ownership of |task| is transferred to the worker pool.
+ static bool PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_WORKER_POOL_H_
diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc
new file mode 100644
index 0000000..2facc01
--- /dev/null
+++ b/base/threading/worker_pool_posix.cc
@@ -0,0 +1,169 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/worker_pool_posix.h"
+
+#include "base/lazy_instance.h"
+#include "base/logging.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
+#include "base/stringprintf.h"
+#include "base/task.h"
+#include "base/threading/worker_pool.h"
+
+namespace base {
+
+namespace {
+
+const int kIdleSecondsBeforeExit = 10 * 60;
+// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert
+// function of NSS because of NSS bug 439169.
+const int kWorkerThreadStackSize = 128 * 1024;
+
+class WorkerPoolImpl {
+ public:
+ WorkerPoolImpl();
+ ~WorkerPoolImpl();
+
+ void PostTask(const tracked_objects::Location& from_here, Task* task,
+ bool task_is_slow);
+
+ private:
+ scoped_refptr<base::PosixDynamicThreadPool> pool_;
+};
+
+WorkerPoolImpl::WorkerPoolImpl()
+ : pool_(new base::PosixDynamicThreadPool("WorkerPool",
+ kIdleSecondsBeforeExit)) {
+}
+
+WorkerPoolImpl::~WorkerPoolImpl() {
+ pool_->Terminate();
+}
+
+void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ task->SetBirthPlace(from_here);
+ pool_->PostTask(task);
+}
+
+base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED);
+
+class WorkerThread : public PlatformThread::Delegate {
+ public:
+ WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit,
+ base::PosixDynamicThreadPool* pool)
+ : name_prefix_(name_prefix),
+ idle_seconds_before_exit_(idle_seconds_before_exit),
+ pool_(pool) {}
+
+ virtual void ThreadMain();
+
+ private:
+ const std::string name_prefix_;
+ const int idle_seconds_before_exit_;
+ scoped_refptr<base::PosixDynamicThreadPool> pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(WorkerThread);
+};
+
+void WorkerThread::ThreadMain() {
+ const std::string name = base::StringPrintf(
+ "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId());
+ PlatformThread::SetName(name.c_str());
+
+ for (;;) {
+ Task* task = pool_->WaitForTask();
+ if (!task)
+ break;
+ task->Run();
+ delete task;
+ }
+
+ // The WorkerThread is non-joinable, so it deletes itself.
+ delete this;
+}
+
+} // namespace
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
+ return true;
+}
+
+PosixDynamicThreadPool::PosixDynamicThreadPool(
+ const std::string& name_prefix,
+ int idle_seconds_before_exit)
+ : name_prefix_(name_prefix),
+ idle_seconds_before_exit_(idle_seconds_before_exit),
+ tasks_available_cv_(&lock_),
+ num_idle_threads_(0),
+ terminated_(false),
+ num_idle_threads_cv_(NULL) {}
+
+PosixDynamicThreadPool::~PosixDynamicThreadPool() {
+ while (!tasks_.empty()) {
+ Task* task = tasks_.front();
+ tasks_.pop();
+ delete task;
+ }
+}
+
+void PosixDynamicThreadPool::Terminate() {
+ {
+ AutoLock locked(lock_);
+ DCHECK(!terminated_) << "Thread pool is already terminated.";
+ terminated_ = true;
+ }
+ tasks_available_cv_.Broadcast();
+}
+
+void PosixDynamicThreadPool::PostTask(Task* task) {
+ AutoLock locked(lock_);
+ DCHECK(!terminated_) <<
+ "This thread pool is already terminated. Do not post new tasks.";
+
+ tasks_.push(task);
+
+ // We have enough worker threads.
+ if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) {
+ tasks_available_cv_.Signal();
+ } else {
+ // The new PlatformThread will take ownership of the WorkerThread object,
+ // which will delete itself on exit.
+ WorkerThread* worker =
+ new WorkerThread(name_prefix_, idle_seconds_before_exit_, this);
+ PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker);
+ }
+}
+
+Task* PosixDynamicThreadPool::WaitForTask() {
+ AutoLock locked(lock_);
+
+ if (terminated_)
+ return NULL;
+
+ if (tasks_.empty()) { // No work available, wait for work.
+ num_idle_threads_++;
+ if (num_idle_threads_cv_.get())
+ num_idle_threads_cv_->Signal();
+ tasks_available_cv_.TimedWait(
+ TimeDelta::FromSeconds(kIdleSecondsBeforeExit));
+ num_idle_threads_--;
+ if (num_idle_threads_cv_.get())
+ num_idle_threads_cv_->Signal();
+ if (tasks_.empty()) {
+ // We waited for work, but there's still no work. Return NULL to signal
+ // the thread to terminate.
+ return NULL;
+ }
+ }
+
+ Task* task = tasks_.front();
+ tasks_.pop();
+ return task;
+}
+
+} // namespace base
diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h
new file mode 100644
index 0000000..6c99e76
--- /dev/null
+++ b/base/threading/worker_pool_posix.h
@@ -0,0 +1,89 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// The thread pool used in the POSIX implementation of WorkerPool dynamically
+// adds threads as necessary to handle all tasks. It keeps old threads around
+// for a period of time to allow them to be reused. After this waiting period,
+// the threads exit. This thread pool uses non-joinable threads, therefore
+// worker threads are not joined during process shutdown. This means that
+// potentially long running tasks (such as DNS lookup) do not block process
+// shutdown, but also means that process shutdown may "leak" objects. Note that
+// although PosixDynamicThreadPool spawns the worker threads and manages the
+// task queue, it does not own the worker threads. The worker threads ask the
+// PosixDynamicThreadPool for work and eventually clean themselves up. The
+// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool
+// instance, which prevents PosixDynamicThreadPool from disappearing before all
+// worker threads exit. The owner of PosixDynamicThreadPool should likewise
+// maintain a scoped_refptr to the PosixDynamicThreadPool instance.
+//
+// NOTE: The classes defined in this file are only meant for use by the POSIX
+// implementation of WorkerPool. No one else should be using these classes.
+// These symbols are exported in a header purely for testing purposes.
+
+#ifndef BASE_THREADING_WORKER_POOL_POSIX_H_
+#define BASE_THREADING_WORKER_POOL_POSIX_H_
+#pragma once
+
+#include <queue>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/condition_variable.h"
+#include "base/lock.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
+#include "base/scoped_ptr.h"
+
+class Task;
+
+namespace base {
+
+class PosixDynamicThreadPool
+ : public RefCountedThreadSafe<PosixDynamicThreadPool> {
+ public:
+ class PosixDynamicThreadPoolPeer;
+
+ // All worker threads will share the same |name_prefix|. They will exit after
+ // |idle_seconds_before_exit|.
+ PosixDynamicThreadPool(const std::string& name_prefix,
+ int idle_seconds_before_exit);
+ ~PosixDynamicThreadPool();
+
+ // Indicates that the thread pool is going away. Stops handing out tasks to
+ // worker threads. Wakes up all the idle threads to let them exit.
+ void Terminate();
+
+ // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership
+ // of |task|.
+ void PostTask(Task* task);
+
+ // Worker thread method to wait for up to |idle_seconds_before_exit| for more
+ // work from the thread pool. Returns NULL if no work is available.
+ Task* WaitForTask();
+
+ private:
+ friend class PosixDynamicThreadPoolPeer;
+
+ const std::string name_prefix_;
+ const int idle_seconds_before_exit_;
+
+ Lock lock_; // Protects all the variables below.
+
+ // Signal()s worker threads to let them know more tasks are available.
+ // Also used for Broadcast()'ing to worker threads to let them know the pool
+ // is being deleted and they can exit.
+ ConditionVariable tasks_available_cv_;
+ int num_idle_threads_;
+ std::queue<Task*> tasks_;
+ bool terminated_;
+ // Only used for tests to ensure correct thread ordering. It will always be
+ // NULL in non-test code.
+ scoped_ptr<ConditionVariable> num_idle_threads_cv_;
+
+ DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_WORKER_POOL_POSIX_H_
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
new file mode 100644
index 0000000..48df16e
--- /dev/null
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -0,0 +1,268 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/worker_pool_posix.h"
+
+#include <set>
+
+#include "base/condition_variable.h"
+#include "base/lock.h"
+#include "base/platform_thread.h"
+#include "base/task.h"
+#include "base/waitable_event.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+// Peer class to provide passthrough access to PosixDynamicThreadPool internals.
+class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
+ public:
+ explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
+ : pool_(pool) {}
+
+ Lock* lock() { return &pool_->lock_; }
+ ConditionVariable* tasks_available_cv() {
+ return &pool_->tasks_available_cv_;
+ }
+ const std::queue<Task*>& tasks() const { return pool_->tasks_; }
+ int num_idle_threads() const { return pool_->num_idle_threads_; }
+ ConditionVariable* num_idle_threads_cv() {
+ return pool_->num_idle_threads_cv_.get();
+ }
+ void set_num_idle_threads_cv(ConditionVariable* cv) {
+ pool_->num_idle_threads_cv_.reset(cv);
+ }
+
+ private:
+ PosixDynamicThreadPool* pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
+};
+
+namespace {
+
+// IncrementingTask's main purpose is to increment a counter. It also updates a
+// set of unique thread ids, and signals a ConditionVariable on completion.
+// Note that since it does not block, there is no way to control the number of
+// threads used if more than one IncrementingTask is consecutively posted to the
+// thread pool, since the first one might finish executing before the subsequent
+// PostTask() calls get invoked.
+class IncrementingTask : public Task {
+ public:
+ IncrementingTask(Lock* counter_lock,
+ int* counter,
+ Lock* unique_threads_lock,
+ std::set<PlatformThreadId>* unique_threads)
+ : counter_lock_(counter_lock),
+ unique_threads_lock_(unique_threads_lock),
+ unique_threads_(unique_threads),
+ counter_(counter) {}
+
+ virtual void Run() {
+ AddSelfToUniqueThreadSet();
+ AutoLock locked(*counter_lock_);
+ (*counter_)++;
+ }
+
+ void AddSelfToUniqueThreadSet() {
+ AutoLock locked(*unique_threads_lock_);
+ unique_threads_->insert(PlatformThread::CurrentId());
+ }
+
+ private:
+ Lock* counter_lock_;
+ Lock* unique_threads_lock_;
+ std::set<PlatformThreadId>* unique_threads_;
+ int* counter_;
+
+ DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
+};
+
+// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
+// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
+class BlockingIncrementingTask : public Task {
+ public:
+ BlockingIncrementingTask(Lock* counter_lock,
+ int* counter,
+ Lock* unique_threads_lock,
+ std::set<PlatformThreadId>* unique_threads,
+ Lock* num_waiting_to_start_lock,
+ int* num_waiting_to_start,
+ ConditionVariable* num_waiting_to_start_cv,
+ base::WaitableEvent* start)
+ : incrementer_(
+ counter_lock, counter, unique_threads_lock, unique_threads),
+ num_waiting_to_start_lock_(num_waiting_to_start_lock),
+ num_waiting_to_start_(num_waiting_to_start),
+ num_waiting_to_start_cv_(num_waiting_to_start_cv),
+ start_(start) {}
+
+ virtual void Run() {
+ {
+ AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
+ (*num_waiting_to_start_)++;
+ }
+ num_waiting_to_start_cv_->Signal();
+ CHECK(start_->Wait());
+ incrementer_.Run();
+ }
+
+ private:
+ IncrementingTask incrementer_;
+ Lock* num_waiting_to_start_lock_;
+ int* num_waiting_to_start_;
+ ConditionVariable* num_waiting_to_start_cv_;
+ base::WaitableEvent* start_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
+};
+
+class PosixDynamicThreadPoolTest : public testing::Test {
+ protected:
+ PosixDynamicThreadPoolTest()
+ : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
+ peer_(pool_.get()),
+ counter_(0),
+ num_waiting_to_start_(0),
+ num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
+ start_(true, false) {}
+
+ virtual void SetUp() {
+ peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
+ }
+
+ virtual void TearDown() {
+ // Wake up the idle threads so they can terminate.
+ if (pool_.get()) pool_->Terminate();
+ }
+
+ void WaitForTasksToStart(int num_tasks) {
+ AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
+ while (num_waiting_to_start_ < num_tasks) {
+ num_waiting_to_start_cv_.Wait();
+ }
+ }
+
+ void WaitForIdleThreads(int num_idle_threads) {
+ AutoLock pool_locked(*peer_.lock());
+ while (peer_.num_idle_threads() < num_idle_threads) {
+ peer_.num_idle_threads_cv()->Wait();
+ }
+ }
+
+ Task* CreateNewIncrementingTask() {
+ return new IncrementingTask(&counter_lock_, &counter_,
+ &unique_threads_lock_, &unique_threads_);
+ }
+
+ Task* CreateNewBlockingIncrementingTask() {
+ return new BlockingIncrementingTask(
+ &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
+ &num_waiting_to_start_lock_, &num_waiting_to_start_,
+ &num_waiting_to_start_cv_, &start_);
+ }
+
+ scoped_refptr<base::PosixDynamicThreadPool> pool_;
+ base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
+ Lock counter_lock_;
+ int counter_;
+ Lock unique_threads_lock_;
+ std::set<PlatformThreadId> unique_threads_;
+ Lock num_waiting_to_start_lock_;
+ int num_waiting_to_start_;
+ ConditionVariable num_waiting_to_start_cv_;
+ base::WaitableEvent start_;
+};
+
+} // namespace
+
+TEST_F(PosixDynamicThreadPoolTest, Basic) {
+ EXPECT_EQ(0, peer_.num_idle_threads());
+ EXPECT_EQ(0U, unique_threads_.size());
+ EXPECT_EQ(0U, peer_.tasks().size());
+
+ // Add one task and wait for it to be completed.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ EXPECT_EQ(1U, unique_threads_.size()) <<
+ "There should be only one thread allocated for one task.";
+ EXPECT_EQ(1, peer_.num_idle_threads());
+ EXPECT_EQ(1, counter_);
+}
+
+TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
+ // Add one task and wait for it to be completed.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ // Add another 2 tasks. One should reuse the existing worker thread.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(2U, unique_threads_.size());
+ EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(3, counter_);
+}
+
+TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
+ // Add two blocking tasks.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(2U, unique_threads_.size());
+ EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
+ EXPECT_EQ(2, counter_);
+}
+
+TEST_F(PosixDynamicThreadPoolTest, Complex) {
+ // Add two non blocking tasks and wait for them to finish.
+ pool_->PostTask(CreateNewIncrementingTask());
+
+ WaitForIdleThreads(1);
+
+ // Add two blocking tasks, start them simultaneously, and wait for them to
+ // finish.
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+ pool_->PostTask(CreateNewBlockingIncrementingTask());
+
+ WaitForTasksToStart(2);
+ start_.Signal();
+ WaitForIdleThreads(2);
+
+ EXPECT_EQ(3, counter_);
+ EXPECT_EQ(2, peer_.num_idle_threads());
+ EXPECT_EQ(2U, unique_threads_.size());
+
+ // Wake up all idle threads so they can exit.
+ {
+ AutoLock locked(*peer_.lock());
+ while (peer_.num_idle_threads() > 0) {
+ peer_.tasks_available_cv()->Signal();
+ peer_.num_idle_threads_cv()->Wait();
+ }
+ }
+
+ // Add another non blocking task. There are no threads to reuse.
+ pool_->PostTask(CreateNewIncrementingTask());
+ WaitForIdleThreads(1);
+
+ EXPECT_EQ(3U, unique_threads_.size());
+ EXPECT_EQ(1, peer_.num_idle_threads());
+ EXPECT_EQ(4, counter_);
+}
+
+} // namespace base
diff --git a/base/threading/worker_pool_unittest.cc b/base/threading/worker_pool_unittest.cc
new file mode 100644
index 0000000..cf8e0e8
--- /dev/null
+++ b/base/threading/worker_pool_unittest.cc
@@ -0,0 +1,46 @@
+// Copyright (c) 2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task.h"
+#include "base/waitable_event.h"
+#include "base/threading/worker_pool.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "testing/platform_test.h"
+
+typedef PlatformTest WorkerPoolTest;
+
+namespace base {
+
+namespace {
+
+class PostTaskTestTask : public Task {
+ public:
+ explicit PostTaskTestTask(WaitableEvent* event) : event_(event) {
+ }
+
+ void Run() {
+ event_->Signal();
+ }
+
+ private:
+ WaitableEvent* event_;
+};
+
+} // namespace
+
+TEST_F(WorkerPoolTest, PostTask) {
+ WaitableEvent test_event(false, false);
+ WaitableEvent long_test_event(false, false);
+ bool signaled;
+
+ WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&test_event), false);
+ WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&long_test_event), true);
+
+ signaled = test_event.Wait();
+ EXPECT_TRUE(signaled);
+ signaled = long_test_event.Wait();
+ EXPECT_TRUE(signaled);
+}
+
+} // namespace base
diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc
new file mode 100644
index 0000000..2072e52
--- /dev/null
+++ b/base/threading/worker_pool_win.cc
@@ -0,0 +1,40 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/worker_pool.h"
+
+#include "base/logging.h"
+#include "base/task.h"
+
+namespace base {
+
+namespace {
+
+DWORD CALLBACK WorkItemCallback(void* param) {
+ Task* task = static_cast<Task*>(param);
+ task->Run();
+ delete task;
+ return 0;
+}
+
+} // namespace
+
+bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
+ Task* task, bool task_is_slow) {
+ task->SetBirthPlace(from_here);
+
+ ULONG flags = 0;
+ if (task_is_slow)
+ flags |= WT_EXECUTELONGFUNCTION;
+
+ if (!QueueUserWorkItem(WorkItemCallback, task, flags)) {
+ DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError();
+ delete task;
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace base