// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "sync/notifier/non_blocking_invalidator.h" #include #include "base/location.h" #include "base/logging.h" #include "base/memory/scoped_ptr.h" #include "base/single_thread_task_runner.h" #include "base/thread_task_runner_handle.h" #include "base/threading/thread.h" #include "jingle/notifier/listener/push_client.h" #include "sync/notifier/gcm_network_channel_delegate.h" #include "sync/notifier/invalidation_notifier.h" #include "sync/notifier/object_id_invalidation_map.h" #include "sync/notifier/sync_system_resources.h" namespace syncer { struct NonBlockingInvalidator::InitializeOptions { InitializeOptions( NetworkChannelCreator network_channel_creator, const std::string& invalidator_client_id, const UnackedInvalidationsMap& saved_invalidations, const std::string& invalidation_bootstrap_data, const WeakHandle& invalidation_state_tracker, const std::string& client_info, scoped_refptr request_context_getter) : network_channel_creator(network_channel_creator), invalidator_client_id(invalidator_client_id), saved_invalidations(saved_invalidations), invalidation_bootstrap_data(invalidation_bootstrap_data), invalidation_state_tracker(invalidation_state_tracker), client_info(client_info), request_context_getter(request_context_getter) { } NetworkChannelCreator network_channel_creator; std::string invalidator_client_id; UnackedInvalidationsMap saved_invalidations; std::string invalidation_bootstrap_data; WeakHandle invalidation_state_tracker; std::string client_info; scoped_refptr request_context_getter; }; namespace { // This class provides a wrapper for a logging class in order to receive // callbacks across threads, without having to worry about owner threads. class CallbackProxy { public: explicit CallbackProxy( base::Callback callback); ~CallbackProxy(); void Run(const base::DictionaryValue& value); private: static void DoRun(base::Callback callback, scoped_ptr value); base::Callback callback_; scoped_refptr running_thread_; DISALLOW_COPY_AND_ASSIGN(CallbackProxy); }; CallbackProxy::CallbackProxy( base::Callback callback) : callback_(callback), running_thread_(base::ThreadTaskRunnerHandle::Get()) {} CallbackProxy::~CallbackProxy() {} void CallbackProxy::DoRun( base::Callback callback, scoped_ptr value) { callback.Run(*value); } void CallbackProxy::Run(const base::DictionaryValue& value) { scoped_ptr copied(value.DeepCopy()); running_thread_->PostTask( FROM_HERE, base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied))); } } class NonBlockingInvalidator::Core : public base::RefCountedThreadSafe, // InvalidationHandler to observe the InvalidationNotifier we create. public InvalidationHandler { public: // Called on parent thread. |delegate_observer| should be // initialized. explicit Core( const WeakHandle& delegate_observer); // Helpers called on I/O thread. void Initialize( const NonBlockingInvalidator::InitializeOptions& initialize_options); void Teardown(); void UpdateRegisteredIds(const ObjectIdSet& ids); void UpdateCredentials(const std::string& email, const std::string& token); void RequestDetailedStatus( base::Callback callback) const; // InvalidationHandler implementation (all called on I/O thread by // InvalidationNotifier). virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE; virtual void OnIncomingInvalidation( const ObjectIdInvalidationMap& invalidation_map) OVERRIDE; virtual std::string GetOwnerName() const OVERRIDE; private: friend class base::RefCountedThreadSafe; // Called on parent or I/O thread. virtual ~Core(); // The variables below should be used only on the I/O thread. const WeakHandle delegate_observer_; scoped_ptr invalidation_notifier_; scoped_refptr network_task_runner_; DISALLOW_COPY_AND_ASSIGN(Core); }; NonBlockingInvalidator::Core::Core( const WeakHandle& delegate_observer) : delegate_observer_(delegate_observer) { DCHECK(delegate_observer_.IsInitialized()); } NonBlockingInvalidator::Core::~Core() { } void NonBlockingInvalidator::Core::Initialize( const NonBlockingInvalidator::InitializeOptions& initialize_options) { DCHECK(initialize_options.request_context_getter.get()); network_task_runner_ = initialize_options.request_context_getter->GetNetworkTaskRunner(); DCHECK(network_task_runner_->BelongsToCurrentThread()); scoped_ptr network_channel = initialize_options.network_channel_creator.Run(); invalidation_notifier_.reset( new InvalidationNotifier( network_channel.Pass(), initialize_options.invalidator_client_id, initialize_options.saved_invalidations, initialize_options.invalidation_bootstrap_data, initialize_options.invalidation_state_tracker, initialize_options.client_info)); invalidation_notifier_->RegisterHandler(this); } void NonBlockingInvalidator::Core::Teardown() { DCHECK(network_task_runner_->BelongsToCurrentThread()); invalidation_notifier_->UnregisterHandler(this); invalidation_notifier_.reset(); network_task_runner_ = NULL; } void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) { DCHECK(network_task_runner_->BelongsToCurrentThread()); invalidation_notifier_->UpdateRegisteredIds(this, ids); } void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email, const std::string& token) { DCHECK(network_task_runner_->BelongsToCurrentThread()); invalidation_notifier_->UpdateCredentials(email, token); } void NonBlockingInvalidator::Core::RequestDetailedStatus( base::Callback callback) const { DCHECK(network_task_runner_->BelongsToCurrentThread()); invalidation_notifier_->RequestDetailedStatus(callback); } void NonBlockingInvalidator::Core::OnInvalidatorStateChange( InvalidatorState reason) { DCHECK(network_task_runner_->BelongsToCurrentThread()); delegate_observer_.Call( FROM_HERE, &InvalidationHandler::OnInvalidatorStateChange, reason); } void NonBlockingInvalidator::Core::OnIncomingInvalidation( const ObjectIdInvalidationMap& invalidation_map) { DCHECK(network_task_runner_->BelongsToCurrentThread()); delegate_observer_.Call(FROM_HERE, &InvalidationHandler::OnIncomingInvalidation, invalidation_map); } std::string NonBlockingInvalidator::Core::GetOwnerName() const { return "Sync"; } NonBlockingInvalidator::NonBlockingInvalidator( NetworkChannelCreator network_channel_creator, const std::string& invalidator_client_id, const UnackedInvalidationsMap& saved_invalidations, const std::string& invalidation_bootstrap_data, const WeakHandle& invalidation_state_tracker, const std::string& client_info, const scoped_refptr& request_context_getter) : parent_task_runner_(base::ThreadTaskRunnerHandle::Get()), network_task_runner_(request_context_getter->GetNetworkTaskRunner()), weak_ptr_factory_(this) { core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr())); InitializeOptions initialize_options( network_channel_creator, invalidator_client_id, saved_invalidations, invalidation_bootstrap_data, invalidation_state_tracker, client_info, request_context_getter); if (!network_task_runner_->PostTask( FROM_HERE, base::Bind( &NonBlockingInvalidator::Core::Initialize, core_.get(), initialize_options))) { NOTREACHED(); } } NonBlockingInvalidator::~NonBlockingInvalidator() { DCHECK(parent_task_runner_->BelongsToCurrentThread()); if (!network_task_runner_->PostTask( FROM_HERE, base::Bind(&NonBlockingInvalidator::Core::Teardown, core_.get()))) { DVLOG(1) << "Network thread stopped before invalidator is destroyed."; } } void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); registrar_.RegisterHandler(handler); } void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler, const ObjectIdSet& ids) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); registrar_.UpdateRegisteredIds(handler, ids); if (!network_task_runner_->PostTask( FROM_HERE, base::Bind( &NonBlockingInvalidator::Core::UpdateRegisteredIds, core_.get(), registrar_.GetAllRegisteredIds()))) { NOTREACHED(); } } void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); registrar_.UnregisterHandler(handler); } InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const { DCHECK(parent_task_runner_->BelongsToCurrentThread()); return registrar_.GetInvalidatorState(); } void NonBlockingInvalidator::UpdateCredentials(const std::string& email, const std::string& token) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); if (!network_task_runner_->PostTask( FROM_HERE, base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials, core_.get(), email, token))) { NOTREACHED(); } } void NonBlockingInvalidator::RequestDetailedStatus( base::Callback callback) const { DCHECK(parent_task_runner_->BelongsToCurrentThread()); base::Callback proxy_callback = base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback))); if (!network_task_runner_->PostTask( FROM_HERE, base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus, core_.get(), proxy_callback))) { NOTREACHED(); } } void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); registrar_.UpdateInvalidatorState(state); } void NonBlockingInvalidator::OnIncomingInvalidation( const ObjectIdInvalidationMap& invalidation_map) { DCHECK(parent_task_runner_->BelongsToCurrentThread()); registrar_.DispatchInvalidationsToHandlers(invalidation_map); } std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; } NetworkChannelCreator NonBlockingInvalidator::MakePushClientChannelCreator( const notifier::NotifierOptions& notifier_options) { return base::Bind(SyncNetworkChannel::CreatePushClientChannel, notifier_options); } NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator( scoped_refptr request_context_getter, scoped_ptr delegate) { return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel, request_context_getter, base::Passed(&delegate)); } } // namespace syncer