diff options
Diffstat (limited to 'jingle')
-rw-r--r-- | jingle/glue/thread_wrapper.cc | 150 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper.h | 68 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper_unittest.cc | 195 | ||||
-rw-r--r-- | jingle/jingle.gyp | 1 |
4 files changed, 322 insertions, 92 deletions
diff --git a/jingle/glue/thread_wrapper.cc b/jingle/glue/thread_wrapper.cc index 979064b..036122b 100644 --- a/jingle/glue/thread_wrapper.cc +++ b/jingle/glue/thread_wrapper.cc @@ -4,29 +4,46 @@ #include "jingle/glue/thread_wrapper.h" +#include "base/lazy_instance.h" +#include "base/threading/thread_local.h" + namespace jingle_glue { +struct JingleThreadWrapper::PendingSend { + PendingSend(const talk_base::Message& message_value) + : sending_thread(JingleThreadWrapper::current()), + message(message_value), + done_event(true, false) { + DCHECK(sending_thread); + } + + JingleThreadWrapper* sending_thread; + talk_base::Message message; + base::WaitableEvent done_event; +}; + +base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > + g_jingle_thread_wrapper(base::LINKER_INITIALIZED); + // static void JingleThreadWrapper::EnsureForCurrentThread() { - talk_base::Thread* current_thread = talk_base::Thread::Current(); - // If JingleThreadWrapper already exists for the current thread then - // it is returned by talk_base::Thread::Current(). - // talk_base::Thread::Current() may also return non-null value for - // the main thread because talk_base::ThreadManager creates - // talk_base::Thread object for it. IsOwned() allows to distinguish - // talk_base::Thread object created by talk_base::ThreadManager from - // other talk_base::Thread objects. Because talk_base::Thread - // objects should never created by chromium code, we can assume that - // if talk_base::Thread::Current() returns non-null value and it - // isn't the object created by talk_base::ThreadManager then - // JingleThreadWrapper already exists for the current thread. - if (current_thread == NULL || !current_thread->IsOwned()) { - new JingleThreadWrapper(MessageLoop::current()); + if (JingleThreadWrapper::current() == NULL) { + g_jingle_thread_wrapper.Get().Set( + new JingleThreadWrapper(MessageLoop::current())); } + + DCHECK_EQ(talk_base::Thread::Current(), current()); +} + +// static +JingleThreadWrapper* JingleThreadWrapper::current() { + return g_jingle_thread_wrapper.Get().Get(); } JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) - : message_loop_(message_loop) { + : message_loop_(message_loop), + send_allowed_(false), + pending_send_event_(true, false) { DCHECK_EQ(message_loop_, MessageLoop::current()); talk_base::ThreadManager::SetCurrent(this); @@ -38,6 +55,8 @@ JingleThreadWrapper::~JingleThreadWrapper() { } void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { + DCHECK_EQ(talk_base::Thread::Current(), current()); + g_jingle_thread_wrapper.Get().Set(NULL); talk_base::ThreadManager::SetCurrent(NULL); talk_base::MessageQueueManager::Instance()->Remove(this); message_loop_->RemoveDestructionObserver(this); @@ -62,18 +81,107 @@ void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id, for (MessagesQueue::iterator it = messages_.begin(); it != messages_.end();) { + MessagesQueue::iterator next = it; + ++next; + if (it->second.Match(handler, id)) { if (removed) { removed->push_back(it->second); } else { delete it->second.pdata; } - MessagesQueue::iterator next = it; - ++next; messages_.erase(it); - it = next; - } else { - ++it; + } + + it = next; + } + + for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin(); + it != pending_send_messages_.end();) { + std::list<PendingSend*>::iterator next = it; + ++next; + + if ((*it)->message.Match(handler, id)) { + if (removed) { + removed ->push_back((*it)->message); + } else { + delete (*it)->message.pdata; + } + (*it)->done_event.Signal(); + pending_send_messages_.erase(it); + } + + it = next; + } +} + +void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id, + talk_base::MessageData *data) { + if (fStop_) + return; + + JingleThreadWrapper* current_thread = JingleThreadWrapper::current(); + DCHECK(current_thread != NULL) << "Send() can be called only from a " + "thread that has JingleThreadWrapper."; + + talk_base::Message message; + message.phandler = handler; + message.message_id = id; + message.pdata = data; + + if (current_thread == this) { + handler->OnMessage(&message); + return; + } + + // Send message from a thread different than |this|. + + // Allow inter-thread send only from threads that have + // |send_allowed_| flag set. + DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous " + "messages is not allowed from the current thread."; + + PendingSend pending_send(message); + { + base::AutoLock auto_lock(lock_); + pending_send_messages_.push_back(&pending_send); + } + + // Need to signal |pending_send_event_| here in case the thread is + // sending message to another thread. + pending_send_event_.Signal(); + message_loop_->PostTask(FROM_HERE, NewRunnableMethod( + this, &JingleThreadWrapper::ProcessPendingSends)); + + + while (!pending_send.done_event.IsSignaled()) { + base::WaitableEvent* events[] = {&pending_send.done_event, + ¤t_thread->pending_send_event_}; + size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); + DCHECK(event == 0 || event == 1); + + if (event == 1) + current_thread->ProcessPendingSends(); + } +} + +void JingleThreadWrapper::ProcessPendingSends() { + while (true) { + PendingSend* pending_send = NULL; + { + base::AutoLock auto_lock(lock_); + if (!pending_send_messages_.empty()) { + pending_send = pending_send_messages_.front(); + pending_send_messages_.pop_front(); + } else { + // Reset the event while |lock_| is still locked. + pending_send_event_.Reset(); + break; + } + } + if (pending_send) { + pending_send->message.phandler->OnMessage(&pending_send->message); + pending_send->done_event.Signal(); } } } @@ -150,7 +258,7 @@ void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*, NOTREACHED(); } -void JingleThreadWrapper::Dispatch(talk_base::Message* msg) { +void JingleThreadWrapper::Dispatch(talk_base::Message* message) { NOTREACHED(); } diff --git a/jingle/glue/thread_wrapper.h b/jingle/glue/thread_wrapper.h index 5b5cf08..4707e21 100644 --- a/jingle/glue/thread_wrapper.h +++ b/jingle/glue/thread_wrapper.h @@ -5,21 +5,21 @@ #ifndef JINGLE_GLUE_THREAD_WRAPPER_H_ #define JINGLE_GLUE_THREAD_WRAPPER_H_ +#include <list> #include <map> #include "base/message_loop.h" #include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" #include "third_party/libjingle/source/talk/base/thread.h" -class MessageLoop; - namespace jingle_glue { // JingleThreadWrapper wraps Chromium threads using talk_base::Thread -// interface. The object must be created on a thread it belongs -// to. Each JingleThreadWrapper deletes itself when MessageLoop is -// destroyed. Currently only the bare minimum that is used by P2P -// part of libjingle is implemented. +// interface. The object must be created by calling +// EnsureForCurrentThread(). Each JingleThreadWrapper deletes itself +// when MessageLoop is destroyed. Currently only the bare minimum that +// is used by P2P part of libjingle is implemented. class JingleThreadWrapper : public MessageLoop::DestructionObserver, public talk_base::Thread { @@ -28,21 +28,38 @@ class JingleThreadWrapper // been created yet. static void EnsureForCurrentThread(); + // Returns thread wrapper for the current thread. NULL is returned + // if EnsureForCurrentThread() has never been called for this + // thread. + static JingleThreadWrapper* current(); + JingleThreadWrapper(MessageLoop* message_loop); + // Sets whether the thread can be used to send messages + // synchronously to another thread using Send() method. Set to false + // by default to avoid potential jankiness when Send() used on + // renderer thread. It should be set explicitly for threads that + // need to call Send() for other threads. + void set_send_allowed(bool allowed) { send_allowed_ = allowed; } + // MessageLoop::DestructionObserver implementation. virtual void WillDestroyCurrentMessageLoop() OVERRIDE; // talk_base::MessageQueue overrides. - virtual void Post(talk_base::MessageHandler *phandler, uint32 id = 0, - talk_base::MessageData *pdata = NULL, - bool time_sensitive = false) OVERRIDE; - virtual void PostDelayed( - int delay_ms, talk_base::MessageHandler* handler, uint32 id = 0, - talk_base::MessageData* data = NULL) OVERRIDE; + virtual void Post(talk_base::MessageHandler *phandler, + uint32 id, + talk_base::MessageData *pdata, + bool time_sensitive) OVERRIDE; + virtual void PostDelayed(int delay_ms, + talk_base::MessageHandler* handler, + uint32 id, + talk_base::MessageData* data) OVERRIDE; virtual void Clear(talk_base::MessageHandler* handler, - uint32 id = talk_base::MQID_ANY, - talk_base::MessageList* removed = NULL) OVERRIDE; + uint32 id, + talk_base::MessageList* removed) OVERRIDE; + virtual void Send(talk_base::MessageHandler *handler, + uint32 id, + talk_base::MessageData *data) OVERRIDE; // Following methods are not supported.They are overriden just to // ensure that they are not called (each of them contain NOTREACHED @@ -51,13 +68,16 @@ class JingleThreadWrapper virtual void Quit() OVERRIDE; virtual bool IsQuitting() OVERRIDE; virtual void Restart() OVERRIDE; - virtual bool Get(talk_base::Message* msg, int delay_ms = talk_base::kForever, - bool process_io = true) OVERRIDE; - virtual bool Peek(talk_base::Message* msg, int delay_ms = 0) OVERRIDE; - virtual void PostAt( - uint32 timestamp, talk_base::MessageHandler* handler, - uint32 id = 0, talk_base::MessageData* data = NULL) OVERRIDE; - virtual void Dispatch(talk_base::Message* msg) OVERRIDE; + virtual bool Get(talk_base::Message* message, + int delay_ms, + bool process_io) OVERRIDE; + virtual bool Peek(talk_base::Message* message, + int delay_ms) OVERRIDE; + virtual void PostAt(uint32 timestamp, + talk_base::MessageHandler* handler, + uint32 id, + talk_base::MessageData* data) OVERRIDE; + virtual void Dispatch(talk_base::Message* message) OVERRIDE; virtual void ReceiveSends() OVERRIDE; virtual int GetDelay() OVERRIDE; @@ -67,6 +87,7 @@ class JingleThreadWrapper private: typedef std::map<int, talk_base::Message> MessagesQueue; + struct PendingSend; virtual ~JingleThreadWrapper(); @@ -74,14 +95,19 @@ class JingleThreadWrapper int delay_ms, talk_base::MessageHandler* handler, uint32 message_id, talk_base::MessageData* data); void RunTask(int task_id); + void ProcessPendingSends(); // Chromium thread used to execute messages posted on this thread. MessageLoop* message_loop_; + bool send_allowed_; + // |lock_| must be locked when accessing |messages_|. base::Lock lock_; int last_task_id_; MessagesQueue messages_; + std::list<PendingSend*> pending_send_messages_; + base::WaitableEvent pending_send_event_; }; } diff --git a/jingle/glue/thread_wrapper_unittest.cc b/jingle/glue/thread_wrapper_unittest.cc index 20f3648..87ef510 100644 --- a/jingle/glue/thread_wrapper_unittest.cc +++ b/jingle/glue/thread_wrapper_unittest.cc @@ -2,12 +2,17 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/bind.h" #include "base/message_loop.h" +#include "base/threading/thread.h" #include "jingle/glue/thread_wrapper.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" +using ::testing::DoAll; using ::testing::InSequence; +using ::testing::InvokeWithoutArgs; +using ::testing::Mock; namespace jingle_glue { @@ -25,88 +30,100 @@ class MockMessageHandler : public talk_base::MessageHandler { MOCK_METHOD1(OnMessage, void(talk_base::Message* msg)); }; +MATCHER_P3(MatchMessage, handler, message_id, data, "") { + return arg->phandler == handler && + arg->message_id == message_id && + arg->pdata == data; +} + +ACTION(DeleteMessageData) { + delete arg0->pdata; +} + class ThreadWrapperTest : public testing::Test { - protected: - ThreadWrapperTest() { + public: + // This method is used by the SendDuringSend test. It sends message to the + // main thread synchronously using Send(). + void PingMainThread() { + talk_base::MessageData* data = new talk_base::MessageData(); + MockMessageHandler handler; + + EXPECT_CALL(handler, OnMessage( + MatchMessage(&handler, kTestMessage2, data))) + .WillOnce(DeleteMessageData()); + thread_->Send(&handler, kTestMessage2, data); } - talk_base::Thread* thread() { - return talk_base::Thread::Current(); + protected: + ThreadWrapperTest() + : thread_(NULL) { } virtual void SetUp() OVERRIDE { JingleThreadWrapper::EnsureForCurrentThread(); + thread_ = talk_base::Thread::Current(); } // ThreadWrapper destroyes itself when |message_loop_| is destroyed. MessageLoop message_loop_; + talk_base::Thread* thread_; MockMessageHandler handler1_; MockMessageHandler handler2_; }; -MATCHER_P3(MatchMessage, handler, message_id, data, "") { - return arg->phandler == handler && - arg->message_id == message_id && - arg->pdata == data; -} - -ACTION(DeleteMessageData) { - delete arg0->pdata; -} - TEST_F(ThreadWrapperTest, Post) { - talk_base::MessageData* data1_ = new talk_base::MessageData(); - talk_base::MessageData* data2_ = new talk_base::MessageData(); - talk_base::MessageData* data3_ = new talk_base::MessageData(); - talk_base::MessageData* data4_ = new talk_base::MessageData(); + talk_base::MessageData* data1 = new talk_base::MessageData(); + talk_base::MessageData* data2 = new talk_base::MessageData(); + talk_base::MessageData* data3 = new talk_base::MessageData(); + talk_base::MessageData* data4 = new talk_base::MessageData(); - thread()->Post(&handler1_, kTestMessage1, data1_); - thread()->Post(&handler1_, kTestMessage2, data2_); - thread()->Post(&handler2_, kTestMessage1, data3_); - thread()->Post(&handler2_, kTestMessage1, data4_); + thread_->Post(&handler1_, kTestMessage1, data1); + thread_->Post(&handler1_, kTestMessage2, data2); + thread_->Post(&handler2_, kTestMessage1, data3); + thread_->Post(&handler2_, kTestMessage1, data4); InSequence in_seq; EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data1_))) + MatchMessage(&handler1_, kTestMessage1, data1))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage2, data2_))) + MatchMessage(&handler1_, kTestMessage2, data2))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data3_))) + MatchMessage(&handler2_, kTestMessage1, data3))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data4_))) + MatchMessage(&handler2_, kTestMessage1, data4))) .WillOnce(DeleteMessageData()); message_loop_.RunAllPending(); } TEST_F(ThreadWrapperTest, PostDelayed) { - talk_base::MessageData* data1_ = new talk_base::MessageData(); - talk_base::MessageData* data2_ = new talk_base::MessageData(); - talk_base::MessageData* data3_ = new talk_base::MessageData(); - talk_base::MessageData* data4_ = new talk_base::MessageData(); + talk_base::MessageData* data1 = new talk_base::MessageData(); + talk_base::MessageData* data2 = new talk_base::MessageData(); + talk_base::MessageData* data3 = new talk_base::MessageData(); + talk_base::MessageData* data4 = new talk_base::MessageData(); - thread()->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, data1_); - thread()->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, data2_); - thread()->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, data3_); - thread()->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, data4_); + thread_->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, data1); + thread_->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, data2); + thread_->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, data3); + thread_->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, data4); InSequence in_seq; EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data1_))) + MatchMessage(&handler1_, kTestMessage1, data1))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage2, data2_))) + MatchMessage(&handler1_, kTestMessage2, data2))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data3_))) + MatchMessage(&handler2_, kTestMessage1, data3))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data4_))) + MatchMessage(&handler2_, kTestMessage1, data4))) .WillOnce(DeleteMessageData()); message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), @@ -115,12 +132,12 @@ TEST_F(ThreadWrapperTest, PostDelayed) { } TEST_F(ThreadWrapperTest, Clear) { - thread()->Post(&handler1_, kTestMessage1, NULL); - thread()->Post(&handler1_, kTestMessage2, NULL); - thread()->Post(&handler2_, kTestMessage1, NULL); - thread()->Post(&handler2_, kTestMessage2, NULL); + thread_->Post(&handler1_, kTestMessage1, NULL); + thread_->Post(&handler1_, kTestMessage2, NULL); + thread_->Post(&handler2_, kTestMessage1, NULL); + thread_->Post(&handler2_, kTestMessage2, NULL); - thread()->Clear(&handler1_, kTestMessage2); + thread_->Clear(&handler1_, kTestMessage2); InSequence in_seq; @@ -139,12 +156,12 @@ TEST_F(ThreadWrapperTest, Clear) { } TEST_F(ThreadWrapperTest, ClearDelayed) { - thread()->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, NULL); - thread()->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, NULL); - thread()->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, NULL); - thread()->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, NULL); + thread_->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, NULL); + thread_->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, NULL); + thread_->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, NULL); + thread_->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, NULL); - thread()->Clear(&handler1_, kTestMessage2); + thread_->Clear(&handler1_, kTestMessage2); InSequence in_seq; @@ -170,11 +187,89 @@ TEST_F(ThreadWrapperTest, ClearDestoroyed) { { MockMessageHandler handler; handler_ptr = &handler; - thread()->Post(&handler, kTestMessage1, NULL); + thread_->Post(&handler, kTestMessage1, NULL); } talk_base::MessageList removed; - thread()->Clear(handler_ptr, talk_base::MQID_ANY, &removed); + thread_->Clear(handler_ptr, talk_base::MQID_ANY, &removed); DCHECK_EQ(0U, removed.size()); } +// Verify that Send() calls handler synchronously when called on the +// same thread. +TEST_F(ThreadWrapperTest, SendSameThread) { + talk_base::MessageData* data = new talk_base::MessageData(); + + EXPECT_CALL(handler1_, OnMessage( + MatchMessage(&handler1_, kTestMessage1, data))) + .WillOnce(DeleteMessageData()); + thread_->Send(&handler1_, kTestMessage1, data); +} + +void InitializeWrapperForNewThread(talk_base::Thread** thread, + base::WaitableEvent* done_event) { + JingleThreadWrapper::EnsureForCurrentThread(); + JingleThreadWrapper::current()->set_send_allowed(true); + *thread = JingleThreadWrapper::current(); + done_event->Signal(); +} + +// Verify that Send() calls handler synchronously when called for a +// different thread. +TEST_F(ThreadWrapperTest, SendToOtherThread) { + JingleThreadWrapper::current()->set_send_allowed(true); + + base::Thread second_thread("JingleThreadWrapperTest"); + second_thread.Start(); + + base::WaitableEvent initialized_event(true, false); + talk_base::Thread* target; + second_thread.message_loop()->PostTask( + FROM_HERE, base::Bind(&InitializeWrapperForNewThread, + &target, &initialized_event)); + initialized_event.Wait(); + + ASSERT_TRUE(target != NULL); + + talk_base::MessageData* data = new talk_base::MessageData(); + + EXPECT_CALL(handler1_, OnMessage( + MatchMessage(&handler1_, kTestMessage1, data))) + .WillOnce(DeleteMessageData()); + target->Send(&handler1_, kTestMessage1, data); + + Mock::VerifyAndClearExpectations(&handler1_); +} + +// Verify that thread handles Send() while another Send() is +// pending. The test creates second thread and Send()s kTestMessage1 +// to that thread. kTestMessage1 handler calls PingMainThread() which +// tries to Send() kTestMessage2 to the main thread. +TEST_F(ThreadWrapperTest, SendDuringSend) { + JingleThreadWrapper::current()->set_send_allowed(true); + + base::Thread second_thread("JingleThreadWrapperTest"); + second_thread.Start(); + + base::WaitableEvent initialized_event(true, false); + talk_base::Thread* target; + second_thread.message_loop()->PostTask( + FROM_HERE, base::Bind(&InitializeWrapperForNewThread, + &target, &initialized_event)); + initialized_event.Wait(); + + ASSERT_TRUE(target != NULL); + + talk_base::MessageData* data = new talk_base::MessageData(); + + EXPECT_CALL(handler1_, OnMessage( + MatchMessage(&handler1_, kTestMessage1, data))) + .WillOnce(DoAll( + InvokeWithoutArgs( + this, &ThreadWrapperTest::PingMainThread), + DeleteMessageData())); + target->Send(&handler1_, kTestMessage1, data); + + Mock::VerifyAndClearExpectations(&handler1_); +} + } // namespace jingle_glue diff --git a/jingle/jingle.gyp b/jingle/jingle.gyp index a211893..d1ff340 100644 --- a/jingle/jingle.gyp +++ b/jingle/jingle.gyp @@ -23,6 +23,7 @@ ], 'dependencies': [ '../base/base.gyp:base', + '../base/third_party/dynamic_annotations/dynamic_annotations.gyp:dynamic_annotations', '../net/net.gyp:net', '../third_party/libjingle/libjingle.gyp:libjingle', '../third_party/libjingle/libjingle.gyp:libjingle_p2p', |