diff options
author | kinuko <kinuko@chromium.org> | 2015-05-23 04:38:37 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-05-23 11:39:08 +0000 |
commit | 7f68f8735355c1c73557c3cfb294b441901fc31d (patch) | |
tree | 85ae14ede9dc2d3df2d88d38ddeab43b7e4fb42e | |
parent | b955d50ef7c0598baecc34ec7c4247c95e7819e5 (diff) | |
download | chromium_src-7f68f8735355c1c73557c3cfb294b441901fc31d.zip chromium_src-7f68f8735355c1c73557c3cfb294b441901fc31d.tar.gz chromium_src-7f68f8735355c1c73557c3cfb294b441901fc31d.tar.bz2 |
Reland (3rd try): Lazily initialize MessageLoop for faster thread startup
Original review: https://codereview.chromium.org/1011683002/
2nd try: https://codereview.chromium.org/1129953004/
2nd try reverted due to race reports on Linux:
https://crbug.com/489263 Data races on valid_thread_id_ after r330329
This fixes:
- Race in MessageLoopProxyImpl by introducing lock
- Race in BrowserMainLoop/BrowserThreadImpl, where BrowserThread::CurrentlyOn()
called on one of BrowserThreads tries to touch other thread's message_loop()
via global thread table.
Reg: the latter race, the code flow that causes this race is like following:
// On the main thread, we create all known browser threads:
for (...) {
{
AutoLock lock(g_lock);
g_threads[id] = new BrowserProcessSubThread();
}
// [A] This initializes the thread's message_loop, which causes a race
// against [B] in the new code because new threads can start running
// immediately.
thread->StartWithOptions();
}
// On the new thread's main function, it calls CurrentlyOn() which does:
{
// [B] This touches other thread's Thread::message_loop.
AutoLock lock(g_lock);
return g_threads[other_thread_id] &&
g_threads[other_thread_id]->message_loop() == MessageLoop::current();
}
This was safe before because both message_loop initialization and the first
call to CurrentlyOn() on the new thread was done synchronously in
StartWithOptions() while the main thread was blocked. In the new code
new threads can start accessing message_loop() asynchronously while
the main thread's for loop is running.
PS1 is the original patch (2nd try) that got reverted.
BUG=465458, 489263
Review URL: https://codereview.chromium.org/1131513007
Cr-Commit-Position: refs/heads/master@{#331235}
25 files changed, 300 insertions, 181 deletions
diff --git a/base/message_loop/incoming_task_queue.cc b/base/message_loop/incoming_task_queue.cc index c1ce939..5e9a461 100644 --- a/base/message_loop/incoming_task_queue.cc +++ b/base/message_loop/incoming_task_queue.cc @@ -44,7 +44,8 @@ IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop) message_loop_(message_loop), next_sequence_num_(0), message_loop_scheduled_(false), - always_schedule_work_(AlwaysNotifyPump(message_loop_->type())) { + always_schedule_work_(AlwaysNotifyPump(message_loop_->type())), + is_ready_for_scheduling_(false) { } bool IncomingTaskQueue::AddToIncomingQueue( @@ -109,6 +110,15 @@ void IncomingTaskQueue::WillDestroyCurrentMessageLoop() { message_loop_ = NULL; } +void IncomingTaskQueue::StartScheduling() { + AutoLock lock(incoming_queue_lock_); + DCHECK(!is_ready_for_scheduling_); + DCHECK(!message_loop_scheduled_); + is_ready_for_scheduling_ = true; + if (!incoming_queue_.empty()) + ScheduleWork(); +} + IncomingTaskQueue::~IncomingTaskQueue() { // Verify that WillDestroyCurrentMessageLoop() has been called. DCHECK(!message_loop_); @@ -148,19 +158,25 @@ bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) { incoming_queue_.push(*pending_task); pending_task->task.Reset(); - if (always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) { - // Wake up the message loop. - message_loop_->ScheduleWork(); - // After we've scheduled the message loop, we do not need to do so again - // until we know it has processed all of the work in our queue and is - // waiting for more work again. The message loop will always attempt to - // reload from the incoming queue before waiting again so we clear this flag - // in ReloadWorkQueue(). - message_loop_scheduled_ = true; + if (is_ready_for_scheduling_ && + (always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) { + ScheduleWork(); } return true; } +void IncomingTaskQueue::ScheduleWork() { + DCHECK(is_ready_for_scheduling_); + // Wake up the message loop. + message_loop_->ScheduleWork(); + // After we've scheduled the message loop, we do not need to do so again + // until we know it has processed all of the work in our queue and is + // waiting for more work again. The message loop will always attempt to + // reload from the incoming queue before waiting again so we clear this flag + // in ReloadWorkQueue(). + message_loop_scheduled_ = true; +} + } // namespace internal } // namespace base diff --git a/base/message_loop/incoming_task_queue.h b/base/message_loop/incoming_task_queue.h index 72e1f30..7dd1e82 100644 --- a/base/message_loop/incoming_task_queue.h +++ b/base/message_loop/incoming_task_queue.h @@ -53,6 +53,10 @@ class BASE_EXPORT IncomingTaskQueue // Disconnects |this| from the parent message loop. void WillDestroyCurrentMessageLoop(); + // This should be called when the message loop becomes ready for + // scheduling work. + void StartScheduling(); + private: friend class RefCountedThreadSafe<IncomingTaskQueue>; virtual ~IncomingTaskQueue(); @@ -66,6 +70,9 @@ class BASE_EXPORT IncomingTaskQueue // does not retain |pending_task->task| beyond this function call. bool PostPendingTask(PendingTask* pending_task); + // Wakes up the message loop and schedules work. + void ScheduleWork(); + // Number of tasks that require high resolution timing. This value is kept // so that ReloadWorkQueue() completes in constant time. int high_res_task_count_; @@ -92,6 +99,9 @@ class BASE_EXPORT IncomingTaskQueue // if the incoming queue was not empty. const bool always_schedule_work_; + // False until StartScheduling() is called. + bool is_ready_for_scheduling_; + DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue); }; diff --git a/base/message_loop/message_loop.cc b/base/message_loop/message_loop.cc index eb0f968..4222c77 100644 --- a/base/message_loop/message_loop.cc +++ b/base/message_loop/message_loop.cc @@ -100,6 +100,10 @@ MessagePumpForIO* ToPumpIO(MessagePump* pump) { } #endif // !defined(OS_NACL_SFI) +scoped_ptr<MessagePump> ReturnPump(scoped_ptr<MessagePump> pump) { + return pump; +} + } // namespace //------------------------------------------------------------------------------ @@ -116,41 +120,19 @@ MessageLoop::DestructionObserver::~DestructionObserver() { //------------------------------------------------------------------------------ MessageLoop::MessageLoop(Type type) - : type_(type), -#if defined(OS_WIN) - pending_high_res_tasks_(0), - in_high_res_mode_(false), -#endif - nestable_tasks_allowed_(true), -#if defined(OS_WIN) - os_modal_loop_(false), -#endif // OS_WIN - message_histogram_(NULL), - run_loop_(NULL) { - Init(); - - pump_ = CreateMessagePumpForType(type).Pass(); + : MessageLoop(type, MessagePumpFactoryCallback()) { + BindToCurrentThread(); } MessageLoop::MessageLoop(scoped_ptr<MessagePump> pump) - : pump_(pump.Pass()), - type_(TYPE_CUSTOM), -#if defined(OS_WIN) - pending_high_res_tasks_(0), - in_high_res_mode_(false), -#endif - nestable_tasks_allowed_(true), -#if defined(OS_WIN) - os_modal_loop_(false), -#endif // OS_WIN - message_histogram_(NULL), - run_loop_(NULL) { - DCHECK(pump_.get()); - Init(); + : MessageLoop(TYPE_CUSTOM, Bind(&ReturnPump, Passed(&pump))) { + BindToCurrentThread(); } MessageLoop::~MessageLoop() { - DCHECK_EQ(this, current()); + // current() could be NULL if this message loop is destructed before it is + // bound to a thread. + DCHECK(current() == this || !current()); // iOS just attaches to the loop, it doesn't Run it. // TODO(stuartmorgan): Consider wiring up a Detach(). @@ -299,11 +281,13 @@ void MessageLoop::PostNonNestableDelayedTask( } void MessageLoop::Run() { + DCHECK(pump_); RunLoop run_loop; run_loop.Run(); } void MessageLoop::RunUntilIdle() { + DCHECK(pump_); RunLoop run_loop; run_loop.RunUntilIdle(); } @@ -383,13 +367,43 @@ bool MessageLoop::IsIdleForTesting() { //------------------------------------------------------------------------------ -void MessageLoop::Init() { +scoped_ptr<MessageLoop> MessageLoop::CreateUnbound( + Type type, MessagePumpFactoryCallback pump_factory) { + return make_scoped_ptr(new MessageLoop(type, pump_factory)); +} + +MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory) + : type_(type), +#if defined(OS_WIN) + pending_high_res_tasks_(0), + in_high_res_mode_(false), +#endif + nestable_tasks_allowed_(true), +#if defined(OS_WIN) + os_modal_loop_(false), +#endif // OS_WIN + pump_factory_(pump_factory), + message_histogram_(NULL), + run_loop_(NULL), + incoming_task_queue_(new internal::IncomingTaskQueue(this)), + message_loop_proxy_( + new internal::MessageLoopProxyImpl(incoming_task_queue_)) { + // If type is TYPE_CUSTOM non-null pump_factory must be given. + DCHECK_EQ(type_ == TYPE_CUSTOM, !pump_factory_.is_null()); +} + +void MessageLoop::BindToCurrentThread() { + DCHECK(!pump_); + if (!pump_factory_.is_null()) + pump_ = pump_factory_.Run(); + else + pump_ = CreateMessagePumpForType(type_); + DCHECK(!current()) << "should only have one message loop per thread"; lazy_tls_ptr.Pointer()->Set(this); - incoming_task_queue_ = new internal::IncomingTaskQueue(this); - message_loop_proxy_ = - new internal::MessageLoopProxyImpl(incoming_task_queue_); + incoming_task_queue_->StartScheduling(); + message_loop_proxy_->BindToCurrentThread(); thread_task_runner_handle_.reset( new ThreadTaskRunnerHandle(message_loop_proxy_)); } diff --git a/base/message_loop/message_loop.h b/base/message_loop/message_loop.h index fd7596a..f2f89d0 100644 --- a/base/message_loop/message_loop.h +++ b/base/message_loop/message_loop.h @@ -114,7 +114,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { explicit MessageLoop(Type type = TYPE_DEFAULT); // Creates a TYPE_CUSTOM MessageLoop with the supplied MessagePump, which must // be non-NULL. - explicit MessageLoop(scoped_ptr<base::MessagePump> pump); + explicit MessageLoop(scoped_ptr<MessagePump> pump); + ~MessageLoop() override; // Returns the MessageLoop object for the current thread, or null if none. @@ -394,10 +395,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { // Returns true if the message loop is "idle". Provided for testing. bool IsIdleForTesting(); - // Wakes up the message pump. Can be called on any thread. The caller is - // responsible for synchronizing ScheduleWork() calls. - void ScheduleWork(); - // Returns the TaskAnnotator which is used to add debug information to posted // tasks. debug::TaskAnnotator* task_annotator() { return &task_annotator_; } @@ -411,9 +408,33 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { private: friend class RunLoop; + friend class internal::IncomingTaskQueue; + friend class ScheduleWorkTest; + friend class Thread; + + using MessagePumpFactoryCallback = Callback<scoped_ptr<MessagePump>()>; - // Configures various members for the two constructors. - void Init(); + // Creates a MessageLoop without binding to a thread. + // If |type| is TYPE_CUSTOM non-null |pump_factory| must be also given + // to create a message pump for this message loop. Otherwise a default + // message pump for the |type| is created. + // + // It is valid to call this to create a new message loop on one thread, + // and then pass it to the thread where the message loop actually runs. + // The message loop's BindToCurrentThread() method must be called on the + // thread the message loop runs on, before calling Run(). + // Before BindToCurrentThread() is called only Post*Task() functions can + // be called on the message loop. + scoped_ptr<MessageLoop> CreateUnbound( + Type type, + MessagePumpFactoryCallback pump_factory); + + // Common private constructor. Other constructors delegate the initialization + // to this constructor. + MessageLoop(Type type, MessagePumpFactoryCallback pump_factory); + + // Configure various members and bind this message loop to the current thread. + void BindToCurrentThread(); // Invokes the actual run loop using the message pump. void RunHandler(); @@ -437,6 +458,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { // empty. void ReloadWorkQueue(); + // Wakes up the message pump. Can be called on any thread. The caller is + // responsible for synchronizing ScheduleWork() calls. + void ScheduleWork(); + // Start recording histogram info about events and action IF it was enabled // and IF the statistics recorder can accept a registration of our histogram. void StartHistogrammer(); @@ -490,6 +515,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate { bool os_modal_loop_; #endif + // pump_factory_.Run() is called to create a message pump for this loop + // if type_ is TYPE_CUSTOM and pump_ is null. + MessagePumpFactoryCallback pump_factory_; + std::string thread_name_; // A profiling histogram showing the counts of various messages and events. HistogramBase* message_histogram_; diff --git a/base/message_loop/message_loop_proxy_impl.cc b/base/message_loop/message_loop_proxy_impl.cc index b7abca3..580620d 100644 --- a/base/message_loop/message_loop_proxy_impl.cc +++ b/base/message_loop/message_loop_proxy_impl.cc @@ -15,7 +15,13 @@ namespace internal { MessageLoopProxyImpl::MessageLoopProxyImpl( scoped_refptr<IncomingTaskQueue> incoming_queue) : incoming_queue_(incoming_queue), - valid_thread_id_(PlatformThread::CurrentId()) { + valid_thread_id_(kInvalidThreadId) { +} + +void MessageLoopProxyImpl::BindToCurrentThread() { + AutoLock lock(valid_thread_id_lock_); + DCHECK_EQ(kInvalidThreadId, valid_thread_id_); + valid_thread_id_ = PlatformThread::CurrentId(); } bool MessageLoopProxyImpl::PostDelayedTask( @@ -35,6 +41,7 @@ bool MessageLoopProxyImpl::PostNonNestableDelayedTask( } bool MessageLoopProxyImpl::RunsTasksOnCurrentThread() const { + AutoLock lock(valid_thread_id_lock_); return valid_thread_id_ == PlatformThread::CurrentId(); } diff --git a/base/message_loop/message_loop_proxy_impl.h b/base/message_loop/message_loop_proxy_impl.h index 0fe629f..fa611c2 100644 --- a/base/message_loop/message_loop_proxy_impl.h +++ b/base/message_loop/message_loop_proxy_impl.h @@ -9,6 +9,7 @@ #include "base/memory/ref_counted.h" #include "base/message_loop/message_loop_proxy.h" #include "base/pending_task.h" +#include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" namespace base { @@ -24,6 +25,9 @@ class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy { explicit MessageLoopProxyImpl( scoped_refptr<IncomingTaskQueue> incoming_queue); + // Initialize this message loop proxy on the current thread. + void BindToCurrentThread(); + // MessageLoopProxy implementation bool PostDelayedTask(const tracked_objects::Location& from_here, const base::Closure& task, @@ -40,8 +44,10 @@ class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy { // THe incoming queue receiving all posted tasks. scoped_refptr<IncomingTaskQueue> incoming_queue_; - // ID of the thread |this| was created on. + // ID of the thread |this| was created on. Could be accessed on multiple + // threads, protected by |valid_thread_id_lock_|. PlatformThreadId valid_thread_id_; + mutable Lock valid_thread_id_lock_; DISALLOW_COPY_AND_ASSIGN(MessageLoopProxyImpl); }; diff --git a/base/message_loop/message_pump_perftest.cc b/base/message_loop/message_pump_perftest.cc index b3e5604..0bf3d0c 100644 --- a/base/message_loop/message_pump_perftest.cc +++ b/base/message_loop/message_pump_perftest.cc @@ -20,7 +20,6 @@ #endif namespace base { -namespace { class ScheduleWorkTest : public testing::Test { public: @@ -224,9 +223,6 @@ TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromFourThreads) { } #endif -static void DoNothing() { -} - class FakeMessagePump : public MessagePump { public: FakeMessagePump() {} @@ -289,5 +285,4 @@ TEST_F(PostTaskTest, OneHundredTasksPerReload) { Run(1000, 100); } -} // namespace } // namespace base diff --git a/base/threading/platform_thread.h b/base/threading/platform_thread.h index d8f06e5..69a2b0d 100644 --- a/base/threading/platform_thread.h +++ b/base/threading/platform_thread.h @@ -89,6 +89,10 @@ class PlatformThreadHandle { id_(id) { } + PlatformThreadId id() const { + return id_; + } + bool is_equal(const PlatformThreadHandle& other) const { return handle_ == other.handle_; } diff --git a/base/threading/platform_thread_win.cc b/base/threading/platform_thread_win.cc index 4eb2cb2..aeaa7c7 100644 --- a/base/threading/platform_thread_win.cc +++ b/base/threading/platform_thread_win.cc @@ -108,15 +108,16 @@ bool CreateThreadInternal(size_t stack_size, // have to work running on CreateThread() threads anyway, since we run code // on the Windows thread pool, etc. For some background on the difference: // http://www.microsoft.com/msj/1099/win32/win321099.aspx + PlatformThreadId thread_id; void* thread_handle = CreateThread( - NULL, stack_size, ThreadFunc, params, flags, NULL); + NULL, stack_size, ThreadFunc, params, flags, &thread_id); if (!thread_handle) { delete params; return false; } if (out_thread_handle) - *out_thread_handle = PlatformThreadHandle(thread_handle); + *out_thread_handle = PlatformThreadHandle(thread_handle, thread_id); else CloseHandle(thread_handle); return true; diff --git a/base/threading/thread.cc b/base/threading/thread.cc index 0e4aab2..b753a6b 100644 --- a/base/threading/thread.cc +++ b/base/threading/thread.cc @@ -37,20 +37,6 @@ void ThreadQuitHelper() { Thread::SetThreadWasQuitProperly(true); } -// Used to pass data to ThreadMain. This structure is allocated on the stack -// from within StartWithOptions. -struct Thread::StartupData { - // We get away with a const reference here because of how we are allocated. - const Thread::Options& options; - - // Used to synchronize thread startup. - WaitableEvent event; - - explicit StartupData(const Options& opt) - : options(opt), - event(false, false) {} -}; - Thread::Options::Options() : message_loop_type(MessageLoop::TYPE_DEFAULT), timer_slack(TIMER_SLACK_NONE), @@ -72,13 +58,11 @@ Thread::Thread(const std::string& name) #if defined(OS_WIN) com_status_(NONE), #endif - started_(false), stopping_(false), running_(false), - startup_data_(NULL), thread_(0), - message_loop_(NULL), - thread_id_(kInvalidThreadId), + message_loop_(nullptr), + message_loop_timer_slack_(TIMER_SLACK_NONE), name_(name) { } @@ -104,34 +88,50 @@ bool Thread::StartWithOptions(const Options& options) { SetThreadWasQuitProperly(false); - StartupData startup_data(options); - startup_data_ = &startup_data; + MessageLoop::Type type = options.message_loop_type; + if (!options.message_pump_factory.is_null()) + type = MessageLoop::TYPE_CUSTOM; - if (!PlatformThread::Create(options.stack_size, this, &thread_)) { - DLOG(ERROR) << "failed to create thread"; - startup_data_ = NULL; - return false; - } + message_loop_timer_slack_ = options.timer_slack; + message_loop_ = new MessageLoop(type, options.message_pump_factory); - // TODO(kinuko): Remove once crbug.com/465458 is solved. - tracked_objects::ScopedTracker tracking_profile_wait( - FROM_HERE_WITH_EXPLICIT_FUNCTION( - "465458 base::Thread::StartWithOptions (Wait)")); + start_event_.reset(new WaitableEvent(false, false)); - // Wait for the thread to start and initialize message_loop_ - base::ThreadRestrictions::ScopedAllowWait allow_wait; - startup_data.event.Wait(); - - // set it to NULL so we don't keep a pointer to some object on the stack. - startup_data_ = NULL; - started_ = true; + // Hold the thread_lock_ while starting a new thread, so that we can make sure + // that thread_ is populated before the newly created thread accesses it. + { + AutoLock lock(thread_lock_); + if (!PlatformThread::Create(options.stack_size, this, &thread_)) { + DLOG(ERROR) << "failed to create thread"; + delete message_loop_; + message_loop_ = nullptr; + start_event_.reset(); + return false; + } + } DCHECK(message_loop_); return true; } +bool Thread::StartAndWaitForTesting() { + bool result = Start(); + if (!result) + return false; + WaitUntilThreadStarted(); + return true; +} + +bool Thread::WaitUntilThreadStarted() { + if (!start_event_) + return false; + base::ThreadRestrictions::ScopedAllowWait allow_wait; + start_event_->Wait(); + return true; +} + void Thread::Stop() { - if (!started_) + if (!start_event_) return; StopSoon(); @@ -147,7 +147,7 @@ void Thread::Stop() { DCHECK(!message_loop_); // The thread no longer needs to be joined. - started_ = false; + start_event_.reset(); stopping_ = false; } @@ -155,9 +155,7 @@ void Thread::Stop() { void Thread::StopSoon() { // We should only be called on the same thread that started us. - // Reading thread_id_ without a lock can lead to a benign data race - // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer. - DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId()); + DCHECK_NE(thread_id(), PlatformThread::CurrentId()); if (stopping_ || !message_loop_) return; @@ -166,14 +164,28 @@ void Thread::StopSoon() { task_runner()->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); } +PlatformThreadId Thread::thread_id() const { + AutoLock lock(thread_lock_); + return thread_.id(); +} + bool Thread::IsRunning() const { + // If the thread's already started (i.e. message_loop_ is non-null) and + // not yet requested to stop (i.e. stopping_ is false) we can just return + // true. (Note that stopping_ is touched only on the same thread that + // starts / started the new thread so we need no locking here.) + if (message_loop_ && !stopping_) + return true; + // Otherwise check the running_ flag, which is set to true by the new thread + // only while it is inside Run(). + AutoLock lock(running_lock_); return running_; } void Thread::SetPriority(ThreadPriority priority) { // The thread must be started (and id known) for this to be // compatible with all platforms. - DCHECK_NE(thread_id_, kInvalidThreadId); + DCHECK(message_loop_ != nullptr); PlatformThread::SetThreadPriority(thread_, priority); } @@ -194,60 +206,60 @@ bool Thread::GetThreadWasQuitProperly() { } void Thread::ThreadMain() { - { - // The message loop for this thread. - // Allocated on the heap to centralize any leak reports at this line. - scoped_ptr<MessageLoop> message_loop; - if (!startup_data_->options.message_pump_factory.is_null()) { - message_loop.reset( - new MessageLoop(startup_data_->options.message_pump_factory.Run())); - } else { - message_loop.reset( - new MessageLoop(startup_data_->options.message_loop_type)); - } + // Complete the initialization of our Thread object. + PlatformThread::SetName(name_.c_str()); + ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. - // Complete the initialization of our Thread object. - thread_id_ = PlatformThread::CurrentId(); - PlatformThread::SetName(name_); - ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. - message_loop->set_thread_name(name_); - message_loop->SetTimerSlack(startup_data_->options.timer_slack); - message_loop_ = message_loop.get(); + // Lazily initialize the message_loop so that it can run on this thread. + DCHECK(message_loop_); + scoped_ptr<MessageLoop> message_loop(message_loop_); + message_loop_->BindToCurrentThread(); + message_loop_->set_thread_name(name_); + message_loop_->SetTimerSlack(message_loop_timer_slack_); #if defined(OS_WIN) - scoped_ptr<win::ScopedCOMInitializer> com_initializer; - if (com_status_ != NONE) { - com_initializer.reset((com_status_ == STA) ? - new win::ScopedCOMInitializer() : - new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); - } + scoped_ptr<win::ScopedCOMInitializer> com_initializer; + if (com_status_ != NONE) { + com_initializer.reset((com_status_ == STA) ? + new win::ScopedCOMInitializer() : + new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); + } #endif - // Let the thread do extra initialization. - // Let's do this before signaling we are started. - Init(); + // Make sure the thread_id() returns current thread. + // (This internally acquires lock against PlatformThread::Create) + DCHECK_EQ(thread_id(), PlatformThread::CurrentId()); + // Let the thread do extra initialization. + Init(); + + { + AutoLock lock(running_lock_); running_ = true; - startup_data_->event.Signal(); - // startup_data_ can't be touched anymore since the starting thread is now - // unlocked. + } + + start_event_->Signal(); - Run(message_loop_); + Run(message_loop_); + + { + AutoLock lock(running_lock_); running_ = false; + } - // Let the thread do extra cleanup. - CleanUp(); + // Let the thread do extra cleanup. + CleanUp(); #if defined(OS_WIN) - com_initializer.reset(); + com_initializer.reset(); #endif - // Assert that MessageLoop::Quit was called by ThreadQuitHelper. - DCHECK(GetThreadWasQuitProperly()); + // Assert that MessageLoop::Quit was called by ThreadQuitHelper. + DCHECK(GetThreadWasQuitProperly()); - // We can't receive messages anymore. - message_loop_ = NULL; - } + // We can't receive messages anymore. + // (The message loop is destructed at the end of this block) + message_loop_ = NULL; } } // namespace base diff --git a/base/threading/thread.h b/base/threading/thread.h index 4915606..6b2f1c5 100644 --- a/base/threading/thread.h +++ b/base/threading/thread.h @@ -13,11 +13,13 @@ #include "base/message_loop/message_loop.h" #include "base/message_loop/timer_slack.h" #include "base/single_thread_task_runner.h" +#include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" namespace base { class MessagePump; +class WaitableEvent; // A simple thread abstraction that establishes a MessageLoop on a new thread. // The consumer uses the MessageLoop of the thread to cause code to execute on @@ -45,7 +47,7 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // This is ignored if message_pump_factory.is_null() is false. MessageLoop::Type message_loop_type; - // Specify timer slack for thread message loop. + // Specifies timer slack for thread message loop. TimerSlack timer_slack; // Used to create the MessagePump for the MessageLoop. The callback is Run() @@ -81,7 +83,7 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // init_com_with_mta(false) and then StartWithOptions() with any message loop // type other than TYPE_UI. void init_com_with_mta(bool use_mta) { - DCHECK(!started_); + DCHECK(!start_event_); com_status_ = use_mta ? MTA : STA; } #endif @@ -103,6 +105,18 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // callback. bool StartWithOptions(const Options& options); + // Starts the thread and wait for the thread to start and run initialization + // before returning. It's same as calling Start() and then + // WaitUntilThreadStarted(). + // Note that using this (instead of Start() or StartWithOptions() causes + // jank on the calling thread, should be used only in testing code. + bool StartAndWaitForTesting(); + + // Blocks until the thread starts running. Called within StartAndWait(). + // Note that calling this causes jank on the calling thread, must be used + // carefully for production code. + bool WaitUntilThreadStarted(); + // Signals the thread to exit and returns once the thread has exited. After // this method returns, the Thread object is completely reset and may be used // as if it were newly constructed (i.e., Start may be called again). @@ -166,7 +180,7 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { PlatformThreadHandle thread_handle() { return thread_; } // The thread ID. - PlatformThreadId thread_id() const { return thread_id_; } + PlatformThreadId thread_id() const; // Returns true if the thread has been started, and not yet stopped. bool IsRunning() const; @@ -208,33 +222,32 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { ComStatus com_status_; #endif - // Whether we successfully started the thread. - bool started_; - // If true, we're in the middle of stopping, and shouldn't access // |message_loop_|. It may non-NULL and invalid. bool stopping_; // True while inside of Run(). bool running_; - - // Used to pass data to ThreadMain. - struct StartupData; - StartupData* startup_data_; + mutable base::Lock running_lock_; // Protects running_. // The thread's handle. PlatformThreadHandle thread_; + mutable base::Lock thread_lock_; // Protects thread_. // The thread's message loop. Valid only while the thread is alive. Set // by the created thread. MessageLoop* message_loop_; - // Our thread's ID. - PlatformThreadId thread_id_; + // Stores Options::timer_slack_ until the message loop has been bound to + // a thread. + TimerSlack message_loop_timer_slack_; // The name of the thread. Used for debugging purposes. std::string name_; + // Non-null if the thread has successfully started. + scoped_ptr<WaitableEvent> start_event_; + friend void ThreadQuitHelper(); DISALLOW_COPY_AND_ASSIGN(Thread); diff --git a/base/threading/thread_id_name_manager_unittest.cc b/base/threading/thread_id_name_manager_unittest.cc index b5953d5..b17c681 100644 --- a/base/threading/thread_id_name_manager_unittest.cc +++ b/base/threading/thread_id_name_manager_unittest.cc @@ -21,8 +21,8 @@ TEST_F(ThreadIdNameManagerTest, AddThreads) { base::Thread thread_a(kAThread); base::Thread thread_b(kBThread); - thread_a.Start(); - thread_b.Start(); + thread_a.StartAndWaitForTesting(); + thread_b.StartAndWaitForTesting(); EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); EXPECT_STREQ(kBThread, manager->GetName(thread_b.thread_id())); @@ -35,10 +35,10 @@ TEST_F(ThreadIdNameManagerTest, RemoveThreads) { base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); base::Thread thread_a(kAThread); - thread_a.Start(); + thread_a.StartAndWaitForTesting(); { base::Thread thread_b(kBThread); - thread_b.Start(); + thread_b.StartAndWaitForTesting(); thread_b.Stop(); } EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); @@ -51,12 +51,12 @@ TEST_F(ThreadIdNameManagerTest, RestartThread) { base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); base::Thread thread_a(kAThread); - thread_a.Start(); + thread_a.StartAndWaitForTesting(); base::PlatformThreadId a_id = thread_a.thread_id(); EXPECT_STREQ(kAThread, manager->GetName(a_id)); thread_a.Stop(); - thread_a.Start(); + thread_a.StartAndWaitForTesting(); EXPECT_STREQ("", manager->GetName(a_id)); EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); thread_a.Stop(); diff --git a/base/threading/thread_unittest.cc b/base/threading/thread_unittest.cc index a89768e..e86c758 100644 --- a/base/threading/thread_unittest.cc +++ b/base/threading/thread_unittest.cc @@ -194,11 +194,12 @@ TEST_F(ThreadTest, ThreadName) { EXPECT_EQ("ThreadName", a.thread_name()); } -// Make sure we can't use a thread between Start() and Init(). +// Make sure Init() is called after Start() and before +// WaitUntilThreadInitialized() returns. TEST_F(ThreadTest, SleepInsideInit) { SleepInsideInitThread t; EXPECT_FALSE(t.InitCalled()); - t.Start(); + t.StartAndWaitForTesting(); EXPECT_TRUE(t.InitCalled()); } diff --git a/chrome/browser/io_thread.cc b/chrome/browser/io_thread.cc index 0a306c7..e89a415 100644 --- a/chrome/browser/io_thread.cc +++ b/chrome/browser/io_thread.cc @@ -611,11 +611,6 @@ net::URLRequestContextGetter* IOThread::system_url_request_context_getter() { } void IOThread::Init() { - // Prefer to use InitAsync unless you need initialization to block - // the UI thread -} - -void IOThread::InitAsync() { // TODO(erikchen): Remove ScopedTracker below once http://crbug.com/466432 // is fixed. tracked_objects::ScopedTracker tracking_profile1( diff --git a/chrome/browser/io_thread.h b/chrome/browser/io_thread.h index 7396cb0..7a02c5b 100644 --- a/chrome/browser/io_thread.h +++ b/chrome/browser/io_thread.h @@ -257,7 +257,6 @@ class IOThread : public content::BrowserThreadDelegate { // This handles initialization and destruction of state that must // live on the IO thread. void Init() override; - void InitAsync() override; void CleanUp() override; // Initializes |params| based on the settings in |globals|. diff --git a/chrome/browser/metrics/thread_watcher_android_unittest.cc b/chrome/browser/metrics/thread_watcher_android_unittest.cc index ee04c5d..d00c24d 100644 --- a/chrome/browser/metrics/thread_watcher_android_unittest.cc +++ b/chrome/browser/metrics/thread_watcher_android_unittest.cc @@ -47,7 +47,7 @@ TEST(ThreadWatcherAndroidTest, ApplicationStatusNotification) { scoped_ptr<WatchDogThread> watchdog_thread_(new WatchDogThread()); - watchdog_thread_->Start(); + watchdog_thread_->StartAndWaitForTesting(); EXPECT_FALSE(ThreadWatcherList::g_thread_watcher_list_); diff --git a/chrome/browser/metrics/thread_watcher_unittest.cc b/chrome/browser/metrics/thread_watcher_unittest.cc index 4b3c457..00171b8 100644 --- a/chrome/browser/metrics/thread_watcher_unittest.cc +++ b/chrome/browser/metrics/thread_watcher_unittest.cc @@ -257,9 +257,9 @@ class ThreadWatcherTest : public ::testing::Test { db_thread_.reset(new content::TestBrowserThread(BrowserThread::DB)); io_thread_.reset(new content::TestBrowserThread(BrowserThread::IO)); watchdog_thread_.reset(new WatchDogThread()); - db_thread_->Start(); - io_thread_->Start(); - watchdog_thread_->Start(); + db_thread_->StartAndWaitForTesting(); + io_thread_->StartAndWaitForTesting(); + watchdog_thread_->StartAndWaitForTesting(); WatchDogThread::PostTask( FROM_HERE, @@ -691,7 +691,7 @@ TEST_F(ThreadWatcherListTest, Restart) { content::TestBrowserThread ui_thread(BrowserThread::UI, &message_loop_for_ui); scoped_ptr<WatchDogThread> watchdog_thread_(new WatchDogThread()); - watchdog_thread_->Start(); + watchdog_thread_->StartAndWaitForTesting(); // See http://crbug.com/347887. // StartWatchingAll() will PostDelayedTask to create g_thread_watcher_list_, diff --git a/chrome/browser/sync/glue/sync_backend_registrar_unittest.cc b/chrome/browser/sync/glue/sync_backend_registrar_unittest.cc index 3db65e9..34adb95 100644 --- a/chrome/browser/sync/glue/sync_backend_registrar_unittest.cc +++ b/chrome/browser/sync/glue/sync_backend_registrar_unittest.cc @@ -336,6 +336,9 @@ TEST_F(SyncBackendRegistrarShutdownTest, BlockingShutdown) { base::Bind(&SyncBackendRegistrar::Shutdown, base::Unretained(registrar.release()))); + // Make sure the thread starts running. + sync_thread->WaitUntilThreadStarted(); + // The test verifies that the sync thread doesn't block because // of the blocked DB thread and can finish the shutdown. sync_thread->message_loop()->RunUntilIdle(); diff --git a/content/browser/browser_thread_impl.cc b/content/browser/browser_thread_impl.cc index 78e3836..1aac305 100644 --- a/content/browser/browser_thread_impl.cc +++ b/content/browser/browser_thread_impl.cc @@ -159,14 +159,8 @@ void BrowserThreadImpl::Init() { AtomicWord stored_pointer = base::subtle::NoBarrier_Load(storage); BrowserThreadDelegate* delegate = reinterpret_cast<BrowserThreadDelegate*>(stored_pointer); - if (delegate) { + if (delegate) delegate->Init(); - message_loop()->PostTask(FROM_HERE, - base::Bind(&BrowserThreadDelegate::InitAsync, - // Delegate is expected to exist for the - // duration of the thread's lifetime - base::Unretained(delegate))); - } } // We disable optimizations for this block of functions so the compiler doesn't @@ -300,6 +294,15 @@ BrowserThreadImpl::~BrowserThreadImpl() { #endif } +bool BrowserThreadImpl::StartWithOptions(const Options& options) { + // The global thread table needs to be locked while a new thread is + // starting, as the new thread can asynchronously start touching the + // table (and other thread's message_loop). + BrowserThreadGlobals& globals = g_globals.Get(); + base::AutoLock lock(globals.lock); + return Thread::StartWithOptions(options); +} + // static bool BrowserThreadImpl::PostTaskHelper( BrowserThread::ID identifier, diff --git a/content/browser/browser_thread_impl.h b/content/browser/browser_thread_impl.h index 72915ea..eebe5b0 100644 --- a/content/browser/browser_thread_impl.h +++ b/content/browser/browser_thread_impl.h @@ -25,6 +25,8 @@ class CONTENT_EXPORT BrowserThreadImpl : public BrowserThread, base::MessageLoop* message_loop); ~BrowserThreadImpl() override; + bool StartWithOptions(const Options& options); + static void ShutdownThreadPool(); protected: diff --git a/content/public/browser/browser_thread_delegate.h b/content/public/browser/browser_thread_delegate.h index ad5fd7e..3688e65 100644 --- a/content/public/browser/browser_thread_delegate.h +++ b/content/public/browser/browser_thread_delegate.h @@ -22,9 +22,6 @@ class BrowserThreadDelegate { // Called prior to starting the message loop virtual void Init() = 0; - // Called as the first task on the thread's message loop. - virtual void InitAsync() = 0; - // Called just after the message loop ends. virtual void CleanUp() = 0; }; diff --git a/content/public/test/test_browser_thread.cc b/content/public/test/test_browser_thread.cc index ac67480..62d9eb8 100644 --- a/content/public/test/test_browser_thread.cc +++ b/content/public/test/test_browser_thread.cc @@ -58,6 +58,10 @@ bool TestBrowserThread::Start() { return impl_->Start(); } +bool TestBrowserThread::StartAndWaitForTesting() { + return impl_->StartAndWaitForTesting(); +} + bool TestBrowserThread::StartIOThread() { base::Thread::Options options; options.message_loop_type = base::MessageLoop::TYPE_IO; diff --git a/content/public/test/test_browser_thread.h b/content/public/test/test_browser_thread.h index 2428bbb..d3a2564 100644 --- a/content/public/test/test_browser_thread.h +++ b/content/public/test/test_browser_thread.h @@ -36,6 +36,10 @@ class TestBrowserThread { // Starts the thread with a generic message loop. bool Start(); + // Starts the thread with a generic message loop and waits for the + // thread to run. + bool StartAndWaitForTesting(); + // Starts the thread with an IOThread message loop. bool StartIOThread(); diff --git a/net/android/network_change_notifier_android.cc b/net/android/network_change_notifier_android.cc index a6ea628..9829235 100644 --- a/net/android/network_change_notifier_android.cc +++ b/net/android/network_change_notifier_android.cc @@ -173,6 +173,9 @@ NetworkChangeNotifierAndroid::NetworkChangeNotifierAndroid( delegate_->AddObserver(this); dns_config_service_thread_->StartWithOptions( base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); + // Wait until Init is called on the DNS config thread before + // calling InitAfterStart. + dns_config_service_thread_->WaitUntilThreadStarted(); dns_config_service_thread_->InitAfterStart(); } diff --git a/net/test/embedded_test_server/embedded_test_server.cc b/net/test/embedded_test_server/embedded_test_server.cc index ac7d3bc..e747c94 100644 --- a/net/test/embedded_test_server/embedded_test_server.cc +++ b/net/test/embedded_test_server/embedded_test_server.cc @@ -190,6 +190,7 @@ void EmbeddedTestServer::StartThread() { thread_options.message_loop_type = base::MessageLoop::TYPE_IO; io_thread_.reset(new base::Thread("EmbeddedTestServer io thread")); CHECK(io_thread_->StartWithOptions(thread_options)); + CHECK(io_thread_->WaitUntilThreadStarted()); } void EmbeddedTestServer::InitializeOnIOThread() { |