diff options
author | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-19 22:19:18 +0000 |
---|---|---|
committer | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-19 22:19:18 +0000 |
commit | 6d2dc98e938a0ff4da17e78e96ae6261bece64ef (patch) | |
tree | 9127eb2b563f30bbc4a7b3d38969ba0768289f85 /sync/engine | |
parent | 0b9db6796ae57fe8bb36b2e89528efa51f8e938b (diff) | |
download | chromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.zip chromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.tar.gz chromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.tar.bz2 |
sync: Remove SyncManager::TestingMode in favour of InternalComponentsFactory.
Turns SyncScheduler into an interface and adds a FakeSyncScheduler class for tests.
BUG=117836
TEST=
Review URL: https://chromiumcodereview.appspot.com/10701046
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@147553 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync/engine')
-rw-r--r-- | sync/engine/sync_scheduler.cc | 1196 | ||||
-rw-r--r-- | sync/engine/sync_scheduler.h | 342 | ||||
-rw-r--r-- | sync/engine/sync_scheduler_impl.cc | 1209 | ||||
-rw-r--r-- | sync/engine/sync_scheduler_impl.h | 360 | ||||
-rw-r--r-- | sync/engine/sync_scheduler_unittest.cc | 32 | ||||
-rw-r--r-- | sync/engine/sync_scheduler_whitebox_unittest.cc | 105 | ||||
-rw-r--r-- | sync/engine/syncer_unittest.cc | 11 |
7 files changed, 1662 insertions, 1593 deletions
diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc index 04b30f0..d800b19 100644 --- a/sync/engine/sync_scheduler.cc +++ b/sync/engine/sync_scheduler.cc @@ -4,1201 +4,9 @@ #include "sync/engine/sync_scheduler.h" -#include <algorithm> -#include <cstring> - -#include "base/bind.h" -#include "base/compiler_specific.h" -#include "base/location.h" -#include "base/logging.h" -#include "base/message_loop.h" -#include "base/rand_util.h" -#include "sync/engine/syncer.h" -#include "sync/engine/throttled_data_type_tracker.h" -#include "sync/protocol/proto_enum_conversions.h" -#include "sync/protocol/sync.pb.h" -#include "sync/util/data_type_histogram.h" -#include "sync/util/logging.h" - -using base::TimeDelta; -using base::TimeTicks; - namespace syncer { -using sessions::SyncSession; -using sessions::SyncSessionSnapshot; -using sessions::SyncSourceInfo; -using sync_pb::GetUpdatesCallerInfo; - -namespace { -bool ShouldRequestEarlyExit( - const syncer::SyncProtocolError& error) { - switch (error.error_type) { - case syncer::SYNC_SUCCESS: - case syncer::MIGRATION_DONE: - case syncer::THROTTLED: - case syncer::TRANSIENT_ERROR: - return false; - case syncer::NOT_MY_BIRTHDAY: - case syncer::CLEAR_PENDING: - // If we send terminate sync early then |sync_cycle_ended| notification - // would not be sent. If there were no actions then |ACTIONABLE_ERROR| - // notification wouldnt be sent either. Then the UI layer would be left - // waiting forever. So assert we would send something. - DCHECK(error.action != syncer::UNKNOWN_ACTION); - return true; - case syncer::INVALID_CREDENTIAL: - // The notification for this is handled by PostAndProcessHeaders|. - // Server does no have to send any action for this. - return true; - // Make the default a NOTREACHED. So if a new error is introduced we - // think about its expected functionality. - default: - NOTREACHED(); - return false; - } -} - -bool IsActionableError( - const syncer::SyncProtocolError& error) { - return (error.action != syncer::UNKNOWN_ACTION); -} -} // namespace - -ConfigurationParams::ConfigurationParams() - : source(GetUpdatesCallerInfo::UNKNOWN), - keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {} -ConfigurationParams::ConfigurationParams( - const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, - const syncer::ModelTypeSet& types_to_download, - const syncer::ModelSafeRoutingInfo& routing_info, - KeystoreKeyStatus keystore_key_status, - const base::Closure& ready_task) - : source(source), - types_to_download(types_to_download), - routing_info(routing_info), - keystore_key_status(keystore_key_status), - ready_task(ready_task) { - DCHECK(!ready_task.is_null()); -} -ConfigurationParams::~ConfigurationParams() {} - -SyncScheduler::DelayProvider::DelayProvider() {} -SyncScheduler::DelayProvider::~DelayProvider() {} - -SyncScheduler::WaitInterval::WaitInterval() - : mode(UNKNOWN), - had_nudge(false) { -} - -SyncScheduler::WaitInterval::~WaitInterval() {} - -#define ENUM_CASE(x) case x: return #x; break; - -const char* SyncScheduler::WaitInterval::GetModeString(Mode mode) { - switch (mode) { - ENUM_CASE(UNKNOWN); - ENUM_CASE(EXPONENTIAL_BACKOFF); - ENUM_CASE(THROTTLED); - } - NOTREACHED(); - return ""; -} - -SyncScheduler::SyncSessionJob::SyncSessionJob() - : purpose(UNKNOWN), - is_canary_job(false) { -} - -SyncScheduler::SyncSessionJob::~SyncSessionJob() {} - -SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, - base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, - bool is_canary_job, - const ConfigurationParams& config_params, - const tracked_objects::Location& from_here) - : purpose(purpose), - scheduled_start(start), - session(session), - is_canary_job(is_canary_job), - config_params(config_params), - from_here(from_here) { -} - -const char* SyncScheduler::SyncSessionJob::GetPurposeString( - SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { - switch (purpose) { - ENUM_CASE(UNKNOWN); - ENUM_CASE(POLL); - ENUM_CASE(NUDGE); - ENUM_CASE(CONFIGURATION); - ENUM_CASE(CLEANUP_DISABLED_TYPES); - } - NOTREACHED(); - return ""; -} - -TimeDelta SyncScheduler::DelayProvider::GetDelay( - const base::TimeDelta& last_delay) { - return SyncScheduler::GetRecommendedDelay(last_delay); -} - -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( - NudgeSource source) { - switch (source) { - case NUDGE_SOURCE_NOTIFICATION: - return GetUpdatesCallerInfo::NOTIFICATION; - case NUDGE_SOURCE_LOCAL: - return GetUpdatesCallerInfo::LOCAL; - case NUDGE_SOURCE_CONTINUATION: - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; - case NUDGE_SOURCE_LOCAL_REFRESH: - return GetUpdatesCallerInfo::DATATYPE_REFRESH; - case NUDGE_SOURCE_UNKNOWN: - return GetUpdatesCallerInfo::UNKNOWN; - default: - NOTREACHED(); - return GetUpdatesCallerInfo::UNKNOWN; - } -} - -SyncScheduler::WaitInterval::WaitInterval(Mode mode, TimeDelta length) - : mode(mode), had_nudge(false), length(length) { } - -// Helper macros to log with the syncer thread name; useful when there -// are multiple syncer threads involved. - -#define SLOG(severity) LOG(severity) << name_ << ": " - -#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " - -#define SDVLOG_LOC(from_here, verbose_level) \ - DVLOG_LOC(from_here, verbose_level) << name_ << ": " - -namespace { - -const int kDefaultSessionsCommitDelaySeconds = 10; - -bool IsConfigRelatedUpdateSourceValue( - GetUpdatesCallerInfo::GetUpdatesSource source) { - switch (source) { - case GetUpdatesCallerInfo::RECONFIGURATION: - case GetUpdatesCallerInfo::MIGRATION: - case GetUpdatesCallerInfo::NEW_CLIENT: - case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: - return true; - default: - return false; - } -} - -} // namespace - -SyncScheduler::SyncScheduler(const std::string& name, - sessions::SyncSessionContext* context, - Syncer* syncer) - : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), - weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), - weak_handle_this_(MakeWeakHandle( - weak_ptr_factory_for_weak_handle_.GetWeakPtr())), - name_(name), - sync_loop_(MessageLoop::current()), - started_(false), - syncer_short_poll_interval_seconds_( - TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), - syncer_long_poll_interval_seconds_( - TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), - sessions_commit_delay_( - TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), - mode_(NORMAL_MODE), - // Start with assuming everything is fine with the connection. - // At the end of the sync cycle we would have the correct status. - connection_code_(HttpResponse::SERVER_CONNECTION_OK), - delay_provider_(new DelayProvider()), - syncer_(syncer), - session_context_(context) { - DCHECK(sync_loop_); -} - -SyncScheduler::~SyncScheduler() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - StopImpl(base::Closure()); -} - -void SyncScheduler::OnCredentialsUpdated() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - - // TODO(lipalani): crbug.com/106262. One issue here is that if after - // the auth error we happened to do gettime and it succeeded then - // the |connection_code_| would be briefly OK however it would revert - // back to SYNC_AUTH_ERROR at the end of the sync cycle. The - // referenced bug explores the option of removing gettime calls - // altogethere - if (HttpResponse::SYNC_AUTH_ERROR == connection_code_) { - OnServerConnectionErrorFixed(); - } -} - -void SyncScheduler::OnConnectionStatusChange() { - if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { - // Optimistically assume that the connection is fixed and try - // connecting. - OnServerConnectionErrorFixed(); - } -} - -void SyncScheduler::OnServerConnectionErrorFixed() { - connection_code_ = HttpResponse::SERVER_CONNECTION_OK; - PostTask(FROM_HERE, "DoCanaryJob", - base::Bind(&SyncScheduler::DoCanaryJob, - weak_ptr_factory_.GetWeakPtr())); - -} - -void SyncScheduler::UpdateServerConnectionManagerStatus( - HttpResponse::ServerConnectionCode code) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG(2) << "New server connection code: " - << HttpResponse::GetServerConnectionCodeString(code); - - connection_code_ = code; -} - -void SyncScheduler::Start(Mode mode) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - std::string thread_name = MessageLoop::current()->thread_name(); - if (thread_name.empty()) - thread_name = "<Main thread>"; - SDVLOG(2) << "Start called from thread " - << thread_name << " with mode " << GetModeString(mode); - if (!started_) { - started_ = true; - SendInitialSnapshot(); - } - - DCHECK(!session_context_->account_name().empty()); - DCHECK(syncer_.get()); - Mode old_mode = mode_; - mode_ = mode; - AdjustPolling(NULL); // Will kick start poll timer if needed. - - if (old_mode != mode_) { - // We just changed our mode. See if there are any pending jobs that we could - // execute in the new mode. - DoPendingJobIfPossible(false); - } -} - -void SyncScheduler::SendInitialSnapshot() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, - SyncSourceInfo(), ModelSafeRoutingInfo(), - std::vector<ModelSafeWorker*>())); - SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); - event.snapshot = dummy->TakeSnapshot(); - session_context_->NotifyListeners(event); -} - -namespace { - -// Helper to extract the routing info and workers corresponding to types in -// |types| from |current_routes| and |current_workers|. -void BuildModelSafeParams( - const ModelTypeSet& types_to_download, - const ModelSafeRoutingInfo& current_routes, - const std::vector<ModelSafeWorker*>& current_workers, - ModelSafeRoutingInfo* result_routes, - std::vector<ModelSafeWorker*>* result_workers) { - std::set<ModelSafeGroup> active_groups; - active_groups.insert(GROUP_PASSIVE); - for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); - iter.Inc()) { - syncer::ModelType type = iter.Get(); - ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); - DCHECK(route != current_routes.end()); - ModelSafeGroup group = route->second; - (*result_routes)[type] = group; - active_groups.insert(group); - } - - for(std::vector<ModelSafeWorker*>::const_iterator iter = - current_workers.begin(); iter != current_workers.end(); ++iter) { - if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) - result_workers->push_back(*iter); - } -} - -} // namespace. - -bool SyncScheduler::ScheduleConfiguration(const ConfigurationParams& params) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); - DCHECK_EQ(CONFIGURATION_MODE, mode_); - DCHECK(!params.ready_task.is_null()); - SDVLOG(2) << "Reconfiguring syncer."; - - // Only one configuration is allowed at a time. Verify we're not waiting - // for a pending configure job. - DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); - - // TODO(sync): now that ModelChanging commands only use those workers within - // the routing info, we don't really need |restricted_workers|. Remove it. - // crbug.com/133030 - syncer::ModelSafeRoutingInfo restricted_routes; - std::vector<ModelSafeWorker*> restricted_workers; - BuildModelSafeParams(params.types_to_download, - params.routing_info, - session_context_->workers(), - &restricted_routes, - &restricted_workers); - session_context_->set_routing_info(params.routing_info); - - // We rely on this not failing, so don't need to worry about checking for - // success. In addition, this will be removed as part of crbug.com/131433. - SyncSessionJob cleanup_job( - SyncSessionJob::CLEANUP_DISABLED_TYPES, - TimeTicks::Now(), - make_linked_ptr(CreateSyncSession(SyncSourceInfo())), - false, - ConfigurationParams(), - FROM_HERE); - DoSyncSessionJob(cleanup_job); - - if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) { - // TODO(zea): implement in such a way that we can handle failures and the - // subsequent retrys the scheduler might perform. See crbug.com/129665. - NOTIMPLEMENTED(); - } - - // Only reconfigure if we have types to download. - if (!params.types_to_download.Empty()) { - DCHECK(!restricted_routes.empty()); - linked_ptr<SyncSession> session(new SyncSession( - session_context_, - this, - SyncSourceInfo(params.source, - ModelSafeRoutingInfoToPayloadMap( - restricted_routes, - std::string())), - restricted_routes, - restricted_workers)); - SyncSessionJob job(SyncSessionJob::CONFIGURATION, - TimeTicks::Now(), - session, - false, - params, - FROM_HERE); - DoSyncSessionJob(job); - - // If we failed, the job would have been saved as the pending configure - // job and a wait interval would have been set. - if (!session->Succeeded()) { - DCHECK(wait_interval_.get() && - wait_interval_->pending_configure_job.get()); - return false; - } - } else { - SDVLOG(2) << "No change in routing info, calling ready task directly."; - params.ready_task.Run(); - } - - return true; -} - -SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( - const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(wait_interval_.get()); - DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); - - SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " - << WaitInterval::GetModeString(wait_interval_->mode) - << (wait_interval_->had_nudge ? " (had nudge)" : "") - << (job.is_canary_job ? " (canary)" : ""); - - if (job.purpose == SyncSessionJob::POLL) - return DROP; - - DCHECK(job.purpose == SyncSessionJob::NUDGE || - job.purpose == SyncSessionJob::CONFIGURATION); - if (wait_interval_->mode == WaitInterval::THROTTLED) - return SAVE; - - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); - if (job.purpose == SyncSessionJob::NUDGE) { - if (mode_ == CONFIGURATION_MODE) - return SAVE; - - // If we already had one nudge then just drop this nudge. We will retry - // later when the timer runs out. - if (!job.is_canary_job) - return wait_interval_->had_nudge ? DROP : CONTINUE; - else // We are here because timer ran out. So retry. - return CONTINUE; - } - return job.is_canary_job ? CONTINUE : SAVE; -} - -SyncScheduler::JobProcessDecision SyncScheduler::DecideOnJob( - const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (job.purpose == SyncSessionJob::CLEANUP_DISABLED_TYPES) - return CONTINUE; - - // See if our type is throttled. - syncer::ModelTypeSet throttled_types = - session_context_->throttled_data_type_tracker()->GetThrottledTypes(); - if (job.purpose == SyncSessionJob::NUDGE && - job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { - syncer::ModelTypeSet requested_types; - for (ModelTypePayloadMap::const_iterator i = - job.session->source().types.begin(); - i != job.session->source().types.end(); - ++i) { - requested_types.Put(i->first); - } - - if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) - return SAVE; - } - - if (wait_interval_.get()) - return DecideWhileInWaitInterval(job); - - if (mode_ == CONFIGURATION_MODE) { - if (job.purpose == SyncSessionJob::NUDGE) - return SAVE; - else if (job.purpose == SyncSessionJob::CONFIGURATION) - return CONTINUE; - else - return DROP; - } - - // We are in normal mode. - DCHECK_EQ(mode_, NORMAL_MODE); - DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); - - // Freshness condition - if (job.scheduled_start < last_sync_session_end_time_) { - SDVLOG(2) << "Dropping job because of freshness"; - return DROP; - } - - if (!session_context_->connection_manager()->HasInvalidAuthToken()) - return CONTINUE; - - SDVLOG(2) << "No valid auth token. Using that to decide on job."; - return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; -} - -void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); - if (pending_nudge_.get() == NULL) { - SDVLOG(2) << "Creating a pending nudge job"; - SyncSession* s = job.session.get(); - scoped_ptr<SyncSession> session(new SyncSession(s->context(), - s->delegate(), s->source(), s->routing_info(), s->workers())); - - SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, - make_linked_ptr(session.release()), false, - ConfigurationParams(), job.from_here); - pending_nudge_.reset(new SyncSessionJob(new_job)); - - return; - } - - SDVLOG(2) << "Coalescing a pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); - pending_nudge_->scheduled_start = job.scheduled_start; - - // Unfortunately the nudge location cannot be modified. So it stores the - // location of the first caller. -} - -bool SyncScheduler::ShouldRunJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(started_); - - JobProcessDecision decision = DecideOnJob(job); - SDVLOG(2) << "Should run " - << SyncSessionJob::GetPurposeString(job.purpose) - << " job in mode " << GetModeString(mode_) - << ": " << GetDecisionString(decision); - if (decision != SAVE) - return decision == CONTINUE; - - DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == - SyncSessionJob::CONFIGURATION); - - SaveJob(job); - return false; -} - -void SyncScheduler::SaveJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - // TODO(sync): Should we also check that job.purpose != - // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) - if (job.purpose == SyncSessionJob::NUDGE) { - SDVLOG(2) << "Saving a nudge job"; - InitOrCoalescePendingJob(job); - } else if (job.purpose == SyncSessionJob::CONFIGURATION){ - SDVLOG(2) << "Saving a configuration job"; - DCHECK(wait_interval_.get()); - DCHECK(mode_ == CONFIGURATION_MODE); - - // Config params should always get set. - DCHECK(!job.config_params.ready_task.is_null()); - SyncSession* old = job.session.get(); - SyncSession* s(new SyncSession(session_context_, this, old->source(), - old->routing_info(), old->workers())); - SyncSessionJob new_job(job.purpose, - TimeTicks::Now(), - make_linked_ptr(s), - false, - job.config_params, - job.from_here); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); - } // drop the rest. - // TODO(sync): Is it okay to drop the rest? It's weird that - // SaveJob() only does what it says sometimes. (See - // http://crbug.com/90868.) -} - -// Functor for std::find_if to search by ModelSafeGroup. -struct ModelSafeWorkerGroupIs { - explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} - bool operator()(ModelSafeWorker* w) { - return group == w->GetModelSafeGroup(); - } - ModelSafeGroup group; -}; - -void SyncScheduler::ScheduleNudgeAsync( - const TimeDelta& delay, - NudgeSource source, ModelTypeSet types, - const tracked_objects::Location& nudge_location) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG_LOC(nudge_location, 2) - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " - << "source " << GetNudgeSourceString(source) << ", " - << "types " << ModelTypeSetToString(types); - - ModelTypePayloadMap types_with_payloads = - syncer::ModelTypePayloadMapFromEnumSet(types, std::string()); - SyncScheduler::ScheduleNudgeImpl(delay, - GetUpdatesFromNudgeSource(source), - types_with_payloads, - false, - nudge_location); -} - -void SyncScheduler::ScheduleNudgeWithPayloadsAsync( - const TimeDelta& delay, - NudgeSource source, const ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG_LOC(nudge_location, 2) - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " - << "source " << GetNudgeSourceString(source) << ", " - << "payloads " - << syncer::ModelTypePayloadMapToString(types_with_payloads); - - SyncScheduler::ScheduleNudgeImpl(delay, - GetUpdatesFromNudgeSource(source), - types_with_payloads, - false, - nudge_location); -} - -void SyncScheduler::ScheduleNudgeImpl( - const TimeDelta& delay, - GetUpdatesCallerInfo::GetUpdatesSource source, - const ModelTypePayloadMap& types_with_payloads, - bool is_canary_job, const tracked_objects::Location& nudge_location) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - - SDVLOG_LOC(nudge_location, 2) - << "In ScheduleNudgeImpl with delay " - << delay.InMilliseconds() << " ms, " - << "source " << GetUpdatesSourceString(source) << ", " - << "payloads " - << syncer::ModelTypePayloadMapToString(types_with_payloads) - << (is_canary_job ? " (canary)" : ""); - - SyncSourceInfo info(source, types_with_payloads); - - SyncSession* session(CreateSyncSession(info)); - SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, - make_linked_ptr(session), is_canary_job, - ConfigurationParams(), nudge_location); - - session = NULL; - if (!ShouldRunJob(job)) - return; - - if (pending_nudge_.get()) { - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { - SDVLOG(2) << "Dropping the nudge because we are in backoff"; - return; - } - - SDVLOG(2) << "Coalescing pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); - - SDVLOG(2) << "Rescheduling pending nudge"; - SyncSession* s = pending_nudge_->session.get(); - job.session.reset(new SyncSession(s->context(), s->delegate(), - s->source(), s->routing_info(), s->workers())); - - // Choose the start time as the earliest of the 2. - job.scheduled_start = std::min(job.scheduled_start, - pending_nudge_->scheduled_start); - pending_nudge_.reset(); - } - - // TODO(zea): Consider adding separate throttling/backoff for datatype - // refresh requests. - ScheduleSyncSessionJob(job); -} - -const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { - switch (mode) { - ENUM_CASE(CONFIGURATION_MODE); - ENUM_CASE(NORMAL_MODE); - } - return ""; -} - -const char* SyncScheduler::GetDecisionString( - SyncScheduler::JobProcessDecision mode) { - switch (mode) { - ENUM_CASE(CONTINUE); - ENUM_CASE(SAVE); - ENUM_CASE(DROP); - } - return ""; -} - -// static -void SyncScheduler::SetSyncerStepsForPurpose( - SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, - SyncerStep* end) { - switch (purpose) { - case SyncSessionJob::CONFIGURATION: - *start = DOWNLOAD_UPDATES; - *end = APPLY_UPDATES; - return; - case SyncSessionJob::NUDGE: - case SyncSessionJob::POLL: - *start = SYNCER_BEGIN; - *end = SYNCER_END; - return; - case SyncSessionJob::CLEANUP_DISABLED_TYPES: - *start = CLEANUP_DISABLED_TYPES; - *end = CLEANUP_DISABLED_TYPES; - return; - default: - NOTREACHED(); - *start = SYNCER_END; - *end = SYNCER_END; - return; - } -} - -void SyncScheduler::PostTask( - const tracked_objects::Location& from_here, - const char* name, const base::Closure& task) { - SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (!started_) { - SDVLOG(1) << "Not posting task as scheduler is stopped."; - return; - } - sync_loop_->PostTask(from_here, task); -} - -void SyncScheduler::PostDelayedTask( - const tracked_objects::Location& from_here, - const char* name, const base::Closure& task, base::TimeDelta delay) { - SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " - << delay.InMilliseconds() << " ms delay"; - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (!started_) { - SDVLOG(1) << "Not posting task as scheduler is stopped."; - return; - } - sync_loop_->PostDelayedTask(from_here, task, delay); -} - -void SyncScheduler::ScheduleSyncSessionJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - TimeDelta delay = job.scheduled_start - TimeTicks::Now(); - if (delay < TimeDelta::FromMilliseconds(0)) - delay = TimeDelta::FromMilliseconds(0); - SDVLOG_LOC(job.from_here, 2) - << "In ScheduleSyncSessionJob with " - << SyncSessionJob::GetPurposeString(job.purpose) - << " job and " << delay.InMilliseconds() << " ms delay"; - - DCHECK(job.purpose == SyncSessionJob::NUDGE || - job.purpose == SyncSessionJob::POLL); - if (job.purpose == SyncSessionJob::NUDGE) { - SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; - DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == - job.session); - pending_nudge_.reset(new SyncSessionJob(job)); - } - PostDelayedTask(job.from_here, "DoSyncSessionJob", - base::Bind(&SyncScheduler::DoSyncSessionJob, - weak_ptr_factory_.GetWeakPtr(), - job), - delay); -} - -void SyncScheduler::DoSyncSessionJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (!ShouldRunJob(job)) { - SLOG(WARNING) - << "Not executing " - << SyncSessionJob::GetPurposeString(job.purpose) << " job from " - << GetUpdatesSourceString(job.session->source().updates_source); - return; - } - - if (job.purpose == SyncSessionJob::NUDGE) { - if (pending_nudge_.get() == NULL || - pending_nudge_->session != job.session) { - SDVLOG(2) << "Dropping a nudge in " - << "DoSyncSessionJob because another nudge was scheduled"; - return; // Another nudge must have been scheduled in in the meantime. - } - pending_nudge_.reset(); - - // Create the session with the latest model safe table and use it to purge - // and update any disabled or modified entries in the job. - scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); - - job.session->RebaseRoutingInfoWithLatest(*session); - } - SDVLOG(2) << "DoSyncSessionJob with " - << SyncSessionJob::GetPurposeString(job.purpose) << " job"; - - SyncerStep begin(SYNCER_END); - SyncerStep end(SYNCER_END); - SetSyncerStepsForPurpose(job.purpose, &begin, &end); - - bool has_more_to_sync = true; - while (ShouldRunJob(job) && has_more_to_sync) { - SDVLOG(2) << "Calling SyncShare."; - // Synchronously perform the sync session from this thread. - syncer_->SyncShare(job.session.get(), begin, end); - has_more_to_sync = job.session->HasMoreToSync(); - if (has_more_to_sync) - job.session->PrepareForAnotherSyncCycle(); - } - SDVLOG(2) << "Done SyncShare looping."; - - FinishSyncSessionJob(job); -} - -void SyncScheduler::UpdateCarryoverSessionState( - const SyncSessionJob& old_job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { - // Whatever types were part of a configuration task will have had updates - // downloaded. For that reason, we make sure they get recorded in the - // event that they get disabled at a later time. - ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); - if (!r.empty()) { - ModelSafeRoutingInfo temp_r; - ModelSafeRoutingInfo old_info(old_job.session->routing_info()); - std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), - std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); - session_context_->set_previous_session_routing_info(temp_r); - } - } else { - session_context_->set_previous_session_routing_info( - old_job.session->routing_info()); - } -} - -void SyncScheduler::FinishSyncSessionJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - // Update timing information for how often datatypes are triggering nudges. - base::TimeTicks now = TimeTicks::Now(); - if (!last_sync_session_end_time_.is_null()) { - ModelTypePayloadMap::const_iterator iter; - for (iter = job.session->source().types.begin(); - iter != job.session->source().types.end(); - ++iter) { -#define PER_DATA_TYPE_MACRO(type_str) \ - SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, \ - now - last_sync_session_end_time_); - SYNC_DATA_TYPE_HISTOGRAM(iter->first); -#undef PER_DATA_TYPE_MACRO - } - } - last_sync_session_end_time_ = now; - - // Now update the status of the connection from SCM. We need this to decide - // whether we need to save/run future jobs. The notifications from SCM are not - // reliable. - // - // TODO(rlarocque): crbug.com/110954 - // We should get rid of the notifications and it is probably not needed to - // maintain this status variable in 2 places. We should query it directly from - // SCM when needed. - ServerConnectionManager* scm = session_context_->connection_manager(); - UpdateServerConnectionManagerStatus(scm->server_status()); - - UpdateCarryoverSessionState(job); - if (IsSyncingCurrentlySilenced()) { - SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; - // TODO(sync): Investigate whether we need to check job.purpose - // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) - SaveJob(job); - return; // Nothing to do. - } else if (job.session->Succeeded() && - !job.config_params.ready_task.is_null()) { - // If this was a configuration job with a ready task, invoke it now that - // we finished successfully. - job.config_params.ready_task.Run(); - } - - SDVLOG(2) << "Updating the next polling time after SyncMain"; - ScheduleNextSync(job); -} - -void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(!old_job.session->HasMoreToSync()); - - AdjustPolling(&old_job); - - if (old_job.session->Succeeded()) { - // Only reset backoff if we actually reached the server. - if (old_job.session->SuccessfullyReachedServer()) - wait_interval_.reset(); - SDVLOG(2) << "Job succeeded so not scheduling more jobs"; - return; - } - - if (old_job.purpose == SyncSessionJob::POLL) { - return; // We don't retry POLL jobs. - } - - // TODO(rlarocque): There's no reason why we should blindly backoff and retry - // if we don't succeed. Some types of errors are not likely to disappear on - // their own. With the return values now available in the old_job.session, we - // should be able to detect such errors and only retry when we detect - // transient errors. - - if (IsBackingOff() && wait_interval_->timer.IsRunning() && - mode_ == NORMAL_MODE) { - // When in normal mode, we allow up to one nudge per backoff interval. It - // appears that this was our nudge for this interval, and it failed. - // - // Note: This does not prevent us from running canary jobs. For example, an - // IP address change might still result in another nudge being executed - // during this backoff interval. - SDVLOG(2) << "A nudge during backoff failed"; - - DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); - DCHECK(!wait_interval_->had_nudge); - - wait_interval_->had_nudge = true; - InitOrCoalescePendingJob(old_job); - RestartWaiting(); - } else { - // Either this is the first failure or a consecutive failure after our - // backoff timer expired. We handle it the same way in either case. - SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; - HandleContinuationError(old_job); - } -} - -void SyncScheduler::AdjustPolling(const SyncSessionJob* old_job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - - TimeDelta poll = (!session_context_->notifications_enabled()) ? - syncer_short_poll_interval_seconds_ : - syncer_long_poll_interval_seconds_; - bool rate_changed = !poll_timer_.IsRunning() || - poll != poll_timer_.GetCurrentDelay(); - - if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) - poll_timer_.Reset(); - - if (!rate_changed) - return; - - // Adjust poll rate. - poll_timer_.Stop(); - poll_timer_.Start(FROM_HERE, poll, this, &SyncScheduler::PollTimerCallback); -} - -void SyncScheduler::RestartWaiting() { - CHECK(wait_interval_.get()); - wait_interval_->timer.Stop(); - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, - this, &SyncScheduler::DoCanaryJob); -} - -void SyncScheduler::HandleContinuationError( - const SyncSessionJob& old_job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (DCHECK_IS_ON()) { - if (IsBackingOff()) { - DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); - } - } - - TimeDelta length = delay_provider_->GetDelay( - IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); - - SDVLOG(2) << "In handle continuation error with " - << SyncSessionJob::GetPurposeString(old_job.purpose) - << " job. The time delta(ms) is " - << length.InMilliseconds(); - - // This will reset the had_nudge variable as well. - wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, - length)); - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { - SDVLOG(2) << "Configuration did not succeed, scheduling retry."; - // Config params should always get set. - DCHECK(!old_job.config_params.ready_task.is_null()); - SyncSession* old = old_job.session.get(); - SyncSession* s(new SyncSession(session_context_, this, - old->source(), old->routing_info(), old->workers())); - SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, - make_linked_ptr(s), false, old_job.config_params, - FROM_HERE); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); - } else { - // We are not in configuration mode. So wait_interval's pending job - // should be null. - DCHECK(wait_interval_->pending_configure_job.get() == NULL); - - // TODO(lipalani) - handle clear user data. - InitOrCoalescePendingJob(old_job); - } - RestartWaiting(); -} - -// static -TimeDelta SyncScheduler::GetRecommendedDelay(const TimeDelta& last_delay) { - if (last_delay.InSeconds() >= kMaxBackoffSeconds) - return TimeDelta::FromSeconds(kMaxBackoffSeconds); - - // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 - int64 backoff_s = - std::max(static_cast<int64>(1), - last_delay.InSeconds() * kBackoffRandomizationFactor); - - // Flip a coin to randomize backoff interval by +/- 50%. - int rand_sign = base::RandInt(0, 1) * 2 - 1; - - // Truncation is adequate for rounding here. - backoff_s = backoff_s + - (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); - - // Cap the backoff interval. - backoff_s = std::max(static_cast<int64>(1), - std::min(backoff_s, kMaxBackoffSeconds)); - - return TimeDelta::FromSeconds(backoff_s); -} - -void SyncScheduler::RequestStop(const base::Closure& callback) { - syncer_->RequestEarlyExit(); // Safe to call from any thread. - DCHECK(weak_handle_this_.IsInitialized()); - SDVLOG(3) << "Posting StopImpl"; - weak_handle_this_.Call(FROM_HERE, - &SyncScheduler::StopImpl, - callback); -} - -void SyncScheduler::StopImpl(const base::Closure& callback) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG(2) << "StopImpl called"; - - // Kill any in-flight method calls. - weak_ptr_factory_.InvalidateWeakPtrs(); - wait_interval_.reset(); - poll_timer_.Stop(); - if (started_) { - started_ = false; - } - if (!callback.is_null()) - callback.Run(); -} - -void SyncScheduler::DoCanaryJob() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG(2) << "Do canary job"; - DoPendingJobIfPossible(true); -} - -void SyncScheduler::DoPendingJobIfPossible(bool is_canary_job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SyncSessionJob* job_to_execute = NULL; - if (mode_ == CONFIGURATION_MODE && wait_interval_.get() - && wait_interval_->pending_configure_job.get()) { - SDVLOG(2) << "Found pending configure job"; - job_to_execute = wait_interval_->pending_configure_job.get(); - } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { - SDVLOG(2) << "Found pending nudge job"; - // Pending jobs mostly have time from the past. Reset it so this job - // will get executed. - if (pending_nudge_->scheduled_start < TimeTicks::Now()) - pending_nudge_->scheduled_start = TimeTicks::Now(); - - scoped_ptr<SyncSession> session(CreateSyncSession( - pending_nudge_->session->source())); - - // Also the routing info might have been changed since we cached the - // pending nudge. Update it by coalescing to the latest. - pending_nudge_->session->Coalesce(*(session.get())); - // The pending nudge would be cleared in the DoSyncSessionJob function. - job_to_execute = pending_nudge_.get(); - } - - if (job_to_execute != NULL) { - SDVLOG(2) << "Executing pending job"; - SyncSessionJob copy = *job_to_execute; - copy.is_canary_job = is_canary_job; - DoSyncSessionJob(copy); - } -} - -SyncSession* SyncScheduler::CreateSyncSession(const SyncSourceInfo& source) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DVLOG(2) << "Creating sync session with routes " - << ModelSafeRoutingInfoToString(session_context_->routing_info()); - - SyncSourceInfo info(source); - SyncSession* session(new SyncSession(session_context_, this, info, - session_context_->routing_info(), session_context_->workers())); - - return session; -} - -void SyncScheduler::PollTimerCallback() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - ModelSafeRoutingInfo r; - ModelTypePayloadMap types_with_payloads = - ModelSafeRoutingInfoToPayloadMap(r, std::string()); - SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); - SyncSession* s = CreateSyncSession(info); - - SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), - make_linked_ptr(s), - false, - ConfigurationParams(), - FROM_HERE); - - ScheduleSyncSessionJob(job); -} - -void SyncScheduler::Unthrottle() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); - SDVLOG(2) << "Unthrottled."; - DoCanaryJob(); - wait_interval_.reset(); -} - -void SyncScheduler::Notify(SyncEngineEvent::EventCause cause) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - session_context_->NotifyListeners(SyncEngineEvent(cause)); -} - -bool SyncScheduler::IsBackingOff() const { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - return wait_interval_.get() && wait_interval_->mode == - WaitInterval::EXPONENTIAL_BACKOFF; -} - -void SyncScheduler::OnSilencedUntil(const base::TimeTicks& silenced_until) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, - silenced_until - TimeTicks::Now())); - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, - &SyncScheduler::Unthrottle); -} - -bool SyncScheduler::IsSyncingCurrentlySilenced() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - return wait_interval_.get() && wait_interval_->mode == - WaitInterval::THROTTLED; -} - -void SyncScheduler::OnReceivedShortPollIntervalUpdate( - const base::TimeDelta& new_interval) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - syncer_short_poll_interval_seconds_ = new_interval; -} - -void SyncScheduler::OnReceivedLongPollIntervalUpdate( - const base::TimeDelta& new_interval) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - syncer_long_poll_interval_seconds_ = new_interval; -} - -void SyncScheduler::OnReceivedSessionsCommitDelay( - const base::TimeDelta& new_delay) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - sessions_commit_delay_ = new_delay; -} - -void SyncScheduler::OnShouldStopSyncingPermanently() { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG(2) << "OnShouldStopSyncingPermanently"; - syncer_->RequestEarlyExit(); // Thread-safe. - Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); -} - -void SyncScheduler::OnActionableError( - const sessions::SyncSessionSnapshot& snap) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - SDVLOG(2) << "OnActionableError"; - SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); - event.snapshot = snap; - session_context_->NotifyListeners(event); -} - -void SyncScheduler::OnSyncProtocolError( - const sessions::SyncSessionSnapshot& snapshot) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (ShouldRequestEarlyExit( - snapshot.model_neutral_state().sync_protocol_error)) { - SDVLOG(2) << "Sync Scheduler requesting early exit."; - syncer_->RequestEarlyExit(); // Thread-safe. - } - if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) - OnActionableError(snapshot); -} - -void SyncScheduler::set_notifications_enabled(bool notifications_enabled) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - session_context_->set_notifications_enabled(notifications_enabled); -} - -base::TimeDelta SyncScheduler::sessions_commit_delay() const { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - return sessions_commit_delay_; -} - -#undef SDVLOG_LOC - -#undef SDVLOG - -#undef SLOG - -#undef ENUM_CASE +SyncScheduler::SyncScheduler() {} +SyncScheduler::~SyncScheduler() {} } // namespace syncer diff --git a/sync/engine/sync_scheduler.h b/sync/engine/sync_scheduler.h index 0ca2f5d..423516b 100644 --- a/sync/engine/sync_scheduler.h +++ b/sync/engine/sync_scheduler.h @@ -10,21 +10,10 @@ #include "base/callback.h" #include "base/compiler_specific.h" -#include "base/gtest_prod_util.h" -#include "base/memory/linked_ptr.h" -#include "base/memory/scoped_ptr.h" -#include "base/memory/weak_ptr.h" -#include "base/observer_list.h" #include "base/time.h" -#include "base/timer.h" -#include "sync/engine/net/server_connection_manager.h" #include "sync/engine/nudge_source.h" -#include "sync/engine/syncer.h" #include "sync/internal_api/public/base/model_type_payload_map.h" -#include "sync/internal_api/public/engine/polling_constants.h" -#include "sync/internal_api/public/util/weak_handle.h" #include "sync/sessions/sync_session.h" -#include "sync/sessions/sync_session_context.h" class MessageLoop; @@ -78,25 +67,20 @@ class SyncScheduler : public sessions::SyncSession::Delegate { // All methods of SyncScheduler must be called on the same thread // (except for RequestEarlyExit()). - // |name| is a display string to identify the syncer thread. Takes - // |ownership of |syncer|. - SyncScheduler(const std::string& name, - sessions::SyncSessionContext* context, Syncer* syncer); - - // Calls Stop(). + SyncScheduler(); virtual ~SyncScheduler(); // Start the scheduler with the given mode. If the scheduler is // already started, switch to the given mode, although some // scheduled tasks from the old mode may still run. - virtual void Start(Mode mode); + virtual void Start(Mode mode) = 0; // Schedules the configuration task specified by |params|. Returns true if // the configuration task executed immediately, false if it had to be // scheduled for a later attempt. |params.ready_task| is invoked whenever the // configuration task executes. // Note: must already be in CONFIGURATION mode. - virtual bool ScheduleConfiguration(const ConfigurationParams& params); + virtual bool ScheduleConfiguration(const ConfigurationParams& params) = 0; // Request that any running syncer task stop as soon as possible and // cancel all scheduled tasks. This function can be called from any thread, @@ -104,326 +88,30 @@ class SyncScheduler : public sessions::SyncSession::Delegate { // allow preempting ongoing sync cycles. // Invokes |callback| from the sync loop once syncer is idle and all tasks // are cancelled. - void RequestStop(const base::Closure& callback); + virtual void RequestStop(const base::Closure& callback) = 0; // The meat and potatoes. Both of these methods will post a delayed task // to attempt the actual nudge (see ScheduleNudgeImpl). - void ScheduleNudgeAsync(const base::TimeDelta& delay, NudgeSource source, - syncer::ModelTypeSet types, - const tracked_objects::Location& nudge_location); - void ScheduleNudgeWithPayloadsAsync( + virtual void ScheduleNudgeAsync( + const base::TimeDelta& delay, + NudgeSource source, + syncer::ModelTypeSet types, + const tracked_objects::Location& nudge_location) = 0; + virtual void ScheduleNudgeWithPayloadsAsync( const base::TimeDelta& delay, NudgeSource source, const syncer::ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location); - - void CleanupDisabledTypes(); + const tracked_objects::Location& nudge_location) = 0; // Change status of notifications in the SyncSessionContext. - void set_notifications_enabled(bool notifications_enabled); + virtual void SetNotificationsEnabled(bool notifications_enabled) = 0; - base::TimeDelta sessions_commit_delay() const; - - // DDOS avoidance function. Calculates how long we should wait before trying - // again after a failed sync attempt, where the last delay was |base_delay|. - // TODO(tim): Look at URLRequestThrottlerEntryInterface. - static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay); + virtual base::TimeDelta GetSessionsCommitDelay() const = 0; // Called when credentials are updated by the user. - void OnCredentialsUpdated(); + virtual void OnCredentialsUpdated() = 0; // Called when the network layer detects a connection status change. - void OnConnectionStatusChange(); - - // SyncSession::Delegate implementation. - virtual void OnSilencedUntil( - const base::TimeTicks& silenced_until) OVERRIDE; - virtual bool IsSyncingCurrentlySilenced() OVERRIDE; - virtual void OnReceivedShortPollIntervalUpdate( - const base::TimeDelta& new_interval) OVERRIDE; - virtual void OnReceivedLongPollIntervalUpdate( - const base::TimeDelta& new_interval) OVERRIDE; - virtual void OnReceivedSessionsCommitDelay( - const base::TimeDelta& new_delay) OVERRIDE; - virtual void OnShouldStopSyncingPermanently() OVERRIDE; - virtual void OnSyncProtocolError( - const sessions::SyncSessionSnapshot& snapshot) OVERRIDE; - - private: - enum JobProcessDecision { - // Indicates we should continue with the current job. - CONTINUE, - // Indicates that we should save it to be processed later. - SAVE, - // Indicates we should drop this job. - DROP, - }; - - struct SyncSessionJob { - // An enum used to describe jobs for scheduling purposes. - enum SyncSessionJobPurpose { - // Uninitialized state, should never be hit in practice. - UNKNOWN = -1, - // Our poll timer schedules POLL jobs periodically based on a server - // assigned poll interval. - POLL, - // A nudge task can come from a variety of components needing to force - // a sync. The source is inferable from |session.source()|. - NUDGE, - // Typically used for fetching updates for a subset of the enabled types - // during initial sync or reconfiguration. We don't run all steps of - // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). - CONFIGURATION, - // The user disabled some types and we have to clean up the data - // for those. - CLEANUP_DISABLED_TYPES, - }; - SyncSessionJob(); - SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, bool is_canary_job, - const ConfigurationParams& config_params, - const tracked_objects::Location& nudge_location); - ~SyncSessionJob(); - static const char* GetPurposeString(SyncSessionJobPurpose purpose); - - SyncSessionJobPurpose purpose; - base::TimeTicks scheduled_start; - linked_ptr<sessions::SyncSession> session; - bool is_canary_job; - ConfigurationParams config_params; - - // This is the location the job came from. Used for debugging. - // In case of multiple nudges getting coalesced this stores the - // first location that came in. - tracked_objects::Location from_here; - }; - friend class SyncSchedulerTest; - friend class SyncSchedulerWhiteboxTest; - friend class SyncerTest; - - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - DropNudgeWhileExponentialBackOff); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, SaveNudge); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - SaveNudgeWhileTypeThrottled); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueNudge); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, DropPoll); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinuePoll); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueConfiguration); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - SaveConfigurationWhileThrottled); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - SaveNudgeWhileThrottled); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - ContinueCanaryJobConfig); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, - ContinueNudgeWhileExponentialBackOff); - FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, TransientPollFailure); - - // A component used to get time delays associated with exponential backoff. - // Encapsulated into a class to facilitate testing. - class DelayProvider { - public: - DelayProvider(); - virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay); - virtual ~DelayProvider(); - private: - DISALLOW_COPY_AND_ASSIGN(DelayProvider); - }; - - struct WaitInterval { - enum Mode { - // Uninitialized state, should not be set in practice. - UNKNOWN = -1, - // A wait interval whose duration has been affected by exponential - // backoff. - // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. - EXPONENTIAL_BACKOFF, - // A server-initiated throttled interval. We do not allow any syncing - // during such an interval. - THROTTLED, - }; - WaitInterval(); - ~WaitInterval(); - WaitInterval(Mode mode, base::TimeDelta length); - - static const char* GetModeString(Mode mode); - - Mode mode; - - // This bool is set to true if we have observed a nudge during this - // interval and mode == EXPONENTIAL_BACKOFF. - bool had_nudge; - base::TimeDelta length; - base::OneShotTimer<SyncScheduler> timer; - - // Configure jobs are saved only when backing off or throttling. So we - // expose the pointer here. - scoped_ptr<SyncSessionJob> pending_configure_job; - }; - - static const char* GetModeString(Mode mode); - - static const char* GetDecisionString(JobProcessDecision decision); - - // Assign |start| and |end| to appropriate SyncerStep values for the - // specified |purpose|. - static void SetSyncerStepsForPurpose( - SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, SyncerStep* end); - - // Helpers that log before posting to |sync_loop_|. These will only post - // the task in between calls to Start/Stop. - void PostTask(const tracked_objects::Location& from_here, - const char* name, - const base::Closure& task); - void PostDelayedTask(const tracked_objects::Location& from_here, - const char* name, - const base::Closure& task, - base::TimeDelta delay); - - // Helper to assemble a job and post a delayed task to sync. - void ScheduleSyncSessionJob(const SyncSessionJob& job); - - // Invoke the Syncer to perform a sync. - void DoSyncSessionJob(const SyncSessionJob& job); - - // Called after the Syncer has performed the sync represented by |job|, to - // reset our state. - void FinishSyncSessionJob(const SyncSessionJob& job); - - // Record important state that might be needed in future syncs, such as which - // data types may require cleanup. - void UpdateCarryoverSessionState(const SyncSessionJob& old_job); - - // Helper to FinishSyncSessionJob to schedule the next sync operation. - void ScheduleNextSync(const SyncSessionJob& old_job); - - // Helper to configure polling intervals. Used by Start and ScheduleNextSync. - void AdjustPolling(const SyncSessionJob* old_job); - - // Helper to restart waiting with |wait_interval_|'s timer. - void RestartWaiting(); - - // Helper to ScheduleNextSync in case of consecutive sync errors. - void HandleContinuationError(const SyncSessionJob& old_job); - - // Determines if it is legal to run |job| by checking current - // operational mode, backoff or throttling, freshness - // (so we don't make redundant syncs), and connection. - bool ShouldRunJob(const SyncSessionJob& job); - - // Decide whether we should CONTINUE, SAVE or DROP the job. - JobProcessDecision DecideOnJob(const SyncSessionJob& job); - - // Decide on whether to CONTINUE, SAVE or DROP the job when we are in - // backoff mode. - JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); - - // Saves the job for future execution. Note: It drops all the poll jobs. - void SaveJob(const SyncSessionJob& job); - - // Coalesces the current job with the pending nudge. - void InitOrCoalescePendingJob(const SyncSessionJob& job); - - // 'Impl' here refers to real implementation of public functions, running on - // |thread_|. - void StopImpl(const base::Closure& callback); - void ScheduleNudgeImpl( - const base::TimeDelta& delay, - sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, - const syncer::ModelTypePayloadMap& types_with_payloads, - bool is_canary_job, const tracked_objects::Location& nudge_location); - - // Returns true if the client is currently in exponential backoff. - bool IsBackingOff() const; - - // Helper to signal all listeners registered with |session_context_|. - void Notify(SyncEngineEvent::EventCause cause); - - // Callback to change backoff state. - void DoCanaryJob(); - void Unthrottle(); - - // Executes the pending job. Called whenever an event occurs that may - // change conditions permitting a job to run. Like when network connection is - // re-established, mode changes etc. - void DoPendingJobIfPossible(bool is_canary_job); - - // Called when the root cause of the current connection error is fixed. - void OnServerConnectionErrorFixed(); - - // The pointer is owned by the caller. - syncer::sessions::SyncSession* CreateSyncSession( - const syncer::sessions::SyncSourceInfo& info); - - // Creates a session for a poll and performs the sync. - void PollTimerCallback(); - - // Used to update |connection_code_|, see below. - void UpdateServerConnectionManagerStatus( - HttpResponse::ServerConnectionCode code); - - // Called once the first time thread_ is started to broadcast an initial - // session snapshot containing data like initial_sync_ended. Important when - // the client starts up and does not need to perform an initial sync. - void SendInitialSnapshot(); - - virtual void OnActionableError(const sessions::SyncSessionSnapshot& snapshot); - - base::WeakPtrFactory<SyncScheduler> weak_ptr_factory_; - - // A second factory specially for weak_handle_this_, to allow the handle - // to be const and alleviate threading concerns. - base::WeakPtrFactory<SyncScheduler> weak_ptr_factory_for_weak_handle_; - - // For certain methods that need to worry about X-thread posting. - const WeakHandle<SyncScheduler> weak_handle_this_; - - // Used for logging. - const std::string name_; - - // The message loop this object is on. Almost all methods have to - // be called on this thread. - MessageLoop* const sync_loop_; - - // Set in Start(), unset in Stop(). - bool started_; - - // Modifiable versions of kDefaultLongPollIntervalSeconds which can be - // updated by the server. - base::TimeDelta syncer_short_poll_interval_seconds_; - base::TimeDelta syncer_long_poll_interval_seconds_; - - // Server-tweakable sessions commit delay. - base::TimeDelta sessions_commit_delay_; - - // Periodic timer for polling. See AdjustPolling. - base::RepeatingTimer<SyncScheduler> poll_timer_; - - // The mode of operation. - Mode mode_; - - // TODO(tim): Bug 26339. This needs to track more than just time I think, - // since the nudges could be for different types. Current impl doesn't care. - base::TimeTicks last_sync_session_end_time_; - - // The latest connection code we got while trying to connect. - HttpResponse::ServerConnectionCode connection_code_; - - // Tracks in-flight nudges so we can coalesce. - scoped_ptr<SyncSessionJob> pending_nudge_; - - // Current wait state. Null if we're not in backoff and not throttled. - scoped_ptr<WaitInterval> wait_interval_; - - scoped_ptr<DelayProvider> delay_provider_; - - // Invoked to run through the sync cycle. - scoped_ptr<Syncer> syncer_; - - sessions::SyncSessionContext *session_context_; - - DISALLOW_COPY_AND_ASSIGN(SyncScheduler); + virtual void OnConnectionStatusChange() = 0; }; } // namespace syncer diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc new file mode 100644 index 0000000..289e189 --- /dev/null +++ b/sync/engine/sync_scheduler_impl.cc @@ -0,0 +1,1209 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_scheduler_impl.h" + +#include <algorithm> +#include <cstring> + +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/location.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/rand_util.h" +#include "sync/engine/syncer.h" +#include "sync/engine/throttled_data_type_tracker.h" +#include "sync/protocol/proto_enum_conversions.h" +#include "sync/protocol/sync.pb.h" +#include "sync/util/data_type_histogram.h" +#include "sync/util/logging.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace syncer { + +using sessions::SyncSession; +using sessions::SyncSessionSnapshot; +using sessions::SyncSourceInfo; +using sync_pb::GetUpdatesCallerInfo; + +namespace { +bool ShouldRequestEarlyExit( + const syncer::SyncProtocolError& error) { + switch (error.error_type) { + case syncer::SYNC_SUCCESS: + case syncer::MIGRATION_DONE: + case syncer::THROTTLED: + case syncer::TRANSIENT_ERROR: + return false; + case syncer::NOT_MY_BIRTHDAY: + case syncer::CLEAR_PENDING: + // If we send terminate sync early then |sync_cycle_ended| notification + // would not be sent. If there were no actions then |ACTIONABLE_ERROR| + // notification wouldnt be sent either. Then the UI layer would be left + // waiting forever. So assert we would send something. + DCHECK(error.action != syncer::UNKNOWN_ACTION); + return true; + case syncer::INVALID_CREDENTIAL: + // The notification for this is handled by PostAndProcessHeaders|. + // Server does no have to send any action for this. + return true; + // Make the default a NOTREACHED. So if a new error is introduced we + // think about its expected functionality. + default: + NOTREACHED(); + return false; + } +} + +bool IsActionableError( + const syncer::SyncProtocolError& error) { + return (error.action != syncer::UNKNOWN_ACTION); +} +} // namespace + +ConfigurationParams::ConfigurationParams() + : source(GetUpdatesCallerInfo::UNKNOWN), + keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {} +ConfigurationParams::ConfigurationParams( + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, + const syncer::ModelTypeSet& types_to_download, + const syncer::ModelSafeRoutingInfo& routing_info, + KeystoreKeyStatus keystore_key_status, + const base::Closure& ready_task) + : source(source), + types_to_download(types_to_download), + routing_info(routing_info), + keystore_key_status(keystore_key_status), + ready_task(ready_task) { + DCHECK(!ready_task.is_null()); +} +ConfigurationParams::~ConfigurationParams() {} + +SyncSchedulerImpl::DelayProvider::DelayProvider() {} +SyncSchedulerImpl::DelayProvider::~DelayProvider() {} + +SyncSchedulerImpl::WaitInterval::WaitInterval() + : mode(UNKNOWN), + had_nudge(false) { +} + +SyncSchedulerImpl::WaitInterval::~WaitInterval() {} + +#define ENUM_CASE(x) case x: return #x; break; + +const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { + switch (mode) { + ENUM_CASE(UNKNOWN); + ENUM_CASE(EXPONENTIAL_BACKOFF); + ENUM_CASE(THROTTLED); + } + NOTREACHED(); + return ""; +} + +SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() + : purpose(UNKNOWN), + is_canary_job(false) { +} + +SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} + +SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, + base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, + bool is_canary_job, + const ConfigurationParams& config_params, + const tracked_objects::Location& from_here) + : purpose(purpose), + scheduled_start(start), + session(session), + is_canary_job(is_canary_job), + config_params(config_params), + from_here(from_here) { +} + +const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( + SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { + switch (purpose) { + ENUM_CASE(UNKNOWN); + ENUM_CASE(POLL); + ENUM_CASE(NUDGE); + ENUM_CASE(CONFIGURATION); + ENUM_CASE(CLEANUP_DISABLED_TYPES); + } + NOTREACHED(); + return ""; +} + +TimeDelta SyncSchedulerImpl::DelayProvider::GetDelay( + const base::TimeDelta& last_delay) { + return SyncSchedulerImpl::GetRecommendedDelay(last_delay); +} + +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_LOCAL_REFRESH: + return GetUpdatesCallerInfo::DATATYPE_REFRESH; + case NUDGE_SOURCE_UNKNOWN: + return GetUpdatesCallerInfo::UNKNOWN; + default: + NOTREACHED(); + return GetUpdatesCallerInfo::UNKNOWN; + } +} + +SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length) { } + +// Helper macros to log with the syncer thread name; useful when there +// are multiple syncer threads involved. + +#define SLOG(severity) LOG(severity) << name_ << ": " + +#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " + +#define SDVLOG_LOC(from_here, verbose_level) \ + DVLOG_LOC(from_here, verbose_level) << name_ << ": " + +namespace { + +const int kDefaultSessionsCommitDelaySeconds = 10; + +bool IsConfigRelatedUpdateSourceValue( + GetUpdatesCallerInfo::GetUpdatesSource source) { + switch (source) { + case GetUpdatesCallerInfo::RECONFIGURATION: + case GetUpdatesCallerInfo::MIGRATION: + case GetUpdatesCallerInfo::NEW_CLIENT: + case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: + return true; + default: + return false; + } +} + +} // namespace + +SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, + sessions::SyncSessionContext* context, + Syncer* syncer) + : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), + weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), + weak_handle_this_(MakeWeakHandle( + weak_ptr_factory_for_weak_handle_.GetWeakPtr())), + name_(name), + sync_loop_(MessageLoop::current()), + started_(false), + syncer_short_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), + syncer_long_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), + sessions_commit_delay_( + TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), + mode_(NORMAL_MODE), + // Start with assuming everything is fine with the connection. + // At the end of the sync cycle we would have the correct status. + connection_code_(HttpResponse::SERVER_CONNECTION_OK), + delay_provider_(new DelayProvider()), + syncer_(syncer), + session_context_(context) { + DCHECK(sync_loop_); +} + +SyncSchedulerImpl::~SyncSchedulerImpl() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + StopImpl(base::Closure()); +} + +void SyncSchedulerImpl::OnCredentialsUpdated() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + + // TODO(lipalani): crbug.com/106262. One issue here is that if after + // the auth error we happened to do gettime and it succeeded then + // the |connection_code_| would be briefly OK however it would revert + // back to SYNC_AUTH_ERROR at the end of the sync cycle. The + // referenced bug explores the option of removing gettime calls + // altogethere + if (HttpResponse::SYNC_AUTH_ERROR == connection_code_) { + OnServerConnectionErrorFixed(); + } +} + +void SyncSchedulerImpl::OnConnectionStatusChange() { + if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { + // Optimistically assume that the connection is fixed and try + // connecting. + OnServerConnectionErrorFixed(); + } +} + +void SyncSchedulerImpl::OnServerConnectionErrorFixed() { + connection_code_ = HttpResponse::SERVER_CONNECTION_OK; + PostTask(FROM_HERE, "DoCanaryJob", + base::Bind(&SyncSchedulerImpl::DoCanaryJob, + weak_ptr_factory_.GetWeakPtr())); + +} + +void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( + HttpResponse::ServerConnectionCode code) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG(2) << "New server connection code: " + << HttpResponse::GetServerConnectionCodeString(code); + + connection_code_ = code; +} + +void SyncSchedulerImpl::Start(Mode mode) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + std::string thread_name = MessageLoop::current()->thread_name(); + if (thread_name.empty()) + thread_name = "<Main thread>"; + SDVLOG(2) << "Start called from thread " + << thread_name << " with mode " << GetModeString(mode); + if (!started_) { + started_ = true; + SendInitialSnapshot(); + } + + DCHECK(!session_context_->account_name().empty()); + DCHECK(syncer_.get()); + Mode old_mode = mode_; + mode_ = mode; + AdjustPolling(NULL); // Will kick start poll timer if needed. + + if (old_mode != mode_) { + // We just changed our mode. See if there are any pending jobs that we could + // execute in the new mode. + DoPendingJobIfPossible(false); + } +} + +void SyncSchedulerImpl::SendInitialSnapshot() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, + SyncSourceInfo(), ModelSafeRoutingInfo(), + std::vector<ModelSafeWorker*>())); + SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); + event.snapshot = dummy->TakeSnapshot(); + session_context_->NotifyListeners(event); +} + +namespace { + +// Helper to extract the routing info and workers corresponding to types in +// |types| from |current_routes| and |current_workers|. +void BuildModelSafeParams( + const ModelTypeSet& types_to_download, + const ModelSafeRoutingInfo& current_routes, + const std::vector<ModelSafeWorker*>& current_workers, + ModelSafeRoutingInfo* result_routes, + std::vector<ModelSafeWorker*>* result_workers) { + std::set<ModelSafeGroup> active_groups; + active_groups.insert(GROUP_PASSIVE); + for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); + iter.Inc()) { + syncer::ModelType type = iter.Get(); + ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); + DCHECK(route != current_routes.end()); + ModelSafeGroup group = route->second; + (*result_routes)[type] = group; + active_groups.insert(group); + } + + for(std::vector<ModelSafeWorker*>::const_iterator iter = + current_workers.begin(); iter != current_workers.end(); ++iter) { + if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) + result_workers->push_back(*iter); + } +} + +} // namespace. + +bool SyncSchedulerImpl::ScheduleConfiguration( + const ConfigurationParams& params) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); + DCHECK_EQ(CONFIGURATION_MODE, mode_); + DCHECK(!params.ready_task.is_null()); + SDVLOG(2) << "Reconfiguring syncer."; + + // Only one configuration is allowed at a time. Verify we're not waiting + // for a pending configure job. + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); + + // TODO(sync): now that ModelChanging commands only use those workers within + // the routing info, we don't really need |restricted_workers|. Remove it. + // crbug.com/133030 + syncer::ModelSafeRoutingInfo restricted_routes; + std::vector<ModelSafeWorker*> restricted_workers; + BuildModelSafeParams(params.types_to_download, + params.routing_info, + session_context_->workers(), + &restricted_routes, + &restricted_workers); + session_context_->set_routing_info(params.routing_info); + + // We rely on this not failing, so don't need to worry about checking for + // success. In addition, this will be removed as part of crbug.com/131433. + SyncSessionJob cleanup_job( + SyncSessionJob::CLEANUP_DISABLED_TYPES, + TimeTicks::Now(), + make_linked_ptr(CreateSyncSession(SyncSourceInfo())), + false, + ConfigurationParams(), + FROM_HERE); + DoSyncSessionJob(cleanup_job); + + if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) { + // TODO(zea): implement in such a way that we can handle failures and the + // subsequent retrys the scheduler might perform. See crbug.com/129665. + NOTIMPLEMENTED(); + } + + // Only reconfigure if we have types to download. + if (!params.types_to_download.Empty()) { + DCHECK(!restricted_routes.empty()); + linked_ptr<SyncSession> session(new SyncSession( + session_context_, + this, + SyncSourceInfo(params.source, + ModelSafeRoutingInfoToPayloadMap( + restricted_routes, + std::string())), + restricted_routes, + restricted_workers)); + SyncSessionJob job(SyncSessionJob::CONFIGURATION, + TimeTicks::Now(), + session, + false, + params, + FROM_HERE); + DoSyncSessionJob(job); + + // If we failed, the job would have been saved as the pending configure + // job and a wait interval would have been set. + if (!session->Succeeded()) { + DCHECK(wait_interval_.get() && + wait_interval_->pending_configure_job.get()); + return false; + } + } else { + SDVLOG(2) << "No change in routing info, calling ready task directly."; + params.ready_task.Run(); + } + + return true; +} + +SyncSchedulerImpl::JobProcessDecision +SyncSchedulerImpl::DecideWhileInWaitInterval( + const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK(wait_interval_.get()); + DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); + + SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " + << WaitInterval::GetModeString(wait_interval_->mode) + << (wait_interval_->had_nudge ? " (had nudge)" : "") + << (job.is_canary_job ? " (canary)" : ""); + + if (job.purpose == SyncSessionJob::POLL) + return DROP; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::CONFIGURATION); + if (wait_interval_->mode == WaitInterval::THROTTLED) + return SAVE; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + if (job.purpose == SyncSessionJob::NUDGE) { + if (mode_ == CONFIGURATION_MODE) + return SAVE; + + // If we already had one nudge then just drop this nudge. We will retry + // later when the timer runs out. + if (!job.is_canary_job) + return wait_interval_->had_nudge ? DROP : CONTINUE; + else // We are here because timer ran out. So retry. + return CONTINUE; + } + return job.is_canary_job ? CONTINUE : SAVE; +} + +SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( + const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (job.purpose == SyncSessionJob::CLEANUP_DISABLED_TYPES) + return CONTINUE; + + // See if our type is throttled. + syncer::ModelTypeSet throttled_types = + session_context_->throttled_data_type_tracker()->GetThrottledTypes(); + if (job.purpose == SyncSessionJob::NUDGE && + job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { + syncer::ModelTypeSet requested_types; + for (ModelTypePayloadMap::const_iterator i = + job.session->source().types.begin(); + i != job.session->source().types.end(); + ++i) { + requested_types.Put(i->first); + } + + if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) + return SAVE; + } + + if (wait_interval_.get()) + return DecideWhileInWaitInterval(job); + + if (mode_ == CONFIGURATION_MODE) { + if (job.purpose == SyncSessionJob::NUDGE) + return SAVE; + else if (job.purpose == SyncSessionJob::CONFIGURATION) + return CONTINUE; + else + return DROP; + } + + // We are in normal mode. + DCHECK_EQ(mode_, NORMAL_MODE); + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + + // Freshness condition + if (job.scheduled_start < last_sync_session_end_time_) { + SDVLOG(2) << "Dropping job because of freshness"; + return DROP; + } + + if (!session_context_->connection_manager()->HasInvalidAuthToken()) + return CONTINUE; + + SDVLOG(2) << "No valid auth token. Using that to decide on job."; + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; +} + +void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); + if (pending_nudge_.get() == NULL) { + SDVLOG(2) << "Creating a pending nudge job"; + SyncSession* s = job.session.get(); + scoped_ptr<SyncSession> session(new SyncSession(s->context(), + s->delegate(), s->source(), s->routing_info(), s->workers())); + + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, + make_linked_ptr(session.release()), false, + ConfigurationParams(), job.from_here); + pending_nudge_.reset(new SyncSessionJob(new_job)); + + return; + } + + SDVLOG(2) << "Coalescing a pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + pending_nudge_->scheduled_start = job.scheduled_start; + + // Unfortunately the nudge location cannot be modified. So it stores the + // location of the first caller. +} + +bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK(started_); + + JobProcessDecision decision = DecideOnJob(job); + SDVLOG(2) << "Should run " + << SyncSessionJob::GetPurposeString(job.purpose) + << " job in mode " << GetModeString(mode_) + << ": " << GetDecisionString(decision); + if (decision != SAVE) + return decision == CONTINUE; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == + SyncSessionJob::CONFIGURATION); + + SaveJob(job); + return false; +} + +void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + // TODO(sync): Should we also check that job.purpose != + // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) + if (job.purpose == SyncSessionJob::NUDGE) { + SDVLOG(2) << "Saving a nudge job"; + InitOrCoalescePendingJob(job); + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ + SDVLOG(2) << "Saving a configuration job"; + DCHECK(wait_interval_.get()); + DCHECK(mode_ == CONFIGURATION_MODE); + + // Config params should always get set. + DCHECK(!job.config_params.ready_task.is_null()); + SyncSession* old = job.session.get(); + SyncSession* s(new SyncSession(session_context_, this, old->source(), + old->routing_info(), old->workers())); + SyncSessionJob new_job(job.purpose, + TimeTicks::Now(), + make_linked_ptr(s), + false, + job.config_params, + job.from_here); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); + } // drop the rest. + // TODO(sync): Is it okay to drop the rest? It's weird that + // SaveJob() only does what it says sometimes. (See + // http://crbug.com/90868.) +} + +// Functor for std::find_if to search by ModelSafeGroup. +struct ModelSafeWorkerGroupIs { + explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} + bool operator()(ModelSafeWorker* w) { + return group == w->GetModelSafeGroup(); + } + ModelSafeGroup group; +}; + +void SyncSchedulerImpl::ScheduleNudgeAsync( + const TimeDelta& delay, + NudgeSource source, ModelTypeSet types, + const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG_LOC(nudge_location, 2) + << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " + << "source " << GetNudgeSourceString(source) << ", " + << "types " << ModelTypeSetToString(types); + + ModelTypePayloadMap types_with_payloads = + syncer::ModelTypePayloadMapFromEnumSet(types, std::string()); + SyncSchedulerImpl::ScheduleNudgeImpl(delay, + GetUpdatesFromNudgeSource(source), + types_with_payloads, + false, + nudge_location); +} + +void SyncSchedulerImpl::ScheduleNudgeWithPayloadsAsync( + const TimeDelta& delay, + NudgeSource source, const ModelTypePayloadMap& types_with_payloads, + const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG_LOC(nudge_location, 2) + << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " + << "source " << GetNudgeSourceString(source) << ", " + << "payloads " + << syncer::ModelTypePayloadMapToString(types_with_payloads); + + SyncSchedulerImpl::ScheduleNudgeImpl(delay, + GetUpdatesFromNudgeSource(source), + types_with_payloads, + false, + nudge_location); +} + +void SyncSchedulerImpl::ScheduleNudgeImpl( + const TimeDelta& delay, + GetUpdatesCallerInfo::GetUpdatesSource source, + const ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + + SDVLOG_LOC(nudge_location, 2) + << "In ScheduleNudgeImpl with delay " + << delay.InMilliseconds() << " ms, " + << "source " << GetUpdatesSourceString(source) << ", " + << "payloads " + << syncer::ModelTypePayloadMapToString(types_with_payloads) + << (is_canary_job ? " (canary)" : ""); + + SyncSourceInfo info(source, types_with_payloads); + + SyncSession* session(CreateSyncSession(info)); + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, + make_linked_ptr(session), is_canary_job, + ConfigurationParams(), nudge_location); + + session = NULL; + if (!ShouldRunJob(job)) + return; + + if (pending_nudge_.get()) { + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { + SDVLOG(2) << "Dropping the nudge because we are in backoff"; + return; + } + + SDVLOG(2) << "Coalescing pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + + SDVLOG(2) << "Rescheduling pending nudge"; + SyncSession* s = pending_nudge_->session.get(); + job.session.reset(new SyncSession(s->context(), s->delegate(), + s->source(), s->routing_info(), s->workers())); + + // Choose the start time as the earliest of the 2. + job.scheduled_start = std::min(job.scheduled_start, + pending_nudge_->scheduled_start); + pending_nudge_.reset(); + } + + // TODO(zea): Consider adding separate throttling/backoff for datatype + // refresh requests. + ScheduleSyncSessionJob(job); +} + +const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { + switch (mode) { + ENUM_CASE(CONFIGURATION_MODE); + ENUM_CASE(NORMAL_MODE); + } + return ""; +} + +const char* SyncSchedulerImpl::GetDecisionString( + SyncSchedulerImpl::JobProcessDecision mode) { + switch (mode) { + ENUM_CASE(CONTINUE); + ENUM_CASE(SAVE); + ENUM_CASE(DROP); + } + return ""; +} + +// static +void SyncSchedulerImpl::SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, + SyncerStep* start, + SyncerStep* end) { + switch (purpose) { + case SyncSessionJob::CONFIGURATION: + *start = DOWNLOAD_UPDATES; + *end = APPLY_UPDATES; + return; + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: + *start = SYNCER_BEGIN; + *end = SYNCER_END; + return; + case SyncSessionJob::CLEANUP_DISABLED_TYPES: + *start = CLEANUP_DISABLED_TYPES; + *end = CLEANUP_DISABLED_TYPES; + return; + default: + NOTREACHED(); + *start = SYNCER_END; + *end = SYNCER_END; + return; + } +} + +void SyncSchedulerImpl::PostTask( + const tracked_objects::Location& from_here, + const char* name, const base::Closure& task) { + SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (!started_) { + SDVLOG(1) << "Not posting task as scheduler is stopped."; + return; + } + sync_loop_->PostTask(from_here, task); +} + +void SyncSchedulerImpl::PostDelayedTask( + const tracked_objects::Location& from_here, + const char* name, const base::Closure& task, base::TimeDelta delay) { + SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " + << delay.InMilliseconds() << " ms delay"; + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (!started_) { + SDVLOG(1) << "Not posting task as scheduler is stopped."; + return; + } + sync_loop_->PostDelayedTask(from_here, task, delay); +} + +void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + TimeDelta delay = job.scheduled_start - TimeTicks::Now(); + if (delay < TimeDelta::FromMilliseconds(0)) + delay = TimeDelta::FromMilliseconds(0); + SDVLOG_LOC(job.from_here, 2) + << "In ScheduleSyncSessionJob with " + << SyncSessionJob::GetPurposeString(job.purpose) + << " job and " << delay.InMilliseconds() << " ms delay"; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::POLL); + if (job.purpose == SyncSessionJob::NUDGE) { + SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == + job.session); + pending_nudge_.reset(new SyncSessionJob(job)); + } + PostDelayedTask(job.from_here, "DoSyncSessionJob", + base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, + weak_ptr_factory_.GetWeakPtr(), + job), + delay); +} + +void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (!ShouldRunJob(job)) { + SLOG(WARNING) + << "Not executing " + << SyncSessionJob::GetPurposeString(job.purpose) << " job from " + << GetUpdatesSourceString(job.session->source().updates_source); + return; + } + + if (job.purpose == SyncSessionJob::NUDGE) { + if (pending_nudge_.get() == NULL || + pending_nudge_->session != job.session) { + SDVLOG(2) << "Dropping a nudge in " + << "DoSyncSessionJob because another nudge was scheduled"; + return; // Another nudge must have been scheduled in in the meantime. + } + pending_nudge_.reset(); + + // Create the session with the latest model safe table and use it to purge + // and update any disabled or modified entries in the job. + scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); + + job.session->RebaseRoutingInfoWithLatest(*session); + } + SDVLOG(2) << "DoSyncSessionJob with " + << SyncSessionJob::GetPurposeString(job.purpose) << " job"; + + SyncerStep begin(SYNCER_END); + SyncerStep end(SYNCER_END); + SetSyncerStepsForPurpose(job.purpose, &begin, &end); + + bool has_more_to_sync = true; + while (ShouldRunJob(job) && has_more_to_sync) { + SDVLOG(2) << "Calling SyncShare."; + // Synchronously perform the sync session from this thread. + syncer_->SyncShare(job.session.get(), begin, end); + has_more_to_sync = job.session->HasMoreToSync(); + if (has_more_to_sync) + job.session->PrepareForAnotherSyncCycle(); + } + SDVLOG(2) << "Done SyncShare looping."; + + FinishSyncSessionJob(job); +} + +void SyncSchedulerImpl::UpdateCarryoverSessionState( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + // Whatever types were part of a configuration task will have had updates + // downloaded. For that reason, we make sure they get recorded in the + // event that they get disabled at a later time. + ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); + if (!r.empty()) { + ModelSafeRoutingInfo temp_r; + ModelSafeRoutingInfo old_info(old_job.session->routing_info()); + std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), + std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); + session_context_->set_previous_session_routing_info(temp_r); + } + } else { + session_context_->set_previous_session_routing_info( + old_job.session->routing_info()); + } +} + +void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + // Update timing information for how often datatypes are triggering nudges. + base::TimeTicks now = TimeTicks::Now(); + if (!last_sync_session_end_time_.is_null()) { + ModelTypePayloadMap::const_iterator iter; + for (iter = job.session->source().types.begin(); + iter != job.session->source().types.end(); + ++iter) { +#define PER_DATA_TYPE_MACRO(type_str) \ + SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, \ + now - last_sync_session_end_time_); + SYNC_DATA_TYPE_HISTOGRAM(iter->first); +#undef PER_DATA_TYPE_MACRO + } + } + last_sync_session_end_time_ = now; + + // Now update the status of the connection from SCM. We need this to decide + // whether we need to save/run future jobs. The notifications from SCM are not + // reliable. + // + // TODO(rlarocque): crbug.com/110954 + // We should get rid of the notifications and it is probably not needed to + // maintain this status variable in 2 places. We should query it directly from + // SCM when needed. + ServerConnectionManager* scm = session_context_->connection_manager(); + UpdateServerConnectionManagerStatus(scm->server_status()); + + UpdateCarryoverSessionState(job); + if (IsSyncingCurrentlySilenced()) { + SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; + // TODO(sync): Investigate whether we need to check job.purpose + // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) + SaveJob(job); + return; // Nothing to do. + } else if (job.session->Succeeded() && + !job.config_params.ready_task.is_null()) { + // If this was a configuration job with a ready task, invoke it now that + // we finished successfully. + job.config_params.ready_task.Run(); + } + + SDVLOG(2) << "Updating the next polling time after SyncMain"; + ScheduleNextSync(job); +} + +void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK(!old_job.session->HasMoreToSync()); + + AdjustPolling(&old_job); + + if (old_job.session->Succeeded()) { + // Only reset backoff if we actually reached the server. + if (old_job.session->SuccessfullyReachedServer()) + wait_interval_.reset(); + SDVLOG(2) << "Job succeeded so not scheduling more jobs"; + return; + } + + if (old_job.purpose == SyncSessionJob::POLL) { + return; // We don't retry POLL jobs. + } + + // TODO(rlarocque): There's no reason why we should blindly backoff and retry + // if we don't succeed. Some types of errors are not likely to disappear on + // their own. With the return values now available in the old_job.session, we + // should be able to detect such errors and only retry when we detect + // transient errors. + + if (IsBackingOff() && wait_interval_->timer.IsRunning() && + mode_ == NORMAL_MODE) { + // When in normal mode, we allow up to one nudge per backoff interval. It + // appears that this was our nudge for this interval, and it failed. + // + // Note: This does not prevent us from running canary jobs. For example, an + // IP address change might still result in another nudge being executed + // during this backoff interval. + SDVLOG(2) << "A nudge during backoff failed"; + + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + DCHECK(!wait_interval_->had_nudge); + + wait_interval_->had_nudge = true; + InitOrCoalescePendingJob(old_job); + RestartWaiting(); + } else { + // Either this is the first failure or a consecutive failure after our + // backoff timer expired. We handle it the same way in either case. + SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; + HandleContinuationError(old_job); + } +} + +void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + + TimeDelta poll = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + bool rate_changed = !poll_timer_.IsRunning() || + poll != poll_timer_.GetCurrentDelay(); + + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + poll_timer_.Reset(); + + if (!rate_changed) + return; + + // Adjust poll rate. + poll_timer_.Stop(); + poll_timer_.Start(FROM_HERE, poll, this, + &SyncSchedulerImpl::PollTimerCallback); +} + +void SyncSchedulerImpl::RestartWaiting() { + CHECK(wait_interval_.get()); + wait_interval_->timer.Stop(); + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, + this, &SyncSchedulerImpl::DoCanaryJob); +} + +void SyncSchedulerImpl::HandleContinuationError( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (DCHECK_IS_ON()) { + if (IsBackingOff()) { + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + } + } + + TimeDelta length = delay_provider_->GetDelay( + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + + SDVLOG(2) << "In handle continuation error with " + << SyncSessionJob::GetPurposeString(old_job.purpose) + << " job. The time delta(ms) is " + << length.InMilliseconds(); + + // This will reset the had_nudge variable as well. + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, + length)); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + SDVLOG(2) << "Configuration did not succeed, scheduling retry."; + // Config params should always get set. + DCHECK(!old_job.config_params.ready_task.is_null()); + SyncSession* old = old_job.session.get(); + SyncSession* s(new SyncSession(session_context_, this, + old->source(), old->routing_info(), old->workers())); + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, + make_linked_ptr(s), false, old_job.config_params, + FROM_HERE); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + } else { + // We are not in configuration mode. So wait_interval's pending job + // should be null. + DCHECK(wait_interval_->pending_configure_job.get() == NULL); + + // TODO(lipalani) - handle clear user data. + InitOrCoalescePendingJob(old_job); + } + RestartWaiting(); +} + +// static +TimeDelta SyncSchedulerImpl::GetRecommendedDelay(const TimeDelta& last_delay) { + if (last_delay.InSeconds() >= kMaxBackoffSeconds) + return TimeDelta::FromSeconds(kMaxBackoffSeconds); + + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 + int64 backoff_s = + std::max(static_cast<int64>(1), + last_delay.InSeconds() * kBackoffRandomizationFactor); + + // Flip a coin to randomize backoff interval by +/- 50%. + int rand_sign = base::RandInt(0, 1) * 2 - 1; + + // Truncation is adequate for rounding here. + backoff_s = backoff_s + + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + + // Cap the backoff interval. + backoff_s = std::max(static_cast<int64>(1), + std::min(backoff_s, kMaxBackoffSeconds)); + + return TimeDelta::FromSeconds(backoff_s); +} + +void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { + syncer_->RequestEarlyExit(); // Safe to call from any thread. + DCHECK(weak_handle_this_.IsInitialized()); + SDVLOG(3) << "Posting StopImpl"; + weak_handle_this_.Call(FROM_HERE, + &SyncSchedulerImpl::StopImpl, + callback); +} + +void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG(2) << "StopImpl called"; + + // Kill any in-flight method calls. + weak_ptr_factory_.InvalidateWeakPtrs(); + wait_interval_.reset(); + poll_timer_.Stop(); + if (started_) { + started_ = false; + } + if (!callback.is_null()) + callback.Run(); +} + +void SyncSchedulerImpl::DoCanaryJob() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG(2) << "Do canary job"; + DoPendingJobIfPossible(true); +} + +void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SyncSessionJob* job_to_execute = NULL; + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() + && wait_interval_->pending_configure_job.get()) { + SDVLOG(2) << "Found pending configure job"; + job_to_execute = wait_interval_->pending_configure_job.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + SDVLOG(2) << "Found pending nudge job"; + // Pending jobs mostly have time from the past. Reset it so this job + // will get executed. + if (pending_nudge_->scheduled_start < TimeTicks::Now()) + pending_nudge_->scheduled_start = TimeTicks::Now(); + + scoped_ptr<SyncSession> session(CreateSyncSession( + pending_nudge_->session->source())); + + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending_nudge_->session->Coalesce(*(session.get())); + // The pending nudge would be cleared in the DoSyncSessionJob function. + job_to_execute = pending_nudge_.get(); + } + + if (job_to_execute != NULL) { + SDVLOG(2) << "Executing pending job"; + SyncSessionJob copy = *job_to_execute; + copy.is_canary_job = is_canary_job; + DoSyncSessionJob(copy); + } +} + +SyncSession* SyncSchedulerImpl::CreateSyncSession( + const SyncSourceInfo& source) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DVLOG(2) << "Creating sync session with routes " + << ModelSafeRoutingInfoToString(session_context_->routing_info()); + + SyncSourceInfo info(source); + SyncSession* session(new SyncSession(session_context_, this, info, + session_context_->routing_info(), session_context_->workers())); + + return session; +} + +void SyncSchedulerImpl::PollTimerCallback() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + ModelSafeRoutingInfo r; + ModelTypePayloadMap types_with_payloads = + ModelSafeRoutingInfoToPayloadMap(r, std::string()); + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); + SyncSession* s = CreateSyncSession(info); + + SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), + make_linked_ptr(s), + false, + ConfigurationParams(), + FROM_HERE); + + ScheduleSyncSessionJob(job); +} + +void SyncSchedulerImpl::Unthrottle() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + SDVLOG(2) << "Unthrottled."; + DoCanaryJob(); + wait_interval_.reset(); +} + +void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} + +bool SyncSchedulerImpl::IsBackingOff() const { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::EXPONENTIAL_BACKOFF; +} + +void SyncSchedulerImpl::OnSilencedUntil( + const base::TimeTicks& silenced_until) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, + silenced_until - TimeTicks::Now())); + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, + &SyncSchedulerImpl::Unthrottle); +} + +bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::THROTTLED; +} + +void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + syncer_short_poll_interval_seconds_ = new_interval; +} + +void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + syncer_long_poll_interval_seconds_ = new_interval; +} + +void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( + const base::TimeDelta& new_delay) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + sessions_commit_delay_ = new_delay; +} + +void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG(2) << "OnShouldStopSyncingPermanently"; + syncer_->RequestEarlyExit(); // Thread-safe. + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} + +void SyncSchedulerImpl::OnActionableError( + const sessions::SyncSessionSnapshot& snap) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + SDVLOG(2) << "OnActionableError"; + SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); + event.snapshot = snap; + session_context_->NotifyListeners(event); +} + +void SyncSchedulerImpl::OnSyncProtocolError( + const sessions::SyncSessionSnapshot& snapshot) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + if (ShouldRequestEarlyExit( + snapshot.model_neutral_state().sync_protocol_error)) { + SDVLOG(2) << "Sync Scheduler requesting early exit."; + syncer_->RequestEarlyExit(); // Thread-safe. + } + if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) + OnActionableError(snapshot); +} + +void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + session_context_->set_notifications_enabled(notifications_enabled); +} + +base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { + DCHECK_EQ(MessageLoop::current(), sync_loop_); + return sessions_commit_delay_; +} + +#undef SDVLOG_LOC + +#undef SDVLOG + +#undef SLOG + +#undef ENUM_CASE + +} // namespace syncer diff --git a/sync/engine/sync_scheduler_impl.h b/sync/engine/sync_scheduler_impl.h new file mode 100644 index 0000000..dfc3bff --- /dev/null +++ b/sync/engine/sync_scheduler_impl.h @@ -0,0 +1,360 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_ +#define SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_ + +#include <string> + +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "base/gtest_prod_util.h" +#include "base/memory/linked_ptr.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/observer_list.h" +#include "base/time.h" +#include "base/timer.h" +#include "sync/engine/net/server_connection_manager.h" +#include "sync/engine/nudge_source.h" +#include "sync/engine/sync_scheduler.h" +#include "sync/engine/syncer.h" +#include "sync/internal_api/public/base/model_type_payload_map.h" +#include "sync/internal_api/public/engine/polling_constants.h" +#include "sync/internal_api/public/util/weak_handle.h" +#include "sync/sessions/sync_session.h" +#include "sync/sessions/sync_session_context.h" + +namespace syncer { + +class SyncSchedulerImpl : public SyncScheduler { + public: + // |name| is a display string to identify the syncer thread. Takes + // |ownership of |syncer|. + SyncSchedulerImpl(const std::string& name, + sessions::SyncSessionContext* context, Syncer* syncer); + + // Calls Stop(). + virtual ~SyncSchedulerImpl(); + + virtual void Start(Mode mode) OVERRIDE; + virtual bool ScheduleConfiguration( + const ConfigurationParams& params) OVERRIDE; + virtual void RequestStop(const base::Closure& callback) OVERRIDE; + virtual void ScheduleNudgeAsync( + const base::TimeDelta& delay, + NudgeSource source, + syncer::ModelTypeSet types, + const tracked_objects::Location& nudge_location) OVERRIDE; + virtual void ScheduleNudgeWithPayloadsAsync( + const base::TimeDelta& delay, NudgeSource source, + const syncer::ModelTypePayloadMap& types_with_payloads, + const tracked_objects::Location& nudge_location) OVERRIDE; + virtual void SetNotificationsEnabled(bool notifications_enabled) OVERRIDE; + + virtual base::TimeDelta GetSessionsCommitDelay() const OVERRIDE; + + virtual void OnCredentialsUpdated() OVERRIDE; + virtual void OnConnectionStatusChange() OVERRIDE; + + // SyncSession::Delegate implementation. + virtual void OnSilencedUntil( + const base::TimeTicks& silenced_until) OVERRIDE; + virtual bool IsSyncingCurrentlySilenced() OVERRIDE; + virtual void OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) OVERRIDE; + virtual void OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) OVERRIDE; + virtual void OnReceivedSessionsCommitDelay( + const base::TimeDelta& new_delay) OVERRIDE; + virtual void OnShouldStopSyncingPermanently() OVERRIDE; + virtual void OnSyncProtocolError( + const sessions::SyncSessionSnapshot& snapshot) OVERRIDE; + + // DDOS avoidance function. Calculates how long we should wait before trying + // again after a failed sync attempt, where the last delay was |base_delay|. + // TODO(tim): Look at URLRequestThrottlerEntryInterface. + static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay); + + private: + enum JobProcessDecision { + // Indicates we should continue with the current job. + CONTINUE, + // Indicates that we should save it to be processed later. + SAVE, + // Indicates we should drop this job. + DROP, + }; + + struct SyncSessionJob { + // An enum used to describe jobs for scheduling purposes. + enum SyncSessionJobPurpose { + // Uninitialized state, should never be hit in practice. + UNKNOWN = -1, + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. We don't run all steps of + // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). + CONFIGURATION, + // The user disabled some types and we have to clean up the data + // for those. + CLEANUP_DISABLED_TYPES, + }; + SyncSessionJob(); + SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const ConfigurationParams& config_params, + const tracked_objects::Location& nudge_location); + ~SyncSessionJob(); + static const char* GetPurposeString(SyncSessionJobPurpose purpose); + + SyncSessionJobPurpose purpose; + base::TimeTicks scheduled_start; + linked_ptr<sessions::SyncSession> session; + bool is_canary_job; + ConfigurationParams config_params; + + // This is the location the job came from. Used for debugging. + // In case of multiple nudges getting coalesced this stores the + // first location that came in. + tracked_objects::Location from_here; + }; + friend class SyncSchedulerTest; + friend class SyncSchedulerWhiteboxTest; + friend class SyncerTest; + + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + DropNudgeWhileExponentialBackOff); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, SaveNudge); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + SaveNudgeWhileTypeThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueNudge); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, DropPoll); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinuePoll); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueConfiguration); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + SaveConfigurationWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + SaveNudgeWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + ContinueCanaryJobConfig); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, + ContinueNudgeWhileExponentialBackOff); + FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, TransientPollFailure); + + // A component used to get time delays associated with exponential backoff. + // Encapsulated into a class to facilitate testing. + class DelayProvider { + public: + DelayProvider(); + virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay); + virtual ~DelayProvider(); + private: + DISALLOW_COPY_AND_ASSIGN(DelayProvider); + }; + + struct WaitInterval { + enum Mode { + // Uninitialized state, should not be set in practice. + UNKNOWN = -1, + // A wait interval whose duration has been affected by exponential + // backoff. + // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. + EXPONENTIAL_BACKOFF, + // A server-initiated throttled interval. We do not allow any syncing + // during such an interval. + THROTTLED, + }; + WaitInterval(); + ~WaitInterval(); + WaitInterval(Mode mode, base::TimeDelta length); + + static const char* GetModeString(Mode mode); + + Mode mode; + + // This bool is set to true if we have observed a nudge during this + // interval and mode == EXPONENTIAL_BACKOFF. + bool had_nudge; + base::TimeDelta length; + base::OneShotTimer<SyncSchedulerImpl> timer; + + // Configure jobs are saved only when backing off or throttling. So we + // expose the pointer here. + scoped_ptr<SyncSessionJob> pending_configure_job; + }; + + static const char* GetModeString(Mode mode); + + static const char* GetDecisionString(JobProcessDecision decision); + + // Assign |start| and |end| to appropriate SyncerStep values for the + // specified |purpose|. + static void SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, + SyncerStep* start, SyncerStep* end); + + // Helpers that log before posting to |sync_loop_|. These will only post + // the task in between calls to Start/Stop. + void PostTask(const tracked_objects::Location& from_here, + const char* name, + const base::Closure& task); + void PostDelayedTask(const tracked_objects::Location& from_here, + const char* name, + const base::Closure& task, + base::TimeDelta delay); + + // Helper to assemble a job and post a delayed task to sync. + void ScheduleSyncSessionJob(const SyncSessionJob& job); + + // Invoke the Syncer to perform a sync. + void DoSyncSessionJob(const SyncSessionJob& job); + + // Called after the Syncer has performed the sync represented by |job|, to + // reset our state. + void FinishSyncSessionJob(const SyncSessionJob& job); + + // Record important state that might be needed in future syncs, such as which + // data types may require cleanup. + void UpdateCarryoverSessionState(const SyncSessionJob& old_job); + + // Helper to FinishSyncSessionJob to schedule the next sync operation. + void ScheduleNextSync(const SyncSessionJob& old_job); + + // Helper to configure polling intervals. Used by Start and ScheduleNextSync. + void AdjustPolling(const SyncSessionJob* old_job); + + // Helper to restart waiting with |wait_interval_|'s timer. + void RestartWaiting(); + + // Helper to ScheduleNextSync in case of consecutive sync errors. + void HandleContinuationError(const SyncSessionJob& old_job); + + // Determines if it is legal to run |job| by checking current + // operational mode, backoff or throttling, freshness + // (so we don't make redundant syncs), and connection. + bool ShouldRunJob(const SyncSessionJob& job); + + // Decide whether we should CONTINUE, SAVE or DROP the job. + JobProcessDecision DecideOnJob(const SyncSessionJob& job); + + // Decide on whether to CONTINUE, SAVE or DROP the job when we are in + // backoff mode. + JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); + + // Saves the job for future execution. Note: It drops all the poll jobs. + void SaveJob(const SyncSessionJob& job); + + // Coalesces the current job with the pending nudge. + void InitOrCoalescePendingJob(const SyncSessionJob& job); + + // 'Impl' here refers to real implementation of public functions, running on + // |thread_|. + void StopImpl(const base::Closure& callback); + void ScheduleNudgeImpl( + const base::TimeDelta& delay, + sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, + const syncer::ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location); + + // Returns true if the client is currently in exponential backoff. + bool IsBackingOff() const; + + // Helper to signal all listeners registered with |session_context_|. + void Notify(SyncEngineEvent::EventCause cause); + + // Callback to change backoff state. + void DoCanaryJob(); + void Unthrottle(); + + // Executes the pending job. Called whenever an event occurs that may + // change conditions permitting a job to run. Like when network connection is + // re-established, mode changes etc. + void DoPendingJobIfPossible(bool is_canary_job); + + // Called when the root cause of the current connection error is fixed. + void OnServerConnectionErrorFixed(); + + // The pointer is owned by the caller. + syncer::sessions::SyncSession* CreateSyncSession( + const syncer::sessions::SyncSourceInfo& info); + + // Creates a session for a poll and performs the sync. + void PollTimerCallback(); + + // Used to update |connection_code_|, see below. + void UpdateServerConnectionManagerStatus( + HttpResponse::ServerConnectionCode code); + + // Called once the first time thread_ is started to broadcast an initial + // session snapshot containing data like initial_sync_ended. Important when + // the client starts up and does not need to perform an initial sync. + void SendInitialSnapshot(); + + virtual void OnActionableError(const sessions::SyncSessionSnapshot& snapshot); + + base::WeakPtrFactory<SyncSchedulerImpl> weak_ptr_factory_; + + // A second factory specially for weak_handle_this_, to allow the handle + // to be const and alleviate threading concerns. + base::WeakPtrFactory<SyncSchedulerImpl> weak_ptr_factory_for_weak_handle_; + + // For certain methods that need to worry about X-thread posting. + const WeakHandle<SyncSchedulerImpl> weak_handle_this_; + + // Used for logging. + const std::string name_; + + // The message loop this object is on. Almost all methods have to + // be called on this thread. + MessageLoop* const sync_loop_; + + // Set in Start(), unset in Stop(). + bool started_; + + // Modifiable versions of kDefaultLongPollIntervalSeconds which can be + // updated by the server. + base::TimeDelta syncer_short_poll_interval_seconds_; + base::TimeDelta syncer_long_poll_interval_seconds_; + + // Server-tweakable sessions commit delay. + base::TimeDelta sessions_commit_delay_; + + // Periodic timer for polling. See AdjustPolling. + base::RepeatingTimer<SyncSchedulerImpl> poll_timer_; + + // The mode of operation. + Mode mode_; + + // TODO(tim): Bug 26339. This needs to track more than just time I think, + // since the nudges could be for different types. Current impl doesn't care. + base::TimeTicks last_sync_session_end_time_; + + // The latest connection code we got while trying to connect. + HttpResponse::ServerConnectionCode connection_code_; + + // Tracks in-flight nudges so we can coalesce. + scoped_ptr<SyncSessionJob> pending_nudge_; + + // Current wait state. Null if we're not in backoff and not throttled. + scoped_ptr<WaitInterval> wait_interval_; + + scoped_ptr<DelayProvider> delay_provider_; + + // Invoked to run through the sync cycle. + scoped_ptr<Syncer> syncer_; + + sessions::SyncSessionContext *session_context_; + + DISALLOW_COPY_AND_ASSIGN(SyncSchedulerImpl); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_ diff --git a/sync/engine/sync_scheduler_unittest.cc b/sync/engine/sync_scheduler_unittest.cc index 6c3e3eb..add275f 100644 --- a/sync/engine/sync_scheduler_unittest.cc +++ b/sync/engine/sync_scheduler_unittest.cc @@ -8,7 +8,7 @@ #include "base/memory/weak_ptr.h" #include "base/message_loop.h" #include "base/test/test_timeouts.h" -#include "sync/engine/sync_scheduler.h" +#include "sync/engine/sync_scheduler_impl.h" #include "sync/engine/syncer.h" #include "sync/engine/throttled_data_type_tracker.h" #include "sync/sessions/test_util.h" @@ -88,7 +88,7 @@ class SyncSchedulerTest : public testing::Test { syncer_(NULL), delay_(NULL) {} - class MockDelayProvider : public SyncScheduler::DelayProvider { + class MockDelayProvider : public SyncSchedulerImpl::DelayProvider { public: MOCK_METHOD1(GetDelay, TimeDelta(const TimeDelta&)); }; @@ -124,10 +124,10 @@ class SyncSchedulerTest : public testing::Test { context_->set_notifications_enabled(true); context_->set_account_name("Test"); scheduler_.reset( - new SyncScheduler("TestSyncScheduler", context(), syncer_)); + new SyncSchedulerImpl("TestSyncScheduler", context(), syncer_)); } - SyncScheduler* scheduler() { return scheduler_.get(); } + SyncSchedulerImpl* scheduler() { return scheduler_.get(); } MockSyncer* syncer() { return syncer_; } MockDelayProvider* delay() { return delay_; } MockConnectionManager* connection() { return connection_.get(); } @@ -215,7 +215,7 @@ class SyncSchedulerTest : public testing::Test { TestDirectorySetterUpper dir_maker_; scoped_ptr<MockConnectionManager> connection_; scoped_ptr<SyncSessionContext> context_; - scoped_ptr<SyncScheduler> scheduler_; + scoped_ptr<SyncSchedulerImpl> scheduler_; MockSyncer* syncer_; MockDelayProvider* delay_; std::vector<scoped_refptr<FakeModelWorker> > workers_; @@ -618,7 +618,7 @@ TEST_F(SyncSchedulerTest, PollNotificationsDisabled) { WithArg<0>(RecordSyncShareMultiple(&records, kMinNumSamples)))); scheduler()->OnReceivedShortPollIntervalUpdate(poll_interval); - scheduler()->set_notifications_enabled(false); + scheduler()->SetNotificationsEnabled(false); TimeTicks optimal_start = TimeTicks::Now() + poll_interval; StartSyncScheduler(SyncScheduler::NORMAL_MODE); @@ -670,16 +670,16 @@ TEST_F(SyncSchedulerTest, SessionsCommitDelay) { Invoke(sessions::test_util::SimulateSuccess), QuitLoopNowAction())); - EXPECT_EQ(delay1, scheduler()->sessions_commit_delay()); + EXPECT_EQ(delay1, scheduler()->GetSessionsCommitDelay()); StartSyncScheduler(SyncScheduler::NORMAL_MODE); - EXPECT_EQ(delay1, scheduler()->sessions_commit_delay()); + EXPECT_EQ(delay1, scheduler()->GetSessionsCommitDelay()); const ModelTypeSet model_types(syncer::BOOKMARKS); scheduler()->ScheduleNudgeAsync( zero(), NUDGE_SOURCE_LOCAL, model_types, FROM_HERE); RunLoop(); - EXPECT_EQ(delay2, scheduler()->sessions_commit_delay()); + EXPECT_EQ(delay2, scheduler()->GetSessionsCommitDelay()); StopSyncScheduler(); } @@ -1047,18 +1047,20 @@ TEST_F(SyncSchedulerTest, TransientPollFailure) { TEST_F(SyncSchedulerTest, GetRecommendedDelay) { EXPECT_LE(TimeDelta::FromSeconds(0), - SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(0))); + SyncSchedulerImpl::GetRecommendedDelay(TimeDelta::FromSeconds(0))); EXPECT_LE(TimeDelta::FromSeconds(1), - SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(1))); + SyncSchedulerImpl::GetRecommendedDelay(TimeDelta::FromSeconds(1))); EXPECT_LE(TimeDelta::FromSeconds(50), - SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(50))); + SyncSchedulerImpl::GetRecommendedDelay( + TimeDelta::FromSeconds(50))); EXPECT_LE(TimeDelta::FromSeconds(10), - SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(10))); + SyncSchedulerImpl::GetRecommendedDelay( + TimeDelta::FromSeconds(10))); EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds), - SyncScheduler::GetRecommendedDelay( + SyncSchedulerImpl::GetRecommendedDelay( TimeDelta::FromSeconds(kMaxBackoffSeconds))); EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds), - SyncScheduler::GetRecommendedDelay( + SyncSchedulerImpl::GetRecommendedDelay( TimeDelta::FromSeconds(kMaxBackoffSeconds + 1))); } diff --git a/sync/engine/sync_scheduler_whitebox_unittest.cc b/sync/engine/sync_scheduler_whitebox_unittest.cc index 760083f..71af129 100644 --- a/sync/engine/sync_scheduler_whitebox_unittest.cc +++ b/sync/engine/sync_scheduler_whitebox_unittest.cc @@ -4,7 +4,7 @@ #include "base/message_loop.h" #include "base/time.h" -#include "sync/engine/sync_scheduler.h" +#include "sync/engine/sync_scheduler_impl.h" #include "sync/engine/throttled_data_type_tracker.h" #include "sync/sessions/sync_session_context.h" #include "sync/sessions/test_util.h" @@ -55,7 +55,7 @@ class SyncSchedulerWhiteboxTest : public testing::Test { context_->set_notifications_enabled(true); context_->set_account_name("Test"); scheduler_.reset( - new SyncScheduler("TestSyncSchedulerWhitebox", context(), syncer)); + new SyncSchedulerImpl("TestSyncSchedulerWhitebox", context(), syncer)); } virtual void TearDown() { @@ -75,14 +75,14 @@ class SyncSchedulerWhiteboxTest : public testing::Test { } void SetWaitIntervalToThrottled() { - scheduler_->wait_interval_.reset(new SyncScheduler::WaitInterval( - SyncScheduler::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1))); + scheduler_->wait_interval_.reset(new SyncSchedulerImpl::WaitInterval( + SyncSchedulerImpl::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1))); } void SetWaitIntervalToExponentialBackoff() { scheduler_->wait_interval_.reset( - new SyncScheduler::WaitInterval( - SyncScheduler::WaitInterval::EXPONENTIAL_BACKOFF, + new SyncSchedulerImpl::WaitInterval( + SyncSchedulerImpl::WaitInterval::EXPONENTIAL_BACKOFF, TimeDelta::FromSeconds(1))); } @@ -90,8 +90,8 @@ class SyncSchedulerWhiteboxTest : public testing::Test { scheduler_->wait_interval_->had_nudge = had_nudge; } - SyncScheduler::JobProcessDecision DecideOnJob( - const SyncScheduler::SyncSessionJob& job) { + SyncSchedulerImpl::JobProcessDecision DecideOnJob( + const SyncSchedulerImpl::SyncSessionJob& job) { return scheduler_->DecideOnJob(job); } @@ -101,10 +101,10 @@ class SyncSchedulerWhiteboxTest : public testing::Test { SetLastSyncedTime(base::TimeTicks::Now()); } - SyncScheduler::JobProcessDecision CreateAndDecideJob( - SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { + SyncSchedulerImpl::JobProcessDecision CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { SyncSession* s = scheduler_->CreateSyncSession(SyncSourceInfo()); - SyncScheduler::SyncSessionJob job(purpose, TimeTicks::Now(), + SyncSchedulerImpl::SyncSessionJob job(purpose, TimeTicks::Now(), make_linked_ptr(s), false, ConfigurationParams(), @@ -125,7 +125,7 @@ class SyncSchedulerWhiteboxTest : public testing::Test { protected: // Declared here to ensure it is destructed before the objects it references. - scoped_ptr<SyncScheduler> scheduler_; + scoped_ptr<SyncSchedulerImpl> scheduler_; }; TEST_F(SyncSchedulerWhiteboxTest, SaveNudge) { @@ -134,10 +134,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudge) { // Now set the mode to configure. SetMode(SyncScheduler::CONFIGURATION_MODE); - SyncScheduler::JobProcessDecision decision = - CreateAndDecideJob(SyncScheduler::SyncSessionJob::NUDGE); + SyncSchedulerImpl::JobProcessDecision decision = + CreateAndDecideJob(SyncSchedulerImpl::SyncSessionJob::NUDGE); - EXPECT_EQ(decision, SyncScheduler::SAVE); + EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileTypeThrottled) { @@ -157,53 +157,54 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileTypeThrottled) { SyncSession* s = scheduler_->CreateSyncSession(info); // Now schedule a nudge with just bookmarks and the change is local. - SyncScheduler::SyncSessionJob job(SyncScheduler::SyncSessionJob::NUDGE, - TimeTicks::Now(), - make_linked_ptr(s), - false, - ConfigurationParams(), - FROM_HERE); - - SyncScheduler::JobProcessDecision decision = DecideOnJob(job); - EXPECT_EQ(decision, SyncScheduler::SAVE); + SyncSchedulerImpl::SyncSessionJob job( + SyncSchedulerImpl::SyncSessionJob::NUDGE, + TimeTicks::Now(), + make_linked_ptr(s), + false, + ConfigurationParams(), + FROM_HERE); + + SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job); + EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } TEST_F(SyncSchedulerWhiteboxTest, ContinueNudge) { InitializeSyncerOnNormalMode(); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::NUDGE); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::NUDGE); - EXPECT_EQ(decision, SyncScheduler::CONTINUE); + EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } TEST_F(SyncSchedulerWhiteboxTest, DropPoll) { InitializeSyncerOnNormalMode(); SetMode(SyncScheduler::CONFIGURATION_MODE); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::POLL); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::POLL); - EXPECT_EQ(decision, SyncScheduler::DROP); + EXPECT_EQ(decision, SyncSchedulerImpl::DROP); } TEST_F(SyncSchedulerWhiteboxTest, ContinuePoll) { InitializeSyncerOnNormalMode(); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::POLL); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::POLL); - EXPECT_EQ(decision, SyncScheduler::CONTINUE); + EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } TEST_F(SyncSchedulerWhiteboxTest, ContinueConfiguration) { InitializeSyncerOnNormalMode(); SetMode(SyncScheduler::CONFIGURATION_MODE); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::CONFIGURATION); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); - EXPECT_EQ(decision, SyncScheduler::CONTINUE); + EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } TEST_F(SyncSchedulerWhiteboxTest, SaveConfigurationWhileThrottled) { @@ -212,10 +213,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveConfigurationWhileThrottled) { SetWaitIntervalToThrottled(); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::CONFIGURATION); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); - EXPECT_EQ(decision, SyncScheduler::SAVE); + EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) { @@ -224,10 +225,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) { SetWaitIntervalToThrottled(); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::NUDGE); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::NUDGE); - EXPECT_EQ(decision, SyncScheduler::SAVE); + EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) { @@ -235,10 +236,10 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) { SetMode(SyncScheduler::NORMAL_MODE); SetWaitIntervalToExponentialBackoff(); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::NUDGE); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::NUDGE); - EXPECT_EQ(decision, SyncScheduler::CONTINUE); + EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) { @@ -247,10 +248,10 @@ TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) { SetWaitIntervalToExponentialBackoff(); SetWaitIntervalHadNudge(true); - SyncScheduler::JobProcessDecision decision = CreateAndDecideJob( - SyncScheduler::SyncSessionJob::NUDGE); + SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( + SyncSchedulerImpl::SyncSessionJob::NUDGE); - EXPECT_EQ(decision, SyncScheduler::DROP); + EXPECT_EQ(decision, SyncSchedulerImpl::DROP); } TEST_F(SyncSchedulerWhiteboxTest, ContinueCanaryJobConfig) { @@ -258,13 +259,13 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueCanaryJobConfig) { SetMode(SyncScheduler::CONFIGURATION_MODE); SetWaitIntervalToExponentialBackoff(); - struct SyncScheduler::SyncSessionJob job; - job.purpose = SyncScheduler::SyncSessionJob::CONFIGURATION; + struct SyncSchedulerImpl::SyncSessionJob job; + job.purpose = SyncSchedulerImpl::SyncSessionJob::CONFIGURATION; job.scheduled_start = TimeTicks::Now(); job.is_canary_job = true; - SyncScheduler::JobProcessDecision decision = DecideOnJob(job); + SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job); - EXPECT_EQ(decision, SyncScheduler::CONTINUE); + EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } } // namespace syncer diff --git a/sync/engine/syncer_unittest.cc b/sync/engine/syncer_unittest.cc index c468623..3f6dde7 100644 --- a/sync/engine/syncer_unittest.cc +++ b/sync/engine/syncer_unittest.cc @@ -26,7 +26,7 @@ #include "sync/engine/get_commit_ids_command.h" #include "sync/engine/net/server_connection_manager.h" #include "sync/engine/process_updates_command.h" -#include "sync/engine/sync_scheduler.h" +#include "sync/engine/sync_scheduler_impl.h" #include "sync/engine/syncer.h" #include "sync/engine/syncer_proto_util.h" #include "sync/engine/throttled_data_type_tracker.h" @@ -185,10 +185,10 @@ class SyncerTest : public testing::Test, } bool SyncShareAsDelegate( - SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { + SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { SyncerStep start; SyncerStep end; - SyncScheduler::SetSyncerStepsForPurpose(purpose, &start, &end); + SyncSchedulerImpl::SetSyncerStepsForPurpose(purpose, &start, &end); session_.reset(MakeSession()); syncer_->SyncShare(session_.get(), start, end); @@ -197,12 +197,13 @@ class SyncerTest : public testing::Test, bool SyncShareNudge() { session_.reset(MakeSession()); - return SyncShareAsDelegate(SyncScheduler::SyncSessionJob::NUDGE); + return SyncShareAsDelegate(SyncSchedulerImpl::SyncSessionJob::NUDGE); } bool SyncShareConfigure() { session_.reset(MakeSession()); - return SyncShareAsDelegate(SyncScheduler::SyncSessionJob::CONFIGURATION); + return SyncShareAsDelegate( + SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); } void LoopSyncShare() { |