diff options
-rw-r--r-- | chrome/browser/sync/engine/mock_model_safe_workers.cc | 15 | ||||
-rw-r--r-- | chrome/browser/sync/engine/mock_model_safe_workers.h | 2 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2.cc | 510 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2.h | 157 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2_unittest.cc | 163 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc | 235 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.cc | 11 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.h | 3 | ||||
-rw-r--r-- | chrome/browser/sync/profile_sync_service.cc | 1 | ||||
-rw-r--r-- | chrome/browser/sync/sessions/sync_session.cc | 15 | ||||
-rw-r--r-- | chrome/chrome_tests.gypi | 1 |
11 files changed, 905 insertions, 208 deletions
diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.cc b/chrome/browser/sync/engine/mock_model_safe_workers.cc index bd6a26b..77ed4e9 100644 --- a/chrome/browser/sync/engine/mock_model_safe_workers.cc +++ b/chrome/browser/sync/engine/mock_model_safe_workers.cc @@ -24,6 +24,21 @@ MockModelSafeWorkerRegistrar* return m; } +MockModelSafeWorkerRegistrar* MockModelSafeWorkerRegistrar::PassiveForTypes( + const syncable::ModelTypeBitSet& set) { + ModelSafeRoutingInfo routes; + for (int i = syncable::UNSPECIFIED ; i < syncable::MODEL_TYPE_COUNT; ++i) { + syncable::ModelType type = syncable::ModelTypeFromInt(i); + if (set[type]) { + routes[type] = GROUP_PASSIVE; + } + } + MockModelSafeWorkerRegistrar* m = new MockModelSafeWorkerRegistrar(routes); + m->passive_worker_ = new ModelSafeWorker(); + return m; +} + + void MockModelSafeWorkerRegistrar::GetWorkers( std::vector<ModelSafeWorker*>* out) { if (passive_worker_.get()) diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.h b/chrome/browser/sync/engine/mock_model_safe_workers.h index e2eb6cc..04f4adf 100644 --- a/chrome/browser/sync/engine/mock_model_safe_workers.h +++ b/chrome/browser/sync/engine/mock_model_safe_workers.h @@ -30,6 +30,8 @@ class MockModelSafeWorkerRegistrar : public ModelSafeWorkerRegistrar { public: virtual ~MockModelSafeWorkerRegistrar(); static MockModelSafeWorkerRegistrar* PassiveBookmarks(); + static MockModelSafeWorkerRegistrar* PassiveForTypes( + const syncable::ModelTypeBitSet& set); virtual void GetWorkers(std::vector<ModelSafeWorker*>* out); virtual void GetModelSafeRoutingInfo(ModelSafeRoutingInfo* out); diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc index e94215e..b8bd9a9 100644 --- a/chrome/browser/sync/engine/syncer_thread2.cc +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -23,45 +23,47 @@ using sync_pb::GetUpdatesCallerInfo; namespace s3 { -struct SyncerThread::WaitInterval { - enum Mode { - // A wait interval whose duration has been affected by exponential - // backoff. - // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. - EXPONENTIAL_BACKOFF, - // A server-initiated throttled interval. We do not allow any syncing - // during such an interval. - THROTTLED, - }; - Mode mode; - - // This bool is set to true if we have observed a nudge during this - // interval and mode == EXPONENTIAL_BACKOFF. - bool had_nudge; - base::TimeDelta length; - base::OneShotTimer<SyncerThread> timer; - WaitInterval(Mode mode, base::TimeDelta length); -}; +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} -struct SyncerThread::SyncSessionJob { - SyncSessionJobPurpose purpose; - base::TimeTicks scheduled_start; - linked_ptr<sessions::SyncSession> session; +SyncerThread::WaitInterval::WaitInterval() {} +SyncerThread::WaitInterval::~WaitInterval() {} - // This is the location the nudge came from. used for debugging purpose. - // In case of multiple nudges getting coalesced this stores the first nudge - // that came in. - tracked_objects::Location nudge_location; -}; +SyncerThread::SyncSessionJob::SyncSessionJob() {} +SyncerThread::SyncSessionJob::~SyncSessionJob() {} -SyncerThread::DelayProvider::DelayProvider() {} -SyncerThread::DelayProvider::~DelayProvider() {} +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, + base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location) : purpose(purpose), + scheduled_start(start), + session(session), + is_canary_job(is_canary_job), + nudge_location(nudge_location) { +} TimeDelta SyncerThread::DelayProvider::GetDelay( const base::TimeDelta& last_delay) { return SyncerThread::GetRecommendedDelay(last_delay); } +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_UNKNOWN: + return GetUpdatesCallerInfo::UNKNOWN; + default: + NOTREACHED(); + return GetUpdatesCallerInfo::UNKNOWN; + } +} + SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) : mode(mode), had_nudge(false), length(length) { } @@ -85,6 +87,9 @@ SyncerThread::~SyncerThread() { void SyncerThread::CheckServerConnectionManagerStatus( HttpResponse::ServerConnectionCode code) { + + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << "Old mode: " << server_connection_ok_ << " Code: " << code; // Note, be careful when adding cases here because if the SyncerThread // thinks there is no valid connection as determined by this method, it // will drop out of *all* forward progress sync loops (it won't poll and it @@ -94,13 +99,22 @@ void SyncerThread::CheckServerConnectionManagerStatus( if (HttpResponse::CONNECTION_UNAVAILABLE == code || HttpResponse::SYNC_AUTH_ERROR == code) { server_connection_ok_ = false; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; } else if (HttpResponse::SERVER_CONNECTION_OK == code) { server_connection_ok_ = true; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + DoCanaryJob(); } } void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " + << MessageLoop::current()->thread_name(); if (!thread_.IsRunning()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " + << mode; if (!thread_.Start()) { NOTREACHED() << "Unable to start SyncerThread."; return; @@ -110,6 +124,9 @@ void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { this, &SyncerThread::SendInitialSnapshot)); } + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " + << mode; + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); } @@ -133,6 +150,8 @@ void SyncerThread::WatchConnectionManager() { void SyncerThread::StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " + << mode; DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); DCHECK(!session_context_->account_name().empty()); DCHECK(syncer_.get()); @@ -140,75 +159,135 @@ void SyncerThread::StartImpl(Mode mode, AdjustPolling(NULL); // Will kick start poll timer if needed. if (callback.get()) callback->Run(); + + // We just changed our mode. See if there are any pending jobs that we could + // execute in the new mode. + DoPendingJobIfPossible(false); } -bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, - const TimeTicks& scheduled_start) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( + const SyncSessionJob& job) { - // Check wait interval. - if (wait_interval_.get()) { - // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit - // when throttled). - if (wait_interval_->mode == WaitInterval::THROTTLED) - return false; + DCHECK(wait_interval_.get()); + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); - if ((purpose != NUDGE) || wait_interval_->had_nudge) - return false; - } + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " + << wait_interval_->mode << "Wait interval had nudge : " + << wait_interval_->had_nudge << "is canary job : " + << job.is_canary_job; - // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that - // were intended for a normal sync if we are in configuration mode, and vice - // versa. - switch (mode_) { - case CONFIGURATION_MODE: - if (purpose != CONFIGURATION) - return false; - break; - case NORMAL_MODE: - if (purpose == CONFIGURATION) - return false; - break; - default: - NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; - return false; + if (job.purpose == SyncSessionJob::POLL) + return DROP; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::CONFIGURATION); + if (wait_interval_->mode == WaitInterval::THROTTLED) + return SAVE; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + if (job.purpose == SyncSessionJob::NUDGE) { + if (mode_ == CONFIGURATION_MODE) + return SAVE; + + // If we already had one nudge then just drop this nudge. We will retry + // later when the timer runs out. + return wait_interval_->had_nudge ? DROP : CONTINUE; } + // This is a config job. + return job.is_canary_job ? CONTINUE : SAVE; +} + +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( + const SyncSessionJob& job) { + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) + return CONTINUE; - // Continuation NUDGE tasks have priority over POLLs because they are the - // only tasks that trigger exponential backoff, so this prevents them from - // being starved from running (e.g. due to a very, very low poll interval, - // such as 0ms). It's rare that this would ever matter in practice. - if (purpose == POLL && (pending_nudge_.get() && - pending_nudge_->session->source().updates_source == - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { - return false; + if (wait_interval_.get()) + return DecideWhileInWaitInterval(job); + + if (mode_ == CONFIGURATION_MODE) { + if (job.purpose == SyncSessionJob::NUDGE) + return SAVE; + else if (job.purpose == SyncSessionJob::CONFIGURATION) + return CONTINUE; + else + return DROP; } - // Freshness condition. - if (purpose == NUDGE && - (scheduled_start < last_sync_session_end_time_)) { - return false; + // We are in normal mode. + DCHECK_EQ(mode_, NORMAL_MODE); + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + + // Freshness condition + if (job.scheduled_start < last_sync_session_end_time_) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Dropping job because of freshness"; + return DROP; } - return server_connection_ok_; + if (server_connection_ok_) + return CONTINUE; + + VLOG(2) << "SyncerThread(" << this << ")" + << " Bad server connection. Using that to decide on job."; + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; } -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( - NudgeSource source) { - switch (source) { - case NUDGE_SOURCE_NOTIFICATION: - return GetUpdatesCallerInfo::NOTIFICATION; - case NUDGE_SOURCE_LOCAL: - return GetUpdatesCallerInfo::LOCAL; - case NUDGE_SOURCE_CONTINUATION: - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; - case NUDGE_SOURCE_UNKNOWN: - return GetUpdatesCallerInfo::UNKNOWN; - default: - NOTREACHED(); - return GetUpdatesCallerInfo::UNKNOWN; +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); + if (pending_nudge_.get() == NULL) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Creating a pending nudge job"; + SyncSession* s = job.session.get(); + scoped_ptr<SyncSession> session(new SyncSession(s->context(), + s->delegate(), s->source(), s->routing_info(), s->workers())); + + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, + make_linked_ptr(session.release()), false, job.nudge_location); + pending_nudge_.reset(new SyncSessionJob(new_job)); + + return; } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + pending_nudge_->scheduled_start = job.scheduled_start; + + // Unfortunately the nudge location cannot be modified. So it stores the + // location of the first caller. +} + +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { + JobProcessDecision decision = DecideOnJob(job); + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " + << decision << " Job purpose " << job.purpose << "mode " << mode_; + if (decision != SAVE) + return decision == CONTINUE; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == + SyncSessionJob::CONFIGURATION); + + SaveJob(job); + return false; +} + +void SyncerThread::SaveJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); + if (job.purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; + InitOrCoalescePendingJob(job); + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; + DCHECK(wait_interval_.get()); + DCHECK(mode_ == CONFIGURATION_MODE); + + SyncSession* old = job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + SyncSessionJob new_job(job.purpose, TimeTicks::Now(), + make_linked_ptr(s), false, job.nudge_location); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); + } // drop the rest. } // Functor for std::find_if to search by ModelSafeGroup. @@ -237,11 +316,14 @@ void SyncerThread::ScheduleNudge(const TimeDelta& delay, return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; + ModelTypePayloadMap types_with_payloads = syncable::ModelTypePayloadMapFromBitSet(types, std::string()); thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, source, - types_with_payloads, nudge_location)); + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); } void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, @@ -252,9 +334,12 @@ void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, source, - types_with_payloads, nudge_location)); + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); } void SyncerThread::ScheduleClearUserDataImpl() { @@ -262,51 +347,58 @@ void SyncerThread::ScheduleClearUserDataImpl() { SyncSession* session = new SyncSession(session_context_.get(), this, SyncSourceInfo(), ModelSafeRoutingInfo(), std::vector<ModelSafeWorker*>()); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session, - FROM_HERE); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); } void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, - NudgeSource source, const ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location) { + GetUpdatesCallerInfo::GetUpdatesSource source, + const ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - TimeTicks rough_start = TimeTicks::Now() + delay; - if (!ShouldRunJob(NUDGE, rough_start)) { - LOG(WARNING) << "Dropping nudge at scheduling time, source = " - << source; - return; - } + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; // Note we currently nudge for all types regardless of the ones incurring // the nudge. Doing different would throw off some syncer commands like // CleanupDisabledTypes. We may want to change this in the future. - ModelSafeRoutingInfo routes; - std::vector<ModelSafeWorker*> workers; - session_context_->registrar()->GetModelSafeRoutingInfo(&routes); - session_context_->registrar()->GetWorkers(&workers); - SyncSourceInfo info(GetUpdatesFromNudgeSource(source), - types_with_payloads); + SyncSourceInfo info(source, types_with_payloads); - scoped_ptr<SyncSession> session(new SyncSession( - session_context_.get(), this, info, routes, workers)); + SyncSession* session(CreateSyncSession(info)); + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, + make_linked_ptr(session), is_canary_job, + nudge_location); + + session = NULL; + if (!ShouldRunJob(job)) + return; if (pending_nudge_.get()) { - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" + << "we are in backoff"; return; + } - pending_nudge_->session->Coalesce(*session.get()); + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); if (!IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" + << " we are not in backoff and the job was coalesced"; return; } else { - // Re-schedule the current pending nudge. + VLOG(2) << "SyncerThread(" << this << ")" + << " Rescheduling pending nudge"; SyncSession* s = pending_nudge_->session.get(); - session.reset(new SyncSession(s->context(), s->delegate(), s->source(), - s->routing_info(), s->workers())); + job.session.reset(new SyncSession(s->context(), s->delegate(), + s->source(), s->routing_info(), s->workers())); pending_nudge_.reset(); } } - ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location); + + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), + nudge_location); } // Helper to extract the routing info and workers corresponding to types in @@ -348,58 +440,69 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; ModelSafeRoutingInfo routes; std::vector<ModelSafeWorker*> workers; GetModelSafeParamsForTypes(types, session_context_->registrar(), &routes, &workers); thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleConfigImpl, routes, workers)); + this, &SyncerThread::ScheduleConfigImpl, routes, workers, + GetUpdatesCallerInfo::FIRST_UPDATE)); } void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers) { + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; // TODO(tim): config-specific GetUpdatesCallerInfo value? SyncSession* session = new SyncSession(session_context_.get(), this, - SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE, + SyncSourceInfo(source, syncable::ModelTypePayloadMapFromRoutingInfo( routing_info, std::string())), routing_info, workers); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session, - FROM_HERE); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CONFIGURATION, session, FROM_HERE); } void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, - SyncSessionJobPurpose purpose, sessions::SyncSession* session, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - SyncSessionJob job = {purpose, TimeTicks::Now() + delay, - make_linked_ptr(session), nudge_location}; - if (purpose == NUDGE) { + SyncSessionJob job(purpose, TimeTicks::Now() + delay, + make_linked_ptr(session), false, nudge_location); + if (purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" + << " ScheduleSyncSessionJob"; DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); pending_nudge_.reset(new SyncSessionJob(job)); } + VLOG(2) << "SyncerThread(" << this << ")" + << " Posting job to execute in DoSyncSessionJob. Job purpose " + << job.purpose; MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); } -void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, +void SyncerThread::SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, SyncerStep* start, SyncerStep* end) { *end = SYNCER_END; switch (purpose) { - case CONFIGURATION: + case SyncSessionJob::CONFIGURATION: *start = DOWNLOAD_UPDATES; *end = APPLY_UPDATES; return; - case CLEAR_USER_DATA: + case SyncSessionJob::CLEAR_USER_DATA: *start = CLEAR_PRIVATE_DATA; return; - case NUDGE: - case POLL: + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: *start = SYNCER_BEGIN; return; default: @@ -409,38 +512,39 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - if (!ShouldRunJob(job.purpose, job.scheduled_start)) { - LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " - << job.session->source().updates_source; + if (!ShouldRunJob(job)) return; - } - if (job.purpose == NUDGE) { + if (job.purpose == SyncSessionJob::NUDGE) { DCHECK(pending_nudge_.get()); if (pending_nudge_->session != job.session) return; // Another nudge must have been scheduled in in the meantime. pending_nudge_.reset(); } + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " + << job.purpose; SyncerStep begin(SYNCER_BEGIN); SyncerStep end(SYNCER_END); SetSyncerStepsForPurpose(job.purpose, &begin, &end); bool has_more_to_sync = true; - while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { - VLOG(1) << "SyncerThread: Calling SyncShare."; + while (ShouldRunJob(job) && has_more_to_sync) { + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Calling SyncShare."; // Synchronously perform the sync session from this thread. syncer_->SyncShare(job.session.get(), begin, end); has_more_to_sync = job.session->HasMoreToSync(); if (has_more_to_sync) job.session->ResetTransientState(); } - VLOG(1) << "SyncerThread: Done SyncShare looping."; + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Done SyncShare looping."; FinishSyncSessionJob(job); } void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { - if (old_job.purpose == CONFIGURATION) { + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { // Whatever types were part of a configuration task will have had updates // downloaded. For that reason, we make sure they get recorded in the // event that they get disabled at a later time. @@ -473,10 +577,15 @@ void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { } last_sync_session_end_time_ = now; UpdateCarryoverSessionState(job); - if (IsSyncingCurrentlySilenced()) + if (IsSyncingCurrentlySilenced()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " We are currently throttled. So not scheduling the next sync."; + SaveJob(job); return; // Nothing to do. + } - VLOG(1) << "Updating the next polling time after SyncMain"; + VLOG(2) << "SyncerThread(" << this << ")" + << " Updating the next polling time after SyncMain"; ScheduleNextSync(job); } @@ -492,35 +601,52 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { const bool work_to_do = old_job.session->status_controller()->num_server_changes_remaining() > 0 || old_job.session->status_controller()->unsynced_handles().size() > 0; - VLOG(1) << "syncer has work to do: " << work_to_do; + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " + << work_to_do; AdjustPolling(&old_job); // TODO(tim): Old impl had special code if notifications disabled. Needed? if (!work_to_do) { // Success implies backoff relief. Note that if this was a "one-off" job - // (i.e. purpose == CLEAR_USER_DATA), if there was work_to_do before it - // ran this wont have changed, as jobs like this don't run a full sync - // cycle. So we don't need special code here. + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was + // work_to_do before it ran this wont have changed, as jobs like this don't + // run a full sync cycle. So we don't need special code here. wait_interval_.reset(); + VLOG(2) << "SyncerThread(" << this << ")" + << " Job suceeded so not scheduling more jobs"; return; } if (old_job.session->source().updates_source == GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Job failed with source continuation"; // We don't seem to have made forward progress. Start or extend backoff. HandleConsecutiveContinuationError(old_job); } else if (IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " A nudge during backoff failed"; // We weren't continuing but we're in backoff; must have been a nudge. - DCHECK_EQ(NUDGE, old_job.purpose); + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); DCHECK(!wait_interval_->had_nudge); wait_interval_->had_nudge = true; wait_interval_->timer.Reset(); } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Failed. Schedule a job with continuation as source"; // We weren't continuing and we aren't in backoff. Schedule a normal // continuation. - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, - old_job.session->source().types, FROM_HERE); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + ScheduleConfigImpl(old_job.session->routing_info(), + old_job.session->workers(), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); + } else { + // For all other purposes(nudge and poll) we schedule a retry nudge. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), + old_job.session->source().types, false, FROM_HERE); + } } } @@ -534,7 +660,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { bool rate_changed = !poll_timer_.IsRunning() || poll != poll_timer_.GetCurrentDelay(); - if (old_job && old_job->purpose != POLL && !rate_changed) + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) poll_timer_.Reset(); if (!rate_changed) @@ -548,17 +674,38 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { void SyncerThread::HandleConsecutiveContinuationError( const SyncSessionJob& old_job) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); + // This if conditions should be compiled out in retail builds. + if (IsBackingOff()) { + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + } SyncSession* old = old_job.session.get(); SyncSession* s(new SyncSession(session_context_.get(), this, old->source(), old->routing_info(), old->workers())); TimeDelta length = delay_provider_->GetDelay( IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + + VLOG(2) << "SyncerThread(" << this << ")" + << " In handle continuation error. Old job purpose is " + << old_job.purpose; + VLOG(2) << "SyncerThread(" << this << ")" + << " In Handle continuation error. The time delta(ms) is: " + << length.InMilliseconds(); + + // This will reset the had_nudge variable as well. wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); - SyncSessionJob job = {NUDGE, TimeTicks::Now() + length, - make_linked_ptr(s), FROM_HERE}; - pending_nudge_.reset(new SyncSessionJob(job)); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, + make_linked_ptr(s), false, FROM_HERE); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + } else { + // We are not in configuration mode. So wait_interval's pending job + // should be null. + DCHECK(wait_interval_->pending_configure_job.get() == NULL); + + // TODO(lipalani) - handle clear user data. + InitOrCoalescePendingJob(old_job); + } wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); } @@ -587,33 +734,76 @@ TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { } void SyncerThread::Stop() { + VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; syncer_->RequestEarlyExit(); // Safe to call from any thread. session_context_->connection_manager()->RemoveListener(this); thread_.Stop(); } void SyncerThread::DoCanaryJob() { - DCHECK(pending_nudge_.get()); - wait_interval_->had_nudge = false; - SyncSessionJob copy = *pending_nudge_; - DoSyncSessionJob(copy); + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; + DoPendingJobIfPossible(true); +} + +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { + SyncSessionJob* job_to_execute = NULL; + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() + && wait_interval_->pending_configure_job.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; + job_to_execute = wait_interval_->pending_configure_job.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; + // Pending jobs mostly have time from the past. Reset it so this job + // will get executed. + if (pending_nudge_->scheduled_start < TimeTicks::Now()) + pending_nudge_->scheduled_start = TimeTicks::Now(); + + scoped_ptr<SyncSession> session(CreateSyncSession( + pending_nudge_->session->source())); + + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending_nudge_->session->Coalesce(*(session.get())); + // The pending nudge would be cleared in the DoSyncSessionJob function. + job_to_execute = pending_nudge_.get(); + } + + if (job_to_execute != NULL) { + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; + SyncSessionJob copy = *job_to_execute; + copy.is_canary_job = is_canary_job; + DoSyncSessionJob(copy); + } +} + +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(source); + + SyncSession* session(new SyncSession(session_context_.get(), this, info, + routes, workers)); + + return session; } void SyncerThread::PollTimerCallback() { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); ModelSafeRoutingInfo r; - std::vector<ModelSafeWorker*> w; - session_context_->registrar()->GetModelSafeRoutingInfo(&r); - session_context_->registrar()->GetWorkers(&w); ModelTypePayloadMap types_with_payloads = syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); - SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE); + SyncSession* s = CreateSyncSession(info); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, + FROM_HERE); } void SyncerThread::Unthrottle() { DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; + DoCanaryJob(); wait_interval_.reset(); } @@ -652,6 +842,8 @@ void SyncerThread::OnReceivedLongPollIntervalUpdate( } void SyncerThread::OnShouldStopSyncingPermanently() { + VLOG(2) << "SyncerThread(" << this << ")" + << " OnShouldStopSyncingPermanently"; syncer_->RequestEarlyExit(); // Thread-safe. Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); } diff --git a/chrome/browser/sync/engine/syncer_thread2.h b/chrome/browser/sync/engine/syncer_thread2.h index 3fbdd7f..1c698db 100644 --- a/chrome/browser/sync/engine/syncer_thread2.h +++ b/chrome/browser/sync/engine/syncer_thread2.h @@ -99,30 +99,67 @@ class SyncerThread : public sessions::SyncSession::Delegate, virtual void OnServerConnectionEvent(const ServerConnectionEvent2& event); private: - friend class SyncerThread2Test; - - // State pertaining to exponential backoff or throttling periods. - struct WaitInterval; - - // An enum used to describe jobs for scheduling purposes. - enum SyncSessionJobPurpose { - // Our poll timer schedules POLL jobs periodically based on a server - // assigned poll interval. - POLL, - // A nudge task can come from a variety of components needing to force - // a sync. The source is inferable from |session.source()|. - NUDGE, - // The user invoked a function in the UI to clear their entire account - // and stop syncing (globally). - CLEAR_USER_DATA, - // Typically used for fetching updates for a subset of the enabled types - // during initial sync or reconfiguration. We don't run all steps of - // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). - CONFIGURATION, + enum JobProcessDecision { + // Indicates we should continue with the current job. + CONTINUE, + // Indicates that we should save it to be processed later. + SAVE, + // Indicates we should drop this job. + DROP, }; - // Internal state for every sync task that is scheduled. - struct SyncSessionJob; + struct SyncSessionJob { + // An enum used to describe jobs for scheduling purposes. + enum SyncSessionJobPurpose { + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // The user invoked a function in the UI to clear their entire account + // and stop syncing (globally). + CLEAR_USER_DATA, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. We don't run all steps of + // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). + CONFIGURATION, + }; + SyncSessionJob(); + SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location); + ~SyncSessionJob(); + SyncSessionJobPurpose purpose; + base::TimeTicks scheduled_start; + linked_ptr<sessions::SyncSession> session; + bool is_canary_job; + + // This is the location the nudge came from. used for debugging purpose. + // In case of multiple nudges getting coalesced this stores the first nudge + // that came in. + tracked_objects::Location nudge_location; + }; + friend class SyncerThread2Test; + friend class SyncerThread2WhiteboxTest; + + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + DropNudgeWhileExponentialBackOff); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, SaveNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, DropPoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinuePoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueConfiguration); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveConfigurationWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveNudgeWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueClearUserDataUnderAllCircumstances); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueCanaryJobConfig); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueNudgeWhileExponentialBackOff); // A component used to get time delays associated with exponential backoff. // Encapsulated into a class to facilitate testing. @@ -135,11 +172,39 @@ class SyncerThread : public sessions::SyncSession::Delegate, DISALLOW_COPY_AND_ASSIGN(DelayProvider); }; + struct WaitInterval { + enum Mode { + // A wait interval whose duration has been affected by exponential + // backoff. + // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. + EXPONENTIAL_BACKOFF, + // A server-initiated throttled interval. We do not allow any syncing + // during such an interval. + THROTTLED, + }; + WaitInterval(); + ~WaitInterval(); + + Mode mode; + + // This bool is set to true if we have observed a nudge during this + // interval and mode == EXPONENTIAL_BACKOFF. + bool had_nudge; + base::TimeDelta length; + base::OneShotTimer<SyncerThread> timer; + + // Configure jobs are saved only when backing off or throttling. So we + // expose the pointer here. + scoped_ptr<SyncSessionJob> pending_configure_job; + WaitInterval(Mode mode, base::TimeDelta length); + }; + // Helper to assemble a job and post a delayed task to sync. - void ScheduleSyncSessionJob(const base::TimeDelta& delay, - SyncSessionJobPurpose purpose, - sessions::SyncSession* session, - const tracked_objects::Location& nudge_location); + void ScheduleSyncSessionJob( + const base::TimeDelta& delay, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, + const tracked_objects::Location& nudge_location); // Invoke the Syncer to perform a sync. void DoSyncSessionJob(const SyncSessionJob& job); @@ -161,22 +226,35 @@ class SyncerThread : public sessions::SyncSession::Delegate, // Helper to ScheduleNextSync in case of consecutive sync errors. void HandleConsecutiveContinuationError(const SyncSessionJob& old_job); - // Determines if it is legal to run a sync job for |purpose| at - // |scheduled_start|. This checks current operational mode, backoff or - // throttling, freshness (so we don't make redundant syncs), and connection. - bool ShouldRunJob(SyncSessionJobPurpose purpose, - const base::TimeTicks& scheduled_start); + // Determines if it is legal to run |job| by checking current + // operational mode, backoff or throttling, freshness + // (so we don't make redundant syncs), and connection. + bool ShouldRunJob(const SyncSessionJob& job); + + // Decide whether we should CONTINUE, SAVE or DROP the job. + JobProcessDecision DecideOnJob(const SyncSessionJob& job); + + // Decide on whether to CONTINUE, SAVE or DROP the job when we are in + // backoff mode. + JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); + + // Saves the job for future execution. Note: It drops all the poll jobs. + void SaveJob(const SyncSessionJob& job); + + // Coalesces the current job with the pending nudge. + void InitOrCoalescePendingJob(const SyncSessionJob& job); // 'Impl' here refers to real implementation of public functions, running on // |thread_|. void StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback); void ScheduleNudgeImpl( const base::TimeDelta& delay, - NudgeSource source, + sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, const syncable::ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location); + bool is_canary_job, const tracked_objects::Location& nudge_location); void ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers); + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source); void ScheduleClearUserDataImpl(); // Returns true if the client is currently in exponential backoff. @@ -189,12 +267,21 @@ class SyncerThread : public sessions::SyncSession::Delegate, void DoCanaryJob(); void Unthrottle(); + // Executes the pending job. Called whenever an event occurs that may + // change conditions permitting a job to run. Like when network connection is + // re-established, mode changes etc. + void DoPendingJobIfPossible(bool is_canary_job); + + // The pointer is owned by the caller. + browser_sync::sessions::SyncSession* CreateSyncSession( + const browser_sync::sessions::SyncSourceInfo& info); + // Creates a session for a poll and performs the sync. void PollTimerCallback(); // Assign |start| and |end| to appropriate SyncerStep values for the // specified |purpose|. - void SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, + void SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose, SyncerStep* start, SyncerStep* end); diff --git a/chrome/browser/sync/engine/syncer_thread2_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_unittest.cc index 22c72cd..2823567 100644 --- a/chrome/browser/sync/engine/syncer_thread2_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread2_unittest.cc @@ -47,7 +47,6 @@ struct SyncShareRecords { // Convenient to use in tests wishing to analyze SyncShare calls over time. static const size_t kMinNumSamples = 5; - class SyncerThread2Test : public testing::Test { public: class MockDelayProvider : public SyncerThread::DelayProvider { @@ -69,6 +68,20 @@ class SyncerThread2Test : public testing::Test { syncer_thread_.reset(new SyncerThread(context_, syncer_)); } + virtual void SetUpWithTypes(syncable::ModelTypeBitSet types) { + syncdb_.SetUp(); + syncer_ = new MockSyncer(); + delay_ = NULL; + registrar_.reset(MockModelSafeWorkerRegistrar::PassiveForTypes(types)); + connection_.reset(new MockConnectionManager(syncdb_.manager(), "Test")); + connection_->SetServerReachable(); + context_ = new SyncSessionContext(connection_.get(), syncdb_.manager(), + registrar_.get(), std::vector<SyncEngineEventListener*>()); + context_->set_notifications_enabled(true); + context_->set_account_name("Test"); + syncer_thread_.reset(new SyncerThread(context_, syncer_)); + } + SyncerThread* syncer_thread() { return syncer_thread_.get(); } MockSyncer* syncer() { return syncer_; } MockDelayProvider* delay() { return delay_; } @@ -227,6 +240,148 @@ TEST_F(SyncerThread2Test, Nudge) { records2.snapshots[0]->source.updates_source); } +// Make sure a regular config command is scheduled fine in the absence of any +// errors. +TEST_F(SyncerThread2Test, Config) { + base::WaitableEvent done(false, false); + SyncShareRecords records; + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done.TimedWait(timeout()); + + EXPECT_EQ(1U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[0]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::FIRST_UPDATE, + records.snapshots[0]->source.updates_source); +} + +// Simulate a failure and make sure the config request is retried. +TEST_F(SyncerThread2Test, ConfigWithBackingOff) { + base::WaitableEvent done(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(1))); + SyncShareRecords records; + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done.TimedWait(timeout()); + + EXPECT_EQ(2U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[1]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION, + records.snapshots[1]->source.updates_source); +} + +// Issue 2 config commands. Second one right after the first has failed +// and make sure LATEST is executed. +TEST_F(SyncerThread2Test, MultipleConfigWithBackingOff) { + syncable::ModelTypeBitSet model_types1, model_types2; + model_types1[syncable::BOOKMARKS] = true; + model_types2[syncable::AUTOFILL] = true; + SetUpWithTypes(model_types1 | model_types2); + base::WaitableEvent done(false, false); + base::WaitableEvent done1(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(30))); + SyncShareRecords records; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, &done1)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types1); + + // done1 indicates the first config failed. + done1.TimedWait(timeout()); + syncer_thread()->ScheduleConfig(model_types2); + done.TimedWait(timeout()); + + EXPECT_EQ(3U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types2, + records.snapshots[2]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::FIRST_UPDATE, + records.snapshots[2]->source.updates_source); +} + +// Issue a nudge when the config has failed. Make sure both the config and +// nudge are executed. +TEST_F(SyncerThread2Test, NudgeWithConfigWithBackingOff) { + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + base::WaitableEvent done(false, false); + base::WaitableEvent done1(false, false); + base::WaitableEvent done2(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(50))); + SyncShareRecords records; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, &done1)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done2)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done1.TimedWait(timeout()); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, model_types, + FROM_HERE); + + // done2 indicates config suceeded. Now change the mode so nudge can execute. + done2.TimedWait(timeout()); + syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); + done.TimedWait(timeout()); + EXPECT_EQ(4U, records.snapshots.size()); + + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[2]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION, + records.snapshots[2]->source.updates_source); + + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[3]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::LOCAL, + records.snapshots[3]->source.updates_source); + +} + + // Test that nudges are coalesced. TEST_F(SyncerThread2Test, NudgeCoalescing) { syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); @@ -476,7 +631,7 @@ TEST_F(SyncerThread2Test, ConfigurationMode) { base::WaitableEvent* dummy = NULL; syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + .WillOnce((Invoke(sessions::test_util::SimulateSuccess), WithArg<0>(RecordSyncShare(&records, 1U, dummy)))); syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); syncable::ModelTypeBitSet nudge_types; @@ -739,6 +894,7 @@ TEST_F(SyncerThread2Test, StartWhenNotConnected) { base::WaitableEvent done(false, false); MessageLoop cur; connection()->SetServerNotReachable(); + EXPECT_CALL(*syncer(), SyncShare(_,_,_)).WillOnce(SignalEvent(&done)); syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, ModelTypeBitSet(), FROM_HERE); @@ -752,9 +908,6 @@ TEST_F(SyncerThread2Test, StartWhenNotConnected) { // By now, the server connection event should have been posted to the // SyncerThread. FlushLastTask(&done); - EXPECT_CALL(*syncer(), SyncShare(_,_,_)).WillOnce(SignalEvent(&done)); - syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, ModelTypeBitSet(), - FROM_HERE); done.TimedWait(timeout()); } diff --git a/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc new file mode 100644 index 0000000..e1d1218 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc @@ -0,0 +1,235 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/time.h" +#include "chrome/browser/sync/engine/mock_model_safe_workers.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" +#include "chrome/browser/sync/sessions/test_util.h" +#include "chrome/test/sync/engine/mock_connection_manager.h" +#include "chrome/test/sync/engine/test_directory_setter_upper.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/gmock/include/gmock/gmock.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { +using sessions::SyncSessionContext; +using browser_sync::Syncer; + +namespace s3 { + +class SyncerThread2WhiteboxTest : public testing::Test { + public: + virtual void SetUp() { + syncdb_.SetUp(); + Syncer* syncer = new Syncer(); + registrar_.reset(MockModelSafeWorkerRegistrar::PassiveBookmarks()); + context_ = new SyncSessionContext(connection_.get(), syncdb_.manager(), + registrar_.get(), std::vector<SyncEngineEventListener*>()); + context_->set_notifications_enabled(true); + context_->set_account_name("Test"); + syncer_thread_.reset(new SyncerThread(context_, syncer)); + } + + virtual void TearDown() { + syncdb_.TearDown(); + } + + void SetMode(SyncerThread::Mode mode) { + syncer_thread_->mode_ = mode; + } + + void SetLastSyncedTime(base::TimeTicks ticks) { + syncer_thread_->last_sync_session_end_time_ = ticks; + } + + void SetServerConnection(bool connected) { + syncer_thread_->server_connection_ok_ = connected; + } + + void ResetWaitInterval() { + syncer_thread_->wait_interval_.reset(); + } + + void SetWaitIntervalToThrottled() { + syncer_thread_->wait_interval_.reset(new SyncerThread::WaitInterval( + SyncerThread::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1))); + } + + void SetWaitIntervalToExponentialBackoff() { + syncer_thread_->wait_interval_.reset( + new SyncerThread::WaitInterval( + SyncerThread::WaitInterval::EXPONENTIAL_BACKOFF, + TimeDelta::FromSeconds(1))); + } + + SyncerThread::JobProcessDecision DecideOnJob( + const SyncerThread::SyncSessionJob& job) { + return syncer_thread_->DecideOnJob(job); + } + + void InitializeSyncerOnNormalMode() { + SetMode(SyncerThread::NORMAL_MODE); + ResetWaitInterval(); + SetServerConnection(true); + SetLastSyncedTime(base::TimeTicks::Now()); + } + + SyncerThread::JobProcessDecision CreateAndDecideJob( + SyncerThread::SyncSessionJob::SyncSessionJobPurpose purpose) { + struct SyncerThread::SyncSessionJob job; + job.purpose = purpose; + job.scheduled_start = TimeTicks::Now(); + return DecideOnJob(job); + } + + protected: + scoped_ptr<SyncerThread> syncer_thread_; + + private: + scoped_ptr<MockConnectionManager> connection_; + SyncSessionContext* context_; + //MockDelayProvider* delay_; + scoped_ptr<MockModelSafeWorkerRegistrar> registrar_; + MockDirectorySetterUpper syncdb_; +}; + +TEST_F(SyncerThread2WhiteboxTest, SaveNudge) { + InitializeSyncerOnNormalMode(); + + // Now set the mode to configure. + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = + CreateAndDecideJob(SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::SAVE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueNudge) { + InitializeSyncerOnNormalMode(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, DropPoll) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::POLL); + + EXPECT_EQ(decision, SyncerThread::DROP); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinuePoll) { + InitializeSyncerOnNormalMode(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::POLL); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueConfiguration) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CONFIGURATION); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, SaveConfigurationWhileThrottled) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SetWaitIntervalToThrottled(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CONFIGURATION); + + EXPECT_EQ(decision, SyncerThread::SAVE); +} + +TEST_F(SyncerThread2WhiteboxTest, SaveNudgeWhileThrottled) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SetWaitIntervalToThrottled(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::SAVE); + +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueClearUserDataUnderAllCircumstances) { + InitializeSyncerOnNormalMode(); + + SetMode(SyncerThread::CONFIGURATION_MODE); + SetWaitIntervalToThrottled(); + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CLEAR_USER_DATA); + EXPECT_EQ(decision, SyncerThread::CONTINUE); + + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CLEAR_USER_DATA); + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueNudgeWhileExponentialBackOff) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, DropNudgeWhileExponentialBackOff) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + + syncer_thread_->wait_interval_->had_nudge = true; + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::DROP); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueCanaryJobConfig) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + SetWaitIntervalToExponentialBackoff(); + + struct SyncerThread::SyncSessionJob job; + job.purpose = SyncerThread::SyncSessionJob::CONFIGURATION; + job.scheduled_start = TimeTicks::Now(); + job.is_canary_job = true; + SyncerThread::JobProcessDecision decision = DecideOnJob(job); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + + +} // namespace s3 +} // namespace browser_sync + +// SyncerThread won't outlive the test! +DISABLE_RUNNABLE_METHOD_REFCOUNT( + browser_sync::s3::SyncerThread2WhiteboxTest); diff --git a/chrome/browser/sync/glue/sync_backend_host.cc b/chrome/browser/sync/glue/sync_backend_host.cc index c317158..50fc35c 100644 --- a/chrome/browser/sync/glue/sync_backend_host.cc +++ b/chrome/browser/sync/glue/sync_backend_host.cc @@ -412,8 +412,10 @@ void SyncBackendHost::ConfigureDataTypes( // If we're doing the first configure (at startup) this is redundant as the // syncer thread always must start in config mode. if (using_new_syncer_thread_) { - core_->syncapi()->StartConfigurationMode(NewCallback(core_.get(), - &SyncBackendHost::Core::FinishConfigureDataTypes)); + core_thread_.message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &SyncBackendHost::Core::DoStartConfigurationMode)); } else { FinishConfigureDataTypesOnFrontendLoop(); } @@ -1211,6 +1213,11 @@ void SyncBackendHost::Core::DoProcessMessage( syncapi_->GetJsBackend()->ProcessMessage(name, args, sender); } +void SyncBackendHost::Core::DoStartConfigurationMode() { + syncapi_->StartConfigurationMode(NewCallback(this, + &SyncBackendHost::Core::FinishConfigureDataTypes)); +} + void SyncBackendHost::Core::DeferNudgeForCleanup() { DCHECK_EQ(MessageLoop::current(), host_->core_thread_.message_loop()); deferred_nudge_for_cleanup_requested_ = true; diff --git a/chrome/browser/sync/glue/sync_backend_host.h b/chrome/browser/sync/glue/sync_backend_host.h index 1e65dd9..086b2e0 100644 --- a/chrome/browser/sync/glue/sync_backend_host.h +++ b/chrome/browser/sync/glue/sync_backend_host.h @@ -326,7 +326,6 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { void CreateSyncNotifier(const scoped_refptr<net::URLRequestContextGetter>& request_context_getter); - // Note: // // The Do* methods are the various entry points from our SyncBackendHost. @@ -402,6 +401,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { const std::string& name, const JsArgList& args, const JsEventHandler* sender); + void DoStartConfigurationMode(); + // A callback from the SyncerThread when it is safe to continue config. void FinishConfigureDataTypes(); diff --git a/chrome/browser/sync/profile_sync_service.cc b/chrome/browser/sync/profile_sync_service.cc index dde2a0e..aa95e2a 100644 --- a/chrome/browser/sync/profile_sync_service.cc +++ b/chrome/browser/sync/profile_sync_service.cc @@ -522,6 +522,7 @@ void ProfileSyncService::OnBackendInitialized() { void ProfileSyncService::OnSyncCycleCompleted() { UpdateLastSyncedTime(); + VLOG(2) << "Notifying observers sync cycle completed"; NotifyObservers(); } diff --git a/chrome/browser/sync/sessions/sync_session.cc b/chrome/browser/sync/sessions/sync_session.cc index cd01375..a38e31a 100644 --- a/chrome/browser/sync/sessions/sync_session.cc +++ b/chrome/browser/sync/sessions/sync_session.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -41,11 +41,14 @@ void SyncSession::Coalesce(const SyncSession& session) { std::back_inserter(temp)); workers_.swap(temp); - ModelSafeRoutingInfo temp_r; - std::set_union(routing_info_.begin(), routing_info_.end(), - session.routing_info_.begin(), session.routing_info_.end(), - std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); - routing_info_.swap(temp_r); + // We have to update the model safe routing info to the union. In case the + // same key is present in both pick the one from session. + for (ModelSafeRoutingInfo::const_iterator it = + session.routing_info_.begin(); + it != session.routing_info_.end(); + ++it) { + routing_info_[it->first] = it->second; + } } void SyncSession::ResetTransientState() { diff --git a/chrome/chrome_tests.gypi b/chrome/chrome_tests.gypi index 23158f8..c8189a1 100644 --- a/chrome/chrome_tests.gypi +++ b/chrome/chrome_tests.gypi @@ -2915,6 +2915,7 @@ 'browser/sync/engine/syncer_proto_util_unittest.cc', 'browser/sync/engine/syncer_thread_unittest.cc', 'browser/sync/engine/syncer_thread2_unittest.cc', + 'browser/sync/engine/syncer_thread2_whitebox_unittest.cc', 'browser/sync/engine/syncer_unittest.cc', 'browser/sync/engine/syncproto_unittest.cc', 'browser/sync/engine/syncapi_mock.h', |