diff options
author | lipalani@chromium.org <lipalani@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-19 05:33:04 +0000 |
---|---|---|
committer | lipalani@chromium.org <lipalani@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-19 05:33:04 +0000 |
commit | 5e1cc9e5ab1a8ba9087ab117e71b06787ca7b251 (patch) | |
tree | f209812b2f4108e9f8cee56384db2cdeabf40c73 /chrome/browser/sync | |
parent | ca034a50e0e1f68212953c4b7795574747a32967 (diff) | |
download | chromium_src-5e1cc9e5ab1a8ba9087ab117e71b06787ca7b251.zip chromium_src-5e1cc9e5ab1a8ba9087ab117e71b06787ca7b251.tar.gz chromium_src-5e1cc9e5ab1a8ba9087ab117e71b06787ca7b251.tar.bz2 |
Revert 82067 - make new syncer thread the default. Fixed all the test cases that failed.BUG=26339TEST=unit_tests.exe, sync_integration_tests.exe, sync_unit_tests.exe. manual testcases:1. Set up a brand new profile to sync and make sure it syncs.2. enable/disable a datatype and make sure it syncs.3. Make change to a datatype locally and make sure it gets propagated.4. make change to a datatype remotely and make sure it syncs down.Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=82026Review URL: http://codereview.chromium.org/6874018
TBR=lipalani@chromium.org
Review URL: http://codereview.chromium.org/6883037
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@82073 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
38 files changed, 5418 insertions, 1137 deletions
diff --git a/chrome/browser/sync/engine/all_status.cc b/chrome/browser/sync/engine/all_status.cc index 0b454a3..77d2f5f 100644 --- a/chrome/browser/sync/engine/all_status.cc +++ b/chrome/browser/sync/engine/all_status.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -106,7 +106,12 @@ void AllStatus::OnSyncEngineEvent(const SyncEngineEvent& event) { case SyncEngineEvent::STATUS_CHANGED: status_ = CalcSyncing(event); break; + case SyncEngineEvent::SYNCER_THREAD_PAUSED: + case SyncEngineEvent::SYNCER_THREAD_RESUMED: + case SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION: + case SyncEngineEvent::SYNCER_THREAD_CONNECTED: case SyncEngineEvent::STOP_SYNCING_PERMANENTLY: + case SyncEngineEvent::SYNCER_THREAD_EXITING: break; default: LOG(ERROR) << "Unrecognized Syncer Event: " << event.what_happened; diff --git a/chrome/browser/sync/engine/net/server_connection_manager.cc b/chrome/browser/sync/engine/net/server_connection_manager.cc index f84b8b7..4598588 100644 --- a/chrome/browser/sync/engine/net/server_connection_manager.cc +++ b/chrome/browser/sync/engine/net/server_connection_manager.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -158,8 +158,16 @@ ServerConnectionManager::~ServerConnectionManager() { } void ServerConnectionManager::NotifyStatusChanged() { - listeners_->Notify(&ServerConnectionEventListener::OnServerConnectionEvent, - ServerConnectionEvent2(server_status_, server_reachable_)); + if (CommandLine::ForCurrentProcess()->HasSwitch( + switches::kNewSyncerThread)) { + listeners_->Notify(&ServerConnectionEventListener::OnServerConnectionEvent, + ServerConnectionEvent2(server_status_, server_reachable_)); + } else { + ServerConnectionEvent event = { ServerConnectionEvent::STATUS_CHANGED, + server_status_, + server_reachable_ }; + channel_->NotifyListeners(event); + } } bool ServerConnectionManager::PostBufferWithCachedAuth( diff --git a/chrome/browser/sync/engine/nudge_source.h b/chrome/browser/sync/engine/nudge_source.h index fc9b02a..3e3b3ae 100644 --- a/chrome/browser/sync/engine/nudge_source.h +++ b/chrome/browser/sync/engine/nudge_source.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -8,6 +8,8 @@ namespace browser_sync { +namespace s3 { + enum NudgeSource { NUDGE_SOURCE_UNKNOWN = 0, // We received an invalidation message and are nudging to check for updates. @@ -18,6 +20,7 @@ enum NudgeSource { NUDGE_SOURCE_CONTINUATION, }; +} // namespace s3 } // namespace browser_sync #endif // CHROME_BROWSER_SYNC_ENGINE_NUDGE_SOURCE_H_ diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc index 41da3d7..7792dc6 100644 --- a/chrome/browser/sync/engine/syncapi.cc +++ b/chrome/browser/sync/engine/syncapi.cc @@ -23,17 +23,17 @@ #include "base/string_util.h" #include "base/synchronization/lock.h" #include "base/task.h" -#include "base/time.h" #include "base/utf_string_conversions.h" #include "base/values.h" #include "chrome/browser/sync/engine/all_status.h" #include "chrome/browser/sync/engine/change_reorder_buffer.h" #include "chrome/browser/sync/engine/model_safe_worker.h" -#include "chrome/browser/sync/engine/nudge_source.h" #include "chrome/browser/sync/engine/net/server_connection_manager.h" #include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" #include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" +#include "chrome/browser/sync/engine/syncer_thread_adapter.h" #include "chrome/browser/sync/engine/http_post_provider_factory.h" #include "chrome/browser/sync/js_arg_list.h" #include "chrome/browser/sync/js_backend.h" @@ -57,7 +57,6 @@ #include "chrome/browser/sync/syncable/autofill_migration.h" #include "chrome/browser/sync/syncable/directory_manager.h" #include "chrome/browser/sync/syncable/model_type_payload_map.h" -#include "chrome/browser/sync/syncable/model_type.h" #include "chrome/browser/sync/syncable/nigori_util.h" #include "chrome/browser/sync/syncable/syncable.h" #include "chrome/browser/sync/util/crypto_helpers.h" @@ -67,7 +66,6 @@ #include "content/browser/browser_thread.h" #include "net/base/network_change_notifier.h" -using base::TimeDelta; using browser_sync::AllStatus; using browser_sync::Cryptographer; using browser_sync::KeyParams; @@ -81,6 +79,7 @@ using browser_sync::SyncEngineEvent; using browser_sync::SyncEngineEventListener; using browser_sync::Syncer; using browser_sync::SyncerThread; +using browser_sync::SyncerThreadAdapter; using browser_sync::kNigoriTag; using browser_sync::sessions::SyncSessionContext; using std::list; @@ -90,7 +89,6 @@ using std::vector; using syncable::Directory; using syncable::DirectoryManager; using syncable::Entry; -using syncable::ModelTypeBitSet; using syncable::SPECIFICS; using sync_pb::AutofillProfileSpecifics; @@ -1271,7 +1269,7 @@ class SyncManager::SyncInternal SyncAPIServerConnectionManager* connection_manager() { return connection_manager_.get(); } - SyncerThread* syncer_thread() { return syncer_thread_.get(); } + SyncerThreadAdapter* syncer_thread() { return syncer_thread_.get(); } UserShare* GetUserShare() { return &share_; } // Return the currently active (validated) username for use with syncable @@ -1518,7 +1516,7 @@ class SyncManager::SyncInternal scoped_ptr<SyncAPIServerConnectionManager> connection_manager_; // The thread that runs the Syncer. Needs to be explicitly Start()ed. - scoped_ptr<SyncerThread> syncer_thread_; + scoped_ptr<SyncerThreadAdapter> syncer_thread_; // The SyncNotifier which notifies us when updates need to be downloaded. sync_notifier::SyncNotifier* sync_notifier_; @@ -1666,30 +1664,44 @@ bool SyncManager::IsUsingExplicitPassphrase() { return data_ && data_->IsUsingExplicitPassphrase(); } +bool SyncManager::RequestPause() { + if (data_->syncer_thread()) + return data_->syncer_thread()->RequestPause(); + return false; +} + +bool SyncManager::RequestResume() { + if (data_->syncer_thread()) + return data_->syncer_thread()->RequestResume(); + return false; +} + void SyncManager::RequestNudge(const tracked_objects::Location& location) { if (data_->syncer_thread()) - data_->syncer_thread()->ScheduleNudge( - TimeDelta::FromMilliseconds(0), browser_sync::NUDGE_SOURCE_LOCAL, - ModelTypeBitSet(), location); + data_->syncer_thread()->NudgeSyncer(0, SyncerThread::kLocal, location); } void SyncManager::RequestClearServerData() { if (data_->syncer_thread()) - data_->syncer_thread()->ScheduleClearUserData(); + data_->syncer_thread()->NudgeSyncer(0, SyncerThread::kClearPrivateData, + FROM_HERE); } void SyncManager::RequestConfig(const syncable::ModelTypeBitSet& types) { if (!data_->syncer_thread()) return; + // It is an error for this to be called if new_impl is null. StartConfigurationMode(NULL); - data_->syncer_thread()->ScheduleConfig(types); + data_->syncer_thread()->new_impl()->ScheduleConfig(types); } void SyncManager::StartConfigurationMode(ModeChangeCallback* callback) { if (!data_->syncer_thread()) return; - data_->syncer_thread()->Start( - browser_sync::SyncerThread::CONFIGURATION_MODE, callback); + if (!data_->syncer_thread()->new_impl()) + return; + data_->syncer_thread()->new_impl()->Start( + browser_sync::s3::SyncerThread::CONFIGURATION_MODE, callback); } const std::string& SyncManager::GetAuthenticatedUsername() { @@ -1727,7 +1739,16 @@ bool SyncManager::SyncInternal::Init( net::NetworkChangeNotifier::AddIPAddressObserver(this); - connection_manager()->AddListener(this); + bool new_syncer_thread = CommandLine::ForCurrentProcess()->HasSwitch( + switches::kNewSyncerThread); + + if (new_syncer_thread) { + connection_manager()->AddListener(this); + } else { + connection_manager_hookup_.reset( + NewEventListenerHookup(connection_manager()->channel(), this, + &SyncManager::SyncInternal::HandleServerConnectionEvent)); + } // TODO(akalin): CheckServerReachable() can block, which may cause jank if we // try to shut down sync. Fix this. @@ -1748,7 +1769,8 @@ bool SyncManager::SyncInternal::Init( listeners); context->set_account_name(credentials.email); // The SyncerThread takes ownership of |context|. - syncer_thread_.reset(new SyncerThread(context, new Syncer())); + syncer_thread_.reset(new SyncerThreadAdapter(context, + new_syncer_thread)); } bool signed_in = SignIn(credentials); @@ -1803,12 +1825,11 @@ void SyncManager::SyncInternal::BootstrapEncryption( } void SyncManager::SyncInternal::StartSyncing() { - // Start the syncer thread. This won't actually - // result in any syncing until at least the - // DirectoryManager broadcasts the OPENED event, - // and a valid server connection is detected. if (syncer_thread()) // NULL during certain unittests. - syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); + syncer_thread()->Start(); // Start the syncer thread. This won't actually + // result in any syncing until at least the + // DirectoryManager broadcasts the OPENED event, + // and a valid server connection is detected. } void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { @@ -1861,6 +1882,9 @@ bool SyncManager::SyncInternal::OpenDirectory() { connection_manager()->set_client_id(lookup->cache_guid()); + if (syncer_thread()) + syncer_thread()->CreateSyncer(username_for_share()); + MarkAndNotifyInitializationComplete(); dir_change_hookup_.reset(lookup->AddChangeObserver(this)); return true; @@ -2186,7 +2210,9 @@ void SyncManager::SyncInternal::Shutdown() { method_factory_.RevokeAll(); if (syncer_thread()) { - syncer_thread()->Stop(); + if (!syncer_thread()->Stop(kThreadExitTimeoutMsec)) { + LOG(FATAL) << "Unable to stop the syncer, it won't be happy..."; + } syncer_thread_.reset(); } @@ -2391,9 +2417,9 @@ void SyncManager::SyncInternal::HandleCalculateChangesChangeEventFromSyncApi( if (exists_unsynced_items && syncer_thread()) { int nudge_delay = only_preference_changes ? kPreferencesNudgeDelayMilliseconds : kDefaultNudgeDelayMilliseconds; - syncer_thread()->ScheduleNudge( - TimeDelta::FromMilliseconds(nudge_delay), - browser_sync::NUDGE_SOURCE_LOCAL, + syncer_thread()->NudgeSyncerWithDataTypes( + nudge_delay, + SyncerThread::kLocal, model_types, FROM_HERE); } @@ -2549,6 +2575,18 @@ void SyncManager::SyncInternal::OnSyncEngineEvent( } } + if (event.what_happened == SyncEngineEvent::SYNCER_THREAD_PAUSED) { + FOR_EACH_OBSERVER(SyncManager::Observer, observers_, + OnPaused()); + return; + } + + if (event.what_happened == SyncEngineEvent::SYNCER_THREAD_RESUMED) { + FOR_EACH_OBSERVER(SyncManager::Observer, observers_, + OnResumed()); + return; + } + if (event.what_happened == SyncEngineEvent::STOP_SYNCING_PERMANENTLY) { FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnStopSyncingPermanently()); @@ -2705,7 +2743,7 @@ void SyncManager::SyncInternal::OnNotificationStateChange( << (notifications_enabled ? "true" : "false"); allstatus_.SetNotificationsEnabled(notifications_enabled); if (syncer_thread()) { - syncer_thread()->set_notifications_enabled(notifications_enabled); + syncer_thread()->SetNotificationsEnabled(notifications_enabled); } if (parent_router_) { ListValue args; @@ -2730,9 +2768,9 @@ void SyncManager::SyncInternal::OnIncomingNotification( const syncable::ModelTypePayloadMap& type_payloads) { if (!type_payloads.empty()) { if (syncer_thread()) { - syncer_thread()->ScheduleNudgeWithPayloads( - TimeDelta::FromMilliseconds(kSyncerThreadDelayMsec), - browser_sync::NUDGE_SOURCE_NOTIFICATION, + syncer_thread()->NudgeSyncerWithPayloads( + kSyncerThreadDelayMsec, + SyncerThread::kNotification, type_payloads, FROM_HERE); } allstatus_.IncrementNotificationsReceived(); diff --git a/chrome/browser/sync/engine/syncapi.h b/chrome/browser/sync/engine/syncapi.h index 1011a29..1718c71 100644 --- a/chrome/browser/sync/engine/syncapi.h +++ b/chrome/browser/sync/engine/syncapi.h @@ -808,6 +808,12 @@ class SyncManager { // message, unless otherwise specified, produces undefined behavior. virtual void OnInitializationComplete() = 0; + // The syncer thread has been paused. + virtual void OnPaused() = 0; + + // The syncer thread has been resumed. + virtual void OnResumed() = 0; + // We are no longer permitted to communicate with the server. Sync should // be disabled and state cleaned up at once. This can happen for a number // of reasons, e.g. swapping from a test instance to production, or a @@ -911,6 +917,20 @@ class SyncManager { // types, as we do not currently support decrypting datatypes. void EncryptDataTypes(const syncable::ModelTypeSet& encrypted_types); + // Requests the syncer thread to pause. The observer's OnPause + // method will be called when the syncer thread is paused. Returns + // false if the syncer thread can not be paused (e.g. if it is not + // started). + // TODO(tim): Deprecated. + bool RequestPause(); + + // Requests the syncer thread to resume. The observer's OnResume + // method will be called when the syncer thread is resumed. Returns + // false if the syncer thread can not be resumed (e.g. if it is not + // paused). + // TODO(tim): Deprecated. + bool RequestResume(); + // Puts the SyncerThread into a mode where no normal nudge or poll traffic // will occur, but calls to RequestConfig will be supported. If |callback| // is provided, it will be invoked (from the internal SyncerThread) when diff --git a/chrome/browser/sync/engine/syncapi_unittest.cc b/chrome/browser/sync/engine/syncapi_unittest.cc index 6c68735..10d53a3 100644 --- a/chrome/browser/sync/engine/syncapi_unittest.cc +++ b/chrome/browser/sync/engine/syncapi_unittest.cc @@ -617,6 +617,8 @@ class SyncManagerObserverMock : public SyncManager::Observer { MOCK_METHOD1(OnPassphraseRequired, void(bool)); // NOLINT MOCK_METHOD0(OnPassphraseFailed, void()); // NOLINT MOCK_METHOD1(OnPassphraseAccepted, void(const std::string&)); // NOLINT + MOCK_METHOD0(OnPaused, void()); // NOLINT + MOCK_METHOD0(OnResumed, void()); // NOLINT MOCK_METHOD0(OnStopSyncingPermanently, void()); // NOLINT MOCK_METHOD1(OnUpdatedToken, void(const std::string&)); // NOLINT MOCK_METHOD0(OnClearServerDataFailed, void()); // NOLINT diff --git a/chrome/browser/sync/engine/syncer_end_command.cc b/chrome/browser/sync/engine/syncer_end_command.cc index 516bbd0..8f67841 100644 --- a/chrome/browser/sync/engine/syncer_end_command.cc +++ b/chrome/browser/sync/engine/syncer_end_command.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -21,7 +21,7 @@ void SyncerEndCommand::ExecuteImpl(sessions::SyncSession* session) { sessions::SyncSessionSnapshot snapshot(session->TakeSnapshot()); event.snapshot = &snapshot; session->context()->NotifyListeners(event); - VLOG(1) << this << " sent sync end snapshot"; + session->context()->set_last_snapshot(session->TakeSnapshot()); } } // namespace browser_sync diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc index 5bf0d4d..5851c45 100644 --- a/chrome/browser/sync/engine/syncer_thread.cc +++ b/chrome/browser/sync/engine/syncer_thread.cc @@ -5,10 +5,27 @@ #include "chrome/browser/sync/engine/syncer_thread.h" #include <algorithm> +#include <queue> +#include <string> +#include <vector> #include "base/rand_util.h" +#include "base/third_party/dynamic_annotations/dynamic_annotations.h" +#include "build/build_config.h" +#include "chrome/browser/sync/engine/model_safe_worker.h" +#include "chrome/browser/sync/engine/net/server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/sessions/sync_session.h" +#if defined(OS_MACOSX) +#include <CoreFoundation/CFNumber.h> +#include <IOKit/IOTypes.h> +#include <IOKit/IOKitLib.h> +#endif + +using std::priority_queue; +using std::min; +using base::Time; using base::TimeDelta; using base::TimeTicks; @@ -18,843 +35,861 @@ using sessions::SyncSession; using sessions::SyncSessionSnapshot; using sessions::SyncSourceInfo; using syncable::ModelTypePayloadMap; -using syncable::ModelTypeBitSet; -using sync_pb::GetUpdatesCallerInfo; - -SyncerThread::DelayProvider::DelayProvider() {} -SyncerThread::DelayProvider::~DelayProvider() {} - -SyncerThread::WaitInterval::WaitInterval() {} -SyncerThread::WaitInterval::~WaitInterval() {} -SyncerThread::SyncSessionJob::SyncSessionJob() {} -SyncerThread::SyncSessionJob::~SyncSessionJob() {} +// We use high values here to ensure that failure to receive poll updates from +// the server doesn't result in rapid-fire polling from the client due to low +// local limits. +const int SyncerThread::kDefaultShortPollIntervalSeconds = 3600 * 8; +const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600 * 12; + +// TODO(tim): This is used to regulate the short poll (when notifications are +// disabled) based on user idle time. If it is set to a smaller value than +// the short poll interval, it basically does nothing; for now, this is what +// we want and allows stronger control over the poll rate from the server. We +// should probably re-visit this code later and figure out if user idle time +// is really something we want and make sure it works, if it is. +const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000; + +// Backoff interval randomization factor. +static const int kBackoffRandomizationFactor = 2; + +const int SyncerThread::kMaxBackoffSeconds = 60 * 60 * 4; // 4 hours. + +SyncerThread::ProtectedFields::ProtectedFields() + : stop_syncer_thread_(false), + pause_requested_(false), + paused_(false), + syncer_(NULL), + connected_(false), + pending_nudge_source_(kUnknown) {} + +SyncerThread::ProtectedFields::~ProtectedFields() {} + +void SyncerThread::NudgeSyncerWithPayloads( + int milliseconds_from_now, + NudgeSource source, + const ModelTypePayloadMap& model_types_with_payloads) { + base::AutoLock lock(lock_); + if (vault_.syncer_ == NULL) { + return; + } -SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, - base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, bool is_canary_job, - const tracked_objects::Location& nudge_location) : purpose(purpose), - scheduled_start(start), - session(session), - is_canary_job(is_canary_job), - nudge_location(nudge_location) { + NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); } -TimeDelta SyncerThread::DelayProvider::GetDelay( - const base::TimeDelta& last_delay) { - return SyncerThread::GetRecommendedDelay(last_delay); +void SyncerThread::NudgeSyncerWithDataTypes( + int milliseconds_from_now, + NudgeSource source, + const syncable::ModelTypeBitSet& model_types) { + base::AutoLock lock(lock_); + if (vault_.syncer_ == NULL) { + return; + } + + ModelTypePayloadMap model_types_with_payloads = + syncable::ModelTypePayloadMapFromBitSet(model_types, std::string()); + NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); } -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( +void SyncerThread::NudgeSyncer( + int milliseconds_from_now, NudgeSource source) { - switch (source) { - case NUDGE_SOURCE_NOTIFICATION: - return GetUpdatesCallerInfo::NOTIFICATION; - case NUDGE_SOURCE_LOCAL: - return GetUpdatesCallerInfo::LOCAL; - case NUDGE_SOURCE_CONTINUATION: - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; - case NUDGE_SOURCE_UNKNOWN: - return GetUpdatesCallerInfo::UNKNOWN; - default: - NOTREACHED(); - return GetUpdatesCallerInfo::UNKNOWN; + base::AutoLock lock(lock_); + if (vault_.syncer_ == NULL) { + return; } + + // Set all enabled datatypes. + ModelSafeRoutingInfo routes; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + ModelTypePayloadMap model_types_with_payloads = + syncable::ModelTypePayloadMapFromRoutingInfo(routes, std::string()); + NudgeSyncImpl(milliseconds_from_now, source, model_types_with_payloads); } -SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) - : mode(mode), had_nudge(false), length(length) { } +SyncerThread::SyncerThread(sessions::SyncSessionContext* context) + : thread_main_started_(false, false), + thread_("SyncEngine_SyncerThread"), + vault_field_changed_(&lock_), + conn_mgr_hookup_(NULL), + syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), + syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), + syncer_polling_interval_(kDefaultShortPollIntervalSeconds), + syncer_max_interval_(kDefaultMaxPollIntervalMs), + session_context_(context), + disable_idle_detection_(false) { + DCHECK(context); -SyncerThread::SyncerThread(sessions::SyncSessionContext* context, - Syncer* syncer) - : thread_("SyncEngine_SyncerThread"), - syncer_short_poll_interval_seconds_( - TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), - syncer_long_poll_interval_seconds_( - TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), - mode_(NORMAL_MODE), - server_connection_ok_(false), - delay_provider_(new DelayProvider()), - syncer_(syncer), - session_context_(context) { + if (context->connection_manager()) + WatchConnectionManager(context->connection_manager()); } SyncerThread::~SyncerThread() { - DCHECK(!thread_.IsRunning()); -} - -void SyncerThread::CheckServerConnectionManagerStatus( - HttpResponse::ServerConnectionCode code) { - - VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." - << "Old mode: " << server_connection_ok_ << " Code: " << code; - // Note, be careful when adding cases here because if the SyncerThread - // thinks there is no valid connection as determined by this method, it - // will drop out of *all* forward progress sync loops (it won't poll and it - // will queue up Talk notifications but not actually call SyncShare) until - // some external action causes a ServerConnectionManager to broadcast that - // a valid connection has been re-established. - if (HttpResponse::CONNECTION_UNAVAILABLE == code || - HttpResponse::SYNC_AUTH_ERROR == code) { - server_connection_ok_ = false; - VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." - << " new mode:" << server_connection_ok_; - } else if (HttpResponse::SERVER_CONNECTION_OK == code) { - server_connection_ok_ = true; - VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." - << " new mode:" << server_connection_ok_; - DoCanaryJob(); - } -} + conn_mgr_hookup_.reset(); + delete vault_.syncer_; + CHECK(!thread_.IsRunning()); +} + +// Creates and starts a syncer thread. +// Returns true if it creates a thread or if there's currently a thread running +// and false otherwise. +bool SyncerThread::Start() { + { + base::AutoLock lock(lock_); + if (thread_.IsRunning()) { + return true; + } -void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { - VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread " - << MessageLoop::current()->thread_name(); - if (!thread_.IsRunning()) { - VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode " - << mode; if (!thread_.Start()) { - NOTREACHED() << "Unable to start SyncerThread."; - return; + return false; } - WatchConnectionManager(); - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::SendInitialSnapshot)); } - VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = " - << mode; + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::ThreadMain)); - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); + // Wait for notification that our task makes it safely onto the message + // loop before returning, so the caller can't call Stop before we're + // actually up and running. This is for consistency with the old pthread + // impl because pthread_create would do this in one step. + thread_main_started_.Wait(); + VLOG(1) << "SyncerThread started."; + return true; } -void SyncerThread::SendInitialSnapshot() { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, - SyncSourceInfo(), ModelSafeRoutingInfo(), - std::vector<ModelSafeWorker*>())); - SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); - sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); - event.snapshot = &snapshot; - session_context_->NotifyListeners(event); -} +// Stop processing. A max wait of at least 2*server RTT time is recommended. +// Returns true if we stopped, false otherwise. +bool SyncerThread::Stop(int max_wait) { + RequestSyncerExitAndSetThreadStopConditions(); -void SyncerThread::WatchConnectionManager() { - ServerConnectionManager* scm = session_context_->connection_manager(); - CheckServerConnectionManagerStatus(scm->server_status()); - scm->AddListener(this); + // This will join, and finish when ThreadMain terminates. + thread_.Stop(); + return true; } -void SyncerThread::StartImpl(Mode mode, - linked_ptr<ModeChangeCallback> callback) { - VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " - << mode; - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - DCHECK(!session_context_->account_name().empty()); - DCHECK(syncer_.get()); - mode_ = mode; - AdjustPolling(NULL); // Will kick start poll timer if needed. - if (callback.get()) - callback->Run(); +void SyncerThread::RequestSyncerExitAndSetThreadStopConditions() { + { + base::AutoLock lock(lock_); + // If the thread has been started, then we either already have or are about + // to enter ThreadMainLoop so we have to proceed with shutdown and wait for + // it to finish. If the thread has not been started --and we now own the + // lock-- then we can early out because the caller has not called Start(). + if (!thread_.IsRunning()) + return; + + VLOG(1) << "SyncerThread::Stop - setting ThreadMain exit condition to true " + "(vault_.stop_syncer_thread_)"; + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit + // below). + vault_.stop_syncer_thread_ = true; + if (NULL != vault_.syncer_) { + // Try to early exit the syncer itself, which could be looping inside + // SyncShare. + vault_.syncer_->RequestEarlyExit(); + } - // We just changed our mode. See if there are any pending jobs that we could - // execute in the new mode. - DoPendingJobIfPossible(false); + // stop_syncer_thread_ is now true and the Syncer has been told to exit. + // We want to wake up all waiters so they can re-examine state. We signal, + // causing all waiters to try to re-acquire the lock, and then we release + // the lock, and join on our internal thread which should soon run off the + // end of ThreadMain. + vault_field_changed_.Broadcast(); + } } -SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( - const SyncSessionJob& job) { +bool SyncerThread::RequestPause() { + base::AutoLock lock(lock_); + if (vault_.pause_requested_ || vault_.paused_) + return false; - DCHECK(wait_interval_.get()); - DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); + if (thread_.IsRunning()) { + // Set the pause request. The syncer thread will read this + // request, enter the paused state, and send the PAUSED + // notification. + vault_.pause_requested_ = true; + vault_field_changed_.Broadcast(); + VLOG(1) << "Pause requested."; + } else { + // If the thread is not running, go directly into the paused state + // and notify. + EnterPausedState(); + VLOG(1) << "Paused while not running."; + } + return true; +} - VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : " - << wait_interval_->mode << "Wait interval had nudge : " - << wait_interval_->had_nudge << "is canary job : " - << job.is_canary_job; +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} - if (job.purpose == SyncSessionJob::POLL) - return DROP; +bool SyncerThread::RequestResume() { + base::AutoLock lock(lock_); + // Only valid to request a resume when we are already paused or we + // have a pause pending. + if (!(vault_.paused_ || vault_.pause_requested_)) + return false; + + if (thread_.IsRunning()) { + if (vault_.pause_requested_) { + // If pause was requested we have not yet paused. In this case, + // the resume cancels the pause request. + vault_.pause_requested_ = false; + vault_field_changed_.Broadcast(); + Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED); + VLOG(1) << "Pending pause canceled by resume."; + } else { + // Unpause and notify. + vault_.paused_ = false; + vault_field_changed_.Broadcast(); + } + } else { + ExitPausedState(); + VLOG(1) << "Resumed while not running."; + } + return true; +} - DCHECK(job.purpose == SyncSessionJob::NUDGE || - job.purpose == SyncSessionJob::CONFIGURATION); - if (wait_interval_->mode == WaitInterval::THROTTLED) - return SAVE; +void SyncerThread::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + syncer_long_poll_interval_seconds_ = static_cast<int>( + new_interval.InSeconds()); +} - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); - if (job.purpose == SyncSessionJob::NUDGE) { - if (mode_ == CONFIGURATION_MODE) - return SAVE; +void SyncerThread::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + syncer_short_poll_interval_seconds_ = static_cast<int>( + new_interval.InSeconds()); +} - // If we already had one nudge then just drop this nudge. We will retry - // later when the timer runs out. - return wait_interval_->had_nudge ? DROP : CONTINUE; - } - // This is a config job. - return job.is_canary_job ? CONTINUE : SAVE; +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { + silenced_until_ = silenced_until; } -SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( - const SyncSessionJob& job) { - if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) - return CONTINUE; +bool SyncerThread::IsSyncingCurrentlySilenced() { + // We should ignore reads from silenced_until_ under ThreadSanitizer + // since this is a benign race. + ANNOTATE_IGNORE_READS_BEGIN(); + bool ret = (silenced_until_ - TimeTicks::Now()) >= TimeDelta::FromSeconds(0); + ANNOTATE_IGNORE_READS_END(); + return ret; +} - if (wait_interval_.get()) - return DecideWhileInWaitInterval(job); +void SyncerThread::OnShouldStopSyncingPermanently() { + RequestSyncerExitAndSetThreadStopConditions(); + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} - if (mode_ == CONFIGURATION_MODE) { - if (job.purpose == SyncSessionJob::NUDGE) - return SAVE; - else if (job.purpose == SyncSessionJob::CONFIGURATION) - return CONTINUE; - else - return DROP; +void SyncerThread::ThreadMainLoop() { + // This is called with lock_ acquired. + lock_.AssertAcquired(); + VLOG(1) << "In thread main loop."; + + // Use the short poll value by default. + vault_.current_wait_interval_.poll_delta = + TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); + int user_idle_milliseconds = 0; + TimeTicks last_sync_time; + bool initial_sync_for_thread = true; + bool continue_sync_cycle = false; + +#if defined(OS_LINUX) + idle_query_.reset(new IdleQueryLinux()); +#endif + + if (vault_.syncer_ == NULL) { + VLOG(1) << "Syncer thread waiting for database initialization."; + while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); + VLOG_IF(1, !(vault_.syncer_ == NULL)) << "Syncer was found after DB " + "started."; } - // We are in normal mode. - DCHECK_EQ(mode_, NORMAL_MODE); - DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); - - // Freshness condition - if (job.scheduled_start < last_sync_session_end_time_) { - VLOG(1) << "SyncerThread(" << this << ")" - << " Dropping job because of freshness"; - return DROP; - } + while (!vault_.stop_syncer_thread_) { + // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as + // below) because we cannot poll until these conditions are met, so we wait + // indefinitely. + + // If we are not connected, enter WaitUntilConnectedOrQuit() which + // will return only when the network is connected or a quit is + // requested. Note that it is possible to exit + // WaitUntilConnectedOrQuit() in the paused state which will be + // handled by the next statement. + if (!vault_.connected_ && !initial_sync_for_thread) { + WaitUntilConnectedOrQuit(); + continue; + } - if (server_connection_ok_) - return CONTINUE; + // Check if we should be paused or if a pause was requested. Note + // that we don't check initial_sync_for_thread here since we want + // the pause to happen regardless if it is the initial sync or not. + if (vault_.pause_requested_ || vault_.paused_) { + PauseUntilResumedOrQuit(); + continue; + } - VLOG(1) << "SyncerThread(" << this << ")" - << " Bad server connection. Using that to decide on job."; - return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; -} + const TimeTicks next_poll = last_sync_time + + vault_.current_wait_interval_.poll_delta; + bool throttled = vault_.current_wait_interval_.mode == + WaitInterval::THROTTLED; + // If we are throttled, we must wait. Otherwise, wait until either the next + // nudge (if one exists) or the poll interval. + TimeTicks end_wait = next_poll; + if (!throttled && !vault_.pending_nudge_time_.is_null()) { + end_wait = std::min(end_wait, vault_.pending_nudge_time_); + } + VLOG(1) << "end_wait is " << end_wait.ToInternalValue() + << "\nnext_poll is " << next_poll.ToInternalValue(); + + // We block until the CV is signaled (e.g a control field changed, loss of + // network connection, nudge, spurious, etc), or the poll interval elapses. + TimeDelta sleep_time = end_wait - TimeTicks::Now(); + if (!initial_sync_for_thread && sleep_time > TimeDelta::FromSeconds(0)) { + vault_field_changed_.TimedWait(sleep_time); + + if (TimeTicks::Now() < end_wait) { + // Didn't timeout. Could be a spurious signal, or a signal corresponding + // to an actual change in one of our control fields. By continuing here + // we perform the typical "always recheck conditions when signaled", + // (typically handled by a while(condition_not_met) cv.wait() construct) + // because we jump to the top of the loop. The main difference is we + // recalculate the wait interval, but last_sync_time won't have changed. + // So if we were signaled by a nudge (for ex.) we'll grab the new nudge + // off the queue and wait for that delta. If it was a spurious signal, + // we'll keep waiting for the same moment in time as we just were. + continue; + } + } -void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { - DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); - if (pending_nudge_.get() == NULL) { - VLOG(1) << "SyncerThread(" << this << ")" - << " Creating a pending nudge job"; - SyncSession* s = job.session.get(); - scoped_ptr<SyncSession> session(new SyncSession(s->context(), - s->delegate(), s->source(), s->routing_info(), s->workers())); + // Handle a nudge, caused by either a notification or a local bookmark + // event. This will also update the source of the following SyncMain call. + VLOG(1) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); + bool nudged = false; + scoped_ptr<SyncSession> session; + session.reset(SyncMain(vault_.syncer_, + throttled, continue_sync_cycle, &initial_sync_for_thread, &nudged)); + + // Update timing information for how often these datatypes are triggering + // nudges. + base::TimeTicks now = TimeTicks::Now(); + if (!last_sync_time.is_null()) { + ModelTypePayloadMap::const_iterator iter; + for (iter = session->source().types.begin(); + iter != session->source().types.end(); + ++iter) { + syncable::PostTimeToTypeHistogram(iter->first, + now - last_sync_time); + } + } - SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, - make_linked_ptr(session.release()), false, job.nudge_location); - pending_nudge_.reset(new SyncSessionJob(new_job)); + last_sync_time = now; - return; + VLOG(1) << "Updating the next polling time after SyncMain"; + vault_.current_wait_interval_ = CalculatePollingWaitTime( + static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()), + &user_idle_milliseconds, &continue_sync_cycle, nudged); } +#if defined(OS_LINUX) + idle_query_.reset(); +#endif +} - VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); - pending_nudge_->scheduled_start = job.scheduled_start; +void SyncerThread::SetConnected(bool connected) { + DCHECK(!thread_.IsRunning()); + vault_.connected_ = connected; +} - // Unfortunately the nudge location cannot be modified. So it stores the - // location of the first caller. +void SyncerThread::SetSyncerPollingInterval(base::TimeDelta interval) { + // TODO(timsteele): Use TimeDelta internally. + syncer_polling_interval_ = static_cast<int>(interval.InSeconds()); } -bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { - JobProcessDecision decision = DecideOnJob(job); - VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: " - << decision << " Job purpose " << job.purpose << "mode " << mode_; - if (decision != SAVE) - return decision == CONTINUE; +void SyncerThread::SetSyncerShortPollInterval(base::TimeDelta interval) { + // TODO(timsteele): Use TimeDelta internally. + syncer_short_poll_interval_seconds_ = + static_cast<int>(interval.InSeconds()); +} - DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == - SyncSessionJob::CONFIGURATION); +void SyncerThread::WaitUntilConnectedOrQuit() { + VLOG(1) << "Syncer thread waiting for connection."; + Notify(SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION); - SaveJob(job); - return false; -} + bool is_paused = vault_.paused_; -void SyncerThread::SaveJob(const SyncSessionJob& job) { - DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); - if (job.purpose == SyncSessionJob::NUDGE) { - VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job"; - InitOrCoalescePendingJob(job); - } else if (job.purpose == SyncSessionJob::CONFIGURATION){ - VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job"; - DCHECK(wait_interval_.get()); - DCHECK(mode_ == CONFIGURATION_MODE); + while (!vault_.connected_ && !vault_.stop_syncer_thread_) { + if (!is_paused && vault_.pause_requested_) { + // If we get a pause request while waiting for a connection, + // enter the paused state. + EnterPausedState(); + is_paused = true; + VLOG(1) << "Syncer thread entering disconnected pause."; + } - SyncSession* old = job.session.get(); - SyncSession* s(new SyncSession(session_context_.get(), this, - old->source(), old->routing_info(), old->workers())); - SyncSessionJob new_job(job.purpose, TimeTicks::Now(), - make_linked_ptr(s), false, job.nudge_location); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); - } // drop the rest. -} + if (is_paused && !vault_.paused_) { + ExitPausedState(); + is_paused = false; + VLOG(1) << "Syncer thread exiting disconnected pause."; + } -// Functor for std::find_if to search by ModelSafeGroup. -struct ModelSafeWorkerGroupIs { - explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} - bool operator()(ModelSafeWorker* w) { - return group == w->GetModelSafeGroup(); + vault_field_changed_.Wait(); } - ModelSafeGroup group; -}; -void SyncerThread::ScheduleClearUserData() { - if (!thread_.IsRunning()) { - NOTREACHED(); - return; + if (!vault_.stop_syncer_thread_) { + Notify(SyncEngineEvent::SYNCER_THREAD_CONNECTED); + VLOG(1) << "Syncer thread found connection."; } - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleClearUserDataImpl)); } -void SyncerThread::ScheduleNudge(const TimeDelta& delay, - NudgeSource source, const ModelTypeBitSet& types, - const tracked_objects::Location& nudge_location) { - if (!thread_.IsRunning()) { - NOTREACHED(); - return; - } +void SyncerThread::PauseUntilResumedOrQuit() { + VLOG(1) << "Syncer thread entering pause."; + // If pause was requested (rather than already being paused), send + // the PAUSED notification. + if (vault_.pause_requested_) + EnterPausedState(); - VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled"; + // Thread will get stuck here until either a resume is requested + // or shutdown is started. + while (vault_.paused_ && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); - ModelTypePayloadMap types_with_payloads = - syncable::ModelTypePayloadMapFromBitSet(types, std::string()); - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, - GetUpdatesFromNudgeSource(source), types_with_payloads, false, - nudge_location)); -} + // Notify that we have resumed if we are not shutting down. + if (!vault_.stop_syncer_thread_) + ExitPausedState(); -void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, - NudgeSource source, const ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location) { - if (!thread_.IsRunning()) { - NOTREACHED(); - return; - } - - VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; - - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, - GetUpdatesFromNudgeSource(source), types_with_payloads, false, - nudge_location)); + VLOG(1) << "Syncer thread exiting pause."; } -void SyncerThread::ScheduleClearUserDataImpl() { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - SyncSession* session = new SyncSession(session_context_.get(), this, - SyncSourceInfo(), ModelSafeRoutingInfo(), - std::vector<ModelSafeWorker*>()); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), - SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); +void SyncerThread::EnterPausedState() { + lock_.AssertAcquired(); + vault_.pause_requested_ = false; + vault_.paused_ = true; + vault_field_changed_.Broadcast(); + Notify(SyncEngineEvent::SYNCER_THREAD_PAUSED); } -void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, - GetUpdatesCallerInfo::GetUpdatesSource source, - const ModelTypePayloadMap& types_with_payloads, - bool is_canary_job, const tracked_objects::Location& nudge_location) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); +void SyncerThread::ExitPausedState() { + lock_.AssertAcquired(); + vault_.paused_ = false; + vault_field_changed_.Broadcast(); + Notify(SyncEngineEvent::SYNCER_THREAD_RESUMED); +} - VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; - // Note we currently nudge for all types regardless of the ones incurring - // the nudge. Doing different would throw off some syncer commands like - // CleanupDisabledTypes. We may want to change this in the future. - SyncSourceInfo info(source, types_with_payloads); +void SyncerThread::DisableIdleDetection() { + disable_idle_detection_ = true; +} - SyncSession* session(CreateSyncSession(info)); - SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, - make_linked_ptr(session), is_canary_job, - nudge_location); +// We check how long the user's been idle and sync less often if the machine is +// not in use. The aim is to reduce server load. +SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime( + int last_poll_wait, // Time in seconds. + int* user_idle_milliseconds, + bool* continue_sync_cycle, + bool was_nudged) { + lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock. + WaitInterval return_interval; - session = NULL; - if (!ShouldRunJob(job)) - return; + // Server initiated throttling trumps everything. + if (!silenced_until_.is_null()) { + // We don't need to reset other state, it can continue where it left off. + return_interval.mode = WaitInterval::THROTTLED; + return_interval.poll_delta = silenced_until_ - TimeTicks::Now(); + return return_interval; + } - if (pending_nudge_.get()) { - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { - VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because" - << "we are in backoff"; - return; - } + bool is_continuing_sync_cyle = *continue_sync_cycle; + *continue_sync_cycle = false; - VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); + // Determine if the syncer has unfinished work to do. + SyncSessionSnapshot* snapshot = session_context_->previous_session_snapshot(); + const bool syncer_has_work_to_do = snapshot && + (snapshot->num_server_changes_remaining > 0 || + snapshot->unsynced_count > 0); + VLOG(1) << "syncer_has_work_to_do is " << syncer_has_work_to_do; - if (!IsBackingOff()) { - VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because" - << " we are not in backoff and the job was coalesced"; - return; + // First calculate the expected wait time, figuring in any backoff because of + // user idle time. next_wait is in seconds + syncer_polling_interval_ = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + int default_next_wait = syncer_polling_interval_; + return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait); + + if (syncer_has_work_to_do) { + // Provide exponential backoff due to consecutive errors, else attempt to + // complete the work as soon as possible. + if (is_continuing_sync_cyle) { + return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF; + if (was_nudged && vault_.current_wait_interval_.mode == + WaitInterval::EXPONENTIAL_BACKOFF) { + // We were nudged, it failed, and we were already in backoff. + return_interval.had_nudge_during_backoff = true; + // Keep exponent for exponential backoff the same in this case. + return_interval.poll_delta = vault_.current_wait_interval_.poll_delta; + } else { + // We weren't nudged, or we were in a NORMAL wait interval until now. + return_interval.poll_delta = TimeDelta::FromSeconds( + GetRecommendedDelaySeconds(last_poll_wait)); + } } else { - VLOG(1) << "SyncerThread(" << this << ")" - << " Rescheduling pending nudge"; - SyncSession* s = pending_nudge_->session.get(); - job.session.reset(new SyncSession(s->context(), s->delegate(), - s->source(), s->routing_info(), s->workers())); - pending_nudge_.reset(); + // No consecutive error. + return_interval.poll_delta = TimeDelta::FromSeconds( + GetRecommendedDelaySeconds(0)); + } + *continue_sync_cycle = true; + } else if (!session_context_->notifications_enabled()) { + // Ensure that we start exponential backoff from our base polling + // interval when we are not continuing a sync cycle. + last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); + + // Did the user start interacting with the computer again? + // If so, revise our idle time (and probably next_sync_time) downwards + int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); + if (new_idle_time < *user_idle_milliseconds) { + *user_idle_milliseconds = new_idle_time; } + return_interval.poll_delta = TimeDelta::FromMilliseconds( + CalculateSyncWaitTime(last_poll_wait * 1000, + *user_idle_milliseconds)); + DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait); } - // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. - ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), - nudge_location); -} - -// Helper to extract the routing info and workers corresponding to types in -// |types| from |registrar|. -void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, - ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, - std::vector<ModelSafeWorker*>* workers) { - ModelSafeRoutingInfo r_tmp; - std::vector<ModelSafeWorker*> w_tmp; - registrar->GetModelSafeRoutingInfo(&r_tmp); - registrar->GetWorkers(&w_tmp); + VLOG(1) << "Sync wait: idle " << default_next_wait + << " non-idle or backoff " << return_interval.poll_delta.InSeconds(); - typedef std::vector<ModelSafeWorker*>::const_iterator iter; - for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { - if (!types.test(i)) - continue; - syncable::ModelType t = syncable::ModelTypeFromInt(i); - DCHECK_EQ(1U, r_tmp.count(t)); - (*routes)[t] = r_tmp[t]; - iter it = std::find_if(w_tmp.begin(), w_tmp.end(), - ModelSafeWorkerGroupIs(r_tmp[t])); - if (it != w_tmp.end()) - workers->push_back(*it); - else - NOTREACHED(); - } + return return_interval; +} - iter it = std::find_if(w_tmp.begin(), w_tmp.end(), - ModelSafeWorkerGroupIs(GROUP_PASSIVE)); - if (it != w_tmp.end()) - workers->push_back(*it); - else - NOTREACHED(); +void SyncerThread::ThreadMain() { + base::AutoLock lock(lock_); + // Signal Start() to let it know we've made it safely onto the message loop, + // and unblock it's caller. + thread_main_started_.Signal(); + ThreadMainLoop(); + VLOG(1) << "Syncer thread ThreadMain is done."; + Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); } -void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { - if (!thread_.IsRunning()) { - NOTREACHED(); - return; - } +SyncSession* SyncerThread::SyncMain(Syncer* syncer, bool was_throttled, + bool continue_sync_cycle, bool* initial_sync_for_thread, + bool* was_nudged) { + CHECK(syncer); + + // Since we are initiating a new session for which we are the delegate, we + // are not currently silenced so reset this state for the next session which + // may need to use it. + silenced_until_ = base::TimeTicks(); - VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config"; ModelSafeRoutingInfo routes; std::vector<ModelSafeWorker*> workers; - GetModelSafeParamsForTypes(types, session_context_->registrar(), - &routes, &workers); - - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleConfigImpl, routes, workers, - GetUpdatesCallerInfo::FIRST_UPDATE)); -} - -void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers, - const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - - VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; - // TODO(tim): config-specific GetUpdatesCallerInfo value? - SyncSession* session = new SyncSession(session_context_.get(), this, - SyncSourceInfo(source, - syncable::ModelTypePayloadMapFromRoutingInfo( - routing_info, std::string())), - routing_info, workers); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), - SyncSessionJob::CONFIGURATION, session, FROM_HERE); -} - -void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, - SyncSessionJob::SyncSessionJobPurpose purpose, - sessions::SyncSession* session, - const tracked_objects::Location& nudge_location) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - - SyncSessionJob job(purpose, TimeTicks::Now() + delay, - make_linked_ptr(session), false, nudge_location); - if (purpose == SyncSessionJob::NUDGE) { - VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" - << " ScheduleSyncSessionJob"; - DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); - pending_nudge_.reset(new SyncSessionJob(job)); - } - VLOG(1) << "SyncerThread(" << this << ")" - << " Posting job to execute in DoSyncSessionJob. Job purpose " - << job.purpose; - MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, - &SyncerThread::DoSyncSessionJob, job), - delay.InMilliseconds()); -} - -void SyncerThread::SetSyncerStepsForPurpose( - SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, SyncerStep* end) { - *end = SYNCER_END; - switch (purpose) { - case SyncSessionJob::CONFIGURATION: - *start = DOWNLOAD_UPDATES; - *end = APPLY_UPDATES; - return; - case SyncSessionJob::CLEAR_USER_DATA: - *start = CLEAR_PRIVATE_DATA; - return; - case SyncSessionJob::NUDGE: - case SyncSessionJob::POLL: - *start = SYNCER_BEGIN; - return; - default: - NOTREACHED(); - } -} - -void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - if (!ShouldRunJob(job)) - return; - - if (job.purpose == SyncSessionJob::NUDGE) { - DCHECK(pending_nudge_.get()); - if (pending_nudge_->session != job.session) - return; // Another nudge must have been scheduled in in the meantime. - pending_nudge_.reset(); - } - VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " - << job.purpose; - - SyncerStep begin(SYNCER_BEGIN); - SyncerStep end(SYNCER_END); - SetSyncerStepsForPurpose(job.purpose, &begin, &end); - - bool has_more_to_sync = true; - while (ShouldRunJob(job) && has_more_to_sync) { - VLOG(1) << "SyncerThread(" << this << ")" - << " SyncerThread: Calling SyncShare."; - // Synchronously perform the sync session from this thread. - syncer_->SyncShare(job.session.get(), begin, end); - has_more_to_sync = job.session->HasMoreToSync(); - if (has_more_to_sync) - job.session->ResetTransientState(); - } - VLOG(1) << "SyncerThread(" << this << ")" - << " SyncerThread: Done SyncShare looping."; - FinishSyncSessionJob(job); -} - -void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { - // Whatever types were part of a configuration task will have had updates - // downloaded. For that reason, we make sure they get recorded in the - // event that they get disabled at a later time. - ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); - if (!r.empty()) { - ModelSafeRoutingInfo temp_r; - ModelSafeRoutingInfo old_info(old_job.session->routing_info()); - std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), - std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); - session_context_->set_previous_session_routing_info(temp_r); + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(GetAndResetNudgeSource(was_throttled, + continue_sync_cycle, initial_sync_for_thread, was_nudged)); + scoped_ptr<SyncSession> session; + + base::AutoUnlock unlock(lock_); + do { + session.reset(new SyncSession(session_context_.get(), this, + info, routes, workers)); + VLOG(1) << "Calling SyncShare."; + syncer->SyncShare(session.get()); + } while (session->HasMoreToSync() && silenced_until_.is_null()); + + VLOG(1) << "Done calling SyncShare."; + return session.release(); +} + +SyncSourceInfo SyncerThread::GetAndResetNudgeSource(bool was_throttled, + bool continue_sync_cycle, + bool* initial_sync, + bool* was_nudged) { + bool nudged = false; + NudgeSource nudge_source = kUnknown; + ModelTypePayloadMap model_types_with_payloads; + // Has the previous sync cycle completed? + if (continue_sync_cycle) + nudge_source = kContinuation; + // Update the nudge source if a new nudge has come through during the + // previous sync cycle. + if (!vault_.pending_nudge_time_.is_null()) { + if (!was_throttled) { + nudge_source = vault_.pending_nudge_source_; + model_types_with_payloads = vault_.pending_nudge_types_; + nudged = true; } - } else { - session_context_->set_previous_session_routing_info( - old_job.session->routing_info()); + VLOG(1) << "Clearing pending nudge from " << vault_.pending_nudge_source_ + << " at tick " << vault_.pending_nudge_time_.ToInternalValue(); + vault_.pending_nudge_source_ = kUnknown; + vault_.pending_nudge_types_.clear(); + vault_.pending_nudge_time_ = base::TimeTicks(); } -} -void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - // Update timing information for how often datatypes are triggering nudges. - base::TimeTicks now = TimeTicks::Now(); - if (!last_sync_session_end_time_.is_null()) { - ModelTypePayloadMap::const_iterator iter; - for (iter = job.session->source().types.begin(); - iter != job.session->source().types.end(); - ++iter) { - syncable::PostTimeToTypeHistogram(iter->first, - now - last_sync_session_end_time_); + *was_nudged = nudged; + + // TODO(tim): Hack for bug 64136 to correctly tag continuations that result + // from syncer having more work to do. This will be handled properly with + // the message loop based syncer thread, bug 26339. + return MakeSyncSourceInfo(nudged || nudge_source == kContinuation, + nudge_source, model_types_with_payloads, initial_sync); +} + +SyncSourceInfo SyncerThread::MakeSyncSourceInfo(bool nudged, + NudgeSource nudge_source, + const ModelTypePayloadMap& model_types_with_payloads, + bool* initial_sync) { + sync_pb::GetUpdatesCallerInfo::GetUpdatesSource updates_source = + sync_pb::GetUpdatesCallerInfo::UNKNOWN; + if (*initial_sync) { + updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; + *initial_sync = false; + } else if (!nudged) { + updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; + } else { + switch (nudge_source) { + case kNotification: + updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; + break; + case kLocal: + updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; + break; + case kContinuation: + updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + break; + case kClearPrivateData: + updates_source = sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; + break; + case kUnknown: + default: + updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; + break; } } - last_sync_session_end_time_ = now; - UpdateCarryoverSessionState(job); - if (IsSyncingCurrentlySilenced()) { - VLOG(1) << "SyncerThread(" << this << ")" - << " We are currently throttled. So not scheduling the next sync."; - SaveJob(job); - return; // Nothing to do. - } - VLOG(1) << "SyncerThread(" << this << ")" - << " Updating the next polling time after SyncMain"; - ScheduleNextSync(job); -} - -void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - DCHECK(!old_job.session->HasMoreToSync()); - // Note: |num_server_changes_remaining| > 0 here implies that we received a - // broken response while trying to download all updates, because the Syncer - // will loop until this value is exhausted. Also, if unsynced_handles exist - // but HasMoreToSync is false, this implies that the Syncer determined no - // forward progress was possible at this time (an error, such as an HTTP - // 500, is likely to have occurred during commit). - const bool work_to_do = - old_job.session->status_controller()->num_server_changes_remaining() > 0 - || old_job.session->status_controller()->unsynced_handles().size() > 0; - VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: " - << work_to_do; - - AdjustPolling(&old_job); - - // TODO(tim): Old impl had special code if notifications disabled. Needed? - if (!work_to_do) { - // Success implies backoff relief. Note that if this was a "one-off" job - // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was - // work_to_do before it ran this wont have changed, as jobs like this don't - // run a full sync cycle. So we don't need special code here. - wait_interval_.reset(); - VLOG(1) << "SyncerThread(" << this << ")" - << " Job suceeded so not scheduling more jobs"; - return; + ModelTypePayloadMap sync_source_types; + if (model_types_with_payloads.empty()) { + // No datatypes requested. This must be a poll so set all enabled datatypes. + ModelSafeRoutingInfo routes; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + sync_source_types = syncable::ModelTypePayloadMapFromRoutingInfo(routes, + std::string()); + } else { + sync_source_types = model_types_with_payloads; } - if (old_job.session->source().updates_source == - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { - VLOG(1) << "SyncerThread(" << this << ")" - << " Job failed with source continuation"; - // We don't seem to have made forward progress. Start or extend backoff. - HandleConsecutiveContinuationError(old_job); - } else if (IsBackingOff()) { - VLOG(1) << "SyncerThread(" << this << ")" - << " A nudge during backoff failed"; - // We weren't continuing but we're in backoff; must have been a nudge. - DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); - DCHECK(!wait_interval_->had_nudge); - wait_interval_->had_nudge = true; - wait_interval_->timer.Reset(); + return SyncSourceInfo(updates_source, sync_source_types); +} + +void SyncerThread::CreateSyncer(const std::string& dirname) { + base::AutoLock lock(lock_); + VLOG(1) << "Creating syncer up for: " << dirname; + // The underlying database structure is ready, and we should create + // the syncer. + CHECK(vault_.syncer_ == NULL); + vault_.syncer_ = new Syncer(); + vault_field_changed_.Broadcast(); +} + +// Sets |*connected| to false if it is currently true but |code| suggests that +// the current network configuration and/or auth state cannot be used to make +// forward progress, and user intervention (e.g changing server URL or auth +// credentials) is likely necessary. If |*connected| is false, set it to true +// if |code| suggests that we just recently made healthy contact with the +// server. +static inline void CheckConnected(bool* connected, + HttpResponse::ServerConnectionCode code, + base::ConditionVariable* condvar) { + if (*connected) { + // Note, be careful when adding cases here because if the SyncerThread + // thinks there is no valid connection as determined by this method, it + // will drop out of *all* forward progress sync loops (it won't poll and it + // will queue up Talk notifications but not actually call SyncShare) until + // some external action causes a ServerConnectionManager to broadcast that + // a valid connection has been re-established. + if (HttpResponse::CONNECTION_UNAVAILABLE == code || + HttpResponse::SYNC_AUTH_ERROR == code) { + *connected = false; + condvar->Broadcast(); + } } else { - VLOG(1) << "SyncerThread(" << this << ")" - << " Failed. Schedule a job with continuation as source"; - // We weren't continuing and we aren't in backoff. Schedule a normal - // continuation. - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { - ScheduleConfigImpl(old_job.session->routing_info(), - old_job.session->workers(), - GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); - } else { - // For all other purposes(nudge and poll) we schedule a retry nudge. - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), - GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), - old_job.session->source().types, false, FROM_HERE); + if (HttpResponse::SERVER_CONNECTION_OK == code) { + *connected = true; + condvar->Broadcast(); } } } -void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { - DCHECK(thread_.IsRunning()); - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - - TimeDelta poll = (!session_context_->notifications_enabled()) ? - syncer_short_poll_interval_seconds_ : - syncer_long_poll_interval_seconds_; - bool rate_changed = !poll_timer_.IsRunning() || - poll != poll_timer_.GetCurrentDelay(); - - if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) - poll_timer_.Reset(); - - if (!rate_changed) - return; - - // Adjust poll rate. - poll_timer_.Stop(); - poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); +void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { + conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, + &SyncerThread::HandleServerConnectionEvent)); + CheckConnected(&vault_.connected_, conn_mgr->server_status(), + &vault_field_changed_); } -void SyncerThread::HandleConsecutiveContinuationError( - const SyncSessionJob& old_job) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - // This if conditions should be compiled out in retail builds. - if (IsBackingOff()) { - DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); +void SyncerThread::HandleServerConnectionEvent( + const ServerConnectionEvent& event) { + if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { + base::AutoLock lock(lock_); + CheckConnected(&vault_.connected_, event.connection_code, + &vault_field_changed_); } - SyncSession* old = old_job.session.get(); - SyncSession* s(new SyncSession(session_context_.get(), this, - old->source(), old->routing_info(), old->workers())); - TimeDelta length = delay_provider_->GetDelay( - IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); - - VLOG(1) << "SyncerThread(" << this << ")" - << " In handle continuation error. Old job purpose is " - << old_job.purpose; - VLOG(1) << "SyncerThread(" << this << ")" - << " In Handle continuation error. The time delta(ms) is: " - << length.InMilliseconds(); - - // This will reset the had_nudge variable as well. - wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, - length)); - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { - SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, - make_linked_ptr(s), false, FROM_HERE); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); - } else { - // We are not in configuration mode. So wait_interval's pending job - // should be null. - DCHECK(wait_interval_->pending_configure_job.get() == NULL); - - // TODO(lipalani) - handle clear user data. - InitOrCoalescePendingJob(old_job); - } - wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); } -// static -TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { - if (last_delay.InSeconds() >= kMaxBackoffSeconds) - return TimeDelta::FromSeconds(kMaxBackoffSeconds); +int SyncerThread::GetRecommendedDelaySeconds(int base_delay_seconds) { + if (base_delay_seconds >= kMaxBackoffSeconds) + return kMaxBackoffSeconds; // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 - int64 backoff_s = - std::max(static_cast<int64>(1), - last_delay.InSeconds() * kBackoffRandomizationFactor); + int backoff_s = + std::max(1, base_delay_seconds * kBackoffRandomizationFactor); // Flip a coin to randomize backoff interval by +/- 50%. int rand_sign = base::RandInt(0, 1) * 2 - 1; // Truncation is adequate for rounding here. backoff_s = backoff_s + - (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + (rand_sign * (base_delay_seconds / kBackoffRandomizationFactor)); // Cap the backoff interval. - backoff_s = std::max(static_cast<int64>(1), - std::min(backoff_s, kMaxBackoffSeconds)); - - return TimeDelta::FromSeconds(backoff_s); -} + backoff_s = std::max(1, std::min(backoff_s, kMaxBackoffSeconds)); -void SyncerThread::Stop() { - VLOG(1) << "SyncerThread(" << this << ")" << " stop called"; - syncer_->RequestEarlyExit(); // Safe to call from any thread. - session_context_->connection_manager()->RemoveListener(this); - thread_.Stop(); + return backoff_s; } -void SyncerThread::DoCanaryJob() { - VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job"; - DoPendingJobIfPossible(true); -} - -void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { - SyncSessionJob* job_to_execute = NULL; - if (mode_ == CONFIGURATION_MODE && wait_interval_.get() - && wait_interval_->pending_configure_job.get()) { - VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job"; - job_to_execute = wait_interval_->pending_configure_job.get(); - } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { - VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job"; - // Pending jobs mostly have time from the past. Reset it so this job - // will get executed. - if (pending_nudge_->scheduled_start < TimeTicks::Now()) - pending_nudge_->scheduled_start = TimeTicks::Now(); - - scoped_ptr<SyncSession> session(CreateSyncSession( - pending_nudge_->session->source())); - - // Also the routing info might have been changed since we cached the - // pending nudge. Update it by coalescing to the latest. - pending_nudge_->session->Coalesce(*(session.get())); - // The pending nudge would be cleared in the DoSyncSessionJob function. - job_to_execute = pending_nudge_.get(); - } +// Inputs and return value in milliseconds. +int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { + // syncer_polling_interval_ is in seconds + int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; - if (job_to_execute != NULL) { - VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job"; - SyncSessionJob copy = *job_to_execute; - copy.is_canary_job = is_canary_job; - DoSyncSessionJob(copy); - } -} + // This is our default and lower bound. + int next_wait = syncer_polling_interval_ms; -SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { - ModelSafeRoutingInfo routes; - std::vector<ModelSafeWorker*> workers; - session_context_->registrar()->GetModelSafeRoutingInfo(&routes); - session_context_->registrar()->GetWorkers(&workers); - SyncSourceInfo info(source); - - SyncSession* session(new SyncSession(session_context_.get(), this, info, - routes, workers)); + // Get idle time, bounded by max wait. + int idle = min(user_idle_ms, syncer_max_interval_); - return session; -} + // If the user has been idle for a while, we'll start decreasing the poll + // rate. + if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { + next_wait = std::min(GetRecommendedDelaySeconds( + last_interval / 1000), syncer_max_interval_ / 1000) * 1000; + } -void SyncerThread::PollTimerCallback() { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - ModelSafeRoutingInfo r; - ModelTypePayloadMap types_with_payloads = - syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); - SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); - SyncSession* s = CreateSyncSession(info); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, - FROM_HERE); -} + return next_wait; +} + +// Called with mutex_ already locked. +void SyncerThread::NudgeSyncImpl( + int milliseconds_from_now, + NudgeSource source, + const ModelTypePayloadMap& model_types_with_payloads) { + // TODO(sync): Add the option to reset the backoff state machine. + // This is needed so nudges that are a result of the user's desire + // to download updates for a new data type can be satisfied quickly. + if (vault_.current_wait_interval_.mode == WaitInterval::THROTTLED || + vault_.current_wait_interval_.had_nudge_during_backoff) { + // Drop nudges on the floor if we've already had one since starting this + // stage of exponential backoff or we are throttled. + return; + } -void SyncerThread::Unthrottle() { - DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); - VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled.."; - DoCanaryJob(); - wait_interval_.reset(); -} + // Union the current ModelTypePayloadMap with any from nudges that may have + // already posted (coalesce the nudge datatype information). + // TODO(tim): It seems weird to do this if the sources don't match up (e.g. + // if pending_source is kLocal and |source| is kClearPrivateData). + syncable::CoalescePayloads(&vault_.pending_nudge_types_, + model_types_with_payloads); + + const TimeTicks nudge_time = TimeTicks::Now() + + TimeDelta::FromMilliseconds(milliseconds_from_now); + if (nudge_time <= vault_.pending_nudge_time_) { + VLOG(1) << "Nudge for source " << source + << " dropped due to existing later pending nudge"; + return; + } -void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - session_context_->NotifyListeners(SyncEngineEvent(cause)); -} + VLOG(1) << "Replacing pending nudge for source " << source + << " at " << nudge_time.ToInternalValue(); -bool SyncerThread::IsBackingOff() const { - return wait_interval_.get() && wait_interval_->mode == - WaitInterval::EXPONENTIAL_BACKOFF; + vault_.pending_nudge_source_ = source; + vault_.pending_nudge_time_ = nudge_time; + vault_field_changed_.Broadcast(); } -void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { - wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, - silenced_until - TimeTicks::Now())); - wait_interval_->timer.Start(wait_interval_->length, this, - &SyncerThread::Unthrottle); +void SyncerThread::SetNotificationsEnabled(bool notifications_enabled) { + base::AutoLock lock(lock_); + session_context_->set_notifications_enabled(notifications_enabled); } -bool SyncerThread::IsSyncingCurrentlySilenced() { - return wait_interval_.get() && wait_interval_->mode == - WaitInterval::THROTTLED; -} +// Returns the amount of time since the user last interacted with the computer, +// in milliseconds +int SyncerThread::UserIdleTime() { +#if defined(OS_WIN) + LASTINPUTINFO last_input_info; + last_input_info.cbSize = sizeof(LASTINPUTINFO); + + // Get time in windows ticks since system start of last activity. + BOOL b = ::GetLastInputInfo(&last_input_info); + if (b == TRUE) + return ::GetTickCount() - last_input_info.dwTime; +#elif defined(OS_MACOSX) + // It would be great to do something like: + // + // return 1000 * + // CGEventSourceSecondsSinceLastEventType( + // kCGEventSourceStateCombinedSessionState, + // kCGAnyInputEventType); + // + // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon + // and can't link that high up the food chain. Thus this mucking in IOKit. + + io_service_t hid_service = + IOServiceGetMatchingService(kIOMasterPortDefault, + IOServiceMatching("IOHIDSystem")); + if (!hid_service) { + LOG(WARNING) << "Could not obtain IOHIDSystem"; + return 0; + } -void SyncerThread::OnReceivedShortPollIntervalUpdate( - const base::TimeDelta& new_interval) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - syncer_short_poll_interval_seconds_ = new_interval; -} + CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, + CFSTR("HIDIdleTime"), + kCFAllocatorDefault, + 0); + if (!object) { + LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; + IOObjectRelease(hid_service); + return 0; + } -void SyncerThread::OnReceivedLongPollIntervalUpdate( - const base::TimeDelta& new_interval) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - syncer_long_poll_interval_seconds_ = new_interval; -} + int64 idle_time; // in nanoseconds + Boolean success = false; + if (CFGetTypeID(object) == CFNumberGetTypeID()) { + success = CFNumberGetValue((CFNumberRef)object, + kCFNumberSInt64Type, + &idle_time); + } else { + LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; + } -void SyncerThread::OnShouldStopSyncingPermanently() { - VLOG(1) << "SyncerThread(" << this << ")" - << " OnShouldStopSyncingPermanently"; - syncer_->RequestEarlyExit(); // Thread-safe. - Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); -} + CFRelease(object); + IOObjectRelease(hid_service); -void SyncerThread::OnServerConnectionEvent( - const ServerConnectionEvent2& event) { - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, - &SyncerThread::CheckServerConnectionManagerStatus, - event.connection_code)); -} + if (!success) { + LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; + return 0; + } + return idle_time / 1000000; // nano to milli +#elif defined(OS_LINUX) + if (idle_query_.get()) + return idle_query_->IdleTime(); + return 0; +#else + static bool was_logged = false; + if (!was_logged) { + was_logged = true; + VLOG(1) << "UserIdleTime unimplemented on this platform, synchronization " + "will not throttle when user idle"; + } +#endif -void SyncerThread::set_notifications_enabled(bool notifications_enabled) { - session_context_->set_notifications_enabled(notifications_enabled); + return 0; } -} // browser_sync +} // namespace browser_sync diff --git a/chrome/browser/sync/engine/syncer_thread.h b/chrome/browser/sync/engine/syncer_thread.h index 11fee16..d270f14 100644 --- a/chrome/browser/sync/engine/syncer_thread.h +++ b/chrome/browser/sync/engine/syncer_thread.h @@ -3,175 +3,74 @@ // found in the LICENSE file. // // A class to run the syncer on a thread. +// This is the default implementation of SyncerThread whose Stop implementation +// does not support a timeout, but is greatly simplified. #ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ #define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ #pragma once -#include "base/callback.h" -#include "base/memory/linked_ptr.h" +#include <list> +#include <string> +#include <vector> + +#include "base/basictypes.h" +#include "base/gtest_prod_util.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" -#include "base/observer_list.h" -#include "base/task.h" +#include "base/synchronization/condition_variable.h" #include "base/threading/thread.h" #include "base/time.h" -#include "base/timer.h" -#include "chrome/browser/sync/engine/nudge_source.h" -#include "chrome/browser/sync/engine/polling_constants.h" -#include "chrome/browser/sync/engine/syncer.h" -#include "chrome/browser/sync/syncable/model_type_payload_map.h" -#include "chrome/browser/sync/engine/net/server_connection_manager.h" +#include "base/synchronization/waitable_event.h" +#include "chrome/browser/sync/engine/syncer_types.h" #include "chrome/browser/sync/sessions/sync_session.h" -#include "chrome/browser/sync/sessions/sync_session_context.h" +#include "chrome/browser/sync/syncable/model_type.h" +#include "chrome/browser/sync/syncable/model_type_payload_map.h" +#include "chrome/common/deprecated/event_sys-inl.h" + +#if defined(OS_LINUX) +#include "chrome/browser/sync/engine/idle_query_linux.h" +#endif + +class EventListenerHookup; namespace browser_sync { +class ModelSafeWorker; +class ServerConnectionManager; +class Syncer; +class URLFactory; struct ServerConnectionEvent; -class SyncerThread : public sessions::SyncSession::Delegate, - public ServerConnectionEventListener { +class SyncerThread : public base::RefCountedThreadSafe<SyncerThread>, + public sessions::SyncSession::Delegate { + FRIEND_TEST_ALL_PREFIXES(SyncerThreadTest, CalculateSyncWaitTime); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadTest, CalculatePollingWaitTime); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, Polling); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, Nudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, NudgeWithDataTypes); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, + NudgeWithDataTypesCoalesced); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, NudgeWithPayloads); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, + NudgeWithPayloadsCoalesced); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, Throttling); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, AuthInvalid); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, Pause); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, StartWhenNotConnected); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, PauseWhenNotConnected); + FRIEND_TEST_ALL_PREFIXES(SyncerThreadWithSyncerTest, StopSyncPermanently); + friend class SyncerThreadWithSyncerTest; + friend class SyncerThreadFactory; public: - enum Mode { - // In this mode, the thread only performs configuration tasks. This is - // designed to make the case where we want to download updates for a - // specific type only, and not continue syncing until we are moved into - // normal mode. - CONFIGURATION_MODE, - // Resumes polling and allows nudges, drops configuration tasks. Runs - // through entire sync cycle. - NORMAL_MODE, - }; - - // Takes ownership of both |context| and |syncer|. - SyncerThread(sessions::SyncSessionContext* context, Syncer* syncer); - virtual ~SyncerThread(); - - typedef Callback0::Type ModeChangeCallback; - - // Change the mode of operation. - // We don't use a lock when changing modes, so we won't cause currently - // scheduled jobs to adhere to the new mode. We could protect it, but it - // doesn't buy very much as a) a session could already be in progress and it - // will continue no matter what, b) the scheduled sessions already contain - // all their required state and won't be affected by potential change at - // higher levels (i.e. the registrar), and c) we service tasks FIFO, so once - // the mode changes all future jobs will be run against the updated mode. - // If supplied, |callback| will be invoked when the mode has been - // changed to |mode| *from the SyncerThread*, and not from the caller - // thread. - void Start(Mode mode, ModeChangeCallback* callback); - - // Joins on the thread as soon as possible (currently running session - // completes). - void Stop(); - - // The meat and potatoes. - void ScheduleNudge(const base::TimeDelta& delay, NudgeSource source, - const syncable::ModelTypeBitSet& types, - const tracked_objects::Location& nudge_location); - void ScheduleNudgeWithPayloads( - const base::TimeDelta& delay, NudgeSource source, - const syncable::ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location); - void ScheduleConfig(const syncable::ModelTypeBitSet& types); - void ScheduleClearUserData(); - - // Change status of notifications in the SyncSessionContext. - void set_notifications_enabled(bool notifications_enabled); - - // DDOS avoidance function. Calculates how long we should wait before trying - // again after a failed sync attempt, where the last delay was |base_delay|. - // TODO(tim): Look at URLRequestThrottlerEntryInterface. - static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay); - - // SyncSession::Delegate implementation. - virtual void OnSilencedUntil(const base::TimeTicks& silenced_until); - virtual bool IsSyncingCurrentlySilenced(); - virtual void OnReceivedShortPollIntervalUpdate( - const base::TimeDelta& new_interval); - virtual void OnReceivedLongPollIntervalUpdate( - const base::TimeDelta& new_interval); - virtual void OnShouldStopSyncingPermanently(); - - // ServerConnectionEventListener implementation. - // TODO(tim): schedule a nudge when valid connection detected? in 1 minute? - virtual void OnServerConnectionEvent(const ServerConnectionEvent2& event); - - private: - enum JobProcessDecision { - // Indicates we should continue with the current job. - CONTINUE, - // Indicates that we should save it to be processed later. - SAVE, - // Indicates we should drop this job. - DROP, - }; - - struct SyncSessionJob { - // An enum used to describe jobs for scheduling purposes. - enum SyncSessionJobPurpose { - // Our poll timer schedules POLL jobs periodically based on a server - // assigned poll interval. - POLL, - // A nudge task can come from a variety of components needing to force - // a sync. The source is inferable from |session.source()|. - NUDGE, - // The user invoked a function in the UI to clear their entire account - // and stop syncing (globally). - CLEAR_USER_DATA, - // Typically used for fetching updates for a subset of the enabled types - // during initial sync or reconfiguration. We don't run all steps of - // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). - CONFIGURATION, - }; - SyncSessionJob(); - SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, bool is_canary_job, - const tracked_objects::Location& nudge_location); - ~SyncSessionJob(); - SyncSessionJobPurpose purpose; - base::TimeTicks scheduled_start; - linked_ptr<sessions::SyncSession> session; - bool is_canary_job; - - // This is the location the nudge came from. used for debugging purpose. - // In case of multiple nudges getting coalesced this stores the first nudge - // that came in. - tracked_objects::Location nudge_location; - }; - friend class SyncerThread2Test; - friend class SyncerThread2WhiteboxTest; - - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - DropNudgeWhileExponentialBackOff); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, SaveNudge); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueNudge); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, DropPoll); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinuePoll); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueConfiguration); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - SaveConfigurationWhileThrottled); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - SaveNudgeWhileThrottled); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - ContinueClearUserDataUnderAllCircumstances); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - ContinueCanaryJobConfig); - FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, - ContinueNudgeWhileExponentialBackOff); - - // A component used to get time delays associated with exponential backoff. - // Encapsulated into a class to facilitate testing. - class DelayProvider { - public: - DelayProvider(); - virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay); - virtual ~DelayProvider(); - private: - DISALLOW_COPY_AND_ASSIGN(DelayProvider); - }; - + // Encapsulates the parameters that make up an interval on which the + // syncer thread is sleeping. struct WaitInterval { enum Mode { + // A wait interval whose duration has not been affected by exponential + // backoff. The base case for exponential backoff falls in to this case + // (e.g when the exponent is 1). So far, we don't need a separate case. + // NORMAL intervals are not nudge-rate limited. + NORMAL, // A wait interval whose duration has been affected by exponential // backoff. // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. @@ -180,162 +79,286 @@ class SyncerThread : public sessions::SyncSession::Delegate, // during such an interval. THROTTLED, }; - WaitInterval(); - ~WaitInterval(); Mode mode; - - // This bool is set to true if we have observed a nudge during this + // This bool is set to true if we have observed a nudge during during this // interval and mode == EXPONENTIAL_BACKOFF. - bool had_nudge; - base::TimeDelta length; - base::OneShotTimer<SyncerThread> timer; - - // Configure jobs are saved only when backing off or throttling. So we - // expose the pointer here. - scoped_ptr<SyncSessionJob> pending_configure_job; - WaitInterval(Mode mode, base::TimeDelta length); - }; - - // Helper to assemble a job and post a delayed task to sync. - void ScheduleSyncSessionJob( - const base::TimeDelta& delay, - SyncSessionJob::SyncSessionJobPurpose purpose, - sessions::SyncSession* session, - const tracked_objects::Location& nudge_location); + bool had_nudge_during_backoff; + base::TimeDelta poll_delta; // The wait duration until the next poll. - // Invoke the Syncer to perform a sync. - void DoSyncSessionJob(const SyncSessionJob& job); - - // Called after the Syncer has performed the sync represented by |job|, to - // reset our state. - void FinishSyncSessionJob(const SyncSessionJob& job); - - // Record important state that might be needed in future syncs, such as which - // data types may require cleanup. - void UpdateCarryoverSessionState(const SyncSessionJob& old_job); - - // Helper to FinishSyncSessionJob to schedule the next sync operation. - void ScheduleNextSync(const SyncSessionJob& old_job); + WaitInterval() : mode(NORMAL), had_nudge_during_backoff(false) { } + }; - // Helper to configure polling intervals. Used by Start and ScheduleNextSync. - void AdjustPolling(const SyncSessionJob* old_job); + enum NudgeSource { + kUnknown = 0, + kNotification, + kLocal, + kContinuation, + kClearPrivateData + }; + // Server can overwrite these values via client commands. + // Standard short poll. This is used when XMPP is off. + static const int kDefaultShortPollIntervalSeconds; + // Long poll is used when XMPP is on. + static const int kDefaultLongPollIntervalSeconds; + // 30 minutes by default. If exponential backoff kicks in, this is the + // longest possible poll interval. + static const int kDefaultMaxPollIntervalMs; + // Maximum interval for exponential backoff. + static const int kMaxBackoffSeconds; + + explicit SyncerThread(sessions::SyncSessionContext* context); + virtual ~SyncerThread(); - // Helper to ScheduleNextSync in case of consecutive sync errors. - void HandleConsecutiveContinuationError(const SyncSessionJob& old_job); + virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr); + + // Starts a syncer thread. + // Returns true if it creates a thread or if there's currently a thread + // running and false otherwise. + virtual bool Start(); + + // Stop processing. |max_wait| doesn't do anything in this version. + virtual bool Stop(int max_wait); + + // Request that the thread pauses. Returns false if the request can + // not be completed (e.g. the thread is not running). When the + // thread actually pauses, a SyncEngineEvent::PAUSED event notification + // will be sent to the relay channel. + virtual bool RequestPause(); + + // Request that the thread resumes from pause. Returns false if the + // request can not be completed (e.g. the thread is not running or + // is not currently paused). When the thread actually resumes, a + // SyncEngineEvent::RESUMED event notification will be sent to the relay + // channel. + virtual bool RequestResume(); + + // Nudges the syncer to sync with a delay specified. This API is for access + // from the SyncerThread's controller and will cause a mutex lock. + virtual void NudgeSyncer(int milliseconds_from_now, NudgeSource source); + + // Same as |NudgeSyncer|, but supports tracking the datatypes that caused + // the nudge to occur. + virtual void NudgeSyncerWithDataTypes( + int milliseconds_from_now, + NudgeSource source, + const syncable::ModelTypeBitSet& model_types); + + // Same as |NudgeSyncer|, but supports including a payload for passing on to + // the download updates command. Datatypes with payloads are also considered + // to have caused a nudged to occur and treated accordingly. + virtual void NudgeSyncerWithPayloads( + int milliseconds_from_now, + NudgeSource source, + const syncable::ModelTypePayloadMap& model_types_with_payloads); + + void SetNotificationsEnabled(bool notifications_enabled); + + // Call this when a directory is opened + void CreateSyncer(const std::string& dirname); + + // DDOS avoidance function. The argument and return value is in seconds + static int GetRecommendedDelaySeconds(int base_delay_seconds); + + protected: + virtual void ThreadMain(); + void ThreadMainLoop(); + + virtual void SetConnected(bool connected); + + virtual void SetSyncerPollingInterval(base::TimeDelta interval); + virtual void SetSyncerShortPollInterval(base::TimeDelta interval); + + // Needed to emulate the behavior of pthread_create, which synchronously + // started the thread and set the value of thread_running_ to true. + // We can't quite match that because we asynchronously post the task, + // which opens a window for Stop to get called before the task actually + // makes it. To prevent this, we block Start() until we're sure it's ok. + base::WaitableEvent thread_main_started_; + + // Handle of the running thread. + base::Thread thread_; - // Determines if it is legal to run |job| by checking current - // operational mode, backoff or throttling, freshness - // (so we don't make redundant syncs), and connection. - bool ShouldRunJob(const SyncSessionJob& job); + // Fields that are modified / accessed by multiple threads go in this struct + // for clarity and explicitness. + struct ProtectedFields { + ProtectedFields(); + ~ProtectedFields(); - // Decide whether we should CONTINUE, SAVE or DROP the job. - JobProcessDecision DecideOnJob(const SyncSessionJob& job); + // False when we want to stop the thread. + bool stop_syncer_thread_; - // Decide on whether to CONTINUE, SAVE or DROP the job when we are in - // backoff mode. - JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); + // True when a pause was requested. + bool pause_requested_; - // Saves the job for future execution. Note: It drops all the poll jobs. - void SaveJob(const SyncSessionJob& job); + // True when the thread is paused. + bool paused_; - // Coalesces the current job with the pending nudge. - void InitOrCoalescePendingJob(const SyncSessionJob& job); + Syncer* syncer_; - // 'Impl' here refers to real implementation of public functions, running on - // |thread_|. - void StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback); - void ScheduleNudgeImpl( - const base::TimeDelta& delay, - sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, - const syncable::ModelTypePayloadMap& types_with_payloads, - bool is_canary_job, const tracked_objects::Location& nudge_location); - void ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers, - const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source); - void ScheduleClearUserDataImpl(); + // State of the server connection. + bool connected_; - // Returns true if the client is currently in exponential backoff. - bool IsBackingOff() const; + // kUnknown if there is no pending nudge. (Theoretically, there + // could be a pending nudge of type kUnknown, so it's better to + // check pending_nudge_time_.) + NudgeSource pending_nudge_source_; - // Helper to signal all listeners registered with |session_context_|. - void Notify(SyncEngineEvent::EventCause cause); + // Map of all datatypes that are requesting a nudge. Can be union + // from multiple nudges that are coalesced. In addition, we + // optionally track a payload associated with each datatype (most recent + // payload overwrites old ones). These payloads are used by the download + // updates command and can contain datatype specific information the server + // might use. + syncable::ModelTypePayloadMap pending_nudge_types_; - // Callback to change backoff state. - void DoCanaryJob(); - void Unthrottle(); + // null iff there is no pending nudge. + base::TimeTicks pending_nudge_time_; - // Executes the pending job. Called whenever an event occurs that may - // change conditions permitting a job to run. Like when network connection is - // re-established, mode changes etc. - void DoPendingJobIfPossible(bool is_canary_job); + // The wait interval for to the current iteration of our main loop. This is + // only written to by the syncer thread, and since the only reader from a + // different thread (NudgeSync) is called at totally random times, we don't + // really need to access mutually exclusively as the data races that exist + // are intrinsic, but do so anyway and avoid using 'volatile'. + WaitInterval current_wait_interval_; + } vault_; - // The pointer is owned by the caller. - browser_sync::sessions::SyncSession* CreateSyncSession( - const browser_sync::sessions::SyncSourceInfo& info); + // Gets signaled whenever a thread outside of the syncer thread changes a + // protected field in the vault_. + base::ConditionVariable vault_field_changed_; - // Creates a session for a poll and performs the sync. - void PollTimerCallback(); + // Used to lock everything in |vault_|. + base::Lock lock_; - // Assign |start| and |end| to appropriate SyncerStep values for the - // specified |purpose|. - void SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, - SyncerStep* end); + private: + // Threshold multipler for how long before user should be considered idle. + static const int kPollBackoffThresholdMultiplier = 10; - // Initializes the hookup between the ServerConnectionManager and us. - void WatchConnectionManager(); + // SyncSession::Delegate implementation. + virtual void OnSilencedUntil(const base::TimeTicks& silenced_until); + virtual bool IsSyncingCurrentlySilenced(); + virtual void OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnShouldStopSyncingPermanently(); - // Used to update |server_connection_ok_|, see below. - void CheckServerConnectionManagerStatus( - HttpResponse::ServerConnectionCode code); + void HandleServerConnectionEvent(const ServerConnectionEvent& event); + + // Collect all local state required for a sync and build a SyncSession out of + // it, reset state for the next time, and performs the sync cycle. + // See |GetAndResetNudgeSource| for details on what 'reset' means. + // |was_nudged| is set to true if the session returned is fulfilling a nudge. + // Returns once the session is finished (HasMoreToSync returns false). The + // caller owns the returned SyncSession. + sessions::SyncSession* SyncMain(Syncer* syncer, + bool was_throttled, + bool continue_sync_cycle, + bool* initial_sync_for_thread, + bool* was_nudged); + + // Calculates the next sync wait time and exponential backoff state. + // last_poll_wait is the time duration of the previous polling timeout which + // was used. user_idle_milliseconds is updated by this method, and is a report + // of the full amount of time since the last period of activity for the user. + // The continue_sync_cycle parameter is used to determine whether or not we + // are calculating a polling wait time that is a continuation of an sync cycle + // which terminated while the syncer still had work to do. was_nudged is used + // in case of exponential backoff so we only allow one nudge per backoff + // interval. + WaitInterval CalculatePollingWaitTime( + int last_poll_wait, // in s + int* user_idle_milliseconds, + bool* continue_sync_cycle, + bool was_nudged); + + // Helper to above function, considers effect of user idle time. + virtual int CalculateSyncWaitTime(int last_wait, int user_idle_ms); + + // Resets the source tracking state to a clean slate and returns the current + // state in a SyncSourceInfo. + // The initial sync boolean is updated if read as a sentinel. The following + // two methods work in concert to achieve this goal. + // If |was_throttled| was true, this still discards elapsed nudges, but we + // treat the request as a periodic poll rather than a nudge from a source. + // Builds a SyncSourceInfo and returns whether a nudge occurred in the + // |was_nudged| parameter. + sessions::SyncSourceInfo GetAndResetNudgeSource(bool was_throttled, + bool continue_sync_cycle, + bool* initial_sync, + bool* was_nudged); + + sessions::SyncSourceInfo MakeSyncSourceInfo( + bool nudged, + NudgeSource nudge_source, + const syncable::ModelTypePayloadMap& model_types_with_payloads, + bool* initial_sync); + + int UserIdleTime(); + + void WaitUntilConnectedOrQuit(); + + // The thread will remain in this method until a resume is requested + // or shutdown is started. + void PauseUntilResumedOrQuit(); + + void EnterPausedState(); + + void ExitPausedState(); + + // For unit tests only. + virtual void DisableIdleDetection(); + + // This sets all conditions for syncer thread termination but does not + // actually join threads. It is expected that Stop will be called at some + // time after to fully stop and clean up. + void RequestSyncerExitAndSetThreadStopConditions(); - // Called once the first time thread_ is started to broadcast an initial - // session snapshot containing data like initial_sync_ended. Important when - // the client starts up and does not need to perform an initial sync. - void SendInitialSnapshot(); + void Notify(SyncEngineEvent::EventCause cause); - base::Thread thread_; + scoped_ptr<EventListenerHookup> conn_mgr_hookup_; // Modifiable versions of kDefaultLongPollIntervalSeconds which can be // updated by the server. - base::TimeDelta syncer_short_poll_interval_seconds_; - base::TimeDelta syncer_long_poll_interval_seconds_; - - // Periodic timer for polling. See AdjustPolling. - base::RepeatingTimer<SyncerThread> poll_timer_; - - // The mode of operation. We don't use a lock, see Start(...) comment. - Mode mode_; - - // TODO(tim): Bug 26339. This needs to track more than just time I think, - // since the nudges could be for different types. Current impl doesn't care. - base::TimeTicks last_sync_session_end_time_; + int syncer_short_poll_interval_seconds_; + int syncer_long_poll_interval_seconds_; + + // The time we wait between polls in seconds. This is used as lower bound on + // our wait time. Updated once per loop from the command line flag. + int syncer_polling_interval_; + + // The upper bound on the nominal wait between polls in seconds. Note that + // this bounds the "nominal" poll interval, while the the actual interval + // also takes previous failures into account. + int syncer_max_interval_; + + // This causes syncer to start syncing ASAP. If the rate of requests is too + // high the request will be silently dropped. mutex_ should be held when + // this is called. + void NudgeSyncImpl( + int milliseconds_from_now, + NudgeSource source, + const syncable::ModelTypePayloadMap& model_types_with_payloads); + +#if defined(OS_LINUX) + // On Linux, we need this information in order to query idle time. + scoped_ptr<IdleQueryLinux> idle_query_; +#endif - // Have we observed a valid server connection? - bool server_connection_ok_; - - // Tracks in-flight nudges so we can coalesce. - scoped_ptr<SyncSessionJob> pending_nudge_; - - // Current wait state. Null if we're not in backoff and not throttled. - scoped_ptr<WaitInterval> wait_interval_; + scoped_ptr<sessions::SyncSessionContext> session_context_; - scoped_ptr<DelayProvider> delay_provider_; + // Set whenever the server instructs us to stop sending it requests until + // a specified time, and reset for each call to SyncShare. (Note that the + // WaitInterval::THROTTLED contract is such that we don't call SyncShare at + // all until the "silenced until" embargo expires.) + base::TimeTicks silenced_until_; - // Invoked to run through the sync cycle. - scoped_ptr<Syncer> syncer_; - - scoped_ptr<sessions::SyncSessionContext> session_context_; + // Useful for unit tests + bool disable_idle_detection_; DISALLOW_COPY_AND_ASSIGN(SyncerThread); }; - } // namespace browser_sync -// The SyncerThread manages its own internal thread and thus outlives it. We -// don't need refcounting for posting tasks to this internal thread. -DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::SyncerThread); - #endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc new file mode 100644 index 0000000..b8bd9a9 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -0,0 +1,863 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/engine/syncer_thread2.h" + +#include <algorithm> + +#include "base/rand_util.h" +#include "chrome/browser/sync/engine/syncer.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { + +using sessions::SyncSession; +using sessions::SyncSessionSnapshot; +using sessions::SyncSourceInfo; +using syncable::ModelTypePayloadMap; +using syncable::ModelTypeBitSet; +using sync_pb::GetUpdatesCallerInfo; + +namespace s3 { + +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} + +SyncerThread::WaitInterval::WaitInterval() {} +SyncerThread::WaitInterval::~WaitInterval() {} + +SyncerThread::SyncSessionJob::SyncSessionJob() {} +SyncerThread::SyncSessionJob::~SyncSessionJob() {} + +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, + base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location) : purpose(purpose), + scheduled_start(start), + session(session), + is_canary_job(is_canary_job), + nudge_location(nudge_location) { +} + +TimeDelta SyncerThread::DelayProvider::GetDelay( + const base::TimeDelta& last_delay) { + return SyncerThread::GetRecommendedDelay(last_delay); +} + +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_UNKNOWN: + return GetUpdatesCallerInfo::UNKNOWN; + default: + NOTREACHED(); + return GetUpdatesCallerInfo::UNKNOWN; + } +} + +SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length) { } + +SyncerThread::SyncerThread(sessions::SyncSessionContext* context, + Syncer* syncer) + : thread_("SyncEngine_SyncerThread"), + syncer_short_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), + syncer_long_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), + mode_(NORMAL_MODE), + server_connection_ok_(false), + delay_provider_(new DelayProvider()), + syncer_(syncer), + session_context_(context) { +} + +SyncerThread::~SyncerThread() { + DCHECK(!thread_.IsRunning()); +} + +void SyncerThread::CheckServerConnectionManagerStatus( + HttpResponse::ServerConnectionCode code) { + + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << "Old mode: " << server_connection_ok_ << " Code: " << code; + // Note, be careful when adding cases here because if the SyncerThread + // thinks there is no valid connection as determined by this method, it + // will drop out of *all* forward progress sync loops (it won't poll and it + // will queue up Talk notifications but not actually call SyncShare) until + // some external action causes a ServerConnectionManager to broadcast that + // a valid connection has been re-established. + if (HttpResponse::CONNECTION_UNAVAILABLE == code || + HttpResponse::SYNC_AUTH_ERROR == code) { + server_connection_ok_ = false; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + } else if (HttpResponse::SERVER_CONNECTION_OK == code) { + server_connection_ok_ = true; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + DoCanaryJob(); + } +} + +void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " + << MessageLoop::current()->thread_name(); + if (!thread_.IsRunning()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " + << mode; + if (!thread_.Start()) { + NOTREACHED() << "Unable to start SyncerThread."; + return; + } + WatchConnectionManager(); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::SendInitialSnapshot)); + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " + << mode; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); +} + +void SyncerThread::SendInitialSnapshot() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, + SyncSourceInfo(), ModelSafeRoutingInfo(), + std::vector<ModelSafeWorker*>())); + SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); + sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); + event.snapshot = &snapshot; + session_context_->NotifyListeners(event); +} + +void SyncerThread::WatchConnectionManager() { + ServerConnectionManager* scm = session_context_->connection_manager(); + CheckServerConnectionManagerStatus(scm->server_status()); + scm->AddListener(this); +} + +void SyncerThread::StartImpl(Mode mode, + linked_ptr<ModeChangeCallback> callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " + << mode; + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!session_context_->account_name().empty()); + DCHECK(syncer_.get()); + mode_ = mode; + AdjustPolling(NULL); // Will kick start poll timer if needed. + if (callback.get()) + callback->Run(); + + // We just changed our mode. See if there are any pending jobs that we could + // execute in the new mode. + DoPendingJobIfPossible(false); +} + +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( + const SyncSessionJob& job) { + + DCHECK(wait_interval_.get()); + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); + + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " + << wait_interval_->mode << "Wait interval had nudge : " + << wait_interval_->had_nudge << "is canary job : " + << job.is_canary_job; + + if (job.purpose == SyncSessionJob::POLL) + return DROP; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::CONFIGURATION); + if (wait_interval_->mode == WaitInterval::THROTTLED) + return SAVE; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + if (job.purpose == SyncSessionJob::NUDGE) { + if (mode_ == CONFIGURATION_MODE) + return SAVE; + + // If we already had one nudge then just drop this nudge. We will retry + // later when the timer runs out. + return wait_interval_->had_nudge ? DROP : CONTINUE; + } + // This is a config job. + return job.is_canary_job ? CONTINUE : SAVE; +} + +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( + const SyncSessionJob& job) { + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) + return CONTINUE; + + if (wait_interval_.get()) + return DecideWhileInWaitInterval(job); + + if (mode_ == CONFIGURATION_MODE) { + if (job.purpose == SyncSessionJob::NUDGE) + return SAVE; + else if (job.purpose == SyncSessionJob::CONFIGURATION) + return CONTINUE; + else + return DROP; + } + + // We are in normal mode. + DCHECK_EQ(mode_, NORMAL_MODE); + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + + // Freshness condition + if (job.scheduled_start < last_sync_session_end_time_) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Dropping job because of freshness"; + return DROP; + } + + if (server_connection_ok_) + return CONTINUE; + + VLOG(2) << "SyncerThread(" << this << ")" + << " Bad server connection. Using that to decide on job."; + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; +} + +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); + if (pending_nudge_.get() == NULL) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Creating a pending nudge job"; + SyncSession* s = job.session.get(); + scoped_ptr<SyncSession> session(new SyncSession(s->context(), + s->delegate(), s->source(), s->routing_info(), s->workers())); + + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, + make_linked_ptr(session.release()), false, job.nudge_location); + pending_nudge_.reset(new SyncSessionJob(new_job)); + + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + pending_nudge_->scheduled_start = job.scheduled_start; + + // Unfortunately the nudge location cannot be modified. So it stores the + // location of the first caller. +} + +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { + JobProcessDecision decision = DecideOnJob(job); + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " + << decision << " Job purpose " << job.purpose << "mode " << mode_; + if (decision != SAVE) + return decision == CONTINUE; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == + SyncSessionJob::CONFIGURATION); + + SaveJob(job); + return false; +} + +void SyncerThread::SaveJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); + if (job.purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; + InitOrCoalescePendingJob(job); + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; + DCHECK(wait_interval_.get()); + DCHECK(mode_ == CONFIGURATION_MODE); + + SyncSession* old = job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + SyncSessionJob new_job(job.purpose, TimeTicks::Now(), + make_linked_ptr(s), false, job.nudge_location); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); + } // drop the rest. +} + +// Functor for std::find_if to search by ModelSafeGroup. +struct ModelSafeWorkerGroupIs { + explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} + bool operator()(ModelSafeWorker* w) { + return group == w->GetModelSafeGroup(); + } + ModelSafeGroup group; +}; + +void SyncerThread::ScheduleClearUserData() { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleClearUserDataImpl)); +} + +void SyncerThread::ScheduleNudge(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& types, + const tracked_objects::Location& nudge_location) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; + + ModelTypePayloadMap types_with_payloads = + syncable::ModelTypePayloadMapFromBitSet(types, std::string()); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); +} + +void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, + NudgeSource source, const ModelTypePayloadMap& types_with_payloads, + const tracked_objects::Location& nudge_location) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); +} + +void SyncerThread::ScheduleClearUserDataImpl() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + SyncSession* session = new SyncSession(session_context_.get(), this, + SyncSourceInfo(), ModelSafeRoutingInfo(), + std::vector<ModelSafeWorker*>()); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); +} + +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, + GetUpdatesCallerInfo::GetUpdatesSource source, + const ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; + // Note we currently nudge for all types regardless of the ones incurring + // the nudge. Doing different would throw off some syncer commands like + // CleanupDisabledTypes. We may want to change this in the future. + SyncSourceInfo info(source, types_with_payloads); + + SyncSession* session(CreateSyncSession(info)); + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, + make_linked_ptr(session), is_canary_job, + nudge_location); + + session = NULL; + if (!ShouldRunJob(job)) + return; + + if (pending_nudge_.get()) { + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" + << "we are in backoff"; + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + + if (!IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" + << " we are not in backoff and the job was coalesced"; + return; + } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Rescheduling pending nudge"; + SyncSession* s = pending_nudge_->session.get(); + job.session.reset(new SyncSession(s->context(), s->delegate(), + s->source(), s->routing_info(), s->workers())); + pending_nudge_.reset(); + } + } + + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), + nudge_location); +} + +// Helper to extract the routing info and workers corresponding to types in +// |types| from |registrar|. +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, + std::vector<ModelSafeWorker*>* workers) { + ModelSafeRoutingInfo r_tmp; + std::vector<ModelSafeWorker*> w_tmp; + registrar->GetModelSafeRoutingInfo(&r_tmp); + registrar->GetWorkers(&w_tmp); + + typedef std::vector<ModelSafeWorker*>::const_iterator iter; + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { + if (!types.test(i)) + continue; + syncable::ModelType t = syncable::ModelTypeFromInt(i); + DCHECK_EQ(1U, r_tmp.count(t)); + (*routes)[t] = r_tmp[t]; + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + ModelSafeWorkerGroupIs(r_tmp[t])); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); + } + + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + ModelSafeWorkerGroupIs(GROUP_PASSIVE)); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); +} + +void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + GetModelSafeParamsForTypes(types, session_context_->registrar(), + &routes, &workers); + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleConfigImpl, routes, workers, + GetUpdatesCallerInfo::FIRST_UPDATE)); +} + +void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; + // TODO(tim): config-specific GetUpdatesCallerInfo value? + SyncSession* session = new SyncSession(session_context_.get(), this, + SyncSourceInfo(source, + syncable::ModelTypePayloadMapFromRoutingInfo( + routing_info, std::string())), + routing_info, workers); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CONFIGURATION, session, FROM_HERE); +} + +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, + const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + SyncSessionJob job(purpose, TimeTicks::Now() + delay, + make_linked_ptr(session), false, nudge_location); + if (purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" + << " ScheduleSyncSessionJob"; + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); + pending_nudge_.reset(new SyncSessionJob(job)); + } + VLOG(2) << "SyncerThread(" << this << ")" + << " Posting job to execute in DoSyncSessionJob. Job purpose " + << job.purpose; + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::DoSyncSessionJob, job), + delay.InMilliseconds()); +} + +void SyncerThread::SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, + SyncerStep* start, SyncerStep* end) { + *end = SYNCER_END; + switch (purpose) { + case SyncSessionJob::CONFIGURATION: + *start = DOWNLOAD_UPDATES; + *end = APPLY_UPDATES; + return; + case SyncSessionJob::CLEAR_USER_DATA: + *start = CLEAR_PRIVATE_DATA; + return; + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: + *start = SYNCER_BEGIN; + return; + default: + NOTREACHED(); + } +} + +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + if (!ShouldRunJob(job)) + return; + + if (job.purpose == SyncSessionJob::NUDGE) { + DCHECK(pending_nudge_.get()); + if (pending_nudge_->session != job.session) + return; // Another nudge must have been scheduled in in the meantime. + pending_nudge_.reset(); + } + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " + << job.purpose; + + SyncerStep begin(SYNCER_BEGIN); + SyncerStep end(SYNCER_END); + SetSyncerStepsForPurpose(job.purpose, &begin, &end); + + bool has_more_to_sync = true; + while (ShouldRunJob(job) && has_more_to_sync) { + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Calling SyncShare."; + // Synchronously perform the sync session from this thread. + syncer_->SyncShare(job.session.get(), begin, end); + has_more_to_sync = job.session->HasMoreToSync(); + if (has_more_to_sync) + job.session->ResetTransientState(); + } + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Done SyncShare looping."; + FinishSyncSessionJob(job); +} + +void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + // Whatever types were part of a configuration task will have had updates + // downloaded. For that reason, we make sure they get recorded in the + // event that they get disabled at a later time. + ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); + if (!r.empty()) { + ModelSafeRoutingInfo temp_r; + ModelSafeRoutingInfo old_info(old_job.session->routing_info()); + std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), + std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); + session_context_->set_previous_session_routing_info(temp_r); + } + } else { + session_context_->set_previous_session_routing_info( + old_job.session->routing_info()); + } +} + +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // Update timing information for how often datatypes are triggering nudges. + base::TimeTicks now = TimeTicks::Now(); + if (!last_sync_session_end_time_.is_null()) { + ModelTypePayloadMap::const_iterator iter; + for (iter = job.session->source().types.begin(); + iter != job.session->source().types.end(); + ++iter) { + syncable::PostTimeToTypeHistogram(iter->first, + now - last_sync_session_end_time_); + } + } + last_sync_session_end_time_ = now; + UpdateCarryoverSessionState(job); + if (IsSyncingCurrentlySilenced()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " We are currently throttled. So not scheduling the next sync."; + SaveJob(job); + return; // Nothing to do. + } + + VLOG(2) << "SyncerThread(" << this << ")" + << " Updating the next polling time after SyncMain"; + ScheduleNextSync(job); +} + +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!old_job.session->HasMoreToSync()); + // Note: |num_server_changes_remaining| > 0 here implies that we received a + // broken response while trying to download all updates, because the Syncer + // will loop until this value is exhausted. Also, if unsynced_handles exist + // but HasMoreToSync is false, this implies that the Syncer determined no + // forward progress was possible at this time (an error, such as an HTTP + // 500, is likely to have occurred during commit). + const bool work_to_do = + old_job.session->status_controller()->num_server_changes_remaining() > 0 + || old_job.session->status_controller()->unsynced_handles().size() > 0; + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " + << work_to_do; + + AdjustPolling(&old_job); + + // TODO(tim): Old impl had special code if notifications disabled. Needed? + if (!work_to_do) { + // Success implies backoff relief. Note that if this was a "one-off" job + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was + // work_to_do before it ran this wont have changed, as jobs like this don't + // run a full sync cycle. So we don't need special code here. + wait_interval_.reset(); + VLOG(2) << "SyncerThread(" << this << ")" + << " Job suceeded so not scheduling more jobs"; + return; + } + + if (old_job.session->source().updates_source == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Job failed with source continuation"; + // We don't seem to have made forward progress. Start or extend backoff. + HandleConsecutiveContinuationError(old_job); + } else if (IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " A nudge during backoff failed"; + // We weren't continuing but we're in backoff; must have been a nudge. + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + DCHECK(!wait_interval_->had_nudge); + wait_interval_->had_nudge = true; + wait_interval_->timer.Reset(); + } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Failed. Schedule a job with continuation as source"; + // We weren't continuing and we aren't in backoff. Schedule a normal + // continuation. + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + ScheduleConfigImpl(old_job.session->routing_info(), + old_job.session->workers(), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); + } else { + // For all other purposes(nudge and poll) we schedule a retry nudge. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), + old_job.session->source().types, false, FROM_HERE); + } + } +} + +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { + DCHECK(thread_.IsRunning()); + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + TimeDelta poll = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + bool rate_changed = !poll_timer_.IsRunning() || + poll != poll_timer_.GetCurrentDelay(); + + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + poll_timer_.Reset(); + + if (!rate_changed) + return; + + // Adjust poll rate. + poll_timer_.Stop(); + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); +} + +void SyncerThread::HandleConsecutiveContinuationError( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // This if conditions should be compiled out in retail builds. + if (IsBackingOff()) { + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + } + SyncSession* old = old_job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + TimeDelta length = delay_provider_->GetDelay( + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + + VLOG(2) << "SyncerThread(" << this << ")" + << " In handle continuation error. Old job purpose is " + << old_job.purpose; + VLOG(2) << "SyncerThread(" << this << ")" + << " In Handle continuation error. The time delta(ms) is: " + << length.InMilliseconds(); + + // This will reset the had_nudge variable as well. + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, + length)); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, + make_linked_ptr(s), false, FROM_HERE); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + } else { + // We are not in configuration mode. So wait_interval's pending job + // should be null. + DCHECK(wait_interval_->pending_configure_job.get() == NULL); + + // TODO(lipalani) - handle clear user data. + InitOrCoalescePendingJob(old_job); + } + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); +} + +// static +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { + if (last_delay.InSeconds() >= kMaxBackoffSeconds) + return TimeDelta::FromSeconds(kMaxBackoffSeconds); + + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 + int64 backoff_s = + std::max(static_cast<int64>(1), + last_delay.InSeconds() * kBackoffRandomizationFactor); + + // Flip a coin to randomize backoff interval by +/- 50%. + int rand_sign = base::RandInt(0, 1) * 2 - 1; + + // Truncation is adequate for rounding here. + backoff_s = backoff_s + + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + + // Cap the backoff interval. + backoff_s = std::max(static_cast<int64>(1), + std::min(backoff_s, kMaxBackoffSeconds)); + + return TimeDelta::FromSeconds(backoff_s); +} + +void SyncerThread::Stop() { + VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; + syncer_->RequestEarlyExit(); // Safe to call from any thread. + session_context_->connection_manager()->RemoveListener(this); + thread_.Stop(); +} + +void SyncerThread::DoCanaryJob() { + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; + DoPendingJobIfPossible(true); +} + +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { + SyncSessionJob* job_to_execute = NULL; + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() + && wait_interval_->pending_configure_job.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; + job_to_execute = wait_interval_->pending_configure_job.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; + // Pending jobs mostly have time from the past. Reset it so this job + // will get executed. + if (pending_nudge_->scheduled_start < TimeTicks::Now()) + pending_nudge_->scheduled_start = TimeTicks::Now(); + + scoped_ptr<SyncSession> session(CreateSyncSession( + pending_nudge_->session->source())); + + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending_nudge_->session->Coalesce(*(session.get())); + // The pending nudge would be cleared in the DoSyncSessionJob function. + job_to_execute = pending_nudge_.get(); + } + + if (job_to_execute != NULL) { + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; + SyncSessionJob copy = *job_to_execute; + copy.is_canary_job = is_canary_job; + DoSyncSessionJob(copy); + } +} + +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(source); + + SyncSession* session(new SyncSession(session_context_.get(), this, info, + routes, workers)); + + return session; +} + +void SyncerThread::PollTimerCallback() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + ModelSafeRoutingInfo r; + ModelTypePayloadMap types_with_payloads = + syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); + SyncSession* s = CreateSyncSession(info); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, + FROM_HERE); +} + +void SyncerThread::Unthrottle() { + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; + DoCanaryJob(); + wait_interval_.reset(); +} + +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} + +bool SyncerThread::IsBackingOff() const { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::EXPONENTIAL_BACKOFF; +} + +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, + silenced_until - TimeTicks::Now())); + wait_interval_->timer.Start(wait_interval_->length, this, + &SyncerThread::Unthrottle); +} + +bool SyncerThread::IsSyncingCurrentlySilenced() { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::THROTTLED; +} + +void SyncerThread::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_short_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_long_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnShouldStopSyncingPermanently() { + VLOG(2) << "SyncerThread(" << this << ")" + << " OnShouldStopSyncingPermanently"; + syncer_->RequestEarlyExit(); // Thread-safe. + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} + +void SyncerThread::OnServerConnectionEvent( + const ServerConnectionEvent2& event) { + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::CheckServerConnectionManagerStatus, + event.connection_code)); +} + +void SyncerThread::set_notifications_enabled(bool notifications_enabled) { + session_context_->set_notifications_enabled(notifications_enabled); +} + +} // s3 +} // browser_sync diff --git a/chrome/browser/sync/engine/syncer_thread2.h b/chrome/browser/sync/engine/syncer_thread2.h new file mode 100644 index 0000000..1c698db --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.h @@ -0,0 +1,344 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A class to run the syncer on a thread. +#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ +#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ +#pragma once + +#include "base/callback.h" +#include "base/memory/linked_ptr.h" +#include "base/memory/scoped_ptr.h" +#include "base/observer_list.h" +#include "base/task.h" +#include "base/threading/thread.h" +#include "base/time.h" +#include "base/timer.h" +#include "chrome/browser/sync/engine/nudge_source.h" +#include "chrome/browser/sync/engine/polling_constants.h" +#include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/syncable/model_type_payload_map.h" +#include "chrome/browser/sync/engine/net/server_connection_manager.h" +#include "chrome/browser/sync/sessions/sync_session.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" + +namespace browser_sync { + +struct ServerConnectionEvent; + +namespace s3 { + +class SyncerThread : public sessions::SyncSession::Delegate, + public ServerConnectionEventListener { + public: + enum Mode { + // In this mode, the thread only performs configuration tasks. This is + // designed to make the case where we want to download updates for a + // specific type only, and not continue syncing until we are moved into + // normal mode. + CONFIGURATION_MODE, + // Resumes polling and allows nudges, drops configuration tasks. Runs + // through entire sync cycle. + NORMAL_MODE, + }; + + // Takes ownership of both |context| and |syncer|. + SyncerThread(sessions::SyncSessionContext* context, Syncer* syncer); + virtual ~SyncerThread(); + + typedef Callback0::Type ModeChangeCallback; + + // Change the mode of operation. + // We don't use a lock when changing modes, so we won't cause currently + // scheduled jobs to adhere to the new mode. We could protect it, but it + // doesn't buy very much as a) a session could already be in progress and it + // will continue no matter what, b) the scheduled sessions already contain + // all their required state and won't be affected by potential change at + // higher levels (i.e. the registrar), and c) we service tasks FIFO, so once + // the mode changes all future jobs will be run against the updated mode. + // If supplied, |callback| will be invoked when the mode has been + // changed to |mode| *from the SyncerThread*, and not from the caller + // thread. + void Start(Mode mode, ModeChangeCallback* callback); + + // Joins on the thread as soon as possible (currently running session + // completes). + void Stop(); + + // The meat and potatoes. + void ScheduleNudge(const base::TimeDelta& delay, NudgeSource source, + const syncable::ModelTypeBitSet& types, + const tracked_objects::Location& nudge_location); + void ScheduleNudgeWithPayloads( + const base::TimeDelta& delay, NudgeSource source, + const syncable::ModelTypePayloadMap& types_with_payloads, + const tracked_objects::Location& nudge_location); + void ScheduleConfig(const syncable::ModelTypeBitSet& types); + void ScheduleClearUserData(); + + // Change status of notifications in the SyncSessionContext. + void set_notifications_enabled(bool notifications_enabled); + + // DDOS avoidance function. Calculates how long we should wait before trying + // again after a failed sync attempt, where the last delay was |base_delay|. + // TODO(tim): Look at URLRequestThrottlerEntryInterface. + static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay); + + // SyncSession::Delegate implementation. + virtual void OnSilencedUntil(const base::TimeTicks& silenced_until); + virtual bool IsSyncingCurrentlySilenced(); + virtual void OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnShouldStopSyncingPermanently(); + + // ServerConnectionEventListener implementation. + // TODO(tim): schedule a nudge when valid connection detected? in 1 minute? + virtual void OnServerConnectionEvent(const ServerConnectionEvent2& event); + + private: + enum JobProcessDecision { + // Indicates we should continue with the current job. + CONTINUE, + // Indicates that we should save it to be processed later. + SAVE, + // Indicates we should drop this job. + DROP, + }; + + struct SyncSessionJob { + // An enum used to describe jobs for scheduling purposes. + enum SyncSessionJobPurpose { + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // The user invoked a function in the UI to clear their entire account + // and stop syncing (globally). + CLEAR_USER_DATA, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. We don't run all steps of + // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). + CONFIGURATION, + }; + SyncSessionJob(); + SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location); + ~SyncSessionJob(); + SyncSessionJobPurpose purpose; + base::TimeTicks scheduled_start; + linked_ptr<sessions::SyncSession> session; + bool is_canary_job; + + // This is the location the nudge came from. used for debugging purpose. + // In case of multiple nudges getting coalesced this stores the first nudge + // that came in. + tracked_objects::Location nudge_location; + }; + friend class SyncerThread2Test; + friend class SyncerThread2WhiteboxTest; + + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + DropNudgeWhileExponentialBackOff); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, SaveNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, DropPoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinuePoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueConfiguration); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveConfigurationWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveNudgeWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueClearUserDataUnderAllCircumstances); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueCanaryJobConfig); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueNudgeWhileExponentialBackOff); + + // A component used to get time delays associated with exponential backoff. + // Encapsulated into a class to facilitate testing. + class DelayProvider { + public: + DelayProvider(); + virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay); + virtual ~DelayProvider(); + private: + DISALLOW_COPY_AND_ASSIGN(DelayProvider); + }; + + struct WaitInterval { + enum Mode { + // A wait interval whose duration has been affected by exponential + // backoff. + // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. + EXPONENTIAL_BACKOFF, + // A server-initiated throttled interval. We do not allow any syncing + // during such an interval. + THROTTLED, + }; + WaitInterval(); + ~WaitInterval(); + + Mode mode; + + // This bool is set to true if we have observed a nudge during this + // interval and mode == EXPONENTIAL_BACKOFF. + bool had_nudge; + base::TimeDelta length; + base::OneShotTimer<SyncerThread> timer; + + // Configure jobs are saved only when backing off or throttling. So we + // expose the pointer here. + scoped_ptr<SyncSessionJob> pending_configure_job; + WaitInterval(Mode mode, base::TimeDelta length); + }; + + // Helper to assemble a job and post a delayed task to sync. + void ScheduleSyncSessionJob( + const base::TimeDelta& delay, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, + const tracked_objects::Location& nudge_location); + + // Invoke the Syncer to perform a sync. + void DoSyncSessionJob(const SyncSessionJob& job); + + // Called after the Syncer has performed the sync represented by |job|, to + // reset our state. + void FinishSyncSessionJob(const SyncSessionJob& job); + + // Record important state that might be needed in future syncs, such as which + // data types may require cleanup. + void UpdateCarryoverSessionState(const SyncSessionJob& old_job); + + // Helper to FinishSyncSessionJob to schedule the next sync operation. + void ScheduleNextSync(const SyncSessionJob& old_job); + + // Helper to configure polling intervals. Used by Start and ScheduleNextSync. + void AdjustPolling(const SyncSessionJob* old_job); + + // Helper to ScheduleNextSync in case of consecutive sync errors. + void HandleConsecutiveContinuationError(const SyncSessionJob& old_job); + + // Determines if it is legal to run |job| by checking current + // operational mode, backoff or throttling, freshness + // (so we don't make redundant syncs), and connection. + bool ShouldRunJob(const SyncSessionJob& job); + + // Decide whether we should CONTINUE, SAVE or DROP the job. + JobProcessDecision DecideOnJob(const SyncSessionJob& job); + + // Decide on whether to CONTINUE, SAVE or DROP the job when we are in + // backoff mode. + JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); + + // Saves the job for future execution. Note: It drops all the poll jobs. + void SaveJob(const SyncSessionJob& job); + + // Coalesces the current job with the pending nudge. + void InitOrCoalescePendingJob(const SyncSessionJob& job); + + // 'Impl' here refers to real implementation of public functions, running on + // |thread_|. + void StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback); + void ScheduleNudgeImpl( + const base::TimeDelta& delay, + sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, + const syncable::ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location); + void ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source); + void ScheduleClearUserDataImpl(); + + // Returns true if the client is currently in exponential backoff. + bool IsBackingOff() const; + + // Helper to signal all listeners registered with |session_context_|. + void Notify(SyncEngineEvent::EventCause cause); + + // Callback to change backoff state. + void DoCanaryJob(); + void Unthrottle(); + + // Executes the pending job. Called whenever an event occurs that may + // change conditions permitting a job to run. Like when network connection is + // re-established, mode changes etc. + void DoPendingJobIfPossible(bool is_canary_job); + + // The pointer is owned by the caller. + browser_sync::sessions::SyncSession* CreateSyncSession( + const browser_sync::sessions::SyncSourceInfo& info); + + // Creates a session for a poll and performs the sync. + void PollTimerCallback(); + + // Assign |start| and |end| to appropriate SyncerStep values for the + // specified |purpose|. + void SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose, + SyncerStep* start, + SyncerStep* end); + + // Initializes the hookup between the ServerConnectionManager and us. + void WatchConnectionManager(); + + // Used to update |server_connection_ok_|, see below. + void CheckServerConnectionManagerStatus( + HttpResponse::ServerConnectionCode code); + + // Called once the first time thread_ is started to broadcast an initial + // session snapshot containing data like initial_sync_ended. Important when + // the client starts up and does not need to perform an initial sync. + void SendInitialSnapshot(); + + base::Thread thread_; + + // Modifiable versions of kDefaultLongPollIntervalSeconds which can be + // updated by the server. + base::TimeDelta syncer_short_poll_interval_seconds_; + base::TimeDelta syncer_long_poll_interval_seconds_; + + // Periodic timer for polling. See AdjustPolling. + base::RepeatingTimer<SyncerThread> poll_timer_; + + // The mode of operation. We don't use a lock, see Start(...) comment. + Mode mode_; + + // TODO(tim): Bug 26339. This needs to track more than just time I think, + // since the nudges could be for different types. Current impl doesn't care. + base::TimeTicks last_sync_session_end_time_; + + // Have we observed a valid server connection? + bool server_connection_ok_; + + // Tracks in-flight nudges so we can coalesce. + scoped_ptr<SyncSessionJob> pending_nudge_; + + // Current wait state. Null if we're not in backoff and not throttled. + scoped_ptr<WaitInterval> wait_interval_; + + scoped_ptr<DelayProvider> delay_provider_; + + // Invoked to run through the sync cycle. + scoped_ptr<Syncer> syncer_; + + scoped_ptr<sessions::SyncSessionContext> session_context_; + + DISALLOW_COPY_AND_ASSIGN(SyncerThread); +}; + +} // namespace s3 + +} // namespace browser_sync + +// The SyncerThread manages its own internal thread and thus outlives it. We +// don't need refcounting for posting tasks to this internal thread. +DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::s3::SyncerThread); + +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ diff --git a/chrome/browser/sync/engine/syncer_thread2_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_unittest.cc index d231a06..2823567 100644 --- a/chrome/browser/sync/engine/syncer_thread2_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread2_unittest.cc @@ -6,7 +6,7 @@ #include "base/test/test_timeouts.h" #include "chrome/browser/sync/engine/mock_model_safe_workers.h" #include "chrome/browser/sync/engine/syncer.h" -#include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" #include "chrome/browser/sync/sessions/test_util.h" #include "chrome/test/sync/engine/mock_connection_manager.h" #include "chrome/test/sync/engine/test_directory_setter_upper.h" @@ -37,6 +37,8 @@ class MockSyncer : public Syncer { SyncerStep)); }; +namespace s3 { + // Used when tests want to record syncing activity to examine later. struct SyncShareRecords { std::vector<TimeTicks> times; @@ -927,7 +929,8 @@ TEST_F(SyncerThread2Test, SetsPreviousRoutingInfo) { EXPECT_TRUE(expected == context()->previous_session_routing_info()); } +} // namespace s3 } // namespace browser_sync // SyncerThread won't outlive the test! -DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::SyncerThread2Test); +DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::s3::SyncerThread2Test); diff --git a/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc index e041aaa..e1d1218 100644 --- a/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc @@ -4,7 +4,7 @@ #include "base/time.h" #include "chrome/browser/sync/engine/mock_model_safe_workers.h" -#include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" #include "chrome/browser/sync/sessions/sync_session_context.h" #include "chrome/browser/sync/sessions/test_util.h" #include "chrome/test/sync/engine/mock_connection_manager.h" @@ -19,6 +19,8 @@ namespace browser_sync { using sessions::SyncSessionContext; using browser_sync::Syncer; +namespace s3 { + class SyncerThread2WhiteboxTest : public testing::Test { public: virtual void SetUp() { @@ -224,8 +226,10 @@ TEST_F(SyncerThread2WhiteboxTest, ContinueCanaryJobConfig) { EXPECT_EQ(decision, SyncerThread::CONTINUE); } + +} // namespace s3 } // namespace browser_sync // SyncerThread won't outlive the test! DISABLE_RUNNABLE_METHOD_REFCOUNT( - browser_sync::SyncerThread2WhiteboxTest); + browser_sync::s3::SyncerThread2WhiteboxTest); diff --git a/chrome/browser/sync/engine/syncer_thread_adapter.cc b/chrome/browser/sync/engine/syncer_thread_adapter.cc new file mode 100644 index 0000000..8a513e9 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_adapter.cc @@ -0,0 +1,148 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/time.h" +#include "base/tracked.h" +#include "chrome/browser/sync/engine/syncer_thread_adapter.h" +#include "chrome/browser/sync/syncable/model_type.h" + +using base::TimeDelta; +using syncable::ModelTypeBitSet; + +namespace browser_sync { + +SyncerThreadAdapter::SyncerThreadAdapter(sessions::SyncSessionContext* context, + bool using_new_impl) + : legacy_(NULL), new_impl_(NULL), using_new_impl_(using_new_impl) { + if (using_new_impl_) { + new_impl_.reset(new s3::SyncerThread(context, new Syncer())); + } else { + legacy_ = new SyncerThread(context); + } +} + +SyncerThreadAdapter::~SyncerThreadAdapter() { + legacy_ = NULL; + new_impl_.reset(); +} + +void SyncerThreadAdapter::WatchConnectionManager( + ServerConnectionManager* conn_mgr) { + DCHECK(!using_new_impl_); + legacy_->WatchConnectionManager(conn_mgr); +} + +bool SyncerThreadAdapter::Start() { + if (using_new_impl_) { + new_impl_->Start(s3::SyncerThread::NORMAL_MODE, NULL); + return true; + } else { + return legacy_->Start(); + } +} + +bool SyncerThreadAdapter::Stop(int max_wait) { + if (using_new_impl_) { + new_impl_->Stop(); + return true; + } else { + return legacy_->Stop(max_wait); + } +} + +bool SyncerThreadAdapter::RequestPause() { + DCHECK(!using_new_impl_); + return legacy_->RequestPause(); +} + +bool SyncerThreadAdapter::RequestResume() { + DCHECK(!using_new_impl_); + return legacy_->RequestResume(); +} + +s3::NudgeSource LegacyToNewSyncerThreadSource(SyncerThread::NudgeSource s) { + switch (s) { + case SyncerThread::kNotification: + return s3::NUDGE_SOURCE_NOTIFICATION; + case SyncerThread::kContinuation: + return s3::NUDGE_SOURCE_CONTINUATION; + case SyncerThread::kLocal: + return s3::NUDGE_SOURCE_LOCAL; + case SyncerThread::kUnknown: + return s3::NUDGE_SOURCE_UNKNOWN; + default: + NOTREACHED(); + return s3::NUDGE_SOURCE_UNKNOWN; + } +} + +void SyncerThreadAdapter::NudgeSyncer(int milliseconds_from_now, + SyncerThread::NudgeSource source, + const tracked_objects::Location& nudge_location) { + if (using_new_impl_) { + if (source == SyncerThread::kClearPrivateData) { + new_impl_->ScheduleClearUserData(); + return; + } + new_impl_->ScheduleNudge( + TimeDelta::FromMilliseconds(milliseconds_from_now), + LegacyToNewSyncerThreadSource(source), ModelTypeBitSet(), + nudge_location); + } else { + legacy_->NudgeSyncer(milliseconds_from_now, source); + } +} + +void SyncerThreadAdapter::NudgeSyncerWithDataTypes( + int milliseconds_from_now, + SyncerThread::NudgeSource source, + const syncable::ModelTypeBitSet& model_types, + const tracked_objects::Location& nudge_location) { + DCHECK_NE(SyncerThread::kClearPrivateData, source); + if (using_new_impl_) { + new_impl_->ScheduleNudge( + TimeDelta::FromMilliseconds(milliseconds_from_now), + LegacyToNewSyncerThreadSource(source), model_types, + nudge_location); + } else { + legacy_->NudgeSyncerWithDataTypes(milliseconds_from_now, source, + model_types); + } +} + +void SyncerThreadAdapter::NudgeSyncerWithPayloads( + int milliseconds_from_now, + SyncerThread::NudgeSource source, + const syncable::ModelTypePayloadMap& model_types_with_payloads, + const tracked_objects::Location& nudge_location) { + DCHECK_NE(SyncerThread::kClearPrivateData, source); + if (using_new_impl_) { + new_impl_->ScheduleNudgeWithPayloads( + TimeDelta::FromMilliseconds(milliseconds_from_now), + LegacyToNewSyncerThreadSource(source), model_types_with_payloads, + nudge_location); + } else { + legacy_->NudgeSyncerWithPayloads(milliseconds_from_now, source, + model_types_with_payloads); + } +} + +void SyncerThreadAdapter::SetNotificationsEnabled(bool enabled) { + if (using_new_impl_) + new_impl_->set_notifications_enabled(enabled); + else + legacy_->SetNotificationsEnabled(enabled); +} + +void SyncerThreadAdapter::CreateSyncer(const std::string& dirname) { + if (!using_new_impl_) + legacy_->CreateSyncer(dirname); + // No-op if using new impl. +} + +s3::SyncerThread* SyncerThreadAdapter::new_impl() { + return new_impl_.get(); +} + +} // namespace browser_sync diff --git a/chrome/browser/sync/engine/syncer_thread_adapter.h b/chrome/browser/sync/engine/syncer_thread_adapter.h new file mode 100644 index 0000000..6b234e9 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_adapter.h @@ -0,0 +1,60 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_ADAPTER_H_ +#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_ADAPTER_H_ +#pragma once + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/tracked.h" +#include "chrome/browser/sync/engine/net/server_connection_manager.h" +#include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" +#include "chrome/browser/sync/syncable/model_type.h" +#include "chrome/browser/sync/syncable/model_type_payload_map.h" + +namespace browser_sync { + +class SyncerThreadAdapter { + public: + SyncerThreadAdapter(sessions::SyncSessionContext* context, + bool using_new_impl); + ~SyncerThreadAdapter(); + + // We mimic the SyncerThread interface (an adaptee). + void WatchConnectionManager(ServerConnectionManager* conn_mgr); + bool Start(); + bool Stop(int max_wait); + bool RequestPause(); + bool RequestResume(); + void NudgeSyncer(int milliseconds_from_now, + SyncerThread::NudgeSource source, + const tracked_objects::Location& nudge_location); + void NudgeSyncerWithDataTypes( + int milliseconds_from_now, + SyncerThread::NudgeSource source, + const syncable::ModelTypeBitSet& model_types, + const tracked_objects::Location& nudge_location); + void NudgeSyncerWithPayloads( + int milliseconds_from_now, + SyncerThread::NudgeSource source, + const syncable::ModelTypePayloadMap& model_types_with_payloads, + const tracked_objects::Location& nudge_location); + void SetNotificationsEnabled(bool enabled); + void CreateSyncer(const std::string& dirname); + + // Expose the new impl to leverage new apis through the adapter. + s3::SyncerThread* new_impl(); + private: + scoped_refptr<SyncerThread> legacy_; + scoped_ptr<s3::SyncerThread> new_impl_; + bool using_new_impl_; + + DISALLOW_COPY_AND_ASSIGN(SyncerThreadAdapter); +}; + +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_ADAPTER_H_ diff --git a/chrome/browser/sync/engine/syncer_thread_unittest.cc b/chrome/browser/sync/engine/syncer_thread_unittest.cc index 7a37f38..faa0e9b 100644 --- a/chrome/browser/sync/engine/syncer_thread_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread_unittest.cc @@ -1,3 +1,1245 @@ // Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. + +#include <list> +#include <map> + +#include "base/memory/scoped_ptr.h" +#include "base/synchronization/lock.h" +#include "base/time.h" +#include "base/synchronization/waitable_event.h" +#include "chrome/browser/sync/engine/model_safe_worker.h" +#include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_types.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" +#include "chrome/browser/sync/util/channel.h" +#include "chrome/test/sync/engine/mock_connection_manager.h" +#include "chrome/test/sync/engine/test_directory_setter_upper.h" +#include "chrome/test/sync/sessions/test_scoped_session_event_listener.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::TimeTicks; +using base::TimeDelta; +using base::WaitableEvent; +using testing::_; +using testing::AnyNumber; +using testing::Field; + +namespace browser_sync { +using sessions::ErrorCounters; +using sessions::TestScopedSessionEventListener; +using sessions::SyncSessionContext; +using sessions::SyncSessionSnapshot; +using sessions::SyncerStatus; + +typedef testing::Test SyncerThreadTest; +typedef SyncerThread::WaitInterval WaitInterval; + +ACTION_P(SignalEvent, event) { + event->Signal(); +} + +SyncSessionSnapshot SessionSnapshotForTest( + int64 num_server_changes_remaining, + int64 unsynced_count) { + std::string download_progress_markers[syncable::MODEL_TYPE_COUNT]; + for (int i = syncable::FIRST_REAL_MODEL_TYPE; + i < syncable::MODEL_TYPE_COUNT; + ++i) { + syncable::ModelType type(syncable::ModelTypeFromInt(i)); + sync_pb::DataTypeProgressMarker token; + token.set_data_type_id( + syncable::GetExtensionFieldNumberFromModelType(type)); + token.set_token("foobar"); + token.SerializeToString(&download_progress_markers[i]); + } + return SyncSessionSnapshot(SyncerStatus(), ErrorCounters(), + num_server_changes_remaining, false, + syncable::ModelTypeBitSet(), download_progress_markers, + false, false, unsynced_count, 0, false, sessions::SyncSourceInfo()); +} + +class ListenerMock : public SyncEngineEventListener { + public: + MOCK_METHOD1(OnSyncEngineEvent, void(const SyncEngineEvent&)); +}; + +class SyncerThreadWithSyncerTest : public testing::Test, + public ModelSafeWorkerRegistrar, + public SyncEngineEventListener { + public: + SyncerThreadWithSyncerTest() + : max_wait_time_(TimeDelta::FromSeconds(10)), + sync_cycle_ended_event_(false, false) {} + virtual void SetUp() { + metadb_.SetUp(); + connection_.reset(new MockConnectionManager(metadb_.manager(), + metadb_.name())); + worker_ = new ModelSafeWorker(); + std::vector<SyncEngineEventListener*> listeners; + listeners.push_back(this); + context_ = new SyncSessionContext(connection_.get(), metadb_.manager(), + this, listeners); + context_->set_account_name(metadb_.name()); + syncer_thread_ = new SyncerThread(context_); + syncer_thread_->SetConnected(true); + syncable::ModelTypeBitSet expected_types; + expected_types[syncable::BOOKMARKS] = true; + connection_->ExpectGetUpdatesRequestTypes(expected_types); + } + virtual void TearDown() { + context_ = NULL; + syncer_thread_ = NULL; + connection_.reset(); + metadb_.TearDown(); + } + + // ModelSafeWorkerRegistrar implementation. + virtual void GetWorkers(std::vector<ModelSafeWorker*>* out) { + out->push_back(worker_.get()); + } + + virtual void GetModelSafeRoutingInfo(ModelSafeRoutingInfo* out) { + // We're just testing the sync engine here, so we shunt everything to + // the SyncerThread. + (*out)[syncable::BOOKMARKS] = GROUP_PASSIVE; + } + + ManuallyOpenedTestDirectorySetterUpper* metadb() { return &metadb_; } + MockConnectionManager* connection() { return connection_.get(); } + SyncerThread* syncer_thread() { return syncer_thread_; } + + // Waits an indefinite amount of sync cycles for the syncer thread to become + // throttled. Only call this if a throttle is supposed to occur! + bool WaitForThrottle() { + int max_cycles = 5; + while (max_cycles && !syncer_thread()->IsSyncingCurrentlySilenced()) { + sync_cycle_ended_event_.TimedWait(max_wait_time_); + max_cycles--; + } + + return syncer_thread()->IsSyncingCurrentlySilenced(); + } + + void WaitForDisconnect() { + // Wait for the SyncerThread to detect loss of connection, up to a max of + // 10 seconds to timeout the test. + base::AutoLock lock(syncer_thread()->lock_); + TimeTicks start = TimeTicks::Now(); + TimeDelta ten_seconds = TimeDelta::FromSeconds(10); + while (syncer_thread()->vault_.connected_) { + syncer_thread()->vault_field_changed_.TimedWait(ten_seconds); + if (TimeTicks::Now() - start > ten_seconds) + break; + } + EXPECT_FALSE(syncer_thread()->vault_.connected_); + } + + bool Pause(ListenerMock* listener) { + WaitableEvent event(false, false); + { + base::AutoLock lock(syncer_thread()->lock_); + EXPECT_CALL(*listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_PAUSED))). + WillOnce(SignalEvent(&event)); + } + if (!syncer_thread()->RequestPause()) + return false; + return event.TimedWait(max_wait_time_); + } + + bool Resume(ListenerMock* listener) { + WaitableEvent event(false, false); + { + base::AutoLock lock(syncer_thread()->lock_); + EXPECT_CALL(*listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_RESUMED))). + WillOnce(SignalEvent(&event)); + } + if (!syncer_thread()->RequestResume()) + return false; + return event.TimedWait(max_wait_time_); + } + + void PreventThreadFromPolling() { + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + } + + // Compare a provided TypePayloadMap to the pending nudge info stored in the + // SyncerThread vault. + bool CompareNudgeTypesToVault(const syncable::ModelTypePayloadMap& lhs) { + const syncable::ModelTypePayloadMap& vault_nudge_types = + syncer_thread()->vault_.pending_nudge_types_; + return lhs == vault_nudge_types; + } + + // Compare a provided ModelTypeBitset to the pending nudge info stored in the + // SyncerThread vault. Nudge info in vault must not have any non-empty + // payloads. + bool CompareNudgeTypesBitSetToVault(const syncable::ModelTypeBitSet& lhs) { + syncable::ModelTypePayloadMap model_types_with_payloads = + syncable::ModelTypePayloadMapFromBitSet(lhs, std::string()); + size_t count = 0; + for (syncable::ModelTypePayloadMap::const_iterator i = + syncer_thread()->vault_.pending_nudge_types_.begin(); + i != syncer_thread()->vault_.pending_nudge_types_.end(); + ++i, ++count) { + if (!lhs.test(i->first)) + return false; + } + if (lhs.count() != count) + return false; + return true; + } + + + private: + + virtual void OnSyncEngineEvent(const SyncEngineEvent& event) { + if (event.what_happened == SyncEngineEvent::SYNC_CYCLE_ENDED) + sync_cycle_ended_event_.Signal(); + } + + protected: + TimeDelta max_wait_time_; + SyncSessionContext* context_; + + private: + ManuallyOpenedTestDirectorySetterUpper metadb_; + scoped_ptr<MockConnectionManager> connection_; + scoped_refptr<SyncerThread> syncer_thread_; + scoped_refptr<ModelSafeWorker> worker_; + base::WaitableEvent sync_cycle_ended_event_; + DISALLOW_COPY_AND_ASSIGN(SyncerThreadWithSyncerTest); +}; + +class SyncShareIntercept + : public MockConnectionManager::ResponseCodeOverrideRequestor, + public MockConnectionManager::MidCommitObserver { + public: + SyncShareIntercept() : sync_occured_(false, false), + allow_multiple_interceptions_(true) {} + virtual ~SyncShareIntercept() {} + virtual void Observe() { + if (!allow_multiple_interceptions_ && !times_sync_occured_.empty()) + FAIL() << "Multiple sync shares occured."; + times_sync_occured_.push_back(TimeTicks::Now()); + sync_occured_.Signal(); + } + + // ResponseCodeOverrideRequestor implementation. This assumes any override + // requested is intended to silence the SyncerThread. + virtual void OnOverrideComplete() { + // We should not see any syncing. + allow_multiple_interceptions_ = false; + times_sync_occured_.clear(); + } + + void WaitForSyncShare(int at_least_this_many, TimeDelta max_wait) { + while (at_least_this_many-- > 0) + sync_occured_.TimedWait(max_wait); + } + std::vector<TimeTicks> times_sync_occured() const { + return times_sync_occured_; + } + + void Reset() { + allow_multiple_interceptions_ = true; + times_sync_occured_.clear(); + sync_occured_.Reset(); + } + private: + std::vector<TimeTicks> times_sync_occured_; + base::WaitableEvent sync_occured_; + bool allow_multiple_interceptions_; + DISALLOW_COPY_AND_ASSIGN(SyncShareIntercept); +}; + +TEST_F(SyncerThreadTest, Construction) { + SyncSessionContext* context = new SyncSessionContext(NULL, NULL, NULL, + std::vector<SyncEngineEventListener*>()); + scoped_refptr<SyncerThread> syncer_thread(new SyncerThread(context)); +} + +TEST_F(SyncerThreadTest, StartStop) { + SyncSessionContext* context = new SyncSessionContext(NULL, NULL, NULL, + std::vector<SyncEngineEventListener*>()); + scoped_refptr<SyncerThread> syncer_thread(new SyncerThread(context)); + EXPECT_TRUE(syncer_thread->Start()); + EXPECT_TRUE(syncer_thread->Stop(2000)); + + // Do it again for good measure. I caught some bugs by adding this so + // I would recommend keeping it. + EXPECT_TRUE(syncer_thread->Start()); + EXPECT_TRUE(syncer_thread->Stop(2000)); +} + +TEST(SyncerThread, GetRecommendedDelay) { + EXPECT_LE(0, SyncerThread::GetRecommendedDelaySeconds(0)); + EXPECT_LE(1, SyncerThread::GetRecommendedDelaySeconds(1)); + EXPECT_LE(50, SyncerThread::GetRecommendedDelaySeconds(50)); + EXPECT_LE(10, SyncerThread::GetRecommendedDelaySeconds(10)); + EXPECT_EQ(SyncerThread::kMaxBackoffSeconds, + SyncerThread::GetRecommendedDelaySeconds( + SyncerThread::kMaxBackoffSeconds)); + EXPECT_EQ(SyncerThread::kMaxBackoffSeconds, + SyncerThread::GetRecommendedDelaySeconds( + SyncerThread::kMaxBackoffSeconds+1)); +} + +TEST_F(SyncerThreadTest, CalculateSyncWaitTime) { + SyncSessionContext* context = new SyncSessionContext(NULL, NULL, NULL, + std::vector<SyncEngineEventListener*>()); + scoped_refptr<SyncerThread> syncer_thread(new SyncerThread(context)); + syncer_thread->DisableIdleDetection(); + + // Syncer_polling_interval_ is less than max poll interval. + TimeDelta syncer_polling_interval = TimeDelta::FromSeconds(1); + + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); + + // user_idle_ms is less than 10 * (syncer_polling_interval*1000). + ASSERT_EQ(syncer_polling_interval.InMilliseconds(), + syncer_thread->CalculateSyncWaitTime(1000, 0)); + ASSERT_EQ(syncer_polling_interval.InMilliseconds(), + syncer_thread->CalculateSyncWaitTime(1000, 1)); + + // user_idle_ms is ge than 10 * (syncer_polling_interval*1000). + int last_poll_time = 2000; + ASSERT_TRUE(last_poll_time <= + syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000)); + ASSERT_TRUE(last_poll_time * 3 >= + syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000)); + ASSERT_TRUE(last_poll_time <= + syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000)); + ASSERT_TRUE(last_poll_time * 3 >= + syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000)); + + // Maximum backoff time should be syncer_max_interval. + int near_threshold = SyncerThread::kDefaultMaxPollIntervalMs / 2 - 1; + int threshold = SyncerThread::kDefaultMaxPollIntervalMs; + int over_threshold = SyncerThread::kDefaultMaxPollIntervalMs + 1; + ASSERT_TRUE(near_threshold <= + syncer_thread->CalculateSyncWaitTime(near_threshold, 10000)); + ASSERT_TRUE(SyncerThread::kDefaultMaxPollIntervalMs >= + syncer_thread->CalculateSyncWaitTime(near_threshold, 10000)); + ASSERT_TRUE(SyncerThread::kDefaultMaxPollIntervalMs == + syncer_thread->CalculateSyncWaitTime(threshold, 10000)); + ASSERT_TRUE(SyncerThread::kDefaultMaxPollIntervalMs == + syncer_thread->CalculateSyncWaitTime(over_threshold, 10000)); + + // Possible idle time must be capped by syncer_max_interval. + int over_sync_max_interval = + SyncerThread::kDefaultMaxPollIntervalMs + 1; + syncer_polling_interval = TimeDelta::FromSeconds( + over_sync_max_interval / 100); // so 1000* is right + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); + ASSERT_EQ(syncer_polling_interval.InSeconds() * 1000, + syncer_thread->CalculateSyncWaitTime(1000, over_sync_max_interval)); + syncer_polling_interval = TimeDelta::FromSeconds(1); + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); + ASSERT_TRUE(last_poll_time <= + syncer_thread->CalculateSyncWaitTime(last_poll_time, + over_sync_max_interval)); + ASSERT_TRUE(last_poll_time * 3 >= + syncer_thread->CalculateSyncWaitTime(last_poll_time, + over_sync_max_interval)); +} + +TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { + // Set up the environment. + int user_idle_milliseconds_param = 0; + SyncSessionContext* context = new SyncSessionContext(NULL, NULL, NULL, + std::vector<SyncEngineEventListener*>()); + scoped_refptr<SyncerThread> syncer_thread(new SyncerThread(context)); + syncer_thread->DisableIdleDetection(); + // Hold the lock to appease asserts in code. + base::AutoLock lock(syncer_thread->lock_); + + // Notifications disabled should result in a polling interval of + // kDefaultShortPollInterval. + { + context->set_notifications_enabled(false); + bool continue_sync_cycle_param = false; + + // No work and no backoff. + WaitInterval interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + + // In this case the continue_sync_cycle is turned off. + continue_sync_cycle_param = true; + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + } + + // Notifications enabled should result in a polling interval of + // SyncerThread::kDefaultLongPollIntervalSeconds. + { + context->set_notifications_enabled(true); + bool continue_sync_cycle_param = false; + + // No work and no backoff. + WaitInterval interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + + // In this case the continue_sync_cycle is turned off. + continue_sync_cycle_param = true; + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + } + + // There are two states which can cause a continuation, either the updates + // available do not match the updates received, or the unsynced count is + // non-zero. + { + // More server changes remaining to download. + context->set_last_snapshot(SessionSnapshotForTest(1, 0)); + bool continue_sync_cycle_param = false; + + WaitInterval interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_LE(0, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + continue_sync_cycle_param = false; + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(3, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_LE(0, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(2, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // Now simulate no more server changes remaining. + context->set_last_snapshot(SessionSnapshotForTest(0, 0)); + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + } + + { + // Now try with unsynced local items. + context->set_last_snapshot(SessionSnapshotForTest(0, 1)); + bool continue_sync_cycle_param = false; + + WaitInterval interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_LE(0, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + continue_sync_cycle_param = false; + interval = syncer_thread->CalculatePollingWaitTime( + 0, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(2, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + context->set_last_snapshot(SessionSnapshotForTest(0, 0)); + interval = syncer_thread->CalculatePollingWaitTime( + 4, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + } + + // Regression for exponential backoff reset when the syncer is nudged. + { + context->set_last_snapshot(SessionSnapshotForTest(0, 1)); + bool continue_sync_cycle_param = false; + + // Expect move from default polling interval to exponential backoff due to + // unsynced_count != 0. + WaitInterval interval = syncer_thread->CalculatePollingWaitTime( + 3600, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_LE(0, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + continue_sync_cycle_param = false; + interval = syncer_thread->CalculatePollingWaitTime( + 3600, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(2, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // Expect exponential backoff. + interval = syncer_thread->CalculatePollingWaitTime( + 2, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_LE(2, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + interval = syncer_thread->CalculatePollingWaitTime( + 2, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(6, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + syncer_thread->vault_.current_wait_interval_ = interval; + + interval = syncer_thread->CalculatePollingWaitTime( + static_cast<int>(interval.poll_delta.InSeconds()), + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + true); + + // Don't change poll on a failed nudge during backoff. + ASSERT_TRUE(syncer_thread->vault_.current_wait_interval_.poll_delta == + interval.poll_delta); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_TRUE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // If we got a nudge and we weren't in backoff mode, we see exponential + // backoff. + syncer_thread->vault_.current_wait_interval_.mode = WaitInterval::NORMAL; + interval = syncer_thread->CalculatePollingWaitTime( + 2, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + true); + + // 5 and 3 are bounds on the backoff randomization formula given input of 2. + ASSERT_GE(5, interval.poll_delta.InSeconds()); + ASSERT_LE(3, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // And if another interval expires, we get a bigger backoff. + WaitInterval new_interval = syncer_thread->CalculatePollingWaitTime( + static_cast<int>(interval.poll_delta.InSeconds()), + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + false); + + ASSERT_GE(12, new_interval.poll_delta.InSeconds()); + ASSERT_LE(5, new_interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::EXPONENTIAL_BACKOFF, interval.mode); + ASSERT_FALSE(new_interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // A nudge resets the continue_sync_cycle_param value, so our backoff + // should return to the minimum. + continue_sync_cycle_param = false; + interval = syncer_thread->CalculatePollingWaitTime( + 3600, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + true); + + ASSERT_LE(0, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + continue_sync_cycle_param = false; + interval = syncer_thread->CalculatePollingWaitTime( + 3600, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + true); + + ASSERT_GE(2, interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_TRUE(continue_sync_cycle_param); + + // Setting unsynced_count = 0 returns us to the default polling interval. + context->set_last_snapshot(SessionSnapshotForTest(0, 0)); + interval = syncer_thread->CalculatePollingWaitTime( + 4, + &user_idle_milliseconds_param, + &continue_sync_cycle_param, + true); + + ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, + interval.poll_delta.InSeconds()); + ASSERT_EQ(WaitInterval::NORMAL, interval.mode); + ASSERT_FALSE(interval.had_nudge_during_backoff); + ASSERT_FALSE(continue_sync_cycle_param); + } +} + +TEST_F(SyncerThreadWithSyncerTest, Polling) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + + const TimeDelta poll_interval = TimeDelta::FromSeconds(1); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + EXPECT_TRUE(syncer_thread()->Start()); + + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + + TimeDelta two_polls = poll_interval + poll_interval; + // We could theoretically return immediately from the wait if the interceptor + // was already signaled for a SyncShare (the first one comes quick). + interceptor.WaitForSyncShare(1, two_polls); + EXPECT_FALSE(interceptor.times_sync_occured().empty()); + + // Wait for at least 2 more SyncShare operations. + interceptor.WaitForSyncShare(2, two_polls); + EXPECT_TRUE(syncer_thread()->Stop(2000)); + + // Now analyze the run. + std::vector<TimeTicks> data = interceptor.times_sync_occured(); + + EXPECT_GE(data.size(), static_cast<unsigned int>(3)); + for (unsigned int i = 0; i < data.size() - 1; i++) { + TimeTicks optimal_next_sync = data[i] + poll_interval; + EXPECT_TRUE(data[i + 1] >= optimal_next_sync); + // This should be reliable, as there are no blocking or I/O operations + // except the explicit 2 second wait, so if it takes longer than this + // there is a problem. + EXPECT_TRUE(data[i + 1] < optimal_next_sync + poll_interval); + } +} + +TEST_F(SyncerThreadWithSyncerTest, Nudge) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + PreventThreadFromPolling(); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncer_thread()->NudgeSyncer(5, SyncerThread::kUnknown); + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +TEST_F(SyncerThreadWithSyncerTest, NudgeWithDataTypes) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + PreventThreadFromPolling(); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + // Paused so we can verify the nudge types safely. + syncer_thread()->RequestPause(); + syncer_thread()->NudgeSyncerWithDataTypes(5, + SyncerThread::kUnknown, + model_types); + EXPECT_TRUE(CompareNudgeTypesBitSetToVault(model_types)); + syncer_thread()->RequestResume(); + + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); + EXPECT_TRUE(syncer_thread()->vault_.pending_nudge_types_.empty()); +} + +TEST_F(SyncerThreadWithSyncerTest, NudgeWithDataTypesCoalesced) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + PreventThreadFromPolling(); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + // Paused so we can verify the nudge types safely. + syncer_thread()->RequestPause(); + syncer_thread()->NudgeSyncerWithDataTypes(100, + SyncerThread::kUnknown, + model_types); + EXPECT_TRUE(CompareNudgeTypesBitSetToVault(model_types)); + + model_types[syncable::BOOKMARKS] = false; + model_types[syncable::AUTOFILL] = true; + syncer_thread()->NudgeSyncerWithDataTypes(0, + SyncerThread::kUnknown, + model_types); + + // Reset BOOKMARKS for expectations. + model_types[syncable::BOOKMARKS] = true; + EXPECT_TRUE(CompareNudgeTypesBitSetToVault(model_types)); + + syncer_thread()->RequestResume(); + + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); + EXPECT_TRUE(syncer_thread()->vault_.pending_nudge_types_.empty()); +} + +TEST_F(SyncerThreadWithSyncerTest, NudgeWithPayloads) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + PreventThreadFromPolling(); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncable::ModelTypePayloadMap nudge_types; + nudge_types[syncable::BOOKMARKS] = "test"; + connection()->ExpectGetUpdatesRequestPayloads(nudge_types); + + // Paused so we can verify the nudge types safely. + syncer_thread()->RequestPause(); + syncer_thread()->NudgeSyncerWithPayloads(5, + SyncerThread::kUnknown, + nudge_types); + EXPECT_TRUE(CompareNudgeTypesToVault(nudge_types)); + syncer_thread()->RequestResume(); + + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); + EXPECT_TRUE(syncer_thread()->vault_.pending_nudge_types_.empty()); +} + +TEST_F(SyncerThreadWithSyncerTest, NudgeWithPayloadsCoalesced) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + PreventThreadFromPolling(); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncable::ModelTypePayloadMap nudge_types; + nudge_types[syncable::BOOKMARKS] = "books"; + + // Paused so we can verify the nudge types safely. + syncer_thread()->RequestPause(); + syncer_thread()->NudgeSyncerWithPayloads(100, + SyncerThread::kUnknown, + nudge_types); + EXPECT_TRUE(CompareNudgeTypesToVault(nudge_types)); + + nudge_types.erase(syncable::BOOKMARKS); + nudge_types[syncable::AUTOFILL] = "auto"; + syncer_thread()->NudgeSyncerWithPayloads(0, + SyncerThread::kUnknown, + nudge_types); + + // Reset BOOKMARKS for expectations. + nudge_types[syncable::BOOKMARKS] = "books"; + EXPECT_TRUE(CompareNudgeTypesToVault(nudge_types)); + connection()->ExpectGetUpdatesRequestPayloads(nudge_types); + + syncer_thread()->RequestResume(); + + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); + EXPECT_TRUE(syncer_thread()->vault_.pending_nudge_types_.empty()); +} + +TEST_F(SyncerThreadWithSyncerTest, Throttling) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + const TimeDelta poll_interval = TimeDelta::FromMilliseconds(10); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + + // Wait for some healthy syncing. + interceptor.WaitForSyncShare(4, poll_interval + poll_interval); + + // Tell the server to throttle a single request, which should be all it takes + // to silence our syncer (for 2 hours, so we shouldn't hit that in this test). + // This will atomically visit the interceptor so it can switch to throttled + // mode and fail on multiple requests. + connection()->ThrottleNextRequest(&interceptor); + + // Try to trigger a sync (we have a really short poll interval already). + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + + // Wait until the syncer thread reports that it is throttled. Any further + // sync share interceptions will result in failure. If things are broken, + // we may never halt. + ASSERT_TRUE(WaitForThrottle()); + EXPECT_TRUE(syncer_thread()->IsSyncingCurrentlySilenced()); + + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +TEST_F(SyncerThreadWithSyncerTest, StopSyncPermanently) { + // The SyncerThread should request an exit from the Syncer and set + // conditions for termination. + const TimeDelta poll_interval = TimeDelta::FromMilliseconds(10); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + + ListenerMock listener; + WaitableEvent sync_cycle_ended_event(false, false); + WaitableEvent syncer_thread_exiting_event(false, false); + TestScopedSessionEventListener reg(context_, &listener); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STATUS_CHANGED))). + Times(AnyNumber()); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + Times(AnyNumber()). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STOP_SYNCING_PERMANENTLY))); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_EXITING))). + WillOnce(SignalEvent(&syncer_thread_exiting_event)); + + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + connection()->set_store_birthday("NotYourLuckyDay"); + ASSERT_TRUE(syncer_thread_exiting_event.TimedWait(max_wait_time_)); + EXPECT_TRUE(syncer_thread()->Stop(0)); +} + +TEST_F(SyncerThreadWithSyncerTest, AuthInvalid) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + const TimeDelta poll_interval = TimeDelta::FromMilliseconds(1); + + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + + // Wait for some healthy syncing. + interceptor.WaitForSyncShare(2, TimeDelta::FromSeconds(10)); + EXPECT_GE(interceptor.times_sync_occured().size(), 2U); + + // Atomically start returning auth invalid and set the interceptor to fail + // on any sync. + connection()->FailWithAuthInvalid(&interceptor); + WaitForDisconnect(); + + // Try to trigger a sync (the interceptor will assert if one occurs). + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + + // Wait several poll intervals but don't expect any syncing besides the cycle + // that lost the connection. + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(1U, interceptor.times_sync_occured().size()); + + // Simulate a valid re-authentication and expect resumption of syncing. + interceptor.Reset(); + ASSERT_TRUE(interceptor.times_sync_occured().empty()); + connection()->StopFailingWithAuthInvalid(NULL); + ServerConnectionEvent e = {ServerConnectionEvent::STATUS_CHANGED, + HttpResponse::SERVER_CONNECTION_OK, + true}; + connection()->channel()->NotifyListeners(e); + + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(10)); + EXPECT_FALSE(interceptor.times_sync_occured().empty()); + + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +// TODO(zea): Disabled, along with PauseWhenNotConnected, due to stalling on +// windows, preventing further sync unit tests from running. See crbug/39070. +TEST_F(SyncerThreadWithSyncerTest, DISABLED_Pause) { + WaitableEvent sync_cycle_ended_event(false, false); + WaitableEvent paused_event(false, false); + WaitableEvent resumed_event(false, false); + PreventThreadFromPolling(); + + ListenerMock listener; + TestScopedSessionEventListener reg(context_, &listener); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STATUS_CHANGED))). + Times(AnyNumber()); + + // Wait for the initial sync to complete. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_EXITING))); + + ASSERT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + // Request a pause. + ASSERT_TRUE(Pause(&listener)); + + // Resuming the pause. + ASSERT_TRUE(Resume(&listener)); + + // Not paused, should fail. + EXPECT_FALSE(syncer_thread()->RequestResume()); + + // Request a pause. + ASSERT_TRUE(Pause(&listener)); + + // Nudge the syncer, this should do nothing while we are paused. + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + + // Resuming will cause the nudge to be processed and a sync cycle to run. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + ASSERT_TRUE(Resume(&listener)); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +TEST_F(SyncerThreadWithSyncerTest, StartWhenNotConnected) { + WaitableEvent sync_cycle_ended_event(false, false); + WaitableEvent event(false, false); + ListenerMock listener; + TestScopedSessionEventListener reg(context_, &listener); + PreventThreadFromPolling(); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STATUS_CHANGED))). + Times(AnyNumber()); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_EXITING))); + + connection()->SetServerNotReachable(); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + + // Syncer thread will always go through once cycle at the start, + // then it will wait for a connection. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION))). + WillOnce(SignalEvent(&event)); + ASSERT_TRUE(syncer_thread()->Start()); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + + // Connect, will put the syncer thread into its usually poll wait. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_CONNECTED))). + WillOnce(SignalEvent(&event)); + connection()->SetServerReachable(); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + + // Nudge the syncer to complete a cycle. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +// See TODO comment on the "Pause" test above. +TEST_F(SyncerThreadWithSyncerTest, DISABLED_PauseWhenNotConnected) { + WaitableEvent sync_cycle_ended_event(false, false); + WaitableEvent event(false, false); + ListenerMock listener; + TestScopedSessionEventListener reg(context_, &listener); + PreventThreadFromPolling(); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STATUS_CHANGED))). + Times(AnyNumber()); + + // Put the thread into a "waiting for connection" state. + connection()->SetServerNotReachable(); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION))). + WillOnce(SignalEvent(&event)); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + + ASSERT_TRUE(syncer_thread()->Start()); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + + // Pause and resume the thread while waiting for a connection. + ASSERT_TRUE(Pause(&listener)); + ASSERT_TRUE(Resume(&listener)); + + // Make a connection and let the syncer cycle. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_CONNECTED))). + WillOnce(SignalEvent(&event)); + connection()->SetServerReachable(); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + // Disconnect and get into the waiting for a connection state. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_WAITING_FOR_CONNECTION))). + WillOnce(SignalEvent(&event)); + connection()->SetServerNotReachable(); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + + // Pause so we can test getting a connection while paused. + ASSERT_TRUE(Pause(&listener)); + + // Get a connection then resume. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_CONNECTED))). + WillOnce(SignalEvent(&event)); + connection()->SetServerReachable(); + ASSERT_TRUE(event.TimedWait(max_wait_time_)); + + ASSERT_TRUE(Resume(&listener)); + + // Cycle the syncer to show we are not longer paused. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_EXITING))); + + syncer_thread()->NudgeSyncer(0, SyncerThread::kUnknown); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +TEST_F(SyncerThreadWithSyncerTest, PauseResumeWhenNotRunning) { + WaitableEvent sync_cycle_ended_event(false, false); + WaitableEvent event(false, false); + ListenerMock listener; + TestScopedSessionEventListener reg(context_, &listener); + PreventThreadFromPolling(); + + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::STATUS_CHANGED))). + Times(AnyNumber()); + + // Pause and resume the syncer while not running + ASSERT_TRUE(Pause(&listener)); + ASSERT_TRUE(Resume(&listener)); + + // Pause the thread then start the syncer. + ASSERT_TRUE(Pause(&listener)); + metadb()->Open(); + syncer_thread()->CreateSyncer(metadb()->name()); + ASSERT_TRUE(syncer_thread()->Start()); + + // Resume and let the syncer cycle. + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNC_CYCLE_ENDED))). + WillOnce(SignalEvent(&sync_cycle_ended_event)); + EXPECT_CALL(listener, OnSyncEngineEvent( + Field(&SyncEngineEvent::what_happened, + SyncEngineEvent::SYNCER_THREAD_EXITING))); + + ASSERT_TRUE(Resume(&listener)); + ASSERT_TRUE(sync_cycle_ended_event.TimedWait(max_wait_time_)); + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + +} // namespace browser_sync diff --git a/chrome/browser/sync/engine/syncer_types.h b/chrome/browser/sync/engine/syncer_types.h index c4612c6..50f079f 100644 --- a/chrome/browser/sync/engine/syncer_types.h +++ b/chrome/browser/sync/engine/syncer_types.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -83,6 +83,33 @@ struct SyncEngineEvent { SYNC_CYCLE_ENDED, //////////////////////////////////////////////////////////////// + // SyncerThread generated events. + + // This event is sent when the thread is paused in response to a + // pause request. + // TODO(tim): Deprecated. + SYNCER_THREAD_PAUSED, + + // This event is sent when the thread is resumed in response to a + // resume request. + // TODO(tim): Deprecated. + SYNCER_THREAD_RESUMED, + + // This event is sent when the thread is waiting for a connection + // to be established. + // TODO(tim): Deprecated. + SYNCER_THREAD_WAITING_FOR_CONNECTION, + + // This event is sent when a connection has been established and + // the thread continues. + // TODO(tim): Deprecated. + SYNCER_THREAD_CONNECTED, + + // Sent when the main syncer loop finishes. + // TODO(tim): Deprecated. + SYNCER_THREAD_EXITING, + + //////////////////////////////////////////////////////////////// // Generated in response to specific protocol actions or events. // New token in updated_token. diff --git a/chrome/browser/sync/glue/data_type_manager.h b/chrome/browser/sync/glue/data_type_manager.h index 0a42660..4bce344 100644 --- a/chrome/browser/sync/glue/data_type_manager.h +++ b/chrome/browser/sync/glue/data_type_manager.h @@ -25,10 +25,15 @@ class DataTypeManager { // complete the initial download of new data // types. + // TODO(tim): Deprecated. Bug 26339. + PAUSE_PENDING, // Waiting for the sync backend to pause. CONFIGURING, // Data types are being started. BLOCKED, // We can't move forward with configuration because some // external action must take place (i.e. passphrase). + // TODO(tim): Deprecated. Bug 26339. + RESUME_PENDING, // Waiting for the sync backend to resume. + CONFIGURED, // All enabled data types are running. STOPPING // Data types are being stopped. }; diff --git a/chrome/browser/sync/glue/data_type_manager_impl.cc b/chrome/browser/sync/glue/data_type_manager_impl.cc index f8aefed..2514c3e 100644 --- a/chrome/browser/sync/glue/data_type_manager_impl.cc +++ b/chrome/browser/sync/glue/data_type_manager_impl.cc @@ -2,14 +2,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "chrome/browser/sync/glue/data_type_manager_impl.h" - #include <algorithm> #include <functional> -#include "base/compiler_specific.h" #include "base/logging.h" +#include "base/message_loop.h" +#include "base/task.h" #include "chrome/browser/sync/glue/data_type_controller.h" +#include "chrome/browser/sync/glue/data_type_manager_impl.h" #include "chrome/browser/sync/glue/sync_backend_host.h" #include "content/browser/browser_thread.h" #include "content/common/notification_details.h" @@ -21,23 +21,16 @@ namespace browser_sync { namespace { static const syncable::ModelType kStartOrder[] = { - syncable::NIGORI, // Listed for completeness. syncable::BOOKMARKS, syncable::PREFERENCES, syncable::AUTOFILL, syncable::AUTOFILL_PROFILE, - syncable::EXTENSIONS, - syncable::APPS, syncable::THEMES, syncable::TYPED_URLS, syncable::PASSWORDS, syncable::SESSIONS, }; -COMPILE_ASSERT(arraysize(kStartOrder) == - syncable::MODEL_TYPE_COUNT - syncable::FIRST_REAL_MODEL_TYPE, - kStartOrder_IncorrectSize); - // Comparator used when sorting data type controllers. class SortComparator : public std::binary_function<DataTypeController*, DataTypeController*, @@ -57,14 +50,18 @@ class SortComparator : public std::binary_function<DataTypeController*, } // namespace -DataTypeManagerImpl::DataTypeManagerImpl(SyncBackendHost* backend, +DataTypeManagerImpl::DataTypeManagerImpl( + SyncBackendHost* backend, const DataTypeController::TypeMap& controllers) : backend_(backend), controllers_(controllers), state_(DataTypeManager::STOPPED), + syncer_paused_(false), needs_reconfigure_(false), - method_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { + ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(backend_); + DCHECK_GT(arraysize(kStartOrder), 0U); // Ensure all data type controllers are stopped. for (DataTypeController::TypeMap::const_iterator it = controllers_.begin(); it != controllers_.end(); ++it) { @@ -76,7 +73,8 @@ DataTypeManagerImpl::DataTypeManagerImpl(SyncBackendHost* backend, start_order_[kStartOrder[i]] = i; } -DataTypeManagerImpl::~DataTypeManagerImpl() {} +DataTypeManagerImpl::~DataTypeManagerImpl() { +} bool DataTypeManagerImpl::GetControllersNeedingStart( std::vector<DataTypeController*>* needs_start) { @@ -149,6 +147,7 @@ void DataTypeManagerImpl::Configure(const TypeSet& desired_types) { } void DataTypeManagerImpl::Restart() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); VLOG(1) << "Restarting..."; DCHECK(state_ == STOPPED || state_ == CONFIGURED || state_ == BLOCKED); @@ -172,13 +171,50 @@ void DataTypeManagerImpl::Restart() { controllers_, last_requested_types_, method_factory_.NewRunnableMethod(&DataTypeManagerImpl::DownloadReady)); + + // If there were new types needing download, a nudge will have been sent and + // we should be in DOWNLOAD_PENDING. In that case we start the syncer thread + // (which is idempotent) to fetch these updates. + // However, we could actually be in PAUSE_PENDING here as if no new types + // were needed, our DownloadReady callback will have fired and we will have + // requested a pause already (so starting the syncer thread will still not + // let it make forward progress as the pause needs to be resumed by us). + // Because both the pause and start syncing commands are posted FCFS to the + // core thread, there is no race between the pause and the start; the pause + // will always win, so we will always start paused if we don't need to + // download new types. Thus, in almost all cases, the syncer thread DOES NOT + // start before model association. But... + // + // TODO(tim): Bug 47957. There is still subtle badness here. If we just + // restarted the browser and were upgraded in between, we may be in a state + // where a bunch of data types do have initial sync ended, but a new guy + // does not. In this case, what we really want is to _only_ download updates + // for that new type and not the ones that have already finished and we've + // presumably associated before. What happens now is the syncer is nudged + // and it does a sync from timestamp 0 for only the new types, and sends the + // OnSyncCycleCompleted event, which is how we get the DownloadReady call. + // We request a pause at this point, but it is done asynchronously. So in + // theory, the syncer could charge forward with another sync (for _all_ + // types) before the pause is serviced, which could be bad for associating + // models as we'll merge the present cloud with the immediate past, which + // opens the door to bugs like "bookmark came back from dead". A lot more + // stars have to align now for this to happen, but it's still there. + backend_->StartSyncingWithServer(); } void DataTypeManagerImpl::DownloadReady() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); DCHECK(state_ == DOWNLOAD_PENDING); - state_ = CONFIGURING; - StartNextType(); + if (syncer_paused_) { + state_ = CONFIGURING; + StartNextType(); + return; + } + + // Pause the sync backend before starting the data types. + state_ = PAUSE_PENDING; + PauseSyncer(); } void DataTypeManagerImpl::StartNextType() { @@ -200,7 +236,7 @@ void DataTypeManagerImpl::StartNextType() { // the most recent set of desired types, so we just call configure. // Note: we do this whether or not GetControllersNeedingStart is true, // because we may need to stop datatypes. - SetBlockedAndNotify(); + state_ = BLOCKED; needs_reconfigure_ = false; VLOG(1) << "Reconfiguring due to previous configure attempt occuring while" << " busy."; @@ -212,13 +248,14 @@ void DataTypeManagerImpl::StartNextType() { // things like encryption, which may still need to be sorted out before we // can announce we're "Done" configuration entirely. if (GetControllersNeedingStart(NULL)) { - SetBlockedAndNotify(); + state_ = BLOCKED; return; } - // If no more data types need starting, we're done. - state_ = CONFIGURED; - NotifyDone(OK, FROM_HERE); + // If no more data types need starting, we're done. Resume the sync backend + // to finish. + state_ = RESUME_PENDING; + ResumeSyncer(); } void DataTypeManagerImpl::TypeStartCallback( @@ -228,35 +265,40 @@ void DataTypeManagerImpl::TypeStartCallback( // on the UI thread. DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); - if (state_ == STOPPING) { - // If we reach this callback while stopping, this means that - // DataTypeManager::Stop() was called while the current data type - // was starting. Now that it has finished starting, we can finish - // stopping the DataTypeManager. This is considered an ABORT. - FinishStopAndNotify(ABORTED, FROM_HERE); - return; - } else if (state_ == STOPPED) { - // If our state_ is STOPPED, we have already stopped all of the data - // types. We should not be getting callbacks from stopped data types. - LOG(ERROR) << "Start callback called by stopped data type!"; - return; - } - // We're done with the data type at the head of the list -- remove it. DataTypeController* started_dtc = needs_start_[0]; DCHECK(needs_start_.size()); DCHECK_EQ(needs_start_[0], started_dtc); needs_start_.erase(needs_start_.begin()); - if (result == DataTypeController::NEEDS_CRYPTO) { + // If we reach this callback while stopping, this means that + // DataTypeManager::Stop() was called while the current data type + // was starting. Now that it has finished starting, we can finish + // stopping the DataTypeManager. This is considered an ABORT. + if (state_ == STOPPING) { + FinishStopAndNotify(ABORTED, FROM_HERE); + return; + } + // If our state_ is STOPPED, we have already stopped all of the data + // types. We should not be getting callbacks from stopped data types. + if (state_ == STOPPED) { + LOG(ERROR) << "Start callback called by stopped data type!"; + return; } - // If the type started normally, continue to the next type. + // If the type is waiting for the cryptographer, continue to the next type. // Once the cryptographer is ready, we'll attempt to restart this type. - if (result == DataTypeController::NEEDS_CRYPTO || - result == DataTypeController::OK || + if (result == DataTypeController::NEEDS_CRYPTO) { + VLOG(1) << "Waiting for crypto " << started_dtc->name(); + StartNextType(); + return; + } + + // If the type started normally, continue to the next type. + if (result == DataTypeController::OK || result == DataTypeController::OK_FIRST_RUN) { + VLOG(1) << "Started " << started_dtc->name(); StartNextType(); return; } @@ -305,21 +347,49 @@ void DataTypeManagerImpl::Stop() { return; } - const bool download_pending = state_ == DOWNLOAD_PENDING; - state_ = STOPPING; - if (download_pending) { - // If Stop() is called while waiting for download, cancel all - // outstanding tasks. + // If Stop() is called while waiting for pause or resume, we no + // longer care about this. + bool aborted = false; + if (state_ == PAUSE_PENDING) { + RemoveObserver(NotificationType::SYNC_PAUSED); + aborted = true; + } + if (state_ == RESUME_PENDING) { + RemoveObserver(NotificationType::SYNC_RESUMED); + aborted = true; + } + + // If Stop() is called while waiting for download, cancel all + // outstanding tasks. + if (state_ == DOWNLOAD_PENDING) { method_factory_.RevokeAll(); - FinishStopAndNotify(ABORTED, FROM_HERE); - return; + aborted = true; } - FinishStop(); + state_ = STOPPING; + if (aborted) + FinishStopAndNotify(ABORTED, FROM_HERE); + else + FinishStop(); +} + +const DataTypeController::TypeMap& DataTypeManagerImpl::controllers() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + return controllers_; +} + +DataTypeManager::State DataTypeManagerImpl::state() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + return state_; } void DataTypeManagerImpl::FinishStop() { - DCHECK(state_== CONFIGURING || state_ == STOPPING || state_ == BLOCKED); + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + DCHECK(state_== CONFIGURING || + state_ == STOPPING || + state_ == PAUSE_PENDING || + state_ == RESUME_PENDING || + state_ == BLOCKED); // Simply call the Stop() method on all running data types. for (DataTypeController::TypeMap::const_iterator it = controllers_.begin(); it != controllers_.end(); ++it) { @@ -335,11 +405,70 @@ void DataTypeManagerImpl::FinishStop() { void DataTypeManagerImpl::FinishStopAndNotify(ConfigureResult result, const tracked_objects::Location& location) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); FinishStop(); NotifyDone(result, location); } +void DataTypeManagerImpl::Observe(NotificationType type, + const NotificationSource& source, + const NotificationDetails& details) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + switch (type.value) { + case NotificationType::SYNC_PAUSED: + DCHECK(state_ == PAUSE_PENDING); + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + syncer_paused_ = true; + RemoveObserver(NotificationType::SYNC_PAUSED); + + state_ = CONFIGURING; + StartNextType(); + break; + case NotificationType::SYNC_RESUMED: + DCHECK(state_ == RESUME_PENDING); + RemoveObserver(NotificationType::SYNC_RESUMED); + syncer_paused_ = false; + + if (needs_reconfigure_) { + // An attempt was made to reconfigure while we were already configuring. + // This can be because a passphrase was accepted or the user changed the + // set of desired types. Either way, |last_requested_types_| will + // contain the most recent set of desired types, so we just call + // configure. + // Note: we do this whether or not GetControllersNeedingStart is true, + // because we may need to stop datatypes. + state_ = BLOCKED; + needs_reconfigure_ = false; + VLOG(1) << "Reconfiguring due to previous configure attempt occuring " + << " while busy."; + Configure(last_requested_types_); + return; + } + + state_ = CONFIGURED; + NotifyDone(OK, FROM_HERE); + break; + default: + NOTREACHED(); + } +} + +void DataTypeManagerImpl::AddObserver(NotificationType type) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + notification_registrar_.Add(this, + type, + NotificationService::AllSources()); +} + +void DataTypeManagerImpl::RemoveObserver(NotificationType type) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + notification_registrar_.Remove(this, + type, + NotificationService::AllSources()); +} + void DataTypeManagerImpl::NotifyStart() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); NotificationService::current()->Notify( NotificationType::SYNC_CONFIGURE_START, Source<DataTypeManager>(this), @@ -348,6 +477,7 @@ void DataTypeManagerImpl::NotifyStart() { void DataTypeManagerImpl::NotifyDone(ConfigureResult result, const tracked_objects::Location& location) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); ConfigureResultWithErrorLocation result_with_location(result, location); NotificationService::current()->Notify( NotificationType::SYNC_CONFIGURE_DONE, @@ -355,20 +485,22 @@ void DataTypeManagerImpl::NotifyDone(ConfigureResult result, Details<ConfigureResultWithErrorLocation>(&result_with_location)); } -const DataTypeController::TypeMap& DataTypeManagerImpl::controllers() { - return controllers_; -} - -DataTypeManager::State DataTypeManagerImpl::state() { - return state_; +void DataTypeManagerImpl::ResumeSyncer() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + AddObserver(NotificationType::SYNC_RESUMED); + if (!backend_->RequestResume()) { + RemoveObserver(NotificationType::SYNC_RESUMED); + FinishStopAndNotify(UNRECOVERABLE_ERROR, FROM_HERE); + } } -void DataTypeManagerImpl::SetBlockedAndNotify() { - state_ = BLOCKED; - NotificationService::current()->Notify( - NotificationType::SYNC_CONFIGURE_BLOCKED, - Source<DataTypeManager>(this), - NotificationService::NoDetails()); +void DataTypeManagerImpl::PauseSyncer() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + AddObserver(NotificationType::SYNC_PAUSED); + if (!backend_->RequestPause()) { + RemoveObserver(NotificationType::SYNC_PAUSED); + FinishStopAndNotify(UNRECOVERABLE_ERROR, FROM_HERE); + } } } // namespace browser_sync diff --git a/chrome/browser/sync/glue/data_type_manager_impl.h b/chrome/browser/sync/glue/data_type_manager_impl.h index 841af2b..0f23aed 100644 --- a/chrome/browser/sync/glue/data_type_manager_impl.h +++ b/chrome/browser/sync/glue/data_type_manager_impl.h @@ -13,24 +13,39 @@ #include "base/basictypes.h" #include "base/task.h" +#include "content/common/notification_observer.h" +#include "content/common/notification_registrar.h" +#include "content/common/notification_type.h" + +class NotificationSource; +class NotificationDetails; namespace browser_sync { class DataTypeController; class SyncBackendHost; -class DataTypeManagerImpl : public DataTypeManager { +class DataTypeManagerImpl : public DataTypeManager, + public NotificationObserver { public: DataTypeManagerImpl(SyncBackendHost* backend, - const DataTypeController::TypeMap& controllers); + const DataTypeController::TypeMap& controllers); virtual ~DataTypeManagerImpl(); // DataTypeManager interface. virtual void Configure(const TypeSet& desired_types); + virtual void Stop(); + virtual const DataTypeController::TypeMap& controllers(); + virtual State state(); + // NotificationObserver implementation. + virtual void Observe(NotificationType type, + const NotificationSource& source, + const NotificationDetails& details); + private: // Starts the next data type in the kStartOrder list, indicated by // the current_type_ member. If there are no more data types to @@ -39,12 +54,12 @@ class DataTypeManagerImpl : public DataTypeManager { // Callback passed to each data type controller on startup. void TypeStartCallback(DataTypeController::StartResult result, - const tracked_objects::Location& from_here); + const tracked_objects::Location& location); // Stops all data types. void FinishStop(); void FinishStopAndNotify(ConfigureResult result, - const tracked_objects::Location& location); + const tracked_objects::Location& location); // Returns true if any last_requested_types_ currently needs to start model // association. If non-null, fills |needs_start| with all such controllers. @@ -53,10 +68,13 @@ class DataTypeManagerImpl : public DataTypeManager { void Restart(); void DownloadReady(); + void AddObserver(NotificationType type); + void RemoveObserver(NotificationType type); void NotifyStart(); void NotifyDone(ConfigureResult result, - const tracked_objects::Location& location); - void SetBlockedAndNotify(); + const tracked_objects::Location& location); + void ResumeSyncer(); + void PauseSyncer(); SyncBackendHost* backend_; // Map of all data type controllers that are available for sync. @@ -68,10 +86,14 @@ class DataTypeManagerImpl : public DataTypeManager { std::vector<DataTypeController*> needs_start_; std::vector<DataTypeController*> needs_stop_; + // Whether we've observed a SYNC_PAUSED but not SYNC_RESUMED. + bool syncer_paused_; + // Whether an attempt to reconfigure was made while we were busy configuring. // The |last_requested_types_| will reflect the newest set of requested types. bool needs_reconfigure_; + NotificationRegistrar notification_registrar_; ScopedRunnableMethodFactory<DataTypeManagerImpl> method_factory_; DISALLOW_COPY_AND_ASSIGN(DataTypeManagerImpl); diff --git a/chrome/browser/sync/glue/data_type_manager_impl2.cc b/chrome/browser/sync/glue/data_type_manager_impl2.cc new file mode 100644 index 0000000..58a66b4 --- /dev/null +++ b/chrome/browser/sync/glue/data_type_manager_impl2.cc @@ -0,0 +1,363 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/glue/data_type_manager_impl2.h" + +#include <algorithm> +#include <functional> + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "chrome/browser/sync/glue/data_type_controller.h" +#include "chrome/browser/sync/glue/sync_backend_host.h" +#include "content/browser/browser_thread.h" +#include "content/common/notification_details.h" +#include "content/common/notification_service.h" +#include "content/common/notification_source.h" + +namespace browser_sync { + +namespace { + +static const syncable::ModelType kStartOrder[] = { + syncable::NIGORI, // Listed for completeness. + syncable::BOOKMARKS, + syncable::PREFERENCES, + syncable::AUTOFILL, + syncable::AUTOFILL_PROFILE, + syncable::EXTENSIONS, + syncable::APPS, + syncable::THEMES, + syncable::TYPED_URLS, + syncable::PASSWORDS, + syncable::SESSIONS, +}; + +COMPILE_ASSERT(arraysize(kStartOrder) == + syncable::MODEL_TYPE_COUNT - syncable::FIRST_REAL_MODEL_TYPE, + kStartOrder_IncorrectSize); + +// Comparator used when sorting data type controllers. +class SortComparator : public std::binary_function<DataTypeController*, + DataTypeController*, + bool> { + public: + explicit SortComparator(std::map<syncable::ModelType, int>* order) + : order_(order) { } + + // Returns true if lhs precedes rhs. + bool operator() (DataTypeController* lhs, DataTypeController* rhs) { + return (*order_)[lhs->type()] < (*order_)[rhs->type()]; + } + + private: + std::map<syncable::ModelType, int>* order_; +}; + +} // namespace + +DataTypeManagerImpl2::DataTypeManagerImpl2(SyncBackendHost* backend, + const DataTypeController::TypeMap& controllers) + : backend_(backend), + controllers_(controllers), + state_(DataTypeManager::STOPPED), + needs_reconfigure_(false), + method_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { + DCHECK(backend_); + // Ensure all data type controllers are stopped. + for (DataTypeController::TypeMap::const_iterator it = controllers_.begin(); + it != controllers_.end(); ++it) { + DCHECK_EQ(DataTypeController::NOT_RUNNING, (*it).second->state()); + } + + // Build a ModelType -> order map for sorting. + for (int i = 0; i < static_cast<int>(arraysize(kStartOrder)); i++) + start_order_[kStartOrder[i]] = i; +} + +DataTypeManagerImpl2::~DataTypeManagerImpl2() {} + +bool DataTypeManagerImpl2::GetControllersNeedingStart( + std::vector<DataTypeController*>* needs_start) { + // Add any data type controllers into the needs_start_ list that are + // currently NOT_RUNNING or STOPPING. + bool found_any = false; + for (TypeSet::const_iterator it = last_requested_types_.begin(); + it != last_requested_types_.end(); ++it) { + DataTypeController::TypeMap::const_iterator dtc = controllers_.find(*it); + if (dtc != controllers_.end() && + (dtc->second->state() == DataTypeController::NOT_RUNNING || + dtc->second->state() == DataTypeController::STOPPING)) { + found_any = true; + if (needs_start) + needs_start->push_back(dtc->second.get()); + } + } + return found_any; +} + +void DataTypeManagerImpl2::Configure(const TypeSet& desired_types) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + if (state_ == STOPPING) { + // You can not set a configuration while stopping. + LOG(ERROR) << "Configuration set while stopping."; + return; + } + + last_requested_types_ = desired_types; + // Only proceed if we're in a steady state or blocked. + if (state_ != STOPPED && state_ != CONFIGURED && state_ != BLOCKED) { + VLOG(1) << "Received configure request while configuration in flight. " + << "Postponing until current configuration complete."; + needs_reconfigure_ = true; + return; + } + + needs_start_.clear(); + GetControllersNeedingStart(&needs_start_); + // Sort these according to kStartOrder. + std::sort(needs_start_.begin(), + needs_start_.end(), + SortComparator(&start_order_)); + + // Add any data type controllers into that needs_stop_ list that are + // currently MODEL_STARTING, ASSOCIATING, or RUNNING. + needs_stop_.clear(); + for (DataTypeController::TypeMap::const_iterator it = controllers_.begin(); + it != controllers_.end(); ++it) { + DataTypeController* dtc = (*it).second; + if (desired_types.count(dtc->type()) == 0 && ( + dtc->state() == DataTypeController::MODEL_STARTING || + dtc->state() == DataTypeController::ASSOCIATING || + dtc->state() == DataTypeController::RUNNING)) { + needs_stop_.push_back(dtc); + VLOG(1) << "Will stop " << dtc->name(); + } + } + // Sort these according to kStartOrder. + std::sort(needs_stop_.begin(), + needs_stop_.end(), + SortComparator(&start_order_)); + + // Restart to start/stop data types and notify the backend that the + // desired types have changed (need to do this even if there aren't any + // types to start/stop, because it could be that some types haven't + // started due to crypto errors but the backend host needs to know that we're + // disabling them anyway). + Restart(); +} + +void DataTypeManagerImpl2::Restart() { + VLOG(1) << "Restarting..."; + + DCHECK(state_ == STOPPED || state_ == CONFIGURED || state_ == BLOCKED); + + // Starting from a "steady state" (stopped or configured) state + // should send a start notification. + if (state_ == STOPPED || state_ == CONFIGURED) + NotifyStart(); + + // Stop requested data types. + for (size_t i = 0; i < needs_stop_.size(); ++i) { + VLOG(1) << "Stopping " << needs_stop_[i]->name(); + needs_stop_[i]->Stop(); + } + needs_stop_.clear(); + + // Tell the backend about the new set of data types we wish to sync. + // The task will be invoked when updates are downloaded. + state_ = DOWNLOAD_PENDING; + backend_->ConfigureDataTypes( + controllers_, + last_requested_types_, + method_factory_.NewRunnableMethod(&DataTypeManagerImpl2::DownloadReady)); +} + +void DataTypeManagerImpl2::DownloadReady() { + DCHECK(state_ == DOWNLOAD_PENDING); + + state_ = CONFIGURING; + StartNextType(); +} + +void DataTypeManagerImpl2::StartNextType() { + // If there are any data types left to start, start the one at the + // front of the list. + if (!needs_start_.empty()) { + VLOG(1) << "Starting " << needs_start_[0]->name(); + needs_start_[0]->Start( + NewCallback(this, &DataTypeManagerImpl2::TypeStartCallback)); + return; + } + + DCHECK_EQ(state_, CONFIGURING); + + if (needs_reconfigure_) { + // An attempt was made to reconfigure while we were already configuring. + // This can be because a passphrase was accepted or the user changed the + // set of desired types. Either way, |last_requested_types_| will contain + // the most recent set of desired types, so we just call configure. + // Note: we do this whether or not GetControllersNeedingStart is true, + // because we may need to stop datatypes. + state_ = BLOCKED; + needs_reconfigure_ = false; + VLOG(1) << "Reconfiguring due to previous configure attempt occuring while" + << " busy."; + Configure(last_requested_types_); + return; + } + + // Do a fresh calculation to see if controllers need starting to account for + // things like encryption, which may still need to be sorted out before we + // can announce we're "Done" configuration entirely. + if (GetControllersNeedingStart(NULL)) { + state_ = BLOCKED; + return; + } + + // If no more data types need starting, we're done. + state_ = CONFIGURED; + NotifyDone(OK, FROM_HERE); +} + +void DataTypeManagerImpl2::TypeStartCallback( + DataTypeController::StartResult result, + const tracked_objects::Location& location) { + // When the data type controller invokes this callback, it must be + // on the UI thread. + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + + if (state_ == STOPPING) { + // If we reach this callback while stopping, this means that + // DataTypeManager::Stop() was called while the current data type + // was starting. Now that it has finished starting, we can finish + // stopping the DataTypeManager. This is considered an ABORT. + FinishStopAndNotify(ABORTED, FROM_HERE); + return; + } else if (state_ == STOPPED) { + // If our state_ is STOPPED, we have already stopped all of the data + // types. We should not be getting callbacks from stopped data types. + LOG(ERROR) << "Start callback called by stopped data type!"; + return; + } + + // We're done with the data type at the head of the list -- remove it. + DataTypeController* started_dtc = needs_start_[0]; + DCHECK(needs_start_.size()); + DCHECK_EQ(needs_start_[0], started_dtc); + needs_start_.erase(needs_start_.begin()); + + // If the type started normally, continue to the next type. + // If the type is waiting for the cryptographer, continue to the next type. + // Once the cryptographer is ready, we'll attempt to restart this type. + if (result == DataTypeController::NEEDS_CRYPTO || + result == DataTypeController::OK || + result == DataTypeController::OK_FIRST_RUN) { + StartNextType(); + return; + } + + // Any other result is a fatal error. Shut down any types we've + // managed to start up to this point and pass the result to the + // callback. + VLOG(1) << "Failed " << started_dtc->name(); + ConfigureResult configure_result = DataTypeManager::ABORTED; + switch (result) { + case DataTypeController::ABORTED: + configure_result = DataTypeManager::ABORTED; + break; + case DataTypeController::ASSOCIATION_FAILED: + configure_result = DataTypeManager::ASSOCIATION_FAILED; + break; + case DataTypeController::UNRECOVERABLE_ERROR: + configure_result = DataTypeManager::UNRECOVERABLE_ERROR; + break; + default: + NOTREACHED(); + break; + } + FinishStopAndNotify(configure_result, location); +} + +void DataTypeManagerImpl2::Stop() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); + if (state_ == STOPPED) + return; + + // If we are currently configuring, then the current type is in a + // partially started state. Abort the startup of the current type, + // which will synchronously invoke the start callback. + if (state_ == CONFIGURING) { + state_ = STOPPING; + + DCHECK_LT(0U, needs_start_.size()); + needs_start_[0]->Stop(); + + // By this point, the datatype should have invoked the start callback, + // triggering FinishStop to be called, and the state to reach STOPPED. If we + // aren't STOPPED, it means that a datatype controller didn't call the start + // callback appropriately. + DCHECK_EQ(STOPPED, state_); + return; + } + + const bool download_pending = state_ == DOWNLOAD_PENDING; + state_ = STOPPING; + if (download_pending) { + // If Stop() is called while waiting for download, cancel all + // outstanding tasks. + method_factory_.RevokeAll(); + FinishStopAndNotify(ABORTED, FROM_HERE); + return; + } + + FinishStop(); +} + +void DataTypeManagerImpl2::FinishStop() { + DCHECK(state_== CONFIGURING || state_ == STOPPING || state_ == BLOCKED); + // Simply call the Stop() method on all running data types. + for (DataTypeController::TypeMap::const_iterator it = controllers_.begin(); + it != controllers_.end(); ++it) { + DataTypeController* dtc = (*it).second; + if (dtc->state() != DataTypeController::NOT_RUNNING && + dtc->state() != DataTypeController::STOPPING) { + dtc->Stop(); + VLOG(1) << "Stopped " << dtc->name(); + } + } + state_ = STOPPED; +} + +void DataTypeManagerImpl2::FinishStopAndNotify(ConfigureResult result, + const tracked_objects::Location& location) { + FinishStop(); + NotifyDone(result, location); +} + +void DataTypeManagerImpl2::NotifyStart() { + NotificationService::current()->Notify( + NotificationType::SYNC_CONFIGURE_START, + Source<DataTypeManager>(this), + NotificationService::NoDetails()); +} + +void DataTypeManagerImpl2::NotifyDone(ConfigureResult result, + const tracked_objects::Location& location) { + ConfigureResultWithErrorLocation result_with_location(result, location); + NotificationService::current()->Notify( + NotificationType::SYNC_CONFIGURE_DONE, + Source<DataTypeManager>(this), + Details<ConfigureResultWithErrorLocation>(&result_with_location)); +} + +const DataTypeController::TypeMap& DataTypeManagerImpl2::controllers() { + return controllers_; +} + +DataTypeManager::State DataTypeManagerImpl2::state() { + return state_; +} + +} // namespace browser_sync diff --git a/chrome/browser/sync/glue/data_type_manager_impl2.h b/chrome/browser/sync/glue/data_type_manager_impl2.h new file mode 100644 index 0000000..1db2908 --- /dev/null +++ b/chrome/browser/sync/glue/data_type_manager_impl2.h @@ -0,0 +1,81 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROME_BROWSER_SYNC_GLUE_DATA_TYPE_MANAGER_IMPL2_H__ +#define CHROME_BROWSER_SYNC_GLUE_DATA_TYPE_MANAGER_IMPL2_H__ +#pragma once + +#include "chrome/browser/sync/glue/data_type_manager.h" + +#include <map> +#include <vector> + +#include "base/basictypes.h" +#include "base/task.h" + +namespace browser_sync { + +class DataTypeController; +class SyncBackendHost; + +class DataTypeManagerImpl2 : public DataTypeManager { + public: + DataTypeManagerImpl2(SyncBackendHost* backend, + const DataTypeController::TypeMap& controllers); + virtual ~DataTypeManagerImpl2(); + + // DataTypeManager interface. + virtual void Configure(const TypeSet& desired_types); + virtual void Stop(); + virtual const DataTypeController::TypeMap& controllers(); + virtual State state(); + + private: + // Starts the next data type in the kStartOrder list, indicated by + // the current_type_ member. If there are no more data types to + // start, the stashed start_callback_ is invoked. + void StartNextType(); + + // Callback passed to each data type controller on startup. + void TypeStartCallback(DataTypeController::StartResult result, + const tracked_objects::Location& from_here); + + // Stops all data types. + void FinishStop(); + void FinishStopAndNotify(ConfigureResult result, + const tracked_objects::Location& location); + + // Returns true if any last_requested_types_ currently needs to start model + // association. If non-null, fills |needs_start| with all such controllers. + bool GetControllersNeedingStart( + std::vector<DataTypeController*>* needs_start); + + void Restart(); + void DownloadReady(); + void NotifyStart(); + void NotifyDone(ConfigureResult result, + const tracked_objects::Location& location); + + SyncBackendHost* backend_; + // Map of all data type controllers that are available for sync. + // This list is determined at startup by various command line flags. + const DataTypeController::TypeMap controllers_; + State state_; + std::map<syncable::ModelType, int> start_order_; + TypeSet last_requested_types_; + std::vector<DataTypeController*> needs_start_; + std::vector<DataTypeController*> needs_stop_; + + // Whether an attempt to reconfigure was made while we were busy configuring. + // The |last_requested_types_| will reflect the newest set of requested types. + bool needs_reconfigure_; + + ScopedRunnableMethodFactory<DataTypeManagerImpl2> method_factory_; + + DISALLOW_COPY_AND_ASSIGN(DataTypeManagerImpl2); +}; + +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_GLUE_DATA_TYPE_MANAGER_IMPL2_H__ diff --git a/chrome/browser/sync/glue/data_type_manager_impl2_unittest.cc b/chrome/browser/sync/glue/data_type_manager_impl2_unittest.cc index 9c6c690..f7bf741 100644 --- a/chrome/browser/sync/glue/data_type_manager_impl2_unittest.cc +++ b/chrome/browser/sync/glue/data_type_manager_impl2_unittest.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "chrome/browser/sync/glue/data_type_manager_impl.h" +#include "chrome/browser/sync/glue/data_type_manager_impl2.h" #include <set> @@ -25,7 +25,7 @@ #include "testing/gtest/include/gtest/gtest.h" using browser_sync::DataTypeManager; -using browser_sync::DataTypeManagerImpl; +using browser_sync::DataTypeManagerImpl2; using browser_sync::DataTypeController; using browser_sync::DataTypeControllerMock; using browser_sync::SyncBackendHostMock; @@ -163,7 +163,7 @@ class DataTypeManagerImpl2Test : public testing::Test { }; TEST_F(DataTypeManagerImpl2Test, NoControllers) { - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::OK); dtm.Configure(types_); @@ -177,7 +177,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureOne) { SetStartStopExpectations(bookmark_dtc); controllers_[syncable::BOOKMARKS] = bookmark_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::OK); @@ -193,7 +193,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureOneStopWhileStarting) { DataTypeController::MODEL_STARTING); controllers_[syncable::BOOKMARKS] = bookmark_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::OK); @@ -208,7 +208,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureOneStopWhileAssociating) { SetBusyStartStopExpectations(bookmark_dtc, DataTypeController::ASSOCIATING); controllers_[syncable::BOOKMARKS] = bookmark_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::OK); @@ -228,7 +228,7 @@ TEST_F(DataTypeManagerImpl2Test, OneWaitingForCrypto) { controllers_[syncable::PASSWORDS] = password_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::PASSWORDS); SetConfigureStartExpectation(); @@ -264,7 +264,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureOneThenAnother) { controllers_[syncable::PREFERENCES] = preference_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); @@ -291,7 +291,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureOneThenSwitch) { controllers_[syncable::PREFERENCES] = preference_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); @@ -333,7 +333,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureWhileOneInFlight) { controllers_[syncable::PREFERENCES] = preference_dtc; EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); types_.insert(syncable::BOOKMARKS); SetConfigureStartExpectation(); @@ -362,7 +362,7 @@ TEST_F(DataTypeManagerImpl2Test, OneFailingController) { WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); controllers_[syncable::BOOKMARKS] = bookmark_dtc; - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::ASSOCIATION_FAILED); EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); @@ -386,7 +386,7 @@ TEST_F(DataTypeManagerImpl2Test, StopWhileInFlight) { WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); controllers_[syncable::PREFERENCES] = preference_dtc; - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::ABORTED); EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); @@ -418,7 +418,7 @@ TEST_F(DataTypeManagerImpl2Test, SecondControllerFails) { WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); controllers_[syncable::PREFERENCES] = preference_dtc; - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::ASSOCIATION_FAILED); EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); @@ -438,7 +438,7 @@ TEST_F(DataTypeManagerImpl2Test, ConfigureWhileDownloadPending) { SetStartStopExpectations(preference_dtc); controllers_[syncable::PREFERENCES] = preference_dtc; - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::OK); CancelableTask* task; @@ -473,7 +473,7 @@ TEST_F(DataTypeManagerImpl2Test, StopWhileDownloadPending) { SetNotUsedExpectations(bookmark_dtc); controllers_[syncable::BOOKMARKS] = bookmark_dtc; - DataTypeManagerImpl dtm(&backend_, controllers_); + DataTypeManagerImpl2 dtm(&backend_, controllers_); SetConfigureStartExpectation(); SetConfigureDoneExpectation(DataTypeManager::ABORTED); CancelableTask* task; diff --git a/chrome/browser/sync/glue/data_type_manager_impl_unittest.cc b/chrome/browser/sync/glue/data_type_manager_impl_unittest.cc index 7a37f38..d8d875b 100644 --- a/chrome/browser/sync/glue/data_type_manager_impl_unittest.cc +++ b/chrome/browser/sync/glue/data_type_manager_impl_unittest.cc @@ -1,3 +1,688 @@ // Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. + +#include <set> + +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "base/stl_util-inl.h" +#include "base/task.h" +#include "chrome/browser/sync/glue/data_type_controller.h" +#include "chrome/browser/sync/glue/data_type_controller_mock.h" +#include "chrome/browser/sync/glue/data_type_manager_impl.h" +#include "chrome/browser/sync/glue/sync_backend_host_mock.h" +#include "chrome/browser/sync/profile_sync_test_util.h" +#include "chrome/browser/sync/syncable/model_type.h" +#include "content/browser/browser_thread.h" +#include "content/common/notification_details.h" +#include "content/common/notification_observer_mock.h" +#include "content/common/notification_registrar.h" +#include "content/common/notification_service.h" +#include "content/common/notification_type.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using browser_sync::DataTypeManager; +using browser_sync::DataTypeManagerImpl; +using browser_sync::DataTypeController; +using browser_sync::DataTypeControllerMock; +using browser_sync::SyncBackendHostMock; +using testing::_; +using testing::AtLeast; +using testing::DoAll; +using testing::DoDefault; +using testing::InSequence; +using testing::Mock; +using testing::Property; +using testing::Pointee; +using testing::Return; +using testing::SaveArg; + +ACTION_P(InvokeCallback, callback_result) { + arg0->Run(callback_result, FROM_HERE); + delete arg0; +} + +ACTION_P2(InvokeCallbackPointer, callback, argument) { + callback->Run(argument, FROM_HERE); + delete callback; +} + +class DataTypeManagerImplTest : public testing::Test { + public: + DataTypeManagerImplTest() + : ui_thread_(BrowserThread::UI, &message_loop_) {} + + virtual ~DataTypeManagerImplTest() { + } + + protected: + virtual void SetUp() { + registrar_.Add(&observer_, + NotificationType::SYNC_CONFIGURE_START, + NotificationService::AllSources()); + registrar_.Add(&observer_, + NotificationType::SYNC_CONFIGURE_DONE, + NotificationService::AllSources()); + } + + DataTypeControllerMock* MakeBookmarkDTC() { + DataTypeControllerMock* dtc = new DataTypeControllerMock(); + EXPECT_CALL(*dtc, enabled()).WillRepeatedly(Return(true)); + EXPECT_CALL(*dtc, type()).WillRepeatedly(Return(syncable::BOOKMARKS)); + EXPECT_CALL(*dtc, name()).WillRepeatedly(Return("bookmark")); + return dtc; + } + + DataTypeControllerMock* MakePreferenceDTC() { + DataTypeControllerMock* dtc = new DataTypeControllerMock(); + EXPECT_CALL(*dtc, enabled()).WillRepeatedly(Return(true)); + EXPECT_CALL(*dtc, type()).WillRepeatedly(Return(syncable::PREFERENCES)); + EXPECT_CALL(*dtc, name()).WillRepeatedly(Return("preference")); + return dtc; + } + + DataTypeControllerMock* MakePasswordDTC() { + DataTypeControllerMock* dtc = new DataTypeControllerMock(); + SetPasswordDTCExpectations(dtc); + return dtc; + } + + void SetPasswordDTCExpectations(DataTypeControllerMock* dtc) { + EXPECT_CALL(*dtc, enabled()).WillRepeatedly(Return(true)); + EXPECT_CALL(*dtc, type()).WillRepeatedly(Return(syncable::PASSWORDS)); + EXPECT_CALL(*dtc, name()).WillRepeatedly(Return("passwords")); + } + + void SetStartStopExpectations(DataTypeControllerMock* mock_dtc) { + InSequence seq; + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + EXPECT_CALL(*mock_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::OK))); + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::RUNNING)); + EXPECT_CALL(*mock_dtc, Stop()).Times(1); + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + } + + void SetBusyStartStopExpectations(DataTypeControllerMock* mock_dtc, + DataTypeController::State busy_state) { + InSequence seq; + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + EXPECT_CALL(*mock_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::OK))); + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(busy_state)); + EXPECT_CALL(*mock_dtc, Stop()).Times(1); + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + } + + void SetNotUsedExpectations(DataTypeControllerMock* mock_dtc) { + EXPECT_CALL(*mock_dtc, Start(_)).Times(0); + EXPECT_CALL(*mock_dtc, Stop()).Times(0); + EXPECT_CALL(*mock_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + } + + void SetConfigureStartExpectation() { + EXPECT_CALL( + observer_, + Observe(NotificationType(NotificationType::SYNC_CONFIGURE_START), + _, _)); + } + + void SetConfigureDoneExpectation(DataTypeManager::ConfigureResult result) { + EXPECT_CALL( + observer_, + Observe(NotificationType(NotificationType::SYNC_CONFIGURE_DONE), _, + Property(&Details<DataTypeManager::ConfigureResult>::ptr, + Pointee(result)))); + } + + void SetBackendExpectations(int times) { + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(times); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(times); + EXPECT_CALL(backend_, RequestPause()).Times(times); + EXPECT_CALL(backend_, RequestResume()).Times(times); + } + + MessageLoopForUI message_loop_; + BrowserThread ui_thread_; + DataTypeController::TypeMap controllers_; + SyncBackendHostMock backend_; + NotificationObserverMock observer_; + NotificationRegistrar registrar_; + std::set<syncable::ModelType> types_; +}; + +TEST_F(DataTypeManagerImplTest, NoControllers) { + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureOne) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + SetBackendExpectations(1); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureOneStopWhileStarting) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetBusyStartStopExpectations(bookmark_dtc, + DataTypeController::MODEL_STARTING); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + SetBackendExpectations(1); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureOneStopWhileAssociating) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetBusyStartStopExpectations(bookmark_dtc, DataTypeController::ASSOCIATING); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + SetBackendExpectations(1); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, OneWaitingForCrypto) { + DataTypeControllerMock* password_dtc = MakePasswordDTC(); + EXPECT_CALL(*password_dtc, state()).Times(AtLeast(2)). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + EXPECT_CALL(*password_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::NEEDS_CRYPTO))); + + controllers_[syncable::PASSWORDS] = password_dtc; + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::PASSWORDS); + SetConfigureStartExpectation(); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::BLOCKED, dtm.state()); + + Mock::VerifyAndClearExpectations(&backend_); + Mock::VerifyAndClearExpectations(&observer_); + Mock::VerifyAndClearExpectations(password_dtc); + + SetConfigureDoneExpectation(DataTypeManager::OK); + SetPasswordDTCExpectations(password_dtc); + EXPECT_CALL(*password_dtc, state()). + WillOnce(Return(DataTypeController::NOT_RUNNING)). + WillRepeatedly(Return(DataTypeController::RUNNING)); + EXPECT_CALL(*password_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::OK))); + + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(1); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + EXPECT_CALL(*password_dtc, Stop()).Times(1); + dtm.Stop(); + + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + + +TEST_F(DataTypeManagerImplTest, ConfigureOneThenAnother) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + SetBackendExpectations(2); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + types_.insert(syncable::PREFERENCES); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureOneThenSwitch) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + SetBackendExpectations(2); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + types_.clear(); + types_.insert(syncable::PREFERENCES); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureWhileOneInFlight) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + // Save the callback here so we can interrupt startup. + DataTypeController::StartCallback* callback; + { + InSequence seq; + EXPECT_CALL(*bookmark_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + EXPECT_CALL(*bookmark_dtc, Start(_)). + WillOnce(SaveArg<0>(&callback)); + EXPECT_CALL(*bookmark_dtc, state()). + WillRepeatedly(Return(DataTypeController::RUNNING)); + EXPECT_CALL(*bookmark_dtc, Stop()).Times(1); + EXPECT_CALL(*bookmark_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + } + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + // Request/Resume only called once due to the configures being inlined. + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(2); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(1); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + + // At this point, the bookmarks dtc should be in flight. Add + // preferences and continue starting bookmarks. + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + callback->Run(DataTypeController::OK, FROM_HERE); + delete callback; + + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureWhilePausePending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + // Don't notify the first time pause is called. + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(2); + EXPECT_CALL(backend_, RequestPause()). + WillOnce(Return(true)); + EXPECT_CALL(backend_, RequestResume()).Times(1); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::PAUSE_PENDING, dtm.state()); + + // Configure while pause pending. + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + + // Send the SYNC_PAUSED notification. This will allow the DTM to + // wake up and restart itself with the new configuration. + NotificationService::current()->Notify(NotificationType::SYNC_PAUSED, + NotificationService::AllSources(), + NotificationService::NoDetails()); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, StopWhilePausePending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetNotUsedExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + // Never notify when RequestPause is called. + EXPECT_CALL(backend_, RequestPause()).WillOnce(Return(true)); + EXPECT_CALL(backend_, RequestResume()).Times(0); + DataTypeManagerImpl dtm(&backend_, controllers_); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ABORTED); + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::PAUSE_PENDING, dtm.state()); + + // Stop while pause pending. + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); + + // We should be able to safely handle a SYNC_PAUSED notification. + NotificationService::current()->Notify(NotificationType::SYNC_PAUSED, + NotificationService::AllSources(), + NotificationService::NoDetails()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureWhileResumePending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(2); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(2); + EXPECT_CALL(backend_, RequestPause()).Times(2); + // Don't notify the first time resume is called. + EXPECT_CALL(backend_, RequestResume()). + WillOnce(Return(true)). + WillOnce(DoDefault()); + DataTypeManagerImpl dtm(&backend_, controllers_); + types_.insert(syncable::BOOKMARKS); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::RESUME_PENDING, dtm.state()); + + // Configure while resume pending. + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + + // Send the SYNC_PAUSED notification. This will allow the DTM to + // wake up and restart itself with the new configuration. + NotificationService::current()->Notify(NotificationType::SYNC_RESUMED, + NotificationService::AllSources(), + NotificationService::NoDetails()); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, StopWhileResumePending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + // Never notify pause resumed. + EXPECT_CALL(backend_, RequestResume()).WillOnce(Return(true)); + DataTypeManagerImpl dtm(&backend_, controllers_); + + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ABORTED); + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::RESUME_PENDING, dtm.state()); + + // Stop while pause pending. + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); + + // We should be able to safely handle a SYNC_RESUMED notification. + NotificationService::current()->Notify(NotificationType::SYNC_RESUMED, + NotificationService::AllSources(), + NotificationService::NoDetails()); +} + +TEST_F(DataTypeManagerImplTest, OneFailingController) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + EXPECT_CALL(*bookmark_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::ASSOCIATION_FAILED))); + EXPECT_CALL(*bookmark_dtc, Stop()).Times(0); + EXPECT_CALL(*bookmark_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ASSOCIATION_FAILED); + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(0); + + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, StopWhileInFlight) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + // Save the callback here so we can interrupt startup. + DataTypeController::StartCallback* callback; + EXPECT_CALL(*preference_dtc, Start(_)). + WillOnce(SaveArg<0>(&callback)); + EXPECT_CALL(*preference_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + controllers_[syncable::PREFERENCES] = preference_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ABORTED); + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(0); + + types_.insert(syncable::BOOKMARKS); + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + // Configure should stop in the CONFIGURING state because we are + // waiting for the preferences callback to be invoked. + EXPECT_EQ(DataTypeManager::CONFIGURING, dtm.state()); + + // Call stop before the preference callback is invoked. + EXPECT_CALL(*preference_dtc, Stop()). + WillOnce(InvokeCallbackPointer(callback, DataTypeController::ABORTED)); + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, SecondControllerFails) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + EXPECT_CALL(*preference_dtc, Start(_)). + WillOnce(InvokeCallback((DataTypeController::ASSOCIATION_FAILED))); + EXPECT_CALL(*preference_dtc, Stop()).Times(0); + EXPECT_CALL(*preference_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + controllers_[syncable::PREFERENCES] = preference_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ASSOCIATION_FAILED); + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(0); + + types_.insert(syncable::BOOKMARKS); + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, PauseFailed) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + EXPECT_CALL(*bookmark_dtc, Start(_)).Times(0); + EXPECT_CALL(*bookmark_dtc, state()). + WillRepeatedly(Return(DataTypeController::NOT_RUNNING)); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::UNRECOVERABLE_ERROR); + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).WillOnce(Return(false)); + EXPECT_CALL(backend_, RequestResume()).Times(0); + + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ResumeFailed) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::UNRECOVERABLE_ERROR); + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)).Times(1); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(1); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).WillOnce(Return(false)); + + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, ConfigureWhileDownloadPending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetStartStopExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeControllerMock* preference_dtc = MakePreferenceDTC(); + SetStartStopExpectations(preference_dtc); + controllers_[syncable::PREFERENCES] = preference_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::OK); + CancelableTask* task; + // Grab the task the first time this is called so we can configure + // before it is finished. + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)). + WillOnce(SaveArg<2>(&task)). + WillOnce(DoDefault()); + EXPECT_CALL(backend_, StartSyncingWithServer()).Times(2); + EXPECT_CALL(backend_, RequestPause()).Times(1); + EXPECT_CALL(backend_, RequestResume()).Times(1); + + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + // Configure should stop in the DOWNLOAD_PENDING state because we + // are waiting for the download ready task to be run. + EXPECT_EQ(DataTypeManager::DOWNLOAD_PENDING, dtm.state()); + + types_.insert(syncable::PREFERENCES); + dtm.Configure(types_); + + // Running the task will queue a restart task to the message loop, and + // eventually get us configured. + task->Run(); + delete task; + MessageLoop::current()->RunAllPending(); + EXPECT_EQ(DataTypeManager::CONFIGURED, dtm.state()); + + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); +} + +TEST_F(DataTypeManagerImplTest, StopWhileDownloadPending) { + DataTypeControllerMock* bookmark_dtc = MakeBookmarkDTC(); + SetNotUsedExpectations(bookmark_dtc); + controllers_[syncable::BOOKMARKS] = bookmark_dtc; + + DataTypeManagerImpl dtm(&backend_, controllers_); + SetConfigureStartExpectation(); + SetConfigureDoneExpectation(DataTypeManager::ABORTED); + CancelableTask* task; + // Grab the task the first time this is called so we can stop + // before it is finished. + EXPECT_CALL(backend_, ConfigureDataTypes(_, _, _)). + WillOnce(SaveArg<2>(&task)); + EXPECT_CALL(backend_, StartSyncingWithServer()); + EXPECT_CALL(backend_, RequestPause()).Times(0); + EXPECT_CALL(backend_, RequestResume()).Times(0); + + types_.insert(syncable::BOOKMARKS); + dtm.Configure(types_); + // Configure should stop in the DOWNLOAD_PENDING state because we + // are waiting for the download ready task to be run. + EXPECT_EQ(DataTypeManager::DOWNLOAD_PENDING, dtm.state()); + + dtm.Stop(); + EXPECT_EQ(DataTypeManager::STOPPED, dtm.state()); + + // It should be perfectly safe to run this task even though the DTM + // has been stopped. + task->Run(); + delete task; +} diff --git a/chrome/browser/sync/glue/sync_backend_host.cc b/chrome/browser/sync/glue/sync_backend_host.cc index 9d57a8f..50fc35c 100644 --- a/chrome/browser/sync/glue/sync_backend_host.cc +++ b/chrome/browser/sync/glue/sync_backend_host.cc @@ -66,6 +66,9 @@ SyncBackendHost::SyncBackendHost(Profile* profile) sync_data_folder_path_( profile_->GetPath().Append(kSyncDataFolderName)), last_auth_error_(AuthError::None()), + using_new_syncer_thread_( + CommandLine::ForCurrentProcess()->HasSwitch( + switches::kNewSyncerThread)), syncapi_initialized_(false) { } @@ -75,6 +78,9 @@ SyncBackendHost::SyncBackendHost() profile_(NULL), frontend_(NULL), last_auth_error_(AuthError::None()), + using_new_syncer_thread_( + CommandLine::ForCurrentProcess()->HasSwitch( + switches::kNewSyncerThread)), syncapi_initialized_(false) { } @@ -403,7 +409,16 @@ void SyncBackendHost::ConfigureDataTypes( ®istrar_.routing_info)); } - FinishConfigureDataTypesOnFrontendLoop(); + // If we're doing the first configure (at startup) this is redundant as the + // syncer thread always must start in config mode. + if (using_new_syncer_thread_) { + core_thread_.message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &SyncBackendHost::Core::DoStartConfigurationMode)); + } else { + FinishConfigureDataTypesOnFrontendLoop(); + } } void SyncBackendHost::FinishConfigureDataTypesOnFrontendLoop() { @@ -418,11 +433,15 @@ void SyncBackendHost::FinishConfigureDataTypesOnFrontendLoop() { // downloading updates for newly added data types. Once this is // complete, the configure_state_.ready_task_ is run via an // OnInitializationComplete notification. - + bool request_nudge = false; if (pending_config_mode_state_->deleted_type) { - core_thread_.message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(core_.get(), - &SyncBackendHost::Core::DeferNudgeForCleanup)); + if (using_new_syncer_thread_) { + core_thread_.message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(core_.get(), + &SyncBackendHost::Core::DeferNudgeForCleanup)); + } else { + request_nudge = true; + } } if (pending_config_mode_state_->added_types.none() && @@ -445,14 +464,17 @@ void SyncBackendHost::FinishConfigureDataTypesOnFrontendLoop() { pending_config_mode_state_->ready_task->Run(); } else { pending_download_state_.reset(pending_config_mode_state_.release()); + if (using_new_syncer_thread_) { + RequestConfig(pending_download_state_->added_types); + } else { + request_nudge = true; + } + } - syncable::ModelTypeBitSet types_copy(pending_download_state_->added_types); - if (IsNigoriEnabled()) - types_copy.set(syncable::NIGORI); - core_thread_.message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(core_.get(), - &SyncBackendHost::Core::DoRequestConfig, - types_copy)); + // TODO(tim): Remove this when we get rid of the old syncer thread. + if (request_nudge) { + CHECK(!using_new_syncer_thread_); + RequestNudge(FROM_HERE); } pending_config_mode_state_.reset(); @@ -477,6 +499,17 @@ void SyncBackendHost::RequestNudge(const tracked_objects::Location& location) { location)); } +void SyncBackendHost::RequestConfig( + const syncable::ModelTypeBitSet& added_types) { + DCHECK(core_->syncapi()); + + syncable::ModelTypeBitSet types_copy(added_types); + if (IsNigoriEnabled()) + types_copy.set(syncable::NIGORI); + + core_->syncapi()->RequestConfig(types_copy); +} + void SyncBackendHost::ActivateDataType( DataTypeController* data_type_controller, ChangeProcessor* change_processor) { @@ -508,6 +541,20 @@ void SyncBackendHost::DeactivateDataType( DCHECK_EQ(erased, 1U); } +bool SyncBackendHost::RequestPause() { + DCHECK(!using_new_syncer_thread_); + core_thread_.message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(core_.get(), &SyncBackendHost::Core::DoRequestPause)); + return true; +} + +bool SyncBackendHost::RequestResume() { + DCHECK(!using_new_syncer_thread_); + core_thread_.message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(core_.get(), &SyncBackendHost::Core::DoRequestResume)); + return true; +} + bool SyncBackendHost::RequestClearServerData() { core_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(core_.get(), @@ -518,6 +565,20 @@ bool SyncBackendHost::RequestClearServerData() { SyncBackendHost::Core::~Core() { } +void SyncBackendHost::Core::NotifyPaused() { + DCHECK(!host_ || !host_->using_new_syncer_thread_); + NotificationService::current()->Notify(NotificationType::SYNC_PAUSED, + NotificationService::AllSources(), + NotificationService::NoDetails()); +} + +void SyncBackendHost::Core::NotifyResumed() { + DCHECK(!host_ || !host_->using_new_syncer_thread_); + NotificationService::current()->Notify(NotificationType::SYNC_RESUMED, + NotificationService::AllSources(), + NotificationService::NoDetails()); +} + void SyncBackendHost::Core::NotifyPassphraseRequired(bool for_decryption) { if (!host_ || !host_->frontend_) return; @@ -771,11 +832,6 @@ void SyncBackendHost::Core::DoEncryptDataTypes( syncapi_->EncryptDataTypes(encrypted_types); } -void SyncBackendHost::Core::DoRequestConfig( - const syncable::ModelTypeBitSet& added_types) { - syncapi_->RequestConfig(added_types); -} - UIModelWorker* SyncBackendHost::ui_worker() { ModelSafeWorker* w = registrar_.workers[GROUP_UI]; if (w == NULL) @@ -893,6 +949,7 @@ void SyncBackendHost::Core::HandleSyncCycleCompletedOnFrontendLoop( } if (!found_all_added) { CHECK(false); + DCHECK(!host_->using_new_syncer_thread_); } else { host_->pending_download_state_->ready_task->Run(); host_->pending_download_state_.reset(); @@ -970,6 +1027,18 @@ void SyncBackendHost::Core::OnPassphraseAccepted( bootstrap_token)); } +void SyncBackendHost::Core::OnPaused() { + host_->frontend_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &Core::NotifyPaused)); +} + +void SyncBackendHost::Core::OnResumed() { + host_->frontend_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &Core::NotifyResumed)); +} + void SyncBackendHost::Core::OnStopSyncingPermanently() { host_->frontend_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &Core::HandleStopSyncingPermanentlyOnFrontendLoop)); @@ -1061,6 +1130,14 @@ void SyncBackendHost::Core::DoRequestClearServerData() { syncapi_->RequestClearServerData(); } +void SyncBackendHost::Core::DoRequestResume() { + syncapi_->RequestResume(); +} + +void SyncBackendHost::Core::DoRequestPause() { + syncapi()->RequestPause(); +} + void SyncBackendHost::Core::SaveChanges() { syncapi_->SaveChanges(); } @@ -1136,6 +1213,11 @@ void SyncBackendHost::Core::DoProcessMessage( syncapi_->GetJsBackend()->ProcessMessage(name, args, sender); } +void SyncBackendHost::Core::DoStartConfigurationMode() { + syncapi_->StartConfigurationMode(NewCallback(this, + &SyncBackendHost::Core::FinishConfigureDataTypes)); +} + void SyncBackendHost::Core::DeferNudgeForCleanup() { DCHECK_EQ(MessageLoop::current(), host_->core_thread_.message_loop()); deferred_nudge_for_cleanup_requested_ = true; diff --git a/chrome/browser/sync/glue/sync_backend_host.h b/chrome/browser/sync/glue/sync_backend_host.h index 586ca8d..086b2e0 100644 --- a/chrome/browser/sync/glue/sync_backend_host.h +++ b/chrome/browser/sync/glue/sync_backend_host.h @@ -195,6 +195,16 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { void DeactivateDataType(DataTypeController* data_type_controller, ChangeProcessor* change_processor); + // Requests the backend to pause. Returns true if the request is + // sent sucessfully. When the backend does pause, a SYNC_PAUSED + // notification is sent to the notification service. + virtual bool RequestPause(); + + // Requests the backend to resume. Returns true if the request is + // sent sucessfully. When the backend does resume, a SYNC_RESUMED + // notification is sent to the notification service. + virtual bool RequestResume(); + // Asks the server to clear all data associated with ChromeSync. virtual bool RequestClearServerData(); @@ -272,6 +282,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { virtual void OnPassphraseRequired(bool for_decryption); virtual void OnPassphraseFailed(); virtual void OnPassphraseAccepted(const std::string& bootstrap_token); + virtual void OnPaused(); + virtual void OnResumed(); virtual void OnStopSyncingPermanently(); virtual void OnUpdatedToken(const std::string& token); virtual void OnClearServerDataFailed(); @@ -338,6 +350,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // Called on the SyncBackendHost core_thread_ to nudge/pause/resume the // syncer. void DoRequestNudge(const tracked_objects::Location& location); + void DoRequestPause(); + void DoRequestResume(); void DoRequestClearServerData(); // Sets |deferred_nudge_for_cleanup_requested_| to true. See comment below. @@ -368,9 +382,6 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // because the thread that was using them has exited (in step 2). void DoShutdown(bool stopping_sync); - // Posts a config request on the core thread. - virtual void DoRequestConfig(const syncable::ModelTypeBitSet& added_types); - // Set the base request context to use when making HTTP calls. // This method will add a reference to the context to persist it // on the IO thread. Must be removed from IO thread. @@ -390,6 +401,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { const std::string& name, const JsArgList& args, const JsEventHandler* sender); + void DoStartConfigurationMode(); + // A callback from the SyncerThread when it is safe to continue config. void FinishConfigureDataTypes(); @@ -419,6 +432,15 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // Return change processor for a particular model (return NULL on failure). ChangeProcessor* GetProcessor(syncable::ModelType modeltype); + + // Sends a SYNC_PAUSED notification to the notification service on + // the UI thread. + void NotifyPaused(); + + // Sends a SYNC_RESUMED notification to the notification service + // on the UI thread. + void NotifyResumed(); + // Invoked when initialization of syncapi is complete and we can start // our timer. // This must be called from the thread on which SaveChanges is intended to @@ -519,6 +541,9 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // Posts a nudge request on the core thread. virtual void RequestNudge(const tracked_objects::Location& location); + // Posts a config request on the core thread. + virtual void RequestConfig(const syncable::ModelTypeBitSet& added_types); + // Called to finish the job of ConfigureDataTypes once the syncer is in // configuration mode. void FinishConfigureDataTypes(); @@ -632,6 +657,9 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { // UI-thread cache of the last SyncSessionSnapshot received from syncapi. scoped_ptr<sessions::SyncSessionSnapshot> last_snapshot_; + // While two impls are in flight, using this for sanity checking. Bug 26339. + const bool using_new_syncer_thread_; + // Whether we've processed the initialization complete callback. bool syncapi_initialized_; diff --git a/chrome/browser/sync/glue/sync_backend_host_mock.cc b/chrome/browser/sync/glue/sync_backend_host_mock.cc index 8e16905..2d618f7 100644 --- a/chrome/browser/sync/glue/sync_backend_host_mock.cc +++ b/chrome/browser/sync/glue/sync_backend_host_mock.cc @@ -12,6 +12,15 @@ ACTION(InvokeTask) { } SyncBackendHostMock::SyncBackendHostMock() { + // By default, the RequestPause and RequestResume methods will + // send the confirmation notification and return true. + ON_CALL(*this, RequestPause()). + WillByDefault(testing::DoAll(Notify(NotificationType::SYNC_PAUSED), + testing::Return(true))); + ON_CALL(*this, RequestResume()). + WillByDefault(testing::DoAll(Notify(NotificationType::SYNC_RESUMED), + testing::Return(true))); + // By default, invoke the ready callback. ON_CALL(*this, ConfigureDataTypes(testing::_, testing::_, testing::_)). WillByDefault(InvokeTask()); diff --git a/chrome/browser/sync/glue/sync_backend_host_mock.h b/chrome/browser/sync/glue/sync_backend_host_mock.h index a594674..9d715f3 100644 --- a/chrome/browser/sync/glue/sync_backend_host_mock.h +++ b/chrome/browser/sync/glue/sync_backend_host_mock.h @@ -24,6 +24,8 @@ class SyncBackendHostMock : public SyncBackendHost { MOCK_METHOD3(ConfigureDataTypes, void(const DataTypeController::TypeMap&, const std::set<syncable::ModelType>&, CancelableTask*)); + MOCK_METHOD0(RequestPause, bool()); + MOCK_METHOD0(RequestResume, bool()); MOCK_METHOD0(StartSyncingWithServer, void()); }; diff --git a/chrome/browser/sync/js_sync_manager_observer.cc b/chrome/browser/sync/js_sync_manager_observer.cc index b6266e7..3999fdd 100644 --- a/chrome/browser/sync/js_sync_manager_observer.cc +++ b/chrome/browser/sync/js_sync_manager_observer.cc @@ -103,6 +103,14 @@ void JsSyncManagerObserver::OnInitializationComplete() { JsArgList(), NULL); } +void JsSyncManagerObserver::OnPaused() { + parent_router_->RouteJsEvent("onPaused", JsArgList(), NULL); +} + +void JsSyncManagerObserver::OnResumed() { + parent_router_->RouteJsEvent("onResumed", JsArgList(), NULL); +} + void JsSyncManagerObserver::OnStopSyncingPermanently() { parent_router_->RouteJsEvent("onStopSyncingPermanently", JsArgList(), NULL); diff --git a/chrome/browser/sync/js_sync_manager_observer.h b/chrome/browser/sync/js_sync_manager_observer.h index c7d09e76..2fa8638 100644 --- a/chrome/browser/sync/js_sync_manager_observer.h +++ b/chrome/browser/sync/js_sync_manager_observer.h @@ -40,6 +40,8 @@ class JsSyncManagerObserver : public sync_api::SyncManager::Observer { virtual void OnEncryptionComplete( const syncable::ModelTypeSet& encrypted_types); virtual void OnInitializationComplete(); + virtual void OnPaused(); + virtual void OnResumed(); virtual void OnStopSyncingPermanently(); virtual void OnClearServerDataSucceeded(); virtual void OnClearServerDataFailed(); diff --git a/chrome/browser/sync/js_sync_manager_observer_unittest.cc b/chrome/browser/sync/js_sync_manager_observer_unittest.cc index e854690..b9c0c33 100644 --- a/chrome/browser/sync/js_sync_manager_observer_unittest.cc +++ b/chrome/browser/sync/js_sync_manager_observer_unittest.cc @@ -39,7 +39,12 @@ TEST_F(JsSyncManagerObserverTest, NoArgNotifiations) { EXPECT_CALL(mock_router_, RouteJsEvent("onPassphraseFailed", HasArgs(JsArgList()), NULL)); - + EXPECT_CALL(mock_router_, + RouteJsEvent("onPaused", + HasArgs(JsArgList()), NULL)); + EXPECT_CALL(mock_router_, + RouteJsEvent("onResumed", + HasArgs(JsArgList()), NULL)); EXPECT_CALL(mock_router_, RouteJsEvent("onStopSyncingPermanently", HasArgs(JsArgList()), NULL)); @@ -52,6 +57,8 @@ TEST_F(JsSyncManagerObserverTest, NoArgNotifiations) { sync_manager_observer_.OnInitializationComplete(); sync_manager_observer_.OnPassphraseFailed(); + sync_manager_observer_.OnPaused(); + sync_manager_observer_.OnResumed(); sync_manager_observer_.OnStopSyncingPermanently(); sync_manager_observer_.OnClearServerDataSucceeded(); sync_manager_observer_.OnClearServerDataFailed(); diff --git a/chrome/browser/sync/profile_sync_factory_impl.cc b/chrome/browser/sync/profile_sync_factory_impl.cc index 9102e88..b272f20 100644 --- a/chrome/browser/sync/profile_sync_factory_impl.cc +++ b/chrome/browser/sync/profile_sync_factory_impl.cc @@ -16,6 +16,7 @@ #include "chrome/browser/sync/glue/bookmark_data_type_controller.h" #include "chrome/browser/sync/glue/bookmark_model_associator.h" #include "chrome/browser/sync/glue/data_type_manager_impl.h" +#include "chrome/browser/sync/glue/data_type_manager_impl2.h" #include "chrome/browser/sync/glue/extension_change_processor.h" #include "chrome/browser/sync/glue/extension_data_type_controller.h" #include "chrome/browser/sync/glue/extension_model_associator.h" @@ -54,6 +55,7 @@ using browser_sync::BookmarkModelAssociator; using browser_sync::DataTypeController; using browser_sync::DataTypeManager; using browser_sync::DataTypeManagerImpl; +using browser_sync::DataTypeManagerImpl2; using browser_sync::ExtensionChangeProcessor; using browser_sync::ExtensionDataTypeController; using browser_sync::ExtensionModelAssociator; @@ -159,7 +161,10 @@ ProfileSyncService* ProfileSyncFactoryImpl::CreateProfileSyncService( DataTypeManager* ProfileSyncFactoryImpl::CreateDataTypeManager( SyncBackendHost* backend, const DataTypeController::TypeMap& controllers) { - return new DataTypeManagerImpl(backend, controllers); + if (command_line_->HasSwitch(switches::kNewSyncerThread)) + return new DataTypeManagerImpl2(backend, controllers); + else + return new DataTypeManagerImpl(backend, controllers); } ProfileSyncFactory::SyncComponents diff --git a/chrome/browser/sync/profile_sync_service.cc b/chrome/browser/sync/profile_sync_service.cc index 937af9f..aa95e2a 100644 --- a/chrome/browser/sync/profile_sync_service.cc +++ b/chrome/browser/sync/profile_sync_service.cc @@ -598,13 +598,6 @@ void ProfileSyncService::OnClearServerDataSucceeded() { void ProfileSyncService::OnPassphraseRequired(bool for_decryption) { DCHECK(backend_.get()); DCHECK(backend_->IsNigoriEnabled()); - - // TODO(lipalani) : add this check to other locations as well. - if (unrecoverable_error_detected_) { - // When unrecoverable error is detected we post a task to shutdown the - // backend. The task might not have executed yet. - return; - } observed_passphrase_required_ = true; passphrase_required_for_decryption_ = for_decryption; diff --git a/chrome/browser/sync/profile_sync_service_password_unittest.cc b/chrome/browser/sync/profile_sync_service_password_unittest.cc index 22005ce..2e106c1 100644 --- a/chrome/browser/sync/profile_sync_service_password_unittest.cc +++ b/chrome/browser/sync/profile_sync_service_password_unittest.cc @@ -153,7 +153,7 @@ class ProfileSyncServicePasswordTest : public AbstractProfileSyncServiceTest { NotificationType::SYNC_CONFIGURE_DONE, NotificationService::AllSources()); registrar_.Add(&observer_, - NotificationType::SYNC_CONFIGURE_BLOCKED, + NotificationType::SYNC_PAUSED, NotificationService::AllSources()); } @@ -218,8 +218,7 @@ class ProfileSyncServicePasswordTest : public AbstractProfileSyncServiceTest { NotificationType(NotificationType::SYNC_CONFIGURE_DONE),_,_)); EXPECT_CALL(observer_, Observe( - NotificationType( - NotificationType::SYNC_CONFIGURE_BLOCKED),_,_)) + NotificationType(NotificationType::SYNC_PAUSED),_,_)) .WillOnce(InvokeWithoutArgs(QuitMessageLoop)); service_->RegisterDataTypeController(data_type_controller); diff --git a/chrome/browser/sync/sessions/sync_session_context.cc b/chrome/browser/sync/sessions/sync_session_context.cc index f0cbce4..b5f072b 100644 --- a/chrome/browser/sync/sessions/sync_session_context.cc +++ b/chrome/browser/sync/sessions/sync_session_context.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -36,5 +36,10 @@ SyncSessionContext::~SyncSessionContext() { } } +void SyncSessionContext::set_last_snapshot( + const SyncSessionSnapshot& snapshot) { + previous_session_snapshot_.reset(new SyncSessionSnapshot(snapshot)); +} + } // namespace sessions } // namespace browser_sync diff --git a/chrome/browser/sync/sessions/sync_session_context.h b/chrome/browser/sync/sessions/sync_session_context.h index a3a69ec..4ec8612 100644 --- a/chrome/browser/sync/sessions/sync_session_context.h +++ b/chrome/browser/sync/sessions/sync_session_context.h @@ -92,6 +92,13 @@ class SyncSessionContext { previous_session_routing_info_ = info; } + // TODO(tim): Deprecated. Bug 26339. + sessions::SyncSessionSnapshot* previous_session_snapshot() { + return previous_session_snapshot_.get(); + } + + void set_last_snapshot(const SyncSessionSnapshot& snapshot); + void NotifyListeners(const SyncEngineEvent& event) { FOR_EACH_OBSERVER(SyncEngineEventListener, listeners_, OnSyncEngineEvent(event)); diff --git a/chrome/browser/sync/test_profile_sync_service.cc b/chrome/browser/sync/test_profile_sync_service.cc index c36cd49..02b44ef 100644 --- a/chrome/browser/sync/test_profile_sync_service.cc +++ b/chrome/browser/sync/test_profile_sync_service.cc @@ -24,6 +24,14 @@ using syncable::ModelType; using syncable::ScopedDirLookup; using sync_api::UserShare; +ACTION_P(CallOnPaused, core) { + core->OnPaused(); +}; + +ACTION_P(CallOnResumed, core) { + core->OnResumed(); +} + namespace browser_sync { using ::testing::_; @@ -33,10 +41,21 @@ SyncBackendHostForProfileSyncTest::SyncBackendHostForProfileSyncTest( bool synchronous_init) : browser_sync::SyncBackendHost(profile), synchronous_init_(synchronous_init) { + // By default, the RequestPause and RequestResume methods will + // send the confirmation notification and return true. + ON_CALL(*this, RequestPause()). + WillByDefault(testing::DoAll(CallOnPaused(core_), + testing::Return(true))); + ON_CALL(*this, RequestResume()). + WillByDefault(testing::DoAll(CallOnResumed(core_), + testing::Return(true))); ON_CALL(*this, RequestNudge(_)).WillByDefault( testing::Invoke(this, &SyncBackendHostForProfileSyncTest:: SimulateSyncCycleCompletedInitialSyncEnded)); + + EXPECT_CALL(*this, RequestPause()).Times(testing::AnyNumber()); + EXPECT_CALL(*this, RequestResume()).Times(testing::AnyNumber()); EXPECT_CALL(*this, RequestNudge(_)).Times(testing::AnyNumber()); } diff --git a/chrome/browser/sync/test_profile_sync_service.h b/chrome/browser/sync/test_profile_sync_service.h index 3cedde6..bbe1d14 100644 --- a/chrome/browser/sync/test_profile_sync_service.h +++ b/chrome/browser/sync/test_profile_sync_service.h @@ -40,6 +40,8 @@ class SyncBackendHostForProfileSyncTest bool synchronous_init); virtual ~SyncBackendHostForProfileSyncTest(); + MOCK_METHOD0(RequestPause, bool()); + MOCK_METHOD0(RequestResume, bool()); MOCK_METHOD1(RequestNudge, void(const tracked_objects::Location&)); virtual void ConfigureDataTypes( |