// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "sync/sessions/model_type_registry.h" #include "base/bind.h" #include "base/observer_list.h" #include "base/thread_task_runner_handle.h" #include "sync/engine/directory_commit_contributor.h" #include "sync/engine/directory_update_handler.h" #include "sync/engine/model_type_sync_proxy.h" #include "sync/engine/model_type_sync_proxy_impl.h" #include "sync/engine/model_type_sync_worker.h" #include "sync/engine/model_type_sync_worker_impl.h" #include "sync/internal_api/public/non_blocking_sync_common.h" #include "sync/sessions/directory_type_debug_info_emitter.h" #include "sync/util/cryptographer.h" namespace syncer { namespace { class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy { public: ModelTypeSyncProxyWrapper( const base::WeakPtr& proxy, const scoped_refptr& processor_task_runner); ~ModelTypeSyncProxyWrapper() override; void OnCommitCompleted( const syncer_v2::DataTypeState& type_state, const syncer_v2::CommitResponseDataList& response_list) override; void OnUpdateReceived( const syncer_v2::DataTypeState& type_state, const syncer_v2::UpdateResponseDataList& response_list, const syncer_v2::UpdateResponseDataList& pending_updates) override; private: base::WeakPtr processor_; scoped_refptr processor_task_runner_; }; ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper( const base::WeakPtr& proxy, const scoped_refptr& processor_task_runner) : processor_(proxy), processor_task_runner_(processor_task_runner) { } ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() { } void ModelTypeSyncProxyWrapper::OnCommitCompleted( const syncer_v2::DataTypeState& type_state, const syncer_v2::CommitResponseDataList& response_list) { processor_task_runner_->PostTask( FROM_HERE, base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted, processor_, type_state, response_list)); } void ModelTypeSyncProxyWrapper::OnUpdateReceived( const syncer_v2::DataTypeState& type_state, const syncer_v2::UpdateResponseDataList& response_list, const syncer_v2::UpdateResponseDataList& pending_updates) { processor_task_runner_->PostTask( FROM_HERE, base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived, processor_, type_state, response_list, pending_updates)); } class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker { public: ModelTypeSyncWorkerWrapper( const base::WeakPtr& worker, const scoped_refptr& sync_thread); ~ModelTypeSyncWorkerWrapper() override; void EnqueueForCommit(const syncer_v2::CommitRequestDataList& list) override; private: base::WeakPtr worker_; scoped_refptr sync_thread_; }; ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper( const base::WeakPtr& worker, const scoped_refptr& sync_thread) : worker_(worker), sync_thread_(sync_thread) { } ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() { } void ModelTypeSyncWorkerWrapper::EnqueueForCommit( const syncer_v2::CommitRequestDataList& list) { sync_thread_->PostTask( FROM_HERE, base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list)); } } // namespace ModelTypeRegistry::ModelTypeRegistry( const std::vector >& workers, syncable::Directory* directory, NudgeHandler* nudge_handler) : directory_(directory), nudge_handler_(nudge_handler), weak_ptr_factory_(this) { for (size_t i = 0u; i < workers.size(); ++i) { workers_map_.insert( std::make_pair(workers[i]->GetModelSafeGroup(), workers[i])); } } ModelTypeRegistry::~ModelTypeRegistry() { } void ModelTypeRegistry::SetEnabledDirectoryTypes( const ModelSafeRoutingInfo& routing_info) { // Remove all existing directory processors and delete them. The // DebugInfoEmitters are not deleted here, since we want to preserve their // counters. for (ModelTypeSet::Iterator it = enabled_directory_types_.First(); it.Good(); it.Inc()) { size_t result1 = update_handler_map_.erase(it.Get()); size_t result2 = commit_contributor_map_.erase(it.Get()); DCHECK_EQ(1U, result1); DCHECK_EQ(1U, result2); } // Clear the old instances of directory update handlers and commit // contributors, deleting their contents in the processs. directory_update_handlers_.clear(); directory_commit_contributors_.clear(); // Create new ones and add them to the appropriate containers. for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin(); routing_iter != routing_info.end(); ++routing_iter) { ModelType type = routing_iter->first; ModelSafeGroup group = routing_iter->second; std::map >::iterator worker_it = workers_map_.find(group); DCHECK(worker_it != workers_map_.end()); scoped_refptr worker = worker_it->second; // DebugInfoEmitters are never deleted. Use existing one if we have it. DirectoryTypeDebugInfoEmitter* emitter = NULL; DirectoryTypeDebugInfoEmitterMap::iterator it = directory_type_debug_info_emitter_map_.find(type); if (it != directory_type_debug_info_emitter_map_.end()) { emitter = it->second; } else { emitter = new DirectoryTypeDebugInfoEmitter(directory_, type, &type_debug_info_observers_); directory_type_debug_info_emitter_map_.insert( std::make_pair(type, emitter)); directory_type_debug_info_emitters_.push_back(emitter); } DirectoryCommitContributor* committer = new DirectoryCommitContributor(directory_, type, emitter); DirectoryUpdateHandler* updater = new DirectoryUpdateHandler(directory_, type, worker, emitter); // These containers take ownership of their contents. directory_commit_contributors_.push_back(committer); directory_update_handlers_.push_back(updater); bool inserted1 = update_handler_map_.insert(std::make_pair(type, updater)).second; DCHECK(inserted1) << "Attempt to override existing type handler in map"; bool inserted2 = commit_contributor_map_.insert(std::make_pair(type, committer)).second; DCHECK(inserted2) << "Attempt to override existing type handler in map"; } enabled_directory_types_ = GetRoutingInfoTypes(routing_info); DCHECK(Intersection(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()).Empty()); } void ModelTypeRegistry::ConnectSyncTypeToWorker( ModelType type, const syncer_v2::DataTypeState& data_type_state, const syncer_v2::UpdateResponseDataList& saved_pending_updates, const scoped_refptr& type_task_runner, const base::WeakPtr& proxy_impl) { DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); // Initialize Worker -> Proxy communication channel. scoped_ptr proxy( new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner)); scoped_ptr cryptographer_copy; if (encrypted_types_.Has(type)) cryptographer_copy.reset(new Cryptographer(*cryptographer_)); scoped_ptr worker( new ModelTypeSyncWorkerImpl(type, data_type_state, saved_pending_updates, cryptographer_copy.Pass(), nudge_handler_, proxy.Pass())); // Initialize Proxy -> Worker communication channel. scoped_ptr wrapped_worker( new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(), scoped_refptr( base::ThreadTaskRunnerHandle::Get()))); type_task_runner->PostTask(FROM_HERE, base::Bind(&ModelTypeSyncProxyImpl::OnConnect, proxy_impl, base::Passed(&wrapped_worker))); DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); update_handler_map_.insert(std::make_pair(type, worker.get())); commit_contributor_map_.insert(std::make_pair(type, worker.get())); // The container takes ownership. model_type_sync_workers_.push_back(worker.Pass()); DCHECK(Intersection(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()).Empty()); } void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) { DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); size_t updaters_erased = update_handler_map_.erase(type); size_t committers_erased = commit_contributor_map_.erase(type); DCHECK_EQ(1U, updaters_erased); DCHECK_EQ(1U, committers_erased); // Remove from the ScopedVector, deleting the worker in the process. for (ScopedVector::iterator it = model_type_sync_workers_.begin(); it != model_type_sync_workers_.end(); ++it) { if ((*it)->GetModelType() == type) { model_type_sync_workers_.erase(it); break; } } } ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); } UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { return &update_handler_map_; } CommitContributorMap* ModelTypeRegistry::commit_contributor_map() { return &commit_contributor_map_; } DirectoryTypeDebugInfoEmitterMap* ModelTypeRegistry::directory_type_debug_info_emitter_map() { return &directory_type_debug_info_emitter_map_; } void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver( syncer::TypeDebugInfoObserver* observer) { if (!type_debug_info_observers_.HasObserver(observer)) type_debug_info_observers_.AddObserver(observer); } void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver( syncer::TypeDebugInfoObserver* observer) { type_debug_info_observers_.RemoveObserver(observer); } bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver( const syncer::TypeDebugInfoObserver* observer) const { return type_debug_info_observers_.HasObserver(observer); } void ModelTypeRegistry::RequestEmitDebugInfo() { for (DirectoryTypeDebugInfoEmitterMap::iterator it = directory_type_debug_info_emitter_map_.begin(); it != directory_type_debug_info_emitter_map_.end(); ++it) { it->second->EmitCommitCountersUpdate(); it->second->EmitUpdateCountersUpdate(); it->second->EmitStatusCountersUpdate(); } } base::WeakPtr ModelTypeRegistry::AsWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } void ModelTypeRegistry::OnPassphraseRequired( PassphraseRequiredReason reason, const sync_pb::EncryptedData& pending_keys) { } void ModelTypeRegistry::OnPassphraseAccepted() { } void ModelTypeRegistry::OnBootstrapTokenUpdated( const std::string& bootstrap_token, BootstrapTokenType type) { } void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, bool encrypt_everything) { encrypted_types_ = encrypted_types; OnEncryptionStateChanged(); } void ModelTypeRegistry::OnEncryptionComplete() { } void ModelTypeRegistry::OnCryptographerStateChanged( Cryptographer* cryptographer) { cryptographer_.reset(new Cryptographer(*cryptographer)); OnEncryptionStateChanged(); } void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type, base::Time passphrase_time) { } void ModelTypeRegistry::OnLocalSetPassphraseEncryption( const SyncEncryptionHandler::NigoriState& nigori_state) { } ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { return enabled_directory_types_; } void ModelTypeRegistry::OnEncryptionStateChanged() { for (ScopedVector::iterator it = model_type_sync_workers_.begin(); it != model_type_sync_workers_.end(); ++it) { if (encrypted_types_.Has((*it)->GetModelType())) { (*it)->UpdateCryptographer( make_scoped_ptr(new Cryptographer(*cryptographer_))); } } } ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { ModelTypeSet enabled_off_thread_types; for (ScopedVector::const_iterator it = model_type_sync_workers_.begin(); it != model_type_sync_workers_.end(); ++it) { enabled_off_thread_types.Put((*it)->GetModelType()); } return enabled_off_thread_types; } } // namespace syncer