summaryrefslogtreecommitdiffstats
path: root/third_party/libjingle/files/talk/base
diff options
context:
space:
mode:
authorzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-24 19:08:15 +0000
committerzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-24 19:08:15 +0000
commitd256e6b2991b8d2ba0a613242d545856540f9540 (patch)
tree771389eae254ef226932184cd3bfb7dfd75c0cb5 /third_party/libjingle/files/talk/base
parentedd3b0a50912b04fe9d7e5dbf6f9639144cc09e4 (diff)
downloadchromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.zip
chromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.tar.gz
chromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.tar.bz2
Update Signal Thread to use ref counting and locks to ensure thread safety.
Review URL: http://codereview.chromium.org/418042 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@32948 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'third_party/libjingle/files/talk/base')
-rw-r--r--third_party/libjingle/files/talk/base/asynchttprequest.h3
-rw-r--r--third_party/libjingle/files/talk/base/autodetectproxy.cc2
-rw-r--r--third_party/libjingle/files/talk/base/autodetectproxy.h2
-rw-r--r--third_party/libjingle/files/talk/base/messagequeue.cc8
-rw-r--r--third_party/libjingle/files/talk/base/messagequeue.h9
-rw-r--r--third_party/libjingle/files/talk/base/signalthread.cc64
-rw-r--r--third_party/libjingle/files/talk/base/signalthread.h59
-rw-r--r--third_party/libjingle/files/talk/base/thread.cc9
8 files changed, 110 insertions, 46 deletions
diff --git a/third_party/libjingle/files/talk/base/asynchttprequest.h b/third_party/libjingle/files/talk/base/asynchttprequest.h
index 65bfa26..7f2f348 100644
--- a/third_party/libjingle/files/talk/base/asynchttprequest.h
+++ b/third_party/libjingle/files/talk/base/asynchttprequest.h
@@ -20,8 +20,7 @@ class FirewallManager;
class MemoryStream;
class AsyncHttpRequest:
- public SignalThread,
- public sigslot::has_slots<> {
+ public SignalThread {
public:
AsyncHttpRequest(const std::string &user_agent);
diff --git a/third_party/libjingle/files/talk/base/autodetectproxy.cc b/third_party/libjingle/files/talk/base/autodetectproxy.cc
index 917c548..cda405f 100644
--- a/third_party/libjingle/files/talk/base/autodetectproxy.cc
+++ b/third_party/libjingle/files/talk/base/autodetectproxy.cc
@@ -111,7 +111,7 @@ void AutoDetectProxy::Complete(talk_base::ProxyType type) {
LOG_V(sev) << "AutoDetectProxy detected " << proxy_.address.ToString()
<< " as type " << proxy_.type;
- Thread::Current()->MessageQueue::Stop();
+ Thread::Current()->Quit();
}
void AutoDetectProxy::OnConnectEvent(talk_base::AsyncSocket * socket) {
diff --git a/third_party/libjingle/files/talk/base/autodetectproxy.h b/third_party/libjingle/files/talk/base/autodetectproxy.h
index 9633d31..43d765e 100644
--- a/third_party/libjingle/files/talk/base/autodetectproxy.h
+++ b/third_party/libjingle/files/talk/base/autodetectproxy.h
@@ -19,7 +19,7 @@ namespace talk_base {
class AsyncSocket;
-class AutoDetectProxy : public SignalThread, public sigslot::has_slots<> {
+class AutoDetectProxy : public SignalThread {
public:
AutoDetectProxy(const std::string& user_agent);
diff --git a/third_party/libjingle/files/talk/base/messagequeue.cc b/third_party/libjingle/files/talk/base/messagequeue.cc
index 48a372b..171f324 100644
--- a/third_party/libjingle/files/talk/base/messagequeue.cc
+++ b/third_party/libjingle/files/talk/base/messagequeue.cc
@@ -101,6 +101,10 @@ MessageQueue::MessageQueue(SocketServer* ss)
}
MessageQueue::~MessageQueue() {
+ // The signal is done from here to ensure
+ // that it always gets called when the queue
+ // is going away.
+ SignalQueueDestroyed();
if (active_) {
MessageQueueManager::Instance()->Remove(this);
Clear(NULL);
@@ -111,12 +115,12 @@ void MessageQueue::set_socketserver(SocketServer* ss) {
ss_ = ss;
}
-void MessageQueue::Stop() {
+void MessageQueue::Quit() {
fStop_ = true;
ss_->WakeUp();
}
-bool MessageQueue::IsStopping() {
+bool MessageQueue::IsQuitting() {
return fStop_;
}
diff --git a/third_party/libjingle/files/talk/base/messagequeue.h b/third_party/libjingle/files/talk/base/messagequeue.h
index 5ef976d..d39aab8 100644
--- a/third_party/libjingle/files/talk/base/messagequeue.h
+++ b/third_party/libjingle/files/talk/base/messagequeue.h
@@ -160,9 +160,8 @@ public:
// Get (or Peek) returns false. By guaranteeing delivery of those messages,
// we eliminate the race condition when an MessageHandler and MessageQueue
// may be destroyed independently of each other.
-
- virtual void Stop();
- virtual bool IsStopping();
+ virtual void Quit();
+ virtual bool IsQuitting();
virtual void Restart();
// Get() will process I/O until:
@@ -187,6 +186,10 @@ public:
}
}
+ // When this signal is sent out, any references to this queue should
+ // no longer be used.
+ sigslot::signal0<> SignalQueueDestroyed;
+
protected:
void EnsureActive();
diff --git a/third_party/libjingle/files/talk/base/signalthread.cc b/third_party/libjingle/files/talk/base/signalthread.cc
index 0b5154d6..c9ac86a 100644
--- a/third_party/libjingle/files/talk/base/signalthread.cc
+++ b/third_party/libjingle/files/talk/base/signalthread.cc
@@ -10,22 +10,31 @@ using namespace talk_base;
SignalThread::SignalThread()
: main_(Thread::Current()), state_(kInit)
{
- worker_.SetParent(this);
+ main_->SignalQueueDestroyed.connect(this,
+ &SignalThread::OnMainThreadDestroyed);
+ refcount_ = 1;
+ worker_.parent_ = this;
+}
+
+void SignalThread::OnMainThreadDestroyed() {
+ EnterExit ee(this);
+ main_ = NULL;
}
SignalThread::~SignalThread() {
- worker_.SetParent(NULL);
}
void SignalThread::SetPriority(ThreadPriority priority) {
+ EnterExit ee(this);
ASSERT(main_->IsCurrent());
ASSERT(kInit == state_);
worker_.SetPriority(priority);
}
void SignalThread::Start() {
+ EnterExit ee(this);
ASSERT(main_->IsCurrent());
- if (kInit == state_) {
+ if (kInit == state_ || kComplete == state_) {
state_ = kRunning;
OnWorkStart();
worker_.Start();
@@ -34,18 +43,24 @@ void SignalThread::Start() {
}
}
-void SignalThread::Destroy() {
+void SignalThread::Destroy(bool wait) {
+ EnterExit ee(this);
ASSERT(main_->IsCurrent());
if ((kInit == state_) || (kComplete == state_)) {
- delete this;
- } else if (kRunning == state_) {
+ refcount_--;
+ } else if (kRunning == state_ || kReleasing == state_) {
state_ = kStopping;
- // A couple tricky issues here:
- // 1) Thread::Stop() calls Join(), which we don't want... we just want
- // to stop the MessageQueue, which causes ContinueWork() to return false.
- // 2) OnWorkStop() must follow Stop(), so that when the thread wakes up
- // due to OWS(), ContinueWork() will return false.
- worker_.MessageQueue::Stop();
+ // OnWorkStop() must follow Quit(), so that when the thread wakes up due to
+ // OWS(), ContinueWork() will return false.
+ if (wait) {
+ // Release the thread's lock so that it can return from ::Run.
+ cs_.Leave();
+ worker_.Stop();
+ cs_.Enter();
+ refcount_--;
+ } else {
+ worker_.Quit();
+ }
OnWorkStop();
} else {
ASSERT(false);
@@ -53,9 +68,10 @@ void SignalThread::Destroy() {
}
void SignalThread::Release() {
+ EnterExit ee(this);
ASSERT(main_->IsCurrent());
if (kComplete == state_) {
- delete this;
+ refcount_--;
} else if (kRunning == state_) {
state_ = kReleasing;
} else {
@@ -65,11 +81,13 @@ void SignalThread::Release() {
}
bool SignalThread::ContinueWork() {
+ EnterExit ee(this);
ASSERT(worker_.IsCurrent());
return worker_.ProcessMessages(0);
}
void SignalThread::OnMessage(Message *msg) {
+ EnterExit ee(this);
if (ST_MSG_WORKER_DONE == msg->message_id) {
ASSERT(main_->IsCurrent());
OnWorkDone();
@@ -80,15 +98,31 @@ void SignalThread::OnMessage(Message *msg) {
do_delete = true;
}
if (kStopping != state_) {
+ // Before signaling that the work is done, make sure that the worker
+ // thread actually is done. We got here because DoWork() finished and
+ // Run() posted the ST_MSG_WORKER_DONE message. This means the worker
+ // thread is about to go away anyway, but sometimes it doesn't actually
+ // finish before SignalWorkDone is processed, and for a reusable
+ // SignalThread this makes an assert in thread.cc fire.
+ //
+ // Calling Stop() on the worker ensures that the OS thread that underlies
+ // the worker will finish, and will be set to NULL, enabling us to call
+ // Start() again.
+ worker_.Stop();
SignalWorkDone(this);
}
if (do_delete) {
- delete this;
+ refcount_--;
}
}
}
void SignalThread::Run() {
DoWork();
- main_->Post(this, ST_MSG_WORKER_DONE);
+ {
+ EnterExit ee(this);
+ if (main_) {
+ main_->Post(this, ST_MSG_WORKER_DONE);
+ }
+ }
}
diff --git a/third_party/libjingle/files/talk/base/signalthread.h b/third_party/libjingle/files/talk/base/signalthread.h
index ccba912..ad6b46a 100644
--- a/third_party/libjingle/files/talk/base/signalthread.h
+++ b/third_party/libjingle/files/talk/base/signalthread.h
@@ -13,13 +13,17 @@ namespace talk_base {
// Cancellation: Call Release(true), to abort the worker thread.
// Fire-and-forget: Call Release(false), which allows the thread to run to
// completion, and then self-destruct without further notification.
+// Periodic tasks: Wait for SignalWorkDone, then eventually call Start()
+// again to repeat the task. When the instance isn't needed anymore,
+// call Release. DoWork, OnWorkStart and OnWorkStop are called again,
+// on a new thread.
// The subclass should override DoWork() to perform the background task. By
// periodically calling ContinueWork(), it can check for cancellation.
// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work
// tasks in the context of the main thread.
///////////////////////////////////////////////////////////////////////////////
-class SignalThread : protected MessageHandler {
+class SignalThread : public sigslot::has_slots<>, protected MessageHandler {
public:
SignalThread();
@@ -32,8 +36,9 @@ public:
// Context: Main Thread. If the worker thread is not running, deletes the
// object immediately. Otherwise, asks the worker thread to abort processing,
// and schedules the object to be deleted once the worker exits.
- // SignalWorkDone will not be signalled.
- void Destroy();
+ // SignalWorkDone will not be signalled. If wait is true, does not return
+ // until the thread is deleted.
+ void Destroy(bool wait);
// Context: Main Thread. If the worker thread is complete, deletes the
// object immediately. Otherwise, schedules the object to be deleted once
@@ -50,11 +55,11 @@ protected:
// Context: Main Thread. Subclass should override to do pre-work setup.
virtual void OnWorkStart() { }
-
+
// Context: Worker Thread. Subclass should override to do work.
virtual void DoWork() = 0;
- // Context: Worker Thread. Subclass should call periodically to
+ // Context: Worker Thread. Subclass should call periodically to
// dispatch messages and determine if the thread should terminate.
bool ContinueWork();
@@ -64,7 +69,7 @@ protected:
// Context: Main Thread. Subclass should override to do post-work cleanup.
virtual void OnWorkDone() { }
-
+
// Context: Any Thread. If subclass overrides, be sure to call the base
// implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE)
virtual void OnMessage(Message *msg);
@@ -73,26 +78,44 @@ private:
friend class Worker;
class Worker : public Thread {
public:
- virtual void Run() {
- CritScope cs(&parent_crit_);
- if (parent_)
- parent_->Run();
+ SignalThread* parent_;
+ virtual void Run() { parent_->Run(); }
+ };
+
+ class EnterExit {
+ friend class SignalThread;
+
+ SignalThread * t_;
+
+ EnterExit(SignalThread * t) : t_(t) {
+ t_->cs_.Enter();
+ t_->refcount_ += 1;
}
- void SetParent(SignalThread* parent) {
- CritScope cs(&parent_crit_);
- parent_ = parent;
+ ~EnterExit() {
+ bool d = (0 == (--(t_->refcount_)));
+ t_->cs_.Leave();
+ if (d)
+ delete t_;
}
-
- private:
- SignalThread* parent_;
- CriticalSection parent_crit_;
};
+ friend class EnterExit;
+
+ CriticalSection cs_;
+ int refcount_;
+
void Run();
+ void OnMainThreadDestroyed();
Thread* main_;
Worker worker_;
- enum State { kInit, kRunning, kComplete, kStopping, kReleasing } state_;
+ enum State {
+ kInit, // Initialized, but not started
+ kRunning, // Started and doing work
+ kReleasing, // Same as running, but to be deleted when work is done
+ kComplete, // Work is done
+ kStopping, // Work is being interrupted
+ } state_;
};
///////////////////////////////////////////////////////////////////////////////
diff --git a/third_party/libjingle/files/talk/base/thread.cc b/third_party/libjingle/files/talk/base/thread.cc
index 99fa701..fa2d23c 100644
--- a/third_party/libjingle/files/talk/base/thread.cc
+++ b/third_party/libjingle/files/talk/base/thread.cc
@@ -131,8 +131,8 @@ void Thread::Start() {
// Make sure Join() hasn't been called yet.
if (stopped_)
return;
- pthread_create(&thread_, &attr, PreRun, this);
- started_ = true;
+ if (pthread_create(&thread_, &attr, PreRun, this) == 0)
+ started_ = true;
}
void Thread::Join() {
@@ -141,6 +141,7 @@ void Thread::Join() {
if (started_) {
void *pv;
pthread_join(thread_, &pv);
+ started_ = false;
}
}
#endif
@@ -225,7 +226,7 @@ void Thread::Run() {
}
void Thread::Stop() {
- MessageQueue::Stop();
+ MessageQueue::Quit();
Join();
}
@@ -347,7 +348,7 @@ bool Thread::ProcessMessages(int cmsLoop) {
while (true) {
Message msg;
if (!Get(&msg, cmsNext))
- return false;
+ return !IsQuitting();
Dispatch(&msg);
if (cmsLoop != kForever) {