diff options
Diffstat (limited to 'third_party/libjingle/files/talk/base/messagequeue.cc')
-rw-r--r-- | third_party/libjingle/files/talk/base/messagequeue.cc | 359 |
1 files changed, 359 insertions, 0 deletions
diff --git a/third_party/libjingle/files/talk/base/messagequeue.cc b/third_party/libjingle/files/talk/base/messagequeue.cc new file mode 100644 index 0000000..171f324 --- /dev/null +++ b/third_party/libjingle/files/talk/base/messagequeue.cc @@ -0,0 +1,359 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif + +#ifdef POSIX +extern "C" { +#include <sys/time.h> +} +#endif + +#include "talk/base/common.h" +#include "talk/base/logging.h" +#include "talk/base/messagequeue.h" +#include "talk/base/physicalsocketserver.h" + + +namespace talk_base { + +const uint32 kMaxMsgLatency = 150; // 150 ms + +//------------------------------------------------------------------ +// MessageQueueManager + +MessageQueueManager* MessageQueueManager::instance_; + +MessageQueueManager* MessageQueueManager::Instance() { + // Note: This is not thread safe, but it is first called before threads are + // spawned. + if (!instance_) + instance_ = new MessageQueueManager; + return instance_; +} + +MessageQueueManager::MessageQueueManager() { +} + +MessageQueueManager::~MessageQueueManager() { +} + +void MessageQueueManager::Add(MessageQueue *message_queue) { + // MessageQueueManager methods should be non-reentrant, so we + // ASSERT that is the case. + ASSERT(!crit_.CurrentThreadIsOwner()); + CritScope cs(&crit_); + message_queues_.push_back(message_queue); +} + +void MessageQueueManager::Remove(MessageQueue *message_queue) { + ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. + CritScope cs(&crit_); + std::vector<MessageQueue *>::iterator iter; + iter = std::find(message_queues_.begin(), message_queues_.end(), message_queue); + if (iter != message_queues_.end()) + message_queues_.erase(iter); +} + +void MessageQueueManager::Clear(MessageHandler *handler) { + ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. + CritScope cs(&crit_); + std::vector<MessageQueue *>::iterator iter; + for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) + (*iter)->Clear(handler); +} + +//------------------------------------------------------------------ +// MessageQueue + +MessageQueue::MessageQueue(SocketServer* ss) + : ss_(ss), new_ss(false), fStop_(false), fPeekKeep_(false), active_(false) { + if (!ss_) { + default_ss_.reset(new PhysicalSocketServer()); + ss_ = default_ss_.get(); + } +} + +MessageQueue::~MessageQueue() { + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + SignalQueueDestroyed(); + if (active_) { + MessageQueueManager::Instance()->Remove(this); + Clear(NULL); + } +} + +void MessageQueue::set_socketserver(SocketServer* ss) { + ss_ = ss; +} + +void MessageQueue::Quit() { + fStop_ = true; + ss_->WakeUp(); +} + +bool MessageQueue::IsQuitting() { + return fStop_; +} + +void MessageQueue::Restart() { + fStop_ = false; +} + +bool MessageQueue::Peek(Message *pmsg, int cmsWait) { + if (fPeekKeep_) { + *pmsg = msgPeek_; + return true; + } + if (!Get(pmsg, cmsWait)) + return false; + msgPeek_ = *pmsg; + fPeekKeep_ = true; + return true; +} + +bool MessageQueue::Get(Message *pmsg, int cmsWait) { + // Return and clear peek if present + // Always return the peek if it exists so there is Peek/Get symmetry + + if (fPeekKeep_) { + *pmsg = msgPeek_; + fPeekKeep_ = false; + return true; + } + + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch + + int cmsTotal = cmsWait; + int cmsElapsed = 0; + uint32 msStart = Time(); + uint32 msCurrent = msStart; + while (true) { + // Check for sent messages + + ReceiveSends(); + + // Check queues + + int cmsDelayNext = kForever; + { + CritScope cs(&crit_); + + // Check for delayed messages that have been triggered + // Calc the next trigger too + + while (!dmsgq_.empty()) { + if (msCurrent < dmsgq_.top().msTrigger_) { + cmsDelayNext = dmsgq_.top().msTrigger_ - msCurrent; + break; + } + msgq_.push(dmsgq_.top().msg_); + dmsgq_.pop(); + } + + // Check for posted events + + while (!msgq_.empty()) { + *pmsg = msgq_.front(); + if (pmsg->ts_sensitive) { + long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); + if (delay > 0) { + LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " + << (delay + kMaxMsgLatency) << "ms"; + } + } + msgq_.pop(); + if (MQID_DISPOSE == pmsg->message_id) { + ASSERT(NULL == pmsg->phandler); + delete pmsg->pdata; + continue; + } + return true; + } + } + + if (fStop_) + break; + + // Which is shorter, the delay wait or the asked wait? + + int cmsNext; + if (cmsWait == kForever) { + cmsNext = cmsDelayNext; + } else { + cmsNext = cmsTotal - cmsElapsed; + if (cmsNext < 0) + cmsNext = 0; + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) + cmsNext = cmsDelayNext; + } + + // Wait and multiplex in the meantime + ss_->Wait(cmsNext, true); + + // If the specified timeout expired, return + + msCurrent = Time(); + cmsElapsed = msCurrent - msStart; + if (cmsWait != kForever) { + if (cmsElapsed >= cmsWait) + return false; + } + } + return false; +} + +void MessageQueue::ReceiveSends() { +} + +void MessageQueue::Post(MessageHandler *phandler, uint32 id, + MessageData *pdata, bool time_sensitive) { + if (fStop_) + return; + + // Keep thread safe + // Add the message to the end of the queue + // Signal for the multiplexer to return + + CritScope cs(&crit_); + EnsureActive(); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = Time() + kMaxMsgLatency; + } + msgq_.push(msg); + ss_->WakeUp(); +} + +void MessageQueue::PostDelayed(int cmsDelay, MessageHandler *phandler, + uint32 id, MessageData *pdata) { + if (fStop_) + return; + + // Keep thread safe + // Add to the priority queue. Gets sorted soonest first. + // Signal for the multiplexer to return. + + CritScope cs(&crit_); + EnsureActive(); + Message msg; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + dmsgq_.push(DelayedMessage(cmsDelay, &msg)); + ss_->WakeUp(); +} + +int MessageQueue::GetDelay() { + CritScope cs(&crit_); + + if (!msgq_.empty()) + return 0; + + if (!dmsgq_.empty()) { + int delay = dmsgq_.top().msTrigger_ - Time(); + if (delay < 0) + delay = 0; + return delay; + } + + return kForever; +} + +void MessageQueue::Clear(MessageHandler *phandler, uint32 id) { + CritScope cs(&crit_); + + // Remove messages with phandler + + if (fPeekKeep_) { + if (phandler == NULL || msgPeek_.phandler == phandler) { + if (id == MQID_ANY || msgPeek_.message_id == id) { + delete msgPeek_.pdata; + fPeekKeep_ = false; + } + } + } + + // Remove from ordered message queue + + size_t c = msgq_.size(); + while (c-- != 0) { + Message msg = msgq_.front(); + msgq_.pop(); + if (phandler != NULL && msg.phandler != phandler) { + msgq_.push(msg); + } else { + if (id == MQID_ANY || msg.message_id == id) { + delete msg.pdata; + } else { + msgq_.push(msg); + } + } + } + + // Remove from priority queue. Not directly iterable, so use this approach + + std::queue<DelayedMessage> dmsgs; + while (!dmsgq_.empty()) { + DelayedMessage dmsg = dmsgq_.top(); + dmsgq_.pop(); + if (phandler != NULL && dmsg.msg_.phandler != phandler) { + dmsgs.push(dmsg); + } else { + if (id == MQID_ANY || dmsg.msg_.message_id == id) { + delete dmsg.msg_.pdata; + } else { + dmsgs.push(dmsg); + } + } + } + while (!dmsgs.empty()) { + dmsgq_.push(dmsgs.front()); + dmsgs.pop(); + } +} + +void MessageQueue::Dispatch(Message *pmsg) { + pmsg->phandler->OnMessage(pmsg); +} + +void MessageQueue::EnsureActive() { + ASSERT(crit_.CurrentThreadIsOwner()); + if (!active_) { + active_ = true; + MessageQueueManager::Instance()->Add(this); + } +} + +} // namespace talk_base |