diff options
author | zork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-24 19:08:15 +0000 |
---|---|---|
committer | zork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-24 19:08:15 +0000 |
commit | d256e6b2991b8d2ba0a613242d545856540f9540 (patch) | |
tree | 771389eae254ef226932184cd3bfb7dfd75c0cb5 /third_party/libjingle/files/talk/base | |
parent | edd3b0a50912b04fe9d7e5dbf6f9639144cc09e4 (diff) | |
download | chromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.zip chromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.tar.gz chromium_src-d256e6b2991b8d2ba0a613242d545856540f9540.tar.bz2 |
Update Signal Thread to use ref counting and locks to ensure thread safety.
Review URL: http://codereview.chromium.org/418042
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@32948 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'third_party/libjingle/files/talk/base')
8 files changed, 110 insertions, 46 deletions
diff --git a/third_party/libjingle/files/talk/base/asynchttprequest.h b/third_party/libjingle/files/talk/base/asynchttprequest.h index 65bfa26..7f2f348 100644 --- a/third_party/libjingle/files/talk/base/asynchttprequest.h +++ b/third_party/libjingle/files/talk/base/asynchttprequest.h @@ -20,8 +20,7 @@ class FirewallManager; class MemoryStream; class AsyncHttpRequest: - public SignalThread, - public sigslot::has_slots<> { + public SignalThread { public: AsyncHttpRequest(const std::string &user_agent); diff --git a/third_party/libjingle/files/talk/base/autodetectproxy.cc b/third_party/libjingle/files/talk/base/autodetectproxy.cc index 917c548..cda405f 100644 --- a/third_party/libjingle/files/talk/base/autodetectproxy.cc +++ b/third_party/libjingle/files/talk/base/autodetectproxy.cc @@ -111,7 +111,7 @@ void AutoDetectProxy::Complete(talk_base::ProxyType type) { LOG_V(sev) << "AutoDetectProxy detected " << proxy_.address.ToString() << " as type " << proxy_.type; - Thread::Current()->MessageQueue::Stop(); + Thread::Current()->Quit(); } void AutoDetectProxy::OnConnectEvent(talk_base::AsyncSocket * socket) { diff --git a/third_party/libjingle/files/talk/base/autodetectproxy.h b/third_party/libjingle/files/talk/base/autodetectproxy.h index 9633d31..43d765e 100644 --- a/third_party/libjingle/files/talk/base/autodetectproxy.h +++ b/third_party/libjingle/files/talk/base/autodetectproxy.h @@ -19,7 +19,7 @@ namespace talk_base { class AsyncSocket; -class AutoDetectProxy : public SignalThread, public sigslot::has_slots<> { +class AutoDetectProxy : public SignalThread { public: AutoDetectProxy(const std::string& user_agent); diff --git a/third_party/libjingle/files/talk/base/messagequeue.cc b/third_party/libjingle/files/talk/base/messagequeue.cc index 48a372b..171f324 100644 --- a/third_party/libjingle/files/talk/base/messagequeue.cc +++ b/third_party/libjingle/files/talk/base/messagequeue.cc @@ -101,6 +101,10 @@ MessageQueue::MessageQueue(SocketServer* ss) } MessageQueue::~MessageQueue() { + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + SignalQueueDestroyed(); if (active_) { MessageQueueManager::Instance()->Remove(this); Clear(NULL); @@ -111,12 +115,12 @@ void MessageQueue::set_socketserver(SocketServer* ss) { ss_ = ss; } -void MessageQueue::Stop() { +void MessageQueue::Quit() { fStop_ = true; ss_->WakeUp(); } -bool MessageQueue::IsStopping() { +bool MessageQueue::IsQuitting() { return fStop_; } diff --git a/third_party/libjingle/files/talk/base/messagequeue.h b/third_party/libjingle/files/talk/base/messagequeue.h index 5ef976d..d39aab8 100644 --- a/third_party/libjingle/files/talk/base/messagequeue.h +++ b/third_party/libjingle/files/talk/base/messagequeue.h @@ -160,9 +160,8 @@ public: // Get (or Peek) returns false. By guaranteeing delivery of those messages, // we eliminate the race condition when an MessageHandler and MessageQueue // may be destroyed independently of each other. - - virtual void Stop(); - virtual bool IsStopping(); + virtual void Quit(); + virtual bool IsQuitting(); virtual void Restart(); // Get() will process I/O until: @@ -187,6 +186,10 @@ public: } } + // When this signal is sent out, any references to this queue should + // no longer be used. + sigslot::signal0<> SignalQueueDestroyed; + protected: void EnsureActive(); diff --git a/third_party/libjingle/files/talk/base/signalthread.cc b/third_party/libjingle/files/talk/base/signalthread.cc index 0b5154d6..c9ac86a 100644 --- a/third_party/libjingle/files/talk/base/signalthread.cc +++ b/third_party/libjingle/files/talk/base/signalthread.cc @@ -10,22 +10,31 @@ using namespace talk_base; SignalThread::SignalThread() : main_(Thread::Current()), state_(kInit) { - worker_.SetParent(this); + main_->SignalQueueDestroyed.connect(this, + &SignalThread::OnMainThreadDestroyed); + refcount_ = 1; + worker_.parent_ = this; +} + +void SignalThread::OnMainThreadDestroyed() { + EnterExit ee(this); + main_ = NULL; } SignalThread::~SignalThread() { - worker_.SetParent(NULL); } void SignalThread::SetPriority(ThreadPriority priority) { + EnterExit ee(this); ASSERT(main_->IsCurrent()); ASSERT(kInit == state_); worker_.SetPriority(priority); } void SignalThread::Start() { + EnterExit ee(this); ASSERT(main_->IsCurrent()); - if (kInit == state_) { + if (kInit == state_ || kComplete == state_) { state_ = kRunning; OnWorkStart(); worker_.Start(); @@ -34,18 +43,24 @@ void SignalThread::Start() { } } -void SignalThread::Destroy() { +void SignalThread::Destroy(bool wait) { + EnterExit ee(this); ASSERT(main_->IsCurrent()); if ((kInit == state_) || (kComplete == state_)) { - delete this; - } else if (kRunning == state_) { + refcount_--; + } else if (kRunning == state_ || kReleasing == state_) { state_ = kStopping; - // A couple tricky issues here: - // 1) Thread::Stop() calls Join(), which we don't want... we just want - // to stop the MessageQueue, which causes ContinueWork() to return false. - // 2) OnWorkStop() must follow Stop(), so that when the thread wakes up - // due to OWS(), ContinueWork() will return false. - worker_.MessageQueue::Stop(); + // OnWorkStop() must follow Quit(), so that when the thread wakes up due to + // OWS(), ContinueWork() will return false. + if (wait) { + // Release the thread's lock so that it can return from ::Run. + cs_.Leave(); + worker_.Stop(); + cs_.Enter(); + refcount_--; + } else { + worker_.Quit(); + } OnWorkStop(); } else { ASSERT(false); @@ -53,9 +68,10 @@ void SignalThread::Destroy() { } void SignalThread::Release() { + EnterExit ee(this); ASSERT(main_->IsCurrent()); if (kComplete == state_) { - delete this; + refcount_--; } else if (kRunning == state_) { state_ = kReleasing; } else { @@ -65,11 +81,13 @@ void SignalThread::Release() { } bool SignalThread::ContinueWork() { + EnterExit ee(this); ASSERT(worker_.IsCurrent()); return worker_.ProcessMessages(0); } void SignalThread::OnMessage(Message *msg) { + EnterExit ee(this); if (ST_MSG_WORKER_DONE == msg->message_id) { ASSERT(main_->IsCurrent()); OnWorkDone(); @@ -80,15 +98,31 @@ void SignalThread::OnMessage(Message *msg) { do_delete = true; } if (kStopping != state_) { + // Before signaling that the work is done, make sure that the worker + // thread actually is done. We got here because DoWork() finished and + // Run() posted the ST_MSG_WORKER_DONE message. This means the worker + // thread is about to go away anyway, but sometimes it doesn't actually + // finish before SignalWorkDone is processed, and for a reusable + // SignalThread this makes an assert in thread.cc fire. + // + // Calling Stop() on the worker ensures that the OS thread that underlies + // the worker will finish, and will be set to NULL, enabling us to call + // Start() again. + worker_.Stop(); SignalWorkDone(this); } if (do_delete) { - delete this; + refcount_--; } } } void SignalThread::Run() { DoWork(); - main_->Post(this, ST_MSG_WORKER_DONE); + { + EnterExit ee(this); + if (main_) { + main_->Post(this, ST_MSG_WORKER_DONE); + } + } } diff --git a/third_party/libjingle/files/talk/base/signalthread.h b/third_party/libjingle/files/talk/base/signalthread.h index ccba912..ad6b46a 100644 --- a/third_party/libjingle/files/talk/base/signalthread.h +++ b/third_party/libjingle/files/talk/base/signalthread.h @@ -13,13 +13,17 @@ namespace talk_base { // Cancellation: Call Release(true), to abort the worker thread. // Fire-and-forget: Call Release(false), which allows the thread to run to // completion, and then self-destruct without further notification. +// Periodic tasks: Wait for SignalWorkDone, then eventually call Start() +// again to repeat the task. When the instance isn't needed anymore, +// call Release. DoWork, OnWorkStart and OnWorkStop are called again, +// on a new thread. // The subclass should override DoWork() to perform the background task. By // periodically calling ContinueWork(), it can check for cancellation. // OnWorkStart and OnWorkDone can be overridden to do pre- or post-work // tasks in the context of the main thread. /////////////////////////////////////////////////////////////////////////////// -class SignalThread : protected MessageHandler { +class SignalThread : public sigslot::has_slots<>, protected MessageHandler { public: SignalThread(); @@ -32,8 +36,9 @@ public: // Context: Main Thread. If the worker thread is not running, deletes the // object immediately. Otherwise, asks the worker thread to abort processing, // and schedules the object to be deleted once the worker exits. - // SignalWorkDone will not be signalled. - void Destroy(); + // SignalWorkDone will not be signalled. If wait is true, does not return + // until the thread is deleted. + void Destroy(bool wait); // Context: Main Thread. If the worker thread is complete, deletes the // object immediately. Otherwise, schedules the object to be deleted once @@ -50,11 +55,11 @@ protected: // Context: Main Thread. Subclass should override to do pre-work setup. virtual void OnWorkStart() { } - + // Context: Worker Thread. Subclass should override to do work. virtual void DoWork() = 0; - // Context: Worker Thread. Subclass should call periodically to + // Context: Worker Thread. Subclass should call periodically to // dispatch messages and determine if the thread should terminate. bool ContinueWork(); @@ -64,7 +69,7 @@ protected: // Context: Main Thread. Subclass should override to do post-work cleanup. virtual void OnWorkDone() { } - + // Context: Any Thread. If subclass overrides, be sure to call the base // implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE) virtual void OnMessage(Message *msg); @@ -73,26 +78,44 @@ private: friend class Worker; class Worker : public Thread { public: - virtual void Run() { - CritScope cs(&parent_crit_); - if (parent_) - parent_->Run(); + SignalThread* parent_; + virtual void Run() { parent_->Run(); } + }; + + class EnterExit { + friend class SignalThread; + + SignalThread * t_; + + EnterExit(SignalThread * t) : t_(t) { + t_->cs_.Enter(); + t_->refcount_ += 1; } - void SetParent(SignalThread* parent) { - CritScope cs(&parent_crit_); - parent_ = parent; + ~EnterExit() { + bool d = (0 == (--(t_->refcount_))); + t_->cs_.Leave(); + if (d) + delete t_; } - - private: - SignalThread* parent_; - CriticalSection parent_crit_; }; + friend class EnterExit; + + CriticalSection cs_; + int refcount_; + void Run(); + void OnMainThreadDestroyed(); Thread* main_; Worker worker_; - enum State { kInit, kRunning, kComplete, kStopping, kReleasing } state_; + enum State { + kInit, // Initialized, but not started + kRunning, // Started and doing work + kReleasing, // Same as running, but to be deleted when work is done + kComplete, // Work is done + kStopping, // Work is being interrupted + } state_; }; /////////////////////////////////////////////////////////////////////////////// diff --git a/third_party/libjingle/files/talk/base/thread.cc b/third_party/libjingle/files/talk/base/thread.cc index 99fa701..fa2d23c 100644 --- a/third_party/libjingle/files/talk/base/thread.cc +++ b/third_party/libjingle/files/talk/base/thread.cc @@ -131,8 +131,8 @@ void Thread::Start() { // Make sure Join() hasn't been called yet. if (stopped_) return; - pthread_create(&thread_, &attr, PreRun, this); - started_ = true; + if (pthread_create(&thread_, &attr, PreRun, this) == 0) + started_ = true; } void Thread::Join() { @@ -141,6 +141,7 @@ void Thread::Join() { if (started_) { void *pv; pthread_join(thread_, &pv); + started_ = false; } } #endif @@ -225,7 +226,7 @@ void Thread::Run() { } void Thread::Stop() { - MessageQueue::Stop(); + MessageQueue::Quit(); Join(); } @@ -347,7 +348,7 @@ bool Thread::ProcessMessages(int cmsLoop) { while (true) { Message msg; if (!Get(&msg, cmsNext)) - return false; + return !IsQuitting(); Dispatch(&msg); if (cmsLoop != kForever) { |