summaryrefslogtreecommitdiffstats
path: root/jingle/glue/thread_wrapper.cc
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-08-01 22:12:32 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2011-08-01 22:12:32 +0000
commit7e6a06163bc9bc4bd43e8fdc53104eda830801bc (patch)
tree9384a86d18b818bded59fe9e141f063ad6215ae2 /jingle/glue/thread_wrapper.cc
parent0cef904cec62f1aa0191fd36980c6bd3e1e3e896 (diff)
downloadchromium_src-7e6a06163bc9bc4bd43e8fdc53104eda830801bc.zip
chromium_src-7e6a06163bc9bc4bd43e8fdc53104eda830801bc.tar.gz
chromium_src-7e6a06163bc9bc4bd43e8fdc53104eda830801bc.tar.bz2
Implement Send() in JingleThreadWrapper.
Send() is used in some of libjingle code. WebRTC needs to create two separate threads which Send() messages to each other. This wasn't previously supported in JingleThreadWrapper. BUG=None TEST=Unittests Review URL: http://codereview.chromium.org/7520014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@94992 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle/glue/thread_wrapper.cc')
-rw-r--r--jingle/glue/thread_wrapper.cc150
1 files changed, 129 insertions, 21 deletions
diff --git a/jingle/glue/thread_wrapper.cc b/jingle/glue/thread_wrapper.cc
index 979064b..036122b 100644
--- a/jingle/glue/thread_wrapper.cc
+++ b/jingle/glue/thread_wrapper.cc
@@ -4,29 +4,46 @@
#include "jingle/glue/thread_wrapper.h"
+#include "base/lazy_instance.h"
+#include "base/threading/thread_local.h"
+
namespace jingle_glue {
+struct JingleThreadWrapper::PendingSend {
+ PendingSend(const talk_base::Message& message_value)
+ : sending_thread(JingleThreadWrapper::current()),
+ message(message_value),
+ done_event(true, false) {
+ DCHECK(sending_thread);
+ }
+
+ JingleThreadWrapper* sending_thread;
+ talk_base::Message message;
+ base::WaitableEvent done_event;
+};
+
+base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
+ g_jingle_thread_wrapper(base::LINKER_INITIALIZED);
+
// static
void JingleThreadWrapper::EnsureForCurrentThread() {
- talk_base::Thread* current_thread = talk_base::Thread::Current();
- // If JingleThreadWrapper already exists for the current thread then
- // it is returned by talk_base::Thread::Current().
- // talk_base::Thread::Current() may also return non-null value for
- // the main thread because talk_base::ThreadManager creates
- // talk_base::Thread object for it. IsOwned() allows to distinguish
- // talk_base::Thread object created by talk_base::ThreadManager from
- // other talk_base::Thread objects. Because talk_base::Thread
- // objects should never created by chromium code, we can assume that
- // if talk_base::Thread::Current() returns non-null value and it
- // isn't the object created by talk_base::ThreadManager then
- // JingleThreadWrapper already exists for the current thread.
- if (current_thread == NULL || !current_thread->IsOwned()) {
- new JingleThreadWrapper(MessageLoop::current());
+ if (JingleThreadWrapper::current() == NULL) {
+ g_jingle_thread_wrapper.Get().Set(
+ new JingleThreadWrapper(MessageLoop::current()));
}
+
+ DCHECK_EQ(talk_base::Thread::Current(), current());
+}
+
+// static
+JingleThreadWrapper* JingleThreadWrapper::current() {
+ return g_jingle_thread_wrapper.Get().Get();
}
JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop)
- : message_loop_(message_loop) {
+ : message_loop_(message_loop),
+ send_allowed_(false),
+ pending_send_event_(true, false) {
DCHECK_EQ(message_loop_, MessageLoop::current());
talk_base::ThreadManager::SetCurrent(this);
@@ -38,6 +55,8 @@ JingleThreadWrapper::~JingleThreadWrapper() {
}
void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
+ DCHECK_EQ(talk_base::Thread::Current(), current());
+ g_jingle_thread_wrapper.Get().Set(NULL);
talk_base::ThreadManager::SetCurrent(NULL);
talk_base::MessageQueueManager::Instance()->Remove(this);
message_loop_->RemoveDestructionObserver(this);
@@ -62,18 +81,107 @@ void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id,
for (MessagesQueue::iterator it = messages_.begin();
it != messages_.end();) {
+ MessagesQueue::iterator next = it;
+ ++next;
+
if (it->second.Match(handler, id)) {
if (removed) {
removed->push_back(it->second);
} else {
delete it->second.pdata;
}
- MessagesQueue::iterator next = it;
- ++next;
messages_.erase(it);
- it = next;
- } else {
- ++it;
+ }
+
+ it = next;
+ }
+
+ for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
+ it != pending_send_messages_.end();) {
+ std::list<PendingSend*>::iterator next = it;
+ ++next;
+
+ if ((*it)->message.Match(handler, id)) {
+ if (removed) {
+ removed ->push_back((*it)->message);
+ } else {
+ delete (*it)->message.pdata;
+ }
+ (*it)->done_event.Signal();
+ pending_send_messages_.erase(it);
+ }
+
+ it = next;
+ }
+}
+
+void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id,
+ talk_base::MessageData *data) {
+ if (fStop_)
+ return;
+
+ JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
+ DCHECK(current_thread != NULL) << "Send() can be called only from a "
+ "thread that has JingleThreadWrapper.";
+
+ talk_base::Message message;
+ message.phandler = handler;
+ message.message_id = id;
+ message.pdata = data;
+
+ if (current_thread == this) {
+ handler->OnMessage(&message);
+ return;
+ }
+
+ // Send message from a thread different than |this|.
+
+ // Allow inter-thread send only from threads that have
+ // |send_allowed_| flag set.
+ DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
+ "messages is not allowed from the current thread.";
+
+ PendingSend pending_send(message);
+ {
+ base::AutoLock auto_lock(lock_);
+ pending_send_messages_.push_back(&pending_send);
+ }
+
+ // Need to signal |pending_send_event_| here in case the thread is
+ // sending message to another thread.
+ pending_send_event_.Signal();
+ message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &JingleThreadWrapper::ProcessPendingSends));
+
+
+ while (!pending_send.done_event.IsSignaled()) {
+ base::WaitableEvent* events[] = {&pending_send.done_event,
+ &current_thread->pending_send_event_};
+ size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
+ DCHECK(event == 0 || event == 1);
+
+ if (event == 1)
+ current_thread->ProcessPendingSends();
+ }
+}
+
+void JingleThreadWrapper::ProcessPendingSends() {
+ while (true) {
+ PendingSend* pending_send = NULL;
+ {
+ base::AutoLock auto_lock(lock_);
+ if (!pending_send_messages_.empty()) {
+ pending_send = pending_send_messages_.front();
+ pending_send_messages_.pop_front();
+ } else {
+ // Reset the event while |lock_| is still locked.
+ pending_send_event_.Reset();
+ break;
+ }
+ }
+ if (pending_send) {
+ pending_send->message.phandler->OnMessage(&pending_send->message);
+ pending_send->done_event.Signal();
}
}
}
@@ -150,7 +258,7 @@ void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*,
NOTREACHED();
}
-void JingleThreadWrapper::Dispatch(talk_base::Message* msg) {
+void JingleThreadWrapper::Dispatch(talk_base::Message* message) {
NOTREACHED();
}