summaryrefslogtreecommitdiffstats
path: root/sync/engine/sync_scheduler_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sync/engine/sync_scheduler_impl.cc')
-rw-r--r--sync/engine/sync_scheduler_impl.cc154
1 files changed, 53 insertions, 101 deletions
diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc
index 5bfcbcc..7fd9ccd 100644
--- a/sync/engine/sync_scheduler_impl.cc
+++ b/sync/engine/sync_scheduler_impl.cc
@@ -217,11 +217,7 @@ void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
if (!pending.get())
return;
-
- PostTask(FROM_HERE, "DoCanaryJob",
- base::Bind(&SyncSchedulerImpl::DoCanaryJob,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&pending)));
+ DoCanaryJob(pending.Pass());
}
void SyncSchedulerImpl::Start(Mode mode) {
@@ -310,17 +306,13 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
// Only reconfigure if we have types to download.
if (!params.types_to_download.Empty()) {
DCHECK(!restricted_routes.empty());
- scoped_ptr<SyncSession> session(new SyncSession(
- session_context_,
- this,
- SyncSourceInfo(params.source,
- ModelSafeRoutingInfoToInvalidationMap(
- restricted_routes,
- std::string()))));
scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
SyncSessionJob::CONFIGURATION,
TimeTicks::Now(),
- session.Pass(),
+ SyncSourceInfo(params.source,
+ ModelSafeRoutingInfoToInvalidationMap(
+ restricted_routes,
+ std::string())),
params));
bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY);
@@ -389,11 +381,10 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
ModelTypeSet throttled_types =
session_context_->throttled_data_type_tracker()->GetThrottledTypes();
if (job.purpose() == SyncSessionJob::NUDGE &&
- job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
+ job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) {
ModelTypeSet requested_types;
for (ModelTypeInvalidationMap::const_iterator i =
- job.session()->source().types.begin();
- i != job.session()->source().types.end();
+ job.source_info().types.begin(); i != job.source_info().types.end();
++i) {
requested_types.Put(i->first);
}
@@ -466,12 +457,11 @@ void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
// logic in ScheduleNudgeImpl that takes the min of the two nudge start
// times, because we're calling this function first. Pull this out
// into a function to coalesce + set start times and reuse.
- pending_nudge_->mutable_session()->CoalesceSources(
- job->session()->source());
+ pending_nudge_->CoalesceSources(job->source_info());
return;
}
- scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon();
+ scoped_ptr<SyncSessionJob> job_to_save = job->Clone();
if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
// This job should be made the new canary.
if (is_nudge) {
@@ -486,7 +476,7 @@ void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
DCHECK(!unscheduled_nudge_storage_.get());
if (pending_nudge_) {
// Pre-empt the nudge canary and abandon the old nudge (owned by task).
- unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
+ unscheduled_nudge_storage_ = pending_nudge_->Clone();
pending_nudge_ = unscheduled_nudge_storage_.get();
}
wait_interval_->pending_configure_job = job_to_save.get();
@@ -589,12 +579,11 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
SyncSessionJob::NUDGE,
TimeTicks::Now() + delay,
- CreateSyncSession(info).Pass(),
+ info,
ConfigurationParams()));
JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY);
SDVLOG(2) << "Should run "
<< SyncSessionJob::GetPurposeString(job->purpose())
- << " job " << job->session()
<< " in mode " << GetModeString(mode_)
<< ": " << GetDecisionString(decision);
if (decision != CONTINUE) {
@@ -609,8 +598,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
if (pending_nudge_) {
SDVLOG(2) << "Rescheduling pending nudge";
- pending_nudge_->mutable_session()->CoalesceSources(
- job->session()->source());
+ pending_nudge_->CoalesceSources(job->source_info());
// Choose the start time as the earliest of the 2. Note that this means
// if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
// but a nudge is already scheduled to go out, we'll send the (tab) commit
@@ -621,7 +609,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
// It's possible that by "rescheduling" we're actually taking a job that
// was previously unscheduled and giving it wings, so take care to reset
// unscheduled nudge storage.
- job = pending_nudge_->CloneAndAbandon();
+ job = pending_nudge_->Clone();
pending_nudge_ = NULL;
unscheduled_nudge_storage_.reset();
// It's also possible we took a canary job, since we allow one nudge
@@ -663,18 +651,6 @@ const char* SyncSchedulerImpl::GetDecisionString(
return "";
}
-void SyncSchedulerImpl::PostTask(
- const tracked_objects::Location& from_here,
- const char* name, const base::Closure& task) {
- SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (!started_) {
- SDVLOG(1) << "Not posting task as scheduler is stopped.";
- return;
- }
- sync_loop_->PostTask(from_here, task);
-}
-
void SyncSchedulerImpl::PostDelayedTask(
const tracked_objects::Location& from_here,
const char* name, const base::Closure& task, base::TimeDelta delay) {
@@ -685,35 +661,24 @@ void SyncSchedulerImpl::PostDelayedTask(
SDVLOG(1) << "Not posting task as scheduler is stopped.";
return;
}
- sync_loop_->PostDelayedTask(from_here, task, delay);
+ // This cancels the previous task, if one existed.
+ pending_wakeup_.Reset(task);
+ sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay);
}
bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
JobPriority priority) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
if (job->purpose() == SyncSessionJob::NUDGE) {
- if (pending_nudge_ == NULL ||
- pending_nudge_->session() != job->session()) {
- // |job| is abandoned.
- SDVLOG(2) << "Dropping a nudge in "
- << "DoSyncSessionJob because another nudge was scheduled";
- return false;
- }
pending_nudge_ = NULL;
}
- if (!job->session()) {
- SDVLOG(2) << "Dropping abandoned job";
- return false; // Fix for crbug.com/190085.
- }
-
base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
JobProcessDecision decision = DecideOnJob(*job, priority);
SDVLOG(2) << "Should run "
<< SyncSessionJob::GetPurposeString(job->purpose())
- << " job " << job->session()
<< " in mode " << GetModeString(mode_)
- << " with source " << job->session()->source().updates_source
+ << " with source " << job->source_info().updates_source
<< ": " << GetDecisionString(decision);
if (decision != CONTINUE) {
if (decision == SAVE) {
@@ -724,14 +689,18 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
return false;
}
- SDVLOG(2) << "Calling SyncShare with "
- << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
- bool premature_exit = !syncer_->SyncShare(job->mutable_session(),
+ DVLOG(2) << "Creating sync session with routes "
+ << ModelSafeRoutingInfoToString(session_context_->routing_info())
+ << "and purpose " << job->purpose();
+ SyncSession session(session_context_, this, job->source_info());
+ bool premature_exit = !syncer_->SyncShare(&session,
job->start_step(),
job->end_step());
SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
- bool success = FinishSyncSessionJob(job.get(), premature_exit);
+ bool success = FinishSyncSessionJob(job.get(),
+ premature_exit,
+ &session);
if (IsSyncingCurrentlySilenced()) {
SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
@@ -751,7 +720,7 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
}
if (!success)
- ScheduleNextSync(job.Pass());
+ ScheduleNextSync(job.Pass(), &session);
return success;
}
@@ -786,14 +755,15 @@ void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
if (!ShouldPoll())
return;
- SDVLOG(2) << "Calling SyncShare with "
- << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
- bool premature_exit = !syncer_->SyncShare(job->mutable_session(),
+ DVLOG(2) << "Polling with routes "
+ << ModelSafeRoutingInfoToString(session_context_->routing_info());
+ SyncSession session(session_context_, this, job->source_info());
+ bool premature_exit = !syncer_->SyncShare(&session,
job->start_step(),
job->end_step());
SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
- FinishSyncSessionJob(job.get(), premature_exit);
+ FinishSyncSessionJob(job.get(), premature_exit, &session);
if (IsSyncingCurrentlySilenced()) {
// This will start the countdown to unthrottle. Other kinds of jobs would
@@ -831,14 +801,15 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
}
bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
- bool exited_prematurely) {
+ bool exited_prematurely,
+ SyncSession* session) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
// Let job know that we're through syncing (calling SyncShare) at this point.
bool succeeded = false;
{
base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
- succeeded = job->Finish(exited_prematurely);
+ succeeded = job->Finish(exited_prematurely, session);
}
SDVLOG(2) << "Updating the next polling time after SyncMain";
@@ -858,7 +829,8 @@ bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
}
void SyncSchedulerImpl::ScheduleNextSync(
- scoped_ptr<SyncSessionJob> finished_job) {
+ scoped_ptr<SyncSessionJob> finished_job,
+ SyncSession* session) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION
|| finished_job->purpose() == SyncSessionJob::NUDGE);
@@ -891,7 +863,7 @@ void SyncSchedulerImpl::ScheduleNextSync(
// Either this is the first failure or a consecutive failure after our
// backoff timer expired. We handle it the same way in either case.
SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
- HandleContinuationError(finished_job.Pass());
+ HandleContinuationError(finished_job.Pass(), session);
}
}
@@ -921,26 +893,28 @@ void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
wait_interval_->timer.Stop();
DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
if (wait_interval_->mode == WaitInterval::THROTTLED) {
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
- base::Bind(&SyncSchedulerImpl::Unthrottle,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&job)));
+ pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(&job)));
+
} else {
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
- base::Bind(&SyncSchedulerImpl::DoCanaryJob,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&job)));
+ pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob,
+ weak_ptr_factory_.GetWeakPtr(),
+ base::Passed(&job)));
}
+ wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
+ pending_wakeup_.callback());
}
void SyncSchedulerImpl::HandleContinuationError(
- scoped_ptr<SyncSessionJob> old_job) {
+ scoped_ptr<SyncSessionJob> old_job,
+ SyncSession* session) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
TimeDelta length = delay_provider_->GetDelay(
IsBackingOff() ? wait_interval_->length :
delay_provider_->GetInitialDelay(
- old_job->session()->status_controller().model_neutral_state()));
+ session->status_controller().model_neutral_state()));
SDVLOG(2) << "In handle continuation error with "
<< SyncSessionJob::GetPurposeString(old_job->purpose())
@@ -989,6 +963,7 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
poll_timer_.Stop();
pending_nudge_ = NULL;
unscheduled_nudge_storage_.reset();
+ pending_wakeup_.Cancel();
if (started_) {
started_ = false;
}
@@ -1000,18 +975,6 @@ void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
SDVLOG(2) << "Do canary job";
- if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
- // TODO(tim): Bug 158313. Remove this check.
- if (pending_nudge_ == NULL ||
- pending_nudge_->session() != to_be_canary->session()) {
- // |job| is abandoned.
- SDVLOG(2) << "Dropping a nudge in "
- << "DoCanaryJob because another nudge was scheduled";
- return;
- }
- DCHECK_EQ(pending_nudge_->session(), to_be_canary->session());
- }
-
// This is the only place where we invoke DoSyncSessionJob with canary
// privileges. Everyone else should use NORMAL_PRIORITY.
DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY);
@@ -1026,11 +989,11 @@ scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
&& wait_interval_->pending_configure_job) {
SDVLOG(2) << "Found pending configure job";
candidate =
- wait_interval_->pending_configure_job->CloneAndAbandon().Pass();
+ wait_interval_->pending_configure_job->Clone().Pass();
wait_interval_->pending_configure_job = candidate.get();
} else if (mode_ == NORMAL_MODE && pending_nudge_) {
SDVLOG(2) << "Found pending nudge job";
- candidate = pending_nudge_->CloneAndAbandon();
+ candidate = pending_nudge_->Clone();
pending_nudge_ = candidate.get();
unscheduled_nudge_storage_.reset();
}
@@ -1040,26 +1003,15 @@ scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
return candidate.Pass();
}
-scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession(
- const SyncSourceInfo& source) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- DVLOG(2) << "Creating sync session with routes "
- << ModelSafeRoutingInfoToString(session_context_->routing_info());
-
- SyncSourceInfo info(source);
- return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info));
-}
-
void SyncSchedulerImpl::PollTimerCallback() {
DCHECK_EQ(MessageLoop::current(), sync_loop_);
ModelSafeRoutingInfo r;
ModelTypeInvalidationMap invalidation_map =
ModelSafeRoutingInfoToInvalidationMap(r, std::string());
SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
- scoped_ptr<SyncSession> s(CreateSyncSession(info));
scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
TimeTicks::Now(),
- s.Pass(),
+ info,
ConfigurationParams()));
if (no_scheduling_allowed_) {
// The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in