diff options
author | lipalani@chromium.org <lipalani@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-15 04:37:47 +0000 |
---|---|---|
committer | lipalani@chromium.org <lipalani@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-15 04:37:47 +0000 |
commit | 52d3effc94111150965c9b386ab564e5c04e862b (patch) | |
tree | 49e487e574a7871c2aaaafbfe6e1fdd40f146aed | |
parent | 5a7a583e86a3503c00a8e07e0b5f315c6a11a03f (diff) | |
download | chromium_src-52d3effc94111150965c9b386ab564e5c04e862b.zip chromium_src-52d3effc94111150965c9b386ab564e5c04e862b.tar.gz chromium_src-52d3effc94111150965c9b386ab564e5c04e862b.tar.bz2 |
sync: Make nudge + config jobs reliable in SyncerThread2
We save nudges that are received during config. And execute them when mode changes to normal.
We also save nudge and config jobs that fail due to any reason. Then we execute them either during exponential backoff or when we get connection.
In short we are making the nudges and configs reliable.
BUG=77820
TEST=
Review URL: http://codereview.chromium.org/6812004
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@81701 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | chrome/browser/sync/engine/mock_model_safe_workers.cc | 15 | ||||
-rw-r--r-- | chrome/browser/sync/engine/mock_model_safe_workers.h | 2 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2.cc | 510 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2.h | 157 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2_unittest.cc | 163 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc | 235 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.cc | 11 | ||||
-rw-r--r-- | chrome/browser/sync/glue/sync_backend_host.h | 3 | ||||
-rw-r--r-- | chrome/browser/sync/profile_sync_service.cc | 1 | ||||
-rw-r--r-- | chrome/browser/sync/sessions/sync_session.cc | 15 | ||||
-rw-r--r-- | chrome/chrome_tests.gypi | 1 |
11 files changed, 905 insertions, 208 deletions
diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.cc b/chrome/browser/sync/engine/mock_model_safe_workers.cc index bd6a26b..77ed4e9 100644 --- a/chrome/browser/sync/engine/mock_model_safe_workers.cc +++ b/chrome/browser/sync/engine/mock_model_safe_workers.cc @@ -24,6 +24,21 @@ MockModelSafeWorkerRegistrar* return m; } +MockModelSafeWorkerRegistrar* MockModelSafeWorkerRegistrar::PassiveForTypes( + const syncable::ModelTypeBitSet& set) { + ModelSafeRoutingInfo routes; + for (int i = syncable::UNSPECIFIED ; i < syncable::MODEL_TYPE_COUNT; ++i) { + syncable::ModelType type = syncable::ModelTypeFromInt(i); + if (set[type]) { + routes[type] = GROUP_PASSIVE; + } + } + MockModelSafeWorkerRegistrar* m = new MockModelSafeWorkerRegistrar(routes); + m->passive_worker_ = new ModelSafeWorker(); + return m; +} + + void MockModelSafeWorkerRegistrar::GetWorkers( std::vector<ModelSafeWorker*>* out) { if (passive_worker_.get()) diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.h b/chrome/browser/sync/engine/mock_model_safe_workers.h index e2eb6cc..04f4adf 100644 --- a/chrome/browser/sync/engine/mock_model_safe_workers.h +++ b/chrome/browser/sync/engine/mock_model_safe_workers.h @@ -30,6 +30,8 @@ class MockModelSafeWorkerRegistrar : public ModelSafeWorkerRegistrar { public: virtual ~MockModelSafeWorkerRegistrar(); static MockModelSafeWorkerRegistrar* PassiveBookmarks(); + static MockModelSafeWorkerRegistrar* PassiveForTypes( + const syncable::ModelTypeBitSet& set); virtual void GetWorkers(std::vector<ModelSafeWorker*>* out); virtual void GetModelSafeRoutingInfo(ModelSafeRoutingInfo* out); diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc index e94215e..b8bd9a9 100644 --- a/chrome/browser/sync/engine/syncer_thread2.cc +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -23,45 +23,47 @@ using sync_pb::GetUpdatesCallerInfo; namespace s3 { -struct SyncerThread::WaitInterval { - enum Mode { - // A wait interval whose duration has been affected by exponential - // backoff. - // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. - EXPONENTIAL_BACKOFF, - // A server-initiated throttled interval. We do not allow any syncing - // during such an interval. - THROTTLED, - }; - Mode mode; - - // This bool is set to true if we have observed a nudge during this - // interval and mode == EXPONENTIAL_BACKOFF. - bool had_nudge; - base::TimeDelta length; - base::OneShotTimer<SyncerThread> timer; - WaitInterval(Mode mode, base::TimeDelta length); -}; +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} -struct SyncerThread::SyncSessionJob { - SyncSessionJobPurpose purpose; - base::TimeTicks scheduled_start; - linked_ptr<sessions::SyncSession> session; +SyncerThread::WaitInterval::WaitInterval() {} +SyncerThread::WaitInterval::~WaitInterval() {} - // This is the location the nudge came from. used for debugging purpose. - // In case of multiple nudges getting coalesced this stores the first nudge - // that came in. - tracked_objects::Location nudge_location; -}; +SyncerThread::SyncSessionJob::SyncSessionJob() {} +SyncerThread::SyncSessionJob::~SyncSessionJob() {} -SyncerThread::DelayProvider::DelayProvider() {} -SyncerThread::DelayProvider::~DelayProvider() {} +SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, + base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location) : purpose(purpose), + scheduled_start(start), + session(session), + is_canary_job(is_canary_job), + nudge_location(nudge_location) { +} TimeDelta SyncerThread::DelayProvider::GetDelay( const base::TimeDelta& last_delay) { return SyncerThread::GetRecommendedDelay(last_delay); } +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_UNKNOWN: + return GetUpdatesCallerInfo::UNKNOWN; + default: + NOTREACHED(); + return GetUpdatesCallerInfo::UNKNOWN; + } +} + SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) : mode(mode), had_nudge(false), length(length) { } @@ -85,6 +87,9 @@ SyncerThread::~SyncerThread() { void SyncerThread::CheckServerConnectionManagerStatus( HttpResponse::ServerConnectionCode code) { + + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << "Old mode: " << server_connection_ok_ << " Code: " << code; // Note, be careful when adding cases here because if the SyncerThread // thinks there is no valid connection as determined by this method, it // will drop out of *all* forward progress sync loops (it won't poll and it @@ -94,13 +99,22 @@ void SyncerThread::CheckServerConnectionManagerStatus( if (HttpResponse::CONNECTION_UNAVAILABLE == code || HttpResponse::SYNC_AUTH_ERROR == code) { server_connection_ok_ = false; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; } else if (HttpResponse::SERVER_CONNECTION_OK == code) { server_connection_ok_ = true; + VLOG(2) << "SyncerThread(" << this << ")" << " Server connection changed." + << " new mode:" << server_connection_ok_; + DoCanaryJob(); } } void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Start called from thread " + << MessageLoop::current()->thread_name(); if (!thread_.IsRunning()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Starting thread with mode " + << mode; if (!thread_.Start()) { NOTREACHED() << "Unable to start SyncerThread."; return; @@ -110,6 +124,9 @@ void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { this, &SyncerThread::SendInitialSnapshot)); } + VLOG(2) << "SyncerThread(" << this << ")" << " Entering start with mode = " + << mode; + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback))); } @@ -133,6 +150,8 @@ void SyncerThread::WatchConnectionManager() { void SyncerThread::StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback) { + VLOG(2) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " + << mode; DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); DCHECK(!session_context_->account_name().empty()); DCHECK(syncer_.get()); @@ -140,75 +159,135 @@ void SyncerThread::StartImpl(Mode mode, AdjustPolling(NULL); // Will kick start poll timer if needed. if (callback.get()) callback->Run(); + + // We just changed our mode. See if there are any pending jobs that we could + // execute in the new mode. + DoPendingJobIfPossible(false); } -bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, - const TimeTicks& scheduled_start) { - DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); +SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( + const SyncSessionJob& job) { - // Check wait interval. - if (wait_interval_.get()) { - // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit - // when throttled). - if (wait_interval_->mode == WaitInterval::THROTTLED) - return false; + DCHECK(wait_interval_.get()); + DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); - DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); - if ((purpose != NUDGE) || wait_interval_->had_nudge) - return false; - } + VLOG(2) << "SyncerThread(" << this << ")" << " Wait interval mode : " + << wait_interval_->mode << "Wait interval had nudge : " + << wait_interval_->had_nudge << "is canary job : " + << job.is_canary_job; - // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that - // were intended for a normal sync if we are in configuration mode, and vice - // versa. - switch (mode_) { - case CONFIGURATION_MODE: - if (purpose != CONFIGURATION) - return false; - break; - case NORMAL_MODE: - if (purpose == CONFIGURATION) - return false; - break; - default: - NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; - return false; + if (job.purpose == SyncSessionJob::POLL) + return DROP; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || + job.purpose == SyncSessionJob::CONFIGURATION); + if (wait_interval_->mode == WaitInterval::THROTTLED) + return SAVE; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + if (job.purpose == SyncSessionJob::NUDGE) { + if (mode_ == CONFIGURATION_MODE) + return SAVE; + + // If we already had one nudge then just drop this nudge. We will retry + // later when the timer runs out. + return wait_interval_->had_nudge ? DROP : CONTINUE; } + // This is a config job. + return job.is_canary_job ? CONTINUE : SAVE; +} + +SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( + const SyncSessionJob& job) { + if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) + return CONTINUE; - // Continuation NUDGE tasks have priority over POLLs because they are the - // only tasks that trigger exponential backoff, so this prevents them from - // being starved from running (e.g. due to a very, very low poll interval, - // such as 0ms). It's rare that this would ever matter in practice. - if (purpose == POLL && (pending_nudge_.get() && - pending_nudge_->session->source().updates_source == - GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { - return false; + if (wait_interval_.get()) + return DecideWhileInWaitInterval(job); + + if (mode_ == CONFIGURATION_MODE) { + if (job.purpose == SyncSessionJob::NUDGE) + return SAVE; + else if (job.purpose == SyncSessionJob::CONFIGURATION) + return CONTINUE; + else + return DROP; } - // Freshness condition. - if (purpose == NUDGE && - (scheduled_start < last_sync_session_end_time_)) { - return false; + // We are in normal mode. + DCHECK_EQ(mode_, NORMAL_MODE); + DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); + + // Freshness condition + if (job.scheduled_start < last_sync_session_end_time_) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Dropping job because of freshness"; + return DROP; } - return server_connection_ok_; + if (server_connection_ok_) + return CONTINUE; + + VLOG(2) << "SyncerThread(" << this << ")" + << " Bad server connection. Using that to decide on job."; + return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; } -GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( - NudgeSource source) { - switch (source) { - case NUDGE_SOURCE_NOTIFICATION: - return GetUpdatesCallerInfo::NOTIFICATION; - case NUDGE_SOURCE_LOCAL: - return GetUpdatesCallerInfo::LOCAL; - case NUDGE_SOURCE_CONTINUATION: - return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; - case NUDGE_SOURCE_UNKNOWN: - return GetUpdatesCallerInfo::UNKNOWN; - default: - NOTREACHED(); - return GetUpdatesCallerInfo::UNKNOWN; +void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); + if (pending_nudge_.get() == NULL) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Creating a pending nudge job"; + SyncSession* s = job.session.get(); + scoped_ptr<SyncSession> session(new SyncSession(s->context(), + s->delegate(), s->source(), s->routing_info(), s->workers())); + + SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, + make_linked_ptr(session.release()), false, job.nudge_location); + pending_nudge_.reset(new SyncSessionJob(new_job)); + + return; } + + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); + pending_nudge_->scheduled_start = job.scheduled_start; + + // Unfortunately the nudge location cannot be modified. So it stores the + // location of the first caller. +} + +bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { + JobProcessDecision decision = DecideOnJob(job); + VLOG(2) << "SyncerThread(" << this << ")" << " Should run job, decision: " + << decision << " Job purpose " << job.purpose << "mode " << mode_; + if (decision != SAVE) + return decision == CONTINUE; + + DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == + SyncSessionJob::CONFIGURATION); + + SaveJob(job); + return false; +} + +void SyncerThread::SaveJob(const SyncSessionJob& job) { + DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); + if (job.purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a nudge job"; + InitOrCoalescePendingJob(job); + } else if (job.purpose == SyncSessionJob::CONFIGURATION){ + VLOG(2) << "SyncerThread(" << this << ")" << " Saving a configuration job"; + DCHECK(wait_interval_.get()); + DCHECK(mode_ == CONFIGURATION_MODE); + + SyncSession* old = job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + SyncSessionJob new_job(job.purpose, TimeTicks::Now(), + make_linked_ptr(s), false, job.nudge_location); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); + } // drop the rest. } // Functor for std::find_if to search by ModelSafeGroup. @@ -237,11 +316,14 @@ void SyncerThread::ScheduleNudge(const TimeDelta& delay, return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled"; + ModelTypePayloadMap types_with_payloads = syncable::ModelTypePayloadMapFromBitSet(types, std::string()); thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, source, - types_with_payloads, nudge_location)); + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); } void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, @@ -252,9 +334,12 @@ void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleNudgeImpl, delay, source, - types_with_payloads, nudge_location)); + this, &SyncerThread::ScheduleNudgeImpl, delay, + GetUpdatesFromNudgeSource(source), types_with_payloads, false, + nudge_location)); } void SyncerThread::ScheduleClearUserDataImpl() { @@ -262,51 +347,58 @@ void SyncerThread::ScheduleClearUserDataImpl() { SyncSession* session = new SyncSession(session_context_.get(), this, SyncSourceInfo(), ModelSafeRoutingInfo(), std::vector<ModelSafeWorker*>()); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session, - FROM_HERE); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); } void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, - NudgeSource source, const ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location) { + GetUpdatesCallerInfo::GetUpdatesSource source, + const ModelTypePayloadMap& types_with_payloads, + bool is_canary_job, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - TimeTicks rough_start = TimeTicks::Now() + delay; - if (!ShouldRunJob(NUDGE, rough_start)) { - LOG(WARNING) << "Dropping nudge at scheduling time, source = " - << source; - return; - } + VLOG(2) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; // Note we currently nudge for all types regardless of the ones incurring // the nudge. Doing different would throw off some syncer commands like // CleanupDisabledTypes. We may want to change this in the future. - ModelSafeRoutingInfo routes; - std::vector<ModelSafeWorker*> workers; - session_context_->registrar()->GetModelSafeRoutingInfo(&routes); - session_context_->registrar()->GetWorkers(&workers); - SyncSourceInfo info(GetUpdatesFromNudgeSource(source), - types_with_payloads); + SyncSourceInfo info(source, types_with_payloads); - scoped_ptr<SyncSession> session(new SyncSession( - session_context_.get(), this, info, routes, workers)); + SyncSession* session(CreateSyncSession(info)); + SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, + make_linked_ptr(session), is_canary_job, + nudge_location); + + session = NULL; + if (!ShouldRunJob(job)) + return; if (pending_nudge_.get()) { - if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping the nudge because" + << "we are in backoff"; return; + } - pending_nudge_->session->Coalesce(*session.get()); + VLOG(2) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; + pending_nudge_->session->Coalesce(*(job.session.get())); if (!IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Dropping a nudge because" + << " we are not in backoff and the job was coalesced"; return; } else { - // Re-schedule the current pending nudge. + VLOG(2) << "SyncerThread(" << this << ")" + << " Rescheduling pending nudge"; SyncSession* s = pending_nudge_->session.get(); - session.reset(new SyncSession(s->context(), s->delegate(), s->source(), - s->routing_info(), s->workers())); + job.session.reset(new SyncSession(s->context(), s->delegate(), + s->source(), s->routing_info(), s->workers())); pending_nudge_.reset(); } } - ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location); + + // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), + nudge_location); } // Helper to extract the routing info and workers corresponding to types in @@ -348,58 +440,69 @@ void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { return; } + VLOG(2) << "SyncerThread(" << this << ")" << " Scheduling a config"; ModelSafeRoutingInfo routes; std::vector<ModelSafeWorker*> workers; GetModelSafeParamsForTypes(types, session_context_->registrar(), &routes, &workers); thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - this, &SyncerThread::ScheduleConfigImpl, routes, workers)); + this, &SyncerThread::ScheduleConfigImpl, routes, workers, + GetUpdatesCallerInfo::FIRST_UPDATE)); } void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers) { + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + VLOG(2) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; // TODO(tim): config-specific GetUpdatesCallerInfo value? SyncSession* session = new SyncSession(session_context_.get(), this, - SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE, + SyncSourceInfo(source, syncable::ModelTypePayloadMapFromRoutingInfo( routing_info, std::string())), routing_info, workers); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session, - FROM_HERE); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), + SyncSessionJob::CONFIGURATION, session, FROM_HERE); } void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, - SyncSessionJobPurpose purpose, sessions::SyncSession* session, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, const tracked_objects::Location& nudge_location) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - SyncSessionJob job = {purpose, TimeTicks::Now() + delay, - make_linked_ptr(session), nudge_location}; - if (purpose == NUDGE) { + SyncSessionJob job(purpose, TimeTicks::Now() + delay, + make_linked_ptr(session), false, nudge_location); + if (purpose == SyncSessionJob::NUDGE) { + VLOG(2) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" + << " ScheduleSyncSessionJob"; DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); pending_nudge_.reset(new SyncSessionJob(job)); } + VLOG(2) << "SyncerThread(" << this << ")" + << " Posting job to execute in DoSyncSessionJob. Job purpose " + << job.purpose; MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); } -void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, +void SyncerThread::SetSyncerStepsForPurpose( + SyncSessionJob::SyncSessionJobPurpose purpose, SyncerStep* start, SyncerStep* end) { *end = SYNCER_END; switch (purpose) { - case CONFIGURATION: + case SyncSessionJob::CONFIGURATION: *start = DOWNLOAD_UPDATES; *end = APPLY_UPDATES; return; - case CLEAR_USER_DATA: + case SyncSessionJob::CLEAR_USER_DATA: *start = CLEAR_PRIVATE_DATA; return; - case NUDGE: - case POLL: + case SyncSessionJob::NUDGE: + case SyncSessionJob::POLL: *start = SYNCER_BEGIN; return; default: @@ -409,38 +512,39 @@ void SyncerThread::SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - if (!ShouldRunJob(job.purpose, job.scheduled_start)) { - LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " - << job.session->source().updates_source; + if (!ShouldRunJob(job)) return; - } - if (job.purpose == NUDGE) { + if (job.purpose == SyncSessionJob::NUDGE) { DCHECK(pending_nudge_.get()); if (pending_nudge_->session != job.session) return; // Another nudge must have been scheduled in in the meantime. pending_nudge_.reset(); } + VLOG(2) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " + << job.purpose; SyncerStep begin(SYNCER_BEGIN); SyncerStep end(SYNCER_END); SetSyncerStepsForPurpose(job.purpose, &begin, &end); bool has_more_to_sync = true; - while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { - VLOG(1) << "SyncerThread: Calling SyncShare."; + while (ShouldRunJob(job) && has_more_to_sync) { + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Calling SyncShare."; // Synchronously perform the sync session from this thread. syncer_->SyncShare(job.session.get(), begin, end); has_more_to_sync = job.session->HasMoreToSync(); if (has_more_to_sync) job.session->ResetTransientState(); } - VLOG(1) << "SyncerThread: Done SyncShare looping."; + VLOG(2) << "SyncerThread(" << this << ")" + << " SyncerThread: Done SyncShare looping."; FinishSyncSessionJob(job); } void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { - if (old_job.purpose == CONFIGURATION) { + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { // Whatever types were part of a configuration task will have had updates // downloaded. For that reason, we make sure they get recorded in the // event that they get disabled at a later time. @@ -473,10 +577,15 @@ void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { } last_sync_session_end_time_ = now; UpdateCarryoverSessionState(job); - if (IsSyncingCurrentlySilenced()) + if (IsSyncingCurrentlySilenced()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " We are currently throttled. So not scheduling the next sync."; + SaveJob(job); return; // Nothing to do. + } - VLOG(1) << "Updating the next polling time after SyncMain"; + VLOG(2) << "SyncerThread(" << this << ")" + << " Updating the next polling time after SyncMain"; ScheduleNextSync(job); } @@ -492,35 +601,52 @@ void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { const bool work_to_do = old_job.session->status_controller()->num_server_changes_remaining() > 0 || old_job.session->status_controller()->unsynced_handles().size() > 0; - VLOG(1) << "syncer has work to do: " << work_to_do; + VLOG(2) << "SyncerThread(" << this << ")" << " syncer has work to do: " + << work_to_do; AdjustPolling(&old_job); // TODO(tim): Old impl had special code if notifications disabled. Needed? if (!work_to_do) { // Success implies backoff relief. Note that if this was a "one-off" job - // (i.e. purpose == CLEAR_USER_DATA), if there was work_to_do before it - // ran this wont have changed, as jobs like this don't run a full sync - // cycle. So we don't need special code here. + // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was + // work_to_do before it ran this wont have changed, as jobs like this don't + // run a full sync cycle. So we don't need special code here. wait_interval_.reset(); + VLOG(2) << "SyncerThread(" << this << ")" + << " Job suceeded so not scheduling more jobs"; return; } if (old_job.session->source().updates_source == GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + VLOG(2) << "SyncerThread(" << this << ")" + << " Job failed with source continuation"; // We don't seem to have made forward progress. Start or extend backoff. HandleConsecutiveContinuationError(old_job); } else if (IsBackingOff()) { + VLOG(2) << "SyncerThread(" << this << ")" + << " A nudge during backoff failed"; // We weren't continuing but we're in backoff; must have been a nudge. - DCHECK_EQ(NUDGE, old_job.purpose); + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); DCHECK(!wait_interval_->had_nudge); wait_interval_->had_nudge = true; wait_interval_->timer.Reset(); } else { + VLOG(2) << "SyncerThread(" << this << ")" + << " Failed. Schedule a job with continuation as source"; // We weren't continuing and we aren't in backoff. Schedule a normal // continuation. - ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, - old_job.session->source().types, FROM_HERE); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + ScheduleConfigImpl(old_job.session->routing_info(), + old_job.session->workers(), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); + } else { + // For all other purposes(nudge and poll) we schedule a retry nudge. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), + GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), + old_job.session->source().types, false, FROM_HERE); + } } } @@ -534,7 +660,7 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { bool rate_changed = !poll_timer_.IsRunning() || poll != poll_timer_.GetCurrentDelay(); - if (old_job && old_job->purpose != POLL && !rate_changed) + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) poll_timer_.Reset(); if (!rate_changed) @@ -548,17 +674,38 @@ void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { void SyncerThread::HandleConsecutiveContinuationError( const SyncSessionJob& old_job) { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); - DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); + // This if conditions should be compiled out in retail builds. + if (IsBackingOff()) { + DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); + } SyncSession* old = old_job.session.get(); SyncSession* s(new SyncSession(session_context_.get(), this, old->source(), old->routing_info(), old->workers())); TimeDelta length = delay_provider_->GetDelay( IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + + VLOG(2) << "SyncerThread(" << this << ")" + << " In handle continuation error. Old job purpose is " + << old_job.purpose; + VLOG(2) << "SyncerThread(" << this << ")" + << " In Handle continuation error. The time delta(ms) is: " + << length.InMilliseconds(); + + // This will reset the had_nudge variable as well. wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); - SyncSessionJob job = {NUDGE, TimeTicks::Now() + length, - make_linked_ptr(s), FROM_HERE}; - pending_nudge_.reset(new SyncSessionJob(job)); + if (old_job.purpose == SyncSessionJob::CONFIGURATION) { + SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, + make_linked_ptr(s), false, FROM_HERE); + wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); + } else { + // We are not in configuration mode. So wait_interval's pending job + // should be null. + DCHECK(wait_interval_->pending_configure_job.get() == NULL); + + // TODO(lipalani) - handle clear user data. + InitOrCoalescePendingJob(old_job); + } wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); } @@ -587,33 +734,76 @@ TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { } void SyncerThread::Stop() { + VLOG(2) << "SyncerThread(" << this << ")" << " stop called"; syncer_->RequestEarlyExit(); // Safe to call from any thread. session_context_->connection_manager()->RemoveListener(this); thread_.Stop(); } void SyncerThread::DoCanaryJob() { - DCHECK(pending_nudge_.get()); - wait_interval_->had_nudge = false; - SyncSessionJob copy = *pending_nudge_; - DoSyncSessionJob(copy); + VLOG(2) << "SyncerThread(" << this << ")" << " Do canary job"; + DoPendingJobIfPossible(true); +} + +void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { + SyncSessionJob* job_to_execute = NULL; + if (mode_ == CONFIGURATION_MODE && wait_interval_.get() + && wait_interval_->pending_configure_job.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending configure job"; + job_to_execute = wait_interval_->pending_configure_job.get(); + } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { + VLOG(2) << "SyncerThread(" << this << ")" << " Found pending nudge job"; + // Pending jobs mostly have time from the past. Reset it so this job + // will get executed. + if (pending_nudge_->scheduled_start < TimeTicks::Now()) + pending_nudge_->scheduled_start = TimeTicks::Now(); + + scoped_ptr<SyncSession> session(CreateSyncSession( + pending_nudge_->session->source())); + + // Also the routing info might have been changed since we cached the + // pending nudge. Update it by coalescing to the latest. + pending_nudge_->session->Coalesce(*(session.get())); + // The pending nudge would be cleared in the DoSyncSessionJob function. + job_to_execute = pending_nudge_.get(); + } + + if (job_to_execute != NULL) { + VLOG(2) << "SyncerThread(" << this << ")" << " Executing pending job"; + SyncSessionJob copy = *job_to_execute; + copy.is_canary_job = is_canary_job; + DoSyncSessionJob(copy); + } +} + +SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(source); + + SyncSession* session(new SyncSession(session_context_.get(), this, info, + routes, workers)); + + return session; } void SyncerThread::PollTimerCallback() { DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); ModelSafeRoutingInfo r; - std::vector<ModelSafeWorker*> w; - session_context_->registrar()->GetModelSafeRoutingInfo(&r); - session_context_->registrar()->GetWorkers(&w); ModelTypePayloadMap types_with_payloads = syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); - SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); - ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE); + SyncSession* s = CreateSyncSession(info); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, + FROM_HERE); } void SyncerThread::Unthrottle() { DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + VLOG(2) << "SyncerThread(" << this << ")" << " Unthrottled.."; + DoCanaryJob(); wait_interval_.reset(); } @@ -652,6 +842,8 @@ void SyncerThread::OnReceivedLongPollIntervalUpdate( } void SyncerThread::OnShouldStopSyncingPermanently() { + VLOG(2) << "SyncerThread(" << this << ")" + << " OnShouldStopSyncingPermanently"; syncer_->RequestEarlyExit(); // Thread-safe. Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); } diff --git a/chrome/browser/sync/engine/syncer_thread2.h b/chrome/browser/sync/engine/syncer_thread2.h index 3fbdd7f..1c698db 100644 --- a/chrome/browser/sync/engine/syncer_thread2.h +++ b/chrome/browser/sync/engine/syncer_thread2.h @@ -99,30 +99,67 @@ class SyncerThread : public sessions::SyncSession::Delegate, virtual void OnServerConnectionEvent(const ServerConnectionEvent2& event); private: - friend class SyncerThread2Test; - - // State pertaining to exponential backoff or throttling periods. - struct WaitInterval; - - // An enum used to describe jobs for scheduling purposes. - enum SyncSessionJobPurpose { - // Our poll timer schedules POLL jobs periodically based on a server - // assigned poll interval. - POLL, - // A nudge task can come from a variety of components needing to force - // a sync. The source is inferable from |session.source()|. - NUDGE, - // The user invoked a function in the UI to clear their entire account - // and stop syncing (globally). - CLEAR_USER_DATA, - // Typically used for fetching updates for a subset of the enabled types - // during initial sync or reconfiguration. We don't run all steps of - // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). - CONFIGURATION, + enum JobProcessDecision { + // Indicates we should continue with the current job. + CONTINUE, + // Indicates that we should save it to be processed later. + SAVE, + // Indicates we should drop this job. + DROP, }; - // Internal state for every sync task that is scheduled. - struct SyncSessionJob; + struct SyncSessionJob { + // An enum used to describe jobs for scheduling purposes. + enum SyncSessionJobPurpose { + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // The user invoked a function in the UI to clear their entire account + // and stop syncing (globally). + CLEAR_USER_DATA, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. We don't run all steps of + // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). + CONFIGURATION, + }; + SyncSessionJob(); + SyncSessionJob(SyncSessionJobPurpose purpose, base::TimeTicks start, + linked_ptr<sessions::SyncSession> session, bool is_canary_job, + const tracked_objects::Location& nudge_location); + ~SyncSessionJob(); + SyncSessionJobPurpose purpose; + base::TimeTicks scheduled_start; + linked_ptr<sessions::SyncSession> session; + bool is_canary_job; + + // This is the location the nudge came from. used for debugging purpose. + // In case of multiple nudges getting coalesced this stores the first nudge + // that came in. + tracked_objects::Location nudge_location; + }; + friend class SyncerThread2Test; + friend class SyncerThread2WhiteboxTest; + + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + DropNudgeWhileExponentialBackOff); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, SaveNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueNudge); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, DropPoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinuePoll); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, ContinueConfiguration); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveConfigurationWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + SaveNudgeWhileThrottled); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueClearUserDataUnderAllCircumstances); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueCanaryJobConfig); + FRIEND_TEST_ALL_PREFIXES(SyncerThread2WhiteboxTest, + ContinueNudgeWhileExponentialBackOff); // A component used to get time delays associated with exponential backoff. // Encapsulated into a class to facilitate testing. @@ -135,11 +172,39 @@ class SyncerThread : public sessions::SyncSession::Delegate, DISALLOW_COPY_AND_ASSIGN(DelayProvider); }; + struct WaitInterval { + enum Mode { + // A wait interval whose duration has been affected by exponential + // backoff. + // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. + EXPONENTIAL_BACKOFF, + // A server-initiated throttled interval. We do not allow any syncing + // during such an interval. + THROTTLED, + }; + WaitInterval(); + ~WaitInterval(); + + Mode mode; + + // This bool is set to true if we have observed a nudge during this + // interval and mode == EXPONENTIAL_BACKOFF. + bool had_nudge; + base::TimeDelta length; + base::OneShotTimer<SyncerThread> timer; + + // Configure jobs are saved only when backing off or throttling. So we + // expose the pointer here. + scoped_ptr<SyncSessionJob> pending_configure_job; + WaitInterval(Mode mode, base::TimeDelta length); + }; + // Helper to assemble a job and post a delayed task to sync. - void ScheduleSyncSessionJob(const base::TimeDelta& delay, - SyncSessionJobPurpose purpose, - sessions::SyncSession* session, - const tracked_objects::Location& nudge_location); + void ScheduleSyncSessionJob( + const base::TimeDelta& delay, + SyncSessionJob::SyncSessionJobPurpose purpose, + sessions::SyncSession* session, + const tracked_objects::Location& nudge_location); // Invoke the Syncer to perform a sync. void DoSyncSessionJob(const SyncSessionJob& job); @@ -161,22 +226,35 @@ class SyncerThread : public sessions::SyncSession::Delegate, // Helper to ScheduleNextSync in case of consecutive sync errors. void HandleConsecutiveContinuationError(const SyncSessionJob& old_job); - // Determines if it is legal to run a sync job for |purpose| at - // |scheduled_start|. This checks current operational mode, backoff or - // throttling, freshness (so we don't make redundant syncs), and connection. - bool ShouldRunJob(SyncSessionJobPurpose purpose, - const base::TimeTicks& scheduled_start); + // Determines if it is legal to run |job| by checking current + // operational mode, backoff or throttling, freshness + // (so we don't make redundant syncs), and connection. + bool ShouldRunJob(const SyncSessionJob& job); + + // Decide whether we should CONTINUE, SAVE or DROP the job. + JobProcessDecision DecideOnJob(const SyncSessionJob& job); + + // Decide on whether to CONTINUE, SAVE or DROP the job when we are in + // backoff mode. + JobProcessDecision DecideWhileInWaitInterval(const SyncSessionJob& job); + + // Saves the job for future execution. Note: It drops all the poll jobs. + void SaveJob(const SyncSessionJob& job); + + // Coalesces the current job with the pending nudge. + void InitOrCoalescePendingJob(const SyncSessionJob& job); // 'Impl' here refers to real implementation of public functions, running on // |thread_|. void StartImpl(Mode mode, linked_ptr<ModeChangeCallback> callback); void ScheduleNudgeImpl( const base::TimeDelta& delay, - NudgeSource source, + sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source, const syncable::ModelTypePayloadMap& types_with_payloads, - const tracked_objects::Location& nudge_location); + bool is_canary_job, const tracked_objects::Location& nudge_location); void ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, - const std::vector<ModelSafeWorker*>& workers); + const std::vector<ModelSafeWorker*>& workers, + const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source); void ScheduleClearUserDataImpl(); // Returns true if the client is currently in exponential backoff. @@ -189,12 +267,21 @@ class SyncerThread : public sessions::SyncSession::Delegate, void DoCanaryJob(); void Unthrottle(); + // Executes the pending job. Called whenever an event occurs that may + // change conditions permitting a job to run. Like when network connection is + // re-established, mode changes etc. + void DoPendingJobIfPossible(bool is_canary_job); + + // The pointer is owned by the caller. + browser_sync::sessions::SyncSession* CreateSyncSession( + const browser_sync::sessions::SyncSourceInfo& info); + // Creates a session for a poll and performs the sync. void PollTimerCallback(); // Assign |start| and |end| to appropriate SyncerStep values for the // specified |purpose|. - void SetSyncerStepsForPurpose(SyncSessionJobPurpose purpose, + void SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose, SyncerStep* start, SyncerStep* end); diff --git a/chrome/browser/sync/engine/syncer_thread2_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_unittest.cc index 22c72cd..2823567 100644 --- a/chrome/browser/sync/engine/syncer_thread2_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread2_unittest.cc @@ -47,7 +47,6 @@ struct SyncShareRecords { // Convenient to use in tests wishing to analyze SyncShare calls over time. static const size_t kMinNumSamples = 5; - class SyncerThread2Test : public testing::Test { public: class MockDelayProvider : public SyncerThread::DelayProvider { @@ -69,6 +68,20 @@ class SyncerThread2Test : public testing::Test { syncer_thread_.reset(new SyncerThread(context_, syncer_)); } + virtual void SetUpWithTypes(syncable::ModelTypeBitSet types) { + syncdb_.SetUp(); + syncer_ = new MockSyncer(); + delay_ = NULL; + registrar_.reset(MockModelSafeWorkerRegistrar::PassiveForTypes(types)); + connection_.reset(new MockConnectionManager(syncdb_.manager(), "Test")); + connection_->SetServerReachable(); + context_ = new SyncSessionContext(connection_.get(), syncdb_.manager(), + registrar_.get(), std::vector<SyncEngineEventListener*>()); + context_->set_notifications_enabled(true); + context_->set_account_name("Test"); + syncer_thread_.reset(new SyncerThread(context_, syncer_)); + } + SyncerThread* syncer_thread() { return syncer_thread_.get(); } MockSyncer* syncer() { return syncer_; } MockDelayProvider* delay() { return delay_; } @@ -227,6 +240,148 @@ TEST_F(SyncerThread2Test, Nudge) { records2.snapshots[0]->source.updates_source); } +// Make sure a regular config command is scheduled fine in the absence of any +// errors. +TEST_F(SyncerThread2Test, Config) { + base::WaitableEvent done(false, false); + SyncShareRecords records; + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done.TimedWait(timeout()); + + EXPECT_EQ(1U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[0]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::FIRST_UPDATE, + records.snapshots[0]->source.updates_source); +} + +// Simulate a failure and make sure the config request is retried. +TEST_F(SyncerThread2Test, ConfigWithBackingOff) { + base::WaitableEvent done(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(1))); + SyncShareRecords records; + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done.TimedWait(timeout()); + + EXPECT_EQ(2U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[1]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION, + records.snapshots[1]->source.updates_source); +} + +// Issue 2 config commands. Second one right after the first has failed +// and make sure LATEST is executed. +TEST_F(SyncerThread2Test, MultipleConfigWithBackingOff) { + syncable::ModelTypeBitSet model_types1, model_types2; + model_types1[syncable::BOOKMARKS] = true; + model_types2[syncable::AUTOFILL] = true; + SetUpWithTypes(model_types1 | model_types2); + base::WaitableEvent done(false, false); + base::WaitableEvent done1(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(30))); + SyncShareRecords records; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, &done1)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types1); + + // done1 indicates the first config failed. + done1.TimedWait(timeout()); + syncer_thread()->ScheduleConfig(model_types2); + done.TimedWait(timeout()); + + EXPECT_EQ(3U, records.snapshots.size()); + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types2, + records.snapshots[2]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::FIRST_UPDATE, + records.snapshots[2]->source.updates_source); +} + +// Issue a nudge when the config has failed. Make sure both the config and +// nudge are executed. +TEST_F(SyncerThread2Test, NudgeWithConfigWithBackingOff) { + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + base::WaitableEvent done(false, false); + base::WaitableEvent done1(false, false); + base::WaitableEvent done2(false, false); + base::WaitableEvent* dummy = NULL; + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(50))); + SyncShareRecords records; + + EXPECT_CALL(*syncer(), SyncShare(_,_,_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, dummy)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + WithArg<0>(RecordSyncShare(&records, 1U, &done1)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done2)))) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); + + syncer_thread()->ScheduleConfig(model_types); + done1.TimedWait(timeout()); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, model_types, + FROM_HERE); + + // done2 indicates config suceeded. Now change the mode so nudge can execute. + done2.TimedWait(timeout()); + syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); + done.TimedWait(timeout()); + EXPECT_EQ(4U, records.snapshots.size()); + + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[2]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION, + records.snapshots[2]->source.updates_source); + + EXPECT_TRUE(CompareModelTypeBitSetToModelTypePayloadMap(model_types, + records.snapshots[3]->source.types)); + EXPECT_EQ(GetUpdatesCallerInfo::LOCAL, + records.snapshots[3]->source.updates_source); + +} + + // Test that nudges are coalesced. TEST_F(SyncerThread2Test, NudgeCoalescing) { syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); @@ -476,7 +631,7 @@ TEST_F(SyncerThread2Test, ConfigurationMode) { base::WaitableEvent* dummy = NULL; syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); EXPECT_CALL(*syncer(), SyncShare(_,_,_)) - .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + .WillOnce((Invoke(sessions::test_util::SimulateSuccess), WithArg<0>(RecordSyncShare(&records, 1U, dummy)))); syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE, NULL); syncable::ModelTypeBitSet nudge_types; @@ -739,6 +894,7 @@ TEST_F(SyncerThread2Test, StartWhenNotConnected) { base::WaitableEvent done(false, false); MessageLoop cur; connection()->SetServerNotReachable(); + EXPECT_CALL(*syncer(), SyncShare(_,_,_)).WillOnce(SignalEvent(&done)); syncer_thread()->Start(SyncerThread::NORMAL_MODE, NULL); syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, ModelTypeBitSet(), FROM_HERE); @@ -752,9 +908,6 @@ TEST_F(SyncerThread2Test, StartWhenNotConnected) { // By now, the server connection event should have been posted to the // SyncerThread. FlushLastTask(&done); - EXPECT_CALL(*syncer(), SyncShare(_,_,_)).WillOnce(SignalEvent(&done)); - syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, ModelTypeBitSet(), - FROM_HERE); done.TimedWait(timeout()); } diff --git a/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc new file mode 100644 index 0000000..e1d1218 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2_whitebox_unittest.cc @@ -0,0 +1,235 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/time.h" +#include "chrome/browser/sync/engine/mock_model_safe_workers.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" +#include "chrome/browser/sync/sessions/test_util.h" +#include "chrome/test/sync/engine/mock_connection_manager.h" +#include "chrome/test/sync/engine/test_directory_setter_upper.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/gmock/include/gmock/gmock.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { +using sessions::SyncSessionContext; +using browser_sync::Syncer; + +namespace s3 { + +class SyncerThread2WhiteboxTest : public testing::Test { + public: + virtual void SetUp() { + syncdb_.SetUp(); + Syncer* syncer = new Syncer(); + registrar_.reset(MockModelSafeWorkerRegistrar::PassiveBookmarks()); + context_ = new SyncSessionContext(connection_.get(), syncdb_.manager(), + registrar_.get(), std::vector<SyncEngineEventListener*>()); + context_->set_notifications_enabled(true); + context_->set_account_name("Test"); + syncer_thread_.reset(new SyncerThread(context_, syncer)); + } + + virtual void TearDown() { + syncdb_.TearDown(); + } + + void SetMode(SyncerThread::Mode mode) { + syncer_thread_->mode_ = mode; + } + + void SetLastSyncedTime(base::TimeTicks ticks) { + syncer_thread_->last_sync_session_end_time_ = ticks; + } + + void SetServerConnection(bool connected) { + syncer_thread_->server_connection_ok_ = connected; + } + + void ResetWaitInterval() { + syncer_thread_->wait_interval_.reset(); + } + + void SetWaitIntervalToThrottled() { + syncer_thread_->wait_interval_.reset(new SyncerThread::WaitInterval( + SyncerThread::WaitInterval::THROTTLED, TimeDelta::FromSeconds(1))); + } + + void SetWaitIntervalToExponentialBackoff() { + syncer_thread_->wait_interval_.reset( + new SyncerThread::WaitInterval( + SyncerThread::WaitInterval::EXPONENTIAL_BACKOFF, + TimeDelta::FromSeconds(1))); + } + + SyncerThread::JobProcessDecision DecideOnJob( + const SyncerThread::SyncSessionJob& job) { + return syncer_thread_->DecideOnJob(job); + } + + void InitializeSyncerOnNormalMode() { + SetMode(SyncerThread::NORMAL_MODE); + ResetWaitInterval(); + SetServerConnection(true); + SetLastSyncedTime(base::TimeTicks::Now()); + } + + SyncerThread::JobProcessDecision CreateAndDecideJob( + SyncerThread::SyncSessionJob::SyncSessionJobPurpose purpose) { + struct SyncerThread::SyncSessionJob job; + job.purpose = purpose; + job.scheduled_start = TimeTicks::Now(); + return DecideOnJob(job); + } + + protected: + scoped_ptr<SyncerThread> syncer_thread_; + + private: + scoped_ptr<MockConnectionManager> connection_; + SyncSessionContext* context_; + //MockDelayProvider* delay_; + scoped_ptr<MockModelSafeWorkerRegistrar> registrar_; + MockDirectorySetterUpper syncdb_; +}; + +TEST_F(SyncerThread2WhiteboxTest, SaveNudge) { + InitializeSyncerOnNormalMode(); + + // Now set the mode to configure. + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = + CreateAndDecideJob(SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::SAVE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueNudge) { + InitializeSyncerOnNormalMode(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, DropPoll) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::POLL); + + EXPECT_EQ(decision, SyncerThread::DROP); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinuePoll) { + InitializeSyncerOnNormalMode(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::POLL); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueConfiguration) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CONFIGURATION); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, SaveConfigurationWhileThrottled) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SetWaitIntervalToThrottled(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CONFIGURATION); + + EXPECT_EQ(decision, SyncerThread::SAVE); +} + +TEST_F(SyncerThread2WhiteboxTest, SaveNudgeWhileThrottled) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + + SetWaitIntervalToThrottled(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::SAVE); + +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueClearUserDataUnderAllCircumstances) { + InitializeSyncerOnNormalMode(); + + SetMode(SyncerThread::CONFIGURATION_MODE); + SetWaitIntervalToThrottled(); + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CLEAR_USER_DATA); + EXPECT_EQ(decision, SyncerThread::CONTINUE); + + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::CLEAR_USER_DATA); + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueNudgeWhileExponentialBackOff) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + +TEST_F(SyncerThread2WhiteboxTest, DropNudgeWhileExponentialBackOff) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::NORMAL_MODE); + SetWaitIntervalToExponentialBackoff(); + + syncer_thread_->wait_interval_->had_nudge = true; + + SyncerThread::JobProcessDecision decision = CreateAndDecideJob( + SyncerThread::SyncSessionJob::NUDGE); + + EXPECT_EQ(decision, SyncerThread::DROP); +} + +TEST_F(SyncerThread2WhiteboxTest, ContinueCanaryJobConfig) { + InitializeSyncerOnNormalMode(); + SetMode(SyncerThread::CONFIGURATION_MODE); + SetWaitIntervalToExponentialBackoff(); + + struct SyncerThread::SyncSessionJob job; + job.purpose = SyncerThread::SyncSessionJob::CONFIGURATION; + job.scheduled_start = TimeTicks::Now(); + job.is_canary_job = true; + SyncerThread::JobProcessDecision decision = DecideOnJob(job); + + EXPECT_EQ(decision, SyncerThread::CONTINUE); +} + + +} // namespace s3 +} // namespace browser_sync + +// SyncerThread won't outlive the test! +DISABLE_RUNNABLE_METHOD_REFCOUNT( + browser_sync::s3::SyncerThread2WhiteboxTest); diff --git a/chrome/browser/sync/glue/sync_backend_host.cc b/chrome/browser/sync/glue/sync_backend_host.cc index c317158..50fc35c 100644 --- a/chrome/browser/sync/glue/sync_backend_host.cc +++ b/chrome/browser/sync/glue/sync_backend_host.cc @@ -412,8 +412,10 @@ void SyncBackendHost::ConfigureDataTypes( // If we're doing the first configure (at startup) this is redundant as the // syncer thread always must start in config mode. if (using_new_syncer_thread_) { - core_->syncapi()->StartConfigurationMode(NewCallback(core_.get(), - &SyncBackendHost::Core::FinishConfigureDataTypes)); + core_thread_.message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &SyncBackendHost::Core::DoStartConfigurationMode)); } else { FinishConfigureDataTypesOnFrontendLoop(); } @@ -1211,6 +1213,11 @@ void SyncBackendHost::Core::DoProcessMessage( syncapi_->GetJsBackend()->ProcessMessage(name, args, sender); } +void SyncBackendHost::Core::DoStartConfigurationMode() { + syncapi_->StartConfigurationMode(NewCallback(this, + &SyncBackendHost::Core::FinishConfigureDataTypes)); +} + void SyncBackendHost::Core::DeferNudgeForCleanup() { DCHECK_EQ(MessageLoop::current(), host_->core_thread_.message_loop()); deferred_nudge_for_cleanup_requested_ = true; diff --git a/chrome/browser/sync/glue/sync_backend_host.h b/chrome/browser/sync/glue/sync_backend_host.h index 1e65dd9..086b2e0 100644 --- a/chrome/browser/sync/glue/sync_backend_host.h +++ b/chrome/browser/sync/glue/sync_backend_host.h @@ -326,7 +326,6 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { void CreateSyncNotifier(const scoped_refptr<net::URLRequestContextGetter>& request_context_getter); - // Note: // // The Do* methods are the various entry points from our SyncBackendHost. @@ -402,6 +401,8 @@ class SyncBackendHost : public browser_sync::ModelSafeWorkerRegistrar { const std::string& name, const JsArgList& args, const JsEventHandler* sender); + void DoStartConfigurationMode(); + // A callback from the SyncerThread when it is safe to continue config. void FinishConfigureDataTypes(); diff --git a/chrome/browser/sync/profile_sync_service.cc b/chrome/browser/sync/profile_sync_service.cc index dde2a0e..aa95e2a 100644 --- a/chrome/browser/sync/profile_sync_service.cc +++ b/chrome/browser/sync/profile_sync_service.cc @@ -522,6 +522,7 @@ void ProfileSyncService::OnBackendInitialized() { void ProfileSyncService::OnSyncCycleCompleted() { UpdateLastSyncedTime(); + VLOG(2) << "Notifying observers sync cycle completed"; NotifyObservers(); } diff --git a/chrome/browser/sync/sessions/sync_session.cc b/chrome/browser/sync/sessions/sync_session.cc index cd01375..a38e31a 100644 --- a/chrome/browser/sync/sessions/sync_session.cc +++ b/chrome/browser/sync/sessions/sync_session.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -41,11 +41,14 @@ void SyncSession::Coalesce(const SyncSession& session) { std::back_inserter(temp)); workers_.swap(temp); - ModelSafeRoutingInfo temp_r; - std::set_union(routing_info_.begin(), routing_info_.end(), - session.routing_info_.begin(), session.routing_info_.end(), - std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); - routing_info_.swap(temp_r); + // We have to update the model safe routing info to the union. In case the + // same key is present in both pick the one from session. + for (ModelSafeRoutingInfo::const_iterator it = + session.routing_info_.begin(); + it != session.routing_info_.end(); + ++it) { + routing_info_[it->first] = it->second; + } } void SyncSession::ResetTransientState() { diff --git a/chrome/chrome_tests.gypi b/chrome/chrome_tests.gypi index 23158f8..c8189a1 100644 --- a/chrome/chrome_tests.gypi +++ b/chrome/chrome_tests.gypi @@ -2915,6 +2915,7 @@ 'browser/sync/engine/syncer_proto_util_unittest.cc', 'browser/sync/engine/syncer_thread_unittest.cc', 'browser/sync/engine/syncer_thread2_unittest.cc', + 'browser/sync/engine/syncer_thread2_whitebox_unittest.cc', 'browser/sync/engine/syncer_unittest.cc', 'browser/sync/engine/syncproto_unittest.cc', 'browser/sync/engine/syncapi_mock.h', |