diff options
author | francoisk777@gmail.com <francoisk777@gmail.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-02 10:16:55 +0000 |
---|---|---|
committer | francoisk777@gmail.com <francoisk777@gmail.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-02 10:16:55 +0000 |
commit | ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a (patch) | |
tree | 1a6cc4b3cd2dae37cfec2b1099be4610862c17f6 /base | |
parent | 6fe6965e2d0e4ac248d6825a88b5e53a77ac5ffe (diff) | |
download | chromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.zip chromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.tar.gz chromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.tar.bz2 |
Implementation of SequencedTaskRunner based on SequencedWorkerPool.
Also includes specification tests for SequencedTaskRunner.
BUG=114330,114327
TEST=--gtest_filter=SequencedWorkerPoolTaskRunner*
Review URL: http://codereview.chromium.org/9663075
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@130113 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/base.gyp | 5 | ||||
-rw-r--r-- | base/base.gypi | 2 | ||||
-rw-r--r-- | base/test/sequenced_task_runner_test_template.cc | 263 | ||||
-rw-r--r-- | base/test/sequenced_task_runner_test_template.h | 243 | ||||
-rw-r--r-- | base/test/sequenced_worker_pool_owner.cc | 57 | ||||
-rw-r--r-- | base/test/sequenced_worker_pool_owner.h | 61 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 6 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 7 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_task_runner.cc | 75 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_task_runner.h | 74 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_task_runner_unittest.cc | 61 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 67 |
12 files changed, 855 insertions, 66 deletions
diff --git a/base/base.gyp b/base/base.gyp index 02c8585..ea037758 100644 --- a/base/base.gyp +++ b/base/base.gyp @@ -259,9 +259,12 @@ 'sys_string_conversions_unittest.cc', 'system_monitor/system_monitor_unittest.cc', 'template_util_unittest.cc', + 'test/sequenced_worker_pool_owner.cc', + 'test/sequenced_worker_pool_owner.h', 'test/trace_event_analyzer_unittest.cc', 'threading/non_thread_safe_unittest.cc', 'threading/platform_thread_unittest.cc', + 'threading/sequenced_worker_pool_task_runner_unittest.cc', 'threading/sequenced_worker_pool_unittest.cc', 'threading/simple_thread_unittest.cc', 'threading/thread_checker_unittest.cc', @@ -452,6 +455,8 @@ 'test/perf_test_suite.h', 'test/scoped_locale.cc', 'test/scoped_locale.h', + 'test/sequenced_task_runner_test_template.cc', + 'test/sequenced_task_runner_test_template.h', 'test/task_runner_test_template.cc', 'test/task_runner_test_template.h', 'test/test_file_util.h', diff --git a/base/base.gypi b/base/base.gypi index aecba8b..6707a98 100644 --- a/base/base.gypi +++ b/base/base.gypi @@ -346,6 +346,8 @@ 'threading/post_task_and_reply_impl.h', 'threading/sequenced_worker_pool.cc', 'threading/sequenced_worker_pool.h', + 'threading/sequenced_worker_pool_task_runner.cc', + 'threading/sequenced_worker_pool_task_runner.h', 'threading/simple_thread.cc', 'threading/simple_thread.h', 'threading/thread.cc', diff --git a/base/test/sequenced_task_runner_test_template.cc b/base/test/sequenced_task_runner_test_template.cc new file mode 100644 index 0000000..3b58973 --- /dev/null +++ b/base/test/sequenced_task_runner_test_template.cc @@ -0,0 +1,263 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/test/sequenced_task_runner_test_template.h" + +#include <ostream> + +#include "base/location.h" + +namespace base { + +namespace internal { + +TaskEvent::TaskEvent(int i, Type type) + : i(i), type(type) { +} + +SequencedTaskTracker::SequencedTaskTracker() : next_post_i_(0) { +} + +void SequencedTaskTracker::PostWrappedNonNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task) { + AutoLock event_lock(lock_); + const int post_i = next_post_i_++; + Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this, + task, post_i); + task_runner->PostNonNestableTask(FROM_HERE, wrapped_task); + TaskPosted(post_i); +} + +void SequencedTaskTracker::PostWrappedNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task) { + AutoLock event_lock(lock_); + const int post_i = next_post_i_++; + Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this, + task, post_i); + task_runner->PostTask(FROM_HERE, wrapped_task); + TaskPosted(post_i); +} + +void SequencedTaskTracker::PostWrappedDelayedNonNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task, + TimeDelta delay) { + AutoLock event_lock(lock_); + const int post_i = next_post_i_++; + Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this, + task, post_i); + task_runner->PostNonNestableDelayedTask(FROM_HERE, wrapped_task, delay); + TaskPosted(post_i); +} + +void SequencedTaskTracker::PostNonNestableTasks( + const scoped_refptr<SequencedTaskRunner>& task_runner, + int task_count) { + for (int i = 0; i < task_count; ++i) { + PostWrappedNonNestableTask(task_runner, Closure()); + } +} + +void SequencedTaskTracker::RunTask(const Closure& task, int task_i) { + TaskStarted(task_i); + if (!task.is_null()) + task.Run(); + TaskEnded(task_i); +} + +void SequencedTaskTracker::TaskPosted(int i) { + // Caller must own |lock_|. + events_.push_back(TaskEvent(i, TaskEvent::POST)); +} + +void SequencedTaskTracker::TaskStarted(int i) { + AutoLock lock(lock_); + events_.push_back(TaskEvent(i, TaskEvent::START)); +} + +void SequencedTaskTracker::TaskEnded(int i) { + AutoLock lock(lock_); + events_.push_back(TaskEvent(i, TaskEvent::END)); +} + +const std::vector<TaskEvent>& +SequencedTaskTracker::GetTaskEvents() const { + return events_; +} + +SequencedTaskTracker::~SequencedTaskTracker() { +} + +void PrintTo(const TaskEvent& event, std::ostream* os) { + *os << "(i=" << event.i << ", type="; + switch (event.type) { + case TaskEvent::POST: *os << "POST"; break; + case TaskEvent::START: *os << "START"; break; + case TaskEvent::END: *os << "END"; break; + } + *os << ")"; +} + +void SleepForOneSecond() { + base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); +} + +namespace { + +// Returns the task ordinals for the task event type |type| in the order that +// they were recorded. +std::vector<int> GetEventTypeOrder(const std::vector<TaskEvent>& events, + TaskEvent::Type type) { + std::vector<int> tasks; + std::vector<TaskEvent>::const_iterator event; + for (event = events.begin(); event != events.end(); ++event) { + if (event->type == type) + tasks.push_back(event->i); + } + return tasks; +} + +// Returns all task events for task |task_i|. +std::vector<TaskEvent::Type> GetEventsForTask( + const std::vector<TaskEvent>& events, + int task_i) { + std::vector<TaskEvent::Type> task_event_orders; + std::vector<TaskEvent>::const_iterator event; + for (event = events.begin(); event != events.end(); ++event) { + if (event->i == task_i) + task_event_orders.push_back(event->type); + } + return task_event_orders; +} + +// Checks that the task events for each task in |events| occur in the order +// {POST, START, END}, and that there is only one instance of each event type +// per task. +::testing::AssertionResult CheckEventOrdersForEachTask( + const std::vector<TaskEvent>& events, + int task_count) { + std::vector<TaskEvent::Type> expected_order; + expected_order.push_back(TaskEvent::POST); + expected_order.push_back(TaskEvent::START); + expected_order.push_back(TaskEvent::END); + + // This is O(n^2), but it runs fast enough currently so is not worth + // optimizing. + for (int i = 0; i < task_count; ++i) { + const std::vector<TaskEvent::Type> task_events = + GetEventsForTask(events, i); + if (task_events != expected_order) { + return ::testing::AssertionFailure() + << "Events for task " << i << " are out of order; expected: " + << ::testing::PrintToString(expected_order) << "; actual: " + << ::testing::PrintToString(task_events); + } + } + return ::testing::AssertionSuccess(); +} + +// Checks that no two tasks were running at the same time. I.e. the only +// events allowed between the START and END of a task are the POSTs of other +// tasks. +::testing::AssertionResult CheckNoTaskRunsOverlap( + const std::vector<TaskEvent>& events) { + // If > -1, we're currently inside a START, END pair. + int current_task_i = -1; + + std::vector<TaskEvent>::const_iterator event; + for (event = events.begin(); event != events.end(); ++event) { + bool spurious_event_found = false; + + if (current_task_i == -1) { // Not inside a START, END pair. + switch (event->type) { + case TaskEvent::POST: + break; + case TaskEvent::START: + current_task_i = event->i; + break; + case TaskEvent::END: + spurious_event_found = true; + break; + } + + } else { // Inside a START, END pair. + bool interleaved_task_detected = false; + + switch (event->type) { + case TaskEvent::POST: + if (event->i == current_task_i) + spurious_event_found = true; + break; + case TaskEvent::START: + interleaved_task_detected = true; + break; + case TaskEvent::END: + if (event->i != current_task_i) + interleaved_task_detected = true; + else + current_task_i = -1; + break; + } + + if (interleaved_task_detected) { + return ::testing::AssertionFailure() + << "Found event " << ::testing::PrintToString(*event) + << " between START and END events for task " << current_task_i + << "; event dump: " << ::testing::PrintToString(events); + } + } + + if (spurious_event_found) { + const int event_i = event - events.begin(); + return ::testing::AssertionFailure() + << "Spurious event " << ::testing::PrintToString(*event) + << " at position " << event_i << "; event dump: " + << ::testing::PrintToString(events); + } + } + + return ::testing::AssertionSuccess(); +} + +} // namespace + +::testing::AssertionResult CheckNonNestableInvariants( + const std::vector<TaskEvent>& events, + int task_count) { + const std::vector<int> post_order = + GetEventTypeOrder(events, TaskEvent::POST); + const std::vector<int> start_order = + GetEventTypeOrder(events, TaskEvent::START); + const std::vector<int> end_order = + GetEventTypeOrder(events, TaskEvent::END); + + if (start_order != post_order) { + return ::testing::AssertionFailure() + << "Expected START order (which equals actual POST order): \n" + << ::testing::PrintToString(post_order) + << "\n Actual START order:\n" + << ::testing::PrintToString(start_order); + } + + if (end_order != post_order) { + return ::testing::AssertionFailure() + << "Expected END order (which equals actual POST order): \n" + << ::testing::PrintToString(post_order) + << "\n Actual END order:\n" + << ::testing::PrintToString(end_order); + } + + const ::testing::AssertionResult result = + CheckEventOrdersForEachTask(events, task_count); + if (!result) + return result; + + return CheckNoTaskRunsOverlap(events); +} + +} // namespace internal + +} // namespace base diff --git a/base/test/sequenced_task_runner_test_template.h b/base/test/sequenced_task_runner_test_template.h new file mode 100644 index 0000000..ff337ff --- /dev/null +++ b/base/test/sequenced_task_runner_test_template.h @@ -0,0 +1,243 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// This class defines tests that implementations of SequencedTaskRunner should +// pass in order to be conformant. See task_runner_test_template.h for a +// description of how to use the constructs in this file; these work the same. + +#ifndef BASE_SEQUENCED_TASK_RUNNER_TEST_TEMPLATE_H_ +#define BASE_SEQUENCED_TASK_RUNNER_TEST_TEMPLATE_H_ +#pragma once + +#include <cstddef> +#include <iosfwd> +#include <vector> + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/sequenced_task_runner.h" +#include "base/synchronization/lock.h" +#include "base/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace internal { + +struct TaskEvent { + enum Type { POST, START, END }; + TaskEvent(int i, Type type); + int i; + Type type; +}; + +// Utility class used in the tests below. +class SequencedTaskTracker : public RefCountedThreadSafe<SequencedTaskTracker> { + public: + SequencedTaskTracker(); + + // Posts the non-nestable task |task|, and records its post event. + void PostWrappedNonNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task); + + // Posts the nestable task |task|, and records its post event. + void PostWrappedNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task); + + // Posts the delayed non-nestable task |task|, and records its post event. + void PostWrappedDelayedNonNestableTask( + const scoped_refptr<SequencedTaskRunner>& task_runner, + const Closure& task, + TimeDelta delay); + + // Posts |task_count| non-nestable tasks. + void PostNonNestableTasks( + const scoped_refptr<SequencedTaskRunner>& task_runner, + int task_count); + + const std::vector<TaskEvent>& GetTaskEvents() const; + + private: + friend class RefCountedThreadSafe<SequencedTaskTracker>; + + ~SequencedTaskTracker(); + + // A task which runs |task|, recording the start and end events. + void RunTask(const Closure& task, int task_i); + + // Records a post event for task |i|. The owner is expected to be holding + // |lock_| (unlike |TaskStarted| and |TaskEnded|). + void TaskPosted(int i); + + // Records a start event for task |i|. + void TaskStarted(int i); + + // Records a end event for task |i|. + void TaskEnded(int i); + + // Protects events_ and next_post_i_. + Lock lock_; + + // The events as they occurred for each task (protected by lock_). + std::vector<TaskEvent> events_; + + // The ordinal to be used for the next task-posting task (protected by + // lock_). + int next_post_i_; + + DISALLOW_COPY_AND_ASSIGN(SequencedTaskTracker); +}; + +void PrintTo(const TaskEvent& event, std::ostream* os); + +void SleepForOneSecond(); + +// Checks the non-nestable task invariants for all tasks in |events|. +// +// The invariants are: +// 1) Events started and ended in the same order that they were posted. +// 2) Events for an individual tasks occur in the order {POST, START, END}, +// and there is only one instance of each event type for a task. +// 3) The only events between a task's START and END events are the POSTs of +// other tasks. I.e. tasks were run sequentially, not interleaved. +::testing::AssertionResult CheckNonNestableInvariants( + const std::vector<TaskEvent>& events, + int task_count); + +} // namespace internal + +template <typename TaskRunnerTestDelegate> +class SequencedTaskRunnerTest : public testing::Test { + protected: + SequencedTaskRunnerTest() + : task_tracker_(new internal::SequencedTaskTracker()) {} + + const scoped_refptr<internal::SequencedTaskTracker> task_tracker_; + TaskRunnerTestDelegate delegate_; +}; + +TYPED_TEST_CASE_P(SequencedTaskRunnerTest); + +// This test posts N non-nestable tasks in sequence, and expects them to run +// in FIFO order, with no part of any two tasks' execution +// overlapping. I.e. that each task starts only after the previously-posted +// one has finished. +TYPED_TEST_P(SequencedTaskRunnerTest, SequentialNonNestable) { + const int task_count = 1000; + + this->delegate_.StartTaskRunner(); + const scoped_refptr<SequencedTaskRunner> task_runner = + this->delegate_.GetTaskRunner(); + + this->task_tracker_->PostWrappedNonNestableTask( + task_runner, Bind(&internal::SleepForOneSecond)); + for (int i = 1; i < task_count; ++i) { + this->task_tracker_->PostWrappedNonNestableTask(task_runner, Closure()); + } + + this->delegate_.StopTaskRunner(); + + EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(), + task_count)); +} + +// This test posts N nestable tasks in sequence. It has the same expectations +// as SequentialNonNestable because even though the tasks are nestable, they +// will not be run nestedly in this case. +TYPED_TEST_P(SequencedTaskRunnerTest, SequentialNestable) { + const int task_count = 1000; + + this->delegate_.StartTaskRunner(); + const scoped_refptr<SequencedTaskRunner> task_runner = + this->delegate_.GetTaskRunner(); + + this->task_tracker_->PostWrappedNestableTask( + task_runner, + Bind(&internal::SleepForOneSecond)); + for (int i = 1; i < task_count; ++i) { + this->task_tracker_->PostWrappedNestableTask(task_runner, Closure()); + } + + this->delegate_.StopTaskRunner(); + + EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(), + task_count)); +} + +// This test posts non-nestable tasks in order of increasing delay, and checks +// that that the tasks are run in FIFO order and that there is no execution +// overlap whatsoever between any two tasks. +TYPED_TEST_P(SequencedTaskRunnerTest, SequentialDelayedNonNestable) { + if (!this->delegate_.TaskRunnerHandlesNonZeroDelays()) { + DLOG(INFO) << "This SequencedTaskRunner doesn't handle " + "non-zero delays; skipping"; + return; + } + + const int task_count = 20; + const int delay_increment_ms = 50; + + this->delegate_.StartTaskRunner(); + const scoped_refptr<SequencedTaskRunner> task_runner = + this->delegate_.GetTaskRunner(); + + for (int i = 0; i < task_count; ++i) { + this->task_tracker_->PostWrappedDelayedNonNestableTask( + task_runner, + Closure(), + TimeDelta::FromMilliseconds(delay_increment_ms * i)); + } + + this->delegate_.StopTaskRunner(); + + EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(), + task_count)); +} + +// This test posts a fast, non-nestable task from within each of a number of +// slow, non-nestable tasks and checks that they all run in the sequence they +// were posted in and that there is no execution overlap whatsoever. +TYPED_TEST_P(SequencedTaskRunnerTest, NonNestablePostFromNonNestableTask) { + const int parent_count = 10; + const int children_per_parent = 10; + + this->delegate_.StartTaskRunner(); + const scoped_refptr<SequencedTaskRunner> task_runner = + this->delegate_.GetTaskRunner(); + + for (int i = 0; i < parent_count; ++i) { + Closure task = Bind( + &internal::SequencedTaskTracker::PostNonNestableTasks, + this->task_tracker_, + task_runner, + children_per_parent); + this->task_tracker_->PostWrappedNonNestableTask(task_runner, task); + } + + this->delegate_.StopTaskRunner(); + + EXPECT_TRUE(CheckNonNestableInvariants( + this->task_tracker_->GetTaskEvents(), + parent_count * (children_per_parent + 1))); +} + +// TODO(francoisk777@gmail.com) Add a test, similiar to the above, which runs +// some tasked nestedly (which should be implemented in the test +// delegate). Also add, to the the test delegate, a predicate which checks +// whether the implementation supports nested tasks. +// + +REGISTER_TYPED_TEST_CASE_P(SequencedTaskRunnerTest, + SequentialNonNestable, + SequentialNestable, + SequentialDelayedNonNestable, + NonNestablePostFromNonNestableTask); + +} // namespace base + +#endif // BASE_TASK_RUNNER_TEST_TEMPLATE_H_ diff --git a/base/test/sequenced_worker_pool_owner.cc b/base/test/sequenced_worker_pool_owner.cc new file mode 100644 index 0000000..3d124fa --- /dev/null +++ b/base/test/sequenced_worker_pool_owner.cc @@ -0,0 +1,57 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/test/sequenced_worker_pool_owner.h" + +#include "base/location.h" +#include "base/message_loop.h" + +namespace base { + +SequencedWorkerPoolOwner::SequencedWorkerPoolOwner( + size_t max_threads, + const std::string& thread_name_prefix) + : constructor_message_loop_(MessageLoop::current()), + pool_(new SequencedWorkerPool( + max_threads, thread_name_prefix, + ALLOW_THIS_IN_INITIALIZER_LIST(this))), + has_work_call_count_(0) {} + +SequencedWorkerPoolOwner::~SequencedWorkerPoolOwner() { + pool_ = NULL; + MessageLoop::current()->Run(); +} + +const scoped_refptr<SequencedWorkerPool>& SequencedWorkerPoolOwner::pool() { + return pool_; +} + +void SequencedWorkerPoolOwner::SetWillWaitForShutdownCallback( + const Closure& callback) { + will_wait_for_shutdown_callback_ = callback; +} + +int SequencedWorkerPoolOwner::has_work_call_count() const { + AutoLock lock(has_work_lock_); + return has_work_call_count_; +} + +void SequencedWorkerPoolOwner::OnHasWork() { + AutoLock lock(has_work_lock_); + ++has_work_call_count_; +} + +void SequencedWorkerPoolOwner::WillWaitForShutdown() { + if (!will_wait_for_shutdown_callback_.is_null()) { + will_wait_for_shutdown_callback_.Run(); + } +} + +void SequencedWorkerPoolOwner::OnDestruct() { + constructor_message_loop_->PostTask( + FROM_HERE, + constructor_message_loop_->QuitClosure()); +} + +} // namespace base diff --git a/base/test/sequenced_worker_pool_owner.h b/base/test/sequenced_worker_pool_owner.h new file mode 100644 index 0000000..c6c8d676 --- /dev/null +++ b/base/test/sequenced_worker_pool_owner.h @@ -0,0 +1,61 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_ +#define BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_ + +#include <cstddef> +#include <string> + +#include "base/basictypes.h" +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/lock.h" +#include "base/threading/sequenced_worker_pool.h" + +class MessageLoop; + +namespace base { + +// Wrapper around SequencedWorkerPool for testing that blocks destruction +// until the pool is actually destroyed. This is so that a +// SequencedWorkerPool from one test doesn't outlive its test and cause +// strange races with other tests that touch global stuff (like histograms and +// logging). However, this requires that nothing else on this thread holds a +// ref to the pool when the SequencedWorkerPoolOwner is destroyed. +class SequencedWorkerPoolOwner : public SequencedWorkerPool::TestingObserver { + public: + SequencedWorkerPoolOwner(size_t max_threads, + const std::string& thread_name_prefix); + + virtual ~SequencedWorkerPoolOwner(); + + // Don't change the returned pool's testing observer. + const scoped_refptr<SequencedWorkerPool>& pool(); + + // The given callback will be called on WillWaitForShutdown(). + void SetWillWaitForShutdownCallback(const Closure& callback); + + int has_work_call_count() const; + + private: + // SequencedWorkerPool::TestingObserver implementation. + virtual void OnHasWork() OVERRIDE; + virtual void WillWaitForShutdown() OVERRIDE; + virtual void OnDestruct() OVERRIDE; + + MessageLoop* const constructor_message_loop_; + scoped_refptr<SequencedWorkerPool> pool_; + Closure will_wait_for_shutdown_callback_; + + mutable Lock has_work_lock_; + int has_work_call_count_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolOwner); +}; + +} // namespace base + +#endif // BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_ diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 7054fba..61cd1ac 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -22,6 +22,7 @@ #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" +#include "base/threading/sequenced_worker_pool_task_runner.h" #include "base/threading/simple_thread.h" #include "base/time.h" #include "base/tracked_objects.h" @@ -758,6 +759,11 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( return inner_->GetNamedSequenceToken(name); } +scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( + SequenceToken token) { + return new SequencedWorkerPoolTaskRunner(this, token); +} + bool SequencedWorkerPool::PostWorkerTask( const tracked_objects::Location& from_here, const Closure& task) { diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index 99d7bb9..737470d 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -26,6 +26,8 @@ class MessageLoopProxy; template <class T> class DeleteHelper; +class SequencedTaskRunner; + // A worker thread pool that enforces ordering between sets of tasks. It also // allows you to specify what should happen to your tasks on shutdown. // @@ -152,6 +154,11 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // will be created. SequenceToken GetNamedSequenceToken(const std::string& name); + // Returns a SequencedTaskRunner wrapper which posts to this + // SequencedWorkerPool using the given sequence token. + scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( + SequenceToken token); + // Posts the given task for execution in the worker pool. Tasks posted with // this function will execute in an unspecified order on a background thread. // Returns true if the task was posted. If your tasks have ordering diff --git a/base/threading/sequenced_worker_pool_task_runner.cc b/base/threading/sequenced_worker_pool_task_runner.cc new file mode 100644 index 0000000..ed3759f --- /dev/null +++ b/base/threading/sequenced_worker_pool_task_runner.cc @@ -0,0 +1,75 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/sequenced_worker_pool_task_runner.h" + +#include "base/callback.h" +#include "base/location.h" +#include "base/logging.h" + +namespace base { + +SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( + const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::SequenceToken token) + : pool_(pool), + token_(token) { +} + +SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { +} + +bool SequencedWorkerPoolTaskRunner::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms) { + return PostDelayedTaskAssertZeroDelay(from_here, task, delay_ms); +} + +bool SequencedWorkerPoolTaskRunner::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + return PostDelayedTaskAssertZeroDelay(from_here, task, delay); +} + +bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { + return pool_->RunsTasksOnCurrentThread(); +} + +bool SequencedWorkerPoolTaskRunner::PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms) { + return PostDelayedTaskAssertZeroDelay(from_here, task, delay_ms); +} + +bool SequencedWorkerPoolTaskRunner::PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + return PostDelayedTaskAssertZeroDelay(from_here, task, delay); +} + +bool SequencedWorkerPoolTaskRunner::PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms) { + // TODO(francoisk777@gmail.com): Change the following two statements once + // SequencedWorkerPool supports non-zero delays. + DCHECK_EQ(delay_ms, 0) + << "SequencedWorkerPoolTaskRunner does not yet support non-zero delays"; + return pool_->PostSequencedWorkerTask(token_, from_here, task); +} + +bool SequencedWorkerPoolTaskRunner::PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + return PostDelayedTaskAssertZeroDelay(from_here, + task, + delay.InMillisecondsRoundedUp()); +} + +} // namespace base diff --git a/base/threading/sequenced_worker_pool_task_runner.h b/base/threading/sequenced_worker_pool_task_runner.h new file mode 100644 index 0000000..34d65b9 --- /dev/null +++ b/base/threading/sequenced_worker_pool_task_runner.h @@ -0,0 +1,74 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_TASK_RUNNER_H_ +#define BASE_THREADING_SEQUENCED_WORKER_POOL_TASK_RUNNER_H_ +#pragma once + +#include "base/basictypes.h" +#include "base/callback_forward.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/sequenced_task_runner.h" +#include "base/threading/sequenced_worker_pool.h" +#include "base/time.h" + +namespace tracked_objects { +class Location; +} // namespace tracked_objects + +namespace base { + +// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a +// fixed sequence token. +// +// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). +class BASE_EXPORT SequencedWorkerPoolTaskRunner : public SequencedTaskRunner { + public: + SequencedWorkerPoolTaskRunner(const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::SequenceToken token); + + // TaskRunner implementation + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms) OVERRIDE; + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + virtual bool RunsTasksOnCurrentThread() const OVERRIDE; + + // SequencedTaskRunner implementation + virtual bool PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms) OVERRIDE; + virtual bool PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + + private: + virtual ~SequencedWorkerPoolTaskRunner(); + + // Helper function for posting a delayed task. Asserts that the delay is + // zero because non-zero delays are not yet supported. + bool PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + int64 delay_ms); + bool PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay); + + const scoped_refptr<SequencedWorkerPool> pool_; + + const SequencedWorkerPool::SequenceToken token_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); +}; + +} // namespace base + +#endif // BASE_THREADING_SEQUENCED_TASK_RUNNER_IMPL_H_ diff --git a/base/threading/sequenced_worker_pool_task_runner_unittest.cc b/base/threading/sequenced_worker_pool_task_runner_unittest.cc new file mode 100644 index 0000000..effb905 --- /dev/null +++ b/base/threading/sequenced_worker_pool_task_runner_unittest.cc @@ -0,0 +1,61 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "base/test/sequenced_task_runner_test_template.h" +#include "base/test/sequenced_worker_pool_owner.h" +#include "base/test/task_runner_test_template.h" +#include "base/threading/sequenced_worker_pool_task_runner.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +class SequencedWorkerPoolTaskRunnerTestDelegate { + public: + SequencedWorkerPoolTaskRunnerTestDelegate() {} + + ~SequencedWorkerPoolTaskRunnerTestDelegate() { + } + + void StartTaskRunner() { + pool_owner_.reset( + new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); + task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( + pool_owner_->pool()->GetSequenceToken()); + } + + scoped_refptr<SequencedTaskRunner> GetTaskRunner() { + return task_runner_; + } + + void StopTaskRunner() { + pool_owner_->pool()->FlushForTesting(); + pool_owner_->pool()->Shutdown(); + // Don't reset |pool_owner_| here, as the test may still hold a + // reference to the pool. + } + + bool TaskRunnerHandlesNonZeroDelays() const { + // TODO(akalin): Set this to true once SequencedWorkerPool handles + // non-zero delays. + return false; + } + + private: + MessageLoop message_loop_; + scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; + scoped_refptr<SequencedTaskRunner> task_runner_; +}; + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPoolTaskRunner, TaskRunnerTest, + SequencedWorkerPoolTaskRunnerTestDelegate); + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPoolTaskRunner, SequencedTaskRunnerTest, + SequencedWorkerPoolTaskRunnerTestDelegate); + +} // namespace base diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc index 8a3508d..1c29c34 100644 --- a/base/threading/sequenced_worker_pool_unittest.cc +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -12,6 +12,7 @@ #include "base/message_loop_proxy.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" +#include "base/test/sequenced_worker_pool_owner.h" #include "base/test/task_runner_test_template.h" #include "base/threading/platform_thread.h" #include "base/threading/sequenced_worker_pool.h" @@ -145,72 +146,6 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> { size_t started_events_; }; -// Wrapper around SequencedWorkerPool that blocks destruction until -// the pool is actually destroyed. This is so that a -// SequencedWorkerPool from one test doesn't outlive its test and -// cause strange races with other tests that touch global stuff (like -// histograms and logging). However, this requires that nothing else -// on this thread holds a ref to the pool when the -// SequencedWorkerPoolOwner is destroyed. -class SequencedWorkerPoolOwner : public SequencedWorkerPool::TestingObserver { - public: - SequencedWorkerPoolOwner(size_t max_threads, - const std::string& thread_name_prefix) - : constructor_message_loop_(MessageLoop::current()), - pool_(new SequencedWorkerPool( - max_threads, thread_name_prefix, - ALLOW_THIS_IN_INITIALIZER_LIST(this))), - has_work_call_count_(0) {} - - virtual ~SequencedWorkerPoolOwner() { - pool_ = NULL; - MessageLoop::current()->Run(); - } - - // Don't change the return pool's testing observer. - const scoped_refptr<SequencedWorkerPool>& pool() { - return pool_; - } - - // The given callback will be called on WillWaitForShutdown(). - void SetWillWaitForShutdownCallback(const Closure& callback) { - will_wait_for_shutdown_callback_ = callback; - } - - int has_work_call_count() const { - AutoLock lock(has_work_lock_); - return has_work_call_count_; - } - - private: - // SequencedWorkerPool::TestingObserver implementation. - virtual void OnHasWork() OVERRIDE { - AutoLock lock(has_work_lock_); - ++has_work_call_count_; - } - - virtual void WillWaitForShutdown() OVERRIDE { - if (!will_wait_for_shutdown_callback_.is_null()) { - will_wait_for_shutdown_callback_.Run(); - } - } - - virtual void OnDestruct() OVERRIDE { - constructor_message_loop_->PostTask( - FROM_HERE, - constructor_message_loop_->QuitClosure()); - } - - MessageLoop* const constructor_message_loop_; - scoped_refptr<SequencedWorkerPool> pool_; - Closure will_wait_for_shutdown_callback_; - - mutable Lock has_work_lock_; - int has_work_call_count_; - - DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolOwner); -}; - class SequencedWorkerPoolTest : public testing::Test { public: SequencedWorkerPoolTest() |