summaryrefslogtreecommitdiffstats
path: root/third_party/libjingle/files/talk/base/messagequeue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libjingle/files/talk/base/messagequeue.cc')
-rw-r--r--third_party/libjingle/files/talk/base/messagequeue.cc359
1 files changed, 0 insertions, 359 deletions
diff --git a/third_party/libjingle/files/talk/base/messagequeue.cc b/third_party/libjingle/files/talk/base/messagequeue.cc
deleted file mode 100644
index 171f324..0000000
--- a/third_party/libjingle/files/talk/base/messagequeue.cc
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * libjingle
- * Copyright 2004--2005, Google Inc.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * 3. The name of the author may not be used to endorse or promote products
- * derived from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
- * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
- * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
- * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
- * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#if defined(_MSC_VER) && _MSC_VER < 1300
-#pragma warning(disable:4786)
-#endif
-
-#ifdef POSIX
-extern "C" {
-#include <sys/time.h>
-}
-#endif
-
-#include "talk/base/common.h"
-#include "talk/base/logging.h"
-#include "talk/base/messagequeue.h"
-#include "talk/base/physicalsocketserver.h"
-
-
-namespace talk_base {
-
-const uint32 kMaxMsgLatency = 150; // 150 ms
-
-//------------------------------------------------------------------
-// MessageQueueManager
-
-MessageQueueManager* MessageQueueManager::instance_;
-
-MessageQueueManager* MessageQueueManager::Instance() {
- // Note: This is not thread safe, but it is first called before threads are
- // spawned.
- if (!instance_)
- instance_ = new MessageQueueManager;
- return instance_;
-}
-
-MessageQueueManager::MessageQueueManager() {
-}
-
-MessageQueueManager::~MessageQueueManager() {
-}
-
-void MessageQueueManager::Add(MessageQueue *message_queue) {
- // MessageQueueManager methods should be non-reentrant, so we
- // ASSERT that is the case.
- ASSERT(!crit_.CurrentThreadIsOwner());
- CritScope cs(&crit_);
- message_queues_.push_back(message_queue);
-}
-
-void MessageQueueManager::Remove(MessageQueue *message_queue) {
- ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
- CritScope cs(&crit_);
- std::vector<MessageQueue *>::iterator iter;
- iter = std::find(message_queues_.begin(), message_queues_.end(), message_queue);
- if (iter != message_queues_.end())
- message_queues_.erase(iter);
-}
-
-void MessageQueueManager::Clear(MessageHandler *handler) {
- ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
- CritScope cs(&crit_);
- std::vector<MessageQueue *>::iterator iter;
- for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
- (*iter)->Clear(handler);
-}
-
-//------------------------------------------------------------------
-// MessageQueue
-
-MessageQueue::MessageQueue(SocketServer* ss)
- : ss_(ss), new_ss(false), fStop_(false), fPeekKeep_(false), active_(false) {
- if (!ss_) {
- default_ss_.reset(new PhysicalSocketServer());
- ss_ = default_ss_.get();
- }
-}
-
-MessageQueue::~MessageQueue() {
- // The signal is done from here to ensure
- // that it always gets called when the queue
- // is going away.
- SignalQueueDestroyed();
- if (active_) {
- MessageQueueManager::Instance()->Remove(this);
- Clear(NULL);
- }
-}
-
-void MessageQueue::set_socketserver(SocketServer* ss) {
- ss_ = ss;
-}
-
-void MessageQueue::Quit() {
- fStop_ = true;
- ss_->WakeUp();
-}
-
-bool MessageQueue::IsQuitting() {
- return fStop_;
-}
-
-void MessageQueue::Restart() {
- fStop_ = false;
-}
-
-bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
- if (fPeekKeep_) {
- *pmsg = msgPeek_;
- return true;
- }
- if (!Get(pmsg, cmsWait))
- return false;
- msgPeek_ = *pmsg;
- fPeekKeep_ = true;
- return true;
-}
-
-bool MessageQueue::Get(Message *pmsg, int cmsWait) {
- // Return and clear peek if present
- // Always return the peek if it exists so there is Peek/Get symmetry
-
- if (fPeekKeep_) {
- *pmsg = msgPeek_;
- fPeekKeep_ = false;
- return true;
- }
-
- // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
-
- int cmsTotal = cmsWait;
- int cmsElapsed = 0;
- uint32 msStart = Time();
- uint32 msCurrent = msStart;
- while (true) {
- // Check for sent messages
-
- ReceiveSends();
-
- // Check queues
-
- int cmsDelayNext = kForever;
- {
- CritScope cs(&crit_);
-
- // Check for delayed messages that have been triggered
- // Calc the next trigger too
-
- while (!dmsgq_.empty()) {
- if (msCurrent < dmsgq_.top().msTrigger_) {
- cmsDelayNext = dmsgq_.top().msTrigger_ - msCurrent;
- break;
- }
- msgq_.push(dmsgq_.top().msg_);
- dmsgq_.pop();
- }
-
- // Check for posted events
-
- while (!msgq_.empty()) {
- *pmsg = msgq_.front();
- if (pmsg->ts_sensitive) {
- long delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
- if (delay > 0) {
- LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
- << (delay + kMaxMsgLatency) << "ms";
- }
- }
- msgq_.pop();
- if (MQID_DISPOSE == pmsg->message_id) {
- ASSERT(NULL == pmsg->phandler);
- delete pmsg->pdata;
- continue;
- }
- return true;
- }
- }
-
- if (fStop_)
- break;
-
- // Which is shorter, the delay wait or the asked wait?
-
- int cmsNext;
- if (cmsWait == kForever) {
- cmsNext = cmsDelayNext;
- } else {
- cmsNext = cmsTotal - cmsElapsed;
- if (cmsNext < 0)
- cmsNext = 0;
- if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
- cmsNext = cmsDelayNext;
- }
-
- // Wait and multiplex in the meantime
- ss_->Wait(cmsNext, true);
-
- // If the specified timeout expired, return
-
- msCurrent = Time();
- cmsElapsed = msCurrent - msStart;
- if (cmsWait != kForever) {
- if (cmsElapsed >= cmsWait)
- return false;
- }
- }
- return false;
-}
-
-void MessageQueue::ReceiveSends() {
-}
-
-void MessageQueue::Post(MessageHandler *phandler, uint32 id,
- MessageData *pdata, bool time_sensitive) {
- if (fStop_)
- return;
-
- // Keep thread safe
- // Add the message to the end of the queue
- // Signal for the multiplexer to return
-
- CritScope cs(&crit_);
- EnsureActive();
- Message msg;
- msg.phandler = phandler;
- msg.message_id = id;
- msg.pdata = pdata;
- if (time_sensitive) {
- msg.ts_sensitive = Time() + kMaxMsgLatency;
- }
- msgq_.push(msg);
- ss_->WakeUp();
-}
-
-void MessageQueue::PostDelayed(int cmsDelay, MessageHandler *phandler,
- uint32 id, MessageData *pdata) {
- if (fStop_)
- return;
-
- // Keep thread safe
- // Add to the priority queue. Gets sorted soonest first.
- // Signal for the multiplexer to return.
-
- CritScope cs(&crit_);
- EnsureActive();
- Message msg;
- msg.phandler = phandler;
- msg.message_id = id;
- msg.pdata = pdata;
- dmsgq_.push(DelayedMessage(cmsDelay, &msg));
- ss_->WakeUp();
-}
-
-int MessageQueue::GetDelay() {
- CritScope cs(&crit_);
-
- if (!msgq_.empty())
- return 0;
-
- if (!dmsgq_.empty()) {
- int delay = dmsgq_.top().msTrigger_ - Time();
- if (delay < 0)
- delay = 0;
- return delay;
- }
-
- return kForever;
-}
-
-void MessageQueue::Clear(MessageHandler *phandler, uint32 id) {
- CritScope cs(&crit_);
-
- // Remove messages with phandler
-
- if (fPeekKeep_) {
- if (phandler == NULL || msgPeek_.phandler == phandler) {
- if (id == MQID_ANY || msgPeek_.message_id == id) {
- delete msgPeek_.pdata;
- fPeekKeep_ = false;
- }
- }
- }
-
- // Remove from ordered message queue
-
- size_t c = msgq_.size();
- while (c-- != 0) {
- Message msg = msgq_.front();
- msgq_.pop();
- if (phandler != NULL && msg.phandler != phandler) {
- msgq_.push(msg);
- } else {
- if (id == MQID_ANY || msg.message_id == id) {
- delete msg.pdata;
- } else {
- msgq_.push(msg);
- }
- }
- }
-
- // Remove from priority queue. Not directly iterable, so use this approach
-
- std::queue<DelayedMessage> dmsgs;
- while (!dmsgq_.empty()) {
- DelayedMessage dmsg = dmsgq_.top();
- dmsgq_.pop();
- if (phandler != NULL && dmsg.msg_.phandler != phandler) {
- dmsgs.push(dmsg);
- } else {
- if (id == MQID_ANY || dmsg.msg_.message_id == id) {
- delete dmsg.msg_.pdata;
- } else {
- dmsgs.push(dmsg);
- }
- }
- }
- while (!dmsgs.empty()) {
- dmsgq_.push(dmsgs.front());
- dmsgs.pop();
- }
-}
-
-void MessageQueue::Dispatch(Message *pmsg) {
- pmsg->phandler->OnMessage(pmsg);
-}
-
-void MessageQueue::EnsureActive() {
- ASSERT(crit_.CurrentThreadIsOwner());
- if (!active_) {
- active_ = true;
- MessageQueueManager::Instance()->Add(this);
- }
-}
-
-} // namespace talk_base