diff options
Diffstat (limited to 'sync/engine/sync_scheduler_impl.cc')
-rw-r--r-- | sync/engine/sync_scheduler_impl.cc | 154 |
1 files changed, 53 insertions, 101 deletions
diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc index 5bfcbcc..7fd9ccd 100644 --- a/sync/engine/sync_scheduler_impl.cc +++ b/sync/engine/sync_scheduler_impl.cc @@ -217,11 +217,7 @@ void SyncSchedulerImpl::OnServerConnectionErrorFixed() { scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); if (!pending.get()) return; - - PostTask(FROM_HERE, "DoCanaryJob", - base::Bind(&SyncSchedulerImpl::DoCanaryJob, - weak_ptr_factory_.GetWeakPtr(), - base::Passed(&pending))); + DoCanaryJob(pending.Pass()); } void SyncSchedulerImpl::Start(Mode mode) { @@ -310,17 +306,13 @@ bool SyncSchedulerImpl::ScheduleConfiguration( // Only reconfigure if we have types to download. if (!params.types_to_download.Empty()) { DCHECK(!restricted_routes.empty()); - scoped_ptr<SyncSession> session(new SyncSession( - session_context_, - this, - SyncSourceInfo(params.source, - ModelSafeRoutingInfoToInvalidationMap( - restricted_routes, - std::string())))); scoped_ptr<SyncSessionJob> job(new SyncSessionJob( SyncSessionJob::CONFIGURATION, TimeTicks::Now(), - session.Pass(), + SyncSourceInfo(params.source, + ModelSafeRoutingInfoToInvalidationMap( + restricted_routes, + std::string())), params)); bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); @@ -389,11 +381,10 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( ModelTypeSet throttled_types = session_context_->throttled_data_type_tracker()->GetThrottledTypes(); if (job.purpose() == SyncSessionJob::NUDGE && - job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { + job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { ModelTypeSet requested_types; for (ModelTypeInvalidationMap::const_iterator i = - job.session()->source().types.begin(); - i != job.session()->source().types.end(); + job.source_info().types.begin(); i != job.source_info().types.end(); ++i) { requested_types.Put(i->first); } @@ -466,12 +457,11 @@ void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { // logic in ScheduleNudgeImpl that takes the min of the two nudge start // times, because we're calling this function first. Pull this out // into a function to coalesce + set start times and reuse. - pending_nudge_->mutable_session()->CoalesceSources( - job->session()->source()); + pending_nudge_->CoalesceSources(job->source_info()); return; } - scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); + scoped_ptr<SyncSessionJob> job_to_save = job->Clone(); if (wait_interval_.get() && !wait_interval_->pending_configure_job) { // This job should be made the new canary. if (is_nudge) { @@ -486,7 +476,7 @@ void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { DCHECK(!unscheduled_nudge_storage_.get()); if (pending_nudge_) { // Pre-empt the nudge canary and abandon the old nudge (owned by task). - unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); + unscheduled_nudge_storage_ = pending_nudge_->Clone(); pending_nudge_ = unscheduled_nudge_storage_.get(); } wait_interval_->pending_configure_job = job_to_save.get(); @@ -589,12 +579,11 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( scoped_ptr<SyncSessionJob> job(new SyncSessionJob( SyncSessionJob::NUDGE, TimeTicks::Now() + delay, - CreateSyncSession(info).Pass(), + info, ConfigurationParams())); JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); SDVLOG(2) << "Should run " << SyncSessionJob::GetPurposeString(job->purpose()) - << " job " << job->session() << " in mode " << GetModeString(mode_) << ": " << GetDecisionString(decision); if (decision != CONTINUE) { @@ -609,8 +598,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( if (pending_nudge_) { SDVLOG(2) << "Rescheduling pending nudge"; - pending_nudge_->mutable_session()->CoalesceSources( - job->session()->source()); + pending_nudge_->CoalesceSources(job->source_info()); // Choose the start time as the earliest of the 2. Note that this means // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) // but a nudge is already scheduled to go out, we'll send the (tab) commit @@ -621,7 +609,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl( // It's possible that by "rescheduling" we're actually taking a job that // was previously unscheduled and giving it wings, so take care to reset // unscheduled nudge storage. - job = pending_nudge_->CloneAndAbandon(); + job = pending_nudge_->Clone(); pending_nudge_ = NULL; unscheduled_nudge_storage_.reset(); // It's also possible we took a canary job, since we allow one nudge @@ -663,18 +651,6 @@ const char* SyncSchedulerImpl::GetDecisionString( return ""; } -void SyncSchedulerImpl::PostTask( - const tracked_objects::Location& from_here, - const char* name, const base::Closure& task) { - SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; - DCHECK_EQ(MessageLoop::current(), sync_loop_); - if (!started_) { - SDVLOG(1) << "Not posting task as scheduler is stopped."; - return; - } - sync_loop_->PostTask(from_here, task); -} - void SyncSchedulerImpl::PostDelayedTask( const tracked_objects::Location& from_here, const char* name, const base::Closure& task, base::TimeDelta delay) { @@ -685,35 +661,24 @@ void SyncSchedulerImpl::PostDelayedTask( SDVLOG(1) << "Not posting task as scheduler is stopped."; return; } - sync_loop_->PostDelayedTask(from_here, task, delay); + // This cancels the previous task, if one existed. + pending_wakeup_.Reset(task); + sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); } bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, JobPriority priority) { DCHECK_EQ(MessageLoop::current(), sync_loop_); if (job->purpose() == SyncSessionJob::NUDGE) { - if (pending_nudge_ == NULL || - pending_nudge_->session() != job->session()) { - // |job| is abandoned. - SDVLOG(2) << "Dropping a nudge in " - << "DoSyncSessionJob because another nudge was scheduled"; - return false; - } pending_nudge_ = NULL; } - if (!job->session()) { - SDVLOG(2) << "Dropping abandoned job"; - return false; // Fix for crbug.com/190085. - } - base::AutoReset<bool> protector(&no_scheduling_allowed_, true); JobProcessDecision decision = DecideOnJob(*job, priority); SDVLOG(2) << "Should run " << SyncSessionJob::GetPurposeString(job->purpose()) - << " job " << job->session() << " in mode " << GetModeString(mode_) - << " with source " << job->session()->source().updates_source + << " with source " << job->source_info().updates_source << ": " << GetDecisionString(decision); if (decision != CONTINUE) { if (decision == SAVE) { @@ -724,14 +689,18 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, return false; } - SDVLOG(2) << "Calling SyncShare with " - << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; - bool premature_exit = !syncer_->SyncShare(job->mutable_session(), + DVLOG(2) << "Creating sync session with routes " + << ModelSafeRoutingInfoToString(session_context_->routing_info()) + << "and purpose " << job->purpose(); + SyncSession session(session_context_, this, job->source_info()); + bool premature_exit = !syncer_->SyncShare(&session, job->start_step(), job->end_step()); SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; - bool success = FinishSyncSessionJob(job.get(), premature_exit); + bool success = FinishSyncSessionJob(job.get(), + premature_exit, + &session); if (IsSyncingCurrentlySilenced()) { SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; @@ -751,7 +720,7 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, } if (!success) - ScheduleNextSync(job.Pass()); + ScheduleNextSync(job.Pass(), &session); return success; } @@ -786,14 +755,15 @@ void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { if (!ShouldPoll()) return; - SDVLOG(2) << "Calling SyncShare with " - << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; - bool premature_exit = !syncer_->SyncShare(job->mutable_session(), + DVLOG(2) << "Polling with routes " + << ModelSafeRoutingInfoToString(session_context_->routing_info()); + SyncSession session(session_context_, this, job->source_info()); + bool premature_exit = !syncer_->SyncShare(&session, job->start_step(), job->end_step()); SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; - FinishSyncSessionJob(job.get(), premature_exit); + FinishSyncSessionJob(job.get(), premature_exit, &session); if (IsSyncingCurrentlySilenced()) { // This will start the countdown to unthrottle. Other kinds of jobs would @@ -831,14 +801,15 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { } bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, - bool exited_prematurely) { + bool exited_prematurely, + SyncSession* session) { DCHECK_EQ(MessageLoop::current(), sync_loop_); // Let job know that we're through syncing (calling SyncShare) at this point. bool succeeded = false; { base::AutoReset<bool> protector(&no_scheduling_allowed_, true); - succeeded = job->Finish(exited_prematurely); + succeeded = job->Finish(exited_prematurely, session); } SDVLOG(2) << "Updating the next polling time after SyncMain"; @@ -858,7 +829,8 @@ bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, } void SyncSchedulerImpl::ScheduleNextSync( - scoped_ptr<SyncSessionJob> finished_job) { + scoped_ptr<SyncSessionJob> finished_job, + SyncSession* session) { DCHECK_EQ(MessageLoop::current(), sync_loop_); DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION || finished_job->purpose() == SyncSessionJob::NUDGE); @@ -891,7 +863,7 @@ void SyncSchedulerImpl::ScheduleNextSync( // Either this is the first failure or a consecutive failure after our // backoff timer expired. We handle it the same way in either case. SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; - HandleContinuationError(finished_job.Pass()); + HandleContinuationError(finished_job.Pass(), session); } } @@ -921,26 +893,28 @@ void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { wait_interval_->timer.Stop(); DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); if (wait_interval_->mode == WaitInterval::THROTTLED) { - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, - base::Bind(&SyncSchedulerImpl::Unthrottle, - weak_ptr_factory_.GetWeakPtr(), - base::Passed(&job))); + pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&job))); + } else { - wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, - base::Bind(&SyncSchedulerImpl::DoCanaryJob, - weak_ptr_factory_.GetWeakPtr(), - base::Passed(&job))); + pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, + weak_ptr_factory_.GetWeakPtr(), + base::Passed(&job))); } + wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, + pending_wakeup_.callback()); } void SyncSchedulerImpl::HandleContinuationError( - scoped_ptr<SyncSessionJob> old_job) { + scoped_ptr<SyncSessionJob> old_job, + SyncSession* session) { DCHECK_EQ(MessageLoop::current(), sync_loop_); TimeDelta length = delay_provider_->GetDelay( IsBackingOff() ? wait_interval_->length : delay_provider_->GetInitialDelay( - old_job->session()->status_controller().model_neutral_state())); + session->status_controller().model_neutral_state())); SDVLOG(2) << "In handle continuation error with " << SyncSessionJob::GetPurposeString(old_job->purpose()) @@ -989,6 +963,7 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { poll_timer_.Stop(); pending_nudge_ = NULL; unscheduled_nudge_storage_.reset(); + pending_wakeup_.Cancel(); if (started_) { started_ = false; } @@ -1000,18 +975,6 @@ void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { DCHECK_EQ(MessageLoop::current(), sync_loop_); SDVLOG(2) << "Do canary job"; - if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { - // TODO(tim): Bug 158313. Remove this check. - if (pending_nudge_ == NULL || - pending_nudge_->session() != to_be_canary->session()) { - // |job| is abandoned. - SDVLOG(2) << "Dropping a nudge in " - << "DoCanaryJob because another nudge was scheduled"; - return; - } - DCHECK_EQ(pending_nudge_->session(), to_be_canary->session()); - } - // This is the only place where we invoke DoSyncSessionJob with canary // privileges. Everyone else should use NORMAL_PRIORITY. DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); @@ -1026,11 +989,11 @@ scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { && wait_interval_->pending_configure_job) { SDVLOG(2) << "Found pending configure job"; candidate = - wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); + wait_interval_->pending_configure_job->Clone().Pass(); wait_interval_->pending_configure_job = candidate.get(); } else if (mode_ == NORMAL_MODE && pending_nudge_) { SDVLOG(2) << "Found pending nudge job"; - candidate = pending_nudge_->CloneAndAbandon(); + candidate = pending_nudge_->Clone(); pending_nudge_ = candidate.get(); unscheduled_nudge_storage_.reset(); } @@ -1040,26 +1003,15 @@ scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { return candidate.Pass(); } -scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( - const SyncSourceInfo& source) { - DCHECK_EQ(MessageLoop::current(), sync_loop_); - DVLOG(2) << "Creating sync session with routes " - << ModelSafeRoutingInfoToString(session_context_->routing_info()); - - SyncSourceInfo info(source); - return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info)); -} - void SyncSchedulerImpl::PollTimerCallback() { DCHECK_EQ(MessageLoop::current(), sync_loop_); ModelSafeRoutingInfo r; ModelTypeInvalidationMap invalidation_map = ModelSafeRoutingInfoToInvalidationMap(r, std::string()); SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); - scoped_ptr<SyncSession> s(CreateSyncSession(info)); scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, TimeTicks::Now(), - s.Pass(), + info, ConfigurationParams())); if (no_scheduling_allowed_) { // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in |