// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "sync/internal_api/sync_manager_impl.h" #include #include "base/base64.h" #include "base/bind.h" #include "base/callback.h" #include "base/compiler_specific.h" #include "base/json/json_writer.h" #include "base/memory/ref_counted.h" #include "base/metrics/histogram.h" #include "base/observer_list.h" #include "base/strings/string_number_conversions.h" #include "base/values.h" #include "sync/engine/sync_scheduler.h" #include "sync/engine/syncer_types.h" #include "sync/internal_api/change_reorder_buffer.h" #include "sync/internal_api/public/base/cancelation_signal.h" #include "sync/internal_api/public/base/model_type.h" #include "sync/internal_api/public/base_node.h" #include "sync/internal_api/public/configure_reason.h" #include "sync/internal_api/public/engine/polling_constants.h" #include "sync/internal_api/public/http_post_provider_factory.h" #include "sync/internal_api/public/internal_components_factory.h" #include "sync/internal_api/public/read_node.h" #include "sync/internal_api/public/read_transaction.h" #include "sync/internal_api/public/user_share.h" #include "sync/internal_api/public/util/experiments.h" #include "sync/internal_api/public/write_node.h" #include "sync/internal_api/public/write_transaction.h" #include "sync/internal_api/syncapi_internal.h" #include "sync/internal_api/syncapi_server_connection_manager.h" #include "sync/js/js_arg_list.h" #include "sync/js/js_event_details.h" #include "sync/js/js_event_handler.h" #include "sync/js/js_reply_handler.h" #include "sync/notifier/invalidation_util.h" #include "sync/notifier/invalidator.h" #include "sync/notifier/object_id_invalidation_map.h" #include "sync/protocol/proto_value_conversions.h" #include "sync/protocol/sync.pb.h" #include "sync/syncable/directory.h" #include "sync/syncable/entry.h" #include "sync/syncable/in_memory_directory_backing_store.h" #include "sync/syncable/on_disk_directory_backing_store.h" using base::TimeDelta; using sync_pb::GetUpdatesCallerInfo; namespace syncer { using sessions::SyncSessionContext; using syncable::ImmutableWriteTransactionInfo; using syncable::SPECIFICS; using syncable::UNIQUE_POSITION; namespace { // Delays for syncer nudges. static const int kDefaultNudgeDelayMilliseconds = 200; static const int kPreferencesNudgeDelayMilliseconds = 2000; static const int kSyncRefreshDelayMsec = 500; static const int kSyncSchedulerDelayMsec = 250; // Maximum count and size for traffic recorder. static const unsigned int kMaxMessagesToRecord = 10; static const unsigned int kMaxMessageSizeToRecord = 5 * 1024; GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason( ConfigureReason reason) { switch (reason) { case CONFIGURE_REASON_RECONFIGURATION: return GetUpdatesCallerInfo::RECONFIGURATION; case CONFIGURE_REASON_MIGRATION: return GetUpdatesCallerInfo::MIGRATION; case CONFIGURE_REASON_NEW_CLIENT: return GetUpdatesCallerInfo::NEW_CLIENT; case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE: case CONFIGURE_REASON_CRYPTO: return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE; default: NOTREACHED(); } return GetUpdatesCallerInfo::UNKNOWN; } } // namespace // A class to calculate nudge delays for types. class NudgeStrategy { public: static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type, SyncManagerImpl* core) { NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type); return GetNudgeDelayTimeDeltaFromType(delay_type, model_type, core); } private: // Possible types of nudge delay for datatypes. // Note: These are just hints. If a sync happens then all dirty entries // would be committed as part of the sync. enum NudgeDelayStrategy { // Sync right away. IMMEDIATE, // Sync this change while syncing another change. ACCOMPANY_ONLY, // The datatype does not use one of the predefined wait times but defines // its own wait time logic for nudge. CUSTOM, }; static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) { switch (type) { case AUTOFILL: return ACCOMPANY_ONLY; case PREFERENCES: case SESSIONS: case FAVICON_IMAGES: case FAVICON_TRACKING: return CUSTOM; default: return IMMEDIATE; } } static TimeDelta GetNudgeDelayTimeDeltaFromType( const NudgeDelayStrategy& delay_type, const ModelType& model_type, const SyncManagerImpl* core) { CHECK(core); TimeDelta delay = TimeDelta::FromMilliseconds( kDefaultNudgeDelayMilliseconds); switch (delay_type) { case IMMEDIATE: delay = TimeDelta::FromMilliseconds( kDefaultNudgeDelayMilliseconds); break; case ACCOMPANY_ONLY: delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds); break; case CUSTOM: switch (model_type) { case PREFERENCES: delay = TimeDelta::FromMilliseconds( kPreferencesNudgeDelayMilliseconds); break; case SESSIONS: case FAVICON_IMAGES: case FAVICON_TRACKING: delay = core->scheduler()->GetSessionsCommitDelay(); break; default: NOTREACHED(); } break; default: NOTREACHED(); } return delay; } }; SyncManagerImpl::SyncManagerImpl(const std::string& name) : name_(name), change_delegate_(NULL), initialized_(false), observing_network_connectivity_changes_(false), invalidator_state_(DEFAULT_INVALIDATION_ERROR), traffic_recorder_(kMaxMessagesToRecord, kMaxMessageSizeToRecord), encryptor_(NULL), report_unrecoverable_error_function_(NULL), weak_ptr_factory_(this) { // Pre-fill |notification_info_map_|. for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { notification_info_map_.insert( std::make_pair(ModelTypeFromInt(i), NotificationInfo())); } // Bind message handlers. BindJsMessageHandler( "getNotificationState", &SyncManagerImpl::GetNotificationState); BindJsMessageHandler( "getNotificationInfo", &SyncManagerImpl::GetNotificationInfo); BindJsMessageHandler( "getRootNodeDetails", &SyncManagerImpl::GetRootNodeDetails); BindJsMessageHandler( "getNodeSummariesById", &SyncManagerImpl::GetNodeSummariesById); BindJsMessageHandler( "getNodeDetailsById", &SyncManagerImpl::GetNodeDetailsById); BindJsMessageHandler( "getAllNodes", &SyncManagerImpl::GetAllNodes); BindJsMessageHandler( "getChildNodeIds", &SyncManagerImpl::GetChildNodeIds); BindJsMessageHandler( "getClientServerTraffic", &SyncManagerImpl::GetClientServerTraffic); } SyncManagerImpl::~SyncManagerImpl() { DCHECK(thread_checker_.CalledOnValidThread()); CHECK(!initialized_); } SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {} SyncManagerImpl::NotificationInfo::~NotificationInfo() {} base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const { base::DictionaryValue* value = new base::DictionaryValue(); value->SetInteger("totalCount", total_count); value->SetString("payload", payload); return value; } bool SyncManagerImpl::VisiblePositionsDiffer( const syncable::EntryKernelMutation& mutation) const { const syncable::EntryKernel& a = mutation.original; const syncable::EntryKernel& b = mutation.mutated; if (!b.ShouldMaintainPosition()) return false; if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION))) return true; if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID)) return true; return false; } bool SyncManagerImpl::VisiblePropertiesDiffer( const syncable::EntryKernelMutation& mutation, Cryptographer* cryptographer) const { const syncable::EntryKernel& a = mutation.original; const syncable::EntryKernel& b = mutation.mutated; const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS); const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS); DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics), GetModelTypeFromSpecifics(b_specifics)); ModelType model_type = GetModelTypeFromSpecifics(b_specifics); // Suppress updates to items that aren't tracked by any browser model. if (model_type < FIRST_REAL_MODEL_TYPE || !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) { return false; } if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR)) return true; if (!AreSpecificsEqual(cryptographer, a.ref(syncable::SPECIFICS), b.ref(syncable::SPECIFICS))) { return true; } // We only care if the name has changed if neither specifics is encrypted // (encrypted nodes blow away the NON_UNIQUE_NAME). if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() && a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME)) return true; if (VisiblePositionsDiffer(mutation)) return true; return false; } void SyncManagerImpl::ThrowUnrecoverableError() { DCHECK(thread_checker_.CalledOnValidThread()); ReadTransaction trans(FROM_HERE, GetUserShare()); trans.GetWrappedTrans()->OnUnrecoverableError( FROM_HERE, "Simulating unrecoverable error for testing purposes."); } ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() { return directory()->InitialSyncEndedTypes(); } ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken( ModelTypeSet types) { ModelTypeSet result; for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) { sync_pb::DataTypeProgressMarker marker; directory()->GetDownloadProgress(i.Get(), &marker); if (marker.token().empty()) result.Put(i.Get()); } return result; } void SyncManagerImpl::ConfigureSyncer( ConfigureReason reason, ModelTypeSet to_download, ModelTypeSet to_purge, ModelTypeSet to_journal, ModelTypeSet to_unapply, const ModelSafeRoutingInfo& new_routing_info, const base::Closure& ready_task, const base::Closure& retry_task) { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(!ready_task.is_null()); DCHECK(!retry_task.is_null()); DVLOG(1) << "Configuring -" << "\n\t" << "current types: " << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info)) << "\n\t" << "types to download: " << ModelTypeSetToString(to_download) << "\n\t" << "types to purge: " << ModelTypeSetToString(to_purge) << "\n\t" << "types to journal: " << ModelTypeSetToString(to_journal) << "\n\t" << "types to unapply: " << ModelTypeSetToString(to_unapply); if (!PurgeDisabledTypes(to_purge, to_journal, to_unapply)) { // We failed to cleanup the types. Invoke the ready task without actually // configuring any types. The caller should detect this as a configuration // failure and act appropriately. ready_task.Run(); return; } ConfigurationParams params(GetSourceFromReason(reason), to_download, new_routing_info, ready_task, retry_task); scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); scheduler_->ScheduleConfiguration(params); } void SyncManagerImpl::Init( const base::FilePath& database_location, const WeakHandle& event_handler, const std::string& sync_server_and_path, int port, bool use_ssl, scoped_ptr post_factory, const std::vector >& workers, ExtensionsActivity* extensions_activity, SyncManager::ChangeDelegate* change_delegate, const SyncCredentials& credentials, const std::string& invalidator_client_id, const std::string& restored_key_for_bootstrapping, const std::string& restored_keystore_key_for_bootstrapping, InternalComponentsFactory* internal_components_factory, Encryptor* encryptor, scoped_ptr unrecoverable_error_handler, ReportUnrecoverableErrorFunction report_unrecoverable_error_function, CancelationSignal* cancelation_signal) { CHECK(!initialized_); DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(post_factory.get()); DCHECK(!credentials.email.empty()); DCHECK(!credentials.sync_token.empty()); DCHECK(cancelation_signal); DVLOG(1) << "SyncManager starting Init..."; weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()); change_delegate_ = change_delegate; AddObserver(&js_sync_manager_observer_); SetJsEventHandler(event_handler); AddObserver(&debug_info_event_listener_); database_path_ = database_location.Append( syncable::Directory::kSyncDatabaseFilename); encryptor_ = encryptor; unrecoverable_error_handler_ = unrecoverable_error_handler.Pass(); report_unrecoverable_error_function_ = report_unrecoverable_error_function; allstatus_.SetHasKeystoreKey( !restored_keystore_key_for_bootstrapping.empty()); sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl( &share_, encryptor, restored_key_for_bootstrapping, restored_keystore_key_for_bootstrapping)); sync_encryption_handler_->AddObserver(this); sync_encryption_handler_->AddObserver(&debug_info_event_listener_); sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_); base::FilePath absolute_db_path = database_path_; DCHECK(absolute_db_path.IsAbsolute()); scoped_ptr backing_store = internal_components_factory->BuildDirectoryBackingStore( credentials.email, absolute_db_path).Pass(); DCHECK(backing_store.get()); const std::string& username = credentials.email; share_.directory.reset( new syncable::Directory( backing_store.release(), unrecoverable_error_handler_.get(), report_unrecoverable_error_function_, sync_encryption_handler_.get(), sync_encryption_handler_->GetCryptographerUnsafe())); DVLOG(1) << "Username: " << username; if (!OpenDirectory(username)) { NotifyInitializationFailure(); LOG(ERROR) << "Sync manager initialization failed!"; return; } connection_manager_.reset(new SyncAPIServerConnectionManager( sync_server_and_path, port, use_ssl, post_factory.release(), cancelation_signal)); connection_manager_->set_client_id(directory()->cache_guid()); connection_manager_->AddListener(this); std::string sync_id = directory()->cache_guid(); allstatus_.SetSyncId(sync_id); allstatus_.SetInvalidatorClientId(invalidator_client_id); DVLOG(1) << "Setting sync client ID: " << sync_id; DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id; // Build a SyncSessionContext and store the worker in it. DVLOG(1) << "Sync is bringing up SyncSessionContext."; std::vector listeners; listeners.push_back(&allstatus_); listeners.push_back(this); session_context_ = internal_components_factory->BuildContext( connection_manager_.get(), directory(), workers, extensions_activity, listeners, &debug_info_event_listener_, &traffic_recorder_, invalidator_client_id).Pass(); session_context_->set_account_name(credentials.email); scheduler_ = internal_components_factory->BuildScheduler( name_, session_context_.get(), cancelation_signal).Pass(); scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); initialized_ = true; net::NetworkChangeNotifier::AddIPAddressObserver(this); net::NetworkChangeNotifier::AddConnectionTypeObserver(this); observing_network_connectivity_changes_ = true; UpdateCredentials(credentials); NotifyInitializationSuccess(); } void SyncManagerImpl::NotifyInitializationSuccess() { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnInitializationComplete( MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), true, InitialSyncEndedTypes())); } void SyncManagerImpl::NotifyInitializationFailure() { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnInitializationComplete( MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), false, ModelTypeSet())); } void SyncManagerImpl::OnPassphraseRequired( PassphraseRequiredReason reason, const sync_pb::EncryptedData& pending_keys) { // Does nothing. } void SyncManagerImpl::OnPassphraseAccepted() { // Does nothing. } void SyncManagerImpl::OnBootstrapTokenUpdated( const std::string& bootstrap_token, BootstrapTokenType type) { if (type == KEYSTORE_BOOTSTRAP_TOKEN) allstatus_.SetHasKeystoreKey(true); } void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, bool encrypt_everything) { allstatus_.SetEncryptedTypes(encrypted_types); } void SyncManagerImpl::OnEncryptionComplete() { // Does nothing. } void SyncManagerImpl::OnCryptographerStateChanged( Cryptographer* cryptographer) { allstatus_.SetCryptographerReady(cryptographer->is_ready()); allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys()); allstatus_.SetKeystoreMigrationTime( sync_encryption_handler_->migration_time()); } void SyncManagerImpl::OnPassphraseTypeChanged( PassphraseType type, base::Time explicit_passphrase_time) { allstatus_.SetPassphraseType(type); allstatus_.SetKeystoreMigrationTime( sync_encryption_handler_->migration_time()); } void SyncManagerImpl::StartSyncingNormally( const ModelSafeRoutingInfo& routing_info) { // Start the sync scheduler. // TODO(sync): We always want the newest set of routes when we switch back // to normal mode. Figure out how to enforce set_routing_info is always // appropriately set and that it's only modified when switching to normal // mode. DCHECK(thread_checker_.CalledOnValidThread()); session_context_->set_routing_info(routing_info); scheduler_->Start(SyncScheduler::NORMAL_MODE); } syncable::Directory* SyncManagerImpl::directory() { return share_.directory.get(); } const SyncScheduler* SyncManagerImpl::scheduler() const { return scheduler_.get(); } bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const { return connection_manager_->HasInvalidAuthToken(); } bool SyncManagerImpl::OpenDirectory(const std::string& username) { DCHECK(!initialized_) << "Should only happen once"; // Set before Open(). change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()); WeakHandle transaction_observer( MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr())); syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED; open_result = directory()->Open(username, this, transaction_observer); if (open_result != syncable::OPENED) { LOG(ERROR) << "Could not open share for:" << username; return false; } // Unapplied datatypes (those that do not have initial sync ended set) get // re-downloaded during any configuration. But, it's possible for a datatype // to have a progress marker but not have initial sync ended yet, making // it a candidate for migration. This is a problem, as the DataTypeManager // does not support a migration while it's already in the middle of a // configuration. As a result, any partially synced datatype can stall the // DTM, waiting for the configuration to complete, which it never will due // to the migration error. In addition, a partially synced nigori will // trigger the migration logic before the backend is initialized, resulting // in crashes. We therefore detect and purge any partially synced types as // part of initialization. if (!PurgePartiallySyncedTypes()) return false; return true; } bool SyncManagerImpl::PurgePartiallySyncedTypes() { ModelTypeSet partially_synced_types = ModelTypeSet::All(); partially_synced_types.RemoveAll(InitialSyncEndedTypes()); partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken( ModelTypeSet::All())); DVLOG(1) << "Purging partially synced types " << ModelTypeSetToString(partially_synced_types); UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes", partially_synced_types.Size()); if (partially_synced_types.Empty()) return true; return directory()->PurgeEntriesWithTypeIn(partially_synced_types, ModelTypeSet(), ModelTypeSet()); } bool SyncManagerImpl::PurgeDisabledTypes( ModelTypeSet to_purge, ModelTypeSet to_journal, ModelTypeSet to_unapply) { if (to_purge.Empty()) return true; DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge); DCHECK(to_purge.HasAll(to_journal)); DCHECK(to_purge.HasAll(to_unapply)); return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply); } void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(initialized_); DCHECK(!credentials.email.empty()); DCHECK(!credentials.sync_token.empty()); observing_network_connectivity_changes_ = true; if (!connection_manager_->SetAuthToken(credentials.sync_token)) return; // Auth token is known to be invalid, so exit early. scheduler_->OnCredentialsUpdated(); // TODO(zea): pass the credential age to the debug info event listener. } void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) { DCHECK(thread_checker_.CalledOnValidThread()); observers_.AddObserver(observer); } void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) { DCHECK(thread_checker_.CalledOnValidThread()); observers_.RemoveObserver(observer); } void SyncManagerImpl::ShutdownOnSyncThread() { DCHECK(thread_checker_.CalledOnValidThread()); // Prevent any in-flight method calls from running. Also // invalidates |weak_handle_this_| and |change_observer_|. weak_ptr_factory_.InvalidateWeakPtrs(); js_mutation_event_observer_.InvalidateWeakPtrs(); scheduler_.reset(); session_context_.reset(); if (sync_encryption_handler_) { sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_); sync_encryption_handler_->RemoveObserver(this); } SetJsEventHandler(WeakHandle()); RemoveObserver(&js_sync_manager_observer_); RemoveObserver(&debug_info_event_listener_); // |connection_manager_| may end up being NULL here in tests (in synchronous // initialization mode). // // TODO(akalin): Fix this behavior. if (connection_manager_) connection_manager_->RemoveListener(this); connection_manager_.reset(); net::NetworkChangeNotifier::RemoveIPAddressObserver(this); net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); observing_network_connectivity_changes_ = false; if (initialized_ && directory()) { directory()->SaveChanges(); } share_.directory.reset(); change_delegate_ = NULL; initialized_ = false; // We reset these here, since only now we know they will not be // accessed from other threads (since we shut down everything). change_observer_.Reset(); weak_handle_this_.Reset(); } void SyncManagerImpl::OnIPAddressChanged() { if (!observing_network_connectivity_changes_) { DVLOG(1) << "IP address change dropped."; return; } DVLOG(1) << "IP address change detected."; OnNetworkConnectivityChangedImpl(); } void SyncManagerImpl::OnConnectionTypeChanged( net::NetworkChangeNotifier::ConnectionType) { if (!observing_network_connectivity_changes_) { DVLOG(1) << "Connection type change dropped."; return; } DVLOG(1) << "Connection type change detected."; OnNetworkConnectivityChangedImpl(); } void SyncManagerImpl::OnNetworkConnectivityChangedImpl() { DCHECK(thread_checker_.CalledOnValidThread()); scheduler_->OnConnectionStatusChange(); } void SyncManagerImpl::OnServerConnectionEvent( const ServerConnectionEvent& event) { DCHECK(thread_checker_.CalledOnValidThread()); if (event.connection_code == HttpResponse::SERVER_CONNECTION_OK) { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnConnectionStatusChange(CONNECTION_OK)); } if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) { observing_network_connectivity_changes_ = false; FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnConnectionStatusChange(CONNECTION_AUTH_ERROR)); } if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnConnectionStatusChange(CONNECTION_SERVER_ERROR)); } } void SyncManagerImpl::HandleTransactionCompleteChangeEvent( ModelTypeSet models_with_changes) { // This notification happens immediately after the transaction mutex is // released. This allows work to be performed without blocking other threads // from acquiring a transaction. if (!change_delegate_) return; // Call commit. for (ModelTypeSet::Iterator it = models_with_changes.First(); it.Good(); it.Inc()) { change_delegate_->OnChangesComplete(it.Get()); change_observer_.Call( FROM_HERE, &SyncManager::ChangeObserver::OnChangesComplete, it.Get()); } } ModelTypeSet SyncManagerImpl::HandleTransactionEndingChangeEvent( const ImmutableWriteTransactionInfo& write_transaction_info, syncable::BaseTransaction* trans) { // This notification happens immediately before a syncable WriteTransaction // falls out of scope. It happens while the channel mutex is still held, // and while the transaction mutex is held, so it cannot be re-entrant. if (!change_delegate_ || change_records_.empty()) return ModelTypeSet(); // This will continue the WriteTransaction using a read only wrapper. // This is the last chance for read to occur in the WriteTransaction // that's closing. This special ReadTransaction will not close the // underlying transaction. ReadTransaction read_trans(GetUserShare(), trans); ModelTypeSet models_with_changes; for (ChangeRecordMap::const_iterator it = change_records_.begin(); it != change_records_.end(); ++it) { DCHECK(!it->second.Get().empty()); ModelType type = ModelTypeFromInt(it->first); change_delegate_-> OnChangesApplied(type, trans->directory()->GetTransactionVersion(type), &read_trans, it->second); change_observer_.Call(FROM_HERE, &SyncManager::ChangeObserver::OnChangesApplied, type, write_transaction_info.Get().id, it->second); models_with_changes.Put(type); } change_records_.clear(); return models_with_changes; } void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi( const ImmutableWriteTransactionInfo& write_transaction_info, syncable::BaseTransaction* trans, std::vector* entries_changed) { // We have been notified about a user action changing a sync model. LOG_IF(WARNING, !change_records_.empty()) << "CALCULATE_CHANGES called with unapplied old changes."; // The mutated model type, or UNSPECIFIED if nothing was mutated. ModelTypeSet mutated_model_types; const syncable::ImmutableEntryKernelMutationMap& mutations = write_transaction_info.Get().mutations; for (syncable::EntryKernelMutationMap::const_iterator it = mutations.Get().begin(); it != mutations.Get().end(); ++it) { if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) { continue; } ModelType model_type = GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); if (model_type < FIRST_REAL_MODEL_TYPE) { NOTREACHED() << "Permanent or underspecified item changed via syncapi."; continue; } // Found real mutation. if (model_type != UNSPECIFIED) { mutated_model_types.Put(model_type); entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE)); } } // Nudge if necessary. if (!mutated_model_types.Empty()) { if (weak_handle_this_.IsInitialized()) { weak_handle_this_.Call(FROM_HERE, &SyncManagerImpl::RequestNudgeForDataTypes, FROM_HERE, mutated_model_types); } else { NOTREACHED(); } } } void SyncManagerImpl::SetExtraChangeRecordData(int64 id, ModelType type, ChangeReorderBuffer* buffer, Cryptographer* cryptographer, const syncable::EntryKernel& original, bool existed_before, bool exists_now) { // If this is a deletion and the datatype was encrypted, we need to decrypt it // and attach it to the buffer. if (!exists_now && existed_before) { sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS)); if (type == PASSWORDS) { // Passwords must use their own legacy ExtraPasswordChangeRecordData. scoped_ptr data( DecryptPasswordSpecifics(original_specifics, cryptographer)); if (!data) { NOTREACHED(); return; } buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data)); } else if (original_specifics.has_encrypted()) { // All other datatypes can just create a new unencrypted specifics and // attach it. const sync_pb::EncryptedData& encrypted = original_specifics.encrypted(); if (!cryptographer->Decrypt(encrypted, &original_specifics)) { NOTREACHED(); return; } } buffer->SetSpecificsForId(id, original_specifics); } } void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer( const ImmutableWriteTransactionInfo& write_transaction_info, syncable::BaseTransaction* trans, std::vector* entries_changed) { // We only expect one notification per sync step, so change_buffers_ should // contain no pending entries. LOG_IF(WARNING, !change_records_.empty()) << "CALCULATE_CHANGES called with unapplied old changes."; ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT]; Cryptographer* crypto = directory()->GetCryptographer(trans); const syncable::ImmutableEntryKernelMutationMap& mutations = write_transaction_info.Get().mutations; for (syncable::EntryKernelMutationMap::const_iterator it = mutations.Get().begin(); it != mutations.Get().end(); ++it) { bool existed_before = !it->second.original.ref(syncable::IS_DEL); bool exists_now = !it->second.mutated.ref(syncable::IS_DEL); // Omit items that aren't associated with a model. ModelType type = GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); if (type < FIRST_REAL_MODEL_TYPE) continue; int64 handle = it->first; if (exists_now && !existed_before) change_buffers[type].PushAddedItem(handle); else if (!exists_now && existed_before) change_buffers[type].PushDeletedItem(handle); else if (exists_now && existed_before && VisiblePropertiesDiffer(it->second, crypto)) { change_buffers[type].PushUpdatedItem(handle); } SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto, it->second.original, existed_before, exists_now); } ReadTransaction read_trans(GetUserShare(), trans); for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { if (!change_buffers[i].IsEmpty()) { if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans, &(change_records_[i]))) { for (size_t j = 0; j < change_records_[i].Get().size(); ++j) entries_changed->push_back((change_records_[i].Get())[j].id); } if (change_records_[i].Get().empty()) change_records_.erase(i); } } } TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta( const ModelType& model_type) { return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this); } void SyncManagerImpl::RequestNudgeForDataTypes( const tracked_objects::Location& nudge_location, ModelTypeSet types) { debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get()); // TODO(lipalani) : Calculate the nudge delay based on all types. base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta( types.First().Get(), this); allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL); scheduler_->ScheduleLocalNudge(nudge_delay, types, nudge_location); } void SyncManagerImpl::OnSyncEngineEvent(const SyncEngineEvent& event) { DCHECK(thread_checker_.CalledOnValidThread()); // Only send an event if this is due to a cycle ending and this cycle // concludes a canonical "sync" process; that is, based on what is known // locally we are "all happy" and up-to-date. There may be new changes on // the server, but we'll get them on a subsequent sync. // // Notifications are sent at the end of every sync cycle, regardless of // whether we should sync again. if (event.what_happened == SyncEngineEvent::SYNC_CYCLE_ENDED) { if (!initialized_) { DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not " << "initialized"; return; } DVLOG(1) << "Sending OnSyncCycleCompleted"; FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnSyncCycleCompleted(event.snapshot)); } if (event.what_happened == SyncEngineEvent::STOP_SYNCING_PERMANENTLY) { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnStopSyncingPermanently()); return; } if (event.what_happened == SyncEngineEvent::ACTIONABLE_ERROR) { FOR_EACH_OBSERVER( SyncManager::Observer, observers_, OnActionableError( event.snapshot.model_neutral_state().sync_protocol_error)); return; } } void SyncManagerImpl::SetJsEventHandler( const WeakHandle& event_handler) { js_event_handler_ = event_handler; js_sync_manager_observer_.SetJsEventHandler(js_event_handler_); js_mutation_event_observer_.SetJsEventHandler(js_event_handler_); js_sync_encryption_handler_observer_.SetJsEventHandler(js_event_handler_); } void SyncManagerImpl::ProcessJsMessage( const std::string& name, const JsArgList& args, const WeakHandle& reply_handler) { if (!initialized_) { NOTREACHED(); return; } if (!reply_handler.IsInitialized()) { DVLOG(1) << "Uninitialized reply handler; dropping unknown message " << name << " with args " << args.ToString(); return; } JsMessageHandler js_message_handler = js_message_handlers_[name]; if (js_message_handler.is_null()) { DVLOG(1) << "Dropping unknown message " << name << " with args " << args.ToString(); return; } reply_handler.Call(FROM_HERE, &JsReplyHandler::HandleJsReply, name, js_message_handler.Run(args)); } void SyncManagerImpl::BindJsMessageHandler( const std::string& name, UnboundJsMessageHandler unbound_message_handler) { js_message_handlers_[name] = base::Bind(unbound_message_handler, base::Unretained(this)); } base::DictionaryValue* SyncManagerImpl::NotificationInfoToValue( const NotificationInfoMap& notification_info) { base::DictionaryValue* value = new base::DictionaryValue(); for (NotificationInfoMap::const_iterator it = notification_info.begin(); it != notification_info.end(); ++it) { const std::string model_type_str = ModelTypeToString(it->first); value->Set(model_type_str, it->second.ToValue()); } return value; } std::string SyncManagerImpl::NotificationInfoToString( const NotificationInfoMap& notification_info) { scoped_ptr value( NotificationInfoToValue(notification_info)); std::string str; base::JSONWriter::Write(value.get(), &str); return str; } JsArgList SyncManagerImpl::GetNotificationState( const JsArgList& args) { const std::string& notification_state = InvalidatorStateToString(invalidator_state_); DVLOG(1) << "GetNotificationState: " << notification_state; base::ListValue return_args; return_args.Append(new base::StringValue(notification_state)); return JsArgList(&return_args); } JsArgList SyncManagerImpl::GetNotificationInfo( const JsArgList& args) { DVLOG(1) << "GetNotificationInfo: " << NotificationInfoToString(notification_info_map_); base::ListValue return_args; return_args.Append(NotificationInfoToValue(notification_info_map_)); return JsArgList(&return_args); } JsArgList SyncManagerImpl::GetRootNodeDetails( const JsArgList& args) { ReadTransaction trans(FROM_HERE, GetUserShare()); ReadNode root(&trans); root.InitByRootLookup(); base::ListValue return_args; return_args.Append(root.GetDetailsAsValue()); return JsArgList(&return_args); } JsArgList SyncManagerImpl::GetClientServerTraffic( const JsArgList& args) { base::ListValue return_args; base::ListValue* value = traffic_recorder_.ToValue(); if (value != NULL) return_args.Append(value); return JsArgList(&return_args); } namespace { int64 GetId(const base::ListValue& ids, int i) { std::string id_str; if (!ids.GetString(i, &id_str)) { return kInvalidId; } int64 id = kInvalidId; if (!base::StringToInt64(id_str, &id)) { return kInvalidId; } return id; } JsArgList GetNodeInfoById( const JsArgList& args, UserShare* user_share, base::DictionaryValue* (BaseNode::*info_getter)() const) { CHECK(info_getter); base::ListValue return_args; base::ListValue* node_summaries = new base::ListValue(); return_args.Append(node_summaries); const base::ListValue* id_list = NULL; ReadTransaction trans(FROM_HERE, user_share); if (args.Get().GetList(0, &id_list)) { CHECK(id_list); for (size_t i = 0; i < id_list->GetSize(); ++i) { int64 id = GetId(*id_list, i); if (id == kInvalidId) { continue; } ReadNode node(&trans); if (node.InitByIdLookup(id) != BaseNode::INIT_OK) { continue; } node_summaries->Append((node.*info_getter)()); } } return JsArgList(&return_args); } } // namespace JsArgList SyncManagerImpl::GetNodeSummariesById(const JsArgList& args) { return GetNodeInfoById(args, GetUserShare(), &BaseNode::GetSummaryAsValue); } JsArgList SyncManagerImpl::GetNodeDetailsById(const JsArgList& args) { return GetNodeInfoById(args, GetUserShare(), &BaseNode::GetDetailsAsValue); } JsArgList SyncManagerImpl::GetAllNodes(const JsArgList& args) { base::ListValue return_args; base::ListValue* result = new base::ListValue(); return_args.Append(result); ReadTransaction trans(FROM_HERE, GetUserShare()); std::vector entry_kernels; trans.GetDirectory()->GetAllEntryKernels(trans.GetWrappedTrans(), &entry_kernels); for (std::vector::const_iterator it = entry_kernels.begin(); it != entry_kernels.end(); ++it) { result->Append((*it)->ToValue(trans.GetCryptographer())); } return JsArgList(&return_args); } JsArgList SyncManagerImpl::GetChildNodeIds(const JsArgList& args) { base::ListValue return_args; base::ListValue* child_ids = new base::ListValue(); return_args.Append(child_ids); int64 id = GetId(args.Get(), 0); if (id != kInvalidId) { ReadTransaction trans(FROM_HERE, GetUserShare()); syncable::Directory::Metahandles child_handles; trans.GetDirectory()->GetChildHandlesByHandle(trans.GetWrappedTrans(), id, &child_handles); for (syncable::Directory::Metahandles::const_iterator it = child_handles.begin(); it != child_handles.end(); ++it) { child_ids->Append(new base::StringValue(base::Int64ToString(*it))); } } return JsArgList(&return_args); } void SyncManagerImpl::UpdateNotificationInfo( const ObjectIdInvalidationMap& invalidation_map) { ObjectIdSet ids = invalidation_map.GetObjectIds(); for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { ModelType type = UNSPECIFIED; if (!ObjectIdToRealModelType(*it, &type)) { continue; } const SingleObjectInvalidationSet& type_invalidations = invalidation_map.ForObject(*it); for (SingleObjectInvalidationSet::const_iterator inv_it = type_invalidations.begin(); inv_it != type_invalidations.end(); ++inv_it) { NotificationInfo* info = ¬ification_info_map_[type]; info->total_count++; std::string payload = inv_it->is_unknown_version() ? "UNKNOWN" : inv_it->payload(); info->payload = payload; } } } void SyncManagerImpl::OnInvalidatorStateChange(InvalidatorState state) { DCHECK(thread_checker_.CalledOnValidThread()); const std::string& state_str = InvalidatorStateToString(state); invalidator_state_ = state; DVLOG(1) << "Invalidator state changed to: " << state_str; const bool notifications_enabled = (invalidator_state_ == INVALIDATIONS_ENABLED); allstatus_.SetNotificationsEnabled(notifications_enabled); scheduler_->SetNotificationsEnabled(notifications_enabled); if (js_event_handler_.IsInitialized()) { base::DictionaryValue details; details.SetString("state", state_str); js_event_handler_.Call(FROM_HERE, &JsEventHandler::HandleJsEvent, "onNotificationStateChange", JsEventDetails(&details)); } } void SyncManagerImpl::OnIncomingInvalidation( const ObjectIdInvalidationMap& invalidation_map) { DCHECK(thread_checker_.CalledOnValidThread()); // We should never receive IDs from non-sync objects. ObjectIdSet ids = invalidation_map.GetObjectIds(); for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { ModelType type; if (!ObjectIdToRealModelType(*it, &type)) { DLOG(WARNING) << "Notification has invalid id: " << ObjectIdToString(*it); } } if (invalidation_map.Empty()) { LOG(WARNING) << "Sync received invalidation without any type information."; } else { allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_NOTIFICATION); scheduler_->ScheduleInvalidationNudge( TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec), invalidation_map, FROM_HERE); allstatus_.IncrementNotificationsReceived(); UpdateNotificationInfo(invalidation_map); debug_info_event_listener_.OnIncomingNotification(invalidation_map); } if (js_event_handler_.IsInitialized()) { base::DictionaryValue details; base::ListValue* changed_types = new base::ListValue(); details.Set("changedTypes", changed_types); ObjectIdSet id_set = invalidation_map.GetObjectIds(); ModelTypeSet nudged_types = ObjectIdSetToModelTypeSet(id_set); DCHECK(!nudged_types.Empty()); for (ModelTypeSet::Iterator it = nudged_types.First(); it.Good(); it.Inc()) { const std::string model_type_str = ModelTypeToString(it.Get()); changed_types->Append(new base::StringValue(model_type_str)); } details.SetString("source", "REMOTE_INVALIDATION"); js_event_handler_.Call(FROM_HERE, &JsEventHandler::HandleJsEvent, "onIncomingNotification", JsEventDetails(&details)); } } void SyncManagerImpl::RefreshTypes(ModelTypeSet types) { DCHECK(thread_checker_.CalledOnValidThread()); if (types.Empty()) { LOG(WARNING) << "Sync received refresh request with no types specified."; } else { allstatus_.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL_REFRESH); scheduler_->ScheduleLocalRefreshRequest( TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec), types, FROM_HERE); } if (js_event_handler_.IsInitialized()) { base::DictionaryValue details; base::ListValue* changed_types = new base::ListValue(); details.Set("changedTypes", changed_types); for (ModelTypeSet::Iterator it = types.First(); it.Good(); it.Inc()) { const std::string& model_type_str = ModelTypeToString(it.Get()); changed_types->Append(new base::StringValue(model_type_str)); } details.SetString("source", "LOCAL_INVALIDATION"); js_event_handler_.Call(FROM_HERE, &JsEventHandler::HandleJsEvent, "onIncomingNotification", JsEventDetails(&details)); } } SyncStatus SyncManagerImpl::GetDetailedStatus() const { return allstatus_.status(); } void SyncManagerImpl::SaveChanges() { directory()->SaveChanges(); } UserShare* SyncManagerImpl::GetUserShare() { DCHECK(initialized_); return &share_; } const std::string SyncManagerImpl::cache_guid() { DCHECK(initialized_); return directory()->cache_guid(); } bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) { ReadTransaction trans(FROM_HERE, GetUserShare()); ReadNode nigori_node(&trans); if (nigori_node.InitByTagLookup(kNigoriTag) != BaseNode::INIT_OK) { DVLOG(1) << "Couldn't find Nigori node."; return false; } bool found_experiment = false; ReadNode autofill_culling_node(&trans); if (autofill_culling_node.InitByClientTagLookup( syncer::EXPERIMENTS, syncer::kAutofillCullingTag) == BaseNode::INIT_OK && autofill_culling_node.GetExperimentsSpecifics(). autofill_culling().enabled()) { experiments->autofill_culling = true; found_experiment = true; } ReadNode favicon_sync_node(&trans); if (favicon_sync_node.InitByClientTagLookup( syncer::EXPERIMENTS, syncer::kFaviconSyncTag) == BaseNode::INIT_OK) { experiments->favicon_sync_limit = favicon_sync_node.GetExperimentsSpecifics().favicon_sync(). favicon_sync_limit(); found_experiment = true; } ReadNode pre_commit_update_avoidance_node(&trans); if (pre_commit_update_avoidance_node.InitByClientTagLookup( syncer::EXPERIMENTS, syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) { session_context_->set_server_enabled_pre_commit_update_avoidance( pre_commit_update_avoidance_node.GetExperimentsSpecifics(). pre_commit_update_avoidance().enabled()); // We don't bother setting found_experiment. The frontend doesn't need to // know about this. } return found_experiment; } bool SyncManagerImpl::HasUnsyncedItems() { ReadTransaction trans(FROM_HERE, GetUserShare()); return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0); } SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() { return sync_encryption_handler_.get(); } // static. int SyncManagerImpl::GetDefaultNudgeDelay() { return kDefaultNudgeDelayMilliseconds; } // static. int SyncManagerImpl::GetPreferencesNudgeDelay() { return kPreferencesNudgeDelayMilliseconds; } } // namespace syncer