diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-02 23:32:22 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-02 23:32:22 +0000 |
commit | 1d874a809da9edd362288409233a842bee7c94a9 (patch) | |
tree | 4a9fcccee24a211bc156b54519dfee5aa9e83eae /chrome/common | |
parent | 0717207534670b8e4ca8e33244baf46371b7d851 (diff) | |
download | chromium_src-1d874a809da9edd362288409233a842bee7c94a9.zip chromium_src-1d874a809da9edd362288409233a842bee7c94a9.tar.gz chromium_src-1d874a809da9edd362288409233a842bee7c94a9.tar.bz2 |
Refactored MediatorThread to use Chrome threads primarily. This is in preparation for using Chrome sockets.
Made TalkMediator explicitly non-thread-safe.
BUG=45612
TEST=manual testing of notifications
Review URL: http://codereview.chromium.org/2471006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@48791 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common')
6 files changed, 272 insertions, 248 deletions
diff --git a/chrome/common/net/notifier/communicator/login_failure.h b/chrome/common/net/notifier/communicator/login_failure.h index 89c629b..05505ce 100644 --- a/chrome/common/net/notifier/communicator/login_failure.h +++ b/chrome/common/net/notifier/communicator/login_failure.h @@ -5,7 +5,6 @@ #ifndef CHROME_COMMON_NET_NOTIFIER_COMMUNICATOR_LOGIN_FAILURE_H_ #define CHROME_COMMON_NET_NOTIFIER_COMMUNICATOR_LOGIN_FAILURE_H_ -#include "talk/base/common.h" #include "talk/xmpp/xmppengine.h" namespace notifier { @@ -41,11 +40,9 @@ class LoginFailure { buzz::XmppEngine::Error xmpp_error() const; private: - LoginError error_; - buzz::XmppEngine::Error xmpp_error_; - int subcode_; - - DISALLOW_COPY_AND_ASSIGN(LoginFailure); + const LoginError error_; + const buzz::XmppEngine::Error xmpp_error_; + const int subcode_; }; } // namespace notifier diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.cc b/chrome/common/net/notifier/listener/mediator_thread_impl.cc index f13dd25..ac88983 100644 --- a/chrome/common/net/notifier/listener/mediator_thread_impl.cc +++ b/chrome/common/net/notifier/listener/mediator_thread_impl.cc @@ -6,7 +6,7 @@ #include "base/logging.h" #include "base/message_loop.h" -#include "base/platform_thread.h" +#include "base/task.h" #include "chrome/common/net/network_change_notifier_proxy.h" #include "chrome/common/net/notifier/base/task_pump.h" #include "chrome/common/net/notifier/communicator/connection_options.h" @@ -17,11 +17,17 @@ #include "chrome/common/net/notifier/listener/subscribe_task.h" #include "net/base/host_port_pair.h" #include "net/base/host_resolver.h" +#include "talk/base/physicalsocketserver.h" #include "talk/base/thread.h" #include "talk/xmpp/xmppclient.h" #include "talk/xmpp/xmppclientsettings.h" -using std::string; +// We manage the lifetime of notifier::MediatorThreadImpl ourselves. +template <> +struct RunnableMethodTraits<notifier::MediatorThreadImpl> { + void RetainCallee(notifier::MediatorThreadImpl*) {} + void ReleaseCallee(notifier::MediatorThreadImpl*) {} +}; namespace notifier { @@ -29,111 +35,123 @@ MediatorThreadImpl::MediatorThreadImpl( chrome_common_net::NetworkChangeNotifierThread* network_change_notifier_thread) : delegate_(NULL), - network_change_notifier_thread_(network_change_notifier_thread) { + parent_message_loop_(MessageLoop::current()), + network_change_notifier_thread_(network_change_notifier_thread), + worker_thread_("MediatorThread worker thread") { + DCHECK(parent_message_loop_); DCHECK(network_change_notifier_thread_); } MediatorThreadImpl::~MediatorThreadImpl() { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); } void MediatorThreadImpl::SetDelegate(Delegate* delegate) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); delegate_ = delegate; } void MediatorThreadImpl::Start() { - talk_base::Thread::Start(); + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + // We create the worker thread as an IO thread in preparation for + // making this use Chrome sockets. + const base::Thread::Options options(MessageLoop::TYPE_IO, 0); + // TODO(akalin): Make this function return a bool and remove this + // CHECK(). + CHECK(worker_thread_.StartWithOptions(options)); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::StartLibjingleThread)); } -void MediatorThreadImpl::Run() { - PlatformThread::SetName("Notifier_MediatorThread"); - MessageLoop message_loop; - Post(this, CMD_PUMP_AUXILIARY_LOOPS); - ProcessMessages(talk_base::kForever); +void MediatorThreadImpl::StartLibjingleThread() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + socket_server_.reset(new talk_base::PhysicalSocketServer()); + libjingle_thread_.reset(new talk_base::Thread()); + talk_base::ThreadManager::SetCurrent(libjingle_thread_.get()); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop)); } -void MediatorThreadImpl::PumpAuxiliaryLoops() { - MessageLoop::current()->RunAllPending(); - // We want to pump auxiliary loops every 100ms until this thread is stopped, - // at which point this call will do nothing. - PostDelayed(100, this, CMD_PUMP_AUXILIARY_LOOPS); +void MediatorThreadImpl::StopLibjingleThread() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + talk_base::ThreadManager::SetCurrent(NULL); + libjingle_thread_.reset(); + socket_server_.reset(); } -void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { - Post(this, CMD_LOGIN, new LoginData(settings)); +void MediatorThreadImpl::PumpLibjingleLoop() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + // Pump the libjingle message loop 100ms at a time. + if (!libjingle_thread_.get()) { + // StopLibjingleThread() was called. + return; + } + libjingle_thread_->ProcessMessages(100); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop)); } -void MediatorThreadImpl::Stop() { - Thread::Stop(); - CHECK(!login_.get() && !network_change_notifier_.get() && !pump_.get()) - << "Logout should be called prior to message queue exit."; +void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings)); } void MediatorThreadImpl::Logout() { - CHECK(!IsQuitting()) - << "Logout should be called prior to message queue exit."; - Post(this, CMD_DISCONNECT); - Stop(); + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect)); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::StopLibjingleThread)); + // TODO(akalin): Decomp this into a separate stop method. + worker_thread_.Stop(); + // Process any messages the worker thread may be posted on our + // thread. + bool old_state = parent_message_loop_->NestableTasksAllowed(); + parent_message_loop_->SetNestableTasksAllowed(true); + parent_message_loop_->RunAllPending(); + parent_message_loop_->SetNestableTasksAllowed(old_state); + // worker_thread_ should have cleaned all this up. + CHECK(!login_.get()); + CHECK(!network_change_notifier_.get()); + CHECK(!pump_.get()); } void MediatorThreadImpl::ListenForUpdates() { - Post(this, CMD_LISTEN_FOR_UPDATES); + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::DoListenForUpdates)); } void MediatorThreadImpl::SubscribeForUpdates( const std::vector<std::string>& subscribed_services_list) { - Post(this, CMD_SUBSCRIBE_FOR_UPDATES, - new SubscriptionData(subscribed_services_list)); + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::DoSubscribeForUpdates, + subscribed_services_list)); } void MediatorThreadImpl::SendNotification( const OutgoingNotificationData& data) { - Post(this, CMD_SEND_NOTIFICATION, new OutgoingNotificationMessageData(data)); -} - -void MediatorThreadImpl::ProcessMessages(int milliseconds) { - talk_base::Thread::ProcessMessages(milliseconds); + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification, + data)); } -void MediatorThreadImpl::OnMessage(talk_base::Message* msg) { - scoped_ptr<LoginData> data; - switch (msg->message_id) { - case CMD_LOGIN: - DCHECK(msg->pdata); - data.reset(reinterpret_cast<LoginData*>(msg->pdata)); - DoLogin(data.get()); - break; - case CMD_DISCONNECT: - DoDisconnect(); - break; - case CMD_LISTEN_FOR_UPDATES: - DoListenForUpdates(); - break; - case CMD_SEND_NOTIFICATION: { - DCHECK(msg->pdata); - scoped_ptr<OutgoingNotificationMessageData> data( - reinterpret_cast<OutgoingNotificationMessageData*>(msg->pdata)); - DoSendNotification(*data); - break; - } - case CMD_SUBSCRIBE_FOR_UPDATES: { - DCHECK(msg->pdata); - scoped_ptr<SubscriptionData> subscription_data( - reinterpret_cast<SubscriptionData*>(msg->pdata)); - DoSubscribeForUpdates(*subscription_data); - break; - } - case CMD_PUMP_AUXILIARY_LOOPS: - PumpAuxiliaryLoops(); - break; - default: - LOG(ERROR) << "P2P: Someone passed a bad message to the thread."; - break; - } -} - -void MediatorThreadImpl::DoLogin(LoginData* login_data) { +void MediatorThreadImpl::DoLogin( + const buzz::XmppClientSettings& settings) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); LOG(INFO) << "P2P: Thread logging into talk network."; - buzz::XmppClientSettings& user_settings = login_data->user_settings; network_change_notifier_.reset( new chrome_common_net::NetworkChangeNotifierProxy( @@ -164,7 +182,7 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) { // Language is not used in the stanza so we default to |en|. std::string lang = "en"; login_.reset(new notifier::Login(pump_.get(), - user_settings, + settings, options, lang, host_resolver_.get(), @@ -189,17 +207,8 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) { login_->StartConnection(); } -void MediatorThreadImpl::OnInputDebug(const char* msg, int length) { - string output(msg, length); - LOG(INFO) << "P2P: OnInputDebug:" << output << "."; -} - -void MediatorThreadImpl::OnOutputDebug(const char* msg, int length) { - string output(msg, length); - LOG(INFO) << "P2P: OnOutputDebug:" << output << "."; -} - void MediatorThreadImpl::DoDisconnect() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); LOG(INFO) << "P2P: Thread logging out of talk network."; login_.reset(); // Delete the old pump while on the thread to ensure that everything is @@ -211,14 +220,10 @@ void MediatorThreadImpl::DoDisconnect() { } void MediatorThreadImpl::DoSubscribeForUpdates( - const SubscriptionData& subscription_data) { - buzz::XmppClient* client = xmpp_client(); - // If there isn't an active xmpp client, return. - if (!client) { - return; - } + const std::vector<std::string>& subscribed_services_list) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); SubscribeTask* subscription = - new SubscribeTask(client, subscription_data.subscribed_services_list); + new SubscribeTask(xmpp_client(), subscribed_services_list); subscription->SignalStatusUpdate.connect( this, &MediatorThreadImpl::OnSubscriptionStateChange); @@ -226,40 +231,56 @@ void MediatorThreadImpl::DoSubscribeForUpdates( } void MediatorThreadImpl::DoListenForUpdates() { - buzz::XmppClient* client = xmpp_client(); - // If there isn't an active xmpp client, return. - if (!client) { - return; - } - ListenTask* listener = new ListenTask(client); + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + ListenTask* listener = new ListenTask(xmpp_client()); listener->SignalUpdateAvailable.connect( this, - &MediatorThreadImpl::OnUpdateListenerMessage); + &MediatorThreadImpl::OnIncomingNotification); listener->Start(); } void MediatorThreadImpl::DoSendNotification( - const OutgoingNotificationMessageData& data) { - buzz::XmppClient* client = xmpp_client(); - // If there isn't an active xmpp client, return. - if (!client) { - return; - } - SendUpdateTask* task = new SendUpdateTask(client, data.notification_data); + const OutgoingNotificationData& data) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + SendUpdateTask* task = new SendUpdateTask(xmpp_client(), data); task->SignalStatusUpdate.connect( this, - &MediatorThreadImpl::OnUpdateNotificationSent); + &MediatorThreadImpl::OnOutgoingNotification); task->Start(); } -void MediatorThreadImpl::OnUpdateListenerMessage( +void MediatorThreadImpl::OnIncomingNotification( const IncomingNotificationData& notification_data) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &MediatorThreadImpl::OnIncomingNotificationOnParentThread, + notification_data)); +} + +void MediatorThreadImpl::OnIncomingNotificationOnParentThread( + const IncomingNotificationData& notification_data) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); if (delegate_) { delegate_->OnIncomingNotification(notification_data); } } -void MediatorThreadImpl::OnUpdateNotificationSent(bool success) { +void MediatorThreadImpl::OnOutgoingNotification(bool success) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &MediatorThreadImpl::OnOutgoingNotificationOnParentThread, + success)); +} + +void MediatorThreadImpl::OnOutgoingNotificationOnParentThread( + bool success) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); if (delegate_ && success) { delegate_->OnOutgoingNotification(); } @@ -267,6 +288,18 @@ void MediatorThreadImpl::OnUpdateNotificationSent(bool success) { void MediatorThreadImpl::OnLoginFailureMessage( const notifier::LoginFailure& failure) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &MediatorThreadImpl::OnLoginFailureMessageOnParentThread, + failure)); +} + +void MediatorThreadImpl::OnLoginFailureMessageOnParentThread( + const notifier::LoginFailure& failure) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); if (delegate_) { delegate_->OnConnectionStateChange(false); } @@ -274,6 +307,18 @@ void MediatorThreadImpl::OnLoginFailureMessage( void MediatorThreadImpl::OnClientStateChangeMessage( LoginConnectionState state) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &MediatorThreadImpl::OnClientStateChangeMessageOnParentThread, + state)); +} + +void MediatorThreadImpl::OnClientStateChangeMessageOnParentThread( + LoginConnectionState state) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); switch (state) { case STATE_CLOSED: if (delegate_) { @@ -301,16 +346,39 @@ void MediatorThreadImpl::OnClientStateChangeMessage( } void MediatorThreadImpl::OnSubscriptionStateChange(bool success) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread, + success)); +} + +void MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread( + bool success) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); if (delegate_) { delegate_->OnSubscriptionStateChange(success); } } +MessageLoop* MediatorThreadImpl::worker_message_loop() { + MessageLoop* current_message_loop = MessageLoop::current(); + DCHECK(current_message_loop); + MessageLoop* worker_message_loop = worker_thread_.message_loop(); + DCHECK(worker_message_loop); + DCHECK(current_message_loop == parent_message_loop_ || + current_message_loop == worker_message_loop); + return worker_message_loop; +} + buzz::XmppClient* MediatorThreadImpl::xmpp_client() { - if (!login_.get()) { - return NULL; - } - return login_->xmpp_client(); + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(login_.get()); + buzz::XmppClient* xmpp_client = login_->xmpp_client(); + DCHECK(xmpp_client); + return xmpp_client; } } // namespace notifier diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.h b/chrome/common/net/notifier/listener/mediator_thread_impl.h index 0a35051..30683e8 100644 --- a/chrome/common/net/notifier/listener/mediator_thread_impl.h +++ b/chrome/common/net/notifier/listener/mediator_thread_impl.h @@ -26,13 +26,20 @@ #include "base/logging.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" +#include "base/thread.h" #include "chrome/common/net/notifier/communicator/login.h" +#include "chrome/common/net/notifier/communicator/login_connection_state.h" #include "chrome/common/net/notifier/communicator/login_failure.h" #include "chrome/common/net/notifier/listener/mediator_thread.h" #include "talk/base/sigslot.h" -#include "talk/base/thread.h" #include "talk/xmpp/xmppclientsettings.h" +class MessageLoop; + +namespace buzz { +class XmppClient; +} // namespace buzz + namespace chrome_common_net { class NetworkChangeNotifierThread; } // namespace chrome_common_net @@ -46,64 +53,16 @@ namespace notifier { class TaskPump; } // namespace notifier -namespace buzz { -class XmppClient; -} // namespace buzz - namespace talk_base { class SocketServer; +class Thread; } // namespace talk_base namespace notifier { -enum MEDIATOR_CMD { - CMD_LOGIN, - CMD_DISCONNECT, - CMD_LISTEN_FOR_UPDATES, - CMD_SEND_NOTIFICATION, - CMD_SUBSCRIBE_FOR_UPDATES, - CMD_PUMP_AUXILIARY_LOOPS, -}; - -// Used to pass authentication information from the mediator to the thread. -// Use new to allocate it on the heap, the thread will delete it for you. -struct LoginData : public talk_base::MessageData { - explicit LoginData(const buzz::XmppClientSettings& settings) - : user_settings(settings) { - } - virtual ~LoginData() {} - - buzz::XmppClientSettings user_settings; -}; - -// Used to pass subscription information from the mediator to the thread. -// Use new to allocate it on the heap, the thread will delete it for you. -struct SubscriptionData : public talk_base::MessageData { - explicit SubscriptionData(const std::vector<std::string>& services) - : subscribed_services_list(services) { - } - virtual ~SubscriptionData() {} - - std::vector<std::string> subscribed_services_list; -}; - -// Used to pass outgoing notification information from the mediator to the -// thread. Use new to allocate it on the heap, the thread will delete it -// for you. -struct OutgoingNotificationMessageData : public talk_base::MessageData { - explicit OutgoingNotificationMessageData( - const OutgoingNotificationData& data) : notification_data(data) { - } - virtual ~OutgoingNotificationMessageData() {} - - OutgoingNotificationData notification_data; -}; - class MediatorThreadImpl : public MediatorThread, - public sigslot::has_slots<>, - public talk_base::MessageHandler, - public talk_base::Thread { + public sigslot::has_slots<> { public: explicit MediatorThreadImpl( chrome_common_net::NetworkChangeNotifierThread* @@ -114,47 +73,58 @@ class MediatorThreadImpl // Start the thread. virtual void Start(); - virtual void Stop(); - virtual void Run(); // These are called from outside threads, by the talk mediator object. // They add messages to a queue which we poll in this thread. - void Login(const buzz::XmppClientSettings& settings); - void Logout(); - void ListenForUpdates(); - void SubscribeForUpdates( + virtual void Login(const buzz::XmppClientSettings& settings); + virtual void Logout(); + virtual void ListenForUpdates(); + virtual void SubscribeForUpdates( const std::vector<std::string>& subscribed_services_list); - void SendNotification(const OutgoingNotificationData& data); - void LogStanzas(); + virtual void SendNotification(const OutgoingNotificationData& data); private: + void StartLibjingleThread(); + void PumpLibjingleLoop(); + void StopLibjingleThread(); + // Called from within the thread on internal events. - void ProcessMessages(int cms); - void OnMessage(talk_base::Message* msg); - void DoLogin(LoginData* login_data); + void DoLogin(const buzz::XmppClientSettings& settings); void DoDisconnect(); - void DoSubscribeForUpdates(const SubscriptionData& subscription_data); + void DoSubscribeForUpdates( + const std::vector<std::string>& subscribed_services_list); void DoListenForUpdates(); void DoSendNotification( - const OutgoingNotificationMessageData& data); - void DoStanzaLogging(); - void PumpAuxiliaryLoops(); + const OutgoingNotificationData& data); - // These handle messages indicating an event happened in the outside world. - void OnUpdateListenerMessage( + // These handle messages indicating an event happened in the outside + // world. These are all called from the worker thread. + void OnIncomingNotification( const IncomingNotificationData& notification_data); - void OnUpdateNotificationSent(bool success); + void OnOutgoingNotification(bool success); void OnLoginFailureMessage(const notifier::LoginFailure& failure); void OnClientStateChangeMessage(LoginConnectionState state); void OnSubscriptionStateChange(bool success); - void OnInputDebug(const char* msg, int length); - void OnOutputDebug(const char* msg, int length); + // Equivalents of the above functions called from the parent thread. + void OnIncomingNotificationOnParentThread( + const IncomingNotificationData& notification_data); + void OnOutgoingNotificationOnParentThread(bool success); + void OnLoginFailureMessageOnParentThread( + const notifier::LoginFailure& failure); + void OnClientStateChangeMessageOnParentThread( + LoginConnectionState state); + void OnSubscriptionStateChangeOnParentThread( + bool success); + + MessageLoop* worker_message_loop(); buzz::XmppClient* xmpp_client(); Delegate* delegate_; + MessageLoop* parent_message_loop_; chrome_common_net::NetworkChangeNotifierThread* network_change_notifier_thread_; + base::Thread worker_thread_; scoped_ptr<net::NetworkChangeNotifier> network_change_notifier_; scoped_refptr<net::HostResolver> host_resolver_; @@ -164,6 +134,10 @@ class MediatorThreadImpl // complete or the pump shuts down. scoped_ptr<notifier::TaskPump> pump_; scoped_ptr<notifier::Login> login_; + + scoped_ptr<talk_base::SocketServer> socket_server_; + scoped_ptr<talk_base::Thread> libjingle_thread_; + DISALLOW_COPY_AND_ASSIGN(MediatorThreadImpl); }; diff --git a/chrome/common/net/notifier/listener/talk_mediator_impl.cc b/chrome/common/net/notifier/listener/talk_mediator_impl.cc index fbf9da7..2e977fc 100644 --- a/chrome/common/net/notifier/listener/talk_mediator_impl.cc +++ b/chrome/common/net/notifier/listener/talk_mediator_impl.cc @@ -46,6 +46,7 @@ TalkMediatorImpl::TalkMediatorImpl( mediator_thread_( new MediatorThreadImpl(network_change_notifier_thread)), invalidate_xmpp_auth_token_(invalidate_xmpp_auth_token) { + DCHECK(non_thread_safe_.CalledOnValidThread()); // Ensure the SSL library is initialized. SslInitializationSingleton::GetInstance()->RegisterClient(); @@ -57,11 +58,13 @@ TalkMediatorImpl::TalkMediatorImpl(MediatorThread *thread) : delegate_(NULL), mediator_thread_(thread), invalidate_xmpp_auth_token_(false) { + DCHECK(non_thread_safe_.CalledOnValidThread()); // When testing we do not initialize the SSL library. TalkMediatorInitialization(true); } void TalkMediatorImpl::TalkMediatorInitialization(bool should_connect) { + DCHECK(non_thread_safe_.CalledOnValidThread()); if (should_connect) { mediator_thread_->SetDelegate(this); state_.connected = 1; @@ -71,76 +74,64 @@ void TalkMediatorImpl::TalkMediatorInitialization(bool should_connect) { } TalkMediatorImpl::~TalkMediatorImpl() { + DCHECK(non_thread_safe_.CalledOnValidThread()); if (state_.started) { Logout(); } } bool TalkMediatorImpl::Login() { - bool should_log_in = false; - buzz::XmppClientSettings xmpp_settings; - { - AutoLock lock(mutex_); - // Connect to the mediator thread and start processing messages. - if (!state_.connected) { - mediator_thread_->SetDelegate(this); - state_.connected = 1; - } - should_log_in = - state_.initialized && !state_.logging_in && !state_.logged_in; - state_.logging_in = should_log_in; - xmpp_settings = xmpp_settings_; + DCHECK(non_thread_safe_.CalledOnValidThread()); + // Connect to the mediator thread and start processing messages. + if (!state_.connected) { + mediator_thread_->SetDelegate(this); + state_.connected = 1; } - if (should_log_in) { - mediator_thread_->Login(xmpp_settings); + if (state_.initialized && !state_.logging_in && !state_.logged_in) { + state_.logging_in = true; + mediator_thread_->Login(xmpp_settings_); + return true; } - return should_log_in; + return false; } bool TalkMediatorImpl::Logout() { - bool logging_out = false; - { - AutoLock lock(mutex_); - if (state_.connected) { - state_.connected = 0; - } - logging_out = state_.started; - if (logging_out) { - state_.started = 0; - state_.logging_in = 0; - state_.logged_in = 0; - state_.subscribed = 0; - } + DCHECK(non_thread_safe_.CalledOnValidThread()); + if (state_.connected) { + state_.connected = 0; } - if (logging_out) { + if (state_.started) { + state_.started = 0; + state_.logging_in = 0; + state_.logged_in = 0; + state_.subscribed = 0; // We do not want to be called back during logout since we may be // closing. mediator_thread_->SetDelegate(NULL); mediator_thread_->Logout(); + return true; } - return logging_out; + return false; } bool TalkMediatorImpl::SendNotification(const OutgoingNotificationData& data) { - bool can_send_notification = false; - { - AutoLock lock(mutex_); - can_send_notification = state_.logged_in && state_.subscribed; - } - if (can_send_notification) { + DCHECK(non_thread_safe_.CalledOnValidThread()); + if (state_.logged_in && state_.subscribed) { mediator_thread_->SendNotification(data); + return true; } - return can_send_notification; + return false; } void TalkMediatorImpl::SetDelegate(TalkMediator::Delegate* delegate) { + DCHECK(non_thread_safe_.CalledOnValidThread()); delegate_ = delegate; } bool TalkMediatorImpl::SetAuthToken(const std::string& email, const std::string& token, const std::string& token_service) { - AutoLock lock(mutex_); + DCHECK(non_thread_safe_.CalledOnValidThread()); // Verify that we can create a JID from the email provided. buzz::Jid jid = buzz::Jid(email); @@ -163,13 +154,9 @@ bool TalkMediatorImpl::SetAuthToken(const std::string& email, void TalkMediatorImpl::AddSubscribedServiceUrl( const std::string& service_url) { - bool logged_in = false; - { - AutoLock lock(mutex_); - subscribed_services_list_.push_back(service_url); - logged_in = state_.logged_in; - } - if (logged_in) { + DCHECK(non_thread_safe_.CalledOnValidThread()); + subscribed_services_list_.push_back(service_url); + if (state_.logged_in) { LOG(INFO) << "Resubscribing for updates, a new service got added"; mediator_thread_->SubscribeForUpdates(subscribed_services_list_); } @@ -177,11 +164,9 @@ void TalkMediatorImpl::AddSubscribedServiceUrl( void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) { - { - AutoLock lock(mutex_); - state_.logging_in = 0; - state_.logged_in = logged_in; - } + DCHECK(non_thread_safe_.CalledOnValidThread()); + state_.logging_in = 0; + state_.logged_in = logged_in; if (logged_in) { LOG(INFO) << "P2P: Logged in."; // ListenForUpdates enables the ListenTask. This is done before @@ -196,10 +181,8 @@ void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) { } void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) { - { - AutoLock lock(mutex_); - state_.subscribed = subscribed; - } + DCHECK(non_thread_safe_.CalledOnValidThread()); + state_.subscribed = subscribed; LOG(INFO) << "P2P: " << (subscribed ? "subscribed" : "unsubscribed"); if (delegate_) { delegate_->OnNotificationStateChange(subscribed); @@ -208,6 +191,7 @@ void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) { void TalkMediatorImpl::OnIncomingNotification( const IncomingNotificationData& notification_data) { + DCHECK(non_thread_safe_.CalledOnValidThread()); LOG(INFO) << "P2P: Updates are available on the server."; if (delegate_) { delegate_->OnIncomingNotification(notification_data); @@ -215,6 +199,7 @@ void TalkMediatorImpl::OnIncomingNotification( } void TalkMediatorImpl::OnOutgoingNotification() { + DCHECK(non_thread_safe_.CalledOnValidThread()); LOG(INFO) << "P2P: Peers were notified that updates are available on the server."; if (delegate_) { diff --git a/chrome/common/net/notifier/listener/talk_mediator_impl.h b/chrome/common/net/notifier/listener/talk_mediator_impl.h index 4ff4681..8625fa4 100644 --- a/chrome/common/net/notifier/listener/talk_mediator_impl.h +++ b/chrome/common/net/notifier/listener/talk_mediator_impl.h @@ -12,7 +12,7 @@ #include <string> #include <vector> -#include "base/lock.h" +#include "base/non_thread_safe.h" #include "base/scoped_ptr.h" #include "chrome/common/net/notifier/listener/mediator_thread.h" #include "chrome/common/net/notifier/listener/talk_mediator.h" @@ -82,11 +82,7 @@ class TalkMediatorImpl // mediator thread's SignalStateChange object. void TalkMediatorInitialization(bool should_connect); - // Protects state_, xmpp_settings_, and subscribed_services_list_. - // - // TODO(akalin): Remove this once we use this class from one thread - // only. - Lock mutex_; + NonThreadSafe non_thread_safe_; // Delegate, which we don't own. May be NULL. TalkMediator::Delegate* delegate_; diff --git a/chrome/common/net/notifier/listener/talk_mediator_unittest.cc b/chrome/common/net/notifier/listener/talk_mediator_unittest.cc index 1a8b0b4..ea9d222 100644 --- a/chrome/common/net/notifier/listener/talk_mediator_unittest.cc +++ b/chrome/common/net/notifier/listener/talk_mediator_unittest.cc @@ -6,6 +6,7 @@ #include "base/basictypes.h" #include "base/logging.h" +#include "base/message_loop.h" #include "chrome/common/net/fake_network_change_notifier_thread.h" #include "chrome/common/net/notifier/listener/mediator_thread_mock.h" #include "chrome/common/net/notifier/listener/talk_mediator_impl.h" @@ -42,6 +43,9 @@ class TalkMediatorImplTest : public testing::Test { int last_message_; private: + // TalkMediatorImpl expects a message loop. + MessageLoop message_loop_; + DISALLOW_COPY_AND_ASSIGN(TalkMediatorImplTest); }; |