diff options
author | nileshagrawal@chromium.org <nileshagrawal@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-07 00:08:14 +0000 |
---|---|---|
committer | nileshagrawal@chromium.org <nileshagrawal@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-07 00:08:14 +0000 |
commit | 09fb6e8705dcf450344a19b97d5ff2d529441549 (patch) | |
tree | 2e6a6489b64c00fcad0fceeb95a1fd4d9db0a92d | |
parent | 0d1add3db49c5cffead44f60a85c9e82f8d9582f (diff) | |
download | chromium_src-09fb6e8705dcf450344a19b97d5ff2d529441549.zip chromium_src-09fb6e8705dcf450344a19b97d5ff2d529441549.tar.gz chromium_src-09fb6e8705dcf450344a19b97d5ff2d529441549.tar.bz2 |
Move sync notifier contruction out of syncer thread.
Add thread safety checks to ensure that all the methods are called on
the same thread.
BUG=
TEST=
Review URL: http://codereview.chromium.org/6794005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@80724 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | chrome/browser/sync/engine/syncapi.cc | 9 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncapi.h | 3 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncapi_unittest.cc | 16 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.cc | 11 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.h | 6 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc | 28 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h | 4 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/p2p_notifier.cc | 36 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/p2p_notifier.h | 7 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/sync_notifier.h | 14 | ||||
-rw-r--r-- | chrome/service/cloud_print/cloud_print_proxy_backend.cc | 1 | ||||
-rw-r--r-- | jingle/notifier/listener/mediator_thread_impl.cc | 34 | ||||
-rw-r--r-- | jingle/notifier/listener/mediator_thread_impl.h | 4 | ||||
-rw-r--r-- | jingle/notifier/listener/talk_mediator_impl.cc | 41 | ||||
-rw-r--r-- | jingle/notifier/listener/talk_mediator_impl.h | 12 |
15 files changed, 163 insertions, 63 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc index 4303720..35f2382 100644 --- a/chrome/browser/sync/engine/syncapi.cc +++ b/chrome/browser/sync/engine/syncapi.cc @@ -1504,7 +1504,7 @@ class SyncManager::SyncInternal scoped_ptr<SyncerThreadAdapter> syncer_thread_; // The SyncNotifier which notifies us when updates need to be downloaded. - scoped_ptr<sync_notifier::SyncNotifier> sync_notifier_; + sync_notifier::SyncNotifier* sync_notifier_; // A multi-purpose status watch object that aggregates stats from various // sync components. @@ -1714,7 +1714,7 @@ bool SyncManager::SyncInternal::Init( registrar_ = model_safe_worker_registrar; setup_for_test_mode_ = setup_for_test_mode; - sync_notifier_.reset(sync_notifier); + sync_notifier_ = sync_notifier; sync_notifier_->AddObserver(this); share_.dir_manager.reset(new DirectoryManager(database_location)); @@ -1837,7 +1837,7 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { void SyncManager::SyncInternal::SendNotification() { DCHECK_EQ(MessageLoop::current(), core_message_loop_); - if (!sync_notifier_.get()) { + if (!sync_notifier_) { VLOG(1) << "Not sending notification: sync_notifier_ is NULL"; return; } @@ -2204,9 +2204,8 @@ void SyncManager::SyncInternal::Shutdown() { // We NULL out sync_notifer_ so that any pending tasks do not // trigger further notifications. // TODO(akalin): NULL the other member variables defensively, too. - if (sync_notifier_.get()) { + if (sync_notifier_) { sync_notifier_->RemoveObserver(this); - sync_notifier_.reset(); } // |this| is about to be destroyed, so we have to ensure any messages diff --git a/chrome/browser/sync/engine/syncapi.h b/chrome/browser/sync/engine/syncapi.h index 2ec06a3..1718c71 100644 --- a/chrome/browser/sync/engine/syncapi.h +++ b/chrome/browser/sync/engine/syncapi.h @@ -851,8 +851,7 @@ class SyncManager { // |model_safe_worker| ownership is given to the SyncManager. // |user_agent| is a 7-bit ASCII string suitable for use as the User-Agent // HTTP header. Used internally when collecting stats to classify clients. - // |sync_notifier| will be owned internally and used to listen for - // notifications. + // |sync_notifier| used to listen for notifications, not owned. bool Init(const FilePath& database_location, const char* sync_server_and_path, int sync_server_port, diff --git a/chrome/browser/sync/engine/syncapi_unittest.cc b/chrome/browser/sync/engine/syncapi_unittest.cc index 52116a0..10d53a3 100644 --- a/chrome/browser/sync/engine/syncapi_unittest.cc +++ b/chrome/browser/sync/engine/syncapi_unittest.cc @@ -654,25 +654,24 @@ class SyncManagerTest : public testing::Test, credentials.email = "foo@bar.com"; credentials.sync_token = "sometoken"; - scoped_ptr<StrictMock<SyncNotifierMock> > sync_notifier_mock( - new StrictMock<SyncNotifierMock>()); - EXPECT_CALL(*sync_notifier_mock, AddObserver(_)). + sync_notifier_mock_.reset(new StrictMock<SyncNotifierMock>()); + EXPECT_CALL(*sync_notifier_mock_, AddObserver(_)). WillOnce(Invoke(this, &SyncManagerTest::SyncNotifierAddObserver)); - EXPECT_CALL(*sync_notifier_mock, SetState("")); - EXPECT_CALL(*sync_notifier_mock, + EXPECT_CALL(*sync_notifier_mock_, SetState("")); + EXPECT_CALL(*sync_notifier_mock_, UpdateCredentials(credentials.email, credentials.sync_token)); - EXPECT_CALL(*sync_notifier_mock, UpdateEnabledTypes(_)). + EXPECT_CALL(*sync_notifier_mock_, UpdateEnabledTypes(_)). Times(AtLeast(1)). WillRepeatedly( Invoke(this, &SyncManagerTest::SyncNotifierUpdateEnabledTypes)); - EXPECT_CALL(*sync_notifier_mock, RemoveObserver(_)). + EXPECT_CALL(*sync_notifier_mock_, RemoveObserver(_)). WillOnce(Invoke(this, &SyncManagerTest::SyncNotifierRemoveObserver)); EXPECT_FALSE(sync_notifier_observer_); sync_manager_.Init(temp_dir_.path(), "bogus", 0, false, new TestHttpPostProviderFactory(), this, "bogus", - credentials, sync_notifier_mock.release(), "", + credentials, sync_notifier_mock_.get(), "", true /* setup_for_test_mode */); EXPECT_TRUE(sync_notifier_observer_); @@ -775,6 +774,7 @@ class SyncManagerTest : public testing::Test, ScopedTempDir temp_dir_; // Sync Id's for the roots of the enabled datatypes. std::map<ModelType, int64> type_roots_; + scoped_ptr<StrictMock<SyncNotifierMock> > sync_notifier_mock_; protected: SyncManager sync_manager_; diff --git a/chrome/browser/sync/glue/sync_backend_host.cc b/chrome/browser/sync/glue/sync_backend_host.cc index fe6e1b3..5381ff9 100644 --- a/chrome/browser/sync/glue/sync_backend_host.cc +++ b/chrome/browser/sync/glue/sync_backend_host.cc @@ -25,6 +25,7 @@ #include "chrome/browser/sync/glue/password_model_worker.h" #include "chrome/browser/sync/glue/sync_backend_host.h" #include "chrome/browser/sync/js_arg_list.h" +#include "chrome/browser/sync/notifier/sync_notifier.h" #include "chrome/browser/sync/notifier/sync_notifier_factory.h" #include "chrome/browser/sync/sessions/session_state.h" // TODO(tim): Remove this! We should have a syncapi pass-thru instead. @@ -46,6 +47,7 @@ static const FilePath::CharType kSyncDataFolderName[] = FILE_PATH_LITERAL("Sync Data"); using browser_sync::DataTypeController; +using sync_notifier::SyncNotifierFactory; typedef TokenService::TokenAvailableDetails TokenAvailableDetails; typedef GoogleServiceAuthError AuthError; @@ -709,6 +711,10 @@ SyncBackendHost::Core::Core(SyncBackendHost* backend) parent_router_(NULL), processing_passphrase_(false), deferred_nudge_for_cleanup_requested_(false) { + const std::string& client_info = webkit_glue::GetUserAgent(GURL()); + SyncNotifierFactory sync_notifier_factory(client_info); + sync_notifier_.reset(sync_notifier_factory.CreateSyncNotifier( + *CommandLine::ForCurrentProcess())); } // Helper to construct a user agent string (ASCII) suitable for use by @@ -758,8 +764,6 @@ void SyncBackendHost::Core::DoInitialize(const DoInitializeOptions& options) { syncapi_->AddObserver(this); const FilePath& path_str = host_->sync_data_folder_path(); - const std::string& client_info = webkit_glue::GetUserAgent(GURL()); - sync_notifier::SyncNotifierFactory sync_notifier_factory(client_info); success = syncapi_->Init( path_str, (options.service_url.host() + options.service_url.path()).c_str(), @@ -769,8 +773,7 @@ void SyncBackendHost::Core::DoInitialize(const DoInitializeOptions& options) { host_, // ModelSafeWorkerRegistrar. MakeUserAgentForSyncapi().c_str(), options.credentials, - sync_notifier_factory.CreateSyncNotifier( - *CommandLine::ForCurrentProcess()), + sync_notifier_.get(), options.restored_key_for_bootstrapping, options.setup_for_test_mode); DCHECK(success) << "Syncapi initialization failed!"; diff --git a/chrome/browser/sync/glue/sync_backend_host.h b/chrome/browser/sync/glue/sync_backend_host.h index f78c137..65a5aaa 100644 --- a/chrome/browser/sync/glue/sync_backend_host.h +++ b/chrome/browser/sync/glue/sync_backend_host.h @@ -33,6 +33,10 @@ class CancelableTask; class Profile; +namespace sync_notifier { +class SyncNotifier; +} // namespace sync_notifier + namespace browser_sync { namespace sessions { @@ -500,6 +504,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // The top-level syncapi entry point. scoped_ptr<sync_api::SyncManager> syncapi_; + scoped_ptr<sync_notifier::SyncNotifier> sync_notifier_; + JsSyncManagerObserver sync_manager_observer_; JsEventRouter* parent_router_; diff --git a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc index 11adf97..ae97976 100644 --- a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc +++ b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc @@ -12,7 +12,8 @@ namespace sync_notifier { NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier( const notifier::NotifierOptions& notifier_options, const std::string& client_info) - : parent_message_loop_(MessageLoop::current()), + : construction_message_loop_(MessageLoop::current()), + method_message_loop_(NULL), observers_(new ObserverListThreadSafe<SyncNotifierObserver>()), worker_thread_("InvalidationNotifier worker thread"), worker_thread_vars_(NULL) { @@ -29,7 +30,7 @@ NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier( } NonBlockingInvalidationNotifier::~NonBlockingInvalidationNotifier() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod( @@ -41,18 +42,18 @@ NonBlockingInvalidationNotifier::~NonBlockingInvalidationNotifier() { void NonBlockingInvalidationNotifier::AddObserver( SyncNotifierObserver* observer) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); observers_->AddObserver(observer); } void NonBlockingInvalidationNotifier::RemoveObserver( SyncNotifierObserver* observer) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); observers_->RemoveObserver(observer); } void NonBlockingInvalidationNotifier::SetState(const std::string& state) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod( @@ -63,7 +64,7 @@ void NonBlockingInvalidationNotifier::SetState(const std::string& state) { void NonBlockingInvalidationNotifier::UpdateCredentials( const std::string& email, const std::string& token) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod( @@ -74,7 +75,7 @@ void NonBlockingInvalidationNotifier::UpdateCredentials( void NonBlockingInvalidationNotifier::UpdateEnabledTypes( const syncable::ModelTypeSet& types) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod( @@ -84,7 +85,7 @@ void NonBlockingInvalidationNotifier::UpdateEnabledTypes( } void NonBlockingInvalidationNotifier::SendNotification() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); // InvalidationClient doesn't implement SendNotification(), so no // need to forward on the call. } @@ -94,11 +95,20 @@ MessageLoop* NonBlockingInvalidationNotifier::worker_message_loop() { DCHECK(current_message_loop); MessageLoop* worker_message_loop = worker_thread_.message_loop(); DCHECK(worker_message_loop); - DCHECK(current_message_loop == parent_message_loop_ || + DCHECK(current_message_loop == construction_message_loop_ || + current_message_loop == method_message_loop_ || current_message_loop == worker_message_loop); return worker_message_loop; } +void NonBlockingInvalidationNotifier::CheckOrSetValidThread() { + if (method_message_loop_) { + DCHECK_EQ(MessageLoop::current(), method_message_loop_); + } else { + method_message_loop_ = MessageLoop::current(); + } +} + void NonBlockingInvalidationNotifier::CreateWorkerThreadVars( const notifier::NotifierOptions& notifier_options, const std::string& client_info) { diff --git a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h index dfb7b4c..4b6b3fd 100644 --- a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h +++ b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h @@ -93,6 +93,7 @@ class NonBlockingInvalidationNotifier : public SyncNotifier { const notifier::NotifierOptions& notifier_options, const std::string& client_info); void DestroyWorkerThreadVars(); + void CheckOrSetValidThread(); // Equivalents of the public functions that are run on the worker // thread. @@ -101,7 +102,8 @@ class NonBlockingInvalidationNotifier : public SyncNotifier { const std::string& token); void UpdateEnabledTypesOnWorkerThread(const syncable::ModelTypeSet& types); - MessageLoop* parent_message_loop_; + MessageLoop* construction_message_loop_; + MessageLoop* method_message_loop_; scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> > observers_; diff --git a/chrome/browser/sync/notifier/p2p_notifier.cc b/chrome/browser/sync/notifier/p2p_notifier.cc index c0b8d7f..da7eae4 100644 --- a/chrome/browser/sync/notifier/p2p_notifier.cc +++ b/chrome/browser/sync/notifier/p2p_notifier.cc @@ -24,24 +24,42 @@ P2PNotifier::P2PNotifier( new notifier::MediatorThreadImpl(notifier_options), notifier_options)), logged_in_(false), - notifications_enabled_(false) { + notifications_enabled_(false), + construction_message_loop_(MessageLoop::current()), + method_message_loop_(NULL) { talk_mediator_->SetDelegate(this); } -P2PNotifier::~P2PNotifier() {} +P2PNotifier::~P2PNotifier() { + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); +} void P2PNotifier::AddObserver(SyncNotifierObserver* observer) { + CheckOrSetValidThread(); observer_list_.AddObserver(observer); } +// Note: Since we need to shutdown TalkMediator on the method_thread, we are +// calling Logout on TalkMediator when the last observer is removed. +// Users will need to call UpdateCredentials again to use the same object. +// TODO(akalin): Think of a better solution to fix this. void P2PNotifier::RemoveObserver(SyncNotifierObserver* observer) { + CheckOrSetValidThread(); observer_list_.RemoveObserver(observer); + + // Logout after the last observer is removed. + if (observer_list_.size() == 0) { + talk_mediator_->Logout(); + } } -void P2PNotifier::SetState(const std::string& state) {} +void P2PNotifier::SetState(const std::string& state) { + CheckOrSetValidThread(); +} void P2PNotifier::UpdateCredentials( const std::string& email, const std::string& token) { + CheckOrSetValidThread(); // If already logged in, the new credentials will take effect on the // next reconnection. talk_mediator_->SetAuthToken(email, token, SYNC_SERVICE_NAME); @@ -64,11 +82,13 @@ void P2PNotifier::UpdateCredentials( } void P2PNotifier::UpdateEnabledTypes(const syncable::ModelTypeSet& types) { + CheckOrSetValidThread(); enabled_types_ = types; MaybeEmitNotification(); } void P2PNotifier::SendNotification() { + CheckOrSetValidThread(); VLOG(1) << "Sending XMPP notification..."; notifier::Notification notification; notification.channel = kSyncNotificationChannel; @@ -82,6 +102,7 @@ void P2PNotifier::SendNotification() { } void P2PNotifier::OnNotificationStateChange(bool notifications_enabled) { + CheckOrSetValidThread(); notifications_enabled_ = notifications_enabled; FOR_EACH_OBSERVER(SyncNotifierObserver, observer_list_, OnNotificationStateChange(notifications_enabled_)); @@ -90,6 +111,7 @@ void P2PNotifier::OnNotificationStateChange(bool notifications_enabled) { void P2PNotifier::OnIncomingNotification( const notifier::Notification& notification) { + CheckOrSetValidThread(); VLOG(1) << "Sync received P2P notification."; if (notification.channel != kSyncNotificationChannel) { LOG(WARNING) << "Notification fron unexpected source: " @@ -120,4 +142,12 @@ void P2PNotifier::MaybeEmitNotification() { OnIncomingNotification(type_payloads)); } +void P2PNotifier::CheckOrSetValidThread() { + if (method_message_loop_) { + DCHECK_EQ(MessageLoop::current(), method_message_loop_); + } else { + method_message_loop_ = MessageLoop::current(); + } +} + } // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/p2p_notifier.h b/chrome/browser/sync/notifier/p2p_notifier.h index 67bad41..54cbf70 100644 --- a/chrome/browser/sync/notifier/p2p_notifier.h +++ b/chrome/browser/sync/notifier/p2p_notifier.h @@ -16,6 +16,8 @@ #include "chrome/browser/sync/syncable/model_type.h" #include "jingle/notifier/listener/talk_mediator.h" +class MessageLoop; + namespace notifier { struct NotifierOptions; } // namespace @@ -49,6 +51,7 @@ class P2PNotifier // Call OnIncomingNotification() on observers if we have a non-empty // set of enabled types. void MaybeEmitNotification(); + void CheckOrSetValidThread(); ObserverList<SyncNotifierObserver> observer_list_; @@ -61,7 +64,9 @@ class P2PNotifier bool notifications_enabled_; syncable::ModelTypeSet enabled_types_; + MessageLoop* construction_message_loop_; + MessageLoop* method_message_loop_; }; -} +} // namespace sync_notifier #endif // CHROME_BROWSER_SYNC_NOTIFIER_P2P_NOTIFIER_H_ diff --git a/chrome/browser/sync/notifier/sync_notifier.h b/chrome/browser/sync/notifier/sync_notifier.h index da15259..5bf6c8c 100644 --- a/chrome/browser/sync/notifier/sync_notifier.h +++ b/chrome/browser/sync/notifier/sync_notifier.h @@ -5,6 +5,20 @@ // Interface to the sync notifier, which is an object that receives // notifications when updates are available for a set of sync types. // All the observers are notified when such an event happens. +// +// A SyncNotifier must be destroyed on the same thread it was created on, +// and all its methods must be called on the same thread (not necessarily +// the one it was created on). If the methods thread is different from the +// creation thread, then the methods thread must not exist when the SyncNotifier +// is created and destroyed. +// +// In particular, the SyncNotifier will be created on the UI thread, the syncer +// core thread will be created, the SyncNotifier will be used on that core +// thread, the syncer core thread will be destroyed, and then the SyncNotifier +// will be destroyed. +// +// TODO(akalin): Remove the code to deal with this situation once the syncer +// core thread goes away. #ifndef CHROME_BROWSER_SYNC_NOTIFIER_SYNC_NOTIFIER_H_ #define CHROME_BROWSER_SYNC_NOTIFIER_SYNC_NOTIFIER_H_ diff --git a/chrome/service/cloud_print/cloud_print_proxy_backend.cc b/chrome/service/cloud_print/cloud_print_proxy_backend.cc index cbf55ee..170976c 100644 --- a/chrome/service/cloud_print/cloud_print_proxy_backend.cc +++ b/chrome/service/cloud_print/cloud_print_proxy_backend.cc @@ -449,6 +449,7 @@ void CloudPrintProxyBackend::Core::DoShutdown() { index->second->Shutdown(); } // Important to delete the TalkMediator on this thread. + talk_mediator_->Logout(); talk_mediator_.reset(); notifications_enabled_ = false; notifications_enabled_since_ = base::TimeTicks(); diff --git a/jingle/notifier/listener/mediator_thread_impl.cc b/jingle/notifier/listener/mediator_thread_impl.cc index 6887f10..0d96d01 100644 --- a/jingle/notifier/listener/mediator_thread_impl.cc +++ b/jingle/notifier/listener/mediator_thread_impl.cc @@ -22,14 +22,15 @@ namespace notifier { MediatorThreadImpl::MediatorThreadImpl( const NotifierOptions& notifier_options) : observers_(new ObserverListThreadSafe<Observer>()), - parent_message_loop_(MessageLoop::current()), + construction_message_loop_(MessageLoop::current()), + method_message_loop_(NULL), notifier_options_(notifier_options), worker_thread_("MediatorThread worker thread") { - DCHECK(parent_message_loop_); + DCHECK(construction_message_loop_); } MediatorThreadImpl::~MediatorThreadImpl() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); // If the worker thread is still around, we need to call Logout() so // that all the variables living it get destroyed properly (i.e., on // the worker thread). @@ -39,15 +40,17 @@ MediatorThreadImpl::~MediatorThreadImpl() { } void MediatorThreadImpl::AddObserver(Observer* observer) { + CheckOrSetValidThread(); observers_->AddObserver(observer); } void MediatorThreadImpl::RemoveObserver(Observer* observer) { + CheckOrSetValidThread(); observers_->RemoveObserver(observer); } void MediatorThreadImpl::Start() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); // We create the worker thread as an IO thread in preparation for // making this use Chrome sockets. const base::Thread::Options options(MessageLoop::TYPE_IO, 0); @@ -57,14 +60,15 @@ void MediatorThreadImpl::Start() { } void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); + worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings)); } void MediatorThreadImpl::Logout() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect)); @@ -76,7 +80,7 @@ void MediatorThreadImpl::Logout() { } void MediatorThreadImpl::ListenForUpdates() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, @@ -85,7 +89,7 @@ void MediatorThreadImpl::ListenForUpdates() { void MediatorThreadImpl::SubscribeForUpdates( const SubscriptionList& subscriptions) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod( @@ -96,7 +100,7 @@ void MediatorThreadImpl::SubscribeForUpdates( void MediatorThreadImpl::SendNotification( const Notification& data) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification, @@ -105,7 +109,7 @@ void MediatorThreadImpl::SendNotification( void MediatorThreadImpl::UpdateXmppSettings( const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + CheckOrSetValidThread(); worker_message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, @@ -118,7 +122,7 @@ MessageLoop* MediatorThreadImpl::worker_message_loop() { DCHECK(current_message_loop); MessageLoop* worker_message_loop = worker_thread_.message_loop(); DCHECK(worker_message_loop); - DCHECK(current_message_loop == parent_message_loop_ || + DCHECK(current_message_loop == method_message_loop_ || current_message_loop == worker_message_loop); return worker_message_loop; } @@ -229,4 +233,12 @@ void MediatorThreadImpl::OnDisconnect() { observers_->Notify(&Observer::OnConnectionStateChange, false); } +void MediatorThreadImpl::CheckOrSetValidThread() { + if (method_message_loop_) { + DCHECK_EQ(MessageLoop::current(), method_message_loop_); + } else { + method_message_loop_ = MessageLoop::current(); + } +} + } // namespace notifier diff --git a/jingle/notifier/listener/mediator_thread_impl.h b/jingle/notifier/listener/mediator_thread_impl.h index e5ea7b4..0d1f1c4 100644 --- a/jingle/notifier/listener/mediator_thread_impl.h +++ b/jingle/notifier/listener/mediator_thread_impl.h @@ -86,7 +86,8 @@ class MediatorThreadImpl : public MediatorThread, public LoginDelegate, MessageLoop* worker_message_loop(); scoped_refptr<ObserverListThreadSafe<Observer> > observers_; - MessageLoop* parent_message_loop_; + MessageLoop* construction_message_loop_; + MessageLoop* method_message_loop_; base::WeakPtr<talk_base::Task> base_task_; private: @@ -101,6 +102,7 @@ class MediatorThreadImpl : public MediatorThread, public LoginDelegate, void DoSendNotification( const Notification& data); void DoUpdateXmppSettings(const buzz::XmppClientSettings& settings); + void CheckOrSetValidThread(); const NotifierOptions notifier_options_; diff --git a/jingle/notifier/listener/talk_mediator_impl.cc b/jingle/notifier/listener/talk_mediator_impl.cc index fa0fc4fc..5bd70dc 100644 --- a/jingle/notifier/listener/talk_mediator_impl.cc +++ b/jingle/notifier/listener/talk_mediator_impl.cc @@ -5,6 +5,7 @@ #include "jingle/notifier/listener/talk_mediator_impl.h" #include "base/logging.h" +#include "base/message_loop.h" #include "jingle/notifier/base/notifier_options_util.h" namespace notifier { @@ -14,21 +15,20 @@ TalkMediatorImpl::TalkMediatorImpl( const NotifierOptions& notifier_options) : delegate_(NULL), mediator_thread_(mediator_thread), - notifier_options_(notifier_options) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + notifier_options_(notifier_options), + construction_message_loop_(MessageLoop::current()), + method_message_loop_(NULL) { mediator_thread_->Start(); state_.started = 1; } TalkMediatorImpl::~TalkMediatorImpl() { - DCHECK(non_thread_safe_.CalledOnValidThread()); - if (state_.started) { - Logout(); - } + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); + DCHECK(!state_.started); } bool TalkMediatorImpl::Login() { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); // Connect to the mediator thread and start processing messages. mediator_thread_->AddObserver(this); if (state_.initialized && !state_.logging_in && !state_.logged_in) { @@ -40,7 +40,7 @@ bool TalkMediatorImpl::Login() { } bool TalkMediatorImpl::Logout() { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); if (state_.started) { state_.started = 0; state_.logging_in = 0; @@ -56,7 +56,7 @@ bool TalkMediatorImpl::Logout() { } bool TalkMediatorImpl::SendNotification(const Notification& data) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); if (state_.logged_in && state_.subscribed) { mediator_thread_->SendNotification(data); return true; @@ -65,15 +65,14 @@ bool TalkMediatorImpl::SendNotification(const Notification& data) { } void TalkMediatorImpl::SetDelegate(TalkMediator::Delegate* delegate) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + DCHECK_EQ(MessageLoop::current(), construction_message_loop_); delegate_ = delegate; } void TalkMediatorImpl::SetAuthToken(const std::string& email, const std::string& token, const std::string& token_service) { - DCHECK(non_thread_safe_.CalledOnValidThread()); - + CheckOrSetValidThread(); xmpp_settings_ = MakeXmppClientSettings(notifier_options_, email, token, token_service); @@ -87,7 +86,7 @@ void TalkMediatorImpl::SetAuthToken(const std::string& email, } void TalkMediatorImpl::AddSubscription(const Subscription& subscription) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); subscriptions_.push_back(subscription); if (state_.logged_in) { VLOG(1) << "Resubscribing for updates, a new service got added"; @@ -97,7 +96,7 @@ void TalkMediatorImpl::AddSubscription(const Subscription& subscription) { void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); // If we just lost connection, then the MediatorThread implementation will // try to log in again. We need to set state_.logging_in to true in that case. state_.logging_in = !logged_in; @@ -116,7 +115,7 @@ void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) { } void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); state_.subscribed = subscribed; VLOG(1) << "P2P: " << (subscribed ? "subscribed" : "unsubscribed"); if (delegate_) @@ -125,18 +124,26 @@ void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) { void TalkMediatorImpl::OnIncomingNotification( const Notification& notification) { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); VLOG(1) << "P2P: Updates are available on the server."; if (delegate_) delegate_->OnIncomingNotification(notification); } void TalkMediatorImpl::OnOutgoingNotification() { - DCHECK(non_thread_safe_.CalledOnValidThread()); + CheckOrSetValidThread(); VLOG(1) << "P2P: Peers were notified that updates are available on the " "server."; if (delegate_) delegate_->OnOutgoingNotification(); } +void TalkMediatorImpl::CheckOrSetValidThread() { + if (method_message_loop_) { + DCHECK_EQ(MessageLoop::current(), method_message_loop_); + } else { + method_message_loop_ = MessageLoop::current(); + } +} + } // namespace notifier diff --git a/jingle/notifier/listener/talk_mediator_impl.h b/jingle/notifier/listener/talk_mediator_impl.h index 5ae0030..d563ed0 100644 --- a/jingle/notifier/listener/talk_mediator_impl.h +++ b/jingle/notifier/listener/talk_mediator_impl.h @@ -20,6 +20,8 @@ #include "jingle/notifier/listener/talk_mediator.h" #include "talk/xmpp/xmppclientsettings.h" +class MessageLoop; + namespace notifier { class TalkMediatorImpl @@ -36,13 +38,18 @@ class TalkMediatorImpl // TalkMediator implementation. + // Should be called on the same thread as the constructor. virtual void SetDelegate(TalkMediator::Delegate* delegate); + // All the methods below should be called on the same thread. It may or may + // not be same as the thread on which the object was constructed. + // |email| must be a valid email address (e.g., foo@bar.com). virtual void SetAuthToken(const std::string& email, const std::string& token, const std::string& token_service); virtual bool Login(); + // Users must call Logout once Login is called. virtual bool Logout(); virtual bool SendNotification(const Notification& data); @@ -74,7 +81,7 @@ class TalkMediatorImpl unsigned int subscribed : 1; // Subscribed to the xmpp receiving channel. }; - base::NonThreadSafe non_thread_safe_; + void CheckOrSetValidThread(); // Delegate, which we don't own. May be NULL. TalkMediator::Delegate* delegate_; @@ -92,6 +99,9 @@ class TalkMediatorImpl SubscriptionList subscriptions_; + MessageLoop* construction_message_loop_; + MessageLoop* method_message_loop_; + FRIEND_TEST_ALL_PREFIXES(TalkMediatorImplTest, SetAuthToken); FRIEND_TEST_ALL_PREFIXES(TalkMediatorImplTest, SendNotification); FRIEND_TEST_ALL_PREFIXES(TalkMediatorImplTest, MediatorThreadCallbacks); |