summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync/engine/syncer_thread.cc
diff options
context:
space:
mode:
authordavemoore@chromium.org <davemoore@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-24 21:05:36 +0000
committerdavemoore@chromium.org <davemoore@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-24 21:05:36 +0000
commit506cb1db9a466aad6b8d3fdce9cb94eb54516e47 (patch)
tree8959bc3c7ab135de91fa746bf416603cc6c98bdd /chrome/browser/sync/engine/syncer_thread.cc
parent7b3c0312746e6f21478a01c3d56db4714b48618f (diff)
downloadchromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.zip
chromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.tar.gz
chromium_src-506cb1db9a466aad6b8d3fdce9cb94eb54516e47.tar.bz2
Reverting 27117.
Review URL: http://codereview.chromium.org/235010 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@27124 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread.cc')
-rw-r--r--chrome/browser/sync/engine/syncer_thread.cc317
1 files changed, 118 insertions, 199 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc
index 0ef436b..a53ab93 100644
--- a/chrome/browser/sync/engine/syncer_thread.cc
+++ b/chrome/browser/sync/engine/syncer_thread.cc
@@ -1,6 +1,7 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+
#include "chrome/browser/sync/engine/syncer_thread.h"
#include "build/build_config.h"
@@ -15,23 +16,20 @@
#include <map>
#include <queue>
-#include "base/command_line.h"
#include "chrome/browser/sync/engine/auth_watcher.h"
#include "chrome/browser/sync/engine/model_safe_worker.h"
#include "chrome/browser/sync/engine/net/server_connection_manager.h"
#include "chrome/browser/sync/engine/syncer.h"
-#include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
-#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
#include "chrome/browser/sync/notifier/listener/talk_mediator.h"
#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
#include "chrome/browser/sync/syncable/directory_manager.h"
-#include "chrome/common/chrome_switches.h"
using std::priority_queue;
using std::min;
-using base::Time;
-using base::TimeDelta;
-using base::TimeTicks;
+
+static inline bool operator < (const timespec& a, const timespec& b) {
+ return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
+}
namespace {
@@ -110,56 +108,17 @@ int UserIdleTime() {
namespace browser_sync {
-SyncerThread* SyncerThreadFactory::Create(
- ClientCommandChannel* command_channel,
- syncable::DirectoryManager* mgr,
- ServerConnectionManager* connection_manager, AllStatus* all_status,
- ModelSafeWorker* model_safe_worker) {
- const CommandLine* cmd = CommandLine::ForCurrentProcess();
- if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
- return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
- all_status, model_safe_worker);
- } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) {
- return new SyncerThreadPthreads(command_channel, mgr, connection_manager,
- all_status, model_safe_worker);
- } else {
- // The default SyncerThread implementation, which does not time-out when
- // Stop is called.
- return new SyncerThread(command_channel, mgr, connection_manager,
- all_status, model_safe_worker);
- }
-}
-
bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
- AutoLock lock(lock_);
- if (vault_.syncer_ == NULL) {
+ MutexLock lock(&mutex_);
+ if (syncer_ == NULL) {
return false;
}
NudgeSyncImpl(milliseconds_from_now, source);
return true;
}
-SyncerThread::SyncerThread()
- : thread_main_started_(false, false),
- thread_("SyncEngine_SyncerThread"),
- vault_field_changed_(&lock_),
- p2p_authenticated_(false),
- p2p_subscribed_(false),
- client_command_hookup_(NULL),
- conn_mgr_hookup_(NULL),
- allstatus_(NULL),
- dirman_(NULL),
- scm_(NULL),
- syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
- syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
- syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
- syncer_max_interval_(kDefaultMaxPollIntervalMs),
- talk_mediator_hookup_(NULL),
- command_channel_(NULL),
- directory_manager_hookup_(NULL),
- syncer_events_(NULL),
- model_safe_worker_(NULL),
- disable_idle_detection_(false) {
+void* RunSyncerThread(void* syncer_thread) {
+ return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
}
SyncerThread::SyncerThread(
@@ -168,14 +127,15 @@ SyncerThread::SyncerThread(
ServerConnectionManager* connection_manager,
AllStatus* all_status,
ModelSafeWorker* model_safe_worker)
- : thread_main_started_(false, false),
- thread_("SyncEngine_SyncerThread"),
- vault_field_changed_(&lock_),
+ : stop_syncer_thread_(false),
+ thread_running_(false),
+ connected_(false),
p2p_authenticated_(false),
p2p_subscribed_(false),
client_command_hookup_(NULL),
conn_mgr_hookup_(NULL),
allstatus_(all_status),
+ syncer_(NULL),
dirman_(mgr),
scm_(connection_manager),
syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
@@ -212,76 +172,55 @@ SyncerThread::~SyncerThread() {
syncer_event_channel_.reset();
directory_manager_hookup_.reset();
syncer_events_.reset();
- delete vault_.syncer_;
+ delete syncer_;
talk_mediator_hookup_.reset();
- CHECK(!thread_.IsRunning());
+ CHECK(!thread_running_);
}
// Creates and starts a syncer thread.
// Returns true if it creates a thread or if there's currently a thread running
// and false otherwise.
bool SyncerThread::Start() {
- {
- AutoLock lock(lock_);
- if (thread_.IsRunning()) {
- return true;
- }
-
- if (!thread_.Start()) {
- return false;
- }
+ MutexLock lock(&mutex_);
+ if (thread_running_) {
+ return true;
}
-
- thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
- &SyncerThread::ThreadMain));
-
- // Wait for notification that our task makes it safely onto the message
- // loop before returning, so the caller can't call Stop before we're
- // actually up and running. This is for consistency with the old pthread
- // impl because pthread_create would do this in one step.
- thread_main_started_.Wait();
- LOG(INFO) << "SyncerThread started.";
- return true;
+ thread_running_ =
+ (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
+ if (thread_running_) {
+ pthread_detach(thread_);
+ }
+ return thread_running_;
}
// Stop processing. A max wait of at least 2*server RTT time is recommended.
// Returns true if we stopped, false otherwise.
bool SyncerThread::Stop(int max_wait) {
- {
- AutoLock lock(lock_);
- // If the thread has been started, then we either already have or are about
- // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
- // it to finish. If the thread has not been started --and we now own the
- // lock-- then we can early out because the caller has not called Start().
- if (!thread_.IsRunning())
- return true;
-
- LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
- << "true (vault_.stop_syncer_thread_)";
- // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
- // below).
- vault_.stop_syncer_thread_ = true;
- if (NULL != vault_.syncer_) {
- // Try to early exit the syncer itself, which could be looping inside
- // SyncShare.
- vault_.syncer_->RequestEarlyExit();
- }
-
- // stop_syncer_thread_ is now true and the Syncer has been told to exit.
- // We want to wake up all waiters so they can re-examine state. We signal,
- // causing all waiters to try to re-acquire the lock, and then we release
- // the lock, and join on our internal thread which should soon run off the
- // end of ThreadMain.
- vault_field_changed_.Broadcast();
+ MutexLock lock(&mutex_);
+ if (!thread_running_)
+ return true;
+ stop_syncer_thread_ = true;
+ if (NULL != syncer_) {
+ // Try to early exit the syncer.
+ syncer_->RequestEarlyExit();
}
-
- // This will join, and finish when ThreadMain terminates.
- thread_.Stop();
+ pthread_cond_broadcast(&changed_.condvar_);
+ timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
+ do {
+ const int wait_result = max_wait < 0 ?
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
+ pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &deadline);
+ if (ETIMEDOUT == wait_result) {
+ LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
+ return false;
+ }
+ } while (thread_running_);
return true;
}
void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
- AutoLock lock(lock_);
+ PThreadScopedLock<PThreadMutex> lock(&mutex_);
client_command_hookup_.reset(NewEventListenerHookup(channel, this,
&SyncerThread::HandleClientCommand));
}
@@ -302,89 +241,67 @@ void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
}
void SyncerThread::ThreadMainLoop() {
- // This is called with lock_ acquired.
- lock_.AssertAcquired();
- LOG(INFO) << "In thread main loop.";
-
// Use the short poll value by default.
- TimeDelta poll_seconds =
- TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
+ int poll_seconds = syncer_short_poll_interval_seconds_;
int user_idle_milliseconds = 0;
- TimeTicks last_sync_time;
+ timespec last_sync_time = { 0 };
bool initial_sync_for_thread = true;
bool continue_sync_cycle = false;
- while (!vault_.stop_syncer_thread_) {
- // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
- // below) because we cannot poll until these conditions are met, so we wait
- // indefinitely.
- if (!vault_.connected_) {
+ while (!stop_syncer_thread_) {
+ if (!connected_) {
LOG(INFO) << "Syncer thread waiting for connection.";
- while (!vault_.connected_ && !vault_.stop_syncer_thread_)
- vault_field_changed_.Wait();
- LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
+ while (!connected_ && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, connected_) << "Syncer thread found connection.";
continue;
}
- if (vault_.syncer_ == NULL) {
+ if (syncer_ == NULL) {
LOG(INFO) << "Syncer thread waiting for database initialization.";
- while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
- vault_field_changed_.Wait();
- LOG_IF(INFO, !(vault_.syncer_ == NULL))
- << "Syncer was found after DB started.";
+ while (syncer_ == NULL && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
continue;
}
- const TimeTicks next_poll = last_sync_time + poll_seconds;
- const TimeTicks end_wait =
- !vault_.nudge_queue_.empty() &&
- vault_.nudge_queue_.top().first < next_poll ?
- vault_.nudge_queue_.top().first : next_poll;
- LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
- LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
-
- // We block until the CV is signaled (e.g a control field changed, loss of
- // network connection, nudge, spurious, etc), or the poll interval elapses.
- TimeDelta sleep_time = end_wait - TimeTicks::Now();
- if (sleep_time > TimeDelta::FromSeconds(0)) {
- vault_field_changed_.TimedWait(sleep_time);
-
- if (TimeTicks::Now() < end_wait) {
- // Didn't timeout. Could be a spurious signal, or a signal corresponding
- // to an actual change in one of our control fields. By continuing here
- // we perform the typical "always recheck conditions when signaled",
- // (typically handled by a while(condition_not_met) cv.wait() construct)
- // because we jump to the top of the loop. The main difference is we
- // recalculate the wait interval, but last_sync_time won't have changed.
- // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
- // off the queue and wait for that delta. If it was a spurious signal,
- // we'll keep waiting for the same moment in time as we just were.
- continue;
- }
+ timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
+ last_sync_time.tv_nsec };
+ const timespec wake_time =
+ !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
+ nudge_queue_.top().first : next_poll;
+ LOG(INFO) << "wake time is " << wake_time.tv_sec;
+ LOG(INFO) << "next poll is " << next_poll.tv_sec;
+
+ const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &wake_time);
+ if (ETIMEDOUT != error) {
+ continue; // Check all the conditions again.
}
+ const timespec now = GetPThreadAbsoluteTime(0);
+
// Handle a nudge, caused by either a notification or a local bookmark
// event. This will also update the source of the following SyncMain call.
- UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread);
+ UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
- LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
- SyncMain(vault_.syncer_);
- last_sync_time = TimeTicks::Now();
+ LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
+ SyncMain(syncer_);
+ last_sync_time = now;
LOG(INFO) << "Updating the next polling time after SyncMain";
- poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime(
- allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()),
- &user_idle_milliseconds, &continue_sync_cycle));
+ poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
+ poll_seconds,
+ &user_idle_milliseconds,
+ &continue_sync_cycle);
}
-
}
// We check how long the user's been idle and sync less often if the machine is
// not in use. The aim is to reduce server load.
-// TODO(timsteele): Should use Time(Delta).
int SyncerThread::CalculatePollingWaitTime(
const AllStatus::Status& status,
- int last_poll_wait, // Time in seconds.
+ int last_poll_wait, // in s
int* user_idle_milliseconds,
bool* continue_sync_cycle) {
bool is_continuing_sync_cyle = *continue_sync_cycle;
@@ -435,25 +352,30 @@ int SyncerThread::CalculatePollingWaitTime(
return actual_next_wait;
}
-void SyncerThread::ThreadMain() {
- AutoLock lock(lock_);
- // Signal Start() to let it know we've made it safely onto the message loop,
- // and unblock it's caller.
- thread_main_started_.Signal();
+void* SyncerThread::ThreadMain() {
+ NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
+ mutex_.Lock();
ThreadMainLoop();
- LOG(INFO) << "Syncer thread ThreadMain is done.";
+ thread_running_ = false;
+ pthread_cond_broadcast(&changed_.condvar_);
+ mutex_.Unlock();
+ LOG(INFO) << "Syncer thread exiting.";
+ return 0;
}
void SyncerThread::SyncMain(Syncer* syncer) {
CHECK(syncer);
- AutoUnlock unlock(lock_);
+ mutex_.Unlock();
while (syncer->SyncShare()) {
LOG(INFO) << "Looping in sync share";
}
LOG(INFO) << "Done looping in sync share";
+
+ mutex_.Lock();
}
-void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
+void SyncerThread::UpdateNudgeSource(const timespec& now,
+ bool* continue_sync_cycle,
bool* initial_sync) {
bool nudged = false;
NudgeSource nudge_source = kUnknown;
@@ -463,14 +385,13 @@ void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
}
// Update the nudge source if a new nudge has come through during the
// previous sync cycle.
- while (!vault_.nudge_queue_.empty() &&
- TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
+ while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
if (!nudged) {
- nudge_source = vault_.nudge_queue_.top().second;
+ nudge_source = nudge_queue_.top().second;
*continue_sync_cycle = false; // Reset the continuation token on nudge.
nudged = true;
}
- vault_.nudge_queue_.pop();
+ nudge_queue_.pop();
}
SetUpdatesSource(nudged, nudge_source, initial_sync);
}
@@ -501,11 +422,11 @@ void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
break;
}
}
- vault_.syncer_->set_updates_source(updates_source);
+ syncer_->set_updates_source(updates_source);
}
void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
- AutoLock lock(lock_);
+ MutexLock lock(&mutex_);
channel()->NotifyListeners(event);
if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
return;
@@ -517,33 +438,33 @@ void SyncerThread::HandleDirectoryManagerEvent(
const syncable::DirectoryManagerEvent& event) {
LOG(INFO) << "Handling a directory manager event";
if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
- AutoLock lock(lock_);
+ MutexLock lock(&mutex_);
LOG(INFO) << "Syncer starting up for: " << event.dirname;
// The underlying database structure is ready, and we should create
// the syncer.
- CHECK(vault_.syncer_ == NULL);
- vault_.syncer_ =
+ CHECK(syncer_ == NULL);
+ syncer_ =
new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
- vault_.syncer_->set_command_channel(command_channel_);
+ syncer_->set_command_channel(command_channel_);
syncer_events_.reset(NewEventListenerHookup(
- vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
- vault_field_changed_.Broadcast();
+ syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
+ pthread_cond_broadcast(&changed_.condvar_);
}
}
static inline void CheckConnected(bool* connected,
HttpResponse::ServerConnectionCode code,
- ConditionVariable* condvar) {
+ pthread_cond_t* condvar) {
if (*connected) {
if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
*connected = false;
- condvar->Broadcast();
+ pthread_cond_broadcast(condvar);
}
} else {
if (HttpResponse::SERVER_CONNECTION_OK == code) {
*connected = true;
- condvar->Broadcast();
+ pthread_cond_broadcast(condvar);
}
}
}
@@ -551,16 +472,16 @@ static inline void CheckConnected(bool* connected,
void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
&SyncerThread::HandleServerConnectionEvent));
- CheckConnected(&vault_.connected_, conn_mgr->server_status(),
- &vault_field_changed_);
+ CheckConnected(&connected_, conn_mgr->server_status(),
+ &changed_.condvar_);
}
void SyncerThread::HandleServerConnectionEvent(
const ServerConnectionEvent& event) {
if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
- AutoLock lock(lock_);
- CheckConnected(&vault_.connected_, event.connection_code,
- &vault_field_changed_);
+ MutexLock lock(&mutex_);
+ CheckConnected(&connected_, event.connection_code,
+ &changed_.condvar_);
}
}
@@ -592,11 +513,10 @@ int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
// Called with mutex_ already locked.
void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
NudgeSource source) {
- const TimeTicks nudge_time = TimeTicks::Now() +
- TimeDelta::FromMilliseconds(milliseconds_from_now);
+ const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
NudgeObject nudge_object(nudge_time, source);
- vault_.nudge_queue_.push(nudge_object);
- vault_field_changed_.Broadcast();
+ nudge_queue_.push(nudge_object);
+ pthread_cond_broadcast(&changed_.condvar_);
}
void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
@@ -608,7 +528,7 @@ void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
}
void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
- AutoLock lock(lock_);
+ MutexLock lock(&mutex_);
switch (event.what_happened) {
case TalkMediatorEvent::LOGIN_SUCCEEDED:
LOG(INFO) << "P2P: Login succeeded.";
@@ -621,7 +541,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
case TalkMediatorEvent::SUBSCRIPTIONS_ON:
LOG(INFO) << "P2P: Subscriptions successfully enabled.";
p2p_subscribed_ = true;
- if (NULL != vault_.syncer_) {
+ if (NULL != syncer_) {
LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
NudgeSyncImpl(0, kLocal);
}
@@ -632,7 +552,7 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
break;
case TalkMediatorEvent::NOTIFICATION_RECEIVED:
LOG(INFO) << "P2P: Updates on server, pushing syncer";
- if (NULL != vault_.syncer_) {
+ if (NULL != syncer_) {
NudgeSyncImpl(0, kNotification);
}
break;
@@ -640,9 +560,8 @@ void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
break;
}
- if (NULL != vault_.syncer_) {
- vault_.syncer_->set_notifications_enabled(
- p2p_authenticated_ && p2p_subscribed_);
+ if (NULL != syncer_) {
+ syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
}
}