summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorfrancoisk777@gmail.com <francoisk777@gmail.com@0039d316-1c4b-4281-b951-d872f2087c98>2012-04-02 10:16:55 +0000
committerfrancoisk777@gmail.com <francoisk777@gmail.com@0039d316-1c4b-4281-b951-d872f2087c98>2012-04-02 10:16:55 +0000
commitce5d047544f7e28cdc7693b86ffc78e3cfd01a2a (patch)
tree1a6cc4b3cd2dae37cfec2b1099be4610862c17f6 /base
parent6fe6965e2d0e4ac248d6825a88b5e53a77ac5ffe (diff)
downloadchromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.zip
chromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.tar.gz
chromium_src-ce5d047544f7e28cdc7693b86ffc78e3cfd01a2a.tar.bz2
Implementation of SequencedTaskRunner based on SequencedWorkerPool.
Also includes specification tests for SequencedTaskRunner. BUG=114330,114327 TEST=--gtest_filter=SequencedWorkerPoolTaskRunner* Review URL: http://codereview.chromium.org/9663075 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@130113 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/base.gyp5
-rw-r--r--base/base.gypi2
-rw-r--r--base/test/sequenced_task_runner_test_template.cc263
-rw-r--r--base/test/sequenced_task_runner_test_template.h243
-rw-r--r--base/test/sequenced_worker_pool_owner.cc57
-rw-r--r--base/test/sequenced_worker_pool_owner.h61
-rw-r--r--base/threading/sequenced_worker_pool.cc6
-rw-r--r--base/threading/sequenced_worker_pool.h7
-rw-r--r--base/threading/sequenced_worker_pool_task_runner.cc75
-rw-r--r--base/threading/sequenced_worker_pool_task_runner.h74
-rw-r--r--base/threading/sequenced_worker_pool_task_runner_unittest.cc61
-rw-r--r--base/threading/sequenced_worker_pool_unittest.cc67
12 files changed, 855 insertions, 66 deletions
diff --git a/base/base.gyp b/base/base.gyp
index 02c8585..ea037758 100644
--- a/base/base.gyp
+++ b/base/base.gyp
@@ -259,9 +259,12 @@
'sys_string_conversions_unittest.cc',
'system_monitor/system_monitor_unittest.cc',
'template_util_unittest.cc',
+ 'test/sequenced_worker_pool_owner.cc',
+ 'test/sequenced_worker_pool_owner.h',
'test/trace_event_analyzer_unittest.cc',
'threading/non_thread_safe_unittest.cc',
'threading/platform_thread_unittest.cc',
+ 'threading/sequenced_worker_pool_task_runner_unittest.cc',
'threading/sequenced_worker_pool_unittest.cc',
'threading/simple_thread_unittest.cc',
'threading/thread_checker_unittest.cc',
@@ -452,6 +455,8 @@
'test/perf_test_suite.h',
'test/scoped_locale.cc',
'test/scoped_locale.h',
+ 'test/sequenced_task_runner_test_template.cc',
+ 'test/sequenced_task_runner_test_template.h',
'test/task_runner_test_template.cc',
'test/task_runner_test_template.h',
'test/test_file_util.h',
diff --git a/base/base.gypi b/base/base.gypi
index aecba8b..6707a98 100644
--- a/base/base.gypi
+++ b/base/base.gypi
@@ -346,6 +346,8 @@
'threading/post_task_and_reply_impl.h',
'threading/sequenced_worker_pool.cc',
'threading/sequenced_worker_pool.h',
+ 'threading/sequenced_worker_pool_task_runner.cc',
+ 'threading/sequenced_worker_pool_task_runner.h',
'threading/simple_thread.cc',
'threading/simple_thread.h',
'threading/thread.cc',
diff --git a/base/test/sequenced_task_runner_test_template.cc b/base/test/sequenced_task_runner_test_template.cc
new file mode 100644
index 0000000..3b58973
--- /dev/null
+++ b/base/test/sequenced_task_runner_test_template.cc
@@ -0,0 +1,263 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/test/sequenced_task_runner_test_template.h"
+
+#include <ostream>
+
+#include "base/location.h"
+
+namespace base {
+
+namespace internal {
+
+TaskEvent::TaskEvent(int i, Type type)
+ : i(i), type(type) {
+}
+
+SequencedTaskTracker::SequencedTaskTracker() : next_post_i_(0) {
+}
+
+void SequencedTaskTracker::PostWrappedNonNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task) {
+ AutoLock event_lock(lock_);
+ const int post_i = next_post_i_++;
+ Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this,
+ task, post_i);
+ task_runner->PostNonNestableTask(FROM_HERE, wrapped_task);
+ TaskPosted(post_i);
+}
+
+void SequencedTaskTracker::PostWrappedNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task) {
+ AutoLock event_lock(lock_);
+ const int post_i = next_post_i_++;
+ Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this,
+ task, post_i);
+ task_runner->PostTask(FROM_HERE, wrapped_task);
+ TaskPosted(post_i);
+}
+
+void SequencedTaskTracker::PostWrappedDelayedNonNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task,
+ TimeDelta delay) {
+ AutoLock event_lock(lock_);
+ const int post_i = next_post_i_++;
+ Closure wrapped_task = Bind(&SequencedTaskTracker::RunTask, this,
+ task, post_i);
+ task_runner->PostNonNestableDelayedTask(FROM_HERE, wrapped_task, delay);
+ TaskPosted(post_i);
+}
+
+void SequencedTaskTracker::PostNonNestableTasks(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ int task_count) {
+ for (int i = 0; i < task_count; ++i) {
+ PostWrappedNonNestableTask(task_runner, Closure());
+ }
+}
+
+void SequencedTaskTracker::RunTask(const Closure& task, int task_i) {
+ TaskStarted(task_i);
+ if (!task.is_null())
+ task.Run();
+ TaskEnded(task_i);
+}
+
+void SequencedTaskTracker::TaskPosted(int i) {
+ // Caller must own |lock_|.
+ events_.push_back(TaskEvent(i, TaskEvent::POST));
+}
+
+void SequencedTaskTracker::TaskStarted(int i) {
+ AutoLock lock(lock_);
+ events_.push_back(TaskEvent(i, TaskEvent::START));
+}
+
+void SequencedTaskTracker::TaskEnded(int i) {
+ AutoLock lock(lock_);
+ events_.push_back(TaskEvent(i, TaskEvent::END));
+}
+
+const std::vector<TaskEvent>&
+SequencedTaskTracker::GetTaskEvents() const {
+ return events_;
+}
+
+SequencedTaskTracker::~SequencedTaskTracker() {
+}
+
+void PrintTo(const TaskEvent& event, std::ostream* os) {
+ *os << "(i=" << event.i << ", type=";
+ switch (event.type) {
+ case TaskEvent::POST: *os << "POST"; break;
+ case TaskEvent::START: *os << "START"; break;
+ case TaskEvent::END: *os << "END"; break;
+ }
+ *os << ")";
+}
+
+void SleepForOneSecond() {
+ base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
+}
+
+namespace {
+
+// Returns the task ordinals for the task event type |type| in the order that
+// they were recorded.
+std::vector<int> GetEventTypeOrder(const std::vector<TaskEvent>& events,
+ TaskEvent::Type type) {
+ std::vector<int> tasks;
+ std::vector<TaskEvent>::const_iterator event;
+ for (event = events.begin(); event != events.end(); ++event) {
+ if (event->type == type)
+ tasks.push_back(event->i);
+ }
+ return tasks;
+}
+
+// Returns all task events for task |task_i|.
+std::vector<TaskEvent::Type> GetEventsForTask(
+ const std::vector<TaskEvent>& events,
+ int task_i) {
+ std::vector<TaskEvent::Type> task_event_orders;
+ std::vector<TaskEvent>::const_iterator event;
+ for (event = events.begin(); event != events.end(); ++event) {
+ if (event->i == task_i)
+ task_event_orders.push_back(event->type);
+ }
+ return task_event_orders;
+}
+
+// Checks that the task events for each task in |events| occur in the order
+// {POST, START, END}, and that there is only one instance of each event type
+// per task.
+::testing::AssertionResult CheckEventOrdersForEachTask(
+ const std::vector<TaskEvent>& events,
+ int task_count) {
+ std::vector<TaskEvent::Type> expected_order;
+ expected_order.push_back(TaskEvent::POST);
+ expected_order.push_back(TaskEvent::START);
+ expected_order.push_back(TaskEvent::END);
+
+ // This is O(n^2), but it runs fast enough currently so is not worth
+ // optimizing.
+ for (int i = 0; i < task_count; ++i) {
+ const std::vector<TaskEvent::Type> task_events =
+ GetEventsForTask(events, i);
+ if (task_events != expected_order) {
+ return ::testing::AssertionFailure()
+ << "Events for task " << i << " are out of order; expected: "
+ << ::testing::PrintToString(expected_order) << "; actual: "
+ << ::testing::PrintToString(task_events);
+ }
+ }
+ return ::testing::AssertionSuccess();
+}
+
+// Checks that no two tasks were running at the same time. I.e. the only
+// events allowed between the START and END of a task are the POSTs of other
+// tasks.
+::testing::AssertionResult CheckNoTaskRunsOverlap(
+ const std::vector<TaskEvent>& events) {
+ // If > -1, we're currently inside a START, END pair.
+ int current_task_i = -1;
+
+ std::vector<TaskEvent>::const_iterator event;
+ for (event = events.begin(); event != events.end(); ++event) {
+ bool spurious_event_found = false;
+
+ if (current_task_i == -1) { // Not inside a START, END pair.
+ switch (event->type) {
+ case TaskEvent::POST:
+ break;
+ case TaskEvent::START:
+ current_task_i = event->i;
+ break;
+ case TaskEvent::END:
+ spurious_event_found = true;
+ break;
+ }
+
+ } else { // Inside a START, END pair.
+ bool interleaved_task_detected = false;
+
+ switch (event->type) {
+ case TaskEvent::POST:
+ if (event->i == current_task_i)
+ spurious_event_found = true;
+ break;
+ case TaskEvent::START:
+ interleaved_task_detected = true;
+ break;
+ case TaskEvent::END:
+ if (event->i != current_task_i)
+ interleaved_task_detected = true;
+ else
+ current_task_i = -1;
+ break;
+ }
+
+ if (interleaved_task_detected) {
+ return ::testing::AssertionFailure()
+ << "Found event " << ::testing::PrintToString(*event)
+ << " between START and END events for task " << current_task_i
+ << "; event dump: " << ::testing::PrintToString(events);
+ }
+ }
+
+ if (spurious_event_found) {
+ const int event_i = event - events.begin();
+ return ::testing::AssertionFailure()
+ << "Spurious event " << ::testing::PrintToString(*event)
+ << " at position " << event_i << "; event dump: "
+ << ::testing::PrintToString(events);
+ }
+ }
+
+ return ::testing::AssertionSuccess();
+}
+
+} // namespace
+
+::testing::AssertionResult CheckNonNestableInvariants(
+ const std::vector<TaskEvent>& events,
+ int task_count) {
+ const std::vector<int> post_order =
+ GetEventTypeOrder(events, TaskEvent::POST);
+ const std::vector<int> start_order =
+ GetEventTypeOrder(events, TaskEvent::START);
+ const std::vector<int> end_order =
+ GetEventTypeOrder(events, TaskEvent::END);
+
+ if (start_order != post_order) {
+ return ::testing::AssertionFailure()
+ << "Expected START order (which equals actual POST order): \n"
+ << ::testing::PrintToString(post_order)
+ << "\n Actual START order:\n"
+ << ::testing::PrintToString(start_order);
+ }
+
+ if (end_order != post_order) {
+ return ::testing::AssertionFailure()
+ << "Expected END order (which equals actual POST order): \n"
+ << ::testing::PrintToString(post_order)
+ << "\n Actual END order:\n"
+ << ::testing::PrintToString(end_order);
+ }
+
+ const ::testing::AssertionResult result =
+ CheckEventOrdersForEachTask(events, task_count);
+ if (!result)
+ return result;
+
+ return CheckNoTaskRunsOverlap(events);
+}
+
+} // namespace internal
+
+} // namespace base
diff --git a/base/test/sequenced_task_runner_test_template.h b/base/test/sequenced_task_runner_test_template.h
new file mode 100644
index 0000000..ff337ff
--- /dev/null
+++ b/base/test/sequenced_task_runner_test_template.h
@@ -0,0 +1,243 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// This class defines tests that implementations of SequencedTaskRunner should
+// pass in order to be conformant. See task_runner_test_template.h for a
+// description of how to use the constructs in this file; these work the same.
+
+#ifndef BASE_SEQUENCED_TASK_RUNNER_TEST_TEMPLATE_H_
+#define BASE_SEQUENCED_TASK_RUNNER_TEST_TEMPLATE_H_
+#pragma once
+
+#include <cstddef>
+#include <iosfwd>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/memory/ref_counted.h"
+#include "base/sequenced_task_runner.h"
+#include "base/synchronization/lock.h"
+#include "base/time.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+namespace internal {
+
+struct TaskEvent {
+ enum Type { POST, START, END };
+ TaskEvent(int i, Type type);
+ int i;
+ Type type;
+};
+
+// Utility class used in the tests below.
+class SequencedTaskTracker : public RefCountedThreadSafe<SequencedTaskTracker> {
+ public:
+ SequencedTaskTracker();
+
+ // Posts the non-nestable task |task|, and records its post event.
+ void PostWrappedNonNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task);
+
+ // Posts the nestable task |task|, and records its post event.
+ void PostWrappedNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task);
+
+ // Posts the delayed non-nestable task |task|, and records its post event.
+ void PostWrappedDelayedNonNestableTask(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ const Closure& task,
+ TimeDelta delay);
+
+ // Posts |task_count| non-nestable tasks.
+ void PostNonNestableTasks(
+ const scoped_refptr<SequencedTaskRunner>& task_runner,
+ int task_count);
+
+ const std::vector<TaskEvent>& GetTaskEvents() const;
+
+ private:
+ friend class RefCountedThreadSafe<SequencedTaskTracker>;
+
+ ~SequencedTaskTracker();
+
+ // A task which runs |task|, recording the start and end events.
+ void RunTask(const Closure& task, int task_i);
+
+ // Records a post event for task |i|. The owner is expected to be holding
+ // |lock_| (unlike |TaskStarted| and |TaskEnded|).
+ void TaskPosted(int i);
+
+ // Records a start event for task |i|.
+ void TaskStarted(int i);
+
+ // Records a end event for task |i|.
+ void TaskEnded(int i);
+
+ // Protects events_ and next_post_i_.
+ Lock lock_;
+
+ // The events as they occurred for each task (protected by lock_).
+ std::vector<TaskEvent> events_;
+
+ // The ordinal to be used for the next task-posting task (protected by
+ // lock_).
+ int next_post_i_;
+
+ DISALLOW_COPY_AND_ASSIGN(SequencedTaskTracker);
+};
+
+void PrintTo(const TaskEvent& event, std::ostream* os);
+
+void SleepForOneSecond();
+
+// Checks the non-nestable task invariants for all tasks in |events|.
+//
+// The invariants are:
+// 1) Events started and ended in the same order that they were posted.
+// 2) Events for an individual tasks occur in the order {POST, START, END},
+// and there is only one instance of each event type for a task.
+// 3) The only events between a task's START and END events are the POSTs of
+// other tasks. I.e. tasks were run sequentially, not interleaved.
+::testing::AssertionResult CheckNonNestableInvariants(
+ const std::vector<TaskEvent>& events,
+ int task_count);
+
+} // namespace internal
+
+template <typename TaskRunnerTestDelegate>
+class SequencedTaskRunnerTest : public testing::Test {
+ protected:
+ SequencedTaskRunnerTest()
+ : task_tracker_(new internal::SequencedTaskTracker()) {}
+
+ const scoped_refptr<internal::SequencedTaskTracker> task_tracker_;
+ TaskRunnerTestDelegate delegate_;
+};
+
+TYPED_TEST_CASE_P(SequencedTaskRunnerTest);
+
+// This test posts N non-nestable tasks in sequence, and expects them to run
+// in FIFO order, with no part of any two tasks' execution
+// overlapping. I.e. that each task starts only after the previously-posted
+// one has finished.
+TYPED_TEST_P(SequencedTaskRunnerTest, SequentialNonNestable) {
+ const int task_count = 1000;
+
+ this->delegate_.StartTaskRunner();
+ const scoped_refptr<SequencedTaskRunner> task_runner =
+ this->delegate_.GetTaskRunner();
+
+ this->task_tracker_->PostWrappedNonNestableTask(
+ task_runner, Bind(&internal::SleepForOneSecond));
+ for (int i = 1; i < task_count; ++i) {
+ this->task_tracker_->PostWrappedNonNestableTask(task_runner, Closure());
+ }
+
+ this->delegate_.StopTaskRunner();
+
+ EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(),
+ task_count));
+}
+
+// This test posts N nestable tasks in sequence. It has the same expectations
+// as SequentialNonNestable because even though the tasks are nestable, they
+// will not be run nestedly in this case.
+TYPED_TEST_P(SequencedTaskRunnerTest, SequentialNestable) {
+ const int task_count = 1000;
+
+ this->delegate_.StartTaskRunner();
+ const scoped_refptr<SequencedTaskRunner> task_runner =
+ this->delegate_.GetTaskRunner();
+
+ this->task_tracker_->PostWrappedNestableTask(
+ task_runner,
+ Bind(&internal::SleepForOneSecond));
+ for (int i = 1; i < task_count; ++i) {
+ this->task_tracker_->PostWrappedNestableTask(task_runner, Closure());
+ }
+
+ this->delegate_.StopTaskRunner();
+
+ EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(),
+ task_count));
+}
+
+// This test posts non-nestable tasks in order of increasing delay, and checks
+// that that the tasks are run in FIFO order and that there is no execution
+// overlap whatsoever between any two tasks.
+TYPED_TEST_P(SequencedTaskRunnerTest, SequentialDelayedNonNestable) {
+ if (!this->delegate_.TaskRunnerHandlesNonZeroDelays()) {
+ DLOG(INFO) << "This SequencedTaskRunner doesn't handle "
+ "non-zero delays; skipping";
+ return;
+ }
+
+ const int task_count = 20;
+ const int delay_increment_ms = 50;
+
+ this->delegate_.StartTaskRunner();
+ const scoped_refptr<SequencedTaskRunner> task_runner =
+ this->delegate_.GetTaskRunner();
+
+ for (int i = 0; i < task_count; ++i) {
+ this->task_tracker_->PostWrappedDelayedNonNestableTask(
+ task_runner,
+ Closure(),
+ TimeDelta::FromMilliseconds(delay_increment_ms * i));
+ }
+
+ this->delegate_.StopTaskRunner();
+
+ EXPECT_TRUE(CheckNonNestableInvariants(this->task_tracker_->GetTaskEvents(),
+ task_count));
+}
+
+// This test posts a fast, non-nestable task from within each of a number of
+// slow, non-nestable tasks and checks that they all run in the sequence they
+// were posted in and that there is no execution overlap whatsoever.
+TYPED_TEST_P(SequencedTaskRunnerTest, NonNestablePostFromNonNestableTask) {
+ const int parent_count = 10;
+ const int children_per_parent = 10;
+
+ this->delegate_.StartTaskRunner();
+ const scoped_refptr<SequencedTaskRunner> task_runner =
+ this->delegate_.GetTaskRunner();
+
+ for (int i = 0; i < parent_count; ++i) {
+ Closure task = Bind(
+ &internal::SequencedTaskTracker::PostNonNestableTasks,
+ this->task_tracker_,
+ task_runner,
+ children_per_parent);
+ this->task_tracker_->PostWrappedNonNestableTask(task_runner, task);
+ }
+
+ this->delegate_.StopTaskRunner();
+
+ EXPECT_TRUE(CheckNonNestableInvariants(
+ this->task_tracker_->GetTaskEvents(),
+ parent_count * (children_per_parent + 1)));
+}
+
+// TODO(francoisk777@gmail.com) Add a test, similiar to the above, which runs
+// some tasked nestedly (which should be implemented in the test
+// delegate). Also add, to the the test delegate, a predicate which checks
+// whether the implementation supports nested tasks.
+//
+
+REGISTER_TYPED_TEST_CASE_P(SequencedTaskRunnerTest,
+ SequentialNonNestable,
+ SequentialNestable,
+ SequentialDelayedNonNestable,
+ NonNestablePostFromNonNestableTask);
+
+} // namespace base
+
+#endif // BASE_TASK_RUNNER_TEST_TEMPLATE_H_
diff --git a/base/test/sequenced_worker_pool_owner.cc b/base/test/sequenced_worker_pool_owner.cc
new file mode 100644
index 0000000..3d124fa
--- /dev/null
+++ b/base/test/sequenced_worker_pool_owner.cc
@@ -0,0 +1,57 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/test/sequenced_worker_pool_owner.h"
+
+#include "base/location.h"
+#include "base/message_loop.h"
+
+namespace base {
+
+SequencedWorkerPoolOwner::SequencedWorkerPoolOwner(
+ size_t max_threads,
+ const std::string& thread_name_prefix)
+ : constructor_message_loop_(MessageLoop::current()),
+ pool_(new SequencedWorkerPool(
+ max_threads, thread_name_prefix,
+ ALLOW_THIS_IN_INITIALIZER_LIST(this))),
+ has_work_call_count_(0) {}
+
+SequencedWorkerPoolOwner::~SequencedWorkerPoolOwner() {
+ pool_ = NULL;
+ MessageLoop::current()->Run();
+}
+
+const scoped_refptr<SequencedWorkerPool>& SequencedWorkerPoolOwner::pool() {
+ return pool_;
+}
+
+void SequencedWorkerPoolOwner::SetWillWaitForShutdownCallback(
+ const Closure& callback) {
+ will_wait_for_shutdown_callback_ = callback;
+}
+
+int SequencedWorkerPoolOwner::has_work_call_count() const {
+ AutoLock lock(has_work_lock_);
+ return has_work_call_count_;
+}
+
+void SequencedWorkerPoolOwner::OnHasWork() {
+ AutoLock lock(has_work_lock_);
+ ++has_work_call_count_;
+}
+
+void SequencedWorkerPoolOwner::WillWaitForShutdown() {
+ if (!will_wait_for_shutdown_callback_.is_null()) {
+ will_wait_for_shutdown_callback_.Run();
+ }
+}
+
+void SequencedWorkerPoolOwner::OnDestruct() {
+ constructor_message_loop_->PostTask(
+ FROM_HERE,
+ constructor_message_loop_->QuitClosure());
+}
+
+} // namespace base
diff --git a/base/test/sequenced_worker_pool_owner.h b/base/test/sequenced_worker_pool_owner.h
new file mode 100644
index 0000000..c6c8d676
--- /dev/null
+++ b/base/test/sequenced_worker_pool_owner.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_
+#define BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_
+
+#include <cstddef>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/sequenced_worker_pool.h"
+
+class MessageLoop;
+
+namespace base {
+
+// Wrapper around SequencedWorkerPool for testing that blocks destruction
+// until the pool is actually destroyed. This is so that a
+// SequencedWorkerPool from one test doesn't outlive its test and cause
+// strange races with other tests that touch global stuff (like histograms and
+// logging). However, this requires that nothing else on this thread holds a
+// ref to the pool when the SequencedWorkerPoolOwner is destroyed.
+class SequencedWorkerPoolOwner : public SequencedWorkerPool::TestingObserver {
+ public:
+ SequencedWorkerPoolOwner(size_t max_threads,
+ const std::string& thread_name_prefix);
+
+ virtual ~SequencedWorkerPoolOwner();
+
+ // Don't change the returned pool's testing observer.
+ const scoped_refptr<SequencedWorkerPool>& pool();
+
+ // The given callback will be called on WillWaitForShutdown().
+ void SetWillWaitForShutdownCallback(const Closure& callback);
+
+ int has_work_call_count() const;
+
+ private:
+ // SequencedWorkerPool::TestingObserver implementation.
+ virtual void OnHasWork() OVERRIDE;
+ virtual void WillWaitForShutdown() OVERRIDE;
+ virtual void OnDestruct() OVERRIDE;
+
+ MessageLoop* const constructor_message_loop_;
+ scoped_refptr<SequencedWorkerPool> pool_;
+ Closure will_wait_for_shutdown_callback_;
+
+ mutable Lock has_work_lock_;
+ int has_work_call_count_;
+
+ DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolOwner);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_SEQUENCED_WORKER_POOL_UNITTEST_H_
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 7054fba..61cd1ac 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -22,6 +22,7 @@
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
+#include "base/threading/sequenced_worker_pool_task_runner.h"
#include "base/threading/simple_thread.h"
#include "base/time.h"
#include "base/tracked_objects.h"
@@ -758,6 +759,11 @@ SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
return inner_->GetNamedSequenceToken(name);
}
+scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
+ SequenceToken token) {
+ return new SequencedWorkerPoolTaskRunner(this, token);
+}
+
bool SequencedWorkerPool::PostWorkerTask(
const tracked_objects::Location& from_here,
const Closure& task) {
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index 99d7bb9..737470d 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -26,6 +26,8 @@ class MessageLoopProxy;
template <class T> class DeleteHelper;
+class SequencedTaskRunner;
+
// A worker thread pool that enforces ordering between sets of tasks. It also
// allows you to specify what should happen to your tasks on shutdown.
//
@@ -152,6 +154,11 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// will be created.
SequenceToken GetNamedSequenceToken(const std::string& name);
+ // Returns a SequencedTaskRunner wrapper which posts to this
+ // SequencedWorkerPool using the given sequence token.
+ scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
+ SequenceToken token);
+
// Posts the given task for execution in the worker pool. Tasks posted with
// this function will execute in an unspecified order on a background thread.
// Returns true if the task was posted. If your tasks have ordering
diff --git a/base/threading/sequenced_worker_pool_task_runner.cc b/base/threading/sequenced_worker_pool_task_runner.cc
new file mode 100644
index 0000000..ed3759f
--- /dev/null
+++ b/base/threading/sequenced_worker_pool_task_runner.cc
@@ -0,0 +1,75 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/sequenced_worker_pool_task_runner.h"
+
+#include "base/callback.h"
+#include "base/location.h"
+#include "base/logging.h"
+
+namespace base {
+
+SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
+ const scoped_refptr<SequencedWorkerPool>& pool,
+ SequencedWorkerPool::SequenceToken token)
+ : pool_(pool),
+ token_(token) {
+}
+
+SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
+}
+
+bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms) {
+ return PostDelayedTaskAssertZeroDelay(from_here, task, delay_ms);
+}
+
+bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
+ return PostDelayedTaskAssertZeroDelay(from_here, task, delay);
+}
+
+bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
+ return pool_->RunsTasksOnCurrentThread();
+}
+
+bool SequencedWorkerPoolTaskRunner::PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms) {
+ return PostDelayedTaskAssertZeroDelay(from_here, task, delay_ms);
+}
+
+bool SequencedWorkerPoolTaskRunner::PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
+ return PostDelayedTaskAssertZeroDelay(from_here, task, delay);
+}
+
+bool SequencedWorkerPoolTaskRunner::PostDelayedTaskAssertZeroDelay(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms) {
+ // TODO(francoisk777@gmail.com): Change the following two statements once
+ // SequencedWorkerPool supports non-zero delays.
+ DCHECK_EQ(delay_ms, 0)
+ << "SequencedWorkerPoolTaskRunner does not yet support non-zero delays";
+ return pool_->PostSequencedWorkerTask(token_, from_here, task);
+}
+
+bool SequencedWorkerPoolTaskRunner::PostDelayedTaskAssertZeroDelay(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
+ return PostDelayedTaskAssertZeroDelay(from_here,
+ task,
+ delay.InMillisecondsRoundedUp());
+}
+
+} // namespace base
diff --git a/base/threading/sequenced_worker_pool_task_runner.h b/base/threading/sequenced_worker_pool_task_runner.h
new file mode 100644
index 0000000..34d65b9
--- /dev/null
+++ b/base/threading/sequenced_worker_pool_task_runner.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_TASK_RUNNER_H_
+#define BASE_THREADING_SEQUENCED_WORKER_POOL_TASK_RUNNER_H_
+#pragma once
+
+#include "base/basictypes.h"
+#include "base/callback_forward.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/sequenced_task_runner.h"
+#include "base/threading/sequenced_worker_pool.h"
+#include "base/time.h"
+
+namespace tracked_objects {
+class Location;
+} // namespace tracked_objects
+
+namespace base {
+
+// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
+// fixed sequence token.
+//
+// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
+class BASE_EXPORT SequencedWorkerPoolTaskRunner : public SequencedTaskRunner {
+ public:
+ SequencedWorkerPoolTaskRunner(const scoped_refptr<SequencedWorkerPool>& pool,
+ SequencedWorkerPool::SequenceToken token);
+
+ // TaskRunner implementation
+ virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms) OVERRIDE;
+ virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) OVERRIDE;
+ virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
+
+ // SequencedTaskRunner implementation
+ virtual bool PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms) OVERRIDE;
+ virtual bool PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) OVERRIDE;
+
+ private:
+ virtual ~SequencedWorkerPoolTaskRunner();
+
+ // Helper function for posting a delayed task. Asserts that the delay is
+ // zero because non-zero delays are not yet supported.
+ bool PostDelayedTaskAssertZeroDelay(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ int64 delay_ms);
+ bool PostDelayedTaskAssertZeroDelay(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay);
+
+ const scoped_refptr<SequencedWorkerPool> pool_;
+
+ const SequencedWorkerPool::SequenceToken token_;
+
+ DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_SEQUENCED_TASK_RUNNER_IMPL_H_
diff --git a/base/threading/sequenced_worker_pool_task_runner_unittest.cc b/base/threading/sequenced_worker_pool_task_runner_unittest.cc
new file mode 100644
index 0000000..effb905
--- /dev/null
+++ b/base/threading/sequenced_worker_pool_task_runner_unittest.cc
@@ -0,0 +1,61 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "base/test/sequenced_task_runner_test_template.h"
+#include "base/test/sequenced_worker_pool_owner.h"
+#include "base/test/task_runner_test_template.h"
+#include "base/threading/sequenced_worker_pool_task_runner.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+class SequencedWorkerPoolTaskRunnerTestDelegate {
+ public:
+ SequencedWorkerPoolTaskRunnerTestDelegate() {}
+
+ ~SequencedWorkerPoolTaskRunnerTestDelegate() {
+ }
+
+ void StartTaskRunner() {
+ pool_owner_.reset(
+ new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
+ task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
+ pool_owner_->pool()->GetSequenceToken());
+ }
+
+ scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
+ return task_runner_;
+ }
+
+ void StopTaskRunner() {
+ pool_owner_->pool()->FlushForTesting();
+ pool_owner_->pool()->Shutdown();
+ // Don't reset |pool_owner_| here, as the test may still hold a
+ // reference to the pool.
+ }
+
+ bool TaskRunnerHandlesNonZeroDelays() const {
+ // TODO(akalin): Set this to true once SequencedWorkerPool handles
+ // non-zero delays.
+ return false;
+ }
+
+ private:
+ MessageLoop message_loop_;
+ scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
+ scoped_refptr<SequencedTaskRunner> task_runner_;
+};
+
+INSTANTIATE_TYPED_TEST_CASE_P(
+ SequencedWorkerPoolTaskRunner, TaskRunnerTest,
+ SequencedWorkerPoolTaskRunnerTestDelegate);
+
+INSTANTIATE_TYPED_TEST_CASE_P(
+ SequencedWorkerPoolTaskRunner, SequencedTaskRunnerTest,
+ SequencedWorkerPoolTaskRunnerTestDelegate);
+
+} // namespace base
diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc
index 8a3508d..1c29c34 100644
--- a/base/threading/sequenced_worker_pool_unittest.cc
+++ b/base/threading/sequenced_worker_pool_unittest.cc
@@ -12,6 +12,7 @@
#include "base/message_loop_proxy.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
+#include "base/test/sequenced_worker_pool_owner.h"
#include "base/test/task_runner_test_template.h"
#include "base/threading/platform_thread.h"
#include "base/threading/sequenced_worker_pool.h"
@@ -145,72 +146,6 @@ class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
size_t started_events_;
};
-// Wrapper around SequencedWorkerPool that blocks destruction until
-// the pool is actually destroyed. This is so that a
-// SequencedWorkerPool from one test doesn't outlive its test and
-// cause strange races with other tests that touch global stuff (like
-// histograms and logging). However, this requires that nothing else
-// on this thread holds a ref to the pool when the
-// SequencedWorkerPoolOwner is destroyed.
-class SequencedWorkerPoolOwner : public SequencedWorkerPool::TestingObserver {
- public:
- SequencedWorkerPoolOwner(size_t max_threads,
- const std::string& thread_name_prefix)
- : constructor_message_loop_(MessageLoop::current()),
- pool_(new SequencedWorkerPool(
- max_threads, thread_name_prefix,
- ALLOW_THIS_IN_INITIALIZER_LIST(this))),
- has_work_call_count_(0) {}
-
- virtual ~SequencedWorkerPoolOwner() {
- pool_ = NULL;
- MessageLoop::current()->Run();
- }
-
- // Don't change the return pool's testing observer.
- const scoped_refptr<SequencedWorkerPool>& pool() {
- return pool_;
- }
-
- // The given callback will be called on WillWaitForShutdown().
- void SetWillWaitForShutdownCallback(const Closure& callback) {
- will_wait_for_shutdown_callback_ = callback;
- }
-
- int has_work_call_count() const {
- AutoLock lock(has_work_lock_);
- return has_work_call_count_;
- }
-
- private:
- // SequencedWorkerPool::TestingObserver implementation.
- virtual void OnHasWork() OVERRIDE {
- AutoLock lock(has_work_lock_);
- ++has_work_call_count_;
- }
-
- virtual void WillWaitForShutdown() OVERRIDE {
- if (!will_wait_for_shutdown_callback_.is_null()) {
- will_wait_for_shutdown_callback_.Run();
- }
- }
-
- virtual void OnDestruct() OVERRIDE {
- constructor_message_loop_->PostTask(
- FROM_HERE,
- constructor_message_loop_->QuitClosure());
- }
-
- MessageLoop* const constructor_message_loop_;
- scoped_refptr<SequencedWorkerPool> pool_;
- Closure will_wait_for_shutdown_callback_;
-
- mutable Lock has_work_lock_;
- int has_work_call_count_;
-
- DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolOwner);
-};
-
class SequencedWorkerPoolTest : public testing::Test {
public:
SequencedWorkerPoolTest()