diff options
author | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-26 19:21:59 +0000 |
---|---|---|
committer | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-26 19:21:59 +0000 |
commit | a55fde6d976fc328946c02270fa6c517b0faa9a7 (patch) | |
tree | ac780f2e5aed836459d1a9edc7fd9e8ad40568f6 /chrome/browser/sync/engine/syncer_thread2.cc | |
parent | ac522c7e26be18b614d514a5131199658d5c57cb (diff) | |
download | chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.zip chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.tar.gz chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.tar.bz2 |
sync: beginnings of MessageLoop based SyncerThread
This is just a checkpoint. It's not wired up to anything yet, and not fully implemented (note several NOTIMPLEMENTED() checks).
For design and overview, see https://docs.google.com/document/d/1bFqqtpA7TZuwtyEqlSxgkCCRrHvYRnnH6HlHou2LwVo/edit
BUG=26339
TEST=sync_unit_tests
Review URL: http://codereview.chromium.org/5939006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@72663 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread2.cc')
-rwxr-xr-x | chrome/browser/sync/engine/syncer_thread2.cc | 498 |
1 files changed, 498 insertions, 0 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc new file mode 100755 index 0000000..dfb9b95 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -0,0 +1,498 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/engine/syncer_thread2.h" + +#include <algorithm> + +#include "base/rand_util.h" +#include "chrome/browser/sync/engine/syncer.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { + +using sessions::SyncSession; +using sessions::SyncSessionSnapshot; +using sessions::SyncSourceInfo; +using syncable::ModelTypeBitSet; +using sync_pb::GetUpdatesCallerInfo; + +namespace s3 { + +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} + +TimeDelta SyncerThread::DelayProvider::GetDelay( + const base::TimeDelta& last_delay) { + return SyncerThread::GetRecommendedDelay(last_delay); +} + +SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length) { } + +SyncerThread::SyncerThread(sessions::SyncSessionContext* context, + Syncer* syncer) + : thread_("SyncEngine_SyncerThread"), + syncer_short_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), + syncer_long_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), + server_connection_ok_(false), + delay_provider_(new DelayProvider()), + syncer_(syncer), + session_context_(context) { +} + +SyncerThread::~SyncerThread() { + DCHECK(!thread_.IsRunning()); +} + +void SyncerThread::Start(Mode mode) { + if (!thread_.IsRunning() && !thread_.Start()) { + NOTREACHED() << "Unable to start SyncerThread."; + return; + } + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::StartImpl, mode)); +} + +void SyncerThread::StartImpl(Mode mode) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!session_context_->account_name().empty()); + DCHECK(syncer_.get()); + mode_ = mode; + AdjustPolling(NULL); // Will kick start poll timer if needed. +} + +bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, + const TimeTicks& scheduled_start) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + // Check wait interval. + if (wait_interval_.get()) { + if (wait_interval_->mode == WaitInterval::THROTTLED) + return false; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + DCHECK(purpose == SyncSessionJob::POLL || + purpose == SyncSessionJob::NUDGE); + if ((purpose != SyncSessionJob::NUDGE) || wait_interval_->had_nudge) + return false; + } + + // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that + // were intended for a normal sync if we are in configuration mode, and vice + // versa. + switch (mode_) { + case CONFIGURATION_MODE: + if (purpose != SyncSessionJob::CONFIGURATION) + return false; + break; + case NORMAL_MODE: + if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) + return false; + break; + default: + NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; + return false; + } + + // Continuation NUDGE tasks have priority over POLLs because they are the + // only tasks that trigger exponential backoff, so this prevents them from + // being starved from running (e.g. due to a very, very low poll interval, + // such as 0ms). It's rare that this would ever matter in practice. + if (purpose == SyncSessionJob::POLL && (pending_nudge_.get() && + pending_nudge_->session->source().first == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { + return false; + } + + // Freshness condition. + if (purpose == SyncSessionJob::NUDGE && + (scheduled_start < last_sync_session_end_time_)) { + return false; + } + + return server_connection_ok_; +} + +namespace { +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: + return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; + case NUDGE_SOURCE_UNKNOWN: + default: + return GetUpdatesCallerInfo::UNKNOWN; + } +} + +// Functor for std::find_if to search by ModelSafeGroup. +struct WorkerGroupIs { + explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} + bool operator()(ModelSafeWorker* w) { + return group == w->GetModelSafeGroup(); + } + ModelSafeGroup group; +}; +} // namespace + +void SyncerThread::ScheduleNudge(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); +} + +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& model_types) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + TimeTicks rough_start = TimeTicks::Now() + delay; + if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) + return; + + // Note we currently nudge for all types regardless of the ones incurring + // the nudge. Doing different would throw off some syncer commands like + // CleanupDisabledTypes. We may want to change this in the future. + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); + + scoped_ptr<SyncSession> session(new SyncSession( + session_context_.get(), this, info, routes, workers)); + + if (pending_nudge_.get()) { + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) + return; + + pending_nudge_->session->Coalesce(*session.get()); + if (!IsBackingOff()) { + return; + } else { + // Re-schedule the current pending nudge. + SyncSession* s = pending_nudge_->session.get(); + session.reset(new SyncSession(s->context(), s->delegate(), s->source(), + s->routing_info(), s->workers())); + pending_nudge_.reset(); + } + } + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); +} + +// Helper to extract the routing info and workers corresponding to types in +// |types| from |registrar|. +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, + std::vector<ModelSafeWorker*>* workers) { + ModelSafeRoutingInfo r_tmp; + std::vector<ModelSafeWorker*> w_tmp; + registrar->GetModelSafeRoutingInfo(&r_tmp); + registrar->GetWorkers(&w_tmp); + + typedef std::vector<ModelSafeWorker*>::const_iterator iter; + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { + if (!types.test(i)) + continue; + syncable::ModelType t = syncable::ModelTypeFromInt(i); + DCHECK_EQ(1U, r_tmp.count(t)); + (*routes)[t] = r_tmp[t]; + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); + } + + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + WorkerGroupIs(GROUP_PASSIVE)); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); +} + +void SyncerThread::ScheduleConfig(const TimeDelta& delay, + const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + GetModelSafeParamsForTypes(types, session_context_->registrar(), + &routes, &workers); + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); +} + +void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, + const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + NOTIMPLEMENTED() << "TODO(tim)"; +} + +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, + SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + SyncSessionJob job = {purpose, TimeTicks::Now() + delay, + make_linked_ptr(session)}; + if (purpose == SyncSessionJob::NUDGE) { + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); + pending_nudge_.reset(new SyncSessionJob(job)); + } + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); +} + +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + if (job.purpose == SyncSessionJob::NUDGE) { + DCHECK(pending_nudge_.get()); + if (pending_nudge_->session != job.session) + return; // Another nudge must have been scheduled in in the meantime. + pending_nudge_.reset(); + } else if (job.purpose == SyncSessionJob::CONFIGURATION) { + NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; + } + + bool has_more_to_sync = true; + bool did_job = false; + while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { + VLOG(1) << "SyncerThread: Calling SyncShare."; + did_job = true; + // Synchronously perform the sync session from this thread. + syncer_->SyncShare(job.session.get()); + has_more_to_sync = job.session->HasMoreToSync(); + if (has_more_to_sync) + job.session->ResetTransientState(); + } + VLOG(1) << "SyncerThread: Done SyncShare looping."; + if (did_job) + FinishSyncSessionJob(job); +} + +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // Update timing information for how often datatypes are triggering nudges. + base::TimeTicks now = TimeTicks::Now(); + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; + i < job.session->source().second.size() && + !last_sync_session_end_time_.is_null(); + ++i) { + if (job.session->source().second[i]) { + syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), + now - last_sync_session_end_time_); + } + } + last_sync_session_end_time_ = now; + if (IsSyncingCurrentlySilenced()) + return; // Nothing to do. + + VLOG(1) << "Updating the next polling time after SyncMain"; + ScheduleNextSync(job); +} + +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!old_job.session->HasMoreToSync()); + // Note: |num_server_changes_remaining| > 0 here implies that we received a + // broken response while trying to download all updates, because the Syncer + // will loop until this value is exhausted. Also, if unsynced_handles exist + // but HasMoreToSync is false, this implies that the Syncer determined no + // forward progress was possible at this time (an error, such as an HTTP + // 500, is likely to have occurred during commit). + const bool work_to_do = + old_job.session->status_controller()->num_server_changes_remaining() > 0 + || old_job.session->status_controller()->unsynced_handles().size() > 0; + VLOG(1) << "syncer has work to do: " << work_to_do; + + AdjustPolling(&old_job); + + // TODO(tim): Old impl had special code if notifications disabled. Needed? + if (!work_to_do) { + wait_interval_.reset(); // Success implies backoff relief. + return; + } + + if (old_job.session->source().first == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + // We don't seem to have made forward progress. Start or extend backoff. + HandleConsecutiveContinuationError(old_job); + } else if (IsBackingOff()) { + // We weren't continuing but we're in backoff; must have been a nudge. + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + DCHECK(!wait_interval_->had_nudge); + wait_interval_->had_nudge = true; + wait_interval_->timer.Reset(); + } else { + // We weren't continuing and we aren't in backoff. Schedule a normal + // continuation. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, + old_job.session->source().second); + } +} + +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { + DCHECK(thread_.IsRunning()); + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + TimeDelta poll = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + bool rate_changed = !poll_timer_.IsRunning() || + poll != poll_timer_.GetCurrentDelay(); + + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + poll_timer_.Reset(); + + if (!rate_changed) + return; + + // Adjust poll rate. + poll_timer_.Stop(); + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); +} + +void SyncerThread::HandleConsecutiveContinuationError( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); + SyncSession* old = old_job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + TimeDelta length = delay_provider_->GetDelay( + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, + length)); + SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, + make_linked_ptr(s)}; + pending_nudge_.reset(new SyncSessionJob(job)); + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); +} + +// static +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { + if (last_delay.InSeconds() >= kMaxBackoffSeconds) + return TimeDelta::FromSeconds(kMaxBackoffSeconds); + + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 + int64 backoff_s = + std::max(static_cast<int64>(1), + last_delay.InSeconds() * kBackoffRandomizationFactor); + + // Flip a coin to randomize backoff interval by +/- 50%. + int rand_sign = base::RandInt(0, 1) * 2 - 1; + + // Truncation is adequate for rounding here. + backoff_s = backoff_s + + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + + // Cap the backoff interval. + backoff_s = std::max(static_cast<int64>(1), + std::min(backoff_s, kMaxBackoffSeconds)); + + return TimeDelta::FromSeconds(backoff_s); +} + +void SyncerThread::Stop() { + syncer_->RequestEarlyExit(); // Safe to call from any thread. + thread_.Stop(); + Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); +} + +void SyncerThread::DoCanaryJob() { + DCHECK(pending_nudge_.get()); + wait_interval_->had_nudge = false; + SyncSessionJob copy = {pending_nudge_->purpose, + pending_nudge_->scheduled_start, + pending_nudge_->session}; + DoSyncSessionJob(copy); +} + +void SyncerThread::PollTimerCallback() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + ModelSafeRoutingInfo r; + std::vector<ModelSafeWorker*> w; + session_context_->registrar()->GetModelSafeRoutingInfo(&r); + session_context_->registrar()->GetWorkers(&w); + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); + SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); +} + +void SyncerThread::Unthrottle() { + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + wait_interval_.reset(); +} + +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} + +bool SyncerThread::IsBackingOff() const { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::EXPONENTIAL_BACKOFF; +} + +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, + silenced_until - TimeTicks::Now())); + wait_interval_->timer.Start(wait_interval_->length, this, + &SyncerThread::Unthrottle); +} + +bool SyncerThread::IsSyncingCurrentlySilenced() { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::THROTTLED; +} + +void SyncerThread::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_short_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_long_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnShouldStopSyncingPermanently() { + syncer_->RequestEarlyExit(); // Thread-safe. + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} + +void SyncerThread::OnServerConnectionEvent( + const ServerConnectionEvent& event) { + NOTIMPLEMENTED(); +} + +void SyncerThread::set_notifications_enabled(bool notifications_enabled) { + session_context_->set_notifications_enabled(notifications_enabled); +} + +} // s3 +} // browser_sync |