diff options
author | alexeypa@chromium.org <alexeypa@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-10-01 23:10:25 +0000 |
---|---|---|
committer | alexeypa@chromium.org <alexeypa@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-10-01 23:10:25 +0000 |
commit | 788504f27317338b03c0e62498b781c3c193db3e (patch) | |
tree | eac7b6031bccea34be73d7a0b42dd49f60873a26 /remoting/base | |
parent | b602628bb20f7feada7cc7ff5112f55a48a4d898 (diff) | |
download | chromium_src-788504f27317338b03c0e62498b781c3c193db3e.zip chromium_src-788504f27317338b03c0e62498b781c3c193db3e.tar.gz chromium_src-788504f27317338b03c0e62498b781c3c193db3e.tar.bz2 |
[Chromoting] Allow tasks to be posted to the plugin thread while a plugin instance is being shutdown.
This CL reliminates a restriction on posting tasks to the plugin thread making asynchronous shutdown logic more uniform and as result - simpler. The CL affects both the client and host plugins.
BUG=150783
Review URL: https://chromiumcodereview.appspot.com/10998002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@159581 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting/base')
-rw-r--r-- | remoting/base/plugin_thread_task_runner.cc | 217 | ||||
-rw-r--r-- | remoting/base/plugin_thread_task_runner.h | 80 |
2 files changed, 270 insertions, 27 deletions
diff --git a/remoting/base/plugin_thread_task_runner.cc b/remoting/base/plugin_thread_task_runner.cc index 07637a0..0a7c8e1 100644 --- a/remoting/base/plugin_thread_task_runner.cc +++ b/remoting/base/plugin_thread_task_runner.cc @@ -6,21 +6,80 @@ #include "base/bind.h" +namespace { + +base::TimeDelta CalcTimeDelta(base::TimeTicks when) { + return std::max(when - base::TimeTicks::Now(), base::TimeDelta()); +} + +} // namespace + namespace remoting { +PluginThreadTaskRunner::Delegate::~Delegate() { +} + PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate) : plugin_thread_id_(base::PlatformThread::CurrentId()), - delegate_(delegate) { + event_(false, false), + delegate_(delegate), + next_sequence_num_(0), + quit_received_(false), + stopped_(false) { } PluginThreadTaskRunner::~PluginThreadTaskRunner() { + DCHECK(delegate_ == NULL); + DCHECK(stopped_); } -void PluginThreadTaskRunner::Detach() { - base::AutoLock auto_lock(lock_); - if (delegate_) { - DCHECK(BelongsToCurrentThread()); +void PluginThreadTaskRunner::DetachAndRunShutdownLoop() { + DCHECK(BelongsToCurrentThread()); + + // Detach from the plugin thread and redirect all tasks posted after this + // point to the shutdown task loop. + { + base::AutoLock auto_lock(lock_); + + DCHECK(delegate_ != NULL); + DCHECK(!stopped_); + delegate_ = NULL; + stopped_ = quit_received_; + } + + // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled + // timers are cancelled. It is OK to clear |scheduled_timers_| even if + // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is + // called before NPP_Destroy()). + scheduled_timers_.clear(); + + // Run all tasks that are due. + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + + while (!stopped_) { + if (delayed_queue_.empty()) { + event_.Wait(); + } else { + event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time)); + } + + // Run all tasks that are due. + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + + base::AutoLock auto_lock(lock_); + stopped_ = quit_received_; + } +} + +void PluginThreadTaskRunner::Quit() { + base::AutoLock auto_lock(lock_); + + if (!quit_received_) { + quit_received_ = true; + event_.Signal(); } } @@ -28,21 +87,42 @@ bool PluginThreadTaskRunner::PostDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { - base::AutoLock auto_lock(lock_); - if (!delegate_) - return false; - base::Closure* springpad_closure = new base::Closure(base::Bind( - &PluginThreadTaskRunner::RunClosureIf, this, task)); - return delegate_->RunOnPluginThread( - delay, &PluginThreadTaskRunner::TaskSpringboard, springpad_closure); + // Wrap the task into |base::PendingTask|. + base::TimeTicks delayed_run_time; + if (delay > base::TimeDelta()) { + delayed_run_time = base::TimeTicks::Now() + delay; + } else { + DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; + } + + base::PendingTask pending_task(from_here, task, delayed_run_time, false); + + // Push the task to the incoming queue. + base::AutoLock locked(lock_); + + // Initialize the sequence number. The sequence number provides FIFO ordering + // for tasks with the same |delayed_run_time|. + pending_task.sequence_num = next_sequence_num_++; + + // Post an asynchronous call on the plugin thread to process the task. + if (incoming_queue_.empty()) { + PostRunTasks(); + } + + incoming_queue_.push(pending_task); + pending_task.task.Reset(); + + // No tasks should be posted after Quit() has been called. + DCHECK(!quit_received_); + return true; } bool PluginThreadTaskRunner::PostNonNestableDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { - // All tasks running on this message loop are non-nestable. + // All tasks running on this task loop are non-nestable. return PostDelayedTask(from_here, task, delay); } @@ -53,6 +133,110 @@ bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const { return base::PlatformThread::CurrentId() == plugin_thread_id_; } +void PluginThreadTaskRunner::PostRunTasks() { + // Post tasks to the plugin thread when it is availabe or spin the shutdown + // task loop. + if (delegate_ != NULL) { + base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this); + delegate_->RunOnPluginThread( + base::TimeDelta(), + &PluginThreadTaskRunner::TaskSpringboard, + new base::Closure(closure)); + } else { + event_.Signal(); + } +} + +void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) { + DCHECK(BelongsToCurrentThread()); + + // |delegate_| is updated from the plugin thread only, so it is safe to access + // it here without taking the lock. + if (delegate_ != NULL) { + // Schedule RunDelayedTasks() to be called at |when| if it hasn't been + // scheduled already. + if (scheduled_timers_.insert(when).second) { + base::TimeDelta delay = CalcTimeDelta(when); + base::Closure closure = + base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when); + delegate_->RunOnPluginThread( + delay, + &PluginThreadTaskRunner::TaskSpringboard, + new base::Closure(closure)); + } + } else { + // Spin the shutdown loop if the task runner has already been detached. + // The shutdown loop will pick the tasks to run itself. + event_.Signal(); + } +} + +void PluginThreadTaskRunner::ProcessIncomingTasks() { + DCHECK(BelongsToCurrentThread()); + + // Grab all unsorted tasks accomulated so far. + base::TaskQueue work_queue; + { + base::AutoLock locked(lock_); + incoming_queue_.Swap(&work_queue); + } + + while (!work_queue.empty()) { + base::PendingTask pending_task = work_queue.front(); + work_queue.pop(); + + if (pending_task.delayed_run_time.is_null()) { + pending_task.task.Run(); + } else { + delayed_queue_.push(pending_task); + } + } +} + +void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) { + DCHECK(BelongsToCurrentThread()); + + scheduled_timers_.erase(when); + + // |stopped_| is updated by the plugin thread only, so it is safe to access + // it here without taking the lock. + if (!stopped_) { + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + } +} + +void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) { + DCHECK(BelongsToCurrentThread()); + + // Run all due tasks. + while (!delayed_queue_.empty() && + delayed_queue_.top().delayed_run_time <= now) { + delayed_queue_.top().task.Run(); + delayed_queue_.pop(); + } + + // Post a delayed asynchronous call to the plugin thread to process tasks from + // the delayed queue. + if (!delayed_queue_.empty()) { + base::TimeTicks when = delayed_queue_.top().delayed_run_time; + if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) { + PostDelayedRunTasks(when); + } + } +} + +void PluginThreadTaskRunner::RunTasks() { + DCHECK(BelongsToCurrentThread()); + + // |stopped_| is updated by the plugin thread only, so it is safe to access + // it here without taking the lock. + if (!stopped_) { + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + } +} + // static void PluginThreadTaskRunner::TaskSpringboard(void* data) { base::Closure* task = reinterpret_cast<base::Closure*>(data); @@ -60,11 +244,4 @@ void PluginThreadTaskRunner::TaskSpringboard(void* data) { delete task; } -void PluginThreadTaskRunner::RunClosureIf(const base::Closure& task) { - // |delegate_| can be changed only from our thread, so it's safe to - // access it without acquiring |lock_|. - if (delegate_) - task.Run(); -} - } // namespace remoting diff --git a/remoting/base/plugin_thread_task_runner.h b/remoting/base/plugin_thread_task_runner.h index 5ebafe1..bfae1ce 100644 --- a/remoting/base/plugin_thread_task_runner.h +++ b/remoting/base/plugin_thread_task_runner.h @@ -5,11 +5,18 @@ #ifndef REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_ #define REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_ +#include <set> + #include "base/callback_forward.h" #include "base/compiler_specific.h" +#include "base/pending_task.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" #include "base/threading/platform_thread.h" +#include "base/time.h" + +class MessageLoop; namespace remoting { @@ -18,8 +25,7 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { public: class Delegate { public: - Delegate() { } - virtual ~Delegate() { } + virtual ~Delegate(); virtual bool RunOnPluginThread( base::TimeDelta delay, void(function)(void*), void* data) = 0; @@ -28,7 +34,18 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { // Caller keeps ownership of delegate. PluginThreadTaskRunner(Delegate* delegate); - void Detach(); + // Detaches the PluginThreadTaskRunner from the underlying Delegate and + // processes posted tasks until Quit() is called. This is used during plugin + // shutdown, when the plugin environment has stopped accepting new tasks to + // run, to process cleanup tasks posted to the plugin thread. + // This method must be called on the plugin thread. + void DetachAndRunShutdownLoop(); + + // Makes DetachAndRunShutdownLoop() stop processing tasks and return control + // to the caller. Calling Quit() before DetachAndRunShutdownLoop() causes + // the latter to exit immediately when called, without processing any delayed + // shutdown tasks. This method can be called from any thread. + void Quit(); // base::SingleThreadTaskRunner interface. virtual bool PostDelayedTask( @@ -39,23 +56,72 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) OVERRIDE; - virtual bool RunsTasksOnCurrentThread() const OVERRIDE; protected: virtual ~PluginThreadTaskRunner(); private: + // Methods that can be called from any thread. + + // Schedules RunTasks to be called on the plugin thread. + void PostRunTasks(); + + // Methods that are always called on the plugin thread. + + // Schedules RunDelayedTasks() to be called on the plugin thread. |when| + // specifies the time when RunDelayedTasks() should be called. + void PostDelayedRunTasks(base::TimeTicks when); + + // Processes the incoming task queue: runs all non delayed tasks and posts all + // delayed tasks to |delayed_queue_|. + void ProcessIncomingTasks(); + + // Called in response to PostDelayedRunTasks(). + void RunDelayedTasks(base::TimeTicks when); + + // Runs all tasks that are due. + void RunDueTasks(base::TimeTicks now); + + // Called in response to PostRunTasks(). + void RunTasks(); + static void TaskSpringboard(void* data); - void RunClosureIf(const base::Closure& task); + const base::PlatformThreadId plugin_thread_id_; - base::PlatformThreadId plugin_thread_id_; + // Used by the shutdown loop to block the thread until there is a task ready + // to run. + base::WaitableEvent event_; - // |lock_| must be acquired when accessing |delegate_|. base::Lock lock_; + + // The members below are protected by |lock_|. + + // Pointer to the delegate that implements scheduling tasks via the plugin + // API. Delegate* delegate_; + // Contains all posted tasks that haven't been sorted yet. + base::TaskQueue incoming_queue_; + + // The next sequence number to use for delayed tasks. + int next_sequence_num_; + + // True if Quit() has been called. + bool quit_received_; + + // The members below are accessed only on the plugin thread. + + // Contains delayed tasks, sorted by their 'delayed_run_time' property. + base::DelayedTaskQueue delayed_queue_; + + // The list of timestamps when scheduled timers are expected to fire. + std::set<base::TimeTicks> scheduled_timers_; + + // True if the shutdown task loop was been stopped. + bool stopped_; + DISALLOW_COPY_AND_ASSIGN(PluginThreadTaskRunner); }; |