diff options
author | davemoore@chromium.org <davemoore@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-24 21:05:36 +0000 |
---|---|---|
committer | davemoore@chromium.org <davemoore@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-09-24 21:05:36 +0000 |
commit | 506cb1db9a466aad6b8d3fdce9cb94eb54516e47 (patch) | |
tree | 8959bc3c7ab135de91fa746bf416603cc6c98bdd /chrome/browser/sync/engine/syncer_thread.cc | |
parent | 7b3c0312746e6f21478a01c3d56db4714b48618f (diff) | |
download | chromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.zip chromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.tar.gz chromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.tar.bz2 |
Reverting 27117.
Review URL: http://codereview.chromium.org/235010
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@27124 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread.cc')
-rw-r--r-- | chrome/browser/sync/engine/syncer_thread.cc | 317 |
1 files changed, 118 insertions, 199 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc index 0ef436b..a53ab93 100644 --- a/chrome/browser/sync/engine/syncer_thread.cc +++ b/chrome/browser/sync/engine/syncer_thread.cc @@ -1,6 +1,7 @@ // Copyright (c) 2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. + #include "chrome/browser/sync/engine/syncer_thread.h" #include "build/build_config.h" @@ -15,23 +16,20 @@ #include <map> #include <queue> -#include "base/command_line.h" #include "chrome/browser/sync/engine/auth_watcher.h" #include "chrome/browser/sync/engine/model_safe_worker.h" #include "chrome/browser/sync/engine/net/server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" -#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" -#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" #include "chrome/browser/sync/notifier/listener/talk_mediator.h" #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" #include "chrome/browser/sync/syncable/directory_manager.h" -#include "chrome/common/chrome_switches.h" using std::priority_queue; using std::min; -using base::Time; -using base::TimeDelta; -using base::TimeTicks; + +static inline bool operator < (const timespec& a, const timespec& b) { + return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; +} namespace { @@ -110,56 +108,17 @@ int UserIdleTime() { namespace browser_sync { -SyncerThread* SyncerThreadFactory::Create( - ClientCommandChannel* command_channel, - syncable::DirectoryManager* mgr, - ServerConnectionManager* connection_manager, AllStatus* all_status, - ModelSafeWorker* model_safe_worker) { - const CommandLine* cmd = CommandLine::ForCurrentProcess(); - if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) { - return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, - all_status, model_safe_worker); - } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) { - return new SyncerThreadPthreads(command_channel, mgr, connection_manager, - all_status, model_safe_worker); - } else { - // The default SyncerThread implementation, which does not time-out when - // Stop is called. - return new SyncerThread(command_channel, mgr, connection_manager, - all_status, model_safe_worker); - } -} - bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { - AutoLock lock(lock_); - if (vault_.syncer_ == NULL) { + MutexLock lock(&mutex_); + if (syncer_ == NULL) { return false; } NudgeSyncImpl(milliseconds_from_now, source); return true; } -SyncerThread::SyncerThread() - : thread_main_started_(false, false), - thread_("SyncEngine_SyncerThread"), - vault_field_changed_(&lock_), - p2p_authenticated_(false), - p2p_subscribed_(false), - client_command_hookup_(NULL), - conn_mgr_hookup_(NULL), - allstatus_(NULL), - dirman_(NULL), - scm_(NULL), - syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), - syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), - syncer_polling_interval_(kDefaultShortPollIntervalSeconds), - syncer_max_interval_(kDefaultMaxPollIntervalMs), - talk_mediator_hookup_(NULL), - command_channel_(NULL), - directory_manager_hookup_(NULL), - syncer_events_(NULL), - model_safe_worker_(NULL), - disable_idle_detection_(false) { +void* RunSyncerThread(void* syncer_thread) { + return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); } SyncerThread::SyncerThread( @@ -168,14 +127,15 @@ SyncerThread::SyncerThread( ServerConnectionManager* connection_manager, AllStatus* all_status, ModelSafeWorker* model_safe_worker) - : thread_main_started_(false, false), - thread_("SyncEngine_SyncerThread"), - vault_field_changed_(&lock_), + : stop_syncer_thread_(false), + thread_running_(false), + connected_(false), p2p_authenticated_(false), p2p_subscribed_(false), client_command_hookup_(NULL), conn_mgr_hookup_(NULL), allstatus_(all_status), + syncer_(NULL), dirman_(mgr), scm_(connection_manager), syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), @@ -212,76 +172,55 @@ SyncerThread::~SyncerThread() { syncer_event_channel_.reset(); directory_manager_hookup_.reset(); syncer_events_.reset(); - delete vault_.syncer_; + delete syncer_; talk_mediator_hookup_.reset(); - CHECK(!thread_.IsRunning()); + CHECK(!thread_running_); } // Creates and starts a syncer thread. // Returns true if it creates a thread or if there's currently a thread running // and false otherwise. bool SyncerThread::Start() { - { - AutoLock lock(lock_); - if (thread_.IsRunning()) { - return true; - } - - if (!thread_.Start()) { - return false; - } + MutexLock lock(&mutex_); + if (thread_running_) { + return true; } - - thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, - &SyncerThread::ThreadMain)); - - // Wait for notification that our task makes it safely onto the message - // loop before returning, so the caller can't call Stop before we're - // actually up and running. This is for consistency with the old pthread - // impl because pthread_create would do this in one step. - thread_main_started_.Wait(); - LOG(INFO) << "SyncerThread started."; - return true; + thread_running_ = + (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); + if (thread_running_) { + pthread_detach(thread_); + } + return thread_running_; } // Stop processing. A max wait of at least 2*server RTT time is recommended. // Returns true if we stopped, false otherwise. bool SyncerThread::Stop(int max_wait) { - { - AutoLock lock(lock_); - // If the thread has been started, then we either already have or are about - // to enter ThreadMainLoop so we have to proceed with shutdown and wait for - // it to finish. If the thread has not been started --and we now own the - // lock-- then we can early out because the caller has not called Start(). - if (!thread_.IsRunning()) - return true; - - LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " - << "true (vault_.stop_syncer_thread_)"; - // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit - // below). - vault_.stop_syncer_thread_ = true; - if (NULL != vault_.syncer_) { - // Try to early exit the syncer itself, which could be looping inside - // SyncShare. - vault_.syncer_->RequestEarlyExit(); - } - - // stop_syncer_thread_ is now true and the Syncer has been told to exit. - // We want to wake up all waiters so they can re-examine state. We signal, - // causing all waiters to try to re-acquire the lock, and then we release - // the lock, and join on our internal thread which should soon run off the - // end of ThreadMain. - vault_field_changed_.Broadcast(); + MutexLock lock(&mutex_); + if (!thread_running_) + return true; + stop_syncer_thread_ = true; + if (NULL != syncer_) { + // Try to early exit the syncer. + syncer_->RequestEarlyExit(); } - - // This will join, and finish when ThreadMain terminates. - thread_.Stop(); + pthread_cond_broadcast(&changed_.condvar_); + timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; + do { + const int wait_result = max_wait < 0 ? + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : + pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, + &deadline); + if (ETIMEDOUT == wait_result) { + LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; + return false; + } + } while (thread_running_); return true; } void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { - AutoLock lock(lock_); + PThreadScopedLock<PThreadMutex> lock(&mutex_); client_command_hookup_.reset(NewEventListenerHookup(channel, this, &SyncerThread::HandleClientCommand)); } @@ -302,89 +241,67 @@ void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) { } void SyncerThread::ThreadMainLoop() { - // This is called with lock_ acquired. - lock_.AssertAcquired(); - LOG(INFO) << "In thread main loop."; - // Use the short poll value by default. - TimeDelta poll_seconds = - TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); + int poll_seconds = syncer_short_poll_interval_seconds_; int user_idle_milliseconds = 0; - TimeTicks last_sync_time; + timespec last_sync_time = { 0 }; bool initial_sync_for_thread = true; bool continue_sync_cycle = false; - while (!vault_.stop_syncer_thread_) { - // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as - // below) because we cannot poll until these conditions are met, so we wait - // indefinitely. - if (!vault_.connected_) { + while (!stop_syncer_thread_) { + if (!connected_) { LOG(INFO) << "Syncer thread waiting for connection."; - while (!vault_.connected_ && !vault_.stop_syncer_thread_) - vault_field_changed_.Wait(); - LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; + while (!connected_ && !stop_syncer_thread_) + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); + LOG_IF(INFO, connected_) << "Syncer thread found connection."; continue; } - if (vault_.syncer_ == NULL) { + if (syncer_ == NULL) { LOG(INFO) << "Syncer thread waiting for database initialization."; - while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) - vault_field_changed_.Wait(); - LOG_IF(INFO, !(vault_.syncer_ == NULL)) - << "Syncer was found after DB started."; + while (syncer_ == NULL && !stop_syncer_thread_) + pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); + LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; continue; } - const TimeTicks next_poll = last_sync_time + poll_seconds; - const TimeTicks end_wait = - !vault_.nudge_queue_.empty() && - vault_.nudge_queue_.top().first < next_poll ? - vault_.nudge_queue_.top().first : next_poll; - LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); - LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); - - // We block until the CV is signaled (e.g a control field changed, loss of - // network connection, nudge, spurious, etc), or the poll interval elapses. - TimeDelta sleep_time = end_wait - TimeTicks::Now(); - if (sleep_time > TimeDelta::FromSeconds(0)) { - vault_field_changed_.TimedWait(sleep_time); - - if (TimeTicks::Now() < end_wait) { - // Didn't timeout. Could be a spurious signal, or a signal corresponding - // to an actual change in one of our control fields. By continuing here - // we perform the typical "always recheck conditions when signaled", - // (typically handled by a while(condition_not_met) cv.wait() construct) - // because we jump to the top of the loop. The main difference is we - // recalculate the wait interval, but last_sync_time won't have changed. - // So if we were signaled by a nudge (for ex.) we'll grab the new nudge - // off the queue and wait for that delta. If it was a spurious signal, - // we'll keep waiting for the same moment in time as we just were. - continue; - } + timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, + last_sync_time.tv_nsec }; + const timespec wake_time = + !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? + nudge_queue_.top().first : next_poll; + LOG(INFO) << "wake time is " << wake_time.tv_sec; + LOG(INFO) << "next poll is " << next_poll.tv_sec; + + const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, + &wake_time); + if (ETIMEDOUT != error) { + continue; // Check all the conditions again. } + const timespec now = GetPThreadAbsoluteTime(0); + // Handle a nudge, caused by either a notification or a local bookmark // event. This will also update the source of the following SyncMain call. - UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); + UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); - LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); - SyncMain(vault_.syncer_); - last_sync_time = TimeTicks::Now(); + LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; + SyncMain(syncer_); + last_sync_time = now; LOG(INFO) << "Updating the next polling time after SyncMain"; - poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( - allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), - &user_idle_milliseconds, &continue_sync_cycle)); + poll_seconds = CalculatePollingWaitTime(allstatus_->status(), + poll_seconds, + &user_idle_milliseconds, + &continue_sync_cycle); } - } // We check how long the user's been idle and sync less often if the machine is // not in use. The aim is to reduce server load. -// TODO(timsteele): Should use Time(Delta). int SyncerThread::CalculatePollingWaitTime( const AllStatus::Status& status, - int last_poll_wait, // Time in seconds. + int last_poll_wait, // in s int* user_idle_milliseconds, bool* continue_sync_cycle) { bool is_continuing_sync_cyle = *continue_sync_cycle; @@ -435,25 +352,30 @@ int SyncerThread::CalculatePollingWaitTime( return actual_next_wait; } -void SyncerThread::ThreadMain() { - AutoLock lock(lock_); - // Signal Start() to let it know we've made it safely onto the message loop, - // and unblock it's caller. - thread_main_started_.Signal(); +void* SyncerThread::ThreadMain() { + NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); + mutex_.Lock(); ThreadMainLoop(); - LOG(INFO) << "Syncer thread ThreadMain is done."; + thread_running_ = false; + pthread_cond_broadcast(&changed_.condvar_); + mutex_.Unlock(); + LOG(INFO) << "Syncer thread exiting."; + return 0; } void SyncerThread::SyncMain(Syncer* syncer) { CHECK(syncer); - AutoUnlock unlock(lock_); + mutex_.Unlock(); while (syncer->SyncShare()) { LOG(INFO) << "Looping in sync share"; } LOG(INFO) << "Done looping in sync share"; + + mutex_.Lock(); } -void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, +void SyncerThread::UpdateNudgeSource(const timespec& now, + bool* continue_sync_cycle, bool* initial_sync) { bool nudged = false; NudgeSource nudge_source = kUnknown; @@ -463,14 +385,13 @@ void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, } // Update the nudge source if a new nudge has come through during the // previous sync cycle. - while (!vault_.nudge_queue_.empty() && - TimeTicks::Now() >= vault_.nudge_queue_.top().first) { + while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { if (!nudged) { - nudge_source = vault_.nudge_queue_.top().second; + nudge_source = nudge_queue_.top().second; *continue_sync_cycle = false; // Reset the continuation token on nudge. nudged = true; } - vault_.nudge_queue_.pop(); + nudge_queue_.pop(); } SetUpdatesSource(nudged, nudge_source, initial_sync); } @@ -501,11 +422,11 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, break; } } - vault_.syncer_->set_updates_source(updates_source); + syncer_->set_updates_source(updates_source); } void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { - AutoLock lock(lock_); + MutexLock lock(&mutex_); channel()->NotifyListeners(event); if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { return; @@ -517,33 +438,33 @@ void SyncerThread::HandleDirectoryManagerEvent( const syncable::DirectoryManagerEvent& event) { LOG(INFO) << "Handling a directory manager event"; if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { - AutoLock lock(lock_); + MutexLock lock(&mutex_); LOG(INFO) << "Syncer starting up for: " << event.dirname; // The underlying database structure is ready, and we should create // the syncer. - CHECK(vault_.syncer_ == NULL); - vault_.syncer_ = + CHECK(syncer_ == NULL); + syncer_ = new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); - vault_.syncer_->set_command_channel(command_channel_); + syncer_->set_command_channel(command_channel_); syncer_events_.reset(NewEventListenerHookup( - vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); - vault_field_changed_.Broadcast(); + syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); + pthread_cond_broadcast(&changed_.condvar_); } } static inline void CheckConnected(bool* connected, HttpResponse::ServerConnectionCode code, - ConditionVariable* condvar) { + pthread_cond_t* condvar) { if (*connected) { if (HttpResponse::CONNECTION_UNAVAILABLE == code) { *connected = false; - condvar->Broadcast(); + pthread_cond_broadcast(condvar); } } else { if (HttpResponse::SERVER_CONNECTION_OK == code) { *connected = true; - condvar->Broadcast(); + pthread_cond_broadcast(condvar); } } } @@ -551,16 +472,16 @@ static inline void CheckConnected(bool* connected, void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, &SyncerThread::HandleServerConnectionEvent)); - CheckConnected(&vault_.connected_, conn_mgr->server_status(), - &vault_field_changed_); + CheckConnected(&connected_, conn_mgr->server_status(), + &changed_.condvar_); } void SyncerThread::HandleServerConnectionEvent( const ServerConnectionEvent& event) { if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { - AutoLock lock(lock_); - CheckConnected(&vault_.connected_, event.connection_code, - &vault_field_changed_); + MutexLock lock(&mutex_); + CheckConnected(&connected_, event.connection_code, + &changed_.condvar_); } } @@ -592,11 +513,10 @@ int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) { // Called with mutex_ already locked. void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, NudgeSource source) { - const TimeTicks nudge_time = TimeTicks::Now() + - TimeDelta::FromMilliseconds(milliseconds_from_now); + const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); NudgeObject nudge_object(nudge_time, source); - vault_.nudge_queue_.push(nudge_object); - vault_field_changed_.Broadcast(); + nudge_queue_.push(nudge_object); + pthread_cond_broadcast(&changed_.condvar_); } void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { @@ -608,7 +528,7 @@ void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { } void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { - AutoLock lock(lock_); + MutexLock lock(&mutex_); switch (event.what_happened) { case TalkMediatorEvent::LOGIN_SUCCEEDED: LOG(INFO) << "P2P: Login succeeded."; @@ -621,7 +541,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { case TalkMediatorEvent::SUBSCRIPTIONS_ON: LOG(INFO) << "P2P: Subscriptions successfully enabled."; p2p_subscribed_ = true; - if (NULL != vault_.syncer_) { + if (NULL != syncer_) { LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; NudgeSyncImpl(0, kLocal); } @@ -632,7 +552,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; case TalkMediatorEvent::NOTIFICATION_RECEIVED: LOG(INFO) << "P2P: Updates on server, pushing syncer"; - if (NULL != vault_.syncer_) { + if (NULL != syncer_) { NudgeSyncImpl(0, kNotification); } break; @@ -640,9 +560,8 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { break; } - if (NULL != vault_.syncer_) { - vault_.syncer_->set_notifications_enabled( - p2p_authenticated_ && p2p_subscribed_); + if (NULL != syncer_) { + syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); } } |