diff options
author | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-12-30 18:08:36 +0000 |
---|---|---|
committer | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-12-30 18:08:36 +0000 |
commit | ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3 (patch) | |
tree | 7118143b15fb701d5d91de270a1af7ea431eaa8a /base/threading | |
parent | b8eeb3eeba2418d9a1a7bb8429ddd5ec592298c1 (diff) | |
download | chromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.zip chromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.tar.gz chromium_src-ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3.tar.bz2 |
Move some misc thread-related stuff from base to base/thread and into the base
namespace. This does not move the "hard" thread stuff (thread.h).
TEST=it compiles
BUG=none
Review URL: http://codereview.chromium.org/6079009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70315 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/threading')
-rw-r--r-- | base/threading/simple_thread.cc | 152 | ||||
-rw-r--r-- | base/threading/simple_thread.h | 182 | ||||
-rw-r--r-- | base/threading/simple_thread_unittest.cc | 170 | ||||
-rw-r--r-- | base/threading/watchdog.cc | 144 | ||||
-rw-r--r-- | base/threading/watchdog.h | 98 | ||||
-rw-r--r-- | base/threading/watchdog_unittest.cc | 140 | ||||
-rw-r--r-- | base/threading/worker_pool.h | 35 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.cc | 169 | ||||
-rw-r--r-- | base/threading/worker_pool_posix.h | 89 | ||||
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 268 | ||||
-rw-r--r-- | base/threading/worker_pool_unittest.cc | 46 | ||||
-rw-r--r-- | base/threading/worker_pool_win.cc | 40 |
12 files changed, 1533 insertions, 0 deletions
diff --git a/base/threading/simple_thread.cc b/base/threading/simple_thread.cc new file mode 100644 index 0000000..df1953f --- /dev/null +++ b/base/threading/simple_thread.cc @@ -0,0 +1,152 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/simple_thread.h" + +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/string_number_conversions.h" + +namespace base { + +void SimpleThread::Start() { + DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; + bool success = PlatformThread::Create(options_.stack_size(), this, &thread_); + CHECK(success); + event_.Wait(); // Wait for the thread to complete initialization. +} + +void SimpleThread::Join() { + DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; + DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; + PlatformThread::Join(thread_); + joined_ = true; +} + +void SimpleThread::ThreadMain() { + tid_ = PlatformThread::CurrentId(); + // Construct our full name of the form "name_prefix_/TID". + name_.push_back('/'); + name_.append(IntToString(tid_)); + PlatformThread::SetName(name_.c_str()); + + // We've initialized our new thread, signal that we're done to Start(). + event_.Signal(); + + Run(); +} + +SimpleThread::SimpleThread(const std::string& name_prefix) + : name_prefix_(name_prefix), name_(name_prefix), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::SimpleThread(const std::string& name_prefix, + const Options& options) + : name_prefix_(name_prefix), name_(name_prefix), options_(options), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::~SimpleThread() { + DCHECK(HasBeenStarted()) << "SimpleThread was never started."; + DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed."; +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix) + : SimpleThread(name_prefix), + delegate_(delegate) { +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options) + : SimpleThread(name_prefix, options), + delegate_(delegate) { +} + +DelegateSimpleThread::~DelegateSimpleThread() { +} + +void DelegateSimpleThread::Run() { + DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; + delegate_->Run(); + delegate_ = NULL; +} + +DelegateSimpleThreadPool::DelegateSimpleThreadPool( + const std::string& name_prefix, + int num_threads) + : name_prefix_(name_prefix), + num_threads_(num_threads), + dry_(true, false) { +} + +DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { + DCHECK(threads_.empty()); + DCHECK(delegates_.empty()); + DCHECK(!dry_.IsSignaled()); +} + +void DelegateSimpleThreadPool::Start() { + DCHECK(threads_.empty()) << "Start() called with outstanding threads."; + for (int i = 0; i < num_threads_; ++i) { + DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); + thread->Start(); + threads_.push_back(thread); + } +} + +void DelegateSimpleThreadPool::JoinAll() { + DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; + + // Tell all our threads to quit their worker loop. + AddWork(NULL, num_threads_); + + // Join and destroy all the worker threads. + for (int i = 0; i < num_threads_; ++i) { + threads_[i]->Join(); + delete threads_[i]; + } + threads_.clear(); + DCHECK(delegates_.empty()); +} + +void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { + AutoLock locked(lock_); + for (int i = 0; i < repeat_count; ++i) + delegates_.push(delegate); + // If we were empty, signal that we have work now. + if (!dry_.IsSignaled()) + dry_.Signal(); +} + +void DelegateSimpleThreadPool::Run() { + Delegate* work = NULL; + + while (true) { + dry_.Wait(); + { + AutoLock locked(lock_); + if (!dry_.IsSignaled()) + continue; + + DCHECK(!delegates_.empty()); + work = delegates_.front(); + delegates_.pop(); + + // Signal to any other threads that we're currently out of work. + if (delegates_.empty()) + dry_.Reset(); + } + + // A NULL delegate pointer signals us to quit. + if (!work) + break; + + work->Run(); + } +} + +} // namespace base diff --git a/base/threading/simple_thread.h b/base/threading/simple_thread.h new file mode 100644 index 0000000..dbff3ae --- /dev/null +++ b/base/threading/simple_thread.h @@ -0,0 +1,182 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// WARNING: You should probably be using Thread (thread.h) instead. Thread is +// Chrome's message-loop based Thread abstraction, and if you are a +// thread running in the browser, there will likely be assumptions +// that your thread will have an associated message loop. +// +// This is a simple thread interface that backs to a native operating system +// thread. You should use this only when you want a thread that does not have +// an associated MessageLoop. Unittesting is the best example of this. +// +// The simplest interface to use is DelegateSimpleThread, which will create +// a new thread, and execute the Delegate's virtual Run() in this new thread +// until it has completed, exiting the thread. +// +// NOTE: You *MUST* call Join on the thread to clean up the underlying thread +// resources. You are also responsible for destructing the SimpleThread object. +// It is invalid to destroy a SimpleThread while it is running, or without +// Start() having been called (and a thread never created). The Delegate +// object should live as long as a DelegateSimpleThread. +// +// Thread Safety: A SimpleThread is not completely thread safe. It is safe to +// access it from the creating thread or from the newly created thread. This +// implies that the creator thread should be the thread that calls Join. +// +// Example: +// class MyThreadRunner : public DelegateSimpleThread::Delegate { ... }; +// MyThreadRunner runner; +// DelegateSimpleThread thread(&runner, "good_name_here"); +// thread.Start(); +// // Start will return after the Thread has been successfully started and +// // initialized. The newly created thread will invoke runner->Run(), and +// // run until it returns. +// thread.Join(); // Wait until the thread has exited. You *MUST* Join! +// // The SimpleThread object is still valid, however you may not call Join +// // or Start again. + +#ifndef BASE_THREADING_SIMPLE_THREAD_H_ +#define BASE_THREADING_SIMPLE_THREAD_H_ +#pragma once + +#include <string> +#include <queue> +#include <vector> + +#include "base/basictypes.h" +#include "base/lock.h" +#include "base/waitable_event.h" +#include "base/platform_thread.h" + +namespace base { + +// This is the base SimpleThread. You can derive from it and implement the +// virtual Run method, or you can use the DelegateSimpleThread interface. +class SimpleThread : public PlatformThread::Delegate { + public: + class Options { + public: + Options() : stack_size_(0) { } + ~Options() { } + + // We use the standard compiler-supplied copy constructor. + + // A custom stack size, or 0 for the system default. + void set_stack_size(size_t size) { stack_size_ = size; } + size_t stack_size() const { return stack_size_; } + private: + size_t stack_size_; + }; + + // Create a SimpleThread. |options| should be used to manage any specific + // configuration involving the thread creation and management. + // Every thread has a name, in the form of |name_prefix|/TID, for example + // "my_thread/321". The thread will not be created until Start() is called. + explicit SimpleThread(const std::string& name_prefix); + SimpleThread(const std::string& name_prefix, const Options& options); + + virtual ~SimpleThread(); + + virtual void Start(); + virtual void Join(); + + // We follow the PlatformThread Delegate interface. + virtual void ThreadMain(); + + // Subclasses should override the Run method. + virtual void Run() = 0; + + // Return the thread name prefix, or "unnamed" if none was supplied. + std::string name_prefix() { return name_prefix_; } + + // Return the completed name including TID, only valid after Start(). + std::string name() { return name_; } + + // Return the thread id, only valid after Start(). + PlatformThreadId tid() { return tid_; } + + // Return True if Start() has ever been called. + bool HasBeenStarted() { return event_.IsSignaled(); } + + // Return True if Join() has evern been called. + bool HasBeenJoined() { return joined_; } + + private: + const std::string name_prefix_; + std::string name_; + const Options options_; + PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join! + WaitableEvent event_; // Signaled if Start() was ever called. + PlatformThreadId tid_; // The backing thread's id. + bool joined_; // True if Join has been called. +}; + +class DelegateSimpleThread : public SimpleThread { + public: + class Delegate { + public: + Delegate() { } + virtual ~Delegate() { } + virtual void Run() = 0; + }; + + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix); + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options); + + virtual ~DelegateSimpleThread(); + virtual void Run(); + private: + Delegate* delegate_; +}; + +// DelegateSimpleThreadPool allows you to start up a fixed number of threads, +// and then add jobs which will be dispatched to the threads. This is +// convenient when you have a lot of small work that you want done +// multi-threaded, but don't want to spawn a thread for each small bit of work. +// +// You just call AddWork() to add a delegate to the list of work to be done. +// JoinAll() will make sure that all outstanding work is processed, and wait +// for everything to finish. You can reuse a pool, so you can call Start() +// again after you've called JoinAll(). +class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate { + public: + typedef DelegateSimpleThread::Delegate Delegate; + + DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads); + virtual ~DelegateSimpleThreadPool(); + + // Start up all of the underlying threads, and start processing work if we + // have any. + void Start(); + + // Make sure all outstanding work is finished, and wait for and destroy all + // of the underlying threads in the pool. + void JoinAll(); + + // It is safe to AddWork() any time, before or after Start(). + // Delegate* should always be a valid pointer, NULL is reserved internally. + void AddWork(Delegate* work, int repeat_count); + void AddWork(Delegate* work) { + AddWork(work, 1); + } + + // We implement the Delegate interface, for running our internal threads. + virtual void Run(); + + private: + const std::string name_prefix_; + int num_threads_; + std::vector<DelegateSimpleThread*> threads_; + std::queue<Delegate*> delegates_; + Lock lock_; // Locks delegates_ + WaitableEvent dry_; // Not signaled when there is no work to do. +}; + +} // namespace base + +#endif // BASE_THREADING_SIMPLE_THREAD_H_ diff --git a/base/threading/simple_thread_unittest.cc b/base/threading/simple_thread_unittest.cc new file mode 100644 index 0000000..56aed6b --- /dev/null +++ b/base/threading/simple_thread_unittest.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/atomic_sequence_num.h" +#include "base/string_number_conversions.h" +#include "base/threading/simple_thread.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +class SetIntRunner : public DelegateSimpleThread::Delegate { + public: + SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { } + ~SetIntRunner() { } + + virtual void Run() { + *ptr_ = val_; + } + + private: + int* ptr_; + int val_; +}; + +class WaitEventRunner : public DelegateSimpleThread::Delegate { + public: + explicit WaitEventRunner(WaitableEvent* event) : event_(event) { } + ~WaitEventRunner() { } + + virtual void Run() { + EXPECT_FALSE(event_->IsSignaled()); + event_->Signal(); + EXPECT_TRUE(event_->IsSignaled()); + } + private: + WaitableEvent* event_; +}; + +class SeqRunner : public DelegateSimpleThread::Delegate { + public: + explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { } + virtual void Run() { + seq_->GetNext(); + } + + private: + AtomicSequenceNumber* seq_; +}; + +// We count up on a sequence number, firing on the event when we've hit our +// expected amount, otherwise we wait on the event. This will ensure that we +// have all threads outstanding until we hit our expected thread pool size. +class VerifyPoolRunner : public DelegateSimpleThread::Delegate { + public: + VerifyPoolRunner(AtomicSequenceNumber* seq, + int total, WaitableEvent* event) + : seq_(seq), total_(total), event_(event) { } + + virtual void Run() { + if (seq_->GetNext() == total_) { + event_->Signal(); + } else { + event_->Wait(); + } + } + + private: + AtomicSequenceNumber* seq_; + int total_; + WaitableEvent* event_; +}; + +} // namespace + +TEST(SimpleThreadTest, CreateAndJoin) { + int stack_int = 0; + + SetIntRunner runner(&stack_int, 7); + EXPECT_EQ(0, stack_int); + + DelegateSimpleThread thread(&runner, "int_setter"); + EXPECT_FALSE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + EXPECT_EQ(0, stack_int); + + thread.Start(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + + thread.Join(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_TRUE(thread.HasBeenJoined()); + EXPECT_EQ(7, stack_int); +} + +TEST(SimpleThreadTest, WaitForEvent) { + // Create a thread, and wait for it to signal us. + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + DelegateSimpleThread thread(&runner, "event_waiter"); + + EXPECT_FALSE(event.IsSignaled()); + thread.Start(); + event.Wait(); + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); +} + +TEST(SimpleThreadTest, NamedWithOptions) { + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + SimpleThread::Options options; + DelegateSimpleThread thread(&runner, "event_waiter", options); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_FALSE(event.IsSignaled()); + + thread.Start(); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); + event.Wait(); + + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); + + // We keep the name and tid, even after the thread is gone. + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); +} + +TEST(SimpleThreadTest, ThreadPool) { + AtomicSequenceNumber seq; + SeqRunner runner(&seq); + DelegateSimpleThreadPool pool("seq_runner", 10); + + // Add work before we're running. + pool.AddWork(&runner, 300); + + EXPECT_EQ(seq.GetNext(), 0); + pool.Start(); + + // Add work while we're running. + pool.AddWork(&runner, 300); + + pool.JoinAll(); + + EXPECT_EQ(seq.GetNext(), 601); + + // We can reuse our pool. Verify that all 10 threads can actually run in + // parallel, so this test will only pass if there are actually 10 threads. + AtomicSequenceNumber seq2; + WaitableEvent event(true, false); + // Changing 9 to 10, for example, would cause us JoinAll() to never return. + VerifyPoolRunner verifier(&seq2, 9, &event); + pool.Start(); + + pool.AddWork(&verifier, 10); + + pool.JoinAll(); + EXPECT_EQ(seq2.GetNext(), 10); +} + +} // namespace base diff --git a/base/threading/watchdog.cc b/base/threading/watchdog.cc new file mode 100644 index 0000000..8474744 --- /dev/null +++ b/base/threading/watchdog.cc @@ -0,0 +1,144 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/platform_thread.h" + +namespace base { + +// Start thread running in a Disarmed state. +Watchdog::Watchdog(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : init_successful_(false), + lock_(), + condition_variable_(&lock_), + state_(DISARMED), + duration_(duration), + thread_watched_name_(thread_watched_name), + ALLOW_THIS_IN_INITIALIZER_LIST(delegate_(this)) { + if (!enabled) + return; // Don't start thread, or doing anything really. + init_successful_ = PlatformThread::Create(0, // Default stack size. + &delegate_, + &handle_); + DCHECK(init_successful_); +} + +// Notify watchdog thread, and wait for it to finish up. +Watchdog::~Watchdog() { + if (!init_successful_) + return; + { + AutoLock lock(lock_); + state_ = SHUTDOWN; + } + condition_variable_.Signal(); + PlatformThread::Join(handle_); +} + +void Watchdog::Arm() { + ArmAtStartTime(TimeTicks::Now()); +} + +void Watchdog::ArmSomeTimeDeltaAgo(const TimeDelta& time_delta) { + ArmAtStartTime(TimeTicks::Now() - time_delta); +} + +// Start clock for watchdog. +void Watchdog::ArmAtStartTime(const TimeTicks start_time) { + { + AutoLock lock(lock_); + start_time_ = start_time; + state_ = ARMED; + } + // Force watchdog to wake up, and go to sleep with the timer ticking with the + // proper duration. + condition_variable_.Signal(); +} + +// Disable watchdog so that it won't do anything when time expires. +void Watchdog::Disarm() { + AutoLock lock(lock_); + state_ = DISARMED; + // We don't need to signal, as the watchdog will eventually wake up, and it + // will check its state and time, and act accordingly. +} + +void Watchdog::Alarm() { + DVLOG(1) << "Watchdog alarmed for " << thread_watched_name_; +} + +//------------------------------------------------------------------------------ +// Internal private methods that the watchdog thread uses. + +void Watchdog::ThreadDelegate::ThreadMain() { + SetThreadName(); + TimeDelta remaining_duration; + while (1) { + AutoLock lock(watchdog_->lock_); + while (DISARMED == watchdog_->state_) + watchdog_->condition_variable_.Wait(); + if (SHUTDOWN == watchdog_->state_) + return; + DCHECK(ARMED == watchdog_->state_); + remaining_duration = watchdog_->duration_ - + (TimeTicks::Now() - watchdog_->start_time_); + if (remaining_duration.InMilliseconds() > 0) { + // Spurios wake? Timer drifts? Go back to sleep for remaining time. + watchdog_->condition_variable_.TimedWait(remaining_duration); + continue; + } + // We overslept, so this seems like a real alarm. + // Watch out for a user that stopped the debugger on a different alarm! + { + AutoLock static_lock(static_lock_); + if (last_debugged_alarm_time_ > watchdog_->start_time_) { + // False alarm: we started our clock before the debugger break (last + // alarm time). + watchdog_->start_time_ += last_debugged_alarm_delay_; + if (last_debugged_alarm_time_ > watchdog_->start_time_) + // Too many alarms must have taken place. + watchdog_->state_ = DISARMED; + continue; + } + } + watchdog_->state_ = DISARMED; // Only alarm at most once. + TimeTicks last_alarm_time = TimeTicks::Now(); + watchdog_->Alarm(); // Set a break point here to debug on alarms. + TimeDelta last_alarm_delay = TimeTicks::Now() - last_alarm_time; + if (last_alarm_delay <= TimeDelta::FromMilliseconds(2)) + continue; + // Ignore race of two alarms/breaks going off at roughly the same time. + AutoLock static_lock(static_lock_); + // This was a real debugger break. + last_debugged_alarm_time_ = last_alarm_time; + last_debugged_alarm_delay_ = last_alarm_delay; + } +} + +void Watchdog::ThreadDelegate::SetThreadName() const { + std::string name = watchdog_->thread_watched_name_ + " Watchdog"; + PlatformThread::SetName(name.c_str()); + DVLOG(1) << "Watchdog active: " << name; +} + +// static +void Watchdog::ResetStaticData() { + AutoLock lock(static_lock_); + last_debugged_alarm_time_ = TimeTicks(); + last_debugged_alarm_delay_ = TimeDelta(); +} + +// static +Lock Watchdog::static_lock_; // Lock for access of static data... +// static +TimeTicks Watchdog::last_debugged_alarm_time_ = TimeTicks(); +// static +TimeDelta Watchdog::last_debugged_alarm_delay_; + +} // namespace base diff --git a/base/threading/watchdog.h b/base/threading/watchdog.h new file mode 100644 index 0000000..025fe09 --- /dev/null +++ b/base/threading/watchdog.h @@ -0,0 +1,98 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// The Watchdog class creates a second thread that can Alarm if a specific +// duration of time passes without proper attention. The duration of time is +// specified at construction time. The Watchdog may be used many times by +// simply calling Arm() (to start timing) and Disarm() (to reset the timer). +// The Watchdog is typically used under a debugger, where the stack traces on +// other threads can be examined if/when the Watchdog alarms. + +// Some watchdogs will be enabled or disabled via command line switches. To +// facilitate such code, an "enabled" argument for the constuctor can be used +// to permanently disable the watchdog. Disabled watchdogs don't even spawn +// a second thread, and their methods call (Arm() and Disarm()) return very +// quickly. + +#ifndef BASE_THREADING_WATCHDOG_H_ +#define BASE_THREADING_WATCHDOG_H_ +#pragma once + +#include <string> + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/time.h" + +namespace base { + +class Watchdog { + public: + // Constructor specifies how long the Watchdog will wait before alarming. + Watchdog(const base::TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled); + virtual ~Watchdog(); + + // Start timing, and alarm when time expires (unless we're disarm()ed.) + void Arm(); // Arm starting now. + void ArmSomeTimeDeltaAgo(const base::TimeDelta& time_delta); + void ArmAtStartTime(const base::TimeTicks start_time); + + // Reset time, and do not set off the alarm. + void Disarm(); + + // Alarm is called if the time expires after an Arm() without someone calling + // Disarm(). This method can be overridden to create testable classes. + virtual void Alarm(); + + // Reset static data to initial state. Useful for tests, to ensure + // they are independent. + static void ResetStaticData(); + + private: + class ThreadDelegate : public PlatformThread::Delegate { + public: + explicit ThreadDelegate(Watchdog* watchdog) : watchdog_(watchdog) { + } + virtual void ThreadMain(); + private: + Watchdog* watchdog_; + + void SetThreadName() const; + }; + + enum State {ARMED, DISARMED, SHUTDOWN }; + + bool init_successful_; + + Lock lock_; // Mutex for state_. + ConditionVariable condition_variable_; + State state_; + const base::TimeDelta duration_; // How long after start_time_ do we alarm? + const std::string thread_watched_name_; + PlatformThreadHandle handle_; + ThreadDelegate delegate_; // Store it, because it must outlive the thread. + + base::TimeTicks start_time_; // Start of epoch, and alarm after duration_. + + // When the debugger breaks (when we alarm), all the other alarms that are + // armed will expire (also alarm). To diminish this effect, we track any + // delay due to debugger breaks, and we *try* to adjust the effective start + // time of other alarms to step past the debugging break. + // Without this safety net, any alarm will typically trigger a host of follow + // on alarms from callers that specify old times. + static Lock static_lock_; // Lock for access of static data... + // When did we last alarm and get stuck (for a while) in a debugger? + static base::TimeTicks last_debugged_alarm_time_; + // How long did we sit on a break in the debugger? + static base::TimeDelta last_debugged_alarm_delay_; + + DISALLOW_COPY_AND_ASSIGN(Watchdog); +}; + +} // namespace base + +#endif // BASE_THREADING_WATCHDOG_H_ diff --git a/base/threading/watchdog_unittest.cc b/base/threading/watchdog_unittest.cc new file mode 100644 index 0000000..347781e --- /dev/null +++ b/base/threading/watchdog_unittest.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/spin_wait.h" +#include "base/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +//------------------------------------------------------------------------------ +// Provide a derived class to facilitate testing. + +class WatchdogCounter : public Watchdog { + public: + WatchdogCounter(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : Watchdog(duration, thread_watched_name, enabled), alarm_counter_(0) { + } + + virtual ~WatchdogCounter() {} + + virtual void Alarm() { + alarm_counter_++; + Watchdog::Alarm(); + } + + int alarm_counter() { return alarm_counter_; } + + private: + int alarm_counter_; + + DISALLOW_COPY_AND_ASSIGN(WatchdogCounter); +}; + +class WatchdogTest : public testing::Test { + public: + void SetUp() { + Watchdog::ResetStaticData(); + } +}; + +} // namespace + +//------------------------------------------------------------------------------ +// Actual tests + +// Minimal constructor/destructor test. +TEST_F(WatchdogTest, StartupShutdownTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); +} + +// Test ability to call Arm and Disarm repeatedly. +TEST_F(WatchdogTest, ArmDisarmTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + watchdog1.Arm(); + watchdog1.Disarm(); + watchdog1.Arm(); + watchdog1.Disarm(); + + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); + watchdog2.Arm(); + watchdog2.Disarm(); + watchdog2.Arm(); + watchdog2.Disarm(); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Enabled", true); + watchdog.Arm(); + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmPriorTimeTest) { + WatchdogCounter watchdog(TimeDelta(), "Enabled2", true); + // Set a time in the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(2)); + // It should instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a disable alarm does nothing, even if we arm it. +TEST_F(WatchdogTest, ConstructorDisabledTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Disabled", false); + watchdog.Arm(); + // Alarm should not fire, as it was disabled. + PlatformThread::Sleep(500); + EXPECT_EQ(0, watchdog.alarm_counter()); +} + +// Make sure Disarming will prevent firing, even after Arming. +TEST_F(WatchdogTest, DisarmTest) { + WatchdogCounter watchdog(TimeDelta::FromSeconds(1), "Enabled3", true); + + TimeTicks start = TimeTicks::Now(); + watchdog.Arm(); + PlatformThread::Sleep(100); // Sleep a bit, but not past the alarm point. + watchdog.Disarm(); + TimeTicks end = TimeTicks::Now(); + + if (end - start > TimeDelta::FromMilliseconds(500)) { + LOG(WARNING) << "100ms sleep took over 500ms, making the results of this " + << "timing-sensitive test suspicious. Aborting now."; + return; + } + + // Alarm should not have fired before it was disarmed. + EXPECT_EQ(0, watchdog.alarm_counter()); + + // Sleep past the point where it would have fired if it wasn't disarmed, + // and verify that it didn't fire. + PlatformThread::Sleep(1000); + EXPECT_EQ(0, watchdog.alarm_counter()); + + // ...but even after disarming, we can still use the alarm... + // Set a time greater than the timeout into the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(10)); + // It should almost instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +} // namespace base diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h new file mode 100644 index 0000000..9a02acc --- /dev/null +++ b/base/threading/worker_pool.h @@ -0,0 +1,35 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_WORKER_POOL_H_ +#define BASE_THREADING_WORKER_POOL_H_ +#pragma once + +#include "base/tracked.h" + +class Task; + +namespace base { + +// This is a facility that runs tasks that don't require a specific thread or +// a message loop. +// +// WARNING: This shouldn't be used unless absolutely necessary. We don't wait +// for the worker pool threads to finish on shutdown, so the tasks running +// inside the pool must be extremely careful about other objects they access +// (MessageLoops, Singletons, etc). During shutdown these object may no longer +// exist. +class WorkerPool { + public: + // This function posts |task| to run on a worker thread. |task_is_slow| + // should be used for tasks that will take a long time to execute. Returns + // false if |task| could not be posted to a worker thread. Regardless of + // return value, ownership of |task| is transferred to the worker pool. + static bool PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_H_ diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc new file mode 100644 index 0000000..2facc01 --- /dev/null +++ b/base/threading/worker_pool_posix.cc @@ -0,0 +1,169 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/stringprintf.h" +#include "base/task.h" +#include "base/threading/worker_pool.h" + +namespace base { + +namespace { + +const int kIdleSecondsBeforeExit = 10 * 60; +// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert +// function of NSS because of NSS bug 439169. +const int kWorkerThreadStackSize = 128 * 1024; + +class WorkerPoolImpl { + public: + WorkerPoolImpl(); + ~WorkerPoolImpl(); + + void PostTask(const tracked_objects::Location& from_here, Task* task, + bool task_is_slow); + + private: + scoped_refptr<base::PosixDynamicThreadPool> pool_; +}; + +WorkerPoolImpl::WorkerPoolImpl() + : pool_(new base::PosixDynamicThreadPool("WorkerPool", + kIdleSecondsBeforeExit)) { +} + +WorkerPoolImpl::~WorkerPoolImpl() { + pool_->Terminate(); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + task->SetBirthPlace(from_here); + pool_->PostTask(task); +} + +base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED); + +class WorkerThread : public PlatformThread::Delegate { + public: + WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, + base::PosixDynamicThreadPool* pool) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + pool_(pool) {} + + virtual void ThreadMain(); + + private: + const std::string name_prefix_; + const int idle_seconds_before_exit_; + scoped_refptr<base::PosixDynamicThreadPool> pool_; + + DISALLOW_COPY_AND_ASSIGN(WorkerThread); +}; + +void WorkerThread::ThreadMain() { + const std::string name = base::StringPrintf( + "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); + PlatformThread::SetName(name.c_str()); + + for (;;) { + Task* task = pool_->WaitForTask(); + if (!task) + break; + task->Run(); + delete task; + } + + // The WorkerThread is non-joinable, so it deletes itself. + delete this; +} + +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} + +PosixDynamicThreadPool::PosixDynamicThreadPool( + const std::string& name_prefix, + int idle_seconds_before_exit) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + tasks_available_cv_(&lock_), + num_idle_threads_(0), + terminated_(false), + num_idle_threads_cv_(NULL) {} + +PosixDynamicThreadPool::~PosixDynamicThreadPool() { + while (!tasks_.empty()) { + Task* task = tasks_.front(); + tasks_.pop(); + delete task; + } +} + +void PosixDynamicThreadPool::Terminate() { + { + AutoLock locked(lock_); + DCHECK(!terminated_) << "Thread pool is already terminated."; + terminated_ = true; + } + tasks_available_cv_.Broadcast(); +} + +void PosixDynamicThreadPool::PostTask(Task* task) { + AutoLock locked(lock_); + DCHECK(!terminated_) << + "This thread pool is already terminated. Do not post new tasks."; + + tasks_.push(task); + + // We have enough worker threads. + if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) { + tasks_available_cv_.Signal(); + } else { + // The new PlatformThread will take ownership of the WorkerThread object, + // which will delete itself on exit. + WorkerThread* worker = + new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); + PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); + } +} + +Task* PosixDynamicThreadPool::WaitForTask() { + AutoLock locked(lock_); + + if (terminated_) + return NULL; + + if (tasks_.empty()) { // No work available, wait for work. + num_idle_threads_++; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); + num_idle_threads_--; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + if (tasks_.empty()) { + // We waited for work, but there's still no work. Return NULL to signal + // the thread to terminate. + return NULL; + } + } + + Task* task = tasks_.front(); + tasks_.pop(); + return task; +} + +} // namespace base diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h new file mode 100644 index 0000000..6c99e76 --- /dev/null +++ b/base/threading/worker_pool_posix.h @@ -0,0 +1,89 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The thread pool used in the POSIX implementation of WorkerPool dynamically +// adds threads as necessary to handle all tasks. It keeps old threads around +// for a period of time to allow them to be reused. After this waiting period, +// the threads exit. This thread pool uses non-joinable threads, therefore +// worker threads are not joined during process shutdown. This means that +// potentially long running tasks (such as DNS lookup) do not block process +// shutdown, but also means that process shutdown may "leak" objects. Note that +// although PosixDynamicThreadPool spawns the worker threads and manages the +// task queue, it does not own the worker threads. The worker threads ask the +// PosixDynamicThreadPool for work and eventually clean themselves up. The +// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool +// instance, which prevents PosixDynamicThreadPool from disappearing before all +// worker threads exit. The owner of PosixDynamicThreadPool should likewise +// maintain a scoped_refptr to the PosixDynamicThreadPool instance. +// +// NOTE: The classes defined in this file are only meant for use by the POSIX +// implementation of WorkerPool. No one else should be using these classes. +// These symbols are exported in a header purely for testing purposes. + +#ifndef BASE_THREADING_WORKER_POOL_POSIX_H_ +#define BASE_THREADING_WORKER_POOL_POSIX_H_ +#pragma once + +#include <queue> +#include <string> + +#include "base/basictypes.h" +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" + +class Task; + +namespace base { + +class PosixDynamicThreadPool + : public RefCountedThreadSafe<PosixDynamicThreadPool> { + public: + class PosixDynamicThreadPoolPeer; + + // All worker threads will share the same |name_prefix|. They will exit after + // |idle_seconds_before_exit|. + PosixDynamicThreadPool(const std::string& name_prefix, + int idle_seconds_before_exit); + ~PosixDynamicThreadPool(); + + // Indicates that the thread pool is going away. Stops handing out tasks to + // worker threads. Wakes up all the idle threads to let them exit. + void Terminate(); + + // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership + // of |task|. + void PostTask(Task* task); + + // Worker thread method to wait for up to |idle_seconds_before_exit| for more + // work from the thread pool. Returns NULL if no work is available. + Task* WaitForTask(); + + private: + friend class PosixDynamicThreadPoolPeer; + + const std::string name_prefix_; + const int idle_seconds_before_exit_; + + Lock lock_; // Protects all the variables below. + + // Signal()s worker threads to let them know more tasks are available. + // Also used for Broadcast()'ing to worker threads to let them know the pool + // is being deleted and they can exit. + ConditionVariable tasks_available_cv_; + int num_idle_threads_; + std::queue<Task*> tasks_; + bool terminated_; + // Only used for tests to ensure correct thread ordering. It will always be + // NULL in non-test code. + scoped_ptr<ConditionVariable> num_idle_threads_cv_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_POSIX_H_ diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc new file mode 100644 index 0000000..48df16e --- /dev/null +++ b/base/threading/worker_pool_posix_unittest.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include <set> + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/task.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Peer class to provide passthrough access to PosixDynamicThreadPool internals. +class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { + public: + explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) + : pool_(pool) {} + + Lock* lock() { return &pool_->lock_; } + ConditionVariable* tasks_available_cv() { + return &pool_->tasks_available_cv_; + } + const std::queue<Task*>& tasks() const { return pool_->tasks_; } + int num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_idle_threads_cv() { + return pool_->num_idle_threads_cv_.get(); + } + void set_num_idle_threads_cv(ConditionVariable* cv) { + pool_->num_idle_threads_cv_.reset(cv); + } + + private: + PosixDynamicThreadPool* pool_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); +}; + +namespace { + +// IncrementingTask's main purpose is to increment a counter. It also updates a +// set of unique thread ids, and signals a ConditionVariable on completion. +// Note that since it does not block, there is no way to control the number of +// threads used if more than one IncrementingTask is consecutively posted to the +// thread pool, since the first one might finish executing before the subsequent +// PostTask() calls get invoked. +class IncrementingTask : public Task { + public: + IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads) + : counter_lock_(counter_lock), + unique_threads_lock_(unique_threads_lock), + unique_threads_(unique_threads), + counter_(counter) {} + + virtual void Run() { + AddSelfToUniqueThreadSet(); + AutoLock locked(*counter_lock_); + (*counter_)++; + } + + void AddSelfToUniqueThreadSet() { + AutoLock locked(*unique_threads_lock_); + unique_threads_->insert(PlatformThread::CurrentId()); + } + + private: + Lock* counter_lock_; + Lock* unique_threads_lock_; + std::set<PlatformThreadId>* unique_threads_; + int* counter_; + + DISALLOW_COPY_AND_ASSIGN(IncrementingTask); +}; + +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +class BlockingIncrementingTask : public Task { + public: + BlockingIncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads, + Lock* num_waiting_to_start_lock, + int* num_waiting_to_start, + ConditionVariable* num_waiting_to_start_cv, + base::WaitableEvent* start) + : incrementer_( + counter_lock, counter, unique_threads_lock, unique_threads), + num_waiting_to_start_lock_(num_waiting_to_start_lock), + num_waiting_to_start_(num_waiting_to_start), + num_waiting_to_start_cv_(num_waiting_to_start_cv), + start_(start) {} + + virtual void Run() { + { + AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); + (*num_waiting_to_start_)++; + } + num_waiting_to_start_cv_->Signal(); + CHECK(start_->Wait()); + incrementer_.Run(); + } + + private: + IncrementingTask incrementer_; + Lock* num_waiting_to_start_lock_; + int* num_waiting_to_start_; + ConditionVariable* num_waiting_to_start_cv_; + base::WaitableEvent* start_; + + DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +}; + +class PosixDynamicThreadPoolTest : public testing::Test { + protected: + PosixDynamicThreadPoolTest() + : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), + peer_(pool_.get()), + counter_(0), + num_waiting_to_start_(0), + num_waiting_to_start_cv_(&num_waiting_to_start_lock_), + start_(true, false) {} + + virtual void SetUp() { + peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); + } + + virtual void TearDown() { + // Wake up the idle threads so they can terminate. + if (pool_.get()) pool_->Terminate(); + } + + void WaitForTasksToStart(int num_tasks) { + AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + while (num_waiting_to_start_ < num_tasks) { + num_waiting_to_start_cv_.Wait(); + } + } + + void WaitForIdleThreads(int num_idle_threads) { + AutoLock pool_locked(*peer_.lock()); + while (peer_.num_idle_threads() < num_idle_threads) { + peer_.num_idle_threads_cv()->Wait(); + } + } + + Task* CreateNewIncrementingTask() { + return new IncrementingTask(&counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); + } + + Task* CreateNewBlockingIncrementingTask() { + return new BlockingIncrementingTask( + &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, + &num_waiting_to_start_lock_, &num_waiting_to_start_, + &num_waiting_to_start_cv_, &start_); + } + + scoped_refptr<base::PosixDynamicThreadPool> pool_; + base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; + Lock counter_lock_; + int counter_; + Lock unique_threads_lock_; + std::set<PlatformThreadId> unique_threads_; + Lock num_waiting_to_start_lock_; + int num_waiting_to_start_; + ConditionVariable num_waiting_to_start_cv_; + base::WaitableEvent start_; +}; + +} // namespace + +TEST_F(PosixDynamicThreadPoolTest, Basic) { + EXPECT_EQ(0, peer_.num_idle_threads()); + EXPECT_EQ(0U, unique_threads_.size()); + EXPECT_EQ(0U, peer_.tasks().size()); + + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + EXPECT_EQ(1U, unique_threads_.size()) << + "There should be only one thread allocated for one task."; + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add another 2 tasks. One should reuse the existing worker thread. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(3, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { + // Add two blocking tasks. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, Complex) { + // Add two non blocking tasks and wait for them to finish. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add two blocking tasks, start them simultaneously, and wait for them to + // finish. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(3, counter_); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, unique_threads_.size()); + + // Wake up all idle threads so they can exit. + { + AutoLock locked(*peer_.lock()); + while (peer_.num_idle_threads() > 0) { + peer_.tasks_available_cv()->Signal(); + peer_.num_idle_threads_cv()->Wait(); + } + } + + // Add another non blocking task. There are no threads to reuse. + pool_->PostTask(CreateNewIncrementingTask()); + WaitForIdleThreads(1); + + EXPECT_EQ(3U, unique_threads_.size()); + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(4, counter_); +} + +} // namespace base diff --git a/base/threading/worker_pool_unittest.cc b/base/threading/worker_pool_unittest.cc new file mode 100644 index 0000000..cf8e0e8 --- /dev/null +++ b/base/threading/worker_pool_unittest.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/task.h" +#include "base/waitable_event.h" +#include "base/threading/worker_pool.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +typedef PlatformTest WorkerPoolTest; + +namespace base { + +namespace { + +class PostTaskTestTask : public Task { + public: + explicit PostTaskTestTask(WaitableEvent* event) : event_(event) { + } + + void Run() { + event_->Signal(); + } + + private: + WaitableEvent* event_; +}; + +} // namespace + +TEST_F(WorkerPoolTest, PostTask) { + WaitableEvent test_event(false, false); + WaitableEvent long_test_event(false, false); + bool signaled; + + WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&test_event), false); + WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&long_test_event), true); + + signaled = test_event.Wait(); + EXPECT_TRUE(signaled); + signaled = long_test_event.Wait(); + EXPECT_TRUE(signaled); +} + +} // namespace base diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc new file mode 100644 index 0000000..2072e52 --- /dev/null +++ b/base/threading/worker_pool_win.cc @@ -0,0 +1,40 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool.h" + +#include "base/logging.h" +#include "base/task.h" + +namespace base { + +namespace { + +DWORD CALLBACK WorkItemCallback(void* param) { + Task* task = static_cast<Task*>(param); + task->Run(); + delete task; + return 0; +} + +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + task->SetBirthPlace(from_here); + + ULONG flags = 0; + if (task_is_slow) + flags |= WT_EXECUTELONGFUNCTION; + + if (!QueueUserWorkItem(WorkItemCallback, task, flags)) { + DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError(); + delete task; + return false; + } + + return true; +} + +} // namespace base |