diff options
author | bauerb <bauerb@chromium.org> | 2015-10-30 02:05:31 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-10-30 09:06:08 +0000 |
commit | 8a4087226c482fadb886585d97081adf379e6b68 (patch) | |
tree | 1e21a013ccfa1b435c1b33c5e2993357b02bab66 /base | |
parent | fff0ba0a41392542d3b6238ab928f96e1ddad7b0 (diff) | |
download | chromium_src-8a4087226c482fadb886585d97081adf379e6b68.zip chromium_src-8a4087226c482fadb886585d97081adf379e6b68.tar.gz chromium_src-8a4087226c482fadb886585d97081adf379e6b68.tar.bz2 |
Add SequencedTaskRunnerHandle to get a SequencedTaskRunner for the current thread / sequence.
BUG=546596
Review URL: https://codereview.chromium.org/1423773003
Cr-Commit-Position: refs/heads/master@{#357067}
Diffstat (limited to 'base')
-rw-r--r-- | base/BUILD.gn | 3 | ||||
-rw-r--r-- | base/base.gyp | 1 | ||||
-rw-r--r-- | base/base.gypi | 2 | ||||
-rw-r--r-- | base/thread_task_runner_handle.h | 1 | ||||
-rw-r--r-- | base/threading/sequenced_task_runner_handle.cc | 38 | ||||
-rw-r--r-- | base/threading/sequenced_task_runner_handle.h | 36 | ||||
-rw-r--r-- | base/threading/sequenced_task_runner_handle_unittest.cc | 81 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.cc | 60 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool.h | 18 | ||||
-rw-r--r-- | base/threading/sequenced_worker_pool_unittest.cc | 43 |
10 files changed, 264 insertions, 19 deletions
diff --git a/base/BUILD.gn b/base/BUILD.gn index efc3754..bb8bd55 100644 --- a/base/BUILD.gn +++ b/base/BUILD.gn @@ -547,6 +547,8 @@ component("base") { "threading/platform_thread_win.cc", "threading/post_task_and_reply_impl.cc", "threading/post_task_and_reply_impl.h", + "threading/sequenced_task_runner_handle.cc", + "threading/sequenced_task_runner_handle.h", "threading/sequenced_worker_pool.cc", "threading/sequenced_worker_pool.h", "threading/simple_thread.cc", @@ -1435,6 +1437,7 @@ test("base_unittests") { "test/user_action_tester_unittest.cc", "threading/non_thread_safe_unittest.cc", "threading/platform_thread_unittest.cc", + "threading/sequenced_task_runner_handle_unittest.cc", "threading/sequenced_worker_pool_unittest.cc", "threading/simple_thread_unittest.cc", "threading/thread_checker_unittest.cc", diff --git a/base/base.gyp b/base/base.gyp index 2b67d07..41d5379 100644 --- a/base/base.gyp +++ b/base/base.gyp @@ -624,6 +624,7 @@ 'threading/non_thread_safe_unittest.cc', 'threading/platform_thread_unittest.cc', 'threading/sequenced_worker_pool_unittest.cc', + 'threading/sequenced_task_runner_handle_unittest.cc', 'threading/simple_thread_unittest.cc', 'threading/thread_checker_unittest.cc', 'threading/thread_collision_warner_unittest.cc', diff --git a/base/base.gypi b/base/base.gypi index 6ec36dd..7e09e30 100644 --- a/base/base.gypi +++ b/base/base.gypi @@ -645,6 +645,8 @@ 'threading/platform_thread_win.cc', 'threading/post_task_and_reply_impl.cc', 'threading/post_task_and_reply_impl.h', + 'threading/sequenced_task_runner_handle.cc', + 'threading/sequenced_task_runner_handle.h', 'threading/sequenced_worker_pool.cc', 'threading/sequenced_worker_pool.h', 'threading/simple_thread.cc', diff --git a/base/thread_task_runner_handle.h b/base/thread_task_runner_handle.h index 238435f..197669e 100644 --- a/base/thread_task_runner_handle.h +++ b/base/thread_task_runner_handle.h @@ -16,6 +16,7 @@ class SingleThreadTaskRunner; // in thread-local storage. Callers can then retrieve the TaskRunner // for the current thread by calling ThreadTaskRunnerHandle::Get(). // At most one TaskRunner may be bound to each thread at a time. +// Prefer SequenceTaskRunnerHandle to this unless thread affinity is required. class BASE_EXPORT ThreadTaskRunnerHandle { public: // Gets the SingleThreadTaskRunner for the current thread. diff --git a/base/threading/sequenced_task_runner_handle.cc b/base/threading/sequenced_task_runner_handle.cc new file mode 100644 index 0000000..a03642d --- /dev/null +++ b/base/threading/sequenced_task_runner_handle.cc @@ -0,0 +1,38 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/sequenced_task_runner_handle.h" + +#include "base/sequenced_task_runner.h" +#include "base/thread_task_runner_handle.h" +#include "base/threading/sequenced_worker_pool.h" + +namespace base { + +// static +scoped_refptr<SequencedTaskRunner> SequencedTaskRunnerHandle::Get() { + // If we are on a worker thread for a SequencedBlockingPool that is running a + // sequenced task, return a SequencedTaskRunner for it. + scoped_refptr<base::SequencedWorkerPool> pool = + SequencedWorkerPool::GetWorkerPoolForCurrentThread(); + if (pool) { + SequencedWorkerPool::SequenceToken sequence_token = + SequencedWorkerPool::GetSequenceTokenForCurrentThread(); + DCHECK(sequence_token.IsValid()); + DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); + return pool->GetSequencedTaskRunner(sequence_token); + } + + // Otherwise, return a SingleThreadTaskRunner for the current thread. + return base::ThreadTaskRunnerHandle::Get(); +} + +// static +bool SequencedTaskRunnerHandle::IsSet() { + return (SequencedWorkerPool::GetWorkerPoolForCurrentThread() && + SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()) || + base::ThreadTaskRunnerHandle::IsSet(); +} + +} // namespace base diff --git a/base/threading/sequenced_task_runner_handle.h b/base/threading/sequenced_task_runner_handle.h new file mode 100644 index 0000000..c638d2f --- /dev/null +++ b/base/threading/sequenced_task_runner_handle.h @@ -0,0 +1,36 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_ +#define BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_ + +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" + +namespace base { + +class SequencedTaskRunner; + +class BASE_EXPORT SequencedTaskRunnerHandle { + public: + // Returns a SequencedTaskRunner which guarantees that posted tasks will only + // run after the current task is finished and will satisfy a SequenceChecker. + // It should only be called if IsSet() returns true (see the comment there for + // the requirements). + static scoped_refptr<SequencedTaskRunner> Get(); + + // Returns true if one of the following conditions is fulfilled: + // a) The current thread has a ThreadTaskRunnerHandle (which includes any + // thread that has a MessageLoop associated with it), or + // b) The current thread is a worker thread belonging to a SequencedWorkerPool + // *and* is currently running a sequenced task. + static bool IsSet(); + + private: + DISALLOW_IMPLICIT_CONSTRUCTORS(SequencedTaskRunnerHandle); +}; + +} // namespace base + +#endif // BASE_THREADING_SEQUENCED_TASK_RUNNER_HANDLE_H_ diff --git a/base/threading/sequenced_task_runner_handle_unittest.cc b/base/threading/sequenced_task_runner_handle_unittest.cc new file mode 100644 index 0000000..df7a117 --- /dev/null +++ b/base/threading/sequenced_task_runner_handle_unittest.cc @@ -0,0 +1,81 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/bind.h" +#include "base/callback.h" +#include "base/location.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/sequence_checker_impl.h" +#include "base/sequenced_task_runner.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/sequenced_task_runner_handle.h" +#include "base/threading/sequenced_worker_pool.h" +#include "base/threading/simple_thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { +namespace { + +class SequencedTaskRunnerHandleTest : public ::testing::Test { + protected: + static void GetTaskRunner(const Closure& callback) { + // Use SequenceCheckerImpl to make sure it's not a no-op in Release builds. + scoped_ptr<SequenceCheckerImpl> sequence_checker(new SequenceCheckerImpl); + ASSERT_TRUE(SequencedTaskRunnerHandle::IsSet()); + scoped_refptr<SequencedTaskRunner> task_runner = + SequencedTaskRunnerHandle::Get(); + ASSERT_TRUE(task_runner); + task_runner->PostTask( + FROM_HERE, base::Bind(&SequencedTaskRunnerHandleTest::CheckValidThread, + base::Passed(&sequence_checker), callback)); + } + + private: + static void CheckValidThread(scoped_ptr<SequenceCheckerImpl> sequence_checker, + const Closure& callback) { + EXPECT_TRUE(sequence_checker->CalledOnValidSequencedThread()); + callback.Run(); + } + + MessageLoop message_loop_; +}; + +TEST_F(SequencedTaskRunnerHandleTest, FromMessageLoop) { + RunLoop run_loop; + GetTaskRunner(run_loop.QuitClosure()); + run_loop.Run(); +} + +TEST_F(SequencedTaskRunnerHandleTest, FromSequencedWorkerPool) { + scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Test")); + WaitableEvent event(false, false); + pool->PostSequencedWorkerTask( + pool->GetSequenceToken(), FROM_HERE, + base::Bind(&SequencedTaskRunnerHandleTest::GetTaskRunner, + base::Bind(&WaitableEvent::Signal, base::Unretained(&event)))); + event.Wait(); +} + +class ThreadRunner : public DelegateSimpleThread::Delegate { + public: + void Run() override { + ASSERT_FALSE(SequencedTaskRunnerHandle::IsSet()); + } + + private: + Closure callback_; +}; + +TEST_F(SequencedTaskRunnerHandleTest, FromSimpleThread) { + ThreadRunner thread_runner; + DelegateSimpleThread thread(&thread_runner, "Background thread"); + thread.Start(); + thread.Join(); +} + +} // namespace +} // namespace base diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 54a6bc8..ed79897 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -219,10 +219,6 @@ uint64 GetTaskTraceID(const SequencedTask& task, static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); } -base::LazyInstance<base::ThreadLocalPointer< - SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = - LAZY_INSTANCE_INITIALIZER; - } // namespace // Worker --------------------------------------------------------------------- @@ -239,6 +235,9 @@ class SequencedWorkerPool::Worker : public SimpleThread { // SimpleThread implementation. This actually runs the background thread. void Run() override; + // Gets the worker for the current thread out of thread-local storage. + static Worker* GetForCurrentThread(); + // Indicates that a task is about to be run. The parameters provide // additional metainformation about the task being run. void set_running_task_info(SequenceToken token, @@ -264,7 +263,14 @@ class SequencedWorkerPool::Worker : public SimpleThread { return task_shutdown_behavior_; } + scoped_refptr<SequencedWorkerPool> worker_pool() const { + return worker_pool_; + } + private: + static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky + lazy_tls_ptr_; + scoped_refptr<SequencedWorkerPool> worker_pool_; // The sequence token of the task being processed. Only valid when // is_processing_task_ is true. @@ -508,9 +514,10 @@ void SequencedWorkerPool::Worker::Run() { win::ScopedCOMInitializer com_initializer; #endif - // Store a pointer to the running sequence in thread local storage for - // static function access. - g_lazy_tls_ptr.Get().Set(&task_sequence_token_); + // Store a pointer to this worker in thread local storage for static function + // access. + DCHECK(!lazy_tls_ptr_.Get().Get()); + lazy_tls_ptr_.Get().Set(this); // Just jump back to the Inner object to run the thread, since it has all the // tracking information and queues. It might be more natural to implement @@ -519,9 +526,23 @@ void SequencedWorkerPool::Worker::Run() { // send thread-specific information easily to the thread loop. worker_pool_->inner_->ThreadLoop(this); // Release our cyclic reference once we're done. - worker_pool_ = NULL; + worker_pool_ = nullptr; +} + +// static +SequencedWorkerPool::Worker* +SequencedWorkerPool::Worker::GetForCurrentThread() { + // Don't construct lazy instance on check. + if (lazy_tls_ptr_ == nullptr) + return nullptr; + + return lazy_tls_ptr_.Get().Get(); } +// static +LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky + SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER; + // Inner definitions --------------------------------------------------------- SequencedWorkerPool::Inner::Inner( @@ -1145,17 +1166,28 @@ SequencedWorkerPool::Inner::g_last_sequence_number_; // SequencedWorkerPool -------------------------------------------------------- +std::string SequencedWorkerPool::SequenceToken::ToString() const { + return base::StringPrintf("[%d]", id_); +} + // static SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceTokenForCurrentThread() { - // Don't construct lazy instance on check. - if (g_lazy_tls_ptr == NULL) + Worker* worker = Worker::GetForCurrentThread(); + if (!worker) return SequenceToken(); - SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); - if (!token) - return SequenceToken(); - return *token; + return worker->task_sequence_token(); +} + +// static +scoped_refptr<SequencedWorkerPool> +SequencedWorkerPool::GetWorkerPoolForCurrentThread() { + Worker* worker = Worker::GetForCurrentThread(); + if (!worker) + return nullptr; + + return worker->worker_pool(); } SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index ee282bc..9369fb7 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -121,7 +121,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // Opaque identifier that defines sequencing of tasks posted to the worker // pool. - class SequenceToken { + class BASE_EXPORT SequenceToken { public: SequenceToken() : id_(0) {} ~SequenceToken() {} @@ -135,6 +135,10 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { return id_ != 0; } + // Returns a string representation of this token. This method should only be + // used for debugging. + std::string ToString() const; + private: friend class SequencedWorkerPool; @@ -157,17 +161,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // an unsequenced task, returns an invalid SequenceToken. static SequenceToken GetSequenceTokenForCurrentThread(); + // Returns the SequencedWorkerPool that owns this thread, or null if the + // current thread is not a SequencedWorkerPool worker thread. + static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread(); + // When constructing a SequencedWorkerPool, there must be a - // MessageLoop on the current thread unless you plan to deliberately - // leak it. + // ThreadTaskRunnerHandle on the current thread unless you plan to + // deliberately leak it. // Pass the maximum number of threads (they will be lazily created as needed) // and a prefix for the thread name to aid in debugging. SequencedWorkerPool(size_t max_threads, const std::string& thread_name_prefix); - // Like above, but with |observer| for testing. Does not take - // ownership of |observer|. + // Like above, but with |observer| for testing. Does not take ownership of + // |observer|. SequencedWorkerPool(size_t max_threads, const std::string& thread_name_prefix, TestingObserver* observer); diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc index bf82b11..5812ee7 100644 --- a/base/threading/sequenced_worker_pool_unittest.cc +++ b/base/threading/sequenced_worker_pool_unittest.cc @@ -904,6 +904,49 @@ TEST_F(SequencedWorkerPoolTest, FlushForTesting) { pool()->FlushForTesting(); } +namespace { + +void CheckWorkerPoolAndSequenceToken( + const scoped_refptr<SequencedWorkerPool>& expected_pool, + SequencedWorkerPool::SequenceToken expected_token) { + SequencedWorkerPool::SequenceToken token = + SequencedWorkerPool::GetSequenceTokenForCurrentThread(); + EXPECT_EQ(expected_token.ToString(), token.ToString()); + + scoped_refptr<SequencedWorkerPool> pool = + SequencedWorkerPool::GetWorkerPoolForCurrentThread(); + EXPECT_EQ(expected_pool, pool); +} + +} // namespace + +TEST_F(SequencedWorkerPoolTest, GetWorkerPoolAndSequenceTokenForCurrentThread) { + EnsureAllWorkersCreated(); + + // The current thread should have neither a worker pool nor a sequence token. + SequencedWorkerPool::SequenceToken local_token = + SequencedWorkerPool::GetSequenceTokenForCurrentThread(); + scoped_refptr<SequencedWorkerPool> local_pool = + SequencedWorkerPool::GetWorkerPoolForCurrentThread(); + EXPECT_FALSE(local_token.IsValid()) << local_token.ToString(); + EXPECT_FALSE(local_pool); + + SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); + pool()->PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token1)); + pool()->PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), token2)); + + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&CheckWorkerPoolAndSequenceToken, pool(), + SequencedWorkerPool::SequenceToken())); + + pool()->FlushForTesting(); +} + TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) { MessageLoop loop; scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool")); |