summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-22 18:00:32 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-22 18:00:32 +0000
commit995b3daaaf10741c1fa8cb2c33b3b36a47a0f917 (patch)
treeb4240ff03da3c5cedff01d0e612b002096c85e05 /chrome/browser/sync
parent50b997c85d4dfaf2d01de3448314ca15efcec789 (diff)
downloadchromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.zip
chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.gz
chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.bz2
Revert 50479 - Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use --sync-notification-method=server to turn on). BUG=34647 TEST=manually Review URL: http://codereview.chromium.org/2827014 TBR=akalin@chromium.org Review URL: http://codereview.chromium.org/2805023 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50482 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r--chrome/browser/sync/engine/syncapi.cc42
-rw-r--r--chrome/browser/sync/notification_method.cc5
-rw-r--r--chrome/browser/sync/notification_method.h24
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc243
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.h58
-rw-r--r--chrome/browser/sync/notifier/chrome_invalidation_client.cc74
-rw-r--r--chrome/browser/sync/notifier/chrome_invalidation_client.h65
-rw-r--r--chrome/browser/sync/notifier/chrome_system_resources.cc75
-rw-r--r--chrome/browser/sync/notifier/chrome_system_resources.h50
-rw-r--r--chrome/browser/sync/notifier/invalidation_util.cc82
-rw-r--r--chrome/browser/sync/notifier/invalidation_util.h36
-rw-r--r--chrome/browser/sync/notifier/server_notifier_thread.cc201
-rw-r--r--chrome/browser/sync/notifier/server_notifier_thread.h100
-rw-r--r--chrome/browser/sync/tools/sync_listen_notifications.cc412
-rw-r--r--chrome/browser/sync/tools/sync_tools.gyp1
15 files changed, 404 insertions, 1064 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc
index c242d47..20d6a2c 100644
--- a/chrome/browser/sync/engine/syncapi.cc
+++ b/chrome/browser/sync/engine/syncapi.cc
@@ -32,7 +32,6 @@
#include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h"
#include "chrome/browser/sync/engine/syncer.h"
#include "chrome/browser/sync/engine/syncer_thread.h"
-#include "chrome/browser/sync/notifier/server_notifier_thread.h"
#include "chrome/browser/sync/protocol/autofill_specifics.pb.h"
#include "chrome/browser/sync/protocol/bookmark_specifics.pb.h"
#include "chrome/browser/sync/protocol/extension_specifics.pb.h"
@@ -1286,23 +1285,12 @@ bool SyncManager::SyncInternal::Init(
// jank if we try to shut down sync. Fix this.
connection_manager()->CheckServerReachable();
- // NOTIFICATION_SERVER uses a substantially different notification
- // method, so it has its own MediatorThread implementation.
- // Everything else just uses MediatorThreadImpl.
- notifier::MediatorThread* mediator_thread =
- (notification_method == browser_sync::NOTIFICATION_SERVER) ?
- static_cast<notifier::MediatorThread*>(
- new sync_notifier::ServerNotifierThread(
- network_change_notifier_thread)) :
- static_cast<notifier::MediatorThread*>(
- new notifier::MediatorThreadImpl(network_change_notifier_thread));
const bool kInitializeSsl = true;
const bool kConnectImmediately = false;
talk_mediator_.reset(new TalkMediatorImpl(
- mediator_thread,
+ new notifier::MediatorThreadImpl(network_change_notifier_thread),
kInitializeSsl, kConnectImmediately, invalidate_xmpp_auth_token));
- if (notification_method != browser_sync::NOTIFICATION_LEGACY &&
- notification_method != browser_sync::NOTIFICATION_SERVER) {
+ if (notification_method != browser_sync::NOTIFICATION_LEGACY) {
if (notification_method == browser_sync::NOTIFICATION_TRANSITIONAL) {
talk_mediator_->AddSubscribedServiceUrl(
browser_sync::kSyncLegacyServiceUrl);
@@ -1391,7 +1379,6 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
void SyncManager::SyncInternal::SendPendingXMPPNotification(
bool new_pending_notification) {
DCHECK_EQ(MessageLoop::current(), core_message_loop_);
- DCHECK_NE(notification_method_, browser_sync::NOTIFICATION_SERVER);
notification_pending_ = notification_pending_ || new_pending_notification;
if (!notification_pending_) {
LOG(INFO) << "Not sending notification: no pending notification";
@@ -1785,18 +1772,16 @@ void SyncManager::SyncInternal::HandleChannelEvent(const SyncerEvent& event) {
observer_->OnSyncCycleCompleted(event.snapshot);
}
- if (notification_method_ != browser_sync::NOTIFICATION_SERVER) {
- // TODO(chron): Consider changing this back to track has_more_to_sync
- // only notify peers if a successful commit has occurred.
- bool new_pending_notification =
- (event.snapshot->syncer_status.num_successful_commits > 0);
- core_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &SyncManager::SyncInternal::SendPendingXMPPNotification,
- new_pending_notification));
- }
+ // TODO(chron): Consider changing this back to track has_more_to_sync
+ // only notify peers if a successful commit has occurred.
+ bool new_pending_notification =
+ (event.snapshot->syncer_status.num_successful_commits > 0);
+ core_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &SyncManager::SyncInternal::SendPendingXMPPNotification,
+ new_pending_notification));
}
if (event.what_happened == SyncerEvent::PAUSED) {
@@ -1919,8 +1904,7 @@ void SyncManager::SyncInternal::OnNotificationStateChange(
if (syncer_thread()) {
syncer_thread()->SetNotificationsEnabled(notifications_enabled);
}
- if ((notification_method_ != browser_sync::NOTIFICATION_SERVER) &&
- notifications_enabled) {
+ if (notifications_enabled) {
// Send a notification as soon as subscriptions are on
// (see http://code.google.com/p/chromium/issues/detail?id=38563 ).
core_message_loop_->PostTask(
diff --git a/chrome/browser/sync/notification_method.cc b/chrome/browser/sync/notification_method.cc
index 9c00344..aec4a9c 100644
--- a/chrome/browser/sync/notification_method.cc
+++ b/chrome/browser/sync/notification_method.cc
@@ -24,9 +24,6 @@ std::string NotificationMethodToString(
case NOTIFICATION_NEW:
return "NOTIFICATION_NEW";
break;
- case NOTIFICATION_SERVER:
- return "NOTIFICATION_SERVER";
- break;
default:
LOG(WARNING) << "Unknown value for notification method: "
<< notification_method;
@@ -42,8 +39,6 @@ NotificationMethod StringToNotificationMethod(const std::string& str) {
return NOTIFICATION_TRANSITIONAL;
} else if (str == "new") {
return NOTIFICATION_NEW;
- } else if (str == "server") {
- return NOTIFICATION_SERVER;
}
LOG(WARNING) << "Unknown notification method \"" << str
<< "\"; using method "
diff --git a/chrome/browser/sync/notification_method.h b/chrome/browser/sync/notification_method.h
index 4e1bcd1..569f8d2 100644
--- a/chrome/browser/sync/notification_method.h
+++ b/chrome/browser/sync/notification_method.h
@@ -10,7 +10,7 @@
namespace browser_sync {
// This is the matrix for the interaction between clients with
-// different notification methods (except for NOTIFICATION_SERVER):
+// different notification methods:
//
// Listen
// L T N
@@ -24,12 +24,6 @@ namespace browser_sync {
// will receive notifications from a client sending with the row
// notification method. 'E' means means that the notification will be
// an empty one, which may be dropped by the server in the future.
-//
-// As for NOTIFICATION_SERVER, server-issued notifications will also
-// simulate a peer-issued notification, so that any client with
-// NOTIFICATION_TRANSITIONAL or NOTIFICATION_NEW will be able to
-// receive those, too. This support will be removed once everyone is
-// on NOTIFICATION_SERVER.
enum NotificationMethod {
// Old, broken notification method. Works only if notification
@@ -39,17 +33,9 @@ enum NotificationMethod {
// notifications if the notification servers don't drop empty
// notifications.
NOTIFICATION_TRANSITIONAL,
- // New notification method. Compatible only with transitional
- // notifications.
- //
- // NOTE: "New" is kind of a misnomer, as it refers only to
- // peer-issued notifications; the plan is to migrate everyone to
- // using NOTIFICATION_SERVER.
+ // New, ideal notification method. Compatible only with
+ // transitional notifications.
NOTIFICATION_NEW,
-
- // Server-issued notifications. Compatible only with transitional
- // notifications.
- NOTIFICATION_SERVER,
};
extern const NotificationMethod kDefaultNotificationMethod;
@@ -57,8 +43,8 @@ extern const NotificationMethod kDefaultNotificationMethod;
std::string NotificationMethodToString(
NotificationMethod notification_method);
-// If the given string is not one of "legacy", "transitional", "new",
-// or "server", returns kDefaultNotificationMethod.
+// If the given string is not one of "legacy", "transitional", or
+// "new", returns kDefaultNotificationMethod.
NotificationMethod StringToNotificationMethod(const std::string& str);
} // namespace browser_sync
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
deleted file mode 100644
index dbc3d54..0000000
--- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
+++ /dev/null
@@ -1,243 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
-
-#include "base/base64.h"
-#include "base/callback.h"
-#include "base/logging.h"
-#include "chrome/common/net/notifier/listener/xml_element_util.h"
-#include "google/cacheinvalidation/invalidation-client.h"
-#include "talk/xmpp/constants.h"
-#include "talk/xmpp/jid.h"
-#include "talk/xmpp/xmppclient.h"
-#include "talk/xmpp/xmpptask.h"
-
-namespace sync_notifier {
-
-namespace {
-
-const char kBotJid[] = "tango@bot.talk.google.com";
-
-// TODO(akalin): Eliminate use of 'tango' name.
-
-// TODO(akalin): Hash out details of tango IQ protocol.
-
-static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
-static const buzz::QName kQnTangoIqPacketContent(
- "google:tango", "content");
-
-// TODO(akalin): Move these task classes out so that they can be
-// unit-tested. This'll probably be done easier once we consolidate
-// all the packet sending/receiving classes.
-
-// A task that listens for ClientInvalidation messages and calls the
-// given callback on them.
-class CacheInvalidationListenTask : public buzz::XmppTask {
- public:
- // Takes ownership of callback.
- CacheInvalidationListenTask(Task* parent,
- Callback1<const std::string&>::Type* callback)
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
- virtual ~CacheInvalidationListenTask() {}
-
- virtual int ProcessStart() {
- LOG(INFO) << "CacheInvalidationListenTask started";
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationListenTask blocked";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationListenTask response received";
- std::string data;
- if (GetTangoIqPacketData(stanza, &data)) {
- callback_->Run(data);
- } else {
- LOG(ERROR) << "Could not get packet data";
- }
- // Acknowledge receipt of the iq to the buzz server.
- // TODO(akalin): Send an error response for malformed packets.
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
- SendStanza(response_stanza.get());
- return STATE_RESPONSE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (IsValidTangoIqPacket(stanza)) {
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
- LOG(INFO) << "Stanza skipped";
- return false;
- }
-
- private:
- bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
- return
- (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
- (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
- }
-
- bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
- std::string* data) {
- DCHECK(IsValidTangoIqPacket(stanza));
- const buzz::XmlElement* tango_iq_packet =
- stanza->FirstNamed(kQnTangoIqPacket);
- if (!tango_iq_packet) {
- LOG(ERROR) << "Could not find tango IQ packet element";
- return false;
- }
- const buzz::XmlElement* tango_iq_packet_content =
- tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
- if (!tango_iq_packet_content) {
- LOG(ERROR) << "Could not find tango IQ packet content element";
- return false;
- }
- *data = tango_iq_packet_content->BodyText();
- return true;
- }
-
- scoped_ptr<Callback1<const std::string&>::Type> callback_;
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
-};
-
-// A task that sends a single outbound ClientInvalidation message.
-class CacheInvalidationSendMessageTask : public buzz::XmppTask {
- public:
- CacheInvalidationSendMessageTask(Task* parent,
- const buzz::Jid& to_jid,
- const std::string& msg)
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
- to_jid_(to_jid), msg_(msg) {}
- virtual ~CacheInvalidationSendMessageTask() {}
-
- virtual int ProcessStart() {
- scoped_ptr<buzz::XmlElement> stanza(
- MakeTangoIqPacket(to_jid_, task_id(), msg_));
- LOG(INFO) << "Sending message: "
- << notifier::XmlElementToString(*stanza.get());
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
- LOG(INFO) << "Error when sending message";
- return STATE_ERROR;
- }
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
- << notifier::XmlElementToString(*stanza);
- // TODO(akalin): Handle errors here.
- return STATE_DONE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (!MatchResponseIq(stanza, to_jid_, task_id())) {
- LOG(INFO) << "Stanza skipped";
- return false;
- }
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
-
- private:
- static buzz::XmlElement* MakeTangoIqPacket(
- const buzz::Jid& to_jid,
- const std::string& task_id,
- const std::string& msg) {
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
- buzz::XmlElement* tango_iq_packet =
- new buzz::XmlElement(kQnTangoIqPacket, true);
- iq->AddElement(tango_iq_packet);
- buzz::XmlElement* tango_iq_packet_content =
- new buzz::XmlElement(kQnTangoIqPacketContent, true);
- tango_iq_packet->AddElement(tango_iq_packet_content);
- tango_iq_packet_content->SetBodyText(msg);
- return iq;
- }
-
- const buzz::Jid to_jid_;
- std::string msg_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
-};
-
-} // namespace
-
-CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
- buzz::XmppClient* xmpp_client,
- invalidation::InvalidationClient* invalidation_client)
- : xmpp_client_(xmpp_client),
- invalidation_client_(invalidation_client) {
- CHECK(xmpp_client_);
- CHECK(invalidation_client_);
- invalidation::NetworkEndpoint* network_endpoint =
- invalidation_client_->network_endpoint();
- CHECK(network_endpoint);
- network_endpoint->RegisterOutboundListener(
- invalidation::NewPermanentCallback(
- this,
- &CacheInvalidationPacketHandler::HandleOutboundPacket));
- // Owned by xmpp_client.
- CacheInvalidationListenTask* listen_task =
- new CacheInvalidationListenTask(
- xmpp_client, NewCallback(
- this, &CacheInvalidationPacketHandler::HandleInboundPacket));
- listen_task->Start();
-}
-
-CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
- invalidation::NetworkEndpoint* network_endpoint =
- invalidation_client_->network_endpoint();
- CHECK(network_endpoint);
- network_endpoint->RegisterOutboundListener(NULL);
-}
-
-void CacheInvalidationPacketHandler::HandleOutboundPacket(
- invalidation::NetworkEndpoint* const& network_endpoint) {
- CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
- invalidation::string message;
- network_endpoint->TakeOutboundMessage(&message);
- std::string encoded_message;
- if (!base::Base64Encode(message, &encoded_message)) {
- LOG(ERROR) << "Could not base64-encode message to send: "
- << message;
- return;
- }
- // Owned by xmpp_client.
- CacheInvalidationSendMessageTask* send_message_task =
- new CacheInvalidationSendMessageTask(xmpp_client_,
- buzz::Jid(kBotJid),
- encoded_message);
- send_message_task->Start();
-}
-
-void CacheInvalidationPacketHandler::HandleInboundPacket(
- const std::string& packet) {
- invalidation::NetworkEndpoint* network_endpoint =
- invalidation_client_->network_endpoint();
- std::string decoded_message;
- if (!base::Base64Decode(packet, &decoded_message)) {
- LOG(ERROR) << "Could not base64-decode received message: "
- << packet;
- return;
- }
- network_endpoint->HandleInboundMessage(decoded_message);
-}
-
-} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
deleted file mode 100644
index 2ab1b3d..0000000
--- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-//
-// Class that handles the details of sending and receiving client
-// invalidation packets.
-
-#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
-#define CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
-
-#include <string>
-
-#include "base/basictypes.h"
-#include "talk/xmpp/jid.h"
-
-namespace buzz {
-class XmppClient;
-} // namespace buzz
-
-namespace invalidation {
-class InvalidationClient;
-class NetworkEndpoint;
-} // namespace invalidation
-
-namespace sync_notifier {
-
-// TODO(akalin): Add a NonThreadSafe member to this class and use it.
-
-class CacheInvalidationPacketHandler {
- public:
- // Starts routing packets from |invalidation_client| through
- // |xmpp_client|. |invalidation_client| must not already be routing
- // packets through something. Does not take ownership of
- // |xmpp_client| or |invalidation_client|.
- CacheInvalidationPacketHandler(
- buzz::XmppClient* xmpp_client,
- invalidation::InvalidationClient* invalidation_client);
-
- // Makes the invalidation client passed into the constructor not
- // route packets through the XMPP client passed into the constructor
- // anymore.
- ~CacheInvalidationPacketHandler();
-
- private:
- void HandleOutboundPacket(
- invalidation::NetworkEndpoint* const& network_endpoint);
-
- void HandleInboundPacket(const std::string& packet);
-
- buzz::XmppClient* xmpp_client_;
- invalidation::InvalidationClient* invalidation_client_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler);
-};
-
-} // namespace sync_notifier
-
-#endif // CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_
diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.cc b/chrome/browser/sync/notifier/chrome_invalidation_client.cc
deleted file mode 100644
index 3191b8d..0000000
--- a/chrome/browser/sync/notifier/chrome_invalidation_client.cc
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
-
-#include <string>
-#include <vector>
-
-#include "base/logging.h"
-#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
-#include "chrome/browser/sync/notifier/invalidation_util.h"
-#include "google/cacheinvalidation/invalidation-client-impl.h"
-
-namespace sync_notifier {
-
-ChromeInvalidationClient::ChromeInvalidationClient() {
- DCHECK(non_thread_safe_.CalledOnValidThread());
-}
-
-ChromeInvalidationClient::~ChromeInvalidationClient() {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- Stop();
-}
-
-void ChromeInvalidationClient::Start(
- const std::string& app_name,
- invalidation::InvalidationListener* listener,
- buzz::XmppClient* xmpp_client) {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- Stop();
-
- chrome_system_resources_.StartScheduler();
-
- invalidation::ClientType client_type;
- client_type.set_type(invalidation::ClientType::CHROME_SYNC);
- invalidation::ClientConfig ticl_config;
- invalidation_client_.reset(
- new invalidation::InvalidationClientImpl(
- &chrome_system_resources_, client_type, app_name, listener,
- ticl_config));
- cache_invalidation_packet_handler_.reset(
- new CacheInvalidationPacketHandler(xmpp_client,
- invalidation_client_.get()));
-}
-
-void ChromeInvalidationClient::Stop() {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- if (!invalidation_client_.get()) {
- DCHECK(!cache_invalidation_packet_handler_.get());
- return;
- }
-
- chrome_system_resources_.StopScheduler();
-
- cache_invalidation_packet_handler_.reset();
- invalidation_client_.reset();
-}
-
-void ChromeInvalidationClient::Register(
- const invalidation::ObjectId& oid,
- invalidation::RegistrationCallback* callback) {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- invalidation_client_->Register(oid, callback);
-}
-
-void ChromeInvalidationClient::Unregister(
- const invalidation::ObjectId& oid,
- invalidation::RegistrationCallback* callback) {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- invalidation_client_->Unregister(oid, callback);
-}
-
-} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.h b/chrome/browser/sync/notifier/chrome_invalidation_client.h
deleted file mode 100644
index e3f67b1..0000000
--- a/chrome/browser/sync/notifier/chrome_invalidation_client.h
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-//
-// A simple wrapper around invalidation::InvalidationClient that
-// handles all the startup/shutdown details and hookups.
-
-#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
-#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
-
-#include <string>
-
-#include "base/basictypes.h"
-#include "base/non_thread_safe.h"
-#include "base/scoped_ptr.h"
-#include "chrome/browser/sync/notifier/chrome_system_resources.h"
-#include "google/cacheinvalidation/invalidation-client.h"
-
-namespace buzz {
-class XmppClient;
-} // namespace
-
-namespace sync_notifier {
-
-class CacheInvalidationPacketHandler;
-
-// TODO(akalin): Hook this up to a NetworkChangeNotifier so we can
-// properly notify invalidation_client_.
-
-class ChromeInvalidationClient {
- public:
- ChromeInvalidationClient();
-
- ~ChromeInvalidationClient();
-
- // Does not take ownership of |listener| nor |xmpp_client|.
- void Start(
- const std::string& app_name,
- invalidation::InvalidationListener* listener,
- buzz::XmppClient* xmpp_client);
-
- void Stop();
-
- // The following functions must only be called between calls to
- // Start() and Stop(). See invalidation-client.h for documentation.
-
- void Register(const invalidation::ObjectId& oid,
- invalidation::RegistrationCallback* callback);
-
- void Unregister(const invalidation::ObjectId& oid,
- invalidation::RegistrationCallback* callback);
-
- private:
- NonThreadSafe non_thread_safe_;
- ChromeSystemResources chrome_system_resources_;
- scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
- scoped_ptr<CacheInvalidationPacketHandler>
- cache_invalidation_packet_handler_;
-
- DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationClient);
-};
-
-} // namespace sync_notifier
-
-#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_
diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc
deleted file mode 100644
index 0ec0210..0000000
--- a/chrome/browser/sync/notifier/chrome_system_resources.cc
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/notifier/chrome_system_resources.h"
-
-#include <cstdlib>
-#include <string>
-
-#include "base/logging.h"
-#include "base/message_loop.h"
-#include "base/string_util.h"
-#include "base/task.h"
-#include "chrome/browser/sync/notifier/invalidation_util.h"
-
-namespace sync_notifier {
-
-ChromeSystemResources::ChromeSystemResources()
- : scheduler_active_(false) {}
-
-ChromeSystemResources::~ChromeSystemResources() {
- DCHECK(!scheduler_active_);
-}
-
-invalidation::Time ChromeSystemResources::current_time() {
- return base::Time::Now();
-}
-
-void ChromeSystemResources::StartScheduler() {
- DCHECK(!scheduler_active_);
- scheduler_active_ = true;
-}
-
-void ChromeSystemResources::StopScheduler() {
- DCHECK(scheduler_active_);
- scheduler_active_ = false;
-}
-
-void ChromeSystemResources::ScheduleWithDelay(
- invalidation::TimeDelta delay,
- invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- DCHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostDelayedTask(
- FROM_HERE,
- NewRunnableFunction(&RunAndDeleteClosure, task),
- delay.InMillisecondsRoundedUp());
-}
-
-void ChromeSystemResources::ScheduleImmediately(
- invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- DCHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostTask(
- FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
-}
-
-void ChromeSystemResources::Log(
- LogLevel level, const char* file, int line,
- const char* format, ...) {
- va_list ap;
- va_start(ap, format);
- std::string result;
- StringAppendV(&result, format, ap);
- logging::LogMessage(file, line).stream() << result;
- va_end(ap);
-}
-
-} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/chrome_system_resources.h b/chrome/browser/sync/notifier/chrome_system_resources.h
deleted file mode 100644
index a8a98df..0000000
--- a/chrome/browser/sync/notifier/chrome_system_resources.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-//
-// Simple system resources class that uses the current message loop
-// for scheduling. Assumes the current message loop is already
-// running.
-
-#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
-#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
-
-#include "google/cacheinvalidation/invalidation-client.h"
-
-namespace sync_notifier {
-
-// TODO(akalin): Add a NonThreadSafe member to this class and use it.
-
-class ChromeSystemResources : public invalidation::SystemResources {
- public:
- ChromeSystemResources();
-
- ~ChromeSystemResources();
-
- virtual invalidation::Time current_time();
-
- virtual void StartScheduler();
-
- // We assume that the current message loop is stopped shortly after
- // this is called, i.e. that any in-flight delayed tasks won't get
- // run.
- //
- // TODO(akalin): Make sure that the above actually holds. Use a
- // ScopedRunnableMethodFactory for better safety.
- virtual void StopScheduler();
-
- virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
- invalidation::Closure* task);
-
- virtual void ScheduleImmediately(invalidation::Closure* task);
-
- virtual void Log(LogLevel level, const char* file, int line,
- const char* format, ...);
-
- private:
- bool scheduler_active_;
-};
-
-} // namespace sync_notifier
-
-#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_
diff --git a/chrome/browser/sync/notifier/invalidation_util.cc b/chrome/browser/sync/notifier/invalidation_util.cc
deleted file mode 100644
index 2683fae..0000000
--- a/chrome/browser/sync/notifier/invalidation_util.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/notifier/invalidation_util.h"
-
-#include <sstream>
-
-namespace sync_notifier {
-
-void RunAndDeleteClosure(invalidation::Closure* task) {
- task->Run();
- delete task;
-}
-
-// We need to write our own protobuf to string functions because we
-// use LITE_RUNTIME, which doesn't support DebugString().
-
-std::string ObjectIdToString(
- const invalidation::ObjectId& object_id) {
- std::stringstream ss;
- ss << "{ ";
- ss << "name: " << object_id.name().string_value() << ", ";
- ss << "source: " << object_id.source();
- ss << " }";
- return ss.str();
-}
-
-std::string StatusToString(
- const invalidation::Status& status) {
- std::stringstream ss;
- ss << "{ ";
- ss << "code: " << status.code() << ", ";
- ss << "description: " << status.description();
- ss << " }";
- return ss.str();
-}
-
-std::string InvalidationToString(
- const invalidation::Invalidation& invalidation) {
- std::stringstream ss;
- ss << "{ ";
- ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
- ss << "version: " << invalidation.version() << ", ";
- ss << "components: { ";
- const invalidation::ComponentStampLog& component_stamp_log =
- invalidation.component_stamp_log();
- for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
- const invalidation::ComponentStamp& component_stamp =
- component_stamp_log.stamps(i);
- ss << "component: " << component_stamp.component() << ", ";
- ss << "time: " << component_stamp.time() << ", ";
- }
- ss << " }";
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateToString(
- const invalidation::RegistrationUpdate& update) {
- std::stringstream ss;
- ss << "{ ";
- ss << "type: " << update.type() << ", ";
- ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
- ss << "version: " << update.version() << ", ";
- ss << "sequence_number: " << update.sequence_number();
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateResultToString(
- const invalidation::RegistrationUpdateResult& update_result) {
- std::stringstream ss;
- ss << "{ ";
- ss << "operation: "
- << RegistrationUpdateToString(update_result.operation()) << ", ";
- ss << "status: " << StatusToString(update_result.status());
- ss << " }";
- return ss.str();
-}
-
-} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/invalidation_util.h b/chrome/browser/sync/notifier/invalidation_util.h
deleted file mode 100644
index 3ca5fe9..0000000
--- a/chrome/browser/sync/notifier/invalidation_util.h
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-//
-// Various utilities for dealing with invalidation data types.
-
-#ifndef CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
-#define CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
-
-#include <string>
-
-#include "google/cacheinvalidation/invalidation-client.h"
-
-namespace sync_notifier {
-
-void RunAndDeleteClosure(invalidation::Closure* task);
-
-// We need to write our own protobuf-to-string functions because we
-// use LITE_RUNTIME, which doesn't support DebugString().
-
-std::string ObjectIdToString(const invalidation::ObjectId& object_id);
-
-std::string StatusToString(const invalidation::Status& status);
-
-std::string InvalidationToString(
- const invalidation::Invalidation& invalidation);
-
-std::string RegistrationUpdateToString(
- const invalidation::RegistrationUpdate& update);
-
-std::string RegistrationUpdateResultToString(
- const invalidation::RegistrationUpdateResult& update_result);
-
-} // namespace sync_notifier
-
-#endif // CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_
diff --git a/chrome/browser/sync/notifier/server_notifier_thread.cc b/chrome/browser/sync/notifier/server_notifier_thread.cc
deleted file mode 100644
index fbfc999..0000000
--- a/chrome/browser/sync/notifier/server_notifier_thread.cc
+++ /dev/null
@@ -1,201 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "chrome/browser/sync/notifier/server_notifier_thread.h"
-
-#include <string>
-#include <vector>
-
-#include "base/logging.h"
-#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
-#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
-#include "chrome/browser/sync/notifier/chrome_system_resources.h"
-#include "chrome/browser/sync/notifier/invalidation_util.h"
-#include "chrome/common/net/notifier/listener/notification_defines.h"
-#include "google/cacheinvalidation/invalidation-client-impl.h"
-#include "talk/xmpp/jid.h"
-
-namespace sync_notifier {
-
-ServerNotifierThread::ServerNotifierThread(
- chrome_common_net::NetworkChangeNotifierThread*
- network_change_notifier_thread)
- : notifier::MediatorThreadImpl(network_change_notifier_thread) {}
-
-ServerNotifierThread::~ServerNotifierThread() {}
-
-void ServerNotifierThread::ListenForUpdates() {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- worker_message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(this,
- &ServerNotifierThread::StartInvalidationListener));
-}
-
-void ServerNotifierThread::SubscribeForUpdates(
- const std::vector<std::string>& subscribed_services_list) {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- worker_message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this, &ServerNotifierThread::RegisterTypesAndSignalSubscribed));
-}
-
-void ServerNotifierThread::Logout() {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- worker_message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(this,
- &ServerNotifierThread::StopInvalidationListener));
- MediatorThreadImpl::Logout();
-}
-
-void ServerNotifierThread::SendNotification(
- const OutgoingNotificationData& data) {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- NOTREACHED() << "Shouldn't send notifications if "
- << "ServerNotifierThread is used";
-}
-
-void ServerNotifierThread::Invalidate(
- const invalidation::Invalidation& invalidation,
- invalidation::Closure* callback) {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
- // Signal notification only for the invalidated types.
- parent_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &ServerNotifierThread::SignalIncomingNotification));
- RunAndDeleteClosure(callback);
- // A real implementation would respond to the invalidation for the
- // given object (e.g., refetch the invalidated object).
-}
-
-void ServerNotifierThread::InvalidateAll(
- invalidation::Closure* callback) {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "InvalidateAll";
- parent_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &ServerNotifierThread::SignalIncomingNotification));
- RunAndDeleteClosure(callback);
-}
-
-void ServerNotifierThread::AllRegistrationsLost(
- invalidation::Closure* callback) {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "AllRegistrationsLost; reregistering";
- RegisterTypes();
- RunAndDeleteClosure(callback);
-}
-
-void ServerNotifierThread::RegistrationLost(
- const invalidation::ObjectId& object_id,
- invalidation::Closure* callback) {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "RegistrationLost; reregistering: "
- << ObjectIdToString(object_id);
- RegisterTypes();
- RunAndDeleteClosure(callback);
-}
-
-void ServerNotifierThread::StartInvalidationListener() {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
-
- StopInvalidationListener();
- chrome_invalidation_client_.reset(new ChromeInvalidationClient());
-
- // TODO(akalin): If we can figure out a unique per-session key that
- // is preserved by the server, we can use that instead of kAppName.
- // What this buys us is that we then won't receive any notifications
- // that were generated by ourselves.
- const std::string kAppName = "cc_sync_listen_notifications";
- chrome_invalidation_client_->Start(kAppName, this, xmpp_client());
-}
-
-void ServerNotifierThread::RegisterTypesAndSignalSubscribed() {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- RegisterTypes();
- parent_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &ServerNotifierThread::SignalSubscribed));
-}
-
-void ServerNotifierThread::RegisterTypes() {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
-
- // TODO(akalin): This is a giant hack! Make this configurable. Add
- // a mapping to/from ModelType.
- std::vector<std::string> data_types;
- data_types.push_back("AUTOFILL");
- data_types.push_back("BOOKMARK");
- data_types.push_back("EXTENSION");
- data_types.push_back("PASSWORD");
- data_types.push_back("THEME");
- data_types.push_back("TYPED_URL");
- data_types.push_back("PREFERENCE");
-
- std::vector<invalidation::ObjectId> object_ids;
-
- for (std::vector<std::string>::const_iterator it = data_types.begin();
- it != data_types.end(); ++it) {
- invalidation::ObjectId object_id;
- object_id.mutable_name()->set_string_value(*it);
- object_id.set_source(invalidation::ObjectId::CHROME_SYNC);
- object_ids.push_back(object_id);
- }
-
- for (std::vector<invalidation::ObjectId>::const_iterator it =
- object_ids.begin(); it != object_ids.end(); ++it) {
- chrome_invalidation_client_->Register(
- *it,
- invalidation::NewPermanentCallback(
- this, &ServerNotifierThread::RegisterCallback));
- }
-}
-
-void ServerNotifierThread::RegisterCallback(
- const invalidation::RegistrationUpdateResult& result) {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
- // TODO(akalin): Do something meaningful here.
- LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
-}
-
-void ServerNotifierThread::SignalSubscribed() {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- if (delegate_) {
- delegate_->OnSubscriptionStateChange(true);
- }
-}
-
-void ServerNotifierThread::SignalIncomingNotification() {
- DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
- if (delegate_) {
- // TODO(akalin): Fill this in with something meaningful.
- IncomingNotificationData notification_data;
- delegate_->OnIncomingNotification(notification_data);
- }
-}
-
-void ServerNotifierThread::StopInvalidationListener() {
- DCHECK_EQ(MessageLoop::current(), worker_message_loop());
-
- if (chrome_invalidation_client_.get()) {
- // TODO(akalin): Need to do unregisters here?
- chrome_invalidation_client_->Stop();
- }
- chrome_invalidation_client_.reset();
-}
-
-} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/server_notifier_thread.h b/chrome/browser/sync/notifier/server_notifier_thread.h
deleted file mode 100644
index 9f079e2..0000000
--- a/chrome/browser/sync/notifier/server_notifier_thread.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-//
-// This class is the (hackish) way to use the XMPP parts of
-// MediatorThread for server-issued notifications.
-//
-// TODO(akalin): Decomp MediatorThread into an XMPP service part and a
-// notifications-specific part and use the XMPP service part for
-// server-issued notifications.
-
-#ifndef CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
-#define CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
-
-#include <string>
-#include <vector>
-
-#include "base/scoped_ptr.h"
-#include "base/task.h"
-#include "chrome/common/net/notifier/listener/mediator_thread_impl.h"
-#include "google/cacheinvalidation/invalidation-client.h"
-
-namespace chrome_common_net {
-class NetworkChangeNotifierThread;
-} // namespace chrome_common_net
-
-namespace sync_notifier {
-
-class ChromeInvalidationClient;
-
-class ServerNotifierThread
- : public notifier::MediatorThreadImpl,
- public invalidation::InvalidationListener {
- public:
- explicit ServerNotifierThread(
- chrome_common_net::NetworkChangeNotifierThread*
- network_change_notifier_thread);
-
- virtual ~ServerNotifierThread();
-
- // Overridden to start listening to server notifications.
- virtual void ListenForUpdates();
-
- // Overridden to immediately notify the delegate that subscriptions
- // (i.e., notifications) are on. Must be called only after a call
- // to ListenForUpdates().
- virtual void SubscribeForUpdates(
- const std::vector<std::string>& subscribed_services_list);
-
- // Overridden to also stop listening to server notifications.
- virtual void Logout();
-
- // Must not be called.
- virtual void SendNotification(const OutgoingNotificationData& data);
-
- // invalidation::InvalidationListener implementation.
-
- virtual void Invalidate(const invalidation::Invalidation& invalidation,
- invalidation::Closure* callback);
-
- virtual void InvalidateAll(invalidation::Closure* callback);
-
- virtual void AllRegistrationsLost(invalidation::Closure* callback);
-
- virtual void RegistrationLost(const invalidation::ObjectId& object_id,
- invalidation::Closure* callback);
-
- private:
- // Posted to the worker thread by ListenForUpdates().
- void StartInvalidationListener();
-
- // Posted to the worker thread by SubscribeForUpdates().
- void RegisterTypesAndSignalSubscribed();
-
- // Register the sync types that we're interested in getting
- // notifications for.
- void RegisterTypes();
-
- // Called when we get a registration response back.
- void RegisterCallback(
- const invalidation::RegistrationUpdateResult& result);
-
- // Signal to the delegate that we're subscribed.
- void SignalSubscribed();
-
- // Signal to the delegate that we have an incoming notification.
- void SignalIncomingNotification();
-
- // Called by StartInvalidationListener() and posted to the worker
- // thread by Stop().
- void StopInvalidationListener();
-
- scoped_ptr<ChromeInvalidationClient> chrome_invalidation_client_;
-};
-
-} // namespace sync_notifier
-
-DISABLE_RUNNABLE_METHOD_REFCOUNT(sync_notifier::ServerNotifierThread);
-
-#endif // CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc
index 2e762ac..899798ef 100644
--- a/chrome/browser/sync/tools/sync_listen_notifications.cc
+++ b/chrome/browser/sync/tools/sync_listen_notifications.cc
@@ -2,10 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include <cstdlib>
#include <string>
+#include <sstream>
#include <vector>
#include "base/at_exit.h"
+#include "base/base64.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -13,17 +16,15 @@
#include "base/string_util.h"
#include "base/task.h"
#include "chrome/browser/sync/notification_method.h"
-#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
-#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
-#include "chrome/browser/sync/notifier/chrome_system_resources.h"
-#include "chrome/browser/sync/notifier/invalidation_util.h"
#include "chrome/browser/sync/sync_constants.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h"
#include "chrome/common/net/notifier/listener/listen_task.h"
#include "chrome/common/net/notifier/listener/notification_constants.h"
#include "chrome/common/net/notifier/listener/subscribe_task.h"
+#include "chrome/common/net/notifier/listener/xml_element_util.h"
#include "google/cacheinvalidation/invalidation-client.h"
+#include "google/cacheinvalidation/invalidation-client-impl.h"
#include "talk/base/cryptstring.h"
#include "talk/base/logging.h"
#include "talk/base/sigslot.h"
@@ -193,6 +194,148 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate {
}
};
+// TODO(akalin): Move all the cache-invalidation-related stuff below
+// out of this file once we have a real Chrome-integrated
+// implementation.
+
+void RunAndDeleteClosure(invalidation::Closure* task) {
+ task->Run();
+ delete task;
+}
+
+// Simple system resources class that uses the current message loop
+// for scheduling. Assumes the current message loop is already
+// running.
+class ChromeSystemResources : public invalidation::SystemResources {
+ public:
+ ChromeSystemResources() : scheduler_active_(false) {}
+
+ ~ChromeSystemResources() {
+ CHECK(!scheduler_active_);
+ }
+
+ virtual invalidation::Time current_time() {
+ return base::Time::Now();
+ }
+
+ virtual void StartScheduler() {
+ CHECK(!scheduler_active_);
+ scheduler_active_ = true;
+ }
+
+ // We assume that the current message loop is stopped shortly after
+ // this is called, i.e. that any in-flight delayed tasks won't get
+ // run.
+ //
+ // TODO(akalin): Make sure that the above actually holds.
+ virtual void StopScheduler() {
+ CHECK(scheduler_active_);
+ scheduler_active_ = false;
+ }
+
+ virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
+ invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ CHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE,
+ NewRunnableFunction(&RunAndDeleteClosure, task),
+ delay.InMillisecondsRoundedUp());
+ }
+
+ virtual void ScheduleImmediately(invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ CHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostTask(
+ FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
+ }
+
+ virtual void Log(LogLevel level, const char* file, int line,
+ const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ std::string result;
+ StringAppendV(&result, format, ap);
+ logging::LogMessage(file, line).stream() << result;
+ va_end(ap);
+ }
+
+ private:
+ bool scheduler_active_;
+};
+
+// We need to write our own protobuf to string functions because we
+// use LITE_RUNTIME, which doesn't support DebugString().
+
+std::string ObjectIdToString(
+ const invalidation::ObjectId& object_id) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "name: " << object_id.name().string_value() << ", ";
+ ss << "source: " << object_id.source();
+ ss << " }";
+ return ss.str();
+}
+
+std::string StatusToString(
+ const invalidation::Status& status) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "code: " << status.code() << ", ";
+ ss << "description: " << status.description();
+ ss << " }";
+ return ss.str();
+}
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
+ ss << "version: " << invalidation.version() << ", ";
+ ss << "components: { ";
+ const invalidation::ComponentStampLog& component_stamp_log =
+ invalidation.component_stamp_log();
+ for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
+ const invalidation::ComponentStamp& component_stamp =
+ component_stamp_log.stamps(i);
+ ss << "component: " << component_stamp.component() << ", ";
+ ss << "time: " << component_stamp.time() << ", ";
+ }
+ ss << " }";
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateToString(
+ const invalidation::RegistrationUpdate& update) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "type: " << update.type() << ", ";
+ ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
+ ss << "version: " << update.version() << ", ";
+ ss << "sequence_number: " << update.sequence_number();
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateResultToString(
+ const invalidation::RegistrationUpdateResult& update_result) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "operation: "
+ << RegistrationUpdateToString(update_result.operation()) << ", ";
+ ss << "status: " << StatusToString(update_result.status());
+ ss << " }";
+ return ss.str();
+}
+
// The actual listener for sync notifications from the cache
// invalidation service.
class ChromeInvalidationListener
@@ -203,9 +346,8 @@ class ChromeInvalidationListener
virtual void Invalidate(const invalidation::Invalidation& invalidation,
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "Invalidate: "
- << sync_notifier::InvalidationToString(invalidation);
- sync_notifier::RunAndDeleteClosure(callback);
+ LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
+ RunAndDeleteClosure(callback);
// A real implementation would respond to the invalidation for the
// given object (e.g., refetch the invalidated object).
}
@@ -213,7 +355,7 @@ class ChromeInvalidationListener
virtual void InvalidateAll(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "InvalidateAll";
- sync_notifier::RunAndDeleteClosure(callback);
+ RunAndDeleteClosure(callback);
// A real implementation would loop over the current registered
// data types and send notifications for those.
}
@@ -221,7 +363,7 @@ class ChromeInvalidationListener
virtual void AllRegistrationsLost(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "AllRegistrationsLost";
- sync_notifier::RunAndDeleteClosure(callback);
+ RunAndDeleteClosure(callback);
// A real implementation would try to re-register for all
// registered data types.
}
@@ -230,8 +372,8 @@ class ChromeInvalidationListener
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "RegistrationLost: "
- << sync_notifier::ObjectIdToString(object_id);
- sync_notifier::RunAndDeleteClosure(callback);
+ << ObjectIdToString(object_id);
+ RunAndDeleteClosure(callback);
// A real implementation would try to re-register for this
// particular data type.
}
@@ -240,6 +382,199 @@ class ChromeInvalidationListener
DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener);
};
+static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
+static const buzz::QName kQnTangoIqPacketContent(
+ "google:tango", "content");
+
+// A task that listens for ClientInvalidation messages and calls the
+// given callback on them.
+class CacheInvalidationListenTask : public buzz::XmppTask {
+ public:
+ // Takes ownership of callback.
+ CacheInvalidationListenTask(Task* parent,
+ Callback1<const std::string&>::Type* callback)
+ : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
+ virtual ~CacheInvalidationListenTask() {}
+
+ virtual int ProcessStart() {
+ LOG(INFO) << "CacheInvalidationListenTask started";
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationListenTask blocked";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationListenTask response received";
+ std::string data;
+ if (GetTangoIqPacketData(stanza, &data)) {
+ callback_->Run(data);
+ } else {
+ LOG(ERROR) << "Could not get packet data";
+ }
+ // Acknowledge receipt of the iq to the buzz server.
+ // TODO(akalin): Send an error response for malformed packets.
+ scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
+ SendStanza(response_stanza.get());
+ return STATE_RESPONSE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (IsValidTangoIqPacket(stanza)) {
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+
+ private:
+ bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
+ return
+ (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
+ (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
+ }
+
+ bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
+ std::string* data) {
+ DCHECK(IsValidTangoIqPacket(stanza));
+ const buzz::XmlElement* tango_iq_packet =
+ stanza->FirstNamed(kQnTangoIqPacket);
+ if (!tango_iq_packet) {
+ LOG(ERROR) << "Could not find tango IQ packet element";
+ return false;
+ }
+ const buzz::XmlElement* tango_iq_packet_content =
+ tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
+ if (!tango_iq_packet_content) {
+ LOG(ERROR) << "Could not find tango IQ packet content element";
+ return false;
+ }
+ *data = tango_iq_packet_content->BodyText();
+ return true;
+ }
+
+ scoped_ptr<Callback1<const std::string&>::Type> callback_;
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
+};
+
+// A task that sends a single outbound ClientInvalidation message.
+class CacheInvalidationSendMessageTask : public buzz::XmppTask {
+ public:
+ CacheInvalidationSendMessageTask(Task* parent,
+ const buzz::Jid& to_jid,
+ const std::string& msg)
+ : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
+ to_jid_(to_jid), msg_(msg) {}
+ virtual ~CacheInvalidationSendMessageTask() {}
+
+ virtual int ProcessStart() {
+ scoped_ptr<buzz::XmlElement> stanza(
+ MakeTangoIqPacket(to_jid_, task_id(), msg_));
+ LOG(INFO) << "Sending message: "
+ << notifier::XmlElementToString(*stanza.get());
+ if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
+ LOG(INFO) << "Error when sending message";
+ return STATE_ERROR;
+ }
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
+ << notifier::XmlElementToString(*stanza);
+ return STATE_DONE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (!MatchResponseIq(stanza, to_jid_, task_id())) {
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+
+ private:
+ static buzz::XmlElement* MakeTangoIqPacket(
+ const buzz::Jid& to_jid,
+ const std::string& task_id,
+ const std::string& msg) {
+ buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
+ buzz::XmlElement* tango_iq_packet =
+ new buzz::XmlElement(kQnTangoIqPacket, true);
+ iq->AddElement(tango_iq_packet);
+ buzz::XmlElement* tango_iq_packet_content =
+ new buzz::XmlElement(kQnTangoIqPacketContent, true);
+ tango_iq_packet->AddElement(tango_iq_packet_content);
+ tango_iq_packet_content->SetBodyText(msg);
+ return iq;
+ }
+
+ buzz::Jid to_jid_;
+ std::string msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
+};
+
+// Class that handles the details of sending and receiving client
+// invalidation packets.
+class CacheInvalidationPacketHandler {
+ public:
+ CacheInvalidationPacketHandler(
+ buzz::XmppClient* xmpp_client,
+ invalidation::NetworkEndpoint* network_endpoint,
+ const buzz::Jid& to_jid)
+ : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint),
+ to_jid_(to_jid) {
+ // Owned by xmpp_client.
+ CacheInvalidationListenTask* listen_task =
+ new CacheInvalidationListenTask(
+ xmpp_client, NewCallback(
+ this, &CacheInvalidationPacketHandler::HandleInboundPacket));
+ listen_task->Start();
+ }
+
+ void HandleOutboundPacket(invalidation::NetworkEndpoint* const &
+ network_endpoint) {
+ CHECK_EQ(network_endpoint, network_endpoint_);
+ invalidation::string message;
+ network_endpoint->TakeOutboundMessage(&message);
+ std::string encoded_message;
+ CHECK(base::Base64Encode(message, &encoded_message));
+ // Owned by xmpp_client.
+ CacheInvalidationSendMessageTask* send_message_task =
+ new CacheInvalidationSendMessageTask(xmpp_client_,
+ to_jid_,
+ encoded_message);
+ send_message_task->Start();
+ }
+
+ private:
+ void HandleInboundPacket(const std::string& packet) {
+ std::string decoded_message;
+ CHECK(base::Base64Decode(packet, &decoded_message));
+ network_endpoint_->HandleInboundMessage(decoded_message);
+ }
+
+ buzz::XmppClient* xmpp_client_;
+ invalidation::NetworkEndpoint* network_endpoint_;
+ buzz::Jid to_jid_;
+};
+
// Delegate for server-side notifications.
class CacheInvalidationNotifierDelegate
: public XmppNotificationClient::Delegate {
@@ -266,15 +601,36 @@ class CacheInvalidationNotifierDelegate
buzz::XmppClient* xmpp_client) {
LOG(INFO) << "Logged in";
+ chrome_system_resources_.StartScheduler();
+
+ invalidation::ClientType client_type;
+ client_type.set_type(invalidation::ClientType::CHROME_SYNC);
// TODO(akalin): app_name should be per-client unique.
- const std::string kAppName = "cc_sync_listen_notifications";
- chrome_invalidation_client_.Start(kAppName,
- &chrome_invalidation_listener_,
- xmpp_client);
+ std::string app_name = "cc_sync_listen_notifications";
+ invalidation::ClientConfig ticl_config;
+ invalidation_client_.reset(
+ new invalidation::InvalidationClientImpl(
+ &chrome_system_resources_, client_type,
+ app_name, &chrome_invalidation_listener_, ticl_config));
+
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ // TODO(akalin): Make tango jid configurable (so we can use
+ // staging).
+ buzz::Jid to_jid("tango@bot.talk.google.com/PROD");
+ packet_handler_.reset(
+ new CacheInvalidationPacketHandler(xmpp_client,
+ network_endpoint,
+ to_jid));
+
+ network_endpoint->RegisterOutboundListener(
+ invalidation::NewPermanentCallback(
+ packet_handler_.get(),
+ &CacheInvalidationPacketHandler::HandleOutboundPacket));
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- chrome_invalidation_client_.Register(
+ invalidation_client_->Register(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
@@ -287,39 +643,43 @@ class CacheInvalidationNotifierDelegate
// TODO(akalin): Figure out the correct place to put this.
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- chrome_invalidation_client_.Unregister(
+ invalidation_client_->Unregister(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
}
- chrome_invalidation_client_.Stop();
+ packet_handler_.reset();
+ invalidation_client_.reset();
+
+ chrome_system_resources_.StopScheduler();
}
virtual void OnError(buzz::XmppEngine::Error error, int subcode) {
LOG(INFO) << "Error: " << error << ", subcode: " << subcode;
+ packet_handler_.reset();
+ invalidation_client_.reset();
- // TODO(akalin): Figure out whether we should unregister here,
- // too.
- chrome_invalidation_client_.Stop();
+ // TODO(akalin): Figure out whether we should stop the scheduler
+ // here.
}
private:
void RegisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Registered: "
- << sync_notifier::RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
}
void UnregisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Unregistered: "
- << sync_notifier::RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result);
}
std::vector<invalidation::ObjectId> object_ids_;
+ ChromeSystemResources chrome_system_resources_;
ChromeInvalidationListener chrome_invalidation_listener_;
- sync_notifier::ChromeInvalidationClient chrome_invalidation_client_;
+ scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
+ scoped_ptr<CacheInvalidationPacketHandler> packet_handler_;
};
} // namespace
diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp
index 3343035..792bfbe 100644
--- a/chrome/browser/sync/tools/sync_tools.gyp
+++ b/chrome/browser/sync/tools/sync_tools.gyp
@@ -21,7 +21,6 @@
'dependencies': [
'<(DEPTH)/base/base.gyp:base',
'<(DEPTH)/chrome/chrome.gyp:notifier',
- '<(DEPTH)/chrome/chrome.gyp:sync_notifier',
'<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
'<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle',
],