diff options
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread2.cc')
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2.cc | 863 |
1 files changed, 863 insertions, 0 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc new file mode 100644 index 0000000..b8bd9a9 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -0,0 +1,863 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/engine/syncer_thread2.h" + +#include <algorithm> + +#include "base/rand_util.h" +#include "chrome/browser/sync/engine/syncer.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { + +using sessions::SyncSession; +using sessions::SyncSessionSnapshot; +using sessions::SyncSourceInfo; +using syncable::ModelTypePayloadMap; +using syncable::ModelTypeBitSet; +using sync_pb::GetUpdatesCallerInfo; + +namespace s3 { + +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} + +SyncerThread::WaitInterval::WaitInterval() {} +SyncerThread::WaitInterval::~WaitInterval() {} + +SyncerThread::SyncSessionJob::SyncSessionJob() {} +SyncerThread::SyncSessionJob::~SyncSessionJob() {} + +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, + base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location) : purpose(purpose), + scheduled_start(start), + session(session), + is_canary_job(is_canary_job), + nudge_location(nudge_location) { +} + +TimeDelta SyncerThread::DelayProvider::GetDelay( + const base::TimeDelta& last_delay) { + return SyncerThread::GetRecommendedDelay(last_delay); +} + +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_UNKNOWN: + return GetUpdatesCallerInfo::UNKNOWN; + default: + NOTREACHED(); + return GetUpdatesCallerInfo::UNKNOWN; + } +} + +SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length) { } + +SyncerThread::SyncerThread(sessions::SyncSessionContext* context, + Syncer* syncer) + : thread_("SyncEngine_SyncerThread"), + syncer_short_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), + syncer_long_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), + mode_(NORMAL_MODE), + server_connection_ok_(false), + delay_provider_(new DelayProvider()), + syncer_(syncer), + session_context_(context) { +} + +SyncerThread::~SyncerThread() { + DCHECK(!thread_.IsRunning()); +} + +void SyncerThread::CheckServerConnectionManagerStatus( + HttpResponse::ServerConnectionCode code) { + + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << "Old mode: " << server_connection_ok_ << " Code: " << code; + // Note, be careful when adding cases here because if the SyncerThread + // thinks there is no valid connection as determined by this method, it + // will drop out of *all* forward progress sync loops (it won't poll and it + // will queue up Talk notifications but not actually call SyncShare) until + // some external action causes a ServerConnectionManager to broadcast that + // a valid connection has been re-established. + if (HttpResponse::CONNECTION_UNAVAILABLE == code || + HttpResponse::SYNC_AUTH_ERROR == code) { + server_connection_ok_ = false; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + } else if (HttpResponse::SERVER_CONNECTION_OK == code) { + server_connection_ok_ = true; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + DoCanaryJob(); + } +} + +void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " + << MessageLoop::current()->thread_name(); + if (!thread_.IsRunning()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " + << mode; + if (!thread_.Start()) { + NOTREACHED() << "Unable to start SyncerThread."; + return; + } + WatchConnectionManager(); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::SendInitialSnapshot)); + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " + << mode; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); +} + +void SyncerThread::SendInitialSnapshot() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, + SyncSourceInfo(), ModelSafeRoutingInfo(), + std::vector<ModelSafeWorker*>())); + SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); + sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); + event.snapshot = &snapshot; + session_context_->NotifyListeners(event); +} + +void SyncerThread::WatchConnectionManager() { + ServerConnectionManager* scm = session_context_->connection_manager(); + CheckServerConnectionManagerStatus(scm->server_status()); + scm->AddListener(this); +} + +void SyncerThread::StartImpl(Mode mode, + linked_ptr<ModeChangeCallback> callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " + << mode; + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!session_context_->account_name().empty()); + DCHECK(syncer_.get()); + mode_ = mode; + AdjustPolling(NULL); // Will kick start poll timer if needed. + if (callback.get()) + callback->Run(); + + // We just changed our mode. See if there are any pending jobs that we could + // execute in the new mode. + DoPendingJobIfPossible(false); +} + +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( + const SyncSessionJob& job) { + + DCHECK(wait_interval_.get()); + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); + + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " + << wait_interval_->mode << "Wait interval had nudge : " + << wait_interval_->had_nudge << "is canary job : " + << job.is_canary_job; + + if (job.purpose == SyncSessionJob::POLL) + return DROP; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::CONFIGURATION); + if (wait_interval_->mode == WaitInterval::THROTTLED) + return SAVE; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + if (job.purpose == SyncSessionJob::NUDGE) { + if (mode_ == CONFIGURATION_MODE) + return SAVE; + + // If we already had one nudge then just drop this nudge. We will retry + // later when the timer runs out. + return wait_interval_->had_nudge ? DROP : CONTINUE; + } + // This is a config job. + return job.is_canary_job ? CONTINUE : SAVE; +} + +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( + const SyncSessionJob& job) { + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) + return CONTINUE; + + if (wait_interval_.get()) + return DecideWhileInWaitInterval(job); + + if (mode_ == CONFIGURATION_MODE) { + if (job.purpose == SyncSessionJob::NUDGE) + return SAVE; + else if (job.purpose == SyncSessionJob::CONFIGURATION) + return CONTINUE; + else + return DROP; + } + + // We are in normal mode. + DCHECK_EQ(mode_, NORMAL_MODE); + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + + // Freshness condition + if (job.scheduled_start < last_sync_session_end_time_) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Dropping job because of freshness"; + return DROP; + } + + if (server_connection_ok_) + return CONTINUE; + + VLOG(2) << "SyncerThread(" << this << ")" + << " Bad server connection. Using that to decide on job."; + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; +} + +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); + if (pending_nudge_.get() == NULL) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Creating a pending nudge job"; + SyncSession* s = job.session.get(); + scoped_ptr<SyncSession> session(new SyncSession(s->context(), + s->delegate(), s->source(), s->routing_info(), s->workers())); + + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, + make_linked_ptr(session.release()), false, job.nudge_location); + pending_nudge_.reset(new SyncSessionJob(new_job)); + + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + pending_nudge_->scheduled_start = job.scheduled_start; + + // Unfortunately the nudge location cannot be modified. So it stores the + // location of the first caller. +} + +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { + JobProcessDecision decision = DecideOnJob(job); + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " + << decision << " Job purpose " << job.purpose << "mode " << mode_; + if (decision != SAVE) + return decision == CONTINUE; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == + SyncSessionJob::CONFIGURATION); + + SaveJob(job); + return false; +} + +void SyncerThread::SaveJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); + if (job.purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; + InitOrCoalescePendingJob(job); + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; + DCHECK(wait_interval_.get()); + DCHECK(mode_ == CONFIGURATION_MODE); + + SyncSession* old = job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + SyncSessionJob new_job(job.purpose, TimeTicks::Now(), + make_linked_ptr(s), false, job.nudge_location); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); + } // drop the rest. +} + +// Functor for std::find_if to search by ModelSafeGroup. +struct ModelSafeWorkerGroupIs { + explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} + bool operator()(ModelSafeWorker* w) { + return group == w->GetModelSafeGroup(); + } + ModelSafeGroup group; +}; + +void SyncerThread::ScheduleClearUserData() { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleClearUserDataImpl)); +} + +void SyncerThread::ScheduleNudge(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& types, + const tracked_objects::Location& nudge_location) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; + + ModelTypePayloadMap types_with_payloads = + syncable::ModelTypePayloadMapFromBitSet(types, std::string()); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); +} + +void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, + NudgeSource source, const ModelTypePayloadMap& types_with_payloads, + const tracked_objects::Location& nudge_location) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); +} + +void SyncerThread::ScheduleClearUserDataImpl() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + SyncSession* session = new SyncSession(session_context_.get(), this, + SyncSourceInfo(), ModelSafeRoutingInfo(), + std::vector<ModelSafeWorker*>()); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); +} + +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, + GetUpdatesCallerInfo::GetUpdatesSource source, + const ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; + // Note we currently nudge for all types regardless of the ones incurring + // the nudge. Doing different would throw off some syncer commands like + // CleanupDisabledTypes. We may want to change this in the future. + SyncSourceInfo info(source, types_with_payloads); + + SyncSession* session(CreateSyncSession(info)); + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, + make_linked_ptr(session), is_canary_job, + nudge_location); + + session = NULL; + if (!ShouldRunJob(job)) + return; + + if (pending_nudge_.get()) { + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" + << "we are in backoff"; + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + + if (!IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" + << " we are not in backoff and the job was coalesced"; + return; + } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Rescheduling pending nudge"; + SyncSession* s = pending_nudge_->session.get(); + job.session.reset(new SyncSession(s->context(), s->delegate(), + s->source(), s->routing_info(), s->workers())); + pending_nudge_.reset(); + } + } + + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), + nudge_location); +} + +// Helper to extract the routing info and workers corresponding to types in +// |types| from |registrar|. +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, + std::vector<ModelSafeWorker*>* workers) { + ModelSafeRoutingInfo r_tmp; + std::vector<ModelSafeWorker*> w_tmp; + registrar->GetModelSafeRoutingInfo(&r_tmp); + registrar->GetWorkers(&w_tmp); + + typedef std::vector<ModelSafeWorker*>::const_iterator iter; + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { + if (!types.test(i)) + continue; + syncable::ModelType t = syncable::ModelTypeFromInt(i); + DCHECK_EQ(1U, r_tmp.count(t)); + (*routes)[t] = r_tmp[t]; + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + ModelSafeWorkerGroupIs(r_tmp[t])); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); + } + + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + ModelSafeWorkerGroupIs(GROUP_PASSIVE)); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); +} + +void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + GetModelSafeParamsForTypes(types, session_context_->registrar(), + &routes, &workers); + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleConfigImpl, routes, workers, + GetUpdatesCallerInfo::FIRST_UPDATE)); +} + +void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; + // TODO(tim): config-specific GetUpdatesCallerInfo value? + SyncSession* session = new SyncSession(session_context_.get(), this, + SyncSourceInfo(source, + syncable::ModelTypePayloadMapFromRoutingInfo( + routing_info, std::string())), + routing_info, workers); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CONFIGURATION, session, FROM_HERE); +} + +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, + const tracked_objects::Location& nudge_location) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + SyncSessionJob job(purpose, TimeTicks::Now() + delay, + make_linked_ptr(session), false, nudge_location); + if (purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" + << " ScheduleSyncSessionJob"; + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); + pending_nudge_.reset(new SyncSessionJob(job)); + } + VLOG(2) << "SyncerThread(" << this << ")" + << " Posting job to execute in DoSyncSessionJob. Job purpose " + << job.purpose; + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::DoSyncSessionJob, job), + delay.InMilliseconds()); +} + +void SyncerThread::SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, + SyncerStep* start, SyncerStep* end) { + *end = SYNCER_END; + switch (purpose) { + case SyncSessionJob::CONFIGURATION: + *start = DOWNLOAD_UPDATES; + *end = APPLY_UPDATES; + return; + case SyncSessionJob::CLEAR_USER_DATA: + *start = CLEAR_PRIVATE_DATA; + return; + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: + *start = SYNCER_BEGIN; + return; + default: + NOTREACHED(); + } +} + +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + if (!ShouldRunJob(job)) + return; + + if (job.purpose == SyncSessionJob::NUDGE) { + DCHECK(pending_nudge_.get()); + if (pending_nudge_->session != job.session) + return; // Another nudge must have been scheduled in in the meantime. + pending_nudge_.reset(); + } + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " + << job.purpose; + + SyncerStep begin(SYNCER_BEGIN); + SyncerStep end(SYNCER_END); + SetSyncerStepsForPurpose(job.purpose, &begin, &end); + + bool has_more_to_sync = true; + while (ShouldRunJob(job) && has_more_to_sync) { + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Calling SyncShare."; + // Synchronously perform the sync session from this thread. + syncer_->SyncShare(job.session.get(), begin, end); + has_more_to_sync = job.session->HasMoreToSync(); + if (has_more_to_sync) + job.session->ResetTransientState(); + } + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Done SyncShare looping."; + FinishSyncSessionJob(job); +} + +void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + // Whatever types were part of a configuration task will have had updates + // downloaded. For that reason, we make sure they get recorded in the + // event that they get disabled at a later time. + ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); + if (!r.empty()) { + ModelSafeRoutingInfo temp_r; + ModelSafeRoutingInfo old_info(old_job.session->routing_info()); + std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), + std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); + session_context_->set_previous_session_routing_info(temp_r); + } + } else { + session_context_->set_previous_session_routing_info( + old_job.session->routing_info()); + } +} + +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // Update timing information for how often datatypes are triggering nudges. + base::TimeTicks now = TimeTicks::Now(); + if (!last_sync_session_end_time_.is_null()) { + ModelTypePayloadMap::const_iterator iter; + for (iter = job.session->source().types.begin(); + iter != job.session->source().types.end(); + ++iter) { + syncable::PostTimeToTypeHistogram(iter->first, + now - last_sync_session_end_time_); + } + } + last_sync_session_end_time_ = now; + UpdateCarryoverSessionState(job); + if (IsSyncingCurrentlySilenced()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " We are currently throttled. So not scheduling the next sync."; + SaveJob(job); + return; // Nothing to do. + } + + VLOG(2) << "SyncerThread(" << this << ")" + << " Updating the next polling time after SyncMain"; + ScheduleNextSync(job); +} + +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!old_job.session->HasMoreToSync()); + // Note: |num_server_changes_remaining| > 0 here implies that we received a + // broken response while trying to download all updates, because the Syncer + // will loop until this value is exhausted. Also, if unsynced_handles exist + // but HasMoreToSync is false, this implies that the Syncer determined no + // forward progress was possible at this time (an error, such as an HTTP + // 500, is likely to have occurred during commit). + const bool work_to_do = + old_job.session->status_controller()->num_server_changes_remaining() > 0 + || old_job.session->status_controller()->unsynced_handles().size() > 0; + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " + << work_to_do; + + AdjustPolling(&old_job); + + // TODO(tim): Old impl had special code if notifications disabled. Needed? + if (!work_to_do) { + // Success implies backoff relief. Note that if this was a "one-off" job + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was + // work_to_do before it ran this wont have changed, as jobs like this don't + // run a full sync cycle. So we don't need special code here. + wait_interval_.reset(); + VLOG(2) << "SyncerThread(" << this << ")" + << " Job suceeded so not scheduling more jobs"; + return; + } + + if (old_job.session->source().updates_source == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Job failed with source continuation"; + // We don't seem to have made forward progress. Start or extend backoff. + HandleConsecutiveContinuationError(old_job); + } else if (IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " A nudge during backoff failed"; + // We weren't continuing but we're in backoff; must have been a nudge. + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + DCHECK(!wait_interval_->had_nudge); + wait_interval_->had_nudge = true; + wait_interval_->timer.Reset(); + } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Failed. Schedule a job with continuation as source"; + // We weren't continuing and we aren't in backoff. Schedule a normal + // continuation. + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + ScheduleConfigImpl(old_job.session->routing_info(), + old_job.session->workers(), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); + } else { + // For all other purposes(nudge and poll) we schedule a retry nudge. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), + old_job.session->source().types, false, FROM_HERE); + } + } +} + +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { + DCHECK(thread_.IsRunning()); + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + TimeDelta poll = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + bool rate_changed = !poll_timer_.IsRunning() || + poll != poll_timer_.GetCurrentDelay(); + + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + poll_timer_.Reset(); + + if (!rate_changed) + return; + + // Adjust poll rate. + poll_timer_.Stop(); + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); +} + +void SyncerThread::HandleConsecutiveContinuationError( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // This if conditions should be compiled out in retail builds. + if (IsBackingOff()) { + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + } + SyncSession* old = old_job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + TimeDelta length = delay_provider_->GetDelay( + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + + VLOG(2) << "SyncerThread(" << this << ")" + << " In handle continuation error. Old job purpose is " + << old_job.purpose; + VLOG(2) << "SyncerThread(" << this << ")" + << " In Handle continuation error. The time delta(ms) is: " + << length.InMilliseconds(); + + // This will reset the had_nudge variable as well. + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, + length)); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, + make_linked_ptr(s), false, FROM_HERE); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + } else { + // We are not in configuration mode. So wait_interval's pending job + // should be null. + DCHECK(wait_interval_->pending_configure_job.get() == NULL); + + // TODO(lipalani) - handle clear user data. + InitOrCoalescePendingJob(old_job); + } + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); +} + +// static +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { + if (last_delay.InSeconds() >= kMaxBackoffSeconds) + return TimeDelta::FromSeconds(kMaxBackoffSeconds); + + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 + int64 backoff_s = + std::max(static_cast<int64>(1), + last_delay.InSeconds() * kBackoffRandomizationFactor); + + // Flip a coin to randomize backoff interval by +/- 50%. + int rand_sign = base::RandInt(0, 1) * 2 - 1; + + // Truncation is adequate for rounding here. + backoff_s = backoff_s + + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + + // Cap the backoff interval. + backoff_s = std::max(static_cast<int64>(1), + std::min(backoff_s, kMaxBackoffSeconds)); + + return TimeDelta::FromSeconds(backoff_s); +} + +void SyncerThread::Stop() { + VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; + syncer_->RequestEarlyExit(); // Safe to call from any thread. + session_context_->connection_manager()->RemoveListener(this); + thread_.Stop(); +} + +void SyncerThread::DoCanaryJob() { + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; + DoPendingJobIfPossible(true); +} + +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { + SyncSessionJob* job_to_execute = NULL; + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() + && wait_interval_->pending_configure_job.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; + job_to_execute = wait_interval_->pending_configure_job.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; + // Pending jobs mostly have time from the past. Reset it so this job + // will get executed. + if (pending_nudge_->scheduled_start < TimeTicks::Now()) + pending_nudge_->scheduled_start = TimeTicks::Now(); + + scoped_ptr<SyncSession> session(CreateSyncSession( + pending_nudge_->session->source())); + + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending_nudge_->session->Coalesce(*(session.get())); + // The pending nudge would be cleared in the DoSyncSessionJob function. + job_to_execute = pending_nudge_.get(); + } + + if (job_to_execute != NULL) { + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; + SyncSessionJob copy = *job_to_execute; + copy.is_canary_job = is_canary_job; + DoSyncSessionJob(copy); + } +} + +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(source); + + SyncSession* session(new SyncSession(session_context_.get(), this, info, + routes, workers)); + + return session; +} + +void SyncerThread::PollTimerCallback() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + ModelSafeRoutingInfo r; + ModelTypePayloadMap types_with_payloads = + syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); + SyncSession* s = CreateSyncSession(info); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, + FROM_HERE); +} + +void SyncerThread::Unthrottle() { + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; + DoCanaryJob(); + wait_interval_.reset(); +} + +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} + +bool SyncerThread::IsBackingOff() const { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::EXPONENTIAL_BACKOFF; +} + +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, + silenced_until - TimeTicks::Now())); + wait_interval_->timer.Start(wait_interval_->length, this, + &SyncerThread::Unthrottle); +} + +bool SyncerThread::IsSyncingCurrentlySilenced() { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::THROTTLED; +} + +void SyncerThread::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_short_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_long_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnShouldStopSyncingPermanently() { + VLOG(2) << "SyncerThread(" << this << ")" + << " OnShouldStopSyncingPermanently"; + syncer_->RequestEarlyExit(); // Thread-safe. + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} + +void SyncerThread::OnServerConnectionEvent( + const ServerConnectionEvent2& event) { + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::CheckServerConnectionManagerStatus, + event.connection_code)); +} + +void SyncerThread::set_notifications_enabled(bool notifications_enabled) { + session_context_->set_notifications_enabled(notifications_enabled); +} + +} // s3 +} // browser_sync |