diff options
author | skyostil <skyostil@chromium.org> | 2015-02-24 11:35:49 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-02-24 19:36:41 +0000 |
commit | 0da8dc886fad453b0cf4f1c718da8d44e132b618 (patch) | |
tree | dddc676f02460e27663dea6d33a41341ee2f7512 /content | |
parent | 8ec6aaa574230d326a602f1cd719cfff227e4279 (diff) | |
download | chromium_src-0da8dc886fad453b0cf4f1c718da8d44e132b618.zip chromium_src-0da8dc886fad453b0cf4f1c718da8d44e132b618.tar.gz chromium_src-0da8dc886fad453b0cf4f1c718da8d44e132b618.tar.bz2 |
scheduler: Implement task observers
This patch adds a way for scheduler clients to get notified of task
execution, much like with base::MessageLoop. This is needed to allow
the execution of microtask checkpoints[1] between tasks by Blink while
letting the scheduler use a >1 work batch size.
This patch also refactors the various WebThreadImpls to reduce code
duplication removes the deprecated postTask entrypoints that don't take
the posting location as a parameter.
BUG=450977,444764,458990
[1] https://html.spec.whatwg.org/multipage/webappapis.html#processing-model-9
Committed: https://crrev.com/ab211a0e344b431ca58c74f7815b15b28f7fb01a
Cr-Commit-Position: refs/heads/master@{#316450}
Review URL: https://codereview.chromium.org/922733002
Cr-Commit-Position: refs/heads/master@{#317852}
Diffstat (limited to 'content')
20 files changed, 583 insertions, 126 deletions
diff --git a/content/child/threaded_data_provider.cc b/content/child/threaded_data_provider.cc index 6a533d3..64daeeb 100644 --- a/content/child/threaded_data_provider.cc +++ b/content/child/threaded_data_provider.cc @@ -108,10 +108,11 @@ void DataProviderMessageFilter::OnReceivedData(int request_id, int data_length, int encoded_data_length) { DCHECK(io_message_loop_->BelongsToCurrentThread()); - background_thread_.message_loop()->PostTask(FROM_HERE, base::Bind( - &ThreadedDataProvider::OnReceivedDataOnBackgroundThread, - background_thread_resource_provider_, - data_offset, data_length, encoded_data_length)); + background_thread_.TaskRunner()->PostTask( + FROM_HERE, + base::Bind(&ThreadedDataProvider::OnReceivedDataOnBackgroundThread, + background_thread_resource_provider_, data_offset, data_length, + encoded_data_length)); } } // anonymous namespace @@ -186,9 +187,9 @@ void ThreadedDataProvider::Stop() { // ThreadedDataProvider gets created. DCHECK(current_background_thread == static_cast<WebThreadImpl*>(&background_thread_)); - background_thread_.message_loop()->PostTask(FROM_HERE, - base::Bind(&ThreadedDataProvider::StopOnBackgroundThread, - base::Unretained(this))); + background_thread_.TaskRunner()->PostTask( + FROM_HERE, base::Bind(&ThreadedDataProvider::StopOnBackgroundThread, + base::Unretained(this))); } } @@ -213,7 +214,8 @@ void ThreadedDataProvider::OnRequestCompleteForegroundThread( const base::TimeTicks& renderer_completion_time) { DCHECK(ChildThreadImpl::current()); - background_thread_.message_loop()->PostTask(FROM_HERE, + background_thread_.TaskRunner()->PostTask( + FROM_HERE, base::Bind(&ThreadedDataProvider::OnRequestCompleteBackgroundThread, base::Unretained(this), resource_dispatcher, request_complete_data, renderer_completion_time)); @@ -241,7 +243,8 @@ void ThreadedDataProvider::OnResourceMessageFilterAddedMainThread() { // We bounce this message from the I/O thread via the main thread and then // to our background thread, following the same path as incoming data before // our filter gets added, to make sure there's nothing still incoming. - background_thread_.message_loop()->PostTask(FROM_HERE, + background_thread_.TaskRunner()->PostTask( + FROM_HERE, base::Bind( &ThreadedDataProvider::OnResourceMessageFilterAddedBackgroundThread, background_thread_weak_factory_->GetWeakPtr())); @@ -295,10 +298,10 @@ void ThreadedDataProvider::OnReceivedDataOnForegroundThread( const char* data, int data_length, int encoded_data_length) { DCHECK(ChildThreadImpl::current()); - background_thread_.message_loop()->PostTask(FROM_HERE, - base::Bind(&ThreadedDataProvider::ForwardAndACKData, - base::Unretained(this), - data, data_length, encoded_data_length)); + background_thread_.TaskRunner()->PostTask( + FROM_HERE, base::Bind(&ThreadedDataProvider::ForwardAndACKData, + base::Unretained(this), data, data_length, + encoded_data_length)); } void ThreadedDataProvider::ForwardAndACKData(const char* data, diff --git a/content/child/webthread_impl.cc b/content/child/webthread_impl.cc index 9b697de..8a1d7cd 100644 --- a/content/child/webthread_impl.cc +++ b/content/child/webthread_impl.cc @@ -16,9 +16,6 @@ namespace content { -WebThreadBase::WebThreadBase() {} -WebThreadBase::~WebThreadBase() {} - class WebThreadBase::TaskObserverAdapter : public base::MessageLoop::TaskObserver { public: @@ -37,13 +34,22 @@ private: WebThread::TaskObserver* observer_; }; +WebThreadBase::WebThreadBase() { +} + +WebThreadBase::~WebThreadBase() { + for (auto& observer_entry : task_observer_map_) { + delete observer_entry.second; + } +} + void WebThreadBase::addTaskObserver(TaskObserver* observer) { CHECK(isCurrentThread()); std::pair<TaskObserverMap::iterator, bool> result = task_observer_map_.insert( std::make_pair(observer, static_cast<TaskObserverAdapter*>(NULL))); if (result.second) result.first->second = new TaskObserverAdapter(observer); - base::MessageLoop::current()->AddTaskObserver(result.first->second); + AddTaskObserverInternal(result.first->second); } void WebThreadBase::removeTaskObserver(TaskObserver* observer) { @@ -51,14 +57,19 @@ void WebThreadBase::removeTaskObserver(TaskObserver* observer) { TaskObserverMap::iterator iter = task_observer_map_.find(observer); if (iter == task_observer_map_.end()) return; - base::MessageLoop::current()->RemoveTaskObserver(iter->second); + RemoveTaskObserverInternal(iter->second); delete iter->second; task_observer_map_.erase(iter); } -WebThreadImpl::WebThreadImpl(const char* name) - : thread_(new base::Thread(name)) { - thread_->Start(); +void WebThreadBase::AddTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) { + base::MessageLoop::current()->AddTaskObserver(observer); +} + +void WebThreadBase::RemoveTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) { + base::MessageLoop::current()->RemoveTaskObserver(observer); } // RunWebThreadTask takes the ownership of |task| from base::Closure and @@ -84,118 +95,85 @@ WebThreadImpl::WebThreadImpl(const char* name) // are no reference counter modification like [A] (because task->run() is not // executed), so there are no race conditions. // See https://crbug.com/390851 for more details. -static void RunWebThreadTask(scoped_ptr<blink::WebThread::Task> task) { - task->run(); -} - -void WebThreadImpl::postTask(Task* task) { - postDelayedTask(task, 0); +// +// static +void WebThreadBase::RunWebThreadTask(scoped_ptr<blink::WebThread::Task> task) { + task->run(); } -void WebThreadImpl::postTask(const blink::WebTraceLocation& location, +void WebThreadBase::postTask(const blink::WebTraceLocation& location, Task* task) { postDelayedTask(location, task, 0); } -void WebThreadImpl::postDelayedTask(Task* task, long long delay_ms) { - thread_->message_loop()->PostDelayedTask( - FROM_HERE, - base::Bind(RunWebThreadTask, base::Passed(make_scoped_ptr(task))), - base::TimeDelta::FromMilliseconds(delay_ms)); -} - -void WebThreadImpl::postDelayedTask(const blink::WebTraceLocation& web_location, +void WebThreadBase::postDelayedTask(const blink::WebTraceLocation& web_location, Task* task, long long delay_ms) { tracked_objects::Location location(web_location.functionName(), web_location.fileName(), -1, nullptr); - thread_->message_loop()->PostDelayedTask( + TaskRunner()->PostDelayedTask( location, base::Bind(RunWebThreadTask, base::Passed(make_scoped_ptr(task))), base::TimeDelta::FromMilliseconds(delay_ms)); } -void WebThreadImpl::enterRunLoop() { +void WebThreadBase::enterRunLoop() { CHECK(isCurrentThread()); - CHECK(!thread_->message_loop()->is_running()); // We don't support nesting. - thread_->message_loop()->Run(); + CHECK(!MessageLoop()->is_running()); // We don't support nesting. + MessageLoop()->Run(); } -void WebThreadImpl::exitRunLoop() { +void WebThreadBase::exitRunLoop() { CHECK(isCurrentThread()); - CHECK(thread_->message_loop()->is_running()); - thread_->message_loop()->Quit(); + CHECK(MessageLoop()->is_running()); + MessageLoop()->Quit(); } -bool WebThreadImpl::isCurrentThread() const { - return thread_->thread_id() == base::PlatformThread::CurrentId(); +bool WebThreadBase::isCurrentThread() const { + return TaskRunner()->BelongsToCurrentThread(); } blink::PlatformThreadId WebThreadImpl::threadId() const { return thread_->thread_id(); } -WebThreadImpl::~WebThreadImpl() { - thread_->Stop(); -} - -WebThreadImplForMessageLoop::WebThreadImplForMessageLoop( - scoped_refptr<base::SingleThreadTaskRunner> owning_thread_task_runner) - : owning_thread_task_runner_(owning_thread_task_runner), - thread_id_(base::PlatformThread::CurrentId()) { +WebThreadImpl::WebThreadImpl(const char* name) + : thread_(new base::Thread(name)) { + thread_->Start(); } -void WebThreadImplForMessageLoop::postTask(Task* task) { - postDelayedTask(task, 0); +WebThreadImpl::~WebThreadImpl() { + thread_->Stop(); } -void WebThreadImplForMessageLoop::postTask( - const blink::WebTraceLocation& location, - Task* task) { - postDelayedTask(location, task, 0); +base::MessageLoop* WebThreadImpl::MessageLoop() const { + return thread_->message_loop(); } -void WebThreadImplForMessageLoop::postDelayedTask(Task* task, - long long delay_ms) { - owning_thread_task_runner_->PostDelayedTask( - FROM_HERE, - base::Bind(RunWebThreadTask, base::Passed(make_scoped_ptr(task))), - base::TimeDelta::FromMilliseconds(delay_ms)); +base::SingleThreadTaskRunner* WebThreadImpl::TaskRunner() const { + return thread_->message_loop_proxy().get(); } -void WebThreadImplForMessageLoop::postDelayedTask( - const blink::WebTraceLocation& web_location, - Task* task, - long long delay_ms) { - tracked_objects::Location location(web_location.functionName(), - web_location.fileName(), -1, nullptr); - owning_thread_task_runner_->PostDelayedTask( - location, - base::Bind(RunWebThreadTask, base::Passed(make_scoped_ptr(task))), - base::TimeDelta::FromMilliseconds(delay_ms)); +WebThreadImplForMessageLoop::WebThreadImplForMessageLoop( + scoped_refptr<base::SingleThreadTaskRunner> owning_thread_task_runner) + : owning_thread_task_runner_(owning_thread_task_runner), + thread_id_(base::PlatformThread::CurrentId()) { } -void WebThreadImplForMessageLoop::enterRunLoop() { - CHECK(isCurrentThread()); - // We don't support nesting. - CHECK(!base::MessageLoop::current()->is_running()); - base::MessageLoop::current()->Run(); +blink::PlatformThreadId WebThreadImplForMessageLoop::threadId() const { + return thread_id_; } -void WebThreadImplForMessageLoop::exitRunLoop() { - CHECK(isCurrentThread()); - CHECK(base::MessageLoop::current()->is_running()); - base::MessageLoop::current()->Quit(); +WebThreadImplForMessageLoop::~WebThreadImplForMessageLoop() { } -bool WebThreadImplForMessageLoop::isCurrentThread() const { - return owning_thread_task_runner_->BelongsToCurrentThread(); +base::MessageLoop* WebThreadImplForMessageLoop::MessageLoop() const { + DCHECK(isCurrentThread()); + return base::MessageLoop::current(); } -blink::PlatformThreadId WebThreadImplForMessageLoop::threadId() const { - return thread_id_; +base::SingleThreadTaskRunner* WebThreadImplForMessageLoop::TaskRunner() const { + return owning_thread_task_runner_.get(); } -WebThreadImplForMessageLoop::~WebThreadImplForMessageLoop() {} - } // namespace content diff --git a/content/child/webthread_impl.h b/content/child/webthread_impl.h index ba2888f..edd3956 100644 --- a/content/child/webthread_impl.h +++ b/content/child/webthread_impl.h @@ -22,18 +22,43 @@ class CONTENT_EXPORT WebThreadBase : public blink::WebThread { public: virtual ~WebThreadBase(); + // blink::WebThread implementation. + virtual bool isCurrentThread() const; + virtual blink::PlatformThreadId threadId() const = 0; + + virtual void postTask(const blink::WebTraceLocation& location, Task* task); + virtual void postDelayedTask(const blink::WebTraceLocation& location, + Task* task, + long long delay_ms); + + virtual void enterRunLoop(); + virtual void exitRunLoop(); + virtual void addTaskObserver(TaskObserver* observer); virtual void removeTaskObserver(TaskObserver* observer); - virtual bool isCurrentThread() const = 0; - virtual blink::PlatformThreadId threadId() const = 0; + // Returns the base::Bind-compatible task runner for posting tasks to this + // thread. Can be called from any thread. + virtual base::SingleThreadTaskRunner* TaskRunner() const = 0; protected: + class TaskObserverAdapter; + WebThreadBase(); - private: - class TaskObserverAdapter; + // Returns the underlying MessageLoop for this thread. Only used for entering + // and exiting a nested run loop. Only called on the thread that the + // WebThread belongs to. + virtual base::MessageLoop* MessageLoop() const = 0; + + virtual void AddTaskObserverInternal( + base::MessageLoop::TaskObserver* observer); + virtual void RemoveTaskObserverInternal( + base::MessageLoop::TaskObserver* observer); + + static void RunWebThreadTask(scoped_ptr<blink::WebThread::Task> task); + private: typedef std::map<TaskObserver*, TaskObserverAdapter*> TaskObserverMap; TaskObserverMap task_observer_map_; }; @@ -43,24 +68,15 @@ class CONTENT_EXPORT WebThreadImpl : public WebThreadBase { explicit WebThreadImpl(const char* name); virtual ~WebThreadImpl(); - virtual void postTask(const blink::WebTraceLocation& location, Task* task); - virtual void postDelayedTask(const blink::WebTraceLocation& location, - Task* task, - long long delay_ms); - - // TODO(skyostil): Remove once blink has migrated. - virtual void postTask(Task* task); - virtual void postDelayedTask(Task* task, long long delay_ms); - - virtual void enterRunLoop(); - virtual void exitRunLoop(); - - base::MessageLoop* message_loop() const { return thread_->message_loop(); } + // blink::WebThread implementation. + blink::PlatformThreadId threadId() const override; - virtual bool isCurrentThread() const override; - virtual blink::PlatformThreadId threadId() const override; + // WebThreadBase implementation. + base::SingleThreadTaskRunner* TaskRunner() const override; private: + base::MessageLoop* MessageLoop() const override; + scoped_ptr<base::Thread> thread_; }; @@ -70,21 +86,14 @@ class WebThreadImplForMessageLoop : public WebThreadBase { scoped_refptr<base::SingleThreadTaskRunner> owning_thread_task_runner); CONTENT_EXPORT virtual ~WebThreadImplForMessageLoop(); - virtual void postTask(const blink::WebTraceLocation& location, Task* task); - virtual void postDelayedTask(const blink::WebTraceLocation& location, - Task* task, - long long delay_ms); - - // TODO(skyostil): Remove once blink has migrated. - virtual void postTask(Task* task); - virtual void postDelayedTask(Task* task, long long delay_ms); + // blink::WebThread implementation. + blink::PlatformThreadId threadId() const override; - virtual void enterRunLoop() override; - virtual void exitRunLoop() override; + // WebThreadBase implementation. + base::MessageLoop* MessageLoop() const override; private: - virtual bool isCurrentThread() const override; - virtual blink::PlatformThreadId threadId() const override; + base::SingleThreadTaskRunner* TaskRunner() const override; scoped_refptr<base::SingleThreadTaskRunner> owning_thread_task_runner_; blink::PlatformThreadId thread_id_; diff --git a/content/content_renderer.gypi b/content/content_renderer.gypi index 0150728..d3e4979 100644 --- a/content/content_renderer.gypi +++ b/content/content_renderer.gypi @@ -377,6 +377,8 @@ 'renderer/scheduler/task_queue_selector.h', 'renderer/scheduler/web_scheduler_impl.cc', 'renderer/scheduler/web_scheduler_impl.h', + 'renderer/scheduler/webthread_impl_for_scheduler.cc', + 'renderer/scheduler/webthread_impl_for_scheduler.h', 'renderer/screen_orientation/screen_orientation_dispatcher.cc', 'renderer/screen_orientation/screen_orientation_dispatcher.h', 'renderer/screen_orientation/screen_orientation_observer.cc', diff --git a/content/content_tests.gypi b/content/content_tests.gypi index bca56bf..fccef99 100644 --- a/content/content_tests.gypi +++ b/content/content_tests.gypi @@ -660,6 +660,7 @@ 'renderer/scheduler/renderer_task_queue_selector_unittest.cc', 'renderer/scheduler/resource_dispatch_throttler_unittest.cc', 'renderer/scheduler/task_queue_manager_unittest.cc', + 'renderer/scheduler/webthread_impl_for_scheduler_unittest.cc', 'renderer/screen_orientation/screen_orientation_dispatcher_unittest.cc', 'renderer/skia_benchmarking_extension_unittest.cc', 'test/fileapi_test_file_set.cc', diff --git a/content/renderer/renderer_blink_platform_impl.cc b/content/renderer/renderer_blink_platform_impl.cc index 3ac2bd0..cf1f817 100644 --- a/content/renderer/renderer_blink_platform_impl.cc +++ b/content/renderer/renderer_blink_platform_impl.cc @@ -52,6 +52,7 @@ #include "content/renderer/renderer_clipboard_delegate.h" #include "content/renderer/scheduler/renderer_scheduler.h" #include "content/renderer/scheduler/web_scheduler_impl.h" +#include "content/renderer/scheduler/webthread_impl_for_scheduler.h" #include "content/renderer/screen_orientation/screen_orientation_observer.h" #include "content/renderer/webclipboard_impl.h" #include "content/renderer/webgraphicscontext3d_provider_impl.h" @@ -228,6 +229,7 @@ RendererBlinkPlatformImpl::RendererBlinkPlatformImpl( RendererScheduler* renderer_scheduler) : BlinkPlatformImpl(renderer_scheduler->DefaultTaskRunner()), web_scheduler_(new WebSchedulerImpl(renderer_scheduler)), + main_thread_(new WebThreadImplForScheduler(renderer_scheduler)), clipboard_delegate_(new RendererClipboardDelegate), clipboard_(new WebClipboardImpl(clipboard_delegate_.get())), mime_registry_(new RendererBlinkPlatformImpl::MimeRegistry), @@ -263,6 +265,12 @@ blink::WebScheduler* RendererBlinkPlatformImpl::scheduler() { return web_scheduler_.get(); } +blink::WebThread* RendererBlinkPlatformImpl::currentThread() { + if (main_thread_->isCurrentThread()) + return main_thread_.get(); + return BlinkPlatformImpl::currentThread(); +} + blink::WebClipboard* RendererBlinkPlatformImpl::clipboard() { blink::WebClipboard* clipboard = GetContentClient()->renderer()->OverrideWebClipboard(); diff --git a/content/renderer/renderer_blink_platform_impl.h b/content/renderer/renderer_blink_platform_impl.h index 97ab4ee..ae8cb46 100644 --- a/content/renderer/renderer_blink_platform_impl.h +++ b/content/renderer/renderer_blink_platform_impl.h @@ -50,6 +50,7 @@ class ThreadSafeSender; class WebClipboardImpl; class WebDatabaseObserverImpl; class WebFileSystemImpl; +class WebThreadImplForScheduler; class WebSchedulerImpl; class CONTENT_EXPORT RendererBlinkPlatformImpl : public BlinkPlatformImpl { @@ -152,6 +153,7 @@ class CONTENT_EXPORT RendererBlinkPlatformImpl : public BlinkPlatformImpl { virtual void vibrate(unsigned int milliseconds); virtual void cancelVibration(); virtual blink::WebScheduler* scheduler(); + virtual blink::WebThread* currentThread(); // Set the PlatformEventObserverBase in |platform_event_observers_| associated // with |type| to |observer|. If there was already an observer associated to @@ -202,6 +204,7 @@ class CONTENT_EXPORT RendererBlinkPlatformImpl : public BlinkPlatformImpl { device::VibrationManagerPtr& GetConnectedVibrationManagerService(); scoped_ptr<WebSchedulerImpl> web_scheduler_; + scoped_ptr<WebThreadImplForScheduler> main_thread_; scoped_ptr<RendererClipboardDelegate> clipboard_delegate_; scoped_ptr<WebClipboardImpl> clipboard_; diff --git a/content/renderer/scheduler/null_renderer_scheduler.cc b/content/renderer/scheduler/null_renderer_scheduler.cc index 7dd3d52..7e5d2db 100644 --- a/content/renderer/scheduler/null_renderer_scheduler.cc +++ b/content/renderer/scheduler/null_renderer_scheduler.cc @@ -89,6 +89,16 @@ bool NullRendererScheduler::ShouldYieldForHighPriorityWork() { return false; } +void NullRendererScheduler::AddTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + base::MessageLoop::current()->AddTaskObserver(task_observer); +} + +void NullRendererScheduler::RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + base::MessageLoop::current()->RemoveTaskObserver(task_observer); +} + void NullRendererScheduler::Shutdown() { } diff --git a/content/renderer/scheduler/null_renderer_scheduler.h b/content/renderer/scheduler/null_renderer_scheduler.h index eb2faec..d983e72 100644 --- a/content/renderer/scheduler/null_renderer_scheduler.h +++ b/content/renderer/scheduler/null_renderer_scheduler.h @@ -27,6 +27,9 @@ class NullRendererScheduler : public RendererScheduler { void DidAnimateForInputOnCompositorThread() override; bool IsHighPriorityWorkAnticipated() override; bool ShouldYieldForHighPriorityWork() override; + void AddTaskObserver(base::MessageLoop::TaskObserver* task_observer) override; + void RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) override; void Shutdown() override; private: diff --git a/content/renderer/scheduler/renderer_scheduler.h b/content/renderer/scheduler/renderer_scheduler.h index 0a57339..7d2c39e 100644 --- a/content/renderer/scheduler/renderer_scheduler.h +++ b/content/renderer/scheduler/renderer_scheduler.h @@ -70,6 +70,14 @@ class CONTENT_EXPORT RendererScheduler { // Must be called from the main thread. virtual bool ShouldYieldForHighPriorityWork() = 0; + // Adds or removes a task observer from the scheduler. The observer will be + // notified before and after every executed task. These functions can only be + // called on the main thread. + virtual void AddTaskObserver( + base::MessageLoop::TaskObserver* task_observer) = 0; + virtual void RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) = 0; + // Shuts down the scheduler by dropping any remaining pending work in the work // queues. After this call any work posted to the task runners will be // silently dropped. diff --git a/content/renderer/scheduler/renderer_scheduler_impl.cc b/content/renderer/scheduler/renderer_scheduler_impl.cc index 67f3072..4c3d1a3 100644 --- a/content/renderer/scheduler/renderer_scheduler_impl.cc +++ b/content/renderer/scheduler/renderer_scheduler_impl.cc @@ -350,6 +350,11 @@ void RendererSchedulerImpl::SetTimeSourceForTesting( task_queue_manager_->SetTimeSourceForTesting(time_source); } +void RendererSchedulerImpl::SetWorkBatchSizeForTesting(size_t work_batch_size) { + DCHECK(main_thread_checker_.CalledOnValidThread()); + task_queue_manager_->SetWorkBatchSize(work_batch_size); +} + base::TimeTicks RendererSchedulerImpl::Now() const { return UNLIKELY(time_source_) ? time_source_->Now() : base::TimeTicks::Now(); } @@ -481,4 +486,18 @@ RendererSchedulerImpl::ComputeNewInputStreamState( return INPUT_ACTIVE; } +void RendererSchedulerImpl::AddTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + DCHECK(main_thread_checker_.CalledOnValidThread()); + if (task_queue_manager_) + task_queue_manager_->AddTaskObserver(task_observer); +} + +void RendererSchedulerImpl::RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + DCHECK(main_thread_checker_.CalledOnValidThread()); + if (task_queue_manager_) + task_queue_manager_->RemoveTaskObserver(task_observer); +} + } // namespace content diff --git a/content/renderer/scheduler/renderer_scheduler_impl.h b/content/renderer/scheduler/renderer_scheduler_impl.h index f188f8b..67f5505 100644 --- a/content/renderer/scheduler/renderer_scheduler_impl.h +++ b/content/renderer/scheduler/renderer_scheduler_impl.h @@ -43,9 +43,13 @@ class CONTENT_EXPORT RendererSchedulerImpl : public RendererScheduler { void DidAnimateForInputOnCompositorThread() override; bool IsHighPriorityWorkAnticipated() override; bool ShouldYieldForHighPriorityWork() override; + void AddTaskObserver(base::MessageLoop::TaskObserver* task_observer) override; + void RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) override; void Shutdown() override; void SetTimeSourceForTesting(scoped_refptr<cc::TestNowSource> time_source); + void SetWorkBatchSizeForTesting(size_t work_batch_size); private: friend class RendererSchedulerImplTest; diff --git a/content/renderer/scheduler/task_queue_manager.cc b/content/renderer/scheduler/task_queue_manager.cc index 38a7192..90f89fe 100644 --- a/content/renderer/scheduler/task_queue_manager.cc +++ b/content/renderer/scheduler/task_queue_manager.cc @@ -419,6 +419,8 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) { if (!UpdateWorkQueues(&next_pending_delayed_task, BEFORE_WAKEUP_EVENT_TYPE)) return; + base::PendingTask previous_task((tracked_objects::Location()), + (base::Closure())); for (int i = 0; i < work_batch_size_; i++) { // Interrupt the work batch if we should run the next delayed task. if (i > 0 && next_pending_delayed_task.ToInternalValue() != kMaxTimeTicks && @@ -431,7 +433,7 @@ void TaskQueueManager::DoWork(bool posted_from_main_thread) { // Note that this function won't post another call to DoWork if one is // already pending, so it is safe to call it in a loop. MaybePostDoWorkOnMainRunner(); - ProcessTaskFromWorkQueue(queue_index); + ProcessTaskFromWorkQueue(queue_index, i > 0, &previous_task); if (!UpdateWorkQueues(&next_pending_delayed_task, AFTER_WAKEUP_EVENT_TYPE)) return; @@ -451,7 +453,10 @@ void TaskQueueManager::DidQueueTask(base::PendingTask* pending_task) { task_annotator_.DidQueueTask("TaskQueueManager::PostTask", *pending_task); } -void TaskQueueManager::ProcessTaskFromWorkQueue(size_t queue_index) { +void TaskQueueManager::ProcessTaskFromWorkQueue( + size_t queue_index, + bool has_previous_task, + base::PendingTask* previous_task) { DCHECK(main_thread_checker_.CalledOnValidThread()); internal::TaskQueue* queue = Queue(queue_index); base::PendingTask pending_task = queue->TakeTaskFromWorkQueue(); @@ -461,8 +466,19 @@ void TaskQueueManager::ProcessTaskFromWorkQueue(size_t queue_index) { main_task_runner_->PostNonNestableTask(pending_task.posted_from, pending_task.task); } else { + // Suppress "will" task observer notifications for the first and "did" + // notifications for the last task in the batch to avoid duplicate + // notifications. + if (has_previous_task) { + FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, + DidProcessTask(*previous_task)); + FOR_EACH_OBSERVER(base::MessageLoop::TaskObserver, task_observers_, + WillProcessTask(pending_task)); + } task_annotator_.RunTask("TaskQueueManager::PostTask", "TaskQueueManager::RunTask", pending_task); + pending_task.task.Reset(); + *previous_task = pending_task; } } @@ -490,6 +506,20 @@ void TaskQueueManager::SetWorkBatchSize(int work_batch_size) { work_batch_size_ = work_batch_size; } +void TaskQueueManager::AddTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + DCHECK(main_thread_checker_.CalledOnValidThread()); + base::MessageLoop::current()->AddTaskObserver(task_observer); + task_observers_.AddObserver(task_observer); +} + +void TaskQueueManager::RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { + DCHECK(main_thread_checker_.CalledOnValidThread()); + base::MessageLoop::current()->RemoveTaskObserver(task_observer); + task_observers_.RemoveObserver(task_observer); +} + void TaskQueueManager::SetTimeSourceForTesting( scoped_refptr<cc::TestNowSource> time_source) { DCHECK(main_thread_checker_.CalledOnValidThread()); diff --git a/content/renderer/scheduler/task_queue_manager.h b/content/renderer/scheduler/task_queue_manager.h index 0b1bcbb..9e475ff 100644 --- a/content/renderer/scheduler/task_queue_manager.h +++ b/content/renderer/scheduler/task_queue_manager.h @@ -9,6 +9,7 @@ #include "base/debug/task_annotator.h" #include "base/macros.h" #include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" #include "base/pending_task.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/lock.h" @@ -105,6 +106,11 @@ class CONTENT_EXPORT TaskQueueManager { // tasks posted to the main loop. The batch size is 1 by default. void SetWorkBatchSize(int work_batch_size); + // These functions can only be called on the same thread that the task queue + // manager executes its tasks on. + void AddTaskObserver(base::MessageLoop::TaskObserver* task_observer); + void RemoveTaskObserver(base::MessageLoop::TaskObserver* task_observer); + void SetTimeSourceForTesting(scoped_refptr<cc::TestNowSource> time_source); private: @@ -141,9 +147,12 @@ class CONTENT_EXPORT TaskQueueManager { bool SelectWorkQueueToService(size_t* out_queue_index); // Runs a single nestable task from the work queue designated by - // |queue_index|. Non-nestable task are reposted on the run loop. - // The queue must not be empty. - void ProcessTaskFromWorkQueue(size_t queue_index); + // |queue_index|. If |has_previous_task| is true, |previous_task| should + // contain the previous task in this work batch. Non-nestable task are + // reposted on the run loop. The queue must not be empty. + void ProcessTaskFromWorkQueue(size_t queue_index, + bool has_previous_task, + base::PendingTask* previous_task); bool RunsTasksOnCurrentThread() const; bool PostDelayedTask(const tracked_objects::Location& from_here, @@ -177,6 +186,8 @@ class CONTENT_EXPORT TaskQueueManager { scoped_refptr<cc::TestNowSource> time_source_; + ObserverList<base::MessageLoop::TaskObserver> task_observers_; + base::WeakPtrFactory<TaskQueueManager> weak_factory_; DISALLOW_COPY_AND_ASSIGN(TaskQueueManager); diff --git a/content/renderer/scheduler/task_queue_manager_unittest.cc b/content/renderer/scheduler/task_queue_manager_unittest.cc index ef5bd53..21c7525 100644 --- a/content/renderer/scheduler/task_queue_manager_unittest.cc +++ b/content/renderer/scheduler/task_queue_manager_unittest.cc @@ -11,6 +11,7 @@ #include "testing/gmock/include/gmock/gmock.h" using testing::ElementsAre; +using testing::_; namespace content { namespace { @@ -655,6 +656,79 @@ TEST_F(TaskQueueManagerTest, AutoPumpOnWakeupTriggeredByManuallyPumpedQueue) { EXPECT_THAT(run_order, ElementsAre(2, 1)); } +class MockTaskObserver : public base::MessageLoop::TaskObserver { + public: + MOCK_METHOD1(DidProcessTask, void(const base::PendingTask& task)); + MOCK_METHOD1(WillProcessTask, void(const base::PendingTask& task)); +}; + +TEST_F(TaskQueueManagerTest, TaskObserverAdding) { + InitializeWithRealMessageLoop(1u); + MockTaskObserver observer; + + manager_->SetWorkBatchSize(2); + manager_->AddTaskObserver(&observer); + + std::vector<int> run_order; + scoped_refptr<base::SingleThreadTaskRunner> runner = + manager_->TaskRunnerForQueue(0); + + runner->PostTask(FROM_HERE, base::Bind(&TestTask, 1, &run_order)); + runner->PostTask(FROM_HERE, base::Bind(&TestTask, 2, &run_order)); + + selector_->AppendQueueToService(0); + selector_->AppendQueueToService(0); + + // Two pairs of callbacks for the tasks above plus another one for the + // DoWork() posted by the task queue manager. + EXPECT_CALL(observer, WillProcessTask(_)).Times(3); + EXPECT_CALL(observer, DidProcessTask(_)).Times(3); + message_loop_->RunUntilIdle(); +} + +TEST_F(TaskQueueManagerTest, TaskObserverRemoving) { + InitializeWithRealMessageLoop(1u); + MockTaskObserver observer; + manager_->SetWorkBatchSize(2); + manager_->AddTaskObserver(&observer); + manager_->RemoveTaskObserver(&observer); + + std::vector<int> run_order; + scoped_refptr<base::SingleThreadTaskRunner> runner = + manager_->TaskRunnerForQueue(0); + + runner->PostTask(FROM_HERE, base::Bind(&TestTask, 1, &run_order)); + + EXPECT_CALL(observer, WillProcessTask(_)).Times(0); + EXPECT_CALL(observer, DidProcessTask(_)).Times(0); + + selector_->AppendQueueToService(0); + message_loop_->RunUntilIdle(); +} + +void RemoveObserverTask(TaskQueueManager* manager, + base::MessageLoop::TaskObserver* observer) { + manager->RemoveTaskObserver(observer); +} + +TEST_F(TaskQueueManagerTest, TaskObserverRemovingInsideTask) { + InitializeWithRealMessageLoop(1u); + MockTaskObserver observer; + manager_->SetWorkBatchSize(3); + manager_->AddTaskObserver(&observer); + + scoped_refptr<base::SingleThreadTaskRunner> runner = + manager_->TaskRunnerForQueue(0); + runner->PostTask(FROM_HERE, + base::Bind(&RemoveObserverTask, manager_.get(), &observer)); + + selector_->AppendQueueToService(0); + + EXPECT_CALL(observer, WillProcessTask(_)).Times(1); + EXPECT_CALL(observer, DidProcessTask(_)).Times(0); + message_loop_->RunUntilIdle(); +} + } // namespace } // namespace content diff --git a/content/renderer/scheduler/webthread_impl_for_scheduler.cc b/content/renderer/scheduler/webthread_impl_for_scheduler.cc new file mode 100644 index 0000000..b328142 --- /dev/null +++ b/content/renderer/scheduler/webthread_impl_for_scheduler.cc @@ -0,0 +1,45 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/renderer/scheduler/webthread_impl_for_scheduler.h" + +#include "content/renderer/scheduler/renderer_scheduler.h" +#include "third_party/WebKit/public/platform/WebTraceLocation.h" + +namespace content { + +WebThreadImplForScheduler::WebThreadImplForScheduler( + RendererScheduler* scheduler) + : task_runner_(scheduler->DefaultTaskRunner()), + scheduler_(scheduler), + thread_id_(base::PlatformThread::CurrentId()) { +} + +WebThreadImplForScheduler::~WebThreadImplForScheduler() { +} + +blink::PlatformThreadId WebThreadImplForScheduler::threadId() const { + return thread_id_; +} + +base::MessageLoop* WebThreadImplForScheduler::MessageLoop() const { + DCHECK(isCurrentThread()); + return base::MessageLoop::current(); +} + +base::SingleThreadTaskRunner* WebThreadImplForScheduler::TaskRunner() const { + return task_runner_.get(); +} + +void WebThreadImplForScheduler::AddTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) { + scheduler_->AddTaskObserver(observer); +} + +void WebThreadImplForScheduler::RemoveTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) { + scheduler_->RemoveTaskObserver(observer); +} + +} // namespace content diff --git a/content/renderer/scheduler/webthread_impl_for_scheduler.h b/content/renderer/scheduler/webthread_impl_for_scheduler.h new file mode 100644 index 0000000..36bd2e2 --- /dev/null +++ b/content/renderer/scheduler/webthread_impl_for_scheduler.h @@ -0,0 +1,41 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_RENDERER_SCHEDULER_WEBTHREAD_IMPL_FOR_SCHEDULER_H_ +#define CONTENT_RENDERER_SCHEDULER_WEBTHREAD_IMPL_FOR_SCHEDULER_H_ + +#include "content/child/webthread_impl.h" + +#include "base/containers/scoped_ptr_hash_map.h" + +namespace content { + +class RendererScheduler; + +class CONTENT_EXPORT WebThreadImplForScheduler : public WebThreadBase { + public: + explicit WebThreadImplForScheduler(RendererScheduler* scheduler); + virtual ~WebThreadImplForScheduler(); + + // blink::WebThread implementation. + blink::PlatformThreadId threadId() const override; + + // WebThreadBase implementation. + base::SingleThreadTaskRunner* TaskRunner() const override; + + private: + base::MessageLoop* MessageLoop() const override; + void AddTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) override; + void RemoveTaskObserverInternal( + base::MessageLoop::TaskObserver* observer) override; + + scoped_refptr<base::SingleThreadTaskRunner> task_runner_; + RendererScheduler* scheduler_; // Not owned. + blink::PlatformThreadId thread_id_; +}; + +} // namespace content + +#endif // CONTENT_RENDERER_SCHEDULER_WEBTHREAD_IMPL_FOR_SCHEDULER_H_ diff --git a/content/renderer/scheduler/webthread_impl_for_scheduler_unittest.cc b/content/renderer/scheduler/webthread_impl_for_scheduler_unittest.cc new file mode 100644 index 0000000..68d3197 --- /dev/null +++ b/content/renderer/scheduler/webthread_impl_for_scheduler_unittest.cc @@ -0,0 +1,197 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/renderer/scheduler/webthread_impl_for_scheduler.h" + +#include "base/run_loop.h" +#include "content/renderer/scheduler/renderer_scheduler_impl.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "third_party/WebKit/public/platform/WebTraceLocation.h" + +namespace content { +namespace { + +const int kWorkBatchSize = 2; + +class MockTask : public blink::WebThread::Task { + public: + MOCK_METHOD0(run, void()); +}; + +class MockTaskObserver : public blink::WebThread::TaskObserver { + public: + MOCK_METHOD0(willProcessTask, void()); + MOCK_METHOD0(didProcessTask, void()); +}; +} // namespace + +class WebThreadImplForSchedulerTest : public testing::Test { + public: + WebThreadImplForSchedulerTest() + : scheduler_(message_loop_.task_runner()), + default_task_runner_(scheduler_.DefaultTaskRunner()), + thread_(&scheduler_) {} + + ~WebThreadImplForSchedulerTest() override {} + + protected: + void EatDefaultTask(MockTaskObserver* observer) { + // The scheduler posts one extra DoWork() task automatically. + EXPECT_CALL(*observer, willProcessTask()); + EXPECT_CALL(*observer, didProcessTask()); + } + + base::MessageLoop message_loop_; + RendererSchedulerImpl scheduler_; + scoped_refptr<base::SingleThreadTaskRunner> default_task_runner_; + WebThreadImplForScheduler thread_; + + DISALLOW_COPY_AND_ASSIGN(WebThreadImplForSchedulerTest); +}; + +TEST_F(WebThreadImplForSchedulerTest, TestTaskObserver) { + MockTaskObserver observer; + thread_.addTaskObserver(&observer); + scoped_ptr<MockTask> task(new MockTask()); + + { + testing::InSequence sequence; + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task, run()); + EXPECT_CALL(observer, didProcessTask()); + + EatDefaultTask(&observer); + } + + thread_.postTask(blink::WebTraceLocation(), task.release()); + message_loop_.RunUntilIdle(); + thread_.removeTaskObserver(&observer); +} + +TEST_F(WebThreadImplForSchedulerTest, TestWorkBatchWithOneTask) { + MockTaskObserver observer; + thread_.addTaskObserver(&observer); + scoped_ptr<MockTask> task(new MockTask()); + + scheduler_.SetWorkBatchSizeForTesting(kWorkBatchSize); + { + testing::InSequence sequence; + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task, run()); + EXPECT_CALL(observer, didProcessTask()); + + EatDefaultTask(&observer); + } + + thread_.postTask(blink::WebTraceLocation(), task.release()); + message_loop_.RunUntilIdle(); + thread_.removeTaskObserver(&observer); +} + +TEST_F(WebThreadImplForSchedulerTest, TestWorkBatchWithTwoTasks) { + MockTaskObserver observer; + thread_.addTaskObserver(&observer); + scoped_ptr<MockTask> task1(new MockTask()); + scoped_ptr<MockTask> task2(new MockTask()); + + scheduler_.SetWorkBatchSizeForTesting(kWorkBatchSize); + { + testing::InSequence sequence; + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task1, run()); + EXPECT_CALL(observer, didProcessTask()); + + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task2, run()); + EXPECT_CALL(observer, didProcessTask()); + + EatDefaultTask(&observer); + } + + thread_.postTask(blink::WebTraceLocation(), task1.release()); + thread_.postTask(blink::WebTraceLocation(), task2.release()); + message_loop_.RunUntilIdle(); + thread_.removeTaskObserver(&observer); +} + +TEST_F(WebThreadImplForSchedulerTest, TestWorkBatchWithThreeTasks) { + MockTaskObserver observer; + thread_.addTaskObserver(&observer); + scoped_ptr<MockTask> task1(new MockTask()); + scoped_ptr<MockTask> task2(new MockTask()); + scoped_ptr<MockTask> task3(new MockTask()); + + scheduler_.SetWorkBatchSizeForTesting(kWorkBatchSize); + { + testing::InSequence sequence; + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task1, run()); + EXPECT_CALL(observer, didProcessTask()); + + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task2, run()); + EXPECT_CALL(observer, didProcessTask()); + + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(*task3, run()); + EXPECT_CALL(observer, didProcessTask()); + + EatDefaultTask(&observer); + } + + thread_.postTask(blink::WebTraceLocation(), task1.release()); + thread_.postTask(blink::WebTraceLocation(), task2.release()); + thread_.postTask(blink::WebTraceLocation(), task3.release()); + message_loop_.RunUntilIdle(); + thread_.removeTaskObserver(&observer); +} + +class ExitRunLoopTask : public blink::WebThread::Task { + public: + ExitRunLoopTask(base::RunLoop* run_loop) : run_loop_(run_loop) {} + + virtual void run() { run_loop_->Quit(); } + + private: + base::RunLoop* run_loop_; +}; + +void EnterRunLoop(base::MessageLoop* message_loop, blink::WebThread* thread) { + // Note: WebThreads do not support nested run loops, which is why we use a + // run loop directly. + base::RunLoop run_loop; + thread->postTask(blink::WebTraceLocation(), new ExitRunLoopTask(&run_loop)); + message_loop->SetNestableTasksAllowed(true); + run_loop.Run(); +} + +TEST_F(WebThreadImplForSchedulerTest, TestNestedRunLoop) { + MockTaskObserver observer; + thread_.addTaskObserver(&observer); + + { + testing::InSequence sequence; + + // One callback for EnterRunLoop. + EXPECT_CALL(observer, willProcessTask()); + + // A pair for ExitRunLoopTask. + EXPECT_CALL(observer, willProcessTask()); + EXPECT_CALL(observer, didProcessTask()); + + // A final callback for EnterRunLoop. + EXPECT_CALL(observer, didProcessTask()); + + EatDefaultTask(&observer); + } + + message_loop_.PostTask( + FROM_HERE, base::Bind(&EnterRunLoop, base::Unretained(&message_loop_), + base::Unretained(&thread_))); + message_loop_.RunUntilIdle(); + thread_.removeTaskObserver(&observer); +} + +} // namespace content diff --git a/content/test/fake_renderer_scheduler.cc b/content/test/fake_renderer_scheduler.cc index 86bf4a2..5deb288 100644 --- a/content/test/fake_renderer_scheduler.cc +++ b/content/test/fake_renderer_scheduler.cc @@ -56,6 +56,14 @@ bool FakeRendererScheduler::ShouldYieldForHighPriorityWork() { return false; } +void FakeRendererScheduler::AddTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { +} + +void FakeRendererScheduler::RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) { +} + void FakeRendererScheduler::Shutdown() { } diff --git a/content/test/fake_renderer_scheduler.h b/content/test/fake_renderer_scheduler.h index fd2d3e0..b950536 100644 --- a/content/test/fake_renderer_scheduler.h +++ b/content/test/fake_renderer_scheduler.h @@ -27,6 +27,9 @@ class FakeRendererScheduler : public RendererScheduler { void DidAnimateForInputOnCompositorThread() override; bool IsHighPriorityWorkAnticipated() override; bool ShouldYieldForHighPriorityWork() override; + void AddTaskObserver(base::MessageLoop::TaskObserver* task_observer) override; + void RemoveTaskObserver( + base::MessageLoop::TaskObserver* task_observer) override; void Shutdown() override; private: |