diff options
Diffstat (limited to 'third_party/libjingle/files/talk/base/messagequeue.cc')
-rw-r--r-- | third_party/libjingle/files/talk/base/messagequeue.cc | 359 |
1 files changed, 0 insertions, 359 deletions
diff --git a/third_party/libjingle/files/talk/base/messagequeue.cc b/third_party/libjingle/files/talk/base/messagequeue.cc deleted file mode 100644 index 171f324..0000000 --- a/third_party/libjingle/files/talk/base/messagequeue.cc +++ /dev/null @@ -1,359 +0,0 @@ -/* - * libjingle - * Copyright 2004--2005, Google Inc. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * 3. The name of the author may not be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO - * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; - * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, - * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR - * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#if defined(_MSC_VER) && _MSC_VER < 1300 -#pragma warning(disable:4786) -#endif - -#ifdef POSIX -extern "C" { -#include <sys/time.h> -} -#endif - -#include "talk/base/common.h" -#include "talk/base/logging.h" -#include "talk/base/messagequeue.h" -#include "talk/base/physicalsocketserver.h" - - -namespace talk_base { - -const uint32 kMaxMsgLatency = 150; // 150 ms - -//------------------------------------------------------------------ -// MessageQueueManager - -MessageQueueManager* MessageQueueManager::instance_; - -MessageQueueManager* MessageQueueManager::Instance() { - // Note: This is not thread safe, but it is first called before threads are - // spawned. - if (!instance_) - instance_ = new MessageQueueManager; - return instance_; -} - -MessageQueueManager::MessageQueueManager() { -} - -MessageQueueManager::~MessageQueueManager() { -} - -void MessageQueueManager::Add(MessageQueue *message_queue) { - // MessageQueueManager methods should be non-reentrant, so we - // ASSERT that is the case. - ASSERT(!crit_.CurrentThreadIsOwner()); - CritScope cs(&crit_); - message_queues_.push_back(message_queue); -} - -void MessageQueueManager::Remove(MessageQueue *message_queue) { - ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. - CritScope cs(&crit_); - std::vector<MessageQueue *>::iterator iter; - iter = std::find(message_queues_.begin(), message_queues_.end(), message_queue); - if (iter != message_queues_.end()) - message_queues_.erase(iter); -} - -void MessageQueueManager::Clear(MessageHandler *handler) { - ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. - CritScope cs(&crit_); - std::vector<MessageQueue *>::iterator iter; - for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) - (*iter)->Clear(handler); -} - -//------------------------------------------------------------------ -// MessageQueue - -MessageQueue::MessageQueue(SocketServer* ss) - : ss_(ss), new_ss(false), fStop_(false), fPeekKeep_(false), active_(false) { - if (!ss_) { - default_ss_.reset(new PhysicalSocketServer()); - ss_ = default_ss_.get(); - } -} - -MessageQueue::~MessageQueue() { - // The signal is done from here to ensure - // that it always gets called when the queue - // is going away. - SignalQueueDestroyed(); - if (active_) { - MessageQueueManager::Instance()->Remove(this); - Clear(NULL); - } -} - -void MessageQueue::set_socketserver(SocketServer* ss) { - ss_ = ss; -} - -void MessageQueue::Quit() { - fStop_ = true; - ss_->WakeUp(); -} - -bool MessageQueue::IsQuitting() { - return fStop_; -} - -void MessageQueue::Restart() { - fStop_ = false; -} - -bool MessageQueue::Peek(Message *pmsg, int cmsWait) { - if (fPeekKeep_) { - *pmsg = msgPeek_; - return true; - } - if (!Get(pmsg, cmsWait)) - return false; - msgPeek_ = *pmsg; - fPeekKeep_ = true; - return true; -} - -bool MessageQueue::Get(Message *pmsg, int cmsWait) { - // Return and clear peek if present - // Always return the peek if it exists so there is Peek/Get symmetry - - if (fPeekKeep_) { - *pmsg = msgPeek_; - fPeekKeep_ = false; - return true; - } - - // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch - - int cmsTotal = cmsWait; - int cmsElapsed = 0; - uint32 msStart = Time(); - uint32 msCurrent = msStart; - while (true) { - // Check for sent messages - - ReceiveSends(); - - // Check queues - - int cmsDelayNext = kForever; - { - CritScope cs(&crit_); - - // Check for delayed messages that have been triggered - // Calc the next trigger too - - while (!dmsgq_.empty()) { - if (msCurrent < dmsgq_.top().msTrigger_) { - cmsDelayNext = dmsgq_.top().msTrigger_ - msCurrent; - break; - } - msgq_.push(dmsgq_.top().msg_); - dmsgq_.pop(); - } - - // Check for posted events - - while (!msgq_.empty()) { - *pmsg = msgq_.front(); - if (pmsg->ts_sensitive) { - long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); - if (delay > 0) { - LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " - << (delay + kMaxMsgLatency) << "ms"; - } - } - msgq_.pop(); - if (MQID_DISPOSE == pmsg->message_id) { - ASSERT(NULL == pmsg->phandler); - delete pmsg->pdata; - continue; - } - return true; - } - } - - if (fStop_) - break; - - // Which is shorter, the delay wait or the asked wait? - - int cmsNext; - if (cmsWait == kForever) { - cmsNext = cmsDelayNext; - } else { - cmsNext = cmsTotal - cmsElapsed; - if (cmsNext < 0) - cmsNext = 0; - if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) - cmsNext = cmsDelayNext; - } - - // Wait and multiplex in the meantime - ss_->Wait(cmsNext, true); - - // If the specified timeout expired, return - - msCurrent = Time(); - cmsElapsed = msCurrent - msStart; - if (cmsWait != kForever) { - if (cmsElapsed >= cmsWait) - return false; - } - } - return false; -} - -void MessageQueue::ReceiveSends() { -} - -void MessageQueue::Post(MessageHandler *phandler, uint32 id, - MessageData *pdata, bool time_sensitive) { - if (fStop_) - return; - - // Keep thread safe - // Add the message to the end of the queue - // Signal for the multiplexer to return - - CritScope cs(&crit_); - EnsureActive(); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = Time() + kMaxMsgLatency; - } - msgq_.push(msg); - ss_->WakeUp(); -} - -void MessageQueue::PostDelayed(int cmsDelay, MessageHandler *phandler, - uint32 id, MessageData *pdata) { - if (fStop_) - return; - - // Keep thread safe - // Add to the priority queue. Gets sorted soonest first. - // Signal for the multiplexer to return. - - CritScope cs(&crit_); - EnsureActive(); - Message msg; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - dmsgq_.push(DelayedMessage(cmsDelay, &msg)); - ss_->WakeUp(); -} - -int MessageQueue::GetDelay() { - CritScope cs(&crit_); - - if (!msgq_.empty()) - return 0; - - if (!dmsgq_.empty()) { - int delay = dmsgq_.top().msTrigger_ - Time(); - if (delay < 0) - delay = 0; - return delay; - } - - return kForever; -} - -void MessageQueue::Clear(MessageHandler *phandler, uint32 id) { - CritScope cs(&crit_); - - // Remove messages with phandler - - if (fPeekKeep_) { - if (phandler == NULL || msgPeek_.phandler == phandler) { - if (id == MQID_ANY || msgPeek_.message_id == id) { - delete msgPeek_.pdata; - fPeekKeep_ = false; - } - } - } - - // Remove from ordered message queue - - size_t c = msgq_.size(); - while (c-- != 0) { - Message msg = msgq_.front(); - msgq_.pop(); - if (phandler != NULL && msg.phandler != phandler) { - msgq_.push(msg); - } else { - if (id == MQID_ANY || msg.message_id == id) { - delete msg.pdata; - } else { - msgq_.push(msg); - } - } - } - - // Remove from priority queue. Not directly iterable, so use this approach - - std::queue<DelayedMessage> dmsgs; - while (!dmsgq_.empty()) { - DelayedMessage dmsg = dmsgq_.top(); - dmsgq_.pop(); - if (phandler != NULL && dmsg.msg_.phandler != phandler) { - dmsgs.push(dmsg); - } else { - if (id == MQID_ANY || dmsg.msg_.message_id == id) { - delete dmsg.msg_.pdata; - } else { - dmsgs.push(dmsg); - } - } - } - while (!dmsgs.empty()) { - dmsgq_.push(dmsgs.front()); - dmsgs.pop(); - } -} - -void MessageQueue::Dispatch(Message *pmsg) { - pmsg->phandler->OnMessage(pmsg); -} - -void MessageQueue::EnsureActive() { - ASSERT(crit_.CurrentThreadIsOwner()); - if (!active_) { - active_ = true; - MessageQueueManager::Instance()->Add(this); - } -} - -} // namespace talk_base |