diff options
Diffstat (limited to 'remoting')
-rw-r--r-- | remoting/base/plugin_thread_task_runner.cc | 217 | ||||
-rw-r--r-- | remoting/base/plugin_thread_task_runner.h | 80 | ||||
-rw-r--r-- | remoting/client/plugin/chromoting_instance.cc | 14 | ||||
-rw-r--r-- | remoting/host/plugin/host_script_object.cc | 77 | ||||
-rw-r--r-- | remoting/host/plugin/host_script_object.h | 3 |
5 files changed, 312 insertions, 79 deletions
diff --git a/remoting/base/plugin_thread_task_runner.cc b/remoting/base/plugin_thread_task_runner.cc index 07637a0..0a7c8e1 100644 --- a/remoting/base/plugin_thread_task_runner.cc +++ b/remoting/base/plugin_thread_task_runner.cc @@ -6,21 +6,80 @@ #include "base/bind.h" +namespace { + +base::TimeDelta CalcTimeDelta(base::TimeTicks when) { + return std::max(when - base::TimeTicks::Now(), base::TimeDelta()); +} + +} // namespace + namespace remoting { +PluginThreadTaskRunner::Delegate::~Delegate() { +} + PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate) : plugin_thread_id_(base::PlatformThread::CurrentId()), - delegate_(delegate) { + event_(false, false), + delegate_(delegate), + next_sequence_num_(0), + quit_received_(false), + stopped_(false) { } PluginThreadTaskRunner::~PluginThreadTaskRunner() { + DCHECK(delegate_ == NULL); + DCHECK(stopped_); } -void PluginThreadTaskRunner::Detach() { - base::AutoLock auto_lock(lock_); - if (delegate_) { - DCHECK(BelongsToCurrentThread()); +void PluginThreadTaskRunner::DetachAndRunShutdownLoop() { + DCHECK(BelongsToCurrentThread()); + + // Detach from the plugin thread and redirect all tasks posted after this + // point to the shutdown task loop. + { + base::AutoLock auto_lock(lock_); + + DCHECK(delegate_ != NULL); + DCHECK(!stopped_); + delegate_ = NULL; + stopped_ = quit_received_; + } + + // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled + // timers are cancelled. It is OK to clear |scheduled_timers_| even if + // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is + // called before NPP_Destroy()). + scheduled_timers_.clear(); + + // Run all tasks that are due. + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + + while (!stopped_) { + if (delayed_queue_.empty()) { + event_.Wait(); + } else { + event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time)); + } + + // Run all tasks that are due. + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + + base::AutoLock auto_lock(lock_); + stopped_ = quit_received_; + } +} + +void PluginThreadTaskRunner::Quit() { + base::AutoLock auto_lock(lock_); + + if (!quit_received_) { + quit_received_ = true; + event_.Signal(); } } @@ -28,21 +87,42 @@ bool PluginThreadTaskRunner::PostDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { - base::AutoLock auto_lock(lock_); - if (!delegate_) - return false; - base::Closure* springpad_closure = new base::Closure(base::Bind( - &PluginThreadTaskRunner::RunClosureIf, this, task)); - return delegate_->RunOnPluginThread( - delay, &PluginThreadTaskRunner::TaskSpringboard, springpad_closure); + // Wrap the task into |base::PendingTask|. + base::TimeTicks delayed_run_time; + if (delay > base::TimeDelta()) { + delayed_run_time = base::TimeTicks::Now() + delay; + } else { + DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; + } + + base::PendingTask pending_task(from_here, task, delayed_run_time, false); + + // Push the task to the incoming queue. + base::AutoLock locked(lock_); + + // Initialize the sequence number. The sequence number provides FIFO ordering + // for tasks with the same |delayed_run_time|. + pending_task.sequence_num = next_sequence_num_++; + + // Post an asynchronous call on the plugin thread to process the task. + if (incoming_queue_.empty()) { + PostRunTasks(); + } + + incoming_queue_.push(pending_task); + pending_task.task.Reset(); + + // No tasks should be posted after Quit() has been called. + DCHECK(!quit_received_); + return true; } bool PluginThreadTaskRunner::PostNonNestableDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { - // All tasks running on this message loop are non-nestable. + // All tasks running on this task loop are non-nestable. return PostDelayedTask(from_here, task, delay); } @@ -53,6 +133,110 @@ bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const { return base::PlatformThread::CurrentId() == plugin_thread_id_; } +void PluginThreadTaskRunner::PostRunTasks() { + // Post tasks to the plugin thread when it is availabe or spin the shutdown + // task loop. + if (delegate_ != NULL) { + base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this); + delegate_->RunOnPluginThread( + base::TimeDelta(), + &PluginThreadTaskRunner::TaskSpringboard, + new base::Closure(closure)); + } else { + event_.Signal(); + } +} + +void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) { + DCHECK(BelongsToCurrentThread()); + + // |delegate_| is updated from the plugin thread only, so it is safe to access + // it here without taking the lock. + if (delegate_ != NULL) { + // Schedule RunDelayedTasks() to be called at |when| if it hasn't been + // scheduled already. + if (scheduled_timers_.insert(when).second) { + base::TimeDelta delay = CalcTimeDelta(when); + base::Closure closure = + base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when); + delegate_->RunOnPluginThread( + delay, + &PluginThreadTaskRunner::TaskSpringboard, + new base::Closure(closure)); + } + } else { + // Spin the shutdown loop if the task runner has already been detached. + // The shutdown loop will pick the tasks to run itself. + event_.Signal(); + } +} + +void PluginThreadTaskRunner::ProcessIncomingTasks() { + DCHECK(BelongsToCurrentThread()); + + // Grab all unsorted tasks accomulated so far. + base::TaskQueue work_queue; + { + base::AutoLock locked(lock_); + incoming_queue_.Swap(&work_queue); + } + + while (!work_queue.empty()) { + base::PendingTask pending_task = work_queue.front(); + work_queue.pop(); + + if (pending_task.delayed_run_time.is_null()) { + pending_task.task.Run(); + } else { + delayed_queue_.push(pending_task); + } + } +} + +void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) { + DCHECK(BelongsToCurrentThread()); + + scheduled_timers_.erase(when); + + // |stopped_| is updated by the plugin thread only, so it is safe to access + // it here without taking the lock. + if (!stopped_) { + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + } +} + +void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) { + DCHECK(BelongsToCurrentThread()); + + // Run all due tasks. + while (!delayed_queue_.empty() && + delayed_queue_.top().delayed_run_time <= now) { + delayed_queue_.top().task.Run(); + delayed_queue_.pop(); + } + + // Post a delayed asynchronous call to the plugin thread to process tasks from + // the delayed queue. + if (!delayed_queue_.empty()) { + base::TimeTicks when = delayed_queue_.top().delayed_run_time; + if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) { + PostDelayedRunTasks(when); + } + } +} + +void PluginThreadTaskRunner::RunTasks() { + DCHECK(BelongsToCurrentThread()); + + // |stopped_| is updated by the plugin thread only, so it is safe to access + // it here without taking the lock. + if (!stopped_) { + ProcessIncomingTasks(); + RunDueTasks(base::TimeTicks::Now()); + } +} + // static void PluginThreadTaskRunner::TaskSpringboard(void* data) { base::Closure* task = reinterpret_cast<base::Closure*>(data); @@ -60,11 +244,4 @@ void PluginThreadTaskRunner::TaskSpringboard(void* data) { delete task; } -void PluginThreadTaskRunner::RunClosureIf(const base::Closure& task) { - // |delegate_| can be changed only from our thread, so it's safe to - // access it without acquiring |lock_|. - if (delegate_) - task.Run(); -} - } // namespace remoting diff --git a/remoting/base/plugin_thread_task_runner.h b/remoting/base/plugin_thread_task_runner.h index 5ebafe1..bfae1ce 100644 --- a/remoting/base/plugin_thread_task_runner.h +++ b/remoting/base/plugin_thread_task_runner.h @@ -5,11 +5,18 @@ #ifndef REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_ #define REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_ +#include <set> + #include "base/callback_forward.h" #include "base/compiler_specific.h" +#include "base/pending_task.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" #include "base/threading/platform_thread.h" +#include "base/time.h" + +class MessageLoop; namespace remoting { @@ -18,8 +25,7 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { public: class Delegate { public: - Delegate() { } - virtual ~Delegate() { } + virtual ~Delegate(); virtual bool RunOnPluginThread( base::TimeDelta delay, void(function)(void*), void* data) = 0; @@ -28,7 +34,18 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { // Caller keeps ownership of delegate. PluginThreadTaskRunner(Delegate* delegate); - void Detach(); + // Detaches the PluginThreadTaskRunner from the underlying Delegate and + // processes posted tasks until Quit() is called. This is used during plugin + // shutdown, when the plugin environment has stopped accepting new tasks to + // run, to process cleanup tasks posted to the plugin thread. + // This method must be called on the plugin thread. + void DetachAndRunShutdownLoop(); + + // Makes DetachAndRunShutdownLoop() stop processing tasks and return control + // to the caller. Calling Quit() before DetachAndRunShutdownLoop() causes + // the latter to exit immediately when called, without processing any delayed + // shutdown tasks. This method can be called from any thread. + void Quit(); // base::SingleThreadTaskRunner interface. virtual bool PostDelayedTask( @@ -39,23 +56,72 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner { const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) OVERRIDE; - virtual bool RunsTasksOnCurrentThread() const OVERRIDE; protected: virtual ~PluginThreadTaskRunner(); private: + // Methods that can be called from any thread. + + // Schedules RunTasks to be called on the plugin thread. + void PostRunTasks(); + + // Methods that are always called on the plugin thread. + + // Schedules RunDelayedTasks() to be called on the plugin thread. |when| + // specifies the time when RunDelayedTasks() should be called. + void PostDelayedRunTasks(base::TimeTicks when); + + // Processes the incoming task queue: runs all non delayed tasks and posts all + // delayed tasks to |delayed_queue_|. + void ProcessIncomingTasks(); + + // Called in response to PostDelayedRunTasks(). + void RunDelayedTasks(base::TimeTicks when); + + // Runs all tasks that are due. + void RunDueTasks(base::TimeTicks now); + + // Called in response to PostRunTasks(). + void RunTasks(); + static void TaskSpringboard(void* data); - void RunClosureIf(const base::Closure& task); + const base::PlatformThreadId plugin_thread_id_; - base::PlatformThreadId plugin_thread_id_; + // Used by the shutdown loop to block the thread until there is a task ready + // to run. + base::WaitableEvent event_; - // |lock_| must be acquired when accessing |delegate_|. base::Lock lock_; + + // The members below are protected by |lock_|. + + // Pointer to the delegate that implements scheduling tasks via the plugin + // API. Delegate* delegate_; + // Contains all posted tasks that haven't been sorted yet. + base::TaskQueue incoming_queue_; + + // The next sequence number to use for delayed tasks. + int next_sequence_num_; + + // True if Quit() has been called. + bool quit_received_; + + // The members below are accessed only on the plugin thread. + + // Contains delayed tasks, sorted by their 'delayed_run_time' property. + base::DelayedTaskQueue delayed_queue_; + + // The list of timestamps when scheduled timers are expected to fire. + std::set<base::TimeTicks> scheduled_timers_; + + // True if the shutdown task loop was been stopped. + bool stopped_; + DISALLOW_COPY_AND_ASSIGN(PluginThreadTaskRunner); }; diff --git a/remoting/client/plugin/chromoting_instance.cc b/remoting/client/plugin/chromoting_instance.cc index 9038b4f..db90e9a 100644 --- a/remoting/client/plugin/chromoting_instance.cc +++ b/remoting/client/plugin/chromoting_instance.cc @@ -189,17 +189,17 @@ ChromotingInstance::~ChromotingInstance() { view_.reset(); if (client_.get()) { - base::WaitableEvent done_event(true, false); - client_->Stop(base::Bind(&base::WaitableEvent::Signal, - base::Unretained(&done_event))); - done_event.Wait(); + client_->Stop(base::Bind(&PluginThreadTaskRunner::Quit, + plugin_task_runner_)); + } else { + plugin_task_runner_->Quit(); } + // Ensure that nothing touches the plugin thread delegate after this point. + plugin_task_runner_->DetachAndRunShutdownLoop(); + // Stopping the context shuts down all chromoting threads. context_.Stop(); - - // Ensure that nothing touches the plugin thread delegate after this point. - plugin_task_runner_->Detach(); } bool ChromotingInstance::Init(uint32_t argc, diff --git a/remoting/host/plugin/host_script_object.cc b/remoting/host/plugin/host_script_object.cc index 21b79be..2d441f3 100644 --- a/remoting/host/plugin/host_script_object.cc +++ b/remoting/host/plugin/host_script_object.cc @@ -95,8 +95,6 @@ HostNPScriptObject::HostNPScriptObject( new PluginThreadTaskRunner(plugin_thread_delegate)), desktop_environment_factory_(new DesktopEnvironmentFactory()), failed_login_attempts_(0), - disconnected_event_(true, false), - stopped_event_(true, false), nat_traversal_enabled_(false), policy_received_(false), daemon_controller_(DaemonController::Create()), @@ -109,12 +107,6 @@ HostNPScriptObject::~HostNPScriptObject() { HostLogHandler::UnregisterLoggingScriptObject(this); - // Stop the message loop. Any attempt to post a task to - // |context_.ui_task_runner()| will result in a CHECK() after this point. - // TODO(alexeypa): Enable posting messages to |plugin_task_runner_| during - // shutdown to avoid this hack. - plugin_task_runner_->Detach(); - // Stop listening for policy updates. if (policy_watcher_.get()) { base::WaitableEvent policy_watcher_stopped_(true, false); @@ -124,35 +116,17 @@ HostNPScriptObject::~HostNPScriptObject() { } if (host_context_.get()) { - // Disconnect synchronously. We cannot disconnect asynchronously - // here because |host_context_| needs to be stopped on the plugin - // thread, but the plugin thread may not exist after the instance - // is destroyed. - disconnected_event_.Reset(); DisconnectInternal(); - - // |disconnected_event_| is signalled when the host is completely stopped. - disconnected_event_.Wait(); - - // UI needs to be shut down on the UI thread before we destroy the - // host context (because it depends on the context object), but - // only after the host has been shut down (becase the UI object is - // registered as status observer for the host, and we can't - // unregister it from this thread). - it2me_host_user_interface_.reset(); - - // Release the context's TaskRunner references for the threads, so they can - // exit when no objects need them. - host_context_->ReleaseTaskRunners(); - - // |stopped_event_| is signalled when the last reference to the plugin - // thread is dropped. - stopped_event_.Wait(); - - // Stop all threads. - host_context_.reset(); + } else { + plugin_task_runner_->Quit(); } + // Stop the message loop and run the remaining tasks. The loop will exit + // once the wrapping AutoThreadTaskRunner is destroyed. + plugin_task_runner_->DetachAndRunShutdownLoop(); + + // Stop all threads. + host_context_.reset(); worker_thread_.Stop(); } @@ -160,10 +134,12 @@ bool HostNPScriptObject::Init() { DCHECK(plugin_task_runner_->BelongsToCurrentThread()); VLOG(2) << "Init"; - host_context_.reset(new ChromotingHostContext(new AutoThreadTaskRunner( - plugin_task_runner_, - base::Bind(&base::WaitableEvent::Signal, - base::Unretained(&stopped_event_))))); + scoped_refptr<AutoThreadTaskRunner> auto_plugin_task_runner = + new AutoThreadTaskRunner(plugin_task_runner_, + base::Bind(&PluginThreadTaskRunner::Quit, + plugin_task_runner_)); + host_context_.reset(new ChromotingHostContext(auto_plugin_task_runner)); + auto_plugin_task_runner = NULL; if (!host_context_->Start()) { host_context_.reset(); return false; @@ -879,6 +855,9 @@ bool HostNPScriptObject::StopDaemon(const NPVariant* args, } void HostNPScriptObject::DisconnectInternal() { + if (!host_context_->network_task_runner()) + return; + if (!host_context_->network_task_runner()->BelongsToCurrentThread()) { host_context_->network_task_runner()->PostTask( FROM_HERE, base::Bind(&HostNPScriptObject::DisconnectInternal, @@ -888,13 +867,13 @@ void HostNPScriptObject::DisconnectInternal() { switch (state_) { case kDisconnected: - disconnected_event_.Signal(); + OnShutdownFinished(); return; case kStarting: SetState(kDisconnecting); SetState(kDisconnected); - disconnected_event_.Signal(); + OnShutdownFinished(); return; case kDisconnecting: @@ -921,9 +900,23 @@ void HostNPScriptObject::DisconnectInternal() { } void HostNPScriptObject::OnShutdownFinished() { - DCHECK(host_context_->network_task_runner()->BelongsToCurrentThread()); + if (!host_context_->ui_task_runner()->BelongsToCurrentThread()) { + host_context_->ui_task_runner()->PostTask( + FROM_HERE, base::Bind(&HostNPScriptObject::OnShutdownFinished, + base::Unretained(this))); + return; + } + + // UI needs to be shut down on the UI thread before we destroy the + // host context (because it depends on the context object), but + // only after the host has been shut down (becase the UI object is + // registered as status observer for the host, and we can't + // unregister it from this thread). + it2me_host_user_interface_.reset(); - disconnected_event_.Signal(); + // Release the context's TaskRunner references for the threads, so they can + // exit when no objects need them. + host_context_->ReleaseTaskRunners(); } void HostNPScriptObject::OnPolicyUpdate( diff --git a/remoting/host/plugin/host_script_object.h b/remoting/host/plugin/host_script_object.h index c9bd1ec5..cfcb990 100644 --- a/remoting/host/plugin/host_script_object.h +++ b/remoting/host/plugin/host_script_object.h @@ -322,9 +322,6 @@ class HostNPScriptObject : public HostStatusObserver { UiStrings ui_strings_; base::Lock ui_strings_lock_; - base::WaitableEvent disconnected_event_; - base::WaitableEvent stopped_event_; - base::Lock nat_policy_lock_; scoped_ptr<policy_hack::PolicyWatcher> policy_watcher_; |