diff options
-rw-r--r-- | DEPS | 2 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper.cc | 5 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper.h | 2 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_thread.h | 4 | ||||
-rw-r--r-- | remoting/jingle_glue/xmpp_socket_adapter.cc | 2 | ||||
-rw-r--r-- | third_party/libjingle/README.chromium | 2 | ||||
-rw-r--r-- | third_party/libjingle/libjingle.gyp | 13 | ||||
-rw-r--r-- | third_party/libjingle/overrides/talk/base/messagequeue.cc | 399 | ||||
-rw-r--r-- | third_party/libjingle/overrides/talk/base/messagequeue.h | 9 | ||||
-rw-r--r-- | third_party/libjingle/overrides/talk/base/thread.cc | 560 | ||||
-rw-r--r-- | third_party/libjingle/overrides/talk/base/thread.h | 5 |
11 files changed, 19 insertions, 984 deletions
@@ -44,7 +44,7 @@ vars = { "nacl_toolchain_revision": "8715", "pnacl_toolchain_revision": "8847", - "libjingle_revision": "152", + "libjingle_revision": "153", "libphonenumber_revision": "456", "libvpx_revision": "134182", "lss_revision": "9", diff --git a/jingle/glue/thread_wrapper.cc b/jingle/glue/thread_wrapper.cc index e31f8de..c63f89a 100644 --- a/jingle/glue/thread_wrapper.cc +++ b/jingle/glue/thread_wrapper.cc @@ -8,6 +8,7 @@ #include "base/bind_helpers.h" #include "base/lazy_instance.h" #include "base/threading/thread_local.h" +#include "third_party/libjingle/source/talk/base/nullsocketserver.h" namespace jingle_glue { @@ -43,7 +44,7 @@ JingleThreadWrapper* JingleThreadWrapper::current() { } JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) - : talk_base::Thread(NULL), + : talk_base::Thread(new talk_base::NullSocketServer()), message_loop_(message_loop), send_allowed_(false), last_task_id_(0), @@ -69,7 +70,9 @@ void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { talk_base::ThreadManager::Instance()->SetCurrentThread(NULL); talk_base::MessageQueueManager::Instance()->Remove(this); message_loop_->RemoveDestructionObserver(this); + talk_base::SocketServer* ss = socketserver(); delete this; + delete ss; } void JingleThreadWrapper::Post( diff --git a/jingle/glue/thread_wrapper.h b/jingle/glue/thread_wrapper.h index 553dbbd..284c762 100644 --- a/jingle/glue/thread_wrapper.h +++ b/jingle/glue/thread_wrapper.h @@ -12,7 +12,7 @@ #include "base/message_loop.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" -#include "third_party/libjingle/overrides/talk/base/thread.h" +#include "third_party/libjingle/source/talk/base/thread.h" namespace jingle_glue { diff --git a/remoting/jingle_glue/jingle_thread.h b/remoting/jingle_glue/jingle_thread.h index 7c3aba0..2c5422e 100644 --- a/remoting/jingle_glue/jingle_thread.h +++ b/remoting/jingle_glue/jingle_thread.h @@ -8,9 +8,9 @@ #include "base/message_loop.h" #include "base/tracked_objects.h" #include "base/synchronization/waitable_event.h" -#include "third_party/libjingle/overrides/talk/base/messagequeue.h" -#include "third_party/libjingle/overrides/talk/base/thread.h" +#include "third_party/libjingle/source/talk/base/messagequeue.h" #include "third_party/libjingle/source/talk/base/taskrunner.h" +#include "third_party/libjingle/source/talk/base/thread.h" namespace base { class MessageLoopProxy; diff --git a/remoting/jingle_glue/xmpp_socket_adapter.cc b/remoting/jingle_glue/xmpp_socket_adapter.cc index 611d70f..de66c5f 100644 --- a/remoting/jingle_glue/xmpp_socket_adapter.cc +++ b/remoting/jingle_glue/xmpp_socket_adapter.cc @@ -9,12 +9,12 @@ #include "base/logging.h" #include "remoting/jingle_glue/ssl_adapter.h" -#include "third_party/libjingle/overrides/talk/base/thread.h" #include "third_party/libjingle/source/talk/base/byteorder.h" #include "third_party/libjingle/source/talk/base/common.h" #include "third_party/libjingle/source/talk/base/firewallsocketserver.h" #include "third_party/libjingle/source/talk/base/socketadapters.h" #include "third_party/libjingle/source/talk/base/ssladapter.h" +#include "third_party/libjingle/source/talk/base/thread.h" #include "third_party/libjingle/source/talk/xmpp/xmppengine.h" namespace remoting { diff --git a/third_party/libjingle/README.chromium b/third_party/libjingle/README.chromium index 802d91f..02c5da0 100644 --- a/third_party/libjingle/README.chromium +++ b/third_party/libjingle/README.chromium @@ -2,7 +2,7 @@ Name: Libjingle library. Used for p2p voice and video communication. Short Name: libjingle URL: http://code.google.com/p/libjingle/ Version: 0.6.19 -Revision: 152 +Revision: 153 License: BSD License File: source/COPYING Security Critical: yes diff --git a/third_party/libjingle/libjingle.gyp b/third_party/libjingle/libjingle.gyp index 4ceeeee5..87de525 100644 --- a/third_party/libjingle/libjingle.gyp +++ b/third_party/libjingle/libjingle.gyp @@ -16,6 +16,7 @@ 'HAVE_WEBRTC_VOICE', 'JSONCPP_RELATIVE_PATH', 'LOGGING_INSIDE_LIBJINGLE', + 'NO_MAIN_THREAD_WRAPPING', 'NO_SOUND_SYSTEM', 'SRTP_RELATIVE_PATH', 'WEBRTC_RELATIVE_PATH', @@ -57,6 +58,7 @@ 'GTEST_RELATIVE_PATH', 'JSONCPP_RELATIVE_PATH', 'WEBRTC_RELATIVE_PATH', + 'NO_MAIN_THREAD_WRAPPING', 'NO_SOUND_SYSTEM', ], 'conditions': [ @@ -176,13 +178,6 @@ 'overrides/talk/base/logging.cc', 'overrides/talk/base/logging.h', - # TODO(ronghuawu): Remove below overrides once below bug is fixed: - # http://crbug.com/115702 - 'overrides/talk/base/messagequeue.cc', - 'overrides/talk/base/messagequeue.h', - 'overrides/talk/base/thread.cc', - 'overrides/talk/base/thread.h', - 'source/talk/base/asyncfile.cc', 'source/talk/base/asyncfile.h', 'source/talk/base/asynchttprequest.cc', @@ -247,6 +242,8 @@ 'source/talk/base/messagedigest.h', 'source/talk/base/messagehandler.cc', 'source/talk/base/messagehandler.h', + 'source/talk/base/messagequeue.cc', + 'source/talk/base/messagequeue.h', 'source/talk/base/nethelpers.cc', 'source/talk/base/nethelpers.h', 'source/talk/base/network.cc', @@ -304,6 +301,8 @@ 'source/talk/base/taskparent.h', 'source/talk/base/taskrunner.cc', 'source/talk/base/taskrunner.h', + 'source/talk/base/thread.cc', + 'source/talk/base/thread.h', 'source/talk/base/timeutils.cc', 'source/talk/base/timeutils.h', 'source/talk/base/timing.cc', diff --git a/third_party/libjingle/overrides/talk/base/messagequeue.cc b/third_party/libjingle/overrides/talk/base/messagequeue.cc deleted file mode 100644 index 36ddc7a..0000000 --- a/third_party/libjingle/overrides/talk/base/messagequeue.cc +++ /dev/null @@ -1,399 +0,0 @@ -/* - * libjingle - * Copyright 2004--2005, Google Inc. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * 3. The name of the author may not be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO - * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; - * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#if defined(_MSC_VER) && _MSC_VER < 1300 -#pragma warning(disable:4786) -#endif - -#ifdef POSIX -#include <sys/time.h> -#endif - -#include "talk/base/common.h" -#include "talk/base/event.h" -#include "talk/base/logging.h" -#include "talk/base/messagequeue.h" -#include "talk/base/nullsocketserver.h" -#include "talk/base/physicalsocketserver.h" - -namespace talk_base { - -const uint32 kMaxMsgLatency = 150; // 150 ms - -//------------------------------------------------------------------ -// MessageQueueManager - -MessageQueueManager* MessageQueueManager::instance_; - -MessageQueueManager* MessageQueueManager::Instance() { - // Note: This is not thread safe, but it is first called before threads are - // spawned. - if (!instance_) - instance_ = new MessageQueueManager; - return instance_; -} - -MessageQueueManager::MessageQueueManager() { -} - -MessageQueueManager::~MessageQueueManager() { -} - -void MessageQueueManager::Add(MessageQueue *message_queue) { - // MessageQueueManager methods should be non-reentrant, so we - // ASSERT that is the case. If any of these ASSERT, please - // contact bpm or jbeda. - ASSERT(!crit_.CurrentThreadIsOwner()); - CritScope cs(&crit_); - message_queues_.push_back(message_queue); -} - -void MessageQueueManager::Remove(MessageQueue *message_queue) { - ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. - // If this is the last MessageQueue, destroy the manager as well so that - // we don't leak this object at program shutdown. As mentioned above, this is - // not thread-safe, but this should only happen at program termination (when - // the ThreadManager is destroyed, and threads are no longer active). - bool destroy = false; - { - CritScope cs(&crit_); - std::vector<MessageQueue *>::iterator iter; - iter = std::find(message_queues_.begin(), message_queues_.end(), - message_queue); - if (iter != message_queues_.end()) { - message_queues_.erase(iter); - } - destroy = message_queues_.empty(); - } - if (destroy) { - instance_ = NULL; - delete this; - } -} - -void MessageQueueManager::Clear(MessageHandler *handler) { - ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. - CritScope cs(&crit_); - std::vector<MessageQueue *>::iterator iter; - for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) - (*iter)->Clear(handler); -} - -//------------------------------------------------------------------ -// MessageQueue - -MessageQueue::MessageQueue() { - // TODO(ronghuawu): - // Currently, MessageQueue holds a socket server, and is the base class for - // Thread. It seems like it makes more sense for Thread to hold the socket - // server, and provide it to the MessageQueue, since the Thread controls - // the I/O model, and MQ is agnostic to those details. Anyway, this causes - // messagequeue_unittest to depend on network libraries... yuck. - owned_ss_.reset(new PhysicalSocketServer()); - ss_ = owned_ss_.get(); - Construct(); -} - -MessageQueue::MessageQueue(SocketServer* ss) { - if (ss) { - ss_ = ss; - } else { - owned_ss_.reset(new NullSocketServer()); - ss_ = owned_ss_.get(); - } - Construct(); -} - -void MessageQueue::Construct() { - fStop_ = false; - fPeekKeep_ = false; - active_ = false; - dmsgq_next_num_ = 0; - ss_->SetMessageQueue(this); -} - -MessageQueue::~MessageQueue() { - // The signal is done from here to ensure - // that it always gets called when the queue - // is going away. - SignalQueueDestroyed(); - if (active_) { - MessageQueueManager::Instance()->Remove(this); - Clear(NULL); - } - if (ss_) { - ss_->SetMessageQueue(NULL); - } -} - -void MessageQueue::set_socketserver(SocketServer* ss) { - ss_ = ss ? ss : owned_ss_.get(); - ss_->SetMessageQueue(this); -} - -void MessageQueue::Quit() { - fStop_ = true; - ss_->WakeUp(); -} - -bool MessageQueue::IsQuitting() { - return fStop_; -} - -void MessageQueue::Restart() { - fStop_ = false; -} - -bool MessageQueue::Peek(Message *pmsg, int cmsWait) { - if (fPeekKeep_) { - *pmsg = msgPeek_; - return true; - } - if (!Get(pmsg, cmsWait)) - return false; - msgPeek_ = *pmsg; - fPeekKeep_ = true; - return true; -} - -bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { - // Return and clear peek if present - // Always return the peek if it exists so there is Peek/Get symmetry - - if (fPeekKeep_) { - *pmsg = msgPeek_; - fPeekKeep_ = false; - return true; - } - - // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch - - int cmsTotal = cmsWait; - int cmsElapsed = 0; - uint32 msStart = Time(); - uint32 msCurrent = msStart; - while (true) { - // Check for sent messages - - ReceiveSends(); - - // Check queues - - int cmsDelayNext = kForever; - { - CritScope cs(&crit_); - - // Check for delayed messages that have been triggered - // Calc the next trigger too - - while (!dmsgq_.empty()) { - if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { - cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); - break; - } - msgq_.push_back(dmsgq_.top().msg_); - dmsgq_.pop(); - } - - // Check for posted events - - while (!msgq_.empty()) { - *pmsg = msgq_.front(); - if (pmsg->ts_sensitive) { - long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); - if (delay > 0) { - LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " - << (delay + kMaxMsgLatency) << "ms"; - } - } - msgq_.pop_front(); - if (MQID_DISPOSE == pmsg->message_id) { - ASSERT(NULL == pmsg->phandler); - delete pmsg->pdata; - continue; - } - return true; - } - } - - if (fStop_) - break; - - // Which is shorter, the delay wait or the asked wait? - - int cmsNext; - if (cmsWait == kForever) { - cmsNext = cmsDelayNext; - } else { - cmsNext = _max(0, cmsTotal - cmsElapsed); - if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) - cmsNext = cmsDelayNext; - } - - // Wait and multiplex in the meantime - if (!ss_->Wait(cmsNext, process_io)) - return false; - - // If the specified timeout expired, return - - msCurrent = Time(); - cmsElapsed = TimeDiff(msCurrent, msStart); - if (cmsWait != kForever) { - if (cmsElapsed >= cmsWait) - return false; - } - } - return false; -} - -void MessageQueue::ReceiveSends() { -} - -void MessageQueue::Post(MessageHandler *phandler, uint32 id, - MessageData *pdata, bool time_sensitive) { - if (fStop_) - return; - - // Keep thread safe - // Add the message to the end of the queue - // Signal for the multiplexer to return - - CritScope cs(&crit_); - EnsureActive(); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = Time() + kMaxMsgLatency; - } - msgq_.push_back(msg); - ss_->WakeUp(); -} - -void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, - MessageHandler *phandler, uint32 id, MessageData* pdata) { - if (fStop_) - return; - - // Keep thread safe - // Add to the priority queue. Gets sorted soonest first. - // Signal for the multiplexer to return. - - CritScope cs(&crit_); - EnsureActive(); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); - // If this message queue processes 1 message every millisecond for 50 days, - // we will wrap this number. Even then, only messages with identical times - // will be misordered, and then only briefly. This is probably ok. - VERIFY(0 != ++dmsgq_next_num_); - ss_->WakeUp(); -} - -int MessageQueue::GetDelay() { - CritScope cs(&crit_); - - if (!msgq_.empty()) - return 0; - - if (!dmsgq_.empty()) { - int delay = TimeUntil(dmsgq_.top().msTrigger_); - if (delay < 0) - delay = 0; - return delay; - } - - return kForever; -} - -void MessageQueue::Clear(MessageHandler *phandler, uint32 id, - MessageList* removed) { - CritScope cs(&crit_); - - // Remove messages with phandler - - if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { - if (removed) { - removed->push_back(msgPeek_); - } else { - delete msgPeek_.pdata; - } - fPeekKeep_ = false; - } - - // Remove from ordered message queue - - for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { - if (it->Match(phandler, id)) { - if (removed) { - removed->push_back(*it); - } else { - delete it->pdata; - } - it = msgq_.erase(it); - } else { - ++it; - } - } - - // Remove from priority queue. Not directly iterable, so use this approach - - PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); - for (PriorityQueue::container_type::iterator it = new_end; - it != dmsgq_.container().end(); ++it) { - if (it->msg_.Match(phandler, id)) { - if (removed) { - removed->push_back(it->msg_); - } else { - delete it->msg_.pdata; - } - } else { - *new_end++ = *it; - } - } - dmsgq_.container().erase(new_end, dmsgq_.container().end()); - dmsgq_.reheap(); -} - -void MessageQueue::Dispatch(Message *pmsg) { - pmsg->phandler->OnMessage(pmsg); -} - -void MessageQueue::EnsureActive() { - ASSERT(crit_.CurrentThreadIsOwner()); - if (!active_) { - active_ = true; - MessageQueueManager::Instance()->Add(this); - } -} - -} // namespace talk_base diff --git a/third_party/libjingle/overrides/talk/base/messagequeue.h b/third_party/libjingle/overrides/talk/base/messagequeue.h index 634fa2a..2857182 100644 --- a/third_party/libjingle/overrides/talk/base/messagequeue.h +++ b/third_party/libjingle/overrides/talk/base/messagequeue.h @@ -171,14 +171,9 @@ class DelayedMessage { class MessageQueue { public: - // A phiscal socket server will be created for this ctor. - MessageQueue(); - // If |ss| is NULL, a dummy socket server will be created. - explicit MessageQueue(SocketServer* ss); + explicit MessageQueue(SocketServer* ss = NULL); virtual ~MessageQueue(); - void Construct(); - SocketServer* socketserver() { return ss_; } void set_socketserver(SocketServer* ss); @@ -245,7 +240,7 @@ class MessageQueue { // The SocketServer is not owned by MessageQueue. SocketServer* ss_; // If a server isn't supplied in the constructor, use this one. - scoped_ptr<SocketServer> owned_ss_; + scoped_ptr<SocketServer> default_ss_; bool fStop_; bool fPeekKeep_; Message msgPeek_; diff --git a/third_party/libjingle/overrides/talk/base/thread.cc b/third_party/libjingle/overrides/talk/base/thread.cc deleted file mode 100644 index 7236ab8..0000000 --- a/third_party/libjingle/overrides/talk/base/thread.cc +++ /dev/null @@ -1,560 +0,0 @@ -/* - * libjingle - * Copyright 2004 Google Inc. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * 3. The name of the author may not be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO - * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; - * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "talk/base/thread.h" - -#if defined(WIN32) -#include <comdef.h> -#elif defined(POSIX) -#include <time.h> -#endif - -#include "talk/base/common.h" -#include "talk/base/logging.h" -#include "talk/base/stringutils.h" -#include "talk/base/timeutils.h" - -#ifdef USE_COCOA_THREADING -#if !defined(OSX) && !defined(IOS) -#error USE_COCOA_THREADING is defined but not OSX nor IOS -#endif -#include "talk/base/maccocoathreadhelper.h" -#include "talk/base/scoped_autorelease_pool.h" -#endif - -namespace talk_base { - -ThreadManager* ThreadManager::Instance() { - LIBJINGLE_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); - return &thread_manager; -} - -#ifdef POSIX -ThreadManager::ThreadManager() { - pthread_key_create(&key_, NULL); - WrapCurrentThread(); -#ifdef USE_COCOA_THREADING - InitCocoaMultiThreading(); -#endif -} - -ThreadManager::~ThreadManager() { -#ifdef USE_COCOA_THREADING - // This is called during exit, at which point apparently no NSAutoreleasePools - // are available; but we might still need them to do cleanup (or we get the - // "no autoreleasepool in place, just leaking" warning when exiting). - ScopedAutoreleasePool pool; -#endif - UnwrapCurrentThread(); - pthread_key_delete(key_); -} - -Thread *ThreadManager::CurrentThread() { - return static_cast<Thread *>(pthread_getspecific(key_)); -} - -void ThreadManager::SetCurrentThread(Thread *thread) { - pthread_setspecific(key_, thread); -} -#endif - -#ifdef WIN32 -ThreadManager::ThreadManager() { - key_ = TlsAlloc(); -} - -ThreadManager::~ThreadManager() { - UnwrapCurrentThread(); - TlsFree(key_); -} - -Thread *ThreadManager::CurrentThread() { - return static_cast<Thread *>(TlsGetValue(key_)); -} - -void ThreadManager::SetCurrentThread(Thread *thread) { - TlsSetValue(key_, thread); -} -#endif - -// static -Thread *ThreadManager::WrapCurrentThread() { - Thread* result = CurrentThread(); - if (NULL == result) { - result = new Thread(); - result->WrapCurrentWithThreadManager(this); - } - return result; -} - -// static -void ThreadManager::UnwrapCurrentThread() { - Thread* t = CurrentThread(); - if (t && !(t->IsOwned())) { - t->UnwrapCurrent(); - delete t; - } -} - -struct ThreadInit { - Thread* thread; - Runnable* runnable; -}; - -Thread::Thread() { - Construct(); -} - -Thread::Thread(SocketServer* ss) - : MessageQueue(ss) { - Construct(); -} - -void Thread::Construct() { - priority_ = PRIORITY_NORMAL; - started_ = false; - has_sends_ = false; -#if defined(WIN32) - thread_ = NULL; -#endif - owned_ = true; - delete_self_when_complete_ = false; - SetName("Thread", this); // default name -} - -Thread::~Thread() { - Stop(); - if (active_) - Clear(NULL); -} - -bool Thread::SleepMs(int milliseconds) { -#ifdef WIN32 - ::Sleep(milliseconds); - return true; -#else - // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, - // so we use nanosleep() even though it has greater precision than necessary. - struct timespec ts; - ts.tv_sec = milliseconds / 1000; - ts.tv_nsec = (milliseconds % 1000) * 1000000; - int ret = nanosleep(&ts, NULL); - if (ret != 0) { - LOG_ERR(LS_WARNING) << "nanosleep() returning early"; - return false; - } - return true; -#endif -} - -bool Thread::SetName(const std::string& name, const void* obj) { - if (started_) return false; - name_ = name; - if (obj) { - char buf[16]; - sprintfn(buf, sizeof(buf), " 0x%p", obj); - name_ += buf; - } - return true; -} - -bool Thread::SetPriority(ThreadPriority priority) { -#if defined(WIN32) - if (started_) { - BOOL ret = FALSE; - if (priority == PRIORITY_NORMAL) { - ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL); - } else if (priority == PRIORITY_HIGH) { - ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST); - } else if (priority == PRIORITY_ABOVE_NORMAL) { - ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL); - } else if (priority == PRIORITY_IDLE) { - ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE); - } - if (!ret) { - return false; - } - } - priority_ = priority; - return true; -#else - // TODO: Implement for Linux/Mac if possible. - if (started_) return false; - priority_ = priority; - return true; -#endif -} - -bool Thread::Start(Runnable* runnable) { - ASSERT(owned_); - if (!owned_) return false; - ASSERT(!started_); - if (started_) return false; - - // Make sure that ThreadManager is created on the main thread before - // we start a new thread. - ThreadManager::Instance(); - - ThreadInit* init = new ThreadInit; - init->thread = this; - init->runnable = runnable; -#if defined(WIN32) - DWORD flags = 0; - if (priority_ != PRIORITY_NORMAL) { - flags = CREATE_SUSPENDED; - } - thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags, - NULL); - if (thread_) { - started_ = true; - if (priority_ != PRIORITY_NORMAL) { - SetPriority(priority_); - ::ResumeThread(thread_); - } - } else { - return false; - } -#elif defined(POSIX) - pthread_attr_t attr; - pthread_attr_init(&attr); - if (priority_ != PRIORITY_NORMAL) { - if (priority_ == PRIORITY_IDLE) { - // There is no POSIX-standard way to set a below-normal priority for an - // individual thread (only whole process), so let's not support it. - LOG(LS_WARNING) << "PRIORITY_IDLE not supported"; - } else { - // Set real-time round-robin policy. - if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) { - LOG(LS_ERROR) << "pthread_attr_setschedpolicy"; - } - struct sched_param param; - if (pthread_attr_getschedparam(&attr, ¶m) != 0) { - LOG(LS_ERROR) << "pthread_attr_getschedparam"; - } else { - // The numbers here are arbitrary. - if (priority_ == PRIORITY_HIGH) { - param.sched_priority = 6; // 6 = HIGH - } else { - ASSERT(priority_ == PRIORITY_ABOVE_NORMAL); - param.sched_priority = 4; // 4 = ABOVE_NORMAL - } - if (pthread_attr_setschedparam(&attr, ¶m) != 0) { - LOG(LS_ERROR) << "pthread_attr_setschedparam"; - } - } - } - } - int error_code = pthread_create(&thread_, &attr, PreRun, init); - if (0 != error_code) { - LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; - return false; - } - started_ = true; -#endif - return true; -} - -void Thread::Join() { - if (started_) { - ASSERT(!IsCurrent()); -#if defined(WIN32) - WaitForSingleObject(thread_, INFINITE); - CloseHandle(thread_); - thread_ = NULL; -#elif defined(POSIX) - void *pv; - pthread_join(thread_, &pv); -#endif - started_ = false; - } -} - -#ifdef WIN32 -// As seen on MSDN. -// http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx -#define MSDEV_SET_THREAD_NAME 0x406D1388 -typedef struct tagTHREADNAME_INFO { - DWORD dwType; - LPCSTR szName; - DWORD dwThreadID; - DWORD dwFlags; -} THREADNAME_INFO; - -void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) { - THREADNAME_INFO info; - info.dwType = 0x1000; - info.szName = szThreadName; - info.dwThreadID = dwThreadID; - info.dwFlags = 0; - - __try { - RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD), - reinterpret_cast<DWORD*>(&info)); - } - __except(EXCEPTION_CONTINUE_EXECUTION) { - } -} -#endif // WIN32 - -void* Thread::PreRun(void* pv) { - ThreadInit* init = static_cast<ThreadInit*>(pv); - ThreadManager::Instance()->SetCurrentThread(init->thread); -#if defined(WIN32) - SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str()); -#elif defined(POSIX) - // TODO: See if naming exists for pthreads. -#endif -#ifdef USE_COCOA_THREADING - // Make sure the new thread has an autoreleasepool - ScopedAutoreleasePool pool; -#endif - if (init->runnable) { - init->runnable->Run(init->thread); - } else { - init->thread->Run(); - } - if (init->thread->delete_self_when_complete_) { - init->thread->started_ = false; - delete init->thread; - } - delete init; - return NULL; -} - -void Thread::Run() { - ProcessMessages(kForever); -} - -bool Thread::IsOwned() { - return owned_; -} - -void Thread::Stop() { - MessageQueue::Quit(); - Join(); -} - -void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { - if (fStop_) - return; - - // Sent messages are sent to the MessageHandler directly, in the context - // of "thread", like Win32 SendMessage. If in the right context, - // call the handler directly. - - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (IsCurrent()) { - phandler->OnMessage(&msg); - return; - } - - AutoThread thread; - Thread *current_thread = Thread::Current(); - ASSERT(current_thread != NULL); // AutoThread ensures this - - bool ready = false; - { - CritScope cs(&crit_); - EnsureActive(); - _SendMessage smsg; - smsg.thread = current_thread; - smsg.msg = msg; - smsg.ready = &ready; - sendlist_.push_back(smsg); - has_sends_ = true; - } - - // Wait for a reply - - ss_->WakeUp(); - - bool waited = false; - while (!ready) { - current_thread->ReceiveSends(); - current_thread->socketserver()->Wait(kForever, false); - waited = true; - } - - // Our Wait loop above may have consumed some WakeUp events for this - // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can - // cause problems for some SocketServers. - // - // Concrete example: - // Win32SocketServer on thread A calls Send on thread B. While processing the - // message, thread B Posts a message to A. We consume the wakeup for that - // Post while waiting for the Send to complete, which means that when we exit - // this loop, we need to issue another WakeUp, or else the Posted message - // won't be processed in a timely manner. - - if (waited) { - current_thread->socketserver()->WakeUp(); - } -} - -void Thread::ReceiveSends() { - // Before entering critical section, check boolean. - - if (!has_sends_) - return; - - // Receive a sent message. Cleanup scenarios: - // - thread sending exits: We don't allow this, since thread can exit - // only via Join, so Send must complete. - // - thread receiving exits: Wakeup/set ready in Thread::Clear() - // - object target cleared: Wakeup/set ready in Thread::Clear() - crit_.Enter(); - while (!sendlist_.empty()) { - _SendMessage smsg = sendlist_.front(); - sendlist_.pop_front(); - crit_.Leave(); - smsg.msg.phandler->OnMessage(&smsg.msg); - crit_.Enter(); - *smsg.ready = true; - smsg.thread->socketserver()->WakeUp(); - } - has_sends_ = false; - crit_.Leave(); -} - -void Thread::Clear(MessageHandler *phandler, uint32 id, - MessageList* removed) { - CritScope cs(&crit_); - - // Remove messages on sendlist_ with phandler - // Object target cleared: remove from send list, wakeup/set ready - // if sender not NULL. - - std::list<_SendMessage>::iterator iter = sendlist_.begin(); - while (iter != sendlist_.end()) { - _SendMessage smsg = *iter; - if (smsg.msg.Match(phandler, id)) { - if (removed) { - removed->push_back(smsg.msg); - } else { - delete smsg.msg.pdata; - } - iter = sendlist_.erase(iter); - *smsg.ready = true; - smsg.thread->socketserver()->WakeUp(); - continue; - } - ++iter; - } - - MessageQueue::Clear(phandler, id, removed); -} - -bool Thread::ProcessMessages(int cmsLoop) { - uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); - int cmsNext = cmsLoop; - - while (true) { -#ifdef USE_COCOA_THREADING - // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html - // Each thread is supposed to have an autorelease pool. Also for event loops - // like this, autorelease pool needs to be created and drained/released - // for each cycle. - ScopedAutoreleasePool pool; -#endif - Message msg; - if (!Get(&msg, cmsNext)) - return !IsQuitting(); - Dispatch(&msg); - - if (cmsLoop != kForever) { - cmsNext = TimeUntil(msEnd); - if (cmsNext < 0) - return true; - } - } -} - -bool Thread::WrapCurrent() { - return WrapCurrentWithThreadManager(ThreadManager::Instance()); -} - -bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) { - if (started_) - return false; -#if defined(WIN32) - // We explicitly ask for no rights other than synchronization. - // This gives us the best chance of succeeding. - thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); - if (!thread_) { - LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; - return false; - } -#elif defined(POSIX) - thread_ = pthread_self(); -#endif - owned_ = false; - started_ = true; - thread_manager->SetCurrentThread(this); - return true; -} - -void Thread::UnwrapCurrent() { - // Clears the platform-specific thread-specific storage. - ThreadManager::Instance()->SetCurrentThread(NULL); -#ifdef WIN32 - if (!CloseHandle(thread_)) { - LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; - } -#endif - started_ = false; -} - - -AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { - if (!ThreadManager::Instance()->CurrentThread()) { - ThreadManager::Instance()->SetCurrentThread(this); - } -} - -AutoThread::~AutoThread() { - if (ThreadManager::Instance()->CurrentThread() == this) { - ThreadManager::Instance()->SetCurrentThread(NULL); - } -} - -#ifdef WIN32 -void ComThread::Run() { - HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); - ASSERT(SUCCEEDED(hr)); - if (SUCCEEDED(hr)) { - Thread::Run(); - CoUninitialize(); - } else { - LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr; - } -} -#endif - -} // namespace talk_base diff --git a/third_party/libjingle/overrides/talk/base/thread.h b/third_party/libjingle/overrides/talk/base/thread.h index d050182..13fc68c 100644 --- a/third_party/libjingle/overrides/talk/base/thread.h +++ b/third_party/libjingle/overrides/talk/base/thread.h @@ -116,12 +116,9 @@ class Runnable { class Thread : public MessageQueue { public: - Thread(); - explicit Thread(SocketServer* ss); + Thread(SocketServer* ss = NULL); virtual ~Thread(); - void Construct(); - static inline Thread* Current() { return ThreadManager::Instance()->CurrentThread(); } |