summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authordcheng@chromium.org <dcheng@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-11-29 03:55:20 +0000
committerdcheng@chromium.org <dcheng@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-11-29 03:55:20 +0000
commit09b36f984ea775b05d945aeae13b464c9264d3d2 (patch)
tree6530b1dadef4d380126a874c2e65ee4cc4f398ce /base
parentb9281ac4b0bbe0068d47ff83e153603734c2b264 (diff)
downloadchromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.zip
chromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.tar.gz
chromium_src-09b36f984ea775b05d945aeae13b464c9264d3d2.tar.bz2
Remove several more custom Task implementations from base/
BUG=none TEST=trybots Review URL: http://codereview.chromium.org/8702016 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@111865 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/threading/worker_pool_posix_unittest.cc131
-rw-r--r--base/win/object_watcher.cc100
-rw-r--r--base/win/object_watcher.h10
3 files changed, 90 insertions, 151 deletions
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
index 97e8807..1b700e1 100644
--- a/base/threading/worker_pool_posix_unittest.cc
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -6,9 +6,10 @@
#include <set>
+#include "base/bind.h"
+#include "base/callback.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
-#include "base/task.h"
#include "base/threading/platform_thread.h"
#include "base/synchronization/waitable_event.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -50,76 +51,42 @@ namespace {
// threads used if more than one IncrementingTask is consecutively posted to the
// thread pool, since the first one might finish executing before the subsequent
// PostTask() calls get invoked.
-class IncrementingTask : public Task {
- public:
- IncrementingTask(Lock* counter_lock,
- int* counter,
- Lock* unique_threads_lock,
- std::set<PlatformThreadId>* unique_threads)
- : counter_lock_(counter_lock),
- unique_threads_lock_(unique_threads_lock),
- unique_threads_(unique_threads),
- counter_(counter) {}
-
- virtual void Run() {
- AddSelfToUniqueThreadSet();
- base::AutoLock locked(*counter_lock_);
- (*counter_)++;
- }
-
- void AddSelfToUniqueThreadSet() {
- base::AutoLock locked(*unique_threads_lock_);
- unique_threads_->insert(PlatformThread::CurrentId());
+void IncrementingTask(Lock* counter_lock,
+ int* counter,
+ Lock* unique_threads_lock,
+ std::set<PlatformThreadId>* unique_threads) {
+ {
+ base::AutoLock locked(*unique_threads_lock);
+ unique_threads->insert(PlatformThread::CurrentId());
}
-
- private:
- Lock* counter_lock_;
- Lock* unique_threads_lock_;
- std::set<PlatformThreadId>* unique_threads_;
- int* counter_;
-
- DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
-};
+ base::AutoLock locked(*counter_lock);
+ (*counter)++;
+}
// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
-class BlockingIncrementingTask : public Task {
- public:
- BlockingIncrementingTask(Lock* counter_lock,
- int* counter,
- Lock* unique_threads_lock,
- std::set<PlatformThreadId>* unique_threads,
- Lock* num_waiting_to_start_lock,
- int* num_waiting_to_start,
- ConditionVariable* num_waiting_to_start_cv,
- base::WaitableEvent* start)
- : incrementer_(
- counter_lock, counter, unique_threads_lock, unique_threads),
- num_waiting_to_start_lock_(num_waiting_to_start_lock),
- num_waiting_to_start_(num_waiting_to_start),
- num_waiting_to_start_cv_(num_waiting_to_start_cv),
- start_(start) {}
-
- virtual void Run() {
- {
- base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
- (*num_waiting_to_start_)++;
- }
- num_waiting_to_start_cv_->Signal();
- start_->Wait();
- incrementer_.Run();
- }
-
- private:
- IncrementingTask incrementer_;
- Lock* num_waiting_to_start_lock_;
- int* num_waiting_to_start_;
- ConditionVariable* num_waiting_to_start_cv_;
- base::WaitableEvent* start_;
-
- DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
+struct BlockingIncrementingTaskArgs {
+ Lock* counter_lock;
+ int* counter;
+ Lock* unique_threads_lock;
+ std::set<PlatformThreadId>* unique_threads;
+ Lock* num_waiting_to_start_lock;
+ int* num_waiting_to_start;
+ ConditionVariable* num_waiting_to_start_cv;
+ base::WaitableEvent* start;
};
+void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
+ {
+ base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
+ (*args.num_waiting_to_start)++;
+ }
+ args.num_waiting_to_start_cv->Signal();
+ args.start->Wait();
+ IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
+ args.unique_threads);
+}
+
class PosixDynamicThreadPoolTest : public testing::Test {
protected:
PosixDynamicThreadPoolTest()
@@ -153,16 +120,18 @@ class PosixDynamicThreadPoolTest : public testing::Test {
}
}
- Task* CreateNewIncrementingTask() {
- return new IncrementingTask(&counter_lock_, &counter_,
- &unique_threads_lock_, &unique_threads_);
+ base::Closure CreateNewIncrementingTaskCallback() {
+ return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
+ &unique_threads_lock_, &unique_threads_);
}
- Task* CreateNewBlockingIncrementingTask() {
- return new BlockingIncrementingTask(
+ base::Closure CreateNewBlockingIncrementingTaskCallback() {
+ BlockingIncrementingTaskArgs args = {
&counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
&num_waiting_to_start_lock_, &num_waiting_to_start_,
- &num_waiting_to_start_cv_, &start_);
+ &num_waiting_to_start_cv_, &start_
+ };
+ return base::Bind(&BlockingIncrementingTask, args);
}
scoped_refptr<base::PosixDynamicThreadPool> pool_;
@@ -185,7 +154,7 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
EXPECT_EQ(0U, peer_.pending_tasks().size());
// Add one task and wait for it to be completed.
- pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
WaitForIdleThreads(1);
@@ -197,13 +166,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
// Add one task and wait for it to be completed.
- pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
WaitForIdleThreads(1);
// Add another 2 tasks. One should reuse the existing worker thread.
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
WaitForTasksToStart(2);
start_.Signal();
@@ -216,8 +185,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
// Add two blocking tasks.
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
@@ -232,14 +201,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
TEST_F(PosixDynamicThreadPoolTest, Complex) {
// Add two non blocking tasks and wait for them to finish.
- pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
WaitForIdleThreads(1);
// Add two blocking tasks, start them simultaneously, and wait for them to
// finish.
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
- pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
+ pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
WaitForTasksToStart(2);
start_.Signal();
@@ -259,7 +228,7 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
}
// Add another non blocking task. There are no threads to reuse.
- pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
+ pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
WaitForIdleThreads(1);
EXPECT_EQ(3U, unique_threads_.size());
diff --git a/base/win/object_watcher.cc b/base/win/object_watcher.cc
index 4f5e7ab..be1e409 100644
--- a/base/win/object_watcher.cc
+++ b/base/win/object_watcher.cc
@@ -4,6 +4,7 @@
#include "base/win/object_watcher.h"
+#include "base/bind.h"
#include "base/logging.h"
namespace base {
@@ -11,30 +12,12 @@ namespace win {
//-----------------------------------------------------------------------------
-struct ObjectWatcher::Watch : public Task {
- ObjectWatcher* watcher; // The associated ObjectWatcher instance
- HANDLE object; // The object being watched
- HANDLE wait_object; // Returned by RegisterWaitForSingleObject
- MessageLoop* origin_loop; // Used to get back to the origin thread
- Delegate* delegate; // Delegate to notify when signaled
- bool did_signal; // DoneWaiting was called
-
- virtual void Run() {
- // The watcher may have already been torn down, in which case we need to
- // just get out of dodge.
- if (!watcher)
- return;
-
- DCHECK(did_signal);
- watcher->StopWatching();
-
- delegate->OnObjectSignaled(object);
- }
-};
-
-//-----------------------------------------------------------------------------
-
-ObjectWatcher::ObjectWatcher() : watch_(NULL) {
+ObjectWatcher::ObjectWatcher()
+ : weak_factory_(this),
+ object_(NULL),
+ wait_object_(NULL),
+ origin_loop_(NULL),
+ delegate_(NULL) {
}
ObjectWatcher::~ObjectWatcher() {
@@ -42,30 +25,25 @@ ObjectWatcher::~ObjectWatcher() {
}
bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) {
- if (watch_) {
+ if (wait_object_) {
NOTREACHED() << "Already watching an object";
return false;
}
- Watch* watch = new Watch;
- watch->watcher = this;
- watch->object = object;
- watch->origin_loop = MessageLoop::current();
- watch->delegate = delegate;
- watch->did_signal = false;
+ origin_loop_ = MessageLoop::current();
+ delegate_ = delegate;
// Since our job is to just notice when an object is signaled and report the
// result back to this thread, we can just run on a Windows wait thread.
DWORD wait_flags = WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE;
- if (!RegisterWaitForSingleObject(&watch->wait_object, object, DoneWaiting,
- watch, INFINITE, wait_flags)) {
+ if (!RegisterWaitForSingleObject(&wait_object_, object, DoneWaiting,
+ this, INFINITE, wait_flags)) {
NOTREACHED() << "RegisterWaitForSingleObject failed: " << GetLastError();
- delete watch;
return false;
}
- watch_ = watch;
+ object_ = object;
// We need to know if the current message loop is going away so we can
// prevent the wait thread from trying to access a dead message loop.
@@ -74,60 +52,46 @@ bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) {
}
bool ObjectWatcher::StopWatching() {
- if (!watch_)
+ if (!wait_object_)
return false;
// Make sure ObjectWatcher is used in a single-threaded fashion.
- DCHECK(watch_->origin_loop == MessageLoop::current());
+ DCHECK(origin_loop_ == MessageLoop::current());
- // If DoneWaiting is in progress, we wait for it to finish. We know whether
- // DoneWaiting happened or not by inspecting the did_signal flag.
- if (!UnregisterWaitEx(watch_->wait_object, INVALID_HANDLE_VALUE)) {
+ // Blocking call to cancel the wait. Any callbacks already in progress will
+ // finish before we return from this call.
+ if (!UnregisterWaitEx(wait_object_, INVALID_HANDLE_VALUE)) {
NOTREACHED() << "UnregisterWaitEx failed: " << GetLastError();
return false;
}
- // Make sure that we see any mutation to did_signal. This should be a no-op
- // since we expect that UnregisterWaitEx resulted in a memory barrier, but
- // just to be sure, we're going to be explicit.
- MemoryBarrier();
-
- // If the watch has been posted, then we need to make sure it knows not to do
- // anything once it is run.
- watch_->watcher = NULL;
-
- // If DoneWaiting was called, then the watch would have been posted as a
- // task, and will therefore be deleted by the MessageLoop. Otherwise, we
- // need to take care to delete it here.
- if (!watch_->did_signal)
- delete watch_;
-
- watch_ = NULL;
+ weak_factory_.InvalidateWeakPtrs();
+ object_ = NULL;
+ wait_object_ = NULL;
MessageLoop::current()->RemoveDestructionObserver(this);
return true;
}
HANDLE ObjectWatcher::GetWatchedObject() {
- if (!watch_)
- return NULL;
-
- return watch_->object;
+ return object_;
}
// static
void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) {
DCHECK(!timed_out);
- Watch* watch = static_cast<Watch*>(param);
-
- // Record that we ran this function.
- watch->did_signal = true;
+ // The destructor blocks on any callbacks that are in flight, so we know that
+ // that is always a pointer to a valid ObjectWater.
+ ObjectWatcher* that = static_cast<ObjectWatcher*>(param);
+ that->origin_loop_->PostTask(
+ FROM_HERE,
+ base::Bind(&ObjectWatcher::Signal, that->weak_factory_.GetWeakPtr()));
+}
- // We rely on the locking in PostTask() to ensure that a memory barrier is
- // provided, which in turn ensures our change to did_signal can be observed
- // on the target thread.
- watch->origin_loop->PostTask(FROM_HERE, watch);
+void ObjectWatcher::Signal() {
+ StopWatching();
+ delegate_->OnObjectSignaled(object_);
}
void ObjectWatcher::WillDestroyCurrentMessageLoop() {
diff --git a/base/win/object_watcher.h b/base/win/object_watcher.h
index f5a46eb..86ac2d0 100644
--- a/base/win/object_watcher.h
+++ b/base/win/object_watcher.h
@@ -9,6 +9,7 @@
#include <windows.h>
#include "base/base_export.h"
+#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
namespace base {
@@ -79,12 +80,17 @@ class BASE_EXPORT ObjectWatcher : public MessageLoop::DestructionObserver {
// Called on a background thread when done waiting.
static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out);
+ void Signal();
+
// MessageLoop::DestructionObserver implementation:
virtual void WillDestroyCurrentMessageLoop();
// Internal state.
- struct Watch;
- Watch* watch_;
+ base::WeakPtrFactory<ObjectWatcher> weak_factory_;
+ HANDLE object_; // The object being watched
+ HANDLE wait_object_; // Returned by RegisterWaitForSingleObject
+ MessageLoop* origin_loop_; // Used to get back to the origin thread
+ Delegate* delegate_; // Delegate to notify when signaled
DISALLOW_COPY_AND_ASSIGN(ObjectWatcher);
};