diff options
Diffstat (limited to 'sync')
23 files changed, 1092 insertions, 567 deletions
diff --git a/sync/engine/sync_scheduler.h b/sync/engine/sync_scheduler.h index b9c1ed3..4673192 100644 --- a/sync/engine/sync_scheduler.h +++ b/sync/engine/sync_scheduler.h @@ -85,13 +85,18 @@ class SyncScheduler : public sessions::SyncSession::Delegate { // The meat and potatoes. Both of these methods will post a delayed task // to attempt the actual nudge (see ScheduleNudgeImpl). + // NOTE: |desired_delay| is best-effort. If a nudge is already scheduled to + // depart earlier than Now() + delay, the scheduler can and will prefer to + // batch the two so that only one nudge is sent (at the earlier time). Also, + // as always with delayed tasks and timers, it's possible the task gets run + // any time after |desired_delay|. virtual void ScheduleNudgeAsync( - const base::TimeDelta& delay, + const base::TimeDelta& desired_delay, NudgeSource source, ModelTypeSet types, const tracked_objects::Location& nudge_location) = 0; virtual void ScheduleNudgeWithStatesAsync( - const base::TimeDelta& delay, NudgeSource source, + const base::TimeDelta& desired_delay, NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, const tracked_objects::Location& nudge_location) = 0; diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc index b2180fb..4d1f9a8 100644 --- a/sync/engine/sync_scheduler_impl.cc +++ b/sync/engine/sync_scheduler_impl.cc @@ -9,6 +9,7 @@ #include "base/auto_reset.h" #include "base/bind.h" +#include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/location.h" #include "base/logging.h" @@ -83,8 +84,12 @@ ConfigurationParams::~ConfigurationParams() {} SyncSchedulerImpl::WaitInterval::WaitInterval() : mode(UNKNOWN), - had_nudge(false) { -} + had_nudge(false), + pending_configure_job(NULL) {} + +SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length), + pending_configure_job(NULL) {} SyncSchedulerImpl::WaitInterval::~WaitInterval() {} @@ -100,39 +105,6 @@ const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { return ""; } -SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() - : purpose(UNKNOWN), - is_canary_job(false) { -} - -SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} - -SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, - base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, - bool is_canary_job, - const ConfigurationParams& config_params, - const tracked_objects::Location& from_here) - : purpose(purpose), - scheduled_start(start), - session(session), - is_canary_job(is_canary_job), - config_params(config_params), - from_here(from_here) { -} - -const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( - SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { - switch (purpose) { - ENUM_CASE(UNKNOWN); - ENUM_CASE(POLL); - ENUM_CASE(NUDGE); - ENUM_CASE(CONFIGURATION); - } - NOTREACHED(); - return ""; -} - GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( NudgeSource source) { switch (source) { @@ -152,9 +124,6 @@ GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( } } -SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) - : mode(mode), had_nudge(false), length(length) { } - // Helper macros to log with the syncer thread name; useful when there // are multiple syncer threads involved. @@ -205,6 +174,7 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, // Start with assuming everything is fine with the connection. // At the end of the sync cycle we would have the correct status. connection_code_(HttpResponse::SERVER_CONNECTION_OK), + pending_nudge_(NULL), delay_provider_(delay_provider), syncer_(syncer), session_context_(context), @@ -241,10 +211,25 @@ void SyncSchedulerImpl::OnConnectionStatusChange() { void SyncSchedulerImpl::OnServerConnectionErrorFixed() { connection_code_ = HttpResponse::SERVER_CONNECTION_OK; + // There could be a pending nudge or configuration job in several cases: + // + // 1. We're in exponential backoff. + // 2. We're silenced / throttled. + // 3. A nudge was saved previously due to not having a valid auth token. + // 4. A nudge was scheduled + saved while in configuration mode. + // + // In all cases except (2), we want to retry contacting the server. We + // call DoCanaryJob to achieve this, and note that nothing -- not even a + // canary job -- can bypass a THROTTLED WaitInterval. The only thing that + // has the authority to do that is the Unthrottle timer. + scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); + if (!pending.get()) + return; + PostTask(FROM_HERE, "DoCanaryJob", base::Bind(&SyncSchedulerImpl::DoCanaryJob, - weak_ptr_factory_.GetWeakPtr())); - + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&pending))); } void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( @@ -280,11 +265,20 @@ void SyncSchedulerImpl::Start(Mode mode) { if (mode_ == NORMAL_MODE) { // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job // has not yet completed. - DCHECK(!wait_interval_.get() || - !wait_interval_->pending_configure_job.get()); + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); } - DoPendingJobIfPossible(false); + scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); + if (pending.get()) { + // TODO(tim): We should be able to remove this... + scoped_ptr<SyncSession> session(CreateSyncSession( + pending->session()->source())); + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending->mutable_session()->Coalesce(*session); + SDVLOG(2) << "Executing pending job. Good luck!"; + DoSyncSessionJob(pending.Pass()); + } } } @@ -328,7 +322,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration( // Only one configuration is allowed at a time. Verify we're not waiting // for a pending configure job. - DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); + DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); ModelSafeRoutingInfo restricted_routes; BuildModelSafeParams(params.types_to_download, @@ -339,7 +333,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration( // Only reconfigure if we have types to download. if (!params.types_to_download.Empty()) { DCHECK(!restricted_routes.empty()); - linked_ptr<SyncSession> session(new SyncSession( + scoped_ptr<SyncSession> session(new SyncSession( session_context_, this, SyncSourceInfo(params.source, @@ -348,19 +342,18 @@ bool SyncSchedulerImpl::ScheduleConfiguration( std::string())), restricted_routes, session_context_->workers())); - SyncSessionJob job(SyncSessionJob::CONFIGURATION, - TimeTicks::Now(), - session, - false, - params, - FROM_HERE); - DoSyncSessionJob(job); + scoped_ptr<SyncSessionJob> job(new SyncSessionJob( + SyncSessionJob::CONFIGURATION, + TimeTicks::Now(), + session.Pass(), + params, + FROM_HERE)); + bool succeeded = DoSyncSessionJob(job.Pass()); // If we failed, the job would have been saved as the pending configure // job and a wait interval would have been set. - if (!session->Succeeded()) { - DCHECK(wait_interval_.get() && - wait_interval_->pending_configure_job.get()); + if (!succeeded) { + DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); return false; } } else { @@ -372,37 +365,42 @@ bool SyncSchedulerImpl::ScheduleConfiguration( } SyncSchedulerImpl::JobProcessDecision -SyncSchedulerImpl::DecideWhileInWaitInterval( - const SyncSessionJob& job) { +SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) { DCHECK_EQ(MessageLoop::current(), sync_loop_); DCHECK(wait_interval_.get()); SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " << WaitInterval::GetModeString(wait_interval_->mode) << (wait_interval_->had_nudge ? " (had nudge)" : "") - << (job.is_canary_job ? " (canary)" : ""); + << (job.is_canary() ? " (canary)" : ""); - if (job.purpose == SyncSessionJob::POLL) + if (job.purpose() == SyncSessionJob::POLL) return DROP; - DCHECK(job.purpose == SyncSessionJob::NUDGE || - job.purpose == SyncSessionJob::CONFIGURATION); + // If we save a job while in a WaitInterval, there is a well-defined moment + // in time in the future when it makes sense for that SAVE-worthy job to try + // running again -- the end of the WaitInterval. + DCHECK(job.purpose() == SyncSessionJob::NUDGE || + job.purpose() == SyncSessionJob::CONFIGURATION); + + // If throttled, there's a clock ticking to unthrottle. We want to get + // on the same train. if (wait_interval_->mode == WaitInterval::THROTTLED) return SAVE; DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); - if (job.purpose == SyncSessionJob::NUDGE) { + if (job.purpose() == SyncSessionJob::NUDGE) { if (mode_ == CONFIGURATION_MODE) return SAVE; // If we already had one nudge then just drop this nudge. We will retry // later when the timer runs out. - if (!job.is_canary_job) + if (!job.is_canary()) return wait_interval_->had_nudge ? DROP : CONTINUE; else // We are here because timer ran out. So retry. return CONTINUE; } - return job.is_canary_job ? CONTINUE : SAVE; + return job.is_canary() ? CONTINUE : SAVE; } SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( @@ -412,16 +410,22 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( // See if our type is throttled. ModelTypeSet throttled_types = session_context_->throttled_data_type_tracker()->GetThrottledTypes(); - if (job.purpose == SyncSessionJob::NUDGE && - job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { + if (job.purpose() == SyncSessionJob::NUDGE && + job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { ModelTypeSet requested_types; for (ModelTypeInvalidationMap::const_iterator i = - job.session->source().types.begin(); - i != job.session->source().types.end(); + job.session()->source().types.begin(); + i != job.session()->source().types.end(); ++i) { requested_types.Put(i->first); } + // If all types are throttled, do not CONTINUE. Today, we don't treat + // a per-datatype "unthrottle" event as something that should force a + // canary job. For this reason, there's no good time to reschedule this job + // to run -- we'll lazily wait for an independent event to trigger a sync. + // Note that there may already be such an event if we're in a WaitInterval, + // so we can retry it then. if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) return SAVE; } @@ -430,9 +434,9 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( return DecideWhileInWaitInterval(job); if (mode_ == CONFIGURATION_MODE) { - if (job.purpose == SyncSessionJob::NUDGE) - return SAVE; - else if (job.purpose == SyncSessionJob::CONFIGURATION) + if (job.purpose() == SyncSessionJob::NUDGE) + return SAVE; // Running requires a mode switch. + else if (job.purpose() == SyncSessionJob::CONFIGURATION) return CONTINUE; else return DROP; @@ -440,7 +444,7 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( // We are in normal mode. DCHECK_EQ(mode_, NORMAL_MODE); - DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION); // Note about some subtle scheduling semantics. // @@ -483,81 +487,59 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( return CONTINUE; SDVLOG(2) << "No valid auth token. Using that to decide on job."; - return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; + // Running the job would require updated auth, so we can't honour + // job.scheduled_start(). + return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; } -void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); - if (pending_nudge_.get() == NULL) { - SDVLOG(2) << "Creating a pending nudge job"; - SyncSession* s = job.session.get(); - - // Get a fresh session with similar configuration as before (resets - // StatusController). - scoped_ptr<SyncSession> session(new SyncSession(s->context(), - s->delegate(), s->source(), s->routing_info(), s->workers())); - - SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, - make_linked_ptr(session.release()), false, - ConfigurationParams(), job.from_here); - pending_nudge_.reset(new SyncSessionJob(new_job)); +void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { + DCHECK_EQ(DecideOnJob(*job), SAVE); + const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; + if (is_nudge && pending_nudge_) { + SDVLOG(2) << "Coalescing a pending nudge"; + // TODO(tim): This basically means we never use the more-careful coalescing + // logic in ScheduleNudgeImpl that takes the min of the two nudge start + // times, because we're calling this function first. Pull this out + // into a function to coalesce + set start times and reuse. + pending_nudge_->mutable_session()->Coalesce(*(job->session())); return; } - SDVLOG(2) << "Coalescing a pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); - pending_nudge_->scheduled_start = job.scheduled_start; - - // Unfortunately the nudge location cannot be modified. So it stores the - // location of the first caller. -} - -bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(started_); - - JobProcessDecision decision = DecideOnJob(job); - SDVLOG(2) << "Should run " - << SyncSessionJob::GetPurposeString(job.purpose) - << " job in mode " << GetModeString(mode_) - << ": " << GetDecisionString(decision); - if (decision != SAVE) - return decision == CONTINUE; - - DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == - SyncSessionJob::CONFIGURATION); - - SaveJob(job); - return false; -} - -void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (job.purpose == SyncSessionJob::NUDGE) { - SDVLOG(2) << "Saving a nudge job"; - InitOrCoalescePendingJob(job); - } else if (job.purpose == SyncSessionJob::CONFIGURATION){ - SDVLOG(2) << "Saving a configuration job"; - DCHECK(wait_interval_.get()); - DCHECK(mode_ == CONFIGURATION_MODE); + scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); + if (wait_interval_.get() && !wait_interval_->pending_configure_job) { + // This job should be made the new canary. + if (is_nudge) { + pending_nudge_ = job_to_save.get(); + } else { + SDVLOG(2) << "Saving a configuration job"; + DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); + DCHECK(!wait_interval_->pending_configure_job); + DCHECK_EQ(mode_, CONFIGURATION_MODE); + DCHECK(!job->config_params().ready_task.is_null()); + // The only nudge that could exist is a scheduled canary nudge. + DCHECK(!unscheduled_nudge_storage_.get()); + if (pending_nudge_) { + // Pre-empt the nudge canary and abandon the old nudge (owned by task). + unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); + pending_nudge_ = unscheduled_nudge_storage_.get(); + } + wait_interval_->pending_configure_job = job_to_save.get(); + } + TimeDelta length = + wait_interval_->timer.desired_run_time() - TimeTicks::Now(); + wait_interval_->length = length < TimeDelta::FromSeconds(0) ? + TimeDelta::FromSeconds(0) : length; + RestartWaiting(job_to_save.Pass()); + return; + } - // Config params should always get set. - DCHECK(!job.config_params.ready_task.is_null()); - SyncSession* old = job.session.get(); - SyncSession* s(new SyncSession(session_context_, this, old->source(), - old->routing_info(), old->workers())); - SyncSessionJob new_job(job.purpose, - TimeTicks::Now(), - make_linked_ptr(s), - false, - job.config_params, - job.from_here); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); - } // drop the rest. - // TODO(sync): Is it okay to drop the rest? It's weird that - // SaveJob() only does what it says sometimes. (See - // http://crbug.com/90868.) + // Note that today there are no cases where we SAVE a CONFIGURATION job + // when we're not in a WaitInterval. See bug 147736. + DCHECK(is_nudge); + // There may or may not be a pending_configure_job. Either way this nudge + // is unschedulable. + pending_nudge_ = job_to_save.get(); + unscheduled_nudge_storage_ = job_to_save.Pass(); } // Functor for std::find_if to search by ModelSafeGroup. @@ -570,39 +552,39 @@ struct ModelSafeWorkerGroupIs { }; void SyncSchedulerImpl::ScheduleNudgeAsync( - const TimeDelta& delay, + const TimeDelta& desired_delay, NudgeSource source, ModelTypeSet types, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), sync_loop_); SDVLOG_LOC(nudge_location, 2) - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " + << "Nudge scheduled with delay " + << desired_delay.InMilliseconds() << " ms, " << "source " << GetNudgeSourceString(source) << ", " << "types " << ModelTypeSetToString(types); ModelTypeInvalidationMap invalidation_map = ModelTypeSetToInvalidationMap(types, std::string()); - SyncSchedulerImpl::ScheduleNudgeImpl(delay, + SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, GetUpdatesFromNudgeSource(source), invalidation_map, - false, nudge_location); } void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( - const TimeDelta& delay, + const TimeDelta& desired_delay, NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), sync_loop_); SDVLOG_LOC(nudge_location, 2) - << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " + << "Nudge scheduled with delay " + << desired_delay.InMilliseconds() << " ms, " << "source " << GetNudgeSourceString(source) << ", " << "payloads " << ModelTypeInvalidationMapToString(invalidation_map); - SyncSchedulerImpl::ScheduleNudgeImpl(delay, + SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, GetUpdatesFromNudgeSource(source), invalidation_map, - false, nudge_location); } @@ -610,7 +592,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( const TimeDelta& delay, GetUpdatesCallerInfo::GetUpdatesSource source, const ModelTypeInvalidationMap& invalidation_map, - bool is_canary_job, const tracked_objects::Location& nudge_location) { + const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), sync_loop_); DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; @@ -619,44 +601,55 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( << delay.InMilliseconds() << " ms, " << "source " << GetUpdatesSourceString(source) << ", " << "payloads " - << ModelTypeInvalidationMapToString(invalidation_map) - << (is_canary_job ? " (canary)" : ""); + << ModelTypeInvalidationMapToString(invalidation_map); SyncSourceInfo info(source, invalidation_map); UpdateNudgeTimeRecords(info); - SyncSession* session(CreateSyncSession(info)); - SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, - make_linked_ptr(session), is_canary_job, - ConfigurationParams(), nudge_location); + scoped_ptr<SyncSessionJob> job(new SyncSessionJob( + SyncSessionJob::NUDGE, + TimeTicks::Now() + delay, + CreateSyncSession(info).Pass(), + ConfigurationParams(), + nudge_location)); - session = NULL; - if (!ShouldRunJob(job)) - return; - - if (pending_nudge_.get()) { - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { - SDVLOG(2) << "Dropping the nudge because we are in backoff"; - return; + JobProcessDecision decision = DecideOnJob(*job); + SDVLOG(2) << "Should run " + << SyncSessionJob::GetPurposeString(job->purpose()) + << " job " << job->session() + << " in mode " << GetModeString(mode_) + << ": " << GetDecisionString(decision); + if (decision != CONTINUE) { + // End of the line, though we may save the job for later. + if (decision == SAVE) { + HandleSaveJobDecision(job.Pass()); + } else { + DCHECK_EQ(decision, DROP); } + return; + } - SDVLOG(2) << "Coalescing pending nudge"; - pending_nudge_->session->Coalesce(*(job.session.get())); - + if (pending_nudge_) { SDVLOG(2) << "Rescheduling pending nudge"; - SyncSession* s = pending_nudge_->session.get(); - job.session.reset(new SyncSession(s->context(), s->delegate(), - s->source(), s->routing_info(), s->workers())); - - // Choose the start time as the earliest of the 2. - job.scheduled_start = std::min(job.scheduled_start, - pending_nudge_->scheduled_start); - pending_nudge_.reset(); + pending_nudge_->mutable_session()->Coalesce(*(job->session())); + // Choose the start time as the earliest of the 2. Note that this means + // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) + // but a nudge is already scheduled to go out, we'll send the (tab) commit + // without waiting. + pending_nudge_->set_scheduled_start( + std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); + // Abandon the old task by cloning and replacing the session. + // It's possible that by "rescheduling" we're actually taking a job that + // was previously unscheduled and giving it wings, so take care to reset + // unscheduled nudge storage. + job = pending_nudge_->CloneAndAbandon(); + unscheduled_nudge_storage_.reset(); + pending_nudge_ = NULL; } // TODO(zea): Consider adding separate throttling/backoff for datatype // refresh requests. - ScheduleSyncSessionJob(job); + ScheduleSyncSessionJob(job.Pass()); } const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { @@ -677,29 +670,6 @@ const char* SyncSchedulerImpl::GetDecisionString( return ""; } -// static -void SyncSchedulerImpl::SetSyncerStepsForPurpose( - SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, - SyncerStep* end) { - switch (purpose) { - case SyncSessionJob::CONFIGURATION: - *start = DOWNLOAD_UPDATES; - *end = APPLY_UPDATES; - return; - case SyncSessionJob::NUDGE: - case SyncSessionJob::POLL: - *start = SYNCER_BEGIN; - *end = SYNCER_END; - return; - default: - NOTREACHED(); - *start = SYNCER_END; - *end = SYNCER_END; - return; - } -} - void SyncSchedulerImpl::PostTask( const tracked_objects::Location& from_here, const char* name, const base::Closure& task) { @@ -725,82 +695,93 @@ void SyncSchedulerImpl::PostDelayedTask( sync_loop_->PostDelayedTask(from_here, task, delay); } -void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { +void SyncSchedulerImpl::ScheduleSyncSessionJob( + scoped_ptr<SyncSessionJob> job) { DCHECK_EQ(MessageLoop::current(), sync_loop_); if (no_scheduling_allowed_) { NOTREACHED() << "Illegal to schedule job while session in progress."; return; } - TimeDelta delay = job.scheduled_start - TimeTicks::Now(); + TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); + tracked_objects::Location loc(job->from_location()); if (delay < TimeDelta::FromMilliseconds(0)) delay = TimeDelta::FromMilliseconds(0); - SDVLOG_LOC(job.from_here, 2) + SDVLOG_LOC(loc, 2) << "In ScheduleSyncSessionJob with " - << SyncSessionJob::GetPurposeString(job.purpose) + << SyncSessionJob::GetPurposeString(job->purpose()) << " job and " << delay.InMilliseconds() << " ms delay"; - DCHECK(job.purpose == SyncSessionJob::NUDGE || - job.purpose == SyncSessionJob::POLL); - if (job.purpose == SyncSessionJob::NUDGE) { - SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; - DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == - job.session); - pending_nudge_.reset(new SyncSessionJob(job)); + DCHECK(job->purpose() == SyncSessionJob::NUDGE || + job->purpose() == SyncSessionJob::POLL); + if (job->purpose() == SyncSessionJob::NUDGE) { + SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; + DCHECK(!pending_nudge_ || pending_nudge_->session() == + job->session()); + pending_nudge_ = job.get(); } - PostDelayedTask(job.from_here, "DoSyncSessionJob", - base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, - weak_ptr_factory_.GetWeakPtr(), - job), - delay); + + PostDelayedTask(loc, "DoSyncSessionJob", + base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&job)), + delay); } -void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { +bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { DCHECK_EQ(MessageLoop::current(), sync_loop_); - - AutoReset<bool> protector(&no_scheduling_allowed_, true); - if (!ShouldRunJob(job)) { - SLOG(WARNING) - << "Not executing " - << SyncSessionJob::GetPurposeString(job.purpose) << " job from " - << GetUpdatesSourceString(job.session->source().updates_source); - return; - } - - if (job.purpose == SyncSessionJob::NUDGE) { - if (pending_nudge_.get() == NULL || - pending_nudge_->session != job.session) { + if (job->purpose() == SyncSessionJob::NUDGE) { + if (pending_nudge_ == NULL || + pending_nudge_->session() != job->session()) { + // |job| is abandoned. SDVLOG(2) << "Dropping a nudge in " << "DoSyncSessionJob because another nudge was scheduled"; - return; // Another nudge must have been scheduled in in the meantime. + return false; } - pending_nudge_.reset(); + pending_nudge_ = NULL; - // Create the session with the latest model safe table and use it to purge + // Rebase the session with the latest model safe table and use it to purge // and update any disabled or modified entries in the job. - scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); + job->mutable_session()->RebaseRoutingInfoWithLatest( + session_context_->routing_info(), session_context_->workers()); + } - job.session->RebaseRoutingInfoWithLatest(*session); + AutoReset<bool> protector(&no_scheduling_allowed_, true); + JobProcessDecision decision = DecideOnJob(*job); + SDVLOG(2) << "Should run " + << SyncSessionJob::GetPurposeString(job->purpose()) + << " job " << job->session() + << " in mode " << GetModeString(mode_) + << " with source " << job->session()->source().updates_source + << ": " << GetDecisionString(decision); + if (decision != CONTINUE) { + if (decision == SAVE) { + HandleSaveJobDecision(job.Pass()); + } else { + DCHECK_EQ(decision, DROP); + } + return false; } - SDVLOG(2) << "DoSyncSessionJob with " - << SyncSessionJob::GetPurposeString(job.purpose) << " job"; - SyncerStep begin(SYNCER_END); - SyncerStep end(SYNCER_END); - SetSyncerStepsForPurpose(job.purpose, &begin, &end); + SDVLOG(2) << "DoSyncSessionJob with " + << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; bool has_more_to_sync = true; - while (ShouldRunJob(job) && has_more_to_sync) { + bool premature_exit = false; + while (DecideOnJob(*job) == CONTINUE && has_more_to_sync) { SDVLOG(2) << "Calling SyncShare."; // Synchronously perform the sync session from this thread. - syncer_->SyncShare(job.session.get(), begin, end); - has_more_to_sync = job.session->HasMoreToSync(); + premature_exit = !syncer_->SyncShare(job->mutable_session(), + job->start_step(), + job->end_step()); + + has_more_to_sync = job->session()->HasMoreToSync(); if (has_more_to_sync) - job.session->PrepareForAnotherSyncCycle(); + job->mutable_session()->PrepareForAnotherSyncCycle(); } SDVLOG(2) << "Done SyncShare looping."; - FinishSyncSessionJob(job); + return FinishSyncSessionJob(job.Pass(), premature_exit); } void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { @@ -823,64 +804,82 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { #define PER_DATA_TYPE_MACRO(type_str) \ SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); - SYNC_DATA_TYPE_HISTOGRAM(iter->first); + SYNC_DATA_TYPE_HISTOGRAM(iter->first); #undef PER_DATA_TYPE_MACRO } } -void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { +bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, + bool exited_prematurely) { DCHECK_EQ(MessageLoop::current(), sync_loop_); // Now update the status of the connection from SCM. We need this to decide - // whether we need to save/run future jobs. The notifications from SCM are not - // reliable. + // whether we need to save/run future jobs. The notifications from SCM are + // not reliable. // // TODO(rlarocque): crbug.com/110954 // We should get rid of the notifications and it is probably not needed to - // maintain this status variable in 2 places. We should query it directly from - // SCM when needed. + // maintain this status variable in 2 places. We should query it directly + // from SCM when needed. ServerConnectionManager* scm = session_context_->connection_manager(); UpdateServerConnectionManagerStatus(scm->server_status()); - if (IsSyncingCurrentlySilenced()) { - SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; - // TODO(sync): Investigate whether we need to check job.purpose - // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) - SaveJob(job); - return; // Nothing to do. - } else if (job.session->Succeeded() && - !job.config_params.ready_task.is_null()) { - // If this was a configuration job with a ready task, invoke it now that - // we finished successfully. + // Let job know that we're through syncing (calling SyncShare) at this point. + bool succeeded = false; + { AutoReset<bool> protector(&no_scheduling_allowed_, true); - job.config_params.ready_task.Run(); + succeeded = job->Finish(exited_prematurely); } SDVLOG(2) << "Updating the next polling time after SyncMain"; - ScheduleNextSync(job); + ScheduleNextSync(job.Pass(), succeeded); + return succeeded; } -void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { +void SyncSchedulerImpl::ScheduleNextSync( + scoped_ptr<SyncSessionJob> finished_job, bool succeeded) { DCHECK_EQ(MessageLoop::current(), sync_loop_); - DCHECK(!old_job.session->HasMoreToSync()); + DCHECK(!finished_job->session()->HasMoreToSync()); - AdjustPolling(&old_job); + AdjustPolling(finished_job.get()); - if (old_job.session->Succeeded()) { + if (succeeded) { // Only reset backoff if we actually reached the server. - if (old_job.session->SuccessfullyReachedServer()) + // It's possible that we reached the server on one attempt, then had an + // error on the next (or didn't perform some of the server-communicating + // commands). We want to verify that, for all commands attempted, we + // successfully spoke with the server. Therefore, we verify no errors + // and at least one SYNCER_OK. + if (finished_job->session()->DidReachServer()) wait_interval_.reset(); SDVLOG(2) << "Job succeeded so not scheduling more jobs"; return; } - if (old_job.purpose == SyncSessionJob::POLL) { + if (IsSyncingCurrentlySilenced()) { + SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; + // If we're here, it's because |job| was silenced until a server specified + // time. (Note, it had to be |job|, because DecideOnJob would not permit + // any job through while in WaitInterval::THROTTLED). + scoped_ptr<SyncSessionJob> clone = finished_job->Clone(); + if (clone->purpose() == SyncSessionJob::NUDGE) + pending_nudge_ = clone.get(); + else if (clone->purpose() == SyncSessionJob::CONFIGURATION) + wait_interval_->pending_configure_job = clone.get(); + else + clone.reset(); // Unthrottling is enough, no need to force a canary. + + RestartWaiting(clone.Pass()); + return; + } + + if (finished_job->purpose() == SyncSessionJob::POLL) { return; // We don't retry POLL jobs. } // TODO(rlarocque): There's no reason why we should blindly backoff and retry // if we don't succeed. Some types of errors are not likely to disappear on - // their own. With the return values now available in the old_job.session, we - // should be able to detect such errors and only retry when we detect + // their own. With the return values now available in the old_job.session, + // we should be able to detect such errors and only retry when we detect // transient errors. if (IsBackingOff() && wait_interval_->timer.IsRunning() && @@ -888,22 +887,24 @@ void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { // When in normal mode, we allow up to one nudge per backoff interval. It // appears that this was our nudge for this interval, and it failed. // - // Note: This does not prevent us from running canary jobs. For example, an - // IP address change might still result in another nudge being executed + // Note: This does not prevent us from running canary jobs. For example, + // an IP address change might still result in another nudge being executed // during this backoff interval. - SDVLOG(2) << "A nudge during backoff failed"; - - DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; + DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); DCHECK(!wait_interval_->had_nudge); wait_interval_->had_nudge = true; - InitOrCoalescePendingJob(old_job); - RestartWaiting(); + DCHECK(!pending_nudge_); + + scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); + pending_nudge_ = new_job.get(); + RestartWaiting(new_job.Pass()); } else { // Either this is the first failure or a consecutive failure after our // backoff timer expired. We handle it the same way in either case. SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; - HandleContinuationError(old_job); + HandleContinuationError(finished_job.Pass()); } } @@ -916,7 +917,7 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { bool rate_changed = !poll_timer_.IsRunning() || poll != poll_timer_.GetCurrentDelay(); - if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) poll_timer_.Reset(); if (!rate_changed) @@ -928,55 +929,61 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { &SyncSchedulerImpl::PollTimerCallback); } -void SyncSchedulerImpl::RestartWaiting() { +void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { CHECK(wait_interval_.get()); wait_interval_->timer.Stop(); - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, - this, &SyncSchedulerImpl::DoCanaryJob); + DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); + if (wait_interval_->mode == WaitInterval::THROTTLED) { + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, + base::Bind(&SyncSchedulerImpl::Unthrottle, + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&job))); + } else { + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, + base::Bind(&SyncSchedulerImpl::DoCanaryJob, + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&job))); + } } void SyncSchedulerImpl::HandleContinuationError( - const SyncSessionJob& old_job) { + scoped_ptr<SyncSessionJob> old_job) { DCHECK_EQ(MessageLoop::current(), sync_loop_); if (DCHECK_IS_ON()) { if (IsBackingOff()) { - DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); } } TimeDelta length = delay_provider_->GetDelay( IsBackingOff() ? wait_interval_->length : delay_provider_->GetInitialDelay( - old_job.session->status_controller().model_neutral_state())); + old_job->session()->status_controller().model_neutral_state())); SDVLOG(2) << "In handle continuation error with " - << SyncSessionJob::GetPurposeString(old_job.purpose) + << SyncSessionJob::GetPurposeString(old_job->purpose()) << " job. The time delta(ms) is " << length.InMilliseconds(); // This will reset the had_nudge variable as well. wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); - if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); + new_job->set_scheduled_start(TimeTicks::Now() + length); + if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { SDVLOG(2) << "Configuration did not succeed, scheduling retry."; // Config params should always get set. - DCHECK(!old_job.config_params.ready_task.is_null()); - SyncSession* old = old_job.session.get(); - SyncSession* s(new SyncSession(session_context_, this, - old->source(), old->routing_info(), old->workers())); - SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, - make_linked_ptr(s), false, old_job.config_params, - FROM_HERE); - wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + DCHECK(!old_job->config_params().ready_task.is_null()); + wait_interval_->pending_configure_job = new_job.get(); } else { // We are not in configuration mode. So wait_interval's pending job // should be null. - DCHECK(wait_interval_->pending_configure_job.get() == NULL); - - // TODO(lipalani) - handle clear user data. - InitOrCoalescePendingJob(old_job); + DCHECK(wait_interval_->pending_configure_job == NULL); + DCHECK(!pending_nudge_); + pending_nudge_ = new_job.get(); } - RestartWaiting(); + + RestartWaiting(new_job.Pass()); } void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { @@ -1003,51 +1010,56 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { callback.Run(); } -void SyncSchedulerImpl::DoCanaryJob() { +void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { DCHECK_EQ(MessageLoop::current(), sync_loop_); SDVLOG(2) << "Do canary job"; - DoPendingJobIfPossible(true); + + // Only set canary privileges here, when we are about to run the job. This + // avoids confusion in managing canary bits during scheduling, when you + // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that + // was scheduled as canary, and send it to an "unscheduled" state. + to_be_canary->GrantCanaryPrivilege(); + + if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { + // TODO(tim): We should be able to remove this... + scoped_ptr<SyncSession> temp = CreateSyncSession( + to_be_canary->session()->source()).Pass(); + // The routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + to_be_canary->mutable_session()->Coalesce(*(temp)); + } + DoSyncSessionJob(to_be_canary.Pass()); } -void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { +scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { DCHECK_EQ(MessageLoop::current(), sync_loop_); - SyncSessionJob* job_to_execute = NULL; + // If we find a scheduled pending_ job, abandon the old one and return a + // a clone. If unscheduled, just hand over ownership. + scoped_ptr<SyncSessionJob> candidate; if (mode_ == CONFIGURATION_MODE && wait_interval_.get() - && wait_interval_->pending_configure_job.get()) { + && wait_interval_->pending_configure_job) { SDVLOG(2) << "Found pending configure job"; - job_to_execute = wait_interval_->pending_configure_job.get(); - } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + candidate = + wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); + wait_interval_->pending_configure_job = candidate.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_) { SDVLOG(2) << "Found pending nudge job"; - - scoped_ptr<SyncSession> session(CreateSyncSession( - pending_nudge_->session->source())); - - // Also the routing info might have been changed since we cached the - // pending nudge. Update it by coalescing to the latest. - pending_nudge_->session->Coalesce(*(session.get())); - // The pending nudge would be cleared in the DoSyncSessionJob function. - job_to_execute = pending_nudge_.get(); - } - - if (job_to_execute != NULL) { - SDVLOG(2) << "Executing pending job"; - SyncSessionJob copy = *job_to_execute; - copy.is_canary_job = is_canary_job; - DoSyncSessionJob(copy); + candidate = pending_nudge_->CloneAndAbandon(); + pending_nudge_ = candidate.get(); + unscheduled_nudge_storage_.reset(); } + return candidate.Pass(); } -SyncSession* SyncSchedulerImpl::CreateSyncSession( +scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( const SyncSourceInfo& source) { DCHECK_EQ(MessageLoop::current(), sync_loop_); DVLOG(2) << "Creating sync session with routes " << ModelSafeRoutingInfoToString(session_context_->routing_info()); SyncSourceInfo info(source); - SyncSession* session(new SyncSession(session_context_, this, info, + return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, session_context_->routing_info(), session_context_->workers())); - - return session; } void SyncSchedulerImpl::PollTimerCallback() { @@ -1056,22 +1068,27 @@ void SyncSchedulerImpl::PollTimerCallback() { ModelTypeInvalidationMap invalidation_map = ModelSafeRoutingInfoToInvalidationMap(r, std::string()); SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); - SyncSession* s = CreateSyncSession(info); - - SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), - make_linked_ptr(s), - false, - ConfigurationParams(), - FROM_HERE); - - ScheduleSyncSessionJob(job); + scoped_ptr<SyncSession> s(CreateSyncSession(info)); + scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, + TimeTicks::Now(), + s.Pass(), + ConfigurationParams(), + FROM_HERE)); + ScheduleSyncSessionJob(job.Pass()); } -void SyncSchedulerImpl::Unthrottle() { +void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { DCHECK_EQ(MessageLoop::current(), sync_loop_); DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); - SDVLOG(2) << "Unthrottled."; - DoCanaryJob(); + SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") + << "canary."; + if (to_be_canary.get()) + DoCanaryJob(to_be_canary.Pass()); + + // TODO(tim): The way DecideOnJob works today, canary privileges aren't + // enough to bypass a THROTTLED wait interval, which would suggest we need + // to reset before DoCanaryJob (though trusting canary in DecideOnJob is + // probably the "right" thing to do). Bug 154216. wait_interval_.reset(); } @@ -1091,8 +1108,6 @@ void SyncSchedulerImpl::OnSilencedUntil( DCHECK_EQ(MessageLoop::current(), sync_loop_); wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, silenced_until - TimeTicks::Now())); - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, - &SyncSchedulerImpl::Unthrottle); } bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { diff --git a/sync/engine/sync_scheduler_impl.h b/sync/engine/sync_scheduler_impl.h index 90ce57c..797f0f8 100644 --- a/sync/engine/sync_scheduler_impl.h +++ b/sync/engine/sync_scheduler_impl.h @@ -20,6 +20,7 @@ #include "sync/engine/net/server_connection_manager.h" #include "sync/engine/nudge_source.h" #include "sync/engine/sync_scheduler.h" +#include "sync/engine/sync_session_job.h" #include "sync/engine/syncer.h" #include "sync/internal_api/public/base/model_type_invalidation_map.h" #include "sync/internal_api/public/engine/polling_constants.h" @@ -48,12 +49,12 @@ class SyncSchedulerImpl : public SyncScheduler { const ConfigurationParams& params) OVERRIDE; virtual void RequestStop(const base::Closure& callback) OVERRIDE; virtual void ScheduleNudgeAsync( - const base::TimeDelta& delay, + const base::TimeDelta& desired_delay, NudgeSource source, ModelTypeSet types, const tracked_objects::Location& nudge_location) OVERRIDE; virtual void ScheduleNudgeWithStatesAsync( - const base::TimeDelta& delay, NudgeSource source, + const base::TimeDelta& desired_delay, NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, const tracked_objects::Location& nudge_location) OVERRIDE; virtual void SetNotificationsEnabled(bool notifications_enabled) OVERRIDE; @@ -87,40 +88,6 @@ class SyncSchedulerImpl : public SyncScheduler { DROP, }; - struct SyncSessionJob { - // An enum used to describe jobs for scheduling purposes. - enum SyncSessionJobPurpose { - // Uninitialized state, should never be hit in practice. - UNKNOWN = -1, - // Our poll timer schedules POLL jobs periodically based on a server - // assigned poll interval. - POLL, - // A nudge task can come from a variety of components needing to force - // a sync. The source is inferable from |session.source()|. - NUDGE, - // Typically used for fetching updates for a subset of the enabled types - // during initial sync or reconfiguration. - CONFIGURATION, - }; - SyncSessionJob(); - SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, - linked_ptr<sessions::SyncSession> session, bool is_canary_job, - const ConfigurationParams& config_params, - const tracked_objects::Location& nudge_location); - ~SyncSessionJob(); - static const char* GetPurposeString(SyncSessionJobPurpose purpose); - - SyncSessionJobPurpose purpose; - base::TimeTicks scheduled_start; - linked_ptr<sessions::SyncSession> session; - bool is_canary_job; - ConfigurationParams config_params; - - // This is the location the job came from. Used for debugging. - // In case of multiple nudges getting coalesced this stores the - // first location that came in. - tracked_objects::Location from_here; - }; friend class SyncSchedulerTest; friend class SyncSchedulerWhiteboxTest; friend class SyncerTest; @@ -172,20 +139,14 @@ class SyncSchedulerImpl : public SyncScheduler { base::OneShotTimer<SyncSchedulerImpl> timer; // Configure jobs are saved only when backing off or throttling. So we - // expose the pointer here. - scoped_ptr<SyncSessionJob> pending_configure_job; + // expose the pointer here (does not own, similar to pending_nudge). + SyncSessionJob* pending_configure_job; }; static const char* GetModeString(Mode mode); static const char* GetDecisionString(JobProcessDecision decision); - // Assign |start| and |end| to appropriate SyncerStep values for the - // specified |purpose|. - static void SetSyncerStepsForPurpose( - SyncSessionJob::SyncSessionJobPurpose purpose, - SyncerStep* start, SyncerStep* end); - // Helpers that log before posting to |sync_loop_|. These will only post // the task in between calls to Start/Stop. void PostTask(const tracked_objects::Location& from_here, @@ -197,45 +158,43 @@ class SyncSchedulerImpl : public SyncScheduler { base::TimeDelta delay); // Helper to assemble a job and post a delayed task to sync. - void ScheduleSyncSessionJob(const SyncSessionJob& job); + void ScheduleSyncSessionJob(scoped_ptr<SyncSessionJob> job); // Invoke the Syncer to perform a sync. - void DoSyncSessionJob(const SyncSessionJob& job); + bool DoSyncSessionJob(scoped_ptr<SyncSessionJob> job); // Called after the Syncer has performed the sync represented by |job|, to - // reset our state. - void FinishSyncSessionJob(const SyncSessionJob& job); + // reset our state. |exited_prematurely| is true if the Syncer did not + // cycle from job.start_step() to job.end_step(), likely because the + // scheduler was forced to quit the job mid-way through. + bool FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, + bool exited_prematurely); // Helper to FinishSyncSessionJob to schedule the next sync operation. - void ScheduleNextSync(const SyncSessionJob& old_job); + // |succeeded| carries the return value of |old_job|->Finish. + void ScheduleNextSync(scoped_ptr<SyncSessionJob> finished_job, + bool succeeded); // Helper to configure polling intervals. Used by Start and ScheduleNextSync. void AdjustPolling(const SyncSessionJob* old_job); // Helper to restart waiting with |wait_interval_|'s timer. - void RestartWaiting(); + void RestartWaiting(scoped_ptr<SyncSessionJob> job); // Helper to ScheduleNextSync in case of consecutive sync errors. - void HandleContinuationError(const SyncSessionJob& old_job); - - // Determines if it is legal to run |job| by checking current - // operational mode, backoff or throttling, freshness - // (so we don't make redundant syncs), and connection. - bool ShouldRunJob(const SyncSessionJob& job); + void HandleContinuationError(scoped_ptr<SyncSessionJob> old_job); // Decide whether we should CONTINUE, SAVE or DROP the job. JobProcessDecision DecideOnJob(const SyncSessionJob& job); + // If DecideOnJob decides that |job| should be SAVEd, this function will + // carry out the task of actually "saving" (or coalescing) the job. + void HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job); + // Decide on whether to CONTINUE, SAVE or DROP the job when we are in // backoff mode. JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); - // Saves the job for future execution. Note: It drops all the poll jobs. - void SaveJob(const SyncSessionJob& job); - - // Coalesces the current job with the pending nudge. - void InitOrCoalescePendingJob(const SyncSessionJob& job); - // 'Impl' here refers to real implementation of public functions, running on // |thread_|. void StopImpl(const base::Closure& callback); @@ -243,7 +202,7 @@ class SyncSchedulerImpl : public SyncScheduler { const base::TimeDelta& delay, sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, const ModelTypeInvalidationMap& invalidation_map, - bool is_canary_job, const tracked_objects::Location& nudge_location); + const tracked_objects::Location& nudge_location); // Returns true if the client is currently in exponential backoff. bool IsBackingOff() const; @@ -251,20 +210,27 @@ class SyncSchedulerImpl : public SyncScheduler { // Helper to signal all listeners registered with |session_context_|. void Notify(SyncEngineEvent::EventCause cause); - // Callback to change backoff state. - void DoCanaryJob(); - void Unthrottle(); - - // Executes the pending job. Called whenever an event occurs that may - // change conditions permitting a job to run. Like when network connection is - // re-established, mode changes etc. - void DoPendingJobIfPossible(bool is_canary_job); + // Callback to change backoff state. |to_be_canary| in both cases is the job + // that should be granted canary privileges. Note: it is possible that the + // job that gets scheduled when this callback is scheduled is different from + // the job that will actually get executed, because other jobs may have been + // scheduled while we were waiting for the callback. + void DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary); + void Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary); + + // Returns a pending job that has potential to run given the state of the + // scheduler, if it exists. Useful whenever an event occurs that may + // change conditions that permit a job to run, such as re-establishing + // network connection, auth refresh, mode changes etc. Note that the returned + // job may have been scheduled to run at a later time, or may have been + // unscheduled. In the former case, this will result in abandoning the old + // job and effectively cancelling it. + scoped_ptr<SyncSessionJob> TakePendingJobForCurrentMode(); // Called when the root cause of the current connection error is fixed. void OnServerConnectionErrorFixed(); - // The pointer is owned by the caller. - sessions::SyncSession* CreateSyncSession( + scoped_ptr<sessions::SyncSession> CreateSyncSession( const sessions::SyncSourceInfo& info); // Creates a session for a poll and performs the sync. @@ -323,8 +289,18 @@ class SyncSchedulerImpl : public SyncScheduler { // The latest connection code we got while trying to connect. HttpResponse::ServerConnectionCode connection_code_; - // Tracks in-flight nudges so we can coalesce. - scoped_ptr<SyncSessionJob> pending_nudge_; + // Tracks (does not own) in-flight nudges (scheduled or unscheduled), + // so we can coalesce. NULL if there is no pending nudge. + SyncSessionJob* pending_nudge_; + + // There are certain situations where we want to remember a nudge, but + // there is no well defined moment in time in the future when that nudge + // should run, e.g. if it requires a mode switch or updated auth credentials. + // This member will own NUDGE jobs in those cases, until an external event + // (mode switch or fixed auth) occurs to trigger a retry. Should be treated + // as opaque / not interacted with (i.e. we could build a wrapper to + // hide the type, but that's probably overkill). + scoped_ptr<SyncSessionJob> unscheduled_nudge_storage_; // Current wait state. Null if we're not in backoff and not throttled. scoped_ptr<WaitInterval> wait_interval_; diff --git a/sync/engine/sync_scheduler_unittest.cc b/sync/engine/sync_scheduler_unittest.cc index 31f850a..7746aea 100644 --- a/sync/engine/sync_scheduler_unittest.cc +++ b/sync/engine/sync_scheduler_unittest.cc @@ -42,7 +42,7 @@ using sync_pb::GetUpdatesCallerInfo; class MockSyncer : public Syncer { public: - MOCK_METHOD3(SyncShare, void(sessions::SyncSession*, SyncerStep, + MOCK_METHOD3(SyncShare, bool(sessions::SyncSession*, SyncerStep, SyncerStep)); }; @@ -110,6 +110,7 @@ class SyncSchedulerTest : public testing::Test { routing_info_[THEMES] = GROUP_UI; routing_info_[NIGORI] = GROUP_PASSIVE; + workers_.clear(); workers_.push_back(make_scoped_refptr(new FakeModelWorker(GROUP_UI))); workers_.push_back(make_scoped_refptr(new FakeModelWorker(GROUP_DB))); workers_.push_back(make_scoped_refptr(new FakeModelWorker(GROUP_PASSIVE))); @@ -245,6 +246,7 @@ ACTION_P(RecordSyncShare, record) { RecordSyncShareImpl(arg0, record); if (MessageLoop::current()->is_running()) QuitLoopNow(); + return true; } ACTION_P2(RecordSyncShareMultiple, record, quit_after) { @@ -254,15 +256,18 @@ ACTION_P2(RecordSyncShareMultiple, record, quit_after) { MessageLoop::current()->is_running()) { QuitLoopNow(); } + return true; } ACTION(AddFailureAndQuitLoopNow) { ADD_FAILURE(); QuitLoopNow(); + return true; } ACTION(QuitLoopNowAction) { QuitLoopNow(); + return true; } // Test nudge scheduling. @@ -646,8 +651,10 @@ TEST_F(SyncSchedulerTest, PollIntervalUpdate) { TimeDelta poll2(TimeDelta::FromMilliseconds(30)); scheduler()->OnReceivedLongPollIntervalUpdate(poll1); EXPECT_CALL(*syncer(), SyncShare(_,_,_)).Times(AtLeast(kMinNumSamples)) - .WillOnce(WithArg<0>( - sessions::test_util::SimulatePollIntervalUpdate(poll2))) + .WillOnce(DoAll( + WithArg<0>( + sessions::test_util::SimulatePollIntervalUpdate(poll2)), + Return(true))) .WillRepeatedly( DoAll(Invoke(sessions::test_util::SimulateSuccess), WithArg<0>( @@ -700,7 +707,9 @@ TEST_F(SyncSchedulerTest, ThrottlingDoesThrottle) { scheduler()->OnReceivedLongPollIntervalUpdate(poll); EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(WithArg<0>(sessions::test_util::SimulateThrottled(throttle))) + .WillOnce(DoAll( + WithArg<0>(sessions::test_util::SimulateThrottled(throttle)), + Return(true))) .WillRepeatedly(AddFailureAndQuitLoopNow()); StartSyncScheduler(SyncScheduler::NORMAL_MODE); @@ -729,7 +738,9 @@ TEST_F(SyncSchedulerTest, ThrottlingExpires) { ::testing::InSequence seq; EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(WithArg<0>(sessions::test_util::SimulateThrottled(throttle1))) + .WillOnce(DoAll( + WithArg<0>(sessions::test_util::SimulateThrottled(throttle1)), + Return(true))) .RetiresOnSaturation(); EXPECT_CALL(*syncer(), SyncShare(_,_,_)) .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), @@ -827,7 +838,9 @@ TEST_F(BackoffTriggersSyncSchedulerTest, FailCommitOnce) { // retry. Expect that this clears the backoff state. TEST_F(BackoffTriggersSyncSchedulerTest, FailDownloadOnceThenSucceed) { EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillOnce(DoAll( + Invoke(sessions::test_util::SimulateDownloadUpdatesFailed), + Return(true))) .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), QuitLoopNowAction())); EXPECT_FALSE(RunAndGetBackoff()); @@ -837,7 +850,9 @@ TEST_F(BackoffTriggersSyncSchedulerTest, FailDownloadOnceThenSucceed) { // that this clears the backoff state. TEST_F(BackoffTriggersSyncSchedulerTest, FailCommitOnceThenSucceed) { EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillOnce(DoAll( + Invoke(sessions::test_util::SimulateCommitFailed), + Return(true))) .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), QuitLoopNowAction())); EXPECT_FALSE(RunAndGetBackoff()); @@ -847,7 +862,9 @@ TEST_F(BackoffTriggersSyncSchedulerTest, FailCommitOnceThenSucceed) { // Expect this will leave the scheduler in backoff. TEST_F(BackoffTriggersSyncSchedulerTest, FailDownloadTwice) { EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillOnce(DoAll( + Invoke(sessions::test_util::SimulateDownloadUpdatesFailed), + Return(true))) .WillRepeatedly(DoAll( Invoke(sessions::test_util::SimulateDownloadUpdatesFailed), QuitLoopNowAction())); @@ -858,7 +875,9 @@ TEST_F(BackoffTriggersSyncSchedulerTest, FailDownloadTwice) { // updates. Expect this will leave the scheduler in backoff. TEST_F(BackoffTriggersSyncSchedulerTest, FailGetEncryptionKey) { EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(Invoke(sessions::test_util::SimulateGetEncryptionKeyFailed)) + .WillOnce(DoAll( + Invoke(sessions::test_util::SimulateGetEncryptionKeyFailed), + Return(true))) .WillRepeatedly(DoAll( Invoke(sessions::test_util::SimulateGetEncryptionKeyFailed), QuitLoopNowAction())); @@ -1040,7 +1059,8 @@ TEST_F(SyncSchedulerTest, TransientPollFailure) { TEST_F(SyncSchedulerTest, SyncerSteps) { // Nudges. EXPECT_CALL(*syncer(), SyncShare(_, SYNCER_BEGIN, SYNCER_END)) - .WillOnce(Invoke(sessions::test_util::SimulateSuccess)); + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + Return(true))); StartSyncScheduler(SyncScheduler::NORMAL_MODE); scheduler()->ScheduleNudgeAsync( @@ -1054,7 +1074,8 @@ TEST_F(SyncSchedulerTest, SyncerSteps) { // Configuration. EXPECT_CALL(*syncer(), SyncShare(_, DOWNLOAD_UPDATES, APPLY_UPDATES)) - .WillOnce(Invoke(sessions::test_util::SimulateSuccess)); + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + Return(true))); StartSyncScheduler(SyncScheduler::CONFIGURATION_MODE); ModelTypeSet model_types(BOOKMARKS); @@ -1095,8 +1116,10 @@ TEST_F(SyncSchedulerTest, StartWhenNotConnected) { connection()->SetServerNotReachable(); connection()->UpdateConnectionStatus(); EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(Invoke(sessions::test_util::SimulateConnectionFailure)) - .WillOnce(QuitLoopNowAction()); + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateConnectionFailure), + Return(true))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + QuitLoopNowAction())); StartSyncScheduler(SyncScheduler::NORMAL_MODE); scheduler()->ScheduleNudgeAsync( diff --git a/sync/engine/sync_scheduler_whitebox_unittest.cc b/sync/engine/sync_scheduler_whitebox_unittest.cc index 8cf4a37..383f17d 100644 --- a/sync/engine/sync_scheduler_whitebox_unittest.cc +++ b/sync/engine/sync_scheduler_whitebox_unittest.cc @@ -92,7 +92,7 @@ class SyncSchedulerWhiteboxTest : public testing::Test { } SyncSchedulerImpl::JobProcessDecision DecideOnJob( - const SyncSchedulerImpl::SyncSessionJob& job) { + const SyncSessionJob& job) { return scheduler_->DecideOnJob(job); } @@ -102,13 +102,10 @@ class SyncSchedulerWhiteboxTest : public testing::Test { } SyncSchedulerImpl::JobProcessDecision CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { - SyncSession* s = scheduler_->CreateSyncSession(SyncSourceInfo()); - SyncSchedulerImpl::SyncSessionJob job(purpose, TimeTicks::Now(), - make_linked_ptr(s), - false, - ConfigurationParams(), - FROM_HERE); + SyncSessionJob::Purpose purpose) { + scoped_ptr<SyncSession> s(scheduler_->CreateSyncSession(SyncSourceInfo())); + SyncSessionJob job(purpose, TimeTicks::Now(), s.Pass(), + ConfigurationParams(), FROM_HERE); return DecideOnJob(job); } @@ -135,7 +132,7 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudge) { SetMode(SyncScheduler::CONFIGURATION_MODE); SyncSchedulerImpl::JobProcessDecision decision = - CreateAndDecideJob(SyncSchedulerImpl::SyncSessionJob::NUDGE); + CreateAndDecideJob(SyncSessionJob::NUDGE); EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } @@ -153,17 +150,14 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileTypeThrottled) { ModelTypeSetToInvalidationMap(types, std::string()); SyncSourceInfo info(GetUpdatesCallerInfo::LOCAL, invalidation_map); - SyncSession* s = scheduler_->CreateSyncSession(info); + scoped_ptr<SyncSession> s(scheduler_->CreateSyncSession(info)); // Now schedule a nudge with just bookmarks and the change is local. - SyncSchedulerImpl::SyncSessionJob job( - SyncSchedulerImpl::SyncSessionJob::NUDGE, - TimeTicks::Now(), - make_linked_ptr(s), - false, - ConfigurationParams(), - FROM_HERE); - + SyncSessionJob job(SyncSessionJob::NUDGE, + TimeTicks::Now(), + s.Pass(), + ConfigurationParams(), + FROM_HERE); SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job); EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } @@ -172,7 +166,7 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueNudge) { InitializeSyncerOnNormalMode(); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::NUDGE); + SyncSessionJob::NUDGE); EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } @@ -182,7 +176,7 @@ TEST_F(SyncSchedulerWhiteboxTest, DropPoll) { SetMode(SyncScheduler::CONFIGURATION_MODE); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::POLL); + SyncSessionJob::POLL); EXPECT_EQ(decision, SyncSchedulerImpl::DROP); } @@ -191,7 +185,7 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinuePoll) { InitializeSyncerOnNormalMode(); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::POLL); + SyncSessionJob::POLL); EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } @@ -201,7 +195,7 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueConfiguration) { SetMode(SyncScheduler::CONFIGURATION_MODE); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); + SyncSessionJob::CONFIGURATION); EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } @@ -213,7 +207,7 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveConfigurationWhileThrottled) { SetWaitIntervalToThrottled(); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); + SyncSessionJob::CONFIGURATION); EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } @@ -225,7 +219,7 @@ TEST_F(SyncSchedulerWhiteboxTest, SaveNudgeWhileThrottled) { SetWaitIntervalToThrottled(); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::NUDGE); + SyncSessionJob::NUDGE); EXPECT_EQ(decision, SyncSchedulerImpl::SAVE); } @@ -236,7 +230,7 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueNudgeWhileExponentialBackOff) { SetWaitIntervalToExponentialBackoff(); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::NUDGE); + SyncSessionJob::NUDGE); EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); } @@ -248,7 +242,7 @@ TEST_F(SyncSchedulerWhiteboxTest, DropNudgeWhileExponentialBackOff) { SetWaitIntervalHadNudge(true); SyncSchedulerImpl::JobProcessDecision decision = CreateAndDecideJob( - SyncSchedulerImpl::SyncSessionJob::NUDGE); + SyncSessionJob::NUDGE); EXPECT_EQ(decision, SyncSchedulerImpl::DROP); } @@ -258,10 +252,11 @@ TEST_F(SyncSchedulerWhiteboxTest, ContinueCanaryJobConfig) { SetMode(SyncScheduler::CONFIGURATION_MODE); SetWaitIntervalToExponentialBackoff(); - struct SyncSchedulerImpl::SyncSessionJob job; - job.purpose = SyncSchedulerImpl::SyncSessionJob::CONFIGURATION; - job.scheduled_start = TimeTicks::Now(); - job.is_canary_job = true; + SyncSessionJob job(SyncSessionJob::CONFIGURATION, + TimeTicks::Now(), scoped_ptr<SyncSession>(), + ConfigurationParams(), FROM_HERE); + + job.GrantCanaryPrivilege(); SyncSchedulerImpl::JobProcessDecision decision = DecideOnJob(job); EXPECT_EQ(decision, SyncSchedulerImpl::CONTINUE); diff --git a/sync/engine/sync_session_job.cc b/sync/engine/sync_session_job.cc new file mode 100644 index 0000000..7dc5661 --- /dev/null +++ b/sync/engine/sync_session_job.cc @@ -0,0 +1,184 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_session_job.h" +#include "sync/internal_api/public/sessions/model_neutral_state.h" + +namespace syncer { + +SyncSessionJob::~SyncSessionJob() {} + +SyncSessionJob::SyncSessionJob( + Purpose purpose, + base::TimeTicks start, + scoped_ptr<sessions::SyncSession> session, + const ConfigurationParams& config_params, + const tracked_objects::Location& from_location) + : purpose_(purpose), + scheduled_start_(start), + session_(session.Pass()), + is_canary_(false), + config_params_(config_params), + finished_(NOT_FINISHED), + from_location_(from_location) { +} + +#define ENUM_CASE(x) case x: return #x; break; +const char* SyncSessionJob::GetPurposeString(SyncSessionJob::Purpose purpose) { + switch (purpose) { + ENUM_CASE(UNKNOWN); + ENUM_CASE(POLL); + ENUM_CASE(NUDGE); + ENUM_CASE(CONFIGURATION); + } + NOTREACHED(); + return ""; +} +#undef ENUM_CASE + +bool SyncSessionJob::Finish(bool early_exit) { + DCHECK_EQ(finished_, NOT_FINISHED); + // Did we run through all SyncerSteps from start_step() to end_step() + // until the SyncSession returned !HasMoreToSync()? + // Note: if not, it's possible the scheduler hasn't started with + // SyncShare yet, it's possible there is still more to sync in the session, + // and it's also possible the job quit part way through due to a premature + // exit condition (such as shutdown). + finished_ = early_exit ? EARLY_EXIT : FINISHED; + + if (early_exit) + return false; + + DCHECK(!session_->HasMoreToSync()); + + // Did we hit any errors along the way? + if (sessions::HasSyncerError( + session_->status_controller().model_neutral_state())) { + return false; + } + + const sessions::ModelNeutralState& state( + session_->status_controller().model_neutral_state()); + switch (purpose_) { + case POLL: + case NUDGE: + DCHECK_NE(state.last_download_updates_result, UNSET); + DCHECK_NE(state.commit_result, UNSET); + break; + case CONFIGURATION: + DCHECK_NE(state.last_download_updates_result, UNSET); + break; + case UNKNOWN: + default: + NOTREACHED(); + } + + if (!config_params_.ready_task.is_null()) + config_params_.ready_task.Run(); + return true; +} + +scoped_ptr<SyncSessionJob> SyncSessionJob::CloneAndAbandon() { + DCHECK_EQ(finished_, NOT_FINISHED); + // Clone |this|, and abandon it by NULL-ing session_. + return scoped_ptr<SyncSessionJob> (new SyncSessionJob( + purpose_, scheduled_start_, session_.Pass(), + config_params_, from_location_)); +} + +scoped_ptr<SyncSessionJob> SyncSessionJob::Clone() const { + DCHECK_GT(finished_, NOT_FINISHED); + return scoped_ptr<SyncSessionJob>(new SyncSessionJob( + purpose_, scheduled_start_, CloneSession().Pass(), + config_params_, from_location_)); +} + +scoped_ptr<SyncSessionJob> SyncSessionJob::CloneFromLocation( + const tracked_objects::Location& from_here) const { + DCHECK_GT(finished_, NOT_FINISHED); + return scoped_ptr<SyncSessionJob>(new SyncSessionJob( + purpose_, scheduled_start_, CloneSession().Pass(), + config_params_, from_here)); +} + +scoped_ptr<sessions::SyncSession> SyncSessionJob::CloneSession() const { + return scoped_ptr<sessions::SyncSession>( + new sessions::SyncSession(session_->context(), + session_->delegate(), session_->source(), session_->routing_info(), + session_->workers())); +} + +bool SyncSessionJob::is_canary() const { + return is_canary_; +} + +SyncSessionJob::Purpose SyncSessionJob::purpose() const { + return purpose_; +} + +base::TimeTicks SyncSessionJob::scheduled_start() const { + return scheduled_start_; +} + +void SyncSessionJob::set_scheduled_start(base::TimeTicks start) { + scheduled_start_ = start; +}; + +const sessions::SyncSession* SyncSessionJob::session() const { + return session_.get(); +} + +sessions::SyncSession* SyncSessionJob::mutable_session() { + return session_.get(); +} + +const tracked_objects::Location& SyncSessionJob::from_location() const { + return from_location_; +} + +ConfigurationParams SyncSessionJob::config_params() const { + return config_params_; +} + +void SyncSessionJob::GrantCanaryPrivilege() { + DCHECK_EQ(finished_, NOT_FINISHED); + DVLOG(2) << "Granting canary priviliege to " << session_.get(); + is_canary_ = true; +} + +SyncerStep SyncSessionJob::start_step() const { + SyncerStep start, end; + GetSyncerStepsForPurpose(purpose_, &start, &end); + return start; +} + +SyncerStep SyncSessionJob::end_step() const { + SyncerStep start, end; + GetSyncerStepsForPurpose(purpose_, &start, &end); + return end; +} + +// static +void SyncSessionJob::GetSyncerStepsForPurpose(Purpose purpose, + SyncerStep* start, + SyncerStep* end) { + switch (purpose) { + case SyncSessionJob::CONFIGURATION: + *start = DOWNLOAD_UPDATES; + *end = APPLY_UPDATES; + return; + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: + *start = SYNCER_BEGIN; + *end = SYNCER_END; + return; + default: + NOTREACHED(); + *start = SYNCER_END; + *end = SYNCER_END; + return; + } +} + +} // namespace syncer diff --git a/sync/engine/sync_session_job.h b/sync/engine/sync_session_job.h new file mode 100644 index 0000000..136f1de --- /dev/null +++ b/sync/engine/sync_session_job.h @@ -0,0 +1,124 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_SYNC_SESSION_JOB_H_ +#define SYNC_ENGINE_SYNC_SESSION_JOB_H_ + +#include "base/memory/scoped_ptr.h" +#include "base/time.h" +#include "base/tracked_objects.h" +#include "sync/engine/sync_scheduler.h" +#include "sync/engine/syncer.h" +#include "sync/sessions/sync_session.h" + +namespace syncer { + +class SyncSessionJob { + public: + enum Purpose { + // Uninitialized state, should never be hit in practice. + UNKNOWN = -1, + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. + CONFIGURATION, + }; + + SyncSessionJob(Purpose purpose, + base::TimeTicks start, + scoped_ptr<sessions::SyncSession> session, + const ConfigurationParams& config_params, + const tracked_objects::Location& nudge_location); + ~SyncSessionJob(); + + // Returns a new clone of the job, with a cloned SyncSession ready to be + // retried / rescheduled. The returned job will *never* be a canary, + // regardless of |this|. A job can only be cloned once it has finished, + // to prevent bugs where multiple jobs are scheduled with the same session. + // Use CloneAndAbandon if you want to clone before finishing. + scoped_ptr<SyncSessionJob> Clone() const; + scoped_ptr<SyncSessionJob> CloneFromLocation( + const tracked_objects::Location& from_here) const; + + // Same as Clone() above, but also ejects the SyncSession from this job, + // preventing it from ever being used for a sync cycle. + scoped_ptr<SyncSessionJob> CloneAndAbandon(); + + // Record that the scheduler has deemed the job as finished and give it a + // chance to perform any remaining cleanup and/or notification completion + // callback invocations. + // |early_exit| specifies whether the job 1) cycled through all the + // SyncerSteps it needed, or 2) was pre-empted by the scheduler. + // Returns true if we completely ran the session without errors. + // There are many errors that could prevent a sync cycle from succeeding, + // such as invalid local state, inability to contact the server, inability + // to authenticate with the server, and server errors. What they have in + // common is that the we either need to take some action and then retry the + // sync cycle or, in the case of transient errors, retry after some backoff + // timer has expired. Most importantly, the SyncScheduler should not assume + // that the original action that triggered the sync cycle (ie. a nudge or a + // notification) has been properly serviced. + bool Finish(bool early_exit); + + // Causes is_canary() to return true. Use with caution. + void GrantCanaryPrivilege(); + + static const char* GetPurposeString(Purpose purpose); + static void GetSyncerStepsForPurpose(Purpose purpose, + SyncerStep* start, + SyncerStep* end); + + bool is_canary() const; + Purpose purpose() const; + base::TimeTicks scheduled_start() const; + void set_scheduled_start(base::TimeTicks start); + const sessions::SyncSession* session() const; + sessions::SyncSession* mutable_session(); + const tracked_objects::Location& from_location() const; + SyncerStep start_step() const; + SyncerStep end_step() const; + ConfigurationParams config_params() const; + + private: + // A SyncSessionJob can be in one of these three states, controlled by the + // Finish() function, see method comments. + enum FinishedState { + NOT_FINISHED, // Finish has not been called. + EARLY_EXIT, // Finish was called but the job was "preempted", + FINISHED // Indicates a "clean" finish operation. + }; + + scoped_ptr<sessions::SyncSession> CloneSession() const; + + const Purpose purpose_; + + base::TimeTicks scheduled_start_; + scoped_ptr<sessions::SyncSession> session_; + bool is_canary_; + + // Only used for purpose_ == CONFIGURATION. This, and different Finish() and + // Succeeded() behavior may be arguments to subclass in the future. + const ConfigurationParams config_params_; + + // Set to true if Finish() was called, false otherwise. True implies that + // a SyncShare operation took place with |session_| and it cycled through + // all requisite steps given |purpose_| without being preempted. + FinishedState finished_; + + // This is the location the job came from. Used for debugging. + // In case of multiple nudges getting coalesced this stores the + // first location that came in. + tracked_objects::Location from_location_; + + DISALLOW_COPY_AND_ASSIGN(SyncSessionJob); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_SYNC_SESSION_JOB_H_ diff --git a/sync/engine/sync_session_job_unittest.cc b/sync/engine/sync_session_job_unittest.cc new file mode 100644 index 0000000..466dffa --- /dev/null +++ b/sync/engine/sync_session_job_unittest.cc @@ -0,0 +1,239 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_session_job.h" + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/time.h" +#include "sync/internal_api/public/base/model_type_invalidation_map.h" +#include "sync/sessions/sync_session.h" +#include "sync/sessions/sync_session_context.h" +#include "sync/sessions/test_util.h" +#include "sync/test/engine/fake_model_worker.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::TimeTicks; + +namespace syncer { + +using sessions::SyncSession; + +class MockDelegate : public SyncSession::Delegate { + public: + MockDelegate() {} + ~MockDelegate() {} + + MOCK_METHOD0(IsSyncingCurrentlySilenced, bool()); + MOCK_METHOD1(OnReceivedShortPollIntervalUpdate, void(const base::TimeDelta&)); + MOCK_METHOD1(OnReceivedLongPollIntervalUpdate ,void(const base::TimeDelta&)); + MOCK_METHOD1(OnReceivedSessionsCommitDelay, void(const base::TimeDelta&)); + MOCK_METHOD1(OnSyncProtocolError, void(const sessions::SyncSessionSnapshot&)); + MOCK_METHOD0(OnShouldStopSyncingPermanently, void()); + MOCK_METHOD1(OnSilencedUntil, void(const base::TimeTicks&)); +}; + +class SyncSessionJobTest : public testing::Test { + public: + SyncSessionJobTest() : config_params_callback_invoked_(false) {} + virtual void SetUp() { + routes_.clear(); + workers_.clear(); + config_params_callback_invoked_ = false; + routes_[BOOKMARKS] = GROUP_PASSIVE; + scoped_refptr<ModelSafeWorker> passive_worker( + new FakeModelWorker(GROUP_PASSIVE)); + workers_.push_back(passive_worker); + std::vector<ModelSafeWorker*> workers; + GetWorkers(&workers); + context_.reset(new sessions::SyncSessionContext( + NULL, // |connection_manager| + NULL, // |directory| + workers, + NULL, // |extensions_activity_monitor| + NULL, // |throttled_data_type_tracker| + std::vector<SyncEngineEventListener*>(), + NULL, // |debug_info_getter| + NULL, // |traffic_recorder| + true /* |enable keystore encryption| */)); + context_->set_routing_info(routes_); + } + + scoped_ptr<SyncSession> NewLocalSession() { + sessions::SyncSourceInfo info( + sync_pb::GetUpdatesCallerInfo::LOCAL, ModelTypeInvalidationMap()); + return scoped_ptr<SyncSession>(new SyncSession(context_.get(), + &delegate_, info, context_->routing_info(), + context_->workers())); + } + + void GetWorkers(std::vector<ModelSafeWorker*>* out) const { + out->clear(); + for (std::vector<scoped_refptr<ModelSafeWorker> >::const_iterator it = + workers_.begin(); it != workers_.end(); ++it) { + out->push_back(it->get()); + } + } + + void ConfigurationParamsCallback() { + config_params_callback_invoked_ = true; + } + + bool config_params_callback_invoked() const { + return config_params_callback_invoked_; + } + + sessions::SyncSessionContext* context() { return context_.get(); } + SyncSession::Delegate* delegate() { return &delegate_; } + const ModelSafeRoutingInfo& routes() { return routes_; } + + // Checks that the two jobs are "clones" as defined by SyncSessionJob, + // minus location and SyncSession checking, for reuse in different + // scenarios. + void ExpectClonesBase(SyncSessionJob* job, SyncSessionJob* clone) { + EXPECT_EQ(job->purpose(), clone->purpose()); + EXPECT_EQ(job->scheduled_start(), clone->scheduled_start()); + EXPECT_EQ(job->start_step(), clone->start_step()); + EXPECT_EQ(job->end_step(), clone->end_step()); + EXPECT_FALSE(clone->is_canary()); + } + + private: + scoped_ptr<sessions::SyncSessionContext> context_; + std::vector<scoped_refptr<ModelSafeWorker> > workers_; + MockDelegate delegate_; + ModelSafeRoutingInfo routes_; + bool config_params_callback_invoked_; +}; + +TEST_F(SyncSessionJobTest, Clone) { + SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(), + NewLocalSession().Pass(), ConfigurationParams(), FROM_HERE); + + sessions::test_util::SimulateSuccess(job1.mutable_session(), + job1.start_step(), + job1.end_step()); + job1.Finish(false); + ModelSafeRoutingInfo new_routes; + new_routes[AUTOFILL] = GROUP_PASSIVE; + context()->set_routing_info(new_routes); + const tracked_objects::Location from_here1(FROM_HERE); + scoped_ptr<SyncSessionJob> clone1 = job1.Clone(); + scoped_ptr<SyncSessionJob> clone1_loc = job1.CloneFromLocation(from_here1); + + ExpectClonesBase(&job1, clone1.get()); + ExpectClonesBase(&job1, clone1_loc.get()); + EXPECT_NE(job1.session(), clone1->session()); + EXPECT_EQ(job1.session()->routing_info(), + clone1->session()->routing_info()); + EXPECT_EQ(job1.from_location().ToString(), + clone1->from_location().ToString()); + EXPECT_NE(job1.session(), clone1_loc->session()); + EXPECT_EQ(job1.session()->routing_info(), + clone1_loc->session()->routing_info()); + EXPECT_EQ(from_here1.ToString(), clone1_loc->from_location().ToString()); + + context()->set_routing_info(routes()); + clone1->GrantCanaryPrivilege(); + sessions::test_util::SimulateSuccess(clone1->mutable_session(), + clone1->start_step(), + clone1->end_step()); + clone1->Finish(false); + const tracked_objects::Location from_here2(FROM_HERE); + scoped_ptr<SyncSessionJob> clone2 = clone1->Clone(); + scoped_ptr<SyncSessionJob> clone2_loc(clone1->CloneFromLocation(from_here2)); + + ExpectClonesBase(clone1.get(), clone2.get()); + ExpectClonesBase(clone1.get(), clone2_loc.get()); + EXPECT_NE(clone1->session(), clone2->session()); + EXPECT_EQ(clone1->session()->routing_info(), + clone2->session()->routing_info()); + EXPECT_EQ(clone1->from_location().ToString(), + clone2->from_location().ToString()); + EXPECT_NE(clone1->session(), clone2->session()); + EXPECT_EQ(clone1->session()->routing_info(), + clone2->session()->routing_info()); + EXPECT_EQ(from_here2.ToString(), clone2_loc->from_location().ToString()); + + clone1.reset(); + clone1_loc.reset(); + ExpectClonesBase(&job1, clone2.get()); + EXPECT_NE(job1.session(), clone2->session()); + EXPECT_EQ(job1.session()->routing_info(), + clone2->session()->routing_info()); + EXPECT_EQ(job1.from_location().ToString(), + clone2->from_location().ToString()); +} + +TEST_F(SyncSessionJobTest, CloneAfterEarlyExit) { + SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(), + NewLocalSession().Pass(), ConfigurationParams(), FROM_HERE); + job1.Finish(true); + scoped_ptr<SyncSessionJob> job2 = job1.Clone(); + scoped_ptr<SyncSessionJob> job2_loc = job1.CloneFromLocation(FROM_HERE); + ExpectClonesBase(&job1, job2.get()); + ExpectClonesBase(&job1, job2_loc.get()); +} + +TEST_F(SyncSessionJobTest, CloneAndAbandon) { + scoped_ptr<SyncSession> session = NewLocalSession(); + SyncSession* session_ptr = session.get(); + + SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(), + session.Pass(), ConfigurationParams(), FROM_HERE); + ModelSafeRoutingInfo new_routes; + new_routes[AUTOFILL] = GROUP_PASSIVE; + context()->set_routing_info(new_routes); + + scoped_ptr<SyncSessionJob> clone1 = job1.CloneAndAbandon(); + ExpectClonesBase(&job1, clone1.get()); + EXPECT_FALSE(job1.session()); + EXPECT_EQ(session_ptr, clone1->session()); + EXPECT_EQ(session_ptr->routing_info(), clone1->session()->routing_info()); +} + +// Tests interaction between Finish and sync cycle success / failure. +TEST_F(SyncSessionJobTest, Finish) { + SyncSessionJob job1(SyncSessionJob::NUDGE, TimeTicks::Now(), + NewLocalSession().Pass(), ConfigurationParams(), FROM_HERE); + + sessions::test_util::SimulateSuccess(job1.mutable_session(), + job1.start_step(), + job1.end_step()); + EXPECT_TRUE(job1.Finish(false /* early_exit */)); + + scoped_ptr<SyncSessionJob> job2 = job1.Clone(); + sessions::test_util::SimulateConnectionFailure(job2->mutable_session(), + job2->start_step(), + job2->end_step()); + EXPECT_FALSE(job2->Finish(false)); + + scoped_ptr<SyncSessionJob> job3 = job2->Clone(); + EXPECT_FALSE(job3->Finish(true)); +} + +TEST_F(SyncSessionJobTest, FinishCallsReadyTask) { + ConfigurationParams params; + params.ready_task = base::Bind( + &SyncSessionJobTest::ConfigurationParamsCallback, + base::Unretained(this)); + + sessions::SyncSourceInfo info( + sync_pb::GetUpdatesCallerInfo::RECONFIGURATION, + ModelTypeInvalidationMap()); + scoped_ptr<SyncSession> session(new SyncSession(context(), + delegate(), info, context()->routing_info(), + context()->workers())); + + SyncSessionJob job1(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), + session.Pass(), params, FROM_HERE); + sessions::test_util::SimulateSuccess(job1.mutable_session(), + job1.start_step(), + job1.end_step()); + job1.Finish(false); + EXPECT_TRUE(config_params_callback_invoked()); +} + +} // namespace syncer diff --git a/sync/engine/syncer.cc b/sync/engine/syncer.cc index 78178c5..2b38501 100644 --- a/sync/engine/syncer.cc +++ b/sync/engine/syncer.cc @@ -83,7 +83,7 @@ void Syncer::RequestEarlyExit() { early_exit_requested_ = true; } -void Syncer::SyncShare(sessions::SyncSession* session, +bool Syncer::SyncShare(sessions::SyncSession* session, SyncerStep first_step, SyncerStep last_step) { ScopedSessionContextConflictResolver scoped(session->context(), @@ -179,17 +179,16 @@ void Syncer::SyncShare(sessions::SyncSession* session, } default: LOG(ERROR) << "Unknown command: " << current_step; - } + } // switch DVLOG(2) << "last step: " << SyncerStepToString(last_step) << ", " << "current step: " << SyncerStepToString(current_step) << ", " << "next step: " << SyncerStepToString(next_step) << ", " << "snapshot: " << session->TakeSnapshot().ToString(); - if (last_step == current_step) { - session->SetFinished(); - break; - } + if (last_step == current_step) + return true; current_step = next_step; - } + } // while + return false; } void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) { diff --git a/sync/engine/syncer.h b/sync/engine/syncer.h index ee97b08..80dfb51 100644 --- a/sync/engine/syncer.h +++ b/sync/engine/syncer.h @@ -57,7 +57,9 @@ class Syncer { void RequestEarlyExit(); // Runs a sync cycle from |first_step| to |last_step|. - virtual void SyncShare(sessions::SyncSession* session, + // Returns true if the cycle completed with |last_step|, and false + // if it terminated early due to error / exit requested. + virtual bool SyncShare(sessions::SyncSession* session, SyncerStep first_step, SyncerStep last_step); diff --git a/sync/engine/syncer_unittest.cc b/sync/engine/syncer_unittest.cc index 5f0a8f3..54d0eec 100644 --- a/sync/engine/syncer_unittest.cc +++ b/sync/engine/syncer_unittest.cc @@ -186,24 +186,24 @@ class SyncerTest : public testing::Test, info, workers); } - void SyncShareAsDelegate( - SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { + + void SyncShareAsDelegate(SyncSessionJob::Purpose purpose) { SyncerStep start; SyncerStep end; - SyncSchedulerImpl::SetSyncerStepsForPurpose(purpose, &start, &end); + SyncSessionJob::GetSyncerStepsForPurpose(purpose, &start, &end); session_.reset(MakeSession()); - syncer_->SyncShare(session_.get(), start, end); + EXPECT_TRUE(syncer_->SyncShare(session_.get(), start, end)); } void SyncShareNudge() { session_.reset(MakeSession()); - SyncShareAsDelegate(SyncSchedulerImpl::SyncSessionJob::NUDGE); + SyncShareAsDelegate(SyncSessionJob::NUDGE); } void SyncShareConfigure() { session_.reset(MakeSession()); - SyncShareAsDelegate(SyncSchedulerImpl::SyncSessionJob::CONFIGURATION); + SyncShareAsDelegate(SyncSessionJob::CONFIGURATION); } virtual void SetUp() { @@ -2538,7 +2538,6 @@ TEST_F(SyncerTest, CommitManyItemsInOneGo_PostBufferFail) { SyncShareNudge(); EXPECT_EQ(1U, mock_server_->commit_messages().size()); - EXPECT_FALSE(session_->Succeeded()); EXPECT_EQ(SYNC_SERVER_ERROR, session_->status_controller().model_neutral_state().commit_result); EXPECT_EQ(items_to_commit - kDefaultMaxCommitBatchSize, @@ -2569,7 +2568,7 @@ TEST_F(SyncerTest, CommitManyItemsInOneGo_CommitConflict) { // We should stop looping at the first sign of trouble. EXPECT_EQ(1U, mock_server_->commit_messages().size()); - EXPECT_FALSE(session_->Succeeded()); + EXPECT_FALSE(session_->HasMoreToSync()); EXPECT_EQ(items_to_commit - (kDefaultMaxCommitBatchSize - 1), directory()->unsynced_entity_count()); } diff --git a/sync/internal_api/js_sync_manager_observer_unittest.cc b/sync/internal_api/js_sync_manager_observer_unittest.cc index a65c3dc..d905679 100644 --- a/sync/internal_api/js_sync_manager_observer_unittest.cc +++ b/sync/internal_api/js_sync_manager_observer_unittest.cc @@ -85,8 +85,7 @@ TEST_F(JsSyncManagerObserverTest, OnSyncCycleCompleted) { sessions::SyncSourceInfo(), false, 0, - base::Time::Now(), - false); + base::Time::Now()); DictionaryValue expected_details; expected_details.Set("snapshot", snapshot.ToValue()); diff --git a/sync/internal_api/public/sessions/model_neutral_state.cc b/sync/internal_api/public/sessions/model_neutral_state.cc index c9caa75..c789dcc 100644 --- a/sync/internal_api/public/sessions/model_neutral_state.cc +++ b/sync/internal_api/public/sessions/model_neutral_state.cc @@ -30,5 +30,13 @@ ModelNeutralState::ModelNeutralState() ModelNeutralState::~ModelNeutralState() {} +bool HasSyncerError(const ModelNeutralState& state) { + const bool get_key_error = SyncerErrorIsError(state.last_get_key_result); + const bool download_updates_error = + SyncerErrorIsError(state.last_download_updates_result); + const bool commit_error = SyncerErrorIsError(state.commit_result); + return get_key_error || download_updates_error || commit_error; +} + } // namespace sessions } // namespace syncer diff --git a/sync/internal_api/public/sessions/model_neutral_state.h b/sync/internal_api/public/sessions/model_neutral_state.h index e291a25..9644175 100644 --- a/sync/internal_api/public/sessions/model_neutral_state.h +++ b/sync/internal_api/public/sessions/model_neutral_state.h @@ -75,6 +75,8 @@ struct ModelNeutralState { int64 num_server_changes_remaining; }; +bool HasSyncerError(const ModelNeutralState& state); + } // namespace sessions } // namespace syncer diff --git a/sync/internal_api/public/sessions/sync_session_snapshot.cc b/sync/internal_api/public/sessions/sync_session_snapshot.cc index 0f6fb01..193a5af 100644 --- a/sync/internal_api/public/sessions/sync_session_snapshot.cc +++ b/sync/internal_api/public/sessions/sync_session_snapshot.cc @@ -21,7 +21,6 @@ SyncSessionSnapshot::SyncSessionSnapshot() num_server_conflicts_(0), notifications_enabled_(false), num_entries_(0), - retry_scheduled_(false), is_initialized_(false) { } @@ -39,8 +38,7 @@ SyncSessionSnapshot::SyncSessionSnapshot( const SyncSourceInfo& source, bool notifications_enabled, size_t num_entries, - base::Time sync_start_time, - bool retry_scheduled) + base::Time sync_start_time) : model_neutral_state_(model_neutral_state), is_share_usable_(is_share_usable), initial_sync_ended_(initial_sync_ended), @@ -55,7 +53,6 @@ SyncSessionSnapshot::SyncSessionSnapshot( notifications_enabled_(notifications_enabled), num_entries_(num_entries), sync_start_time_(sync_start_time), - retry_scheduled_(retry_scheduled), is_initialized_(true) { } @@ -168,10 +165,6 @@ base::Time SyncSessionSnapshot::sync_start_time() const { return sync_start_time_; } -bool SyncSessionSnapshot::retry_scheduled() const { - return retry_scheduled_; -} - bool SyncSessionSnapshot::is_initialized() const { return is_initialized_; } diff --git a/sync/internal_api/public/sessions/sync_session_snapshot.h b/sync/internal_api/public/sessions/sync_session_snapshot.h index 16b0753..d507517 100644 --- a/sync/internal_api/public/sessions/sync_session_snapshot.h +++ b/sync/internal_api/public/sessions/sync_session_snapshot.h @@ -43,8 +43,7 @@ class SyncSessionSnapshot { const SyncSourceInfo& source, bool notifications_enabled, size_t num_entries, - base::Time sync_start_time, - bool retry_scheduled); + base::Time sync_start_time); ~SyncSessionSnapshot(); // Caller takes ownership of the returned dictionary. @@ -69,7 +68,6 @@ class SyncSessionSnapshot { bool notifications_enabled() const; size_t num_entries() const; base::Time sync_start_time() const; - bool retry_scheduled() const; // Set iff this snapshot was not built using the default constructor. bool is_initialized() const; @@ -89,7 +87,6 @@ class SyncSessionSnapshot { bool notifications_enabled_; size_t num_entries_; base::Time sync_start_time_; - bool retry_scheduled_; bool is_initialized_; }; diff --git a/sync/internal_api/public/util/syncer_error.h b/sync/internal_api/public/util/syncer_error.h index ff184e3..478d888 100644 --- a/sync/internal_api/public/util/syncer_error.h +++ b/sync/internal_api/public/util/syncer_error.h @@ -28,6 +28,8 @@ enum SyncerError { // Based on values returned by server. Most are defined in sync.proto. SERVER_RETURN_INVALID_CREDENTIAL, + FIRST_SERVER_RETURN_VALUE = SERVER_RETURN_INVALID_CREDENTIAL, + SERVER_RETURN_UNKNOWN_ERROR, SERVER_RETURN_THROTTLED, SERVER_RETURN_TRANSIENT_ERROR, diff --git a/sync/sessions/session_state_unittest.cc b/sync/sessions/session_state_unittest.cc index 9d20437..92d4b6f 100644 --- a/sync/sessions/session_state_unittest.cc +++ b/sync/sessions/session_state_unittest.cc @@ -90,8 +90,7 @@ TEST_F(SessionStateTest, SyncSessionSnapshotToValue) { source, false, 0, - base::Time::Now(), - false); + base::Time::Now()); scoped_ptr<DictionaryValue> value(snapshot.ToValue()); EXPECT_EQ(20u, value->size()); ExpectDictIntegerValue(model_neutral.num_successful_commits, diff --git a/sync/sessions/sync_session.cc b/sync/sessions/sync_session.cc index 5a2b572..9be78d8 100644 --- a/sync/sessions/sync_session.cc +++ b/sync/sessions/sync_session.cc @@ -77,8 +77,7 @@ SyncSession::SyncSession(SyncSessionContext* context, Delegate* delegate, delegate_(delegate), workers_(workers), routing_info_(routing_info), - enabled_groups_(ComputeEnabledGroups(routing_info_, workers_)), - finished_(false) { + enabled_groups_(ComputeEnabledGroups(routing_info_, workers_)) { status_controller_.reset(new StatusController(routing_info_)); std::sort(workers_.begin(), workers_.end()); } @@ -115,30 +114,31 @@ void SyncSession::Coalesce(const SyncSession& session) { enabled_groups_ = ComputeEnabledGroups(routing_info_, workers_); } -void SyncSession::RebaseRoutingInfoWithLatest(const SyncSession& session) { +void SyncSession::RebaseRoutingInfoWithLatest( + const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers) { ModelSafeRoutingInfo temp_routing_info; - // Take the intersecion and also set the routing info(it->second) from the + // Take the intersection and also set the routing info(it->second) from the // passed in session. for (ModelSafeRoutingInfo::const_iterator it = - session.routing_info_.begin(); it != session.routing_info_.end(); + routing_info.begin(); it != routing_info.end(); ++it) { if (routing_info_.find(it->first) != routing_info_.end()) { temp_routing_info[it->first] = it->second; } } - - // Now swap it. routing_info_.swap(temp_routing_info); - // Now update the payload map. - PurgeStaleStates(&source_.types, session.routing_info_); + PurgeStaleStates(&source_.types, routing_info); // Now update the workers. std::vector<ModelSafeWorker*> temp; + std::vector<ModelSafeWorker*> sorted_workers = workers; + std::sort(sorted_workers.begin(), sorted_workers.end()); std::set_intersection(workers_.begin(), workers_.end(), - session.workers_.begin(), session.workers_.end(), - std::back_inserter(temp)); + sorted_workers.begin(), sorted_workers.end(), + std::back_inserter(temp)); workers_.swap(temp); // Now update enabled groups. @@ -146,7 +146,6 @@ void SyncSession::RebaseRoutingInfoWithLatest(const SyncSession& session) { } void SyncSession::PrepareForAnotherSyncCycle() { - finished_ = false; source_.updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; status_controller_.reset(new StatusController(routing_info_)); @@ -183,8 +182,7 @@ SyncSessionSnapshot SyncSession::TakeSnapshot() const { source_, context_->notifications_enabled(), dir->GetEntriesCount(), - status_controller_->sync_start_time(), - !Succeeded()); + status_controller_->sync_start_time()); } void SyncSession::SendEventNotification(SyncEngineEvent::EventCause cause) { @@ -219,35 +217,11 @@ std::set<ModelSafeGroup> SyncSession::GetEnabledGroupsWithConflicts() const { return enabled_groups_with_conflicts; } -namespace { - -// Returns false iff one of the command results had an error. -bool HadErrors(const ModelNeutralState& state) { - const bool get_key_error = SyncerErrorIsError(state.last_get_key_result); - const bool download_updates_error = - SyncerErrorIsError(state.last_download_updates_result); - const bool commit_error = SyncerErrorIsError(state.commit_result); - return get_key_error || download_updates_error || commit_error; -} -} // namespace - -bool SyncSession::Succeeded() const { - return finished_ && !HadErrors(status_controller_->model_neutral_state()); -} - -bool SyncSession::SuccessfullyReachedServer() const { +bool SyncSession::DidReachServer() const { const ModelNeutralState& state = status_controller_->model_neutral_state(); - bool reached_server = state.last_get_key_result == SYNCER_OK || - state.last_download_updates_result == SYNCER_OK; - // It's possible that we reached the server on one attempt, then had an error - // on the next (or didn't perform some of the server-communicating commands). - // We want to verify that, for all commands attempted, we successfully spoke - // with the server. Therefore, we verify no errors and at least one SYNCER_OK. - return reached_server && !HadErrors(state); -} - -void SyncSession::SetFinished() { - finished_ = true; + return state.last_get_key_result >= FIRST_SERVER_RETURN_VALUE || + state.last_download_updates_result >= FIRST_SERVER_RETURN_VALUE || + state.commit_result >= FIRST_SERVER_RETURN_VALUE; } } // namespace sessions diff --git a/sync/sessions/sync_session.h b/sync/sessions/sync_session.h index 93d7f41..4026e88 100644 --- a/sync/sessions/sync_session.h +++ b/sync/sessions/sync_session.h @@ -112,25 +112,11 @@ class SyncSession { // engine again. bool HasMoreToSync() const; - // Returns true if we completely ran the session without errors. - // - // There are many errors that could prevent a sync cycle from succeeding. - // These include invalid local state, inability to contact the server, - // inability to authenticate with the server, and server errors. What they - // have in common is that the we either need to take some action and then - // retry the sync cycle or, in the case of transient errors, retry after some - // backoff timer has expired. Most importantly, the SyncScheduler should not - // assume that the original action that triggered the sync cycle (ie. a nudge - // or a notification) has been properly serviced. - // - // This function also returns false if SyncShare has not been called on this - // session yet, or if ResetTransientState() has been called on this session - // since the last call to SyncShare. - bool Succeeded() const; - - // Returns true if we reached the server successfully and the server did not - // return any error codes. Returns false if no connection was attempted. - bool SuccessfullyReachedServer() const; + // Returns true if we reached the server. Note that "reaching the server" + // here means that from an HTTP perspective, we succeeded (HTTP 200). The + // server **MAY** have returned a sync protocol error. + // See SERVER_RETURN_* in the SyncerError enum for values. + bool DidReachServer() const; // Collects all state pertaining to how and why |s| originated and unions it // with corresponding state in |this|, leaving |s| unchanged. Allows |this| @@ -139,11 +125,13 @@ class SyncSession { // sessions. void Coalesce(const SyncSession& session); - // Compares the routing_info_, workers and payload map with the passed in - // session. Purges types from the above 3 which are not in session. Useful + // Compares the routing_info_, workers and payload map with those passed in. + // Purges types from the above 3 which are not present in latest. Useful // to update the sync session when the user has disabled some types from // syncing. - void RebaseRoutingInfoWithLatest(const SyncSession& session); + void RebaseRoutingInfoWithLatest( + const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers); // Should be called any time |this| is being re-used in a new call to // SyncShare (e.g., HasMoreToSync returned true). @@ -177,9 +165,6 @@ class SyncSession { // Returns the set of enabled groups that have conflicts. std::set<ModelSafeGroup> GetEnabledGroupsWithConflicts() const; - // Mark the session has having finished all the sync steps it needed. - void SetFinished(); - private: // Extend the encapsulation boundary to utilities for internal member // assignments. This way, the scope of these actions is explicit, they can't @@ -217,10 +202,6 @@ class SyncSession { // |routing_info_|. std::set<ModelSafeGroup> enabled_groups_; - // Whether this session has reached its last step or not. Gets reset on each - // new cycle (via PrepareForAnotherSyncCycle). - bool finished_; - DISALLOW_COPY_AND_ASSIGN(SyncSession); }; diff --git a/sync/sessions/sync_session_unittest.cc b/sync/sessions/sync_session_unittest.cc index 285f33a..e23f6f6 100644 --- a/sync/sessions/sync_session_unittest.cc +++ b/sync/sessions/sync_session_unittest.cc @@ -347,7 +347,7 @@ TEST_F(SyncSessionTest, RebaseRoutingInfoWithLatestRemoveOneType) { EXPECT_EQ(expected_enabled_groups_one, one.GetEnabledGroups()); EXPECT_EQ(expected_enabled_groups_two, two.GetEnabledGroups()); - two.RebaseRoutingInfoWithLatest(one); + two.RebaseRoutingInfoWithLatest(one.routing_info(), one.workers()); EXPECT_EQ(expected_enabled_groups_one, one.GetEnabledGroups()); EXPECT_EQ(expected_enabled_groups_one, two.GetEnabledGroups()); @@ -417,7 +417,7 @@ TEST_F(SyncSessionTest, RebaseRoutingInfoWithLatestWithSameType) { EXPECT_EQ(expected_enabled_groups, first.GetEnabledGroups()); EXPECT_EQ(expected_enabled_groups, second.GetEnabledGroups()); - second.RebaseRoutingInfoWithLatest(first); + second.RebaseRoutingInfoWithLatest(first.routing_info(), first.workers()); EXPECT_EQ(expected_enabled_groups, first.GetEnabledGroups()); EXPECT_EQ(expected_enabled_groups, second.GetEnabledGroups()); diff --git a/sync/sessions/test_util.cc b/sync/sessions/test_util.cc index 4214c3d..cef5121 100644 --- a/sync/sessions/test_util.cc +++ b/sync/sessions/test_util.cc @@ -31,6 +31,9 @@ void SimulateDownloadUpdatesFailed(sessions::SyncSession* session, void SimulateCommitFailed(sessions::SyncSession* session, SyncerStep begin, SyncerStep end) { + session->mutable_status_controller()->set_last_get_key_result(SYNCER_OK); + session->mutable_status_controller()->set_last_download_updates_result( + SYNCER_OK); session->mutable_status_controller()->set_commit_result( SERVER_RETURN_TRANSIENT_ERROR); } @@ -47,7 +50,6 @@ void SimulateSuccess(sessions::SyncSession* session, ADD_FAILURE() << "Shouldn't have more to sync"; } ASSERT_EQ(0U, session->status_controller().num_server_changes_remaining()); - session->SetFinished(); switch(end) { case SYNCER_END: session->mutable_status_controller()->set_commit_result(SYNCER_OK); @@ -66,16 +68,20 @@ void SimulateSuccess(sessions::SyncSession* session, void SimulateThrottledImpl(sessions::SyncSession* session, const base::TimeDelta& delta) { + session->mutable_status_controller()->set_last_download_updates_result( + SERVER_RETURN_THROTTLED); session->delegate()->OnSilencedUntil(base::TimeTicks::Now() + delta); } void SimulatePollIntervalUpdateImpl(sessions::SyncSession* session, const base::TimeDelta& new_poll) { + SimulateSuccess(session, SYNCER_BEGIN, SYNCER_END); session->delegate()->OnReceivedLongPollIntervalUpdate(new_poll); } void SimulateSessionsCommitDelayUpdateImpl(sessions::SyncSession* session, const base::TimeDelta& new_delay) { + SimulateSuccess(session, SYNCER_BEGIN, SYNCER_END); session->delegate()->OnReceivedSessionsCommitDelay(new_delay); } diff --git a/sync/sync.gyp b/sync/sync.gyp index be35db2..0219b37 100644 --- a/sync/sync.gyp +++ b/sync/sync.gyp @@ -118,6 +118,8 @@ 'engine/sync_scheduler.h', 'engine/sync_scheduler_impl.cc', 'engine/sync_scheduler_impl.h', + 'engine/sync_session_job.cc', + 'engine/sync_session_job.h', 'engine/syncer.cc', 'engine/syncer.h', 'engine/syncer_command.cc', @@ -606,6 +608,7 @@ 'engine/model_changing_syncer_command_unittest.cc', 'engine/process_commit_response_command_unittest.cc', 'engine/process_updates_command_unittest.cc', + 'engine/sync_session_job_unittest.cc', 'engine/sync_scheduler_unittest.cc', 'engine/sync_scheduler_whitebox_unittest.cc', 'engine/syncer_proto_util_unittest.cc', |