summaryrefslogtreecommitdiffstats
path: root/sync
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-04-19 14:21:09 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-04-19 14:21:09 +0000
commitb7fe4b561ee884e64c2492c338b4cfda0227ebd4 (patch)
tree8fad788255a99f9493546a307b90a6592b8a22f3 /sync
parented0626b92b987b22d13efb278d6887feac04158b (diff)
downloadchromium_src-b7fe4b561ee884e64c2492c338b4cfda0227ebd4.zip
chromium_src-b7fe4b561ee884e64c2492c338b4cfda0227ebd4.tar.gz
chromium_src-b7fe4b561ee884e64c2492c338b4cfda0227ebd4.tar.bz2
[Sync] Move 'sync_notifier' target to sync/
Also move related test files. Remove some unneeded includes. Lock down deps for sync/notifier. Clean up some deps in chrome/browser/sync. BUG=117585,124139 TEST= Review URL: http://codereview.chromium.org/10115046 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@132990 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync')
-rw-r--r--sync/notifier/DEPS16
-rw-r--r--sync/notifier/cache_invalidation_packet_handler.cc126
-rw-r--r--sync/notifier/cache_invalidation_packet_handler.h86
-rw-r--r--sync/notifier/cache_invalidation_packet_handler_unittest.cc77
-rw-r--r--sync/notifier/chrome_invalidation_client.cc313
-rw-r--r--sync/notifier/chrome_invalidation_client.h142
-rw-r--r--sync/notifier/chrome_invalidation_client_unittest.cc275
-rw-r--r--sync/notifier/chrome_system_resources.cc298
-rw-r--r--sync/notifier/chrome_system_resources.h176
-rw-r--r--sync/notifier/chrome_system_resources_unittest.cc174
-rw-r--r--sync/notifier/invalidation_notifier.cc150
-rw-r--r--sync/notifier/invalidation_notifier.h123
-rw-r--r--sync/notifier/invalidation_notifier_unittest.cc96
-rw-r--r--sync/notifier/invalidation_util.cc58
-rw-r--r--sync/notifier/invalidation_util.h40
-rw-r--r--sync/notifier/invalidation_version_tracker.h38
-rw-r--r--sync/notifier/mock_sync_notifier_observer.cc12
-rw-r--r--sync/notifier/mock_sync_notifier_observer.h30
-rw-r--r--sync/notifier/non_blocking_invalidation_notifier.cc268
-rw-r--r--sync/notifier/non_blocking_invalidation_notifier.h83
-rw-r--r--sync/notifier/non_blocking_invalidation_notifier_unittest.cc97
-rw-r--r--sync/notifier/p2p_notifier.cc301
-rw-r--r--sync/notifier/p2p_notifier.h143
-rw-r--r--sync/notifier/p2p_notifier_unittest.cc259
-rw-r--r--sync/notifier/registration_manager.cc275
-rw-r--r--sync/notifier/registration_manager.h177
-rw-r--r--sync/notifier/registration_manager_unittest.cc437
-rw-r--r--sync/notifier/state_writer.h24
-rw-r--r--sync/notifier/sync_notifier.h55
-rw-r--r--sync/notifier/sync_notifier_factory.cc69
-rw-r--r--sync/notifier/sync_notifier_factory.h48
-rw-r--r--sync/notifier/sync_notifier_factory_unittest.cc78
-rw-r--r--sync/notifier/sync_notifier_observer.h39
-rw-r--r--sync/sync.gyp120
34 files changed, 4703 insertions, 0 deletions
diff --git a/sync/notifier/DEPS b/sync/notifier/DEPS
new file mode 100644
index 0000000..ab55b47
--- /dev/null
+++ b/sync/notifier/DEPS
@@ -0,0 +1,16 @@
+include_rules = [
+ "+google/cacheinvalidation",
+ "+jingle/notifier",
+ "+net/url_request/url_request_context.h",
+ "+net/url_request/url_request_test_util.h",
+
+ "+sync/syncable/model_type.h",
+ "+sync/syncable/model_type_payload_map.h",
+ "+sync/protocol/service_constants.h",
+ "+sync/util",
+
+ # unit tests depend on talk/base.
+ "+talk/base",
+ # sync_notifier depends on the xmpp part of libjingle.
+ "+talk/xmpp",
+]
diff --git a/sync/notifier/cache_invalidation_packet_handler.cc b/sync/notifier/cache_invalidation_packet_handler.cc
new file mode 100644
index 0000000..5d68753
--- /dev/null
+++ b/sync/notifier/cache_invalidation_packet_handler.cc
@@ -0,0 +1,126 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/cache_invalidation_packet_handler.h"
+
+#include <string>
+
+#include "base/base64.h"
+#include "base/callback.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/rand_util.h"
+#include "base/string_number_conversions.h"
+#include "google/cacheinvalidation/impl/constants.h"
+#include "google/cacheinvalidation/include/invalidation-client.h"
+#include "google/cacheinvalidation/include/system-resources.h"
+#include "google/cacheinvalidation/v2/client_gateway.pb.h"
+#include "jingle/notifier/listener/notification_constants.h"
+#include "jingle/notifier/listener/push_notifications_send_update_task.h"
+#include "jingle/notifier/listener/xml_element_util.h"
+#include "talk/xmpp/constants.h"
+#include "talk/xmpp/jid.h"
+#include "talk/xmpp/xmppclient.h"
+#include "talk/xmpp/xmpptask.h"
+
+namespace sync_notifier {
+
+namespace {
+
+const char kBotJid[] = "tango@bot.talk.google.com";
+const char kChannelName[] = "tango_raw";
+
+} // namespace
+
+CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task)
+ : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ base_task_(base_task),
+ seq_(0),
+ scheduling_hash_(0) {
+ CHECK(base_task_.get());
+ // Owned by base_task. Takes ownership of the callback.
+ notifier::PushNotificationsListenTask* listen_task =
+ new notifier::PushNotificationsListenTask(base_task_, this);
+ listen_task->Start();
+}
+
+CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+}
+
+void CacheInvalidationPacketHandler::SendMessage(
+ const std::string& message) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ if (!base_task_.get()) {
+ return;
+ }
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.set_is_client_to_server(true);
+ if (!service_context_.empty()) {
+ envelope.set_service_context(service_context_);
+ envelope.set_rpc_scheduling_hash(scheduling_hash_);
+ }
+ envelope.set_network_message(message);
+
+ notifier::Recipient recipient;
+ recipient.to = kBotJid;
+ notifier::Notification notification;
+ notification.channel = kChannelName;
+ notification.recipients.push_back(recipient);
+ envelope.SerializeToString(&notification.data);
+
+ // Owned by base_task_.
+ notifier::PushNotificationsSendUpdateTask* send_message_task =
+ new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
+ send_message_task->Start();
+}
+
+void CacheInvalidationPacketHandler::SetMessageReceiver(
+ invalidation::MessageCallback* incoming_receiver) {
+ incoming_receiver_.reset(incoming_receiver);
+}
+
+void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
+ notifier::Subscription subscription;
+ subscription.channel = kChannelName;
+ subscription.from = "";
+ notifier::SubscriptionList subscription_list;
+ subscription_list.push_back(subscription);
+ // Owned by base_task_.
+ notifier::PushNotificationsSubscribeTask* push_subscription_task =
+ new notifier::PushNotificationsSubscribeTask(
+ base_task_, subscription_list, this);
+ push_subscription_task->Start();
+}
+
+void CacheInvalidationPacketHandler::OnSubscribed() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnSubscriptionError() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnNotificationReceived(
+ const notifier::Notification& notification) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ const std::string& decoded_message = notification.data;
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.ParseFromString(decoded_message);
+ if (!envelope.IsInitialized()) {
+ LOG(ERROR) << "Could not parse ClientGatewayMessage: "
+ << decoded_message;
+ return;
+ }
+ if (envelope.has_service_context()) {
+ service_context_ = envelope.service_context();
+ }
+ if (envelope.has_rpc_scheduling_hash()) {
+ scheduling_hash_ = envelope.rpc_scheduling_hash();
+ }
+ incoming_receiver_->Run(envelope.network_message());
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/cache_invalidation_packet_handler.h b/sync/notifier/cache_invalidation_packet_handler.h
new file mode 100644
index 0000000..655933d
--- /dev/null
+++ b/sync/notifier/cache_invalidation_packet_handler.h
@@ -0,0 +1,86 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Class that handles the details of sending and receiving client
+// invalidation packets.
+
+#ifndef SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
+#define SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
+#pragma once
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/gtest_prod_util.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/threading/non_thread_safe.h"
+#include "google/cacheinvalidation/include/system-resources.h"
+#include "jingle/notifier/listener/push_notifications_listen_task.h"
+#include "jingle/notifier/listener/push_notifications_subscribe_task.h"
+
+namespace buzz {
+class XmppTaskParentInterface;
+} // namespace buzz
+
+namespace sync_notifier {
+
+class CacheInvalidationPacketHandler
+ : public notifier::PushNotificationsSubscribeTaskDelegate,
+ public notifier::PushNotificationsListenTaskDelegate {
+ public:
+ // Starts routing packets from |invalidation_client| using
+ // |base_task|. |base_task.get()| must still be non-NULL.
+ // |invalidation_client| must not already be routing packets through
+ // something. Does not take ownership of |invalidation_client|.
+ CacheInvalidationPacketHandler(
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task);
+
+ // Makes the invalidation client passed into the constructor not
+ // route packets through the XMPP client passed into the constructor
+ // anymore.
+ virtual ~CacheInvalidationPacketHandler();
+
+ // If |base_task| is non-NULL, sends the outgoing message.
+ virtual void SendMessage(const std::string& outgoing_message);
+
+ virtual void SetMessageReceiver(
+ invalidation::MessageCallback* incoming_receiver);
+
+ // Sends a message requesting a subscription to the notification channel.
+ virtual void SendSubscriptionRequest();
+
+ // PushNotificationsSubscribeTaskDelegate implementation.
+ virtual void OnSubscribed() OVERRIDE;
+ virtual void OnSubscriptionError() OVERRIDE;
+
+ // PushNotificationsListenTaskDelegate implementation.
+ virtual void OnNotificationReceived(
+ const notifier::Notification& notification) OVERRIDE;
+
+ private:
+ FRIEND_TEST_ALL_PREFIXES(CacheInvalidationPacketHandlerTest, Basic);
+
+ base::NonThreadSafe non_thread_safe_;
+ base::WeakPtrFactory<CacheInvalidationPacketHandler> weak_factory_;
+
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task_;
+
+ scoped_ptr<invalidation::MessageCallback> incoming_receiver_;
+
+ // Parameters for sent messages.
+
+ // Monotonically increasing sequence number.
+ int seq_;
+ // Service context.
+ std::string service_context_;
+ // Scheduling hash.
+ int64 scheduling_hash_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler);
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
diff --git a/sync/notifier/cache_invalidation_packet_handler_unittest.cc b/sync/notifier/cache_invalidation_packet_handler_unittest.cc
new file mode 100644
index 0000000..184c030
--- /dev/null
+++ b/sync/notifier/cache_invalidation_packet_handler_unittest.cc
@@ -0,0 +1,77 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/cache_invalidation_packet_handler.h"
+
+#include "base/base64.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop.h"
+#include "google/cacheinvalidation/deps/callback.h"
+#include "google/cacheinvalidation/include/system-resources.h"
+#include "google/cacheinvalidation/v2/client_gateway.pb.h"
+#include "jingle/notifier/base/fake_base_task.h"
+#include "jingle/notifier/listener/notification_defines.h"
+#include "talk/base/task.h"
+#include "talk/xmpp/asyncsocket.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+
+class MockMessageCallback {
+ public:
+ void StoreMessage(const std::string& message) {
+ last_message = message;
+ }
+
+ std::string last_message;
+};
+
+class CacheInvalidationPacketHandlerTest : public testing::Test {
+ public:
+ virtual ~CacheInvalidationPacketHandlerTest() {}
+
+ notifier::Notification MakeNotification(const std::string& data) {
+ notifier::Notification notification;
+ notification.channel = "tango_raw";
+ notification.data = data;
+ return notification;
+ }
+};
+
+TEST_F(CacheInvalidationPacketHandlerTest, Basic) {
+ MessageLoop message_loop;
+
+ notifier::FakeBaseTask fake_base_task;
+
+ std::string last_message;
+ MockMessageCallback callback;
+ invalidation::MessageCallback* mock_message_callback =
+ invalidation::NewPermanentCallback(
+ &callback, &MockMessageCallback::StoreMessage);
+
+ const char kInboundMessage[] = "non-bogus";
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.set_network_message(kInboundMessage);
+ std::string serialized;
+ envelope.SerializeToString(&serialized);
+ {
+ CacheInvalidationPacketHandler handler(fake_base_task.AsWeakPtr());
+ handler.SetMessageReceiver(mock_message_callback);
+
+ // Take care of any tasks posted by the constructor.
+ message_loop.RunAllPending();
+
+ {
+ handler.OnNotificationReceived(MakeNotification("bogus"));
+ handler.OnNotificationReceived(MakeNotification(serialized));
+ }
+
+ // Take care of any tasks posted by HandleOutboundPacket().
+ message_loop.RunAllPending();
+
+ EXPECT_EQ(callback.last_message, kInboundMessage);
+ }
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/chrome_invalidation_client.cc b/sync/notifier/chrome_invalidation_client.cc
new file mode 100644
index 0000000..d74cf0a
--- /dev/null
+++ b/sync/notifier/chrome_invalidation_client.cc
@@ -0,0 +1,313 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/chrome_invalidation_client.h"
+
+#include <string>
+#include <vector>
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/tracked_objects.h"
+#include "google/cacheinvalidation/include/invalidation-client-factory.h"
+#include "google/cacheinvalidation/include/invalidation-client.h"
+#include "google/cacheinvalidation/include/types.h"
+#include "google/cacheinvalidation/v2/types.pb.h"
+#include "sync/notifier/cache_invalidation_packet_handler.h"
+#include "sync/notifier/invalidation_util.h"
+#include "sync/notifier/registration_manager.h"
+#include "sync/syncable/model_type.h"
+
+namespace {
+
+const char kApplicationName[] = "chrome-sync";
+
+} // anonymous namespace
+
+namespace sync_notifier {
+
+ChromeInvalidationClient::Listener::~Listener() {}
+
+ChromeInvalidationClient::ChromeInvalidationClient()
+ : chrome_system_resources_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ listener_(NULL),
+ state_writer_(NULL),
+ ticl_ready_(false) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+}
+
+ChromeInvalidationClient::~ChromeInvalidationClient() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ Stop();
+ DCHECK(!listener_);
+ DCHECK(!state_writer_);
+}
+
+void ChromeInvalidationClient::Start(
+ const std::string& client_id, const std::string& client_info,
+ const std::string& state,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ Listener* listener,
+ StateWriter* state_writer,
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ Stop();
+
+ chrome_system_resources_.set_platform(client_info);
+ chrome_system_resources_.Start();
+
+ // The Storage resource is implemented as a write-through cache. We populate
+ // it with the initial state on startup, so subsequent writes go to disk and
+ // update the in-memory cache, while reads just return the cached state.
+ chrome_system_resources_.storage()->SetInitialState(state);
+
+ max_invalidation_versions_ = initial_max_invalidation_versions;
+ if (max_invalidation_versions_.empty()) {
+ DVLOG(2) << "No initial max invalidation versions for any type";
+ } else {
+ for (InvalidationVersionMap::const_iterator it =
+ max_invalidation_versions_.begin();
+ it != max_invalidation_versions_.end(); ++it) {
+ DVLOG(2) << "Initial max invalidation version for "
+ << syncable::ModelTypeToString(it->first) << " is "
+ << it->second;
+ }
+ }
+ invalidation_version_tracker_ = invalidation_version_tracker;
+ DCHECK(invalidation_version_tracker_.IsInitialized());
+
+ DCHECK(!listener_);
+ DCHECK(listener);
+ listener_ = listener;
+ DCHECK(!state_writer_);
+ DCHECK(state_writer);
+ state_writer_ = state_writer;
+
+ int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
+ invalidation_client_.reset(
+ invalidation::CreateInvalidationClient(
+ &chrome_system_resources_, client_type, client_id,
+ kApplicationName, this));
+ ChangeBaseTask(base_task);
+ invalidation_client_->Start();
+
+ registration_manager_.reset(
+ new RegistrationManager(invalidation_client_.get()));
+}
+
+void ChromeInvalidationClient::ChangeBaseTask(
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task) {
+ DCHECK(invalidation_client_.get());
+ DCHECK(base_task.get());
+ cache_invalidation_packet_handler_.reset(
+ new CacheInvalidationPacketHandler(base_task));
+ chrome_system_resources_.network()->UpdatePacketHandler(
+ cache_invalidation_packet_handler_.get());
+}
+
+void ChromeInvalidationClient::Stop() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ if (!invalidation_client_.get()) {
+ DCHECK(!cache_invalidation_packet_handler_.get());
+ return;
+ }
+
+ registration_manager_.reset();
+ cache_invalidation_packet_handler_.reset();
+ chrome_system_resources_.Stop();
+ invalidation_client_->Stop();
+
+ invalidation_client_.reset();
+ state_writer_ = NULL;
+ listener_ = NULL;
+
+ invalidation_version_tracker_.Reset();
+ max_invalidation_versions_.clear();
+}
+
+void ChromeInvalidationClient::RegisterTypes(syncable::ModelTypeSet types) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ registered_types_ = types;
+ if (ticl_ready_ && registration_manager_.get()) {
+ registration_manager_->SetRegisteredTypes(registered_types_);
+ }
+ // TODO(akalin): Clear invalidation versions for unregistered types.
+}
+
+void ChromeInvalidationClient::Ready(
+ invalidation::InvalidationClient* client) {
+ ticl_ready_ = true;
+ listener_->OnSessionStatusChanged(true);
+ registration_manager_->SetRegisteredTypes(registered_types_);
+}
+
+void ChromeInvalidationClient::Invalidate(
+ invalidation::InvalidationClient* client,
+ const invalidation::Invalidation& invalidation,
+ const invalidation::AckHandle& ack_handle) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
+ syncable::ModelType model_type;
+ if (!ObjectIdToRealModelType(invalidation.object_id(), &model_type)) {
+ LOG(WARNING) << "Could not get invalidation model type; "
+ << "invalidating everything";
+ EmitInvalidation(registered_types_, std::string());
+ client->Acknowledge(ack_handle);
+ return;
+ }
+ // The invalidation API spec allows for the possibility of redundant
+ // invalidations, so keep track of the max versions and drop
+ // invalidations with old versions.
+ //
+ // TODO(akalin): Now that we keep track of registered types, we
+ // should drop invalidations for unregistered types. We may also
+ // have to filter it at a higher level, as invalidations for
+ // newly-unregistered types may already be in flight.
+ InvalidationVersionMap::const_iterator it =
+ max_invalidation_versions_.find(model_type);
+ if ((it != max_invalidation_versions_.end()) &&
+ (invalidation.version() <= it->second)) {
+ // Drop redundant invalidations.
+ client->Acknowledge(ack_handle);
+ return;
+ }
+ DVLOG(2) << "Setting max invalidation version for "
+ << syncable::ModelTypeToString(model_type) << " to "
+ << invalidation.version();
+ max_invalidation_versions_[model_type] = invalidation.version();
+ invalidation_version_tracker_.Call(
+ FROM_HERE,
+ &InvalidationVersionTracker::SetMaxVersion,
+ model_type, invalidation.version());
+
+ std::string payload;
+ // payload() CHECK()'s has_payload(), so we must check it ourselves first.
+ if (invalidation.has_payload())
+ payload = invalidation.payload();
+
+ EmitInvalidation(syncable::ModelTypeSet(model_type), payload);
+ // TODO(akalin): We should really acknowledge only after we get the
+ // updates from the sync server. (see http://crbug.com/78462).
+ client->Acknowledge(ack_handle);
+}
+
+void ChromeInvalidationClient::InvalidateUnknownVersion(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ const invalidation::AckHandle& ack_handle) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "InvalidateUnknownVersion";
+
+ syncable::ModelType model_type;
+ if (!ObjectIdToRealModelType(object_id, &model_type)) {
+ LOG(WARNING) << "Could not get invalidation model type; "
+ << "invalidating everything";
+ EmitInvalidation(registered_types_, std::string());
+ client->Acknowledge(ack_handle);
+ return;
+ }
+
+ EmitInvalidation(syncable::ModelTypeSet(model_type), "");
+ // TODO(akalin): We should really acknowledge only after we get the
+ // updates from the sync server. (see http://crbug.com/78462).
+ client->Acknowledge(ack_handle);
+}
+
+// This should behave as if we got an invalidation with version
+// UNKNOWN_OBJECT_VERSION for all known data types.
+void ChromeInvalidationClient::InvalidateAll(
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "InvalidateAll";
+ EmitInvalidation(registered_types_, std::string());
+ // TODO(akalin): We should really acknowledge only after we get the
+ // updates from the sync server. (see http://crbug.com/76482).
+ client->Acknowledge(ack_handle);
+}
+
+void ChromeInvalidationClient::EmitInvalidation(
+ syncable::ModelTypeSet types, const std::string& payload) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ syncable::ModelTypePayloadMap type_payloads =
+ syncable::ModelTypePayloadMapFromEnumSet(types, payload);
+ listener_->OnInvalidate(type_payloads);
+}
+
+void ChromeInvalidationClient::InformRegistrationStatus(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ InvalidationListener::RegistrationState new_state) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "InformRegistrationStatus: "
+ << ObjectIdToString(object_id) << " " << new_state;
+
+ syncable::ModelType model_type;
+ if (!ObjectIdToRealModelType(object_id, &model_type)) {
+ LOG(WARNING) << "Could not get object id model type; ignoring";
+ return;
+ }
+
+ if (new_state != InvalidationListener::REGISTERED) {
+ // Let |registration_manager_| handle the registration backoff policy.
+ registration_manager_->MarkRegistrationLost(model_type);
+ }
+}
+
+void ChromeInvalidationClient::InformRegistrationFailure(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ bool is_transient,
+ const std::string& error_message) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "InformRegistrationFailure: "
+ << ObjectIdToString(object_id)
+ << "is_transient=" << is_transient
+ << ", message=" << error_message;
+
+ syncable::ModelType model_type;
+ if (!ObjectIdToRealModelType(object_id, &model_type)) {
+ LOG(WARNING) << "Could not get object id model type; ignoring";
+ return;
+ }
+
+ if (is_transient) {
+ // We don't care about |unknown_hint|; we let
+ // |registration_manager_| handle the registration backoff policy.
+ registration_manager_->MarkRegistrationLost(model_type);
+ } else {
+ // Non-transient failures are permanent, so block any future
+ // registration requests for |model_type|. (This happens if the
+ // server doesn't recognize the data type, which could happen for
+ // brand-new data types.)
+ registration_manager_->DisableType(model_type);
+ }
+}
+
+void ChromeInvalidationClient::ReissueRegistrations(
+ invalidation::InvalidationClient* client,
+ const std::string& prefix,
+ int prefix_length) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "AllRegistrationsLost";
+ registration_manager_->MarkAllRegistrationsLost();
+}
+
+void ChromeInvalidationClient::InformError(
+ invalidation::InvalidationClient* client,
+ const invalidation::ErrorInfo& error_info) {
+ listener_->OnSessionStatusChanged(false);
+ LOG(ERROR) << "Invalidation client encountered an error";
+ // TODO(ghc): handle the error.
+}
+
+void ChromeInvalidationClient::WriteState(const std::string& state) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ CHECK(state_writer_);
+ state_writer_->WriteState(state);
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/chrome_invalidation_client.h b/sync/notifier/chrome_invalidation_client.h
new file mode 100644
index 0000000..b1c8438
--- /dev/null
+++ b/sync/notifier/chrome_invalidation_client.h
@@ -0,0 +1,142 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A simple wrapper around invalidation::InvalidationClient that
+// handles all the startup/shutdown details and hookups.
+
+#ifndef SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
+#define SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
+#pragma once
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/threading/non_thread_safe.h"
+#include "google/cacheinvalidation/include/invalidation-listener.h"
+#include "sync/notifier/chrome_system_resources.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/state_writer.h"
+#include "sync/syncable/model_type.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "sync/util/weak_handle.h"
+
+namespace buzz {
+class XmppTaskParentInterface;
+} // namespace buzz
+
+namespace sync_notifier {
+
+using invalidation::InvalidationListener;
+
+class CacheInvalidationPacketHandler;
+class RegistrationManager;
+
+// ChromeInvalidationClient is not thread-safe and lives on the sync
+// thread.
+class ChromeInvalidationClient
+ : public InvalidationListener,
+ public StateWriter {
+ public:
+ class Listener {
+ public:
+ virtual ~Listener();
+
+ virtual void OnInvalidate(
+ const syncable::ModelTypePayloadMap& type_payloads) = 0;
+
+ virtual void OnSessionStatusChanged(bool has_session) = 0;
+ };
+
+ ChromeInvalidationClient();
+
+ // Calls Stop().
+ virtual ~ChromeInvalidationClient();
+
+ // Does not take ownership of |listener| or |state_writer|.
+ // |invalidation_version_tracker| must be initialized.
+ // |base_task| must still be non-NULL.
+ void Start(
+ const std::string& client_id, const std::string& client_info,
+ const std::string& state,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ Listener* listener,
+ StateWriter* state_writer,
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task);
+
+ void Stop();
+
+ // Changes the task used to |base_task|, which must still be
+ // non-NULL. Must only be called between calls to Start() and
+ // Stop().
+ void ChangeBaseTask(base::WeakPtr<buzz::XmppTaskParentInterface> base_task);
+
+ // Register the sync types that we're interested in getting
+ // notifications for. May be called at any time.
+ void RegisterTypes(syncable::ModelTypeSet types);
+
+ virtual void WriteState(const std::string& state) OVERRIDE;
+
+ // invalidation::InvalidationListener implementation.
+ virtual void Ready(
+ invalidation::InvalidationClient* client) OVERRIDE;
+ virtual void Invalidate(
+ invalidation::InvalidationClient* client,
+ const invalidation::Invalidation& invalidation,
+ const invalidation::AckHandle& ack_handle) OVERRIDE;
+ virtual void InvalidateUnknownVersion(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ const invalidation::AckHandle& ack_handle) OVERRIDE;
+ virtual void InvalidateAll(
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle) OVERRIDE;
+ virtual void InformRegistrationStatus(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ InvalidationListener::RegistrationState reg_state) OVERRIDE;
+ virtual void InformRegistrationFailure(
+ invalidation::InvalidationClient* client,
+ const invalidation::ObjectId& object_id,
+ bool is_transient,
+ const std::string& error_message) OVERRIDE;
+ virtual void ReissueRegistrations(
+ invalidation::InvalidationClient* client,
+ const std::string& prefix,
+ int prefix_length) OVERRIDE;
+ virtual void InformError(
+ invalidation::InvalidationClient* client,
+ const invalidation::ErrorInfo& error_info) OVERRIDE;
+
+ private:
+ friend class ChromeInvalidationClientTest;
+
+ void EmitInvalidation(
+ syncable::ModelTypeSet types, const std::string& payload);
+
+ base::NonThreadSafe non_thread_safe_;
+ ChromeSystemResources chrome_system_resources_;
+ InvalidationVersionMap max_invalidation_versions_;
+ browser_sync::WeakHandle<InvalidationVersionTracker>
+ invalidation_version_tracker_;
+ Listener* listener_;
+ StateWriter* state_writer_;
+ scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
+ scoped_ptr<CacheInvalidationPacketHandler>
+ cache_invalidation_packet_handler_;
+ scoped_ptr<RegistrationManager> registration_manager_;
+ // Stored to pass to |registration_manager_| on start.
+ syncable::ModelTypeSet registered_types_;
+ bool ticl_ready_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationClient);
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
diff --git a/sync/notifier/chrome_invalidation_client_unittest.cc b/sync/notifier/chrome_invalidation_client_unittest.cc
new file mode 100644
index 0000000..72d71da
--- /dev/null
+++ b/sync/notifier/chrome_invalidation_client_unittest.cc
@@ -0,0 +1,275 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <string>
+
+#include "base/message_loop.h"
+#include "google/cacheinvalidation/include/invalidation-client.h"
+#include "google/cacheinvalidation/include/types.h"
+#include "google/cacheinvalidation/v2/types.pb.h"
+#include "jingle/notifier/base/fake_base_task.h"
+#include "sync/notifier/chrome_invalidation_client.h"
+#include "sync/notifier/state_writer.h"
+#include "sync/syncable/model_type.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "sync/util/weak_handle.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::StrictMock;
+
+namespace {
+
+const char kClientId[] = "client_id";
+const char kClientInfo[] = "client_info";
+const char kState[] = "state";
+
+class MockInvalidationClient : public invalidation::InvalidationClient {
+ public:
+ MOCK_METHOD0(Start, void());
+ MOCK_METHOD0(Stop, void());
+ MOCK_METHOD1(Register, void(const invalidation::ObjectId&));
+ MOCK_METHOD1(Register, void(const std::vector<invalidation::ObjectId>&));
+ MOCK_METHOD1(Unregister, void(const invalidation::ObjectId&));
+ MOCK_METHOD1(Unregister, void(const std::vector<invalidation::ObjectId>&));
+ MOCK_METHOD1(Acknowledge, void(const invalidation::AckHandle&));
+};
+
+class MockListener : public ChromeInvalidationClient::Listener {
+ public:
+ MOCK_METHOD1(OnInvalidate, void(const syncable::ModelTypePayloadMap&));
+ MOCK_METHOD1(OnSessionStatusChanged, void(bool));
+};
+
+class MockInvalidationVersionTracker
+ : public InvalidationVersionTracker,
+ public base::SupportsWeakPtr<MockInvalidationVersionTracker> {
+ public:
+ MOCK_CONST_METHOD0(GetAllMaxVersions, InvalidationVersionMap());
+ MOCK_METHOD2(SetMaxVersion, void(syncable::ModelType, int64));
+};
+
+class MockStateWriter : public StateWriter {
+ public:
+ MOCK_METHOD1(WriteState, void(const std::string&));
+};
+
+} // namespace
+
+class ChromeInvalidationClientTest : public testing::Test {
+ protected:
+ virtual void SetUp() {
+ client_.Start(kClientId, kClientInfo, kState,
+ InvalidationVersionMap(),
+ browser_sync::MakeWeakHandle(
+ mock_invalidation_version_tracker_.AsWeakPtr()),
+ &mock_listener_, &mock_state_writer_,
+ fake_base_task_.AsWeakPtr());
+ }
+
+ virtual void TearDown() {
+ // client_.Stop() stops the invalidation scheduler, which deletes any
+ // pending tasks without running them. Some tasks "run and delete" another
+ // task, so they must be run in order to avoid leaking the inner task.
+ // client_.Stop() does not schedule any tasks, so it's both necessary and
+ // sufficient to drain the task queue before calling it.
+ message_loop_.RunAllPending();
+ client_.Stop();
+ }
+
+ // |payload| can be NULL, but not |type_name|.
+ void FireInvalidate(const char* type_name,
+ int64 version, const char* payload) {
+ const invalidation::ObjectId object_id(
+ ipc::invalidation::ObjectSource::CHROME_SYNC, type_name);
+ std::string payload_tmp = payload ? payload : "";
+ invalidation::Invalidation inv;
+ if (payload) {
+ inv = invalidation::Invalidation(object_id, version, payload);
+ } else {
+ inv = invalidation::Invalidation(object_id, version);
+ }
+ invalidation::AckHandle ack_handle("fakedata");
+ EXPECT_CALL(mock_invalidation_client_, Acknowledge(ack_handle));
+ client_.Invalidate(&mock_invalidation_client_, inv, ack_handle);
+ // Pump message loop to trigger
+ // InvalidationVersionTracker::SetMaxVersion().
+ message_loop_.RunAllPending();
+ }
+
+ // |payload| can be NULL, but not |type_name|.
+ void FireInvalidateUnknownVersion(const char* type_name) {
+ const invalidation::ObjectId object_id(
+ ipc::invalidation::ObjectSource::CHROME_SYNC, type_name);
+
+ invalidation::AckHandle ack_handle("fakedata");
+ EXPECT_CALL(mock_invalidation_client_, Acknowledge(ack_handle));
+ client_.InvalidateUnknownVersion(&mock_invalidation_client_, object_id,
+ ack_handle);
+ }
+
+ void FireInvalidateAll() {
+ invalidation::AckHandle ack_handle("fakedata");
+ EXPECT_CALL(mock_invalidation_client_, Acknowledge(ack_handle));
+ client_.InvalidateAll(&mock_invalidation_client_, ack_handle);
+ }
+
+ MessageLoop message_loop_;
+ StrictMock<MockListener> mock_listener_;
+ StrictMock<MockInvalidationVersionTracker>
+ mock_invalidation_version_tracker_;
+ StrictMock<MockStateWriter> mock_state_writer_;
+ StrictMock<MockInvalidationClient> mock_invalidation_client_;
+ notifier::FakeBaseTask fake_base_task_;
+ ChromeInvalidationClient client_;
+};
+
+namespace {
+
+syncable::ModelTypePayloadMap MakeMap(syncable::ModelType model_type,
+ const std::string& payload) {
+ syncable::ModelTypePayloadMap type_payloads;
+ type_payloads[model_type] = payload;
+ return type_payloads;
+}
+
+syncable::ModelTypePayloadMap MakeMapFromSet(syncable::ModelTypeSet types,
+ const std::string& payload) {
+ return syncable::ModelTypePayloadMapFromEnumSet(types, payload);
+}
+
+} // namespace
+
+TEST_F(ChromeInvalidationClientTest, InvalidateBadObjectId) {
+ syncable::ModelTypeSet types(syncable::BOOKMARKS, syncable::APPS);
+ client_.RegisterTypes(types);
+ EXPECT_CALL(mock_listener_, OnInvalidate(MakeMapFromSet(types, "")));
+ FireInvalidate("bad", 1, NULL);
+ message_loop_.RunAllPending();
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateNoPayload) {
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::BOOKMARKS, "")));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::BOOKMARKS, 1));
+ FireInvalidate("BOOKMARK", 1, NULL);
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateWithPayload) {
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::PREFERENCES, "payload")));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::PREFERENCES, 1));
+ FireInvalidate("PREFERENCE", 1, "payload");
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateVersion) {
+ using ::testing::Mock;
+
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::APPS, "")));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::APPS, 1));
+
+ // Should trigger.
+ FireInvalidate("APP", 1, NULL);
+
+ Mock::VerifyAndClearExpectations(&mock_listener_);
+
+ // Should be dropped.
+ FireInvalidate("APP", 1, NULL);
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateUnknownVersion) {
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::EXTENSIONS, "")))
+ .Times(2);
+
+ // Should trigger twice.
+ FireInvalidateUnknownVersion("EXTENSION");
+ FireInvalidateUnknownVersion("EXTENSION");
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateVersionMultipleTypes) {
+ using ::testing::Mock;
+
+ syncable::ModelTypeSet types(syncable::BOOKMARKS, syncable::APPS);
+ client_.RegisterTypes(types);
+
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::APPS, "")));
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::EXTENSIONS, "")));
+
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::APPS, 3));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::EXTENSIONS, 2));
+
+ // Should trigger both.
+ FireInvalidate("APP", 3, NULL);
+ FireInvalidate("EXTENSION", 2, NULL);
+
+ Mock::VerifyAndClearExpectations(&mock_listener_);
+ Mock::VerifyAndClearExpectations(&mock_invalidation_version_tracker_);
+
+ // Should both be dropped.
+ FireInvalidate("APP", 1, NULL);
+ FireInvalidate("EXTENSION", 1, NULL);
+
+ Mock::VerifyAndClearExpectations(&mock_listener_);
+ Mock::VerifyAndClearExpectations(&mock_invalidation_version_tracker_);
+
+ // InvalidateAll shouldn't change any version state.
+ EXPECT_CALL(mock_listener_, OnInvalidate(MakeMapFromSet(types, "")));
+ FireInvalidateAll();
+
+ Mock::VerifyAndClearExpectations(&mock_listener_);
+ Mock::VerifyAndClearExpectations(&mock_invalidation_version_tracker_);
+
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::PREFERENCES, "")));
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::EXTENSIONS, "")));
+ EXPECT_CALL(mock_listener_,
+ OnInvalidate(MakeMap(syncable::APPS, "")));
+
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::PREFERENCES, 5));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::EXTENSIONS, 3));
+ EXPECT_CALL(mock_invalidation_version_tracker_,
+ SetMaxVersion(syncable::APPS, 4));
+
+ // Should trigger all three.
+ FireInvalidate("PREFERENCE", 5, NULL);
+ FireInvalidate("EXTENSION", 3, NULL);
+ FireInvalidate("APP", 4, NULL);
+}
+
+TEST_F(ChromeInvalidationClientTest, InvalidateAll) {
+ syncable::ModelTypeSet types(syncable::PREFERENCES, syncable::EXTENSIONS);
+ client_.RegisterTypes(types);
+ EXPECT_CALL(mock_listener_, OnInvalidate(MakeMapFromSet(types, "")));
+ FireInvalidateAll();
+}
+
+TEST_F(ChromeInvalidationClientTest, RegisterTypes) {
+ syncable::ModelTypeSet types(syncable::PREFERENCES, syncable::EXTENSIONS);
+ client_.RegisterTypes(types);
+ // Registered types should be preserved across Stop/Start.
+ TearDown();
+ SetUp();
+ EXPECT_CALL(mock_listener_, OnInvalidate(MakeMapFromSet(types, "")));
+ FireInvalidateAll();
+}
+
+// TODO(akalin): Flesh out unit tests.
+
+} // namespace sync_notifier
diff --git a/sync/notifier/chrome_system_resources.cc b/sync/notifier/chrome_system_resources.cc
new file mode 100644
index 0000000..498f5fd
--- /dev/null
+++ b/sync/notifier/chrome_system_resources.cc
@@ -0,0 +1,298 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/chrome_system_resources.h"
+
+#include <cstdlib>
+#include <cstring>
+#include <string>
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/stl_util.h"
+#include "base/string_util.h"
+#include "base/stringprintf.h"
+#include "google/cacheinvalidation/include/types.h"
+#include "sync/notifier/cache_invalidation_packet_handler.h"
+#include "sync/notifier/invalidation_util.h"
+
+namespace sync_notifier {
+
+ChromeLogger::ChromeLogger() {}
+ChromeLogger::~ChromeLogger() {}
+
+void ChromeLogger::Log(LogLevel level, const char* file, int line,
+ const char* format, ...) {
+ logging::LogSeverity log_severity = -2; // VLOG(2)
+ bool emit_log = false;
+ switch (level) {
+ case FINE_LEVEL:
+ log_severity = -2; // VLOG(2)
+ emit_log = VLOG_IS_ON(2);
+ break;
+ case INFO_LEVEL:
+ log_severity = -1; // VLOG(1)
+ emit_log = VLOG_IS_ON(1);
+ break;
+ case WARNING_LEVEL:
+ log_severity = logging::LOG_WARNING;
+ emit_log = LOG_IS_ON(WARNING);
+ break;
+ case SEVERE_LEVEL:
+ log_severity = logging::LOG_ERROR;
+ emit_log = LOG_IS_ON(ERROR);
+ break;
+ }
+ if (emit_log) {
+ va_list ap;
+ va_start(ap, format);
+ std::string result;
+ base::StringAppendV(&result, format, ap);
+ logging::LogMessage(file, line, log_severity).stream() << result;
+ va_end(ap);
+ }
+}
+
+void ChromeLogger::SetSystemResources(
+ invalidation::SystemResources* resources) {
+ // Do nothing.
+}
+
+ChromeScheduler::ChromeScheduler()
+ : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
+ created_on_loop_(MessageLoop::current()),
+ is_started_(false),
+ is_stopped_(false) {
+ CHECK(created_on_loop_);
+}
+
+ChromeScheduler::~ChromeScheduler() {
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+ CHECK(is_stopped_);
+}
+
+void ChromeScheduler::Start() {
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+ CHECK(!is_started_);
+ is_started_ = true;
+ is_stopped_ = false;
+ weak_factory_.InvalidateWeakPtrs();
+}
+
+void ChromeScheduler::Stop() {
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+ is_stopped_ = true;
+ is_started_ = false;
+ weak_factory_.InvalidateWeakPtrs();
+ STLDeleteElements(&posted_tasks_);
+ posted_tasks_.clear();
+}
+
+void ChromeScheduler::Schedule(invalidation::TimeDelta delay,
+ invalidation::Closure* task) {
+ DCHECK(invalidation::IsCallbackRepeatable(task));
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+
+ if (!is_started_) {
+ delete task;
+ return;
+ }
+
+ posted_tasks_.insert(task);
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE, base::Bind(&ChromeScheduler::RunPostedTask,
+ weak_factory_.GetWeakPtr(), task),
+ delay);
+}
+
+bool ChromeScheduler::IsRunningOnThread() const {
+ return created_on_loop_ == MessageLoop::current();
+}
+
+invalidation::Time ChromeScheduler::GetCurrentTime() const {
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+ return base::Time::Now();
+}
+
+void ChromeScheduler::SetSystemResources(
+ invalidation::SystemResources* resources) {
+ // Do nothing.
+}
+
+void ChromeScheduler::RunPostedTask(invalidation::Closure* task) {
+ CHECK_EQ(created_on_loop_, MessageLoop::current());
+ RunAndDeleteClosure(task);
+ posted_tasks_.erase(task);
+}
+
+ChromeStorage::ChromeStorage(StateWriter* state_writer,
+ invalidation::Scheduler* scheduler)
+ : state_writer_(state_writer),
+ scheduler_(scheduler) {
+ DCHECK(state_writer_);
+ DCHECK(scheduler_);
+}
+
+ChromeStorage::~ChromeStorage() {}
+
+void ChromeStorage::WriteKey(const std::string& key, const std::string& value,
+ invalidation::WriteKeyCallback* done) {
+ CHECK(state_writer_);
+ // TODO(ghc): actually write key,value associations, and don't invoke the
+ // callback until the operation completes.
+ state_writer_->WriteState(value);
+ cached_state_ = value;
+ // According to the cache invalidation API folks, we can do this as
+ // long as we make sure to clear the persistent state that we start
+ // up the cache invalidation client with. However, we musn't do it
+ // right away, as we may be called under a lock that the callback
+ // uses.
+ scheduler_->Schedule(
+ invalidation::Scheduler::NoDelay(),
+ invalidation::NewPermanentCallback(
+ this, &ChromeStorage::RunAndDeleteWriteKeyCallback,
+ done));
+}
+
+void ChromeStorage::ReadKey(const std::string& key,
+ invalidation::ReadKeyCallback* done) {
+ DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread";
+ RunAndDeleteReadKeyCallback(done, cached_state_);
+}
+
+void ChromeStorage::DeleteKey(const std::string& key,
+ invalidation::DeleteKeyCallback* done) {
+ // TODO(ghc): Implement.
+ LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)";
+}
+
+void ChromeStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) {
+ // TODO(ghc): Implement.
+ LOG(WARNING) << "ignoring call to ReadAllKeys(callback)";
+}
+
+void ChromeStorage::SetSystemResources(
+ invalidation::SystemResources* resources) {
+ // Do nothing.
+}
+
+void ChromeStorage::RunAndDeleteWriteKeyCallback(
+ invalidation::WriteKeyCallback* callback) {
+ callback->Run(invalidation::Status(invalidation::Status::SUCCESS, ""));
+ delete callback;
+}
+
+void ChromeStorage::RunAndDeleteReadKeyCallback(
+ invalidation::ReadKeyCallback* callback, const std::string& value) {
+ callback->Run(std::make_pair(
+ invalidation::Status(invalidation::Status::SUCCESS, ""),
+ value));
+ delete callback;
+}
+
+ChromeNetwork::ChromeNetwork()
+ : packet_handler_(NULL),
+ ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) {}
+
+ChromeNetwork::~ChromeNetwork() {
+ STLDeleteElements(&network_status_receivers_);
+}
+
+void ChromeNetwork::SendMessage(const std::string& outgoing_message) {
+ if (packet_handler_) {
+ packet_handler_->SendMessage(outgoing_message);
+ }
+}
+
+void ChromeNetwork::SetMessageReceiver(
+ invalidation::MessageCallback* incoming_receiver) {
+ incoming_receiver_.reset(incoming_receiver);
+}
+
+void ChromeNetwork::AddNetworkStatusReceiver(
+ invalidation::NetworkStatusCallback* network_status_receiver) {
+ network_status_receivers_.push_back(network_status_receiver);
+}
+
+void ChromeNetwork::SetSystemResources(
+ invalidation::SystemResources* resources) {
+ // Do nothing.
+}
+
+void ChromeNetwork::UpdatePacketHandler(
+ CacheInvalidationPacketHandler* packet_handler) {
+ packet_handler_ = packet_handler;
+ if (packet_handler_ != NULL) {
+ packet_handler_->SetMessageReceiver(
+ new invalidation::MessageCallback(
+ base::Bind(&ChromeNetwork::HandleInboundMessage,
+ weak_factory_.GetWeakPtr())));
+ }
+ packet_handler_->SendSubscriptionRequest();
+}
+
+void ChromeNetwork::HandleInboundMessage(const std::string& incoming_message) {
+ if (incoming_receiver_.get()) {
+ incoming_receiver_->Run(incoming_message);
+ }
+}
+
+ChromeSystemResources::ChromeSystemResources(StateWriter* state_writer)
+ : is_started_(false),
+ logger_(new ChromeLogger()),
+ internal_scheduler_(new ChromeScheduler()),
+ listener_scheduler_(new ChromeScheduler()),
+ storage_(new ChromeStorage(state_writer, internal_scheduler_.get())),
+ network_(new ChromeNetwork()) {
+}
+
+ChromeSystemResources::~ChromeSystemResources() {
+ Stop();
+}
+
+void ChromeSystemResources::Start() {
+ internal_scheduler_->Start();
+ listener_scheduler_->Start();
+ is_started_ = true;
+}
+
+void ChromeSystemResources::Stop() {
+ internal_scheduler_->Stop();
+ listener_scheduler_->Stop();
+}
+
+bool ChromeSystemResources::IsStarted() const {
+ return is_started_;
+}
+
+void ChromeSystemResources::set_platform(const std::string& platform) {
+ platform_ = platform;
+}
+
+std::string ChromeSystemResources::platform() const {
+ return platform_;
+}
+
+ChromeLogger* ChromeSystemResources::logger() {
+ return logger_.get();
+}
+
+ChromeStorage* ChromeSystemResources::storage() {
+ return storage_.get();
+}
+
+ChromeNetwork* ChromeSystemResources::network() {
+ return network_.get();
+}
+
+ChromeScheduler* ChromeSystemResources::internal_scheduler() {
+ return internal_scheduler_.get();
+}
+
+ChromeScheduler* ChromeSystemResources::listener_scheduler() {
+ return listener_scheduler_.get();
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/chrome_system_resources.h b/sync/notifier/chrome_system_resources.h
new file mode 100644
index 0000000..032e050
--- /dev/null
+++ b/sync/notifier/chrome_system_resources.h
@@ -0,0 +1,176 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Simple system resources class that uses the current message loop
+// for scheduling. Assumes the current message loop is already
+// running.
+
+#ifndef SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
+#define SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
+#pragma once
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include "base/compiler_specific.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop.h"
+#include "base/threading/non_thread_safe.h"
+#include "google/cacheinvalidation/include/system-resources.h"
+#include "sync/notifier/state_writer.h"
+
+namespace sync_notifier {
+
+class CacheInvalidationPacketHandler;
+
+class ChromeLogger : public invalidation::Logger {
+ public:
+ ChromeLogger();
+
+ virtual ~ChromeLogger();
+
+ // invalidation::Logger implementation.
+ virtual void Log(LogLevel level, const char* file, int line,
+ const char* format, ...) OVERRIDE;
+
+ virtual void SetSystemResources(
+ invalidation::SystemResources* resources) OVERRIDE;
+};
+
+class ChromeScheduler : public invalidation::Scheduler {
+ public:
+ ChromeScheduler();
+
+ virtual ~ChromeScheduler();
+
+ // Start and stop the scheduler.
+ void Start();
+ void Stop();
+
+ // invalidation::Scheduler implementation.
+ virtual void Schedule(invalidation::TimeDelta delay,
+ invalidation::Closure* task) OVERRIDE;
+
+ virtual bool IsRunningOnThread() const OVERRIDE;
+
+ virtual invalidation::Time GetCurrentTime() const OVERRIDE;
+
+ virtual void SetSystemResources(
+ invalidation::SystemResources* resources) OVERRIDE;
+
+ private:
+ base::WeakPtrFactory<ChromeScheduler> weak_factory_;
+ // Holds all posted tasks that have not yet been run.
+ std::set<invalidation::Closure*> posted_tasks_;
+
+ const MessageLoop* created_on_loop_;
+ bool is_started_;
+ bool is_stopped_;
+
+ // Runs the task, deletes it, and removes it from |posted_tasks_|.
+ void RunPostedTask(invalidation::Closure* task);
+};
+
+class ChromeStorage : public invalidation::Storage {
+ public:
+ ChromeStorage(StateWriter* state_writer, invalidation::Scheduler* scheduler);
+
+ virtual ~ChromeStorage();
+
+ void SetInitialState(const std::string& value) {
+ cached_state_ = value;
+ }
+
+ // invalidation::Storage implementation.
+ virtual void WriteKey(const std::string& key, const std::string& value,
+ invalidation::WriteKeyCallback* done) OVERRIDE;
+
+ virtual void ReadKey(const std::string& key,
+ invalidation::ReadKeyCallback* done) OVERRIDE;
+
+ virtual void DeleteKey(const std::string& key,
+ invalidation::DeleteKeyCallback* done) OVERRIDE;
+
+ virtual void ReadAllKeys(
+ invalidation::ReadAllKeysCallback* key_callback) OVERRIDE;
+
+ virtual void SetSystemResources(
+ invalidation::SystemResources* resources) OVERRIDE;
+
+ private:
+ // Runs the given storage callback with SUCCESS status and deletes it.
+ void RunAndDeleteWriteKeyCallback(
+ invalidation::WriteKeyCallback* callback);
+
+ // Runs the given callback with the given value and deletes it.
+ void RunAndDeleteReadKeyCallback(
+ invalidation::ReadKeyCallback* callback, const std::string& value);
+
+ StateWriter* state_writer_;
+ invalidation::Scheduler* scheduler_;
+ std::string cached_state_;
+};
+
+class ChromeNetwork : public invalidation::NetworkChannel {
+ public:
+ ChromeNetwork();
+
+ virtual ~ChromeNetwork();
+
+ void UpdatePacketHandler(CacheInvalidationPacketHandler* packet_handler);
+
+ // invalidation::NetworkChannel implementation.
+ virtual void SendMessage(const std::string& outgoing_message) OVERRIDE;
+
+ virtual void SetMessageReceiver(
+ invalidation::MessageCallback* incoming_receiver) OVERRIDE;
+
+ virtual void AddNetworkStatusReceiver(
+ invalidation::NetworkStatusCallback* network_status_receiver) OVERRIDE;
+
+ virtual void SetSystemResources(
+ invalidation::SystemResources* resources) OVERRIDE;
+
+ private:
+ void HandleInboundMessage(const std::string& incoming_message);
+
+ CacheInvalidationPacketHandler* packet_handler_;
+ scoped_ptr<invalidation::MessageCallback> incoming_receiver_;
+ std::vector<invalidation::NetworkStatusCallback*> network_status_receivers_;
+ base::WeakPtrFactory<ChromeNetwork> weak_factory_;
+};
+
+class ChromeSystemResources : public invalidation::SystemResources {
+ public:
+ explicit ChromeSystemResources(StateWriter* state_writer);
+
+ virtual ~ChromeSystemResources();
+
+ // invalidation::SystemResources implementation.
+ virtual void Start() OVERRIDE;
+ virtual void Stop() OVERRIDE;
+ virtual bool IsStarted() const OVERRIDE;
+ virtual void set_platform(const std::string& platform);
+ virtual std::string platform() const OVERRIDE;
+ virtual ChromeLogger* logger() OVERRIDE;
+ virtual ChromeStorage* storage() OVERRIDE;
+ virtual ChromeNetwork* network() OVERRIDE;
+ virtual ChromeScheduler* internal_scheduler() OVERRIDE;
+ virtual ChromeScheduler* listener_scheduler() OVERRIDE;
+
+ private:
+ bool is_started_;
+ std::string platform_;
+ scoped_ptr<ChromeLogger> logger_;
+ scoped_ptr<ChromeScheduler> internal_scheduler_;
+ scoped_ptr<ChromeScheduler> listener_scheduler_;
+ scoped_ptr<ChromeStorage> storage_;
+ scoped_ptr<ChromeNetwork> network_;
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
diff --git a/sync/notifier/chrome_system_resources_unittest.cc b/sync/notifier/chrome_system_resources_unittest.cc
new file mode 100644
index 0000000..a61497f
--- /dev/null
+++ b/sync/notifier/chrome_system_resources_unittest.cc
@@ -0,0 +1,174 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/chrome_system_resources.h"
+
+#include <string>
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/message_loop.h"
+
+#include "google/cacheinvalidation/include/types.h"
+#include "sync/notifier/state_writer.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+namespace {
+
+using ::testing::_;
+using ::testing::SaveArg;
+
+class MockStateWriter : public StateWriter {
+ public:
+ MOCK_METHOD1(WriteState, void(const std::string&));
+};
+
+class MockClosure {
+ public:
+ MOCK_CONST_METHOD0(Run, void(void));
+ base::Closure* CreateClosure() {
+ return new base::Closure(
+ base::Bind(&MockClosure::Run, base::Unretained(this)));
+ }
+};
+
+class MockStorageCallback {
+ public:
+ MOCK_CONST_METHOD1(Run, void(invalidation::Status));
+ base::Callback<void(invalidation::Status)>* CreateCallback() {
+ return new base::Callback<void(invalidation::Status)>(
+ base::Bind(&MockStorageCallback::Run, base::Unretained(this)));
+ }
+};
+
+class ChromeSystemResourcesTest : public testing::Test {
+ protected:
+ ChromeSystemResourcesTest()
+ : chrome_system_resources_(&mock_state_writer_) {}
+
+ virtual ~ChromeSystemResourcesTest() {}
+
+ void ScheduleShouldNotRun() {
+ {
+ // Owned by ScheduleImmediately.
+ MockClosure mock_closure;
+ base::Closure* should_not_run = mock_closure.CreateClosure();
+ EXPECT_CALL(mock_closure, Run()).Times(0);
+ chrome_system_resources_.internal_scheduler()->Schedule(
+ invalidation::Scheduler::NoDelay(), should_not_run);
+ }
+ {
+ // Owned by ScheduleOnListenerThread.
+ MockClosure mock_closure;
+ base::Closure* should_not_run = mock_closure.CreateClosure();
+ EXPECT_CALL(mock_closure, Run()).Times(0);
+ chrome_system_resources_.listener_scheduler()->Schedule(
+ invalidation::Scheduler::NoDelay(), should_not_run);
+ }
+ {
+ // Owned by ScheduleWithDelay.
+ MockClosure mock_closure;
+ base::Closure* should_not_run = mock_closure.CreateClosure();
+ EXPECT_CALL(mock_closure, Run()).Times(0);
+ chrome_system_resources_.internal_scheduler()->Schedule(
+ invalidation::TimeDelta::FromSeconds(0), should_not_run);
+ }
+ }
+
+ // Needed by |chrome_system_resources_|.
+ MessageLoop message_loop_;
+ MockStateWriter mock_state_writer_;
+ ChromeSystemResources chrome_system_resources_;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ChromeSystemResourcesTest);
+};
+
+// Make sure current_time() doesn't crash or leak.
+TEST_F(ChromeSystemResourcesTest, CurrentTime) {
+ invalidation::Time current_time =
+ chrome_system_resources_.internal_scheduler()->GetCurrentTime();
+ DVLOG(1) << "current_time returned: " << current_time.ToInternalValue();
+}
+
+// Make sure Log() doesn't crash or leak.
+TEST_F(ChromeSystemResourcesTest, Log) {
+ chrome_system_resources_.logger()->Log(ChromeLogger::INFO_LEVEL,
+ __FILE__, __LINE__, "%s %d",
+ "test string", 5);
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleBeforeStart) {
+ ScheduleShouldNotRun();
+ chrome_system_resources_.Start();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleAfterStop) {
+ chrome_system_resources_.Start();
+ chrome_system_resources_.Stop();
+ ScheduleShouldNotRun();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleAndStop) {
+ chrome_system_resources_.Start();
+ ScheduleShouldNotRun();
+ chrome_system_resources_.Stop();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleAndDestroy) {
+ chrome_system_resources_.Start();
+ ScheduleShouldNotRun();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleImmediately) {
+ chrome_system_resources_.Start();
+ MockClosure mock_closure;
+ EXPECT_CALL(mock_closure, Run());
+ chrome_system_resources_.internal_scheduler()->Schedule(
+ invalidation::Scheduler::NoDelay(), mock_closure.CreateClosure());
+ message_loop_.RunAllPending();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleOnListenerThread) {
+ chrome_system_resources_.Start();
+ MockClosure mock_closure;
+ EXPECT_CALL(mock_closure, Run());
+ chrome_system_resources_.listener_scheduler()->Schedule(
+ invalidation::Scheduler::NoDelay(), mock_closure.CreateClosure());
+ EXPECT_TRUE(
+ chrome_system_resources_.internal_scheduler()->IsRunningOnThread());
+ message_loop_.RunAllPending();
+}
+
+TEST_F(ChromeSystemResourcesTest, ScheduleWithZeroDelay) {
+ chrome_system_resources_.Start();
+ MockClosure mock_closure;
+ EXPECT_CALL(mock_closure, Run());
+ chrome_system_resources_.internal_scheduler()->Schedule(
+ invalidation::TimeDelta::FromSeconds(0), mock_closure.CreateClosure());
+ message_loop_.RunAllPending();
+}
+
+// TODO(akalin): Figure out how to test with a non-zero delay.
+
+TEST_F(ChromeSystemResourcesTest, WriteState) {
+ chrome_system_resources_.Start();
+ EXPECT_CALL(mock_state_writer_, WriteState(_));
+ // Owned by WriteState.
+ MockStorageCallback mock_storage_callback;
+ invalidation::Status results(invalidation::Status::PERMANENT_FAILURE,
+ "fake-failure");
+ EXPECT_CALL(mock_storage_callback, Run(_))
+ .WillOnce(SaveArg<0>(&results));
+ chrome_system_resources_.storage()->WriteKey(
+ "", "state", mock_storage_callback.CreateCallback());
+ message_loop_.RunAllPending();
+ EXPECT_EQ(invalidation::Status(invalidation::Status::SUCCESS, ""), results);
+}
+
+} // namespace
+} // namespace notifier
diff --git a/sync/notifier/invalidation_notifier.cc b/sync/notifier/invalidation_notifier.cc
new file mode 100644
index 0000000..f9ab44d
--- /dev/null
+++ b/sync/notifier/invalidation_notifier.cc
@@ -0,0 +1,150 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/invalidation_notifier.h"
+
+#include "base/logging.h"
+#include "base/message_loop_proxy.h"
+#include "jingle/notifier/base/const_communicator.h"
+#include "jingle/notifier/base/notifier_options_util.h"
+#include "jingle/notifier/communicator/connection_options.h"
+#include "net/url_request/url_request_context.h"
+#include "sync/notifier/sync_notifier_observer.h"
+#include "sync/protocol/service_constants.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "talk/xmpp/jid.h"
+#include "talk/xmpp/xmppclientsettings.h"
+
+namespace sync_notifier {
+
+InvalidationNotifier::InvalidationNotifier(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info)
+ : state_(STOPPED),
+ notifier_options_(notifier_options),
+ initial_max_invalidation_versions_(initial_max_invalidation_versions),
+ invalidation_version_tracker_(invalidation_version_tracker),
+ client_info_(client_info) {
+ DCHECK_EQ(notifier::NOTIFICATION_SERVER,
+ notifier_options.notification_method);
+ DCHECK(notifier_options_.request_context_getter);
+ // TODO(akalin): Replace NonThreadSafe checks with IO thread checks.
+ DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()->
+ BelongsToCurrentThread());
+}
+
+InvalidationNotifier::~InvalidationNotifier() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+}
+
+void InvalidationNotifier::AddObserver(SyncNotifierObserver* observer) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ observers_.AddObserver(observer);
+}
+
+void InvalidationNotifier::RemoveObserver(SyncNotifierObserver* observer) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ observers_.RemoveObserver(observer);
+}
+
+void InvalidationNotifier::SetUniqueId(const std::string& unique_id) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation_client_id_ = unique_id;
+ DVLOG(1) << "Setting unique ID to " << unique_id;
+ CHECK(!invalidation_client_id_.empty());
+}
+
+void InvalidationNotifier::SetState(const std::string& state) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation_state_ = state;
+ DVLOG(1) << "Setting new state";
+}
+
+void InvalidationNotifier::UpdateCredentials(
+ const std::string& email, const std::string& token) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ CHECK(!invalidation_client_id_.empty());
+ DVLOG(1) << "Updating credentials for " << email;
+ buzz::XmppClientSettings xmpp_client_settings =
+ notifier::MakeXmppClientSettings(notifier_options_,
+ email, token, SYNC_SERVICE_NAME);
+ if (state_ >= CONNECTING) {
+ login_->UpdateXmppSettings(xmpp_client_settings);
+ } else {
+ notifier::ConnectionOptions options;
+ DVLOG(1) << "First time updating credentials: connecting";
+ login_.reset(
+ new notifier::Login(this,
+ xmpp_client_settings,
+ notifier::ConnectionOptions(),
+ notifier_options_.request_context_getter,
+ notifier::GetServerList(notifier_options_),
+ notifier_options_.try_ssltcp_first,
+ notifier_options_.auth_mechanism));
+ login_->StartConnection();
+ state_ = CONNECTING;
+ }
+}
+
+void InvalidationNotifier::UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ CHECK(!invalidation_client_id_.empty());
+ invalidation_client_.RegisterTypes(enabled_types);
+}
+
+void InvalidationNotifier::SendNotification(
+ syncable::ModelTypeSet changed_types) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ // Do nothing.
+}
+
+void InvalidationNotifier::OnConnect(
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "OnConnect";
+ if (state_ >= STARTED) {
+ invalidation_client_.ChangeBaseTask(base_task);
+ } else {
+ DVLOG(1) << "First time connecting: starting invalidation client with id "
+ << invalidation_client_id_ << " and client info "
+ << client_info_;
+ invalidation_client_.Start(
+ invalidation_client_id_, client_info_, invalidation_state_,
+ initial_max_invalidation_versions_,
+ invalidation_version_tracker_,
+ this, this, base_task);
+ invalidation_state_.clear();
+ state_ = STARTED;
+ }
+}
+
+void InvalidationNotifier::OnDisconnect() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "OnDisconnect";
+}
+
+void InvalidationNotifier::OnInvalidate(
+ const syncable::ModelTypePayloadMap& type_payloads) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
+ OnIncomingNotification(type_payloads,
+ sync_notifier::REMOTE_NOTIFICATION));
+}
+
+void InvalidationNotifier::OnSessionStatusChanged(bool has_session) {
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
+ OnNotificationStateChange(has_session));
+}
+
+void InvalidationNotifier::WriteState(const std::string& state) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ DVLOG(1) << "WriteState";
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_, StoreState(state));
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/invalidation_notifier.h b/sync/notifier/invalidation_notifier.h
new file mode 100644
index 0000000..385fbfc
--- /dev/null
+++ b/sync/notifier/invalidation_notifier.h
@@ -0,0 +1,123 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// An implementation of SyncNotifier that wraps an invalidation
+// client. Handles the details of connecting to XMPP and hooking it
+// up to the invalidation client.
+//
+// You probably don't want to use this directly; use
+// NonBlockingInvalidationNotifier.
+
+#ifndef SYNC_NOTIFIER_INVALIDATION_NOTIFIER_H_
+#define SYNC_NOTIFIER_INVALIDATION_NOTIFIER_H_
+#pragma once
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/observer_list.h"
+#include "base/threading/non_thread_safe.h"
+#include "jingle/notifier/base/notifier_options.h"
+#include "jingle/notifier/communicator/login.h"
+#include "sync/notifier/chrome_invalidation_client.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/state_writer.h"
+#include "sync/notifier/sync_notifier.h"
+#include "sync/syncable/model_type.h"
+#include "sync/util/weak_handle.h"
+
+namespace sync_notifier {
+
+// This class must live on the IO thread.
+class InvalidationNotifier
+ : public SyncNotifier,
+ public notifier::LoginDelegate,
+ public ChromeInvalidationClient::Listener,
+ public StateWriter {
+ public:
+ // |invalidation_version_tracker| must be initialized.
+ InvalidationNotifier(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info);
+
+ virtual ~InvalidationNotifier();
+
+ // SyncNotifier implementation.
+ virtual void AddObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void RemoveObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void SetUniqueId(const std::string& unique_id) OVERRIDE;
+ virtual void SetState(const std::string& state) OVERRIDE;
+ virtual void UpdateCredentials(
+ const std::string& email, const std::string& token) OVERRIDE;
+ virtual void UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) OVERRIDE;
+ virtual void SendNotification(
+ syncable::ModelTypeSet changed_types) OVERRIDE;
+
+ // notifier::LoginDelegate implementation.
+ virtual void OnConnect(
+ base::WeakPtr<buzz::XmppTaskParentInterface> base_task) OVERRIDE;
+ virtual void OnDisconnect() OVERRIDE;
+
+ // ChromeInvalidationClient::Listener implementation.
+ virtual void OnInvalidate(
+ const syncable::ModelTypePayloadMap& type_payloads) OVERRIDE;
+ virtual void OnSessionStatusChanged(bool has_session) OVERRIDE;
+
+ // StateWriter implementation.
+ virtual void WriteState(const std::string& state) OVERRIDE;
+
+ private:
+ base::NonThreadSafe non_thread_safe_;
+
+ // We start off in the STOPPED state. When we get our initial
+ // credentials, we connect and move to the CONNECTING state. When
+ // we're connected we start the invalidation client and move to the
+ // STARTED state. We never go back to a previous state.
+ enum State {
+ STOPPED,
+ CONNECTING,
+ STARTED
+ };
+ State state_;
+
+ // Used to build parameters for |login_|.
+ const notifier::NotifierOptions notifier_options_;
+
+ // Passed to |invalidation_client_|.
+ const InvalidationVersionMap initial_max_invalidation_versions_;
+
+ // Passed to |invalidation_client_|.
+ const browser_sync::WeakHandle<InvalidationVersionTracker>
+ invalidation_version_tracker_;
+
+ // Passed to |invalidation_client_|.
+ const std::string client_info_;
+
+ // Our observers (which must live on the same thread).
+ ObserverList<SyncNotifierObserver> observers_;
+
+ // The client ID to pass to |chrome_invalidation_client_|.
+ std::string invalidation_client_id_;
+
+ // The state to pass to |chrome_invalidation_client_|.
+ std::string invalidation_state_;
+
+ // The XMPP connection manager.
+ scoped_ptr<notifier::Login> login_;
+
+ // The invalidation client.
+ ChromeInvalidationClient invalidation_client_;
+
+ DISALLOW_COPY_AND_ASSIGN(InvalidationNotifier);
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_INVALIDATION_NOTIFIER_H_
diff --git a/sync/notifier/invalidation_notifier_unittest.cc b/sync/notifier/invalidation_notifier_unittest.cc
new file mode 100644
index 0000000..701e133
--- /dev/null
+++ b/sync/notifier/invalidation_notifier_unittest.cc
@@ -0,0 +1,96 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/invalidation_notifier.h"
+
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "jingle/notifier/base/fake_base_task.h"
+#include "jingle/notifier/base/notifier_options.h"
+#include "net/url_request/url_request_test_util.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/mock_sync_notifier_observer.h"
+#include "sync/syncable/model_type.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "sync/util/weak_handle.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+
+namespace {
+
+using ::testing::InSequence;
+using ::testing::StrictMock;
+
+class InvalidationNotifierTest : public testing::Test {
+ protected:
+ virtual void SetUp() {
+ notifier::NotifierOptions notifier_options;
+ // Note: URLRequestContextGetters are ref-counted.
+ notifier_options.request_context_getter =
+ new TestURLRequestContextGetter(message_loop_.message_loop_proxy());
+ invalidation_notifier_.reset(
+ new InvalidationNotifier(
+ notifier_options,
+ InvalidationVersionMap(),
+ browser_sync::MakeWeakHandle(
+ base::WeakPtr<InvalidationVersionTracker>()),
+ "fake_client_info"));
+ invalidation_notifier_->AddObserver(&mock_observer_);
+ }
+
+ virtual void TearDown() {
+ invalidation_notifier_->RemoveObserver(&mock_observer_);
+ // Stopping the invalidation notifier stops its scheduler, which deletes any
+ // pending tasks without running them. Some tasks "run and delete" another
+ // task, so they must be run in order to avoid leaking the inner task.
+ // Stopping does not schedule any tasks, so it's both necessary and
+ // sufficient to drain the task queue before stopping the notifier.
+ message_loop_.RunAllPending();
+ invalidation_notifier_.reset();
+ }
+
+ MessageLoop message_loop_;
+ scoped_ptr<InvalidationNotifier> invalidation_notifier_;
+ StrictMock<MockSyncNotifierObserver> mock_observer_;
+ notifier::FakeBaseTask fake_base_task_;
+};
+
+TEST_F(InvalidationNotifierTest, Basic) {
+ InSequence dummy;
+
+ syncable::ModelTypePayloadMap type_payloads;
+ type_payloads[syncable::PREFERENCES] = "payload";
+ type_payloads[syncable::BOOKMARKS] = "";
+ type_payloads[syncable::AUTOFILL] = "";
+
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(true));
+ EXPECT_CALL(mock_observer_, StoreState("new_fake_state"));
+ EXPECT_CALL(mock_observer_,
+ OnIncomingNotification(type_payloads,
+ REMOTE_NOTIFICATION));
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(false));
+
+ invalidation_notifier_->SetState("fake_state");
+ invalidation_notifier_->SetUniqueId("fake_id");
+ invalidation_notifier_->UpdateCredentials("foo@bar.com", "fake_token");
+
+ invalidation_notifier_->OnConnect(fake_base_task_.AsWeakPtr());
+ invalidation_notifier_->OnSessionStatusChanged(true);
+
+ invalidation_notifier_->WriteState("new_fake_state");
+
+ invalidation_notifier_->OnInvalidate(type_payloads);
+
+ // Shouldn't trigger notification state change.
+ invalidation_notifier_->OnDisconnect();
+ invalidation_notifier_->OnConnect(fake_base_task_.AsWeakPtr());
+
+ invalidation_notifier_->OnSessionStatusChanged(false);
+}
+
+} // namespace
+
+} // namespace sync_notifier
diff --git a/sync/notifier/invalidation_util.cc b/sync/notifier/invalidation_util.cc
new file mode 100644
index 0000000..126e56f
--- /dev/null
+++ b/sync/notifier/invalidation_util.cc
@@ -0,0 +1,58 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/invalidation_util.h"
+
+#include <sstream>
+
+#include "google/cacheinvalidation/include/types.h"
+#include "google/cacheinvalidation/v2/types.pb.h"
+
+namespace sync_notifier {
+
+void RunAndDeleteClosure(invalidation::Closure* task) {
+ task->Run();
+ delete task;
+}
+
+bool RealModelTypeToObjectId(syncable::ModelType model_type,
+ invalidation::ObjectId* object_id) {
+ std::string notification_type;
+ if (!syncable::RealModelTypeToNotificationType(
+ model_type, &notification_type)) {
+ return false;
+ }
+ object_id->Init(ipc::invalidation::ObjectSource::CHROME_SYNC,
+ notification_type);
+ return true;
+}
+
+bool ObjectIdToRealModelType(const invalidation::ObjectId& object_id,
+ syncable::ModelType* model_type) {
+ return
+ syncable::NotificationTypeToRealModelType(
+ object_id.name(), model_type);
+}
+
+std::string ObjectIdToString(
+ const invalidation::ObjectId& object_id) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "name: " << object_id.name() << ", ";
+ ss << "source: " << object_id.source();
+ ss << " }";
+ return ss.str();
+}
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
+ ss << "version: " << invalidation.version();
+ ss << " }";
+ return ss.str();
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/invalidation_util.h b/sync/notifier/invalidation_util.h
new file mode 100644
index 0000000..80c032d
--- /dev/null
+++ b/sync/notifier/invalidation_util.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Various utilities for dealing with invalidation data types.
+
+#ifndef SYNC_NOTIFIER_INVALIDATION_UTIL_H_
+#define SYNC_NOTIFIER_INVALIDATION_UTIL_H_
+#pragma once
+
+#include <string>
+
+#include "google/cacheinvalidation/deps/callback.h"
+#include "sync/syncable/model_type.h"
+
+namespace invalidation {
+
+class Invalidation;
+class ObjectId;
+
+} // namespace invalidation
+
+namespace sync_notifier {
+
+void RunAndDeleteClosure(invalidation::Closure* task);
+
+bool RealModelTypeToObjectId(syncable::ModelType model_type,
+ invalidation::ObjectId* object_id);
+
+bool ObjectIdToRealModelType(const invalidation::ObjectId& object_id,
+ syncable::ModelType* model_type);
+
+std::string ObjectIdToString(const invalidation::ObjectId& object_id);
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation);
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_INVALIDATION_UTIL_H_
diff --git a/sync/notifier/invalidation_version_tracker.h b/sync/notifier/invalidation_version_tracker.h
new file mode 100644
index 0000000..70eda6b
--- /dev/null
+++ b/sync/notifier/invalidation_version_tracker.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// An InvalidationVersionTracker is an interface that handles getting
+// and setting (persisting) max invalidation versions.
+
+#ifndef SYNC_NOTIFIER_INVALIDATION_VERSION_TRACKER_H_
+#define SYNC_NOTIFIER_INVALIDATION_VERSION_TRACKER_H_
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "sync/syncable/model_type.h"
+
+namespace sync_notifier {
+
+typedef std::map<syncable::ModelType, int64> InvalidationVersionMap;
+
+class InvalidationVersionTracker {
+ public:
+ InvalidationVersionTracker() {}
+
+ virtual InvalidationVersionMap GetAllMaxVersions() const = 0;
+
+ // |max_version| should be strictly greater than any existing max
+ // version for |model_type|.
+ virtual void SetMaxVersion(syncable::ModelType model_type,
+ int64 max_version) = 0;
+
+ protected:
+ virtual ~InvalidationVersionTracker() {}
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_INVALIDATION_VERSION_TRACKER_H_
+
diff --git a/sync/notifier/mock_sync_notifier_observer.cc b/sync/notifier/mock_sync_notifier_observer.cc
new file mode 100644
index 0000000..51c0135
--- /dev/null
+++ b/sync/notifier/mock_sync_notifier_observer.cc
@@ -0,0 +1,12 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/mock_sync_notifier_observer.h"
+
+namespace sync_notifier {
+
+MockSyncNotifierObserver::MockSyncNotifierObserver() {}
+MockSyncNotifierObserver::~MockSyncNotifierObserver() {}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/mock_sync_notifier_observer.h b/sync/notifier/mock_sync_notifier_observer.h
new file mode 100644
index 0000000..7d648df
--- /dev/null
+++ b/sync/notifier/mock_sync_notifier_observer.h
@@ -0,0 +1,30 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_NOTIFIER_MOCK_SYNC_NOTIFIER_OBSERVER_H_
+#define SYNC_NOTIFIER_MOCK_SYNC_NOTIFIER_OBSERVER_H_
+#pragma once
+
+#include <string>
+
+#include "sync/notifier/sync_notifier_observer.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace sync_notifier {
+
+class MockSyncNotifierObserver : public SyncNotifierObserver {
+ public:
+ MockSyncNotifierObserver();
+ virtual ~MockSyncNotifierObserver();
+
+ MOCK_METHOD2(OnIncomingNotification,
+ void(const syncable::ModelTypePayloadMap&,
+ IncomingNotificationSource));
+ MOCK_METHOD1(OnNotificationStateChange, void(bool));
+ MOCK_METHOD1(StoreState, void(const std::string&));
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_MOCK_SYNC_NOTIFIER_OBSERVER_H_
diff --git a/sync/notifier/non_blocking_invalidation_notifier.cc b/sync/notifier/non_blocking_invalidation_notifier.cc
new file mode 100644
index 0000000..01af7d9
--- /dev/null
+++ b/sync/notifier/non_blocking_invalidation_notifier.cc
@@ -0,0 +1,268 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/non_blocking_invalidation_notifier.h"
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "base/threading/thread.h"
+#include "sync/notifier/invalidation_notifier.h"
+
+namespace sync_notifier {
+
+class NonBlockingInvalidationNotifier::Core
+ : public base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>,
+ public SyncNotifierObserver {
+ public:
+ // Called on parent thread. |delegate_observer| should be
+ // initialized.
+ explicit Core(
+ const browser_sync::WeakHandle<SyncNotifierObserver>&
+ delegate_observer);
+
+ // Helpers called on I/O thread.
+ void Initialize(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info);
+ void Teardown();
+ void SetUniqueId(const std::string& unique_id);
+ void SetState(const std::string& state);
+ void UpdateCredentials(const std::string& email, const std::string& token);
+ void UpdateEnabledTypes(syncable::ModelTypeSet enabled_types);
+
+ // SyncNotifierObserver implementation (all called on I/O thread).
+ virtual void OnIncomingNotification(
+ const syncable::ModelTypePayloadMap& type_payloads,
+ IncomingNotificationSource source);
+ virtual void OnNotificationStateChange(bool notifications_enabled);
+ virtual void StoreState(const std::string& state);
+
+ private:
+ friend class
+ base::RefCountedThreadSafe<NonBlockingInvalidationNotifier::Core>;
+ // Called on parent or I/O thread.
+ ~Core();
+
+ // The variables below should be used only on the I/O thread.
+ const browser_sync::WeakHandle<SyncNotifierObserver> delegate_observer_;
+ scoped_ptr<InvalidationNotifier> invalidation_notifier_;
+ scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_;
+
+ DISALLOW_COPY_AND_ASSIGN(Core);
+};
+
+NonBlockingInvalidationNotifier::Core::Core(
+ const browser_sync::WeakHandle<SyncNotifierObserver>&
+ delegate_observer)
+ : delegate_observer_(delegate_observer) {
+ DCHECK(delegate_observer_.IsInitialized());
+}
+
+NonBlockingInvalidationNotifier::Core::~Core() {
+}
+
+void NonBlockingInvalidationNotifier::Core::Initialize(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info) {
+ DCHECK(notifier_options.request_context_getter);
+ DCHECK_EQ(notifier::NOTIFICATION_SERVER,
+ notifier_options.notification_method);
+ io_message_loop_proxy_ = notifier_options.request_context_getter->
+ GetIOMessageLoopProxy();
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_.reset(
+ new InvalidationNotifier(
+ notifier_options,
+ initial_max_invalidation_versions,
+ invalidation_version_tracker,
+ client_info));
+ invalidation_notifier_->AddObserver(this);
+}
+
+
+void NonBlockingInvalidationNotifier::Core::Teardown() {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_->RemoveObserver(this);
+ invalidation_notifier_.reset();
+ io_message_loop_proxy_ = NULL;
+}
+
+void NonBlockingInvalidationNotifier::Core::SetUniqueId(
+ const std::string& unique_id) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_->SetUniqueId(unique_id);
+}
+
+void NonBlockingInvalidationNotifier::Core::SetState(
+ const std::string& state) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_->SetState(state);
+}
+
+void NonBlockingInvalidationNotifier::Core::UpdateCredentials(
+ const std::string& email, const std::string& token) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_->UpdateCredentials(email, token);
+}
+
+void NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ invalidation_notifier_->UpdateEnabledTypes(enabled_types);
+}
+
+void NonBlockingInvalidationNotifier::Core::OnIncomingNotification(
+ const syncable::ModelTypePayloadMap& type_payloads,
+ IncomingNotificationSource source) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ delegate_observer_.Call(FROM_HERE,
+ &SyncNotifierObserver::OnIncomingNotification,
+ type_payloads,
+ source);
+}
+
+void NonBlockingInvalidationNotifier::Core::OnNotificationStateChange(
+ bool notifications_enabled) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ delegate_observer_.Call(FROM_HERE,
+ &SyncNotifierObserver::OnNotificationStateChange,
+ notifications_enabled);
+}
+
+void NonBlockingInvalidationNotifier::Core::StoreState(
+ const std::string& state) {
+ DCHECK(io_message_loop_proxy_->BelongsToCurrentThread());
+ delegate_observer_.Call(FROM_HERE,
+ &SyncNotifierObserver::StoreState, state);
+}
+
+NonBlockingInvalidationNotifier::NonBlockingInvalidationNotifier(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info)
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ core_(
+ new Core(browser_sync::MakeWeakHandle(
+ weak_ptr_factory_.GetWeakPtr()))),
+ parent_message_loop_proxy_(
+ base::MessageLoopProxy::current()),
+ io_message_loop_proxy_(notifier_options.request_context_getter->
+ GetIOMessageLoopProxy()) {
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(
+ &NonBlockingInvalidationNotifier::Core::Initialize,
+ core_.get(),
+ notifier_options,
+ initial_max_invalidation_versions,
+ invalidation_version_tracker,
+ client_info))) {
+ NOTREACHED();
+ }
+}
+
+NonBlockingInvalidationNotifier::~NonBlockingInvalidationNotifier() {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidationNotifier::Core::Teardown,
+ core_.get()))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidationNotifier::AddObserver(
+ SyncNotifierObserver* observer) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ observers_.AddObserver(observer);
+}
+
+void NonBlockingInvalidationNotifier::RemoveObserver(
+ SyncNotifierObserver* observer) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ observers_.RemoveObserver(observer);
+}
+
+void NonBlockingInvalidationNotifier::SetUniqueId(
+ const std::string& unique_id) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidationNotifier::Core::SetUniqueId,
+ core_.get(), unique_id))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidationNotifier::SetState(const std::string& state) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidationNotifier::Core::SetState,
+ core_.get(), state))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidationNotifier::UpdateCredentials(
+ const std::string& email, const std::string& token) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidationNotifier::Core::UpdateCredentials,
+ core_.get(), email, token))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidationNotifier::UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ if (!io_message_loop_proxy_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidationNotifier::Core::UpdateEnabledTypes,
+ core_.get(), enabled_types))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidationNotifier::SendNotification(
+ syncable::ModelTypeSet changed_types) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ // InvalidationClient doesn't implement SendNotification(), so no
+ // need to forward on the call.
+}
+
+void NonBlockingInvalidationNotifier::OnIncomingNotification(
+ const syncable::ModelTypePayloadMap& type_payloads,
+ IncomingNotificationSource source) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
+ OnIncomingNotification(type_payloads, source));
+}
+
+void NonBlockingInvalidationNotifier::OnNotificationStateChange(
+ bool notifications_enabled) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
+ OnNotificationStateChange(notifications_enabled));
+}
+
+void NonBlockingInvalidationNotifier::StoreState(
+ const std::string& state) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observers_,
+ StoreState(state));
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/non_blocking_invalidation_notifier.h b/sync/notifier/non_blocking_invalidation_notifier.h
new file mode 100644
index 0000000..9432936
--- /dev/null
+++ b/sync/notifier/non_blocking_invalidation_notifier.h
@@ -0,0 +1,83 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// An implementation of SyncNotifier that wraps InvalidationNotifier
+// on its own thread.
+
+#ifndef SYNC_NOTIFIER_NON_BLOCKING_INVALIDATION_NOTIFIER_H_
+#define SYNC_NOTIFIER_NON_BLOCKING_INVALIDATION_NOTIFIER_H_
+#pragma once
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "base/observer_list.h"
+#include "jingle/notifier/base/notifier_options.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/sync_notifier.h"
+#include "sync/notifier/sync_notifier_observer.h"
+#include "sync/util/weak_handle.h"
+
+namespace base {
+class MessageLoopProxy;
+}
+
+namespace sync_notifier {
+
+class NonBlockingInvalidationNotifier
+ : public SyncNotifier,
+ public SyncNotifierObserver {
+ public:
+ // |invalidation_version_tracker| must be initialized.
+ NonBlockingInvalidationNotifier(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info);
+
+ virtual ~NonBlockingInvalidationNotifier();
+
+ // SyncNotifier implementation.
+ virtual void AddObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void RemoveObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void SetUniqueId(const std::string& unique_id) OVERRIDE;
+ virtual void SetState(const std::string& state) OVERRIDE;
+ virtual void UpdateCredentials(
+ const std::string& email, const std::string& token) OVERRIDE;
+ virtual void UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) OVERRIDE;
+ virtual void SendNotification(
+ syncable::ModelTypeSet changed_types) OVERRIDE;
+
+ // SyncNotifierObserver implementation.
+ virtual void OnIncomingNotification(
+ const syncable::ModelTypePayloadMap& type_payloads,
+ IncomingNotificationSource source) OVERRIDE;
+ virtual void OnNotificationStateChange(bool notifications_enabled) OVERRIDE;
+ virtual void StoreState(const std::string& state) OVERRIDE;
+
+ private:
+ class Core;
+
+ base::WeakPtrFactory<NonBlockingInvalidationNotifier> weak_ptr_factory_;
+
+ // Our observers (which must live on the parent thread).
+ ObserverList<SyncNotifierObserver> observers_;
+
+ // The real guts of NonBlockingInvalidationNotifier, which allows
+ // this class to live completely on the parent thread.
+ scoped_refptr<Core> core_;
+ scoped_refptr<base::MessageLoopProxy> parent_message_loop_proxy_;
+ scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_;
+
+ DISALLOW_COPY_AND_ASSIGN(NonBlockingInvalidationNotifier);
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_NON_BLOCKING_INVALIDATION_NOTIFIER_H_
diff --git a/sync/notifier/non_blocking_invalidation_notifier_unittest.cc b/sync/notifier/non_blocking_invalidation_notifier_unittest.cc
new file mode 100644
index 0000000..fea7742
--- /dev/null
+++ b/sync/notifier/non_blocking_invalidation_notifier_unittest.cc
@@ -0,0 +1,97 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/non_blocking_invalidation_notifier.h"
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "base/threading/thread.h"
+#include "jingle/notifier/base/fake_base_task.h"
+#include "net/url_request/url_request_test_util.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/mock_sync_notifier_observer.h"
+#include "sync/syncable/model_type.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "sync/util/weak_handle.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+
+namespace {
+
+using ::testing::InSequence;
+using ::testing::StrictMock;
+
+class NonBlockingInvalidationNotifierTest : public testing::Test {
+ public:
+ NonBlockingInvalidationNotifierTest() : io_thread_("Test IO thread") {}
+
+ protected:
+ virtual void SetUp() {
+ base::Thread::Options options;
+ options.message_loop_type = MessageLoop::TYPE_IO;
+ io_thread_.StartWithOptions(options);
+ request_context_getter_ =
+ new TestURLRequestContextGetter(io_thread_.message_loop_proxy());
+ notifier::NotifierOptions notifier_options;
+ notifier_options.request_context_getter = request_context_getter_;
+ invalidation_notifier_.reset(
+ new NonBlockingInvalidationNotifier(
+ notifier_options,
+ InvalidationVersionMap(),
+ browser_sync::MakeWeakHandle(
+ base::WeakPtr<sync_notifier::InvalidationVersionTracker>()),
+ "fake_client_info"));
+ invalidation_notifier_->AddObserver(&mock_observer_);
+ }
+
+ virtual void TearDown() {
+ invalidation_notifier_->RemoveObserver(&mock_observer_);
+ invalidation_notifier_.reset();
+ request_context_getter_ = NULL;
+ io_thread_.Stop();
+ ui_loop_.RunAllPending();
+ }
+
+ MessageLoop ui_loop_;
+ base::Thread io_thread_;
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter_;
+ scoped_ptr<NonBlockingInvalidationNotifier> invalidation_notifier_;
+ StrictMock<MockSyncNotifierObserver> mock_observer_;
+ notifier::FakeBaseTask fake_base_task_;
+};
+
+TEST_F(NonBlockingInvalidationNotifierTest, Basic) {
+ InSequence dummy;
+
+ syncable::ModelTypePayloadMap type_payloads;
+ type_payloads[syncable::PREFERENCES] = "payload";
+ type_payloads[syncable::BOOKMARKS] = "";
+ type_payloads[syncable::AUTOFILL] = "";
+
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(true));
+ EXPECT_CALL(mock_observer_, StoreState("new_fake_state"));
+ EXPECT_CALL(mock_observer_,
+ OnIncomingNotification(type_payloads,
+ REMOTE_NOTIFICATION));
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(false));
+
+ invalidation_notifier_->SetState("fake_state");
+ invalidation_notifier_->SetUniqueId("fake_id");
+ invalidation_notifier_->UpdateCredentials("foo@bar.com", "fake_token");
+
+ invalidation_notifier_->OnNotificationStateChange(true);
+ invalidation_notifier_->StoreState("new_fake_state");
+ invalidation_notifier_->OnIncomingNotification(type_payloads,
+ REMOTE_NOTIFICATION);
+ invalidation_notifier_->OnNotificationStateChange(false);
+
+ ui_loop_.RunAllPending();
+}
+
+} // namespace
+
+} // namespace sync_notifier
diff --git a/sync/notifier/p2p_notifier.cc b/sync/notifier/p2p_notifier.cc
new file mode 100644
index 0000000..49462ce
--- /dev/null
+++ b/sync/notifier/p2p_notifier.cc
@@ -0,0 +1,301 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/p2p_notifier.h"
+
+#include <algorithm>
+
+#include "base/json/json_reader.h"
+#include "base/json/json_writer.h"
+#include "base/logging.h"
+#include "base/message_loop_proxy.h"
+#include "base/values.h"
+#include "sync/notifier/sync_notifier_observer.h"
+#include "sync/protocol/service_constants.h"
+#include "sync/syncable/model_type_payload_map.h"
+
+namespace sync_notifier {
+
+const char* kSyncP2PNotificationChannel = "http://www.google.com/chrome/sync";
+
+namespace {
+
+const char kNotifySelf[] = "notifySelf";
+const char kNotifyOthers[] = "notifyOthers";
+const char kNotifyAll[] = "notifyAll";
+
+const char kSenderIdKey[] = "senderId";
+const char kNotificationTypeKey[] = "notificationType";
+const char kChangedTypesKey[] = "changedTypes";
+
+} // namespace
+
+std::string P2PNotificationTargetToString(P2PNotificationTarget target) {
+ switch (target) {
+ case NOTIFY_SELF:
+ return kNotifySelf;
+ case NOTIFY_OTHERS:
+ return kNotifyOthers;
+ case NOTIFY_ALL:
+ return kNotifyAll;
+ default:
+ NOTREACHED();
+ return "";
+ }
+}
+
+P2PNotificationTarget P2PNotificationTargetFromString(
+ const std::string& target_str) {
+ if (target_str == kNotifySelf) {
+ return NOTIFY_SELF;
+ }
+ if (target_str == kNotifyOthers) {
+ return NOTIFY_OTHERS;
+ }
+ if (target_str == kNotifyAll) {
+ return NOTIFY_ALL;
+ }
+ LOG(WARNING) << "Could not parse " << target_str;
+ return NOTIFY_SELF;
+}
+
+P2PNotificationData::P2PNotificationData() : target_(NOTIFY_SELF) {}
+
+P2PNotificationData::P2PNotificationData(
+ const std::string& sender_id,
+ P2PNotificationTarget target,
+ syncable::ModelTypeSet changed_types)
+ : sender_id_(sender_id),
+ target_(target),
+ changed_types_(changed_types) {}
+
+P2PNotificationData::~P2PNotificationData() {}
+
+bool P2PNotificationData::IsTargeted(const std::string& id) const {
+ switch (target_) {
+ case NOTIFY_SELF:
+ return sender_id_ == id;
+ case NOTIFY_OTHERS:
+ return sender_id_ != id;
+ case NOTIFY_ALL:
+ return true;
+ default:
+ NOTREACHED();
+ return false;
+ }
+}
+
+syncable::ModelTypeSet P2PNotificationData::GetChangedTypes() const {
+ return changed_types_;
+}
+
+bool P2PNotificationData::Equals(const P2PNotificationData& other) const {
+ return
+ (sender_id_ == other.sender_id_) &&
+ (target_ == other.target_) &&
+ changed_types_.Equals(other.changed_types_);
+}
+
+std::string P2PNotificationData::ToString() const {
+ scoped_ptr<DictionaryValue> dict(new DictionaryValue());
+ dict->SetString(kSenderIdKey, sender_id_);
+ dict->SetString(kNotificationTypeKey,
+ P2PNotificationTargetToString(target_));
+ dict->Set(kChangedTypesKey, syncable::ModelTypeSetToValue(changed_types_));
+ std::string json;
+ base::JSONWriter::Write(dict.get(), &json);
+ return json;
+}
+
+bool P2PNotificationData::ResetFromString(const std::string& str) {
+ scoped_ptr<Value> data_value(base::JSONReader::Read(str));
+ if (!data_value.get()) {
+ LOG(WARNING) << "Could not parse " << str;
+ return false;
+ }
+ if (!data_value->IsType(Value::TYPE_DICTIONARY)) {
+ LOG(WARNING) << "Could not parse " << str << " as a dictionary";
+ return false;
+ }
+ // TODO(akalin): Use Values::AsDictionary() when it becomes
+ // available.
+ DictionaryValue* data_dict =
+ static_cast<DictionaryValue*>(data_value.get());
+ if (!data_dict->GetString(kSenderIdKey, &sender_id_)) {
+ LOG(WARNING) << "Could not find string value for " << kSenderIdKey;
+ }
+ std::string target_str;
+ if (!data_dict->GetString(kNotificationTypeKey, &target_str)) {
+ LOG(WARNING) << "Could not find string value for "
+ << kNotificationTypeKey;
+ }
+ target_ = P2PNotificationTargetFromString(target_str);
+ ListValue* changed_types_list = NULL;
+ if (!data_dict->GetList(kChangedTypesKey, &changed_types_list)) {
+ LOG(WARNING) << "Could not find list value for "
+ << kChangedTypesKey;
+ return false;
+ }
+ changed_types_ = syncable::ModelTypeSetFromValue(*changed_types_list);
+ return true;
+}
+
+P2PNotifier::P2PNotifier(notifier::TalkMediator* talk_mediator,
+ P2PNotificationTarget send_notification_target)
+ : talk_mediator_(talk_mediator),
+ logged_in_(false),
+ notifications_enabled_(false),
+ send_notification_target_(send_notification_target),
+ parent_message_loop_proxy_(
+ base::MessageLoopProxy::current()) {
+ DCHECK(send_notification_target_ == NOTIFY_OTHERS ||
+ send_notification_target_ == NOTIFY_ALL);
+ talk_mediator_->SetDelegate(this);
+}
+
+P2PNotifier::~P2PNotifier() {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+}
+
+void P2PNotifier::AddObserver(SyncNotifierObserver* observer) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ observer_list_.AddObserver(observer);
+}
+
+// Note: Since we need to shutdown TalkMediator on the method_thread, we are
+// calling Logout on TalkMediator when the last observer is removed.
+// Users will need to call UpdateCredentials again to use the same object.
+// TODO(akalin): Think of a better solution to fix this.
+void P2PNotifier::RemoveObserver(SyncNotifierObserver* observer) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ observer_list_.RemoveObserver(observer);
+
+ // Logout after the last observer is removed.
+ if (observer_list_.size() == 0) {
+ talk_mediator_->Logout();
+ }
+}
+
+void P2PNotifier::SetUniqueId(const std::string& unique_id) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ unique_id_ = unique_id;
+}
+
+void P2PNotifier::SetState(const std::string& state) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+}
+
+void P2PNotifier::UpdateCredentials(
+ const std::string& email, const std::string& token) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ // If already logged in, the new credentials will take effect on the
+ // next reconnection.
+ talk_mediator_->SetAuthToken(email, token, SYNC_SERVICE_NAME);
+ if (!logged_in_) {
+ if (!talk_mediator_->Login()) {
+ LOG(DFATAL) << "Could not login for " << email;
+ return;
+ }
+
+ notifier::Subscription subscription;
+ subscription.channel = kSyncP2PNotificationChannel;
+ // There may be some subtle issues around case sensitivity of the
+ // from field, but it doesn't matter too much since this is only
+ // used in p2p mode (which is only used in testing).
+ subscription.from = email;
+ talk_mediator_->AddSubscription(subscription);
+
+ logged_in_ = true;
+ }
+}
+
+void P2PNotifier::UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ const syncable::ModelTypeSet new_enabled_types =
+ Difference(enabled_types, enabled_types_);
+ enabled_types_ = enabled_types;
+ const P2PNotificationData notification_data(
+ unique_id_, NOTIFY_SELF, new_enabled_types);
+ SendNotificationData(notification_data);
+}
+
+void P2PNotifier::SendNotification(
+ syncable::ModelTypeSet changed_types) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ const P2PNotificationData notification_data(
+ unique_id_, send_notification_target_, changed_types);
+ SendNotificationData(notification_data);
+}
+
+void P2PNotifier::OnNotificationStateChange(bool notifications_enabled) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ bool disabled_to_enabled = notifications_enabled && !notifications_enabled_;
+ notifications_enabled_ = notifications_enabled;
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observer_list_,
+ OnNotificationStateChange(notifications_enabled_));
+ if (disabled_to_enabled) {
+ const P2PNotificationData notification_data(
+ unique_id_, NOTIFY_SELF, enabled_types_);
+ SendNotificationData(notification_data);
+ }
+}
+
+void P2PNotifier::OnIncomingNotification(
+ const notifier::Notification& notification) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ DVLOG(1) << "Received notification " << notification.ToString();
+ if (!logged_in_) {
+ DVLOG(1) << "Not logged in yet -- not emitting notification";
+ return;
+ }
+ if (!notifications_enabled_) {
+ DVLOG(1) << "Notifications not enabled -- not emitting notification";
+ return;
+ }
+ if (notification.channel != kSyncP2PNotificationChannel) {
+ LOG(WARNING) << "Notification from unexpected source "
+ << notification.channel;
+ }
+ P2PNotificationData notification_data;
+ if (!notification_data.ResetFromString(notification.data)) {
+ LOG(WARNING) << "Could not parse notification data from "
+ << notification.data;
+ notification_data =
+ P2PNotificationData(unique_id_, NOTIFY_ALL, enabled_types_);
+ }
+ if (!notification_data.IsTargeted(unique_id_)) {
+ DVLOG(1) << "Not a target of the notification -- "
+ << "not emitting notification";
+ return;
+ }
+ if (notification_data.GetChangedTypes().Empty()) {
+ DVLOG(1) << "No changed types -- not emitting notification";
+ return;
+ }
+ const syncable::ModelTypePayloadMap& type_payloads =
+ syncable::ModelTypePayloadMapFromEnumSet(
+ notification_data.GetChangedTypes(), std::string());
+ FOR_EACH_OBSERVER(SyncNotifierObserver, observer_list_,
+ OnIncomingNotification(type_payloads, REMOTE_NOTIFICATION));
+}
+
+void P2PNotifier::OnOutgoingNotification() {}
+
+void P2PNotifier::SendNotificationDataForTest(
+ const P2PNotificationData& notification_data) {
+ SendNotificationData(notification_data);
+}
+
+void P2PNotifier::SendNotificationData(
+ const P2PNotificationData& notification_data) {
+ DCHECK(parent_message_loop_proxy_->BelongsToCurrentThread());
+ notifier::Notification notification;
+ notification.channel = kSyncP2PNotificationChannel;
+ notification.data = notification_data.ToString();
+ DVLOG(1) << "Sending XMPP notification: " << notification.ToString();
+ talk_mediator_->SendNotification(notification);
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/p2p_notifier.h b/sync/notifier/p2p_notifier.h
new file mode 100644
index 0000000..093d023
--- /dev/null
+++ b/sync/notifier/p2p_notifier.h
@@ -0,0 +1,143 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A notifier that uses p2p notifications based on XMPP push
+// notifications. Used only for sync integration tests.
+
+#ifndef SYNC_NOTIFIER_P2P_NOTIFIER_H_
+#define SYNC_NOTIFIER_P2P_NOTIFIER_H_
+
+#include <string>
+
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/observer_list.h"
+#include "jingle/notifier/listener/talk_mediator.h"
+#include "sync/notifier/sync_notifier.h"
+#include "sync/syncable/model_type.h"
+
+namespace base {
+class MessageLoopProxy;
+}
+
+
+namespace sync_notifier {
+
+// The channel to use for sync notifications.
+extern const char* kSyncP2PNotificationChannel;
+
+// The intended recipient(s) of a P2P notification.
+enum P2PNotificationTarget {
+ NOTIFY_SELF,
+ FIRST_NOTIFICATION_TARGET = NOTIFY_SELF,
+ NOTIFY_OTHERS,
+ NOTIFY_ALL,
+ LAST_NOTIFICATION_TARGET = NOTIFY_ALL
+};
+
+std::string P2PNotificationTargetToString(
+ P2PNotificationTarget target);
+
+// If |target_str| can't be parsed, assumes NOTIFY_SELF.
+P2PNotificationTarget P2PNotificationTargetFromString(
+ const std::string& target_str);
+
+// Helper notification data class that can be serialized to and
+// deserialized from a string.
+class P2PNotificationData {
+ public:
+ // Initializes with an empty sender ID, target set to NOTIFY_SELF,
+ // and empty changed types.
+ P2PNotificationData();
+ P2PNotificationData(const std::string& sender_id,
+ P2PNotificationTarget target,
+ syncable::ModelTypeSet changed_types);
+
+ ~P2PNotificationData();
+
+ // Returns true if the given ID is targeted by this notification.
+ bool IsTargeted(const std::string& id) const;
+
+ syncable::ModelTypeSet GetChangedTypes() const;
+
+ bool Equals(const P2PNotificationData& other) const;
+
+ std::string ToString() const;
+
+ // Returns whether parsing |str| was successful. If parsing was
+ // unsuccessful, the state of the notification is undefined.
+ bool ResetFromString(const std::string& str);
+
+ private:
+ // The unique ID of the client that sent the notification.
+ std::string sender_id_;
+ // The intendent recipient(s) of the notification.
+ P2PNotificationTarget target_;
+ // The types the notification is for.
+ syncable::ModelTypeSet changed_types_;
+};
+
+class P2PNotifier
+ : public SyncNotifier,
+ public notifier::TalkMediator::Delegate {
+ public:
+ // Takes ownership of |talk_mediator|, but it is guaranteed that
+ // |talk_mediator| is destroyed only when this object is destroyed.
+ //
+ // The |send_notification_target| parameter was added to allow us to send
+ // self-notifications in some cases, but not others. The value should be
+ // either NOTIFY_ALL to send notifications to all clients, or NOTIFY_OTHERS
+ // to send notificaitons to all clients except for the one that triggered the
+ // notification. See crbug.com/97780.
+ P2PNotifier(notifier::TalkMediator* talk_mediator,
+ P2PNotificationTarget send_notification_target);
+
+ virtual ~P2PNotifier();
+
+ // SyncNotifier implementation
+ virtual void AddObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void RemoveObserver(SyncNotifierObserver* observer) OVERRIDE;
+ virtual void SetUniqueId(const std::string& unique_id) OVERRIDE;
+ virtual void SetState(const std::string& state) OVERRIDE;
+ virtual void UpdateCredentials(
+ const std::string& email, const std::string& token) OVERRIDE;
+ virtual void UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) OVERRIDE;
+ virtual void SendNotification(
+ syncable::ModelTypeSet changed_types) OVERRIDE;
+
+ // TalkMediator::Delegate implementation.
+ virtual void OnNotificationStateChange(bool notifications_enabled) OVERRIDE;
+ virtual void OnIncomingNotification(
+ const notifier::Notification& notification) OVERRIDE;
+ virtual void OnOutgoingNotification() OVERRIDE;
+
+ // For testing.
+ void SendNotificationDataForTest(
+ const P2PNotificationData& notification_data);
+
+ private:
+ void SendNotificationData(const P2PNotificationData& notification_data);
+
+ ObserverList<SyncNotifierObserver> observer_list_;
+
+ // The actual notification listener.
+ scoped_ptr<notifier::TalkMediator> talk_mediator_;
+ // Our unique ID.
+ std::string unique_id_;
+ // Whether we called Login() on |talk_mediator_| yet.
+ bool logged_in_;
+ // Whether |talk_mediator_| has notified us that notifications are
+ // enabled.
+ bool notifications_enabled_;
+ // Which set of clients should be sent notifications.
+ P2PNotificationTarget send_notification_target_;
+
+ syncable::ModelTypeSet enabled_types_;
+ scoped_refptr<base::MessageLoopProxy> parent_message_loop_proxy_;
+};
+
+} // namespace sync_notifier
+#endif // SYNC_NOTIFIER_P2P_NOTIFIER_H_
diff --git a/sync/notifier/p2p_notifier_unittest.cc b/sync/notifier/p2p_notifier_unittest.cc
new file mode 100644
index 0000000..a9f8db9
--- /dev/null
+++ b/sync/notifier/p2p_notifier_unittest.cc
@@ -0,0 +1,259 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/p2p_notifier.h"
+
+#include <cstddef>
+
+#include "base/compiler_specific.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop.h"
+#include "sync/notifier/mock_sync_notifier_observer.h"
+#include "sync/syncable/model_type.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+
+namespace {
+
+using ::testing::_;
+using ::testing::Mock;
+using ::testing::StrictMock;
+
+class FakeTalkMediator : public notifier::TalkMediator {
+ public:
+ FakeTalkMediator() : delegate_(NULL) {}
+ virtual ~FakeTalkMediator() {}
+
+ // notifier::TalkMediator implementation.
+ virtual void SetDelegate(Delegate* delegate) OVERRIDE {
+ delegate_ = delegate;
+ }
+ virtual void SetAuthToken(const std::string& email,
+ const std::string& token,
+ const std::string& token_service) OVERRIDE {}
+ virtual bool Login() OVERRIDE {
+ if (delegate_) {
+ delegate_->OnNotificationStateChange(true /* notifications_enabled */);
+ }
+ return true;
+ }
+ virtual bool Logout() OVERRIDE {
+ if (delegate_) {
+ delegate_->OnNotificationStateChange(false /* notifiations_enabled */);
+ }
+ return true;
+ }
+ virtual void SendNotification(const notifier::Notification& data) OVERRIDE {
+ if (delegate_) {
+ delegate_->OnOutgoingNotification();
+ delegate_->OnIncomingNotification(data);
+ }
+ }
+ virtual void AddSubscription(
+ const notifier::Subscription& subscription) OVERRIDE {}
+
+ private:
+ Delegate* delegate_;
+};
+
+class P2PNotifierTest : public testing::Test {
+ protected:
+ P2PNotifierTest() : talk_mediator_(NULL) {}
+
+ virtual void SetUp() {
+ talk_mediator_ = new FakeTalkMediator();
+ p2p_notifier_.reset(new P2PNotifier(talk_mediator_, NOTIFY_OTHERS));
+ p2p_notifier_->AddObserver(&mock_observer_);
+ }
+
+ virtual void TearDown() {
+ p2p_notifier_->RemoveObserver(&mock_observer_);
+ p2p_notifier_.reset();
+ talk_mediator_ = NULL;
+ }
+
+ syncable::ModelTypePayloadMap MakePayloadMap(
+ syncable::ModelTypeSet types) {
+ return syncable::ModelTypePayloadMapFromEnumSet(types, "");
+ }
+
+ MessageLoop message_loop_;
+ // Owned by |p2p_notifier_|.
+ notifier::TalkMediator* talk_mediator_;
+ scoped_ptr<P2PNotifier> p2p_notifier_;
+ StrictMock<MockSyncNotifierObserver> mock_observer_;
+};
+
+TEST_F(P2PNotifierTest, P2PNotificationTarget) {
+ for (int i = FIRST_NOTIFICATION_TARGET;
+ i <= LAST_NOTIFICATION_TARGET; ++i) {
+ P2PNotificationTarget target = static_cast<P2PNotificationTarget>(i);
+ const std::string& target_str = P2PNotificationTargetToString(target);
+ EXPECT_FALSE(target_str.empty());
+ EXPECT_EQ(target, P2PNotificationTargetFromString(target_str));
+ }
+ EXPECT_EQ(NOTIFY_SELF, P2PNotificationTargetFromString("unknown"));
+}
+
+TEST_F(P2PNotifierTest, P2PNotificationDataIsTargeted) {
+ {
+ const P2PNotificationData notification_data(
+ "sender", NOTIFY_SELF, syncable::ModelTypeSet());
+ EXPECT_TRUE(notification_data.IsTargeted("sender"));
+ EXPECT_FALSE(notification_data.IsTargeted("other1"));
+ EXPECT_FALSE(notification_data.IsTargeted("other2"));
+ }
+ {
+ const P2PNotificationData notification_data(
+ "sender", NOTIFY_OTHERS, syncable::ModelTypeSet());
+ EXPECT_FALSE(notification_data.IsTargeted("sender"));
+ EXPECT_TRUE(notification_data.IsTargeted("other1"));
+ EXPECT_TRUE(notification_data.IsTargeted("other2"));
+ }
+ {
+ const P2PNotificationData notification_data(
+ "sender", NOTIFY_ALL, syncable::ModelTypeSet());
+ EXPECT_TRUE(notification_data.IsTargeted("sender"));
+ EXPECT_TRUE(notification_data.IsTargeted("other1"));
+ EXPECT_TRUE(notification_data.IsTargeted("other2"));
+ }
+}
+
+TEST_F(P2PNotifierTest, P2PNotificationDataDefault) {
+ const P2PNotificationData notification_data;
+ EXPECT_TRUE(notification_data.IsTargeted(""));
+ EXPECT_FALSE(notification_data.IsTargeted("other1"));
+ EXPECT_FALSE(notification_data.IsTargeted("other2"));
+ EXPECT_TRUE(notification_data.GetChangedTypes().Empty());
+ const std::string& notification_data_str = notification_data.ToString();
+ EXPECT_EQ(
+ "{\"changedTypes\":[],\"notificationType\":\"notifySelf\","
+ "\"senderId\":\"\"}", notification_data_str);
+
+ P2PNotificationData notification_data_parsed;
+ EXPECT_TRUE(notification_data_parsed.ResetFromString(notification_data_str));
+ EXPECT_TRUE(notification_data.Equals(notification_data_parsed));
+}
+
+TEST_F(P2PNotifierTest, P2PNotificationDataNonDefault) {
+ const syncable::ModelTypeSet changed_types(
+ syncable::BOOKMARKS, syncable::THEMES);
+ const P2PNotificationData notification_data(
+ "sender", NOTIFY_ALL, changed_types);
+ EXPECT_TRUE(notification_data.IsTargeted("sender"));
+ EXPECT_TRUE(notification_data.IsTargeted("other1"));
+ EXPECT_TRUE(notification_data.IsTargeted("other2"));
+ EXPECT_TRUE(notification_data.GetChangedTypes().Equals(changed_types));
+ const std::string& notification_data_str = notification_data.ToString();
+ EXPECT_EQ(
+ "{\"changedTypes\":[\"Bookmarks\",\"Themes\"],"
+ "\"notificationType\":\"notifyAll\","
+ "\"senderId\":\"sender\"}", notification_data_str);
+
+ P2PNotificationData notification_data_parsed;
+ EXPECT_TRUE(notification_data_parsed.ResetFromString(notification_data_str));
+ EXPECT_TRUE(notification_data.Equals(notification_data_parsed));
+}
+
+TEST_F(P2PNotifierTest, NotificationsBasic) {
+ syncable::ModelTypeSet enabled_types(
+ syncable::BOOKMARKS, syncable::PREFERENCES);
+
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(true));
+ EXPECT_CALL(mock_observer_,
+ OnIncomingNotification(MakePayloadMap(enabled_types),
+ REMOTE_NOTIFICATION));
+
+ p2p_notifier_->SetUniqueId("sender");
+ p2p_notifier_->UpdateCredentials("foo@bar.com", "fake_token");
+ p2p_notifier_->UpdateEnabledTypes(enabled_types);
+ // Sent with target NOTIFY_OTHERS so should not be propagated to
+ // |mock_observer_|.
+ {
+ syncable::ModelTypeSet changed_types(
+ syncable::THEMES, syncable::APPS);
+ p2p_notifier_->SendNotification(changed_types);
+ }
+}
+
+TEST_F(P2PNotifierTest, SendNotificationData) {
+ syncable::ModelTypeSet enabled_types(
+ syncable::BOOKMARKS, syncable::PREFERENCES);
+
+ syncable::ModelTypeSet changed_types(
+ syncable::THEMES, syncable::APPS);
+
+ const syncable::ModelTypePayloadMap& changed_payload_map =
+ MakePayloadMap(changed_types);
+
+ EXPECT_CALL(mock_observer_, OnNotificationStateChange(true));
+ EXPECT_CALL(mock_observer_,
+ OnIncomingNotification(MakePayloadMap(enabled_types),
+ REMOTE_NOTIFICATION));
+
+ p2p_notifier_->SetUniqueId("sender");
+ p2p_notifier_->UpdateCredentials("foo@bar.com", "fake_token");
+ p2p_notifier_->UpdateEnabledTypes(enabled_types);
+
+ // Should be dropped.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ p2p_notifier_->SendNotificationDataForTest(P2PNotificationData());
+
+ // Should be propagated.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ EXPECT_CALL(mock_observer_, OnIncomingNotification(changed_payload_map,
+ REMOTE_NOTIFICATION));
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender", NOTIFY_SELF, changed_types));
+
+ // Should be dropped.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender2", NOTIFY_SELF, changed_types));
+
+ // Should be dropped.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender", NOTIFY_SELF, syncable::ModelTypeSet()));
+
+ // Should be dropped.
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender", NOTIFY_OTHERS, changed_types));
+
+ // Should be propagated.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ EXPECT_CALL(mock_observer_, OnIncomingNotification(changed_payload_map,
+ REMOTE_NOTIFICATION));
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender2", NOTIFY_OTHERS, changed_types));
+
+ // Should be dropped.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender2", NOTIFY_OTHERS, syncable::ModelTypeSet()));
+
+ // Should be propagated.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ EXPECT_CALL(mock_observer_, OnIncomingNotification(changed_payload_map,
+ REMOTE_NOTIFICATION));
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender", NOTIFY_ALL, changed_types));
+
+ // Should be propagated.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ EXPECT_CALL(mock_observer_, OnIncomingNotification(changed_payload_map,
+ REMOTE_NOTIFICATION));
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender2", NOTIFY_ALL, changed_types));
+
+ // Should be dropped.
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ p2p_notifier_->SendNotificationDataForTest(
+ P2PNotificationData("sender2", NOTIFY_ALL, syncable::ModelTypeSet()));
+}
+
+} // namespace
+
+} // namespace sync_notifier
diff --git a/sync/notifier/registration_manager.cc b/sync/notifier/registration_manager.cc
new file mode 100644
index 0000000..de844c5
--- /dev/null
+++ b/sync/notifier/registration_manager.cc
@@ -0,0 +1,275 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/registration_manager.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <string>
+
+#include "base/rand_util.h"
+#include "google/cacheinvalidation/include/invalidation-client.h"
+#include "google/cacheinvalidation/include/types.h"
+#include "sync/notifier/invalidation_util.h"
+#include "sync/syncable/model_type.h"
+
+namespace sync_notifier {
+
+RegistrationManager::PendingRegistrationInfo::PendingRegistrationInfo() {}
+
+RegistrationManager::RegistrationStatus::RegistrationStatus()
+ : model_type(syncable::UNSPECIFIED),
+ registration_manager(NULL),
+ enabled(true),
+ state(invalidation::InvalidationListener::UNREGISTERED) {}
+
+RegistrationManager::RegistrationStatus::~RegistrationStatus() {}
+
+void RegistrationManager::RegistrationStatus::DoRegister() {
+ DCHECK_NE(model_type, syncable::UNSPECIFIED);
+ DCHECK(registration_manager);
+ CHECK(enabled);
+ // We might be called explicitly, so stop the timer manually and
+ // reset the delay.
+ registration_timer.Stop();
+ delay = base::TimeDelta();
+ registration_manager->DoRegisterType(model_type);
+ DCHECK(!last_registration_request.is_null());
+}
+
+void RegistrationManager::RegistrationStatus::Disable() {
+ enabled = false;
+ state = invalidation::InvalidationListener::UNREGISTERED;
+ registration_timer.Stop();
+ delay = base::TimeDelta();
+}
+
+const int RegistrationManager::kInitialRegistrationDelaySeconds = 5;
+const int RegistrationManager::kRegistrationDelayExponent = 2;
+const double RegistrationManager::kRegistrationDelayMaxJitter = 0.5;
+const int RegistrationManager::kMinRegistrationDelaySeconds = 1;
+// 1 hour.
+const int RegistrationManager::kMaxRegistrationDelaySeconds = 60 * 60;
+
+RegistrationManager::RegistrationManager(
+ invalidation::InvalidationClient* invalidation_client)
+ : invalidation_client_(invalidation_client) {
+ DCHECK(invalidation_client_);
+ // Initialize statuses.
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ status->model_type = model_type;
+ status->registration_manager = this;
+ }
+}
+
+RegistrationManager::~RegistrationManager() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+}
+
+void RegistrationManager::SetRegisteredTypes(
+ syncable::ModelTypeSet types) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ if (types.Has(model_type)) {
+ if (!IsTypeRegistered(model_type)) {
+ TryRegisterType(model_type, false /* is_retry */);
+ }
+ } else {
+ if (IsTypeRegistered(model_type)) {
+ UnregisterType(model_type);
+ }
+ }
+ }
+}
+
+void RegistrationManager::MarkRegistrationLost(
+ syncable::ModelType model_type) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ if (!status->enabled) {
+ return;
+ }
+ status->state = invalidation::InvalidationListener::UNREGISTERED;
+ bool is_retry = !status->last_registration_request.is_null();
+ TryRegisterType(model_type, is_retry);
+}
+
+void RegistrationManager::MarkAllRegistrationsLost() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ if (IsTypeRegistered(model_type)) {
+ MarkRegistrationLost(model_type);
+ }
+ }
+}
+
+void RegistrationManager::DisableType(syncable::ModelType model_type) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ LOG(INFO) << "Disabling " << syncable::ModelTypeToString(model_type);
+ status->Disable();
+}
+
+syncable::ModelTypeSet RegistrationManager::GetRegisteredTypes() const {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ syncable::ModelTypeSet registered_types;
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ if (IsTypeRegistered(model_type)) {
+ registered_types.Put(model_type);
+ }
+ }
+ return registered_types;
+}
+
+RegistrationManager::PendingRegistrationMap
+ RegistrationManager::GetPendingRegistrations() const {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ PendingRegistrationMap pending_registrations;
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ const RegistrationStatus& status = registration_statuses_[model_type];
+ if (status.registration_timer.IsRunning()) {
+ pending_registrations[model_type].last_registration_request =
+ status.last_registration_request;
+ pending_registrations[model_type].registration_attempt =
+ status.last_registration_attempt;
+ pending_registrations[model_type].delay = status.delay;
+ pending_registrations[model_type].actual_delay =
+ status.registration_timer.GetCurrentDelay();
+ }
+ }
+ return pending_registrations;
+}
+
+void RegistrationManager::FirePendingRegistrationsForTest() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ for (int i = syncable::FIRST_REAL_MODEL_TYPE;
+ i < syncable::MODEL_TYPE_COUNT; ++i) {
+ syncable::ModelType model_type = syncable::ModelTypeFromInt(i);
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ if (status->registration_timer.IsRunning()) {
+ status->DoRegister();
+ }
+ }
+}
+
+// static
+double RegistrationManager::CalculateBackoff(
+ double retry_interval,
+ double initial_retry_interval,
+ double min_retry_interval,
+ double max_retry_interval,
+ double backoff_exponent,
+ double jitter,
+ double max_jitter) {
+ // scaled_jitter lies in [-max_jitter, max_jitter].
+ double scaled_jitter = jitter * max_jitter;
+ double new_retry_interval =
+ (retry_interval == 0.0) ?
+ (initial_retry_interval * (1.0 + scaled_jitter)) :
+ (retry_interval * (backoff_exponent + scaled_jitter));
+ return std::max(min_retry_interval,
+ std::min(max_retry_interval, new_retry_interval));
+}
+
+double RegistrationManager::GetJitter() {
+ // |jitter| lies in [-1.0, 1.0), which is low-biased, but only
+ // barely.
+ //
+ // TODO(akalin): Fix the bias.
+ return 2.0 * base::RandDouble() - 1.0;
+}
+
+void RegistrationManager::TryRegisterType(syncable::ModelType model_type,
+ bool is_retry) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ if (!status->enabled) {
+ // Disabled, so do nothing.
+ return;
+ }
+ status->last_registration_attempt = base::Time::Now();
+ if (is_retry) {
+ // If we're a retry, we must have tried at least once before.
+ DCHECK(!status->last_registration_request.is_null());
+ // delay = max(0, (now - last request) + next_delay)
+ status->delay =
+ (status->last_registration_request -
+ status->last_registration_attempt) +
+ status->next_delay;
+ base::TimeDelta delay =
+ (status->delay <= base::TimeDelta()) ?
+ base::TimeDelta() : status->delay;
+ DVLOG(2) << "Registering "
+ << syncable::ModelTypeToString(model_type) << " in "
+ << delay.InMilliseconds() << " ms";
+ status->registration_timer.Stop();
+ status->registration_timer.Start(FROM_HERE,
+ delay, status, &RegistrationManager::RegistrationStatus::DoRegister);
+ double next_delay_seconds =
+ CalculateBackoff(static_cast<double>(status->next_delay.InSeconds()),
+ kInitialRegistrationDelaySeconds,
+ kMinRegistrationDelaySeconds,
+ kMaxRegistrationDelaySeconds,
+ kRegistrationDelayExponent,
+ GetJitter(),
+ kRegistrationDelayMaxJitter);
+ status->next_delay =
+ base::TimeDelta::FromSeconds(static_cast<int64>(next_delay_seconds));
+ DVLOG(2) << "New next delay for "
+ << syncable::ModelTypeToString(model_type) << " is "
+ << status->next_delay.InSeconds() << " seconds";
+ } else {
+ DVLOG(2) << "Not a retry -- registering "
+ << syncable::ModelTypeToString(model_type) << " immediately";
+ status->delay = base::TimeDelta();
+ status->next_delay = base::TimeDelta();
+ status->DoRegister();
+ }
+}
+
+void RegistrationManager::DoRegisterType(syncable::ModelType model_type) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation::ObjectId object_id;
+ if (!RealModelTypeToObjectId(model_type, &object_id)) {
+ LOG(DFATAL) << "Invalid model type: " << model_type;
+ return;
+ }
+ invalidation_client_->Register(object_id);
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ status->state = invalidation::InvalidationListener::REGISTERED;
+ status->last_registration_request = base::Time::Now();
+}
+
+void RegistrationManager::UnregisterType(syncable::ModelType model_type) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation::ObjectId object_id;
+ if (!RealModelTypeToObjectId(model_type, &object_id)) {
+ LOG(DFATAL) << "Invalid model type: " << model_type;
+ return;
+ }
+ invalidation_client_->Unregister(object_id);
+ RegistrationStatus* status = &registration_statuses_[model_type];
+ status->state = invalidation::InvalidationListener::UNREGISTERED;
+}
+
+bool RegistrationManager::IsTypeRegistered(
+ syncable::ModelType model_type) const {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ return registration_statuses_[model_type].state ==
+ invalidation::InvalidationListener::REGISTERED;
+}
+
+} // namespace sync_notifier
diff --git a/sync/notifier/registration_manager.h b/sync/notifier/registration_manager.h
new file mode 100644
index 0000000..e8639d0
--- /dev/null
+++ b/sync/notifier/registration_manager.h
@@ -0,0 +1,177 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A class that manages the registration of types for server-issued
+// notifications.
+
+#ifndef SYNC_NOTIFIER_REGISTRATION_MANAGER_H_
+#define SYNC_NOTIFIER_REGISTRATION_MANAGER_H_
+#pragma once
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/threading/non_thread_safe.h"
+#include "base/threading/non_thread_safe.h"
+#include "base/time.h"
+#include "base/timer.h"
+#include "sync/syncable/model_type.h"
+// For invalidation::InvalidationListener::RegistrationState.
+#include "google/cacheinvalidation/include/invalidation-listener.h"
+
+namespace sync_notifier {
+
+using ::invalidation::InvalidationListener;
+
+// Manages the details of registering types for invalidation.
+// Implements exponential backoff for repeated registration attempts
+// to the invalidation client.
+//
+// TODO(akalin): Consolidate exponential backoff code. Other
+// implementations include the syncer thread (both versions) and XMPP
+// retries. The most sophisticated one is URLRequestThrottler; making
+// that generic should work for everyone.
+class RegistrationManager {
+ public:
+ // Constants for exponential backoff (used by tests).
+ static const int kInitialRegistrationDelaySeconds;
+ static const int kRegistrationDelayExponent;
+ static const double kRegistrationDelayMaxJitter;
+ static const int kMinRegistrationDelaySeconds;
+ static const int kMaxRegistrationDelaySeconds;
+
+ // Types used by testing functions.
+ struct PendingRegistrationInfo {
+ PendingRegistrationInfo();
+
+ // Last time a registration request was actually sent.
+ base::Time last_registration_request;
+ // Time the registration was attempted.
+ base::Time registration_attempt;
+ // The calculated delay of the pending registration (which may be
+ // negative).
+ base::TimeDelta delay;
+ // The delay of the timer, which should be max(delay, 0).
+ base::TimeDelta actual_delay;
+ };
+ // Map from types with pending registrations to info about the
+ // pending registration.
+ typedef std::map<syncable::ModelType, PendingRegistrationInfo>
+ PendingRegistrationMap;
+
+ // Does not take ownership of |invalidation_client_|.
+ explicit RegistrationManager(
+ invalidation::InvalidationClient* invalidation_client);
+
+ virtual ~RegistrationManager();
+
+ // Registers all types included in the given set (that are not
+ // already disabled) and sets all other types to be unregistered.
+ void SetRegisteredTypes(syncable::ModelTypeSet types);
+
+ // Marks the registration for the |model_type| lost and re-registers
+ // it (unless it's disabled).
+ void MarkRegistrationLost(syncable::ModelType model_type);
+
+ // Marks registrations lost for all enabled types and re-registers
+ // them.
+ void MarkAllRegistrationsLost();
+
+ // Marks the registration for the |model_type| permanently lost and
+ // blocks any future registration attempts.
+ void DisableType(syncable::ModelType model_type);
+
+ // The functions below should only be used in tests.
+
+ // Gets all currently-registered types.
+ syncable::ModelTypeSet GetRegisteredTypes() const;
+
+ // Gets all pending registrations and their next min delays.
+ PendingRegistrationMap GetPendingRegistrations() const;
+
+ // Run pending registrations immediately.
+ void FirePendingRegistrationsForTest();
+
+ // Calculate exponential backoff. |jitter| must be Uniform[-1.0,
+ // 1.0].
+ static double CalculateBackoff(double retry_interval,
+ double initial_retry_interval,
+ double min_retry_interval,
+ double max_retry_interval,
+ double backoff_exponent,
+ double jitter,
+ double max_jitter);
+
+ protected:
+ // Overrideable for testing purposes.
+ virtual double GetJitter();
+
+ private:
+ struct RegistrationStatus {
+ RegistrationStatus();
+ ~RegistrationStatus();
+
+ // Calls registration_manager->DoRegister(model_type). (needed by
+ // |registration_timer|). Should only be called if |enabled| is
+ // true.
+ void DoRegister();
+
+ // Sets |enabled| to false and resets other variables.
+ void Disable();
+
+ // The model type for which this is the status.
+ syncable::ModelType model_type;
+ // The parent registration manager.
+ RegistrationManager* registration_manager;
+
+ // Whether this data type should be registered. Set to false if
+ // we get a non-transient registration failure.
+ bool enabled;
+ // The current registration state.
+ InvalidationListener::RegistrationState state;
+ // When we last sent a registration request.
+ base::Time last_registration_request;
+ // When we last tried to register.
+ base::Time last_registration_attempt;
+ // The calculated delay of any pending registration (which may be
+ // negative).
+ base::TimeDelta delay;
+ // The minimum time to wait until any next registration attempt.
+ // Increased after each consecutive failure.
+ base::TimeDelta next_delay;
+ // The actual timer for registration.
+ base::OneShotTimer<RegistrationStatus> registration_timer;
+ };
+
+ // Does nothing if the given type is disabled. Otherwise, if
+ // |is_retry| is not set, registers the given type immediately and
+ // resets all backoff parameters. If |is_retry| is set, registers
+ // the given type at some point in the future and increases the
+ // delay until the next retry.
+ void TryRegisterType(syncable::ModelType model_type,
+ bool is_retry);
+
+ // Registers the given type, which must be valid, immediately.
+ // Updates |last_registration| in the appropriate
+ // RegistrationStatus. Should only be called by
+ // RegistrationStatus::DoRegister().
+ void DoRegisterType(syncable::ModelType model_type);
+
+ // Unregisters the given type, which must be valid.
+ void UnregisterType(syncable::ModelType model_type);
+
+ // Returns true iff the given type, which must be valid, is registered.
+ bool IsTypeRegistered(syncable::ModelType model_type) const;
+
+ base::NonThreadSafe non_thread_safe_;
+ RegistrationStatus registration_statuses_[syncable::MODEL_TYPE_COUNT];
+ // Weak pointer.
+ invalidation::InvalidationClient* invalidation_client_;
+
+ DISALLOW_COPY_AND_ASSIGN(RegistrationManager);
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_REGISTRATION_MANAGER_H_
diff --git a/sync/notifier/registration_manager_unittest.cc b/sync/notifier/registration_manager_unittest.cc
new file mode 100644
index 0000000..21172f8
--- /dev/null
+++ b/sync/notifier/registration_manager_unittest.cc
@@ -0,0 +1,437 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/registration_manager.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstddef>
+#include <deque>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/message_loop.h"
+#include "google/cacheinvalidation/include/invalidation-client.h"
+#include "sync/notifier/invalidation_util.h"
+#include "sync/syncable/model_type.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+namespace {
+
+syncable::ModelType ObjectIdToModelType(
+ const invalidation::ObjectId& object_id) {
+ syncable::ModelType model_type = syncable::UNSPECIFIED;
+ EXPECT_TRUE(ObjectIdToRealModelType(object_id, &model_type));
+ return model_type;
+}
+
+// Fake registration manager that lets you override jitter.
+class FakeRegistrationManager : public RegistrationManager {
+ public:
+ explicit FakeRegistrationManager(
+ invalidation::InvalidationClient* invalidation_client)
+ : RegistrationManager(invalidation_client),
+ jitter_(0.0) {}
+
+ virtual ~FakeRegistrationManager() {}
+
+ void SetJitter(double jitter) {
+ jitter_ = jitter;
+ }
+
+ protected:
+ virtual double GetJitter() {
+ return jitter_;
+ }
+
+ private:
+ double jitter_;
+
+ DISALLOW_COPY_AND_ASSIGN(FakeRegistrationManager);
+};
+
+// Fake invalidation client that just stores the currently-registered
+// model types.
+class FakeInvalidationClient : public invalidation::InvalidationClient {
+ public:
+ FakeInvalidationClient() {}
+
+ virtual ~FakeInvalidationClient() {}
+
+ void LoseRegistration(syncable::ModelType model_type) {
+ EXPECT_TRUE(registered_types_.Has(model_type));
+ registered_types_.Remove(model_type);
+ }
+
+ void LoseAllRegistrations() {
+ registered_types_.Clear();
+ }
+
+ // invalidation::InvalidationClient implementation.
+
+ virtual void Start() {}
+ virtual void Stop() {}
+ virtual void Acknowledge(const invalidation::AckHandle& handle) {}
+
+ virtual void Register(const invalidation::ObjectId& oid) {
+ syncable::ModelType model_type = ObjectIdToModelType(oid);
+ EXPECT_FALSE(registered_types_.Has(model_type));
+ registered_types_.Put(model_type);
+ }
+
+ virtual void Register(const std::vector<invalidation::ObjectId>& oids) {
+ // Unused for now.
+ }
+
+ virtual void Unregister(const invalidation::ObjectId& oid) {
+ syncable::ModelType model_type = ObjectIdToModelType(oid);
+ EXPECT_TRUE(registered_types_.Has(model_type));
+ registered_types_.Remove(model_type);
+ }
+
+ virtual void Unregister(const std::vector<invalidation::ObjectId>& oids) {
+ // Unused for now.
+ }
+
+ const syncable::ModelTypeSet GetRegisteredTypes() const {
+ return registered_types_;
+ }
+
+ private:
+ syncable::ModelTypeSet registered_types_;
+
+ DISALLOW_COPY_AND_ASSIGN(FakeInvalidationClient);
+};
+
+const syncable::ModelType kModelTypes[] = {
+ syncable::BOOKMARKS,
+ syncable::PREFERENCES,
+ syncable::THEMES,
+ syncable::AUTOFILL,
+ syncable::EXTENSIONS,
+};
+const size_t kModelTypeCount = arraysize(kModelTypes);
+
+syncable::ModelTypeSet FromPtr(
+ const syncable::ModelType* types, size_t count) {
+ syncable::ModelTypeSet type_set;
+ for (size_t i = 0; i < count; ++i) {
+ type_set.Put(types[i]);
+ }
+ return type_set;
+}
+
+void ExpectPendingRegistrations(
+ syncable::ModelTypeSet expected_pending_types,
+ double expected_delay_seconds,
+ const RegistrationManager::PendingRegistrationMap& pending_registrations) {
+ syncable::ModelTypeSet pending_types;
+ for (RegistrationManager::PendingRegistrationMap::const_iterator it =
+ pending_registrations.begin(); it != pending_registrations.end();
+ ++it) {
+ SCOPED_TRACE(syncable::ModelTypeToString(it->first));
+ pending_types.Put(it->first);
+ base::TimeDelta offset =
+ it->second.last_registration_request -
+ it->second.registration_attempt;
+ base::TimeDelta expected_delay =
+ base::TimeDelta::FromSeconds(
+ static_cast<int64>(expected_delay_seconds)) + offset;
+ // TODO(akalin): Add base::PrintTo() for base::Time and
+ // base::TimeDeltas.
+ EXPECT_EQ(it->second.delay, expected_delay)
+ << it->second.delay.InMicroseconds()
+ << ", " << expected_delay.InMicroseconds();
+ if (it->second.delay <= base::TimeDelta()) {
+ EXPECT_EQ(it->second.actual_delay, base::TimeDelta());
+ } else {
+ EXPECT_EQ(it->second.delay, it->second.actual_delay);
+ }
+ }
+ EXPECT_TRUE(pending_types.Equals(expected_pending_types));
+}
+
+class RegistrationManagerTest : public testing::Test {
+ protected:
+ RegistrationManagerTest()
+ : fake_registration_manager_(&fake_invalidation_client_) {}
+
+ virtual ~RegistrationManagerTest() {}
+
+ void LoseRegistrations(syncable::ModelTypeSet types) {
+ for (syncable::ModelTypeSet::Iterator it = types.First();
+ it.Good(); it.Inc()) {
+ fake_invalidation_client_.LoseRegistration(it.Get());
+ fake_registration_manager_.MarkRegistrationLost(it.Get());
+ }
+ }
+
+ void DisableTypes(syncable::ModelTypeSet types) {
+ for (syncable::ModelTypeSet::Iterator it = types.First();
+ it.Good(); it.Inc()) {
+ fake_invalidation_client_.LoseRegistration(it.Get());
+ fake_registration_manager_.DisableType(it.Get());
+ }
+ }
+
+ // Used by MarkRegistrationLostBackoff* tests.
+ void RunBackoffTest(double jitter) {
+ fake_registration_manager_.SetJitter(jitter);
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+ fake_registration_manager_.SetRegisteredTypes(types);
+
+ // Lose some types.
+ syncable::ModelTypeSet lost_types = FromPtr(kModelTypes, 2);
+ LoseRegistrations(lost_types);
+ ExpectPendingRegistrations(
+ lost_types, 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Trigger another failure to start delaying.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ LoseRegistrations(lost_types);
+
+ double scaled_jitter =
+ jitter * RegistrationManager::kRegistrationDelayMaxJitter;
+
+ double expected_delay =
+ RegistrationManager::kInitialRegistrationDelaySeconds *
+ (1.0 + scaled_jitter);
+ expected_delay = std::floor(expected_delay);
+ ExpectPendingRegistrations(
+ lost_types, expected_delay,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Trigger another failure.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ LoseRegistrations(lost_types);
+ expected_delay *=
+ RegistrationManager::kRegistrationDelayExponent + scaled_jitter;
+ expected_delay = std::floor(expected_delay);
+ ExpectPendingRegistrations(
+ lost_types, expected_delay,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Trigger enough failures to hit the ceiling.
+ while (expected_delay < RegistrationManager::kMaxRegistrationDelaySeconds) {
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ LoseRegistrations(lost_types);
+ expected_delay *=
+ RegistrationManager::kRegistrationDelayExponent + scaled_jitter;
+ expected_delay = std::floor(expected_delay);
+ }
+ ExpectPendingRegistrations(
+ lost_types,
+ RegistrationManager::kMaxRegistrationDelaySeconds,
+ fake_registration_manager_.GetPendingRegistrations());
+ }
+
+ FakeInvalidationClient fake_invalidation_client_;
+ FakeRegistrationManager fake_registration_manager_;
+
+ private:
+ // Needed by timers in RegistrationManager.
+ MessageLoop message_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(RegistrationManagerTest);
+};
+
+TEST_F(RegistrationManagerTest, SetRegisteredTypes) {
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+
+ EXPECT_TRUE(fake_registration_manager_.GetRegisteredTypes().Empty());
+ EXPECT_TRUE(fake_invalidation_client_.GetRegisteredTypes().Empty());
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+ EXPECT_TRUE(fake_registration_manager_.GetRegisteredTypes().Equals(types));
+ EXPECT_TRUE(fake_invalidation_client_.GetRegisteredTypes().Equals(types));
+
+ types.Put(syncable::APPS);
+ types.Remove(syncable::BOOKMARKS);
+ fake_registration_manager_.SetRegisteredTypes(types);
+ EXPECT_TRUE(fake_registration_manager_.GetRegisteredTypes().Equals(types));
+ EXPECT_TRUE(fake_invalidation_client_.GetRegisteredTypes().Equals(types));
+}
+
+int GetRoundedBackoff(double retry_interval, double jitter) {
+ const double kInitialRetryInterval = 3.0;
+ const double kMinRetryInterval = 2.0;
+ const double kMaxRetryInterval = 20.0;
+ const double kBackoffExponent = 2.0;
+ const double kMaxJitter = 0.5;
+
+ return static_cast<int>(
+ RegistrationManager::CalculateBackoff(retry_interval,
+ kInitialRetryInterval,
+ kMinRetryInterval,
+ kMaxRetryInterval,
+ kBackoffExponent,
+ jitter,
+ kMaxJitter));
+}
+
+TEST_F(RegistrationManagerTest, CalculateBackoff) {
+ // Test initial.
+ EXPECT_EQ(2, GetRoundedBackoff(0.0, -1.0));
+ EXPECT_EQ(3, GetRoundedBackoff(0.0, 0.0));
+ EXPECT_EQ(4, GetRoundedBackoff(0.0, +1.0));
+
+ // Test non-initial.
+ EXPECT_EQ(4, GetRoundedBackoff(3.0, -1.0));
+ EXPECT_EQ(6, GetRoundedBackoff(3.0, 0.0));
+ EXPECT_EQ(7, GetRoundedBackoff(3.0, +1.0));
+
+ EXPECT_EQ(7, GetRoundedBackoff(5.0, -1.0));
+ EXPECT_EQ(10, GetRoundedBackoff(5.0, 0.0));
+ EXPECT_EQ(12, GetRoundedBackoff(5.0, +1.0));
+
+ // Test ceiling.
+ EXPECT_EQ(19, GetRoundedBackoff(13.0, -1.0));
+ EXPECT_EQ(20, GetRoundedBackoff(13.0, 0.0));
+ EXPECT_EQ(20, GetRoundedBackoff(13.0, +1.0));
+}
+
+TEST_F(RegistrationManagerTest, MarkRegistrationLost) {
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+ EXPECT_TRUE(fake_registration_manager_.GetPendingRegistrations().empty());
+
+ // Lose some types.
+ syncable::ModelTypeSet lost_types = FromPtr(
+ kModelTypes, 3);
+ syncable::ModelTypeSet non_lost_types = FromPtr(
+ kModelTypes + 3, kModelTypeCount - 3);
+ LoseRegistrations(lost_types);
+ ExpectPendingRegistrations(
+ lost_types, 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(non_lost_types));
+ EXPECT_TRUE(
+ fake_invalidation_client_.GetRegisteredTypes().Equals(non_lost_types));
+
+ // Pretend we waited long enough to re-register.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(types));
+ EXPECT_TRUE(
+ fake_invalidation_client_.GetRegisteredTypes().Equals(types));
+}
+
+TEST_F(RegistrationManagerTest, MarkRegistrationLostBackoffLow) {
+ RunBackoffTest(-1.0);
+}
+
+TEST_F(RegistrationManagerTest, MarkRegistrationLostBackoffMid) {
+ RunBackoffTest(0.0);
+}
+
+TEST_F(RegistrationManagerTest, MarkRegistrationLostBackoffHigh) {
+ RunBackoffTest(+1.0);
+}
+
+TEST_F(RegistrationManagerTest, MarkRegistrationLostBackoffReset) {
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+
+ // Lose some types.
+ syncable::ModelTypeSet lost_types = FromPtr(kModelTypes, 2);
+ LoseRegistrations(lost_types);
+ ExpectPendingRegistrations(
+ lost_types, 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Trigger another failure to start delaying.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ LoseRegistrations(lost_types);
+ double expected_delay =
+ RegistrationManager::kInitialRegistrationDelaySeconds;
+ ExpectPendingRegistrations(
+ lost_types, expected_delay,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Set types again.
+ fake_registration_manager_.SetRegisteredTypes(types);
+ ExpectPendingRegistrations(
+ syncable::ModelTypeSet(), 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+}
+
+TEST_F(RegistrationManagerTest, MarkAllRegistrationsLost) {
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+
+ fake_invalidation_client_.LoseAllRegistrations();
+ fake_registration_manager_.MarkAllRegistrationsLost();
+
+ syncable::ModelTypeSet expected_types;
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(expected_types));
+ EXPECT_TRUE(
+ fake_invalidation_client_.GetRegisteredTypes().Equals(expected_types));
+
+ ExpectPendingRegistrations(
+ types, 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Trigger another failure to start delaying.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ fake_invalidation_client_.LoseAllRegistrations();
+ fake_registration_manager_.MarkAllRegistrationsLost();
+ double expected_delay =
+ RegistrationManager::kInitialRegistrationDelaySeconds;
+ ExpectPendingRegistrations(
+ types, expected_delay,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ // Pretend we waited long enough to re-register.
+ fake_registration_manager_.FirePendingRegistrationsForTest();
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(types));
+ EXPECT_TRUE(
+ fake_invalidation_client_.GetRegisteredTypes().Equals(types));
+}
+
+TEST_F(RegistrationManagerTest, DisableType) {
+ syncable::ModelTypeSet types = FromPtr(kModelTypes, kModelTypeCount);
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+ EXPECT_TRUE(fake_registration_manager_.GetPendingRegistrations().empty());
+
+ // Disable some types.
+ syncable::ModelTypeSet disabled_types = FromPtr(
+ kModelTypes, 3);
+ syncable::ModelTypeSet enabled_types = FromPtr(
+ kModelTypes + 3, kModelTypeCount - 3);
+ DisableTypes(disabled_types);
+ ExpectPendingRegistrations(
+ syncable::ModelTypeSet(), 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(enabled_types));
+ EXPECT_TRUE(
+ fake_invalidation_client_.GetRegisteredTypes().Equals(enabled_types));
+
+ fake_registration_manager_.SetRegisteredTypes(types);
+ EXPECT_TRUE(
+ fake_registration_manager_.GetRegisteredTypes().Equals(enabled_types));
+
+ fake_registration_manager_.MarkRegistrationLost(
+ disabled_types.First().Get());
+ ExpectPendingRegistrations(
+ syncable::ModelTypeSet(), 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+
+ fake_registration_manager_.MarkAllRegistrationsLost();
+ ExpectPendingRegistrations(
+ enabled_types, 0.0,
+ fake_registration_manager_.GetPendingRegistrations());
+}
+
+} // namespace
+} // namespace notifier
diff --git a/sync/notifier/state_writer.h b/sync/notifier/state_writer.h
new file mode 100644
index 0000000..c1b72b8
--- /dev/null
+++ b/sync/notifier/state_writer.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Simple interface for something that persists state.
+
+#ifndef SYNC_NOTIFIER_STATE_WRITER_H_
+#define SYNC_NOTIFIER_STATE_WRITER_H_
+#pragma once
+
+#include <string>
+
+namespace sync_notifier {
+
+class StateWriter {
+ public:
+ virtual ~StateWriter() {}
+
+ virtual void WriteState(const std::string& state) = 0;
+};
+
+} // sync_notifier
+
+#endif // SYNC_NOTIFIER_STATE_WRITER_H_
diff --git a/sync/notifier/sync_notifier.h b/sync/notifier/sync_notifier.h
new file mode 100644
index 0000000..288d0ee
--- /dev/null
+++ b/sync/notifier/sync_notifier.h
@@ -0,0 +1,55 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Interface to the sync notifier, which is an object that receives
+// notifications when updates are available for a set of sync types.
+// All the observers are notified when such an event happens.
+
+#ifndef SYNC_NOTIFIER_SYNC_NOTIFIER_H_
+#define SYNC_NOTIFIER_SYNC_NOTIFIER_H_
+
+#include <string>
+
+#include "sync/syncable/model_type.h"
+
+namespace sync_notifier {
+class SyncNotifierObserver;
+
+class SyncNotifier {
+ public:
+ SyncNotifier() {}
+ virtual ~SyncNotifier() {}
+
+ virtual void AddObserver(SyncNotifierObserver* observer) = 0;
+ virtual void RemoveObserver(SyncNotifierObserver* observer) = 0;
+
+ // SetUniqueId must be called once, before any call to
+ // UpdateCredentials. |unique_id| should be a non-empty globally
+ // unique string.
+ virtual void SetUniqueId(const std::string& unique_id) = 0;
+
+ // SetState must be called once, before any call to
+ // UpdateCredentials. |state| may be empty.
+ virtual void SetState(const std::string& state) = 0;
+
+ // The observers won't be notified of any notifications until
+ // UpdateCredentials is called at least once. It can be called more than
+ // once.
+ virtual void UpdateCredentials(
+ const std::string& email, const std::string& token) = 0;
+
+ virtual void UpdateEnabledTypes(
+ syncable::ModelTypeSet enabled_types) = 0;
+
+ // This is here only to support the old p2p notification implementation,
+ // which is still used by sync integration tests.
+ // TODO(akalin): Remove this once we move the integration tests off p2p
+ // notifications.
+ virtual void SendNotification(
+ syncable::ModelTypeSet changed_types) = 0;
+};
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_SYNC_NOTIFIER_H_
+
diff --git a/sync/notifier/sync_notifier_factory.cc b/sync/notifier/sync_notifier_factory.cc
new file mode 100644
index 0000000..f547633
--- /dev/null
+++ b/sync/notifier/sync_notifier_factory.cc
@@ -0,0 +1,69 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/sync_notifier_factory.h"
+
+#include <string>
+
+#include "base/logging.h"
+#include "jingle/notifier/listener/mediator_thread_impl.h"
+#include "jingle/notifier/listener/talk_mediator_impl.h"
+#include "sync/notifier/non_blocking_invalidation_notifier.h"
+#include "sync/notifier/p2p_notifier.h"
+#include "sync/notifier/sync_notifier.h"
+
+namespace sync_notifier {
+namespace {
+
+SyncNotifier* CreateDefaultSyncNotifier(
+ const notifier::NotifierOptions& notifier_options,
+ const InvalidationVersionMap& initial_max_invalidation_versions,
+ const browser_sync::WeakHandle<InvalidationVersionTracker>&
+ invalidation_version_tracker,
+ const std::string& client_info) {
+ if (notifier_options.notification_method == notifier::NOTIFICATION_P2P) {
+ notifier::TalkMediator* const talk_mediator =
+ new notifier::TalkMediatorImpl(
+ new notifier::MediatorThreadImpl(notifier_options),
+ notifier_options);
+ // TODO(rlarocque): Ideally, the notification target would be
+ // NOTIFY_OTHERS. There's no good reason to notify ourselves of our own
+ // commits. We self-notify for now only because the integration tests rely
+ // on this behaviour. See crbug.com/97780.
+ //
+ // Takes ownership of |talk_mediator|.
+ return new P2PNotifier(talk_mediator, NOTIFY_ALL);
+ }
+
+ return new NonBlockingInvalidationNotifier(
+ notifier_options, initial_max_invalidation_versions,
+ invalidation_version_tracker, client_info);
+}
+
+} // namespace
+
+SyncNotifierFactory::SyncNotifierFactory(
+ const notifier::NotifierOptions& notifier_options,
+ const std::string& client_info,
+ const base::WeakPtr<InvalidationVersionTracker>&
+ invalidation_version_tracker)
+ : notifier_options_(notifier_options),
+ client_info_(client_info),
+ initial_max_invalidation_versions_(
+ invalidation_version_tracker.get() ?
+ invalidation_version_tracker->GetAllMaxVersions() :
+ InvalidationVersionMap()),
+ invalidation_version_tracker_(invalidation_version_tracker) {
+}
+
+SyncNotifierFactory::~SyncNotifierFactory() {
+}
+
+SyncNotifier* SyncNotifierFactory::CreateSyncNotifier() {
+ return CreateDefaultSyncNotifier(notifier_options_,
+ initial_max_invalidation_versions_,
+ invalidation_version_tracker_,
+ client_info_);
+}
+} // namespace sync_notifier
diff --git a/sync/notifier/sync_notifier_factory.h b/sync/notifier/sync_notifier_factory.h
new file mode 100644
index 0000000..4aab659
--- /dev/null
+++ b/sync/notifier/sync_notifier_factory.h
@@ -0,0 +1,48 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_NOTIFIER_SYNC_NOTIFIER_FACTORY_H_
+#define SYNC_NOTIFIER_SYNC_NOTIFIER_FACTORY_H_
+
+#include <string>
+
+#include "base/memory/weak_ptr.h"
+#include "jingle/notifier/base/notifier_options.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/util/weak_handle.h"
+
+namespace sync_notifier {
+
+class SyncNotifier;
+
+// Class to instantiate various implementations of the SyncNotifier
+// interface.
+class SyncNotifierFactory {
+ public:
+ // |client_info| is a string identifying the client, e.g. a user
+ // agent string. |invalidation_version_tracker| may be NULL (for
+ // tests).
+ SyncNotifierFactory(
+ const notifier::NotifierOptions& notifier_options,
+ const std::string& client_info,
+ const base::WeakPtr<InvalidationVersionTracker>&
+ invalidation_version_tracker);
+ ~SyncNotifierFactory();
+
+ // Creates a sync notifier. Caller takes ownership of the returned
+ // object. However, the returned object must not outlive the
+ // factory from which it was created. Can be called on any thread.
+ SyncNotifier* CreateSyncNotifier();
+
+ private:
+ const notifier::NotifierOptions notifier_options_;
+ const std::string client_info_;
+ const InvalidationVersionMap initial_max_invalidation_versions_;
+ const browser_sync::WeakHandle<InvalidationVersionTracker>
+ invalidation_version_tracker_;
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_SYNC_NOTIFIER_FACTORY_H_
diff --git a/sync/notifier/sync_notifier_factory_unittest.cc b/sync/notifier/sync_notifier_factory_unittest.cc
new file mode 100644
index 0000000..f895772
--- /dev/null
+++ b/sync/notifier/sync_notifier_factory_unittest.cc
@@ -0,0 +1,78 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/sync_notifier_factory.h"
+
+#include "base/command_line.h"
+#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop.h"
+#include "base/threading/thread.h"
+#include "jingle/notifier/base/notification_method.h"
+#include "jingle/notifier/base/notifier_options.h"
+#include "net/url_request/url_request_test_util.h"
+#include "sync/notifier/invalidation_version_tracker.h"
+#include "sync/notifier/mock_sync_notifier_observer.h"
+#include "sync/notifier/sync_notifier.h"
+#include "sync/syncable/model_type.h"
+#include "sync/syncable/model_type_payload_map.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace sync_notifier {
+namespace {
+
+using ::testing::Mock;
+using ::testing::NiceMock;
+using ::testing::StrictMock;
+
+class SyncNotifierFactoryTest : public testing::Test {
+ protected:
+
+ virtual void SetUp() OVERRIDE {
+ notifier_options_.request_context_getter =
+ new TestURLRequestContextGetter(message_loop_.message_loop_proxy());
+ }
+
+ virtual void TearDown() OVERRIDE {
+ Mock::VerifyAndClearExpectations(&mock_observer_);
+ message_loop_.RunAllPending();
+ }
+
+ MessageLoop message_loop_;
+ StrictMock<MockSyncNotifierObserver> mock_observer_;
+ notifier::NotifierOptions notifier_options_;
+ scoped_ptr<SyncNotifierFactory> factory_;
+};
+
+// Test basic creation of a NonBlockingInvalidationNotifier.
+TEST_F(SyncNotifierFactoryTest, Basic) {
+ notifier_options_.notification_method = notifier::NOTIFICATION_SERVER;
+ SyncNotifierFactory factory(
+ notifier_options_,
+ "test client info",
+ base::WeakPtr<sync_notifier::InvalidationVersionTracker>());
+ scoped_ptr<SyncNotifier> notifier(factory.CreateSyncNotifier());
+ ASSERT_TRUE(notifier.get());
+ notifier->AddObserver(&mock_observer_);
+ notifier->RemoveObserver(&mock_observer_);
+}
+
+// Test basic creation of a P2PNotifier.
+TEST_F(SyncNotifierFactoryTest, Basic_P2P) {
+ notifier_options_.notification_method = notifier::NOTIFICATION_P2P;
+ SyncNotifierFactory factory(
+ notifier_options_,
+ "test client info",
+ base::WeakPtr<sync_notifier::InvalidationVersionTracker>());
+ scoped_ptr<SyncNotifier> notifier(factory.CreateSyncNotifier());
+ ASSERT_TRUE(notifier.get());
+ notifier->AddObserver(&mock_observer_);
+ notifier->RemoveObserver(&mock_observer_);
+}
+
+} // namespace
+} // namespace sync_notifier
diff --git a/sync/notifier/sync_notifier_observer.h b/sync/notifier/sync_notifier_observer.h
new file mode 100644
index 0000000..0d6cc02
--- /dev/null
+++ b/sync/notifier/sync_notifier_observer.h
@@ -0,0 +1,39 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_NOTIFIER_SYNC_NOTIFIER_OBSERVER_H_
+#define SYNC_NOTIFIER_SYNC_NOTIFIER_OBSERVER_H_
+#pragma once
+
+#include <string>
+
+#include "sync/syncable/model_type_payload_map.h"
+
+namespace sync_notifier {
+
+enum IncomingNotificationSource {
+ // The server is notifying us that one or more datatypes have stale data.
+ REMOTE_NOTIFICATION,
+ // A chrome datatype is requesting an optimistic refresh of its data.
+ LOCAL_NOTIFICATION,
+};
+
+class SyncNotifierObserver {
+ public:
+ SyncNotifierObserver() {}
+ virtual ~SyncNotifierObserver() {}
+
+ virtual void OnIncomingNotification(
+ const syncable::ModelTypePayloadMap& type_payloads,
+ IncomingNotificationSource source) = 0;
+ virtual void OnNotificationStateChange(bool notifications_enabled) = 0;
+
+ // TODO(nileshagrawal): Find a way to hide state handling inside the
+ // sync notifier implementation.
+ virtual void StoreState(const std::string& state) = 0;
+};
+
+} // namespace sync_notifier
+
+#endif // SYNC_NOTIFIER_SYNC_NOTIFIER_OBSERVER_H_
diff --git a/sync/sync.gyp b/sync/sync.gyp
index 599f915..59073ad 100644
--- a/sync/sync.gyp
+++ b/sync/sync.gyp
@@ -184,6 +184,50 @@
],
},
+ # The sync notifications library.
+ {
+ 'target_name': 'sync_notifier',
+ 'type': 'static_library',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '..',
+ ],
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../jingle/jingle.gyp:notifier',
+ '../third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
+ '../third_party/libjingle/libjingle.gyp:libjingle',
+ 'sync',
+ ],
+ 'export_dependent_settings': [
+ '../jingle/jingle.gyp:notifier',
+ '../third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
+ ],
+ 'sources': [
+ 'notifier/cache_invalidation_packet_handler.cc',
+ 'notifier/cache_invalidation_packet_handler.h',
+ 'notifier/chrome_invalidation_client.cc',
+ 'notifier/chrome_invalidation_client.h',
+ 'notifier/chrome_system_resources.cc',
+ 'notifier/chrome_system_resources.h',
+ 'notifier/invalidation_notifier.h',
+ 'notifier/invalidation_notifier.cc',
+ 'notifier/invalidation_util.cc',
+ 'notifier/invalidation_util.h',
+ 'notifier/invalidation_version_tracker.h',
+ 'notifier/non_blocking_invalidation_notifier.h',
+ 'notifier/non_blocking_invalidation_notifier.cc',
+ 'notifier/p2p_notifier.h',
+ 'notifier/p2p_notifier.cc',
+ 'notifier/registration_manager.cc',
+ 'notifier/registration_manager.h',
+ 'notifier/state_writer.h',
+ 'notifier/sync_notifier.h',
+ 'notifier/sync_notifier_factory.h',
+ 'notifier/sync_notifier_factory.cc',
+ ],
+ },
+
# Test support files for the 'sync' target.
{
'target_name': 'test_support_sync',
@@ -243,6 +287,28 @@
],
},
+ # Test support files for the 'sync_notifier' target.
+ {
+ 'target_name': 'test_support_sync_notifier',
+ 'type': 'static_library',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '..',
+ ],
+ 'dependencies': [
+ '../testing/gmock.gyp:gmock',
+ 'sync_notifier',
+ ],
+ 'export_dependent_settings': [
+ '../testing/gmock.gyp:gmock',
+ 'sync_notifier',
+ ],
+ 'sources': [
+ 'notifier/mock_sync_notifier_observer.cc',
+ 'notifier/mock_sync_notifier_observer.h',
+ ],
+ },
+
# Unit tests for the 'sync' target. This cannot be a static
# library because the unit test files have to be compiled directly
# into the executable, so we push the target files to the
@@ -323,6 +389,59 @@
},
},
+ # Unit tests for the 'sync_notifier' target. This cannot be a static
+ # library because the unit test files have to be compiled directly
+ # into the executable, so we push the target files to the
+ # depending executable target via direct_dependent_settings.
+ {
+ 'target_name': 'sync_notifier_tests',
+ 'type': 'none',
+ # We only want unit test executables to include this target.
+ 'suppress_wildcard': 1,
+ 'dependencies': [
+ '../base/base.gyp:base',
+ '../jingle/jingle.gyp:notifier_test_util',
+ '../net/net.gyp:net_test_support',
+ '../testing/gmock.gyp:gmock',
+ '../testing/gtest.gyp:gtest',
+ '../third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
+ '../third_party/libjingle/libjingle.gyp:libjingle',
+ 'sync',
+ 'sync_notifier',
+ 'test_support_sync_notifier',
+ ],
+ # Propagate all dependencies since the actual compilation
+ # happens in the dependents.
+ 'export_dependent_settings': [
+ '../base/base.gyp:base',
+ '../jingle/jingle.gyp:notifier_test_util',
+ '../net/net.gyp:net_test_support',
+ '../testing/gmock.gyp:gmock',
+ '../testing/gtest.gyp:gtest',
+ '../third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
+ '../third_party/libjingle/libjingle.gyp:libjingle',
+ 'sync',
+ 'sync_notifier',
+ 'test_support_sync_notifier',
+ ],
+ 'direct_dependent_settings': {
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '..',
+ ],
+ 'sources': [
+ 'notifier/cache_invalidation_packet_handler_unittest.cc',
+ 'notifier/chrome_invalidation_client_unittest.cc',
+ 'notifier/chrome_system_resources_unittest.cc',
+ 'notifier/invalidation_notifier_unittest.cc',
+ 'notifier/non_blocking_invalidation_notifier_unittest.cc',
+ 'notifier/p2p_notifier_unittest.cc',
+ 'notifier/registration_manager_unittest.cc',
+ 'notifier/sync_notifier_factory_unittest.cc',
+ ],
+ },
+ },
+
# The unit test executable for sync tests. Currently this isn't
# automatically run, as there is already a sync_unit_tests
# executable in chrome.gyp; this is just to make sure that all the
@@ -337,6 +456,7 @@
'dependencies': [
'../base/base.gyp:run_all_unittests',
'sync_tests',
+ 'sync_notifier_tests',
],
# TODO(akalin): This is needed because histogram.cc uses
# leak_annotations.h, which pulls this in. Make 'base'