summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync/engine/syncer_thread.cc
diff options
context:
space:
mode:
authornick@chromium.org <nick@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-10 06:05:27 +0000
committernick@chromium.org <nick@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-09-10 06:05:27 +0000
commit5852edc1b6eab234b9e048c41dd0d664ae7fc747 (patch)
tree9e5d8eb4833b76cdb11e66fc3607689e0f5e0122 /chrome/browser/sync/engine/syncer_thread.cc
parentf6059e37f8b8ac335ce18a189a13e702974a1c7e (diff)
downloadchromium_src-5852edc1b6eab234b9e048c41dd0d664ae7fc747.zip
chromium_src-5852edc1b6eab234b9e048c41dd0d664ae7fc747.tar.gz
chromium_src-5852edc1b6eab234b9e048c41dd0d664ae7fc747.tar.bz2
Initial commit of sync engine code to browser/sync.
The code is not built on any platform yet. That will arrive as a subsequent checkin. This is an implementation of the interface exposed earlier through syncapi.h. It is the client side of a sync protocol that lets users sync their browser data (currently, just bookmarks) with their Google Account. Table of contents: browser/sync/ protocol - The protocol definition, and other definitions necessary to connect to the service. syncable/ - defines a data model for syncable objects, and provides a sqlite-based backing store for this model. engine/ - includes the core sync logic, including commiting changes to the server, downloading changes from the server, resolving conflicts, other parts of the sync algorithm. engine/net - parts of the sync engine focused on the business of talking to the server. Some of this is binds a generic "server connection" interface to a concrete implementation provided by Chromium. notifier - the part of the syncer focused on the business of sending and receiving xmpp notifications. Notifications are used instead of polling to achieve very low latency change propagation. util - not necessarily sync specific utility code. Much of this is scaffolding which should either be replaced by, or merged with, the utility code in base/. BUG=none TEST=this code includes its own suite of unit tests. Review URL: http://codereview.chromium.org/194065 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@25850 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/engine/syncer_thread.cc')
-rw-r--r--chrome/browser/sync/engine/syncer_thread.cc558
1 files changed, 558 insertions, 0 deletions
diff --git a/chrome/browser/sync/engine/syncer_thread.cc b/chrome/browser/sync/engine/syncer_thread.cc
new file mode 100644
index 0000000..e0832a7
--- /dev/null
+++ b/chrome/browser/sync/engine/syncer_thread.cc
@@ -0,0 +1,558 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/engine/syncer_thread.h"
+
+#ifdef OS_MACOSX
+#include <CoreFoundation/CFNumber.h>
+#include <IOKit/IOTypes.h>
+#include <IOKit/IOKitLib.h>
+#endif
+
+#include <algorithm>
+#include <map>
+#include <queue>
+
+#include "chrome/browser/sync/engine/auth_watcher.h"
+#include "chrome/browser/sync/engine/model_safe_worker.h"
+#include "chrome/browser/sync/engine/net/server_connection_manager.h"
+#include "chrome/browser/sync/engine/syncer.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator.h"
+#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
+#include "chrome/browser/sync/syncable/directory_manager.h"
+
+using std::priority_queue;
+using std::min;
+
+static inline bool operator < (const timespec& a, const timespec& b) {
+ return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
+}
+
+namespace {
+
+// returns the amount of time since the user last interacted with
+// the computer, in milliseconds
+int UserIdleTime() {
+#ifdef OS_WINDOWS
+ LASTINPUTINFO last_input_info;
+ last_input_info.cbSize = sizeof(LASTINPUTINFO);
+
+ // get time in windows ticks since system start of last activity
+ BOOL b = ::GetLastInputInfo(&last_input_info);
+ if (b == TRUE)
+ return ::GetTickCount() - last_input_info.dwTime;
+#elif defined(OS_MACOSX)
+ // It would be great to do something like:
+ //
+ // return 1000 *
+ // CGEventSourceSecondsSinceLastEventType(
+ // kCGEventSourceStateCombinedSessionState,
+ // kCGAnyInputEventType);
+ //
+ // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
+ // and can't link that high up the food chain. Thus this mucking in IOKit.
+
+ io_service_t hid_service =
+ IOServiceGetMatchingService(kIOMasterPortDefault,
+ IOServiceMatching("IOHIDSystem"));
+ if (!hid_service) {
+ LOG(WARNING) << "Could not obtain IOHIDSystem";
+ return 0;
+ }
+
+ CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
+ CFSTR("HIDIdleTime"),
+ kCFAllocatorDefault,
+ 0);
+ if (!object) {
+ LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
+ IOObjectRelease(hid_service);
+ return 0;
+ }
+
+ int64 idle_time; // in nanoseconds
+ Boolean success;
+ if (CFGetTypeID(object) == CFNumberGetTypeID()) {
+ success = CFNumberGetValue((CFNumberRef)object,
+ kCFNumberSInt64Type,
+ &idle_time);
+ } else {
+ LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
+ }
+
+ CFRelease(object);
+ IOObjectRelease(hid_service);
+
+ if (!success) {
+ LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
+ return 0;
+ } else {
+ return idle_time / 1000000; // nano to milli
+ }
+#else
+ static bool was_logged = false;
+ if (!was_logged) {
+ was_logged = true;
+ LOG(INFO) << "UserIdleTime unimplemented on this platform, "
+ "synchronization will not throttle when user idle";
+ }
+#endif
+
+ return 0;
+}
+
+} // namespace
+
+namespace browser_sync {
+
+bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
+ MutexLock lock(&mutex_);
+ if (syncer_ == NULL) {
+ return false;
+ }
+ NudgeSyncImpl(milliseconds_from_now, source);
+ return true;
+}
+
+void* RunSyncerThread(void* syncer_thread) {
+ return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
+}
+
+SyncerThread::SyncerThread(
+ ClientCommandChannel* command_channel,
+ syncable::DirectoryManager* mgr,
+ ServerConnectionManager* connection_manager,
+ AllStatus* all_status,
+ ModelSafeWorker* model_safe_worker)
+ : dirman_(mgr), scm_(connection_manager),
+ syncer_(NULL), syncer_events_(NULL), thread_running_(false),
+ syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
+ syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
+ syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
+ syncer_max_interval_(kDefaultMaxPollIntervalMs),
+ stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
+ p2p_authenticated_(false), p2p_subscribed_(false),
+ allstatus_(all_status), talk_mediator_hookup_(NULL),
+ command_channel_(command_channel), directory_manager_hookup_(NULL),
+ model_safe_worker_(model_safe_worker),
+ client_command_hookup_(NULL), disable_idle_detection_(false) {
+
+ SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
+ syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
+
+ if (dirman_) {
+ directory_manager_hookup_.reset(NewEventListenerHookup(
+ dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
+ }
+
+ if (scm_) {
+ WatchConnectionManager(scm_);
+ }
+
+ if (command_channel_) {
+ WatchClientCommands(command_channel_);
+ }
+}
+
+SyncerThread::~SyncerThread() {
+ client_command_hookup_.reset();
+ conn_mgr_hookup_.reset();
+ syncer_event_channel_.reset();
+ directory_manager_hookup_.reset();
+ syncer_events_.reset();
+ delete syncer_;
+ talk_mediator_hookup_.reset();
+ CHECK(!thread_running_);
+}
+
+// Creates and starts a syncer thread.
+// Returns true if it creates a thread or if there's currently a thread
+// running and false otherwise.
+bool SyncerThread::Start() {
+ MutexLock lock(&mutex_);
+ if (thread_running_) {
+ return true;
+ }
+ thread_running_ =
+ (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
+ if (thread_running_) {
+ pthread_detach(thread_);
+ }
+ return thread_running_;
+}
+
+// Stop processing. A max wait of at least 2*server RTT time is recommended.
+// returns true if we stopped, false otherwise.
+bool SyncerThread::Stop(int max_wait) {
+ MutexLock lock(&mutex_);
+ if (!thread_running_)
+ return true;
+ stop_syncer_thread_ = true;
+ if (NULL != syncer_) {
+ // try to early exit the syncer
+ syncer_->RequestEarlyExit();
+ }
+ pthread_cond_broadcast(&changed_.condvar_);
+ timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
+ do {
+ const int wait_result = max_wait < 0 ?
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
+ pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &deadline);
+ if (ETIMEDOUT == wait_result) {
+ LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
+ return false;
+ }
+ } while (thread_running_);
+ return true;
+}
+
+void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
+ PThreadScopedLock<PThreadMutex> lock(&mutex_);
+ client_command_hookup_.reset(NewEventListenerHookup(channel, this,
+ &SyncerThread::HandleClientCommand));
+}
+
+void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType
+ event) {
+ if (!event) {
+ return;
+ }
+
+ // mutex not really necessary for these
+ if (event->has_set_sync_poll_interval()) {
+ syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
+ }
+
+ if (event->has_set_sync_long_poll_interval()) {
+ syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
+ }
+}
+
+void SyncerThread::ThreadMainLoop() {
+ // Use the short poll value by default.
+ int poll_seconds = syncer_short_poll_interval_seconds_;
+ int user_idle_milliseconds = 0;
+ timespec last_sync_time = { 0 };
+ bool initial_sync_for_thread = true;
+ bool continue_sync_cycle = false;
+
+ while (!stop_syncer_thread_) {
+ if (!connected_) {
+ LOG(INFO) << "Syncer thread waiting for connection.";
+ while (!connected_ && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, connected_) << "Syncer thread found connection.";
+ continue;
+ }
+
+ if (syncer_ == NULL) {
+ LOG(INFO) << "Syncer thread waiting for database initialization.";
+ while (syncer_ == NULL && !stop_syncer_thread_)
+ pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
+ LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
+ continue;
+ }
+
+ timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
+ last_sync_time.tv_nsec };
+ const timespec wake_time =
+ !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
+ nudge_queue_.top().first : next_poll;
+ LOG(INFO) << "wake time is " << wake_time.tv_sec;
+ LOG(INFO) << "next poll is " << next_poll.tv_sec;
+
+ const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
+ &wake_time);
+ if (ETIMEDOUT != error) {
+ continue; // Check all the conditions again.
+ }
+
+ const timespec now = GetPThreadAbsoluteTime(0);
+
+ // Handle a nudge, caused by either a notification or a local bookmark
+ // event. This will also update the source of the following SyncMain call.
+ UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
+
+ LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
+ SyncMain(syncer_);
+ last_sync_time = now;
+
+ LOG(INFO) << "Updating the next polling time after SyncMain";
+ poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
+ poll_seconds,
+ &user_idle_milliseconds,
+ &continue_sync_cycle);
+ }
+}
+
+// We check how long the user's been idle and sync less often if the
+// machine is not in use. The aim is to reduce server load.
+int SyncerThread::CalculatePollingWaitTime(
+ const AllStatus::Status& status,
+ int last_poll_wait, // in s
+ int* user_idle_milliseconds,
+ bool* continue_sync_cycle) {
+ bool is_continuing_sync_cyle = *continue_sync_cycle;
+ *continue_sync_cycle = false;
+
+ // Determine if the syncer has unfinished work to do from allstatus_.
+ const bool syncer_has_work_to_do =
+ status.updates_available > status.updates_received
+ || status.unsynced_count > 0;
+ LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
+
+ // First calculate the expected wait time, figuring in any backoff because of
+ // user idle time. next_wait is in seconds
+ syncer_polling_interval_ = (!status.notifications_enabled) ?
+ syncer_short_poll_interval_seconds_ :
+ syncer_long_poll_interval_seconds_;
+ int default_next_wait = syncer_polling_interval_;
+ int actual_next_wait = default_next_wait;
+
+ if (syncer_has_work_to_do) {
+ // Provide exponential backoff due to consecutive errors, else attempt to
+ // complete the work as soon as possible.
+ if (!is_continuing_sync_cyle) {
+ actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
+ } else {
+ actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
+ }
+ *continue_sync_cycle = true;
+ } else if (!status.notifications_enabled) {
+ // Ensure that we start exponential backoff from our base polling
+ // interval when we are not continuing a sync cycle.
+ last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
+
+ // Did the user start interacting with the computer again?
+ // If so, revise our idle time (and probably next_sync_time) downwards
+ int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
+ if (new_idle_time < *user_idle_milliseconds) {
+ *user_idle_milliseconds = new_idle_time;
+ }
+ actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
+ *user_idle_milliseconds) / 1000;
+ DCHECK_GE(actual_next_wait, default_next_wait);
+ }
+
+ LOG(INFO) << "Sync wait: idle " << default_next_wait
+ << " non-idle or backoff " << actual_next_wait << ".";
+
+ return actual_next_wait;
+}
+
+void* SyncerThread::ThreadMain() {
+ NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
+ mutex_.Lock();
+ ThreadMainLoop();
+ thread_running_ = false;
+ pthread_cond_broadcast(&changed_.condvar_);
+ mutex_.Unlock();
+ LOG(INFO) << "Syncer thread exiting.";
+ return 0;
+}
+
+void SyncerThread::SyncMain(Syncer* syncer) {
+ CHECK(syncer);
+ mutex_.Unlock();
+ while (syncer->SyncShare()) {
+ LOG(INFO) << "Looping in sync share";
+ }
+ LOG(INFO) << "Done looping in sync share";
+
+ mutex_.Lock();
+}
+
+void SyncerThread::UpdateNudgeSource(const timespec& now,
+ bool* continue_sync_cycle,
+ bool* initial_sync) {
+ bool nudged = false;
+ NudgeSource nudge_source = kUnknown;
+ // Has the previous sync cycle completed?
+ if (continue_sync_cycle) {
+ nudge_source = kContinuation;
+ }
+ // Update the nudge source if a new nudge has come through during the
+ // previous sync cycle.
+ while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
+ if (!nudged) {
+ nudge_source = nudge_queue_.top().second;
+ *continue_sync_cycle = false; // Reset the continuation token on nudge.
+ nudged = true;
+ }
+ nudge_queue_.pop();
+ }
+ SetUpdatesSource(nudged, nudge_source, initial_sync);
+}
+
+void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
+ bool* initial_sync) {
+ sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
+ sync_pb::GetUpdatesCallerInfo::UNKNOWN;
+ if (*initial_sync) {
+ updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
+ *initial_sync = false;
+ } else if (!nudged) {
+ updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
+ } else {
+ switch (nudge_source) {
+ case kNotification:
+ updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
+ break;
+ case kLocal:
+ updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
+ break;
+ case kContinuation:
+ updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
+ break;
+ case kUnknown:
+ default:
+ updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
+ break;
+ }
+ }
+ syncer_->set_updates_source(updates_source);
+}
+
+void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
+ MutexLock lock(&mutex_);
+ channel()->NotifyListeners(event);
+ if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
+ return;
+ }
+ NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
+}
+
+void SyncerThread::HandleDirectoryManagerEvent(
+ const syncable::DirectoryManagerEvent& event) {
+ LOG(INFO) << "Handling a directory manager event";
+ if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
+ MutexLock lock(&mutex_);
+ LOG(INFO) << "Syncer starting up for: " << event.dirname;
+ // The underlying database structure is ready, and we should create
+ // the syncer.
+ CHECK(syncer_ == NULL);
+ syncer_ =
+ new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
+
+ syncer_->set_command_channel(command_channel_);
+ syncer_events_.reset(NewEventListenerHookup(
+ syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
+ pthread_cond_broadcast(&changed_.condvar_);
+ }
+}
+
+static inline void CheckConnected(bool* connected,
+ HttpResponse::ServerConnectionCode code,
+ pthread_cond_t* condvar) {
+ if (*connected) {
+ if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
+ *connected = false;
+ pthread_cond_broadcast(condvar);
+ }
+ } else {
+ if (HttpResponse::SERVER_CONNECTION_OK == code) {
+ *connected = true;
+ pthread_cond_broadcast(condvar);
+ }
+ }
+}
+
+void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
+ conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
+ &SyncerThread::HandleServerConnectionEvent));
+ CheckConnected(&connected_, conn_mgr->server_status(),
+ &changed_.condvar_);
+}
+
+void SyncerThread::HandleServerConnectionEvent(
+ const ServerConnectionEvent& event) {
+ if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
+ MutexLock lock(&mutex_);
+ CheckConnected(&connected_, event.connection_code,
+ &changed_.condvar_);
+ }
+}
+
+SyncerEventChannel* SyncerThread::channel() {
+ return syncer_event_channel_.get();
+}
+
+// inputs and return value in milliseconds
+int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
+ // syncer_polling_interval_ is in seconds
+ int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
+
+ // This is our default and lower bound.
+ int next_wait = syncer_polling_interval_ms;
+
+ // Get idle time, bounded by max wait.
+ int idle = min(user_idle_ms, syncer_max_interval_);
+
+ // If the user has been idle for a while,
+ // we'll start decreasing the poll rate.
+ if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
+ next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
+ last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
+ }
+
+ return next_wait;
+}
+
+// Called with mutex_ already locked
+void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
+ NudgeSource source) {
+ const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
+ NudgeObject nudge_object(nudge_time, source);
+ nudge_queue_.push(nudge_object);
+ pthread_cond_broadcast(&changed_.condvar_);
+}
+
+void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
+ talk_mediator_hookup_.reset(
+ NewEventListenerHookup(
+ mediator->channel(),
+ this,
+ &SyncerThread::HandleTalkMediatorEvent));
+}
+
+void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
+ MutexLock lock(&mutex_);
+ switch (event.what_happened) {
+ case TalkMediatorEvent::LOGIN_SUCCEEDED:
+ LOG(INFO) << "P2P: Login succeeded.";
+ p2p_authenticated_ = true;
+ break;
+ case TalkMediatorEvent::LOGOUT_SUCCEEDED:
+ LOG(INFO) << "P2P: Login succeeded.";
+ p2p_authenticated_ = false;
+ break;
+ case TalkMediatorEvent::SUBSCRIPTIONS_ON:
+ LOG(INFO) << "P2P: Subscriptions successfully enabled.";
+ p2p_subscribed_ = true;
+ if (NULL != syncer_) {
+ LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
+ NudgeSyncImpl(0, kLocal);
+ }
+ break;
+ case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
+ LOG(INFO) << "P2P: Subscriptions are not enabled.";
+ p2p_subscribed_ = false;
+ break;
+ case TalkMediatorEvent::NOTIFICATION_RECEIVED:
+ LOG(INFO) << "P2P: Updates on server, pushing syncer";
+ if (NULL != syncer_) {
+ NudgeSyncImpl(0, kNotification);
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (NULL != syncer_) {
+ syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
+ }
+}
+
+} // namespace browser_sync