summaryrefslogtreecommitdiffstats
path: root/remoting
diff options
context:
space:
mode:
Diffstat (limited to 'remoting')
-rw-r--r--remoting/base/plugin_thread_task_runner.cc217
-rw-r--r--remoting/base/plugin_thread_task_runner.h80
-rw-r--r--remoting/client/plugin/chromoting_instance.cc14
-rw-r--r--remoting/host/plugin/host_script_object.cc77
-rw-r--r--remoting/host/plugin/host_script_object.h3
5 files changed, 312 insertions, 79 deletions
diff --git a/remoting/base/plugin_thread_task_runner.cc b/remoting/base/plugin_thread_task_runner.cc
index 07637a0..0a7c8e1 100644
--- a/remoting/base/plugin_thread_task_runner.cc
+++ b/remoting/base/plugin_thread_task_runner.cc
@@ -6,21 +6,80 @@
#include "base/bind.h"
+namespace {
+
+base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
+ return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
+}
+
+} // namespace
+
namespace remoting {
+PluginThreadTaskRunner::Delegate::~Delegate() {
+}
+
PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
: plugin_thread_id_(base::PlatformThread::CurrentId()),
- delegate_(delegate) {
+ event_(false, false),
+ delegate_(delegate),
+ next_sequence_num_(0),
+ quit_received_(false),
+ stopped_(false) {
}
PluginThreadTaskRunner::~PluginThreadTaskRunner() {
+ DCHECK(delegate_ == NULL);
+ DCHECK(stopped_);
}
-void PluginThreadTaskRunner::Detach() {
- base::AutoLock auto_lock(lock_);
- if (delegate_) {
- DCHECK(BelongsToCurrentThread());
+void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
+ DCHECK(BelongsToCurrentThread());
+
+ // Detach from the plugin thread and redirect all tasks posted after this
+ // point to the shutdown task loop.
+ {
+ base::AutoLock auto_lock(lock_);
+
+ DCHECK(delegate_ != NULL);
+ DCHECK(!stopped_);
+
delegate_ = NULL;
+ stopped_ = quit_received_;
+ }
+
+ // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
+ // timers are cancelled. It is OK to clear |scheduled_timers_| even if
+ // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
+ // called before NPP_Destroy()).
+ scheduled_timers_.clear();
+
+ // Run all tasks that are due.
+ ProcessIncomingTasks();
+ RunDueTasks(base::TimeTicks::Now());
+
+ while (!stopped_) {
+ if (delayed_queue_.empty()) {
+ event_.Wait();
+ } else {
+ event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
+ }
+
+ // Run all tasks that are due.
+ ProcessIncomingTasks();
+ RunDueTasks(base::TimeTicks::Now());
+
+ base::AutoLock auto_lock(lock_);
+ stopped_ = quit_received_;
+ }
+}
+
+void PluginThreadTaskRunner::Quit() {
+ base::AutoLock auto_lock(lock_);
+
+ if (!quit_received_) {
+ quit_received_ = true;
+ event_.Signal();
}
}
@@ -28,21 +87,42 @@ bool PluginThreadTaskRunner::PostDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
- base::AutoLock auto_lock(lock_);
- if (!delegate_)
- return false;
- base::Closure* springpad_closure = new base::Closure(base::Bind(
- &PluginThreadTaskRunner::RunClosureIf, this, task));
- return delegate_->RunOnPluginThread(
- delay, &PluginThreadTaskRunner::TaskSpringboard, springpad_closure);
+ // Wrap the task into |base::PendingTask|.
+ base::TimeTicks delayed_run_time;
+ if (delay > base::TimeDelta()) {
+ delayed_run_time = base::TimeTicks::Now() + delay;
+ } else {
+ DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
+ }
+
+ base::PendingTask pending_task(from_here, task, delayed_run_time, false);
+
+ // Push the task to the incoming queue.
+ base::AutoLock locked(lock_);
+
+ // Initialize the sequence number. The sequence number provides FIFO ordering
+ // for tasks with the same |delayed_run_time|.
+ pending_task.sequence_num = next_sequence_num_++;
+
+ // Post an asynchronous call on the plugin thread to process the task.
+ if (incoming_queue_.empty()) {
+ PostRunTasks();
+ }
+
+ incoming_queue_.push(pending_task);
+ pending_task.task.Reset();
+
+ // No tasks should be posted after Quit() has been called.
+ DCHECK(!quit_received_);
+ return true;
}
bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
- // All tasks running on this message loop are non-nestable.
+ // All tasks running on this task loop are non-nestable.
return PostDelayedTask(from_here, task, delay);
}
@@ -53,6 +133,110 @@ bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
return base::PlatformThread::CurrentId() == plugin_thread_id_;
}
+void PluginThreadTaskRunner::PostRunTasks() {
+ // Post tasks to the plugin thread when it is availabe or spin the shutdown
+ // task loop.
+ if (delegate_ != NULL) {
+ base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
+ delegate_->RunOnPluginThread(
+ base::TimeDelta(),
+ &PluginThreadTaskRunner::TaskSpringboard,
+ new base::Closure(closure));
+ } else {
+ event_.Signal();
+ }
+}
+
+void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
+ DCHECK(BelongsToCurrentThread());
+
+ // |delegate_| is updated from the plugin thread only, so it is safe to access
+ // it here without taking the lock.
+ if (delegate_ != NULL) {
+ // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
+ // scheduled already.
+ if (scheduled_timers_.insert(when).second) {
+ base::TimeDelta delay = CalcTimeDelta(when);
+ base::Closure closure =
+ base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
+ delegate_->RunOnPluginThread(
+ delay,
+ &PluginThreadTaskRunner::TaskSpringboard,
+ new base::Closure(closure));
+ }
+ } else {
+ // Spin the shutdown loop if the task runner has already been detached.
+ // The shutdown loop will pick the tasks to run itself.
+ event_.Signal();
+ }
+}
+
+void PluginThreadTaskRunner::ProcessIncomingTasks() {
+ DCHECK(BelongsToCurrentThread());
+
+ // Grab all unsorted tasks accomulated so far.
+ base::TaskQueue work_queue;
+ {
+ base::AutoLock locked(lock_);
+ incoming_queue_.Swap(&work_queue);
+ }
+
+ while (!work_queue.empty()) {
+ base::PendingTask pending_task = work_queue.front();
+ work_queue.pop();
+
+ if (pending_task.delayed_run_time.is_null()) {
+ pending_task.task.Run();
+ } else {
+ delayed_queue_.push(pending_task);
+ }
+ }
+}
+
+void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
+ DCHECK(BelongsToCurrentThread());
+
+ scheduled_timers_.erase(when);
+
+ // |stopped_| is updated by the plugin thread only, so it is safe to access
+ // it here without taking the lock.
+ if (!stopped_) {
+ ProcessIncomingTasks();
+ RunDueTasks(base::TimeTicks::Now());
+ }
+}
+
+void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
+ DCHECK(BelongsToCurrentThread());
+
+ // Run all due tasks.
+ while (!delayed_queue_.empty() &&
+ delayed_queue_.top().delayed_run_time <= now) {
+ delayed_queue_.top().task.Run();
+ delayed_queue_.pop();
+ }
+
+ // Post a delayed asynchronous call to the plugin thread to process tasks from
+ // the delayed queue.
+ if (!delayed_queue_.empty()) {
+ base::TimeTicks when = delayed_queue_.top().delayed_run_time;
+ if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
+ PostDelayedRunTasks(when);
+ }
+ }
+}
+
+void PluginThreadTaskRunner::RunTasks() {
+ DCHECK(BelongsToCurrentThread());
+
+ // |stopped_| is updated by the plugin thread only, so it is safe to access
+ // it here without taking the lock.
+ if (!stopped_) {
+ ProcessIncomingTasks();
+ RunDueTasks(base::TimeTicks::Now());
+ }
+}
+
// static
void PluginThreadTaskRunner::TaskSpringboard(void* data) {
base::Closure* task = reinterpret_cast<base::Closure*>(data);
@@ -60,11 +244,4 @@ void PluginThreadTaskRunner::TaskSpringboard(void* data) {
delete task;
}
-void PluginThreadTaskRunner::RunClosureIf(const base::Closure& task) {
- // |delegate_| can be changed only from our thread, so it's safe to
- // access it without acquiring |lock_|.
- if (delegate_)
- task.Run();
-}
-
} // namespace remoting
diff --git a/remoting/base/plugin_thread_task_runner.h b/remoting/base/plugin_thread_task_runner.h
index 5ebafe1..bfae1ce 100644
--- a/remoting/base/plugin_thread_task_runner.h
+++ b/remoting/base/plugin_thread_task_runner.h
@@ -5,11 +5,18 @@
#ifndef REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_
#define REMOTING_BASE_PLUGIN_THREAD_TASK_RUNNER_H_
+#include <set>
+
#include "base/callback_forward.h"
#include "base/compiler_specific.h"
+#include "base/pending_task.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
+#include "base/time.h"
+
+class MessageLoop;
namespace remoting {
@@ -18,8 +25,7 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner {
public:
class Delegate {
public:
- Delegate() { }
- virtual ~Delegate() { }
+ virtual ~Delegate();
virtual bool RunOnPluginThread(
base::TimeDelta delay, void(function)(void*), void* data) = 0;
@@ -28,7 +34,18 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner {
// Caller keeps ownership of delegate.
PluginThreadTaskRunner(Delegate* delegate);
- void Detach();
+ // Detaches the PluginThreadTaskRunner from the underlying Delegate and
+ // processes posted tasks until Quit() is called. This is used during plugin
+ // shutdown, when the plugin environment has stopped accepting new tasks to
+ // run, to process cleanup tasks posted to the plugin thread.
+ // This method must be called on the plugin thread.
+ void DetachAndRunShutdownLoop();
+
+ // Makes DetachAndRunShutdownLoop() stop processing tasks and return control
+ // to the caller. Calling Quit() before DetachAndRunShutdownLoop() causes
+ // the latter to exit immediately when called, without processing any delayed
+ // shutdown tasks. This method can be called from any thread.
+ void Quit();
// base::SingleThreadTaskRunner interface.
virtual bool PostDelayedTask(
@@ -39,23 +56,72 @@ class PluginThreadTaskRunner : public base::SingleThreadTaskRunner {
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) OVERRIDE;
-
virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
protected:
virtual ~PluginThreadTaskRunner();
private:
+ // Methods that can be called from any thread.
+
+ // Schedules RunTasks to be called on the plugin thread.
+ void PostRunTasks();
+
+ // Methods that are always called on the plugin thread.
+
+ // Schedules RunDelayedTasks() to be called on the plugin thread. |when|
+ // specifies the time when RunDelayedTasks() should be called.
+ void PostDelayedRunTasks(base::TimeTicks when);
+
+ // Processes the incoming task queue: runs all non delayed tasks and posts all
+ // delayed tasks to |delayed_queue_|.
+ void ProcessIncomingTasks();
+
+ // Called in response to PostDelayedRunTasks().
+ void RunDelayedTasks(base::TimeTicks when);
+
+ // Runs all tasks that are due.
+ void RunDueTasks(base::TimeTicks now);
+
+ // Called in response to PostRunTasks().
+ void RunTasks();
+
static void TaskSpringboard(void* data);
- void RunClosureIf(const base::Closure& task);
+ const base::PlatformThreadId plugin_thread_id_;
- base::PlatformThreadId plugin_thread_id_;
+ // Used by the shutdown loop to block the thread until there is a task ready
+ // to run.
+ base::WaitableEvent event_;
- // |lock_| must be acquired when accessing |delegate_|.
base::Lock lock_;
+
+ // The members below are protected by |lock_|.
+
+ // Pointer to the delegate that implements scheduling tasks via the plugin
+ // API.
Delegate* delegate_;
+ // Contains all posted tasks that haven't been sorted yet.
+ base::TaskQueue incoming_queue_;
+
+ // The next sequence number to use for delayed tasks.
+ int next_sequence_num_;
+
+ // True if Quit() has been called.
+ bool quit_received_;
+
+ // The members below are accessed only on the plugin thread.
+
+ // Contains delayed tasks, sorted by their 'delayed_run_time' property.
+ base::DelayedTaskQueue delayed_queue_;
+
+ // The list of timestamps when scheduled timers are expected to fire.
+ std::set<base::TimeTicks> scheduled_timers_;
+
+ // True if the shutdown task loop was been stopped.
+ bool stopped_;
+
DISALLOW_COPY_AND_ASSIGN(PluginThreadTaskRunner);
};
diff --git a/remoting/client/plugin/chromoting_instance.cc b/remoting/client/plugin/chromoting_instance.cc
index 9038b4f..db90e9a 100644
--- a/remoting/client/plugin/chromoting_instance.cc
+++ b/remoting/client/plugin/chromoting_instance.cc
@@ -189,17 +189,17 @@ ChromotingInstance::~ChromotingInstance() {
view_.reset();
if (client_.get()) {
- base::WaitableEvent done_event(true, false);
- client_->Stop(base::Bind(&base::WaitableEvent::Signal,
- base::Unretained(&done_event)));
- done_event.Wait();
+ client_->Stop(base::Bind(&PluginThreadTaskRunner::Quit,
+ plugin_task_runner_));
+ } else {
+ plugin_task_runner_->Quit();
}
+ // Ensure that nothing touches the plugin thread delegate after this point.
+ plugin_task_runner_->DetachAndRunShutdownLoop();
+
// Stopping the context shuts down all chromoting threads.
context_.Stop();
-
- // Ensure that nothing touches the plugin thread delegate after this point.
- plugin_task_runner_->Detach();
}
bool ChromotingInstance::Init(uint32_t argc,
diff --git a/remoting/host/plugin/host_script_object.cc b/remoting/host/plugin/host_script_object.cc
index 21b79be..2d441f3 100644
--- a/remoting/host/plugin/host_script_object.cc
+++ b/remoting/host/plugin/host_script_object.cc
@@ -95,8 +95,6 @@ HostNPScriptObject::HostNPScriptObject(
new PluginThreadTaskRunner(plugin_thread_delegate)),
desktop_environment_factory_(new DesktopEnvironmentFactory()),
failed_login_attempts_(0),
- disconnected_event_(true, false),
- stopped_event_(true, false),
nat_traversal_enabled_(false),
policy_received_(false),
daemon_controller_(DaemonController::Create()),
@@ -109,12 +107,6 @@ HostNPScriptObject::~HostNPScriptObject() {
HostLogHandler::UnregisterLoggingScriptObject(this);
- // Stop the message loop. Any attempt to post a task to
- // |context_.ui_task_runner()| will result in a CHECK() after this point.
- // TODO(alexeypa): Enable posting messages to |plugin_task_runner_| during
- // shutdown to avoid this hack.
- plugin_task_runner_->Detach();
-
// Stop listening for policy updates.
if (policy_watcher_.get()) {
base::WaitableEvent policy_watcher_stopped_(true, false);
@@ -124,35 +116,17 @@ HostNPScriptObject::~HostNPScriptObject() {
}
if (host_context_.get()) {
- // Disconnect synchronously. We cannot disconnect asynchronously
- // here because |host_context_| needs to be stopped on the plugin
- // thread, but the plugin thread may not exist after the instance
- // is destroyed.
- disconnected_event_.Reset();
DisconnectInternal();
-
- // |disconnected_event_| is signalled when the host is completely stopped.
- disconnected_event_.Wait();
-
- // UI needs to be shut down on the UI thread before we destroy the
- // host context (because it depends on the context object), but
- // only after the host has been shut down (becase the UI object is
- // registered as status observer for the host, and we can't
- // unregister it from this thread).
- it2me_host_user_interface_.reset();
-
- // Release the context's TaskRunner references for the threads, so they can
- // exit when no objects need them.
- host_context_->ReleaseTaskRunners();
-
- // |stopped_event_| is signalled when the last reference to the plugin
- // thread is dropped.
- stopped_event_.Wait();
-
- // Stop all threads.
- host_context_.reset();
+ } else {
+ plugin_task_runner_->Quit();
}
+ // Stop the message loop and run the remaining tasks. The loop will exit
+ // once the wrapping AutoThreadTaskRunner is destroyed.
+ plugin_task_runner_->DetachAndRunShutdownLoop();
+
+ // Stop all threads.
+ host_context_.reset();
worker_thread_.Stop();
}
@@ -160,10 +134,12 @@ bool HostNPScriptObject::Init() {
DCHECK(plugin_task_runner_->BelongsToCurrentThread());
VLOG(2) << "Init";
- host_context_.reset(new ChromotingHostContext(new AutoThreadTaskRunner(
- plugin_task_runner_,
- base::Bind(&base::WaitableEvent::Signal,
- base::Unretained(&stopped_event_)))));
+ scoped_refptr<AutoThreadTaskRunner> auto_plugin_task_runner =
+ new AutoThreadTaskRunner(plugin_task_runner_,
+ base::Bind(&PluginThreadTaskRunner::Quit,
+ plugin_task_runner_));
+ host_context_.reset(new ChromotingHostContext(auto_plugin_task_runner));
+ auto_plugin_task_runner = NULL;
if (!host_context_->Start()) {
host_context_.reset();
return false;
@@ -879,6 +855,9 @@ bool HostNPScriptObject::StopDaemon(const NPVariant* args,
}
void HostNPScriptObject::DisconnectInternal() {
+ if (!host_context_->network_task_runner())
+ return;
+
if (!host_context_->network_task_runner()->BelongsToCurrentThread()) {
host_context_->network_task_runner()->PostTask(
FROM_HERE, base::Bind(&HostNPScriptObject::DisconnectInternal,
@@ -888,13 +867,13 @@ void HostNPScriptObject::DisconnectInternal() {
switch (state_) {
case kDisconnected:
- disconnected_event_.Signal();
+ OnShutdownFinished();
return;
case kStarting:
SetState(kDisconnecting);
SetState(kDisconnected);
- disconnected_event_.Signal();
+ OnShutdownFinished();
return;
case kDisconnecting:
@@ -921,9 +900,23 @@ void HostNPScriptObject::DisconnectInternal() {
}
void HostNPScriptObject::OnShutdownFinished() {
- DCHECK(host_context_->network_task_runner()->BelongsToCurrentThread());
+ if (!host_context_->ui_task_runner()->BelongsToCurrentThread()) {
+ host_context_->ui_task_runner()->PostTask(
+ FROM_HERE, base::Bind(&HostNPScriptObject::OnShutdownFinished,
+ base::Unretained(this)));
+ return;
+ }
+
+ // UI needs to be shut down on the UI thread before we destroy the
+ // host context (because it depends on the context object), but
+ // only after the host has been shut down (becase the UI object is
+ // registered as status observer for the host, and we can't
+ // unregister it from this thread).
+ it2me_host_user_interface_.reset();
- disconnected_event_.Signal();
+ // Release the context's TaskRunner references for the threads, so they can
+ // exit when no objects need them.
+ host_context_->ReleaseTaskRunners();
}
void HostNPScriptObject::OnPolicyUpdate(
diff --git a/remoting/host/plugin/host_script_object.h b/remoting/host/plugin/host_script_object.h
index c9bd1ec5..cfcb990 100644
--- a/remoting/host/plugin/host_script_object.h
+++ b/remoting/host/plugin/host_script_object.h
@@ -322,9 +322,6 @@ class HostNPScriptObject : public HostStatusObserver {
UiStrings ui_strings_;
base::Lock ui_strings_lock_;
- base::WaitableEvent disconnected_event_;
- base::WaitableEvent stopped_event_;
-
base::Lock nat_policy_lock_;
scoped_ptr<policy_hack::PolicyWatcher> policy_watcher_;