summaryrefslogtreecommitdiffstats
path: root/sync/engine
diff options
context:
space:
mode:
authortim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-19 22:19:18 +0000
committertim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-19 22:19:18 +0000
commit6d2dc98e938a0ff4da17e78e96ae6261bece64ef (patch)
tree9127eb2b563f30bbc4a7b3d38969ba0768289f85 /sync/engine
parent0b9db6796ae57fe8bb36b2e89528efa51f8e938b (diff)
downloadchromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.zip
chromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.tar.gz
chromium_src-6d2dc98e938a0ff4da17e78e96ae6261bece64ef.tar.bz2
sync: Remove SyncManager::TestingMode in favour of InternalComponentsFactory.
Turns SyncScheduler into an interface and adds a FakeSyncScheduler class for tests. BUG=117836 TEST= Review URL: https://chromiumcodereview.appspot.com/10701046 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@147553 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync/engine')
-rw-r--r--sync/engine/sync_scheduler.cc1196
-rw-r--r--sync/engine/sync_scheduler.h342
-rw-r--r--sync/engine/sync_scheduler_impl.cc1209
-rw-r--r--sync/engine/sync_scheduler_impl.h360
-rw-r--r--sync/engine/sync_scheduler_unittest.cc32
-rw-r--r--sync/engine/sync_scheduler_whitebox_unittest.cc105
-rw-r--r--sync/engine/syncer_unittest.cc11
7 files changed, 1662 insertions, 1593 deletions
diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc
index 04b30f0..d800b19 100644
--- a/sync/engine/sync_scheduler.cc
+++ b/sync/engine/sync_scheduler.cc
@@ -4,1201 +4,9 @@
#include "sync/engine/sync_scheduler.h"
-#include <algorithm>
-#include <cstring>
-
-#include "base/bind.h"
-#include "base/compiler_specific.h"
-#include "base/location.h"
-#include "base/logging.h"
-#include "base/message_loop.h"
-#include "base/rand_util.h"
-#include "sync/engine/syncer.h"
-#include "sync/engine/throttled_data_type_tracker.h"
-#include "sync/protocol/proto_enum_conversions.h"
-#include "sync/protocol/sync.pb.h"
-#include "sync/util/data_type_histogram.h"
-#include "sync/util/logging.h"
-
-using base::TimeDelta;
-using base::TimeTicks;
-
namespace syncer {
-using sessions::SyncSession;
-using sessions::SyncSessionSnapshot;
-using sessions::SyncSourceInfo;
-using sync_pb::GetUpdatesCallerInfo;
-
-namespace {
-bool ShouldRequestEarlyExit(
- const syncer::SyncProtocolError& error) {
- switch (error.error_type) {
- case syncer::SYNC_SUCCESS:
- case syncer::MIGRATION_DONE:
- case syncer::THROTTLED:
- case syncer::TRANSIENT_ERROR:
- return false;
- case syncer::NOT_MY_BIRTHDAY:
- case syncer::CLEAR_PENDING:
- // If we send terminate sync early then |sync_cycle_ended| notification
- // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
- // notification wouldnt be sent either. Then the UI layer would be left
- // waiting forever. So assert we would send something.
- DCHECK(error.action != syncer::UNKNOWN_ACTION);
- return true;
- case syncer::INVALID_CREDENTIAL:
- // The notification for this is handled by PostAndProcessHeaders|.
- // Server does no have to send any action for this.
- return true;
- // Make the default a NOTREACHED. So if a new error is introduced we
- // think about its expected functionality.
- default:
- NOTREACHED();
- return false;
- }
-}
-
-bool IsActionableError(
- const syncer::SyncProtocolError& error) {
- return (error.action != syncer::UNKNOWN_ACTION);
-}
-} // namespace
-
-ConfigurationParams::ConfigurationParams()
- : source(GetUpdatesCallerInfo::UNKNOWN),
- keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {}
-ConfigurationParams::ConfigurationParams(
- const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
- const syncer::ModelTypeSet& types_to_download,
- const syncer::ModelSafeRoutingInfo& routing_info,
- KeystoreKeyStatus keystore_key_status,
- const base::Closure& ready_task)
- : source(source),
- types_to_download(types_to_download),
- routing_info(routing_info),
- keystore_key_status(keystore_key_status),
- ready_task(ready_task) {
- DCHECK(!ready_task.is_null());
-}
-ConfigurationParams::~ConfigurationParams() {}
-
-SyncScheduler::DelayProvider::DelayProvider() {}
-SyncScheduler::DelayProvider::~DelayProvider() {}
-
-SyncScheduler::WaitInterval::WaitInterval()
- : mode(UNKNOWN),
- had_nudge(false) {
-}
-
-SyncScheduler::WaitInterval::~WaitInterval() {}
-
-#define ENUM_CASE(x) case x: return #x; break;
-
-const char* SyncScheduler::WaitInterval::GetModeString(Mode mode) {
- switch (mode) {
- ENUM_CASE(UNKNOWN);
- ENUM_CASE(EXPONENTIAL_BACKOFF);
- ENUM_CASE(THROTTLED);
- }
- NOTREACHED();
- return "";
-}
-
-SyncScheduler::SyncSessionJob::SyncSessionJob()
- : purpose(UNKNOWN),
- is_canary_job(false) {
-}
-
-SyncScheduler::SyncSessionJob::~SyncSessionJob() {}
-
-SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
- base::TimeTicks start,
- linked_ptr<sessions::SyncSession> session,
- bool is_canary_job,
- const ConfigurationParams& config_params,
- const tracked_objects::Location& from_here)
- : purpose(purpose),
- scheduled_start(start),
- session(session),
- is_canary_job(is_canary_job),
- config_params(config_params),
- from_here(from_here) {
-}
-
-const char* SyncScheduler::SyncSessionJob::GetPurposeString(
- SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) {
- switch (purpose) {
- ENUM_CASE(UNKNOWN);
- ENUM_CASE(POLL);
- ENUM_CASE(NUDGE);
- ENUM_CASE(CONFIGURATION);
- ENUM_CASE(CLEANUP_DISABLED_TYPES);
- }
- NOTREACHED();
- return "";
-}
-
-TimeDelta SyncScheduler::DelayProvider::GetDelay(
- const base::TimeDelta& last_delay) {
- return SyncScheduler::GetRecommendedDelay(last_delay);
-}
-
-GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
- NudgeSource source) {
- switch (source) {
- case NUDGE_SOURCE_NOTIFICATION:
- return GetUpdatesCallerInfo::NOTIFICATION;
- case NUDGE_SOURCE_LOCAL:
- return GetUpdatesCallerInfo::LOCAL;
- case NUDGE_SOURCE_CONTINUATION:
- return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
- case NUDGE_SOURCE_LOCAL_REFRESH:
- return GetUpdatesCallerInfo::DATATYPE_REFRESH;
- case NUDGE_SOURCE_UNKNOWN:
- return GetUpdatesCallerInfo::UNKNOWN;
- default:
- NOTREACHED();
- return GetUpdatesCallerInfo::UNKNOWN;
- }
-}
-
-SyncScheduler::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
- : mode(mode), had_nudge(false), length(length) { }
-
-// Helper macros to log with the syncer thread name; useful when there
-// are multiple syncer threads involved.
-
-#define SLOG(severity) LOG(severity) << name_ << ": "
-
-#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
-
-#define SDVLOG_LOC(from_here, verbose_level) \
- DVLOG_LOC(from_here, verbose_level) << name_ << ": "
-
-namespace {
-
-const int kDefaultSessionsCommitDelaySeconds = 10;
-
-bool IsConfigRelatedUpdateSourceValue(
- GetUpdatesCallerInfo::GetUpdatesSource source) {
- switch (source) {
- case GetUpdatesCallerInfo::RECONFIGURATION:
- case GetUpdatesCallerInfo::MIGRATION:
- case GetUpdatesCallerInfo::NEW_CLIENT:
- case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
- return true;
- default:
- return false;
- }
-}
-
-} // namespace
-
-SyncScheduler::SyncScheduler(const std::string& name,
- sessions::SyncSessionContext* context,
- Syncer* syncer)
- : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
- weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
- weak_handle_this_(MakeWeakHandle(
- weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
- name_(name),
- sync_loop_(MessageLoop::current()),
- started_(false),
- syncer_short_poll_interval_seconds_(
- TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
- syncer_long_poll_interval_seconds_(
- TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
- sessions_commit_delay_(
- TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
- mode_(NORMAL_MODE),
- // Start with assuming everything is fine with the connection.
- // At the end of the sync cycle we would have the correct status.
- connection_code_(HttpResponse::SERVER_CONNECTION_OK),
- delay_provider_(new DelayProvider()),
- syncer_(syncer),
- session_context_(context) {
- DCHECK(sync_loop_);
-}
-
-SyncScheduler::~SyncScheduler() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- StopImpl(base::Closure());
-}
-
-void SyncScheduler::OnCredentialsUpdated() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
-
- // TODO(lipalani): crbug.com/106262. One issue here is that if after
- // the auth error we happened to do gettime and it succeeded then
- // the |connection_code_| would be briefly OK however it would revert
- // back to SYNC_AUTH_ERROR at the end of the sync cycle. The
- // referenced bug explores the option of removing gettime calls
- // altogethere
- if (HttpResponse::SYNC_AUTH_ERROR == connection_code_) {
- OnServerConnectionErrorFixed();
- }
-}
-
-void SyncScheduler::OnConnectionStatusChange() {
- if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
- // Optimistically assume that the connection is fixed and try
- // connecting.
- OnServerConnectionErrorFixed();
- }
-}
-
-void SyncScheduler::OnServerConnectionErrorFixed() {
- connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
- PostTask(FROM_HERE, "DoCanaryJob",
- base::Bind(&SyncScheduler::DoCanaryJob,
- weak_ptr_factory_.GetWeakPtr()));
-
-}
-
-void SyncScheduler::UpdateServerConnectionManagerStatus(
- HttpResponse::ServerConnectionCode code) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "New server connection code: "
- << HttpResponse::GetServerConnectionCodeString(code);
-
- connection_code_ = code;
-}
-
-void SyncScheduler::Start(Mode mode) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- std::string thread_name = MessageLoop::current()->thread_name();
- if (thread_name.empty())
- thread_name = "<Main thread>";
- SDVLOG(2) << "Start called from thread "
- << thread_name << " with mode " << GetModeString(mode);
- if (!started_) {
- started_ = true;
- SendInitialSnapshot();
- }
-
- DCHECK(!session_context_->account_name().empty());
- DCHECK(syncer_.get());
- Mode old_mode = mode_;
- mode_ = mode;
- AdjustPolling(NULL); // Will kick start poll timer if needed.
-
- if (old_mode != mode_) {
- // We just changed our mode. See if there are any pending jobs that we could
- // execute in the new mode.
- DoPendingJobIfPossible(false);
- }
-}
-
-void SyncScheduler::SendInitialSnapshot() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
- SyncSourceInfo(), ModelSafeRoutingInfo(),
- std::vector<ModelSafeWorker*>()));
- SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
- event.snapshot = dummy->TakeSnapshot();
- session_context_->NotifyListeners(event);
-}
-
-namespace {
-
-// Helper to extract the routing info and workers corresponding to types in
-// |types| from |current_routes| and |current_workers|.
-void BuildModelSafeParams(
- const ModelTypeSet& types_to_download,
- const ModelSafeRoutingInfo& current_routes,
- const std::vector<ModelSafeWorker*>& current_workers,
- ModelSafeRoutingInfo* result_routes,
- std::vector<ModelSafeWorker*>* result_workers) {
- std::set<ModelSafeGroup> active_groups;
- active_groups.insert(GROUP_PASSIVE);
- for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
- iter.Inc()) {
- syncer::ModelType type = iter.Get();
- ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
- DCHECK(route != current_routes.end());
- ModelSafeGroup group = route->second;
- (*result_routes)[type] = group;
- active_groups.insert(group);
- }
-
- for(std::vector<ModelSafeWorker*>::const_iterator iter =
- current_workers.begin(); iter != current_workers.end(); ++iter) {
- if (active_groups.count((*iter)->GetModelSafeGroup()) > 0)
- result_workers->push_back(*iter);
- }
-}
-
-} // namespace.
-
-bool SyncScheduler::ScheduleConfiguration(const ConfigurationParams& params) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
- DCHECK_EQ(CONFIGURATION_MODE, mode_);
- DCHECK(!params.ready_task.is_null());
- SDVLOG(2) << "Reconfiguring syncer.";
-
- // Only one configuration is allowed at a time. Verify we're not waiting
- // for a pending configure job.
- DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
-
- // TODO(sync): now that ModelChanging commands only use those workers within
- // the routing info, we don't really need |restricted_workers|. Remove it.
- // crbug.com/133030
- syncer::ModelSafeRoutingInfo restricted_routes;
- std::vector<ModelSafeWorker*> restricted_workers;
- BuildModelSafeParams(params.types_to_download,
- params.routing_info,
- session_context_->workers(),
- &restricted_routes,
- &restricted_workers);
- session_context_->set_routing_info(params.routing_info);
-
- // We rely on this not failing, so don't need to worry about checking for
- // success. In addition, this will be removed as part of crbug.com/131433.
- SyncSessionJob cleanup_job(
- SyncSessionJob::CLEANUP_DISABLED_TYPES,
- TimeTicks::Now(),
- make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
- false,
- ConfigurationParams(),
- FROM_HERE);
- DoSyncSessionJob(cleanup_job);
-
- if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) {
- // TODO(zea): implement in such a way that we can handle failures and the
- // subsequent retrys the scheduler might perform. See crbug.com/129665.
- NOTIMPLEMENTED();
- }
-
- // Only reconfigure if we have types to download.
- if (!params.types_to_download.Empty()) {
- DCHECK(!restricted_routes.empty());
- linked_ptr<SyncSession> session(new SyncSession(
- session_context_,
- this,
- SyncSourceInfo(params.source,
- ModelSafeRoutingInfoToPayloadMap(
- restricted_routes,
- std::string())),
- restricted_routes,
- restricted_workers));
- SyncSessionJob job(SyncSessionJob::CONFIGURATION,
- TimeTicks::Now(),
- session,
- false,
- params,
- FROM_HERE);
- DoSyncSessionJob(job);
-
- // If we failed, the job would have been saved as the pending configure
- // job and a wait interval would have been set.
- if (!session->Succeeded()) {
- DCHECK(wait_interval_.get() &&
- wait_interval_->pending_configure_job.get());
- return false;
- }
- } else {
- SDVLOG(2) << "No change in routing info, calling ready task directly.";
- params.ready_task.Run();
- }
-
- return true;
-}
-
-SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval(
- const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(wait_interval_.get());
- DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES);
-
- SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
- << WaitInterval::GetModeString(wait_interval_->mode)
- << (wait_interval_->had_nudge ? " (had nudge)" : "")
- << (job.is_canary_job ? " (canary)" : "");
-
- if (job.purpose == SyncSessionJob::POLL)
- return DROP;
-
- DCHECK(job.purpose == SyncSessionJob::NUDGE ||
- job.purpose == SyncSessionJob::CONFIGURATION);
- if (wait_interval_->mode == WaitInterval::THROTTLED)
- return SAVE;
-
- DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
- if (job.purpose == SyncSessionJob::NUDGE) {
- if (mode_ == CONFIGURATION_MODE)
- return SAVE;
-
- // If we already had one nudge then just drop this nudge. We will retry
- // later when the timer runs out.
- if (!job.is_canary_job)
- return wait_interval_->had_nudge ? DROP : CONTINUE;
- else // We are here because timer ran out. So retry.
- return CONTINUE;
- }
- return job.is_canary_job ? CONTINUE : SAVE;
-}
-
-SyncScheduler::JobProcessDecision SyncScheduler::DecideOnJob(
- const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (job.purpose == SyncSessionJob::CLEANUP_DISABLED_TYPES)
- return CONTINUE;
-
- // See if our type is throttled.
- syncer::ModelTypeSet throttled_types =
- session_context_->throttled_data_type_tracker()->GetThrottledTypes();
- if (job.purpose == SyncSessionJob::NUDGE &&
- job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
- syncer::ModelTypeSet requested_types;
- for (ModelTypePayloadMap::const_iterator i =
- job.session->source().types.begin();
- i != job.session->source().types.end();
- ++i) {
- requested_types.Put(i->first);
- }
-
- if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
- return SAVE;
- }
-
- if (wait_interval_.get())
- return DecideWhileInWaitInterval(job);
-
- if (mode_ == CONFIGURATION_MODE) {
- if (job.purpose == SyncSessionJob::NUDGE)
- return SAVE;
- else if (job.purpose == SyncSessionJob::CONFIGURATION)
- return CONTINUE;
- else
- return DROP;
- }
-
- // We are in normal mode.
- DCHECK_EQ(mode_, NORMAL_MODE);
- DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
-
- // Freshness condition
- if (job.scheduled_start < last_sync_session_end_time_) {
- SDVLOG(2) << "Dropping job because of freshness";
- return DROP;
- }
-
- if (!session_context_->connection_manager()->HasInvalidAuthToken())
- return CONTINUE;
-
- SDVLOG(2) << "No valid auth token. Using that to decide on job.";
- return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
-}
-
-void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
- if (pending_nudge_.get() == NULL) {
- SDVLOG(2) << "Creating a pending nudge job";
- SyncSession* s = job.session.get();
- scoped_ptr<SyncSession> session(new SyncSession(s->context(),
- s->delegate(), s->source(), s->routing_info(), s->workers()));
-
- SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
- make_linked_ptr(session.release()), false,
- ConfigurationParams(), job.from_here);
- pending_nudge_.reset(new SyncSessionJob(new_job));
-
- return;
- }
-
- SDVLOG(2) << "Coalescing a pending nudge";
- pending_nudge_->session->Coalesce(*(job.session.get()));
- pending_nudge_->scheduled_start = job.scheduled_start;
-
- // Unfortunately the nudge location cannot be modified. So it stores the
- // location of the first caller.
-}
-
-bool SyncScheduler::ShouldRunJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(started_);
-
- JobProcessDecision decision = DecideOnJob(job);
- SDVLOG(2) << "Should run "
- << SyncSessionJob::GetPurposeString(job.purpose)
- << " job in mode " << GetModeString(mode_)
- << ": " << GetDecisionString(decision);
- if (decision != SAVE)
- return decision == CONTINUE;
-
- DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
- SyncSessionJob::CONFIGURATION);
-
- SaveJob(job);
- return false;
-}
-
-void SyncScheduler::SaveJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- // TODO(sync): Should we also check that job.purpose !=
- // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.)
- if (job.purpose == SyncSessionJob::NUDGE) {
- SDVLOG(2) << "Saving a nudge job";
- InitOrCoalescePendingJob(job);
- } else if (job.purpose == SyncSessionJob::CONFIGURATION){
- SDVLOG(2) << "Saving a configuration job";
- DCHECK(wait_interval_.get());
- DCHECK(mode_ == CONFIGURATION_MODE);
-
- // Config params should always get set.
- DCHECK(!job.config_params.ready_task.is_null());
- SyncSession* old = job.session.get();
- SyncSession* s(new SyncSession(session_context_, this, old->source(),
- old->routing_info(), old->workers()));
- SyncSessionJob new_job(job.purpose,
- TimeTicks::Now(),
- make_linked_ptr(s),
- false,
- job.config_params,
- job.from_here);
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
- } // drop the rest.
- // TODO(sync): Is it okay to drop the rest? It's weird that
- // SaveJob() only does what it says sometimes. (See
- // http://crbug.com/90868.)
-}
-
-// Functor for std::find_if to search by ModelSafeGroup.
-struct ModelSafeWorkerGroupIs {
- explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
- bool operator()(ModelSafeWorker* w) {
- return group == w->GetModelSafeGroup();
- }
- ModelSafeGroup group;
-};
-
-void SyncScheduler::ScheduleNudgeAsync(
- const TimeDelta& delay,
- NudgeSource source, ModelTypeSet types,
- const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG_LOC(nudge_location, 2)
- << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
- << "source " << GetNudgeSourceString(source) << ", "
- << "types " << ModelTypeSetToString(types);
-
- ModelTypePayloadMap types_with_payloads =
- syncer::ModelTypePayloadMapFromEnumSet(types, std::string());
- SyncScheduler::ScheduleNudgeImpl(delay,
- GetUpdatesFromNudgeSource(source),
- types_with_payloads,
- false,
- nudge_location);
-}
-
-void SyncScheduler::ScheduleNudgeWithPayloadsAsync(
- const TimeDelta& delay,
- NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
- const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG_LOC(nudge_location, 2)
- << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
- << "source " << GetNudgeSourceString(source) << ", "
- << "payloads "
- << syncer::ModelTypePayloadMapToString(types_with_payloads);
-
- SyncScheduler::ScheduleNudgeImpl(delay,
- GetUpdatesFromNudgeSource(source),
- types_with_payloads,
- false,
- nudge_location);
-}
-
-void SyncScheduler::ScheduleNudgeImpl(
- const TimeDelta& delay,
- GetUpdatesCallerInfo::GetUpdatesSource source,
- const ModelTypePayloadMap& types_with_payloads,
- bool is_canary_job, const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
-
- SDVLOG_LOC(nudge_location, 2)
- << "In ScheduleNudgeImpl with delay "
- << delay.InMilliseconds() << " ms, "
- << "source " << GetUpdatesSourceString(source) << ", "
- << "payloads "
- << syncer::ModelTypePayloadMapToString(types_with_payloads)
- << (is_canary_job ? " (canary)" : "");
-
- SyncSourceInfo info(source, types_with_payloads);
-
- SyncSession* session(CreateSyncSession(info));
- SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
- make_linked_ptr(session), is_canary_job,
- ConfigurationParams(), nudge_location);
-
- session = NULL;
- if (!ShouldRunJob(job))
- return;
-
- if (pending_nudge_.get()) {
- if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
- SDVLOG(2) << "Dropping the nudge because we are in backoff";
- return;
- }
-
- SDVLOG(2) << "Coalescing pending nudge";
- pending_nudge_->session->Coalesce(*(job.session.get()));
-
- SDVLOG(2) << "Rescheduling pending nudge";
- SyncSession* s = pending_nudge_->session.get();
- job.session.reset(new SyncSession(s->context(), s->delegate(),
- s->source(), s->routing_info(), s->workers()));
-
- // Choose the start time as the earliest of the 2.
- job.scheduled_start = std::min(job.scheduled_start,
- pending_nudge_->scheduled_start);
- pending_nudge_.reset();
- }
-
- // TODO(zea): Consider adding separate throttling/backoff for datatype
- // refresh requests.
- ScheduleSyncSessionJob(job);
-}
-
-const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) {
- switch (mode) {
- ENUM_CASE(CONFIGURATION_MODE);
- ENUM_CASE(NORMAL_MODE);
- }
- return "";
-}
-
-const char* SyncScheduler::GetDecisionString(
- SyncScheduler::JobProcessDecision mode) {
- switch (mode) {
- ENUM_CASE(CONTINUE);
- ENUM_CASE(SAVE);
- ENUM_CASE(DROP);
- }
- return "";
-}
-
-// static
-void SyncScheduler::SetSyncerStepsForPurpose(
- SyncSessionJob::SyncSessionJobPurpose purpose,
- SyncerStep* start,
- SyncerStep* end) {
- switch (purpose) {
- case SyncSessionJob::CONFIGURATION:
- *start = DOWNLOAD_UPDATES;
- *end = APPLY_UPDATES;
- return;
- case SyncSessionJob::NUDGE:
- case SyncSessionJob::POLL:
- *start = SYNCER_BEGIN;
- *end = SYNCER_END;
- return;
- case SyncSessionJob::CLEANUP_DISABLED_TYPES:
- *start = CLEANUP_DISABLED_TYPES;
- *end = CLEANUP_DISABLED_TYPES;
- return;
- default:
- NOTREACHED();
- *start = SYNCER_END;
- *end = SYNCER_END;
- return;
- }
-}
-
-void SyncScheduler::PostTask(
- const tracked_objects::Location& from_here,
- const char* name, const base::Closure& task) {
- SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (!started_) {
- SDVLOG(1) << "Not posting task as scheduler is stopped.";
- return;
- }
- sync_loop_->PostTask(from_here, task);
-}
-
-void SyncScheduler::PostDelayedTask(
- const tracked_objects::Location& from_here,
- const char* name, const base::Closure& task, base::TimeDelta delay) {
- SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
- << delay.InMilliseconds() << " ms delay";
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (!started_) {
- SDVLOG(1) << "Not posting task as scheduler is stopped.";
- return;
- }
- sync_loop_->PostDelayedTask(from_here, task, delay);
-}
-
-void SyncScheduler::ScheduleSyncSessionJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- TimeDelta delay = job.scheduled_start - TimeTicks::Now();
- if (delay < TimeDelta::FromMilliseconds(0))
- delay = TimeDelta::FromMilliseconds(0);
- SDVLOG_LOC(job.from_here, 2)
- << "In ScheduleSyncSessionJob with "
- << SyncSessionJob::GetPurposeString(job.purpose)
- << " job and " << delay.InMilliseconds() << " ms delay";
-
- DCHECK(job.purpose == SyncSessionJob::NUDGE ||
- job.purpose == SyncSessionJob::POLL);
- if (job.purpose == SyncSessionJob::NUDGE) {
- SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge";
- DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() ==
- job.session);
- pending_nudge_.reset(new SyncSessionJob(job));
- }
- PostDelayedTask(job.from_here, "DoSyncSessionJob",
- base::Bind(&SyncScheduler::DoSyncSessionJob,
- weak_ptr_factory_.GetWeakPtr(),
- job),
- delay);
-}
-
-void SyncScheduler::DoSyncSessionJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (!ShouldRunJob(job)) {
- SLOG(WARNING)
- << "Not executing "
- << SyncSessionJob::GetPurposeString(job.purpose) << " job from "
- << GetUpdatesSourceString(job.session->source().updates_source);
- return;
- }
-
- if (job.purpose == SyncSessionJob::NUDGE) {
- if (pending_nudge_.get() == NULL ||
- pending_nudge_->session != job.session) {
- SDVLOG(2) << "Dropping a nudge in "
- << "DoSyncSessionJob because another nudge was scheduled";
- return; // Another nudge must have been scheduled in in the meantime.
- }
- pending_nudge_.reset();
-
- // Create the session with the latest model safe table and use it to purge
- // and update any disabled or modified entries in the job.
- scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
-
- job.session->RebaseRoutingInfoWithLatest(*session);
- }
- SDVLOG(2) << "DoSyncSessionJob with "
- << SyncSessionJob::GetPurposeString(job.purpose) << " job";
-
- SyncerStep begin(SYNCER_END);
- SyncerStep end(SYNCER_END);
- SetSyncerStepsForPurpose(job.purpose, &begin, &end);
-
- bool has_more_to_sync = true;
- while (ShouldRunJob(job) && has_more_to_sync) {
- SDVLOG(2) << "Calling SyncShare.";
- // Synchronously perform the sync session from this thread.
- syncer_->SyncShare(job.session.get(), begin, end);
- has_more_to_sync = job.session->HasMoreToSync();
- if (has_more_to_sync)
- job.session->PrepareForAnotherSyncCycle();
- }
- SDVLOG(2) << "Done SyncShare looping.";
-
- FinishSyncSessionJob(job);
-}
-
-void SyncScheduler::UpdateCarryoverSessionState(
- const SyncSessionJob& old_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
- // Whatever types were part of a configuration task will have had updates
- // downloaded. For that reason, we make sure they get recorded in the
- // event that they get disabled at a later time.
- ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
- if (!r.empty()) {
- ModelSafeRoutingInfo temp_r;
- ModelSafeRoutingInfo old_info(old_job.session->routing_info());
- std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
- std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
- session_context_->set_previous_session_routing_info(temp_r);
- }
- } else {
- session_context_->set_previous_session_routing_info(
- old_job.session->routing_info());
- }
-}
-
-void SyncScheduler::FinishSyncSessionJob(const SyncSessionJob& job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- // Update timing information for how often datatypes are triggering nudges.
- base::TimeTicks now = TimeTicks::Now();
- if (!last_sync_session_end_time_.is_null()) {
- ModelTypePayloadMap::const_iterator iter;
- for (iter = job.session->source().types.begin();
- iter != job.session->source().types.end();
- ++iter) {
-#define PER_DATA_TYPE_MACRO(type_str) \
- SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, \
- now - last_sync_session_end_time_);
- SYNC_DATA_TYPE_HISTOGRAM(iter->first);
-#undef PER_DATA_TYPE_MACRO
- }
- }
- last_sync_session_end_time_ = now;
-
- // Now update the status of the connection from SCM. We need this to decide
- // whether we need to save/run future jobs. The notifications from SCM are not
- // reliable.
- //
- // TODO(rlarocque): crbug.com/110954
- // We should get rid of the notifications and it is probably not needed to
- // maintain this status variable in 2 places. We should query it directly from
- // SCM when needed.
- ServerConnectionManager* scm = session_context_->connection_manager();
- UpdateServerConnectionManagerStatus(scm->server_status());
-
- UpdateCarryoverSessionState(job);
- if (IsSyncingCurrentlySilenced()) {
- SDVLOG(2) << "We are currently throttled; not scheduling the next sync.";
- // TODO(sync): Investigate whether we need to check job.purpose
- // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
- SaveJob(job);
- return; // Nothing to do.
- } else if (job.session->Succeeded() &&
- !job.config_params.ready_task.is_null()) {
- // If this was a configuration job with a ready task, invoke it now that
- // we finished successfully.
- job.config_params.ready_task.Run();
- }
-
- SDVLOG(2) << "Updating the next polling time after SyncMain";
- ScheduleNextSync(job);
-}
-
-void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK(!old_job.session->HasMoreToSync());
-
- AdjustPolling(&old_job);
-
- if (old_job.session->Succeeded()) {
- // Only reset backoff if we actually reached the server.
- if (old_job.session->SuccessfullyReachedServer())
- wait_interval_.reset();
- SDVLOG(2) << "Job succeeded so not scheduling more jobs";
- return;
- }
-
- if (old_job.purpose == SyncSessionJob::POLL) {
- return; // We don't retry POLL jobs.
- }
-
- // TODO(rlarocque): There's no reason why we should blindly backoff and retry
- // if we don't succeed. Some types of errors are not likely to disappear on
- // their own. With the return values now available in the old_job.session, we
- // should be able to detect such errors and only retry when we detect
- // transient errors.
-
- if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
- mode_ == NORMAL_MODE) {
- // When in normal mode, we allow up to one nudge per backoff interval. It
- // appears that this was our nudge for this interval, and it failed.
- //
- // Note: This does not prevent us from running canary jobs. For example, an
- // IP address change might still result in another nudge being executed
- // during this backoff interval.
- SDVLOG(2) << "A nudge during backoff failed";
-
- DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
- DCHECK(!wait_interval_->had_nudge);
-
- wait_interval_->had_nudge = true;
- InitOrCoalescePendingJob(old_job);
- RestartWaiting();
- } else {
- // Either this is the first failure or a consecutive failure after our
- // backoff timer expired. We handle it the same way in either case.
- SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
- HandleContinuationError(old_job);
- }
-}
-
-void SyncScheduler::AdjustPolling(const SyncSessionJob* old_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
-
- TimeDelta poll = (!session_context_->notifications_enabled()) ?
- syncer_short_poll_interval_seconds_ :
- syncer_long_poll_interval_seconds_;
- bool rate_changed = !poll_timer_.IsRunning() ||
- poll != poll_timer_.GetCurrentDelay();
-
- if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
- poll_timer_.Reset();
-
- if (!rate_changed)
- return;
-
- // Adjust poll rate.
- poll_timer_.Stop();
- poll_timer_.Start(FROM_HERE, poll, this, &SyncScheduler::PollTimerCallback);
-}
-
-void SyncScheduler::RestartWaiting() {
- CHECK(wait_interval_.get());
- wait_interval_->timer.Stop();
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
- this, &SyncScheduler::DoCanaryJob);
-}
-
-void SyncScheduler::HandleContinuationError(
- const SyncSessionJob& old_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (DCHECK_IS_ON()) {
- if (IsBackingOff()) {
- DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
- }
- }
-
- TimeDelta length = delay_provider_->GetDelay(
- IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
-
- SDVLOG(2) << "In handle continuation error with "
- << SyncSessionJob::GetPurposeString(old_job.purpose)
- << " job. The time delta(ms) is "
- << length.InMilliseconds();
-
- // This will reset the had_nudge variable as well.
- wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
- length));
- if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
- SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
- // Config params should always get set.
- DCHECK(!old_job.config_params.ready_task.is_null());
- SyncSession* old = old_job.session.get();
- SyncSession* s(new SyncSession(session_context_, this,
- old->source(), old->routing_info(), old->workers()));
- SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
- make_linked_ptr(s), false, old_job.config_params,
- FROM_HERE);
- wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
- } else {
- // We are not in configuration mode. So wait_interval's pending job
- // should be null.
- DCHECK(wait_interval_->pending_configure_job.get() == NULL);
-
- // TODO(lipalani) - handle clear user data.
- InitOrCoalescePendingJob(old_job);
- }
- RestartWaiting();
-}
-
-// static
-TimeDelta SyncScheduler::GetRecommendedDelay(const TimeDelta& last_delay) {
- if (last_delay.InSeconds() >= kMaxBackoffSeconds)
- return TimeDelta::FromSeconds(kMaxBackoffSeconds);
-
- // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
- int64 backoff_s =
- std::max(static_cast<int64>(1),
- last_delay.InSeconds() * kBackoffRandomizationFactor);
-
- // Flip a coin to randomize backoff interval by +/- 50%.
- int rand_sign = base::RandInt(0, 1) * 2 - 1;
-
- // Truncation is adequate for rounding here.
- backoff_s = backoff_s +
- (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
-
- // Cap the backoff interval.
- backoff_s = std::max(static_cast<int64>(1),
- std::min(backoff_s, kMaxBackoffSeconds));
-
- return TimeDelta::FromSeconds(backoff_s);
-}
-
-void SyncScheduler::RequestStop(const base::Closure& callback) {
- syncer_->RequestEarlyExit(); // Safe to call from any thread.
- DCHECK(weak_handle_this_.IsInitialized());
- SDVLOG(3) << "Posting StopImpl";
- weak_handle_this_.Call(FROM_HERE,
- &SyncScheduler::StopImpl,
- callback);
-}
-
-void SyncScheduler::StopImpl(const base::Closure& callback) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "StopImpl called";
-
- // Kill any in-flight method calls.
- weak_ptr_factory_.InvalidateWeakPtrs();
- wait_interval_.reset();
- poll_timer_.Stop();
- if (started_) {
- started_ = false;
- }
- if (!callback.is_null())
- callback.Run();
-}
-
-void SyncScheduler::DoCanaryJob() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "Do canary job";
- DoPendingJobIfPossible(true);
-}
-
-void SyncScheduler::DoPendingJobIfPossible(bool is_canary_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SyncSessionJob* job_to_execute = NULL;
- if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
- && wait_interval_->pending_configure_job.get()) {
- SDVLOG(2) << "Found pending configure job";
- job_to_execute = wait_interval_->pending_configure_job.get();
- } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
- SDVLOG(2) << "Found pending nudge job";
- // Pending jobs mostly have time from the past. Reset it so this job
- // will get executed.
- if (pending_nudge_->scheduled_start < TimeTicks::Now())
- pending_nudge_->scheduled_start = TimeTicks::Now();
-
- scoped_ptr<SyncSession> session(CreateSyncSession(
- pending_nudge_->session->source()));
-
- // Also the routing info might have been changed since we cached the
- // pending nudge. Update it by coalescing to the latest.
- pending_nudge_->session->Coalesce(*(session.get()));
- // The pending nudge would be cleared in the DoSyncSessionJob function.
- job_to_execute = pending_nudge_.get();
- }
-
- if (job_to_execute != NULL) {
- SDVLOG(2) << "Executing pending job";
- SyncSessionJob copy = *job_to_execute;
- copy.is_canary_job = is_canary_job;
- DoSyncSessionJob(copy);
- }
-}
-
-SyncSession* SyncScheduler::CreateSyncSession(const SyncSourceInfo& source) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DVLOG(2) << "Creating sync session with routes "
- << ModelSafeRoutingInfoToString(session_context_->routing_info());
-
- SyncSourceInfo info(source);
- SyncSession* session(new SyncSession(session_context_, this, info,
- session_context_->routing_info(), session_context_->workers()));
-
- return session;
-}
-
-void SyncScheduler::PollTimerCallback() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- ModelSafeRoutingInfo r;
- ModelTypePayloadMap types_with_payloads =
- ModelSafeRoutingInfoToPayloadMap(r, std::string());
- SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
- SyncSession* s = CreateSyncSession(info);
-
- SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
- make_linked_ptr(s),
- false,
- ConfigurationParams(),
- FROM_HERE);
-
- ScheduleSyncSessionJob(job);
-}
-
-void SyncScheduler::Unthrottle() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
- SDVLOG(2) << "Unthrottled.";
- DoCanaryJob();
- wait_interval_.reset();
-}
-
-void SyncScheduler::Notify(SyncEngineEvent::EventCause cause) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- session_context_->NotifyListeners(SyncEngineEvent(cause));
-}
-
-bool SyncScheduler::IsBackingOff() const {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- return wait_interval_.get() && wait_interval_->mode ==
- WaitInterval::EXPONENTIAL_BACKOFF;
-}
-
-void SyncScheduler::OnSilencedUntil(const base::TimeTicks& silenced_until) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
- silenced_until - TimeTicks::Now()));
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
- &SyncScheduler::Unthrottle);
-}
-
-bool SyncScheduler::IsSyncingCurrentlySilenced() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- return wait_interval_.get() && wait_interval_->mode ==
- WaitInterval::THROTTLED;
-}
-
-void SyncScheduler::OnReceivedShortPollIntervalUpdate(
- const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- syncer_short_poll_interval_seconds_ = new_interval;
-}
-
-void SyncScheduler::OnReceivedLongPollIntervalUpdate(
- const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- syncer_long_poll_interval_seconds_ = new_interval;
-}
-
-void SyncScheduler::OnReceivedSessionsCommitDelay(
- const base::TimeDelta& new_delay) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- sessions_commit_delay_ = new_delay;
-}
-
-void SyncScheduler::OnShouldStopSyncingPermanently() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "OnShouldStopSyncingPermanently";
- syncer_->RequestEarlyExit(); // Thread-safe.
- Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
-}
-
-void SyncScheduler::OnActionableError(
- const sessions::SyncSessionSnapshot& snap) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "OnActionableError";
- SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
- event.snapshot = snap;
- session_context_->NotifyListeners(event);
-}
-
-void SyncScheduler::OnSyncProtocolError(
- const sessions::SyncSessionSnapshot& snapshot) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (ShouldRequestEarlyExit(
- snapshot.model_neutral_state().sync_protocol_error)) {
- SDVLOG(2) << "Sync Scheduler requesting early exit.";
- syncer_->RequestEarlyExit(); // Thread-safe.
- }
- if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error))
- OnActionableError(snapshot);
-}
-
-void SyncScheduler::set_notifications_enabled(bool notifications_enabled) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- session_context_->set_notifications_enabled(notifications_enabled);
-}
-
-base::TimeDelta SyncScheduler::sessions_commit_delay() const {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- return sessions_commit_delay_;
-}
-
-#undef SDVLOG_LOC
-
-#undef SDVLOG
-
-#undef SLOG
-
-#undef ENUM_CASE
+SyncScheduler::SyncScheduler() {}
+SyncScheduler::~SyncScheduler() {}
} // namespace syncer
diff --git a/sync/engine/sync_scheduler.h b/sync/engine/sync_scheduler.h
index 0ca2f5d..423516b 100644
--- a/sync/engine/sync_scheduler.h
+++ b/sync/engine/sync_scheduler.h
@@ -10,21 +10,10 @@
#include "base/callback.h"
#include "base/compiler_specific.h"
-#include "base/gtest_prod_util.h"
-#include "base/memory/linked_ptr.h"
-#include "base/memory/scoped_ptr.h"
-#include "base/memory/weak_ptr.h"
-#include "base/observer_list.h"
#include "base/time.h"
-#include "base/timer.h"
-#include "sync/engine/net/server_connection_manager.h"
#include "sync/engine/nudge_source.h"
-#include "sync/engine/syncer.h"
#include "sync/internal_api/public/base/model_type_payload_map.h"
-#include "sync/internal_api/public/engine/polling_constants.h"
-#include "sync/internal_api/public/util/weak_handle.h"
#include "sync/sessions/sync_session.h"
-#include "sync/sessions/sync_session_context.h"
class MessageLoop;
@@ -78,25 +67,20 @@ class SyncScheduler : public sessions::SyncSession::Delegate {
// All methods of SyncScheduler must be called on the same thread
// (except for RequestEarlyExit()).
- // |name| is a display string to identify the syncer thread. Takes
- // |ownership of |syncer|.
- SyncScheduler(const std::string& name,
- sessions::SyncSessionContext* context, Syncer* syncer);
-
- // Calls Stop().
+ SyncScheduler();
virtual ~SyncScheduler();
// Start the scheduler with the given mode. If the scheduler is
// already started, switch to the given mode, although some
// scheduled tasks from the old mode may still run.
- virtual void Start(Mode mode);
+ virtual void Start(Mode mode) = 0;
// Schedules the configuration task specified by |params|. Returns true if
// the configuration task executed immediately, false if it had to be
// scheduled for a later attempt. |params.ready_task| is invoked whenever the
// configuration task executes.
// Note: must already be in CONFIGURATION mode.
- virtual bool ScheduleConfiguration(const ConfigurationParams& params);
+ virtual bool ScheduleConfiguration(const ConfigurationParams& params) = 0;
// Request that any running syncer task stop as soon as possible and
// cancel all scheduled tasks. This function can be called from any thread,
@@ -104,326 +88,30 @@ class SyncScheduler : public sessions::SyncSession::Delegate {
// allow preempting ongoing sync cycles.
// Invokes |callback| from the sync loop once syncer is idle and all tasks
// are cancelled.
- void RequestStop(const base::Closure& callback);
+ virtual void RequestStop(const base::Closure& callback) = 0;
// The meat and potatoes. Both of these methods will post a delayed task
// to attempt the actual nudge (see ScheduleNudgeImpl).
- void ScheduleNudgeAsync(const base::TimeDelta& delay, NudgeSource source,
- syncer::ModelTypeSet types,
- const tracked_objects::Location& nudge_location);
- void ScheduleNudgeWithPayloadsAsync(
+ virtual void ScheduleNudgeAsync(
+ const base::TimeDelta& delay,
+ NudgeSource source,
+ syncer::ModelTypeSet types,
+ const tracked_objects::Location& nudge_location) = 0;
+ virtual void ScheduleNudgeWithPayloadsAsync(
const base::TimeDelta& delay, NudgeSource source,
const syncer::ModelTypePayloadMap& types_with_payloads,
- const tracked_objects::Location& nudge_location);
-
- void CleanupDisabledTypes();
+ const tracked_objects::Location& nudge_location) = 0;
// Change status of notifications in the SyncSessionContext.
- void set_notifications_enabled(bool notifications_enabled);
+ virtual void SetNotificationsEnabled(bool notifications_enabled) = 0;
- base::TimeDelta sessions_commit_delay() const;
-
- // DDOS avoidance function. Calculates how long we should wait before trying
- // again after a failed sync attempt, where the last delay was |base_delay|.
- // TODO(tim): Look at URLRequestThrottlerEntryInterface.
- static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay);
+ virtual base::TimeDelta GetSessionsCommitDelay() const = 0;
// Called when credentials are updated by the user.
- void OnCredentialsUpdated();
+ virtual void OnCredentialsUpdated() = 0;
// Called when the network layer detects a connection status change.
- void OnConnectionStatusChange();
-
- // SyncSession::Delegate implementation.
- virtual void OnSilencedUntil(
- const base::TimeTicks& silenced_until) OVERRIDE;
- virtual bool IsSyncingCurrentlySilenced() OVERRIDE;
- virtual void OnReceivedShortPollIntervalUpdate(
- const base::TimeDelta& new_interval) OVERRIDE;
- virtual void OnReceivedLongPollIntervalUpdate(
- const base::TimeDelta& new_interval) OVERRIDE;
- virtual void OnReceivedSessionsCommitDelay(
- const base::TimeDelta& new_delay) OVERRIDE;
- virtual void OnShouldStopSyncingPermanently() OVERRIDE;
- virtual void OnSyncProtocolError(
- const sessions::SyncSessionSnapshot& snapshot) OVERRIDE;
-
- private:
- enum JobProcessDecision {
- // Indicates we should continue with the current job.
- CONTINUE,
- // Indicates that we should save it to be processed later.
- SAVE,
- // Indicates we should drop this job.
- DROP,
- };
-
- struct SyncSessionJob {
- // An enum used to describe jobs for scheduling purposes.
- enum SyncSessionJobPurpose {
- // Uninitialized state, should never be hit in practice.
- UNKNOWN = -1,
- // Our poll timer schedules POLL jobs periodically based on a server
- // assigned poll interval.
- POLL,
- // A nudge task can come from a variety of components needing to force
- // a sync. The source is inferable from |session.source()|.
- NUDGE,
- // Typically used for fetching updates for a subset of the enabled types
- // during initial sync or reconfiguration. We don't run all steps of
- // the sync cycle for these (e.g. CleanupDisabledTypes is skipped).
- CONFIGURATION,
- // The user disabled some types and we have to clean up the data
- // for those.
- CLEANUP_DISABLED_TYPES,
- };
- SyncSessionJob();
- SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start,
- linked_ptr<sessions::SyncSession> session, bool is_canary_job,
- const ConfigurationParams& config_params,
- const tracked_objects::Location& nudge_location);
- ~SyncSessionJob();
- static const char* GetPurposeString(SyncSessionJobPurpose purpose);
-
- SyncSessionJobPurpose purpose;
- base::TimeTicks scheduled_start;
- linked_ptr<sessions::SyncSession> session;
- bool is_canary_job;
- ConfigurationParams config_params;
-
- // This is the location the job came from. Used for debugging.
- // In case of multiple nudges getting coalesced this stores the
- // first location that came in.
- tracked_objects::Location from_here;
- };
- friend class SyncSchedulerTest;
- friend class SyncSchedulerWhiteboxTest;
- friend class SyncerTest;
-
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- DropNudgeWhileExponentialBackOff);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, SaveNudge);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- SaveNudgeWhileTypeThrottled);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueNudge);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, DropPoll);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinuePoll);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueConfiguration);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- SaveConfigurationWhileThrottled);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- SaveNudgeWhileThrottled);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- ContinueCanaryJobConfig);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
- ContinueNudgeWhileExponentialBackOff);
- FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, TransientPollFailure);
-
- // A component used to get time delays associated with exponential backoff.
- // Encapsulated into a class to facilitate testing.
- class DelayProvider {
- public:
- DelayProvider();
- virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay);
- virtual ~DelayProvider();
- private:
- DISALLOW_COPY_AND_ASSIGN(DelayProvider);
- };
-
- struct WaitInterval {
- enum Mode {
- // Uninitialized state, should not be set in practice.
- UNKNOWN = -1,
- // A wait interval whose duration has been affected by exponential
- // backoff.
- // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
- EXPONENTIAL_BACKOFF,
- // A server-initiated throttled interval. We do not allow any syncing
- // during such an interval.
- THROTTLED,
- };
- WaitInterval();
- ~WaitInterval();
- WaitInterval(Mode mode, base::TimeDelta length);
-
- static const char* GetModeString(Mode mode);
-
- Mode mode;
-
- // This bool is set to true if we have observed a nudge during this
- // interval and mode == EXPONENTIAL_BACKOFF.
- bool had_nudge;
- base::TimeDelta length;
- base::OneShotTimer<SyncScheduler> timer;
-
- // Configure jobs are saved only when backing off or throttling. So we
- // expose the pointer here.
- scoped_ptr<SyncSessionJob> pending_configure_job;
- };
-
- static const char* GetModeString(Mode mode);
-
- static const char* GetDecisionString(JobProcessDecision decision);
-
- // Assign |start| and |end| to appropriate SyncerStep values for the
- // specified |purpose|.
- static void SetSyncerStepsForPurpose(
- SyncSessionJob::SyncSessionJobPurpose purpose,
- SyncerStep* start, SyncerStep* end);
-
- // Helpers that log before posting to |sync_loop_|. These will only post
- // the task in between calls to Start/Stop.
- void PostTask(const tracked_objects::Location& from_here,
- const char* name,
- const base::Closure& task);
- void PostDelayedTask(const tracked_objects::Location& from_here,
- const char* name,
- const base::Closure& task,
- base::TimeDelta delay);
-
- // Helper to assemble a job and post a delayed task to sync.
- void ScheduleSyncSessionJob(const SyncSessionJob& job);
-
- // Invoke the Syncer to perform a sync.
- void DoSyncSessionJob(const SyncSessionJob& job);
-
- // Called after the Syncer has performed the sync represented by |job|, to
- // reset our state.
- void FinishSyncSessionJob(const SyncSessionJob& job);
-
- // Record important state that might be needed in future syncs, such as which
- // data types may require cleanup.
- void UpdateCarryoverSessionState(const SyncSessionJob& old_job);
-
- // Helper to FinishSyncSessionJob to schedule the next sync operation.
- void ScheduleNextSync(const SyncSessionJob& old_job);
-
- // Helper to configure polling intervals. Used by Start and ScheduleNextSync.
- void AdjustPolling(const SyncSessionJob* old_job);
-
- // Helper to restart waiting with |wait_interval_|'s timer.
- void RestartWaiting();
-
- // Helper to ScheduleNextSync in case of consecutive sync errors.
- void HandleContinuationError(const SyncSessionJob& old_job);
-
- // Determines if it is legal to run |job| by checking current
- // operational mode, backoff or throttling, freshness
- // (so we don't make redundant syncs), and connection.
- bool ShouldRunJob(const SyncSessionJob& job);
-
- // Decide whether we should CONTINUE, SAVE or DROP the job.
- JobProcessDecision DecideOnJob(const SyncSessionJob& job);
-
- // Decide on whether to CONTINUE, SAVE or DROP the job when we are in
- // backoff mode.
- JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job);
-
- // Saves the job for future execution. Note: It drops all the poll jobs.
- void SaveJob(const SyncSessionJob& job);
-
- // Coalesces the current job with the pending nudge.
- void InitOrCoalescePendingJob(const SyncSessionJob& job);
-
- // 'Impl' here refers to real implementation of public functions, running on
- // |thread_|.
- void StopImpl(const base::Closure& callback);
- void ScheduleNudgeImpl(
- const base::TimeDelta& delay,
- sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source,
- const syncer::ModelTypePayloadMap& types_with_payloads,
- bool is_canary_job, const tracked_objects::Location& nudge_location);
-
- // Returns true if the client is currently in exponential backoff.
- bool IsBackingOff() const;
-
- // Helper to signal all listeners registered with |session_context_|.
- void Notify(SyncEngineEvent::EventCause cause);
-
- // Callback to change backoff state.
- void DoCanaryJob();
- void Unthrottle();
-
- // Executes the pending job. Called whenever an event occurs that may
- // change conditions permitting a job to run. Like when network connection is
- // re-established, mode changes etc.
- void DoPendingJobIfPossible(bool is_canary_job);
-
- // Called when the root cause of the current connection error is fixed.
- void OnServerConnectionErrorFixed();
-
- // The pointer is owned by the caller.
- syncer::sessions::SyncSession* CreateSyncSession(
- const syncer::sessions::SyncSourceInfo& info);
-
- // Creates a session for a poll and performs the sync.
- void PollTimerCallback();
-
- // Used to update |connection_code_|, see below.
- void UpdateServerConnectionManagerStatus(
- HttpResponse::ServerConnectionCode code);
-
- // Called once the first time thread_ is started to broadcast an initial
- // session snapshot containing data like initial_sync_ended. Important when
- // the client starts up and does not need to perform an initial sync.
- void SendInitialSnapshot();
-
- virtual void OnActionableError(const sessions::SyncSessionSnapshot& snapshot);
-
- base::WeakPtrFactory<SyncScheduler> weak_ptr_factory_;
-
- // A second factory specially for weak_handle_this_, to allow the handle
- // to be const and alleviate threading concerns.
- base::WeakPtrFactory<SyncScheduler> weak_ptr_factory_for_weak_handle_;
-
- // For certain methods that need to worry about X-thread posting.
- const WeakHandle<SyncScheduler> weak_handle_this_;
-
- // Used for logging.
- const std::string name_;
-
- // The message loop this object is on. Almost all methods have to
- // be called on this thread.
- MessageLoop* const sync_loop_;
-
- // Set in Start(), unset in Stop().
- bool started_;
-
- // Modifiable versions of kDefaultLongPollIntervalSeconds which can be
- // updated by the server.
- base::TimeDelta syncer_short_poll_interval_seconds_;
- base::TimeDelta syncer_long_poll_interval_seconds_;
-
- // Server-tweakable sessions commit delay.
- base::TimeDelta sessions_commit_delay_;
-
- // Periodic timer for polling. See AdjustPolling.
- base::RepeatingTimer<SyncScheduler> poll_timer_;
-
- // The mode of operation.
- Mode mode_;
-
- // TODO(tim): Bug 26339. This needs to track more than just time I think,
- // since the nudges could be for different types. Current impl doesn't care.
- base::TimeTicks last_sync_session_end_time_;
-
- // The latest connection code we got while trying to connect.
- HttpResponse::ServerConnectionCode connection_code_;
-
- // Tracks in-flight nudges so we can coalesce.
- scoped_ptr<SyncSessionJob> pending_nudge_;
-
- // Current wait state. Null if we're not in backoff and not throttled.
- scoped_ptr<WaitInterval> wait_interval_;
-
- scoped_ptr<DelayProvider> delay_provider_;
-
- // Invoked to run through the sync cycle.
- scoped_ptr<Syncer> syncer_;
-
- sessions::SyncSessionContext *session_context_;
-
- DISALLOW_COPY_AND_ASSIGN(SyncScheduler);
+ virtual void OnConnectionStatusChange() = 0;
};
} // namespace syncer
diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc
new file mode 100644
index 0000000..289e189
--- /dev/null
+++ b/sync/engine/sync_scheduler_impl.cc
@@ -0,0 +1,1209 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/sync_scheduler_impl.h"
+
+#include <algorithm>
+#include <cstring>
+
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/rand_util.h"
+#include "sync/engine/syncer.h"
+#include "sync/engine/throttled_data_type_tracker.h"
+#include "sync/protocol/proto_enum_conversions.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/util/data_type_histogram.h"
+#include "sync/util/logging.h"
+
+using base::TimeDelta;
+using base::TimeTicks;
+
+namespace syncer {
+
+using sessions::SyncSession;
+using sessions::SyncSessionSnapshot;
+using sessions::SyncSourceInfo;
+using sync_pb::GetUpdatesCallerInfo;
+
+namespace {
+bool ShouldRequestEarlyExit(
+ const syncer::SyncProtocolError& error) {
+ switch (error.error_type) {
+ case syncer::SYNC_SUCCESS:
+ case syncer::MIGRATION_DONE:
+ case syncer::THROTTLED:
+ case syncer::TRANSIENT_ERROR:
+ return false;
+ case syncer::NOT_MY_BIRTHDAY:
+ case syncer::CLEAR_PENDING:
+ // If we send terminate sync early then |sync_cycle_ended| notification
+ // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
+ // notification wouldnt be sent either. Then the UI layer would be left
+ // waiting forever. So assert we would send something.
+ DCHECK(error.action != syncer::UNKNOWN_ACTION);
+ return true;
+ case syncer::INVALID_CREDENTIAL:
+ // The notification for this is handled by PostAndProcessHeaders|.
+ // Server does no have to send any action for this.
+ return true;
+ // Make the default a NOTREACHED. So if a new error is introduced we
+ // think about its expected functionality.
+ default:
+ NOTREACHED();
+ return false;
+ }
+}
+
+bool IsActionableError(
+ const syncer::SyncProtocolError& error) {
+ return (error.action != syncer::UNKNOWN_ACTION);
+}
+} // namespace
+
+ConfigurationParams::ConfigurationParams()
+ : source(GetUpdatesCallerInfo::UNKNOWN),
+ keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {}
+ConfigurationParams::ConfigurationParams(
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
+ const syncer::ModelTypeSet& types_to_download,
+ const syncer::ModelSafeRoutingInfo& routing_info,
+ KeystoreKeyStatus keystore_key_status,
+ const base::Closure& ready_task)
+ : source(source),
+ types_to_download(types_to_download),
+ routing_info(routing_info),
+ keystore_key_status(keystore_key_status),
+ ready_task(ready_task) {
+ DCHECK(!ready_task.is_null());
+}
+ConfigurationParams::~ConfigurationParams() {}
+
+SyncSchedulerImpl::DelayProvider::DelayProvider() {}
+SyncSchedulerImpl::DelayProvider::~DelayProvider() {}
+
+SyncSchedulerImpl::WaitInterval::WaitInterval()
+ : mode(UNKNOWN),
+ had_nudge(false) {
+}
+
+SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
+
+#define ENUM_CASE(x) case x: return #x; break;
+
+const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
+ switch (mode) {
+ ENUM_CASE(UNKNOWN);
+ ENUM_CASE(EXPONENTIAL_BACKOFF);
+ ENUM_CASE(THROTTLED);
+ }
+ NOTREACHED();
+ return "";
+}
+
+SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
+ : purpose(UNKNOWN),
+ is_canary_job(false) {
+}
+
+SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
+
+SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
+ base::TimeTicks start,
+ linked_ptr<sessions::SyncSession> session,
+ bool is_canary_job,
+ const ConfigurationParams& config_params,
+ const tracked_objects::Location& from_here)
+ : purpose(purpose),
+ scheduled_start(start),
+ session(session),
+ is_canary_job(is_canary_job),
+ config_params(config_params),
+ from_here(from_here) {
+}
+
+const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
+ SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
+ switch (purpose) {
+ ENUM_CASE(UNKNOWN);
+ ENUM_CASE(POLL);
+ ENUM_CASE(NUDGE);
+ ENUM_CASE(CONFIGURATION);
+ ENUM_CASE(CLEANUP_DISABLED_TYPES);
+ }
+ NOTREACHED();
+ return "";
+}
+
+TimeDelta SyncSchedulerImpl::DelayProvider::GetDelay(
+ const base::TimeDelta& last_delay) {
+ return SyncSchedulerImpl::GetRecommendedDelay(last_delay);
+}
+
+GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
+ NudgeSource source) {
+ switch (source) {
+ case NUDGE_SOURCE_NOTIFICATION:
+ return GetUpdatesCallerInfo::NOTIFICATION;
+ case NUDGE_SOURCE_LOCAL:
+ return GetUpdatesCallerInfo::LOCAL;
+ case NUDGE_SOURCE_CONTINUATION:
+ return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
+ case NUDGE_SOURCE_LOCAL_REFRESH:
+ return GetUpdatesCallerInfo::DATATYPE_REFRESH;
+ case NUDGE_SOURCE_UNKNOWN:
+ return GetUpdatesCallerInfo::UNKNOWN;
+ default:
+ NOTREACHED();
+ return GetUpdatesCallerInfo::UNKNOWN;
+ }
+}
+
+SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
+ : mode(mode), had_nudge(false), length(length) { }
+
+// Helper macros to log with the syncer thread name; useful when there
+// are multiple syncer threads involved.
+
+#define SLOG(severity) LOG(severity) << name_ << ": "
+
+#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
+
+#define SDVLOG_LOC(from_here, verbose_level) \
+ DVLOG_LOC(from_here, verbose_level) << name_ << ": "
+
+namespace {
+
+const int kDefaultSessionsCommitDelaySeconds = 10;
+
+bool IsConfigRelatedUpdateSourceValue(
+ GetUpdatesCallerInfo::GetUpdatesSource source) {
+ switch (source) {
+ case GetUpdatesCallerInfo::RECONFIGURATION:
+ case GetUpdatesCallerInfo::MIGRATION:
+ case GetUpdatesCallerInfo::NEW_CLIENT:
+ case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
+ return true;
+ default:
+ return false;
+ }
+}
+
+} // namespace
+
+SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
+ sessions::SyncSessionContext* context,
+ Syncer* syncer)
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ weak_handle_this_(MakeWeakHandle(
+ weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
+ name_(name),
+ sync_loop_(MessageLoop::current()),
+ started_(false),
+ syncer_short_poll_interval_seconds_(
+ TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
+ syncer_long_poll_interval_seconds_(
+ TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
+ sessions_commit_delay_(
+ TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
+ mode_(NORMAL_MODE),
+ // Start with assuming everything is fine with the connection.
+ // At the end of the sync cycle we would have the correct status.
+ connection_code_(HttpResponse::SERVER_CONNECTION_OK),
+ delay_provider_(new DelayProvider()),
+ syncer_(syncer),
+ session_context_(context) {
+ DCHECK(sync_loop_);
+}
+
+SyncSchedulerImpl::~SyncSchedulerImpl() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ StopImpl(base::Closure());
+}
+
+void SyncSchedulerImpl::OnCredentialsUpdated() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+
+ // TODO(lipalani): crbug.com/106262. One issue here is that if after
+ // the auth error we happened to do gettime and it succeeded then
+ // the |connection_code_| would be briefly OK however it would revert
+ // back to SYNC_AUTH_ERROR at the end of the sync cycle. The
+ // referenced bug explores the option of removing gettime calls
+ // altogethere
+ if (HttpResponse::SYNC_AUTH_ERROR == connection_code_) {
+ OnServerConnectionErrorFixed();
+ }
+}
+
+void SyncSchedulerImpl::OnConnectionStatusChange() {
+ if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
+ // Optimistically assume that the connection is fixed and try
+ // connecting.
+ OnServerConnectionErrorFixed();
+ }
+}
+
+void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
+ connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
+ PostTask(FROM_HERE, "DoCanaryJob",
+ base::Bind(&SyncSchedulerImpl::DoCanaryJob,
+ weak_ptr_factory_.GetWeakPtr()));
+
+}
+
+void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
+ HttpResponse::ServerConnectionCode code) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG(2) << "New server connection code: "
+ << HttpResponse::GetServerConnectionCodeString(code);
+
+ connection_code_ = code;
+}
+
+void SyncSchedulerImpl::Start(Mode mode) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ std::string thread_name = MessageLoop::current()->thread_name();
+ if (thread_name.empty())
+ thread_name = "<Main thread>";
+ SDVLOG(2) << "Start called from thread "
+ << thread_name << " with mode " << GetModeString(mode);
+ if (!started_) {
+ started_ = true;
+ SendInitialSnapshot();
+ }
+
+ DCHECK(!session_context_->account_name().empty());
+ DCHECK(syncer_.get());
+ Mode old_mode = mode_;
+ mode_ = mode;
+ AdjustPolling(NULL); // Will kick start poll timer if needed.
+
+ if (old_mode != mode_) {
+ // We just changed our mode. See if there are any pending jobs that we could
+ // execute in the new mode.
+ DoPendingJobIfPossible(false);
+ }
+}
+
+void SyncSchedulerImpl::SendInitialSnapshot() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
+ SyncSourceInfo(), ModelSafeRoutingInfo(),
+ std::vector<ModelSafeWorker*>()));
+ SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
+ event.snapshot = dummy->TakeSnapshot();
+ session_context_->NotifyListeners(event);
+}
+
+namespace {
+
+// Helper to extract the routing info and workers corresponding to types in
+// |types| from |current_routes| and |current_workers|.
+void BuildModelSafeParams(
+ const ModelTypeSet& types_to_download,
+ const ModelSafeRoutingInfo& current_routes,
+ const std::vector<ModelSafeWorker*>& current_workers,
+ ModelSafeRoutingInfo* result_routes,
+ std::vector<ModelSafeWorker*>* result_workers) {
+ std::set<ModelSafeGroup> active_groups;
+ active_groups.insert(GROUP_PASSIVE);
+ for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
+ iter.Inc()) {
+ syncer::ModelType type = iter.Get();
+ ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
+ DCHECK(route != current_routes.end());
+ ModelSafeGroup group = route->second;
+ (*result_routes)[type] = group;
+ active_groups.insert(group);
+ }
+
+ for(std::vector<ModelSafeWorker*>::const_iterator iter =
+ current_workers.begin(); iter != current_workers.end(); ++iter) {
+ if (active_groups.count((*iter)->GetModelSafeGroup()) > 0)
+ result_workers->push_back(*iter);
+ }
+}
+
+} // namespace.
+
+bool SyncSchedulerImpl::ScheduleConfiguration(
+ const ConfigurationParams& params) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
+ DCHECK_EQ(CONFIGURATION_MODE, mode_);
+ DCHECK(!params.ready_task.is_null());
+ SDVLOG(2) << "Reconfiguring syncer.";
+
+ // Only one configuration is allowed at a time. Verify we're not waiting
+ // for a pending configure job.
+ DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
+
+ // TODO(sync): now that ModelChanging commands only use those workers within
+ // the routing info, we don't really need |restricted_workers|. Remove it.
+ // crbug.com/133030
+ syncer::ModelSafeRoutingInfo restricted_routes;
+ std::vector<ModelSafeWorker*> restricted_workers;
+ BuildModelSafeParams(params.types_to_download,
+ params.routing_info,
+ session_context_->workers(),
+ &restricted_routes,
+ &restricted_workers);
+ session_context_->set_routing_info(params.routing_info);
+
+ // We rely on this not failing, so don't need to worry about checking for
+ // success. In addition, this will be removed as part of crbug.com/131433.
+ SyncSessionJob cleanup_job(
+ SyncSessionJob::CLEANUP_DISABLED_TYPES,
+ TimeTicks::Now(),
+ make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
+ false,
+ ConfigurationParams(),
+ FROM_HERE);
+ DoSyncSessionJob(cleanup_job);
+
+ if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) {
+ // TODO(zea): implement in such a way that we can handle failures and the
+ // subsequent retrys the scheduler might perform. See crbug.com/129665.
+ NOTIMPLEMENTED();
+ }
+
+ // Only reconfigure if we have types to download.
+ if (!params.types_to_download.Empty()) {
+ DCHECK(!restricted_routes.empty());
+ linked_ptr<SyncSession> session(new SyncSession(
+ session_context_,
+ this,
+ SyncSourceInfo(params.source,
+ ModelSafeRoutingInfoToPayloadMap(
+ restricted_routes,
+ std::string())),
+ restricted_routes,
+ restricted_workers));
+ SyncSessionJob job(SyncSessionJob::CONFIGURATION,
+ TimeTicks::Now(),
+ session,
+ false,
+ params,
+ FROM_HERE);
+ DoSyncSessionJob(job);
+
+ // If we failed, the job would have been saved as the pending configure
+ // job and a wait interval would have been set.
+ if (!session->Succeeded()) {
+ DCHECK(wait_interval_.get() &&
+ wait_interval_->pending_configure_job.get());
+ return false;
+ }
+ } else {
+ SDVLOG(2) << "No change in routing info, calling ready task directly.";
+ params.ready_task.Run();
+ }
+
+ return true;
+}
+
+SyncSchedulerImpl::JobProcessDecision
+SyncSchedulerImpl::DecideWhileInWaitInterval(
+ const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(wait_interval_.get());
+ DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES);
+
+ SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
+ << WaitInterval::GetModeString(wait_interval_->mode)
+ << (wait_interval_->had_nudge ? " (had nudge)" : "")
+ << (job.is_canary_job ? " (canary)" : "");
+
+ if (job.purpose == SyncSessionJob::POLL)
+ return DROP;
+
+ DCHECK(job.purpose == SyncSessionJob::NUDGE ||
+ job.purpose == SyncSessionJob::CONFIGURATION);
+ if (wait_interval_->mode == WaitInterval::THROTTLED)
+ return SAVE;
+
+ DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
+ if (job.purpose == SyncSessionJob::NUDGE) {
+ if (mode_ == CONFIGURATION_MODE)
+ return SAVE;
+
+ // If we already had one nudge then just drop this nudge. We will retry
+ // later when the timer runs out.
+ if (!job.is_canary_job)
+ return wait_interval_->had_nudge ? DROP : CONTINUE;
+ else // We are here because timer ran out. So retry.
+ return CONTINUE;
+ }
+ return job.is_canary_job ? CONTINUE : SAVE;
+}
+
+SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
+ const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (job.purpose == SyncSessionJob::CLEANUP_DISABLED_TYPES)
+ return CONTINUE;
+
+ // See if our type is throttled.
+ syncer::ModelTypeSet throttled_types =
+ session_context_->throttled_data_type_tracker()->GetThrottledTypes();
+ if (job.purpose == SyncSessionJob::NUDGE &&
+ job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
+ syncer::ModelTypeSet requested_types;
+ for (ModelTypePayloadMap::const_iterator i =
+ job.session->source().types.begin();
+ i != job.session->source().types.end();
+ ++i) {
+ requested_types.Put(i->first);
+ }
+
+ if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
+ return SAVE;
+ }
+
+ if (wait_interval_.get())
+ return DecideWhileInWaitInterval(job);
+
+ if (mode_ == CONFIGURATION_MODE) {
+ if (job.purpose == SyncSessionJob::NUDGE)
+ return SAVE;
+ else if (job.purpose == SyncSessionJob::CONFIGURATION)
+ return CONTINUE;
+ else
+ return DROP;
+ }
+
+ // We are in normal mode.
+ DCHECK_EQ(mode_, NORMAL_MODE);
+ DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
+
+ // Freshness condition
+ if (job.scheduled_start < last_sync_session_end_time_) {
+ SDVLOG(2) << "Dropping job because of freshness";
+ return DROP;
+ }
+
+ if (!session_context_->connection_manager()->HasInvalidAuthToken())
+ return CONTINUE;
+
+ SDVLOG(2) << "No valid auth token. Using that to decide on job.";
+ return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
+}
+
+void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
+ if (pending_nudge_.get() == NULL) {
+ SDVLOG(2) << "Creating a pending nudge job";
+ SyncSession* s = job.session.get();
+ scoped_ptr<SyncSession> session(new SyncSession(s->context(),
+ s->delegate(), s->source(), s->routing_info(), s->workers()));
+
+ SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
+ make_linked_ptr(session.release()), false,
+ ConfigurationParams(), job.from_here);
+ pending_nudge_.reset(new SyncSessionJob(new_job));
+
+ return;
+ }
+
+ SDVLOG(2) << "Coalescing a pending nudge";
+ pending_nudge_->session->Coalesce(*(job.session.get()));
+ pending_nudge_->scheduled_start = job.scheduled_start;
+
+ // Unfortunately the nudge location cannot be modified. So it stores the
+ // location of the first caller.
+}
+
+bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(started_);
+
+ JobProcessDecision decision = DecideOnJob(job);
+ SDVLOG(2) << "Should run "
+ << SyncSessionJob::GetPurposeString(job.purpose)
+ << " job in mode " << GetModeString(mode_)
+ << ": " << GetDecisionString(decision);
+ if (decision != SAVE)
+ return decision == CONTINUE;
+
+ DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
+ SyncSessionJob::CONFIGURATION);
+
+ SaveJob(job);
+ return false;
+}
+
+void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ // TODO(sync): Should we also check that job.purpose !=
+ // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.)
+ if (job.purpose == SyncSessionJob::NUDGE) {
+ SDVLOG(2) << "Saving a nudge job";
+ InitOrCoalescePendingJob(job);
+ } else if (job.purpose == SyncSessionJob::CONFIGURATION){
+ SDVLOG(2) << "Saving a configuration job";
+ DCHECK(wait_interval_.get());
+ DCHECK(mode_ == CONFIGURATION_MODE);
+
+ // Config params should always get set.
+ DCHECK(!job.config_params.ready_task.is_null());
+ SyncSession* old = job.session.get();
+ SyncSession* s(new SyncSession(session_context_, this, old->source(),
+ old->routing_info(), old->workers()));
+ SyncSessionJob new_job(job.purpose,
+ TimeTicks::Now(),
+ make_linked_ptr(s),
+ false,
+ job.config_params,
+ job.from_here);
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
+ } // drop the rest.
+ // TODO(sync): Is it okay to drop the rest? It's weird that
+ // SaveJob() only does what it says sometimes. (See
+ // http://crbug.com/90868.)
+}
+
+// Functor for std::find_if to search by ModelSafeGroup.
+struct ModelSafeWorkerGroupIs {
+ explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
+ bool operator()(ModelSafeWorker* w) {
+ return group == w->GetModelSafeGroup();
+ }
+ ModelSafeGroup group;
+};
+
+void SyncSchedulerImpl::ScheduleNudgeAsync(
+ const TimeDelta& delay,
+ NudgeSource source, ModelTypeSet types,
+ const tracked_objects::Location& nudge_location) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG_LOC(nudge_location, 2)
+ << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
+ << "source " << GetNudgeSourceString(source) << ", "
+ << "types " << ModelTypeSetToString(types);
+
+ ModelTypePayloadMap types_with_payloads =
+ syncer::ModelTypePayloadMapFromEnumSet(types, std::string());
+ SyncSchedulerImpl::ScheduleNudgeImpl(delay,
+ GetUpdatesFromNudgeSource(source),
+ types_with_payloads,
+ false,
+ nudge_location);
+}
+
+void SyncSchedulerImpl::ScheduleNudgeWithPayloadsAsync(
+ const TimeDelta& delay,
+ NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
+ const tracked_objects::Location& nudge_location) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG_LOC(nudge_location, 2)
+ << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
+ << "source " << GetNudgeSourceString(source) << ", "
+ << "payloads "
+ << syncer::ModelTypePayloadMapToString(types_with_payloads);
+
+ SyncSchedulerImpl::ScheduleNudgeImpl(delay,
+ GetUpdatesFromNudgeSource(source),
+ types_with_payloads,
+ false,
+ nudge_location);
+}
+
+void SyncSchedulerImpl::ScheduleNudgeImpl(
+ const TimeDelta& delay,
+ GetUpdatesCallerInfo::GetUpdatesSource source,
+ const ModelTypePayloadMap& types_with_payloads,
+ bool is_canary_job, const tracked_objects::Location& nudge_location) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+
+ SDVLOG_LOC(nudge_location, 2)
+ << "In ScheduleNudgeImpl with delay "
+ << delay.InMilliseconds() << " ms, "
+ << "source " << GetUpdatesSourceString(source) << ", "
+ << "payloads "
+ << syncer::ModelTypePayloadMapToString(types_with_payloads)
+ << (is_canary_job ? " (canary)" : "");
+
+ SyncSourceInfo info(source, types_with_payloads);
+
+ SyncSession* session(CreateSyncSession(info));
+ SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
+ make_linked_ptr(session), is_canary_job,
+ ConfigurationParams(), nudge_location);
+
+ session = NULL;
+ if (!ShouldRunJob(job))
+ return;
+
+ if (pending_nudge_.get()) {
+ if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
+ SDVLOG(2) << "Dropping the nudge because we are in backoff";
+ return;
+ }
+
+ SDVLOG(2) << "Coalescing pending nudge";
+ pending_nudge_->session->Coalesce(*(job.session.get()));
+
+ SDVLOG(2) << "Rescheduling pending nudge";
+ SyncSession* s = pending_nudge_->session.get();
+ job.session.reset(new SyncSession(s->context(), s->delegate(),
+ s->source(), s->routing_info(), s->workers()));
+
+ // Choose the start time as the earliest of the 2.
+ job.scheduled_start = std::min(job.scheduled_start,
+ pending_nudge_->scheduled_start);
+ pending_nudge_.reset();
+ }
+
+ // TODO(zea): Consider adding separate throttling/backoff for datatype
+ // refresh requests.
+ ScheduleSyncSessionJob(job);
+}
+
+const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
+ switch (mode) {
+ ENUM_CASE(CONFIGURATION_MODE);
+ ENUM_CASE(NORMAL_MODE);
+ }
+ return "";
+}
+
+const char* SyncSchedulerImpl::GetDecisionString(
+ SyncSchedulerImpl::JobProcessDecision mode) {
+ switch (mode) {
+ ENUM_CASE(CONTINUE);
+ ENUM_CASE(SAVE);
+ ENUM_CASE(DROP);
+ }
+ return "";
+}
+
+// static
+void SyncSchedulerImpl::SetSyncerStepsForPurpose(
+ SyncSessionJob::SyncSessionJobPurpose purpose,
+ SyncerStep* start,
+ SyncerStep* end) {
+ switch (purpose) {
+ case SyncSessionJob::CONFIGURATION:
+ *start = DOWNLOAD_UPDATES;
+ *end = APPLY_UPDATES;
+ return;
+ case SyncSessionJob::NUDGE:
+ case SyncSessionJob::POLL:
+ *start = SYNCER_BEGIN;
+ *end = SYNCER_END;
+ return;
+ case SyncSessionJob::CLEANUP_DISABLED_TYPES:
+ *start = CLEANUP_DISABLED_TYPES;
+ *end = CLEANUP_DISABLED_TYPES;
+ return;
+ default:
+ NOTREACHED();
+ *start = SYNCER_END;
+ *end = SYNCER_END;
+ return;
+ }
+}
+
+void SyncSchedulerImpl::PostTask(
+ const tracked_objects::Location& from_here,
+ const char* name, const base::Closure& task) {
+ SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (!started_) {
+ SDVLOG(1) << "Not posting task as scheduler is stopped.";
+ return;
+ }
+ sync_loop_->PostTask(from_here, task);
+}
+
+void SyncSchedulerImpl::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const char* name, const base::Closure& task, base::TimeDelta delay) {
+ SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
+ << delay.InMilliseconds() << " ms delay";
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (!started_) {
+ SDVLOG(1) << "Not posting task as scheduler is stopped.";
+ return;
+ }
+ sync_loop_->PostDelayedTask(from_here, task, delay);
+}
+
+void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ TimeDelta delay = job.scheduled_start - TimeTicks::Now();
+ if (delay < TimeDelta::FromMilliseconds(0))
+ delay = TimeDelta::FromMilliseconds(0);
+ SDVLOG_LOC(job.from_here, 2)
+ << "In ScheduleSyncSessionJob with "
+ << SyncSessionJob::GetPurposeString(job.purpose)
+ << " job and " << delay.InMilliseconds() << " ms delay";
+
+ DCHECK(job.purpose == SyncSessionJob::NUDGE ||
+ job.purpose == SyncSessionJob::POLL);
+ if (job.purpose == SyncSessionJob::NUDGE) {
+ SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge";
+ DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() ==
+ job.session);
+ pending_nudge_.reset(new SyncSessionJob(job));
+ }
+ PostDelayedTask(job.from_here, "DoSyncSessionJob",
+ base::Bind(&SyncSchedulerImpl::DoSyncSessionJob,
+ weak_ptr_factory_.GetWeakPtr(),
+ job),
+ delay);
+}
+
+void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (!ShouldRunJob(job)) {
+ SLOG(WARNING)
+ << "Not executing "
+ << SyncSessionJob::GetPurposeString(job.purpose) << " job from "
+ << GetUpdatesSourceString(job.session->source().updates_source);
+ return;
+ }
+
+ if (job.purpose == SyncSessionJob::NUDGE) {
+ if (pending_nudge_.get() == NULL ||
+ pending_nudge_->session != job.session) {
+ SDVLOG(2) << "Dropping a nudge in "
+ << "DoSyncSessionJob because another nudge was scheduled";
+ return; // Another nudge must have been scheduled in in the meantime.
+ }
+ pending_nudge_.reset();
+
+ // Create the session with the latest model safe table and use it to purge
+ // and update any disabled or modified entries in the job.
+ scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
+
+ job.session->RebaseRoutingInfoWithLatest(*session);
+ }
+ SDVLOG(2) << "DoSyncSessionJob with "
+ << SyncSessionJob::GetPurposeString(job.purpose) << " job";
+
+ SyncerStep begin(SYNCER_END);
+ SyncerStep end(SYNCER_END);
+ SetSyncerStepsForPurpose(job.purpose, &begin, &end);
+
+ bool has_more_to_sync = true;
+ while (ShouldRunJob(job) && has_more_to_sync) {
+ SDVLOG(2) << "Calling SyncShare.";
+ // Synchronously perform the sync session from this thread.
+ syncer_->SyncShare(job.session.get(), begin, end);
+ has_more_to_sync = job.session->HasMoreToSync();
+ if (has_more_to_sync)
+ job.session->PrepareForAnotherSyncCycle();
+ }
+ SDVLOG(2) << "Done SyncShare looping.";
+
+ FinishSyncSessionJob(job);
+}
+
+void SyncSchedulerImpl::UpdateCarryoverSessionState(
+ const SyncSessionJob& old_job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
+ // Whatever types were part of a configuration task will have had updates
+ // downloaded. For that reason, we make sure they get recorded in the
+ // event that they get disabled at a later time.
+ ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
+ if (!r.empty()) {
+ ModelSafeRoutingInfo temp_r;
+ ModelSafeRoutingInfo old_info(old_job.session->routing_info());
+ std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
+ std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
+ session_context_->set_previous_session_routing_info(temp_r);
+ }
+ } else {
+ session_context_->set_previous_session_routing_info(
+ old_job.session->routing_info());
+ }
+}
+
+void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ // Update timing information for how often datatypes are triggering nudges.
+ base::TimeTicks now = TimeTicks::Now();
+ if (!last_sync_session_end_time_.is_null()) {
+ ModelTypePayloadMap::const_iterator iter;
+ for (iter = job.session->source().types.begin();
+ iter != job.session->source().types.end();
+ ++iter) {
+#define PER_DATA_TYPE_MACRO(type_str) \
+ SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, \
+ now - last_sync_session_end_time_);
+ SYNC_DATA_TYPE_HISTOGRAM(iter->first);
+#undef PER_DATA_TYPE_MACRO
+ }
+ }
+ last_sync_session_end_time_ = now;
+
+ // Now update the status of the connection from SCM. We need this to decide
+ // whether we need to save/run future jobs. The notifications from SCM are not
+ // reliable.
+ //
+ // TODO(rlarocque): crbug.com/110954
+ // We should get rid of the notifications and it is probably not needed to
+ // maintain this status variable in 2 places. We should query it directly from
+ // SCM when needed.
+ ServerConnectionManager* scm = session_context_->connection_manager();
+ UpdateServerConnectionManagerStatus(scm->server_status());
+
+ UpdateCarryoverSessionState(job);
+ if (IsSyncingCurrentlySilenced()) {
+ SDVLOG(2) << "We are currently throttled; not scheduling the next sync.";
+ // TODO(sync): Investigate whether we need to check job.purpose
+ // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
+ SaveJob(job);
+ return; // Nothing to do.
+ } else if (job.session->Succeeded() &&
+ !job.config_params.ready_task.is_null()) {
+ // If this was a configuration job with a ready task, invoke it now that
+ // we finished successfully.
+ job.config_params.ready_task.Run();
+ }
+
+ SDVLOG(2) << "Updating the next polling time after SyncMain";
+ ScheduleNextSync(job);
+}
+
+void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(!old_job.session->HasMoreToSync());
+
+ AdjustPolling(&old_job);
+
+ if (old_job.session->Succeeded()) {
+ // Only reset backoff if we actually reached the server.
+ if (old_job.session->SuccessfullyReachedServer())
+ wait_interval_.reset();
+ SDVLOG(2) << "Job succeeded so not scheduling more jobs";
+ return;
+ }
+
+ if (old_job.purpose == SyncSessionJob::POLL) {
+ return; // We don't retry POLL jobs.
+ }
+
+ // TODO(rlarocque): There's no reason why we should blindly backoff and retry
+ // if we don't succeed. Some types of errors are not likely to disappear on
+ // their own. With the return values now available in the old_job.session, we
+ // should be able to detect such errors and only retry when we detect
+ // transient errors.
+
+ if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
+ mode_ == NORMAL_MODE) {
+ // When in normal mode, we allow up to one nudge per backoff interval. It
+ // appears that this was our nudge for this interval, and it failed.
+ //
+ // Note: This does not prevent us from running canary jobs. For example, an
+ // IP address change might still result in another nudge being executed
+ // during this backoff interval.
+ SDVLOG(2) << "A nudge during backoff failed";
+
+ DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
+ DCHECK(!wait_interval_->had_nudge);
+
+ wait_interval_->had_nudge = true;
+ InitOrCoalescePendingJob(old_job);
+ RestartWaiting();
+ } else {
+ // Either this is the first failure or a consecutive failure after our
+ // backoff timer expired. We handle it the same way in either case.
+ SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
+ HandleContinuationError(old_job);
+ }
+}
+
+void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+
+ TimeDelta poll = (!session_context_->notifications_enabled()) ?
+ syncer_short_poll_interval_seconds_ :
+ syncer_long_poll_interval_seconds_;
+ bool rate_changed = !poll_timer_.IsRunning() ||
+ poll != poll_timer_.GetCurrentDelay();
+
+ if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
+ poll_timer_.Reset();
+
+ if (!rate_changed)
+ return;
+
+ // Adjust poll rate.
+ poll_timer_.Stop();
+ poll_timer_.Start(FROM_HERE, poll, this,
+ &SyncSchedulerImpl::PollTimerCallback);
+}
+
+void SyncSchedulerImpl::RestartWaiting() {
+ CHECK(wait_interval_.get());
+ wait_interval_->timer.Stop();
+ wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
+ this, &SyncSchedulerImpl::DoCanaryJob);
+}
+
+void SyncSchedulerImpl::HandleContinuationError(
+ const SyncSessionJob& old_job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (DCHECK_IS_ON()) {
+ if (IsBackingOff()) {
+ DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
+ }
+ }
+
+ TimeDelta length = delay_provider_->GetDelay(
+ IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
+
+ SDVLOG(2) << "In handle continuation error with "
+ << SyncSessionJob::GetPurposeString(old_job.purpose)
+ << " job. The time delta(ms) is "
+ << length.InMilliseconds();
+
+ // This will reset the had_nudge variable as well.
+ wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
+ length));
+ if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
+ SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
+ // Config params should always get set.
+ DCHECK(!old_job.config_params.ready_task.is_null());
+ SyncSession* old = old_job.session.get();
+ SyncSession* s(new SyncSession(session_context_, this,
+ old->source(), old->routing_info(), old->workers()));
+ SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
+ make_linked_ptr(s), false, old_job.config_params,
+ FROM_HERE);
+ wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
+ } else {
+ // We are not in configuration mode. So wait_interval's pending job
+ // should be null.
+ DCHECK(wait_interval_->pending_configure_job.get() == NULL);
+
+ // TODO(lipalani) - handle clear user data.
+ InitOrCoalescePendingJob(old_job);
+ }
+ RestartWaiting();
+}
+
+// static
+TimeDelta SyncSchedulerImpl::GetRecommendedDelay(const TimeDelta& last_delay) {
+ if (last_delay.InSeconds() >= kMaxBackoffSeconds)
+ return TimeDelta::FromSeconds(kMaxBackoffSeconds);
+
+ // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
+ int64 backoff_s =
+ std::max(static_cast<int64>(1),
+ last_delay.InSeconds() * kBackoffRandomizationFactor);
+
+ // Flip a coin to randomize backoff interval by +/- 50%.
+ int rand_sign = base::RandInt(0, 1) * 2 - 1;
+
+ // Truncation is adequate for rounding here.
+ backoff_s = backoff_s +
+ (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
+
+ // Cap the backoff interval.
+ backoff_s = std::max(static_cast<int64>(1),
+ std::min(backoff_s, kMaxBackoffSeconds));
+
+ return TimeDelta::FromSeconds(backoff_s);
+}
+
+void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
+ syncer_->RequestEarlyExit(); // Safe to call from any thread.
+ DCHECK(weak_handle_this_.IsInitialized());
+ SDVLOG(3) << "Posting StopImpl";
+ weak_handle_this_.Call(FROM_HERE,
+ &SyncSchedulerImpl::StopImpl,
+ callback);
+}
+
+void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG(2) << "StopImpl called";
+
+ // Kill any in-flight method calls.
+ weak_ptr_factory_.InvalidateWeakPtrs();
+ wait_interval_.reset();
+ poll_timer_.Stop();
+ if (started_) {
+ started_ = false;
+ }
+ if (!callback.is_null())
+ callback.Run();
+}
+
+void SyncSchedulerImpl::DoCanaryJob() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG(2) << "Do canary job";
+ DoPendingJobIfPossible(true);
+}
+
+void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SyncSessionJob* job_to_execute = NULL;
+ if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
+ && wait_interval_->pending_configure_job.get()) {
+ SDVLOG(2) << "Found pending configure job";
+ job_to_execute = wait_interval_->pending_configure_job.get();
+ } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
+ SDVLOG(2) << "Found pending nudge job";
+ // Pending jobs mostly have time from the past. Reset it so this job
+ // will get executed.
+ if (pending_nudge_->scheduled_start < TimeTicks::Now())
+ pending_nudge_->scheduled_start = TimeTicks::Now();
+
+ scoped_ptr<SyncSession> session(CreateSyncSession(
+ pending_nudge_->session->source()));
+
+ // Also the routing info might have been changed since we cached the
+ // pending nudge. Update it by coalescing to the latest.
+ pending_nudge_->session->Coalesce(*(session.get()));
+ // The pending nudge would be cleared in the DoSyncSessionJob function.
+ job_to_execute = pending_nudge_.get();
+ }
+
+ if (job_to_execute != NULL) {
+ SDVLOG(2) << "Executing pending job";
+ SyncSessionJob copy = *job_to_execute;
+ copy.is_canary_job = is_canary_job;
+ DoSyncSessionJob(copy);
+ }
+}
+
+SyncSession* SyncSchedulerImpl::CreateSyncSession(
+ const SyncSourceInfo& source) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DVLOG(2) << "Creating sync session with routes "
+ << ModelSafeRoutingInfoToString(session_context_->routing_info());
+
+ SyncSourceInfo info(source);
+ SyncSession* session(new SyncSession(session_context_, this, info,
+ session_context_->routing_info(), session_context_->workers()));
+
+ return session;
+}
+
+void SyncSchedulerImpl::PollTimerCallback() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ ModelSafeRoutingInfo r;
+ ModelTypePayloadMap types_with_payloads =
+ ModelSafeRoutingInfoToPayloadMap(r, std::string());
+ SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
+ SyncSession* s = CreateSyncSession(info);
+
+ SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
+ make_linked_ptr(s),
+ false,
+ ConfigurationParams(),
+ FROM_HERE);
+
+ ScheduleSyncSessionJob(job);
+}
+
+void SyncSchedulerImpl::Unthrottle() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
+ SDVLOG(2) << "Unthrottled.";
+ DoCanaryJob();
+ wait_interval_.reset();
+}
+
+void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ session_context_->NotifyListeners(SyncEngineEvent(cause));
+}
+
+bool SyncSchedulerImpl::IsBackingOff() const {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ return wait_interval_.get() && wait_interval_->mode ==
+ WaitInterval::EXPONENTIAL_BACKOFF;
+}
+
+void SyncSchedulerImpl::OnSilencedUntil(
+ const base::TimeTicks& silenced_until) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
+ silenced_until - TimeTicks::Now()));
+ wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
+ &SyncSchedulerImpl::Unthrottle);
+}
+
+bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ return wait_interval_.get() && wait_interval_->mode ==
+ WaitInterval::THROTTLED;
+}
+
+void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
+ const base::TimeDelta& new_interval) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ syncer_short_poll_interval_seconds_ = new_interval;
+}
+
+void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
+ const base::TimeDelta& new_interval) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ syncer_long_poll_interval_seconds_ = new_interval;
+}
+
+void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
+ const base::TimeDelta& new_delay) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ sessions_commit_delay_ = new_delay;
+}
+
+void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG(2) << "OnShouldStopSyncingPermanently";
+ syncer_->RequestEarlyExit(); // Thread-safe.
+ Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
+}
+
+void SyncSchedulerImpl::OnActionableError(
+ const sessions::SyncSessionSnapshot& snap) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ SDVLOG(2) << "OnActionableError";
+ SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
+ event.snapshot = snap;
+ session_context_->NotifyListeners(event);
+}
+
+void SyncSchedulerImpl::OnSyncProtocolError(
+ const sessions::SyncSessionSnapshot& snapshot) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ if (ShouldRequestEarlyExit(
+ snapshot.model_neutral_state().sync_protocol_error)) {
+ SDVLOG(2) << "Sync Scheduler requesting early exit.";
+ syncer_->RequestEarlyExit(); // Thread-safe.
+ }
+ if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error))
+ OnActionableError(snapshot);
+}
+
+void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ session_context_->set_notifications_enabled(notifications_enabled);
+}
+
+base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
+ DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ return sessions_commit_delay_;
+}
+
+#undef SDVLOG_LOC
+
+#undef SDVLOG
+
+#undef SLOG
+
+#undef ENUM_CASE
+
+} // namespace syncer
diff --git a/sync/engine/sync_scheduler_impl.h b/sync/engine/sync_scheduler_impl.h
new file mode 100644
index 0000000..dfc3bff
--- /dev/null
+++ b/sync/engine/sync_scheduler_impl.h
@@ -0,0 +1,360 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_
+#define SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_
+
+#include <string>
+
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "base/gtest_prod_util.h"
+#include "base/memory/linked_ptr.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/observer_list.h"
+#include "base/time.h"
+#include "base/timer.h"
+#include "sync/engine/net/server_connection_manager.h"
+#include "sync/engine/nudge_source.h"
+#include "sync/engine/sync_scheduler.h"
+#include "sync/engine/syncer.h"
+#include "sync/internal_api/public/base/model_type_payload_map.h"
+#include "sync/internal_api/public/engine/polling_constants.h"
+#include "sync/internal_api/public/util/weak_handle.h"
+#include "sync/sessions/sync_session.h"
+#include "sync/sessions/sync_session_context.h"
+
+namespace syncer {
+
+class SyncSchedulerImpl : public SyncScheduler {
+ public:
+ // |name| is a display string to identify the syncer thread. Takes
+ // |ownership of |syncer|.
+ SyncSchedulerImpl(const std::string& name,
+ sessions::SyncSessionContext* context, Syncer* syncer);
+
+ // Calls Stop().
+ virtual ~SyncSchedulerImpl();
+
+ virtual void Start(Mode mode) OVERRIDE;
+ virtual bool ScheduleConfiguration(
+ const ConfigurationParams& params) OVERRIDE;
+ virtual void RequestStop(const base::Closure& callback) OVERRIDE;
+ virtual void ScheduleNudgeAsync(
+ const base::TimeDelta& delay,
+ NudgeSource source,
+ syncer::ModelTypeSet types,
+ const tracked_objects::Location& nudge_location) OVERRIDE;
+ virtual void ScheduleNudgeWithPayloadsAsync(
+ const base::TimeDelta& delay, NudgeSource source,
+ const syncer::ModelTypePayloadMap& types_with_payloads,
+ const tracked_objects::Location& nudge_location) OVERRIDE;
+ virtual void SetNotificationsEnabled(bool notifications_enabled) OVERRIDE;
+
+ virtual base::TimeDelta GetSessionsCommitDelay() const OVERRIDE;
+
+ virtual void OnCredentialsUpdated() OVERRIDE;
+ virtual void OnConnectionStatusChange() OVERRIDE;
+
+ // SyncSession::Delegate implementation.
+ virtual void OnSilencedUntil(
+ const base::TimeTicks& silenced_until) OVERRIDE;
+ virtual bool IsSyncingCurrentlySilenced() OVERRIDE;
+ virtual void OnReceivedShortPollIntervalUpdate(
+ const base::TimeDelta& new_interval) OVERRIDE;
+ virtual void OnReceivedLongPollIntervalUpdate(
+ const base::TimeDelta& new_interval) OVERRIDE;
+ virtual void OnReceivedSessionsCommitDelay(
+ const base::TimeDelta& new_delay) OVERRIDE;
+ virtual void OnShouldStopSyncingPermanently() OVERRIDE;
+ virtual void OnSyncProtocolError(
+ const sessions::SyncSessionSnapshot& snapshot) OVERRIDE;
+
+ // DDOS avoidance function. Calculates how long we should wait before trying
+ // again after a failed sync attempt, where the last delay was |base_delay|.
+ // TODO(tim): Look at URLRequestThrottlerEntryInterface.
+ static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay);
+
+ private:
+ enum JobProcessDecision {
+ // Indicates we should continue with the current job.
+ CONTINUE,
+ // Indicates that we should save it to be processed later.
+ SAVE,
+ // Indicates we should drop this job.
+ DROP,
+ };
+
+ struct SyncSessionJob {
+ // An enum used to describe jobs for scheduling purposes.
+ enum SyncSessionJobPurpose {
+ // Uninitialized state, should never be hit in practice.
+ UNKNOWN = -1,
+ // Our poll timer schedules POLL jobs periodically based on a server
+ // assigned poll interval.
+ POLL,
+ // A nudge task can come from a variety of components needing to force
+ // a sync. The source is inferable from |session.source()|.
+ NUDGE,
+ // Typically used for fetching updates for a subset of the enabled types
+ // during initial sync or reconfiguration. We don't run all steps of
+ // the sync cycle for these (e.g. CleanupDisabledTypes is skipped).
+ CONFIGURATION,
+ // The user disabled some types and we have to clean up the data
+ // for those.
+ CLEANUP_DISABLED_TYPES,
+ };
+ SyncSessionJob();
+ SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start,
+ linked_ptr<sessions::SyncSession> session, bool is_canary_job,
+ const ConfigurationParams& config_params,
+ const tracked_objects::Location& nudge_location);
+ ~SyncSessionJob();
+ static const char* GetPurposeString(SyncSessionJobPurpose purpose);
+
+ SyncSessionJobPurpose purpose;
+ base::TimeTicks scheduled_start;
+ linked_ptr<sessions::SyncSession> session;
+ bool is_canary_job;
+ ConfigurationParams config_params;
+
+ // This is the location the job came from. Used for debugging.
+ // In case of multiple nudges getting coalesced this stores the
+ // first location that came in.
+ tracked_objects::Location from_here;
+ };
+ friend class SyncSchedulerTest;
+ friend class SyncSchedulerWhiteboxTest;
+ friend class SyncerTest;
+
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ DropNudgeWhileExponentialBackOff);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, SaveNudge);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ SaveNudgeWhileTypeThrottled);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueNudge);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, DropPoll);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinuePoll);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest, ContinueConfiguration);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ SaveConfigurationWhileThrottled);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ SaveNudgeWhileThrottled);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ ContinueCanaryJobConfig);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerWhiteboxTest,
+ ContinueNudgeWhileExponentialBackOff);
+ FRIEND_TEST_ALL_PREFIXES(SyncSchedulerTest, TransientPollFailure);
+
+ // A component used to get time delays associated with exponential backoff.
+ // Encapsulated into a class to facilitate testing.
+ class DelayProvider {
+ public:
+ DelayProvider();
+ virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay);
+ virtual ~DelayProvider();
+ private:
+ DISALLOW_COPY_AND_ASSIGN(DelayProvider);
+ };
+
+ struct WaitInterval {
+ enum Mode {
+ // Uninitialized state, should not be set in practice.
+ UNKNOWN = -1,
+ // A wait interval whose duration has been affected by exponential
+ // backoff.
+ // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
+ EXPONENTIAL_BACKOFF,
+ // A server-initiated throttled interval. We do not allow any syncing
+ // during such an interval.
+ THROTTLED,
+ };
+ WaitInterval();
+ ~WaitInterval();
+ WaitInterval(Mode mode, base::TimeDelta length);
+
+ static const char* GetModeString(Mode mode);
+
+ Mode mode;
+
+ // This bool is set to true if we have observed a nudge during this
+ // interval and mode == EXPONENTIAL_BACKOFF.
+ bool had_nudge;
+ base::TimeDelta length;
+ base::OneShotTimer<SyncSchedulerImpl> timer;
+
+ // Configure jobs are saved only when backing off or throttling. So we
+ // expose the pointer here.
+ scoped_ptr<SyncSessionJob> pending_configure_job;
+ };
+
+ static const char* GetModeString(Mode mode);
+
+ static const char* GetDecisionString(JobProcessDecision decision);
+
+ // Assign |start| and |end| to appropriate SyncerStep values for the
+ // specified |purpose|.
+ static void SetSyncerStepsForPurpose(
+ SyncSessionJob::SyncSessionJobPurpose purpose,
+ SyncerStep* start, SyncerStep* end);
+
+ // Helpers that log before posting to |sync_loop_|. These will only post
+ // the task in between calls to Start/Stop.
+ void PostTask(const tracked_objects::Location& from_here,
+ const char* name,
+ const base::Closure& task);
+ void PostDelayedTask(const tracked_objects::Location& from_here,
+ const char* name,
+ const base::Closure& task,
+ base::TimeDelta delay);
+
+ // Helper to assemble a job and post a delayed task to sync.
+ void ScheduleSyncSessionJob(const SyncSessionJob& job);
+
+ // Invoke the Syncer to perform a sync.
+ void DoSyncSessionJob(const SyncSessionJob& job);
+
+ // Called after the Syncer has performed the sync represented by |job|, to
+ // reset our state.
+ void FinishSyncSessionJob(const SyncSessionJob& job);
+
+ // Record important state that might be needed in future syncs, such as which
+ // data types may require cleanup.
+ void UpdateCarryoverSessionState(const SyncSessionJob& old_job);
+
+ // Helper to FinishSyncSessionJob to schedule the next sync operation.
+ void ScheduleNextSync(const SyncSessionJob& old_job);
+
+ // Helper to configure polling intervals. Used by Start and ScheduleNextSync.
+ void AdjustPolling(const SyncSessionJob* old_job);
+
+ // Helper to restart waiting with |wait_interval_|'s timer.
+ void RestartWaiting();
+
+ // Helper to ScheduleNextSync in case of consecutive sync errors.
+ void HandleContinuationError(const SyncSessionJob& old_job);
+
+ // Determines if it is legal to run |job| by checking current
+ // operational mode, backoff or throttling, freshness
+ // (so we don't make redundant syncs), and connection.
+ bool ShouldRunJob(const SyncSessionJob& job);
+
+ // Decide whether we should CONTINUE, SAVE or DROP the job.
+ JobProcessDecision DecideOnJob(const SyncSessionJob& job);
+
+ // Decide on whether to CONTINUE, SAVE or DROP the job when we are in
+ // backoff mode.
+ JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job);
+
+ // Saves the job for future execution. Note: It drops all the poll jobs.
+ void SaveJob(const SyncSessionJob& job);
+
+ // Coalesces the current job with the pending nudge.
+ void InitOrCoalescePendingJob(const SyncSessionJob& job);
+
+ // 'Impl' here refers to real implementation of public functions, running on
+ // |thread_|.
+ void StopImpl(const base::Closure& callback);
+ void ScheduleNudgeImpl(
+ const base::TimeDelta& delay,
+ sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source,
+ const syncer::ModelTypePayloadMap& types_with_payloads,
+ bool is_canary_job, const tracked_objects::Location& nudge_location);
+
+ // Returns true if the client is currently in exponential backoff.
+ bool IsBackingOff() const;
+
+ // Helper to signal all listeners registered with |session_context_|.
+ void Notify(SyncEngineEvent::EventCause cause);
+
+ // Callback to change backoff state.
+ void DoCanaryJob();
+ void Unthrottle();
+
+ // Executes the pending job. Called whenever an event occurs that may
+ // change conditions permitting a job to run. Like when network connection is
+ // re-established, mode changes etc.
+ void DoPendingJobIfPossible(bool is_canary_job);
+
+ // Called when the root cause of the current connection error is fixed.
+ void OnServerConnectionErrorFixed();
+
+ // The pointer is owned by the caller.
+ syncer::sessions::SyncSession* CreateSyncSession(
+ const syncer::sessions::SyncSourceInfo& info);
+
+ // Creates a session for a poll and performs the sync.
+ void PollTimerCallback();
+
+ // Used to update |connection_code_|, see below.
+ void UpdateServerConnectionManagerStatus(
+ HttpResponse::ServerConnectionCode code);
+
+ // Called once the first time thread_ is started to broadcast an initial
+ // session snapshot containing data like initial_sync_ended. Important when
+ // the client starts up and does not need to perform an initial sync.
+ void SendInitialSnapshot();
+
+ virtual void OnActionableError(const sessions::SyncSessionSnapshot& snapshot);
+
+ base::WeakPtrFactory<SyncSchedulerImpl> weak_ptr_factory_;
+
+ // A second factory specially for weak_handle_this_, to allow the handle
+ // to be const and alleviate threading concerns.
+ base::WeakPtrFactory<SyncSchedulerImpl> weak_ptr_factory_for_weak_handle_;
+
+ // For certain methods that need to worry about X-thread posting.
+ const WeakHandle<SyncSchedulerImpl> weak_handle_this_;
+
+ // Used for logging.
+ const std::string name_;
+
+ // The message loop this object is on. Almost all methods have to
+ // be called on this thread.
+ MessageLoop* const sync_loop_;
+
+ // Set in Start(), unset in Stop().
+ bool started_;
+
+ // Modifiable versions of kDefaultLongPollIntervalSeconds which can be
+ // updated by the server.
+ base::TimeDelta syncer_short_poll_interval_seconds_;
+ base::TimeDelta syncer_long_poll_interval_seconds_;
+
+ // Server-tweakable sessions commit delay.
+ base::TimeDelta sessions_commit_delay_;
+
+ // Periodic timer for polling. See AdjustPolling.
+ base::RepeatingTimer<SyncSchedulerImpl> poll_timer_;
+
+ // The mode of operation.
+ Mode mode_;
+
+ // TODO(tim): Bug 26339. This needs to track more than just time I think,
+ // since the nudges could be for different types. Current impl doesn't care.
+ base::TimeTicks last_sync_session_end_time_;
+
+ // The latest connection code we got while trying to connect.
+ HttpResponse::ServerConnectionCode connection_code_;
+
+ // Tracks in-flight nudges so we can coalesce.
+ scoped_ptr<SyncSessionJob> pending_nudge_;
+
+ // Current wait state. Null if we're not in backoff and not throttled.
+ scoped_ptr<WaitInterval> wait_interval_;
+
+ scoped_ptr<DelayProvider> delay_provider_;
+
+ // Invoked to run through the sync cycle.
+ scoped_ptr<Syncer> syncer_;
+
+ sessions::SyncSessionContext *session_context_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncSchedulerImpl);
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_SYNC_SCHEDULER_IMPL_H_
diff --git a/sync/engine/sync_scheduler_unittest.cc b/sync/engine/sync_scheduler_unittest.cc
index 6c3e3eb..add275f 100644
--- a/sync/engine/sync_scheduler_unittest.cc
+++ b/sync/engine/sync_scheduler_unittest.cc
@@ -8,7 +8,7 @@
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
#include "base/test/test_timeouts.h"
-#include "sync/engine/sync_scheduler.h"
+#include "sync/engine/sync_scheduler_impl.h"
#include "sync/engine/syncer.h"
#include "sync/engine/throttled_data_type_tracker.h"
#include "sync/sessions/test_util.h"
@@ -88,7 +88,7 @@ class SyncSchedulerTest : public testing::Test {
syncer_(NULL),
delay_(NULL) {}
- class MockDelayProvider : public SyncScheduler::DelayProvider {
+ class MockDelayProvider : public SyncSchedulerImpl::DelayProvider {
public:
MOCK_METHOD1(GetDelay, TimeDelta(const TimeDelta&));
};
@@ -124,10 +124,10 @@ class SyncSchedulerTest : public testing::Test {
context_->set_notifications_enabled(true);
context_->set_account_name("Test");
scheduler_.reset(
- new SyncScheduler("TestSyncScheduler", context(), syncer_));
+ new SyncSchedulerImpl("TestSyncScheduler", context(), syncer_));
}
- SyncScheduler* scheduler() { return scheduler_.get(); }
+ SyncSchedulerImpl* scheduler() { return scheduler_.get(); }
MockSyncer* syncer() { return syncer_; }
MockDelayProvider* delay() { return delay_; }
MockConnectionManager* connection() { return connection_.get(); }
@@ -215,7 +215,7 @@ class SyncSchedulerTest : public testing::Test {
TestDirectorySetterUpper dir_maker_;
scoped_ptr<MockConnectionManager> connection_;
scoped_ptr<SyncSessionContext> context_;
- scoped_ptr<SyncScheduler> scheduler_;
+ scoped_ptr<SyncSchedulerImpl> scheduler_;
MockSyncer* syncer_;
MockDelayProvider* delay_;
std::vector<scoped_refptr<FakeModelWorker> > workers_;
@@ -618,7 +618,7 @@ TEST_F(SyncSchedulerTest, PollNotificationsDisabled) {
WithArg<0>(RecordSyncShareMultiple(&records, kMinNumSamples))));
scheduler()->OnReceivedShortPollIntervalUpdate(poll_interval);
- scheduler()->set_notifications_enabled(false);
+ scheduler()->SetNotificationsEnabled(false);
TimeTicks optimal_start = TimeTicks::Now() + poll_interval;
StartSyncScheduler(SyncScheduler::NORMAL_MODE);
@@ -670,16 +670,16 @@ TEST_F(SyncSchedulerTest, SessionsCommitDelay) {
Invoke(sessions::test_util::SimulateSuccess),
QuitLoopNowAction()));
- EXPECT_EQ(delay1, scheduler()->sessions_commit_delay());
+ EXPECT_EQ(delay1, scheduler()->GetSessionsCommitDelay());
StartSyncScheduler(SyncScheduler::NORMAL_MODE);
- EXPECT_EQ(delay1, scheduler()->sessions_commit_delay());
+ EXPECT_EQ(delay1, scheduler()->GetSessionsCommitDelay());
const ModelTypeSet model_types(syncer::BOOKMARKS);
scheduler()->ScheduleNudgeAsync(
zero(), NUDGE_SOURCE_LOCAL, model_types, FROM_HERE);
RunLoop();
- EXPECT_EQ(delay2, scheduler()->sessions_commit_delay());
+ EXPECT_EQ(delay2, scheduler()->GetSessionsCommitDelay());
StopSyncScheduler();
}
@@ -1047,18 +1047,20 @@ TEST_F(SyncSchedulerTest, TransientPollFailure) {
TEST_F(SyncSchedulerTest, GetRecommendedDelay) {
EXPECT_LE(TimeDelta::FromSeconds(0),
- SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(0)));
+ SyncSchedulerImpl::GetRecommendedDelay(TimeDelta::FromSeconds(0)));
EXPECT_LE(TimeDelta::FromSeconds(1),
- SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(1)));
+ SyncSchedulerImpl::GetRecommendedDelay(TimeDelta::FromSeconds(1)));
EXPECT_LE(TimeDelta::FromSeconds(50),
- SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(50)));
+ SyncSchedulerImpl::GetRecommendedDelay(
+ TimeDelta::FromSeconds(50)));
EXPECT_LE(TimeDelta::FromSeconds(10),
- SyncScheduler::GetRecommendedDelay(TimeDelta::FromSeconds(10)));
+ SyncSchedulerImpl::GetRecommendedDelay(
+ TimeDelta::FromSeconds(10)));
EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds),
- SyncScheduler::GetRecommendedDelay(
+ SyncSchedulerImpl::GetRecommendedDelay(
TimeDelta::FromSeconds(kMaxBackoffSeconds)));
EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds),
- SyncScheduler::GetRecommendedDelay(
+ SyncSchedulerImpl::GetRecommendedDelay(
TimeDelta::FromSeconds(kMaxBackoffSeconds + 1)));
}
diff --git a/sync/engine/sync_scheduler_whitebox_unittest.cc b/sync/engine/sync_scheduler_whitebox_unittest.cc
index 760083f..71af129 100644
--- a/sync/engine/sync_scheduler_whitebox_unittest.cc
+++ b/sync/engine/sync_scheduler_whitebox_unittest.cc
@@ -4,7 +4,7 @@
#include "base/message_loop.h"
#include "base/time.h"
-#include "sync/engine/sync_scheduler.h"
+#include "sync/engine/sync_scheduler_impl.h"
#include "sync/engine/throttled_data_type_tracker.h"
#include "sync/sessions/sync_session_context.h"
#include "sync/sessions/test_util.h"
@@ -55,7 +55,7 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
context_->set_notifications_enabled(true);
context_->set_account_name("Test");
scheduler_.reset(
- new SyncScheduler("TestSyncSchedulerWhitebox", context(), syncer));
+ new SyncSchedulerImpl("TestSyncSchedulerWhitebox", context(), syncer));
}
virtual void TearDown() {
@@ -75,14 +75,14 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
}
void SetWaitIntervalToThrottled() {
- scheduler_->wait_interval_.reset(new SyncScheduler::WaitInterval(
- SyncScheduler::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1)));
+ scheduler_->wait_interval_.reset(new SyncSchedulerImpl::WaitInterval(
+ SyncSchedulerImpl::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1)));
}
void SetWaitIntervalToExponentialBackoff() {
scheduler_->wait_interval_.reset(
- new SyncScheduler::WaitInterval(
- SyncScheduler::WaitInterval::EXPONENTIAL_BACKOFF,
+ new SyncSchedulerImpl::WaitInterval(
+ SyncSchedulerImpl::WaitInterval::EXPONENTIAL_BACKOFF,
TimeDelta::FromSeconds(1)));
}
@@ -90,8 +90,8 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
scheduler_->wait_interval_->had_nudge = had_nudge;
}
- SyncScheduler::JobProcessDecision DecideOnJob(
- const SyncScheduler::SyncSessionJob& job) {
+ SyncSchedulerImpl::JobProcessDecision DecideOnJob(
+ const SyncSchedulerImpl::SyncSessionJob& job) {
return scheduler_->DecideOnJob(job);
}
@@ -101,10 +101,10 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
SetLastSyncedTime(base::TimeTicks::Now());
}
- SyncScheduler::JobProcessDecision CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) {
+ SyncSchedulerImpl::JobProcessDecision CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
SyncSession* s = scheduler_->CreateSyncSession(SyncSourceInfo());
- SyncScheduler::SyncSessionJob job(purpose, TimeTicks::Now(),
+ SyncSchedulerImpl::SyncSessionJob job(purpose, TimeTicks::Now(),
make_linked_ptr(s),
false,
ConfigurationParams(),
@@ -125,7 +125,7 @@ class SyncSchedulerWhiteboxTest : public testing::Test {
protected:
// Declared here to ensure it is destructed before the objects it references.
- scoped_ptr<SyncScheduler> scheduler_;
+ scoped_ptr<SyncSchedulerImpl> scheduler_;
};
TEST_F(SyncSchedulerWhiteboxTest, SaveNudge) {
@@ -134,10 +134,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudge) {
// Now set the mode to configure.
SetMode(SyncScheduler::CONFIGURATION_MODE);
- SyncScheduler::JobProcessDecision decision =
- CreateAndDecideJob(SyncScheduler::SyncSessionJob::NUDGE);
+ SyncSchedulerImpl::JobProcessDecision decision =
+ CreateAndDecideJob(SyncSchedulerImpl::SyncSessionJob::NUDGE);
- EXPECT_EQ(decision, SyncScheduler::SAVE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::SAVE);
}
TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileTypeThrottled) {
@@ -157,53 +157,54 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileTypeThrottled) {
SyncSession* s = scheduler_->CreateSyncSession(info);
// Now schedule a nudge with just bookmarks and the change is local.
- SyncScheduler::SyncSessionJob job(SyncScheduler::SyncSessionJob::NUDGE,
- TimeTicks::Now(),
- make_linked_ptr(s),
- false,
- ConfigurationParams(),
- FROM_HERE);
-
- SyncScheduler::JobProcessDecision decision = DecideOnJob(job);
- EXPECT_EQ(decision, SyncScheduler::SAVE);
+ SyncSchedulerImpl::SyncSessionJob job(
+ SyncSchedulerImpl::SyncSessionJob::NUDGE,
+ TimeTicks::Now(),
+ make_linked_ptr(s),
+ false,
+ ConfigurationParams(),
+ FROM_HERE);
+
+ SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job);
+ EXPECT_EQ(decision, SyncSchedulerImpl::SAVE);
}
TEST_F(SyncSchedulerWhiteboxTest, ContinueNudge) {
InitializeSyncerOnNormalMode();
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::NUDGE);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::NUDGE);
- EXPECT_EQ(decision, SyncScheduler::CONTINUE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
TEST_F(SyncSchedulerWhiteboxTest, DropPoll) {
InitializeSyncerOnNormalMode();
SetMode(SyncScheduler::CONFIGURATION_MODE);
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::POLL);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::POLL);
- EXPECT_EQ(decision, SyncScheduler::DROP);
+ EXPECT_EQ(decision, SyncSchedulerImpl::DROP);
}
TEST_F(SyncSchedulerWhiteboxTest, ContinuePoll) {
InitializeSyncerOnNormalMode();
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::POLL);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::POLL);
- EXPECT_EQ(decision, SyncScheduler::CONTINUE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
TEST_F(SyncSchedulerWhiteboxTest, ContinueConfiguration) {
InitializeSyncerOnNormalMode();
SetMode(SyncScheduler::CONFIGURATION_MODE);
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::CONFIGURATION);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::CONFIGURATION);
- EXPECT_EQ(decision, SyncScheduler::CONTINUE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
TEST_F(SyncSchedulerWhiteboxTest, SaveConfigurationWhileThrottled) {
@@ -212,10 +213,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveConfigurationWhileThrottled) {
SetWaitIntervalToThrottled();
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::CONFIGURATION);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::CONFIGURATION);
- EXPECT_EQ(decision, SyncScheduler::SAVE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::SAVE);
}
TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) {
@@ -224,10 +225,10 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) {
SetWaitIntervalToThrottled();
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::NUDGE);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::NUDGE);
- EXPECT_EQ(decision, SyncScheduler::SAVE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::SAVE);
}
TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) {
@@ -235,10 +236,10 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) {
SetMode(SyncScheduler::NORMAL_MODE);
SetWaitIntervalToExponentialBackoff();
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::NUDGE);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::NUDGE);
- EXPECT_EQ(decision, SyncScheduler::CONTINUE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) {
@@ -247,10 +248,10 @@ TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) {
SetWaitIntervalToExponentialBackoff();
SetWaitIntervalHadNudge(true);
- SyncScheduler::JobProcessDecision decision = CreateAndDecideJob(
- SyncScheduler::SyncSessionJob::NUDGE);
+ SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob(
+ SyncSchedulerImpl::SyncSessionJob::NUDGE);
- EXPECT_EQ(decision, SyncScheduler::DROP);
+ EXPECT_EQ(decision, SyncSchedulerImpl::DROP);
}
TEST_F(SyncSchedulerWhiteboxTest, ContinueCanaryJobConfig) {
@@ -258,13 +259,13 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueCanaryJobConfig) {
SetMode(SyncScheduler::CONFIGURATION_MODE);
SetWaitIntervalToExponentialBackoff();
- struct SyncScheduler::SyncSessionJob job;
- job.purpose = SyncScheduler::SyncSessionJob::CONFIGURATION;
+ struct SyncSchedulerImpl::SyncSessionJob job;
+ job.purpose = SyncSchedulerImpl::SyncSessionJob::CONFIGURATION;
job.scheduled_start = TimeTicks::Now();
job.is_canary_job = true;
- SyncScheduler::JobProcessDecision decision = DecideOnJob(job);
+ SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job);
- EXPECT_EQ(decision, SyncScheduler::CONTINUE);
+ EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE);
}
} // namespace syncer
diff --git a/sync/engine/syncer_unittest.cc b/sync/engine/syncer_unittest.cc
index c468623..3f6dde7 100644
--- a/sync/engine/syncer_unittest.cc
+++ b/sync/engine/syncer_unittest.cc
@@ -26,7 +26,7 @@
#include "sync/engine/get_commit_ids_command.h"
#include "sync/engine/net/server_connection_manager.h"
#include "sync/engine/process_updates_command.h"
-#include "sync/engine/sync_scheduler.h"
+#include "sync/engine/sync_scheduler_impl.h"
#include "sync/engine/syncer.h"
#include "sync/engine/syncer_proto_util.h"
#include "sync/engine/throttled_data_type_tracker.h"
@@ -185,10 +185,10 @@ class SyncerTest : public testing::Test,
}
bool SyncShareAsDelegate(
- SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) {
+ SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
SyncerStep start;
SyncerStep end;
- SyncScheduler::SetSyncerStepsForPurpose(purpose, &start, &end);
+ SyncSchedulerImpl::SetSyncerStepsForPurpose(purpose, &start, &end);
session_.reset(MakeSession());
syncer_->SyncShare(session_.get(), start, end);
@@ -197,12 +197,13 @@ class SyncerTest : public testing::Test,
bool SyncShareNudge() {
session_.reset(MakeSession());
- return SyncShareAsDelegate(SyncScheduler::SyncSessionJob::NUDGE);
+ return SyncShareAsDelegate(SyncSchedulerImpl::SyncSessionJob::NUDGE);
}
bool SyncShareConfigure() {
session_.reset(MakeSession());
- return SyncShareAsDelegate(SyncScheduler::SyncSessionJob::CONFIGURATION);
+ return SyncShareAsDelegate(
+ SyncSchedulerImpl::SyncSessionJob::CONFIGURATION);
}
void LoopSyncShare() {