diff options
author | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-26 19:21:59 +0000 |
---|---|---|
committer | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-01-26 19:21:59 +0000 |
commit | a55fde6d976fc328946c02270fa6c517b0faa9a7 (patch) | |
tree | ac780f2e5aed836459d1a9edc7fd9e8ad40568f6 /chrome | |
parent | ac522c7e26be18b614d514a5131199658d5c57cb (diff) | |
download | chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.zip chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.tar.gz chromium_src-a55fde6d976fc328946c02270fa6c517b0faa9a7.tar.bz2 |
sync: beginnings of MessageLoop based SyncerThread
This is just a checkpoint. It's not wired up to anything yet, and not fully implemented (note several NOTIMPLEMENTED() checks).
For design and overview, see https://docs.google.com/document/d/1bFqqtpA7TZuwtyEqlSxgkCCRrHvYRnnH6HlHou2LwVo/edit
BUG=26339
TEST=sync_unit_tests
Review URL: http://codereview.chromium.org/5939006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@72663 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome')
22 files changed, 1639 insertions, 20 deletions
diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.cc b/chrome/browser/sync/engine/mock_model_safe_workers.cc new file mode 100644 index 0000000..bd6a26b --- /dev/null +++ b/chrome/browser/sync/engine/mock_model_safe_workers.cc @@ -0,0 +1,43 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/engine/mock_model_safe_workers.h" + +namespace browser_sync { + +ModelSafeGroup MockUIModelWorker::GetModelSafeGroup() { return GROUP_UI; } +bool MockUIModelWorker::CurrentThreadIsWorkThread() { return true; } + +ModelSafeGroup MockDBModelWorker::GetModelSafeGroup() { return GROUP_DB; } +bool MockDBModelWorker::CurrentThreadIsWorkThread() { return true; } + +MockModelSafeWorkerRegistrar::~MockModelSafeWorkerRegistrar() {} + +// static +MockModelSafeWorkerRegistrar* + MockModelSafeWorkerRegistrar::PassiveBookmarks() { + ModelSafeRoutingInfo routes; + routes[syncable::BOOKMARKS] = GROUP_PASSIVE; + MockModelSafeWorkerRegistrar* m = new MockModelSafeWorkerRegistrar(routes); + m->passive_worker_ = new ModelSafeWorker(); + return m; +} + +void MockModelSafeWorkerRegistrar::GetWorkers( + std::vector<ModelSafeWorker*>* out) { + if (passive_worker_.get()) + out->push_back(passive_worker_.get()); +} + +void MockModelSafeWorkerRegistrar::GetModelSafeRoutingInfo( + ModelSafeRoutingInfo* out) { + *out = routes_; +} + +MockModelSafeWorkerRegistrar::MockModelSafeWorkerRegistrar( + const ModelSafeRoutingInfo& routes) { + routes_ = routes; +} + +} // namespace browser_sync diff --git a/chrome/browser/sync/engine/mock_model_safe_workers.h b/chrome/browser/sync/engine/mock_model_safe_workers.h index 3d0aa8a..a76a51c 100644 --- a/chrome/browser/sync/engine/mock_model_safe_workers.h +++ b/chrome/browser/sync/engine/mock_model_safe_workers.h @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -6,20 +6,38 @@ #define CHROME_BROWSER_SYNC_ENGINE_MOCK_MODEL_SAFE_WORKERS_H_ #pragma once +#include <vector> + +#include "base/ref_counted.h" #include "chrome/browser/sync/engine/model_safe_worker.h" +#include "chrome/browser/sync/syncable/model_type.h" namespace browser_sync { class MockUIModelWorker : public ModelSafeWorker { public: - virtual ModelSafeGroup GetModelSafeGroup() { return GROUP_UI; } - virtual bool CurrentThreadIsWorkThread() { return true; } + virtual ModelSafeGroup GetModelSafeGroup(); + virtual bool CurrentThreadIsWorkThread(); }; class MockDBModelWorker : public ModelSafeWorker { public: - virtual ModelSafeGroup GetModelSafeGroup() { return GROUP_DB; } - virtual bool CurrentThreadIsWorkThread() { return true; } + virtual ModelSafeGroup GetModelSafeGroup(); + virtual bool CurrentThreadIsWorkThread(); +}; + +class MockModelSafeWorkerRegistrar : public ModelSafeWorkerRegistrar { + public: + virtual ~MockModelSafeWorkerRegistrar(); + static MockModelSafeWorkerRegistrar* PassiveBookmarks(); + virtual void GetWorkers(std::vector<ModelSafeWorker*>* out); + virtual void GetModelSafeRoutingInfo(ModelSafeRoutingInfo* out); + + private: + explicit MockModelSafeWorkerRegistrar(const ModelSafeRoutingInfo& routes); + + scoped_refptr<ModelSafeWorker> passive_worker_; + ModelSafeRoutingInfo routes_; }; } // namespace browser_sync diff --git a/chrome/browser/sync/engine/nudge_source.h b/chrome/browser/sync/engine/nudge_source.h new file mode 100644 index 0000000..6a75ad1 --- /dev/null +++ b/chrome/browser/sync/engine/nudge_source.h @@ -0,0 +1,29 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROME_BROWSER_SYNC_ENGINE_NUDGE_SOURCE_H_ +#define CHROME_BROWSER_SYNC_ENGINE_NUDGE_SOURCE_H_ +#pragma once + +namespace browser_sync { + +namespace s3 { + +enum NudgeSource { + NUDGE_SOURCE_UNKNOWN = 0, + // We received an invalidation message and are nudging to check for updates. + NUDGE_SOURCE_NOTIFICATION, + // A local change occurred (e.g. bookmark moved). + NUDGE_SOURCE_LOCAL, + // A previous sync cycle did not fully complete (e.g. HTTP error). + NUDGE_SOURCE_CONTINUATION, + // A nudge corresponding to the user invoking a function in the UI to clear + // their entire account and stop syncing (globally). + NUDGE_SOURCE_CLEAR_PRIVATE_DATA, +}; + +} // namespace s3 +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_ENGINE_NUDGE_SOURCE_H_ diff --git a/chrome/browser/sync/engine/polling_constants.cc b/chrome/browser/sync/engine/polling_constants.cc new file mode 100644 index 0000000..74f24ba --- /dev/null +++ b/chrome/browser/sync/engine/polling_constants.cc @@ -0,0 +1,26 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/basictypes.h" +#include "chrome/browser/sync/engine/polling_constants.h" + +namespace browser_sync { + +// Server can overwrite these values via client commands. +// Standard short poll. This is used when XMPP is off. +// We use high values here to ensure that failure to receive poll updates from +// the server doesn't result in rapid-fire polling from the client due to low +// local limits. +const int64 kDefaultShortPollIntervalSeconds = 3600 * 8; +// Long poll is used when XMPP is on. +const int64 kDefaultLongPollIntervalSeconds = 3600 * 12; + +// Maximum interval for exponential backoff. +const int64 kMaxBackoffSeconds = 60 * 60 * 4; // 4 hours. + +// Backoff interval randomization factor. +const int kBackoffRandomizationFactor = 2; + +} // namespace browser_sync + diff --git a/chrome/browser/sync/engine/polling_constants.h b/chrome/browser/sync/engine/polling_constants.h new file mode 100644 index 0000000..a970f96 --- /dev/null +++ b/chrome/browser/sync/engine/polling_constants.h @@ -0,0 +1,20 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Constants used by SyncerThread when polling servers for updates. + +#ifndef CHROME_BROWSER_SYNC_ENGINE_POLLING_CONSTANTS_H_ +#define CHROME_BROWSER_SYNC_ENGINE_POLLING_CONSTANTS_H_ +#pragma once + +namespace browser_sync { + +extern const int64 kDefaultShortPollIntervalSeconds; +extern const int64 kDefaultLongPollIntervalSeconds; +extern const int64 kMaxBackoffSeconds; +extern const int kBackoffRandomizationFactor; + +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_ENGINE_POLLING_CONSTANTS_H_ diff --git a/chrome/browser/sync/engine/syncer.h b/chrome/browser/sync/engine/syncer.h index 89f1307..f1a63be 100644 --- a/chrome/browser/sync/engine/syncer.h +++ b/chrome/browser/sync/engine/syncer.h @@ -79,7 +79,7 @@ class Syncer { // The constructor may be called from a thread that is not the Syncer's // dedicated thread, to allow some flexibility in the setup. Syncer(); - ~Syncer(); + virtual ~Syncer(); // Called by other threads to tell the syncer to stop what it's doing // and return early from SyncShare, if possible. @@ -89,7 +89,7 @@ class Syncer { // Cause one sync cycle to occur. Like a good parent, it is the caller's // responsibility to clean up after the syncer when it finishes a sync share // operation and honor server mandated throttles. - void SyncShare(sessions::SyncSession* session); + virtual void SyncShare(sessions::SyncSession* session); // Limit the batch size of commit operations to a specified number of items. void set_max_commit_batch_size(int x) { max_commit_batch_size_ = x; } @@ -174,3 +174,4 @@ void ClearServerData(syncable::MutableEntry* entry); } // namespace browser_sync #endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_H_ + diff --git a/chrome/browser/sync/engine/syncer_thread2.cc b/chrome/browser/sync/engine/syncer_thread2.cc new file mode 100755 index 0000000..dfb9b95 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.cc @@ -0,0 +1,498 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/engine/syncer_thread2.h" + +#include <algorithm> + +#include "base/rand_util.h" +#include "chrome/browser/sync/engine/syncer.h" + +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { + +using sessions::SyncSession; +using sessions::SyncSessionSnapshot; +using sessions::SyncSourceInfo; +using syncable::ModelTypeBitSet; +using sync_pb::GetUpdatesCallerInfo; + +namespace s3 { + +SyncerThread::DelayProvider::DelayProvider() {} +SyncerThread::DelayProvider::~DelayProvider() {} + +TimeDelta SyncerThread::DelayProvider::GetDelay( + const base::TimeDelta& last_delay) { + return SyncerThread::GetRecommendedDelay(last_delay); +} + +SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) + : mode(mode), had_nudge(false), length(length) { } + +SyncerThread::SyncerThread(sessions::SyncSessionContext* context, + Syncer* syncer) + : thread_("SyncEngine_SyncerThread"), + syncer_short_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), + syncer_long_poll_interval_seconds_( + TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), + server_connection_ok_(false), + delay_provider_(new DelayProvider()), + syncer_(syncer), + session_context_(context) { +} + +SyncerThread::~SyncerThread() { + DCHECK(!thread_.IsRunning()); +} + +void SyncerThread::Start(Mode mode) { + if (!thread_.IsRunning() && !thread_.Start()) { + NOTREACHED() << "Unable to start SyncerThread."; + return; + } + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::StartImpl, mode)); +} + +void SyncerThread::StartImpl(Mode mode) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!session_context_->account_name().empty()); + DCHECK(syncer_.get()); + mode_ = mode; + AdjustPolling(NULL); // Will kick start poll timer if needed. +} + +bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, + const TimeTicks& scheduled_start) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + // Check wait interval. + if (wait_interval_.get()) { + if (wait_interval_->mode == WaitInterval::THROTTLED) + return false; + + DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); + DCHECK(purpose == SyncSessionJob::POLL || + purpose == SyncSessionJob::NUDGE); + if ((purpose != SyncSessionJob::NUDGE) || wait_interval_->had_nudge) + return false; + } + + // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that + // were intended for a normal sync if we are in configuration mode, and vice + // versa. + switch (mode_) { + case CONFIGURATION_MODE: + if (purpose != SyncSessionJob::CONFIGURATION) + return false; + break; + case NORMAL_MODE: + if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) + return false; + break; + default: + NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; + return false; + } + + // Continuation NUDGE tasks have priority over POLLs because they are the + // only tasks that trigger exponential backoff, so this prevents them from + // being starved from running (e.g. due to a very, very low poll interval, + // such as 0ms). It's rare that this would ever matter in practice. + if (purpose == SyncSessionJob::POLL && (pending_nudge_.get() && + pending_nudge_->session->source().first == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { + return false; + } + + // Freshness condition. + if (purpose == SyncSessionJob::NUDGE && + (scheduled_start < last_sync_session_end_time_)) { + return false; + } + + return server_connection_ok_; +} + +namespace { +GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( + NudgeSource source) { + switch (source) { + case NUDGE_SOURCE_NOTIFICATION: + return GetUpdatesCallerInfo::NOTIFICATION; + case NUDGE_SOURCE_LOCAL: + return GetUpdatesCallerInfo::LOCAL; + case NUDGE_SOURCE_CONTINUATION: + return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: + return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; + case NUDGE_SOURCE_UNKNOWN: + default: + return GetUpdatesCallerInfo::UNKNOWN; + } +} + +// Functor for std::find_if to search by ModelSafeGroup. +struct WorkerGroupIs { + explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} + bool operator()(ModelSafeWorker* w) { + return group == w->GetModelSafeGroup(); + } + ModelSafeGroup group; +}; +} // namespace + +void SyncerThread::ScheduleNudge(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); +} + +void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, + NudgeSource source, const ModelTypeBitSet& model_types) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + TimeTicks rough_start = TimeTicks::Now() + delay; + if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) + return; + + // Note we currently nudge for all types regardless of the ones incurring + // the nudge. Doing different would throw off some syncer commands like + // CleanupDisabledTypes. We may want to change this in the future. + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + session_context_->registrar()->GetModelSafeRoutingInfo(&routes); + session_context_->registrar()->GetWorkers(&workers); + SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); + + scoped_ptr<SyncSession> session(new SyncSession( + session_context_.get(), this, info, routes, workers)); + + if (pending_nudge_.get()) { + if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) + return; + + pending_nudge_->session->Coalesce(*session.get()); + if (!IsBackingOff()) { + return; + } else { + // Re-schedule the current pending nudge. + SyncSession* s = pending_nudge_->session.get(); + session.reset(new SyncSession(s->context(), s->delegate(), s->source(), + s->routing_info(), s->workers())); + pending_nudge_.reset(); + } + } + ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); +} + +// Helper to extract the routing info and workers corresponding to types in +// |types| from |registrar|. +void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, + ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, + std::vector<ModelSafeWorker*>* workers) { + ModelSafeRoutingInfo r_tmp; + std::vector<ModelSafeWorker*> w_tmp; + registrar->GetModelSafeRoutingInfo(&r_tmp); + registrar->GetWorkers(&w_tmp); + + typedef std::vector<ModelSafeWorker*>::const_iterator iter; + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { + if (!types.test(i)) + continue; + syncable::ModelType t = syncable::ModelTypeFromInt(i); + DCHECK_EQ(1U, r_tmp.count(t)); + (*routes)[t] = r_tmp[t]; + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); + } + + iter it = std::find_if(w_tmp.begin(), w_tmp.end(), + WorkerGroupIs(GROUP_PASSIVE)); + if (it != w_tmp.end()) + workers->push_back(*it); + else + NOTREACHED(); +} + +void SyncerThread::ScheduleConfig(const TimeDelta& delay, + const ModelTypeBitSet& types) { + if (!thread_.IsRunning()) { + NOTREACHED(); + return; + } + + ModelSafeRoutingInfo routes; + std::vector<ModelSafeWorker*> workers; + GetModelSafeParamsForTypes(types, session_context_->registrar(), + &routes, &workers); + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); +} + +void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, + const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + NOTIMPLEMENTED() << "TODO(tim)"; +} + +void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, + SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + SyncSessionJob job = {purpose, TimeTicks::Now() + delay, + make_linked_ptr(session)}; + if (purpose == SyncSessionJob::NUDGE) { + DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); + pending_nudge_.reset(new SyncSessionJob(job)); + } + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); +} + +void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + if (job.purpose == SyncSessionJob::NUDGE) { + DCHECK(pending_nudge_.get()); + if (pending_nudge_->session != job.session) + return; // Another nudge must have been scheduled in in the meantime. + pending_nudge_.reset(); + } else if (job.purpose == SyncSessionJob::CONFIGURATION) { + NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; + } + + bool has_more_to_sync = true; + bool did_job = false; + while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { + VLOG(1) << "SyncerThread: Calling SyncShare."; + did_job = true; + // Synchronously perform the sync session from this thread. + syncer_->SyncShare(job.session.get()); + has_more_to_sync = job.session->HasMoreToSync(); + if (has_more_to_sync) + job.session->ResetTransientState(); + } + VLOG(1) << "SyncerThread: Done SyncShare looping."; + if (did_job) + FinishSyncSessionJob(job); +} + +void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + // Update timing information for how often datatypes are triggering nudges. + base::TimeTicks now = TimeTicks::Now(); + for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; + i < job.session->source().second.size() && + !last_sync_session_end_time_.is_null(); + ++i) { + if (job.session->source().second[i]) { + syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), + now - last_sync_session_end_time_); + } + } + last_sync_session_end_time_ = now; + if (IsSyncingCurrentlySilenced()) + return; // Nothing to do. + + VLOG(1) << "Updating the next polling time after SyncMain"; + ScheduleNextSync(job); +} + +void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!old_job.session->HasMoreToSync()); + // Note: |num_server_changes_remaining| > 0 here implies that we received a + // broken response while trying to download all updates, because the Syncer + // will loop until this value is exhausted. Also, if unsynced_handles exist + // but HasMoreToSync is false, this implies that the Syncer determined no + // forward progress was possible at this time (an error, such as an HTTP + // 500, is likely to have occurred during commit). + const bool work_to_do = + old_job.session->status_controller()->num_server_changes_remaining() > 0 + || old_job.session->status_controller()->unsynced_handles().size() > 0; + VLOG(1) << "syncer has work to do: " << work_to_do; + + AdjustPolling(&old_job); + + // TODO(tim): Old impl had special code if notifications disabled. Needed? + if (!work_to_do) { + wait_interval_.reset(); // Success implies backoff relief. + return; + } + + if (old_job.session->source().first == + GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { + // We don't seem to have made forward progress. Start or extend backoff. + HandleConsecutiveContinuationError(old_job); + } else if (IsBackingOff()) { + // We weren't continuing but we're in backoff; must have been a nudge. + DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); + DCHECK(!wait_interval_->had_nudge); + wait_interval_->had_nudge = true; + wait_interval_->timer.Reset(); + } else { + // We weren't continuing and we aren't in backoff. Schedule a normal + // continuation. + ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, + old_job.session->source().second); + } +} + +void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { + DCHECK(thread_.IsRunning()); + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + + TimeDelta poll = (!session_context_->notifications_enabled()) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + bool rate_changed = !poll_timer_.IsRunning() || + poll != poll_timer_.GetCurrentDelay(); + + if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) + poll_timer_.Reset(); + + if (!rate_changed) + return; + + // Adjust poll rate. + poll_timer_.Stop(); + poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); +} + +void SyncerThread::HandleConsecutiveContinuationError( + const SyncSessionJob& old_job) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); + SyncSession* old = old_job.session.get(); + SyncSession* s(new SyncSession(session_context_.get(), this, + old->source(), old->routing_info(), old->workers())); + TimeDelta length = delay_provider_->GetDelay( + IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); + wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, + length)); + SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, + make_linked_ptr(s)}; + pending_nudge_.reset(new SyncSessionJob(job)); + wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); +} + +// static +TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { + if (last_delay.InSeconds() >= kMaxBackoffSeconds) + return TimeDelta::FromSeconds(kMaxBackoffSeconds); + + // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 + int64 backoff_s = + std::max(static_cast<int64>(1), + last_delay.InSeconds() * kBackoffRandomizationFactor); + + // Flip a coin to randomize backoff interval by +/- 50%. + int rand_sign = base::RandInt(0, 1) * 2 - 1; + + // Truncation is adequate for rounding here. + backoff_s = backoff_s + + (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); + + // Cap the backoff interval. + backoff_s = std::max(static_cast<int64>(1), + std::min(backoff_s, kMaxBackoffSeconds)); + + return TimeDelta::FromSeconds(backoff_s); +} + +void SyncerThread::Stop() { + syncer_->RequestEarlyExit(); // Safe to call from any thread. + thread_.Stop(); + Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); +} + +void SyncerThread::DoCanaryJob() { + DCHECK(pending_nudge_.get()); + wait_interval_->had_nudge = false; + SyncSessionJob copy = {pending_nudge_->purpose, + pending_nudge_->scheduled_start, + pending_nudge_->session}; + DoSyncSessionJob(copy); +} + +void SyncerThread::PollTimerCallback() { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + ModelSafeRoutingInfo r; + std::vector<ModelSafeWorker*> w; + session_context_->registrar()->GetModelSafeRoutingInfo(&r); + session_context_->registrar()->GetWorkers(&w); + SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); + SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); + ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); +} + +void SyncerThread::Unthrottle() { + DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); + wait_interval_.reset(); +} + +void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + session_context_->NotifyListeners(SyncEngineEvent(cause)); +} + +bool SyncerThread::IsBackingOff() const { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::EXPONENTIAL_BACKOFF; +} + +void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { + wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, + silenced_until - TimeTicks::Now())); + wait_interval_->timer.Start(wait_interval_->length, this, + &SyncerThread::Unthrottle); +} + +bool SyncerThread::IsSyncingCurrentlySilenced() { + return wait_interval_.get() && wait_interval_->mode == + WaitInterval::THROTTLED; +} + +void SyncerThread::OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_short_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval) { + DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); + syncer_long_poll_interval_seconds_ = new_interval; +} + +void SyncerThread::OnShouldStopSyncingPermanently() { + syncer_->RequestEarlyExit(); // Thread-safe. + Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); +} + +void SyncerThread::OnServerConnectionEvent( + const ServerConnectionEvent& event) { + NOTIMPLEMENTED(); +} + +void SyncerThread::set_notifications_enabled(bool notifications_enabled) { + session_context_->set_notifications_enabled(notifications_enabled); +} + +} // s3 +} // browser_sync diff --git a/chrome/browser/sync/engine/syncer_thread2.h b/chrome/browser/sync/engine/syncer_thread2.h new file mode 100755 index 0000000..e60b5f4 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2.h @@ -0,0 +1,237 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A class to run the syncer on a thread. +#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ +#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ +#pragma once + +#include "base/linked_ptr.h" +#include "base/observer_list.h" +#include "base/scoped_ptr.h" +#include "base/task.h" +#include "base/threading/thread.h" +#include "base/time.h" +#include "base/timer.h" +#include "chrome/browser/sync/engine/nudge_source.h" +#include "chrome/browser/sync/engine/polling_constants.h" +#include "chrome/browser/sync/sessions/sync_session.h" +#include "chrome/browser/sync/sessions/sync_session_context.h" + +namespace browser_sync { + +struct ServerConnectionEvent; +class Syncer; + +namespace s3 { + +class SyncerThread : public sessions::SyncSession::Delegate { + public: + enum Mode { + // In this mode, the thread only performs configuration tasks. This is + // designed to make the case where we want to download updates for a + // specific type only, and not continue syncing until we are moved into + // normal mode. + CONFIGURATION_MODE, + // Resumes polling and allows nudges, drops configuration tasks. Runs + // through entire sync cycle. + NORMAL_MODE, + }; + + // Takes ownership of both |context| and |syncer|. + SyncerThread(sessions::SyncSessionContext* context, Syncer* syncer); + virtual ~SyncerThread(); + + // Change the mode of operation. + // We don't use a lock when changing modes, so we won't cause currently + // scheduled jobs to adhere to the new mode. We could protect it, but it + // doesn't buy very much as a) a session could already be in progress and it + // will continue no matter what, b) the scheduled sessions already contain + // all their required state and won't be affected by potential change at + // higher levels (i.e. the registrar), and c) we service tasks FIFO, so once + // the mode changes all future jobs will be run against the updated mode. + void Start(Mode mode); + + // Joins on the thread as soon as possible (currently running session + // completes). + void Stop(); + + // The meat and potatoes. + void ScheduleNudge(const base::TimeDelta& delay, NudgeSource source, + const syncable::ModelTypeBitSet& types); + void ScheduleConfig(const base::TimeDelta& delay, + const syncable::ModelTypeBitSet& types); + + // Change status of notifications in the SyncSessionContext. + void set_notifications_enabled(bool notifications_enabled); + + // DDOS avoidance function. Calculates how long we should wait before trying + // again after a failed sync attempt, where the last delay was |base_delay|. + // TODO(tim): Look at URLRequestThrottlerEntryInterface. + static base::TimeDelta GetRecommendedDelay(const base::TimeDelta& base_delay); + + // SyncSession::Delegate implementation. + virtual void OnSilencedUntil(const base::TimeTicks& silenced_until); + virtual bool IsSyncingCurrentlySilenced(); + virtual void OnReceivedShortPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnReceivedLongPollIntervalUpdate( + const base::TimeDelta& new_interval); + virtual void OnShouldStopSyncingPermanently(); + + private: + friend class SyncerThread2Test; + + // State pertaining to exponential backoff or throttling periods. + struct WaitInterval { + enum Mode { + // A wait interval whose duration has been affected by exponential + // backoff. + // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. + EXPONENTIAL_BACKOFF, + // A server-initiated throttled interval. We do not allow any syncing + // during such an interval. + THROTTLED, + }; + Mode mode; + + // This bool is set to true if we have observed a nudge during this + // interval and mode == EXPONENTIAL_BACKOFF. + bool had_nudge; + base::TimeDelta length; + base::OneShotTimer<SyncerThread> timer; + WaitInterval(Mode mode, base::TimeDelta length); + }; + + // Internal state for every sync task that is scheduled. + struct SyncSessionJob { + // An enum used to describe jobs for scheduling purposes. + enum Purpose { + // Our poll timer schedules POLL jobs periodically based on a server + // assigned poll interval. + POLL, + // A nudge task can come from a variety of components needing to force + // a sync. The source is inferable from |session.source()|. + NUDGE, + // Typically used for fetching updates for a subset of the enabled types + // during initial sync or reconfiguration. We don't run all steps of + // the sync cycle for these (e.g. CleanupDisabledTypes is skipped). + CONFIGURATION, + }; + + Purpose purpose; + base::TimeTicks scheduled_start; + linked_ptr<sessions::SyncSession> session; + }; + + // A component used to get time delays associated with exponential backoff. + // Encapsulated into a class to facilitate testing. + class DelayProvider { + public: + DelayProvider(); + virtual base::TimeDelta GetDelay(const base::TimeDelta& last_delay); + virtual ~DelayProvider(); + private: + DISALLOW_COPY_AND_ASSIGN(DelayProvider); + }; + + // Helper to assemble a job and post a delayed task to sync. + void ScheduleSyncSessionJob(const base::TimeDelta& delay, + SyncSessionJob::Purpose purpose, + sessions::SyncSession* session); + + // Invoke the Syncer to perform a sync. + void DoSyncSessionJob(const SyncSessionJob& job); + + // Called after the Syncer has performed the sync represented by |job|, to + // reset our state. + void FinishSyncSessionJob(const SyncSessionJob& job); + + // Helper to FinishSyncSessionJob to schedule the next sync operation. + void ScheduleNextSync(const SyncSessionJob& old_job); + + // Helper to configure polling intervals. Used by Start and ScheduleNextSync. + void AdjustPolling(const SyncSessionJob* old_job); + + // Helper to ScheduleNextSync in case of consecutive sync errors. + void HandleConsecutiveContinuationError(const SyncSessionJob& old_job); + + // Determines if it is legal to run a sync job for |purpose| at + // |scheduled_start|. This checks current operational mode, backoff or + // throttling, freshness (so we don't make redundant syncs), and connection. + bool ShouldRunJob(SyncSessionJob::Purpose purpose, + const base::TimeTicks& scheduled_start); + + // 'Impl' here refers to real implementation of public functions, running on + // |thread_|. + void StartImpl(Mode mode); + void ScheduleNudgeImpl(const base::TimeDelta& delay, + NudgeSource source, + const syncable::ModelTypeBitSet& model_types); + void ScheduleConfigImpl(const base::TimeDelta& delay, + const ModelSafeRoutingInfo& routing_info, + const std::vector<ModelSafeWorker*>& workers); + + // Returns true if the client is currently in exponential backoff. + bool IsBackingOff() const; + + // Helper to signal all listeners registered with |session_context_|. + void Notify(SyncEngineEvent::EventCause cause); + + // ServerConnectionEventListener implementation. + // TODO(tim): schedule a nudge when valid connection detected? in 1 minute? + virtual void OnServerConnectionEvent(const ServerConnectionEvent& event); + + // Callback to change backoff state. + void DoCanaryJob(); + void Unthrottle(); + + // Creates a session for a poll and performs the sync. + void PollTimerCallback(); + + base::Thread thread_; + + // Modifiable versions of kDefaultLongPollIntervalSeconds which can be + // updated by the server. + base::TimeDelta syncer_short_poll_interval_seconds_; + base::TimeDelta syncer_long_poll_interval_seconds_; + + // Periodic timer for polling. See AdjustPolling. + base::RepeatingTimer<SyncerThread> poll_timer_; + + // The mode of operation. We don't use a lock, see Start(...) comment. + Mode mode_; + + // TODO(tim): Bug 26339. This needs to track more than just time I think, + // since the nudges could be for different types. Current impl doesn't care. + base::TimeTicks last_sync_session_end_time_; + + // Have we observed a valid server connection? + bool server_connection_ok_; + + // Tracks in-flight nudges so we can coalesce. + scoped_ptr<SyncSessionJob> pending_nudge_; + + // Current wait state. Null if we're not in backoff and not throttled. + scoped_ptr<WaitInterval> wait_interval_; + + scoped_ptr<DelayProvider> delay_provider_; + + // Invoked to run through the sync cycle. + scoped_ptr<Syncer> syncer_; + + scoped_ptr<sessions::SyncSessionContext> session_context_; + + DISALLOW_COPY_AND_ASSIGN(SyncerThread); +}; + +} // namespace s3 + +} // namespace browser_sync + +// The SyncerThread manages its own internal thread and thus outlives it. We +// don't need refcounting for posting tasks to this internal thread. +DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::s3::SyncerThread); + +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD2_H_ diff --git a/chrome/browser/sync/engine/syncer_thread2_unittest.cc b/chrome/browser/sync/engine/syncer_thread2_unittest.cc new file mode 100644 index 0000000..b0adecd --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread2_unittest.cc @@ -0,0 +1,554 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/synchronization/waitable_event.h" +#include "base/test/test_timeouts.h" +#include "chrome/browser/sync/engine/mock_model_safe_workers.h" +#include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/engine/syncer_thread2.h" +#include "chrome/browser/sync/sessions/test_util.h" +#include "chrome/test/sync/engine/test_directory_setter_upper.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/gmock/include/gmock/gmock.h" + +using base::TimeDelta; +using base::TimeTicks; +using testing::_; +using testing::AtLeast; +using testing::DoAll; +using testing::Eq; +using testing::Invoke; +using testing::Mock; +using testing::Return; +using testing::WithArg; + +namespace browser_sync { +using sessions::SyncSession; +using sessions::SyncSessionContext; +using sessions::SyncSessionSnapshot; +using syncable::ModelTypeBitSet; +using sync_pb::GetUpdatesCallerInfo; + +class MockSyncer : public Syncer { + public: + MOCK_METHOD1(SyncShare, void(sessions::SyncSession*)); +}; + +namespace s3 { + +// Used when tests want to record syncing activity to examine later. +struct SyncShareRecords { + std::vector<TimeTicks> times; + std::vector<linked_ptr<SyncSessionSnapshot> > snapshots; +}; + +// Convenient to use in tests wishing to analyze SyncShare calls over time. +static const size_t kMinNumSamples = 5; + +class SyncerThread2Test : public testing::Test { + public: + class MockDelayProvider : public SyncerThread::DelayProvider { + public: + MOCK_METHOD1(GetDelay, TimeDelta(const TimeDelta&)); + }; + + virtual void SetUp() { + syncdb_.SetUp(); + syncer_ = new MockSyncer(); + delay_ = NULL; + registrar_.reset(MockModelSafeWorkerRegistrar::PassiveBookmarks()); + context_ = new SyncSessionContext(NULL, syncdb_.manager(), + registrar_.get(), std::vector<SyncEngineEventListener*>()); + context_->set_notifications_enabled(true); + context_->set_account_name("Test"); + syncer_thread_.reset(new SyncerThread(context_, syncer_)); + // TODO(tim): Once the SCM is hooked up, remove this. + syncer_thread_->server_connection_ok_ = true; + } + + SyncerThread* syncer_thread() { return syncer_thread_.get(); } + MockSyncer* syncer() { return syncer_; } + MockDelayProvider* delay() { return delay_; } + TimeDelta zero() { return TimeDelta::FromSeconds(0); } + TimeDelta timeout() { + return TimeDelta::FromMilliseconds(TestTimeouts::action_timeout_ms()); + } + + virtual void TearDown() { + syncer_thread()->Stop(); + syncdb_.TearDown(); + } + + void AnalyzePollRun(const SyncShareRecords& records, size_t min_num_samples, + const TimeTicks& optimal_start, const TimeDelta& poll_interval) { + const std::vector<TimeTicks>& data(records.times); + EXPECT_GE(data.size(), min_num_samples); + for (size_t i = 0; i < data.size(); i++) { + SCOPED_TRACE(testing::Message() << "SyncShare # (" << i << ")"); + TimeTicks optimal_next_sync = optimal_start + poll_interval * i; + EXPECT_GE(data[i], optimal_next_sync); + EXPECT_LT(data[i], optimal_next_sync + poll_interval); + EXPECT_EQ(GetUpdatesCallerInfo::PERIODIC, + records.snapshots[i]->source.first); + } + } + + bool GetBackoffAndResetTest(base::WaitableEvent* done) { + syncable::ModelTypeBitSet nudge_types; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, nudge_types); + done->TimedWait(timeout()); + TearDown(); + done->Reset(); + Mock::VerifyAndClearExpectations(syncer()); + bool backing_off = syncer_thread()->IsBackingOff(); + SetUp(); + UseMockDelayProvider(); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromMilliseconds(1))); + return backing_off; + } + + void UseMockDelayProvider() { + delay_ = new MockDelayProvider(); + syncer_thread_->delay_provider_.reset(delay_); + } + + void PostSignalTask(base::WaitableEvent* done) { + syncer_thread_->thread_.message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&SyncerThread2Test::SignalWaitableEvent, done)); + } + + void FlushLastTask(base::WaitableEvent* done) { + PostSignalTask(done); + done->TimedWait(timeout()); + done->Reset(); + } + + static void SignalWaitableEvent(base::WaitableEvent* event) { + event->Signal(); + } + + private: + scoped_ptr<SyncerThread> syncer_thread_; + SyncSessionContext* context_; + MockSyncer* syncer_; + MockDelayProvider* delay_; + scoped_ptr<MockModelSafeWorkerRegistrar> registrar_; + MockDirectorySetterUpper syncdb_; +}; + +bool RecordSyncShareImpl(SyncSession* s, SyncShareRecords* record, + size_t signal_after) { + record->times.push_back(TimeTicks::Now()); + record->snapshots.push_back(make_linked_ptr(new SyncSessionSnapshot( + s->TakeSnapshot()))); + return record->times.size() >= signal_after; +} + +ACTION_P4(RecordSyncShareAndPostSignal, record, signal_after, test, event) { + if (RecordSyncShareImpl(arg0, record, signal_after) && event) + test->PostSignalTask(event); +} + +ACTION_P3(RecordSyncShare, record, signal_after, event) { + if (RecordSyncShareImpl(arg0, record, signal_after) && event) + event->Signal(); +} + +ACTION_P(SignalEvent, event) { + SyncerThread2Test::SignalWaitableEvent(event); +} + +// Test nudge scheduling. +TEST_F(SyncerThread2Test, Nudge) { + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + base::WaitableEvent done(false, false); + SyncShareRecords records; + syncable::ModelTypeBitSet model_types; + model_types[syncable::BOOKMARKS] = true; + + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, 1U, &done)))); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, model_types); + done.TimedWait(timeout()); + + EXPECT_EQ(1U, records.snapshots.size()); + EXPECT_EQ(model_types, records.snapshots[0]->source.second); + EXPECT_EQ(GetUpdatesCallerInfo::LOCAL, records.snapshots[0]->source.first); +} + +// Test that nudges are coalesced. +TEST_F(SyncerThread2Test, NudgeCoalescing) { + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + base::WaitableEvent done(false, false); + SyncShareRecords r; + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&r, 1U, &done)))); + syncable::ModelTypeBitSet types1, types2, types3; + types1[syncable::BOOKMARKS] = true; + types2[syncable::AUTOFILL] = true; + types3[syncable::THEMES] = true; + TimeDelta delay = TimeDelta::FromMilliseconds(20); + TimeTicks optimal_time = TimeTicks::Now() + delay; + syncer_thread()->ScheduleNudge(delay, NUDGE_SOURCE_UNKNOWN, types1); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, types2); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_NOTIFICATION, types3); + done.TimedWait(timeout()); + + EXPECT_EQ(1U, r.snapshots.size()); + EXPECT_GE(r.times[0], optimal_time); + EXPECT_EQ(types1 | types2 | types3, r.snapshots[0]->source.second); + EXPECT_EQ(GetUpdatesCallerInfo::NOTIFICATION, r.snapshots[0]->source.first); + + SyncShareRecords r2; + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&r2, 1U, &done)))); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_NOTIFICATION, types3); + done.TimedWait(timeout()); + EXPECT_EQ(1U, r2.snapshots.size()); + EXPECT_EQ(types3, r2.snapshots[0]->source.second); + EXPECT_EQ(GetUpdatesCallerInfo::NOTIFICATION, r2.snapshots[0]->source.first); +} + +// Test that polling works as expected. +TEST_F(SyncerThread2Test, Polling) { + SyncShareRecords records; + base::WaitableEvent done(false, false); + TimeDelta poll_interval(TimeDelta::FromMilliseconds(30)); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll_interval); + EXPECT_CALL(*syncer(), SyncShare(_)).Times(AtLeast(kMinNumSamples)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, kMinNumSamples, &done)))); + + TimeTicks optimal_start = TimeTicks::Now() + poll_interval; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + done.TimedWait(timeout()); + syncer_thread()->Stop(); + + AnalyzePollRun(records, kMinNumSamples, optimal_start, poll_interval); +} + +// Test that the short poll interval is used. +TEST_F(SyncerThread2Test, PollNotificationsDisabled) { + SyncShareRecords records; + base::WaitableEvent done(false, false); + TimeDelta poll_interval(TimeDelta::FromMilliseconds(30)); + syncer_thread()->OnReceivedShortPollIntervalUpdate(poll_interval); + syncer_thread()->set_notifications_enabled(false); + EXPECT_CALL(*syncer(), SyncShare(_)).Times(AtLeast(kMinNumSamples)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, kMinNumSamples, &done)))); + + TimeTicks optimal_start = TimeTicks::Now() + poll_interval; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + done.TimedWait(timeout()); + syncer_thread()->Stop(); + + AnalyzePollRun(records, kMinNumSamples, optimal_start, poll_interval); +} + +// Test that polling intervals are updated when needed. +TEST_F(SyncerThread2Test, PollIntervalUpdate) { + SyncShareRecords records; + base::WaitableEvent done(false, false); + TimeDelta poll1(TimeDelta::FromMilliseconds(120)); + TimeDelta poll2(TimeDelta::FromMilliseconds(30)); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll1); + EXPECT_CALL(*syncer(), SyncShare(_)).Times(AtLeast(kMinNumSamples)) + .WillOnce(WithArg<0>( + sessions::test_util::SimulatePollIntervalUpdate(poll2))) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, kMinNumSamples, &done)))); + + TimeTicks optimal_start = TimeTicks::Now() + poll1 + poll2; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + done.TimedWait(timeout()); + syncer_thread()->Stop(); + + AnalyzePollRun(records, kMinNumSamples, optimal_start, poll2); +} + +// Test that a sync session is run through to completion. +TEST_F(SyncerThread2Test, HasMoreToSync) { + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + base::WaitableEvent done(false, false); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateHasMoreToSync)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + SignalEvent(&done))); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, ModelTypeBitSet()); + done.TimedWait(timeout()); + // If more nudges are scheduled, they'll be waited on by TearDown, and would + // cause our expectation to break. +} + +// Test that no syncing occurs when throttled. +TEST_F(SyncerThread2Test, ThrottlingDoesThrottle) { + syncable::ModelTypeBitSet types; + types[syncable::BOOKMARKS] = true; + base::WaitableEvent done(false, false); + TimeDelta poll(TimeDelta::FromMilliseconds(5)); + TimeDelta throttle(TimeDelta::FromMinutes(10)); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(WithArg<0>(sessions::test_util::SimulateThrottled(throttle))); + + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, types); + FlushLastTask(&done); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE); + syncer_thread()->ScheduleConfig(zero(), types); + FlushLastTask(&done); +} + +TEST_F(SyncerThread2Test, ThrottlingExpires) { + SyncShareRecords records; + base::WaitableEvent done(false, false); + TimeDelta poll(TimeDelta::FromMilliseconds(15)); + TimeDelta throttle1(TimeDelta::FromMilliseconds(150)); + TimeDelta throttle2(TimeDelta::FromMinutes(10)); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + + ::testing::InSequence seq; + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(WithArg<0>(sessions::test_util::SimulateThrottled(throttle1))) + .RetiresOnSaturation(); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + WithArg<0>(RecordSyncShare(&records, kMinNumSamples, &done)))); + + TimeTicks optimal_start = TimeTicks::Now() + poll + throttle1; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + done.TimedWait(timeout()); + syncer_thread()->Stop(); + + AnalyzePollRun(records, kMinNumSamples, optimal_start, poll); +} + +// Test nudges / polls don't run in config mode and config tasks do. +TEST_F(SyncerThread2Test, ConfigurationMode) { + TimeDelta poll(TimeDelta::FromMilliseconds(15)); + base::WaitableEvent done(false, false); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + EXPECT_CALL(*syncer(), SyncShare(_)).Times(0); + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE); + syncable::ModelTypeBitSet nudge_types; + nudge_types[syncable::AUTOFILL] = true; + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, nudge_types); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, nudge_types); + + syncable::ModelTypeBitSet config_types; + config_types[syncable::BOOKMARKS] = true; + // TODO(tim): This will fail once CONFIGURATION tasks are implemented. Update + // the EXPECT when that happens. + syncer_thread()->ScheduleConfig(zero(), config_types); + FlushLastTask(&done); +} + +// Test that exponential backoff is properly triggered. +TEST_F(SyncerThread2Test, BackoffTriggers) { + base::WaitableEvent done(false, false); + UseMockDelayProvider(); + + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + SignalEvent(&done))); + EXPECT_FALSE(GetBackoffAndResetTest(&done)); + // Note GetBackoffAndResetTest clears mocks and re-instantiates the syncer. + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateSuccess), + SignalEvent(&done))); + EXPECT_FALSE(GetBackoffAndResetTest(&done)); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillRepeatedly(DoAll(Invoke( + sessions::test_util::SimulateDownloadUpdatesFailed), + SignalEvent(&done))); + EXPECT_TRUE(GetBackoffAndResetTest(&done)); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + SignalEvent(&done))); + EXPECT_TRUE(GetBackoffAndResetTest(&done)); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillOnce(Invoke(sessions::test_util::SimulateDownloadUpdatesFailed)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + SignalEvent(&done))); + EXPECT_FALSE(GetBackoffAndResetTest(&done)); + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + SignalEvent(&done))); + EXPECT_FALSE(GetBackoffAndResetTest(&done)); +} + +// Test that no polls or extraneous nudges occur when in backoff. +TEST_F(SyncerThread2Test, BackoffDropsJobs) { + SyncShareRecords r; + TimeDelta poll(TimeDelta::FromMilliseconds(5)); + base::WaitableEvent done(false, false); + syncable::ModelTypeBitSet types; + types[syncable::BOOKMARKS] = true; + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + UseMockDelayProvider(); + + EXPECT_CALL(*syncer(), SyncShare(_)).Times(2) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + RecordSyncShareAndPostSignal(&r, 2U, this, &done))); + EXPECT_CALL(*delay(), GetDelay(_)) + .WillRepeatedly(Return(TimeDelta::FromDays(1))); + + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + ASSERT_TRUE(done.TimedWait(timeout())); + done.Reset(); + + Mock::VerifyAndClearExpectations(syncer()); + EXPECT_EQ(2U, r.snapshots.size()); + EXPECT_EQ(GetUpdatesCallerInfo::PERIODIC, r.snapshots[0]->source.first); + EXPECT_EQ(GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION, + r.snapshots[1]->source.first); + + EXPECT_CALL(*syncer(), SyncShare(_)).Times(1) + .WillOnce(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + RecordSyncShareAndPostSignal(&r, 1U, this, &done))); + + // We schedule a nudge with enough delay (10X poll interval) that at least + // one or two polls would have taken place. The nudge should succeed. + syncer_thread()->ScheduleNudge(poll * 10, NUDGE_SOURCE_LOCAL, types); + ASSERT_TRUE(done.TimedWait(timeout())); + done.Reset(); + + Mock::VerifyAndClearExpectations(syncer()); + Mock::VerifyAndClearExpectations(delay()); + EXPECT_EQ(3U, r.snapshots.size()); + EXPECT_EQ(GetUpdatesCallerInfo::LOCAL, r.snapshots[2]->source.first); + + EXPECT_CALL(*syncer(), SyncShare(_)).Times(0); + EXPECT_CALL(*delay(), GetDelay(_)).Times(0); + + syncer_thread()->Start(SyncerThread::CONFIGURATION_MODE); + syncer_thread()->ScheduleConfig(zero(), types); + FlushLastTask(&done); + + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, types); + syncer_thread()->ScheduleNudge(zero(), NUDGE_SOURCE_LOCAL, types); + FlushLastTask(&done); +} + +// Test that backoff is shaping traffic properly with consecutive errors. +TEST_F(SyncerThread2Test, BackoffElevation) { + SyncShareRecords r; + const TimeDelta poll(TimeDelta::FromMilliseconds(10)); + base::WaitableEvent done(false, false); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + UseMockDelayProvider(); + + const TimeDelta first = TimeDelta::FromSeconds(1); + const TimeDelta second = TimeDelta::FromMilliseconds(10); + const TimeDelta third = TimeDelta::FromMilliseconds(20); + const TimeDelta fourth = TimeDelta::FromMilliseconds(30); + const TimeDelta fifth = TimeDelta::FromDays(1); + + EXPECT_CALL(*syncer(), SyncShare(_)).Times(kMinNumSamples) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateCommitFailed), + RecordSyncShareAndPostSignal(&r, kMinNumSamples, this, &done))); + + EXPECT_CALL(*delay(), GetDelay(Eq(first))).WillOnce(Return(second)) + .RetiresOnSaturation(); + EXPECT_CALL(*delay(), GetDelay(Eq(second))).WillOnce(Return(third)) + .RetiresOnSaturation(); + EXPECT_CALL(*delay(), GetDelay(Eq(third))).WillOnce(Return(fourth)) + .RetiresOnSaturation(); + EXPECT_CALL(*delay(), GetDelay(Eq(fourth))).WillOnce(Return(fifth)); + + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + ASSERT_TRUE(done.TimedWait(timeout())); + + EXPECT_GE(r.times[2] - r.times[1], second); + EXPECT_GE(r.times[3] - r.times[2], third); + EXPECT_GE(r.times[4] - r.times[3], fourth); +} + +// Test that things go back to normal once a canary task makes forward progress +// following a succession of failures. +TEST_F(SyncerThread2Test, BackoffRelief) { + SyncShareRecords r; + const TimeDelta poll(TimeDelta::FromMilliseconds(10)); + base::WaitableEvent done(false, false); + syncer_thread()->OnReceivedLongPollIntervalUpdate(poll); + UseMockDelayProvider(); + + const TimeDelta backoff = TimeDelta::FromMilliseconds(100); + + EXPECT_CALL(*syncer(), SyncShare(_)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillOnce(Invoke(sessions::test_util::SimulateCommitFailed)) + .WillRepeatedly(DoAll(Invoke(sessions::test_util::SimulateSuccess), + RecordSyncShareAndPostSignal(&r, kMinNumSamples, this, &done))); + EXPECT_CALL(*delay(), GetDelay(_)).WillOnce(Return(backoff)); + + // Optimal start for the post-backoff poll party. + TimeTicks optimal_start = TimeTicks::Now() + poll + backoff; + syncer_thread()->Start(SyncerThread::NORMAL_MODE); + done.TimedWait(timeout()); + + // Check for healthy polling after backoff is relieved. + // Can't use AnalyzePollRun because first sync is a continuation. Bleh. + for (size_t i = 0; i < r.times.size(); i++) { + SCOPED_TRACE(testing::Message() << "SyncShare # (" << i << ")"); + TimeTicks optimal_next_sync = optimal_start + poll * i; + EXPECT_GE(r.times[i], optimal_next_sync); + EXPECT_LT(r.times[i], optimal_next_sync + poll); + EXPECT_EQ(i == 0 ? GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION + : GetUpdatesCallerInfo::PERIODIC, + r.snapshots[i]->source.first); + } +} + +TEST_F(SyncerThread2Test, GetRecommendedDelay) { + EXPECT_LE(TimeDelta::FromSeconds(0), + SyncerThread::GetRecommendedDelay(TimeDelta::FromSeconds(0))); + EXPECT_LE(TimeDelta::FromSeconds(1), + SyncerThread::GetRecommendedDelay(TimeDelta::FromSeconds(1))); + EXPECT_LE(TimeDelta::FromSeconds(50), + SyncerThread::GetRecommendedDelay(TimeDelta::FromSeconds(50))); + EXPECT_LE(TimeDelta::FromSeconds(10), + SyncerThread::GetRecommendedDelay(TimeDelta::FromSeconds(10))); + EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds), + SyncerThread::GetRecommendedDelay( + TimeDelta::FromSeconds(kMaxBackoffSeconds))); + EXPECT_EQ(TimeDelta::FromSeconds(kMaxBackoffSeconds), + SyncerThread::GetRecommendedDelay( + TimeDelta::FromSeconds(kMaxBackoffSeconds + 1))); +} + +// Test config tasks don't run during normal mode. +// TODO(tim): Implement this test and then the functionality! +TEST_F(SyncerThread2Test, DISABLED_NoConfigDuringNormal) { +} + +// Test that starting the syncer thread without a valid connection doesn't +// break things when a connection is detected. +// Test config tasks don't run during normal mode. +// TODO(tim): Implement this test and then the functionality! +TEST_F(SyncerThread2Test, DISABLED_StartWhenNotConnected) { + +} + +} // namespace s3 +} // namespace browser_sync + +// SyncerThread won't outlive the test! +DISABLE_RUNNABLE_METHOD_REFCOUNT(browser_sync::s3::SyncerThread2Test); + diff --git a/chrome/browser/sync/engine/syncer_thread_unittest.cc b/chrome/browser/sync/engine/syncer_thread_unittest.cc index 1553e9c..9822547 100644 --- a/chrome/browser/sync/engine/syncer_thread_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread_unittest.cc @@ -58,7 +58,7 @@ SyncSessionSnapshot SessionSnapshotForTest( return SyncSessionSnapshot(SyncerStatus(), ErrorCounters(), num_server_changes_remaining, false, syncable::ModelTypeBitSet(), download_progress_markers, - false, false, unsynced_count, 0, false); + false, false, unsynced_count, 0, false, sessions::SyncSourceInfo()); } class ListenerMock : public SyncEngineEventListener { diff --git a/chrome/browser/sync/engine/syncer_types.h b/chrome/browser/sync/engine/syncer_types.h index 0ed1f19..0e05e0e 100644 --- a/chrome/browser/sync/engine/syncer_types.h +++ b/chrome/browser/sync/engine/syncer_types.h @@ -87,10 +87,12 @@ struct SyncEngineEvent { // This event is sent when the thread is paused in response to a // pause request. + // TODO(tim): Deprecated. SYNCER_THREAD_PAUSED, // This event is sent when the thread is resumed in response to a // resume request. + // TODO(tim): Deprecated. SYNCER_THREAD_RESUMED, // This event is sent when the thread is waiting for a connection diff --git a/chrome/browser/sync/sessions/session_state.cc b/chrome/browser/sync/sessions/session_state.cc index 1fe714f..fda99b8 100644 --- a/chrome/browser/sync/sessions/session_state.cc +++ b/chrome/browser/sync/sessions/session_state.cc @@ -40,7 +40,8 @@ SyncSessionSnapshot::SyncSessionSnapshot( bool is_silenced, int64 unsynced_count, int num_conflicting_updates, - bool did_commit_items) + bool did_commit_items, + const SyncSourceInfo& source) : syncer_status(syncer_status), errors(errors), num_server_changes_remaining(num_server_changes_remaining), @@ -51,7 +52,8 @@ SyncSessionSnapshot::SyncSessionSnapshot( is_silenced(is_silenced), unsynced_count(unsynced_count), num_conflicting_updates(num_conflicting_updates), - did_commit_items(did_commit_items) { + did_commit_items(did_commit_items), + source(source) { for (int i = 0; i < syncable::MODEL_TYPE_COUNT; ++i) { const_cast<std::string&>(this->download_progress_markers[i]).assign( download_progress_markers[i]); diff --git a/chrome/browser/sync/sessions/session_state.h b/chrome/browser/sync/sessions/session_state.h index fc42994..2d8381e3 100644 --- a/chrome/browser/sync/sessions/session_state.h +++ b/chrome/browser/sync/sessions/session_state.h @@ -32,6 +32,9 @@ namespace sessions { class UpdateProgress; +typedef std::pair<sync_pb::GetUpdatesCallerInfo::GetUpdatesSource, + syncable::ModelTypeBitSet> SyncSourceInfo; + // Data pertaining to the status of an active Syncer object. struct SyncerStatus { SyncerStatus(); @@ -79,7 +82,8 @@ struct SyncSessionSnapshot { bool is_silenced, int64 unsynced_count, int num_conflicting_updates, - bool did_commit_items); + bool did_commit_items, + const SyncSourceInfo& source); ~SyncSessionSnapshot(); const SyncerStatus syncer_status; @@ -93,6 +97,7 @@ struct SyncSessionSnapshot { const int64 unsynced_count; const int num_conflicting_updates; const bool did_commit_items; + const SyncSourceInfo source; }; // Tracks progress of conflicts and their resolution using conflict sets. diff --git a/chrome/browser/sync/sessions/sync_session.cc b/chrome/browser/sync/sessions/sync_session.cc index da6412f..077b629 100644 --- a/chrome/browser/sync/sessions/sync_session.cc +++ b/chrome/browser/sync/sessions/sync_session.cc @@ -24,6 +24,32 @@ SyncSession::SyncSession(SyncSessionContext* context, Delegate* delegate, SyncSession::~SyncSession() {} +void SyncSession::Coalesce(const SyncSession& session) { + if (context_ != session.context() || delegate_ != session.delegate_) { + NOTREACHED(); + return; + } + + source_ = SyncSourceInfo(session.source_.first, + source_.second | session.source_.second); + + std::vector<ModelSafeWorker*> temp; + std::set_union(workers_.begin(), workers_.end(), + session.workers_.begin(), session.workers_.end(), + std::back_inserter(temp)); + workers_.swap(temp); + + ModelSafeRoutingInfo temp_r; + std::set_union(routing_info_.begin(), routing_info_.end(), + session.routing_info_.begin(), session.routing_info_.end(), + std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); + routing_info_.swap(temp_r); +} + +void SyncSession::ResetTransientState() { + status_controller_.reset(new StatusController(routing_info_)); +} + SyncSessionSnapshot SyncSession::TakeSnapshot() const { syncable::ScopedDirLookup dir(context_->directory_manager(), context_->account_name()); @@ -55,7 +81,8 @@ SyncSessionSnapshot SyncSession::TakeSnapshot() const { delegate_->IsSyncingCurrentlySilenced(), status_controller_->unsynced_handles().size(), status_controller_->TotalNumConflictingItems(), - status_controller_->did_commit_items()); + status_controller_->did_commit_items(), + source_); } SyncSourceInfo SyncSession::TestAndSetSource() { diff --git a/chrome/browser/sync/sessions/sync_session.h b/chrome/browser/sync/sessions/sync_session.h index ce7c9e5..3626d11 100644 --- a/chrome/browser/sync/sessions/sync_session.h +++ b/chrome/browser/sync/sessions/sync_session.h @@ -35,8 +35,6 @@ namespace browser_sync { class ModelSafeWorker; namespace sessions { -typedef std::pair<sync_pb::GetUpdatesCallerInfo::GetUpdatesSource, - syncable::ModelTypeBitSet> SyncSourceInfo; class SyncSession { public: @@ -92,8 +90,19 @@ class SyncSession { // engine again. bool HasMoreToSync() const; - SyncSessionContext* context() { return context_; } - Delegate* delegate() { return delegate_; } + // Collects all state pertaining to how and why |s| originated and unions it + // with corresponding state in |this|, leaving |s| unchanged. Allows |this| + // to take on the responsibilities |s| had (e.g. certain data types) in the + // next SyncShare operation using |this|, rather than needed two separate + // sessions. + void Coalesce(const SyncSession& session); + + // Should be called any time |this| is being re-used in a new call to + // SyncShare (e.g., HasMoreToSync returned true). + void ResetTransientState(); + + SyncSessionContext* context() const { return context_; } + Delegate* delegate() const { return delegate_; } syncable::WriteTransaction* write_transaction() { return write_transaction_; } StatusController* status_controller() { return status_controller_.get(); } @@ -138,12 +147,13 @@ class SyncSession { scoped_ptr<StatusController> status_controller_; // The set of active ModelSafeWorkers for the duration of this session. - const std::vector<ModelSafeWorker*> workers_; + // This can change if this session is Coalesce()'d with another. + std::vector<ModelSafeWorker*> workers_; // The routing info for the duration of this session, dictating which // datatypes should be synced and which workers should be used when working // on those datatypes. - const ModelSafeRoutingInfo routing_info_; + ModelSafeRoutingInfo routing_info_; DISALLOW_COPY_AND_ASSIGN(SyncSession); }; diff --git a/chrome/browser/sync/sessions/sync_session_context.h b/chrome/browser/sync/sessions/sync_session_context.h index c6d87a1..2fe8832 100644 --- a/chrome/browser/sync/sessions/sync_session_context.h +++ b/chrome/browser/sync/sessions/sync_session_context.h @@ -84,6 +84,7 @@ class SyncSessionContext { previous_session_routing_info_ = info; } + // TODO(tim): Deprecated. Bug 26339. sessions::SyncSessionSnapshot* previous_session_snapshot() { return previous_session_snapshot_.get(); } diff --git a/chrome/browser/sync/sessions/sync_session_unittest.cc b/chrome/browser/sync/sessions/sync_session_unittest.cc index 9bb1e3b..681c0ad 100644 --- a/chrome/browser/sync/sessions/sync_session_unittest.cc +++ b/chrome/browser/sync/sessions/sync_session_unittest.cc @@ -4,7 +4,9 @@ #include "chrome/browser/sync/sessions/sync_session.h" +#include "base/ref_counted.h" #include "chrome/browser/sync/engine/conflict_resolver.h" +#include "chrome/browser/sync/engine/mock_model_safe_workers.h" #include "chrome/browser/sync/engine/syncer_types.h" #include "chrome/browser/sync/engine/syncer_util.h" #include "chrome/browser/sync/syncable/directory_manager.h" @@ -108,7 +110,7 @@ TEST_F(SyncSessionTest, ScopedContextHelpers) { TEST_F(SyncSessionTest, SetWriteTransaction) { TestDirectorySetterUpper db; db.SetUp(); - session_.reset(NULL); + session_.reset(); context_.reset(new SyncSessionContext(NULL, db.manager(), this, std::vector<SyncEngineEventListener*>())); session_.reset(MakeSession()); @@ -250,6 +252,47 @@ TEST_F(SyncSessionTest, MoreToSyncIfConflictsResolved) { EXPECT_TRUE(session_->HasMoreToSync()); } +TEST_F(SyncSessionTest, ResetTransientState) { + status()->update_conflicts_resolved(true); + status()->increment_num_successful_commits(); + EXPECT_TRUE(session_->HasMoreToSync()); + session_->ResetTransientState(); + EXPECT_FALSE(status()->conflicts_resolved()); + EXPECT_FALSE(session_->HasMoreToSync()); + EXPECT_FALSE(status()->TestAndClearIsDirty()); +} + +TEST_F(SyncSessionTest, Coalesce) { + std::vector<ModelSafeWorker*> workers_one, workers_two; + ModelSafeRoutingInfo routes_one, routes_two; + SyncSourceInfo source_one(sync_pb::GetUpdatesCallerInfo::PERIODIC, + ParamsMeaningJustOneEnabledType()); + SyncSourceInfo source_two(sync_pb::GetUpdatesCallerInfo::LOCAL, + ParamsMeaningAllEnabledTypes()); + scoped_refptr<MockDBModelWorker> db_worker(new MockDBModelWorker()); + scoped_refptr<MockUIModelWorker> ui_worker(new MockUIModelWorker()); + workers_one.push_back(db_worker); + workers_two.push_back(db_worker); + workers_two.push_back(ui_worker); + routes_one[syncable::AUTOFILL] = GROUP_DB; + routes_two[syncable::AUTOFILL] = GROUP_DB; + routes_two[syncable::BOOKMARKS] = GROUP_UI; + SyncSession one(context_.get(), this, source_one, routes_one, workers_one); + SyncSession two(context_.get(), this, source_two, routes_two, workers_two); + + one.Coalesce(two); + + EXPECT_EQ(two.source().first, one.source().first); + EXPECT_EQ(ParamsMeaningAllEnabledTypes(), one.source().second); + std::vector<ModelSafeWorker*>::const_iterator it_db = + std::find(one.workers().begin(), one.workers().end(), db_worker); + std::vector<ModelSafeWorker*>::const_iterator it_ui = + std::find(one.workers().begin(), one.workers().end(), ui_worker); + EXPECT_NE(it_db, one.workers().end()); + EXPECT_NE(it_ui, one.workers().end()); + EXPECT_EQ(routes_two, one.routing_info()); +} + } // namespace } // namespace sessions } // namespace browser_sync diff --git a/chrome/browser/sync/sessions/test_util.cc b/chrome/browser/sync/sessions/test_util.cc new file mode 100644 index 0000000..d6fdef5 --- /dev/null +++ b/chrome/browser/sync/sessions/test_util.cc @@ -0,0 +1,53 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/sessions/test_util.h" + +namespace browser_sync { +namespace sessions { +namespace test_util { + +void SimulateHasMoreToSync(sessions::SyncSession* session) { + session->status_controller()->update_conflicts_resolved(true); + ASSERT_TRUE(session->HasMoreToSync()); +} + +void SimulateDownloadUpdatesFailed(sessions::SyncSession* session) { + // Note that a non-zero value of changes_remaining once a session has + // completed implies that the Syncer was unable to exhaust this count during + // the GetUpdates cycle. This is an indication that an error occurred. + session->status_controller()->set_num_server_changes_remaining(1); +} + +void SimulateCommitFailed(sessions::SyncSession* session) { + // Note that a non-zero number of unsynced handles once a session has + // completed implies that the Syncer was unable to make forward progress + // during a commit, indicating an error occurred. + // See implementation of SyncSession::HasMoreToSync. + std::vector<int64> handles; + handles.push_back(1); + session->status_controller()->set_unsynced_handles(handles); +} + +void SimulateSuccess(sessions::SyncSession* session) { + if (session->HasMoreToSync()) { + ADD_FAILURE() << "Shouldn't have more to sync"; + } + ASSERT_EQ(0U, session->status_controller()->num_server_changes_remaining()); + ASSERT_EQ(0U, session->status_controller()->unsynced_handles().size()); +} + +void SimulateThrottledImpl(sessions::SyncSession* session, + const base::TimeDelta& delta) { + session->delegate()->OnSilencedUntil(base::TimeTicks::Now() + delta); +} + +void SimulatePollIntervalUpdateImpl(sessions::SyncSession* session, + const base::TimeDelta& new_poll) { + session->delegate()->OnReceivedLongPollIntervalUpdate(new_poll); +} + +} // namespace test_util +} // namespace sessions +} // namespace browser_sync diff --git a/chrome/browser/sync/sessions/test_util.h b/chrome/browser/sync/sessions/test_util.h new file mode 100644 index 0000000..420cf3f --- /dev/null +++ b/chrome/browser/sync/sessions/test_util.h @@ -0,0 +1,39 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Utils to simulate various outcomes of a sync session. +#ifndef CHROME_BROWSER_SYNC_SESSIONS_TEST_UTIL_H_ +#define CHROME_BROWSER_SYNC_SESSIONS_TEST_UTIL_H_ +#pragma once + +#include "chrome/browser/sync/sessions/sync_session.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/gmock/include/gmock/gmock.h" + +namespace browser_sync { +namespace sessions { +namespace test_util { + +void SimulateHasMoreToSync(sessions::SyncSession* session); +void SimulateDownloadUpdatesFailed(sessions::SyncSession* session); +void SimulateCommitFailed(sessions::SyncSession* session); +void SimulateSuccess(sessions::SyncSession* session); +void SimulateThrottledImpl(sessions::SyncSession* session, + const base::TimeDelta& delta); +void SimulatePollIntervalUpdateImpl(sessions::SyncSession* session, + const base::TimeDelta& new_poll); + +ACTION_P(SimulateThrottled, throttle) { + SimulateThrottledImpl(arg0, throttle); +} + +ACTION_P(SimulatePollIntervalUpdate, poll) { + SimulatePollIntervalUpdateImpl(arg0, poll); +} + +} // namespace test_util +} // namespace sessions +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_SESSIONS_TEST_UTIL_H_ diff --git a/chrome/browser/sync/test_profile_sync_service.cc b/chrome/browser/sync/test_profile_sync_service.cc index 9886dd1..0b9ec18 100644 --- a/chrome/browser/sync/test_profile_sync_service.cc +++ b/chrome/browser/sync/test_profile_sync_service.cc @@ -16,6 +16,7 @@ using browser_sync::ModelSafeRoutingInfo; using browser_sync::sessions::ErrorCounters; +using browser_sync::sessions::SyncSourceInfo; using browser_sync::sessions::SyncerStatus; using browser_sync::sessions::SyncSessionSnapshot; using syncable::DirectoryManager; @@ -81,7 +82,8 @@ void SyncBackendHostForProfileSyncTest:: } core_->HandleSyncCycleCompletedOnFrontendLoop(new SyncSessionSnapshot( SyncerStatus(), ErrorCounters(), 0, false, - sync_ended, download_progress_markers, false, false, 0, 0, false)); + sync_ended, download_progress_markers, false, false, 0, 0, false, + SyncSourceInfo())); } sync_api::HttpPostProviderFactory* diff --git a/chrome/chrome.gyp b/chrome/chrome.gyp index b27b214..1f421cb 100644 --- a/chrome/chrome.gyp +++ b/chrome/chrome.gyp @@ -920,6 +920,9 @@ 'browser/sync/engine/net/syncapi_server_connection_manager.h', 'browser/sync/engine/net/url_translator.cc', 'browser/sync/engine/net/url_translator.h', + 'browser/sync/engine/nudge_source.h', + 'browser/sync/engine/polling_constants.cc', + 'browser/sync/engine/polling_constants.h', 'browser/sync/engine/post_commit_message_command.cc', 'browser/sync/engine/post_commit_message_command.h', 'browser/sync/engine/process_commit_response_command.cc', @@ -939,6 +942,8 @@ 'browser/sync/engine/syncer_end_command.h', 'browser/sync/engine/syncer_proto_util.cc', 'browser/sync/engine/syncer_proto_util.h', + 'browser/sync/engine/syncer_thread2.cc', + 'browser/sync/engine/syncer_thread2.h', 'browser/sync/engine/syncer_thread.cc', 'browser/sync/engine/syncer_thread.h', 'browser/sync/engine/syncer_types.h', diff --git a/chrome/chrome_tests.gypi b/chrome/chrome_tests.gypi index 0f3211b..bb56e40 100644 --- a/chrome/chrome_tests.gypi +++ b/chrome/chrome_tests.gypi @@ -2701,12 +2701,14 @@ 'browser/sync/engine/clear_data_command_unittest.cc', 'browser/sync/engine/cleanup_disabled_types_command_unittest.cc', 'browser/sync/engine/download_updates_command_unittest.cc', + 'browser/sync/engine/mock_model_safe_workers.cc', 'browser/sync/engine/mock_model_safe_workers.h', 'browser/sync/engine/process_commit_response_command_unittest.cc', 'browser/sync/engine/read_node_mock.h', 'browser/sync/engine/syncapi_unittest.cc', 'browser/sync/engine/syncer_proto_util_unittest.cc', 'browser/sync/engine/syncer_thread_unittest.cc', + 'browser/sync/engine/syncer_thread2_unittest.cc', 'browser/sync/engine/syncer_unittest.cc', 'browser/sync/engine/syncproto_unittest.cc', 'browser/sync/engine/syncapi_mock.h', @@ -2720,6 +2722,8 @@ 'browser/sync/sessions/ordered_commit_set_unittest.cc', 'browser/sync/sessions/status_controller_unittest.cc', 'browser/sync/sessions/sync_session_unittest.cc', + 'browser/sync/sessions/test_util.cc', + 'browser/sync/sessions/test_util.h', 'browser/sync/syncable/directory_backing_store_unittest.cc', 'browser/sync/syncable/syncable_id_unittest.cc', 'browser/sync/syncable/syncable_mock.h', |