diff options
Diffstat (limited to 'components/scheduler/child/task_queue_manager.cc')
-rw-r--r-- | components/scheduler/child/task_queue_manager.cc | 173 |
1 files changed, 108 insertions, 65 deletions
diff --git a/components/scheduler/child/task_queue_manager.cc b/components/scheduler/child/task_queue_manager.cc index 620ede5..606fe7f 100644 --- a/components/scheduler/child/task_queue_manager.cc +++ b/components/scheduler/child/task_queue_manager.cc @@ -12,6 +12,7 @@ #include "base/trace_event/trace_event.h" #include "base/trace_event/trace_event_argument.h" #include "components/scheduler/child/nestable_single_thread_task_runner.h" +#include "components/scheduler/child/task_queue.h" #include "components/scheduler/child/task_queue_selector.h" namespace { @@ -44,13 +45,13 @@ class LazyNow { base::TimeTicks now_; }; -class TaskQueue : public base::SingleThreadTaskRunner { +class TaskQueueImpl : public TaskQueue { public: - TaskQueue(TaskQueueManager* task_queue_manager, - const char* disabled_by_default_tracing_category, - const char* disabled_by_default_verbose_tracing_category); + TaskQueueImpl(TaskQueueManager* task_queue_manager, + const char* disabled_by_default_tracing_category, + const char* disabled_by_default_verbose_tracing_category); - // base::SingleThreadTaskRunner implementation. + // TaskQueue :implementation. bool RunsTasksOnCurrentThread() const override; bool PostDelayedTask(const tracked_objects::Location& from_here, const base::Closure& task, @@ -63,6 +64,9 @@ class TaskQueue : public base::SingleThreadTaskRunner { base::TimeDelta delay) override { return PostDelayedTaskImpl(from_here, task, delay, TaskType::NON_NESTABLE); } + bool PostDelayedTaskAt(const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeTicks desired_run_time) override; TaskQueueManager::QueueState GetQueueState() const; @@ -101,12 +105,17 @@ class TaskQueue : public base::SingleThreadTaskRunner { NON_NESTABLE, }; - ~TaskQueue() override; + ~TaskQueueImpl() override; bool PostDelayedTaskImpl(const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay, TaskType task_type); + bool PostDelayedTaskLocked(LazyNow* lazy_now, + const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeTicks desired_run_time, + TaskType task_type); // Delayed task posted to the underlying run loop, which locks |lock_| and // calls MoveReadyDelayedTasksToIncomingQueueLocked to process dealyed tasks @@ -149,6 +158,11 @@ class TaskQueue : public base::SingleThreadTaskRunner { const char* name_; const char* disabled_by_default_tracing_category_; const char* disabled_by_default_verbose_tracing_category_; + + // Queue-local task sequence number for maintaining the order of delayed + // tasks which are posted for the exact same time. Note that this will be + // replaced by the global sequence number when the delay has elapsed. + int delayed_task_sequence_number_; base::DelayedTaskQueue delayed_task_queue_; std::set<base::TimeTicks> in_flight_kick_delayed_tasks_; @@ -156,12 +170,13 @@ class TaskQueue : public base::SingleThreadTaskRunner { base::TaskQueue work_queue_; TaskQueueManager::WakeupPolicy wakeup_policy_; - DISALLOW_COPY_AND_ASSIGN(TaskQueue); + DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl); }; -TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager, - const char* disabled_by_default_tracing_category, - const char* disabled_by_default_verbose_tracing_category) +TaskQueueImpl::TaskQueueImpl( + TaskQueueManager* task_queue_manager, + const char* disabled_by_default_tracing_category, + const char* disabled_by_default_verbose_tracing_category) : thread_id_(base::PlatformThread::CurrentId()), task_queue_manager_(task_queue_manager), pump_policy_(TaskQueueManager::PumpPolicy::AUTO), @@ -170,13 +185,12 @@ TaskQueue::TaskQueue(TaskQueueManager* task_queue_manager, disabled_by_default_tracing_category), disabled_by_default_verbose_tracing_category_( disabled_by_default_verbose_tracing_category), - wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) { -} + delayed_task_sequence_number_(0), + wakeup_policy_(TaskQueueManager::WakeupPolicy::CAN_WAKE_OTHER_QUEUES) {} -TaskQueue::~TaskQueue() { -} +TaskQueueImpl::~TaskQueueImpl() {} -void TaskQueue::WillDeleteTaskQueueManager() { +void TaskQueueImpl::WillDeleteTaskQueueManager() { base::AutoLock lock(lock_); task_queue_manager_ = nullptr; delayed_task_queue_ = base::DelayedTaskQueue(); @@ -184,40 +198,67 @@ void TaskQueue::WillDeleteTaskQueueManager() { work_queue_ = base::TaskQueue(); } -bool TaskQueue::RunsTasksOnCurrentThread() const { +bool TaskQueueImpl::RunsTasksOnCurrentThread() const { base::AutoLock lock(lock_); return base::PlatformThread::CurrentId() == thread_id_; } -bool TaskQueue::PostDelayedTaskImpl(const tracked_objects::Location& from_here, - const base::Closure& task, - base::TimeDelta delay, - TaskType task_type) { +bool TaskQueueImpl::PostDelayedTaskAt( + const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeTicks desired_run_time) { + base::AutoLock lock(lock_); + if (!task_queue_manager_) + return false; + LazyNow lazy_now(task_queue_manager_); + return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time, + TaskType::NORMAL); +} + +bool TaskQueueImpl::PostDelayedTaskImpl( + const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeDelta delay, + TaskType task_type) { base::AutoLock lock(lock_); if (!task_queue_manager_) return false; + LazyNow lazy_now(task_queue_manager_); + base::TimeTicks desired_run_time; + if (delay > base::TimeDelta()) + desired_run_time = lazy_now.Now() + delay; + return PostDelayedTaskLocked(&lazy_now, from_here, task, desired_run_time, + task_type); +} + +bool TaskQueueImpl::PostDelayedTaskLocked( + LazyNow* lazy_now, + const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeTicks desired_run_time, + TaskType task_type) { + lock_.AssertAcquired(); + DCHECK(task_queue_manager_); base::PendingTask pending_task(from_here, task, base::TimeTicks(), task_type != TaskType::NON_NESTABLE); task_queue_manager_->DidQueueTask(pending_task); - if (delay > base::TimeDelta()) { - base::TimeTicks now = task_queue_manager_->Now(); - pending_task.delayed_run_time = now + delay; + if (!desired_run_time.is_null()) { + pending_task.delayed_run_time = std::max(lazy_now->Now(), desired_run_time); + pending_task.sequence_num = delayed_task_sequence_number_++; delayed_task_queue_.push(pending_task); TraceQueueSize(true); // If we changed the topmost task, then it is time to reschedule. - if (delayed_task_queue_.top().task.Equals(pending_task.task)) { - LazyNow lazy_now(now); - ScheduleDelayedWorkLocked(&lazy_now); - } + if (delayed_task_queue_.top().task.Equals(pending_task.task)) + ScheduleDelayedWorkLocked(lazy_now); return true; } EnqueueTaskLocked(pending_task); return true; } -void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue() { DCHECK(main_thread_checker_.CalledOnValidThread()); base::AutoLock lock(lock_); if (!task_queue_manager_) @@ -227,7 +268,8 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueue() { MoveReadyDelayedTasksToIncomingQueueLocked(&lazy_now); } -void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { +void TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueueLocked( + LazyNow* lazy_now) { lock_.AssertAcquired(); // Enqueue all delayed tasks that should be running now. while (!delayed_task_queue_.empty() && @@ -241,12 +283,12 @@ void TaskQueue::MoveReadyDelayedTasksToIncomingQueueLocked(LazyNow* lazy_now) { ScheduleDelayedWorkLocked(lazy_now); } -void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { +void TaskQueueImpl::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { lock_.AssertAcquired(); // Any remaining tasks are in the future, so queue a task to kick them. if (!delayed_task_queue_.empty()) { base::TimeTicks next_run_time = delayed_task_queue_.top().delayed_run_time; - DCHECK_GT(next_run_time, lazy_now->Now()); + DCHECK_GE(next_run_time, lazy_now->Now()); // Make sure we don't have more than one // MoveReadyDelayedTasksToIncomingQueue posted for a particular scheduled // run time (note it's fine to have multiple ones in flight for distinct @@ -257,12 +299,13 @@ void TaskQueue::ScheduleDelayedWorkLocked(LazyNow* lazy_now) { base::TimeDelta delay = next_run_time - lazy_now->Now(); task_queue_manager_->PostDelayedTask( FROM_HERE, - Bind(&TaskQueue::MoveReadyDelayedTasksToIncomingQueue, this), delay); + Bind(&TaskQueueImpl::MoveReadyDelayedTasksToIncomingQueue, this), + delay); } } } -TaskQueueManager::QueueState TaskQueue::GetQueueState() const { +TaskQueueManager::QueueState TaskQueueImpl::GetQueueState() const { DCHECK(main_thread_checker_.CalledOnValidThread()); if (!work_queue_.empty()) return TaskQueueManager::QueueState::HAS_WORK; @@ -277,7 +320,7 @@ TaskQueueManager::QueueState TaskQueue::GetQueueState() const { } } -bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { +bool TaskQueueImpl::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { lock_.AssertAcquired(); // A null task is passed when UpdateQueue is called before any task is run. // In this case we don't want to pump an after_wakeup queue, so return true @@ -299,7 +342,7 @@ bool TaskQueue::TaskIsOlderThanQueuedTasks(const base::PendingTask* task) { return oldest_queued_task < *task; } -bool TaskQueue::ShouldAutoPumpQueueLocked( +bool TaskQueueImpl::ShouldAutoPumpQueueLocked( bool should_trigger_wakeup, const base::PendingTask* previous_task) { lock_.AssertAcquired(); @@ -313,7 +356,7 @@ bool TaskQueue::ShouldAutoPumpQueueLocked( return true; } -bool TaskQueue::NextPendingDelayedTaskRunTime( +bool TaskQueueImpl::NextPendingDelayedTaskRunTime( base::TimeTicks* next_pending_delayed_task) { base::AutoLock lock(lock_); if (delayed_task_queue_.empty()) @@ -322,9 +365,9 @@ bool TaskQueue::NextPendingDelayedTaskRunTime( return true; } -bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, - bool should_trigger_wakeup, - const base::PendingTask* previous_task) { +bool TaskQueueImpl::UpdateWorkQueue(LazyNow* lazy_now, + bool should_trigger_wakeup, + const base::PendingTask* previous_task) { if (!work_queue_.empty()) return true; @@ -339,14 +382,14 @@ bool TaskQueue::UpdateWorkQueue(LazyNow* lazy_now, } } -base::PendingTask TaskQueue::TakeTaskFromWorkQueue() { +base::PendingTask TaskQueueImpl::TakeTaskFromWorkQueue() { base::PendingTask pending_task = work_queue_.front(); work_queue_.pop(); TraceQueueSize(false); return pending_task; } -void TaskQueue::TraceQueueSize(bool is_locked) const { +void TaskQueueImpl::TraceQueueSize(bool is_locked) const { bool is_tracing; TRACE_EVENT_CATEGORY_GROUP_ENABLED(disabled_by_default_tracing_category_, &is_tracing); @@ -363,7 +406,7 @@ void TaskQueue::TraceQueueSize(bool is_locked) const { lock_.Release(); } -void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { +void TaskQueueImpl::EnqueueTaskLocked(const base::PendingTask& pending_task) { lock_.AssertAcquired(); if (!task_queue_manager_) return; @@ -382,7 +425,7 @@ void TaskQueue::EnqueueTaskLocked(const base::PendingTask& pending_task) { TraceQueueSize(true); } -void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { +void TaskQueueImpl::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { base::AutoLock lock(lock_); if (pump_policy == TaskQueueManager::PumpPolicy::AUTO && pump_policy_ != TaskQueueManager::PumpPolicy::AUTO) { @@ -391,7 +434,7 @@ void TaskQueue::SetPumpPolicy(TaskQueueManager::PumpPolicy pump_policy) { pump_policy_ = pump_policy; } -void TaskQueue::PumpQueueLocked() { +void TaskQueueImpl::PumpQueueLocked() { lock_.AssertAcquired(); if (task_queue_manager_) { LazyNow lazy_now(task_queue_manager_); @@ -405,12 +448,12 @@ void TaskQueue::PumpQueueLocked() { task_queue_manager_->MaybePostDoWorkOnMainRunner(); } -void TaskQueue::PumpQueue() { +void TaskQueueImpl::PumpQueue() { base::AutoLock lock(lock_); PumpQueueLocked(); } -void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { +void TaskQueueImpl::AsValueInto(base::trace_event::TracedValue* state) const { base::AutoLock lock(lock_); state->BeginDictionary(); if (name_) @@ -440,8 +483,8 @@ void TaskQueue::AsValueInto(base::trace_event::TracedValue* state) const { } // static -void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, - base::trace_event::TracedValue* state) { +void TaskQueueImpl::QueueAsValueInto(const base::TaskQueue& queue, + base::trace_event::TracedValue* state) { base::TaskQueue queue_copy(queue); while (!queue_copy.empty()) { TaskAsValueInto(queue_copy.front(), state); @@ -450,8 +493,8 @@ void TaskQueue::QueueAsValueInto(const base::TaskQueue& queue, } // static -void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, - base::trace_event::TracedValue* state) { +void TaskQueueImpl::QueueAsValueInto(const base::DelayedTaskQueue& queue, + base::trace_event::TracedValue* state) { base::DelayedTaskQueue queue_copy(queue); while (!queue_copy.empty()) { TaskAsValueInto(queue_copy.top(), state); @@ -460,8 +503,8 @@ void TaskQueue::QueueAsValueInto(const base::DelayedTaskQueue& queue, } // static -void TaskQueue::TaskAsValueInto(const base::PendingTask& task, - base::trace_event::TracedValue* state) { +void TaskQueueImpl::TaskAsValueInto(const base::PendingTask& task, + base::trace_event::TracedValue* state) { state->BeginDictionary(); state->SetString("posted_from", task.posted_from.ToString()); state->SetInteger("sequence_num", task.sequence_num); @@ -498,10 +541,10 @@ TaskQueueManager::TaskQueueManager( "TaskQueueManager", this); for (size_t i = 0; i < task_queue_count; i++) { - scoped_refptr<internal::TaskQueue> queue(make_scoped_refptr( - new internal::TaskQueue(this, - disabled_by_default_tracing_category, - disabled_by_default_verbose_tracing_category))); + scoped_refptr<internal::TaskQueueImpl> queue( + make_scoped_refptr(new internal::TaskQueueImpl( + this, disabled_by_default_tracing_category, + disabled_by_default_verbose_tracing_category))); queues_.push_back(queue); } @@ -525,13 +568,13 @@ TaskQueueManager::~TaskQueueManager() { selector_->SetTaskQueueSelectorObserver(nullptr); } -internal::TaskQueue* TaskQueueManager::Queue(size_t queue_index) const { +internal::TaskQueueImpl* TaskQueueManager::Queue(size_t queue_index) const { DCHECK_LT(queue_index, queues_.size()); return queues_[queue_index].get(); } -scoped_refptr<base::SingleThreadTaskRunner> -TaskQueueManager::TaskRunnerForQueue(size_t queue_index) const { +scoped_refptr<TaskQueue> TaskQueueManager::TaskRunnerForQueue( + size_t queue_index) const { return Queue(queue_index); } @@ -570,20 +613,20 @@ base::TimeTicks TaskQueueManager::NextPendingDelayedTaskRunTime() { void TaskQueueManager::SetPumpPolicy(size_t queue_index, PumpPolicy pump_policy) { DCHECK(main_thread_checker_.CalledOnValidThread()); - internal::TaskQueue* queue = Queue(queue_index); + internal::TaskQueueImpl* queue = Queue(queue_index); queue->SetPumpPolicy(pump_policy); } void TaskQueueManager::SetWakeupPolicy(size_t queue_index, WakeupPolicy wakeup_policy) { DCHECK(main_thread_checker_.CalledOnValidThread()); - internal::TaskQueue* queue = Queue(queue_index); + internal::TaskQueueImpl* queue = Queue(queue_index); queue->set_wakeup_policy(wakeup_policy); } void TaskQueueManager::PumpQueue(size_t queue_index) { DCHECK(main_thread_checker_.CalledOnValidThread()); - internal::TaskQueue* queue = Queue(queue_index); + internal::TaskQueueImpl* queue = Queue(queue_index); queue->PumpQueue(); } @@ -673,7 +716,7 @@ bool TaskQueueManager::ProcessTaskFromWorkQueue( base::PendingTask* previous_task) { DCHECK(main_thread_checker_.CalledOnValidThread()); scoped_refptr<DeletionSentinel> protect(deletion_sentinel_); - internal::TaskQueue* queue = Queue(queue_index); + internal::TaskQueueImpl* queue = Queue(queue_index); base::PendingTask pending_task = queue->TakeTaskFromWorkQueue(); task_was_run_bitmap_ |= UINT64_C(1) << queue_index; if (!pending_task.nestable && main_task_runner_->IsNested()) { @@ -713,13 +756,13 @@ bool TaskQueueManager::PostDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { - DCHECK(delay > base::TimeDelta()); + DCHECK_GE(delay, base::TimeDelta()); return main_task_runner_->PostDelayedTask(from_here, task, delay); } void TaskQueueManager::SetQueueName(size_t queue_index, const char* name) { DCHECK(main_thread_checker_.CalledOnValidThread()); - internal::TaskQueue* queue = Queue(queue_index); + internal::TaskQueueImpl* queue = Queue(queue_index); queue->set_name(name); } |