From ac9ba8fe1d2f2397de3d7c4cebfb3c659d226fd3 Mon Sep 17 00:00:00 2001 From: "brettw@chromium.org" Date: Thu, 30 Dec 2010 18:08:36 +0000 Subject: Move some misc thread-related stuff from base to base/thread and into the base namespace. This does not move the "hard" thread stuff (thread.h). TEST=it compiles BUG=none Review URL: http://codereview.chromium.org/6079009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70315 0039d316-1c4b-4281-b951-d872f2087c98 --- base/base.gyp | 8 +- base/base.gypi | 16 +- base/lazy_instance_unittest.cc | 2 +- base/metrics/stats_table_unittest.cc | 6 +- base/non_thread_safe_unittest.cc | 2 +- base/simple_thread.cc | 152 ------------ base/simple_thread.h | 182 -------------- base/simple_thread_unittest.cc | 166 ------------- base/thread_checker_unittest.cc | 2 +- base/thread_collision_warner_unittest.cc | 2 +- base/thread_local_storage_unittest.cc | 2 +- base/thread_local_unittest.cc | 4 +- base/threading/simple_thread.cc | 152 ++++++++++++ base/threading/simple_thread.h | 182 ++++++++++++++ base/threading/simple_thread_unittest.cc | 170 +++++++++++++ base/threading/watchdog.cc | 144 +++++++++++ base/threading/watchdog.h | 98 ++++++++ base/threading/watchdog_unittest.cc | 140 +++++++++++ base/threading/worker_pool.h | 35 +++ base/threading/worker_pool_posix.cc | 169 +++++++++++++ base/threading/worker_pool_posix.h | 89 +++++++ base/threading/worker_pool_posix_unittest.cc | 268 +++++++++++++++++++++ base/threading/worker_pool_unittest.cc | 46 ++++ base/threading/worker_pool_win.cc | 40 +++ base/watchdog.cc | 146 ----------- base/watchdog.h | 94 -------- base/watchdog_unittest.cc | 142 ----------- base/worker_pool.h | 31 --- base/worker_pool_posix.cc | 168 ------------- base/worker_pool_posix.h | 89 ------- base/worker_pool_posix_unittest.cc | 268 --------------------- base/worker_pool_unittest.cc | 44 ---- base/worker_pool_win.cc | 36 --- chrome/browser/jankometer.cc | 4 +- chrome/browser/printing/printing_layout_uitest.cc | 2 +- .../browser/renderer_host/render_message_filter.cc | 4 +- chrome/browser/ui/cocoa/keystone_glue.mm | 4 +- chrome/gpu/gpu_thread.cc | 4 +- chrome/renderer/pepper_devices.h | 2 +- net/base/cert_verifier.cc | 8 +- net/base/dnsrr_resolver.cc | 8 +- net/base/file_stream_posix.cc | 22 +- net/base/host_resolver_impl.cc | 6 +- net/base/keygen_handler_unittest.cc | 10 +- net/base/test_completion_callback_unittest.cc | 8 +- net/disk_cache/backend_impl.cc | 4 +- net/disk_cache/file_posix.cc | 6 +- net/proxy/polling_proxy_config_service.cc | 8 +- net/tools/flip_server/flip_in_mem_edsm_server.cc | 2 +- net/url_request/url_request_file_job.cc | 7 +- ppapi/proxy/ppb_audio_proxy.cc | 2 +- ppapi/shared_impl/audio_impl.h | 2 +- 52 files changed, 1610 insertions(+), 1598 deletions(-) delete mode 100644 base/simple_thread.cc delete mode 100644 base/simple_thread.h delete mode 100644 base/simple_thread_unittest.cc create mode 100644 base/threading/simple_thread.cc create mode 100644 base/threading/simple_thread.h create mode 100644 base/threading/simple_thread_unittest.cc create mode 100644 base/threading/watchdog.cc create mode 100644 base/threading/watchdog.h create mode 100644 base/threading/watchdog_unittest.cc create mode 100644 base/threading/worker_pool.h create mode 100644 base/threading/worker_pool_posix.cc create mode 100644 base/threading/worker_pool_posix.h create mode 100644 base/threading/worker_pool_posix_unittest.cc create mode 100644 base/threading/worker_pool_unittest.cc create mode 100644 base/threading/worker_pool_win.cc delete mode 100644 base/watchdog.cc delete mode 100644 base/watchdog.h delete mode 100644 base/watchdog_unittest.cc delete mode 100644 base/worker_pool.h delete mode 100644 base/worker_pool_posix.cc delete mode 100644 base/worker_pool_posix.h delete mode 100644 base/worker_pool_posix_unittest.cc delete mode 100644 base/worker_pool_unittest.cc delete mode 100644 base/worker_pool_win.cc diff --git a/base/base.gyp b/base/base.gyp index 075561b..327a098 100644 --- a/base/base.gyp +++ b/base/base.gyp @@ -125,7 +125,6 @@ 'sha1_unittest.cc', 'sha2_unittest.cc', 'shared_memory_unittest.cc', - 'simple_thread_unittest.cc', 'singleton_unittest.cc', 'stack_container_unittest.cc', 'string16_unittest.cc', @@ -141,6 +140,10 @@ 'sys_string_conversions_unittest.cc', 'task_queue_unittest.cc', 'task_unittest.cc', + 'threading/simple_thread_unittest.cc', + 'threading/watchdog_unittest.cc', + 'threading/worker_pool_posix_unittest.cc', + 'threading/worker_pool_unittest.cc', 'thread_checker_unittest.cc', 'thread_collision_warner_unittest.cc', 'thread_local_storage_unittest.cc', @@ -159,7 +162,6 @@ 'vlog_unittest.cc', 'waitable_event_unittest.cc', 'waitable_event_watcher_unittest.cc', - 'watchdog_unittest.cc', 'weak_ptr_unittest.cc', 'win_util_unittest.cc', 'win/event_trace_consumer_unittest.cc', @@ -171,8 +173,6 @@ 'win/scoped_bstr_unittest.cc', 'win/scoped_comptr_unittest.cc', 'win/scoped_variant_unittest.cc', - 'worker_pool_posix_unittest.cc', - 'worker_pool_unittest.cc', ], 'dependencies': [ 'base', diff --git a/base/base.gypi b/base/base.gypi index 102c49ca..e969149 100644 --- a/base/base.gypi +++ b/base/base.gypi @@ -209,8 +209,6 @@ 'shared_memory.h', 'shared_memory_posix.cc', 'shared_memory_win.cc', - 'simple_thread.cc', - 'simple_thread.h', 'singleton.h', 'spin_wait.h', 'stack_container.h', @@ -245,6 +243,14 @@ 'task_queue.cc', 'task_queue.h', 'template_util.h', + 'threading/simple_thread.cc', + 'threading/simple_thread.h', + 'threading/watchdog.cc', + 'threading/watchdog.h', + 'threading/worker_pool.h', + 'threading/worker_pool_posix.cc', + 'threading/worker_pool_posix.h', + 'threading/worker_pool_win.cc', 'thread.cc', 'thread.h', 'thread_checker.cc', @@ -289,8 +295,6 @@ 'waitable_event_watcher_posix.cc', 'waitable_event_watcher_win.cc', 'waitable_event_win.cc', - 'watchdog.cc', - 'watchdog.h', 'weak_ptr.cc', 'weak_ptr.h', 'win/i18n.cc', @@ -317,10 +321,6 @@ 'win/windows_version.h', 'win_util.cc', 'win_util.h', - 'worker_pool.h', - 'worker_pool_posix.cc', - 'worker_pool_posix.h', - 'worker_pool_win.cc', 'nix/xdg_util.h', 'nix/xdg_util.cc', ], diff --git a/base/lazy_instance_unittest.cc b/base/lazy_instance_unittest.cc index 1731381..d9e0fa9 100644 --- a/base/lazy_instance_unittest.cc +++ b/base/lazy_instance_unittest.cc @@ -5,7 +5,7 @@ #include "base/at_exit.h" #include "base/atomic_sequence_num.h" #include "base/lazy_instance.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "testing/gtest/include/gtest/gtest.h" namespace { diff --git a/base/metrics/stats_table_unittest.cc b/base/metrics/stats_table_unittest.cc index c9eb9a2..9052244 100644 --- a/base/metrics/stats_table_unittest.cc +++ b/base/metrics/stats_table_unittest.cc @@ -2,14 +2,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/metrics/stats_counters.h" +#include "base/metrics/stats_table.h" #include "base/platform_thread.h" -#include "base/simple_thread.h" #include "base/shared_memory.h" -#include "base/metrics/stats_table.h" -#include "base/metrics/stats_counters.h" #include "base/string_piece.h" #include "base/string_util.h" #include "base/test/multiprocess_test.h" +#include "base/threading/simple_thread.h" #include "base/utf_string_conversions.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/multiprocess_func_list.h" diff --git a/base/non_thread_safe_unittest.cc b/base/non_thread_safe_unittest.cc index 1db198b..98af0e1 100644 --- a/base/non_thread_safe_unittest.cc +++ b/base/non_thread_safe_unittest.cc @@ -6,7 +6,7 @@ #include "base/logging.h" #include "base/non_thread_safe.h" #include "base/scoped_ptr.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "testing/gtest/include/gtest/gtest.h" #ifndef NDEBUG diff --git a/base/simple_thread.cc b/base/simple_thread.cc deleted file mode 100644 index 086a430..0000000 --- a/base/simple_thread.cc +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/simple_thread.h" - -#include "base/logging.h" -#include "base/platform_thread.h" -#include "base/string_number_conversions.h" - -namespace base { - -void SimpleThread::Start() { - DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; - bool success = PlatformThread::Create(options_.stack_size(), this, &thread_); - CHECK(success); - event_.Wait(); // Wait for the thread to complete initialization. -} - -void SimpleThread::Join() { - DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; - DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; - PlatformThread::Join(thread_); - joined_ = true; -} - -void SimpleThread::ThreadMain() { - tid_ = PlatformThread::CurrentId(); - // Construct our full name of the form "name_prefix_/TID". - name_.push_back('/'); - name_.append(IntToString(tid_)); - PlatformThread::SetName(name_.c_str()); - - // We've initialized our new thread, signal that we're done to Start(). - event_.Signal(); - - Run(); -} - -SimpleThread::SimpleThread(const std::string& name_prefix) - : name_prefix_(name_prefix), name_(name_prefix), - thread_(), event_(true, false), tid_(0), joined_(false) { -} - -SimpleThread::SimpleThread(const std::string& name_prefix, - const Options& options) - : name_prefix_(name_prefix), name_(name_prefix), options_(options), - thread_(), event_(true, false), tid_(0), joined_(false) { -} - -SimpleThread::~SimpleThread() { - DCHECK(HasBeenStarted()) << "SimpleThread was never started."; - DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed."; -} - -DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, - const std::string& name_prefix) - : SimpleThread(name_prefix), - delegate_(delegate) { -} - -DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, - const std::string& name_prefix, - const Options& options) - : SimpleThread(name_prefix, options), - delegate_(delegate) { -} - -DelegateSimpleThread::~DelegateSimpleThread() { -} - -void DelegateSimpleThread::Run() { - DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; - delegate_->Run(); - delegate_ = NULL; -} - -DelegateSimpleThreadPool::DelegateSimpleThreadPool( - const std::string& name_prefix, - int num_threads) - : name_prefix_(name_prefix), - num_threads_(num_threads), - dry_(true, false) { -} - -DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { - DCHECK(threads_.empty()); - DCHECK(delegates_.empty()); - DCHECK(!dry_.IsSignaled()); -} - -void DelegateSimpleThreadPool::Start() { - DCHECK(threads_.empty()) << "Start() called with outstanding threads."; - for (int i = 0; i < num_threads_; ++i) { - DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); - thread->Start(); - threads_.push_back(thread); - } -} - -void DelegateSimpleThreadPool::JoinAll() { - DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; - - // Tell all our threads to quit their worker loop. - AddWork(NULL, num_threads_); - - // Join and destroy all the worker threads. - for (int i = 0; i < num_threads_; ++i) { - threads_[i]->Join(); - delete threads_[i]; - } - threads_.clear(); - DCHECK(delegates_.empty()); -} - -void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { - AutoLock locked(lock_); - for (int i = 0; i < repeat_count; ++i) - delegates_.push(delegate); - // If we were empty, signal that we have work now. - if (!dry_.IsSignaled()) - dry_.Signal(); -} - -void DelegateSimpleThreadPool::Run() { - Delegate* work = NULL; - - while (true) { - dry_.Wait(); - { - AutoLock locked(lock_); - if (!dry_.IsSignaled()) - continue; - - DCHECK(!delegates_.empty()); - work = delegates_.front(); - delegates_.pop(); - - // Signal to any other threads that we're currently out of work. - if (delegates_.empty()) - dry_.Reset(); - } - - // A NULL delegate pointer signals us to quit. - if (!work) - break; - - work->Run(); - } -} - -} // namespace base diff --git a/base/simple_thread.h b/base/simple_thread.h deleted file mode 100644 index 13c46c0..0000000 --- a/base/simple_thread.h +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -// WARNING: You should probably be using Thread (thread.h) instead. Thread is -// Chrome's message-loop based Thread abstraction, and if you are a -// thread running in the browser, there will likely be assumptions -// that your thread will have an associated message loop. -// -// This is a simple thread interface that backs to a native operating system -// thread. You should use this only when you want a thread that does not have -// an associated MessageLoop. Unittesting is the best example of this. -// -// The simplest interface to use is DelegateSimpleThread, which will create -// a new thread, and execute the Delegate's virtual Run() in this new thread -// until it has completed, exiting the thread. -// -// NOTE: You *MUST* call Join on the thread to clean up the underlying thread -// resources. You are also responsible for destructing the SimpleThread object. -// It is invalid to destroy a SimpleThread while it is running, or without -// Start() having been called (and a thread never created). The Delegate -// object should live as long as a DelegateSimpleThread. -// -// Thread Safety: A SimpleThread is not completely thread safe. It is safe to -// access it from the creating thread or from the newly created thread. This -// implies that the creator thread should be the thread that calls Join. -// -// Example: -// class MyThreadRunner : public DelegateSimpleThread::Delegate { ... }; -// MyThreadRunner runner; -// DelegateSimpleThread thread(&runner, "good_name_here"); -// thread.Start(); -// // Start will return after the Thread has been successfully started and -// // initialized. The newly created thread will invoke runner->Run(), and -// // run until it returns. -// thread.Join(); // Wait until the thread has exited. You *MUST* Join! -// // The SimpleThread object is still valid, however you may not call Join -// // or Start again. - -#ifndef BASE_SIMPLE_THREAD_H_ -#define BASE_SIMPLE_THREAD_H_ -#pragma once - -#include -#include -#include - -#include "base/basictypes.h" -#include "base/lock.h" -#include "base/waitable_event.h" -#include "base/platform_thread.h" - -namespace base { - -// This is the base SimpleThread. You can derive from it and implement the -// virtual Run method, or you can use the DelegateSimpleThread interface. -class SimpleThread : public PlatformThread::Delegate { - public: - class Options { - public: - Options() : stack_size_(0) { } - ~Options() { } - - // We use the standard compiler-supplied copy constructor. - - // A custom stack size, or 0 for the system default. - void set_stack_size(size_t size) { stack_size_ = size; } - size_t stack_size() const { return stack_size_; } - private: - size_t stack_size_; - }; - - // Create a SimpleThread. |options| should be used to manage any specific - // configuration involving the thread creation and management. - // Every thread has a name, in the form of |name_prefix|/TID, for example - // "my_thread/321". The thread will not be created until Start() is called. - explicit SimpleThread(const std::string& name_prefix); - SimpleThread(const std::string& name_prefix, const Options& options); - - virtual ~SimpleThread(); - - virtual void Start(); - virtual void Join(); - - // We follow the PlatformThread Delegate interface. - virtual void ThreadMain(); - - // Subclasses should override the Run method. - virtual void Run() = 0; - - // Return the thread name prefix, or "unnamed" if none was supplied. - std::string name_prefix() { return name_prefix_; } - - // Return the completed name including TID, only valid after Start(). - std::string name() { return name_; } - - // Return the thread id, only valid after Start(). - PlatformThreadId tid() { return tid_; } - - // Return True if Start() has ever been called. - bool HasBeenStarted() { return event_.IsSignaled(); } - - // Return True if Join() has evern been called. - bool HasBeenJoined() { return joined_; } - - private: - const std::string name_prefix_; - std::string name_; - const Options options_; - PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join! - WaitableEvent event_; // Signaled if Start() was ever called. - PlatformThreadId tid_; // The backing thread's id. - bool joined_; // True if Join has been called. -}; - -class DelegateSimpleThread : public SimpleThread { - public: - class Delegate { - public: - Delegate() { } - virtual ~Delegate() { } - virtual void Run() = 0; - }; - - DelegateSimpleThread(Delegate* delegate, - const std::string& name_prefix); - DelegateSimpleThread(Delegate* delegate, - const std::string& name_prefix, - const Options& options); - - virtual ~DelegateSimpleThread(); - virtual void Run(); - private: - Delegate* delegate_; -}; - -// DelegateSimpleThreadPool allows you to start up a fixed number of threads, -// and then add jobs which will be dispatched to the threads. This is -// convenient when you have a lot of small work that you want done -// multi-threaded, but don't want to spawn a thread for each small bit of work. -// -// You just call AddWork() to add a delegate to the list of work to be done. -// JoinAll() will make sure that all outstanding work is processed, and wait -// for everything to finish. You can reuse a pool, so you can call Start() -// again after you've called JoinAll(). -class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate { - public: - typedef DelegateSimpleThread::Delegate Delegate; - - DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads); - virtual ~DelegateSimpleThreadPool(); - - // Start up all of the underlying threads, and start processing work if we - // have any. - void Start(); - - // Make sure all outstanding work is finished, and wait for and destroy all - // of the underlying threads in the pool. - void JoinAll(); - - // It is safe to AddWork() any time, before or after Start(). - // Delegate* should always be a valid pointer, NULL is reserved internally. - void AddWork(Delegate* work, int repeat_count); - void AddWork(Delegate* work) { - AddWork(work, 1); - } - - // We implement the Delegate interface, for running our internal threads. - virtual void Run(); - - private: - const std::string name_prefix_; - int num_threads_; - std::vector threads_; - std::queue delegates_; - Lock lock_; // Locks delegates_ - WaitableEvent dry_; // Not signaled when there is no work to do. -}; - -} // namespace base - -#endif // BASE_SIMPLE_THREAD_H_ diff --git a/base/simple_thread_unittest.cc b/base/simple_thread_unittest.cc deleted file mode 100644 index 208290a..0000000 --- a/base/simple_thread_unittest.cc +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/atomic_sequence_num.h" -#include "base/simple_thread.h" -#include "base/string_number_conversions.h" -#include "base/waitable_event.h" -#include "testing/gtest/include/gtest/gtest.h" - -namespace { - -class SetIntRunner : public base::DelegateSimpleThread::Delegate { - public: - SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { } - ~SetIntRunner() { } - - virtual void Run() { - *ptr_ = val_; - } - - private: - int* ptr_; - int val_; -}; - -class WaitEventRunner : public base::DelegateSimpleThread::Delegate { - public: - explicit WaitEventRunner(base::WaitableEvent* event) : event_(event) { } - ~WaitEventRunner() { } - - virtual void Run() { - EXPECT_FALSE(event_->IsSignaled()); - event_->Signal(); - EXPECT_TRUE(event_->IsSignaled()); - } - private: - base::WaitableEvent* event_; -}; - -class SeqRunner : public base::DelegateSimpleThread::Delegate { - public: - explicit SeqRunner(base::AtomicSequenceNumber* seq) : seq_(seq) { } - virtual void Run() { - seq_->GetNext(); - } - - private: - base::AtomicSequenceNumber* seq_; -}; - -// We count up on a sequence number, firing on the event when we've hit our -// expected amount, otherwise we wait on the event. This will ensure that we -// have all threads outstanding until we hit our expected thread pool size. -class VerifyPoolRunner : public base::DelegateSimpleThread::Delegate { - public: - VerifyPoolRunner(base::AtomicSequenceNumber* seq, - int total, base::WaitableEvent* event) - : seq_(seq), total_(total), event_(event) { } - - virtual void Run() { - if (seq_->GetNext() == total_) { - event_->Signal(); - } else { - event_->Wait(); - } - } - - private: - base::AtomicSequenceNumber* seq_; - int total_; - base::WaitableEvent* event_; -}; - -} // namespace - -TEST(SimpleThreadTest, CreateAndJoin) { - int stack_int = 0; - - SetIntRunner runner(&stack_int, 7); - EXPECT_EQ(0, stack_int); - - base::DelegateSimpleThread thread(&runner, "int_setter"); - EXPECT_FALSE(thread.HasBeenStarted()); - EXPECT_FALSE(thread.HasBeenJoined()); - EXPECT_EQ(0, stack_int); - - thread.Start(); - EXPECT_TRUE(thread.HasBeenStarted()); - EXPECT_FALSE(thread.HasBeenJoined()); - - thread.Join(); - EXPECT_TRUE(thread.HasBeenStarted()); - EXPECT_TRUE(thread.HasBeenJoined()); - EXPECT_EQ(7, stack_int); -} - -TEST(SimpleThreadTest, WaitForEvent) { - // Create a thread, and wait for it to signal us. - base::WaitableEvent event(true, false); - - WaitEventRunner runner(&event); - base::DelegateSimpleThread thread(&runner, "event_waiter"); - - EXPECT_FALSE(event.IsSignaled()); - thread.Start(); - event.Wait(); - EXPECT_TRUE(event.IsSignaled()); - thread.Join(); -} - -TEST(SimpleThreadTest, NamedWithOptions) { - base::WaitableEvent event(true, false); - - WaitEventRunner runner(&event); - base::SimpleThread::Options options; - base::DelegateSimpleThread thread(&runner, "event_waiter", options); - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_FALSE(event.IsSignaled()); - - thread.Start(); - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_EQ(thread.name(), - std::string("event_waiter/") + base::IntToString(thread.tid())); - event.Wait(); - - EXPECT_TRUE(event.IsSignaled()); - thread.Join(); - - // We keep the name and tid, even after the thread is gone. - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_EQ(thread.name(), - std::string("event_waiter/") + base::IntToString(thread.tid())); -} - -TEST(SimpleThreadTest, ThreadPool) { - base::AtomicSequenceNumber seq; - SeqRunner runner(&seq); - base::DelegateSimpleThreadPool pool("seq_runner", 10); - - // Add work before we're running. - pool.AddWork(&runner, 300); - - EXPECT_EQ(seq.GetNext(), 0); - pool.Start(); - - // Add work while we're running. - pool.AddWork(&runner, 300); - - pool.JoinAll(); - - EXPECT_EQ(seq.GetNext(), 601); - - // We can reuse our pool. Verify that all 10 threads can actually run in - // parallel, so this test will only pass if there are actually 10 threads. - base::AtomicSequenceNumber seq2; - base::WaitableEvent event(true, false); - // Changing 9 to 10, for example, would cause us JoinAll() to never return. - VerifyPoolRunner verifier(&seq2, 9, &event); - pool.Start(); - - pool.AddWork(&verifier, 10); - - pool.JoinAll(); - EXPECT_EQ(seq2.GetNext(), 10); -} diff --git a/base/thread_checker_unittest.cc b/base/thread_checker_unittest.cc index 6c55348..2b4aa7a 100644 --- a/base/thread_checker_unittest.cc +++ b/base/thread_checker_unittest.cc @@ -6,7 +6,7 @@ #include "base/logging.h" #include "base/thread_checker.h" #include "base/scoped_ptr.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "testing/gtest/include/gtest/gtest.h" #ifndef NDEBUG diff --git a/base/thread_collision_warner_unittest.cc b/base/thread_collision_warner_unittest.cc index b70cfbe..e563036b 100644 --- a/base/thread_collision_warner_unittest.cc +++ b/base/thread_collision_warner_unittest.cc @@ -6,7 +6,7 @@ #include "base/lock.h" #include "base/platform_thread.h" #include "base/scoped_ptr.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "base/thread_collision_warner.h" #include "testing/gtest/include/gtest/gtest.h" diff --git a/base/thread_local_storage_unittest.cc b/base/thread_local_storage_unittest.cc index cb24c75..dc82c53 100644 --- a/base/thread_local_storage_unittest.cc +++ b/base/thread_local_storage_unittest.cc @@ -7,7 +7,7 @@ #include #endif -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "base/thread_local_storage.h" #include "testing/gtest/include/gtest/gtest.h" diff --git a/base/thread_local_unittest.cc b/base/thread_local_unittest.cc index 7632c56..a155ca2 100644 --- a/base/thread_local_unittest.cc +++ b/base/thread_local_unittest.cc @@ -1,9 +1,9 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/logging.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "base/thread_local.h" #include "base/waitable_event.h" #include "testing/gtest/include/gtest/gtest.h" diff --git a/base/threading/simple_thread.cc b/base/threading/simple_thread.cc new file mode 100644 index 0000000..df1953f --- /dev/null +++ b/base/threading/simple_thread.cc @@ -0,0 +1,152 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/simple_thread.h" + +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/string_number_conversions.h" + +namespace base { + +void SimpleThread::Start() { + DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; + bool success = PlatformThread::Create(options_.stack_size(), this, &thread_); + CHECK(success); + event_.Wait(); // Wait for the thread to complete initialization. +} + +void SimpleThread::Join() { + DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; + DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; + PlatformThread::Join(thread_); + joined_ = true; +} + +void SimpleThread::ThreadMain() { + tid_ = PlatformThread::CurrentId(); + // Construct our full name of the form "name_prefix_/TID". + name_.push_back('/'); + name_.append(IntToString(tid_)); + PlatformThread::SetName(name_.c_str()); + + // We've initialized our new thread, signal that we're done to Start(). + event_.Signal(); + + Run(); +} + +SimpleThread::SimpleThread(const std::string& name_prefix) + : name_prefix_(name_prefix), name_(name_prefix), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::SimpleThread(const std::string& name_prefix, + const Options& options) + : name_prefix_(name_prefix), name_(name_prefix), options_(options), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::~SimpleThread() { + DCHECK(HasBeenStarted()) << "SimpleThread was never started."; + DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed."; +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix) + : SimpleThread(name_prefix), + delegate_(delegate) { +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options) + : SimpleThread(name_prefix, options), + delegate_(delegate) { +} + +DelegateSimpleThread::~DelegateSimpleThread() { +} + +void DelegateSimpleThread::Run() { + DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; + delegate_->Run(); + delegate_ = NULL; +} + +DelegateSimpleThreadPool::DelegateSimpleThreadPool( + const std::string& name_prefix, + int num_threads) + : name_prefix_(name_prefix), + num_threads_(num_threads), + dry_(true, false) { +} + +DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { + DCHECK(threads_.empty()); + DCHECK(delegates_.empty()); + DCHECK(!dry_.IsSignaled()); +} + +void DelegateSimpleThreadPool::Start() { + DCHECK(threads_.empty()) << "Start() called with outstanding threads."; + for (int i = 0; i < num_threads_; ++i) { + DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); + thread->Start(); + threads_.push_back(thread); + } +} + +void DelegateSimpleThreadPool::JoinAll() { + DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; + + // Tell all our threads to quit their worker loop. + AddWork(NULL, num_threads_); + + // Join and destroy all the worker threads. + for (int i = 0; i < num_threads_; ++i) { + threads_[i]->Join(); + delete threads_[i]; + } + threads_.clear(); + DCHECK(delegates_.empty()); +} + +void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { + AutoLock locked(lock_); + for (int i = 0; i < repeat_count; ++i) + delegates_.push(delegate); + // If we were empty, signal that we have work now. + if (!dry_.IsSignaled()) + dry_.Signal(); +} + +void DelegateSimpleThreadPool::Run() { + Delegate* work = NULL; + + while (true) { + dry_.Wait(); + { + AutoLock locked(lock_); + if (!dry_.IsSignaled()) + continue; + + DCHECK(!delegates_.empty()); + work = delegates_.front(); + delegates_.pop(); + + // Signal to any other threads that we're currently out of work. + if (delegates_.empty()) + dry_.Reset(); + } + + // A NULL delegate pointer signals us to quit. + if (!work) + break; + + work->Run(); + } +} + +} // namespace base diff --git a/base/threading/simple_thread.h b/base/threading/simple_thread.h new file mode 100644 index 0000000..dbff3ae --- /dev/null +++ b/base/threading/simple_thread.h @@ -0,0 +1,182 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// WARNING: You should probably be using Thread (thread.h) instead. Thread is +// Chrome's message-loop based Thread abstraction, and if you are a +// thread running in the browser, there will likely be assumptions +// that your thread will have an associated message loop. +// +// This is a simple thread interface that backs to a native operating system +// thread. You should use this only when you want a thread that does not have +// an associated MessageLoop. Unittesting is the best example of this. +// +// The simplest interface to use is DelegateSimpleThread, which will create +// a new thread, and execute the Delegate's virtual Run() in this new thread +// until it has completed, exiting the thread. +// +// NOTE: You *MUST* call Join on the thread to clean up the underlying thread +// resources. You are also responsible for destructing the SimpleThread object. +// It is invalid to destroy a SimpleThread while it is running, or without +// Start() having been called (and a thread never created). The Delegate +// object should live as long as a DelegateSimpleThread. +// +// Thread Safety: A SimpleThread is not completely thread safe. It is safe to +// access it from the creating thread or from the newly created thread. This +// implies that the creator thread should be the thread that calls Join. +// +// Example: +// class MyThreadRunner : public DelegateSimpleThread::Delegate { ... }; +// MyThreadRunner runner; +// DelegateSimpleThread thread(&runner, "good_name_here"); +// thread.Start(); +// // Start will return after the Thread has been successfully started and +// // initialized. The newly created thread will invoke runner->Run(), and +// // run until it returns. +// thread.Join(); // Wait until the thread has exited. You *MUST* Join! +// // The SimpleThread object is still valid, however you may not call Join +// // or Start again. + +#ifndef BASE_THREADING_SIMPLE_THREAD_H_ +#define BASE_THREADING_SIMPLE_THREAD_H_ +#pragma once + +#include +#include +#include + +#include "base/basictypes.h" +#include "base/lock.h" +#include "base/waitable_event.h" +#include "base/platform_thread.h" + +namespace base { + +// This is the base SimpleThread. You can derive from it and implement the +// virtual Run method, or you can use the DelegateSimpleThread interface. +class SimpleThread : public PlatformThread::Delegate { + public: + class Options { + public: + Options() : stack_size_(0) { } + ~Options() { } + + // We use the standard compiler-supplied copy constructor. + + // A custom stack size, or 0 for the system default. + void set_stack_size(size_t size) { stack_size_ = size; } + size_t stack_size() const { return stack_size_; } + private: + size_t stack_size_; + }; + + // Create a SimpleThread. |options| should be used to manage any specific + // configuration involving the thread creation and management. + // Every thread has a name, in the form of |name_prefix|/TID, for example + // "my_thread/321". The thread will not be created until Start() is called. + explicit SimpleThread(const std::string& name_prefix); + SimpleThread(const std::string& name_prefix, const Options& options); + + virtual ~SimpleThread(); + + virtual void Start(); + virtual void Join(); + + // We follow the PlatformThread Delegate interface. + virtual void ThreadMain(); + + // Subclasses should override the Run method. + virtual void Run() = 0; + + // Return the thread name prefix, or "unnamed" if none was supplied. + std::string name_prefix() { return name_prefix_; } + + // Return the completed name including TID, only valid after Start(). + std::string name() { return name_; } + + // Return the thread id, only valid after Start(). + PlatformThreadId tid() { return tid_; } + + // Return True if Start() has ever been called. + bool HasBeenStarted() { return event_.IsSignaled(); } + + // Return True if Join() has evern been called. + bool HasBeenJoined() { return joined_; } + + private: + const std::string name_prefix_; + std::string name_; + const Options options_; + PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join! + WaitableEvent event_; // Signaled if Start() was ever called. + PlatformThreadId tid_; // The backing thread's id. + bool joined_; // True if Join has been called. +}; + +class DelegateSimpleThread : public SimpleThread { + public: + class Delegate { + public: + Delegate() { } + virtual ~Delegate() { } + virtual void Run() = 0; + }; + + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix); + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options); + + virtual ~DelegateSimpleThread(); + virtual void Run(); + private: + Delegate* delegate_; +}; + +// DelegateSimpleThreadPool allows you to start up a fixed number of threads, +// and then add jobs which will be dispatched to the threads. This is +// convenient when you have a lot of small work that you want done +// multi-threaded, but don't want to spawn a thread for each small bit of work. +// +// You just call AddWork() to add a delegate to the list of work to be done. +// JoinAll() will make sure that all outstanding work is processed, and wait +// for everything to finish. You can reuse a pool, so you can call Start() +// again after you've called JoinAll(). +class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate { + public: + typedef DelegateSimpleThread::Delegate Delegate; + + DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads); + virtual ~DelegateSimpleThreadPool(); + + // Start up all of the underlying threads, and start processing work if we + // have any. + void Start(); + + // Make sure all outstanding work is finished, and wait for and destroy all + // of the underlying threads in the pool. + void JoinAll(); + + // It is safe to AddWork() any time, before or after Start(). + // Delegate* should always be a valid pointer, NULL is reserved internally. + void AddWork(Delegate* work, int repeat_count); + void AddWork(Delegate* work) { + AddWork(work, 1); + } + + // We implement the Delegate interface, for running our internal threads. + virtual void Run(); + + private: + const std::string name_prefix_; + int num_threads_; + std::vector threads_; + std::queue delegates_; + Lock lock_; // Locks delegates_ + WaitableEvent dry_; // Not signaled when there is no work to do. +}; + +} // namespace base + +#endif // BASE_THREADING_SIMPLE_THREAD_H_ diff --git a/base/threading/simple_thread_unittest.cc b/base/threading/simple_thread_unittest.cc new file mode 100644 index 0000000..56aed6b --- /dev/null +++ b/base/threading/simple_thread_unittest.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/atomic_sequence_num.h" +#include "base/string_number_conversions.h" +#include "base/threading/simple_thread.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +class SetIntRunner : public DelegateSimpleThread::Delegate { + public: + SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { } + ~SetIntRunner() { } + + virtual void Run() { + *ptr_ = val_; + } + + private: + int* ptr_; + int val_; +}; + +class WaitEventRunner : public DelegateSimpleThread::Delegate { + public: + explicit WaitEventRunner(WaitableEvent* event) : event_(event) { } + ~WaitEventRunner() { } + + virtual void Run() { + EXPECT_FALSE(event_->IsSignaled()); + event_->Signal(); + EXPECT_TRUE(event_->IsSignaled()); + } + private: + WaitableEvent* event_; +}; + +class SeqRunner : public DelegateSimpleThread::Delegate { + public: + explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { } + virtual void Run() { + seq_->GetNext(); + } + + private: + AtomicSequenceNumber* seq_; +}; + +// We count up on a sequence number, firing on the event when we've hit our +// expected amount, otherwise we wait on the event. This will ensure that we +// have all threads outstanding until we hit our expected thread pool size. +class VerifyPoolRunner : public DelegateSimpleThread::Delegate { + public: + VerifyPoolRunner(AtomicSequenceNumber* seq, + int total, WaitableEvent* event) + : seq_(seq), total_(total), event_(event) { } + + virtual void Run() { + if (seq_->GetNext() == total_) { + event_->Signal(); + } else { + event_->Wait(); + } + } + + private: + AtomicSequenceNumber* seq_; + int total_; + WaitableEvent* event_; +}; + +} // namespace + +TEST(SimpleThreadTest, CreateAndJoin) { + int stack_int = 0; + + SetIntRunner runner(&stack_int, 7); + EXPECT_EQ(0, stack_int); + + DelegateSimpleThread thread(&runner, "int_setter"); + EXPECT_FALSE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + EXPECT_EQ(0, stack_int); + + thread.Start(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + + thread.Join(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_TRUE(thread.HasBeenJoined()); + EXPECT_EQ(7, stack_int); +} + +TEST(SimpleThreadTest, WaitForEvent) { + // Create a thread, and wait for it to signal us. + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + DelegateSimpleThread thread(&runner, "event_waiter"); + + EXPECT_FALSE(event.IsSignaled()); + thread.Start(); + event.Wait(); + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); +} + +TEST(SimpleThreadTest, NamedWithOptions) { + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + SimpleThread::Options options; + DelegateSimpleThread thread(&runner, "event_waiter", options); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_FALSE(event.IsSignaled()); + + thread.Start(); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); + event.Wait(); + + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); + + // We keep the name and tid, even after the thread is gone. + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); +} + +TEST(SimpleThreadTest, ThreadPool) { + AtomicSequenceNumber seq; + SeqRunner runner(&seq); + DelegateSimpleThreadPool pool("seq_runner", 10); + + // Add work before we're running. + pool.AddWork(&runner, 300); + + EXPECT_EQ(seq.GetNext(), 0); + pool.Start(); + + // Add work while we're running. + pool.AddWork(&runner, 300); + + pool.JoinAll(); + + EXPECT_EQ(seq.GetNext(), 601); + + // We can reuse our pool. Verify that all 10 threads can actually run in + // parallel, so this test will only pass if there are actually 10 threads. + AtomicSequenceNumber seq2; + WaitableEvent event(true, false); + // Changing 9 to 10, for example, would cause us JoinAll() to never return. + VerifyPoolRunner verifier(&seq2, 9, &event); + pool.Start(); + + pool.AddWork(&verifier, 10); + + pool.JoinAll(); + EXPECT_EQ(seq2.GetNext(), 10); +} + +} // namespace base diff --git a/base/threading/watchdog.cc b/base/threading/watchdog.cc new file mode 100644 index 0000000..8474744 --- /dev/null +++ b/base/threading/watchdog.cc @@ -0,0 +1,144 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/platform_thread.h" + +namespace base { + +// Start thread running in a Disarmed state. +Watchdog::Watchdog(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : init_successful_(false), + lock_(), + condition_variable_(&lock_), + state_(DISARMED), + duration_(duration), + thread_watched_name_(thread_watched_name), + ALLOW_THIS_IN_INITIALIZER_LIST(delegate_(this)) { + if (!enabled) + return; // Don't start thread, or doing anything really. + init_successful_ = PlatformThread::Create(0, // Default stack size. + &delegate_, + &handle_); + DCHECK(init_successful_); +} + +// Notify watchdog thread, and wait for it to finish up. +Watchdog::~Watchdog() { + if (!init_successful_) + return; + { + AutoLock lock(lock_); + state_ = SHUTDOWN; + } + condition_variable_.Signal(); + PlatformThread::Join(handle_); +} + +void Watchdog::Arm() { + ArmAtStartTime(TimeTicks::Now()); +} + +void Watchdog::ArmSomeTimeDeltaAgo(const TimeDelta& time_delta) { + ArmAtStartTime(TimeTicks::Now() - time_delta); +} + +// Start clock for watchdog. +void Watchdog::ArmAtStartTime(const TimeTicks start_time) { + { + AutoLock lock(lock_); + start_time_ = start_time; + state_ = ARMED; + } + // Force watchdog to wake up, and go to sleep with the timer ticking with the + // proper duration. + condition_variable_.Signal(); +} + +// Disable watchdog so that it won't do anything when time expires. +void Watchdog::Disarm() { + AutoLock lock(lock_); + state_ = DISARMED; + // We don't need to signal, as the watchdog will eventually wake up, and it + // will check its state and time, and act accordingly. +} + +void Watchdog::Alarm() { + DVLOG(1) << "Watchdog alarmed for " << thread_watched_name_; +} + +//------------------------------------------------------------------------------ +// Internal private methods that the watchdog thread uses. + +void Watchdog::ThreadDelegate::ThreadMain() { + SetThreadName(); + TimeDelta remaining_duration; + while (1) { + AutoLock lock(watchdog_->lock_); + while (DISARMED == watchdog_->state_) + watchdog_->condition_variable_.Wait(); + if (SHUTDOWN == watchdog_->state_) + return; + DCHECK(ARMED == watchdog_->state_); + remaining_duration = watchdog_->duration_ - + (TimeTicks::Now() - watchdog_->start_time_); + if (remaining_duration.InMilliseconds() > 0) { + // Spurios wake? Timer drifts? Go back to sleep for remaining time. + watchdog_->condition_variable_.TimedWait(remaining_duration); + continue; + } + // We overslept, so this seems like a real alarm. + // Watch out for a user that stopped the debugger on a different alarm! + { + AutoLock static_lock(static_lock_); + if (last_debugged_alarm_time_ > watchdog_->start_time_) { + // False alarm: we started our clock before the debugger break (last + // alarm time). + watchdog_->start_time_ += last_debugged_alarm_delay_; + if (last_debugged_alarm_time_ > watchdog_->start_time_) + // Too many alarms must have taken place. + watchdog_->state_ = DISARMED; + continue; + } + } + watchdog_->state_ = DISARMED; // Only alarm at most once. + TimeTicks last_alarm_time = TimeTicks::Now(); + watchdog_->Alarm(); // Set a break point here to debug on alarms. + TimeDelta last_alarm_delay = TimeTicks::Now() - last_alarm_time; + if (last_alarm_delay <= TimeDelta::FromMilliseconds(2)) + continue; + // Ignore race of two alarms/breaks going off at roughly the same time. + AutoLock static_lock(static_lock_); + // This was a real debugger break. + last_debugged_alarm_time_ = last_alarm_time; + last_debugged_alarm_delay_ = last_alarm_delay; + } +} + +void Watchdog::ThreadDelegate::SetThreadName() const { + std::string name = watchdog_->thread_watched_name_ + " Watchdog"; + PlatformThread::SetName(name.c_str()); + DVLOG(1) << "Watchdog active: " << name; +} + +// static +void Watchdog::ResetStaticData() { + AutoLock lock(static_lock_); + last_debugged_alarm_time_ = TimeTicks(); + last_debugged_alarm_delay_ = TimeDelta(); +} + +// static +Lock Watchdog::static_lock_; // Lock for access of static data... +// static +TimeTicks Watchdog::last_debugged_alarm_time_ = TimeTicks(); +// static +TimeDelta Watchdog::last_debugged_alarm_delay_; + +} // namespace base diff --git a/base/threading/watchdog.h b/base/threading/watchdog.h new file mode 100644 index 0000000..025fe09 --- /dev/null +++ b/base/threading/watchdog.h @@ -0,0 +1,98 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// The Watchdog class creates a second thread that can Alarm if a specific +// duration of time passes without proper attention. The duration of time is +// specified at construction time. The Watchdog may be used many times by +// simply calling Arm() (to start timing) and Disarm() (to reset the timer). +// The Watchdog is typically used under a debugger, where the stack traces on +// other threads can be examined if/when the Watchdog alarms. + +// Some watchdogs will be enabled or disabled via command line switches. To +// facilitate such code, an "enabled" argument for the constuctor can be used +// to permanently disable the watchdog. Disabled watchdogs don't even spawn +// a second thread, and their methods call (Arm() and Disarm()) return very +// quickly. + +#ifndef BASE_THREADING_WATCHDOG_H_ +#define BASE_THREADING_WATCHDOG_H_ +#pragma once + +#include + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/time.h" + +namespace base { + +class Watchdog { + public: + // Constructor specifies how long the Watchdog will wait before alarming. + Watchdog(const base::TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled); + virtual ~Watchdog(); + + // Start timing, and alarm when time expires (unless we're disarm()ed.) + void Arm(); // Arm starting now. + void ArmSomeTimeDeltaAgo(const base::TimeDelta& time_delta); + void ArmAtStartTime(const base::TimeTicks start_time); + + // Reset time, and do not set off the alarm. + void Disarm(); + + // Alarm is called if the time expires after an Arm() without someone calling + // Disarm(). This method can be overridden to create testable classes. + virtual void Alarm(); + + // Reset static data to initial state. Useful for tests, to ensure + // they are independent. + static void ResetStaticData(); + + private: + class ThreadDelegate : public PlatformThread::Delegate { + public: + explicit ThreadDelegate(Watchdog* watchdog) : watchdog_(watchdog) { + } + virtual void ThreadMain(); + private: + Watchdog* watchdog_; + + void SetThreadName() const; + }; + + enum State {ARMED, DISARMED, SHUTDOWN }; + + bool init_successful_; + + Lock lock_; // Mutex for state_. + ConditionVariable condition_variable_; + State state_; + const base::TimeDelta duration_; // How long after start_time_ do we alarm? + const std::string thread_watched_name_; + PlatformThreadHandle handle_; + ThreadDelegate delegate_; // Store it, because it must outlive the thread. + + base::TimeTicks start_time_; // Start of epoch, and alarm after duration_. + + // When the debugger breaks (when we alarm), all the other alarms that are + // armed will expire (also alarm). To diminish this effect, we track any + // delay due to debugger breaks, and we *try* to adjust the effective start + // time of other alarms to step past the debugging break. + // Without this safety net, any alarm will typically trigger a host of follow + // on alarms from callers that specify old times. + static Lock static_lock_; // Lock for access of static data... + // When did we last alarm and get stuck (for a while) in a debugger? + static base::TimeTicks last_debugged_alarm_time_; + // How long did we sit on a break in the debugger? + static base::TimeDelta last_debugged_alarm_delay_; + + DISALLOW_COPY_AND_ASSIGN(Watchdog); +}; + +} // namespace base + +#endif // BASE_THREADING_WATCHDOG_H_ diff --git a/base/threading/watchdog_unittest.cc b/base/threading/watchdog_unittest.cc new file mode 100644 index 0000000..347781e --- /dev/null +++ b/base/threading/watchdog_unittest.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/spin_wait.h" +#include "base/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +//------------------------------------------------------------------------------ +// Provide a derived class to facilitate testing. + +class WatchdogCounter : public Watchdog { + public: + WatchdogCounter(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : Watchdog(duration, thread_watched_name, enabled), alarm_counter_(0) { + } + + virtual ~WatchdogCounter() {} + + virtual void Alarm() { + alarm_counter_++; + Watchdog::Alarm(); + } + + int alarm_counter() { return alarm_counter_; } + + private: + int alarm_counter_; + + DISALLOW_COPY_AND_ASSIGN(WatchdogCounter); +}; + +class WatchdogTest : public testing::Test { + public: + void SetUp() { + Watchdog::ResetStaticData(); + } +}; + +} // namespace + +//------------------------------------------------------------------------------ +// Actual tests + +// Minimal constructor/destructor test. +TEST_F(WatchdogTest, StartupShutdownTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); +} + +// Test ability to call Arm and Disarm repeatedly. +TEST_F(WatchdogTest, ArmDisarmTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + watchdog1.Arm(); + watchdog1.Disarm(); + watchdog1.Arm(); + watchdog1.Disarm(); + + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); + watchdog2.Arm(); + watchdog2.Disarm(); + watchdog2.Arm(); + watchdog2.Disarm(); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Enabled", true); + watchdog.Arm(); + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmPriorTimeTest) { + WatchdogCounter watchdog(TimeDelta(), "Enabled2", true); + // Set a time in the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(2)); + // It should instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a disable alarm does nothing, even if we arm it. +TEST_F(WatchdogTest, ConstructorDisabledTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Disabled", false); + watchdog.Arm(); + // Alarm should not fire, as it was disabled. + PlatformThread::Sleep(500); + EXPECT_EQ(0, watchdog.alarm_counter()); +} + +// Make sure Disarming will prevent firing, even after Arming. +TEST_F(WatchdogTest, DisarmTest) { + WatchdogCounter watchdog(TimeDelta::FromSeconds(1), "Enabled3", true); + + TimeTicks start = TimeTicks::Now(); + watchdog.Arm(); + PlatformThread::Sleep(100); // Sleep a bit, but not past the alarm point. + watchdog.Disarm(); + TimeTicks end = TimeTicks::Now(); + + if (end - start > TimeDelta::FromMilliseconds(500)) { + LOG(WARNING) << "100ms sleep took over 500ms, making the results of this " + << "timing-sensitive test suspicious. Aborting now."; + return; + } + + // Alarm should not have fired before it was disarmed. + EXPECT_EQ(0, watchdog.alarm_counter()); + + // Sleep past the point where it would have fired if it wasn't disarmed, + // and verify that it didn't fire. + PlatformThread::Sleep(1000); + EXPECT_EQ(0, watchdog.alarm_counter()); + + // ...but even after disarming, we can still use the alarm... + // Set a time greater than the timeout into the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(10)); + // It should almost instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +} // namespace base diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h new file mode 100644 index 0000000..9a02acc --- /dev/null +++ b/base/threading/worker_pool.h @@ -0,0 +1,35 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_WORKER_POOL_H_ +#define BASE_THREADING_WORKER_POOL_H_ +#pragma once + +#include "base/tracked.h" + +class Task; + +namespace base { + +// This is a facility that runs tasks that don't require a specific thread or +// a message loop. +// +// WARNING: This shouldn't be used unless absolutely necessary. We don't wait +// for the worker pool threads to finish on shutdown, so the tasks running +// inside the pool must be extremely careful about other objects they access +// (MessageLoops, Singletons, etc). During shutdown these object may no longer +// exist. +class WorkerPool { + public: + // This function posts |task| to run on a worker thread. |task_is_slow| + // should be used for tasks that will take a long time to execute. Returns + // false if |task| could not be posted to a worker thread. Regardless of + // return value, ownership of |task| is transferred to the worker pool. + static bool PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_H_ diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc new file mode 100644 index 0000000..2facc01 --- /dev/null +++ b/base/threading/worker_pool_posix.cc @@ -0,0 +1,169 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/stringprintf.h" +#include "base/task.h" +#include "base/threading/worker_pool.h" + +namespace base { + +namespace { + +const int kIdleSecondsBeforeExit = 10 * 60; +// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert +// function of NSS because of NSS bug 439169. +const int kWorkerThreadStackSize = 128 * 1024; + +class WorkerPoolImpl { + public: + WorkerPoolImpl(); + ~WorkerPoolImpl(); + + void PostTask(const tracked_objects::Location& from_here, Task* task, + bool task_is_slow); + + private: + scoped_refptr pool_; +}; + +WorkerPoolImpl::WorkerPoolImpl() + : pool_(new base::PosixDynamicThreadPool("WorkerPool", + kIdleSecondsBeforeExit)) { +} + +WorkerPoolImpl::~WorkerPoolImpl() { + pool_->Terminate(); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + task->SetBirthPlace(from_here); + pool_->PostTask(task); +} + +base::LazyInstance g_lazy_worker_pool(base::LINKER_INITIALIZED); + +class WorkerThread : public PlatformThread::Delegate { + public: + WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, + base::PosixDynamicThreadPool* pool) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + pool_(pool) {} + + virtual void ThreadMain(); + + private: + const std::string name_prefix_; + const int idle_seconds_before_exit_; + scoped_refptr pool_; + + DISALLOW_COPY_AND_ASSIGN(WorkerThread); +}; + +void WorkerThread::ThreadMain() { + const std::string name = base::StringPrintf( + "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); + PlatformThread::SetName(name.c_str()); + + for (;;) { + Task* task = pool_->WaitForTask(); + if (!task) + break; + task->Run(); + delete task; + } + + // The WorkerThread is non-joinable, so it deletes itself. + delete this; +} + +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} + +PosixDynamicThreadPool::PosixDynamicThreadPool( + const std::string& name_prefix, + int idle_seconds_before_exit) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + tasks_available_cv_(&lock_), + num_idle_threads_(0), + terminated_(false), + num_idle_threads_cv_(NULL) {} + +PosixDynamicThreadPool::~PosixDynamicThreadPool() { + while (!tasks_.empty()) { + Task* task = tasks_.front(); + tasks_.pop(); + delete task; + } +} + +void PosixDynamicThreadPool::Terminate() { + { + AutoLock locked(lock_); + DCHECK(!terminated_) << "Thread pool is already terminated."; + terminated_ = true; + } + tasks_available_cv_.Broadcast(); +} + +void PosixDynamicThreadPool::PostTask(Task* task) { + AutoLock locked(lock_); + DCHECK(!terminated_) << + "This thread pool is already terminated. Do not post new tasks."; + + tasks_.push(task); + + // We have enough worker threads. + if (static_cast(num_idle_threads_) >= tasks_.size()) { + tasks_available_cv_.Signal(); + } else { + // The new PlatformThread will take ownership of the WorkerThread object, + // which will delete itself on exit. + WorkerThread* worker = + new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); + PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); + } +} + +Task* PosixDynamicThreadPool::WaitForTask() { + AutoLock locked(lock_); + + if (terminated_) + return NULL; + + if (tasks_.empty()) { // No work available, wait for work. + num_idle_threads_++; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); + num_idle_threads_--; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + if (tasks_.empty()) { + // We waited for work, but there's still no work. Return NULL to signal + // the thread to terminate. + return NULL; + } + } + + Task* task = tasks_.front(); + tasks_.pop(); + return task; +} + +} // namespace base diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h new file mode 100644 index 0000000..6c99e76 --- /dev/null +++ b/base/threading/worker_pool_posix.h @@ -0,0 +1,89 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The thread pool used in the POSIX implementation of WorkerPool dynamically +// adds threads as necessary to handle all tasks. It keeps old threads around +// for a period of time to allow them to be reused. After this waiting period, +// the threads exit. This thread pool uses non-joinable threads, therefore +// worker threads are not joined during process shutdown. This means that +// potentially long running tasks (such as DNS lookup) do not block process +// shutdown, but also means that process shutdown may "leak" objects. Note that +// although PosixDynamicThreadPool spawns the worker threads and manages the +// task queue, it does not own the worker threads. The worker threads ask the +// PosixDynamicThreadPool for work and eventually clean themselves up. The +// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool +// instance, which prevents PosixDynamicThreadPool from disappearing before all +// worker threads exit. The owner of PosixDynamicThreadPool should likewise +// maintain a scoped_refptr to the PosixDynamicThreadPool instance. +// +// NOTE: The classes defined in this file are only meant for use by the POSIX +// implementation of WorkerPool. No one else should be using these classes. +// These symbols are exported in a header purely for testing purposes. + +#ifndef BASE_THREADING_WORKER_POOL_POSIX_H_ +#define BASE_THREADING_WORKER_POOL_POSIX_H_ +#pragma once + +#include +#include + +#include "base/basictypes.h" +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" +#include "base/scoped_ptr.h" + +class Task; + +namespace base { + +class PosixDynamicThreadPool + : public RefCountedThreadSafe { + public: + class PosixDynamicThreadPoolPeer; + + // All worker threads will share the same |name_prefix|. They will exit after + // |idle_seconds_before_exit|. + PosixDynamicThreadPool(const std::string& name_prefix, + int idle_seconds_before_exit); + ~PosixDynamicThreadPool(); + + // Indicates that the thread pool is going away. Stops handing out tasks to + // worker threads. Wakes up all the idle threads to let them exit. + void Terminate(); + + // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership + // of |task|. + void PostTask(Task* task); + + // Worker thread method to wait for up to |idle_seconds_before_exit| for more + // work from the thread pool. Returns NULL if no work is available. + Task* WaitForTask(); + + private: + friend class PosixDynamicThreadPoolPeer; + + const std::string name_prefix_; + const int idle_seconds_before_exit_; + + Lock lock_; // Protects all the variables below. + + // Signal()s worker threads to let them know more tasks are available. + // Also used for Broadcast()'ing to worker threads to let them know the pool + // is being deleted and they can exit. + ConditionVariable tasks_available_cv_; + int num_idle_threads_; + std::queue tasks_; + bool terminated_; + // Only used for tests to ensure correct thread ordering. It will always be + // NULL in non-test code. + scoped_ptr num_idle_threads_cv_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_POSIX_H_ diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc new file mode 100644 index 0000000..48df16e --- /dev/null +++ b/base/threading/worker_pool_posix_unittest.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/platform_thread.h" +#include "base/task.h" +#include "base/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Peer class to provide passthrough access to PosixDynamicThreadPool internals. +class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { + public: + explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) + : pool_(pool) {} + + Lock* lock() { return &pool_->lock_; } + ConditionVariable* tasks_available_cv() { + return &pool_->tasks_available_cv_; + } + const std::queue& tasks() const { return pool_->tasks_; } + int num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_idle_threads_cv() { + return pool_->num_idle_threads_cv_.get(); + } + void set_num_idle_threads_cv(ConditionVariable* cv) { + pool_->num_idle_threads_cv_.reset(cv); + } + + private: + PosixDynamicThreadPool* pool_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); +}; + +namespace { + +// IncrementingTask's main purpose is to increment a counter. It also updates a +// set of unique thread ids, and signals a ConditionVariable on completion. +// Note that since it does not block, there is no way to control the number of +// threads used if more than one IncrementingTask is consecutively posted to the +// thread pool, since the first one might finish executing before the subsequent +// PostTask() calls get invoked. +class IncrementingTask : public Task { + public: + IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set* unique_threads) + : counter_lock_(counter_lock), + unique_threads_lock_(unique_threads_lock), + unique_threads_(unique_threads), + counter_(counter) {} + + virtual void Run() { + AddSelfToUniqueThreadSet(); + AutoLock locked(*counter_lock_); + (*counter_)++; + } + + void AddSelfToUniqueThreadSet() { + AutoLock locked(*unique_threads_lock_); + unique_threads_->insert(PlatformThread::CurrentId()); + } + + private: + Lock* counter_lock_; + Lock* unique_threads_lock_; + std::set* unique_threads_; + int* counter_; + + DISALLOW_COPY_AND_ASSIGN(IncrementingTask); +}; + +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +class BlockingIncrementingTask : public Task { + public: + BlockingIncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set* unique_threads, + Lock* num_waiting_to_start_lock, + int* num_waiting_to_start, + ConditionVariable* num_waiting_to_start_cv, + base::WaitableEvent* start) + : incrementer_( + counter_lock, counter, unique_threads_lock, unique_threads), + num_waiting_to_start_lock_(num_waiting_to_start_lock), + num_waiting_to_start_(num_waiting_to_start), + num_waiting_to_start_cv_(num_waiting_to_start_cv), + start_(start) {} + + virtual void Run() { + { + AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); + (*num_waiting_to_start_)++; + } + num_waiting_to_start_cv_->Signal(); + CHECK(start_->Wait()); + incrementer_.Run(); + } + + private: + IncrementingTask incrementer_; + Lock* num_waiting_to_start_lock_; + int* num_waiting_to_start_; + ConditionVariable* num_waiting_to_start_cv_; + base::WaitableEvent* start_; + + DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +}; + +class PosixDynamicThreadPoolTest : public testing::Test { + protected: + PosixDynamicThreadPoolTest() + : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), + peer_(pool_.get()), + counter_(0), + num_waiting_to_start_(0), + num_waiting_to_start_cv_(&num_waiting_to_start_lock_), + start_(true, false) {} + + virtual void SetUp() { + peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); + } + + virtual void TearDown() { + // Wake up the idle threads so they can terminate. + if (pool_.get()) pool_->Terminate(); + } + + void WaitForTasksToStart(int num_tasks) { + AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + while (num_waiting_to_start_ < num_tasks) { + num_waiting_to_start_cv_.Wait(); + } + } + + void WaitForIdleThreads(int num_idle_threads) { + AutoLock pool_locked(*peer_.lock()); + while (peer_.num_idle_threads() < num_idle_threads) { + peer_.num_idle_threads_cv()->Wait(); + } + } + + Task* CreateNewIncrementingTask() { + return new IncrementingTask(&counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); + } + + Task* CreateNewBlockingIncrementingTask() { + return new BlockingIncrementingTask( + &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, + &num_waiting_to_start_lock_, &num_waiting_to_start_, + &num_waiting_to_start_cv_, &start_); + } + + scoped_refptr pool_; + base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; + Lock counter_lock_; + int counter_; + Lock unique_threads_lock_; + std::set unique_threads_; + Lock num_waiting_to_start_lock_; + int num_waiting_to_start_; + ConditionVariable num_waiting_to_start_cv_; + base::WaitableEvent start_; +}; + +} // namespace + +TEST_F(PosixDynamicThreadPoolTest, Basic) { + EXPECT_EQ(0, peer_.num_idle_threads()); + EXPECT_EQ(0U, unique_threads_.size()); + EXPECT_EQ(0U, peer_.tasks().size()); + + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + EXPECT_EQ(1U, unique_threads_.size()) << + "There should be only one thread allocated for one task."; + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { + // Add one task and wait for it to be completed. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add another 2 tasks. One should reuse the existing worker thread. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(3, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { + // Add two blocking tasks. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, Complex) { + // Add two non blocking tasks and wait for them to finish. + pool_->PostTask(CreateNewIncrementingTask()); + + WaitForIdleThreads(1); + + // Add two blocking tasks, start them simultaneously, and wait for them to + // finish. + pool_->PostTask(CreateNewBlockingIncrementingTask()); + pool_->PostTask(CreateNewBlockingIncrementingTask()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(3, counter_); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, unique_threads_.size()); + + // Wake up all idle threads so they can exit. + { + AutoLock locked(*peer_.lock()); + while (peer_.num_idle_threads() > 0) { + peer_.tasks_available_cv()->Signal(); + peer_.num_idle_threads_cv()->Wait(); + } + } + + // Add another non blocking task. There are no threads to reuse. + pool_->PostTask(CreateNewIncrementingTask()); + WaitForIdleThreads(1); + + EXPECT_EQ(3U, unique_threads_.size()); + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(4, counter_); +} + +} // namespace base diff --git a/base/threading/worker_pool_unittest.cc b/base/threading/worker_pool_unittest.cc new file mode 100644 index 0000000..cf8e0e8 --- /dev/null +++ b/base/threading/worker_pool_unittest.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/task.h" +#include "base/waitable_event.h" +#include "base/threading/worker_pool.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +typedef PlatformTest WorkerPoolTest; + +namespace base { + +namespace { + +class PostTaskTestTask : public Task { + public: + explicit PostTaskTestTask(WaitableEvent* event) : event_(event) { + } + + void Run() { + event_->Signal(); + } + + private: + WaitableEvent* event_; +}; + +} // namespace + +TEST_F(WorkerPoolTest, PostTask) { + WaitableEvent test_event(false, false); + WaitableEvent long_test_event(false, false); + bool signaled; + + WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&test_event), false); + WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&long_test_event), true); + + signaled = test_event.Wait(); + EXPECT_TRUE(signaled); + signaled = long_test_event.Wait(); + EXPECT_TRUE(signaled); +} + +} // namespace base diff --git a/base/threading/worker_pool_win.cc b/base/threading/worker_pool_win.cc new file mode 100644 index 0000000..2072e52 --- /dev/null +++ b/base/threading/worker_pool_win.cc @@ -0,0 +1,40 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool.h" + +#include "base/logging.h" +#include "base/task.h" + +namespace base { + +namespace { + +DWORD CALLBACK WorkItemCallback(void* param) { + Task* task = static_cast(param); + task->Run(); + delete task; + return 0; +} + +} // namespace + +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + Task* task, bool task_is_slow) { + task->SetBirthPlace(from_here); + + ULONG flags = 0; + if (task_is_slow) + flags |= WT_EXECUTELONGFUNCTION; + + if (!QueueUserWorkItem(WorkItemCallback, task, flags)) { + DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError(); + delete task; + return false; + } + + return true; +} + +} // namespace base diff --git a/base/watchdog.cc b/base/watchdog.cc deleted file mode 100644 index f9326e1..0000000 --- a/base/watchdog.cc +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/watchdog.h" - -#include "base/compiler_specific.h" -#include "base/logging.h" -#include "base/platform_thread.h" - -using base::TimeDelta; -using base::TimeTicks; - -//------------------------------------------------------------------------------ -// Public API methods. - -// Start thread running in a Disarmed state. -Watchdog::Watchdog(const TimeDelta& duration, - const std::string& thread_watched_name, - bool enabled) - : init_successful_(false), - lock_(), - condition_variable_(&lock_), - state_(DISARMED), - duration_(duration), - thread_watched_name_(thread_watched_name), - ALLOW_THIS_IN_INITIALIZER_LIST(delegate_(this)) { - if (!enabled) - return; // Don't start thread, or doing anything really. - init_successful_ = PlatformThread::Create(0, // Default stack size. - &delegate_, - &handle_); - DCHECK(init_successful_); -} - -// Notify watchdog thread, and wait for it to finish up. -Watchdog::~Watchdog() { - if (!init_successful_) - return; - { - AutoLock lock(lock_); - state_ = SHUTDOWN; - } - condition_variable_.Signal(); - PlatformThread::Join(handle_); -} - -void Watchdog::Arm() { - ArmAtStartTime(TimeTicks::Now()); -} - -void Watchdog::ArmSomeTimeDeltaAgo(const TimeDelta& time_delta) { - ArmAtStartTime(TimeTicks::Now() - time_delta); -} - -// Start clock for watchdog. -void Watchdog::ArmAtStartTime(const TimeTicks start_time) { - { - AutoLock lock(lock_); - start_time_ = start_time; - state_ = ARMED; - } - // Force watchdog to wake up, and go to sleep with the timer ticking with the - // proper duration. - condition_variable_.Signal(); -} - -// Disable watchdog so that it won't do anything when time expires. -void Watchdog::Disarm() { - AutoLock lock(lock_); - state_ = DISARMED; - // We don't need to signal, as the watchdog will eventually wake up, and it - // will check its state and time, and act accordingly. -} - -void Watchdog::Alarm() { - DVLOG(1) << "Watchdog alarmed for " << thread_watched_name_; -} - -//------------------------------------------------------------------------------ -// Internal private methods that the watchdog thread uses. - -void Watchdog::ThreadDelegate::ThreadMain() { - SetThreadName(); - TimeDelta remaining_duration; - while (1) { - AutoLock lock(watchdog_->lock_); - while (DISARMED == watchdog_->state_) - watchdog_->condition_variable_.Wait(); - if (SHUTDOWN == watchdog_->state_) - return; - DCHECK(ARMED == watchdog_->state_); - remaining_duration = watchdog_->duration_ - - (TimeTicks::Now() - watchdog_->start_time_); - if (remaining_duration.InMilliseconds() > 0) { - // Spurios wake? Timer drifts? Go back to sleep for remaining time. - watchdog_->condition_variable_.TimedWait(remaining_duration); - continue; - } - // We overslept, so this seems like a real alarm. - // Watch out for a user that stopped the debugger on a different alarm! - { - AutoLock static_lock(static_lock_); - if (last_debugged_alarm_time_ > watchdog_->start_time_) { - // False alarm: we started our clock before the debugger break (last - // alarm time). - watchdog_->start_time_ += last_debugged_alarm_delay_; - if (last_debugged_alarm_time_ > watchdog_->start_time_) - // Too many alarms must have taken place. - watchdog_->state_ = DISARMED; - continue; - } - } - watchdog_->state_ = DISARMED; // Only alarm at most once. - TimeTicks last_alarm_time = TimeTicks::Now(); - watchdog_->Alarm(); // Set a break point here to debug on alarms. - TimeDelta last_alarm_delay = TimeTicks::Now() - last_alarm_time; - if (last_alarm_delay <= TimeDelta::FromMilliseconds(2)) - continue; - // Ignore race of two alarms/breaks going off at roughly the same time. - AutoLock static_lock(static_lock_); - // This was a real debugger break. - last_debugged_alarm_time_ = last_alarm_time; - last_debugged_alarm_delay_ = last_alarm_delay; - } -} - -void Watchdog::ThreadDelegate::SetThreadName() const { - std::string name = watchdog_->thread_watched_name_ + " Watchdog"; - PlatformThread::SetName(name.c_str()); - DVLOG(1) << "Watchdog active: " << name; -} - -// static -void Watchdog::ResetStaticData() { - AutoLock lock(static_lock_); - last_debugged_alarm_time_ = TimeTicks(); - last_debugged_alarm_delay_ = TimeDelta(); -} - -// static -Lock Watchdog::static_lock_; // Lock for access of static data... -// static -TimeTicks Watchdog::last_debugged_alarm_time_ = TimeTicks(); -// static -TimeDelta Watchdog::last_debugged_alarm_delay_; diff --git a/base/watchdog.h b/base/watchdog.h deleted file mode 100644 index b4262d4..0000000 --- a/base/watchdog.h +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -// The Watchdog class creates a second thread that can Alarm if a specific -// duration of time passes without proper attention. The duration of time is -// specified at construction time. The Watchdog may be used many times by -// simply calling Arm() (to start timing) and Disarm() (to reset the timer). -// The Watchdog is typically used under a debugger, where the stack traces on -// other threads can be examined if/when the Watchdog alarms. - -// Some watchdogs will be enabled or disabled via command line switches. To -// facilitate such code, an "enabled" argument for the constuctor can be used -// to permanently disable the watchdog. Disabled watchdogs don't even spawn -// a second thread, and their methods call (Arm() and Disarm()) return very -// quickly. - -#ifndef BASE_WATCHDOG_H__ -#define BASE_WATCHDOG_H__ -#pragma once - -#include - -#include "base/condition_variable.h" -#include "base/lock.h" -#include "base/platform_thread.h" -#include "base/time.h" - -class Watchdog { - public: - // Constructor specifies how long the Watchdog will wait before alarming. - Watchdog(const base::TimeDelta& duration, - const std::string& thread_watched_name, - bool enabled); - virtual ~Watchdog(); - - // Start timing, and alarm when time expires (unless we're disarm()ed.) - void Arm(); // Arm starting now. - void ArmSomeTimeDeltaAgo(const base::TimeDelta& time_delta); - void ArmAtStartTime(const base::TimeTicks start_time); - - // Reset time, and do not set off the alarm. - void Disarm(); - - // Alarm is called if the time expires after an Arm() without someone calling - // Disarm(). This method can be overridden to create testable classes. - virtual void Alarm(); - - // Reset static data to initial state. Useful for tests, to ensure - // they are independent. - static void ResetStaticData(); - - private: - class ThreadDelegate : public PlatformThread::Delegate { - public: - explicit ThreadDelegate(Watchdog* watchdog) : watchdog_(watchdog) { - } - virtual void ThreadMain(); - private: - Watchdog* watchdog_; - - void SetThreadName() const; - }; - - enum State {ARMED, DISARMED, SHUTDOWN }; - - bool init_successful_; - - Lock lock_; // Mutex for state_. - ConditionVariable condition_variable_; - State state_; - const base::TimeDelta duration_; // How long after start_time_ do we alarm? - const std::string thread_watched_name_; - PlatformThreadHandle handle_; - ThreadDelegate delegate_; // Store it, because it must outlive the thread. - - base::TimeTicks start_time_; // Start of epoch, and alarm after duration_. - - // When the debugger breaks (when we alarm), all the other alarms that are - // armed will expire (also alarm). To diminish this effect, we track any - // delay due to debugger breaks, and we *try* to adjust the effective start - // time of other alarms to step past the debugging break. - // Without this safety net, any alarm will typically trigger a host of follow - // on alarms from callers that specify old times. - static Lock static_lock_; // Lock for access of static data... - // When did we last alarm and get stuck (for a while) in a debugger? - static base::TimeTicks last_debugged_alarm_time_; - // How long did we sit on a break in the debugger? - static base::TimeDelta last_debugged_alarm_delay_; - - DISALLOW_COPY_AND_ASSIGN(Watchdog); -}; - -#endif // BASE_WATCHDOG_H__ diff --git a/base/watchdog_unittest.cc b/base/watchdog_unittest.cc deleted file mode 100644 index 658a31a..0000000 --- a/base/watchdog_unittest.cc +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -// Tests for Watchdog class. - -#include "base/watchdog.h" - -#include "base/logging.h" -#include "base/platform_thread.h" -#include "base/spin_wait.h" -#include "base/time.h" -#include "testing/gtest/include/gtest/gtest.h" - -using base::TimeDelta; -using base::TimeTicks; - -namespace { - -//------------------------------------------------------------------------------ -// Provide a derived class to facilitate testing. - -class WatchdogCounter : public Watchdog { - public: - WatchdogCounter(const TimeDelta& duration, - const std::string& thread_watched_name, - bool enabled) - : Watchdog(duration, thread_watched_name, enabled), alarm_counter_(0) { - } - - virtual ~WatchdogCounter() {} - - virtual void Alarm() { - alarm_counter_++; - Watchdog::Alarm(); - } - - int alarm_counter() { return alarm_counter_; } - - private: - int alarm_counter_; - - DISALLOW_COPY_AND_ASSIGN(WatchdogCounter); -}; - -class WatchdogTest : public testing::Test { - public: - void SetUp() { - Watchdog::ResetStaticData(); - } -}; - - -//------------------------------------------------------------------------------ -// Actual tests - -// Minimal constructor/destructor test. -TEST_F(WatchdogTest, StartupShutdownTest) { - Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); - Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); -} - -// Test ability to call Arm and Disarm repeatedly. -TEST_F(WatchdogTest, ArmDisarmTest) { - Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); - watchdog1.Arm(); - watchdog1.Disarm(); - watchdog1.Arm(); - watchdog1.Disarm(); - - Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); - watchdog2.Arm(); - watchdog2.Disarm(); - watchdog2.Arm(); - watchdog2.Disarm(); -} - -// Make sure a basic alarm fires when the time has expired. -TEST_F(WatchdogTest, AlarmTest) { - WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Enabled", true); - watchdog.Arm(); - SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), - watchdog.alarm_counter() > 0); - EXPECT_EQ(1, watchdog.alarm_counter()); -} - -// Make sure a basic alarm fires when the time has expired. -TEST_F(WatchdogTest, AlarmPriorTimeTest) { - WatchdogCounter watchdog(TimeDelta(), "Enabled2", true); - // Set a time in the past. - watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(2)); - // It should instantly go off, but certainly in less than 5 minutes. - SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), - watchdog.alarm_counter() > 0); - - EXPECT_EQ(1, watchdog.alarm_counter()); -} - -// Make sure a disable alarm does nothing, even if we arm it. -TEST_F(WatchdogTest, ConstructorDisabledTest) { - WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Disabled", false); - watchdog.Arm(); - // Alarm should not fire, as it was disabled. - PlatformThread::Sleep(500); - EXPECT_EQ(0, watchdog.alarm_counter()); -} - -// Make sure Disarming will prevent firing, even after Arming. -TEST_F(WatchdogTest, DisarmTest) { - WatchdogCounter watchdog(TimeDelta::FromSeconds(1), "Enabled3", true); - - TimeTicks start = TimeTicks::Now(); - watchdog.Arm(); - PlatformThread::Sleep(100); // Sleep a bit, but not past the alarm point. - watchdog.Disarm(); - TimeTicks end = TimeTicks::Now(); - - if (end - start > TimeDelta::FromMilliseconds(500)) { - LOG(WARNING) << "100ms sleep took over 500ms, making the results of this " - << "timing-sensitive test suspicious. Aborting now."; - return; - } - - // Alarm should not have fired before it was disarmed. - EXPECT_EQ(0, watchdog.alarm_counter()); - - // Sleep past the point where it would have fired if it wasn't disarmed, - // and verify that it didn't fire. - PlatformThread::Sleep(1000); - EXPECT_EQ(0, watchdog.alarm_counter()); - - // ...but even after disarming, we can still use the alarm... - // Set a time greater than the timeout into the past. - watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(10)); - // It should almost instantly go off, but certainly in less than 5 minutes. - SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), - watchdog.alarm_counter() > 0); - - EXPECT_EQ(1, watchdog.alarm_counter()); -} - -} // namespace diff --git a/base/worker_pool.h b/base/worker_pool.h deleted file mode 100644 index e0b75a9..0000000 --- a/base/worker_pool.h +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef BASE_WORKER_POOL_H_ -#define BASE_WORKER_POOL_H_ -#pragma once - -#include "base/tracked.h" - -class Task; - -// This is a facility that runs tasks that don't require a specific thread or -// a message loop. -// -// WARNING: This shouldn't be used unless absolutely necessary. We don't wait -// for the worker pool threads to finish on shutdown, so the tasks running -// inside the pool must be extremely careful about other objects they access -// (MessageLoops, Singletons, etc). During shutdown these object may no longer -// exist. -class WorkerPool { - public: - // This function posts |task| to run on a worker thread. |task_is_slow| - // should be used for tasks that will take a long time to execute. Returns - // false if |task| could not be posted to a worker thread. Regardless of - // return value, ownership of |task| is transferred to the worker pool. - static bool PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow); -}; - -#endif // BASE_WORKER_POOL_H_ diff --git a/base/worker_pool_posix.cc b/base/worker_pool_posix.cc deleted file mode 100644 index 85e1d8e..0000000 --- a/base/worker_pool_posix.cc +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/worker_pool.h" -#include "base/worker_pool_posix.h" - -#include "base/lazy_instance.h" -#include "base/logging.h" -#include "base/platform_thread.h" -#include "base/ref_counted.h" -#include "base/stringprintf.h" -#include "base/task.h" - -namespace { - -const int kIdleSecondsBeforeExit = 10 * 60; -// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert -// function of NSS because of NSS bug 439169. -const int kWorkerThreadStackSize = 128 * 1024; - -class WorkerPoolImpl { - public: - WorkerPoolImpl(); - ~WorkerPoolImpl(); - - void PostTask(const tracked_objects::Location& from_here, Task* task, - bool task_is_slow); - - private: - scoped_refptr pool_; -}; - -WorkerPoolImpl::WorkerPoolImpl() - : pool_(new base::PosixDynamicThreadPool( - "WorkerPool", kIdleSecondsBeforeExit)) {} - -WorkerPoolImpl::~WorkerPoolImpl() { - pool_->Terminate(); -} - -void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - pool_->PostTask(task); -} - -base::LazyInstance g_lazy_worker_pool(base::LINKER_INITIALIZED); - -class WorkerThread : public PlatformThread::Delegate { - public: - WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit, - base::PosixDynamicThreadPool* pool) - : name_prefix_(name_prefix), - idle_seconds_before_exit_(idle_seconds_before_exit), - pool_(pool) {} - - virtual void ThreadMain(); - - private: - const std::string name_prefix_; - const int idle_seconds_before_exit_; - scoped_refptr pool_; - - DISALLOW_COPY_AND_ASSIGN(WorkerThread); -}; - -void WorkerThread::ThreadMain() { - const std::string name = base::StringPrintf( - "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); - PlatformThread::SetName(name.c_str()); - - for (;;) { - Task* task = pool_->WaitForTask(); - if (!task) - break; - task->Run(); - delete task; - } - - // The WorkerThread is non-joinable, so it deletes itself. - delete this; -} - -} // namespace - -bool WorkerPool::PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow) { - g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); - return true; -} - -namespace base { - -PosixDynamicThreadPool::PosixDynamicThreadPool( - const std::string& name_prefix, - int idle_seconds_before_exit) - : name_prefix_(name_prefix), - idle_seconds_before_exit_(idle_seconds_before_exit), - tasks_available_cv_(&lock_), - num_idle_threads_(0), - terminated_(false), - num_idle_threads_cv_(NULL) {} - -PosixDynamicThreadPool::~PosixDynamicThreadPool() { - while (!tasks_.empty()) { - Task* task = tasks_.front(); - tasks_.pop(); - delete task; - } -} - -void PosixDynamicThreadPool::Terminate() { - { - AutoLock locked(lock_); - DCHECK(!terminated_) << "Thread pool is already terminated."; - terminated_ = true; - } - tasks_available_cv_.Broadcast(); -} - -void PosixDynamicThreadPool::PostTask(Task* task) { - AutoLock locked(lock_); - DCHECK(!terminated_) << - "This thread pool is already terminated. Do not post new tasks."; - - tasks_.push(task); - - // We have enough worker threads. - if (static_cast(num_idle_threads_) >= tasks_.size()) { - tasks_available_cv_.Signal(); - } else { - // The new PlatformThread will take ownership of the WorkerThread object, - // which will delete itself on exit. - WorkerThread* worker = - new WorkerThread(name_prefix_, idle_seconds_before_exit_, this); - PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); - } -} - -Task* PosixDynamicThreadPool::WaitForTask() { - AutoLock locked(lock_); - - if (terminated_) - return NULL; - - if (tasks_.empty()) { // No work available, wait for work. - num_idle_threads_++; - if (num_idle_threads_cv_.get()) - num_idle_threads_cv_->Signal(); - tasks_available_cv_.TimedWait( - TimeDelta::FromSeconds(kIdleSecondsBeforeExit)); - num_idle_threads_--; - if (num_idle_threads_cv_.get()) - num_idle_threads_cv_->Signal(); - if (tasks_.empty()) { - // We waited for work, but there's still no work. Return NULL to signal - // the thread to terminate. - return NULL; - } - } - - Task* task = tasks_.front(); - tasks_.pop(); - return task; -} - -} // namespace base diff --git a/base/worker_pool_posix.h b/base/worker_pool_posix.h deleted file mode 100644 index 73d8287..0000000 --- a/base/worker_pool_posix.h +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// The thread pool used in the POSIX implementation of WorkerPool dynamically -// adds threads as necessary to handle all tasks. It keeps old threads around -// for a period of time to allow them to be reused. After this waiting period, -// the threads exit. This thread pool uses non-joinable threads, therefore -// worker threads are not joined during process shutdown. This means that -// potentially long running tasks (such as DNS lookup) do not block process -// shutdown, but also means that process shutdown may "leak" objects. Note that -// although PosixDynamicThreadPool spawns the worker threads and manages the -// task queue, it does not own the worker threads. The worker threads ask the -// PosixDynamicThreadPool for work and eventually clean themselves up. The -// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool -// instance, which prevents PosixDynamicThreadPool from disappearing before all -// worker threads exit. The owner of PosixDynamicThreadPool should likewise -// maintain a scoped_refptr to the PosixDynamicThreadPool instance. -// -// NOTE: The classes defined in this file are only meant for use by the POSIX -// implementation of WorkerPool. No one else should be using these classes. -// These symbols are exported in a header purely for testing purposes. - -#ifndef BASE_WORKER_POOL_POSIX_H_ -#define BASE_WORKER_POOL_POSIX_H_ -#pragma once - -#include -#include - -#include "base/basictypes.h" -#include "base/condition_variable.h" -#include "base/lock.h" -#include "base/platform_thread.h" -#include "base/ref_counted.h" -#include "base/scoped_ptr.h" - -class Task; - -namespace base { - -class PosixDynamicThreadPool - : public RefCountedThreadSafe { - public: - class PosixDynamicThreadPoolPeer; - - // All worker threads will share the same |name_prefix|. They will exit after - // |idle_seconds_before_exit|. - PosixDynamicThreadPool(const std::string& name_prefix, - int idle_seconds_before_exit); - ~PosixDynamicThreadPool(); - - // Indicates that the thread pool is going away. Stops handing out tasks to - // worker threads. Wakes up all the idle threads to let them exit. - void Terminate(); - - // Adds |task| to the thread pool. PosixDynamicThreadPool assumes ownership - // of |task|. - void PostTask(Task* task); - - // Worker thread method to wait for up to |idle_seconds_before_exit| for more - // work from the thread pool. Returns NULL if no work is available. - Task* WaitForTask(); - - private: - friend class PosixDynamicThreadPoolPeer; - - const std::string name_prefix_; - const int idle_seconds_before_exit_; - - Lock lock_; // Protects all the variables below. - - // Signal()s worker threads to let them know more tasks are available. - // Also used for Broadcast()'ing to worker threads to let them know the pool - // is being deleted and they can exit. - ConditionVariable tasks_available_cv_; - int num_idle_threads_; - std::queue tasks_; - bool terminated_; - // Only used for tests to ensure correct thread ordering. It will always be - // NULL in non-test code. - scoped_ptr num_idle_threads_cv_; - - DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool); -}; - -} // namespace base - -#endif // BASE_WORKER_POOL_POSIX_H_ diff --git a/base/worker_pool_posix_unittest.cc b/base/worker_pool_posix_unittest.cc deleted file mode 100644 index 55453c8..0000000 --- a/base/worker_pool_posix_unittest.cc +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/worker_pool_posix.h" - -#include - -#include "base/condition_variable.h" -#include "base/lock.h" -#include "base/platform_thread.h" -#include "base/task.h" -#include "base/waitable_event.h" -#include "testing/gtest/include/gtest/gtest.h" - -namespace base { - -// Peer class to provide passthrough access to PosixDynamicThreadPool internals. -class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { - public: - explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) - : pool_(pool) {} - - Lock* lock() { return &pool_->lock_; } - ConditionVariable* tasks_available_cv() { - return &pool_->tasks_available_cv_; - } - const std::queue& tasks() const { return pool_->tasks_; } - int num_idle_threads() const { return pool_->num_idle_threads_; } - ConditionVariable* num_idle_threads_cv() { - return pool_->num_idle_threads_cv_.get(); - } - void set_num_idle_threads_cv(ConditionVariable* cv) { - pool_->num_idle_threads_cv_.reset(cv); - } - - private: - PosixDynamicThreadPool* pool_; - - DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); -}; - -} // namespace base - -namespace { - -// IncrementingTask's main purpose is to increment a counter. It also updates a -// set of unique thread ids, and signals a ConditionVariable on completion. -// Note that since it does not block, there is no way to control the number of -// threads used if more than one IncrementingTask is consecutively posted to the -// thread pool, since the first one might finish executing before the subsequent -// PostTask() calls get invoked. -class IncrementingTask : public Task { - public: - IncrementingTask(Lock* counter_lock, - int* counter, - Lock* unique_threads_lock, - std::set* unique_threads) - : counter_lock_(counter_lock), - unique_threads_lock_(unique_threads_lock), - unique_threads_(unique_threads), - counter_(counter) {} - - virtual void Run() { - AddSelfToUniqueThreadSet(); - AutoLock locked(*counter_lock_); - (*counter_)++; - } - - void AddSelfToUniqueThreadSet() { - AutoLock locked(*unique_threads_lock_); - unique_threads_->insert(PlatformThread::CurrentId()); - } - - private: - Lock* counter_lock_; - Lock* unique_threads_lock_; - std::set* unique_threads_; - int* counter_; - - DISALLOW_COPY_AND_ASSIGN(IncrementingTask); -}; - -// BlockingIncrementingTask is a simple wrapper around IncrementingTask that -// allows for waiting at the start of Run() for a WaitableEvent to be signalled. -class BlockingIncrementingTask : public Task { - public: - BlockingIncrementingTask(Lock* counter_lock, - int* counter, - Lock* unique_threads_lock, - std::set* unique_threads, - Lock* num_waiting_to_start_lock, - int* num_waiting_to_start, - ConditionVariable* num_waiting_to_start_cv, - base::WaitableEvent* start) - : incrementer_( - counter_lock, counter, unique_threads_lock, unique_threads), - num_waiting_to_start_lock_(num_waiting_to_start_lock), - num_waiting_to_start_(num_waiting_to_start), - num_waiting_to_start_cv_(num_waiting_to_start_cv), - start_(start) {} - - virtual void Run() { - { - AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); - (*num_waiting_to_start_)++; - } - num_waiting_to_start_cv_->Signal(); - CHECK(start_->Wait()); - incrementer_.Run(); - } - - private: - IncrementingTask incrementer_; - Lock* num_waiting_to_start_lock_; - int* num_waiting_to_start_; - ConditionVariable* num_waiting_to_start_cv_; - base::WaitableEvent* start_; - - DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); -}; - -class PosixDynamicThreadPoolTest : public testing::Test { - protected: - PosixDynamicThreadPoolTest() - : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), - peer_(pool_.get()), - counter_(0), - num_waiting_to_start_(0), - num_waiting_to_start_cv_(&num_waiting_to_start_lock_), - start_(true, false) {} - - virtual void SetUp() { - peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); - } - - virtual void TearDown() { - // Wake up the idle threads so they can terminate. - if (pool_.get()) pool_->Terminate(); - } - - void WaitForTasksToStart(int num_tasks) { - AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); - while (num_waiting_to_start_ < num_tasks) { - num_waiting_to_start_cv_.Wait(); - } - } - - void WaitForIdleThreads(int num_idle_threads) { - AutoLock pool_locked(*peer_.lock()); - while (peer_.num_idle_threads() < num_idle_threads) { - peer_.num_idle_threads_cv()->Wait(); - } - } - - Task* CreateNewIncrementingTask() { - return new IncrementingTask(&counter_lock_, &counter_, - &unique_threads_lock_, &unique_threads_); - } - - Task* CreateNewBlockingIncrementingTask() { - return new BlockingIncrementingTask( - &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, - &num_waiting_to_start_lock_, &num_waiting_to_start_, - &num_waiting_to_start_cv_, &start_); - } - - scoped_refptr pool_; - base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; - Lock counter_lock_; - int counter_; - Lock unique_threads_lock_; - std::set unique_threads_; - Lock num_waiting_to_start_lock_; - int num_waiting_to_start_; - ConditionVariable num_waiting_to_start_cv_; - base::WaitableEvent start_; -}; - -TEST_F(PosixDynamicThreadPoolTest, Basic) { - EXPECT_EQ(0, peer_.num_idle_threads()); - EXPECT_EQ(0U, unique_threads_.size()); - EXPECT_EQ(0U, peer_.tasks().size()); - - // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); - - WaitForIdleThreads(1); - - EXPECT_EQ(1U, unique_threads_.size()) << - "There should be only one thread allocated for one task."; - EXPECT_EQ(1, peer_.num_idle_threads()); - EXPECT_EQ(1, counter_); -} - -TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { - // Add one task and wait for it to be completed. - pool_->PostTask(CreateNewIncrementingTask()); - - WaitForIdleThreads(1); - - // Add another 2 tasks. One should reuse the existing worker thread. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); - - WaitForTasksToStart(2); - start_.Signal(); - WaitForIdleThreads(2); - - EXPECT_EQ(2U, unique_threads_.size()); - EXPECT_EQ(2, peer_.num_idle_threads()); - EXPECT_EQ(3, counter_); -} - -TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { - // Add two blocking tasks. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); - - EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; - - WaitForTasksToStart(2); - start_.Signal(); - WaitForIdleThreads(2); - - EXPECT_EQ(2U, unique_threads_.size()); - EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; - EXPECT_EQ(2, counter_); -} - -TEST_F(PosixDynamicThreadPoolTest, Complex) { - // Add two non blocking tasks and wait for them to finish. - pool_->PostTask(CreateNewIncrementingTask()); - - WaitForIdleThreads(1); - - // Add two blocking tasks, start them simultaneously, and wait for them to - // finish. - pool_->PostTask(CreateNewBlockingIncrementingTask()); - pool_->PostTask(CreateNewBlockingIncrementingTask()); - - WaitForTasksToStart(2); - start_.Signal(); - WaitForIdleThreads(2); - - EXPECT_EQ(3, counter_); - EXPECT_EQ(2, peer_.num_idle_threads()); - EXPECT_EQ(2U, unique_threads_.size()); - - // Wake up all idle threads so they can exit. - { - AutoLock locked(*peer_.lock()); - while (peer_.num_idle_threads() > 0) { - peer_.tasks_available_cv()->Signal(); - peer_.num_idle_threads_cv()->Wait(); - } - } - - // Add another non blocking task. There are no threads to reuse. - pool_->PostTask(CreateNewIncrementingTask()); - WaitForIdleThreads(1); - - EXPECT_EQ(3U, unique_threads_.size()); - EXPECT_EQ(1, peer_.num_idle_threads()); - EXPECT_EQ(4, counter_); -} - -} // namespace diff --git a/base/worker_pool_unittest.cc b/base/worker_pool_unittest.cc deleted file mode 100644 index f6bee96..0000000 --- a/base/worker_pool_unittest.cc +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/task.h" -#include "base/waitable_event.h" -#include "base/worker_pool.h" -#include "testing/gtest/include/gtest/gtest.h" -#include "testing/platform_test.h" - -using base::WaitableEvent; - -typedef PlatformTest WorkerPoolTest; - -namespace { - -class PostTaskTestTask : public Task { - public: - explicit PostTaskTestTask(WaitableEvent* event) : event_(event) { - } - - void Run() { - event_->Signal(); - } - - private: - WaitableEvent* event_; -}; - -TEST_F(WorkerPoolTest, PostTask) { - WaitableEvent test_event(false, false); - WaitableEvent long_test_event(false, false); - bool signaled; - - WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&test_event), false); - WorkerPool::PostTask(FROM_HERE, new PostTaskTestTask(&long_test_event), true); - - signaled = test_event.Wait(); - EXPECT_TRUE(signaled); - signaled = long_test_event.Wait(); - EXPECT_TRUE(signaled); -} - -} // namespace diff --git a/base/worker_pool_win.cc b/base/worker_pool_win.cc deleted file mode 100644 index 3f383b9..0000000 --- a/base/worker_pool_win.cc +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/worker_pool.h" - -#include "base/logging.h" -#include "base/task.h" - -namespace { - -DWORD CALLBACK WorkItemCallback(void* param) { - Task* task = static_cast(param); - task->Run(); - delete task; - return 0; -} - -} // namespace - -bool WorkerPool::PostTask(const tracked_objects::Location& from_here, - Task* task, bool task_is_slow) { - task->SetBirthPlace(from_here); - - ULONG flags = 0; - if (task_is_slow) - flags |= WT_EXECUTELONGFUNCTION; - - if (!QueueUserWorkItem(WorkItemCallback, task, flags)) { - DLOG(ERROR) << "QueueUserWorkItem failed: " << GetLastError(); - delete task; - return false; - } - - return true; -} diff --git a/chrome/browser/jankometer.cc b/chrome/browser/jankometer.cc index ed4f9fd..a2f63c0 100644 --- a/chrome/browser/jankometer.cc +++ b/chrome/browser/jankometer.cc @@ -13,9 +13,9 @@ #include "base/metrics/stats_counters.h" #include "base/ref_counted.h" #include "base/string_util.h" +#include "base/threading/watchdog.h" #include "base/thread.h" #include "base/time.h" -#include "base/watchdog.h" #include "build/build_config.h" #include "chrome/browser/browser_process.h" #include "chrome/browser/browser_thread.h" @@ -55,7 +55,7 @@ const bool kPlaySounds = false; //------------------------------------------------------------------------------ // Provide a special watchdog to make it easy to set the breakpoint on this // class only. -class JankWatchdog : public Watchdog { +class JankWatchdog : public base::Watchdog { public: JankWatchdog(const TimeDelta& duration, const std::string& thread_watched_name, diff --git a/chrome/browser/printing/printing_layout_uitest.cc b/chrome/browser/printing/printing_layout_uitest.cc index 754e8f5..1198705 100644 --- a/chrome/browser/printing/printing_layout_uitest.cc +++ b/chrome/browser/printing/printing_layout_uitest.cc @@ -4,8 +4,8 @@ #include "base/command_line.h" #include "base/file_util.h" -#include "base/simple_thread.h" #include "base/test/test_file_util.h" +#include "base/threading/simple_thread.h" #include "chrome/test/automation/tab_proxy.h" #include "chrome/test/ui/ui_test.h" #include "net/test/test_server.h" diff --git a/chrome/browser/renderer_host/render_message_filter.cc b/chrome/browser/renderer_host/render_message_filter.cc index b5adbaa..5f638ab 100644 --- a/chrome/browser/renderer_host/render_message_filter.cc +++ b/chrome/browser/renderer_host/render_message_filter.cc @@ -12,9 +12,9 @@ #include "base/process_util.h" #include "base/shared_memory.h" #include "base/sys_string_conversions.h" +#include "base/threading/worker_pool.h" #include "base/thread.h" #include "base/utf_string_conversions.h" -#include "base/worker_pool.h" #include "chrome/browser/automation/automation_resource_message_filter.h" #include "chrome/browser/browser_process.h" #include "chrome/browser/browser_thread.h" @@ -1352,7 +1352,7 @@ void RenderMessageFilter::OnKeygen(uint32 key_size_index, VLOG(1) << "Dispatching keygen task to worker pool."; // Dispatch to worker pool, so we do not block the IO thread. - if (!WorkerPool::PostTask( + if (!base::WorkerPool::PostTask( FROM_HERE, NewRunnableMethod( this, &RenderMessageFilter::OnKeygenOnWorkerThread, diff --git a/chrome/browser/ui/cocoa/keystone_glue.mm b/chrome/browser/ui/cocoa/keystone_glue.mm index b42375a..79e95fc 100644 --- a/chrome/browser/ui/cocoa/keystone_glue.mm +++ b/chrome/browser/ui/cocoa/keystone_glue.mm @@ -17,7 +17,7 @@ #include "base/sys_string_conversions.h" #include "base/ref_counted.h" #include "base/task.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "chrome/browser/ui/cocoa/authorization_util.h" #include "chrome/common/chrome_constants.h" #include "grit/chromium_strings.h" @@ -104,7 +104,7 @@ class PerformBridge : public base::RefCountedThreadSafe { DCHECK(sel); scoped_refptr op = new PerformBridge(target, sel, arg); - WorkerPool::PostTask( + base::WorkerPool::PostTask( FROM_HERE, NewRunnableMethod(op.get(), &PerformBridge::Run), true); } diff --git a/chrome/gpu/gpu_thread.cc b/chrome/gpu/gpu_thread.cc index e95d27f..0c64e89 100644 --- a/chrome/gpu/gpu_thread.cc +++ b/chrome/gpu/gpu_thread.cc @@ -9,7 +9,7 @@ #include "app/gfx/gl/gl_context.h" #include "base/command_line.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "build/build_config.h" #include "chrome/common/child_process.h" #include "chrome/common/child_process_logging.h" @@ -34,7 +34,7 @@ void GpuThread::Init(const base::Time& process_start_time) { #if defined(OS_WIN) // Asynchronously collect the DirectX diagnostics because this can take a // couple of seconds. - if (!WorkerPool::PostTask( + if (!base::WorkerPool::PostTask( FROM_HERE, NewRunnableFunction(&GpuThread::CollectDxDiagnostics, this), true)) { diff --git a/chrome/renderer/pepper_devices.h b/chrome/renderer/pepper_devices.h index 912657a..c043ad9 100644 --- a/chrome/renderer/pepper_devices.h +++ b/chrome/renderer/pepper_devices.h @@ -10,7 +10,7 @@ #include "base/basictypes.h" #include "base/scoped_ptr.h" #include "base/shared_memory.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "chrome/common/render_messages.h" #include "chrome/renderer/audio_message_filter.h" #include "gfx/rect.h" diff --git a/net/base/cert_verifier.cc b/net/base/cert_verifier.cc index 4b3d904..3d22dec 100644 --- a/net/base/cert_verifier.cc +++ b/net/base/cert_verifier.cc @@ -8,7 +8,7 @@ #include "base/lock.h" #include "base/message_loop.h" #include "base/stl_util-inl.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "net/base/net_errors.h" #include "net/base/x509_certificate.h" @@ -136,9 +136,9 @@ class CertVerifierWorker { bool Start() { DCHECK_EQ(MessageLoop::current(), origin_loop_); - return WorkerPool::PostTask( - FROM_HERE, NewRunnableMethod(this, &CertVerifierWorker::Run), - true /* task is slow */); + return base::WorkerPool::PostTask( + FROM_HERE, NewRunnableMethod(this, &CertVerifierWorker::Run), + true /* task is slow */); } // Cancel is called from the origin loop when the CertVerifier is getting diff --git a/net/base/dnsrr_resolver.cc b/net/base/dnsrr_resolver.cc index 30031f3..393c213 100644 --- a/net/base/dnsrr_resolver.cc +++ b/net/base/dnsrr_resolver.cc @@ -15,7 +15,7 @@ #include "base/stl_util-inl.h" #include "base/string_piece.h" #include "base/task.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "net/base/dns_reload_timer.h" #include "net/base/dns_util.h" #include "net/base/net_errors.h" @@ -139,9 +139,9 @@ class RRResolverWorker { bool Start() { DCHECK_EQ(MessageLoop::current(), origin_loop_); - return WorkerPool::PostTask( - FROM_HERE, NewRunnableMethod(this, &RRResolverWorker::Run), - true /* task is slow */); + return base::WorkerPool::PostTask( + FROM_HERE, NewRunnableMethod(this, &RRResolverWorker::Run), + true /* task is slow */); } // Cancel is called from the origin loop when the DnsRRResolver is getting diff --git a/net/base/file_stream_posix.cc b/net/base/file_stream_posix.cc index 1e20efe..887d9c1 100644 --- a/net/base/file_stream_posix.cc +++ b/net/base/file_stream_posix.cc @@ -21,8 +21,8 @@ #include "base/message_loop.h" #include "base/metrics/histogram.h" #include "base/string_util.h" +#include "base/threading/worker_pool.h" #include "base/waitable_event.h" -#include "base/worker_pool.h" #include "net/base/net_errors.h" // We cast back and forth, so make sure it's the size we're expecting. @@ -250,11 +250,11 @@ void FileStream::AsyncContext::InitiateAsyncRead( DCHECK(!callback_); callback_ = callback; - WorkerPool::PostTask(FROM_HERE, - new BackgroundReadTask( - file, buf, buf_len, - &background_io_completed_callback_), - true /* task_is_slow */); + base::WorkerPool::PostTask(FROM_HERE, + new BackgroundReadTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); } void FileStream::AsyncContext::InitiateAsyncWrite( @@ -263,11 +263,11 @@ void FileStream::AsyncContext::InitiateAsyncWrite( DCHECK(!callback_); callback_ = callback; - WorkerPool::PostTask(FROM_HERE, - new BackgroundWriteTask( - file, buf, buf_len, - &background_io_completed_callback_), - true /* task_is_slow */); + base::WorkerPool::PostTask(FROM_HERE, + new BackgroundWriteTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); } void FileStream::AsyncContext::OnBackgroundIOCompleted(int result) { diff --git a/net/base/host_resolver_impl.cc b/net/base/host_resolver_impl.cc index b53ec6d..6e4637c 100644 --- a/net/base/host_resolver_impl.cc +++ b/net/base/host_resolver_impl.cc @@ -24,10 +24,10 @@ #include "base/metrics/histogram.h" #include "base/stl_util-inl.h" #include "base/string_util.h" +#include "base/threading/worker_pool.h" #include "base/time.h" #include "base/utf_string_conversions.h" #include "base/values.h" -#include "base/worker_pool.h" #include "net/base/address_list.h" #include "net/base/address_list_net_log_param.h" #include "net/base/host_port_pair.h" @@ -382,7 +382,7 @@ class HostResolverImpl::Job start_time_ = base::TimeTicks::Now(); // Dispatch the job to a worker thread. - if (!WorkerPool::PostTask(FROM_HERE, + if (!base::WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(this, &Job::DoLookup), true)) { NOTREACHED(); @@ -650,7 +650,7 @@ class HostResolverImpl::IPv6ProbeJob return; DCHECK(IsOnOriginThread()); const bool kIsSlow = true; - WorkerPool::PostTask( + base::WorkerPool::PostTask( FROM_HERE, NewRunnableMethod(this, &IPv6ProbeJob::DoProbe), kIsSlow); } diff --git a/net/base/keygen_handler_unittest.cc b/net/base/keygen_handler_unittest.cc index f4251f2..10ccc95 100644 --- a/net/base/keygen_handler_unittest.cc +++ b/net/base/keygen_handler_unittest.cc @@ -11,9 +11,9 @@ #include "base/logging.h" #include "base/nss_util.h" #include "base/task.h" +#include "base/threading/worker_pool.h" #include "base/thread_restrictions.h" #include "base/waitable_event.h" -#include "base/worker_pool.h" #include "testing/gtest/include/gtest/gtest.h" #if defined(USE_NSS) @@ -124,10 +124,10 @@ TEST_F(KeygenHandlerTest, ConcurrencyTest) { std::string results[NUM_HANDLERS]; for (int i = 0; i < NUM_HANDLERS; i++) { events[i] = new base::WaitableEvent(false, false); - WorkerPool::PostTask(FROM_HERE, - new ConcurrencyTestTask(events[i], "some challenge", - &results[i]), - true); + base::WorkerPool::PostTask( + FROM_HERE, + new ConcurrencyTestTask(events[i], "some challenge", &results[i]), + true); } for (int i = 0; i < NUM_HANDLERS; i++) { diff --git a/net/base/test_completion_callback_unittest.cc b/net/base/test_completion_callback_unittest.cc index c19aae0..a0ae9bf 100644 --- a/net/base/test_completion_callback_unittest.cc +++ b/net/base/test_completion_callback_unittest.cc @@ -1,13 +1,13 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // Illustrates how to use worker threads that issue completion callbacks -#include "testing/gtest/include/gtest/gtest.h" +#include "base/threading/worker_pool.h" #include "net/base/completion_callback.h" #include "net/base/test_completion_callback.h" -#include "base/worker_pool.h" +#include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" typedef PlatformTest TestCompletionCallbackTest; @@ -102,7 +102,7 @@ bool ExampleEmployer::DoSomething(CompletionCallback* callback) { request_ = new ExampleWorker(this, callback); // Dispatch to worker thread... - if (!WorkerPool::PostTask(FROM_HERE, + if (!base::WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(request_.get(), &ExampleWorker::DoWork), true)) { NOTREACHED(); request_ = NULL; diff --git a/net/disk_cache/backend_impl.cc b/net/disk_cache/backend_impl.cc index 96f8723..4eab2675 100644 --- a/net/disk_cache/backend_impl.cc +++ b/net/disk_cache/backend_impl.cc @@ -13,10 +13,10 @@ #include "base/string_util.h" #include "base/stringprintf.h" #include "base/sys_info.h" +#include "base/threading/worker_pool.h" #include "base/thread_restrictions.h" #include "base/time.h" #include "base/timer.h" -#include "base/worker_pool.h" #include "net/base/net_errors.h" #include "net/disk_cache/cache_util.h" #include "net/disk_cache/entry_impl.h" @@ -142,7 +142,7 @@ bool DelayedCacheCleanup(const FilePath& full_path) { return false; } - WorkerPool::PostTask(FROM_HERE, new CleanupTask(path, name_str), true); + base::WorkerPool::PostTask(FROM_HERE, new CleanupTask(path, name_str), true); return true; } diff --git a/net/disk_cache/file_posix.cc b/net/disk_cache/file_posix.cc index 1870c0b..01dafd3 100644 --- a/net/disk_cache/file_posix.cc +++ b/net/disk_cache/file_posix.cc @@ -7,7 +7,7 @@ #include #include "base/logging.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "net/disk_cache/disk_cache.h" #include "net/disk_cache/in_flight_io.h" @@ -113,7 +113,7 @@ void FileInFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, new FileBackgroundIO(file, buf, buf_len, offset, callback, this)); file->AddRef(); // Balanced on OnOperationComplete() - WorkerPool::PostTask(FROM_HERE, + base::WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(operation.get(), &FileBackgroundIO::Read), true); OnOperationPosted(operation); } @@ -125,7 +125,7 @@ void FileInFlightIO::PostWrite(disk_cache::File* file, const void* buf, new FileBackgroundIO(file, buf, buf_len, offset, callback, this)); file->AddRef(); // Balanced on OnOperationComplete() - WorkerPool::PostTask(FROM_HERE, + base::WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(operation.get(), &FileBackgroundIO::Write), true); OnOperationPosted(operation); } diff --git a/net/proxy/polling_proxy_config_service.cc b/net/proxy/polling_proxy_config_service.cc index 2db9792..98a164d 100644 --- a/net/proxy/polling_proxy_config_service.cc +++ b/net/proxy/polling_proxy_config_service.cc @@ -8,7 +8,7 @@ #include "base/message_loop_proxy.h" #include "base/observer_list.h" #include "base/scoped_ptr.h" -#include "base/worker_pool.h" +#include "base/threading/worker_pool.h" #include "net/proxy/proxy_config.h" namespace net { @@ -88,10 +88,10 @@ class PollingProxyConfigService::Core last_poll_time_ = base::TimeTicks::Now(); poll_task_outstanding_ = true; poll_task_queued_ = false; - WorkerPool::PostTask( + base::WorkerPool::PostTask( FROM_HERE, - NewRunnableMethod( - this, &Core::PollOnWorkerThread, get_config_func_), true); + NewRunnableMethod(this, &Core::PollOnWorkerThread, get_config_func_), + true); } private: diff --git a/net/tools/flip_server/flip_in_mem_edsm_server.cc b/net/tools/flip_server/flip_in_mem_edsm_server.cc index 1969e0c..a378e72 100644 --- a/net/tools/flip_server/flip_in_mem_edsm_server.cc +++ b/net/tools/flip_server/flip_in_mem_edsm_server.cc @@ -19,7 +19,7 @@ #include "base/command_line.h" #include "base/logging.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "base/timer.h" #include "base/lock.h" #include "net/spdy/spdy_frame_builder.h" diff --git a/net/url_request/url_request_file_job.cc b/net/url_request/url_request_file_job.cc index 230ee06..d7ebe0a 100644 --- a/net/url_request/url_request_file_job.cc +++ b/net/url_request/url_request_file_job.cc @@ -23,6 +23,7 @@ #include "base/message_loop.h" #include "base/platform_file.h" #include "base/string_util.h" +#include "base/threading/worker_pool.h" #include "base/thread_restrictions.h" #include "build/build_config.h" #include "googleurl/src/gurl.h" @@ -36,10 +37,6 @@ #include "net/url_request/url_request_error_job.h" #include "net/url_request/url_request_file_dir_job.h" -#if defined(OS_WIN) -#include "base/worker_pool.h" -#endif - namespace net { #if defined(OS_WIN) @@ -136,7 +133,7 @@ void URLRequestFileJob::Start() { if (!file_path_.value().compare(0, 2, L"\\\\")) { DCHECK(!async_resolver_); async_resolver_ = new AsyncResolver(this); - WorkerPool::PostTask(FROM_HERE, NewRunnableMethod( + base::WorkerPool::PostTask(FROM_HERE, NewRunnableMethod( async_resolver_.get(), &AsyncResolver::Resolve, file_path_), true); return; } diff --git a/ppapi/proxy/ppb_audio_proxy.cc b/ppapi/proxy/ppb_audio_proxy.cc index 4468c4e..e23ac4e 100644 --- a/ppapi/proxy/ppb_audio_proxy.cc +++ b/ppapi/proxy/ppb_audio_proxy.cc @@ -4,7 +4,7 @@ #include "ppapi/proxy/ppb_audio_proxy.h" -#include "base/simple_thread.h" +#include "base/threading/simple_thread.h" #include "ppapi/c/dev/ppb_audio_dev.h" #include "ppapi/c/dev/ppb_audio_trusted_dev.h" #include "ppapi/c/pp_errors.h" diff --git a/ppapi/shared_impl/audio_impl.h b/ppapi/shared_impl/audio_impl.h index 3a45142..f4d5681 100644 --- a/ppapi/shared_impl/audio_impl.h +++ b/ppapi/shared_impl/audio_impl.h @@ -6,9 +6,9 @@ #define PPAPI_SHARED_IMPL_AUDIO_IMPL_H_ #include "base/scoped_ptr.h" -#include "base/simple_thread.h" #include "base/shared_memory.h" #include "base/sync_socket.h" +#include "base/threading/simple_thread.h" #include "ppapi/c/dev/ppb_audio_dev.h" namespace pp { -- cgit v1.1