summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-23 00:16:45 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-23 00:16:45 +0000
commit3d47a26aed8fd135280d4924d9e9bb36cb624946 (patch)
treef515a3604302e123d67a04c80079df4da6fc6b35 /chrome/browser/sync
parent51e413c3e9f11186b5026123a4610dd1dc005416 (diff)
downloadchromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.zip
chromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.tar.gz
chromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.tar.bz2
Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use --sync-notification-method=server to turn on). BUG=34647 TEST=manually Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50479 Review URL: http://codereview.chromium.org/2827014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50550 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r--chrome/browser/sync/engine/syncapi.cc42
-rw-r--r--chrome/browser/sync/notification_method.cc5
-rw-r--r--chrome/browser/sync/notification_method.h24
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc243
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.h58
-rw-r--r--chrome/browser/sync/notifier/chrome_invalidation_client.cc74
-rw-r--r--chrome/browser/sync/notifier/chrome_invalidation_client.h65
-rw-r--r--chrome/browser/sync/notifier/chrome_system_resources.cc75
-rw-r--r--chrome/browser/sync/notifier/chrome_system_resources.h50
-rw-r--r--chrome/browser/sync/notifier/invalidation_util.cc82
-rw-r--r--chrome/browser/sync/notifier/invalidation_util.h36
-rw-r--r--chrome/browser/sync/notifier/server_notifier_thread.cc201
-rw-r--r--chrome/browser/sync/notifier/server_notifier_thread.h100
-rw-r--r--chrome/browser/sync/tools/sync_listen_notifications.cc412
-rw-r--r--chrome/browser/sync/tools/sync_tools.gyp1
15 files changed, 1064 insertions, 404 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc
index 20d6a2c..c242d47 100644
--- a/chrome/browser/sync/engine/syncapi.cc
+++ b/chrome/browser/sync/engine/syncapi.cc
@@ -32,6 +32,7 @@
#include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h"
#include "chrome/browser/sync/engine/syncer.h"
#include "chrome/browser/sync/engine/syncer_thread.h"
+#include "chrome/browser/sync/notifier/server_notifier_thread.h"
#include "chrome/browser/sync/protocol/autofill_specifics.pb.h"
#include "chrome/browser/sync/protocol/bookmark_specifics.pb.h"
#include "chrome/browser/sync/protocol/extension_specifics.pb.h"
@@ -1285,12 +1286,23 @@ bool SyncManager::SyncInternal::Init(
// jank if we try to shut down sync. Fix this.
connection_manager()->CheckServerReachable();
+ // NOTIFICATION_SERVER uses a substantially different notification
+ // method, so it has its own MediatorThread implementation.
+ // Everything else just uses MediatorThreadImpl.
+ notifier::MediatorThread* mediator_thread =
+ (notification_method == browser_sync::NOTIFICATION_SERVER) ?
+ static_cast<notifier::MediatorThread*>(
+ new sync_notifier::ServerNotifierThread(
+ network_change_notifier_thread)) :
+ static_cast<notifier::MediatorThread*>(
+ new notifier::MediatorThreadImpl(network_change_notifier_thread));
const bool kInitializeSsl = true;
const bool kConnectImmediately = false;
talk_mediator_.reset(new TalkMediatorImpl(
- new notifier::MediatorThreadImpl(network_change_notifier_thread),
+ mediator_thread,
kInitializeSsl, kConnectImmediately, invalidate_xmpp_auth_token));
- if (notification_method != browser_sync::NOTIFICATION_LEGACY) {
+ if (notification_method != browser_sync::NOTIFICATION_LEGACY &&
+ notification_method != browser_sync::NOTIFICATION_SERVER) {
if (notification_method == browser_sync::NOTIFICATION_TRANSITIONAL) {
talk_mediator_->AddSubscribedServiceUrl(
browser_sync::kSyncLegacyServiceUrl);
@@ -1379,6 +1391,7 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
void SyncManager::SyncInternal::SendPendingXMPPNotification(
bool new_pending_notification) {
DCHECK_EQ(MessageLoop::current(), core_message_loop_);
+ DCHECK_NE(notification_method_, browser_sync::NOTIFICATION_SERVER);
notification_pending_ = notification_pending_ || new_pending_notification;
if (!notification_pending_) {
LOG(INFO) << "Not sending notification: no pending notification";
@@ -1772,16 +1785,18 @@ void SyncManager::SyncInternal::HandleChannelEvent(const SyncerEvent& event) {
observer_->OnSyncCycleCompleted(event.snapshot);
}
- // TODO(chron): Consider changing this back to track has_more_to_sync
- // only notify peers if a successful commit has occurred.
- bool new_pending_notification =
- (event.snapshot->syncer_status.num_successful_commits > 0);
- core_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &SyncManager::SyncInternal::SendPendingXMPPNotification,
- new_pending_notification));
+ if (notification_method_ != browser_sync::NOTIFICATION_SERVER) {
+ // TODO(chron): Consider changing this back to track has_more_to_sync
+ // only notify peers if a successful commit has occurred.
+ bool new_pending_notification =
+ (event.snapshot->syncer_status.num_successful_commits > 0);
+ core_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &SyncManager::SyncInternal::SendPendingXMPPNotification,
+ new_pending_notification));
+ }
}
if (event.what_happened == SyncerEvent::PAUSED) {
@@ -1904,7 +1919,8 @@ void SyncManager::SyncInternal::OnNotificationStateChange(
if (syncer_thread()) {
syncer_thread()->SetNotificationsEnabled(notifications_enabled);
}
- if (notifications_enabled) {
+ if ((notification_method_ != browser_sync::NOTIFICATION_SERVER) &&
+ notifications_enabled) {
// Send a notification as soon as subscriptions are on
// (see http://code.google.com/p/chromium/issues/detail?id=38563 ).
core_message_loop_->PostTask(
diff --git a/chrome/browser/sync/notification_method.cc b/chrome/browser/sync/notification_method.cc
index aec4a9c..9c00344 100644
--- a/chrome/browser/sync/notification_method.cc
+++ b/chrome/browser/sync/notification_method.cc
@@ -24,6 +24,9 @@ std::string NotificationMethodToString(
case NOTIFICATION_NEW:
return "NOTIFICATION_NEW";
break;
+ case NOTIFICATION_SERVER:
+ return "NOTIFICATION_SERVER";
+ break;
default:
LOG(WARNING) << "Unknown value for notification method: "
<< notification_method;
@@ -39,6 +42,8 @@ NotificationMethod StringToNotificationMethod(const std::string& str) {
return NOTIFICATION_TRANSITIONAL;
} else if (str == "new") {
return NOTIFICATION_NEW;
+ } else if (str == "server") {
+ return NOTIFICATION_SERVER;
}
LOG(WARNING) << "Unknown notification method \"" << str
<< "\"; using method "
diff --git a/chrome/browser/sync/notification_method.h b/chrome/browser/sync/notification_method.h
index 569f8d2..4e1bcd1 100644
--- a/chrome/browser/sync/notification_method.h
+++ b/chrome/browser/sync/notification_method.h
@@ -10,7 +10,7 @@
namespace browser_sync {
// This is the matrix for the interaction between clients with
-// different notification methods:
+// different notification methods (except for NOTIFICATION_SERVER):
//
// Listen
// L T N
@@ -24,6 +24,12 @@ namespace browser_sync {
// will receive notifications from a client sending with the row
// notification method. 'E' means means that the notification will be
// an empty one, which may be dropped by the server in the future.
+//
+// As for NOTIFICATION_SERVER, server-issued notifications will also
+// simulate a peer-issued notification, so that any client with
+// NOTIFICATION_TRANSITIONAL or NOTIFICATION_NEW will be able to
+// receive those, too. This support will be removed once everyone is
+// on NOTIFICATION_SERVER.
enum NotificationMethod {
// Old, broken notification method. Works only if notification
@@ -33,9 +39,17 @@ enum NotificationMethod {
// notifications if the notification servers don't drop empty
// notifications.
NOTIFICATION_TRANSITIONAL,
- // New, ideal notification method. Compatible only with
- // transitional notifications.
+ // New notification method. Compatible only with transitional
+ // notifications.
+ //
+ // NOTE: "New" is kind of a misnomer, as it refers only to
+ // peer-issued notifications; the plan is to migrate everyone to
+ // using NOTIFICATION_SERVER.
NOTIFICATION_NEW,
+
+ // Server-issued notifications. Compatible only with transitional
+ // notifications.
+ NOTIFICATION_SERVER,
};
extern const NotificationMethod kDefaultNotificationMethod;
@@ -43,8 +57,8 @@ extern const NotificationMethod kDefaultNotificationMethod;
std::string NotificationMethodToString(
NotificationMethod notification_method);
-// If the given string is not one of "legacy", "transitional", or
-// "new", returns kDefaultNotificationMethod.
+// If the given string is not one of "legacy", "transitional", "new",
+// or "server", returns kDefaultNotificationMethod.
NotificationMethod StringToNotificationMethod(const std::string& str);
} // namespace browser_sync
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
new file mode 100644
index 0000000..dbc3d54
--- /dev/null
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
@@ -0,0 +1,243 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+
+#include "base/base64.h"
+#include "base/callback.h"
+#include "base/logging.h"
+#include "chrome/common/net/notifier/listener/xml_element_util.h"
+#include "google/cacheinvalidation/invalidation-client.h"
+#include "talk/xmpp/constants.h"
+#include "talk/xmpp/jid.h"
+#include "talk/xmpp/xmppclient.h"
+#include "talk/xmpp/xmpptask.h"
+
+namespace sync_notifier {
+
+namespace {
+
+const char kBotJid[] = "tango@bot.talk.google.com";
+
+// TODO(akalin): Eliminate use of 'tango' name.
+
+// TODO(akalin): Hash out details of tango IQ protocol.
+
+static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
+static const buzz::QName kQnTangoIqPacketContent(
+ "google:tango", "content");
+
+// TODO(akalin): Move these task classes out so that they can be
+// unit-tested. This'll probably be done easier once we consolidate
+// all the packet sending/receiving classes.
+
+// A task that listens for ClientInvalidation messages and calls the
+// given callback on them.
+class CacheInvalidationListenTask : public buzz::XmppTask {
+ public:
+ // Takes ownership of callback.
+ CacheInvalidationListenTask(Task* parent,
+ Callback1<const std::string&>::Type* callback)
+ : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
+ virtual ~CacheInvalidationListenTask() {}
+
+ virtual int ProcessStart() {
+ LOG(INFO) << "CacheInvalidationListenTask started";
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationListenTask blocked";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationListenTask response received";
+ std::string data;
+ if (GetTangoIqPacketData(stanza, &data)) {
+ callback_->Run(data);
+ } else {
+ LOG(ERROR) << "Could not get packet data";
+ }
+ // Acknowledge receipt of the iq to the buzz server.
+ // TODO(akalin): Send an error response for malformed packets.
+ scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
+ SendStanza(response_stanza.get());
+ return STATE_RESPONSE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (IsValidTangoIqPacket(stanza)) {
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+
+ private:
+ bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
+ return
+ (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
+ (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
+ }
+
+ bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
+ std::string* data) {
+ DCHECK(IsValidTangoIqPacket(stanza));
+ const buzz::XmlElement* tango_iq_packet =
+ stanza->FirstNamed(kQnTangoIqPacket);
+ if (!tango_iq_packet) {
+ LOG(ERROR) << "Could not find tango IQ packet element";
+ return false;
+ }
+ const buzz::XmlElement* tango_iq_packet_content =
+ tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
+ if (!tango_iq_packet_content) {
+ LOG(ERROR) << "Could not find tango IQ packet content element";
+ return false;
+ }
+ *data = tango_iq_packet_content->BodyText();
+ return true;
+ }
+
+ scoped_ptr<Callback1<const std::string&>::Type> callback_;
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
+};
+
+// A task that sends a single outbound ClientInvalidation message.
+class CacheInvalidationSendMessageTask : public buzz::XmppTask {
+ public:
+ CacheInvalidationSendMessageTask(Task* parent,
+ const buzz::Jid& to_jid,
+ const std::string& msg)
+ : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
+ to_jid_(to_jid), msg_(msg) {}
+ virtual ~CacheInvalidationSendMessageTask() {}
+
+ virtual int ProcessStart() {
+ scoped_ptr<buzz::XmlElement> stanza(
+ MakeTangoIqPacket(to_jid_, task_id(), msg_));
+ LOG(INFO) << "Sending message: "
+ << notifier::XmlElementToString(*stanza.get());
+ if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
+ LOG(INFO) << "Error when sending message";
+ return STATE_ERROR;
+ }
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
+ << notifier::XmlElementToString(*stanza);
+ // TODO(akalin): Handle errors here.
+ return STATE_DONE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (!MatchResponseIq(stanza, to_jid_, task_id())) {
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+
+ private:
+ static buzz::XmlElement* MakeTangoIqPacket(
+ const buzz::Jid& to_jid,
+ const std::string& task_id,
+ const std::string& msg) {
+ buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
+ buzz::XmlElement* tango_iq_packet =
+ new buzz::XmlElement(kQnTangoIqPacket, true);
+ iq->AddElement(tango_iq_packet);
+ buzz::XmlElement* tango_iq_packet_content =
+ new buzz::XmlElement(kQnTangoIqPacketContent, true);
+ tango_iq_packet->AddElement(tango_iq_packet_content);
+ tango_iq_packet_content->SetBodyText(msg);
+ return iq;
+ }
+
+ const buzz::Jid to_jid_;
+ std::string msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
+};
+
+} // namespace
+
+CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
+ buzz::XmppClient* xmpp_client,
+ invalidation::InvalidationClient* invalidation_client)
+ : xmpp_client_(xmpp_client),
+ invalidation_client_(invalidation_client) {
+ CHECK(xmpp_client_);
+ CHECK(invalidation_client_);
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ CHECK(network_endpoint);
+ network_endpoint->RegisterOutboundListener(
+ invalidation::NewPermanentCallback(
+ this,
+ &CacheInvalidationPacketHandler::HandleOutboundPacket));
+ // Owned by xmpp_client.
+ CacheInvalidationListenTask* listen_task =
+ new CacheInvalidationListenTask(
+ xmpp_client, NewCallback(
+ this, &CacheInvalidationPacketHandler::HandleInboundPacket));
+ listen_task->Start();
+}
+
+CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ CHECK(network_endpoint);
+ network_endpoint->RegisterOutboundListener(NULL);
+}
+
+void CacheInvalidationPacketHandler::HandleOutboundPacket(
+ invalidation::NetworkEndpoint* const& network_endpoint) {
+ CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
+ invalidation::string message;
+ network_endpoint->TakeOutboundMessage(&message);
+ std::string encoded_message;
+ if (!base::Base64Encode(message, &encoded_message)) {
+ LOG(ERROR) << "Could not base64-encode message to send: "
+ << message;
+ return;
+ }
+ // Owned by xmpp_client.
+ CacheInvalidationSendMessageTask* send_message_task =
+ new CacheInvalidationSendMessageTask(xmpp_client_,
+ buzz::Jid(kBotJid),
+ encoded_message);
+ send_message_task->Start();
+}
+
+void CacheInvalidationPacketHandler::HandleInboundPacket(
+ const std::string& packet) {
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ std::string decoded_message;
+ if (!base::Base64Decode(packet, &decoded_message)) {
+ LOG(ERROR) << "Could not base64-decode received message: "
+ << packet;
+ return;
+ }
+ network_endpoint->HandleInboundMessage(decoded_message);
+}
+
+} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
new file mode 100644
index 0000000..2ab1b3d
--- /dev/null
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
@@ -0,0 +1,58 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Class that handles the details of sending and receiving client
+// invalidation packets.
+
+#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
+#define CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "talk/xmpp/jid.h"
+
+namespace buzz {
+class XmppClient;
+} // namespace buzz
+
+namespace invalidation {
+class InvalidationClient;
+class NetworkEndpoint;
+} // namespace invalidation
+
+namespace sync_notifier {
+
+// TODO(akalin): Add a NonThreadSafe member to this class and use it.
+
+class CacheInvalidationPacketHandler {
+ public:
+ // Starts routing packets from |invalidation_client| through
+ // |xmpp_client|. |invalidation_client| must not already be routing
+ // packets through something. Does not take ownership of
+ // |xmpp_client| or |invalidation_client|.
+ CacheInvalidationPacketHandler(
+ buzz::XmppClient* xmpp_client,
+ invalidation::InvalidationClient* invalidation_client);
+
+ // Makes the invalidation client passed into the constructor not
+ // route packets through the XMPP client passed into the constructor
+ // anymore.
+ ~CacheInvalidationPacketHandler();
+
+ private:
+ void HandleOutboundPacket(
+ invalidation::NetworkEndpoint* const& network_endpoint);
+
+ void HandleInboundPacket(const std::string& packet);
+
+ buzz::XmppClient* xmpp_client_;
+ invalidation::InvalidationClient* invalidation_client_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler);
+};
+
+} // namespace sync_notifier
+
+#endif // CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.cc b/chrome/browser/sync/notifier/chrome_invalidation_client.cc
new file mode 100644
index 0000000..3191b8d
--- /dev/null
+++ b/chrome/browser/sync/notifier/chrome_invalidation_client.cc
@@ -0,0 +1,74 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
+
+#include <string>
+#include <vector>
+
+#include "base/logging.h"
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+#include "chrome/browser/sync/notifier/invalidation_util.h"
+#include "google/cacheinvalidation/invalidation-client-impl.h"
+
+namespace sync_notifier {
+
+ChromeInvalidationClient::ChromeInvalidationClient() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+}
+
+ChromeInvalidationClient::~ChromeInvalidationClient() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ Stop();
+}
+
+void ChromeInvalidationClient::Start(
+ const std::string& app_name,
+ invalidation::InvalidationListener* listener,
+ buzz::XmppClient* xmpp_client) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ Stop();
+
+ chrome_system_resources_.StartScheduler();
+
+ invalidation::ClientType client_type;
+ client_type.set_type(invalidation::ClientType::CHROME_SYNC);
+ invalidation::ClientConfig ticl_config;
+ invalidation_client_.reset(
+ new invalidation::InvalidationClientImpl(
+ &chrome_system_resources_, client_type, app_name, listener,
+ ticl_config));
+ cache_invalidation_packet_handler_.reset(
+ new CacheInvalidationPacketHandler(xmpp_client,
+ invalidation_client_.get()));
+}
+
+void ChromeInvalidationClient::Stop() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ if (!invalidation_client_.get()) {
+ DCHECK(!cache_invalidation_packet_handler_.get());
+ return;
+ }
+
+ chrome_system_resources_.StopScheduler();
+
+ cache_invalidation_packet_handler_.reset();
+ invalidation_client_.reset();
+}
+
+void ChromeInvalidationClient::Register(
+ const invalidation::ObjectId& oid,
+ invalidation::RegistrationCallback* callback) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation_client_->Register(oid, callback);
+}
+
+void ChromeInvalidationClient::Unregister(
+ const invalidation::ObjectId& oid,
+ invalidation::RegistrationCallback* callback) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ invalidation_client_->Unregister(oid, callback);
+}
+
+} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.h b/chrome/browser/sync/notifier/chrome_invalidation_client.h
new file mode 100644
index 0000000..e3f67b1
--- /dev/null
+++ b/chrome/browser/sync/notifier/chrome_invalidation_client.h
@@ -0,0 +1,65 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A simple wrapper around invalidation::InvalidationClient that
+// handles all the startup/shutdown details and hookups.
+
+#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
+#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/non_thread_safe.h"
+#include "base/scoped_ptr.h"
+#include "chrome/browser/sync/notifier/chrome_system_resources.h"
+#include "google/cacheinvalidation/invalidation-client.h"
+
+namespace buzz {
+class XmppClient;
+} // namespace
+
+namespace sync_notifier {
+
+class CacheInvalidationPacketHandler;
+
+// TODO(akalin): Hook this up to a NetworkChangeNotifier so we can
+// properly notify invalidation_client_.
+
+class ChromeInvalidationClient {
+ public:
+ ChromeInvalidationClient();
+
+ ~ChromeInvalidationClient();
+
+ // Does not take ownership of |listener| nor |xmpp_client|.
+ void Start(
+ const std::string& app_name,
+ invalidation::InvalidationListener* listener,
+ buzz::XmppClient* xmpp_client);
+
+ void Stop();
+
+ // The following functions must only be called between calls to
+ // Start() and Stop(). See invalidation-client.h for documentation.
+
+ void Register(const invalidation::ObjectId& oid,
+ invalidation::RegistrationCallback* callback);
+
+ void Unregister(const invalidation::ObjectId& oid,
+ invalidation::RegistrationCallback* callback);
+
+ private:
+ NonThreadSafe non_thread_safe_;
+ ChromeSystemResources chrome_system_resources_;
+ scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
+ scoped_ptr<CacheInvalidationPacketHandler>
+ cache_invalidation_packet_handler_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationClient);
+};
+
+} // namespace sync_notifier
+
+#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc
new file mode 100644
index 0000000..0ec0210
--- /dev/null
+++ b/chrome/browser/sync/notifier/chrome_system_resources.cc
@@ -0,0 +1,75 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/chrome_system_resources.h"
+
+#include <cstdlib>
+#include <string>
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/string_util.h"
+#include "base/task.h"
+#include "chrome/browser/sync/notifier/invalidation_util.h"
+
+namespace sync_notifier {
+
+ChromeSystemResources::ChromeSystemResources()
+ : scheduler_active_(false) {}
+
+ChromeSystemResources::~ChromeSystemResources() {
+ DCHECK(!scheduler_active_);
+}
+
+invalidation::Time ChromeSystemResources::current_time() {
+ return base::Time::Now();
+}
+
+void ChromeSystemResources::StartScheduler() {
+ DCHECK(!scheduler_active_);
+ scheduler_active_ = true;
+}
+
+void ChromeSystemResources::StopScheduler() {
+ DCHECK(scheduler_active_);
+ scheduler_active_ = false;
+}
+
+void ChromeSystemResources::ScheduleWithDelay(
+ invalidation::TimeDelta delay,
+ invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ DCHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE,
+ NewRunnableFunction(&RunAndDeleteClosure, task),
+ delay.InMillisecondsRoundedUp());
+}
+
+void ChromeSystemResources::ScheduleImmediately(
+ invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ DCHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostTask(
+ FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
+}
+
+void ChromeSystemResources::Log(
+ LogLevel level, const char* file, int line,
+ const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ std::string result;
+ StringAppendV(&result, format, ap);
+ logging::LogMessage(file, line).stream() << result;
+ va_end(ap);
+}
+
+} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/chrome_system_resources.h b/chrome/browser/sync/notifier/chrome_system_resources.h
new file mode 100644
index 0000000..a8a98df
--- /dev/null
+++ b/chrome/browser/sync/notifier/chrome_system_resources.h
@@ -0,0 +1,50 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Simple system resources class that uses the current message loop
+// for scheduling. Assumes the current message loop is already
+// running.
+
+#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
+#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
+
+#include "google/cacheinvalidation/invalidation-client.h"
+
+namespace sync_notifier {
+
+// TODO(akalin): Add a NonThreadSafe member to this class and use it.
+
+class ChromeSystemResources : public invalidation::SystemResources {
+ public:
+ ChromeSystemResources();
+
+ ~ChromeSystemResources();
+
+ virtual invalidation::Time current_time();
+
+ virtual void StartScheduler();
+
+ // We assume that the current message loop is stopped shortly after
+ // this is called, i.e. that any in-flight delayed tasks won't get
+ // run.
+ //
+ // TODO(akalin): Make sure that the above actually holds. Use a
+ // ScopedRunnableMethodFactory for better safety.
+ virtual void StopScheduler();
+
+ virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
+ invalidation::Closure* task);
+
+ virtual void ScheduleImmediately(invalidation::Closure* task);
+
+ virtual void Log(LogLevel level, const char* file, int line,
+ const char* format, ...);
+
+ private:
+ bool scheduler_active_;
+};
+
+} // namespace sync_notifier
+
+#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
diff --git a/chrome/browser/sync/notifier/invalidation_util.cc b/chrome/browser/sync/notifier/invalidation_util.cc
new file mode 100644
index 0000000..2683fae
--- /dev/null
+++ b/chrome/browser/sync/notifier/invalidation_util.cc
@@ -0,0 +1,82 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/invalidation_util.h"
+
+#include <sstream>
+
+namespace sync_notifier {
+
+void RunAndDeleteClosure(invalidation::Closure* task) {
+ task->Run();
+ delete task;
+}
+
+// We need to write our own protobuf to string functions because we
+// use LITE_RUNTIME, which doesn't support DebugString().
+
+std::string ObjectIdToString(
+ const invalidation::ObjectId& object_id) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "name: " << object_id.name().string_value() << ", ";
+ ss << "source: " << object_id.source();
+ ss << " }";
+ return ss.str();
+}
+
+std::string StatusToString(
+ const invalidation::Status& status) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "code: " << status.code() << ", ";
+ ss << "description: " << status.description();
+ ss << " }";
+ return ss.str();
+}
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
+ ss << "version: " << invalidation.version() << ", ";
+ ss << "components: { ";
+ const invalidation::ComponentStampLog& component_stamp_log =
+ invalidation.component_stamp_log();
+ for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
+ const invalidation::ComponentStamp& component_stamp =
+ component_stamp_log.stamps(i);
+ ss << "component: " << component_stamp.component() << ", ";
+ ss << "time: " << component_stamp.time() << ", ";
+ }
+ ss << " }";
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateToString(
+ const invalidation::RegistrationUpdate& update) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "type: " << update.type() << ", ";
+ ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
+ ss << "version: " << update.version() << ", ";
+ ss << "sequence_number: " << update.sequence_number();
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateResultToString(
+ const invalidation::RegistrationUpdateResult& update_result) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "operation: "
+ << RegistrationUpdateToString(update_result.operation()) << ", ";
+ ss << "status: " << StatusToString(update_result.status());
+ ss << " }";
+ return ss.str();
+}
+
+} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/invalidation_util.h b/chrome/browser/sync/notifier/invalidation_util.h
new file mode 100644
index 0000000..3ca5fe9
--- /dev/null
+++ b/chrome/browser/sync/notifier/invalidation_util.h
@@ -0,0 +1,36 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// Various utilities for dealing with invalidation data types.
+
+#ifndef CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
+#define CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
+
+#include <string>
+
+#include "google/cacheinvalidation/invalidation-client.h"
+
+namespace sync_notifier {
+
+void RunAndDeleteClosure(invalidation::Closure* task);
+
+// We need to write our own protobuf-to-string functions because we
+// use LITE_RUNTIME, which doesn't support DebugString().
+
+std::string ObjectIdToString(const invalidation::ObjectId& object_id);
+
+std::string StatusToString(const invalidation::Status& status);
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation);
+
+std::string RegistrationUpdateToString(
+ const invalidation::RegistrationUpdate& update);
+
+std::string RegistrationUpdateResultToString(
+ const invalidation::RegistrationUpdateResult& update_result);
+
+} // namespace sync_notifier
+
+#endif // CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
diff --git a/chrome/browser/sync/notifier/server_notifier_thread.cc b/chrome/browser/sync/notifier/server_notifier_thread.cc
new file mode 100644
index 0000000..fbfc999
--- /dev/null
+++ b/chrome/browser/sync/notifier/server_notifier_thread.cc
@@ -0,0 +1,201 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/server_notifier_thread.h"
+
+#include <string>
+#include <vector>
+
+#include "base/logging.h"
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
+#include "chrome/browser/sync/notifier/chrome_system_resources.h"
+#include "chrome/browser/sync/notifier/invalidation_util.h"
+#include "chrome/common/net/notifier/listener/notification_defines.h"
+#include "google/cacheinvalidation/invalidation-client-impl.h"
+#include "talk/xmpp/jid.h"
+
+namespace sync_notifier {
+
+ServerNotifierThread::ServerNotifierThread(
+ chrome_common_net::NetworkChangeNotifierThread*
+ network_change_notifier_thread)
+ : notifier::MediatorThreadImpl(network_change_notifier_thread) {}
+
+ServerNotifierThread::~ServerNotifierThread() {}
+
+void ServerNotifierThread::ListenForUpdates() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this,
+ &ServerNotifierThread::StartInvalidationListener));
+}
+
+void ServerNotifierThread::SubscribeForUpdates(
+ const std::vector<std::string>& subscribed_services_list) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this, &ServerNotifierThread::RegisterTypesAndSignalSubscribed));
+}
+
+void ServerNotifierThread::Logout() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this,
+ &ServerNotifierThread::StopInvalidationListener));
+ MediatorThreadImpl::Logout();
+}
+
+void ServerNotifierThread::SendNotification(
+ const OutgoingNotificationData& data) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ NOTREACHED() << "Shouldn't send notifications if "
+ << "ServerNotifierThread is used";
+}
+
+void ServerNotifierThread::Invalidate(
+ const invalidation::Invalidation& invalidation,
+ invalidation::Closure* callback) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ CHECK(invalidation::IsCallbackRepeatable(callback));
+ LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
+ // Signal notification only for the invalidated types.
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &ServerNotifierThread::SignalIncomingNotification));
+ RunAndDeleteClosure(callback);
+ // A real implementation would respond to the invalidation for the
+ // given object (e.g., refetch the invalidated object).
+}
+
+void ServerNotifierThread::InvalidateAll(
+ invalidation::Closure* callback) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ CHECK(invalidation::IsCallbackRepeatable(callback));
+ LOG(INFO) << "InvalidateAll";
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &ServerNotifierThread::SignalIncomingNotification));
+ RunAndDeleteClosure(callback);
+}
+
+void ServerNotifierThread::AllRegistrationsLost(
+ invalidation::Closure* callback) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ CHECK(invalidation::IsCallbackRepeatable(callback));
+ LOG(INFO) << "AllRegistrationsLost; reregistering";
+ RegisterTypes();
+ RunAndDeleteClosure(callback);
+}
+
+void ServerNotifierThread::RegistrationLost(
+ const invalidation::ObjectId& object_id,
+ invalidation::Closure* callback) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ CHECK(invalidation::IsCallbackRepeatable(callback));
+ LOG(INFO) << "RegistrationLost; reregistering: "
+ << ObjectIdToString(object_id);
+ RegisterTypes();
+ RunAndDeleteClosure(callback);
+}
+
+void ServerNotifierThread::StartInvalidationListener() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+
+ StopInvalidationListener();
+ chrome_invalidation_client_.reset(new ChromeInvalidationClient());
+
+ // TODO(akalin): If we can figure out a unique per-session key that
+ // is preserved by the server, we can use that instead of kAppName.
+ // What this buys us is that we then won't receive any notifications
+ // that were generated by ourselves.
+ const std::string kAppName = "cc_sync_listen_notifications";
+ chrome_invalidation_client_->Start(kAppName, this, xmpp_client());
+}
+
+void ServerNotifierThread::RegisterTypesAndSignalSubscribed() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ RegisterTypes();
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &ServerNotifierThread::SignalSubscribed));
+}
+
+void ServerNotifierThread::RegisterTypes() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+
+ // TODO(akalin): This is a giant hack! Make this configurable. Add
+ // a mapping to/from ModelType.
+ std::vector<std::string> data_types;
+ data_types.push_back("AUTOFILL");
+ data_types.push_back("BOOKMARK");
+ data_types.push_back("EXTENSION");
+ data_types.push_back("PASSWORD");
+ data_types.push_back("THEME");
+ data_types.push_back("TYPED_URL");
+ data_types.push_back("PREFERENCE");
+
+ std::vector<invalidation::ObjectId> object_ids;
+
+ for (std::vector<std::string>::const_iterator it = data_types.begin();
+ it != data_types.end(); ++it) {
+ invalidation::ObjectId object_id;
+ object_id.mutable_name()->set_string_value(*it);
+ object_id.set_source(invalidation::ObjectId::CHROME_SYNC);
+ object_ids.push_back(object_id);
+ }
+
+ for (std::vector<invalidation::ObjectId>::const_iterator it =
+ object_ids.begin(); it != object_ids.end(); ++it) {
+ chrome_invalidation_client_->Register(
+ *it,
+ invalidation::NewPermanentCallback(
+ this, &ServerNotifierThread::RegisterCallback));
+ }
+}
+
+void ServerNotifierThread::RegisterCallback(
+ const invalidation::RegistrationUpdateResult& result) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ // TODO(akalin): Do something meaningful here.
+ LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
+}
+
+void ServerNotifierThread::SignalSubscribed() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ if (delegate_) {
+ delegate_->OnSubscriptionStateChange(true);
+ }
+}
+
+void ServerNotifierThread::SignalIncomingNotification() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ if (delegate_) {
+ // TODO(akalin): Fill this in with something meaningful.
+ IncomingNotificationData notification_data;
+ delegate_->OnIncomingNotification(notification_data);
+ }
+}
+
+void ServerNotifierThread::StopInvalidationListener() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+
+ if (chrome_invalidation_client_.get()) {
+ // TODO(akalin): Need to do unregisters here?
+ chrome_invalidation_client_->Stop();
+ }
+ chrome_invalidation_client_.reset();
+}
+
+} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/server_notifier_thread.h b/chrome/browser/sync/notifier/server_notifier_thread.h
new file mode 100644
index 0000000..9f079e2
--- /dev/null
+++ b/chrome/browser/sync/notifier/server_notifier_thread.h
@@ -0,0 +1,100 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// This class is the (hackish) way to use the XMPP parts of
+// MediatorThread for server-issued notifications.
+//
+// TODO(akalin): Decomp MediatorThread into an XMPP service part and a
+// notifications-specific part and use the XMPP service part for
+// server-issued notifications.
+
+#ifndef CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
+#define CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
+
+#include <string>
+#include <vector>
+
+#include "base/scoped_ptr.h"
+#include "base/task.h"
+#include "chrome/common/net/notifier/listener/mediator_thread_impl.h"
+#include "google/cacheinvalidation/invalidation-client.h"
+
+namespace chrome_common_net {
+class NetworkChangeNotifierThread;
+} // namespace chrome_common_net
+
+namespace sync_notifier {
+
+class ChromeInvalidationClient;
+
+class ServerNotifierThread
+ : public notifier::MediatorThreadImpl,
+ public invalidation::InvalidationListener {
+ public:
+ explicit ServerNotifierThread(
+ chrome_common_net::NetworkChangeNotifierThread*
+ network_change_notifier_thread);
+
+ virtual ~ServerNotifierThread();
+
+ // Overridden to start listening to server notifications.
+ virtual void ListenForUpdates();
+
+ // Overridden to immediately notify the delegate that subscriptions
+ // (i.e., notifications) are on. Must be called only after a call
+ // to ListenForUpdates().
+ virtual void SubscribeForUpdates(
+ const std::vector<std::string>& subscribed_services_list);
+
+ // Overridden to also stop listening to server notifications.
+ virtual void Logout();
+
+ // Must not be called.
+ virtual void SendNotification(const OutgoingNotificationData& data);
+
+ // invalidation::InvalidationListener implementation.
+
+ virtual void Invalidate(const invalidation::Invalidation& invalidation,
+ invalidation::Closure* callback);
+
+ virtual void InvalidateAll(invalidation::Closure* callback);
+
+ virtual void AllRegistrationsLost(invalidation::Closure* callback);
+
+ virtual void RegistrationLost(const invalidation::ObjectId& object_id,
+ invalidation::Closure* callback);
+
+ private:
+ // Posted to the worker thread by ListenForUpdates().
+ void StartInvalidationListener();
+
+ // Posted to the worker thread by SubscribeForUpdates().
+ void RegisterTypesAndSignalSubscribed();
+
+ // Register the sync types that we're interested in getting
+ // notifications for.
+ void RegisterTypes();
+
+ // Called when we get a registration response back.
+ void RegisterCallback(
+ const invalidation::RegistrationUpdateResult& result);
+
+ // Signal to the delegate that we're subscribed.
+ void SignalSubscribed();
+
+ // Signal to the delegate that we have an incoming notification.
+ void SignalIncomingNotification();
+
+ // Called by StartInvalidationListener() and posted to the worker
+ // thread by Stop().
+ void StopInvalidationListener();
+
+ scoped_ptr<ChromeInvalidationClient> chrome_invalidation_client_;
+};
+
+} // namespace sync_notifier
+
+DISABLE_RUNNABLE_METHOD_REFCOUNT(sync_notifier::ServerNotifierThread);
+
+#endif // CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc
index 899798ef..2e762ac 100644
--- a/chrome/browser/sync/tools/sync_listen_notifications.cc
+++ b/chrome/browser/sync/tools/sync_listen_notifications.cc
@@ -2,13 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <cstdlib>
#include <string>
-#include <sstream>
#include <vector>
#include "base/at_exit.h"
-#include "base/base64.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -16,15 +13,17 @@
#include "base/string_util.h"
#include "base/task.h"
#include "chrome/browser/sync/notification_method.h"
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
+#include "chrome/browser/sync/notifier/chrome_system_resources.h"
+#include "chrome/browser/sync/notifier/invalidation_util.h"
#include "chrome/browser/sync/sync_constants.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h"
#include "chrome/common/net/notifier/listener/listen_task.h"
#include "chrome/common/net/notifier/listener/notification_constants.h"
#include "chrome/common/net/notifier/listener/subscribe_task.h"
-#include "chrome/common/net/notifier/listener/xml_element_util.h"
#include "google/cacheinvalidation/invalidation-client.h"
-#include "google/cacheinvalidation/invalidation-client-impl.h"
#include "talk/base/cryptstring.h"
#include "talk/base/logging.h"
#include "talk/base/sigslot.h"
@@ -194,148 +193,6 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate {
}
};
-// TODO(akalin): Move all the cache-invalidation-related stuff below
-// out of this file once we have a real Chrome-integrated
-// implementation.
-
-void RunAndDeleteClosure(invalidation::Closure* task) {
- task->Run();
- delete task;
-}
-
-// Simple system resources class that uses the current message loop
-// for scheduling. Assumes the current message loop is already
-// running.
-class ChromeSystemResources : public invalidation::SystemResources {
- public:
- ChromeSystemResources() : scheduler_active_(false) {}
-
- ~ChromeSystemResources() {
- CHECK(!scheduler_active_);
- }
-
- virtual invalidation::Time current_time() {
- return base::Time::Now();
- }
-
- virtual void StartScheduler() {
- CHECK(!scheduler_active_);
- scheduler_active_ = true;
- }
-
- // We assume that the current message loop is stopped shortly after
- // this is called, i.e. that any in-flight delayed tasks won't get
- // run.
- //
- // TODO(akalin): Make sure that the above actually holds.
- virtual void StopScheduler() {
- CHECK(scheduler_active_);
- scheduler_active_ = false;
- }
-
- virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
- invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- CHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostDelayedTask(
- FROM_HERE,
- NewRunnableFunction(&RunAndDeleteClosure, task),
- delay.InMillisecondsRoundedUp());
- }
-
- virtual void ScheduleImmediately(invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- CHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostTask(
- FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
- }
-
- virtual void Log(LogLevel level, const char* file, int line,
- const char* format, ...) {
- va_list ap;
- va_start(ap, format);
- std::string result;
- StringAppendV(&result, format, ap);
- logging::LogMessage(file, line).stream() << result;
- va_end(ap);
- }
-
- private:
- bool scheduler_active_;
-};
-
-// We need to write our own protobuf to string functions because we
-// use LITE_RUNTIME, which doesn't support DebugString().
-
-std::string ObjectIdToString(
- const invalidation::ObjectId& object_id) {
- std::stringstream ss;
- ss << "{ ";
- ss << "name: " << object_id.name().string_value() << ", ";
- ss << "source: " << object_id.source();
- ss << " }";
- return ss.str();
-}
-
-std::string StatusToString(
- const invalidation::Status& status) {
- std::stringstream ss;
- ss << "{ ";
- ss << "code: " << status.code() << ", ";
- ss << "description: " << status.description();
- ss << " }";
- return ss.str();
-}
-
-std::string InvalidationToString(
- const invalidation::Invalidation& invalidation) {
- std::stringstream ss;
- ss << "{ ";
- ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
- ss << "version: " << invalidation.version() << ", ";
- ss << "components: { ";
- const invalidation::ComponentStampLog& component_stamp_log =
- invalidation.component_stamp_log();
- for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
- const invalidation::ComponentStamp& component_stamp =
- component_stamp_log.stamps(i);
- ss << "component: " << component_stamp.component() << ", ";
- ss << "time: " << component_stamp.time() << ", ";
- }
- ss << " }";
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateToString(
- const invalidation::RegistrationUpdate& update) {
- std::stringstream ss;
- ss << "{ ";
- ss << "type: " << update.type() << ", ";
- ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
- ss << "version: " << update.version() << ", ";
- ss << "sequence_number: " << update.sequence_number();
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateResultToString(
- const invalidation::RegistrationUpdateResult& update_result) {
- std::stringstream ss;
- ss << "{ ";
- ss << "operation: "
- << RegistrationUpdateToString(update_result.operation()) << ", ";
- ss << "status: " << StatusToString(update_result.status());
- ss << " }";
- return ss.str();
-}
-
// The actual listener for sync notifications from the cache
// invalidation service.
class ChromeInvalidationListener
@@ -346,8 +203,9 @@ class ChromeInvalidationListener
virtual void Invalidate(const invalidation::Invalidation& invalidation,
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
- RunAndDeleteClosure(callback);
+ LOG(INFO) << "Invalidate: "
+ << sync_notifier::InvalidationToString(invalidation);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would respond to the invalidation for the
// given object (e.g., refetch the invalidated object).
}
@@ -355,7 +213,7 @@ class ChromeInvalidationListener
virtual void InvalidateAll(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "InvalidateAll";
- RunAndDeleteClosure(callback);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would loop over the current registered
// data types and send notifications for those.
}
@@ -363,7 +221,7 @@ class ChromeInvalidationListener
virtual void AllRegistrationsLost(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "AllRegistrationsLost";
- RunAndDeleteClosure(callback);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would try to re-register for all
// registered data types.
}
@@ -372,8 +230,8 @@ class ChromeInvalidationListener
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "RegistrationLost: "
- << ObjectIdToString(object_id);
- RunAndDeleteClosure(callback);
+ << sync_notifier::ObjectIdToString(object_id);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would try to re-register for this
// particular data type.
}
@@ -382,199 +240,6 @@ class ChromeInvalidationListener
DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener);
};
-static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
-static const buzz::QName kQnTangoIqPacketContent(
- "google:tango", "content");
-
-// A task that listens for ClientInvalidation messages and calls the
-// given callback on them.
-class CacheInvalidationListenTask : public buzz::XmppTask {
- public:
- // Takes ownership of callback.
- CacheInvalidationListenTask(Task* parent,
- Callback1<const std::string&>::Type* callback)
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
- virtual ~CacheInvalidationListenTask() {}
-
- virtual int ProcessStart() {
- LOG(INFO) << "CacheInvalidationListenTask started";
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationListenTask blocked";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationListenTask response received";
- std::string data;
- if (GetTangoIqPacketData(stanza, &data)) {
- callback_->Run(data);
- } else {
- LOG(ERROR) << "Could not get packet data";
- }
- // Acknowledge receipt of the iq to the buzz server.
- // TODO(akalin): Send an error response for malformed packets.
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
- SendStanza(response_stanza.get());
- return STATE_RESPONSE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (IsValidTangoIqPacket(stanza)) {
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
- LOG(INFO) << "Stanza skipped";
- return false;
- }
-
- private:
- bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
- return
- (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
- (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
- }
-
- bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
- std::string* data) {
- DCHECK(IsValidTangoIqPacket(stanza));
- const buzz::XmlElement* tango_iq_packet =
- stanza->FirstNamed(kQnTangoIqPacket);
- if (!tango_iq_packet) {
- LOG(ERROR) << "Could not find tango IQ packet element";
- return false;
- }
- const buzz::XmlElement* tango_iq_packet_content =
- tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
- if (!tango_iq_packet_content) {
- LOG(ERROR) << "Could not find tango IQ packet content element";
- return false;
- }
- *data = tango_iq_packet_content->BodyText();
- return true;
- }
-
- scoped_ptr<Callback1<const std::string&>::Type> callback_;
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
-};
-
-// A task that sends a single outbound ClientInvalidation message.
-class CacheInvalidationSendMessageTask : public buzz::XmppTask {
- public:
- CacheInvalidationSendMessageTask(Task* parent,
- const buzz::Jid& to_jid,
- const std::string& msg)
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
- to_jid_(to_jid), msg_(msg) {}
- virtual ~CacheInvalidationSendMessageTask() {}
-
- virtual int ProcessStart() {
- scoped_ptr<buzz::XmlElement> stanza(
- MakeTangoIqPacket(to_jid_, task_id(), msg_));
- LOG(INFO) << "Sending message: "
- << notifier::XmlElementToString(*stanza.get());
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
- LOG(INFO) << "Error when sending message";
- return STATE_ERROR;
- }
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
- << notifier::XmlElementToString(*stanza);
- return STATE_DONE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (!MatchResponseIq(stanza, to_jid_, task_id())) {
- LOG(INFO) << "Stanza skipped";
- return false;
- }
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
-
- private:
- static buzz::XmlElement* MakeTangoIqPacket(
- const buzz::Jid& to_jid,
- const std::string& task_id,
- const std::string& msg) {
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
- buzz::XmlElement* tango_iq_packet =
- new buzz::XmlElement(kQnTangoIqPacket, true);
- iq->AddElement(tango_iq_packet);
- buzz::XmlElement* tango_iq_packet_content =
- new buzz::XmlElement(kQnTangoIqPacketContent, true);
- tango_iq_packet->AddElement(tango_iq_packet_content);
- tango_iq_packet_content->SetBodyText(msg);
- return iq;
- }
-
- buzz::Jid to_jid_;
- std::string msg_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
-};
-
-// Class that handles the details of sending and receiving client
-// invalidation packets.
-class CacheInvalidationPacketHandler {
- public:
- CacheInvalidationPacketHandler(
- buzz::XmppClient* xmpp_client,
- invalidation::NetworkEndpoint* network_endpoint,
- const buzz::Jid& to_jid)
- : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint),
- to_jid_(to_jid) {
- // Owned by xmpp_client.
- CacheInvalidationListenTask* listen_task =
- new CacheInvalidationListenTask(
- xmpp_client, NewCallback(
- this, &CacheInvalidationPacketHandler::HandleInboundPacket));
- listen_task->Start();
- }
-
- void HandleOutboundPacket(invalidation::NetworkEndpoint* const &
- network_endpoint) {
- CHECK_EQ(network_endpoint, network_endpoint_);
- invalidation::string message;
- network_endpoint->TakeOutboundMessage(&message);
- std::string encoded_message;
- CHECK(base::Base64Encode(message, &encoded_message));
- // Owned by xmpp_client.
- CacheInvalidationSendMessageTask* send_message_task =
- new CacheInvalidationSendMessageTask(xmpp_client_,
- to_jid_,
- encoded_message);
- send_message_task->Start();
- }
-
- private:
- void HandleInboundPacket(const std::string& packet) {
- std::string decoded_message;
- CHECK(base::Base64Decode(packet, &decoded_message));
- network_endpoint_->HandleInboundMessage(decoded_message);
- }
-
- buzz::XmppClient* xmpp_client_;
- invalidation::NetworkEndpoint* network_endpoint_;
- buzz::Jid to_jid_;
-};
-
// Delegate for server-side notifications.
class CacheInvalidationNotifierDelegate
: public XmppNotificationClient::Delegate {
@@ -601,36 +266,15 @@ class CacheInvalidationNotifierDelegate
buzz::XmppClient* xmpp_client) {
LOG(INFO) << "Logged in";
- chrome_system_resources_.StartScheduler();
-
- invalidation::ClientType client_type;
- client_type.set_type(invalidation::ClientType::CHROME_SYNC);
// TODO(akalin): app_name should be per-client unique.
- std::string app_name = "cc_sync_listen_notifications";
- invalidation::ClientConfig ticl_config;
- invalidation_client_.reset(
- new invalidation::InvalidationClientImpl(
- &chrome_system_resources_, client_type,
- app_name, &chrome_invalidation_listener_, ticl_config));
-
- invalidation::NetworkEndpoint* network_endpoint =
- invalidation_client_->network_endpoint();
- // TODO(akalin): Make tango jid configurable (so we can use
- // staging).
- buzz::Jid to_jid("tango@bot.talk.google.com/PROD");
- packet_handler_.reset(
- new CacheInvalidationPacketHandler(xmpp_client,
- network_endpoint,
- to_jid));
-
- network_endpoint->RegisterOutboundListener(
- invalidation::NewPermanentCallback(
- packet_handler_.get(),
- &CacheInvalidationPacketHandler::HandleOutboundPacket));
+ const std::string kAppName = "cc_sync_listen_notifications";
+ chrome_invalidation_client_.Start(kAppName,
+ &chrome_invalidation_listener_,
+ xmpp_client);
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- invalidation_client_->Register(
+ chrome_invalidation_client_.Register(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
@@ -643,43 +287,39 @@ class CacheInvalidationNotifierDelegate
// TODO(akalin): Figure out the correct place to put this.
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- invalidation_client_->Unregister(
+ chrome_invalidation_client_.Unregister(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
}
- packet_handler_.reset();
- invalidation_client_.reset();
-
- chrome_system_resources_.StopScheduler();
+ chrome_invalidation_client_.Stop();
}
virtual void OnError(buzz::XmppEngine::Error error, int subcode) {
LOG(INFO) << "Error: " << error << ", subcode: " << subcode;
- packet_handler_.reset();
- invalidation_client_.reset();
- // TODO(akalin): Figure out whether we should stop the scheduler
- // here.
+ // TODO(akalin): Figure out whether we should unregister here,
+ // too.
+ chrome_invalidation_client_.Stop();
}
private:
void RegisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Registered: "
+ << sync_notifier::RegistrationUpdateResultToString(result);
}
void UnregisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Unregistered: "
+ << sync_notifier::RegistrationUpdateResultToString(result);
}
std::vector<invalidation::ObjectId> object_ids_;
- ChromeSystemResources chrome_system_resources_;
ChromeInvalidationListener chrome_invalidation_listener_;
- scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
- scoped_ptr<CacheInvalidationPacketHandler> packet_handler_;
+ sync_notifier::ChromeInvalidationClient chrome_invalidation_client_;
};
} // namespace
diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp
index 792bfbe..3343035 100644
--- a/chrome/browser/sync/tools/sync_tools.gyp
+++ b/chrome/browser/sync/tools/sync_tools.gyp
@@ -21,6 +21,7 @@
'dependencies': [
'<(DEPTH)/base/base.gyp:base',
'<(DEPTH)/chrome/chrome.gyp:notifier',
+ '<(DEPTH)/chrome/chrome.gyp:sync_notifier',
'<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
'<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle',
],