diff options
Diffstat (limited to 'jingle/glue')
-rw-r--r-- | jingle/glue/thread_wrapper.cc | 150 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper.h | 68 | ||||
-rw-r--r-- | jingle/glue/thread_wrapper_unittest.cc | 195 |
3 files changed, 92 insertions, 321 deletions
diff --git a/jingle/glue/thread_wrapper.cc b/jingle/glue/thread_wrapper.cc index 036122b..979064b 100644 --- a/jingle/glue/thread_wrapper.cc +++ b/jingle/glue/thread_wrapper.cc @@ -4,46 +4,29 @@ #include "jingle/glue/thread_wrapper.h" -#include "base/lazy_instance.h" -#include "base/threading/thread_local.h" - namespace jingle_glue { -struct JingleThreadWrapper::PendingSend { - PendingSend(const talk_base::Message& message_value) - : sending_thread(JingleThreadWrapper::current()), - message(message_value), - done_event(true, false) { - DCHECK(sending_thread); - } - - JingleThreadWrapper* sending_thread; - talk_base::Message message; - base::WaitableEvent done_event; -}; - -base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > - g_jingle_thread_wrapper(base::LINKER_INITIALIZED); - // static void JingleThreadWrapper::EnsureForCurrentThread() { - if (JingleThreadWrapper::current() == NULL) { - g_jingle_thread_wrapper.Get().Set( - new JingleThreadWrapper(MessageLoop::current())); + talk_base::Thread* current_thread = talk_base::Thread::Current(); + // If JingleThreadWrapper already exists for the current thread then + // it is returned by talk_base::Thread::Current(). + // talk_base::Thread::Current() may also return non-null value for + // the main thread because talk_base::ThreadManager creates + // talk_base::Thread object for it. IsOwned() allows to distinguish + // talk_base::Thread object created by talk_base::ThreadManager from + // other talk_base::Thread objects. Because talk_base::Thread + // objects should never created by chromium code, we can assume that + // if talk_base::Thread::Current() returns non-null value and it + // isn't the object created by talk_base::ThreadManager then + // JingleThreadWrapper already exists for the current thread. + if (current_thread == NULL || !current_thread->IsOwned()) { + new JingleThreadWrapper(MessageLoop::current()); } - - DCHECK_EQ(talk_base::Thread::Current(), current()); -} - -// static -JingleThreadWrapper* JingleThreadWrapper::current() { - return g_jingle_thread_wrapper.Get().Get(); } JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) - : message_loop_(message_loop), - send_allowed_(false), - pending_send_event_(true, false) { + : message_loop_(message_loop) { DCHECK_EQ(message_loop_, MessageLoop::current()); talk_base::ThreadManager::SetCurrent(this); @@ -55,8 +38,6 @@ JingleThreadWrapper::~JingleThreadWrapper() { } void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { - DCHECK_EQ(talk_base::Thread::Current(), current()); - g_jingle_thread_wrapper.Get().Set(NULL); talk_base::ThreadManager::SetCurrent(NULL); talk_base::MessageQueueManager::Instance()->Remove(this); message_loop_->RemoveDestructionObserver(this); @@ -81,107 +62,18 @@ void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id, for (MessagesQueue::iterator it = messages_.begin(); it != messages_.end();) { - MessagesQueue::iterator next = it; - ++next; - if (it->second.Match(handler, id)) { if (removed) { removed->push_back(it->second); } else { delete it->second.pdata; } + MessagesQueue::iterator next = it; + ++next; messages_.erase(it); - } - - it = next; - } - - for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin(); - it != pending_send_messages_.end();) { - std::list<PendingSend*>::iterator next = it; - ++next; - - if ((*it)->message.Match(handler, id)) { - if (removed) { - removed ->push_back((*it)->message); - } else { - delete (*it)->message.pdata; - } - (*it)->done_event.Signal(); - pending_send_messages_.erase(it); - } - - it = next; - } -} - -void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id, - talk_base::MessageData *data) { - if (fStop_) - return; - - JingleThreadWrapper* current_thread = JingleThreadWrapper::current(); - DCHECK(current_thread != NULL) << "Send() can be called only from a " - "thread that has JingleThreadWrapper."; - - talk_base::Message message; - message.phandler = handler; - message.message_id = id; - message.pdata = data; - - if (current_thread == this) { - handler->OnMessage(&message); - return; - } - - // Send message from a thread different than |this|. - - // Allow inter-thread send only from threads that have - // |send_allowed_| flag set. - DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous " - "messages is not allowed from the current thread."; - - PendingSend pending_send(message); - { - base::AutoLock auto_lock(lock_); - pending_send_messages_.push_back(&pending_send); - } - - // Need to signal |pending_send_event_| here in case the thread is - // sending message to another thread. - pending_send_event_.Signal(); - message_loop_->PostTask(FROM_HERE, NewRunnableMethod( - this, &JingleThreadWrapper::ProcessPendingSends)); - - - while (!pending_send.done_event.IsSignaled()) { - base::WaitableEvent* events[] = {&pending_send.done_event, - ¤t_thread->pending_send_event_}; - size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); - DCHECK(event == 0 || event == 1); - - if (event == 1) - current_thread->ProcessPendingSends(); - } -} - -void JingleThreadWrapper::ProcessPendingSends() { - while (true) { - PendingSend* pending_send = NULL; - { - base::AutoLock auto_lock(lock_); - if (!pending_send_messages_.empty()) { - pending_send = pending_send_messages_.front(); - pending_send_messages_.pop_front(); - } else { - // Reset the event while |lock_| is still locked. - pending_send_event_.Reset(); - break; - } - } - if (pending_send) { - pending_send->message.phandler->OnMessage(&pending_send->message); - pending_send->done_event.Signal(); + it = next; + } else { + ++it; } } } @@ -258,7 +150,7 @@ void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*, NOTREACHED(); } -void JingleThreadWrapper::Dispatch(talk_base::Message* message) { +void JingleThreadWrapper::Dispatch(talk_base::Message* msg) { NOTREACHED(); } diff --git a/jingle/glue/thread_wrapper.h b/jingle/glue/thread_wrapper.h index 4707e21..5b5cf08 100644 --- a/jingle/glue/thread_wrapper.h +++ b/jingle/glue/thread_wrapper.h @@ -5,21 +5,21 @@ #ifndef JINGLE_GLUE_THREAD_WRAPPER_H_ #define JINGLE_GLUE_THREAD_WRAPPER_H_ -#include <list> #include <map> #include "base/message_loop.h" #include "base/synchronization/lock.h" -#include "base/synchronization/waitable_event.h" #include "third_party/libjingle/source/talk/base/thread.h" +class MessageLoop; + namespace jingle_glue { // JingleThreadWrapper wraps Chromium threads using talk_base::Thread -// interface. The object must be created by calling -// EnsureForCurrentThread(). Each JingleThreadWrapper deletes itself -// when MessageLoop is destroyed. Currently only the bare minimum that -// is used by P2P part of libjingle is implemented. +// interface. The object must be created on a thread it belongs +// to. Each JingleThreadWrapper deletes itself when MessageLoop is +// destroyed. Currently only the bare minimum that is used by P2P +// part of libjingle is implemented. class JingleThreadWrapper : public MessageLoop::DestructionObserver, public talk_base::Thread { @@ -28,38 +28,21 @@ class JingleThreadWrapper // been created yet. static void EnsureForCurrentThread(); - // Returns thread wrapper for the current thread. NULL is returned - // if EnsureForCurrentThread() has never been called for this - // thread. - static JingleThreadWrapper* current(); - JingleThreadWrapper(MessageLoop* message_loop); - // Sets whether the thread can be used to send messages - // synchronously to another thread using Send() method. Set to false - // by default to avoid potential jankiness when Send() used on - // renderer thread. It should be set explicitly for threads that - // need to call Send() for other threads. - void set_send_allowed(bool allowed) { send_allowed_ = allowed; } - // MessageLoop::DestructionObserver implementation. virtual void WillDestroyCurrentMessageLoop() OVERRIDE; // talk_base::MessageQueue overrides. - virtual void Post(talk_base::MessageHandler *phandler, - uint32 id, - talk_base::MessageData *pdata, - bool time_sensitive) OVERRIDE; - virtual void PostDelayed(int delay_ms, - talk_base::MessageHandler* handler, - uint32 id, - talk_base::MessageData* data) OVERRIDE; + virtual void Post(talk_base::MessageHandler *phandler, uint32 id = 0, + talk_base::MessageData *pdata = NULL, + bool time_sensitive = false) OVERRIDE; + virtual void PostDelayed( + int delay_ms, talk_base::MessageHandler* handler, uint32 id = 0, + talk_base::MessageData* data = NULL) OVERRIDE; virtual void Clear(talk_base::MessageHandler* handler, - uint32 id, - talk_base::MessageList* removed) OVERRIDE; - virtual void Send(talk_base::MessageHandler *handler, - uint32 id, - talk_base::MessageData *data) OVERRIDE; + uint32 id = talk_base::MQID_ANY, + talk_base::MessageList* removed = NULL) OVERRIDE; // Following methods are not supported.They are overriden just to // ensure that they are not called (each of them contain NOTREACHED @@ -68,16 +51,13 @@ class JingleThreadWrapper virtual void Quit() OVERRIDE; virtual bool IsQuitting() OVERRIDE; virtual void Restart() OVERRIDE; - virtual bool Get(talk_base::Message* message, - int delay_ms, - bool process_io) OVERRIDE; - virtual bool Peek(talk_base::Message* message, - int delay_ms) OVERRIDE; - virtual void PostAt(uint32 timestamp, - talk_base::MessageHandler* handler, - uint32 id, - talk_base::MessageData* data) OVERRIDE; - virtual void Dispatch(talk_base::Message* message) OVERRIDE; + virtual bool Get(talk_base::Message* msg, int delay_ms = talk_base::kForever, + bool process_io = true) OVERRIDE; + virtual bool Peek(talk_base::Message* msg, int delay_ms = 0) OVERRIDE; + virtual void PostAt( + uint32 timestamp, talk_base::MessageHandler* handler, + uint32 id = 0, talk_base::MessageData* data = NULL) OVERRIDE; + virtual void Dispatch(talk_base::Message* msg) OVERRIDE; virtual void ReceiveSends() OVERRIDE; virtual int GetDelay() OVERRIDE; @@ -87,7 +67,6 @@ class JingleThreadWrapper private: typedef std::map<int, talk_base::Message> MessagesQueue; - struct PendingSend; virtual ~JingleThreadWrapper(); @@ -95,19 +74,14 @@ class JingleThreadWrapper int delay_ms, talk_base::MessageHandler* handler, uint32 message_id, talk_base::MessageData* data); void RunTask(int task_id); - void ProcessPendingSends(); // Chromium thread used to execute messages posted on this thread. MessageLoop* message_loop_; - bool send_allowed_; - // |lock_| must be locked when accessing |messages_|. base::Lock lock_; int last_task_id_; MessagesQueue messages_; - std::list<PendingSend*> pending_send_messages_; - base::WaitableEvent pending_send_event_; }; } diff --git a/jingle/glue/thread_wrapper_unittest.cc b/jingle/glue/thread_wrapper_unittest.cc index 87ef510..20f3648 100644 --- a/jingle/glue/thread_wrapper_unittest.cc +++ b/jingle/glue/thread_wrapper_unittest.cc @@ -2,17 +2,12 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "base/bind.h" #include "base/message_loop.h" -#include "base/threading/thread.h" #include "jingle/glue/thread_wrapper.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" -using ::testing::DoAll; using ::testing::InSequence; -using ::testing::InvokeWithoutArgs; -using ::testing::Mock; namespace jingle_glue { @@ -30,100 +25,88 @@ class MockMessageHandler : public talk_base::MessageHandler { MOCK_METHOD1(OnMessage, void(talk_base::Message* msg)); }; -MATCHER_P3(MatchMessage, handler, message_id, data, "") { - return arg->phandler == handler && - arg->message_id == message_id && - arg->pdata == data; -} - -ACTION(DeleteMessageData) { - delete arg0->pdata; -} - class ThreadWrapperTest : public testing::Test { - public: - // This method is used by the SendDuringSend test. It sends message to the - // main thread synchronously using Send(). - void PingMainThread() { - talk_base::MessageData* data = new talk_base::MessageData(); - MockMessageHandler handler; - - EXPECT_CALL(handler, OnMessage( - MatchMessage(&handler, kTestMessage2, data))) - .WillOnce(DeleteMessageData()); - thread_->Send(&handler, kTestMessage2, data); + protected: + ThreadWrapperTest() { } - protected: - ThreadWrapperTest() - : thread_(NULL) { + talk_base::Thread* thread() { + return talk_base::Thread::Current(); } virtual void SetUp() OVERRIDE { JingleThreadWrapper::EnsureForCurrentThread(); - thread_ = talk_base::Thread::Current(); } // ThreadWrapper destroyes itself when |message_loop_| is destroyed. MessageLoop message_loop_; - talk_base::Thread* thread_; MockMessageHandler handler1_; MockMessageHandler handler2_; }; +MATCHER_P3(MatchMessage, handler, message_id, data, "") { + return arg->phandler == handler && + arg->message_id == message_id && + arg->pdata == data; +} + +ACTION(DeleteMessageData) { + delete arg0->pdata; +} + TEST_F(ThreadWrapperTest, Post) { - talk_base::MessageData* data1 = new talk_base::MessageData(); - talk_base::MessageData* data2 = new talk_base::MessageData(); - talk_base::MessageData* data3 = new talk_base::MessageData(); - talk_base::MessageData* data4 = new talk_base::MessageData(); + talk_base::MessageData* data1_ = new talk_base::MessageData(); + talk_base::MessageData* data2_ = new talk_base::MessageData(); + talk_base::MessageData* data3_ = new talk_base::MessageData(); + talk_base::MessageData* data4_ = new talk_base::MessageData(); - thread_->Post(&handler1_, kTestMessage1, data1); - thread_->Post(&handler1_, kTestMessage2, data2); - thread_->Post(&handler2_, kTestMessage1, data3); - thread_->Post(&handler2_, kTestMessage1, data4); + thread()->Post(&handler1_, kTestMessage1, data1_); + thread()->Post(&handler1_, kTestMessage2, data2_); + thread()->Post(&handler2_, kTestMessage1, data3_); + thread()->Post(&handler2_, kTestMessage1, data4_); InSequence in_seq; EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data1))) + MatchMessage(&handler1_, kTestMessage1, data1_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage2, data2))) + MatchMessage(&handler1_, kTestMessage2, data2_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data3))) + MatchMessage(&handler2_, kTestMessage1, data3_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data4))) + MatchMessage(&handler2_, kTestMessage1, data4_))) .WillOnce(DeleteMessageData()); message_loop_.RunAllPending(); } TEST_F(ThreadWrapperTest, PostDelayed) { - talk_base::MessageData* data1 = new talk_base::MessageData(); - talk_base::MessageData* data2 = new talk_base::MessageData(); - talk_base::MessageData* data3 = new talk_base::MessageData(); - talk_base::MessageData* data4 = new talk_base::MessageData(); + talk_base::MessageData* data1_ = new talk_base::MessageData(); + talk_base::MessageData* data2_ = new talk_base::MessageData(); + talk_base::MessageData* data3_ = new talk_base::MessageData(); + talk_base::MessageData* data4_ = new talk_base::MessageData(); - thread_->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, data1); - thread_->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, data2); - thread_->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, data3); - thread_->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, data4); + thread()->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, data1_); + thread()->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, data2_); + thread()->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, data3_); + thread()->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, data4_); InSequence in_seq; EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data1))) + MatchMessage(&handler1_, kTestMessage1, data1_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage2, data2))) + MatchMessage(&handler1_, kTestMessage2, data2_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data3))) + MatchMessage(&handler2_, kTestMessage1, data3_))) .WillOnce(DeleteMessageData()); EXPECT_CALL(handler2_, OnMessage( - MatchMessage(&handler2_, kTestMessage1, data4))) + MatchMessage(&handler2_, kTestMessage1, data4_))) .WillOnce(DeleteMessageData()); message_loop_.PostDelayedTask(FROM_HERE, new MessageLoop::QuitTask(), @@ -132,12 +115,12 @@ TEST_F(ThreadWrapperTest, PostDelayed) { } TEST_F(ThreadWrapperTest, Clear) { - thread_->Post(&handler1_, kTestMessage1, NULL); - thread_->Post(&handler1_, kTestMessage2, NULL); - thread_->Post(&handler2_, kTestMessage1, NULL); - thread_->Post(&handler2_, kTestMessage2, NULL); + thread()->Post(&handler1_, kTestMessage1, NULL); + thread()->Post(&handler1_, kTestMessage2, NULL); + thread()->Post(&handler2_, kTestMessage1, NULL); + thread()->Post(&handler2_, kTestMessage2, NULL); - thread_->Clear(&handler1_, kTestMessage2); + thread()->Clear(&handler1_, kTestMessage2); InSequence in_seq; @@ -156,12 +139,12 @@ TEST_F(ThreadWrapperTest, Clear) { } TEST_F(ThreadWrapperTest, ClearDelayed) { - thread_->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, NULL); - thread_->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, NULL); - thread_->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, NULL); - thread_->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, NULL); + thread()->PostDelayed(kTestDelayMs1, &handler1_, kTestMessage1, NULL); + thread()->PostDelayed(kTestDelayMs2, &handler1_, kTestMessage2, NULL); + thread()->PostDelayed(kTestDelayMs3, &handler2_, kTestMessage1, NULL); + thread()->PostDelayed(kTestDelayMs4, &handler2_, kTestMessage1, NULL); - thread_->Clear(&handler1_, kTestMessage2); + thread()->Clear(&handler1_, kTestMessage2); InSequence in_seq; @@ -187,89 +170,11 @@ TEST_F(ThreadWrapperTest, ClearDestoroyed) { { MockMessageHandler handler; handler_ptr = &handler; - thread_->Post(&handler, kTestMessage1, NULL); + thread()->Post(&handler, kTestMessage1, NULL); } talk_base::MessageList removed; - thread_->Clear(handler_ptr, talk_base::MQID_ANY, &removed); + thread()->Clear(handler_ptr, talk_base::MQID_ANY, &removed); DCHECK_EQ(0U, removed.size()); } -// Verify that Send() calls handler synchronously when called on the -// same thread. -TEST_F(ThreadWrapperTest, SendSameThread) { - talk_base::MessageData* data = new talk_base::MessageData(); - - EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data))) - .WillOnce(DeleteMessageData()); - thread_->Send(&handler1_, kTestMessage1, data); -} - -void InitializeWrapperForNewThread(talk_base::Thread** thread, - base::WaitableEvent* done_event) { - JingleThreadWrapper::EnsureForCurrentThread(); - JingleThreadWrapper::current()->set_send_allowed(true); - *thread = JingleThreadWrapper::current(); - done_event->Signal(); -} - -// Verify that Send() calls handler synchronously when called for a -// different thread. -TEST_F(ThreadWrapperTest, SendToOtherThread) { - JingleThreadWrapper::current()->set_send_allowed(true); - - base::Thread second_thread("JingleThreadWrapperTest"); - second_thread.Start(); - - base::WaitableEvent initialized_event(true, false); - talk_base::Thread* target; - second_thread.message_loop()->PostTask( - FROM_HERE, base::Bind(&InitializeWrapperForNewThread, - &target, &initialized_event)); - initialized_event.Wait(); - - ASSERT_TRUE(target != NULL); - - talk_base::MessageData* data = new talk_base::MessageData(); - - EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data))) - .WillOnce(DeleteMessageData()); - target->Send(&handler1_, kTestMessage1, data); - - Mock::VerifyAndClearExpectations(&handler1_); -} - -// Verify that thread handles Send() while another Send() is -// pending. The test creates second thread and Send()s kTestMessage1 -// to that thread. kTestMessage1 handler calls PingMainThread() which -// tries to Send() kTestMessage2 to the main thread. -TEST_F(ThreadWrapperTest, SendDuringSend) { - JingleThreadWrapper::current()->set_send_allowed(true); - - base::Thread second_thread("JingleThreadWrapperTest"); - second_thread.Start(); - - base::WaitableEvent initialized_event(true, false); - talk_base::Thread* target; - second_thread.message_loop()->PostTask( - FROM_HERE, base::Bind(&InitializeWrapperForNewThread, - &target, &initialized_event)); - initialized_event.Wait(); - - ASSERT_TRUE(target != NULL); - - talk_base::MessageData* data = new talk_base::MessageData(); - - EXPECT_CALL(handler1_, OnMessage( - MatchMessage(&handler1_, kTestMessage1, data))) - .WillOnce(DoAll( - InvokeWithoutArgs( - this, &ThreadWrapperTest::PingMainThread), - DeleteMessageData())); - target->Send(&handler1_, kTestMessage1, data); - - Mock::VerifyAndClearExpectations(&handler1_); -} - } // namespace jingle_glue |