diff options
26 files changed, 524 insertions, 468 deletions
diff --git a/chrome/browser/sync/glue/sync_backend_host.cc b/chrome/browser/sync/glue/sync_backend_host.cc index 5381ff9..361b0c0 100644 --- a/chrome/browser/sync/glue/sync_backend_host.cc +++ b/chrome/browser/sync/glue/sync_backend_host.cc @@ -143,6 +143,10 @@ void SyncBackendHost::Initialize( // Nigori is populated by default now. registrar_.routing_info[syncable::NIGORI] = GROUP_PASSIVE; + // TODO(akalin): Create SyncNotifier here and pass it in as part of + // DoInitializeOptions. + core_->CreateSyncNotifier(baseline_context_getter); + InitCore(Core::DoInitializeOptions( sync_service_url, MakeHttpBridgeFactory(baseline_context_getter), @@ -639,6 +643,16 @@ void SyncBackendHost::Core::FinishConfigureDataTypesOnFrontendLoop() { host_->FinishConfigureDataTypesOnFrontendLoop(); } + +void SyncBackendHost::Core::CreateSyncNotifier( + const scoped_refptr<net::URLRequestContextGetter>& request_context_getter) { + const std::string& client_info = webkit_glue::GetUserAgent(GURL()); + SyncNotifierFactory sync_notifier_factory(client_info); + sync_notifier_.reset(sync_notifier_factory.CreateSyncNotifier( + *CommandLine::ForCurrentProcess(), + request_context_getter)); +} + SyncBackendHost::Core::DoInitializeOptions::DoInitializeOptions( const GURL& service_url, sync_api::HttpPostProviderFactory* http_bridge_factory, @@ -711,10 +725,6 @@ SyncBackendHost::Core::Core(SyncBackendHost* backend) parent_router_(NULL), processing_passphrase_(false), deferred_nudge_for_cleanup_requested_(false) { - const std::string& client_info = webkit_glue::GetUserAgent(GURL()); - SyncNotifierFactory sync_notifier_factory(client_info); - sync_notifier_.reset(sync_notifier_factory.CreateSyncNotifier( - *CommandLine::ForCurrentProcess())); } // Helper to construct a user agent string (ASCII) suitable for use by diff --git a/chrome/browser/sync/glue/sync_backend_host.h b/chrome/browser/sync/glue/sync_backend_host.h index 65a5aaa..1e65dd9 100644 --- a/chrome/browser/sync/glue/sync_backend_host.h +++ b/chrome/browser/sync/glue/sync_backend_host.h @@ -33,6 +33,10 @@ class CancelableTask; class Profile; +namespace net { +class URLRequestContextGetter; +} + namespace sync_notifier { class SyncNotifier; } // namespace sync_notifier @@ -318,6 +322,11 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { bool setup_for_test_mode; }; + // Called on |frontend_loop_|. + void CreateSyncNotifier(const scoped_refptr<net::URLRequestContextGetter>& + request_context_getter); + + // Note: // // The Do* methods are the various entry points from our SyncBackendHost. diff --git a/chrome/browser/sync/notifier/invalidation_notifier.cc b/chrome/browser/sync/notifier/invalidation_notifier.cc index 6c0fb87..9dd6a24 100644 --- a/chrome/browser/sync/notifier/invalidation_notifier.cc +++ b/chrome/browser/sync/notifier/invalidation_notifier.cc @@ -5,6 +5,7 @@ #include "chrome/browser/sync/notifier/invalidation_notifier.h" #include "base/logging.h" +#include "base/message_loop_proxy.h" #include "chrome/browser/sync/notifier/sync_notifier_observer.h" #include "chrome/browser/sync/protocol/service_constants.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" @@ -12,6 +13,7 @@ #include "jingle/notifier/base/notifier_options_util.h" #include "jingle/notifier/communicator/connection_options.h" #include "net/base/host_port_pair.h" +#include "net/url_request/url_request_context.h" #include "talk/xmpp/jid.h" #include "talk/xmpp/xmppclientsettings.h" @@ -19,16 +21,16 @@ namespace sync_notifier { InvalidationNotifier::InvalidationNotifier( const notifier::NotifierOptions& notifier_options, - net::HostResolver* host_resolver, - net::CertVerifier* cert_verifier, const std::string& client_info) : state_(STOPPED), notifier_options_(notifier_options), - host_resolver_(host_resolver), - cert_verifier_(cert_verifier), client_info_(client_info) { DCHECK_EQ(notifier::NOTIFICATION_SERVER, notifier_options.notification_method); + DCHECK(notifier_options_.request_context_getter); + // TODO(akalin): Replace NonThreadSafe checks with IO thread checks. + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); } InvalidationNotifier::~InvalidationNotifier() { @@ -66,8 +68,10 @@ void InvalidationNotifier::UpdateCredentials( new notifier::Login(this, xmpp_client_settings, notifier::ConnectionOptions(), - host_resolver_, - cert_verifier_, + notifier_options_.request_context_getter-> + GetURLRequestContext()->host_resolver(), + notifier_options_.request_context_getter-> + GetURLRequestContext()->cert_verifier(), notifier::GetServerList(notifier_options_), notifier_options_.try_ssltcp_first, notifier_options_.auth_mechanism)); diff --git a/chrome/browser/sync/notifier/invalidation_notifier.h b/chrome/browser/sync/notifier/invalidation_notifier.h index 3d8f9ad..81e64b3 100644 --- a/chrome/browser/sync/notifier/invalidation_notifier.h +++ b/chrome/browser/sync/notifier/invalidation_notifier.h @@ -25,26 +25,18 @@ #include "chrome/browser/sync/syncable/model_type.h" #include "jingle/notifier/base/notifier_options.h" #include "jingle/notifier/communicator/login.h" -#include "net/base/cert_verifier.h" - -namespace net { -class HostResolver; -class CertVerifier; -} // namespace net namespace sync_notifier { +// This class must live on the IO thread. class InvalidationNotifier : public SyncNotifier, public notifier::LoginDelegate, public ChromeInvalidationClient::Listener, public StateWriter { public: - // Does not take ownership of |host_resolver| or |cert_verifier|. InvalidationNotifier( const notifier::NotifierOptions& notifier_options, - net::HostResolver* host_resolver, - net::CertVerifier* cert_verifier, const std::string& client_info); virtual ~InvalidationNotifier(); @@ -89,8 +81,6 @@ class InvalidationNotifier // Used to build parameters for |login_|. const notifier::NotifierOptions notifier_options_; - net::HostResolver* const host_resolver_; - net::CertVerifier* const cert_verifier_; // Passed to |invalidation_client_|. const std::string client_info_; diff --git a/chrome/browser/sync/notifier/invalidation_notifier_unittest.cc b/chrome/browser/sync/notifier/invalidation_notifier_unittest.cc index fb189587..e6c7fce 100644 --- a/chrome/browser/sync/notifier/invalidation_notifier_unittest.cc +++ b/chrome/browser/sync/notifier/invalidation_notifier_unittest.cc @@ -9,6 +9,8 @@ #include "chrome/browser/sync/notifier/mock_sync_notifier_observer.h" #include "chrome/browser/sync/syncable/model_type.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" +#include "chrome/test/test_url_request_context_getter.h" +#include "content/browser/browser_thread.h" #include "jingle/notifier/base/fake_base_task.h" #include "net/base/cert_verifier.h" #include "net/base/host_resolver.h" @@ -24,30 +26,31 @@ using ::testing::StrictMock; class InvalidationNotifierTest : public testing::Test { public: - InvalidationNotifierTest() - : host_resolver_( - net::CreateSystemHostResolver( - net::HostResolver::kDefaultParallelism, NULL, NULL)), - invalidation_notifier_(notifier::NotifierOptions(), - host_resolver_.get(), - &cert_verifier_, - "fake_client_info") {} + InvalidationNotifierTest() : io_thread_(BrowserThread::IO, &message_loop_) {} protected: virtual void SetUp() { - invalidation_notifier_.AddObserver(&mock_observer_); + request_context_getter_ = new TestURLRequestContextGetter; + notifier::NotifierOptions notifier_options; + notifier_options.request_context_getter = request_context_getter_; + invalidation_notifier_.reset(new InvalidationNotifier(notifier_options, + "fake_client_info")); + invalidation_notifier_->AddObserver(&mock_observer_); } virtual void TearDown() { - invalidation_notifier_.RemoveObserver(&mock_observer_); + invalidation_notifier_->RemoveObserver(&mock_observer_); + invalidation_notifier_.reset(); + request_context_getter_ = NULL; } MessageLoop message_loop_; - scoped_ptr<net::HostResolver> host_resolver_; - net::CertVerifier cert_verifier_; - InvalidationNotifier invalidation_notifier_; + scoped_refptr<net::URLRequestContextGetter> request_context_getter_; + scoped_ptr<InvalidationNotifier> invalidation_notifier_; StrictMock<MockSyncNotifierObserver> mock_observer_; notifier::FakeBaseTask fake_base_task_; + // Since this test calls HostResolver code, we need an IO thread. + BrowserThread io_thread_; }; TEST_F(InvalidationNotifierTest, Basic) { @@ -72,20 +75,20 @@ TEST_F(InvalidationNotifierTest, Basic) { } EXPECT_CALL(mock_observer_, OnNotificationStateChange(false)); - invalidation_notifier_.SetState("fake_state"); - invalidation_notifier_.UpdateCredentials("foo@bar.com", "fake_token"); - invalidation_notifier_.UpdateEnabledTypes(types); + invalidation_notifier_->SetState("fake_state"); + invalidation_notifier_->UpdateCredentials("foo@bar.com", "fake_token"); + invalidation_notifier_->UpdateEnabledTypes(types); - invalidation_notifier_.OnConnect(fake_base_task_.AsWeakPtr()); + invalidation_notifier_->OnConnect(fake_base_task_.AsWeakPtr()); - invalidation_notifier_.WriteState("new_fake_state"); + invalidation_notifier_->WriteState("new_fake_state"); // Even though preferences isn't in the set of enabled types, we // should still propagate the notification. - invalidation_notifier_.OnInvalidate(syncable::PREFERENCES, "payload"); - invalidation_notifier_.OnInvalidateAll(); + invalidation_notifier_->OnInvalidate(syncable::PREFERENCES, "payload"); + invalidation_notifier_->OnInvalidateAll(); - invalidation_notifier_.OnDisconnect(); + invalidation_notifier_->OnDisconnect(); } TEST_F(InvalidationNotifierTest, UpdateEnabledTypes) { @@ -103,9 +106,9 @@ TEST_F(InvalidationNotifierTest, UpdateEnabledTypes) { EXPECT_CALL(mock_observer_, OnIncomingNotification(type_payloads)); } - invalidation_notifier_.OnConnect(fake_base_task_.AsWeakPtr()); - invalidation_notifier_.UpdateEnabledTypes(types); - invalidation_notifier_.OnInvalidateAll(); + invalidation_notifier_->OnConnect(fake_base_task_.AsWeakPtr()); + invalidation_notifier_->UpdateEnabledTypes(types); + invalidation_notifier_->OnInvalidateAll(); } } // namespace diff --git a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc index ae97976..d5e1848 100644 --- a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc +++ b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.cc @@ -5,82 +5,196 @@ #include "chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h" #include "base/logging.h" +#include "base/memory/scoped_ptr.h" #include "base/message_loop.h" +#include "base/observer_list_threadsafe.h" +#include "base/threading/thread.h" +#include "chrome/browser/sync/notifier/invalidation_notifier.h" +#include "chrome/browser/sync/notifier/sync_notifier_observer.h" namespace sync_notifier { -NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier( +class NonBlockingInvalidationNotifier::Core + : public base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>, + public SyncNotifierObserver { + public: + // Called on parent thread. + Core(); + + // Called on parent thread. + void AddObserver(SyncNotifierObserver* observer); + void RemoveObserver(SyncNotifierObserver* observer); + + // Helpers called on I/O thread. + void Initialize(const notifier::NotifierOptions& notifier_options, + const std::string& client_info); + void Teardown(); + void SetState(const std::string& state); + void UpdateCredentials(const std::string& email, const std::string& token); + void UpdateEnabledTypes(const syncable::ModelTypeSet& types); + void SendNotification(); + + // SyncNotifierObserver implementation (all called on I/O thread). + virtual void OnIncomingNotification( + const syncable::ModelTypePayloadMap& type_payloads); + virtual void OnNotificationStateChange(bool notifications_enabled); + virtual void StoreState(const std::string& state); + + private: + friend class + base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>; + // Called on parent or I/O thread. + ~Core(); + + scoped_ptr<InvalidationNotifier> invalidation_notifier_; + scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_; + scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> > observers_; + DISALLOW_COPY_AND_ASSIGN(Core); +}; + +NonBlockingInvalidationNotifier::Core::Core() + : observers_(new ObserverListThreadSafe<SyncNotifierObserver>()) { +} + +NonBlockingInvalidationNotifier::Core::~Core() { +} + +void NonBlockingInvalidationNotifier::Core::Initialize( const notifier::NotifierOptions& notifier_options, - const std::string& client_info) - : construction_message_loop_(MessageLoop::current()), - method_message_loop_(NULL), - observers_(new ObserverListThreadSafe<SyncNotifierObserver>()), - worker_thread_("InvalidationNotifier worker thread"), - worker_thread_vars_(NULL) { + const std::string& client_info) { + DCHECK(notifier_options.request_context_getter); DCHECK_EQ(notifier::NOTIFICATION_SERVER, notifier_options.notification_method); - const base::Thread::Options options(MessageLoop::TYPE_IO, 0); - CHECK(worker_thread_.StartWithOptions(options)); - worker_message_loop()->PostTask( + io_message_loop_proxy_ = notifier_options.request_context_getter-> + GetIOMessageLoopProxy(); + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + invalidation_notifier_.reset( + new InvalidationNotifier(notifier_options, client_info)); + invalidation_notifier_->AddObserver(this); +} + + +void NonBlockingInvalidationNotifier::Core::Teardown() { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + invalidation_notifier_->RemoveObserver(this); + invalidation_notifier_.reset(); + io_message_loop_proxy_ = NULL; +} + +void NonBlockingInvalidationNotifier::Core::AddObserver( + SyncNotifierObserver* observer) { + observers_->AddObserver(observer); +} + +void NonBlockingInvalidationNotifier::Core::RemoveObserver( + SyncNotifierObserver* observer) { + observers_->RemoveObserver(observer); +} + +void NonBlockingInvalidationNotifier::Core::SetState( + const std::string& state) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + invalidation_notifier_->SetState(state); +} + +void NonBlockingInvalidationNotifier::Core::UpdateCredentials( + const std::string& email, const std::string& token) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + invalidation_notifier_->UpdateCredentials(email, token); +} + +void NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes( + const syncable::ModelTypeSet& types) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + invalidation_notifier_->UpdateEnabledTypes(types); +} + +void NonBlockingInvalidationNotifier::Core::OnIncomingNotification( + const syncable::ModelTypePayloadMap& type_payloads) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + observers_->Notify(&SyncNotifierObserver::OnIncomingNotification, + type_payloads); +} + +void NonBlockingInvalidationNotifier::Core::OnNotificationStateChange( + bool notifications_enabled) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + observers_->Notify(&SyncNotifierObserver::OnNotificationStateChange, + notifications_enabled); +} + +void NonBlockingInvalidationNotifier::Core::StoreState( + const std::string& state) { + DCHECK(io_message_loop_proxy_->BelongsToCurrentThread()); + observers_->Notify(&SyncNotifierObserver::StoreState, state); +} + +NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier( + const notifier::NotifierOptions& notifier_options, + const std::string& client_info) + : core_(new Core), + construction_message_loop_proxy_( + base::MessageLoopProxy::CreateForCurrentThread()), + io_message_loop_proxy_(notifier_options.request_context_getter-> + GetIOMessageLoopProxy()) { + io_message_loop_proxy_->PostTask( FROM_HERE, NewRunnableMethod( - this, - &NonBlockingInvalidationNotifier::CreateWorkerThreadVars, + core_.get(), + &NonBlockingInvalidationNotifier::Core::Initialize, notifier_options, client_info)); } NonBlockingInvalidationNotifier::~NonBlockingInvalidationNotifier() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); - worker_message_loop()->PostTask( + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); + io_message_loop_proxy_->PostTask( FROM_HERE, NewRunnableMethod( - this, - &NonBlockingInvalidationNotifier::DestroyWorkerThreadVars)); - worker_thread_.Stop(); - CHECK(!worker_thread_vars_); + core_.get(), + &NonBlockingInvalidationNotifier::Core::Teardown)); } void NonBlockingInvalidationNotifier::AddObserver( SyncNotifierObserver* observer) { CheckOrSetValidThread(); - observers_->AddObserver(observer); + core_->AddObserver(observer); } void NonBlockingInvalidationNotifier::RemoveObserver( SyncNotifierObserver* observer) { CheckOrSetValidThread(); - observers_->RemoveObserver(observer); + core_->RemoveObserver(observer); } void NonBlockingInvalidationNotifier::SetState(const std::string& state) { CheckOrSetValidThread(); - worker_message_loop()->PostTask( + io_message_loop_proxy_->PostTask( FROM_HERE, NewRunnableMethod( - this, - &NonBlockingInvalidationNotifier::SetStateOnWorkerThread, + core_.get(), + &NonBlockingInvalidationNotifier::Core::SetState, state)); } void NonBlockingInvalidationNotifier::UpdateCredentials( const std::string& email, const std::string& token) { CheckOrSetValidThread(); - worker_message_loop()->PostTask( + io_message_loop_proxy_->PostTask( FROM_HERE, NewRunnableMethod( - this, - &NonBlockingInvalidationNotifier::UpdateCredentialsOnWorkerThread, + core_.get(), + &NonBlockingInvalidationNotifier::Core::UpdateCredentials, email, token)); } void NonBlockingInvalidationNotifier::UpdateEnabledTypes( const syncable::ModelTypeSet& types) { CheckOrSetValidThread(); - worker_message_loop()->PostTask( + io_message_loop_proxy_->PostTask( FROM_HERE, NewRunnableMethod( - this, - &NonBlockingInvalidationNotifier::UpdateEnabledTypesOnWorkerThread, + core_.get(), + &NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes, types)); } @@ -90,98 +204,13 @@ void NonBlockingInvalidationNotifier::SendNotification() { // need to forward on the call. } -MessageLoop* NonBlockingInvalidationNotifier::worker_message_loop() { - MessageLoop* current_message_loop = MessageLoop::current(); - DCHECK(current_message_loop); - MessageLoop* worker_message_loop = worker_thread_.message_loop(); - DCHECK(worker_message_loop); - DCHECK(current_message_loop == construction_message_loop_ || - current_message_loop == method_message_loop_ || - current_message_loop == worker_message_loop); - return worker_message_loop; -} - void NonBlockingInvalidationNotifier::CheckOrSetValidThread() { - if (method_message_loop_) { - DCHECK_EQ(MessageLoop::current(), method_message_loop_); + if (method_message_loop_proxy_) { + DCHECK(method_message_loop_proxy_->BelongsToCurrentThread()); } else { - method_message_loop_ = MessageLoop::current(); + method_message_loop_proxy_ = + base::MessageLoopProxy::CreateForCurrentThread(); } } -void NonBlockingInvalidationNotifier::CreateWorkerThreadVars( - const notifier::NotifierOptions& notifier_options, - const std::string& client_info) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - worker_thread_vars_ = - new WorkerThreadVars(notifier_options, client_info, observers_); -} - -void NonBlockingInvalidationNotifier::DestroyWorkerThreadVars() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - delete worker_thread_vars_; - worker_thread_vars_ = NULL; -} - -void NonBlockingInvalidationNotifier::SetStateOnWorkerThread( - const std::string& state) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - worker_thread_vars_->invalidation_notifier.SetState(state); -} - -void NonBlockingInvalidationNotifier::UpdateCredentialsOnWorkerThread( - const std::string& email, const std::string& token) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - worker_thread_vars_->invalidation_notifier.UpdateCredentials(email, token); -} - -void NonBlockingInvalidationNotifier::UpdateEnabledTypesOnWorkerThread( - const syncable::ModelTypeSet& types) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - worker_thread_vars_->invalidation_notifier.UpdateEnabledTypes(types); -} - -NonBlockingInvalidationNotifier::ObserverRouter::ObserverRouter( - const scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> >& - observers) : observers_(observers) {} - -NonBlockingInvalidationNotifier::ObserverRouter::~ObserverRouter() {} - -void NonBlockingInvalidationNotifier::ObserverRouter:: - OnIncomingNotification( - const syncable::ModelTypePayloadMap& type_payloads) { - observers_->Notify(&SyncNotifierObserver::OnIncomingNotification, - type_payloads); -} - -void NonBlockingInvalidationNotifier::ObserverRouter:: - OnNotificationStateChange( - bool notifications_enabled) { - observers_->Notify(&SyncNotifierObserver::OnNotificationStateChange, - notifications_enabled); -} - -void NonBlockingInvalidationNotifier::ObserverRouter::StoreState( - const std::string& state) { - observers_->Notify(&SyncNotifierObserver::StoreState, state); -} - -NonBlockingInvalidationNotifier::WorkerThreadVars::WorkerThreadVars( - const notifier::NotifierOptions& notifier_options, - const std::string& client_info, - const scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> >& - observers) - : host_resolver_( - net::CreateSystemHostResolver(net::HostResolver::kDefaultParallelism, - NULL, NULL)), - invalidation_notifier(notifier_options, host_resolver_.get(), - &cert_verifier_, client_info), - observer_router_(observers) { - invalidation_notifier.AddObserver(&observer_router_); -} - -NonBlockingInvalidationNotifier::WorkerThreadVars::~WorkerThreadVars() { - invalidation_notifier.RemoveObserver(&observer_router_); -} - } // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h index 4b6b3fd..362e4fb 100644 --- a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h +++ b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h @@ -13,14 +13,12 @@ #include "base/basictypes.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" -#include "base/observer_list_threadsafe.h" -#include "base/threading/thread.h" -#include "chrome/browser/sync/notifier/invalidation_notifier.h" #include "chrome/browser/sync/notifier/sync_notifier.h" -#include "chrome/browser/sync/notifier/sync_notifier_observer.h" +#include "jingle/notifier/base/notifier_options.h" -class MessageLoop; +namespace base { +class MessageLoopProxy; +} namespace sync_notifier { @@ -42,85 +40,17 @@ class NonBlockingInvalidationNotifier : public SyncNotifier { virtual void SendNotification(); private: - // Utility class that routes received notifications to a given - // thread-safe observer list. - class ObserverRouter : public SyncNotifierObserver { - public: - explicit ObserverRouter( - const scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> >& - observers); - - virtual ~ObserverRouter(); - - // SyncNotifierObserver implementation. - virtual void OnIncomingNotification( - const syncable::ModelTypePayloadMap& type_payloads); - virtual void OnNotificationStateChange(bool notifications_enabled); - virtual void StoreState(const std::string& state); - - private: - scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> > observers_; - }; - - // The set of variables that should only be created/used on the - // worker thread. - struct WorkerThreadVars { - WorkerThreadVars( - const notifier::NotifierOptions& notifier_options, - const std::string& client_info, - const scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> >& - observers); - ~WorkerThreadVars(); - - private: - scoped_ptr<net::HostResolver> host_resolver_; - net::CertVerifier cert_verifier_; - - public: - // This needs to be initialized after |host_resolver_| and - // |cert_verifier_|. - InvalidationNotifier invalidation_notifier; - - private: - ObserverRouter observer_router_; - - DISALLOW_COPY_AND_ASSIGN(WorkerThreadVars); - }; - - MessageLoop* worker_message_loop(); - - void CreateWorkerThreadVars( - const notifier::NotifierOptions& notifier_options, - const std::string& client_info); - void DestroyWorkerThreadVars(); void CheckOrSetValidThread(); - - // Equivalents of the public functions that are run on the worker - // thread. - void SetStateOnWorkerThread(const std::string& state); - void UpdateCredentialsOnWorkerThread(const std::string& email, - const std::string& token); - void UpdateEnabledTypesOnWorkerThread(const syncable::ModelTypeSet& types); - - MessageLoop* construction_message_loop_; - MessageLoop* method_message_loop_; - - scoped_refptr<ObserverListThreadSafe<SyncNotifierObserver> > observers_; - - base::Thread worker_thread_; - // Created and destroyed on the worker thread. Not a scoped_ptr as - // it's better to leak memory than to delete on the wrong thread. - // Created by CreateWorkerThreadVars() and destroyed by - // DestroyWorkerThreadVars(). - WorkerThreadVars* worker_thread_vars_; - + // The real guts of NonBlockingInvalidationNotifier, which allows this class + // to not be refcounted. + class Core; + scoped_refptr<Core> core_; + scoped_refptr<base::MessageLoopProxy> construction_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> method_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_; DISALLOW_COPY_AND_ASSIGN(NonBlockingInvalidationNotifier); }; } // namespace sync_notifier -// We own our worker thread, so we don't need to be ref-counted. -DISABLE_RUNNABLE_METHOD_REFCOUNT( - sync_notifier::NonBlockingInvalidationNotifier); - #endif // CHROME_BROWSER_SYNC_NOTIFIER_NON_BLOCKING_INVALIDATION_NOTIFIER_H_ diff --git a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier_unittest.cc b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier_unittest.cc index acbcb88..87405d9 100644 --- a/chrome/browser/sync/notifier/non_blocking_invalidation_notifier_unittest.cc +++ b/chrome/browser/sync/notifier/non_blocking_invalidation_notifier_unittest.cc @@ -4,11 +4,14 @@ #include "chrome/browser/sync/notifier/non_blocking_invalidation_notifier.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop.h" #include "chrome/browser/sync/notifier/mock_sync_notifier_observer.h" #include "chrome/browser/sync/syncable/model_type.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" +#include "chrome/test/test_url_request_context_getter.h" +#include "content/browser/browser_thread.h" #include "jingle/notifier/base/fake_base_task.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" @@ -22,21 +25,28 @@ using ::testing::StrictMock; class NonBlockingInvalidationNotifierTest : public testing::Test { public: - NonBlockingInvalidationNotifierTest() - : invalidation_notifier_(notifier::NotifierOptions(), - "fake_client_info") {} + NonBlockingInvalidationNotifierTest() {} protected: virtual void SetUp() { - invalidation_notifier_.AddObserver(&mock_observer_); + request_context_getter_ = new TestURLRequestContextGetter; + notifier::NotifierOptions notifier_options; + notifier_options.request_context_getter = request_context_getter_; + invalidation_notifier_.reset( + new NonBlockingInvalidationNotifier(notifier_options, + "fake_client_info")); + invalidation_notifier_->AddObserver(&mock_observer_); } virtual void TearDown() { - invalidation_notifier_.RemoveObserver(&mock_observer_); + invalidation_notifier_->RemoveObserver(&mock_observer_); + invalidation_notifier_.reset(); + request_context_getter_ = NULL; } MessageLoop message_loop_; - NonBlockingInvalidationNotifier invalidation_notifier_; + scoped_refptr<net::URLRequestContextGetter> request_context_getter_; + scoped_ptr<NonBlockingInvalidationNotifier> invalidation_notifier_; StrictMock<MockSyncNotifierObserver> mock_observer_; notifier::FakeBaseTask fake_base_task_; }; @@ -46,9 +56,9 @@ TEST_F(NonBlockingInvalidationNotifierTest, Basic) { types.insert(syncable::BOOKMARKS); types.insert(syncable::AUTOFILL); - invalidation_notifier_.SetState("fake_state"); - invalidation_notifier_.UpdateCredentials("foo@bar.com", "fake_token"); - invalidation_notifier_.UpdateEnabledTypes(types); + invalidation_notifier_->SetState("fake_state"); + invalidation_notifier_->UpdateCredentials("foo@bar.com", "fake_token"); + invalidation_notifier_->UpdateEnabledTypes(types); } // TODO(akalin): Add synchronous operations for testing to diff --git a/chrome/browser/sync/notifier/p2p_notifier.cc b/chrome/browser/sync/notifier/p2p_notifier.cc index da7eae4..25c3182 100644 --- a/chrome/browser/sync/notifier/p2p_notifier.cc +++ b/chrome/browser/sync/notifier/p2p_notifier.cc @@ -4,6 +4,7 @@ #include "chrome/browser/sync/notifier/p2p_notifier.h" +#include "base/message_loop_proxy.h" #include "chrome/browser/sync/notifier/sync_notifier_observer.h" #include "chrome/browser/sync/protocol/service_constants.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" @@ -25,13 +26,13 @@ P2PNotifier::P2PNotifier( notifier_options)), logged_in_(false), notifications_enabled_(false), - construction_message_loop_(MessageLoop::current()), - method_message_loop_(NULL) { + construction_message_loop_proxy_( + base::MessageLoopProxy::CreateForCurrentThread()) { talk_mediator_->SetDelegate(this); } P2PNotifier::~P2PNotifier() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); } void P2PNotifier::AddObserver(SyncNotifierObserver* observer) { @@ -143,10 +144,11 @@ void P2PNotifier::MaybeEmitNotification() { } void P2PNotifier::CheckOrSetValidThread() { - if (method_message_loop_) { - DCHECK_EQ(MessageLoop::current(), method_message_loop_); + if (method_message_loop_proxy_) { + DCHECK(method_message_loop_proxy_->BelongsToCurrentThread()); } else { - method_message_loop_ = MessageLoop::current(); + method_message_loop_proxy_ = + base::MessageLoopProxy::CreateForCurrentThread(); } } diff --git a/chrome/browser/sync/notifier/p2p_notifier.h b/chrome/browser/sync/notifier/p2p_notifier.h index 54cbf70..e5b2743 100644 --- a/chrome/browser/sync/notifier/p2p_notifier.h +++ b/chrome/browser/sync/notifier/p2p_notifier.h @@ -10,13 +10,17 @@ #include <string> +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/observer_list.h" #include "chrome/browser/sync/notifier/sync_notifier.h" #include "chrome/browser/sync/syncable/model_type.h" #include "jingle/notifier/listener/talk_mediator.h" -class MessageLoop; +namespace base { +class MessageLoopProxy; +} + namespace notifier { struct NotifierOptions; @@ -64,8 +68,8 @@ class P2PNotifier bool notifications_enabled_; syncable::ModelTypeSet enabled_types_; - MessageLoop* construction_message_loop_; - MessageLoop* method_message_loop_; + scoped_refptr<base::MessageLoopProxy> construction_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> method_message_loop_proxy_; }; } // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/sync_notifier_factory.cc b/chrome/browser/sync/notifier/sync_notifier_factory.cc index 204faeb..1a1a531 100644 --- a/chrome/browser/sync/notifier/sync_notifier_factory.cc +++ b/chrome/browser/sync/notifier/sync_notifier_factory.cc @@ -42,11 +42,14 @@ net::HostPortPair StringToHostPortPair(const std::string& host_port_str, return net::HostPortPair(host, port); } -SyncNotifier* CreateDefaultSyncNotifier(const CommandLine& command_line, - const std::string& client_info) { +SyncNotifier* CreateDefaultSyncNotifier( + const CommandLine& command_line, + const scoped_refptr<net::URLRequestContextGetter>& request_context_getter, + const std::string& client_info) { // Contains options specific to how sync clients send and listen to // jingle notifications. notifier::NotifierOptions notifier_options; + notifier_options.request_context_getter = request_context_getter; // Override the notification server host from the command-line, if provided. if (command_line.HasSwitch(switches::kSyncNotificationHost)) { @@ -99,7 +102,10 @@ SyncNotifierFactory::~SyncNotifierFactory() { } SyncNotifier* SyncNotifierFactory::CreateSyncNotifier( - const CommandLine& command_line) { - return CreateDefaultSyncNotifier(command_line, client_info_); + const CommandLine& command_line, + const scoped_refptr<net::URLRequestContextGetter>& request_context_getter) { + return CreateDefaultSyncNotifier(command_line, + request_context_getter, + client_info_); } } // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/sync_notifier_factory.h b/chrome/browser/sync/notifier/sync_notifier_factory.h index 010ba37..5a0f618 100644 --- a/chrome/browser/sync/notifier/sync_notifier_factory.h +++ b/chrome/browser/sync/notifier/sync_notifier_factory.h @@ -7,8 +7,14 @@ #include <string> +#include "base/memory/ref_counted.h" + class CommandLine; +namespace net { +class URLRequestContextGetter; +} + namespace sync_notifier { class SyncNotifier; @@ -23,7 +29,10 @@ class SyncNotifierFactory { // Creates the appropriate sync notifier. The caller should take ownership // of the object returned and delete it when no longer used. - SyncNotifier* CreateSyncNotifier(const CommandLine& command_line); + SyncNotifier* CreateSyncNotifier( + const CommandLine& command_line, + const scoped_refptr<net::URLRequestContextGetter>& + request_context_getter); private: const std::string client_info_; diff --git a/chrome/browser/sync/profile_sync_service_autofill_unittest.cc b/chrome/browser/sync/profile_sync_service_autofill_unittest.cc index 11cc59e..889c33f 100644 --- a/chrome/browser/sync/profile_sync_service_autofill_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_autofill_unittest.cc @@ -269,6 +269,7 @@ class ProfileSyncServiceAutofillTest : public AbstractProfileSyncServiceTest { } } virtual void SetUp() { + profile_.CreateRequestContext(); web_database_.reset(new WebDatabaseFake(&autofill_table_)); web_data_service_ = new WebDataServiceFake(web_database_.get()); personal_data_manager_ = new PersonalDataManagerMock(); @@ -285,6 +286,7 @@ class ProfileSyncServiceAutofillTest : public AbstractProfileSyncServiceTest { service_.reset(); notification_service_->TearDown(); db_thread_.Stop(); + profile_.ResetRequestContext(); MessageLoop::current()->RunAllPending(); } diff --git a/chrome/browser/sync/profile_sync_service_password_unittest.cc b/chrome/browser/sync/profile_sync_service_password_unittest.cc index 1e665ce..c00ac4b 100644 --- a/chrome/browser/sync/profile_sync_service_password_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_password_unittest.cc @@ -143,6 +143,7 @@ class ProfileSyncServicePasswordTest : public AbstractProfileSyncServiceTest { } virtual void SetUp() { + profile_.CreateRequestContext(); password_store_ = new MockPasswordStore(); db_thread_.Start(); @@ -160,6 +161,7 @@ class ProfileSyncServicePasswordTest : public AbstractProfileSyncServiceTest { service_.reset(); notification_service_->TearDown(); db_thread_.Stop(); + profile_.ResetRequestContext(); MessageLoop::current()->RunAllPending(); } diff --git a/chrome/browser/sync/profile_sync_service_preference_unittest.cc b/chrome/browser/sync/profile_sync_service_preference_unittest.cc index 129c294..98ddfda 100644 --- a/chrome/browser/sync/profile_sync_service_preference_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_preference_unittest.cc @@ -52,6 +52,7 @@ class ProfileSyncServicePreferenceTest virtual void SetUp() { profile_.reset(new TestingProfile()); + profile_->CreateRequestContext(); prefs_ = profile_->GetTestingPrefService(); prefs_->RegisterStringPref(not_synced_preference_name_.c_str(), diff --git a/chrome/browser/sync/profile_sync_service_session_unittest.cc b/chrome/browser/sync/profile_sync_service_session_unittest.cc index ed08d52..62ec5f2 100644 --- a/chrome/browser/sync/profile_sync_service_session_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_session_unittest.cc @@ -66,7 +66,7 @@ class ProfileSyncServiceSessionTest virtual void SetUp() { // BrowserWithTestWindowTest implementation. BrowserWithTestWindowTest::SetUp(); - + profile()->CreateRequestContext(); ASSERT_TRUE(temp_dir_.CreateUniqueTempDir()); SessionService* session_service = new SessionService(temp_dir_.path()); helper_.set_service(session_service); @@ -93,6 +93,7 @@ class ProfileSyncServiceSessionTest helper_.set_service(NULL); profile()->set_session_service(NULL); sync_service_.reset(); + profile()->ResetRequestContext(); } bool StartSyncService(Task* task, bool will_fail_association) { diff --git a/chrome/browser/sync/profile_sync_service_startup_unittest.cc b/chrome/browser/sync/profile_sync_service_startup_unittest.cc index bdc7897..04b2b64 100644 --- a/chrome/browser/sync/profile_sync_service_startup_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_startup_unittest.cc @@ -56,6 +56,7 @@ class ProfileSyncServiceStartupTest : public testing::Test { } virtual void SetUp() { + profile_.CreateRequestContext(); service_.reset(new TestProfileSyncService(&factory_, &profile_, "test", true, NULL)); service_->AddObserver(&observer_); @@ -64,6 +65,7 @@ class ProfileSyncServiceStartupTest : public testing::Test { virtual void TearDown() { service_->RemoveObserver(&observer_); + profile_.ResetRequestContext(); } protected: diff --git a/chrome/browser/sync/profile_sync_service_typed_url_unittest.cc b/chrome/browser/sync/profile_sync_service_typed_url_unittest.cc index f42ccae..e384a73 100644 --- a/chrome/browser/sync/profile_sync_service_typed_url_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_typed_url_unittest.cc @@ -130,6 +130,7 @@ class ProfileSyncServiceTypedUrlTest : public AbstractProfileSyncServiceTest { } virtual void SetUp() { + profile_.CreateRequestContext(); history_backend_ = new HistoryBackendMock(); history_service_ = new HistoryServiceMock(); EXPECT_CALL((*history_service_.get()), ScheduleDBTask(_, _)) @@ -148,6 +149,7 @@ class ProfileSyncServiceTypedUrlTest : public AbstractProfileSyncServiceTest { service_.reset(); notification_service_->TearDown(); history_thread_.Stop(); + profile_.ResetRequestContext(); MessageLoop::current()->RunAllPending(); } diff --git a/chrome/browser/sync/profile_sync_service_unittest.cc b/chrome/browser/sync/profile_sync_service_unittest.cc index b76e7da..5db3ea9 100644 --- a/chrome/browser/sync/profile_sync_service_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_unittest.cc @@ -49,6 +49,12 @@ class ProfileSyncServiceTest : public testing::Test { // Ensure that the sync objects destruct to avoid memory leaks. MessageLoop::current()->RunAllPending(); } + virtual void SetUp() { + profile_->CreateRequestContext(); + } + virtual void TearDown() { + profile_->ResetRequestContext(); + } // TODO(akalin): Refactor the StartSyncService*() functions below. diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc index 5bcf805..1a4382e 100644 --- a/chrome/browser/sync/tools/sync_listen_notifications.cc +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc @@ -9,6 +9,7 @@ #include "base/base64.h" #include "base/command_line.h" #include "base/logging.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop.h" #include "chrome/browser/sync/notifier/sync_notifier.h" @@ -16,6 +17,8 @@ #include "chrome/browser/sync/notifier/sync_notifier_observer.h" #include "chrome/browser/sync/syncable/model_type.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" +#include "chrome/test/test_url_request_context_getter.h" +#include "content/browser/browser_thread.h" // This is a simple utility that initializes a sync notifier and // listens to any received notifications. @@ -56,6 +59,12 @@ class NotificationPrinter : public sync_notifier::SyncNotifierObserver { int main(int argc, char* argv[]) { base::AtExitManager exit_manager; + scoped_refptr<TestURLRequestContextGetter> request_context_getter( + new TestURLRequestContextGetter); + BrowserThread io_thread(BrowserThread::IO); + base::Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_IO; + io_thread.StartWithOptions(options); CommandLine::Init(argc, argv); logging::InitLogging( NULL, @@ -87,7 +96,8 @@ int main(int argc, char* argv[]) { const char kClientInfo[] = "sync_listen_notifications"; sync_notifier::SyncNotifierFactory sync_notifier_factory(kClientInfo); scoped_ptr<sync_notifier::SyncNotifier> sync_notifier( - sync_notifier_factory.CreateSyncNotifier(command_line)); + sync_notifier_factory.CreateSyncNotifier(command_line, + request_context_getter.get())); NotificationPrinter notification_printer; sync_notifier->AddObserver(¬ification_printer); @@ -105,5 +115,6 @@ int main(int argc, char* argv[]) { main_loop.Run(); sync_notifier->RemoveObserver(¬ification_printer); + io_thread.Stop(); return 0; } diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp index d4e290f..4ebe6ea 100644 --- a/chrome/browser/sync/tools/sync_tools.gyp +++ b/chrome/browser/sync/tools/sync_tools.gyp @@ -13,6 +13,8 @@ 'dependencies': [ '<(DEPTH)/base/base.gyp:base', '<(DEPTH)/chrome/chrome.gyp:sync_notifier', + '<(DEPTH)/chrome/chrome.gyp:test_support_common', + '<(DEPTH)/content/content.gyp:content_browser', ], }, ], diff --git a/chrome/service/cloud_print/cloud_print_proxy_backend.cc b/chrome/service/cloud_print/cloud_print_proxy_backend.cc index 170976c..3f18c6f 100644 --- a/chrome/service/cloud_print/cloud_print_proxy_backend.cc +++ b/chrome/service/cloud_print/cloud_print_proxy_backend.cc @@ -20,6 +20,7 @@ #include "chrome/service/cloud_print/cloud_print_url_fetcher.h" #include "chrome/service/cloud_print/printer_job_handler.h" #include "chrome/service/gaia/service_gaia_authenticator.h" +#include "chrome/service/net/service_url_request_context.h" #include "chrome/service/service_process.h" #include "googleurl/src/gurl.h" #include "grit/generated_resources.h" @@ -371,10 +372,12 @@ void CloudPrintProxyBackend::Core::DoInitializeWithToken( auth_token_ = cloud_print_token; if (result.succeeded()) { - const notifier::NotifierOptions kNotifierOptions; + notifier::NotifierOptions notifier_options; + notifier_options.request_context_getter = + g_service_process->GetServiceURLRequestContextGetter(); talk_mediator_.reset(new notifier::TalkMediatorImpl( - new notifier::MediatorThreadImpl(kNotifierOptions), - kNotifierOptions)); + new notifier::MediatorThreadImpl(notifier_options), + notifier_options)); notifier::Subscription subscription; subscription.channel = kCloudPrintPushNotificationsSource; subscription.channel.append("/proxy/"); diff --git a/jingle/notifier/base/notifier_options.cc b/jingle/notifier/base/notifier_options.cc index 2b4b56a..dbb1d4c 100644 --- a/jingle/notifier/base/notifier_options.cc +++ b/jingle/notifier/base/notifier_options.cc @@ -15,4 +15,6 @@ NotifierOptions::NotifierOptions() notification_method(kDefaultNotificationMethod), auth_mechanism(GaiaTokenPreXmppAuth::kDefaultAuthMechanism) {} +NotifierOptions::~NotifierOptions() { } + } // namespace notifier diff --git a/jingle/notifier/base/notifier_options.h b/jingle/notifier/base/notifier_options.h index 5cd4243..0d7b471 100644 --- a/jingle/notifier/base/notifier_options.h +++ b/jingle/notifier/base/notifier_options.h @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -7,13 +7,16 @@ #include <string> +#include "base/memory/ref_counted.h" #include "jingle/notifier/base/notification_method.h" #include "net/base/host_port_pair.h" +#include "net/url_request/url_request_context_getter.h" namespace notifier { struct NotifierOptions { NotifierOptions(); + ~NotifierOptions(); // Indicates that the SSLTCP port (443) is to be tried before the the XMPP // port (5222) during login. @@ -37,6 +40,9 @@ struct NotifierOptions { // Specifies the auth mechanism to use ("X-GOOGLE-TOKEN", "X-OAUTH2", etc), std::string auth_mechanism; + + // The URLRequestContextGetter to use for doing I/O. + scoped_refptr<net::URLRequestContextGetter> request_context_getter; }; } // namespace notifier diff --git a/jingle/notifier/listener/mediator_thread_impl.cc b/jingle/notifier/listener/mediator_thread_impl.cc index 0d96d01..9ef6ea9 100644 --- a/jingle/notifier/listener/mediator_thread_impl.cc +++ b/jingle/notifier/listener/mediator_thread_impl.cc @@ -5,162 +5,121 @@ #include "jingle/notifier/listener/mediator_thread_impl.h" #include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop.h" +#include "base/observer_list_threadsafe.h" +#include "base/threading/thread.h" #include "jingle/notifier/base/const_communicator.h" #include "jingle/notifier/base/notifier_options_util.h" #include "jingle/notifier/base/task_pump.h" #include "jingle/notifier/communicator/connection_options.h" +#include "jingle/notifier/communicator/login.h" #include "jingle/notifier/communicator/xmpp_connection_generator.h" +#include "jingle/notifier/listener/push_notifications_listen_task.h" #include "jingle/notifier/listener/push_notifications_send_update_task.h" -#include "net/base/cert_verifier.h" +#include "jingle/notifier/listener/push_notifications_subscribe_task.h" #include "net/base/host_port_pair.h" -#include "net/base/host_resolver.h" +#include "net/url_request/url_request_context.h" +#include "net/url_request/url_request_context_getter.h" #include "talk/xmpp/xmppclientsettings.h" namespace notifier { -MediatorThreadImpl::MediatorThreadImpl( +class MediatorThreadImpl::Core + : public base::RefCountedThreadSafe<MediatorThreadImpl::Core>, + public LoginDelegate, + public PushNotificationsListenTaskDelegate, + public PushNotificationsSubscribeTaskDelegate { + public: + // Invoked on the caller thread. + explicit Core(const NotifierOptions& notifier_options); + void AddObserver(Observer* observer); + void RemoveObserver(Observer* observer); + + // Login::Delegate implementation. Called on I/O thread. + virtual void OnConnect(base::WeakPtr<talk_base::Task> base_task); + virtual void OnDisconnect(); + + // PushNotificationsListenTaskDelegate implementation. Called on I/O thread. + virtual void OnNotificationReceived( + const Notification& notification); + // PushNotificationsSubscribeTaskDelegate implementation. Called on I/O + // thread. + virtual void OnSubscribed(); + virtual void OnSubscriptionError(); + + // Helpers invoked on I/O thread. + void Login(const buzz::XmppClientSettings& settings); + void Disconnect(); + void ListenForPushNotifications(); + void SubscribeForPushNotifications( + const SubscriptionList& subscriptions); + void SendNotification(const Notification& data); + void UpdateXmppSettings(const buzz::XmppClientSettings& settings); + + private: + friend class base::RefCountedThreadSafe<MediatorThreadImpl::Core>; + // Invoked on either the caller thread or the I/O thread. + ~Core(); + scoped_refptr<ObserverListThreadSafe<Observer> > observers_; + base::WeakPtr<talk_base::Task> base_task_; + + const NotifierOptions notifier_options_; + + scoped_ptr<notifier::Login> login_; + DISALLOW_COPY_AND_ASSIGN(Core); +}; + +MediatorThreadImpl::Core::Core( const NotifierOptions& notifier_options) : observers_(new ObserverListThreadSafe<Observer>()), - construction_message_loop_(MessageLoop::current()), - method_message_loop_(NULL), - notifier_options_(notifier_options), - worker_thread_("MediatorThread worker thread") { - DCHECK(construction_message_loop_); + notifier_options_(notifier_options) { + DCHECK(notifier_options_.request_context_getter); } -MediatorThreadImpl::~MediatorThreadImpl() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); - // If the worker thread is still around, we need to call Logout() so - // that all the variables living it get destroyed properly (i.e., on - // the worker thread). - if (worker_thread_.IsRunning()) { - Logout(); - } +MediatorThreadImpl::Core::~Core() { } -void MediatorThreadImpl::AddObserver(Observer* observer) { - CheckOrSetValidThread(); +void MediatorThreadImpl::Core::AddObserver(Observer* observer) { observers_->AddObserver(observer); } -void MediatorThreadImpl::RemoveObserver(Observer* observer) { - CheckOrSetValidThread(); +void MediatorThreadImpl::Core::RemoveObserver(Observer* observer) { observers_->RemoveObserver(observer); } -void MediatorThreadImpl::Start() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); - // We create the worker thread as an IO thread in preparation for - // making this use Chrome sockets. - const base::Thread::Options options(MessageLoop::TYPE_IO, 0); - // TODO(akalin): Make this function return a bool and remove this - // CHECK(). - CHECK(worker_thread_.StartWithOptions(options)); -} - -void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { - CheckOrSetValidThread(); - - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings)); -} - -void MediatorThreadImpl::Logout() { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect)); - // TODO(akalin): Decomp this into a separate stop method. - worker_thread_.Stop(); - // worker_thread_ should have cleaned this up. It is OK to check this - // variable in this thread because worker_thread_ is gone by now. - CHECK(!login_.get()); -} - -void MediatorThreadImpl::ListenForUpdates() { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &MediatorThreadImpl::ListenForPushNotifications)); -} - -void MediatorThreadImpl::SubscribeForUpdates( - const SubscriptionList& subscriptions) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &MediatorThreadImpl::SubscribeForPushNotifications, - subscriptions)); -} - -void MediatorThreadImpl::SendNotification( - const Notification& data) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification, - data)); -} - -void MediatorThreadImpl::UpdateXmppSettings( - const buzz::XmppClientSettings& settings) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &MediatorThreadImpl::DoUpdateXmppSettings, - settings)); -} - -MessageLoop* MediatorThreadImpl::worker_message_loop() { - MessageLoop* current_message_loop = MessageLoop::current(); - DCHECK(current_message_loop); - MessageLoop* worker_message_loop = worker_thread_.message_loop(); - DCHECK(worker_message_loop); - DCHECK(current_message_loop == method_message_loop_ || - current_message_loop == worker_message_loop); - return worker_message_loop; -} - - -void MediatorThreadImpl::DoLogin( - const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::Login(const buzz::XmppClientSettings& settings) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread logging into talk network."; + // TODO(sanjeevr): Pass in the URLRequestContextGetter to Login. base_task_.reset(); - - host_resolver_.reset( - net::CreateSystemHostResolver(net::HostResolver::kDefaultParallelism, - NULL, NULL)); - cert_verifier_.reset(new net::CertVerifier); login_.reset(new notifier::Login(this, settings, notifier::ConnectionOptions(), - host_resolver_.get(), - cert_verifier_.get(), + notifier_options_.request_context_getter-> + GetURLRequestContext()->host_resolver(), + notifier_options_.request_context_getter-> + GetURLRequestContext()->cert_verifier(), GetServerList(notifier_options_), notifier_options_.try_ssltcp_first, notifier_options_.auth_mechanism)); login_->StartConnection(); } -void MediatorThreadImpl::DoDisconnect() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::Disconnect() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread logging out of talk network."; login_.reset(); - cert_verifier_.reset(); - host_resolver_.reset(); base_task_.reset(); } -void MediatorThreadImpl::ListenForPushNotifications() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::ListenForPushNotifications() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) return; PushNotificationsListenTask* listener = @@ -168,36 +127,39 @@ void MediatorThreadImpl::ListenForPushNotifications() { listener->Start(); } -void MediatorThreadImpl::SubscribeForPushNotifications( +void MediatorThreadImpl::Core::SubscribeForPushNotifications( const SubscriptionList& subscriptions) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) return; - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); PushNotificationsSubscribeTask* subscribe_task = new PushNotificationsSubscribeTask(base_task_, subscriptions, this); subscribe_task->Start(); } -void MediatorThreadImpl::OnSubscribed() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnSubscribed() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnSubscriptionStateChange, true); } -void MediatorThreadImpl::OnSubscriptionError() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnSubscriptionError() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnSubscriptionStateChange, false); } -void MediatorThreadImpl::OnNotificationReceived( +void MediatorThreadImpl::Core::OnNotificationReceived( const Notification& notification) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnIncomingNotification, notification); } -void MediatorThreadImpl::DoSendNotification( - const Notification& data) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::SendNotification(const Notification& data) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) { return; } @@ -208,9 +170,10 @@ void MediatorThreadImpl::DoSendNotification( observers_->Notify(&Observer::OnOutgoingNotification); } -void MediatorThreadImpl::DoUpdateXmppSettings( +void MediatorThreadImpl::Core::UpdateXmppSettings( const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread Updating login settings."; // The caller should only call UpdateXmppSettings after a Login call. if (login_.get()) @@ -220,24 +183,115 @@ void MediatorThreadImpl::DoUpdateXmppSettings( "P2P: Thread UpdateXmppSettings called when login_ was NULL"; } - -void MediatorThreadImpl::OnConnect(base::WeakPtr<talk_base::Task> base_task) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnConnect( + base::WeakPtr<talk_base::Task> base_task) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); base_task_ = base_task; observers_->Notify(&Observer::OnConnectionStateChange, true); } -void MediatorThreadImpl::OnDisconnect() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnDisconnect() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); base_task_.reset(); observers_->Notify(&Observer::OnConnectionStateChange, false); } + +MediatorThreadImpl::MediatorThreadImpl(const NotifierOptions& notifier_options) + : core_(new Core(notifier_options)), + construction_message_loop_proxy_( + base::MessageLoopProxy::CreateForCurrentThread()), + io_message_loop_proxy_( + notifier_options.request_context_getter->GetIOMessageLoopProxy()) { +} + +MediatorThreadImpl::~MediatorThreadImpl() { + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); + LogoutImpl(); +} + +void MediatorThreadImpl::AddObserver(Observer* observer) { + CheckOrSetValidThread(); + core_->AddObserver(observer); +} + +void MediatorThreadImpl::RemoveObserver(Observer* observer) { + CheckOrSetValidThread(); + core_->RemoveObserver(observer); +} + +void MediatorThreadImpl::Start() { + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); +} + +void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::Login, + settings)); +} + +void MediatorThreadImpl::Logout() { + CheckOrSetValidThread(); + LogoutImpl(); +} + +void MediatorThreadImpl::ListenForUpdates() { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::ListenForPushNotifications)); +} + +void MediatorThreadImpl::SubscribeForUpdates( + const SubscriptionList& subscriptions) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod( + core_.get(), + &MediatorThreadImpl::Core::SubscribeForPushNotifications, + subscriptions)); +} + +void MediatorThreadImpl::SendNotification( + const Notification& data) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::SendNotification, + data)); +} + +void MediatorThreadImpl::UpdateXmppSettings( + const buzz::XmppClientSettings& settings) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::UpdateXmppSettings, + settings)); +} + +void MediatorThreadImpl::LogoutImpl() { + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::Disconnect)); +} + void MediatorThreadImpl::CheckOrSetValidThread() { - if (method_message_loop_) { - DCHECK_EQ(MessageLoop::current(), method_message_loop_); + if (method_message_loop_proxy_) { + DCHECK(method_message_loop_proxy_->BelongsToCurrentThread()); } else { - method_message_loop_ = MessageLoop::current(); + method_message_loop_proxy_ = + base::MessageLoopProxy::CreateForCurrentThread(); } } diff --git a/jingle/notifier/listener/mediator_thread_impl.h b/jingle/notifier/listener/mediator_thread_impl.h index 0d1f1c4..4015a81 100644 --- a/jingle/notifier/listener/mediator_thread_impl.h +++ b/jingle/notifier/listener/mediator_thread_impl.h @@ -25,32 +25,21 @@ #include "base/basictypes.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" -#include "base/memory/weak_ptr.h" -#include "base/observer_list_threadsafe.h" #include "base/task.h" -#include "base/threading/thread.h" #include "jingle/notifier/base/notifier_options.h" -#include "jingle/notifier/communicator/login.h" #include "jingle/notifier/listener/mediator_thread.h" -#include "jingle/notifier/listener/push_notifications_listen_task.h" -#include "jingle/notifier/listener/push_notifications_subscribe_task.h" -class MessageLoop; +namespace base { +class MessageLoopProxy; +} namespace buzz { class XmppClientSettings; } // namespace buzz -namespace net { -class HostResolver; -} // namespace net - namespace notifier { -class MediatorThreadImpl : public MediatorThread, public LoginDelegate, - public PushNotificationsListenTaskDelegate, - public PushNotificationsSubscribeTaskDelegate { +class MediatorThreadImpl : public MediatorThread { public: explicit MediatorThreadImpl(const NotifierOptions& notifier_options); virtual ~MediatorThreadImpl(); @@ -70,54 +59,21 @@ class MediatorThreadImpl : public MediatorThread, public LoginDelegate, virtual void SendNotification(const Notification& data); virtual void UpdateXmppSettings(const buzz::XmppClientSettings& settings); - // Login::Delegate implementation. - virtual void OnConnect(base::WeakPtr<talk_base::Task> base_task); - virtual void OnDisconnect(); - - // PushNotificationsListenTaskDelegate implementation. - virtual void OnNotificationReceived( - const Notification& notification); - // PushNotificationsSubscribeTaskDelegate implementation. - virtual void OnSubscribed(); - virtual void OnSubscriptionError(); - - protected: - // Should only be called after Start(). - MessageLoop* worker_message_loop(); - - scoped_refptr<ObserverListThreadSafe<Observer> > observers_; - MessageLoop* construction_message_loop_; - MessageLoop* method_message_loop_; - base::WeakPtr<talk_base::Task> base_task_; - private: - void DoLogin(const buzz::XmppClientSettings& settings); - void DoDisconnect(); - - // Helpers invoked on worker thread. - void ListenForPushNotifications(); - void SubscribeForPushNotifications( - const SubscriptionList& subscriptions); - - void DoSendNotification( - const Notification& data); - void DoUpdateXmppSettings(const buzz::XmppClientSettings& settings); void CheckOrSetValidThread(); - - const NotifierOptions notifier_options_; - - base::Thread worker_thread_; - scoped_ptr<net::HostResolver> host_resolver_; - scoped_ptr<net::CertVerifier> cert_verifier_; - - scoped_ptr<notifier::Login> login_; - + // The logic of Logout without the thread check so it can be called in the + // d'tor. + void LogoutImpl(); + // The real guts of MediatorThreadImpl, which allows this class to not be + // refcounted. + class Core; + scoped_refptr<Core> core_; + scoped_refptr<base::MessageLoopProxy> construction_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> method_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_; DISALLOW_COPY_AND_ASSIGN(MediatorThreadImpl); }; } // namespace notifier -// We manage the lifetime of notifier::MediatorThreadImpl ourselves. -DISABLE_RUNNABLE_METHOD_REFCOUNT(notifier::MediatorThreadImpl); - #endif // JINGLE_NOTIFIER_LISTENER_MEDIATOR_THREAD_IMPL_H_ |
