summaryrefslogtreecommitdiffstats
path: root/chrome
diff options
context:
space:
mode:
authortim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-25 23:09:36 +0000
committertim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-25 23:09:36 +0000
commit2186fec4df1dde813a12011eb1ea650ce8470ea2 (patch)
tree96ebaec4e2e3bf246f0730b7093d153011fbbd7b /chrome
parentb17b3c8b37c0acd4528a338583f851a5a10063b0 (diff)
downloadchromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.zip
chromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.tar.gz
chromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.tar.bz2
Second attempt at the new syncer thread impl, now with less crashes!
Previous one at http://codereview.chromium.org/214033. I had local edits that resulted in initializing the CommandLine for syncapi, but didn't have them as part of the patch, so this was causing a crash whenever SyncerThreadFactory::Create was called. The only diff here is the call to CommandLine::Init in syncapi.cc. This effectively means you can't change the syncer thread impl on linux (we init an empty command line there), but this is OK. Once we link statically we won't need to do this. TEST=ProfileSyncServiceTest, SyncerThreadTest, SyncerThreadWithSyncerTest Review URL: http://codereview.chromium.org/250001 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@27281 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome')
-rw-r--r--chrome/browser/sync/engine/syncapi.cc19
-rw-r--r--chrome/browser/sync/engine/syncer_thread.cc317
-rw-r--r--chrome/browser/sync/engine/syncer_thread.h190
-rw-r--r--chrome/browser/sync/engine/syncer_thread_pthreads.cc578
-rw-r--r--chrome/browser/sync/engine/syncer_thread_pthreads.h284
-rw-r--r--chrome/browser/sync/engine/syncer_thread_timed_stop.cc119
-rw-r--r--chrome/browser/sync/engine/syncer_thread_timed_stop.h53
-rw-r--r--chrome/browser/sync/engine/syncer_thread_unittest.cc244
-rw-r--r--chrome/chrome.gyp5
-rw-r--r--chrome/common/chrome_switches.cc10
-rw-r--r--chrome/common/chrome_switches.h2
-rw-r--r--chrome/test/sync/engine/mock_server_connection.cc22
-rw-r--r--chrome/test/sync/engine/mock_server_connection.h9
-rw-r--r--chrome/test/sync/engine/test_directory_setter_upper.cc20
-rw-r--r--chrome/test/sync/engine/test_directory_setter_upper.h22
15 files changed, 1637 insertions, 257 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc
index f1ca43d..3e414a2 100644
--- a/chrome/browser/sync/engine/syncapi.cc
+++ b/chrome/browser/sync/engine/syncapi.cc
@@ -18,6 +18,7 @@
#include "base/at_exit.h"
#include "base/basictypes.h"
+#include "base/command_line.h"
#include "base/scoped_ptr.h"
#include "base/string_util.h"
#include "chrome/browser/sync/engine/all_status.h"
@@ -53,6 +54,7 @@ using browser_sync::Syncer;
using browser_sync::SyncerEvent;
using browser_sync::SyncerStatus;
using browser_sync::SyncerThread;
+using browser_sync::SyncerThreadFactory;
using browser_sync::UserSettings;
using browser_sync::TalkMediator;
using browser_sync::TalkMediatorImpl;
@@ -873,7 +875,7 @@ class SyncManager::SyncInternal {
scoped_ptr<SyncAPIServerConnectionManager> connection_manager_;
// The thread that runs the Syncer. Needs to be explicitly Start()ed.
- scoped_ptr<SyncerThread> syncer_thread_;
+ scoped_refptr<SyncerThread> syncer_thread_;
// Notification (xmpp) handler.
scoped_ptr<TalkMediator> talk_mediator_;
@@ -995,6 +997,11 @@ bool SyncManager::SyncInternal::Init(
g_log_files_initialized = true;
}
+ // TODO(timsteele): We need to do this for syncapi.dll, but should remove
+ // once we link statically. On windows this will set up the correct command
+ // line, on posix it will be create an empty command line.
+ CommandLine::Init(0, NULL);
+
// Set up UserSettings, creating the db if necessary. We need this to
// instantiate a URLFactory to give to the Syncer.
PathString settings_db_file = AppendSlash(database_location) +
@@ -1063,11 +1070,11 @@ bool SyncManager::SyncInternal::Init(
// on the Syncer side, and |model_safe_worker| on the API client side.
ModelSafeWorkerBridge* worker = new ModelSafeWorkerBridge(model_safe_worker);
- syncer_thread_.reset(new SyncerThread(&command_channel_,
- dir_manager(),
- connection_manager(),
- &allstatus_,
- worker));
+ syncer_thread_ = SyncerThreadFactory::Create(&command_channel_,
+ dir_manager(),
+ connection_manager(),
+ &allstatus_,
+ worker);
syncer_thread()->WatchTalkMediator(talk_mediator());
allstatus()->WatchSyncerThread(syncer_thread());
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc
index a53ab93..0ef436b 100644
--- a/chrome/browser/sync/engine/syncer_thread.cc
+++ b/chrome/browser/sync/engine/syncer_thread.cc
@@ -1,7 +1,6 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-
#include "chrome/browser/sync/engine/syncer_thread.h"
#include "build/build_config.h"
@@ -16,20 +15,23 @@
#include <map>
#include <queue>
+#include "base/command_line.h"
#include "chrome/browser/sync/engine/auth_watcher.h"
#include "chrome/browser/sync/engine/model_safe_worker.h"
#include "chrome/browser/sync/engine/net/server_connection_manager.h"
#include "chrome/browser/sync/engine/syncer.h"
+#include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
+#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
#include "chrome/browser/sync/notifier/listener/talk_mediator.h"
#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
#include "chrome/browser/sync/syncable/directory_manager.h"
+#include "chrome/common/chrome_switches.h"
using std::priority_queue;
using std::min;
-
-static inline bool operator < (const timespec& a, const timespec& b) {
- return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
-}
+using base::Time;
+using base::TimeDelta;
+using base::TimeTicks;
namespace {
@@ -108,17 +110,56 @@ int UserIdleTime() {
namespace browser_sync {
+SyncerThread* SyncerThreadFactory::Create(
+ ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker) {
+ const CommandLine* cmd = CommandLine::ForCurrentProcess();
+ if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
+ return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
+ all_status, model_safe_worker);
+ } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) {
+ return new SyncerThreadPthreads(command_channel, mgr, connection_manager,
+ all_status, model_safe_worker);
+ } else {
+ // The default SyncerThread implementation, which does not time-out when
+ // Stop is called.
+ return new SyncerThread(command_channel, mgr, connection_manager,
+ all_status, model_safe_worker);
+ }
+}
+
bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
- MutexLock lock(&mutex_);
- if (syncer_ == NULL) {
+ AutoLock lock(lock_);
+ if (vault_.syncer_ == NULL) {
return false;
}
NudgeSyncImpl(milliseconds_from_now, source);
return true;
}
-void* RunSyncerThread(void* syncer_thread) {
- return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
+SyncerThread::SyncerThread()
+ : thread_main_started_(false, false),
+ thread_("SyncEngine_SyncerThread"),
+ vault_field_changed_(&lock_),
+ p2p_authenticated_(false),
+ p2p_subscribed_(false),
+ client_command_hookup_(NULL),
+ conn_mgr_hookup_(NULL),
+ allstatus_(NULL),
+ dirman_(NULL),
+ scm_(NULL),
+ syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
+ syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
+ syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
+ syncer_max_interval_(kDefaultMaxPollIntervalMs),
+ talk_mediator_hookup_(NULL),
+ command_channel_(NULL),
+ directory_manager_hookup_(NULL),
+ syncer_events_(NULL),
+ model_safe_worker_(NULL),
+ disable_idle_detection_(false) {
}
SyncerThread::SyncerThread(
@@ -127,15 +168,14 @@ SyncerThread::SyncerThread(
ServerConnectionManager* connection_manager,
AllStatus* all_status,
ModelSafeWorker* model_safe_worker)
- : stop_syncer_thread_(false),
- thread_running_(false),
- connected_(false),
+ : thread_main_started_(false, false),
+ thread_("SyncEngine_SyncerThread"),
+ vault_field_changed_(&lock_),
p2p_authenticated_(false),
p2p_subscribed_(false),
client_command_hookup_(NULL),
conn_mgr_hookup_(NULL),
allstatus_(all_status),
- syncer_(NULL),
dirman_(mgr),
scm_(connection_manager),
syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
@@ -172,55 +212,76 @@ SyncerThread::~SyncerThread() {
syncer_event_channel_.reset();
directory_manager_hookup_.reset();
syncer_events_.reset();
- delete syncer_;
+ delete vault_.syncer_;
talk_mediator_hookup_.reset();
- CHECK(!thread_running_);
+ CHECK(!thread_.IsRunning());
}
// Creates and starts a syncer thread.
// Returns true if it creates a thread or if there's currently a thread running
// and false otherwise.
bool SyncerThread::Start() {
- MutexLock lock(&mutex_);
- if (thread_running_) {
- return true;
- }
- thread_running_ =
- (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
- if (thread_running_) {
- pthread_detach(thread_);
+ {
+ AutoLock lock(lock_);
+ if (thread_.IsRunning()) {
+ return true;
+ }
+
+ if (!thread_.Start()) {
+ return false;
+ }
}
- return thread_running_;
+
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
+ &SyncerThread::ThreadMain));
+
+ // Wait for notification that our task makes it safely onto the message
+ // loop before returning, so the caller can't call Stop before we're
+ // actually up and running. This is for consistency with the old pthread
+ // impl because pthread_create would do this in one step.
+ thread_main_started_.Wait();
+ LOG(INFO) << "SyncerThread started.";
+ return true;
}
// Stop processing. A max wait of at least 2*server RTT time is recommended.
// Returns true if we stopped, false otherwise.
bool SyncerThread::Stop(int max_wait) {
- MutexLock lock(&mutex_);
- if (!thread_running_)
- return true;
- stop_syncer_thread_ = true;
- if (NULL != syncer_) {
- // Try to early exit the syncer.
- syncer_->RequestEarlyExit();
- }
- pthread_cond_broadcast(&changed_.condvar_);
- timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
- do {
- const int wait_result = max_wait < 0 ?
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
- pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
- &deadline);
- if (ETIMEDOUT == wait_result) {
- LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
- return false;
+ {
+ AutoLock lock(lock_);
+ // If the thread has been started, then we either already have or are about
+ // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
+ // it to finish. If the thread has not been started --and we now own the
+ // lock-- then we can early out because the caller has not called Start().
+ if (!thread_.IsRunning())
+ return true;
+
+ LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
+ << "true (vault_.stop_syncer_thread_)";
+ // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
+ // below).
+ vault_.stop_syncer_thread_ = true;
+ if (NULL != vault_.syncer_) {
+ // Try to early exit the syncer itself, which could be looping inside
+ // SyncShare.
+ vault_.syncer_->RequestEarlyExit();
}
- } while (thread_running_);
+
+ // stop_syncer_thread_ is now true and the Syncer has been told to exit.
+ // We want to wake up all waiters so they can re-examine state. We signal,
+ // causing all waiters to try to re-acquire the lock, and then we release
+ // the lock, and join on our internal thread which should soon run off the
+ // end of ThreadMain.
+ vault_field_changed_.Broadcast();
+ }
+
+ // This will join, and finish when ThreadMain terminates.
+ thread_.Stop();
return true;
}
void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
- PThreadScopedLock<PThreadMutex> lock(&mutex_);
+ AutoLock lock(lock_);
client_command_hookup_.reset(NewEventListenerHookup(channel, this,
&SyncerThread::HandleClientCommand));
}
@@ -241,67 +302,89 @@ void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
}
void SyncerThread::ThreadMainLoop() {
+ // This is called with lock_ acquired.
+ lock_.AssertAcquired();
+ LOG(INFO) << "In thread main loop.";
+
// Use the short poll value by default.
- int poll_seconds = syncer_short_poll_interval_seconds_;
+ TimeDelta poll_seconds =
+ TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
int user_idle_milliseconds = 0;
- timespec last_sync_time = { 0 };
+ TimeTicks last_sync_time;
bool initial_sync_for_thread = true;
bool continue_sync_cycle = false;
- while (!stop_syncer_thread_) {
- if (!connected_) {
+ while (!vault_.stop_syncer_thread_) {
+ // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
+ // below) because we cannot poll until these conditions are met, so we wait
+ // indefinitely.
+ if (!vault_.connected_) {
LOG(INFO) << "Syncer thread waiting for connection.";
- while (!connected_ && !stop_syncer_thread_)
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
- LOG_IF(INFO, connected_) << "Syncer thread found connection.";
+ while (!vault_.connected_ && !vault_.stop_syncer_thread_)
+ vault_field_changed_.Wait();
+ LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
continue;
}
- if (syncer_ == NULL) {
+ if (vault_.syncer_ == NULL) {
LOG(INFO) << "Syncer thread waiting for database initialization.";
- while (syncer_ == NULL && !stop_syncer_thread_)
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
- LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
+ while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
+ vault_field_changed_.Wait();
+ LOG_IF(INFO, !(vault_.syncer_ == NULL))
+ << "Syncer was found after DB started.";
continue;
}
- timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
- last_sync_time.tv_nsec };
- const timespec wake_time =
- !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
- nudge_queue_.top().first : next_poll;
- LOG(INFO) << "wake time is " << wake_time.tv_sec;
- LOG(INFO) << "next poll is " << next_poll.tv_sec;
-
- const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
- &wake_time);
- if (ETIMEDOUT != error) {
- continue; // Check all the conditions again.
+ const TimeTicks next_poll = last_sync_time + poll_seconds;
+ const TimeTicks end_wait =
+ !vault_.nudge_queue_.empty() &&
+ vault_.nudge_queue_.top().first < next_poll ?
+ vault_.nudge_queue_.top().first : next_poll;
+ LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
+ LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
+
+ // We block until the CV is signaled (e.g a control field changed, loss of
+ // network connection, nudge, spurious, etc), or the poll interval elapses.
+ TimeDelta sleep_time = end_wait - TimeTicks::Now();
+ if (sleep_time > TimeDelta::FromSeconds(0)) {
+ vault_field_changed_.TimedWait(sleep_time);
+
+ if (TimeTicks::Now() < end_wait) {
+ // Didn't timeout. Could be a spurious signal, or a signal corresponding
+ // to an actual change in one of our control fields. By continuing here
+ // we perform the typical "always recheck conditions when signaled",
+ // (typically handled by a while(condition_not_met) cv.wait() construct)
+ // because we jump to the top of the loop. The main difference is we
+ // recalculate the wait interval, but last_sync_time won't have changed.
+ // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
+ // off the queue and wait for that delta. If it was a spurious signal,
+ // we'll keep waiting for the same moment in time as we just were.
+ continue;
+ }
}
- const timespec now = GetPThreadAbsoluteTime(0);
-
// Handle a nudge, caused by either a notification or a local bookmark
// event. This will also update the source of the following SyncMain call.
- UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
+ UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread);
- LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
- SyncMain(syncer_);
- last_sync_time = now;
+ LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
+ SyncMain(vault_.syncer_);
+ last_sync_time = TimeTicks::Now();
LOG(INFO) << "Updating the next polling time after SyncMain";
- poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
- poll_seconds,
- &user_idle_milliseconds,
- &continue_sync_cycle);
+ poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime(
+ allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()),
+ &user_idle_milliseconds, &continue_sync_cycle));
}
+
}
// We check how long the user's been idle and sync less often if the machine is
// not in use. The aim is to reduce server load.
+// TODO(timsteele): Should use Time(Delta).
int SyncerThread::CalculatePollingWaitTime(
const AllStatus::Status& status,
- int last_poll_wait, // in s
+ int last_poll_wait, // Time in seconds.
int* user_idle_milliseconds,
bool* continue_sync_cycle) {
bool is_continuing_sync_cyle = *continue_sync_cycle;
@@ -352,30 +435,25 @@ int SyncerThread::CalculatePollingWaitTime(
return actual_next_wait;
}
-void* SyncerThread::ThreadMain() {
- NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
- mutex_.Lock();
+void SyncerThread::ThreadMain() {
+ AutoLock lock(lock_);
+ // Signal Start() to let it know we've made it safely onto the message loop,
+ // and unblock it's caller.
+ thread_main_started_.Signal();
ThreadMainLoop();
- thread_running_ = false;
- pthread_cond_broadcast(&changed_.condvar_);
- mutex_.Unlock();
- LOG(INFO) << "Syncer thread exiting.";
- return 0;
+ LOG(INFO) << "Syncer thread ThreadMain is done.";
}
void SyncerThread::SyncMain(Syncer* syncer) {
CHECK(syncer);
- mutex_.Unlock();
+ AutoUnlock unlock(lock_);
while (syncer->SyncShare()) {
LOG(INFO) << "Looping in sync share";
}
LOG(INFO) << "Done looping in sync share";
-
- mutex_.Lock();
}
-void SyncerThread::UpdateNudgeSource(const timespec& now,
- bool* continue_sync_cycle,
+void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
bool* initial_sync) {
bool nudged = false;
NudgeSource nudge_source = kUnknown;
@@ -385,13 +463,14 @@ void SyncerThread::UpdateNudgeSource(const timespec& now,
}
// Update the nudge source if a new nudge has come through during the
// previous sync cycle.
- while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
+ while (!vault_.nudge_queue_.empty() &&
+ TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
if (!nudged) {
- nudge_source = nudge_queue_.top().second;
+ nudge_source = vault_.nudge_queue_.top().second;
*continue_sync_cycle = false; // Reset the continuation token on nudge.
nudged = true;
}
- nudge_queue_.pop();
+ vault_.nudge_queue_.pop();
}
SetUpdatesSource(nudged, nudge_source, initial_sync);
}
@@ -422,11 +501,11 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
break;
}
}
- syncer_->set_updates_source(updates_source);
+ vault_.syncer_->set_updates_source(updates_source);
}
void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
- MutexLock lock(&mutex_);
+ AutoLock lock(lock_);
channel()->NotifyListeners(event);
if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
return;
@@ -438,33 +517,33 @@ void SyncerThread::HandleDirectoryManagerEvent(
const syncable::DirectoryManagerEvent& event) {
LOG(INFO) << "Handling a directory manager event";
if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
- MutexLock lock(&mutex_);
+ AutoLock lock(lock_);
LOG(INFO) << "Syncer starting up for: " << event.dirname;
// The underlying database structure is ready, and we should create
// the syncer.
- CHECK(syncer_ == NULL);
- syncer_ =
+ CHECK(vault_.syncer_ == NULL);
+ vault_.syncer_ =
new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
- syncer_->set_command_channel(command_channel_);
+ vault_.syncer_->set_command_channel(command_channel_);
syncer_events_.reset(NewEventListenerHookup(
- syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
- pthread_cond_broadcast(&changed_.condvar_);
+ vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
+ vault_field_changed_.Broadcast();
}
}
static inline void CheckConnected(bool* connected,
HttpResponse::ServerConnectionCode code,
- pthread_cond_t* condvar) {
+ ConditionVariable* condvar) {
if (*connected) {
if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
*connected = false;
- pthread_cond_broadcast(condvar);
+ condvar->Broadcast();
}
} else {
if (HttpResponse::SERVER_CONNECTION_OK == code) {
*connected = true;
- pthread_cond_broadcast(condvar);
+ condvar->Broadcast();
}
}
}
@@ -472,16 +551,16 @@ static inline void CheckConnected(bool* connected,
void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
&SyncerThread::HandleServerConnectionEvent));
- CheckConnected(&connected_, conn_mgr->server_status(),
- &changed_.condvar_);
+ CheckConnected(&vault_.connected_, conn_mgr->server_status(),
+ &vault_field_changed_);
}
void SyncerThread::HandleServerConnectionEvent(
const ServerConnectionEvent& event) {
if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
- MutexLock lock(&mutex_);
- CheckConnected(&connected_, event.connection_code,
- &changed_.condvar_);
+ AutoLock lock(lock_);
+ CheckConnected(&vault_.connected_, event.connection_code,
+ &vault_field_changed_);
}
}
@@ -513,10 +592,11 @@ int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
// Called with mutex_ already locked.
void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
NudgeSource source) {
- const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
+ const TimeTicks nudge_time = TimeTicks::Now() +
+ TimeDelta::FromMilliseconds(milliseconds_from_now);
NudgeObject nudge_object(nudge_time, source);
- nudge_queue_.push(nudge_object);
- pthread_cond_broadcast(&changed_.condvar_);
+ vault_.nudge_queue_.push(nudge_object);
+ vault_field_changed_.Broadcast();
}
void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
@@ -528,7 +608,7 @@ void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
}
void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
- MutexLock lock(&mutex_);
+ AutoLock lock(lock_);
switch (event.what_happened) {
case TalkMediatorEvent::LOGIN_SUCCEEDED:
LOG(INFO) << "P2P: Login succeeded.";
@@ -541,7 +621,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
case TalkMediatorEvent::SUBSCRIPTIONS_ON:
LOG(INFO) << "P2P: Subscriptions successfully enabled.";
p2p_subscribed_ = true;
- if (NULL != syncer_) {
+ if (NULL != vault_.syncer_) {
LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
NudgeSyncImpl(0, kLocal);
}
@@ -552,7 +632,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
break;
case TalkMediatorEvent::NOTIFICATION_RECEIVED:
LOG(INFO) << "P2P: Updates on server, pushing syncer";
- if (NULL != syncer_) {
+ if (NULL != vault_.syncer_) {
NudgeSyncImpl(0, kNotification);
}
break;
@@ -560,8 +640,9 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
break;
}
- if (NULL != syncer_) {
- syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
+ if (NULL != vault_.syncer_) {
+ vault_.syncer_->set_notifications_enabled(
+ p2p_authenticated_ && p2p_subscribed_);
}
}
diff --git a/chrome/browser/sync/engine/syncer_thread.h b/chrome/browser/sync/engine/syncer_thread.h
index ca22969e..4d1ae95 100644
--- a/chrome/browser/sync/engine/syncer_thread.h
+++ b/chrome/browser/sync/engine/syncer_thread.h
@@ -3,7 +3,8 @@
// found in the LICENSE file.
//
// A class to run the syncer on a thread.
-
+// This is the default implementation of SyncerThread whose Stop implementation
+// does not support a timeout, but is greatly simplified.
#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_
#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_
@@ -13,11 +14,15 @@
#include <vector>
#include "base/basictypes.h"
+#include "base/condition_variable.h"
+#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
+#include "base/thread.h"
+#include "base/time.h"
+#include "base/waitable_event.h"
#include "chrome/browser/sync/engine/all_status.h"
#include "chrome/browser/sync/engine/client_command_channel.h"
#include "chrome/browser/sync/util/event_sys-inl.h"
-#include "chrome/browser/sync/util/pthread_helpers.h"
#include "testing/gtest/include/gtest/gtest_prod.h" // For FRIEND_TEST
class EventListenerHookup;
@@ -39,20 +44,42 @@ struct SyncerEvent;
struct SyncerShutdownEvent;
struct TalkMediatorEvent;
-class SyncerThread {
- FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime);
- FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime);
-
+class SyncerThreadFactory {
public:
- friend class SyncerThreadTest;
+ // Creates a SyncerThread based on the default (or user-overridden)
+ // implementation. The thread does not start running until you call Start(),
+ // which will cause it to check-and-wait for certain conditions to be met
+ // (such as valid connection with Server established, syncable::Directory has
+ // been opened) before performing an intial sync with a server. It uses
+ // |connection_manager| to detect valid connections, and |mgr| to detect the
+ // opening of a Directory, which will cause it to create a Syncer object for
+ // said Directory, and assign |model_safe_worker| to it. |connection_manager|
+ // and |mgr| should outlive the SyncerThread. You must stop the thread by
+ // calling Stop before destroying the object. Stopping will first tear down
+ // the Syncer object, allowing it to finish work in progress, before joining
+ // the Stop-calling thread with the internal thread.
+ static SyncerThread* Create(ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker);
+ private:
+ DISALLOW_IMPLICIT_CONSTRUCTORS(SyncerThreadFactory);
+};
+class SyncerThread : public base::RefCountedThreadSafe<SyncerThread> {
+ FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime);
+ FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Polling);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge);
+ friend class SyncerThreadWithSyncerTest;
+ friend class SyncerThreadFactory;
+public:
enum NudgeSource {
kUnknown = 0,
kNotification,
kLocal,
kContinuation
};
-
// Server can overwrite these values via client commands.
// Standard short poll. This is used when XMPP is off.
static const int kDefaultShortPollIntervalSeconds = 60;
@@ -62,54 +89,106 @@ class SyncerThread {
// longest possible poll interval.
static const int kDefaultMaxPollIntervalMs = 30 * 60 * 1000;
- SyncerThread(ClientCommandChannel* command_channel,
- syncable::DirectoryManager* mgr,
- ServerConnectionManager* connection_manager, AllStatus* all_status,
- ModelSafeWorker* model_safe_worker);
- ~SyncerThread();
+ virtual ~SyncerThread();
+
+ virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr);
- void WatchConnectionManager(ServerConnectionManager* conn_mgr);
- // Creates and starts a syncer thread.
+ // Starts a syncer thread.
// Returns true if it creates a thread or if there's currently a thread
// running and false otherwise.
- bool Start();
+ virtual bool Start();
- // Stop processing. A max wait of at least 2*server RTT time is recommended.
- // returns true if we stopped, false otherwise.
- bool Stop(int max_wait);
+ // Stop processing. |max_wait| doesn't do anything in this version.
+ virtual bool Stop(int max_wait);
- // Nudges the syncer to sync with a delay specified. This API is for access
+ // Nudges the syncer to sync with a delay specified. This API is for access
// from the SyncerThread's controller and will cause a mutex lock.
- bool NudgeSyncer(int milliseconds_from_now, NudgeSource source);
+ virtual bool NudgeSyncer(int milliseconds_from_now, NudgeSource source);
// Registers this thread to watch talk mediator events.
- void WatchTalkMediator(TalkMediator* talk_mediator);
+ virtual void WatchTalkMediator(TalkMediator* talk_mediator);
- void WatchClientCommands(ClientCommandChannel* channel);
+ virtual void WatchClientCommands(ClientCommandChannel* channel);
- SyncerEventChannel* channel();
+ virtual SyncerEventChannel* channel();
- private:
- // A few members to gate the rate at which we nudge the syncer.
- enum {
- kNudgeRateLimitCount = 6,
- kNudgeRateLimitTime = 180,
- };
+ protected:
+ SyncerThread(); // Necessary for temporary pthreads-based PIMPL impl.
+ SyncerThread(ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker);
+ virtual void ThreadMain();
+ void ThreadMainLoop();
- // A queue of all scheduled nudges. One insertion for every call to
- // NudgeQueue().
- typedef std::pair<timespec, NudgeSource> NudgeObject;
+ virtual void SetConnected(bool connected) {
+ DCHECK(!thread_.IsRunning());
+ vault_.connected_ = connected;
+ }
+
+ virtual void SetSyncerPollingInterval(base::TimeDelta interval) {
+ // TODO(timsteele): Use TimeDelta internally.
+ syncer_polling_interval_ = static_cast<int>(interval.InSeconds());
+ }
+ virtual void SetSyncerShortPollInterval(base::TimeDelta interval) {
+ // TODO(timsteele): Use TimeDelta internally.
+ syncer_short_poll_interval_seconds_ =
+ static_cast<int>(interval.InSeconds());
+ }
+
+ // Needed to emulate the behavior of pthread_create, which synchronously
+ // started the thread and set the value of thread_running_ to true.
+ // We can't quite match that because we asynchronously post the task,
+ // which opens a window for Stop to get called before the task actually
+ // makes it. To prevent this, we block Start() until we're sure it's ok.
+ base::WaitableEvent thread_main_started_;
- struct IsTimeSpecGreater {
+ // Handle of the running thread.
+ base::Thread thread_;
+
+ typedef std::pair<base::TimeTicks, NudgeSource> NudgeObject;
+
+ struct IsTimeTicksGreater {
inline bool operator() (const NudgeObject& lhs, const NudgeObject& rhs) {
- return lhs.first.tv_sec == rhs.first.tv_sec ?
- lhs.first.tv_nsec > rhs.first.tv_nsec :
- lhs.first.tv_sec > rhs.first.tv_sec;
+ return lhs.first > rhs.first;
}
};
typedef std::priority_queue<NudgeObject, std::vector<NudgeObject>,
- IsTimeSpecGreater> NudgeQueue;
+ IsTimeTicksGreater> NudgeQueue;
+
+ // Fields that are modified / accessed by multiple threads go in this struct
+ // for clarity and explicitness.
+ struct ProtectedFields {
+ // False when we want to stop the thread.
+ bool stop_syncer_thread_;
+
+ Syncer* syncer_;
+
+ // State of the server connection.
+ bool connected_;
+
+ // A queue of all scheduled nudges. One insertion for every call to
+ // NudgeQueue().
+ NudgeQueue nudge_queue_;
+
+ ProtectedFields()
+ : stop_syncer_thread_(false), syncer_(NULL), connected_(false) {}
+ } vault_;
+
+ // Gets signaled whenever a thread outside of the syncer thread changes a
+ // protected field in the vault_.
+ ConditionVariable vault_field_changed_;
+
+ // Used to lock everything in |vault_|.
+ Lock lock_;
+
+ private:
+ // A few members to gate the rate at which we nudge the syncer.
+ enum {
+ kNudgeRateLimitCount = 6,
+ kNudgeRateLimitTime = 180,
+ };
// Threshold multipler for how long before user should be considered idle.
static const int kPollBackoffThresholdMultiplier = 10;
@@ -125,9 +204,6 @@ class SyncerThread {
void HandleTalkMediatorEvent(const TalkMediatorEvent& event);
- void* ThreadMain();
- void ThreadMainLoop();
-
void SyncMain(Syncer* syncer);
// Calculates the next sync wait time in seconds. last_poll_wait is the time
@@ -137,42 +213,24 @@ class SyncerThread {
// continue_sync_cycle parameter is used to determine whether or not we are
// calculating a polling wait time that is a continuation of an sync cycle
// which terminated while the syncer still had work to do.
- int CalculatePollingWaitTime(
+ virtual int CalculatePollingWaitTime(
const AllStatus::Status& status,
int last_poll_wait, // in s
int* user_idle_milliseconds,
bool* continue_sync_cycle);
// Helper to above function, considers effect of user idle time.
- int CalculateSyncWaitTime(int last_wait, int user_idle_ms);
+ virtual int CalculateSyncWaitTime(int last_wait, int user_idle_ms);
// Sets the source value of the controlled syncer's updates_source value.
// The initial sync boolean is updated if read as a sentinel. The following
// two methods work in concert to achieve this goal.
- void UpdateNudgeSource(const timespec& now, bool* continue_sync_cycle,
+ void UpdateNudgeSource(bool* continue_sync_cycle,
bool* initial_sync);
void SetUpdatesSource(bool nudged, NudgeSource nudge_source,
bool* initial_sync);
// For unit tests only.
- void DisableIdleDetection() { disable_idle_detection_ = true; }
-
- // False when we want to stop the thread.
- bool stop_syncer_thread_;
-
- // We use one mutex for all members except the channel.
- PThreadMutex mutex_;
- typedef PThreadScopedLock<PThreadMutex> MutexLock;
-
- // Handle of the running thread.
- pthread_t thread_;
- bool thread_running_;
-
- // Gets signaled whenever a thread outside of the syncer thread changes a
- // member variable.
- PThreadCondVar changed_;
-
- // State of the server connection.
- bool connected_;
+ virtual void DisableIdleDetection() { disable_idle_detection_ = true; }
// State of the notification framework is tracked by these values.
bool p2p_authenticated_;
@@ -182,8 +240,6 @@ class SyncerThread {
scoped_ptr<EventListenerHookup> conn_mgr_hookup_;
const AllStatus* allstatus_;
- Syncer* syncer_;
-
syncable::DirectoryManager* dirman_;
ServerConnectionManager* scm_;
@@ -208,8 +264,6 @@ class SyncerThread {
// this is called.
void NudgeSyncImpl(int milliseconds_from_now, NudgeSource source);
- NudgeQueue nudge_queue_;
-
scoped_ptr<EventListenerHookup> talk_mediator_hookup_;
ClientCommandChannel* const command_channel_;
scoped_ptr<EventListenerHookup> directory_manager_hookup_;
@@ -228,4 +282,4 @@ class SyncerThread {
} // namespace browser_sync
-#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_
+#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ \ No newline at end of file
diff --git a/chrome/browser/sync/engine/syncer_thread_pthreads.cc b/chrome/browser/sync/engine/syncer_thread_pthreads.cc
new file mode 100644
index 0000000..9fdfb21
--- /dev/null
+++ b/chrome/browser/sync/engine/syncer_thread_pthreads.cc
@@ -0,0 +1,578 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
+
+#include "build/build_config.h"
+
+#ifdef OS_MACOSX
+#include <CoreFoundation/CFNumber.h>
+#include <IOKit/IOTypes.h>
+#include <IOKit/IOKitLib.h>
+#endif
+
+#include <algorithm>
+#include <map>
+#include <queue>
+
+#include "chrome/browser/sync/engine/auth_watcher.h"
+#include "chrome/browser/sync/engine/model_safe_worker.h"
+#include "chrome/browser/sync/engine/net/server_connection_manager.h"
+#include "chrome/browser/sync/engine/syncer.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
+#include "chrome/browser/sync/syncable/directory_manager.h"
+
+using std::priority_queue;
+using std::min;
+
+static inline bool operator < (const timespec& a, const timespec& b) {
+ return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
+}
+
+namespace {
+
+// Returns the amount of time since the user last interacted with the computer,
+// in milliseconds
+int UserIdleTime() {
+#ifdef OS_WIN
+ LASTINPUTINFO last_input_info;
+ last_input_info.cbSize = sizeof(LASTINPUTINFO);
+
+ // Get time in windows ticks since system start of last activity.
+ BOOL b = ::GetLastInputInfo(&last_input_info);
+ if (b == TRUE)
+ return ::GetTickCount() - last_input_info.dwTime;
+#elif defined(OS_MACOSX)
+ // It would be great to do something like:
+ //
+ // return 1000 *
+ // CGEventSourceSecondsSinceLastEventType(
+ // kCGEventSourceStateCombinedSessionState,
+ // kCGAnyInputEventType);
+ //
+ // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
+ // and can't link that high up the food chain. Thus this mucking in IOKit.
+
+ io_service_t hid_service =
+ IOServiceGetMatchingService(kIOMasterPortDefault,
+ IOServiceMatching("IOHIDSystem"));
+ if (!hid_service) {
+ LOG(WARNING) << "Could not obtain IOHIDSystem";
+ return 0;
+ }
+
+ CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
+ CFSTR("HIDIdleTime"),
+ kCFAllocatorDefault,
+ 0);
+ if (!object) {
+ LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
+ IOObjectRelease(hid_service);
+ return 0;
+ }
+
+ int64 idle_time; // in nanoseconds
+ Boolean success;
+ if (CFGetTypeID(object) == CFNumberGetTypeID()) {
+ success = CFNumberGetValue((CFNumberRef)object,
+ kCFNumberSInt64Type,
+ &idle_time);
+ } else {
+ LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
+ }
+
+ CFRelease(object);
+ IOObjectRelease(hid_service);
+
+ if (!success) {
+ LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
+ return 0;
+ } else {
+ return idle_time / 1000000; // nano to milli
+ }
+#else
+ static bool was_logged = false;
+ if (!was_logged) {
+ was_logged = true;
+ LOG(INFO) << "UserIdleTime unimplemented on this platform, "
+ "synchronization will not throttle when user idle";
+ }
+#endif
+
+ return 0;
+}
+
+} // namespace
+
+namespace browser_sync {
+
+SyncerThreadPthreads::SyncerThreadPthreads(
+ ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager,
+ AllStatus* all_status, ModelSafeWorker* model_safe_worker)
+ : SyncerThread() {
+ impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr,
+ connection_manager, all_status, model_safe_worker));
+}
+
+bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now,
+ SyncerThread::NudgeSource source) {
+ MutexLock lock(&mutex_);
+ if (syncer_ == NULL) {
+ return false;
+ }
+ NudgeSyncImpl(milliseconds_from_now, source);
+ return true;
+}
+
+void* RunSyncerThread(void* syncer_thread) {
+ return (reinterpret_cast<SyncerThreadPthreadImpl*>(
+ syncer_thread))->ThreadMain();
+}
+
+SyncerThreadPthreadImpl::SyncerThreadPthreadImpl(
+ ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager,
+ AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker)
+ : dirman_(mgr), scm_(connection_manager),
+ syncer_(NULL), syncer_events_(NULL), thread_running_(false),
+ syncer_short_poll_interval_seconds_(
+ SyncerThread::kDefaultShortPollIntervalSeconds),
+ syncer_long_poll_interval_seconds_(
+ SyncerThread::kDefaultLongPollIntervalSeconds),
+ syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds),
+ syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs),
+ stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
+ p2p_authenticated_(false), p2p_subscribed_(false),
+ allstatus_(all_status), talk_mediator_hookup_(NULL),
+ command_channel_(command_channel), directory_manager_hookup_(NULL),
+ model_safe_worker_(model_safe_worker),
+ client_command_hookup_(NULL), disable_idle_detection_(false) {
+
+ SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
+ syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
+
+ if (dirman_) {
+ directory_manager_hookup_.reset(NewEventListenerHookup(
+ dirman_->channel(), this,
+ &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent));
+ }
+
+ if (scm_) {
+ WatchConnectionManager(scm_);
+ }
+
+ if (command_channel_) {
+ WatchClientCommands(command_channel_);
+ }
+}
+
+SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() {
+ client_command_hookup_.reset();
+ conn_mgr_hookup_.reset();
+ syncer_event_channel_.reset();
+ directory_manager_hookup_.reset();
+ syncer_events_.reset();
+ delete syncer_;
+ talk_mediator_hookup_.reset();
+ CHECK(!thread_running_);
+}
+
+// Creates and starts a syncer thread.
+// Returns true if it creates a thread or if there's currently a thread running
+// and false otherwise.
+bool SyncerThreadPthreadImpl::Start() {
+ MutexLock lock(&mutex_);
+ if (thread_running_) {
+ return true;
+ }
+ thread_running_ =
+ (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
+ if (thread_running_) {
+ pthread_detach(thread_);
+ }
+ return thread_running_;
+}
+
+// Stop processing. A max wait of at least 2*server RTT time is recommended.
+// Returns true if we stopped, false otherwise.
+bool SyncerThreadPthreadImpl::Stop(int max_wait) {
+ MutexLock lock(&mutex_);
+ if (!thread_running_)
+ return true;
+ stop_syncer_thread_ = true;
+ if (NULL != syncer_) {
+ // Try to early exit the syncer.
+ syncer_->RequestEarlyExit();
+ }
+ pthread_cond_broadcast(&changed_.condvar_);
+ timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
+ do {
+ const int wait_result = max_wait < 0 ?
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
+ pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &deadline);
+ if (ETIMEDOUT == wait_result) {
+ LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
+ return false;
+ }
+ } while (thread_running_);
+ return true;
+}
+
+void SyncerThreadPthreadImpl::WatchClientCommands(
+ ClientCommandChannel* channel) {
+ PThreadScopedLock<PThreadMutex> lock(&mutex_);
+ client_command_hookup_.reset(NewEventListenerHookup(channel, this,
+ &SyncerThreadPthreadImpl::HandleClientCommand));
+}
+
+void SyncerThreadPthreadImpl::HandleClientCommand(
+ ClientCommandChannel::EventType event) {
+ if (!event) {
+ return;
+ }
+
+ // Mutex not really necessary for these.
+ if (event->has_set_sync_poll_interval()) {
+ syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
+ }
+
+ if (event->has_set_sync_long_poll_interval()) {
+ syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
+ }
+}
+
+void SyncerThreadPthreadImpl::ThreadMainLoop() {
+ // Use the short poll value by default.
+ int poll_seconds = syncer_short_poll_interval_seconds_;
+ int user_idle_milliseconds = 0;
+ timespec last_sync_time = { 0 };
+ bool initial_sync_for_thread = true;
+ bool continue_sync_cycle = false;
+
+ while (!stop_syncer_thread_) {
+ if (!connected_) {
+ LOG(INFO) << "Syncer thread waiting for connection.";
+ while (!connected_ && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, connected_) << "Syncer thread found connection.";
+ continue;
+ }
+
+ if (syncer_ == NULL) {
+ LOG(INFO) << "Syncer thread waiting for database initialization.";
+ while (syncer_ == NULL && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
+ continue;
+ }
+
+ timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
+ last_sync_time.tv_nsec };
+ const timespec wake_time =
+ !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
+ nudge_queue_.top().first : next_poll;
+ LOG(INFO) << "wake time is " << wake_time.tv_sec;
+ LOG(INFO) << "next poll is " << next_poll.tv_sec;
+
+ const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &wake_time);
+ if (ETIMEDOUT != error) {
+ continue; // Check all the conditions again.
+ }
+
+ const timespec now = GetPThreadAbsoluteTime(0);
+
+ // Handle a nudge, caused by either a notification or a local bookmark
+ // event. This will also update the source of the following SyncMain call.
+ UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
+
+ LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
+ SyncMain(syncer_);
+ last_sync_time = now;
+
+ LOG(INFO) << "Updating the next polling time after SyncMain";
+ poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
+ poll_seconds,
+ &user_idle_milliseconds,
+ &continue_sync_cycle);
+ }
+}
+
+// We check how long the user's been idle and sync less often if the machine is
+// not in use. The aim is to reduce server load.
+int SyncerThreadPthreadImpl::CalculatePollingWaitTime(
+ const AllStatus::Status& status,
+ int last_poll_wait, // in s
+ int* user_idle_milliseconds,
+ bool* continue_sync_cycle) {
+ bool is_continuing_sync_cyle = *continue_sync_cycle;
+ *continue_sync_cycle = false;
+
+ // Determine if the syncer has unfinished work to do from allstatus_.
+ const bool syncer_has_work_to_do =
+ status.updates_available > status.updates_received
+ || status.unsynced_count > 0;
+ LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
+
+ // First calculate the expected wait time, figuring in any backoff because of
+ // user idle time. next_wait is in seconds
+ syncer_polling_interval_ = (!status.notifications_enabled) ?
+ syncer_short_poll_interval_seconds_ :
+ syncer_long_poll_interval_seconds_;
+ int default_next_wait = syncer_polling_interval_;
+ int actual_next_wait = default_next_wait;
+
+ if (syncer_has_work_to_do) {
+ // Provide exponential backoff due to consecutive errors, else attempt to
+ // complete the work as soon as possible.
+ if (!is_continuing_sync_cyle) {
+ actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
+ } else {
+ actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
+ }
+ *continue_sync_cycle = true;
+ } else if (!status.notifications_enabled) {
+ // Ensure that we start exponential backoff from our base polling
+ // interval when we are not continuing a sync cycle.
+ last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
+
+ // Did the user start interacting with the computer again?
+ // If so, revise our idle time (and probably next_sync_time) downwards
+ int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
+ if (new_idle_time < *user_idle_milliseconds) {
+ *user_idle_milliseconds = new_idle_time;
+ }
+ actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
+ *user_idle_milliseconds) / 1000;
+ DCHECK_GE(actual_next_wait, default_next_wait);
+ }
+
+ LOG(INFO) << "Sync wait: idle " << default_next_wait
+ << " non-idle or backoff " << actual_next_wait << ".";
+
+ return actual_next_wait;
+}
+
+void* SyncerThreadPthreadImpl::ThreadMain() {
+ NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
+ mutex_.Lock();
+ ThreadMainLoop();
+ thread_running_ = false;
+ pthread_cond_broadcast(&changed_.condvar_);
+ mutex_.Unlock();
+ LOG(INFO) << "Syncer thread exiting.";
+ return 0;
+}
+
+void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) {
+ CHECK(syncer);
+ mutex_.Unlock();
+ while (syncer->SyncShare()) {
+ LOG(INFO) << "Looping in sync share";
+ }
+ LOG(INFO) << "Done looping in sync share";
+
+ mutex_.Lock();
+}
+
+void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now,
+ bool* continue_sync_cycle,
+ bool* initial_sync) {
+ bool nudged = false;
+ SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown;
+ // Has the previous sync cycle completed?
+ if (continue_sync_cycle) {
+ nudge_source = SyncerThread::kContinuation;
+ }
+ // Update the nudge source if a new nudge has come through during the
+ // previous sync cycle.
+ while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
+ if (!nudged) {
+ nudge_source = nudge_queue_.top().second;
+ *continue_sync_cycle = false; // Reset the continuation token on nudge.
+ nudged = true;
+ }
+ nudge_queue_.pop();
+ }
+ SetUpdatesSource(nudged, nudge_source, initial_sync);
+}
+
+void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged,
+ SyncerThread::NudgeSource nudge_source, bool* initial_sync) {
+ sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
+ sync_pb::GetUpdatesCallerInfo::UNKNOWN;
+ if (*initial_sync) {
+ updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
+ *initial_sync = false;
+ } else if (!nudged) {
+ updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
+ } else {
+ switch (nudge_source) {
+ case SyncerThread::kNotification:
+ updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
+ break;
+ case SyncerThread::kLocal:
+ updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
+ break;
+ case SyncerThread::kContinuation:
+ updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
+ break;
+ case SyncerThread::kUnknown:
+ default:
+ updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
+ break;
+ }
+ }
+ syncer_->set_updates_source(updates_source);
+}
+
+void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) {
+ MutexLock lock(&mutex_);
+ channel()->NotifyListeners(event);
+ if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
+ return;
+ }
+ NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown);
+}
+
+void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent(
+ const syncable::DirectoryManagerEvent& event) {
+ LOG(INFO) << "Handling a directory manager event";
+ if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
+ MutexLock lock(&mutex_);
+ LOG(INFO) << "Syncer starting up for: " << event.dirname;
+ // The underlying database structure is ready, and we should create
+ // the syncer.
+ CHECK(syncer_ == NULL);
+ syncer_ =
+ new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
+
+ syncer_->set_command_channel(command_channel_);
+ syncer_events_.reset(NewEventListenerHookup(
+ syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent));
+ pthread_cond_broadcast(&changed_.condvar_);
+ }
+}
+
+static inline void CheckConnected(bool* connected,
+ HttpResponse::ServerConnectionCode code,
+ pthread_cond_t* condvar) {
+ if (*connected) {
+ if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
+ *connected = false;
+ pthread_cond_broadcast(condvar);
+ }
+ } else {
+ if (HttpResponse::SERVER_CONNECTION_OK == code) {
+ *connected = true;
+ pthread_cond_broadcast(condvar);
+ }
+ }
+}
+
+void SyncerThreadPthreadImpl::WatchConnectionManager(
+ ServerConnectionManager* conn_mgr) {
+ conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
+ &SyncerThreadPthreadImpl::HandleServerConnectionEvent));
+ CheckConnected(&connected_, conn_mgr->server_status(),
+ &changed_.condvar_);
+}
+
+void SyncerThreadPthreadImpl::HandleServerConnectionEvent(
+ const ServerConnectionEvent& event) {
+ if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
+ MutexLock lock(&mutex_);
+ CheckConnected(&connected_, event.connection_code,
+ &changed_.condvar_);
+ }
+}
+
+SyncerEventChannel* SyncerThreadPthreadImpl::channel() {
+ return syncer_event_channel_.get();
+}
+
+// Inputs and return value in milliseconds.
+int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval,
+ int user_idle_ms) {
+ // syncer_polling_interval_ is in seconds
+ int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
+
+ // This is our default and lower bound.
+ int next_wait = syncer_polling_interval_ms;
+
+ // Get idle time, bounded by max wait.
+ int idle = min(user_idle_ms, syncer_max_interval_);
+
+ // If the user has been idle for a while, we'll start decreasing the poll
+ // rate.
+ if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
+ next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
+ last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
+ }
+
+ return next_wait;
+}
+
+// Called with mutex_ already locked.
+void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now,
+ SyncerThread::NudgeSource source) {
+ const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
+ NudgeObject nudge_object(nudge_time, source);
+ nudge_queue_.push(nudge_object);
+ pthread_cond_broadcast(&changed_.condvar_);
+}
+
+void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) {
+ talk_mediator_hookup_.reset(
+ NewEventListenerHookup(
+ mediator->channel(),
+ this,
+ &SyncerThreadPthreadImpl::HandleTalkMediatorEvent));
+}
+
+void SyncerThreadPthreadImpl::HandleTalkMediatorEvent(
+ const TalkMediatorEvent& event) {
+ MutexLock lock(&mutex_);
+ switch (event.what_happened) {
+ case TalkMediatorEvent::LOGIN_SUCCEEDED:
+ LOG(INFO) << "P2P: Login succeeded.";
+ p2p_authenticated_ = true;
+ break;
+ case TalkMediatorEvent::LOGOUT_SUCCEEDED:
+ LOG(INFO) << "P2P: Login succeeded.";
+ p2p_authenticated_ = false;
+ break;
+ case TalkMediatorEvent::SUBSCRIPTIONS_ON:
+ LOG(INFO) << "P2P: Subscriptions successfully enabled.";
+ p2p_subscribed_ = true;
+ if (NULL != syncer_) {
+ LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
+ NudgeSyncImpl(0, SyncerThread::kLocal);
+ }
+ break;
+ case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
+ LOG(INFO) << "P2P: Subscriptions are not enabled.";
+ p2p_subscribed_ = false;
+ break;
+ case TalkMediatorEvent::NOTIFICATION_RECEIVED:
+ LOG(INFO) << "P2P: Updates on server, pushing syncer";
+ if (NULL != syncer_) {
+ NudgeSyncImpl(0, SyncerThread::kNotification);
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (NULL != syncer_) {
+ syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
+ }
+}
+
+} // namespace browser_sync \ No newline at end of file
diff --git a/chrome/browser/sync/engine/syncer_thread_pthreads.h b/chrome/browser/sync/engine/syncer_thread_pthreads.h
new file mode 100644
index 0000000..bebd8ed
--- /dev/null
+++ b/chrome/browser/sync/engine/syncer_thread_pthreads.h
@@ -0,0 +1,284 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// *THIS EXISTS FOR EXPERIMENTATION AND TESTING WHILE WE REPLACE PTHREADS
+// WITH CHROME THREADS IN SYNC CODE*
+
+// A class to run the syncer on a thread. Uses PIMPL to wrap the old, original
+// pthreads implementation of SyncerThread.
+#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_
+#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_
+
+#include <list>
+#include <map>
+#include <queue>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/scoped_ptr.h"
+#include "chrome/browser/sync/engine/all_status.h"
+#include "chrome/browser/sync/engine/client_command_channel.h"
+#include "chrome/browser/sync/util/event_sys-inl.h"
+#include "chrome/browser/sync/util/pthread_helpers.h"
+#include "chrome/browser/sync/engine/syncer_thread.h"
+#include "testing/gtest/include/gtest/gtest_prod.h" // For FRIEND_TEST
+
+class EventListenerHookup;
+
+namespace syncable {
+class DirectoryManager;
+struct DirectoryManagerEvent;
+}
+
+namespace browser_sync {
+
+class ModelSafeWorker;
+class ServerConnectionManager;
+class Syncer;
+class TalkMediator;
+class URLFactory;
+struct ServerConnectionEvent;
+struct SyncerEvent;
+struct SyncerShutdownEvent;
+struct TalkMediatorEvent;
+
+// The legacy implementation of SyncerThread using pthreads, kept around for
+// historical experimentation until a new version is finalized.
+class SyncerThreadPthreadImpl {
+ public:
+ virtual ~SyncerThreadPthreadImpl();
+
+ virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr);
+ // Creates and starts a syncer thread.
+ // Returns true if it creates a thread or if there's currently a thread
+ // running and false otherwise.
+ virtual bool Start();
+
+ // Stop processing. A max wait of at least 2*server RTT time is recommended.
+ // returns true if we stopped, false otherwise.
+ virtual bool Stop(int max_wait);
+
+ // Nudges the syncer to sync with a delay specified. This API is for access
+ // from the SyncerThread's controller and will cause a mutex lock.
+ virtual bool NudgeSyncer(int milliseconds_from_now,
+ SyncerThread::NudgeSource source);
+
+ // Registers this thread to watch talk mediator events.
+ virtual void WatchTalkMediator(TalkMediator* talk_mediator);
+
+ virtual void WatchClientCommands(ClientCommandChannel* channel);
+
+ virtual SyncerEventChannel* channel();
+
+ private:
+ friend class SyncerThreadPthreads;
+ SyncerThreadPthreadImpl(ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker);
+
+ // A few members to gate the rate at which we nudge the syncer.
+ enum {
+ kNudgeRateLimitCount = 6,
+ kNudgeRateLimitTime = 180,
+ };
+
+ // A queue of all scheduled nudges. One insertion for every call to
+ // NudgeQueue().
+ typedef std::pair<timespec, SyncerThread::NudgeSource> NudgeObject;
+
+ struct IsTimeSpecGreater {
+ inline bool operator() (const NudgeObject& lhs, const NudgeObject& rhs) {
+ return lhs.first.tv_sec == rhs.first.tv_sec ?
+ lhs.first.tv_nsec > rhs.first.tv_nsec :
+ lhs.first.tv_sec > rhs.first.tv_sec;
+ }
+ };
+
+ typedef std::priority_queue<NudgeObject, std::vector<NudgeObject>,
+ IsTimeSpecGreater> NudgeQueue;
+
+ // Threshold multipler for how long before user should be considered idle.
+ static const int kPollBackoffThresholdMultiplier = 10;
+
+ friend void* RunSyncerThread(void* syncer_thread);
+ void* Run();
+ void HandleDirectoryManagerEvent(
+ const syncable::DirectoryManagerEvent& event);
+ void HandleSyncerEvent(const SyncerEvent& event);
+ void HandleClientCommand(ClientCommandChannel::EventType event);
+
+ void HandleServerConnectionEvent(const ServerConnectionEvent& event);
+
+ void HandleTalkMediatorEvent(const TalkMediatorEvent& event);
+
+ void* ThreadMain();
+ void ThreadMainLoop();
+
+ void SyncMain(Syncer* syncer);
+
+ // Calculates the next sync wait time in seconds. last_poll_wait is the time
+ // duration of the previous polling timeout which was used.
+ // user_idle_milliseconds is updated by this method, and is a report of the
+ // full amount of time since the last period of activity for the user. The
+ // continue_sync_cycle parameter is used to determine whether or not we are
+ // calculating a polling wait time that is a continuation of an sync cycle
+ // which terminated while the syncer still had work to do.
+ int CalculatePollingWaitTime(
+ const AllStatus::Status& status,
+ int last_poll_wait, // in s
+ int* user_idle_milliseconds,
+ bool* continue_sync_cycle);
+ // Helper to above function, considers effect of user idle time.
+ int CalculateSyncWaitTime(int last_wait, int user_idle_ms);
+
+ // Sets the source value of the controlled syncer's updates_source value.
+ // The initial sync boolean is updated if read as a sentinel. The following
+ // two methods work in concert to achieve this goal.
+ void UpdateNudgeSource(const timespec& now, bool* continue_sync_cycle,
+ bool* initial_sync);
+ void SetUpdatesSource(bool nudged, SyncerThread::NudgeSource nudge_source,
+ bool* initial_sync);
+
+ // For unit tests only.
+ void DisableIdleDetection() { disable_idle_detection_ = true; }
+
+ // False when we want to stop the thread.
+ bool stop_syncer_thread_;
+
+ // We use one mutex for all members except the channel.
+ PThreadMutex mutex_;
+ typedef PThreadScopedLock<PThreadMutex> MutexLock;
+
+ // Handle of the running thread.
+ pthread_t thread_;
+ bool thread_running_;
+
+ // Gets signaled whenever a thread outside of the syncer thread changes a
+ // member variable.
+ PThreadCondVar changed_;
+
+ // State of the server connection.
+ bool connected_;
+
+ // State of the notification framework is tracked by these values.
+ bool p2p_authenticated_;
+ bool p2p_subscribed_;
+
+ scoped_ptr<EventListenerHookup> client_command_hookup_;
+ scoped_ptr<EventListenerHookup> conn_mgr_hookup_;
+ const AllStatus* allstatus_;
+
+ Syncer* syncer_;
+
+ syncable::DirectoryManager* dirman_;
+ ServerConnectionManager* scm_;
+
+ // Modifiable versions of kDefaultLongPollIntervalSeconds which can be
+ // updated by the server.
+ int syncer_short_poll_interval_seconds_;
+ int syncer_long_poll_interval_seconds_;
+
+ // The time we wait between polls in seconds. This is used as lower bound on
+ // our wait time. Updated once per loop from the command line flag.
+ int syncer_polling_interval_;
+
+ // The upper bound on the nominal wait between polls in seconds. Note that
+ // this bounds the "nominal" poll interval, while the the actual interval
+ // also takes previous failures into account.
+ int syncer_max_interval_;
+
+ scoped_ptr<SyncerEventChannel> syncer_event_channel_;
+
+ // This causes syncer to start syncing ASAP. If the rate of requests is too
+ // high the request will be silently dropped. mutex_ should be held when
+ // this is called.
+ void NudgeSyncImpl(int milliseconds_from_now,
+ SyncerThread::NudgeSource source);
+
+ NudgeQueue nudge_queue_;
+
+ scoped_ptr<EventListenerHookup> talk_mediator_hookup_;
+ ClientCommandChannel* const command_channel_;
+ scoped_ptr<EventListenerHookup> directory_manager_hookup_;
+ scoped_ptr<EventListenerHookup> syncer_events_;
+
+ // Handles any tasks that will result in model changes (modifications of
+ // syncable::Entries). Pass this to the syncer created and managed by |this|.
+ // Only non-null in syncapi case.
+ scoped_ptr<ModelSafeWorker> model_safe_worker_;
+
+ // Useful for unit tests
+ bool disable_idle_detection_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncerThreadPthreadImpl);
+};
+
+// A new-version SyncerThread pimpl wrapper for the old legacy implementation.
+class SyncerThreadPthreads : public SyncerThread {
+ FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime);
+ FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Polling);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge);
+ friend class SyncerThreadWithSyncerTest;
+ friend class SyncerThreadFactory;
+ public:
+ virtual ~SyncerThreadPthreads() {}
+
+ virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr) {
+ impl_->WatchConnectionManager(conn_mgr);
+ }
+ virtual bool Start() {
+ return impl_->Start();
+ }
+ virtual bool Stop(int max_wait) {
+ return impl_->Stop(max_wait);
+ }
+ virtual bool NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
+ return impl_->NudgeSyncer(milliseconds_from_now, source);
+ }
+ virtual void WatchTalkMediator(TalkMediator* talk_mediator) {
+ impl_->WatchTalkMediator(talk_mediator);
+ }
+ virtual void WatchClientCommands(ClientCommandChannel* channel) {
+ impl_->WatchClientCommands(channel);
+ }
+ virtual SyncerEventChannel* channel() {
+ return impl_->channel();
+ }
+ protected:
+ SyncerThreadPthreads(ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker);
+ virtual void SetConnected(bool connected) {
+ impl_->connected_ = connected;
+ }
+ virtual void SetSyncerPollingInterval(int interval) {
+ impl_->syncer_polling_interval_ = interval;
+ }
+ virtual void SetSyncerShortPollInterval(base::TimeDelta interval) {
+ impl_->syncer_short_poll_interval_seconds_ = static_cast<int>(
+ interval.InSeconds());
+ }
+ virtual void DisableIdleDetection() { impl_->disable_idle_detection_ = true; }
+ virtual int CalculateSyncWaitTime(int last_wait, int user_idle_ms) {
+ return impl_->CalculateSyncWaitTime(last_wait, user_idle_ms);
+ }
+ virtual int CalculatePollingWaitTime(
+ const AllStatus::Status& status,
+ int last_poll_wait, // in s
+ int* user_idle_milliseconds,
+ bool* continue_sync_cycle) {
+ return impl_->CalculatePollingWaitTime(status, last_poll_wait,
+ user_idle_milliseconds, continue_sync_cycle);
+ }
+ private:
+ scoped_ptr<SyncerThreadPthreadImpl> impl_;
+ DISALLOW_COPY_AND_ASSIGN(SyncerThreadPthreads);
+};
+
+} // namespace browser_sync
+
+#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_ \ No newline at end of file
diff --git a/chrome/browser/sync/engine/syncer_thread_timed_stop.cc b/chrome/browser/sync/engine/syncer_thread_timed_stop.cc
new file mode 100644
index 0000000..a396dd0
--- /dev/null
+++ b/chrome/browser/sync/engine/syncer_thread_timed_stop.cc
@@ -0,0 +1,119 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
+
+#include "build/build_config.h"
+
+#ifdef OS_MACOSX
+#include <CoreFoundation/CFNumber.h>
+#include <IOKit/IOTypes.h>
+#include <IOKit/IOKitLib.h>
+#endif
+
+#include <algorithm>
+#include <map>
+#include <queue>
+
+#include "chrome/browser/sync/engine/auth_watcher.h"
+#include "chrome/browser/sync/engine/model_safe_worker.h"
+#include "chrome/browser/sync/engine/net/server_connection_manager.h"
+#include "chrome/browser/sync/engine/syncer.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
+#include "chrome/browser/sync/syncable/directory_manager.h"
+
+using std::priority_queue;
+using std::min;
+using base::Time;
+using base::TimeDelta;
+using base::TimeTicks;
+
+namespace browser_sync {
+
+SyncerThreadTimedStop::SyncerThreadTimedStop(
+ ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager,
+ AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker)
+ : SyncerThread(command_channel, mgr, connection_manager, all_status,
+ model_safe_worker),
+ in_thread_main_loop_(false) {
+}
+
+// Stop processing. A max wait of at least 2*server RTT time is recommended.
+// Returns true if we stopped, false otherwise.
+bool SyncerThreadTimedStop::Stop(int max_wait) {
+ AutoLock lock(lock_);
+ // If the thread has been started, then we either already have or are about to
+ // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to
+ // finish. If the thread has not been started --and we now own the lock--
+ // then we can early out because the caller has not called Start().
+ if (!thread_.IsRunning())
+ return true;
+
+ LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
+ << "true (vault_.stop_syncer_thread_)";
+ // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
+ // below).
+ vault_.stop_syncer_thread_ = true;
+ if (NULL != vault_.syncer_) {
+ // Try to early exit the syncer.
+ vault_.syncer_->RequestEarlyExit();
+ }
+
+ // stop_syncer_thread_ is now true and the Syncer has been told to exit.
+ // We want to wake up all waiters so they can re-examine state. We signal,
+ // causing all waiters to try to re-acquire the lock, and then we atomically
+ // release the lock and wait. Our wait can be spuriously signaled, so we
+ // recalculate the remaining sleep time each time through and re-
+ // check the condition before exiting the loop.
+ vault_field_changed_.Broadcast();
+ TimeTicks start = TimeTicks::Now();
+ TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait);
+ bool timed_out = false;
+ // Eventually the combination of RequestEarlyExit and setting
+ // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become
+ // false.
+ while (in_thread_main_loop_) {
+ TimeDelta sleep_time = end - TimeTicks::Now();
+ if (sleep_time < TimeDelta::FromSeconds(0)) {
+ timed_out = true;
+ break;
+ }
+ LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s.";
+ vault_field_changed_.TimedWait(sleep_time);
+ }
+
+ if (timed_out) {
+ LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely.";
+ return false;
+ }
+
+ // Stop() should not block on anything at this point, given above madness.
+ DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at "
+ << Time::Now().ToInternalValue();
+ thread_.Stop();
+ DLOG(INFO) << "SyncerThread::thread_.Stop() finished at "
+ << Time::Now().ToInternalValue();
+ return true;
+}
+
+void SyncerThreadTimedStop::ThreadMain() {
+ AutoLock lock(lock_);
+ // Signal Start() to let it know we've made it safely are now running on the
+ // message loop, and unblock it's caller.
+ thread_main_started_.Signal();
+
+ // The only thing that could be waiting on this value is Stop, and we don't
+ // release the lock until we're far enough along to Stop safely.
+ in_thread_main_loop_ = true;
+ vault_field_changed_.Broadcast();
+ ThreadMainLoop();
+ in_thread_main_loop_ = false;
+ vault_field_changed_.Broadcast();
+ LOG(INFO) << "Syncer thread ThreadMain is done.";
+}
+
+} // namespace browser_sync \ No newline at end of file
diff --git a/chrome/browser/sync/engine/syncer_thread_timed_stop.h b/chrome/browser/sync/engine/syncer_thread_timed_stop.h
new file mode 100644
index 0000000..e1037a1
--- /dev/null
+++ b/chrome/browser/sync/engine/syncer_thread_timed_stop.h
@@ -0,0 +1,53 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A class to run the syncer on a thread. This guy is the closest chrome-based
+// (as opposed to pthreads based) SyncerThread to the old pthread implementation
+// in semantics, as it supports a timeout on Stop() -- It is just an override of
+// two methods from SyncerThread: ThreadMain and Stop -- to provide this.
+#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_
+#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_
+
+#include <list>
+#include <map>
+#include <queue>
+#include <vector>
+
+#include "chrome/browser/sync/engine/syncer_thread.h"
+
+namespace browser_sync {
+
+class SyncerThreadTimedStop : public SyncerThread {
+ FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime);
+ FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Polling);
+ FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge);
+ friend class SyncerThreadWithSyncerTest;
+ friend class SyncerThreadFactory;
+ public:
+ virtual ~SyncerThreadTimedStop() {}
+
+ // Stop processing. This version comes with a supported max_wait.
+ // A max wait of at least 2*server RTT time is recommended.
+ // Returns true if we stopped, false otherwise.
+ virtual bool Stop(int max_wait);
+
+ private:
+ SyncerThreadTimedStop(ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager, AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker);
+ virtual void ThreadMain();
+
+ // We use this to track when our synthesized thread loop is active, so we can
+ // timed-wait for it to become false. For this and only this (temporary)
+ // implementation, we protect this variable using our parent lock_.
+ bool in_thread_main_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncerThreadTimedStop);
+};
+
+} // namespace browser_sync
+
+#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_ \ No newline at end of file
diff --git a/chrome/browser/sync/engine/syncer_thread_unittest.cc b/chrome/browser/sync/engine/syncer_thread_unittest.cc
index 6b758a7..fa28f7f 100644
--- a/chrome/browser/sync/engine/syncer_thread_unittest.cc
+++ b/chrome/browser/sync/engine/syncer_thread_unittest.cc
@@ -7,89 +7,158 @@
#include <set>
#include <strstream>
+#include "base/command_line.h"
#include "base/scoped_ptr.h"
+#include "base/time.h"
+#include "chrome/browser/sync/engine/model_safe_worker.h"
#include "chrome/browser/sync/engine/syncer_thread.h"
+#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
+#include "chrome/test/sync/engine/mock_server_connection.h"
+#include "chrome/test/sync/engine/test_directory_setter_upper.h"
#include "testing/gtest/include/gtest/gtest.h"
+using base::Time;
+using base::TimeDelta;
+
namespace browser_sync {
-class SyncerThreadTest : public testing::Test {
- protected:
- SyncerThreadTest() {}
- virtual ~SyncerThreadTest() {}
- virtual void SetUp() {}
- virtual void TearDown() {}
+typedef testing::Test SyncerThreadTest;
+
+class SyncerThreadWithSyncerTest : public testing::Test {
+ public:
+ SyncerThreadWithSyncerTest() {}
+ virtual void SetUp() {
+ metadb_.SetUp();
+ connection_.reset(new MockConnectionManager(metadb_.manager(),
+ metadb_.name()));
+ allstatus_.reset(new AllStatus());
+
+ syncer_thread_ = SyncerThreadFactory::Create(NULL, metadb_.manager(),
+ connection_.get(), allstatus_.get(), new ModelSafeWorker());
+
+ allstatus_->WatchSyncerThread(syncer_thread_);
+ syncer_thread_->SetConnected(true);
+ }
+ virtual void TearDown() {
+ syncer_thread_ = NULL;
+ allstatus_.reset();
+ connection_.reset();
+ metadb_.TearDown();
+ }
+ ManuallyOpenedTestDirectorySetterUpper* metadb() { return &metadb_; }
+ MockConnectionManager* connection() { return connection_.get(); }
+ SyncerThread* syncer_thread() { return syncer_thread_; }
+ private:
+ ManuallyOpenedTestDirectorySetterUpper metadb_;
+ scoped_ptr<MockConnectionManager> connection_;
+ scoped_ptr<AllStatus> allstatus_;
+ scoped_refptr<SyncerThread> syncer_thread_;
+ DISALLOW_COPY_AND_ASSIGN(SyncerThreadWithSyncerTest);
+};
+
+class SyncShareIntercept : public MockConnectionManager::MidCommitObserver {
+ public:
+ SyncShareIntercept() : sync_occured_(false, false) {}
+ virtual ~SyncShareIntercept() {}
+ virtual void Observe() {
+ times_sync_occured_.push_back(Time::NowFromSystemTime());
+ sync_occured_.Signal();
+ }
+ void WaitForSyncShare(int at_least_this_many, TimeDelta max_wait) {
+ while (at_least_this_many-- > 0)
+ sync_occured_.TimedWait(max_wait);
+ }
+ std::vector<Time> times_sync_occured() const {
+ return times_sync_occured_;
+ }
private:
- DISALLOW_COPY_AND_ASSIGN(SyncerThreadTest);
+ std::vector<Time> times_sync_occured_;
+ base::WaitableEvent sync_occured_;
+ DISALLOW_COPY_AND_ASSIGN(SyncShareIntercept);
};
TEST_F(SyncerThreadTest, Construction) {
- SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL);
+ scoped_refptr<SyncerThread> syncer_thread(
+ SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL));
+}
+
+TEST_F(SyncerThreadTest, StartStop) {
+ scoped_refptr<SyncerThread> syncer_thread(
+ SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL));
+ EXPECT_TRUE(syncer_thread->Start());
+ EXPECT_TRUE(syncer_thread->Stop(2000));
+
+ // Do it again for good measure. I caught some bugs by adding this so
+ // I would recommend keeping it.
+ EXPECT_TRUE(syncer_thread->Start());
+ EXPECT_TRUE(syncer_thread->Stop(2000));
}
TEST_F(SyncerThreadTest, CalculateSyncWaitTime) {
- SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL);
- syncer_thread.DisableIdleDetection();
+ scoped_refptr<SyncerThread> syncer_thread(
+ SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL));
+ syncer_thread->DisableIdleDetection();
+
+ // Syncer_polling_interval_ is less than max poll interval.
+ TimeDelta syncer_polling_interval = TimeDelta::FromSeconds(1);
- // Syncer_polling_interval_ is less than max poll interval
- int syncer_polling_interval = 1; // Needed since AssertionResult is not a
- // friend of SyncerThread
- syncer_thread.syncer_polling_interval_ = syncer_polling_interval;
+ syncer_thread->SetSyncerPollingInterval(syncer_polling_interval);
// user_idle_ms is less than 10 * (syncer_polling_interval*1000).
- ASSERT_EQ(syncer_polling_interval * 1000,
- syncer_thread.CalculateSyncWaitTime(1000, 0));
- ASSERT_EQ(syncer_polling_interval * 1000,
- syncer_thread.CalculateSyncWaitTime(1000, 1));
+ ASSERT_EQ(syncer_polling_interval.InMilliseconds(),
+ syncer_thread->CalculateSyncWaitTime(1000, 0));
+ ASSERT_EQ(syncer_polling_interval.InMilliseconds(),
+ syncer_thread->CalculateSyncWaitTime(1000, 1));
// user_idle_ms is ge than 10 * (syncer_polling_interval*1000).
int last_poll_time = 2000;
ASSERT_LE(last_poll_time,
- syncer_thread.CalculateSyncWaitTime(last_poll_time, 10000));
+ syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000));
ASSERT_GE(last_poll_time*3,
- syncer_thread.CalculateSyncWaitTime(last_poll_time, 10000));
+ syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000));
ASSERT_LE(last_poll_time,
- syncer_thread.CalculateSyncWaitTime(last_poll_time, 100000));
+ syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000));
ASSERT_GE(last_poll_time*3,
- syncer_thread.CalculateSyncWaitTime(last_poll_time, 100000));
+ syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000));
// Maximum backoff time should be syncer_max_interval.
int near_threshold = SyncerThread::kDefaultMaxPollIntervalMs / 2 - 1;
int threshold = SyncerThread::kDefaultMaxPollIntervalMs;
int over_threshold = SyncerThread::kDefaultMaxPollIntervalMs + 1;
ASSERT_LE(near_threshold,
- syncer_thread.CalculateSyncWaitTime(near_threshold, 10000));
+ syncer_thread->CalculateSyncWaitTime(near_threshold, 10000));
ASSERT_GE(SyncerThread::kDefaultMaxPollIntervalMs,
- syncer_thread.CalculateSyncWaitTime(near_threshold, 10000));
+ syncer_thread->CalculateSyncWaitTime(near_threshold, 10000));
ASSERT_EQ(SyncerThread::kDefaultMaxPollIntervalMs,
- syncer_thread.CalculateSyncWaitTime(threshold, 10000));
+ syncer_thread->CalculateSyncWaitTime(threshold, 10000));
ASSERT_EQ(SyncerThread::kDefaultMaxPollIntervalMs,
- syncer_thread.CalculateSyncWaitTime(over_threshold, 10000));
+ syncer_thread->CalculateSyncWaitTime(over_threshold, 10000));
// Possible idle time must be capped by syncer_max_interval.
int over_sync_max_interval =
SyncerThread::kDefaultMaxPollIntervalMs + 1;
- syncer_polling_interval = over_sync_max_interval / 100; // so 1000* is right
- syncer_thread.syncer_polling_interval_ = syncer_polling_interval;
- ASSERT_EQ(syncer_polling_interval * 1000,
- syncer_thread.CalculateSyncWaitTime(1000, over_sync_max_interval));
- syncer_polling_interval = 1;
- syncer_thread.syncer_polling_interval_ = syncer_polling_interval;
+ syncer_polling_interval = TimeDelta::FromSeconds(
+ over_sync_max_interval / 100); // so 1000* is right
+ syncer_thread->SetSyncerPollingInterval(syncer_polling_interval);
+ ASSERT_EQ(syncer_polling_interval.InSeconds() * 1000,
+ syncer_thread->CalculateSyncWaitTime(1000, over_sync_max_interval));
+ syncer_polling_interval = TimeDelta::FromSeconds(1);
+ syncer_thread->SetSyncerPollingInterval(syncer_polling_interval);
ASSERT_LE(last_poll_time,
- syncer_thread.CalculateSyncWaitTime(last_poll_time,
+ syncer_thread->CalculateSyncWaitTime(last_poll_time,
over_sync_max_interval));
ASSERT_GE(last_poll_time * 3,
- syncer_thread.CalculateSyncWaitTime(last_poll_time,
+ syncer_thread->CalculateSyncWaitTime(last_poll_time,
over_sync_max_interval));
}
TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// Set up the environment.
int user_idle_milliseconds_param = 0;
-
- SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL);
- syncer_thread.DisableIdleDetection();
+ scoped_refptr<SyncerThread> syncer_thread(
+ SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL));
+ syncer_thread->DisableIdleDetection();
// Notifications disabled should result in a polling interval of
// kDefaultShortPollInterval.
@@ -100,7 +169,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// No work and no backoff.
ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -110,7 +179,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// In this case the continue_sync_cycle is turned off.
continue_sync_cycle_param = true;
ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -131,7 +200,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// No work and no backoff.
ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -141,7 +210,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// In this case the continue_sync_cycle is turned off.
continue_sync_cycle_param = true;
ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -162,7 +231,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
status.updates_received = 0;
bool continue_sync_cycle_param = false;
- ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -170,19 +239,19 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
ASSERT_TRUE(continue_sync_cycle_param);
continue_sync_cycle_param = false;
- ASSERT_GE(3, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(3, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
&continue_sync_cycle_param));
ASSERT_TRUE(continue_sync_cycle_param);
- ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
&continue_sync_cycle_param));
- ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -191,7 +260,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
status.updates_received = 1;
ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
10,
&user_idle_milliseconds_param,
@@ -204,7 +273,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
status.unsynced_count = 1;
bool continue_sync_cycle_param = false;
- ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -212,7 +281,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
ASSERT_TRUE(continue_sync_cycle_param);
continue_sync_cycle_param = false;
- ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime(
status,
0,
&user_idle_milliseconds_param,
@@ -221,7 +290,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
status.unsynced_count = 0;
ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
4,
&user_idle_milliseconds_param,
@@ -237,7 +306,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// Expect move from default polling interval to exponential backoff due to
// unsynced_count != 0.
- ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime(
status,
3600,
&user_idle_milliseconds_param,
@@ -245,7 +314,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
ASSERT_TRUE(continue_sync_cycle_param);
continue_sync_cycle_param = false;
- ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime(
status,
3600,
&user_idle_milliseconds_param,
@@ -253,12 +322,12 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
ASSERT_TRUE(continue_sync_cycle_param);
// Expect exponential backoff.
- ASSERT_LE(2, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(2, syncer_thread->CalculatePollingWaitTime(
status,
2,
&user_idle_milliseconds_param,
&continue_sync_cycle_param));
- ASSERT_GE(6, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(6, syncer_thread->CalculatePollingWaitTime(
status,
2,
&user_idle_milliseconds_param,
@@ -268,7 +337,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// A nudge resets the continue_sync_cycle_param value, so our backoff
// should return to the minimum.
continue_sync_cycle_param = false;
- ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime(
status,
3600,
&user_idle_milliseconds_param,
@@ -276,7 +345,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
ASSERT_TRUE(continue_sync_cycle_param);
continue_sync_cycle_param = false;
- ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime(
+ ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime(
status,
3600,
&user_idle_milliseconds_param,
@@ -286,7 +355,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
// Setting unsynced_count = 0 returns us to the default polling interval.
status.unsynced_count = 0;
ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds,
- syncer_thread.CalculatePollingWaitTime(
+ syncer_thread->CalculatePollingWaitTime(
status,
4,
&user_idle_milliseconds_param,
@@ -295,4 +364,67 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) {
}
}
+TEST_F(SyncerThreadWithSyncerTest, Polling) {
+ SyncShareIntercept interceptor;
+ connection()->SetMidCommitObserver(&interceptor);
+
+ const TimeDelta poll_interval = TimeDelta::FromSeconds(1);
+ syncer_thread()->SetSyncerShortPollInterval(poll_interval);
+ EXPECT_TRUE(syncer_thread()->Start());
+
+ // Calling Open() should cause the SyncerThread to create a Syncer.
+ metadb()->Open();
+
+ TimeDelta two_polls = poll_interval + poll_interval;
+ // We could theoretically return immediately from the wait if the interceptor
+ // was already signaled for a SyncShare (the first one comes quick).
+ interceptor.WaitForSyncShare(1, two_polls);
+ EXPECT_FALSE(interceptor.times_sync_occured().empty());
+
+ // Wait for at least 2 more SyncShare operations.
+ interceptor.WaitForSyncShare(2, two_polls);
+ EXPECT_TRUE(syncer_thread()->Stop(2000));
+
+ // Now analyze the run.
+ std::vector<Time> data = interceptor.times_sync_occured();
+
+ EXPECT_GE(data.size(), static_cast<unsigned int>(3));
+ for (unsigned int i = 0; i < data.size() - 1; i++) {
+ Time optimal_next_sync = data[i] + poll_interval;
+ // The pthreads impl uses a different time impl and is slightly (~900usecs)
+ // off, so this expectation can fail with --syncer-thread-pthreads.
+ EXPECT_TRUE(data[i + 1] >= optimal_next_sync)
+ << "difference is "
+ << (data[i + 1] - optimal_next_sync).InMicroseconds() << " usecs. "
+ << "~900usec delta is OK with --syncer-thread-pthreads";
+ // This should be reliable, as there are no blocking or I/O operations
+ // except the explicit 2 second wait, so if it takes longer than this
+ // there is a problem.
+ EXPECT_TRUE(data[i + 1] < optimal_next_sync + poll_interval);
+ }
+}
+
+TEST_F(SyncerThreadWithSyncerTest, Nudge) {
+ SyncShareIntercept interceptor;
+ connection()->SetMidCommitObserver(&interceptor);
+ // We don't want a poll to happen during this test (except the first one).
+ const TimeDelta poll_interval = TimeDelta::FromMinutes(5);
+ syncer_thread()->SetSyncerShortPollInterval(poll_interval);
+ EXPECT_TRUE(syncer_thread()->Start());
+ metadb()->Open();
+ interceptor.WaitForSyncShare(1, poll_interval + poll_interval);
+
+ EXPECT_EQ(static_cast<unsigned int>(1),
+ interceptor.times_sync_occured().size());
+ // The SyncerThread should be waiting for the poll now. Nudge it to sync
+ // immediately (5ms).
+ syncer_thread()->NudgeSyncer(5, SyncerThread::kUnknown);
+ interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1));
+ EXPECT_EQ(static_cast<unsigned int>(2),
+ interceptor.times_sync_occured().size());
+
+ // SyncerThread should be waiting again. Signal it to stop.
+ EXPECT_TRUE(syncer_thread()->Stop(2000));
+}
+
} // namespace browser_sync
diff --git a/chrome/chrome.gyp b/chrome/chrome.gyp
index 4b37d3d..89a5efd 100644
--- a/chrome/chrome.gyp
+++ b/chrome/chrome.gyp
@@ -4592,6 +4592,7 @@
'../third_party/libjingle/libjingle.gyp:libjingle',
'../third_party/protobuf2/protobuf.gyp:protobuf',
'../third_party/sqlite/sqlite.gyp:sqlite',
+ 'common',
'notifier',
'sync',
],
@@ -6556,6 +6557,10 @@
'browser/sync/engine/syncer_status.h',
'browser/sync/engine/syncer_thread.cc',
'browser/sync/engine/syncer_thread.h',
+ 'browser/sync/engine/syncer_thread_timed_stop.cc',
+ 'browser/sync/engine/syncer_thread_timed_stop.h',
+ 'browser/sync/engine/syncer_thread_pthreads.cc',
+ 'browser/sync/engine/syncer_thread_pthreads.h',
'browser/sync/engine/syncer_types.h',
'browser/sync/engine/syncer_util.cc',
'browser/sync/engine/syncer_util.h',
diff --git a/chrome/common/chrome_switches.cc b/chrome/common/chrome_switches.cc
index 075a350..14aea58 100644
--- a/chrome/common/chrome_switches.cc
+++ b/chrome/common/chrome_switches.cc
@@ -378,6 +378,16 @@ const wchar_t kEnableFastback[] = L"enable-fastback";
// Enable syncing bookmarks to a Google Account.
const wchar_t kEnableSync[] = L"enable-sync";
+// Use the SyncerThread implementation that matches up with the old pthread
+// impl semantics, but using Chrome synchronization primitives. The only
+// difference between this and the default is that we now have no timeout on
+// Stop(). Should only use if you experience problems with the default.
+const wchar_t kSyncerThreadTimedStop[] = L"syncer-thread-timed-stop";
+
+// Use the old pthreads SyncerThread implementation.
+// Should only use if you experience problems with the default.
+const wchar_t kSyncerThreadPthreads[] = L"syncer-thread-pthreads";
+
// Enable support for SDCH filtering (dictionary based expansion of content).
// Optional argument is *the* only domain name that will have SDCH suppport.
// Default is "-enable-sdch" to advertise SDCH on all domains.
diff --git a/chrome/common/chrome_switches.h b/chrome/common/chrome_switches.h
index f031399..9ed753c 100644
--- a/chrome/common/chrome_switches.h
+++ b/chrome/common/chrome_switches.h
@@ -140,6 +140,8 @@ extern const wchar_t kGearsPluginPathOverride[];
extern const wchar_t kEnableFastback[];
extern const wchar_t kEnableSync[];
+extern const wchar_t kSyncerThreadTimedStop[];
+extern const wchar_t kSyncerThreadPthreads[];
extern const wchar_t kSdchFilter[];
diff --git a/chrome/test/sync/engine/mock_server_connection.cc b/chrome/test/sync/engine/mock_server_connection.cc
index bd3d90f..c4595bd 100644
--- a/chrome/test/sync/engine/mock_server_connection.cc
+++ b/chrome/test/sync/engine/mock_server_connection.cc
@@ -25,6 +25,7 @@ using sync_pb::CommitResponse_EntryResponse;
using sync_pb::GetUpdatesMessage;
using sync_pb::SyncEntity;
using syncable::DirectoryManager;
+using syncable::ScopedDirLookup;
using syncable::WriteTransaction;
MockConnectionManager::MockConnectionManager(DirectoryManager* dirmgr,
@@ -38,12 +39,13 @@ MockConnectionManager::MockConnectionManager(DirectoryManager* dirmgr,
client_stuck_(false),
commit_time_rename_prepended_string_(""),
fail_next_postbuffer_(false),
- directory_(dirmgr, name),
+ directory_manager_(dirmgr),
+ directory_name_(name),
mid_commit_callback_function_(NULL),
+ mid_commit_observer_(NULL),
client_command_(NULL),
next_position_in_parent_(2) {
server_reachable_ = true;
- CHECK(directory_.good());
};
MockConnectionManager::~MockConnectionManager() {
@@ -60,9 +62,18 @@ void MockConnectionManager::SetMidCommitCallbackFunction(
mid_commit_callback_function_ = callback;
}
+void MockConnectionManager::SetMidCommitObserver(
+ MockConnectionManager::MidCommitObserver* observer) {
+ mid_commit_observer_ = observer;
+}
+
bool MockConnectionManager::PostBufferToPath(const PostBufferParams* params,
const string& path,
const string& auth_token) {
+
+ ScopedDirLookup directory(directory_manager_, directory_name_);
+ CHECK(directory.good());
+
ClientToServerMessage post;
CHECK(post.ParseFromString(params->buffer_in));
client_stuck_ = post.sync_problem_detected();
@@ -73,7 +84,7 @@ bool MockConnectionManager::PostBufferToPath(const PostBufferParams* params,
// network. As we can't test this we do the next best thing and hang here
// when there's an issue.
{
- WriteTransaction wt(directory_, syncable::UNITTEST, __FILE__, __LINE__);
+ WriteTransaction wt(directory, syncable::UNITTEST, __FILE__, __LINE__);
}
if (fail_next_postbuffer_) {
fail_next_postbuffer_ = false;
@@ -105,9 +116,12 @@ bool MockConnectionManager::PostBufferToPath(const PostBufferParams* params,
}
response.SerializeToString(params->buffer_out);
if (mid_commit_callback_function_) {
- if (mid_commit_callback_function_(directory_))
+ if (mid_commit_callback_function_(directory))
mid_commit_callback_function_ = 0;
}
+ if (mid_commit_observer_) {
+ mid_commit_observer_->Observe();
+ }
return result;
}
diff --git a/chrome/test/sync/engine/mock_server_connection.h b/chrome/test/sync/engine/mock_server_connection.h
index e5573e0..8c0cca7 100644
--- a/chrome/test/sync/engine/mock_server_connection.h
+++ b/chrome/test/sync/engine/mock_server_connection.h
@@ -31,6 +31,10 @@ class MockConnectionManager : public browser_sync::ServerConnectionManager {
// activity would normally take place. This aids simulation of race
// conditions.
typedef bool (*TestCallbackFunction)(syncable::Directory* dir);
+ class MidCommitObserver {
+ public:
+ virtual void Observe() = 0;
+ };
MockConnectionManager(syncable::DirectoryManager* dirmgr, PathString name);
virtual ~MockConnectionManager();
@@ -45,6 +49,7 @@ class MockConnectionManager : public browser_sync::ServerConnectionManager {
// Control of commit response.
void SetMidCommitCallbackFunction(TestCallbackFunction callback);
+ void SetMidCommitObserver(MidCommitObserver* observer);
// Set this if you want commit to perform commit time rename. Will request
// that the client renames all commited entries, prepending this string.
@@ -177,11 +182,13 @@ class MockConnectionManager : public browser_sync::ServerConnectionManager {
bool fail_next_postbuffer_;
// Our directory.
- syncable::ScopedDirLookup directory_;
+ syncable::DirectoryManager* directory_manager_;
+ PathString directory_name_;
// The updates we'll return to the next request.
sync_pb::GetUpdatesResponse updates_;
TestCallbackFunction mid_commit_callback_function_;
+ MidCommitObserver* mid_commit_observer_;
scoped_ptr<sync_pb::ClientCommand> client_command_;
diff --git a/chrome/test/sync/engine/test_directory_setter_upper.cc b/chrome/test/sync/engine/test_directory_setter_upper.cc
index 468875a..0a7b32d 100644
--- a/chrome/test/sync/engine/test_directory_setter_upper.cc
+++ b/chrome/test/sync/engine/test_directory_setter_upper.cc
@@ -20,12 +20,15 @@ TestDirectorySetterUpper::TestDirectorySetterUpper() : name_(PSTR("Test")) {}
TestDirectorySetterUpper::~TestDirectorySetterUpper() {}
-void TestDirectorySetterUpper::SetUp() {
+void TestDirectorySetterUpper::Init() {
PathString test_data_dir_ = PSTR(".");
manager_.reset(new DirectoryManager(test_data_dir_));
file_path_ = manager_->GetSyncDataDatabasePath();
PathRemove(file_path_.c_str());
+}
+void TestDirectorySetterUpper::SetUp() {
+ Init();
ASSERT_TRUE(manager()->Open(name()));
}
@@ -64,4 +67,19 @@ void TestDirectorySetterUpper::RunInvariantCheck(const ScopedDirLookup& dir) {
}
}
+void ManuallyOpenedTestDirectorySetterUpper::SetUp() {
+ Init();
+}
+
+void ManuallyOpenedTestDirectorySetterUpper::Open() {
+ ASSERT_TRUE(manager()->Open(name()));
+ was_opened_ = true;
+}
+
+void ManuallyOpenedTestDirectorySetterUpper::TearDown() {
+ if (was_opened_) {
+ TestDirectorySetterUpper::TearDown();
+ }
+}
+
} // namespace browser_sync
diff --git a/chrome/test/sync/engine/test_directory_setter_upper.h b/chrome/test/sync/engine/test_directory_setter_upper.h
index 0cac7e1..0cfa3e6 100644
--- a/chrome/test/sync/engine/test_directory_setter_upper.h
+++ b/chrome/test/sync/engine/test_directory_setter_upper.h
@@ -44,20 +44,23 @@ namespace browser_sync {
class TestDirectorySetterUpper {
public:
TestDirectorySetterUpper();
- ~TestDirectorySetterUpper();
+ virtual ~TestDirectorySetterUpper();
// Create a DirectoryManager instance and use it to open the directory.
// Clears any existing database backing files that might exist on disk.
- void SetUp();
+ virtual void SetUp();
// Undo everything done by SetUp(): close the directory and delete the
// backing files. Before closing the directory, this will run the directory
// invariant checks and perform the SaveChanges action on the directory.
- void TearDown();
+ virtual void TearDown();
syncable::DirectoryManager* manager() const { return manager_.get(); }
const PathString& name() const { return name_; }
+ protected:
+ virtual void Init();
+
private:
void RunInvariantCheck(const syncable::ScopedDirLookup& dir);
@@ -66,6 +69,19 @@ class TestDirectorySetterUpper {
PathString file_path_;
};
+// A variant of the above where SetUp does not actually open the directory.
+// You must manually invoke Open(). This is useful if you are writing a test
+// that depends on the DirectoryManager::OPENED event.
+class ManuallyOpenedTestDirectorySetterUpper : public TestDirectorySetterUpper {
+ public:
+ ManuallyOpenedTestDirectorySetterUpper() : was_opened_(false) {}
+ virtual void SetUp();
+ virtual void TearDown();
+ void Open();
+ private:
+ bool was_opened_;
+};
+
} // namespace browser_sync
#endif // CHROME_TEST_SYNC_ENGINE_TEST_DIRECTORY_SETTER_UPPER_H_