diff options
author | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-24 20:24:48 +0000 |
---|---|---|
committer | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-24 20:24:48 +0000 |
commit | 3a76ec1999dc11db01da59a322b0b9f2eceba3b2 (patch) | |
tree | 7ecc0eb0c05a3795a245626efcd1ae4f93199983 /chrome/browser/sync | |
parent | 2d6e5029ff051a4bb7632c99b575170ba93a0a62 (diff) | |
download | chromium_src-3a76ec1999dc11db01da59a322b0b9f2eceba3b2.zip chromium_src-3a76ec1999dc11db01da59a322b0b9f2eceba3b2.tar.gz chromium_src-3a76ec1999dc11db01da59a322b0b9f2eceba3b2.tar.bz2 |
Use chrome/base synchronization primitives and threads instead of
pthreads in SyncerThread. The old pthread impl can be used by
specifying --syncer-thread-pthreads for comparison until we
settle fully on the final impl (I have a MessageLoop-based impl
in progress).
The default SyncerThread is as close to the pthreads-impl semantics as
I could get, with one exception: it does not offer a time-out
when calling Stop(), because it greatly simplifies the implementation.
I first implemented it *with* the timeout, and for sake of
experimentation while this is in shuffle I am checking it in
as SyncerThreadTimedStop, available by using --syncer-thread-timed-stop. I'm not sure which we want ultimately, but it's useful to have around when building the MessageLoop based impl.
I had to refactor the interface slightly to allow multiple implementations,
I think it will be quite useful while working on the MessageLoop impl.
Added several tests to SyncerThreadUnittest, which all impls now pass (
just pass the command line flag to try each out).
TEST=SyncerThreadTest, SyncerThreadWithSyncerTest, integration tests
Review URL: http://codereview.chromium.org/214033
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@27117 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r-- | chrome/browser/sync/engine/syncapi.cc | 13 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread.cc | 317 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread.h | 190 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread_pthreads.cc | 578 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread_pthreads.h | 284 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread_timed_stop.cc | 119 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread_timed_stop.h | 53 | ||||
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread_unittest.cc | 244 |
8 files changed, 1550 insertions, 248 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc index f1ca43d..b9a8695 100644 --- a/chrome/browser/sync/engine/syncapi.cc +++ b/chrome/browser/sync/engine/syncapi.cc @@ -53,6 +53,7 @@ using browser_sync::Syncer; using browser_sync::SyncerEvent; using browser_sync::SyncerStatus; using browser_sync::SyncerThread; +using browser_sync::SyncerThreadFactory; using browser_sync::UserSettings; using browser_sync::TalkMediator; using browser_sync::TalkMediatorImpl; @@ -873,7 +874,7 @@ class SyncManager::SyncInternal { scoped_ptr<SyncAPIServerConnectionManager> connection_manager_; // The thread that runs the Syncer. Needs to be explicitly Start()ed. - scoped_ptr<SyncerThread> syncer_thread_; + scoped_refptr<SyncerThread> syncer_thread_; // Notification (xmpp) handler. scoped_ptr<TalkMediator> talk_mediator_; @@ -1063,11 +1064,11 @@ bool SyncManager::SyncInternal::Init( // on the Syncer side, and |model_safe_worker| on the API client side. ModelSafeWorkerBridge* worker = new ModelSafeWorkerBridge(model_safe_worker); - syncer_thread_.reset(new SyncerThread(&command_channel_, - dir_manager(), - connection_manager(), - &allstatus_, - worker)); + syncer_thread_ = SyncerThreadFactory::Create(&command_channel_, + dir_manager(), + connection_manager(), + &allstatus_, + worker); syncer_thread()->WatchTalkMediator(talk_mediator()); allstatus()->WatchSyncerThread(syncer_thread()); diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc index a53ab93..0ef436b 100644 --- a/chrome/browser/sync/engine/syncer_thread.cc +++ b/chrome/browser/sync/engine/syncer_thread.cc @@ -1,7 +1,6 @@ // Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. - #include "chrome/browser/sync/engine/syncer_thread.h" #include "build/build_config.h" @@ -16,20 +15,23 @@ #include <map> #include <queue> +#include "base/command_line.h" #include "chrome/browser/sync/engine/auth_watcher.h" #include "chrome/browser/sync/engine/model_safe_worker.h" #include "chrome/browser/sync/engine/net/server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" #include "chrome/browser/sync/notifier/listener/talk_mediator.h" #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" #include "chrome/browser/sync/syncable/directory_manager.h" +#include "chrome/common/chrome_switches.h" using std::priority_queue; using std::min; - -static inline bool operator < (const timespec& a, const timespec& b) { - return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; -} +using base::Time; +using base::TimeDelta; +using base::TimeTicks; namespace { @@ -108,17 +110,56 @@ int UserIdleTime() { namespace browser_sync { +SyncerThread* SyncerThreadFactory::Create( + ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker) { + const CommandLine* cmd = CommandLine::ForCurrentProcess(); + if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) { + return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) { + return new SyncerThreadPthreads(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } else { + // The default SyncerThread implementation, which does not time-out when + // Stop is called. + return new SyncerThread(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } +} + bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { - MutexLock lock(&mutex_); - if (syncer_ == NULL) { + AutoLock lock(lock_); + if (vault_.syncer_ == NULL) { return false; } NudgeSyncImpl(milliseconds_from_now, source); return true; } -void* RunSyncerThread(void* syncer_thread) { - return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); +SyncerThread::SyncerThread() + : thread_main_started_(false, false), + thread_("SyncEngine_SyncerThread"), + vault_field_changed_(&lock_), + p2p_authenticated_(false), + p2p_subscribed_(false), + client_command_hookup_(NULL), + conn_mgr_hookup_(NULL), + allstatus_(NULL), + dirman_(NULL), + scm_(NULL), + syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), + syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), + syncer_polling_interval_(kDefaultShortPollIntervalSeconds), + syncer_max_interval_(kDefaultMaxPollIntervalMs), + talk_mediator_hookup_(NULL), + command_channel_(NULL), + directory_manager_hookup_(NULL), + syncer_events_(NULL), + model_safe_worker_(NULL), + disable_idle_detection_(false) { } SyncerThread::SyncerThread( @@ -127,15 +168,14 @@ SyncerThread::SyncerThread( ServerConnectionManager* connection_manager, AllStatus* all_status, ModelSafeWorker* model_safe_worker) - : stop_syncer_thread_(false), - thread_running_(false), - connected_(false), + : thread_main_started_(false, false), + thread_("SyncEngine_SyncerThread"), + vault_field_changed_(&lock_), p2p_authenticated_(false), p2p_subscribed_(false), client_command_hookup_(NULL), conn_mgr_hookup_(NULL), allstatus_(all_status), - syncer_(NULL), dirman_(mgr), scm_(connection_manager), syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), @@ -172,55 +212,76 @@ SyncerThread::~SyncerThread() { syncer_event_channel_.reset(); directory_manager_hookup_.reset(); syncer_events_.reset(); - delete syncer_; + delete vault_.syncer_; talk_mediator_hookup_.reset(); - CHECK(!thread_running_); + CHECK(!thread_.IsRunning()); } // Creates and starts a syncer thread. // Returns true if it creates a thread or if there's currently a thread running // and false otherwise. bool SyncerThread::Start() { - MutexLock lock(&mutex_); - if (thread_running_) { - return true; - } - thread_running_ = - (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); - if (thread_running_) { - pthread_detach(thread_); + { + AutoLock lock(lock_); + if (thread_.IsRunning()) { + return true; + } + + if (!thread_.Start()) { + return false; + } } - return thread_running_; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::ThreadMain)); + + // Wait for notification that our task makes it safely onto the message + // loop before returning, so the caller can't call Stop before we're + // actually up and running. This is for consistency with the old pthread + // impl because pthread_create would do this in one step. + thread_main_started_.Wait(); + LOG(INFO) << "SyncerThread started."; + return true; } // Stop processing. A max wait of at least 2*server RTT time is recommended. // Returns true if we stopped, false otherwise. bool SyncerThread::Stop(int max_wait) { - MutexLock lock(&mutex_); - if (!thread_running_) - return true; - stop_syncer_thread_ = true; - if (NULL != syncer_) { - // Try to early exit the syncer. - syncer_->RequestEarlyExit(); - } - pthread_cond_broadcast(&changed_.condvar_); - timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; - do { - const int wait_result = max_wait < 0 ? - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : - pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, - &deadline); - if (ETIMEDOUT == wait_result) { - LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; - return false; + { + AutoLock lock(lock_); + // If the thread has been started, then we either already have or are about + // to enter ThreadMainLoop so we have to proceed with shutdown and wait for + // it to finish. If the thread has not been started --and we now own the + // lock-- then we can early out because the caller has not called Start(). + if (!thread_.IsRunning()) + return true; + + LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " + << "true (vault_.stop_syncer_thread_)"; + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit + // below). + vault_.stop_syncer_thread_ = true; + if (NULL != vault_.syncer_) { + // Try to early exit the syncer itself, which could be looping inside + // SyncShare. + vault_.syncer_->RequestEarlyExit(); } - } while (thread_running_); + + // stop_syncer_thread_ is now true and the Syncer has been told to exit. + // We want to wake up all waiters so they can re-examine state. We signal, + // causing all waiters to try to re-acquire the lock, and then we release + // the lock, and join on our internal thread which should soon run off the + // end of ThreadMain. + vault_field_changed_.Broadcast(); + } + + // This will join, and finish when ThreadMain terminates. + thread_.Stop(); return true; } void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { - PThreadScopedLock<PThreadMutex> lock(&mutex_); + AutoLock lock(lock_); client_command_hookup_.reset(NewEventListenerHookup(channel, this, &SyncerThread::HandleClientCommand)); } @@ -241,67 +302,89 @@ void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { } void SyncerThread::ThreadMainLoop() { + // This is called with lock_ acquired. + lock_.AssertAcquired(); + LOG(INFO) << "In thread main loop."; + // Use the short poll value by default. - int poll_seconds = syncer_short_poll_interval_seconds_; + TimeDelta poll_seconds = + TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); int user_idle_milliseconds = 0; - timespec last_sync_time = { 0 }; + TimeTicks last_sync_time; bool initial_sync_for_thread = true; bool continue_sync_cycle = false; - while (!stop_syncer_thread_) { - if (!connected_) { + while (!vault_.stop_syncer_thread_) { + // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as + // below) because we cannot poll until these conditions are met, so we wait + // indefinitely. + if (!vault_.connected_) { LOG(INFO) << "Syncer thread waiting for connection."; - while (!connected_ && !stop_syncer_thread_) - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); - LOG_IF(INFO, connected_) << "Syncer thread found connection."; + while (!vault_.connected_ && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); + LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; continue; } - if (syncer_ == NULL) { + if (vault_.syncer_ == NULL) { LOG(INFO) << "Syncer thread waiting for database initialization."; - while (syncer_ == NULL && !stop_syncer_thread_) - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); - LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; + while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); + LOG_IF(INFO, !(vault_.syncer_ == NULL)) + << "Syncer was found after DB started."; continue; } - timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, - last_sync_time.tv_nsec }; - const timespec wake_time = - !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? - nudge_queue_.top().first : next_poll; - LOG(INFO) << "wake time is " << wake_time.tv_sec; - LOG(INFO) << "next poll is " << next_poll.tv_sec; - - const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, - &wake_time); - if (ETIMEDOUT != error) { - continue; // Check all the conditions again. + const TimeTicks next_poll = last_sync_time + poll_seconds; + const TimeTicks end_wait = + !vault_.nudge_queue_.empty() && + vault_.nudge_queue_.top().first < next_poll ? + vault_.nudge_queue_.top().first : next_poll; + LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); + LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); + + // We block until the CV is signaled (e.g a control field changed, loss of + // network connection, nudge, spurious, etc), or the poll interval elapses. + TimeDelta sleep_time = end_wait - TimeTicks::Now(); + if (sleep_time > TimeDelta::FromSeconds(0)) { + vault_field_changed_.TimedWait(sleep_time); + + if (TimeTicks::Now() < end_wait) { + // Didn't timeout. Could be a spurious signal, or a signal corresponding + // to an actual change in one of our control fields. By continuing here + // we perform the typical "always recheck conditions when signaled", + // (typically handled by a while(condition_not_met) cv.wait() construct) + // because we jump to the top of the loop. The main difference is we + // recalculate the wait interval, but last_sync_time won't have changed. + // So if we were signaled by a nudge (for ex.) we'll grab the new nudge + // off the queue and wait for that delta. If it was a spurious signal, + // we'll keep waiting for the same moment in time as we just were. + continue; + } } - const timespec now = GetPThreadAbsoluteTime(0); - // Handle a nudge, caused by either a notification or a local bookmark // event. This will also update the source of the following SyncMain call. - UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); + UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); - LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; - SyncMain(syncer_); - last_sync_time = now; + LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); + SyncMain(vault_.syncer_); + last_sync_time = TimeTicks::Now(); LOG(INFO) << "Updating the next polling time after SyncMain"; - poll_seconds = CalculatePollingWaitTime(allstatus_->status(), - poll_seconds, - &user_idle_milliseconds, - &continue_sync_cycle); + poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( + allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), + &user_idle_milliseconds, &continue_sync_cycle)); } + } // We check how long the user's been idle and sync less often if the machine is // not in use. The aim is to reduce server load. +// TODO(timsteele): Should use Time(Delta). int SyncerThread::CalculatePollingWaitTime( const AllStatus::Status& status, - int last_poll_wait, // in s + int last_poll_wait, // Time in seconds. int* user_idle_milliseconds, bool* continue_sync_cycle) { bool is_continuing_sync_cyle = *continue_sync_cycle; @@ -352,30 +435,25 @@ int SyncerThread::CalculatePollingWaitTime( return actual_next_wait; } -void* SyncerThread::ThreadMain() { - NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); - mutex_.Lock(); +void SyncerThread::ThreadMain() { + AutoLock lock(lock_); + // Signal Start() to let it know we've made it safely onto the message loop, + // and unblock it's caller. + thread_main_started_.Signal(); ThreadMainLoop(); - thread_running_ = false; - pthread_cond_broadcast(&changed_.condvar_); - mutex_.Unlock(); - LOG(INFO) << "Syncer thread exiting."; - return 0; + LOG(INFO) << "Syncer thread ThreadMain is done."; } void SyncerThread::SyncMain(Syncer* syncer) { CHECK(syncer); - mutex_.Unlock(); + AutoUnlock unlock(lock_); while (syncer->SyncShare()) { LOG(INFO) << "Looping in sync share"; } LOG(INFO) << "Done looping in sync share"; - - mutex_.Lock(); } -void SyncerThread::UpdateNudgeSource(const timespec& now, - bool* continue_sync_cycle, +void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, bool* initial_sync) { bool nudged = false; NudgeSource nudge_source = kUnknown; @@ -385,13 +463,14 @@ void SyncerThread::UpdateNudgeSource(const timespec& now, } // Update the nudge source if a new nudge has come through during the // previous sync cycle. - while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { + while (!vault_.nudge_queue_.empty() && + TimeTicks::Now() >= vault_.nudge_queue_.top().first) { if (!nudged) { - nudge_source = nudge_queue_.top().second; + nudge_source = vault_.nudge_queue_.top().second; *continue_sync_cycle = false; // Reset the continuation token on nudge. nudged = true; } - nudge_queue_.pop(); + vault_.nudge_queue_.pop(); } SetUpdatesSource(nudged, nudge_source, initial_sync); } @@ -422,11 +501,11 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, break; } } - syncer_->set_updates_source(updates_source); + vault_.syncer_->set_updates_source(updates_source); } void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); channel()->NotifyListeners(event); if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { return; @@ -438,33 +517,33 @@ void SyncerThread::HandleDirectoryManagerEvent( const syncable::DirectoryManagerEvent& event) { LOG(INFO) << "Handling a directory manager event"; if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); LOG(INFO) << "Syncer starting up for: " << event.dirname; // The underlying database structure is ready, and we should create // the syncer. - CHECK(syncer_ == NULL); - syncer_ = + CHECK(vault_.syncer_ == NULL); + vault_.syncer_ = new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); - syncer_->set_command_channel(command_channel_); + vault_.syncer_->set_command_channel(command_channel_); syncer_events_.reset(NewEventListenerHookup( - syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); - pthread_cond_broadcast(&changed_.condvar_); + vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); + vault_field_changed_.Broadcast(); } } static inline void CheckConnected(bool* connected, HttpResponse::ServerConnectionCode code, - pthread_cond_t* condvar) { + ConditionVariable* condvar) { if (*connected) { if (HttpResponse::CONNECTION_UNAVAILABLE == code) { *connected = false; - pthread_cond_broadcast(condvar); + condvar->Broadcast(); } } else { if (HttpResponse::SERVER_CONNECTION_OK == code) { *connected = true; - pthread_cond_broadcast(condvar); + condvar->Broadcast(); } } } @@ -472,16 +551,16 @@ static inline void CheckConnected(bool* connected, void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, &SyncerThread::HandleServerConnectionEvent)); - CheckConnected(&connected_, conn_mgr->server_status(), - &changed_.condvar_); + CheckConnected(&vault_.connected_, conn_mgr->server_status(), + &vault_field_changed_); } void SyncerThread::HandleServerConnectionEvent( const ServerConnectionEvent& event) { if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { - MutexLock lock(&mutex_); - CheckConnected(&connected_, event.connection_code, - &changed_.condvar_); + AutoLock lock(lock_); + CheckConnected(&vault_.connected_, event.connection_code, + &vault_field_changed_); } } @@ -513,10 +592,11 @@ int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { // Called with mutex_ already locked. void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, NudgeSource source) { - const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); + const TimeTicks nudge_time = TimeTicks::Now() + + TimeDelta::FromMilliseconds(milliseconds_from_now); NudgeObject nudge_object(nudge_time, source); - nudge_queue_.push(nudge_object); - pthread_cond_broadcast(&changed_.condvar_); + vault_.nudge_queue_.push(nudge_object); + vault_field_changed_.Broadcast(); } void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { @@ -528,7 +608,7 @@ void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { } void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); switch (event.what_happened) { case TalkMediatorEvent::LOGIN_SUCCEEDED: LOG(INFO) << "P2P: Login succeeded."; @@ -541,7 +621,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { case TalkMediatorEvent::SUBSCRIPTIONS_ON: LOG(INFO) << "P2P: Subscriptions successfully enabled."; p2p_subscribed_ = true; - if (NULL != syncer_) { + if (NULL != vault_.syncer_) { LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; NudgeSyncImpl(0, kLocal); } @@ -552,7 +632,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; case TalkMediatorEvent::NOTIFICATION_RECEIVED: LOG(INFO) << "P2P: Updates on server, pushing syncer"; - if (NULL != syncer_) { + if (NULL != vault_.syncer_) { NudgeSyncImpl(0, kNotification); } break; @@ -560,8 +640,9 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; } - if (NULL != syncer_) { - syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); + if (NULL != vault_.syncer_) { + vault_.syncer_->set_notifications_enabled( + p2p_authenticated_ && p2p_subscribed_); } } diff --git a/chrome/browser/sync/engine/syncer_thread.h b/chrome/browser/sync/engine/syncer_thread.h index ca22969e..4d1ae95 100644 --- a/chrome/browser/sync/engine/syncer_thread.h +++ b/chrome/browser/sync/engine/syncer_thread.h @@ -3,7 +3,8 @@ // found in the LICENSE file. // // A class to run the syncer on a thread. - +// This is the default implementation of SyncerThread whose Stop implementation +// does not support a timeout, but is greatly simplified. #ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ #define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ @@ -13,11 +14,15 @@ #include <vector> #include "base/basictypes.h" +#include "base/condition_variable.h" +#include "base/ref_counted.h" #include "base/scoped_ptr.h" +#include "base/thread.h" +#include "base/time.h" +#include "base/waitable_event.h" #include "chrome/browser/sync/engine/all_status.h" #include "chrome/browser/sync/engine/client_command_channel.h" #include "chrome/browser/sync/util/event_sys-inl.h" -#include "chrome/browser/sync/util/pthread_helpers.h" #include "testing/gtest/include/gtest/gtest_prod.h" // For FRIEND_TEST class EventListenerHookup; @@ -39,20 +44,42 @@ struct SyncerEvent; struct SyncerShutdownEvent; struct TalkMediatorEvent; -class SyncerThread { - FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime); - FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime); - +class SyncerThreadFactory { public: - friend class SyncerThreadTest; + // Creates a SyncerThread based on the default (or user-overridden) + // implementation. The thread does not start running until you call Start(), + // which will cause it to check-and-wait for certain conditions to be met + // (such as valid connection with Server established, syncable::Directory has + // been opened) before performing an intial sync with a server. It uses + // |connection_manager| to detect valid connections, and |mgr| to detect the + // opening of a Directory, which will cause it to create a Syncer object for + // said Directory, and assign |model_safe_worker| to it. |connection_manager| + // and |mgr| should outlive the SyncerThread. You must stop the thread by + // calling Stop before destroying the object. Stopping will first tear down + // the Syncer object, allowing it to finish work in progress, before joining + // the Stop-calling thread with the internal thread. + static SyncerThread* Create(ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker); + private: + DISALLOW_IMPLICIT_CONSTRUCTORS(SyncerThreadFactory); +}; +class SyncerThread : public base::RefCountedThreadSafe<SyncerThread> { + FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime); + FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime); + FRIEND_TEST(SyncerThreadWithSyncerTest, Polling); + FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge); + friend class SyncerThreadWithSyncerTest; + friend class SyncerThreadFactory; +public: enum NudgeSource { kUnknown = 0, kNotification, kLocal, kContinuation }; - // Server can overwrite these values via client commands. // Standard short poll. This is used when XMPP is off. static const int kDefaultShortPollIntervalSeconds = 60; @@ -62,54 +89,106 @@ class SyncerThread { // longest possible poll interval. static const int kDefaultMaxPollIntervalMs = 30 * 60 * 1000; - SyncerThread(ClientCommandChannel* command_channel, - syncable::DirectoryManager* mgr, - ServerConnectionManager* connection_manager, AllStatus* all_status, - ModelSafeWorker* model_safe_worker); - ~SyncerThread(); + virtual ~SyncerThread(); + + virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr); - void WatchConnectionManager(ServerConnectionManager* conn_mgr); - // Creates and starts a syncer thread. + // Starts a syncer thread. // Returns true if it creates a thread or if there's currently a thread // running and false otherwise. - bool Start(); + virtual bool Start(); - // Stop processing. A max wait of at least 2*server RTT time is recommended. - // returns true if we stopped, false otherwise. - bool Stop(int max_wait); + // Stop processing. |max_wait| doesn't do anything in this version. + virtual bool Stop(int max_wait); - // Nudges the syncer to sync with a delay specified. This API is for access + // Nudges the syncer to sync with a delay specified. This API is for access // from the SyncerThread's controller and will cause a mutex lock. - bool NudgeSyncer(int milliseconds_from_now, NudgeSource source); + virtual bool NudgeSyncer(int milliseconds_from_now, NudgeSource source); // Registers this thread to watch talk mediator events. - void WatchTalkMediator(TalkMediator* talk_mediator); + virtual void WatchTalkMediator(TalkMediator* talk_mediator); - void WatchClientCommands(ClientCommandChannel* channel); + virtual void WatchClientCommands(ClientCommandChannel* channel); - SyncerEventChannel* channel(); + virtual SyncerEventChannel* channel(); - private: - // A few members to gate the rate at which we nudge the syncer. - enum { - kNudgeRateLimitCount = 6, - kNudgeRateLimitTime = 180, - }; + protected: + SyncerThread(); // Necessary for temporary pthreads-based PIMPL impl. + SyncerThread(ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker); + virtual void ThreadMain(); + void ThreadMainLoop(); - // A queue of all scheduled nudges. One insertion for every call to - // NudgeQueue(). - typedef std::pair<timespec, NudgeSource> NudgeObject; + virtual void SetConnected(bool connected) { + DCHECK(!thread_.IsRunning()); + vault_.connected_ = connected; + } + + virtual void SetSyncerPollingInterval(base::TimeDelta interval) { + // TODO(timsteele): Use TimeDelta internally. + syncer_polling_interval_ = static_cast<int>(interval.InSeconds()); + } + virtual void SetSyncerShortPollInterval(base::TimeDelta interval) { + // TODO(timsteele): Use TimeDelta internally. + syncer_short_poll_interval_seconds_ = + static_cast<int>(interval.InSeconds()); + } + + // Needed to emulate the behavior of pthread_create, which synchronously + // started the thread and set the value of thread_running_ to true. + // We can't quite match that because we asynchronously post the task, + // which opens a window for Stop to get called before the task actually + // makes it. To prevent this, we block Start() until we're sure it's ok. + base::WaitableEvent thread_main_started_; - struct IsTimeSpecGreater { + // Handle of the running thread. + base::Thread thread_; + + typedef std::pair<base::TimeTicks, NudgeSource> NudgeObject; + + struct IsTimeTicksGreater { inline bool operator() (const NudgeObject& lhs, const NudgeObject& rhs) { - return lhs.first.tv_sec == rhs.first.tv_sec ? - lhs.first.tv_nsec > rhs.first.tv_nsec : - lhs.first.tv_sec > rhs.first.tv_sec; + return lhs.first > rhs.first; } }; typedef std::priority_queue<NudgeObject, std::vector<NudgeObject>, - IsTimeSpecGreater> NudgeQueue; + IsTimeTicksGreater> NudgeQueue; + + // Fields that are modified / accessed by multiple threads go in this struct + // for clarity and explicitness. + struct ProtectedFields { + // False when we want to stop the thread. + bool stop_syncer_thread_; + + Syncer* syncer_; + + // State of the server connection. + bool connected_; + + // A queue of all scheduled nudges. One insertion for every call to + // NudgeQueue(). + NudgeQueue nudge_queue_; + + ProtectedFields() + : stop_syncer_thread_(false), syncer_(NULL), connected_(false) {} + } vault_; + + // Gets signaled whenever a thread outside of the syncer thread changes a + // protected field in the vault_. + ConditionVariable vault_field_changed_; + + // Used to lock everything in |vault_|. + Lock lock_; + + private: + // A few members to gate the rate at which we nudge the syncer. + enum { + kNudgeRateLimitCount = 6, + kNudgeRateLimitTime = 180, + }; // Threshold multipler for how long before user should be considered idle. static const int kPollBackoffThresholdMultiplier = 10; @@ -125,9 +204,6 @@ class SyncerThread { void HandleTalkMediatorEvent(const TalkMediatorEvent& event); - void* ThreadMain(); - void ThreadMainLoop(); - void SyncMain(Syncer* syncer); // Calculates the next sync wait time in seconds. last_poll_wait is the time @@ -137,42 +213,24 @@ class SyncerThread { // continue_sync_cycle parameter is used to determine whether or not we are // calculating a polling wait time that is a continuation of an sync cycle // which terminated while the syncer still had work to do. - int CalculatePollingWaitTime( + virtual int CalculatePollingWaitTime( const AllStatus::Status& status, int last_poll_wait, // in s int* user_idle_milliseconds, bool* continue_sync_cycle); // Helper to above function, considers effect of user idle time. - int CalculateSyncWaitTime(int last_wait, int user_idle_ms); + virtual int CalculateSyncWaitTime(int last_wait, int user_idle_ms); // Sets the source value of the controlled syncer's updates_source value. // The initial sync boolean is updated if read as a sentinel. The following // two methods work in concert to achieve this goal. - void UpdateNudgeSource(const timespec& now, bool* continue_sync_cycle, + void UpdateNudgeSource(bool* continue_sync_cycle, bool* initial_sync); void SetUpdatesSource(bool nudged, NudgeSource nudge_source, bool* initial_sync); // For unit tests only. - void DisableIdleDetection() { disable_idle_detection_ = true; } - - // False when we want to stop the thread. - bool stop_syncer_thread_; - - // We use one mutex for all members except the channel. - PThreadMutex mutex_; - typedef PThreadScopedLock<PThreadMutex> MutexLock; - - // Handle of the running thread. - pthread_t thread_; - bool thread_running_; - - // Gets signaled whenever a thread outside of the syncer thread changes a - // member variable. - PThreadCondVar changed_; - - // State of the server connection. - bool connected_; + virtual void DisableIdleDetection() { disable_idle_detection_ = true; } // State of the notification framework is tracked by these values. bool p2p_authenticated_; @@ -182,8 +240,6 @@ class SyncerThread { scoped_ptr<EventListenerHookup> conn_mgr_hookup_; const AllStatus* allstatus_; - Syncer* syncer_; - syncable::DirectoryManager* dirman_; ServerConnectionManager* scm_; @@ -208,8 +264,6 @@ class SyncerThread { // this is called. void NudgeSyncImpl(int milliseconds_from_now, NudgeSource source); - NudgeQueue nudge_queue_; - scoped_ptr<EventListenerHookup> talk_mediator_hookup_; ClientCommandChannel* const command_channel_; scoped_ptr<EventListenerHookup> directory_manager_hookup_; @@ -228,4 +282,4 @@ class SyncerThread { } // namespace browser_sync -#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_ +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_H_
\ No newline at end of file diff --git a/chrome/browser/sync/engine/syncer_thread_pthreads.cc b/chrome/browser/sync/engine/syncer_thread_pthreads.cc new file mode 100644 index 0000000..9fdfb21 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_pthreads.cc @@ -0,0 +1,578 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" + +#include "build/build_config.h" + +#ifdef OS_MACOSX +#include <CoreFoundation/CFNumber.h> +#include <IOKit/IOTypes.h> +#include <IOKit/IOKitLib.h> +#endif + +#include <algorithm> +#include <map> +#include <queue> + +#include "chrome/browser/sync/engine/auth_watcher.h" +#include "chrome/browser/sync/engine/model_safe_worker.h" +#include "chrome/browser/sync/engine/net/server_connection_manager.h" +#include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/notifier/listener/talk_mediator.h" +#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" +#include "chrome/browser/sync/syncable/directory_manager.h" + +using std::priority_queue; +using std::min; + +static inline bool operator < (const timespec& a, const timespec& b) { + return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; +} + +namespace { + +// Returns the amount of time since the user last interacted with the computer, +// in milliseconds +int UserIdleTime() { +#ifdef OS_WIN + LASTINPUTINFO last_input_info; + last_input_info.cbSize = sizeof(LASTINPUTINFO); + + // Get time in windows ticks since system start of last activity. + BOOL b = ::GetLastInputInfo(&last_input_info); + if (b == TRUE) + return ::GetTickCount() - last_input_info.dwTime; +#elif defined(OS_MACOSX) + // It would be great to do something like: + // + // return 1000 * + // CGEventSourceSecondsSinceLastEventType( + // kCGEventSourceStateCombinedSessionState, + // kCGAnyInputEventType); + // + // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon + // and can't link that high up the food chain. Thus this mucking in IOKit. + + io_service_t hid_service = + IOServiceGetMatchingService(kIOMasterPortDefault, + IOServiceMatching("IOHIDSystem")); + if (!hid_service) { + LOG(WARNING) << "Could not obtain IOHIDSystem"; + return 0; + } + + CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, + CFSTR("HIDIdleTime"), + kCFAllocatorDefault, + 0); + if (!object) { + LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; + IOObjectRelease(hid_service); + return 0; + } + + int64 idle_time; // in nanoseconds + Boolean success; + if (CFGetTypeID(object) == CFNumberGetTypeID()) { + success = CFNumberGetValue((CFNumberRef)object, + kCFNumberSInt64Type, + &idle_time); + } else { + LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; + } + + CFRelease(object); + IOObjectRelease(hid_service); + + if (!success) { + LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; + return 0; + } else { + return idle_time / 1000000; // nano to milli + } +#else + static bool was_logged = false; + if (!was_logged) { + was_logged = true; + LOG(INFO) << "UserIdleTime unimplemented on this platform, " + "synchronization will not throttle when user idle"; + } +#endif + + return 0; +} + +} // namespace + +namespace browser_sync { + +SyncerThreadPthreads::SyncerThreadPthreads( + ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, + AllStatus* all_status, ModelSafeWorker* model_safe_worker) + : SyncerThread() { + impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, + connection_manager, all_status, model_safe_worker)); +} + +bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, + SyncerThread::NudgeSource source) { + MutexLock lock(&mutex_); + if (syncer_ == NULL) { + return false; + } + NudgeSyncImpl(milliseconds_from_now, source); + return true; +} + +void* RunSyncerThread(void* syncer_thread) { + return (reinterpret_cast<SyncerThreadPthreadImpl*>( + syncer_thread))->ThreadMain(); +} + +SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( + ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, + AllStatus* all_status, + ModelSafeWorker* model_safe_worker) + : dirman_(mgr), scm_(connection_manager), + syncer_(NULL), syncer_events_(NULL), thread_running_(false), + syncer_short_poll_interval_seconds_( + SyncerThread::kDefaultShortPollIntervalSeconds), + syncer_long_poll_interval_seconds_( + SyncerThread::kDefaultLongPollIntervalSeconds), + syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), + syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), + stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), + p2p_authenticated_(false), p2p_subscribed_(false), + allstatus_(all_status), talk_mediator_hookup_(NULL), + command_channel_(command_channel), directory_manager_hookup_(NULL), + model_safe_worker_(model_safe_worker), + client_command_hookup_(NULL), disable_idle_detection_(false) { + + SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; + syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); + + if (dirman_) { + directory_manager_hookup_.reset(NewEventListenerHookup( + dirman_->channel(), this, + &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); + } + + if (scm_) { + WatchConnectionManager(scm_); + } + + if (command_channel_) { + WatchClientCommands(command_channel_); + } +} + +SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { + client_command_hookup_.reset(); + conn_mgr_hookup_.reset(); + syncer_event_channel_.reset(); + directory_manager_hookup_.reset(); + syncer_events_.reset(); + delete syncer_; + talk_mediator_hookup_.reset(); + CHECK(!thread_running_); +} + +// Creates and starts a syncer thread. +// Returns true if it creates a thread or if there's currently a thread running +// and false otherwise. +bool SyncerThreadPthreadImpl::Start() { + MutexLock lock(&mutex_); + if (thread_running_) { + return true; + } + thread_running_ = + (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); + if (thread_running_) { + pthread_detach(thread_); + } + return thread_running_; +} + +// Stop processing. A max wait of at least 2*server RTT time is recommended. +// Returns true if we stopped, false otherwise. +bool SyncerThreadPthreadImpl::Stop(int max_wait) { + MutexLock lock(&mutex_); + if (!thread_running_) + return true; + stop_syncer_thread_ = true; + if (NULL != syncer_) { + // Try to early exit the syncer. + syncer_->RequestEarlyExit(); + } + pthread_cond_broadcast(&changed_.condvar_); + timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; + do { + const int wait_result = max_wait < 0 ? + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : + pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, + &deadline); + if (ETIMEDOUT == wait_result) { + LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; + return false; + } + } while (thread_running_); + return true; +} + +void SyncerThreadPthreadImpl::WatchClientCommands( + ClientCommandChannel* channel) { + PThreadScopedLock<PThreadMutex> lock(&mutex_); + client_command_hookup_.reset(NewEventListenerHookup(channel, this, + &SyncerThreadPthreadImpl::HandleClientCommand)); +} + +void SyncerThreadPthreadImpl::HandleClientCommand( + ClientCommandChannel::EventType event) { + if (!event) { + return; + } + + // Mutex not really necessary for these. + if (event->has_set_sync_poll_interval()) { + syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); + } + + if (event->has_set_sync_long_poll_interval()) { + syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); + } +} + +void SyncerThreadPthreadImpl::ThreadMainLoop() { + // Use the short poll value by default. + int poll_seconds = syncer_short_poll_interval_seconds_; + int user_idle_milliseconds = 0; + timespec last_sync_time = { 0 }; + bool initial_sync_for_thread = true; + bool continue_sync_cycle = false; + + while (!stop_syncer_thread_) { + if (!connected_) { + LOG(INFO) << "Syncer thread waiting for connection."; + while (!connected_ && !stop_syncer_thread_) + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); + LOG_IF(INFO, connected_) << "Syncer thread found connection."; + continue; + } + + if (syncer_ == NULL) { + LOG(INFO) << "Syncer thread waiting for database initialization."; + while (syncer_ == NULL && !stop_syncer_thread_) + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); + LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; + continue; + } + + timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, + last_sync_time.tv_nsec }; + const timespec wake_time = + !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? + nudge_queue_.top().first : next_poll; + LOG(INFO) << "wake time is " << wake_time.tv_sec; + LOG(INFO) << "next poll is " << next_poll.tv_sec; + + const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, + &wake_time); + if (ETIMEDOUT != error) { + continue; // Check all the conditions again. + } + + const timespec now = GetPThreadAbsoluteTime(0); + + // Handle a nudge, caused by either a notification or a local bookmark + // event. This will also update the source of the following SyncMain call. + UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); + + LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; + SyncMain(syncer_); + last_sync_time = now; + + LOG(INFO) << "Updating the next polling time after SyncMain"; + poll_seconds = CalculatePollingWaitTime(allstatus_->status(), + poll_seconds, + &user_idle_milliseconds, + &continue_sync_cycle); + } +} + +// We check how long the user's been idle and sync less often if the machine is +// not in use. The aim is to reduce server load. +int SyncerThreadPthreadImpl::CalculatePollingWaitTime( + const AllStatus::Status& status, + int last_poll_wait, // in s + int* user_idle_milliseconds, + bool* continue_sync_cycle) { + bool is_continuing_sync_cyle = *continue_sync_cycle; + *continue_sync_cycle = false; + + // Determine if the syncer has unfinished work to do from allstatus_. + const bool syncer_has_work_to_do = + status.updates_available > status.updates_received + || status.unsynced_count > 0; + LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; + + // First calculate the expected wait time, figuring in any backoff because of + // user idle time. next_wait is in seconds + syncer_polling_interval_ = (!status.notifications_enabled) ? + syncer_short_poll_interval_seconds_ : + syncer_long_poll_interval_seconds_; + int default_next_wait = syncer_polling_interval_; + int actual_next_wait = default_next_wait; + + if (syncer_has_work_to_do) { + // Provide exponential backoff due to consecutive errors, else attempt to + // complete the work as soon as possible. + if (!is_continuing_sync_cyle) { + actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); + } else { + actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); + } + *continue_sync_cycle = true; + } else if (!status.notifications_enabled) { + // Ensure that we start exponential backoff from our base polling + // interval when we are not continuing a sync cycle. + last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); + + // Did the user start interacting with the computer again? + // If so, revise our idle time (and probably next_sync_time) downwards + int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); + if (new_idle_time < *user_idle_milliseconds) { + *user_idle_milliseconds = new_idle_time; + } + actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, + *user_idle_milliseconds) / 1000; + DCHECK_GE(actual_next_wait, default_next_wait); + } + + LOG(INFO) << "Sync wait: idle " << default_next_wait + << " non-idle or backoff " << actual_next_wait << "."; + + return actual_next_wait; +} + +void* SyncerThreadPthreadImpl::ThreadMain() { + NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); + mutex_.Lock(); + ThreadMainLoop(); + thread_running_ = false; + pthread_cond_broadcast(&changed_.condvar_); + mutex_.Unlock(); + LOG(INFO) << "Syncer thread exiting."; + return 0; +} + +void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { + CHECK(syncer); + mutex_.Unlock(); + while (syncer->SyncShare()) { + LOG(INFO) << "Looping in sync share"; + } + LOG(INFO) << "Done looping in sync share"; + + mutex_.Lock(); +} + +void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, + bool* continue_sync_cycle, + bool* initial_sync) { + bool nudged = false; + SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; + // Has the previous sync cycle completed? + if (continue_sync_cycle) { + nudge_source = SyncerThread::kContinuation; + } + // Update the nudge source if a new nudge has come through during the + // previous sync cycle. + while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { + if (!nudged) { + nudge_source = nudge_queue_.top().second; + *continue_sync_cycle = false; // Reset the continuation token on nudge. + nudged = true; + } + nudge_queue_.pop(); + } + SetUpdatesSource(nudged, nudge_source, initial_sync); +} + +void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, + SyncerThread::NudgeSource nudge_source, bool* initial_sync) { + sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = + sync_pb::GetUpdatesCallerInfo::UNKNOWN; + if (*initial_sync) { + updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; + *initial_sync = false; + } else if (!nudged) { + updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; + } else { + switch (nudge_source) { + case SyncerThread::kNotification: + updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; + break; + case SyncerThread::kLocal: + updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; + break; + case SyncerThread::kContinuation: + updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; + break; + case SyncerThread::kUnknown: + default: + updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; + break; + } + } + syncer_->set_updates_source(updates_source); +} + +void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { + MutexLock lock(&mutex_); + channel()->NotifyListeners(event); + if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { + return; + } + NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); +} + +void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( + const syncable::DirectoryManagerEvent& event) { + LOG(INFO) << "Handling a directory manager event"; + if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { + MutexLock lock(&mutex_); + LOG(INFO) << "Syncer starting up for: " << event.dirname; + // The underlying database structure is ready, and we should create + // the syncer. + CHECK(syncer_ == NULL); + syncer_ = + new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); + + syncer_->set_command_channel(command_channel_); + syncer_events_.reset(NewEventListenerHookup( + syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); + pthread_cond_broadcast(&changed_.condvar_); + } +} + +static inline void CheckConnected(bool* connected, + HttpResponse::ServerConnectionCode code, + pthread_cond_t* condvar) { + if (*connected) { + if (HttpResponse::CONNECTION_UNAVAILABLE == code) { + *connected = false; + pthread_cond_broadcast(condvar); + } + } else { + if (HttpResponse::SERVER_CONNECTION_OK == code) { + *connected = true; + pthread_cond_broadcast(condvar); + } + } +} + +void SyncerThreadPthreadImpl::WatchConnectionManager( + ServerConnectionManager* conn_mgr) { + conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, + &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); + CheckConnected(&connected_, conn_mgr->server_status(), + &changed_.condvar_); +} + +void SyncerThreadPthreadImpl::HandleServerConnectionEvent( + const ServerConnectionEvent& event) { + if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { + MutexLock lock(&mutex_); + CheckConnected(&connected_, event.connection_code, + &changed_.condvar_); + } +} + +SyncerEventChannel* SyncerThreadPthreadImpl::channel() { + return syncer_event_channel_.get(); +} + +// Inputs and return value in milliseconds. +int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, + int user_idle_ms) { + // syncer_polling_interval_ is in seconds + int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; + + // This is our default and lower bound. + int next_wait = syncer_polling_interval_ms; + + // Get idle time, bounded by max wait. + int idle = min(user_idle_ms, syncer_max_interval_); + + // If the user has been idle for a while, we'll start decreasing the poll + // rate. + if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { + next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( + last_interval / 1000), syncer_max_interval_ / 1000) * 1000; + } + + return next_wait; +} + +// Called with mutex_ already locked. +void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, + SyncerThread::NudgeSource source) { + const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); + NudgeObject nudge_object(nudge_time, source); + nudge_queue_.push(nudge_object); + pthread_cond_broadcast(&changed_.condvar_); +} + +void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { + talk_mediator_hookup_.reset( + NewEventListenerHookup( + mediator->channel(), + this, + &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); +} + +void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( + const TalkMediatorEvent& event) { + MutexLock lock(&mutex_); + switch (event.what_happened) { + case TalkMediatorEvent::LOGIN_SUCCEEDED: + LOG(INFO) << "P2P: Login succeeded."; + p2p_authenticated_ = true; + break; + case TalkMediatorEvent::LOGOUT_SUCCEEDED: + LOG(INFO) << "P2P: Login succeeded."; + p2p_authenticated_ = false; + break; + case TalkMediatorEvent::SUBSCRIPTIONS_ON: + LOG(INFO) << "P2P: Subscriptions successfully enabled."; + p2p_subscribed_ = true; + if (NULL != syncer_) { + LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; + NudgeSyncImpl(0, SyncerThread::kLocal); + } + break; + case TalkMediatorEvent::SUBSCRIPTIONS_OFF: + LOG(INFO) << "P2P: Subscriptions are not enabled."; + p2p_subscribed_ = false; + break; + case TalkMediatorEvent::NOTIFICATION_RECEIVED: + LOG(INFO) << "P2P: Updates on server, pushing syncer"; + if (NULL != syncer_) { + NudgeSyncImpl(0, SyncerThread::kNotification); + } + break; + default: + break; + } + + if (NULL != syncer_) { + syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); + } +} + +} // namespace browser_sync
\ No newline at end of file diff --git a/chrome/browser/sync/engine/syncer_thread_pthreads.h b/chrome/browser/sync/engine/syncer_thread_pthreads.h new file mode 100644 index 0000000..bebd8ed --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_pthreads.h @@ -0,0 +1,284 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// *THIS EXISTS FOR EXPERIMENTATION AND TESTING WHILE WE REPLACE PTHREADS +// WITH CHROME THREADS IN SYNC CODE* + +// A class to run the syncer on a thread. Uses PIMPL to wrap the old, original +// pthreads implementation of SyncerThread. +#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_ +#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_ + +#include <list> +#include <map> +#include <queue> +#include <vector> + +#include "base/basictypes.h" +#include "base/scoped_ptr.h" +#include "chrome/browser/sync/engine/all_status.h" +#include "chrome/browser/sync/engine/client_command_channel.h" +#include "chrome/browser/sync/util/event_sys-inl.h" +#include "chrome/browser/sync/util/pthread_helpers.h" +#include "chrome/browser/sync/engine/syncer_thread.h" +#include "testing/gtest/include/gtest/gtest_prod.h" // For FRIEND_TEST + +class EventListenerHookup; + +namespace syncable { +class DirectoryManager; +struct DirectoryManagerEvent; +} + +namespace browser_sync { + +class ModelSafeWorker; +class ServerConnectionManager; +class Syncer; +class TalkMediator; +class URLFactory; +struct ServerConnectionEvent; +struct SyncerEvent; +struct SyncerShutdownEvent; +struct TalkMediatorEvent; + +// The legacy implementation of SyncerThread using pthreads, kept around for +// historical experimentation until a new version is finalized. +class SyncerThreadPthreadImpl { + public: + virtual ~SyncerThreadPthreadImpl(); + + virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr); + // Creates and starts a syncer thread. + // Returns true if it creates a thread or if there's currently a thread + // running and false otherwise. + virtual bool Start(); + + // Stop processing. A max wait of at least 2*server RTT time is recommended. + // returns true if we stopped, false otherwise. + virtual bool Stop(int max_wait); + + // Nudges the syncer to sync with a delay specified. This API is for access + // from the SyncerThread's controller and will cause a mutex lock. + virtual bool NudgeSyncer(int milliseconds_from_now, + SyncerThread::NudgeSource source); + + // Registers this thread to watch talk mediator events. + virtual void WatchTalkMediator(TalkMediator* talk_mediator); + + virtual void WatchClientCommands(ClientCommandChannel* channel); + + virtual SyncerEventChannel* channel(); + + private: + friend class SyncerThreadPthreads; + SyncerThreadPthreadImpl(ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker); + + // A few members to gate the rate at which we nudge the syncer. + enum { + kNudgeRateLimitCount = 6, + kNudgeRateLimitTime = 180, + }; + + // A queue of all scheduled nudges. One insertion for every call to + // NudgeQueue(). + typedef std::pair<timespec, SyncerThread::NudgeSource> NudgeObject; + + struct IsTimeSpecGreater { + inline bool operator() (const NudgeObject& lhs, const NudgeObject& rhs) { + return lhs.first.tv_sec == rhs.first.tv_sec ? + lhs.first.tv_nsec > rhs.first.tv_nsec : + lhs.first.tv_sec > rhs.first.tv_sec; + } + }; + + typedef std::priority_queue<NudgeObject, std::vector<NudgeObject>, + IsTimeSpecGreater> NudgeQueue; + + // Threshold multipler for how long before user should be considered idle. + static const int kPollBackoffThresholdMultiplier = 10; + + friend void* RunSyncerThread(void* syncer_thread); + void* Run(); + void HandleDirectoryManagerEvent( + const syncable::DirectoryManagerEvent& event); + void HandleSyncerEvent(const SyncerEvent& event); + void HandleClientCommand(ClientCommandChannel::EventType event); + + void HandleServerConnectionEvent(const ServerConnectionEvent& event); + + void HandleTalkMediatorEvent(const TalkMediatorEvent& event); + + void* ThreadMain(); + void ThreadMainLoop(); + + void SyncMain(Syncer* syncer); + + // Calculates the next sync wait time in seconds. last_poll_wait is the time + // duration of the previous polling timeout which was used. + // user_idle_milliseconds is updated by this method, and is a report of the + // full amount of time since the last period of activity for the user. The + // continue_sync_cycle parameter is used to determine whether or not we are + // calculating a polling wait time that is a continuation of an sync cycle + // which terminated while the syncer still had work to do. + int CalculatePollingWaitTime( + const AllStatus::Status& status, + int last_poll_wait, // in s + int* user_idle_milliseconds, + bool* continue_sync_cycle); + // Helper to above function, considers effect of user idle time. + int CalculateSyncWaitTime(int last_wait, int user_idle_ms); + + // Sets the source value of the controlled syncer's updates_source value. + // The initial sync boolean is updated if read as a sentinel. The following + // two methods work in concert to achieve this goal. + void UpdateNudgeSource(const timespec& now, bool* continue_sync_cycle, + bool* initial_sync); + void SetUpdatesSource(bool nudged, SyncerThread::NudgeSource nudge_source, + bool* initial_sync); + + // For unit tests only. + void DisableIdleDetection() { disable_idle_detection_ = true; } + + // False when we want to stop the thread. + bool stop_syncer_thread_; + + // We use one mutex for all members except the channel. + PThreadMutex mutex_; + typedef PThreadScopedLock<PThreadMutex> MutexLock; + + // Handle of the running thread. + pthread_t thread_; + bool thread_running_; + + // Gets signaled whenever a thread outside of the syncer thread changes a + // member variable. + PThreadCondVar changed_; + + // State of the server connection. + bool connected_; + + // State of the notification framework is tracked by these values. + bool p2p_authenticated_; + bool p2p_subscribed_; + + scoped_ptr<EventListenerHookup> client_command_hookup_; + scoped_ptr<EventListenerHookup> conn_mgr_hookup_; + const AllStatus* allstatus_; + + Syncer* syncer_; + + syncable::DirectoryManager* dirman_; + ServerConnectionManager* scm_; + + // Modifiable versions of kDefaultLongPollIntervalSeconds which can be + // updated by the server. + int syncer_short_poll_interval_seconds_; + int syncer_long_poll_interval_seconds_; + + // The time we wait between polls in seconds. This is used as lower bound on + // our wait time. Updated once per loop from the command line flag. + int syncer_polling_interval_; + + // The upper bound on the nominal wait between polls in seconds. Note that + // this bounds the "nominal" poll interval, while the the actual interval + // also takes previous failures into account. + int syncer_max_interval_; + + scoped_ptr<SyncerEventChannel> syncer_event_channel_; + + // This causes syncer to start syncing ASAP. If the rate of requests is too + // high the request will be silently dropped. mutex_ should be held when + // this is called. + void NudgeSyncImpl(int milliseconds_from_now, + SyncerThread::NudgeSource source); + + NudgeQueue nudge_queue_; + + scoped_ptr<EventListenerHookup> talk_mediator_hookup_; + ClientCommandChannel* const command_channel_; + scoped_ptr<EventListenerHookup> directory_manager_hookup_; + scoped_ptr<EventListenerHookup> syncer_events_; + + // Handles any tasks that will result in model changes (modifications of + // syncable::Entries). Pass this to the syncer created and managed by |this|. + // Only non-null in syncapi case. + scoped_ptr<ModelSafeWorker> model_safe_worker_; + + // Useful for unit tests + bool disable_idle_detection_; + + DISALLOW_COPY_AND_ASSIGN(SyncerThreadPthreadImpl); +}; + +// A new-version SyncerThread pimpl wrapper for the old legacy implementation. +class SyncerThreadPthreads : public SyncerThread { + FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime); + FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime); + FRIEND_TEST(SyncerThreadWithSyncerTest, Polling); + FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge); + friend class SyncerThreadWithSyncerTest; + friend class SyncerThreadFactory; + public: + virtual ~SyncerThreadPthreads() {} + + virtual void WatchConnectionManager(ServerConnectionManager* conn_mgr) { + impl_->WatchConnectionManager(conn_mgr); + } + virtual bool Start() { + return impl_->Start(); + } + virtual bool Stop(int max_wait) { + return impl_->Stop(max_wait); + } + virtual bool NudgeSyncer(int milliseconds_from_now, NudgeSource source) { + return impl_->NudgeSyncer(milliseconds_from_now, source); + } + virtual void WatchTalkMediator(TalkMediator* talk_mediator) { + impl_->WatchTalkMediator(talk_mediator); + } + virtual void WatchClientCommands(ClientCommandChannel* channel) { + impl_->WatchClientCommands(channel); + } + virtual SyncerEventChannel* channel() { + return impl_->channel(); + } + protected: + SyncerThreadPthreads(ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker); + virtual void SetConnected(bool connected) { + impl_->connected_ = connected; + } + virtual void SetSyncerPollingInterval(int interval) { + impl_->syncer_polling_interval_ = interval; + } + virtual void SetSyncerShortPollInterval(base::TimeDelta interval) { + impl_->syncer_short_poll_interval_seconds_ = static_cast<int>( + interval.InSeconds()); + } + virtual void DisableIdleDetection() { impl_->disable_idle_detection_ = true; } + virtual int CalculateSyncWaitTime(int last_wait, int user_idle_ms) { + return impl_->CalculateSyncWaitTime(last_wait, user_idle_ms); + } + virtual int CalculatePollingWaitTime( + const AllStatus::Status& status, + int last_poll_wait, // in s + int* user_idle_milliseconds, + bool* continue_sync_cycle) { + return impl_->CalculatePollingWaitTime(status, last_poll_wait, + user_idle_milliseconds, continue_sync_cycle); + } + private: + scoped_ptr<SyncerThreadPthreadImpl> impl_; + DISALLOW_COPY_AND_ASSIGN(SyncerThreadPthreads); +}; + +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_PTHREADS_H_
\ No newline at end of file diff --git a/chrome/browser/sync/engine/syncer_thread_timed_stop.cc b/chrome/browser/sync/engine/syncer_thread_timed_stop.cc new file mode 100644 index 0000000..a396dd0 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_timed_stop.cc @@ -0,0 +1,119 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" + +#include "build/build_config.h" + +#ifdef OS_MACOSX +#include <CoreFoundation/CFNumber.h> +#include <IOKit/IOTypes.h> +#include <IOKit/IOKitLib.h> +#endif + +#include <algorithm> +#include <map> +#include <queue> + +#include "chrome/browser/sync/engine/auth_watcher.h" +#include "chrome/browser/sync/engine/model_safe_worker.h" +#include "chrome/browser/sync/engine/net/server_connection_manager.h" +#include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/notifier/listener/talk_mediator.h" +#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" +#include "chrome/browser/sync/syncable/directory_manager.h" + +using std::priority_queue; +using std::min; +using base::Time; +using base::TimeDelta; +using base::TimeTicks; + +namespace browser_sync { + +SyncerThreadTimedStop::SyncerThreadTimedStop( + ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, + AllStatus* all_status, + ModelSafeWorker* model_safe_worker) + : SyncerThread(command_channel, mgr, connection_manager, all_status, + model_safe_worker), + in_thread_main_loop_(false) { +} + +// Stop processing. A max wait of at least 2*server RTT time is recommended. +// Returns true if we stopped, false otherwise. +bool SyncerThreadTimedStop::Stop(int max_wait) { + AutoLock lock(lock_); + // If the thread has been started, then we either already have or are about to + // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to + // finish. If the thread has not been started --and we now own the lock-- + // then we can early out because the caller has not called Start(). + if (!thread_.IsRunning()) + return true; + + LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " + << "true (vault_.stop_syncer_thread_)"; + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit + // below). + vault_.stop_syncer_thread_ = true; + if (NULL != vault_.syncer_) { + // Try to early exit the syncer. + vault_.syncer_->RequestEarlyExit(); + } + + // stop_syncer_thread_ is now true and the Syncer has been told to exit. + // We want to wake up all waiters so they can re-examine state. We signal, + // causing all waiters to try to re-acquire the lock, and then we atomically + // release the lock and wait. Our wait can be spuriously signaled, so we + // recalculate the remaining sleep time each time through and re- + // check the condition before exiting the loop. + vault_field_changed_.Broadcast(); + TimeTicks start = TimeTicks::Now(); + TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait); + bool timed_out = false; + // Eventually the combination of RequestEarlyExit and setting + // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become + // false. + while (in_thread_main_loop_) { + TimeDelta sleep_time = end - TimeTicks::Now(); + if (sleep_time < TimeDelta::FromSeconds(0)) { + timed_out = true; + break; + } + LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s."; + vault_field_changed_.TimedWait(sleep_time); + } + + if (timed_out) { + LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely."; + return false; + } + + // Stop() should not block on anything at this point, given above madness. + DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at " + << Time::Now().ToInternalValue(); + thread_.Stop(); + DLOG(INFO) << "SyncerThread::thread_.Stop() finished at " + << Time::Now().ToInternalValue(); + return true; +} + +void SyncerThreadTimedStop::ThreadMain() { + AutoLock lock(lock_); + // Signal Start() to let it know we've made it safely are now running on the + // message loop, and unblock it's caller. + thread_main_started_.Signal(); + + // The only thing that could be waiting on this value is Stop, and we don't + // release the lock until we're far enough along to Stop safely. + in_thread_main_loop_ = true; + vault_field_changed_.Broadcast(); + ThreadMainLoop(); + in_thread_main_loop_ = false; + vault_field_changed_.Broadcast(); + LOG(INFO) << "Syncer thread ThreadMain is done."; +} + +} // namespace browser_sync
\ No newline at end of file diff --git a/chrome/browser/sync/engine/syncer_thread_timed_stop.h b/chrome/browser/sync/engine/syncer_thread_timed_stop.h new file mode 100644 index 0000000..e1037a1 --- /dev/null +++ b/chrome/browser/sync/engine/syncer_thread_timed_stop.h @@ -0,0 +1,53 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A class to run the syncer on a thread. This guy is the closest chrome-based +// (as opposed to pthreads based) SyncerThread to the old pthread implementation +// in semantics, as it supports a timeout on Stop() -- It is just an override of +// two methods from SyncerThread: ThreadMain and Stop -- to provide this. +#ifndef CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_ +#define CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_ + +#include <list> +#include <map> +#include <queue> +#include <vector> + +#include "chrome/browser/sync/engine/syncer_thread.h" + +namespace browser_sync { + +class SyncerThreadTimedStop : public SyncerThread { + FRIEND_TEST(SyncerThreadTest, CalculateSyncWaitTime); + FRIEND_TEST(SyncerThreadTest, CalculatePollingWaitTime); + FRIEND_TEST(SyncerThreadWithSyncerTest, Polling); + FRIEND_TEST(SyncerThreadWithSyncerTest, Nudge); + friend class SyncerThreadWithSyncerTest; + friend class SyncerThreadFactory; + public: + virtual ~SyncerThreadTimedStop() {} + + // Stop processing. This version comes with a supported max_wait. + // A max wait of at least 2*server RTT time is recommended. + // Returns true if we stopped, false otherwise. + virtual bool Stop(int max_wait); + + private: + SyncerThreadTimedStop(ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker); + virtual void ThreadMain(); + + // We use this to track when our synthesized thread loop is active, so we can + // timed-wait for it to become false. For this and only this (temporary) + // implementation, we protect this variable using our parent lock_. + bool in_thread_main_loop_; + + DISALLOW_COPY_AND_ASSIGN(SyncerThreadTimedStop); +}; + +} // namespace browser_sync + +#endif // CHROME_BROWSER_SYNC_ENGINE_SYNCER_THREAD_TIMED_STOP_H_
\ No newline at end of file diff --git a/chrome/browser/sync/engine/syncer_thread_unittest.cc b/chrome/browser/sync/engine/syncer_thread_unittest.cc index 6b758a7..fa28f7f 100644 --- a/chrome/browser/sync/engine/syncer_thread_unittest.cc +++ b/chrome/browser/sync/engine/syncer_thread_unittest.cc @@ -7,89 +7,158 @@ #include <set> #include <strstream> +#include "base/command_line.h" #include "base/scoped_ptr.h" +#include "base/time.h" +#include "chrome/browser/sync/engine/model_safe_worker.h" #include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" +#include "chrome/test/sync/engine/mock_server_connection.h" +#include "chrome/test/sync/engine/test_directory_setter_upper.h" #include "testing/gtest/include/gtest/gtest.h" +using base::Time; +using base::TimeDelta; + namespace browser_sync { -class SyncerThreadTest : public testing::Test { - protected: - SyncerThreadTest() {} - virtual ~SyncerThreadTest() {} - virtual void SetUp() {} - virtual void TearDown() {} +typedef testing::Test SyncerThreadTest; + +class SyncerThreadWithSyncerTest : public testing::Test { + public: + SyncerThreadWithSyncerTest() {} + virtual void SetUp() { + metadb_.SetUp(); + connection_.reset(new MockConnectionManager(metadb_.manager(), + metadb_.name())); + allstatus_.reset(new AllStatus()); + + syncer_thread_ = SyncerThreadFactory::Create(NULL, metadb_.manager(), + connection_.get(), allstatus_.get(), new ModelSafeWorker()); + + allstatus_->WatchSyncerThread(syncer_thread_); + syncer_thread_->SetConnected(true); + } + virtual void TearDown() { + syncer_thread_ = NULL; + allstatus_.reset(); + connection_.reset(); + metadb_.TearDown(); + } + ManuallyOpenedTestDirectorySetterUpper* metadb() { return &metadb_; } + MockConnectionManager* connection() { return connection_.get(); } + SyncerThread* syncer_thread() { return syncer_thread_; } + private: + ManuallyOpenedTestDirectorySetterUpper metadb_; + scoped_ptr<MockConnectionManager> connection_; + scoped_ptr<AllStatus> allstatus_; + scoped_refptr<SyncerThread> syncer_thread_; + DISALLOW_COPY_AND_ASSIGN(SyncerThreadWithSyncerTest); +}; + +class SyncShareIntercept : public MockConnectionManager::MidCommitObserver { + public: + SyncShareIntercept() : sync_occured_(false, false) {} + virtual ~SyncShareIntercept() {} + virtual void Observe() { + times_sync_occured_.push_back(Time::NowFromSystemTime()); + sync_occured_.Signal(); + } + void WaitForSyncShare(int at_least_this_many, TimeDelta max_wait) { + while (at_least_this_many-- > 0) + sync_occured_.TimedWait(max_wait); + } + std::vector<Time> times_sync_occured() const { + return times_sync_occured_; + } private: - DISALLOW_COPY_AND_ASSIGN(SyncerThreadTest); + std::vector<Time> times_sync_occured_; + base::WaitableEvent sync_occured_; + DISALLOW_COPY_AND_ASSIGN(SyncShareIntercept); }; TEST_F(SyncerThreadTest, Construction) { - SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL); + scoped_refptr<SyncerThread> syncer_thread( + SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL)); +} + +TEST_F(SyncerThreadTest, StartStop) { + scoped_refptr<SyncerThread> syncer_thread( + SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL)); + EXPECT_TRUE(syncer_thread->Start()); + EXPECT_TRUE(syncer_thread->Stop(2000)); + + // Do it again for good measure. I caught some bugs by adding this so + // I would recommend keeping it. + EXPECT_TRUE(syncer_thread->Start()); + EXPECT_TRUE(syncer_thread->Stop(2000)); } TEST_F(SyncerThreadTest, CalculateSyncWaitTime) { - SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL); - syncer_thread.DisableIdleDetection(); + scoped_refptr<SyncerThread> syncer_thread( + SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL)); + syncer_thread->DisableIdleDetection(); + + // Syncer_polling_interval_ is less than max poll interval. + TimeDelta syncer_polling_interval = TimeDelta::FromSeconds(1); - // Syncer_polling_interval_ is less than max poll interval - int syncer_polling_interval = 1; // Needed since AssertionResult is not a - // friend of SyncerThread - syncer_thread.syncer_polling_interval_ = syncer_polling_interval; + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); // user_idle_ms is less than 10 * (syncer_polling_interval*1000). - ASSERT_EQ(syncer_polling_interval * 1000, - syncer_thread.CalculateSyncWaitTime(1000, 0)); - ASSERT_EQ(syncer_polling_interval * 1000, - syncer_thread.CalculateSyncWaitTime(1000, 1)); + ASSERT_EQ(syncer_polling_interval.InMilliseconds(), + syncer_thread->CalculateSyncWaitTime(1000, 0)); + ASSERT_EQ(syncer_polling_interval.InMilliseconds(), + syncer_thread->CalculateSyncWaitTime(1000, 1)); // user_idle_ms is ge than 10 * (syncer_polling_interval*1000). int last_poll_time = 2000; ASSERT_LE(last_poll_time, - syncer_thread.CalculateSyncWaitTime(last_poll_time, 10000)); + syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000)); ASSERT_GE(last_poll_time*3, - syncer_thread.CalculateSyncWaitTime(last_poll_time, 10000)); + syncer_thread->CalculateSyncWaitTime(last_poll_time, 10000)); ASSERT_LE(last_poll_time, - syncer_thread.CalculateSyncWaitTime(last_poll_time, 100000)); + syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000)); ASSERT_GE(last_poll_time*3, - syncer_thread.CalculateSyncWaitTime(last_poll_time, 100000)); + syncer_thread->CalculateSyncWaitTime(last_poll_time, 100000)); // Maximum backoff time should be syncer_max_interval. int near_threshold = SyncerThread::kDefaultMaxPollIntervalMs / 2 - 1; int threshold = SyncerThread::kDefaultMaxPollIntervalMs; int over_threshold = SyncerThread::kDefaultMaxPollIntervalMs + 1; ASSERT_LE(near_threshold, - syncer_thread.CalculateSyncWaitTime(near_threshold, 10000)); + syncer_thread->CalculateSyncWaitTime(near_threshold, 10000)); ASSERT_GE(SyncerThread::kDefaultMaxPollIntervalMs, - syncer_thread.CalculateSyncWaitTime(near_threshold, 10000)); + syncer_thread->CalculateSyncWaitTime(near_threshold, 10000)); ASSERT_EQ(SyncerThread::kDefaultMaxPollIntervalMs, - syncer_thread.CalculateSyncWaitTime(threshold, 10000)); + syncer_thread->CalculateSyncWaitTime(threshold, 10000)); ASSERT_EQ(SyncerThread::kDefaultMaxPollIntervalMs, - syncer_thread.CalculateSyncWaitTime(over_threshold, 10000)); + syncer_thread->CalculateSyncWaitTime(over_threshold, 10000)); // Possible idle time must be capped by syncer_max_interval. int over_sync_max_interval = SyncerThread::kDefaultMaxPollIntervalMs + 1; - syncer_polling_interval = over_sync_max_interval / 100; // so 1000* is right - syncer_thread.syncer_polling_interval_ = syncer_polling_interval; - ASSERT_EQ(syncer_polling_interval * 1000, - syncer_thread.CalculateSyncWaitTime(1000, over_sync_max_interval)); - syncer_polling_interval = 1; - syncer_thread.syncer_polling_interval_ = syncer_polling_interval; + syncer_polling_interval = TimeDelta::FromSeconds( + over_sync_max_interval / 100); // so 1000* is right + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); + ASSERT_EQ(syncer_polling_interval.InSeconds() * 1000, + syncer_thread->CalculateSyncWaitTime(1000, over_sync_max_interval)); + syncer_polling_interval = TimeDelta::FromSeconds(1); + syncer_thread->SetSyncerPollingInterval(syncer_polling_interval); ASSERT_LE(last_poll_time, - syncer_thread.CalculateSyncWaitTime(last_poll_time, + syncer_thread->CalculateSyncWaitTime(last_poll_time, over_sync_max_interval)); ASSERT_GE(last_poll_time * 3, - syncer_thread.CalculateSyncWaitTime(last_poll_time, + syncer_thread->CalculateSyncWaitTime(last_poll_time, over_sync_max_interval)); } TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // Set up the environment. int user_idle_milliseconds_param = 0; - - SyncerThread syncer_thread(NULL, NULL, NULL, NULL, NULL); - syncer_thread.DisableIdleDetection(); + scoped_refptr<SyncerThread> syncer_thread( + SyncerThreadFactory::Create(NULL, NULL, NULL, NULL, NULL)); + syncer_thread->DisableIdleDetection(); // Notifications disabled should result in a polling interval of // kDefaultShortPollInterval. @@ -100,7 +169,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // No work and no backoff. ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -110,7 +179,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // In this case the continue_sync_cycle is turned off. continue_sync_cycle_param = true; ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -131,7 +200,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // No work and no backoff. ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -141,7 +210,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // In this case the continue_sync_cycle is turned off. continue_sync_cycle_param = true; ASSERT_EQ(SyncerThread::kDefaultLongPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -162,7 +231,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { status.updates_received = 0; bool continue_sync_cycle_param = false; - ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -170,19 +239,19 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { ASSERT_TRUE(continue_sync_cycle_param); continue_sync_cycle_param = false; - ASSERT_GE(3, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(3, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, &continue_sync_cycle_param)); ASSERT_TRUE(continue_sync_cycle_param); - ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, &continue_sync_cycle_param)); - ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -191,7 +260,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { status.updates_received = 1; ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 10, &user_idle_milliseconds_param, @@ -204,7 +273,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { status.unsynced_count = 1; bool continue_sync_cycle_param = false; - ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -212,7 +281,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { ASSERT_TRUE(continue_sync_cycle_param); continue_sync_cycle_param = false; - ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime( status, 0, &user_idle_milliseconds_param, @@ -221,7 +290,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { status.unsynced_count = 0; ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 4, &user_idle_milliseconds_param, @@ -237,7 +306,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // Expect move from default polling interval to exponential backoff due to // unsynced_count != 0. - ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime( status, 3600, &user_idle_milliseconds_param, @@ -245,7 +314,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { ASSERT_TRUE(continue_sync_cycle_param); continue_sync_cycle_param = false; - ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime( status, 3600, &user_idle_milliseconds_param, @@ -253,12 +322,12 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { ASSERT_TRUE(continue_sync_cycle_param); // Expect exponential backoff. - ASSERT_LE(2, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(2, syncer_thread->CalculatePollingWaitTime( status, 2, &user_idle_milliseconds_param, &continue_sync_cycle_param)); - ASSERT_GE(6, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(6, syncer_thread->CalculatePollingWaitTime( status, 2, &user_idle_milliseconds_param, @@ -268,7 +337,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // A nudge resets the continue_sync_cycle_param value, so our backoff // should return to the minimum. continue_sync_cycle_param = false; - ASSERT_LE(0, syncer_thread.CalculatePollingWaitTime( + ASSERT_LE(0, syncer_thread->CalculatePollingWaitTime( status, 3600, &user_idle_milliseconds_param, @@ -276,7 +345,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { ASSERT_TRUE(continue_sync_cycle_param); continue_sync_cycle_param = false; - ASSERT_GE(2, syncer_thread.CalculatePollingWaitTime( + ASSERT_GE(2, syncer_thread->CalculatePollingWaitTime( status, 3600, &user_idle_milliseconds_param, @@ -286,7 +355,7 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { // Setting unsynced_count = 0 returns us to the default polling interval. status.unsynced_count = 0; ASSERT_EQ(SyncerThread::kDefaultShortPollIntervalSeconds, - syncer_thread.CalculatePollingWaitTime( + syncer_thread->CalculatePollingWaitTime( status, 4, &user_idle_milliseconds_param, @@ -295,4 +364,67 @@ TEST_F(SyncerThreadTest, CalculatePollingWaitTime) { } } +TEST_F(SyncerThreadWithSyncerTest, Polling) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + + const TimeDelta poll_interval = TimeDelta::FromSeconds(1); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + EXPECT_TRUE(syncer_thread()->Start()); + + // Calling Open() should cause the SyncerThread to create a Syncer. + metadb()->Open(); + + TimeDelta two_polls = poll_interval + poll_interval; + // We could theoretically return immediately from the wait if the interceptor + // was already signaled for a SyncShare (the first one comes quick). + interceptor.WaitForSyncShare(1, two_polls); + EXPECT_FALSE(interceptor.times_sync_occured().empty()); + + // Wait for at least 2 more SyncShare operations. + interceptor.WaitForSyncShare(2, two_polls); + EXPECT_TRUE(syncer_thread()->Stop(2000)); + + // Now analyze the run. + std::vector<Time> data = interceptor.times_sync_occured(); + + EXPECT_GE(data.size(), static_cast<unsigned int>(3)); + for (unsigned int i = 0; i < data.size() - 1; i++) { + Time optimal_next_sync = data[i] + poll_interval; + // The pthreads impl uses a different time impl and is slightly (~900usecs) + // off, so this expectation can fail with --syncer-thread-pthreads. + EXPECT_TRUE(data[i + 1] >= optimal_next_sync) + << "difference is " + << (data[i + 1] - optimal_next_sync).InMicroseconds() << " usecs. " + << "~900usec delta is OK with --syncer-thread-pthreads"; + // This should be reliable, as there are no blocking or I/O operations + // except the explicit 2 second wait, so if it takes longer than this + // there is a problem. + EXPECT_TRUE(data[i + 1] < optimal_next_sync + poll_interval); + } +} + +TEST_F(SyncerThreadWithSyncerTest, Nudge) { + SyncShareIntercept interceptor; + connection()->SetMidCommitObserver(&interceptor); + // We don't want a poll to happen during this test (except the first one). + const TimeDelta poll_interval = TimeDelta::FromMinutes(5); + syncer_thread()->SetSyncerShortPollInterval(poll_interval); + EXPECT_TRUE(syncer_thread()->Start()); + metadb()->Open(); + interceptor.WaitForSyncShare(1, poll_interval + poll_interval); + + EXPECT_EQ(static_cast<unsigned int>(1), + interceptor.times_sync_occured().size()); + // The SyncerThread should be waiting for the poll now. Nudge it to sync + // immediately (5ms). + syncer_thread()->NudgeSyncer(5, SyncerThread::kUnknown); + interceptor.WaitForSyncShare(1, TimeDelta::FromSeconds(1)); + EXPECT_EQ(static_cast<unsigned int>(2), + interceptor.times_sync_occured().size()); + + // SyncerThread should be waiting again. Signal it to stop. + EXPECT_TRUE(syncer_thread()->Stop(2000)); +} + } // namespace browser_sync |