summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorbauerb <bauerb@chromium.org>2015-10-30 02:05:31 -0700
committerCommit bot <commit-bot@chromium.org>2015-10-30 09:06:08 +0000
commit8a4087226c482fadb886585d97081adf379e6b68 (patch)
tree1e21a013ccfa1b435c1b33c5e2993357b02bab66 /base
parentfff0ba0a41392542d3b6238ab928f96e1ddad7b0 (diff)
downloadchromium_src-8a4087226c482fadb886585d97081adf379e6b68.zip
chromium_src-8a4087226c482fadb886585d97081adf379e6b68.tar.gz
chromium_src-8a4087226c482fadb886585d97081adf379e6b68.tar.bz2
Add SequencedTaskRunnerHandle to get a SequencedTaskRunner for the current thread / sequence.
BUG=546596 Review URL: https://codereview.chromium.org/1423773003 Cr-Commit-Position: refs/heads/master@{#357067}
Diffstat (limited to 'base')
-rw-r--r--base/BUILD.gn3
-rw-r--r--base/base.gyp1
-rw-r--r--base/base.gypi2
-rw-r--r--base/thread_task_runner_handle.h1
-rw-r--r--base/threading/sequenced_task_runner_handle.cc38
-rw-r--r--base/threading/sequenced_task_runner_handle.h36
-rw-r--r--base/threading/sequenced_task_runner_handle_unittest.cc81
-rw-r--r--base/threading/sequenced_worker_pool.cc60
-rw-r--r--base/threading/sequenced_worker_pool.h18
-rw-r--r--base/threading/sequenced_worker_pool_unittest.cc43
10 files changed, 264 insertions, 19 deletions
diff --git a/base/BUILD.gn b/base/BUILD.gn
index efc3754..bb8bd55 100644
--- a/base/BUILD.gn
+++ b/base/BUILD.gn
@@ -547,6 +547,8 @@ component("base") {
"threading/platform_thread_win.cc",
"threading/post_task_and_reply_impl.cc",
"threading/post_task_and_reply_impl.h",
+ "threading/sequenced_task_runner_handle.cc",
+ "threading/sequenced_task_runner_handle.h",
"threading/sequenced_worker_pool.cc",
"threading/sequenced_worker_pool.h",
"threading/simple_thread.cc",
@@ -1435,6 +1437,7 @@ test("base_unittests") {
"test/user_action_tester_unittest.cc",
"threading/non_thread_safe_unittest.cc",
"threading/platform_thread_unittest.cc",
+ "threading/sequenced_task_runner_handle_unittest.cc",
"threading/sequenced_worker_pool_unittest.cc",
"threading/simple_thread_unittest.cc",
"threading/thread_checker_unittest.cc",
diff --git a/base/base.gyp b/base/base.gyp
index 2b67d07..41d5379 100644
--- a/base/base.gyp
+++ b/base/base.gyp
@@ -624,6 +624,7 @@
'threading/non_thread_safe_unittest.cc',
'threading/platform_thread_unittest.cc',
'threading/sequenced_worker_pool_unittest.cc',
+ 'threading/sequenced_task_runner_handle_unittest.cc',
'threading/simple_thread_unittest.cc',
'threading/thread_checker_unittest.cc',
'threading/thread_collision_warner_unittest.cc',
diff --git a/base/base.gypi b/base/base.gypi
index 6ec36dd..7e09e30 100644
--- a/base/base.gypi
+++ b/base/base.gypi
@@ -645,6 +645,8 @@
'threading/platform_thread_win.cc',
'threading/post_task_and_reply_impl.cc',
'threading/post_task_and_reply_impl.h',
+ 'threading/sequenced_task_runner_handle.cc',
+ 'threading/sequenced_task_runner_handle.h',
'threading/sequenced_worker_pool.cc',
'threading/sequenced_worker_pool.h',
'threading/simple_thread.cc',
diff --git a/base/thread_task_runner_handle.h b/base/thread_task_runner_handle.h
index 238435f..197669e 100644
--- a/base/thread_task_runner_handle.h
+++ b/base/thread_task_runner_handle.h
@@ -16,6 +16,7 @@ class SingleThreadTaskRunner;
// in thread-local storage. Callers can then retrieve the TaskRunner
// for the current thread by calling ThreadTaskRunnerHandle::Get().
// At most one TaskRunner may be bound to each thread at a time.
+// Prefer SequenceTaskRunnerHandle to this unless thread affinity is required.
class BASE_EXPORT ThreadTaskRunnerHandle {
public:
// Gets the SingleThreadTaskRunner for the current thread.
diff --git a/base/threading/sequenced_task_runner_handle.cc b/base/threading/sequenced_task_runner_handle.cc
new file mode 100644
index 0000000..a03642d
--- /dev/null
+++ b/base/threading/sequenced_task_runner_handle.cc
@@ -0,0 +1,38 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/threading/sequenced_task_runner_handle.h"
+
+#include "base/sequenced_task_runner.h"
+#include "base/thread_task_runner_handle.h"
+#include "base/threading/sequenced_worker_pool.h"
+
+namespace base {
+
+// static
+scoped_refptr<SequencedTaskRunner> SequencedTaskRunnerHandle::Get() {
+ // If we are on a worker thread for a SequencedBlockingPool that is running a
+ // sequenced task, return a SequencedTaskRunner for it.
+ scoped_refptr<base::SequencedWorkerPool> pool =
+ SequencedWorkerPool::GetWorkerPoolForCurrentThread();
+ if (pool) {
+ SequencedWorkerPool::SequenceToken sequence_token =
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread();
+ DCHECK(sequence_token.IsValid());
+ DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
+ return pool->GetSequencedTaskRunner(sequence_token);
+ }
+
+ // Otherwise, return a SingleThreadTaskRunner for the current thread.
+ return base::ThreadTaskRunnerHandle::Get();
+}
+
+// static
+bool SequencedTaskRunnerHandle::IsSet() {
+ return (SequencedWorkerPool::GetWorkerPoolForCurrentThread() &&
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()) ||
+ base::ThreadTaskRunnerHandle::IsSet();
+}
+
+} // namespace base
diff --git a/base/threading/sequenced_task_runner_handle.h b/base/threading/sequenced_task_runner_handle.h
new file mode 100644
index 0000000..c638d2f
--- /dev/null
+++ b/base/threading/sequenced_task_runner_handle.h
@@ -0,0 +1,36 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
+#define BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
+
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+
+namespace base {
+
+class SequencedTaskRunner;
+
+class BASE_EXPORT SequencedTaskRunnerHandle {
+ public:
+ // Returns a SequencedTaskRunner which guarantees that posted tasks will only
+ // run after the current task is finished and will satisfy a SequenceChecker.
+ // It should only be called if IsSet() returns true (see the comment there for
+ // the requirements).
+ static scoped_refptr<SequencedTaskRunner> Get();
+
+ // Returns true if one of the following conditions is fulfilled:
+ // a) The current thread has a ThreadTaskRunnerHandle (which includes any
+ // thread that has a MessageLoop associated with it), or
+ // b) The current thread is a worker thread belonging to a SequencedWorkerPool
+ // *and* is currently running a sequenced task.
+ static bool IsSet();
+
+ private:
+ DISALLOW_IMPLICIT_CONSTRUCTORS(SequencedTaskRunnerHandle);
+};
+
+} // namespace base
+
+#endif // BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_
diff --git a/base/threading/sequenced_task_runner_handle_unittest.cc b/base/threading/sequenced_task_runner_handle_unittest.cc
new file mode 100644
index 0000000..df7a117
--- /dev/null
+++ b/base/threading/sequenced_task_runner_handle_unittest.cc
@@ -0,0 +1,81 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/location.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "base/sequence_checker_impl.h"
+#include "base/sequenced_task_runner.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/sequenced_task_runner_handle.h"
+#include "base/threading/sequenced_worker_pool.h"
+#include "base/threading/simple_thread.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+namespace {
+
+class SequencedTaskRunnerHandleTest : public ::testing::Test {
+ protected:
+ static void GetTaskRunner(const Closure& callback) {
+ // Use SequenceCheckerImpl to make sure it's not a no-op in Release builds.
+ scoped_ptr<SequenceCheckerImpl> sequence_checker(new SequenceCheckerImpl);
+ ASSERT_TRUE(SequencedTaskRunnerHandle::IsSet());
+ scoped_refptr<SequencedTaskRunner> task_runner =
+ SequencedTaskRunnerHandle::Get();
+ ASSERT_TRUE(task_runner);
+ task_runner->PostTask(
+ FROM_HERE, base::Bind(&SequencedTaskRunnerHandleTest::CheckValidThread,
+ base::Passed(&sequence_checker), callback));
+ }
+
+ private:
+ static void CheckValidThread(scoped_ptr<SequenceCheckerImpl> sequence_checker,
+ const Closure& callback) {
+ EXPECT_TRUE(sequence_checker->CalledOnValidSequencedThread());
+ callback.Run();
+ }
+
+ MessageLoop message_loop_;
+};
+
+TEST_F(SequencedTaskRunnerHandleTest, FromMessageLoop) {
+ RunLoop run_loop;
+ GetTaskRunner(run_loop.QuitClosure());
+ run_loop.Run();
+}
+
+TEST_F(SequencedTaskRunnerHandleTest, FromSequencedWorkerPool) {
+ scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Test"));
+ WaitableEvent event(false, false);
+ pool->PostSequencedWorkerTask(
+ pool->GetSequenceToken(), FROM_HERE,
+ base::Bind(&SequencedTaskRunnerHandleTest::GetTaskRunner,
+ base::Bind(&WaitableEvent::Signal, base::Unretained(&event))));
+ event.Wait();
+}
+
+class ThreadRunner : public DelegateSimpleThread::Delegate {
+ public:
+ void Run() override {
+ ASSERT_FALSE(SequencedTaskRunnerHandle::IsSet());
+ }
+
+ private:
+ Closure callback_;
+};
+
+TEST_F(SequencedTaskRunnerHandleTest, FromSimpleThread) {
+ ThreadRunner thread_runner;
+ DelegateSimpleThread thread(&thread_runner, "Background thread");
+ thread.Start();
+ thread.Join();
+}
+
+} // namespace
+} // namespace base
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 54a6bc8..ed79897 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -219,10 +219,6 @@ uint64 GetTaskTraceID(const SequencedTask& task,
static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
}
-base::LazyInstance<base::ThreadLocalPointer<
- SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
- LAZY_INSTANCE_INITIALIZER;
-
} // namespace
// Worker ---------------------------------------------------------------------
@@ -239,6 +235,9 @@ class SequencedWorkerPool::Worker : public SimpleThread {
// SimpleThread implementation. This actually runs the background thread.
void Run() override;
+ // Gets the worker for the current thread out of thread-local storage.
+ static Worker* GetForCurrentThread();
+
// Indicates that a task is about to be run. The parameters provide
// additional metainformation about the task being run.
void set_running_task_info(SequenceToken token,
@@ -264,7 +263,14 @@ class SequencedWorkerPool::Worker : public SimpleThread {
return task_shutdown_behavior_;
}
+ scoped_refptr<SequencedWorkerPool> worker_pool() const {
+ return worker_pool_;
+ }
+
private:
+ static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
+ lazy_tls_ptr_;
+
scoped_refptr<SequencedWorkerPool> worker_pool_;
// The sequence token of the task being processed. Only valid when
// is_processing_task_ is true.
@@ -508,9 +514,10 @@ void SequencedWorkerPool::Worker::Run() {
win::ScopedCOMInitializer com_initializer;
#endif
- // Store a pointer to the running sequence in thread local storage for
- // static function access.
- g_lazy_tls_ptr.Get().Set(&task_sequence_token_);
+ // Store a pointer to this worker in thread local storage for static function
+ // access.
+ DCHECK(!lazy_tls_ptr_.Get().Get());
+ lazy_tls_ptr_.Get().Set(this);
// Just jump back to the Inner object to run the thread, since it has all the
// tracking information and queues. It might be more natural to implement
@@ -519,9 +526,23 @@ void SequencedWorkerPool::Worker::Run() {
// send thread-specific information easily to the thread loop.
worker_pool_->inner_->ThreadLoop(this);
// Release our cyclic reference once we're done.
- worker_pool_ = NULL;
+ worker_pool_ = nullptr;
+}
+
+// static
+SequencedWorkerPool::Worker*
+SequencedWorkerPool::Worker::GetForCurrentThread() {
+ // Don't construct lazy instance on check.
+ if (lazy_tls_ptr_ == nullptr)
+ return nullptr;
+
+ return lazy_tls_ptr_.Get().Get();
}
+// static
+LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
+ SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
+
// Inner definitions ---------------------------------------------------------
SequencedWorkerPool::Inner::Inner(
@@ -1145,17 +1166,28 @@ SequencedWorkerPool::Inner::g_last_sequence_number_;
// SequencedWorkerPool --------------------------------------------------------
+std::string SequencedWorkerPool::SequenceToken::ToString() const {
+ return base::StringPrintf("[%d]", id_);
+}
+
// static
SequencedWorkerPool::SequenceToken
SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
- // Don't construct lazy instance on check.
- if (g_lazy_tls_ptr == NULL)
+ Worker* worker = Worker::GetForCurrentThread();
+ if (!worker)
return SequenceToken();
- SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
- if (!token)
- return SequenceToken();
- return *token;
+ return worker->task_sequence_token();
+}
+
+// static
+scoped_refptr<SequencedWorkerPool>
+SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
+ Worker* worker = Worker::GetForCurrentThread();
+ if (!worker)
+ return nullptr;
+
+ return worker->worker_pool();
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index ee282bc..9369fb7 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -121,7 +121,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// Opaque identifier that defines sequencing of tasks posted to the worker
// pool.
- class SequenceToken {
+ class BASE_EXPORT SequenceToken {
public:
SequenceToken() : id_(0) {}
~SequenceToken() {}
@@ -135,6 +135,10 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
return id_ != 0;
}
+ // Returns a string representation of this token. This method should only be
+ // used for debugging.
+ std::string ToString() const;
+
private:
friend class SequencedWorkerPool;
@@ -157,17 +161,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// an unsequenced task, returns an invalid SequenceToken.
static SequenceToken GetSequenceTokenForCurrentThread();
+ // Returns the SequencedWorkerPool that owns this thread, or null if the
+ // current thread is not a SequencedWorkerPool worker thread.
+ static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
+
// When constructing a SequencedWorkerPool, there must be a
- // MessageLoop on the current thread unless you plan to deliberately
- // leak it.
+ // ThreadTaskRunnerHandle on the current thread unless you plan to
+ // deliberately leak it.
// Pass the maximum number of threads (they will be lazily created as needed)
// and a prefix for the thread name to aid in debugging.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix);
- // Like above, but with |observer| for testing. Does not take
- // ownership of |observer|.
+ // Like above, but with |observer| for testing. Does not take ownership of
+ // |observer|.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
TestingObserver* observer);
diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc
index bf82b11..5812ee7 100644
--- a/base/threading/sequenced_worker_pool_unittest.cc
+++ b/base/threading/sequenced_worker_pool_unittest.cc
@@ -904,6 +904,49 @@ TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
pool()->FlushForTesting();
}
+namespace {
+
+void CheckWorkerPoolAndSequenceToken(
+ const scoped_refptr<SequencedWorkerPool>& expected_pool,
+ SequencedWorkerPool::SequenceToken expected_token) {
+ SequencedWorkerPool::SequenceToken token =
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread();
+ EXPECT_EQ(expected_token.ToString(), token.ToString());
+
+ scoped_refptr<SequencedWorkerPool> pool =
+ SequencedWorkerPool::GetWorkerPoolForCurrentThread();
+ EXPECT_EQ(expected_pool, pool);
+}
+
+} // namespace
+
+TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) {
+ EnsureAllWorkersCreated();
+
+ // The current thread should have neither a worker pool nor a sequence token.
+ SequencedWorkerPool::SequenceToken local_token =
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread();
+ scoped_refptr<SequencedWorkerPool> local_pool =
+ SequencedWorkerPool::GetWorkerPoolForCurrentThread();
+ EXPECT_FALSE(local_token.IsValid()) << local_token.ToString();
+ EXPECT_FALSE(local_pool);
+
+ SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
+ SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
+ pool()->PostSequencedWorkerTask(
+ token1, FROM_HERE,
+ base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token1));
+ pool()->PostSequencedWorkerTask(
+ token2, FROM_HERE,
+ base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token2));
+
+ pool()->PostWorkerTask(FROM_HERE,
+ base::Bind(&CheckWorkerPoolAndSequenceToken, pool(),
+ SequencedWorkerPool::SequenceToken()));
+
+ pool()->FlushForTesting();
+}
+
TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
MessageLoop loop;
scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));