diff options
author | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-25 23:09:36 +0000 |
---|---|---|
committer | tim@chromium.org <tim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-25 23:09:36 +0000 |
commit | 2186fec4df1dde813a12011eb1ea650ce8470ea2 (patch) | |
tree | 96ebaec4e2e3bf246f0730b7093d153011fbbd7b /chrome/browser/sync/engine/syncer_thread.cc | |
parent | b17b3c8b37c0acd4528a338583f851a5a10063b0 (diff) | |
download | chromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.zip chromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.tar.gz chromium_src-2186fec4df1dde813a12011eb1ea650ce8470ea2.tar.bz2 |
Second attempt at the new syncer thread impl, now with less crashes!
Previous one at http://codereview.chromium.org/214033.
I had local edits that resulted in initializing the CommandLine for syncapi,
but didn't have them as part of the patch, so this was causing a crash whenever
SyncerThreadFactory::Create was called. The only diff here is the call to
CommandLine::Init in syncapi.cc. This effectively means you can't change the
syncer thread impl on linux (we init an empty command line there), but this is
OK. Once we link statically we won't need to do this.
TEST=ProfileSyncServiceTest, SyncerThreadTest, SyncerThreadWithSyncerTest
Review URL: http://codereview.chromium.org/250001
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@27281 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread.cc')
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread.cc | 317 |
1 files changed, 199 insertions, 118 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc index a53ab93..0ef436b 100644 --- a/chrome/browser/sync/engine/syncer_thread.cc +++ b/chrome/browser/sync/engine/syncer_thread.cc @@ -1,7 +1,6 @@ // Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. - #include "chrome/browser/sync/engine/syncer_thread.h" #include "build/build_config.h" @@ -16,20 +15,23 @@ #include <map> #include <queue> +#include "base/command_line.h" #include "chrome/browser/sync/engine/auth_watcher.h" #include "chrome/browser/sync/engine/model_safe_worker.h" #include "chrome/browser/sync/engine/net/server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" +#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" #include "chrome/browser/sync/notifier/listener/talk_mediator.h" #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" #include "chrome/browser/sync/syncable/directory_manager.h" +#include "chrome/common/chrome_switches.h" using std::priority_queue; using std::min; - -static inline bool operator < (const timespec& a, const timespec& b) { - return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; -} +using base::Time; +using base::TimeDelta; +using base::TimeTicks; namespace { @@ -108,17 +110,56 @@ int UserIdleTime() { namespace browser_sync { +SyncerThread* SyncerThreadFactory::Create( + ClientCommandChannel* command_channel, + syncable::DirectoryManager* mgr, + ServerConnectionManager* connection_manager, AllStatus* all_status, + ModelSafeWorker* model_safe_worker) { + const CommandLine* cmd = CommandLine::ForCurrentProcess(); + if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) { + return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) { + return new SyncerThreadPthreads(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } else { + // The default SyncerThread implementation, which does not time-out when + // Stop is called. + return new SyncerThread(command_channel, mgr, connection_manager, + all_status, model_safe_worker); + } +} + bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { - MutexLock lock(&mutex_); - if (syncer_ == NULL) { + AutoLock lock(lock_); + if (vault_.syncer_ == NULL) { return false; } NudgeSyncImpl(milliseconds_from_now, source); return true; } -void* RunSyncerThread(void* syncer_thread) { - return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); +SyncerThread::SyncerThread() + : thread_main_started_(false, false), + thread_("SyncEngine_SyncerThread"), + vault_field_changed_(&lock_), + p2p_authenticated_(false), + p2p_subscribed_(false), + client_command_hookup_(NULL), + conn_mgr_hookup_(NULL), + allstatus_(NULL), + dirman_(NULL), + scm_(NULL), + syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), + syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), + syncer_polling_interval_(kDefaultShortPollIntervalSeconds), + syncer_max_interval_(kDefaultMaxPollIntervalMs), + talk_mediator_hookup_(NULL), + command_channel_(NULL), + directory_manager_hookup_(NULL), + syncer_events_(NULL), + model_safe_worker_(NULL), + disable_idle_detection_(false) { } SyncerThread::SyncerThread( @@ -127,15 +168,14 @@ SyncerThread::SyncerThread( ServerConnectionManager* connection_manager, AllStatus* all_status, ModelSafeWorker* model_safe_worker) - : stop_syncer_thread_(false), - thread_running_(false), - connected_(false), + : thread_main_started_(false, false), + thread_("SyncEngine_SyncerThread"), + vault_field_changed_(&lock_), p2p_authenticated_(false), p2p_subscribed_(false), client_command_hookup_(NULL), conn_mgr_hookup_(NULL), allstatus_(all_status), - syncer_(NULL), dirman_(mgr), scm_(connection_manager), syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), @@ -172,55 +212,76 @@ SyncerThread::~SyncerThread() { syncer_event_channel_.reset(); directory_manager_hookup_.reset(); syncer_events_.reset(); - delete syncer_; + delete vault_.syncer_; talk_mediator_hookup_.reset(); - CHECK(!thread_running_); + CHECK(!thread_.IsRunning()); } // Creates and starts a syncer thread. // Returns true if it creates a thread or if there's currently a thread running // and false otherwise. bool SyncerThread::Start() { - MutexLock lock(&mutex_); - if (thread_running_) { - return true; - } - thread_running_ = - (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); - if (thread_running_) { - pthread_detach(thread_); + { + AutoLock lock(lock_); + if (thread_.IsRunning()) { + return true; + } + + if (!thread_.Start()) { + return false; + } } - return thread_running_; + + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, + &SyncerThread::ThreadMain)); + + // Wait for notification that our task makes it safely onto the message + // loop before returning, so the caller can't call Stop before we're + // actually up and running. This is for consistency with the old pthread + // impl because pthread_create would do this in one step. + thread_main_started_.Wait(); + LOG(INFO) << "SyncerThread started."; + return true; } // Stop processing. A max wait of at least 2*server RTT time is recommended. // Returns true if we stopped, false otherwise. bool SyncerThread::Stop(int max_wait) { - MutexLock lock(&mutex_); - if (!thread_running_) - return true; - stop_syncer_thread_ = true; - if (NULL != syncer_) { - // Try to early exit the syncer. - syncer_->RequestEarlyExit(); - } - pthread_cond_broadcast(&changed_.condvar_); - timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; - do { - const int wait_result = max_wait < 0 ? - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : - pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, - &deadline); - if (ETIMEDOUT == wait_result) { - LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; - return false; + { + AutoLock lock(lock_); + // If the thread has been started, then we either already have or are about + // to enter ThreadMainLoop so we have to proceed with shutdown and wait for + // it to finish. If the thread has not been started --and we now own the + // lock-- then we can early out because the caller has not called Start(). + if (!thread_.IsRunning()) + return true; + + LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " + << "true (vault_.stop_syncer_thread_)"; + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit + // below). + vault_.stop_syncer_thread_ = true; + if (NULL != vault_.syncer_) { + // Try to early exit the syncer itself, which could be looping inside + // SyncShare. + vault_.syncer_->RequestEarlyExit(); } - } while (thread_running_); + + // stop_syncer_thread_ is now true and the Syncer has been told to exit. + // We want to wake up all waiters so they can re-examine state. We signal, + // causing all waiters to try to re-acquire the lock, and then we release + // the lock, and join on our internal thread which should soon run off the + // end of ThreadMain. + vault_field_changed_.Broadcast(); + } + + // This will join, and finish when ThreadMain terminates. + thread_.Stop(); return true; } void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { - PThreadScopedLock<PThreadMutex> lock(&mutex_); + AutoLock lock(lock_); client_command_hookup_.reset(NewEventListenerHookup(channel, this, &SyncerThread::HandleClientCommand)); } @@ -241,67 +302,89 @@ void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { } void SyncerThread::ThreadMainLoop() { + // This is called with lock_ acquired. + lock_.AssertAcquired(); + LOG(INFO) << "In thread main loop."; + // Use the short poll value by default. - int poll_seconds = syncer_short_poll_interval_seconds_; + TimeDelta poll_seconds = + TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); int user_idle_milliseconds = 0; - timespec last_sync_time = { 0 }; + TimeTicks last_sync_time; bool initial_sync_for_thread = true; bool continue_sync_cycle = false; - while (!stop_syncer_thread_) { - if (!connected_) { + while (!vault_.stop_syncer_thread_) { + // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as + // below) because we cannot poll until these conditions are met, so we wait + // indefinitely. + if (!vault_.connected_) { LOG(INFO) << "Syncer thread waiting for connection."; - while (!connected_ && !stop_syncer_thread_) - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); - LOG_IF(INFO, connected_) << "Syncer thread found connection."; + while (!vault_.connected_ && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); + LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; continue; } - if (syncer_ == NULL) { + if (vault_.syncer_ == NULL) { LOG(INFO) << "Syncer thread waiting for database initialization."; - while (syncer_ == NULL && !stop_syncer_thread_) - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); - LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; + while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) + vault_field_changed_.Wait(); + LOG_IF(INFO, !(vault_.syncer_ == NULL)) + << "Syncer was found after DB started."; continue; } - timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, - last_sync_time.tv_nsec }; - const timespec wake_time = - !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? - nudge_queue_.top().first : next_poll; - LOG(INFO) << "wake time is " << wake_time.tv_sec; - LOG(INFO) << "next poll is " << next_poll.tv_sec; - - const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, - &wake_time); - if (ETIMEDOUT != error) { - continue; // Check all the conditions again. + const TimeTicks next_poll = last_sync_time + poll_seconds; + const TimeTicks end_wait = + !vault_.nudge_queue_.empty() && + vault_.nudge_queue_.top().first < next_poll ? + vault_.nudge_queue_.top().first : next_poll; + LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); + LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); + + // We block until the CV is signaled (e.g a control field changed, loss of + // network connection, nudge, spurious, etc), or the poll interval elapses. + TimeDelta sleep_time = end_wait - TimeTicks::Now(); + if (sleep_time > TimeDelta::FromSeconds(0)) { + vault_field_changed_.TimedWait(sleep_time); + + if (TimeTicks::Now() < end_wait) { + // Didn't timeout. Could be a spurious signal, or a signal corresponding + // to an actual change in one of our control fields. By continuing here + // we perform the typical "always recheck conditions when signaled", + // (typically handled by a while(condition_not_met) cv.wait() construct) + // because we jump to the top of the loop. The main difference is we + // recalculate the wait interval, but last_sync_time won't have changed. + // So if we were signaled by a nudge (for ex.) we'll grab the new nudge + // off the queue and wait for that delta. If it was a spurious signal, + // we'll keep waiting for the same moment in time as we just were. + continue; + } } - const timespec now = GetPThreadAbsoluteTime(0); - // Handle a nudge, caused by either a notification or a local bookmark // event. This will also update the source of the following SyncMain call. - UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); + UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); - LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; - SyncMain(syncer_); - last_sync_time = now; + LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); + SyncMain(vault_.syncer_); + last_sync_time = TimeTicks::Now(); LOG(INFO) << "Updating the next polling time after SyncMain"; - poll_seconds = CalculatePollingWaitTime(allstatus_->status(), - poll_seconds, - &user_idle_milliseconds, - &continue_sync_cycle); + poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( + allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), + &user_idle_milliseconds, &continue_sync_cycle)); } + } // We check how long the user's been idle and sync less often if the machine is // not in use. The aim is to reduce server load. +// TODO(timsteele): Should use Time(Delta). int SyncerThread::CalculatePollingWaitTime( const AllStatus::Status& status, - int last_poll_wait, // in s + int last_poll_wait, // Time in seconds. int* user_idle_milliseconds, bool* continue_sync_cycle) { bool is_continuing_sync_cyle = *continue_sync_cycle; @@ -352,30 +435,25 @@ int SyncerThread::CalculatePollingWaitTime( return actual_next_wait; } -void* SyncerThread::ThreadMain() { - NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); - mutex_.Lock(); +void SyncerThread::ThreadMain() { + AutoLock lock(lock_); + // Signal Start() to let it know we've made it safely onto the message loop, + // and unblock it's caller. + thread_main_started_.Signal(); ThreadMainLoop(); - thread_running_ = false; - pthread_cond_broadcast(&changed_.condvar_); - mutex_.Unlock(); - LOG(INFO) << "Syncer thread exiting."; - return 0; + LOG(INFO) << "Syncer thread ThreadMain is done."; } void SyncerThread::SyncMain(Syncer* syncer) { CHECK(syncer); - mutex_.Unlock(); + AutoUnlock unlock(lock_); while (syncer->SyncShare()) { LOG(INFO) << "Looping in sync share"; } LOG(INFO) << "Done looping in sync share"; - - mutex_.Lock(); } -void SyncerThread::UpdateNudgeSource(const timespec& now, - bool* continue_sync_cycle, +void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, bool* initial_sync) { bool nudged = false; NudgeSource nudge_source = kUnknown; @@ -385,13 +463,14 @@ void SyncerThread::UpdateNudgeSource(const timespec& now, } // Update the nudge source if a new nudge has come through during the // previous sync cycle. - while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { + while (!vault_.nudge_queue_.empty() && + TimeTicks::Now() >= vault_.nudge_queue_.top().first) { if (!nudged) { - nudge_source = nudge_queue_.top().second; + nudge_source = vault_.nudge_queue_.top().second; *continue_sync_cycle = false; // Reset the continuation token on nudge. nudged = true; } - nudge_queue_.pop(); + vault_.nudge_queue_.pop(); } SetUpdatesSource(nudged, nudge_source, initial_sync); } @@ -422,11 +501,11 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, break; } } - syncer_->set_updates_source(updates_source); + vault_.syncer_->set_updates_source(updates_source); } void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); channel()->NotifyListeners(event); if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { return; @@ -438,33 +517,33 @@ void SyncerThread::HandleDirectoryManagerEvent( const syncable::DirectoryManagerEvent& event) { LOG(INFO) << "Handling a directory manager event"; if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); LOG(INFO) << "Syncer starting up for: " << event.dirname; // The underlying database structure is ready, and we should create // the syncer. - CHECK(syncer_ == NULL); - syncer_ = + CHECK(vault_.syncer_ == NULL); + vault_.syncer_ = new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); - syncer_->set_command_channel(command_channel_); + vault_.syncer_->set_command_channel(command_channel_); syncer_events_.reset(NewEventListenerHookup( - syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); - pthread_cond_broadcast(&changed_.condvar_); + vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); + vault_field_changed_.Broadcast(); } } static inline void CheckConnected(bool* connected, HttpResponse::ServerConnectionCode code, - pthread_cond_t* condvar) { + ConditionVariable* condvar) { if (*connected) { if (HttpResponse::CONNECTION_UNAVAILABLE == code) { *connected = false; - pthread_cond_broadcast(condvar); + condvar->Broadcast(); } } else { if (HttpResponse::SERVER_CONNECTION_OK == code) { *connected = true; - pthread_cond_broadcast(condvar); + condvar->Broadcast(); } } } @@ -472,16 +551,16 @@ static inline void CheckConnected(bool* connected, void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, &SyncerThread::HandleServerConnectionEvent)); - CheckConnected(&connected_, conn_mgr->server_status(), - &changed_.condvar_); + CheckConnected(&vault_.connected_, conn_mgr->server_status(), + &vault_field_changed_); } void SyncerThread::HandleServerConnectionEvent( const ServerConnectionEvent& event) { if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { - MutexLock lock(&mutex_); - CheckConnected(&connected_, event.connection_code, - &changed_.condvar_); + AutoLock lock(lock_); + CheckConnected(&vault_.connected_, event.connection_code, + &vault_field_changed_); } } @@ -513,10 +592,11 @@ int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { // Called with mutex_ already locked. void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, NudgeSource source) { - const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); + const TimeTicks nudge_time = TimeTicks::Now() + + TimeDelta::FromMilliseconds(milliseconds_from_now); NudgeObject nudge_object(nudge_time, source); - nudge_queue_.push(nudge_object); - pthread_cond_broadcast(&changed_.condvar_); + vault_.nudge_queue_.push(nudge_object); + vault_field_changed_.Broadcast(); } void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { @@ -528,7 +608,7 @@ void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { } void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { - MutexLock lock(&mutex_); + AutoLock lock(lock_); switch (event.what_happened) { case TalkMediatorEvent::LOGIN_SUCCEEDED: LOG(INFO) << "P2P: Login succeeded."; @@ -541,7 +621,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { case TalkMediatorEvent::SUBSCRIPTIONS_ON: LOG(INFO) << "P2P: Subscriptions successfully enabled."; p2p_subscribed_ = true; - if (NULL != syncer_) { + if (NULL != vault_.syncer_) { LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; NudgeSyncImpl(0, kLocal); } @@ -552,7 +632,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; case TalkMediatorEvent::NOTIFICATION_RECEIVED: LOG(INFO) << "P2P: Updates on server, pushing syncer"; - if (NULL != syncer_) { + if (NULL != vault_.syncer_) { NudgeSyncImpl(0, kNotification); } break; @@ -560,8 +640,9 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; } - if (NULL != syncer_) { - syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); + if (NULL != vault_.syncer_) { + vault_.syncer_->set_notifications_enabled( + p2p_authenticated_ && p2p_subscribed_); } } |