diff options
author | dcheng@chromium.org <dcheng@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-11-29 03:55:20 +0000 |
---|---|---|
committer | dcheng@chromium.org <dcheng@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-11-29 03:55:20 +0000 |
commit | 09b36f984ea775b05d945aeae13b464c9264d3d2 (patch) | |
tree | 6530b1dadef4d380126a874c2e65ee4cc4f398ce /base | |
parent | b9281ac4b0bbe0068d47ff83e153603734c2b264 (diff) | |
download | chromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.zip chromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.tar.gz chromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.tar.bz2 |
Remove several more custom Task implementations from base/
BUG=none
TEST=trybots
Review URL: http://codereview.chromium.org/8702016
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@111865 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/threading/worker_pool_posix_unittest.cc | 131 | ||||
-rw-r--r-- | base/win/object_watcher.cc | 100 | ||||
-rw-r--r-- | base/win/object_watcher.h | 10 |
3 files changed, 90 insertions, 151 deletions
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index 97e8807..1b700e1 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -6,9 +6,10 @@ #include <set> +#include "base/bind.h" +#include "base/callback.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" -#include "base/task.h" #include "base/threading/platform_thread.h" #include "base/synchronization/waitable_event.h" #include "testing/gtest/include/gtest/gtest.h" @@ -50,76 +51,42 @@ namespace { // threads used if more than one IncrementingTask is consecutively posted to the // thread pool, since the first one might finish executing before the subsequent // PostTask() calls get invoked. -class IncrementingTask : public Task { - public: - IncrementingTask(Lock* counter_lock, - int* counter, - Lock* unique_threads_lock, - std::set<PlatformThreadId>* unique_threads) - : counter_lock_(counter_lock), - unique_threads_lock_(unique_threads_lock), - unique_threads_(unique_threads), - counter_(counter) {} - - virtual void Run() { - AddSelfToUniqueThreadSet(); - base::AutoLock locked(*counter_lock_); - (*counter_)++; - } - - void AddSelfToUniqueThreadSet() { - base::AutoLock locked(*unique_threads_lock_); - unique_threads_->insert(PlatformThread::CurrentId()); +void IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads) { + { + base::AutoLock locked(*unique_threads_lock); + unique_threads->insert(PlatformThread::CurrentId()); } - - private: - Lock* counter_lock_; - Lock* unique_threads_lock_; - std::set<PlatformThreadId>* unique_threads_; - int* counter_; - - DISALLOW_COPY_AND_ASSIGN(IncrementingTask); -}; + base::AutoLock locked(*counter_lock); + (*counter)++; +} // BlockingIncrementingTask is a simple wrapper around IncrementingTask that // allows for waiting at the start of Run() for a WaitableEvent to be signalled. -class BlockingIncrementingTask : public Task { - public: - BlockingIncrementingTask(Lock* counter_lock, - int* counter, - Lock* unique_threads_lock, - std::set<PlatformThreadId>* unique_threads, - Lock* num_waiting_to_start_lock, - int* num_waiting_to_start, - ConditionVariable* num_waiting_to_start_cv, - base::WaitableEvent* start) - : incrementer_( - counter_lock, counter, unique_threads_lock, unique_threads), - num_waiting_to_start_lock_(num_waiting_to_start_lock), - num_waiting_to_start_(num_waiting_to_start), - num_waiting_to_start_cv_(num_waiting_to_start_cv), - start_(start) {} - - virtual void Run() { - { - base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); - (*num_waiting_to_start_)++; - } - num_waiting_to_start_cv_->Signal(); - start_->Wait(); - incrementer_.Run(); - } - - private: - IncrementingTask incrementer_; - Lock* num_waiting_to_start_lock_; - int* num_waiting_to_start_; - ConditionVariable* num_waiting_to_start_cv_; - base::WaitableEvent* start_; - - DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +struct BlockingIncrementingTaskArgs { + Lock* counter_lock; + int* counter; + Lock* unique_threads_lock; + std::set<PlatformThreadId>* unique_threads; + Lock* num_waiting_to_start_lock; + int* num_waiting_to_start; + ConditionVariable* num_waiting_to_start_cv; + base::WaitableEvent* start; }; +void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) { + { + base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock); + (*args.num_waiting_to_start)++; + } + args.num_waiting_to_start_cv->Signal(); + args.start->Wait(); + IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock, + args.unique_threads); +} + class PosixDynamicThreadPoolTest : public testing::Test { protected: PosixDynamicThreadPoolTest() @@ -153,16 +120,18 @@ class PosixDynamicThreadPoolTest : public testing::Test { } } - Task* CreateNewIncrementingTask() { - return new IncrementingTask(&counter_lock_, &counter_, - &unique_threads_lock_, &unique_threads_); + base::Closure CreateNewIncrementingTaskCallback() { + return base::Bind(&IncrementingTask, &counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); } - Task* CreateNewBlockingIncrementingTask() { - return new BlockingIncrementingTask( + base::Closure CreateNewBlockingIncrementingTaskCallback() { + BlockingIncrementingTaskArgs args = { &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, &num_waiting_to_start_lock_, &num_waiting_to_start_, - &num_waiting_to_start_cv_, &start_); + &num_waiting_to_start_cv_, &start_ + }; + return base::Bind(&BlockingIncrementingTask, args); } scoped_refptr<base::PosixDynamicThreadPool> pool_; @@ -185,7 +154,7 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { EXPECT_EQ(0U, peer_.pending_tasks().size()); // Add one task and wait for it to be completed. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); WaitForIdleThreads(1); @@ -197,13 +166,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { // Add one task and wait for it to be completed. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); WaitForIdleThreads(1); // Add another 2 tasks. One should reuse the existing worker thread. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); WaitForTasksToStart(2); start_.Signal(); @@ -216,8 +185,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { // Add two blocking tasks. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; @@ -232,14 +201,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { TEST_F(PosixDynamicThreadPoolTest, Complex) { // Add two non blocking tasks and wait for them to finish. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); WaitForIdleThreads(1); // Add two blocking tasks, start them simultaneously, and wait for them to // finish. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); WaitForTasksToStart(2); start_.Signal(); @@ -259,7 +228,7 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { } // Add another non blocking task. There are no threads to reuse. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); WaitForIdleThreads(1); EXPECT_EQ(3U, unique_threads_.size()); diff --git a/base/win/object_watcher.cc b/base/win/object_watcher.cc index 4f5e7ab..be1e409 100644 --- a/base/win/object_watcher.cc +++ b/base/win/object_watcher.cc @@ -4,6 +4,7 @@ #include "base/win/object_watcher.h" +#include "base/bind.h" #include "base/logging.h" namespace base { @@ -11,30 +12,12 @@ namespace win { //----------------------------------------------------------------------------- -struct ObjectWatcher::Watch : public Task { - ObjectWatcher* watcher; // The associated ObjectWatcher instance - HANDLE object; // The object being watched - HANDLE wait_object; // Returned by RegisterWaitForSingleObject - MessageLoop* origin_loop; // Used to get back to the origin thread - Delegate* delegate; // Delegate to notify when signaled - bool did_signal; // DoneWaiting was called - - virtual void Run() { - // The watcher may have already been torn down, in which case we need to - // just get out of dodge. - if (!watcher) - return; - - DCHECK(did_signal); - watcher->StopWatching(); - - delegate->OnObjectSignaled(object); - } -}; - -//----------------------------------------------------------------------------- - -ObjectWatcher::ObjectWatcher() : watch_(NULL) { +ObjectWatcher::ObjectWatcher() + : weak_factory_(this), + object_(NULL), + wait_object_(NULL), + origin_loop_(NULL), + delegate_(NULL) { } ObjectWatcher::~ObjectWatcher() { @@ -42,30 +25,25 @@ ObjectWatcher::~ObjectWatcher() { } bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) { - if (watch_) { + if (wait_object_) { NOTREACHED() << "Already watching an object"; return false; } - Watch* watch = new Watch; - watch->watcher = this; - watch->object = object; - watch->origin_loop = MessageLoop::current(); - watch->delegate = delegate; - watch->did_signal = false; + origin_loop_ = MessageLoop::current(); + delegate_ = delegate; // Since our job is to just notice when an object is signaled and report the // result back to this thread, we can just run on a Windows wait thread. DWORD wait_flags = WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE; - if (!RegisterWaitForSingleObject(&watch->wait_object, object, DoneWaiting, - watch, INFINITE, wait_flags)) { + if (!RegisterWaitForSingleObject(&wait_object_, object, DoneWaiting, + this, INFINITE, wait_flags)) { NOTREACHED() << "RegisterWaitForSingleObject failed: " << GetLastError(); - delete watch; return false; } - watch_ = watch; + object_ = object; // We need to know if the current message loop is going away so we can // prevent the wait thread from trying to access a dead message loop. @@ -74,60 +52,46 @@ bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) { } bool ObjectWatcher::StopWatching() { - if (!watch_) + if (!wait_object_) return false; // Make sure ObjectWatcher is used in a single-threaded fashion. - DCHECK(watch_->origin_loop == MessageLoop::current()); + DCHECK(origin_loop_ == MessageLoop::current()); - // If DoneWaiting is in progress, we wait for it to finish. We know whether - // DoneWaiting happened or not by inspecting the did_signal flag. - if (!UnregisterWaitEx(watch_->wait_object, INVALID_HANDLE_VALUE)) { + // Blocking call to cancel the wait. Any callbacks already in progress will + // finish before we return from this call. + if (!UnregisterWaitEx(wait_object_, INVALID_HANDLE_VALUE)) { NOTREACHED() << "UnregisterWaitEx failed: " << GetLastError(); return false; } - // Make sure that we see any mutation to did_signal. This should be a no-op - // since we expect that UnregisterWaitEx resulted in a memory barrier, but - // just to be sure, we're going to be explicit. - MemoryBarrier(); - - // If the watch has been posted, then we need to make sure it knows not to do - // anything once it is run. - watch_->watcher = NULL; - - // If DoneWaiting was called, then the watch would have been posted as a - // task, and will therefore be deleted by the MessageLoop. Otherwise, we - // need to take care to delete it here. - if (!watch_->did_signal) - delete watch_; - - watch_ = NULL; + weak_factory_.InvalidateWeakPtrs(); + object_ = NULL; + wait_object_ = NULL; MessageLoop::current()->RemoveDestructionObserver(this); return true; } HANDLE ObjectWatcher::GetWatchedObject() { - if (!watch_) - return NULL; - - return watch_->object; + return object_; } // static void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) { DCHECK(!timed_out); - Watch* watch = static_cast<Watch*>(param); - - // Record that we ran this function. - watch->did_signal = true; + // The destructor blocks on any callbacks that are in flight, so we know that + // that is always a pointer to a valid ObjectWater. + ObjectWatcher* that = static_cast<ObjectWatcher*>(param); + that->origin_loop_->PostTask( + FROM_HERE, + base::Bind(&ObjectWatcher::Signal, that->weak_factory_.GetWeakPtr())); +} - // We rely on the locking in PostTask() to ensure that a memory barrier is - // provided, which in turn ensures our change to did_signal can be observed - // on the target thread. - watch->origin_loop->PostTask(FROM_HERE, watch); +void ObjectWatcher::Signal() { + StopWatching(); + delegate_->OnObjectSignaled(object_); } void ObjectWatcher::WillDestroyCurrentMessageLoop() { diff --git a/base/win/object_watcher.h b/base/win/object_watcher.h index f5a46eb..86ac2d0 100644 --- a/base/win/object_watcher.h +++ b/base/win/object_watcher.h @@ -9,6 +9,7 @@ #include <windows.h> #include "base/base_export.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop.h" namespace base { @@ -79,12 +80,17 @@ class BASE_EXPORT ObjectWatcher : public MessageLoop::DestructionObserver { // Called on a background thread when done waiting. static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out); + void Signal(); + // MessageLoop::DestructionObserver implementation: virtual void WillDestroyCurrentMessageLoop(); // Internal state. - struct Watch; - Watch* watch_; + base::WeakPtrFactory<ObjectWatcher> weak_factory_; + HANDLE object_; // The object being watched + HANDLE wait_object_; // Returned by RegisterWaitForSingleObject + MessageLoop* origin_loop_; // Used to get back to the origin thread + Delegate* delegate_; // Delegate to notify when signaled DISALLOW_COPY_AND_ASSIGN(ObjectWatcher); }; |