diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-22 18:00:32 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-22 18:00:32 +0000 |
commit | 995b3daaaf10741c1fa8cb2c33b3b36a47a0f917 (patch) | |
tree | b4240ff03da3c5cedff01d0e612b002096c85e05 /chrome/browser/sync | |
parent | 50b997c85d4dfaf2d01de3448314ca15efcec789 (diff) | |
download | chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.zip chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.gz chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.bz2 |
Revert 50479 - Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use
--sync-notification-method=server to turn on).
BUG=34647
TEST=manually
Review URL: http://codereview.chromium.org/2827014
TBR=akalin@chromium.org
Review URL: http://codereview.chromium.org/2805023
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50482 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r-- | chrome/browser/sync/engine/syncapi.cc | 42 | ||||
-rw-r--r-- | chrome/browser/sync/notification_method.cc | 5 | ||||
-rw-r--r-- | chrome/browser/sync/notification_method.h | 24 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc | 243 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/cache_invalidation_packet_handler.h | 58 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_invalidation_client.cc | 74 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_invalidation_client.h | 65 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_system_resources.cc | 75 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_system_resources.h | 50 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/invalidation_util.cc | 82 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/invalidation_util.h | 36 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/server_notifier_thread.cc | 201 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/server_notifier_thread.h | 100 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_listen_notifications.cc | 412 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_tools.gyp | 1 |
15 files changed, 404 insertions, 1064 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc index c242d47..20d6a2c 100644 --- a/chrome/browser/sync/engine/syncapi.cc +++ b/chrome/browser/sync/engine/syncapi.cc @@ -32,7 +32,6 @@ #include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" #include "chrome/browser/sync/engine/syncer_thread.h" -#include "chrome/browser/sync/notifier/server_notifier_thread.h" #include "chrome/browser/sync/protocol/autofill_specifics.pb.h" #include "chrome/browser/sync/protocol/bookmark_specifics.pb.h" #include "chrome/browser/sync/protocol/extension_specifics.pb.h" @@ -1286,23 +1285,12 @@ bool SyncManager::SyncInternal::Init( // jank if we try to shut down sync. Fix this. connection_manager()->CheckServerReachable(); - // NOTIFICATION_SERVER uses a substantially different notification - // method, so it has its own MediatorThread implementation. - // Everything else just uses MediatorThreadImpl. - notifier::MediatorThread* mediator_thread = - (notification_method == browser_sync::NOTIFICATION_SERVER) ? - static_cast<notifier::MediatorThread*>( - new sync_notifier::ServerNotifierThread( - network_change_notifier_thread)) : - static_cast<notifier::MediatorThread*>( - new notifier::MediatorThreadImpl(network_change_notifier_thread)); const bool kInitializeSsl = true; const bool kConnectImmediately = false; talk_mediator_.reset(new TalkMediatorImpl( - mediator_thread, + new notifier::MediatorThreadImpl(network_change_notifier_thread), kInitializeSsl, kConnectImmediately, invalidate_xmpp_auth_token)); - if (notification_method != browser_sync::NOTIFICATION_LEGACY && - notification_method != browser_sync::NOTIFICATION_SERVER) { + if (notification_method != browser_sync::NOTIFICATION_LEGACY) { if (notification_method == browser_sync::NOTIFICATION_TRANSITIONAL) { talk_mediator_->AddSubscribedServiceUrl( browser_sync::kSyncLegacyServiceUrl); @@ -1391,7 +1379,6 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { void SyncManager::SyncInternal::SendPendingXMPPNotification( bool new_pending_notification) { DCHECK_EQ(MessageLoop::current(), core_message_loop_); - DCHECK_NE(notification_method_, browser_sync::NOTIFICATION_SERVER); notification_pending_ = notification_pending_ || new_pending_notification; if (!notification_pending_) { LOG(INFO) << "Not sending notification: no pending notification"; @@ -1785,18 +1772,16 @@ void SyncManager::SyncInternal::HandleChannelEvent(const SyncerEvent& event) { observer_->OnSyncCycleCompleted(event.snapshot); } - if (notification_method_ != browser_sync::NOTIFICATION_SERVER) { - // TODO(chron): Consider changing this back to track has_more_to_sync - // only notify peers if a successful commit has occurred. - bool new_pending_notification = - (event.snapshot->syncer_status.num_successful_commits > 0); - core_message_loop_->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &SyncManager::SyncInternal::SendPendingXMPPNotification, - new_pending_notification)); - } + // TODO(chron): Consider changing this back to track has_more_to_sync + // only notify peers if a successful commit has occurred. + bool new_pending_notification = + (event.snapshot->syncer_status.num_successful_commits > 0); + core_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &SyncManager::SyncInternal::SendPendingXMPPNotification, + new_pending_notification)); } if (event.what_happened == SyncerEvent::PAUSED) { @@ -1919,8 +1904,7 @@ void SyncManager::SyncInternal::OnNotificationStateChange( if (syncer_thread()) { syncer_thread()->SetNotificationsEnabled(notifications_enabled); } - if ((notification_method_ != browser_sync::NOTIFICATION_SERVER) && - notifications_enabled) { + if (notifications_enabled) { // Send a notification as soon as subscriptions are on // (see http://code.google.com/p/chromium/issues/detail?id=38563 ). core_message_loop_->PostTask( diff --git a/chrome/browser/sync/notification_method.cc b/chrome/browser/sync/notification_method.cc index 9c00344..aec4a9c 100644 --- a/chrome/browser/sync/notification_method.cc +++ b/chrome/browser/sync/notification_method.cc @@ -24,9 +24,6 @@ std::string NotificationMethodToString( case NOTIFICATION_NEW: return "NOTIFICATION_NEW"; break; - case NOTIFICATION_SERVER: - return "NOTIFICATION_SERVER"; - break; default: LOG(WARNING) << "Unknown value for notification method: " << notification_method; @@ -42,8 +39,6 @@ NotificationMethod StringToNotificationMethod(const std::string& str) { return NOTIFICATION_TRANSITIONAL; } else if (str == "new") { return NOTIFICATION_NEW; - } else if (str == "server") { - return NOTIFICATION_SERVER; } LOG(WARNING) << "Unknown notification method \"" << str << "\"; using method " diff --git a/chrome/browser/sync/notification_method.h b/chrome/browser/sync/notification_method.h index 4e1bcd1..569f8d2 100644 --- a/chrome/browser/sync/notification_method.h +++ b/chrome/browser/sync/notification_method.h @@ -10,7 +10,7 @@ namespace browser_sync { // This is the matrix for the interaction between clients with -// different notification methods (except for NOTIFICATION_SERVER): +// different notification methods: // // Listen // L T N @@ -24,12 +24,6 @@ namespace browser_sync { // will receive notifications from a client sending with the row // notification method. 'E' means means that the notification will be // an empty one, which may be dropped by the server in the future. -// -// As for NOTIFICATION_SERVER, server-issued notifications will also -// simulate a peer-issued notification, so that any client with -// NOTIFICATION_TRANSITIONAL or NOTIFICATION_NEW will be able to -// receive those, too. This support will be removed once everyone is -// on NOTIFICATION_SERVER. enum NotificationMethod { // Old, broken notification method. Works only if notification @@ -39,17 +33,9 @@ enum NotificationMethod { // notifications if the notification servers don't drop empty // notifications. NOTIFICATION_TRANSITIONAL, - // New notification method. Compatible only with transitional - // notifications. - // - // NOTE: "New" is kind of a misnomer, as it refers only to - // peer-issued notifications; the plan is to migrate everyone to - // using NOTIFICATION_SERVER. + // New, ideal notification method. Compatible only with + // transitional notifications. NOTIFICATION_NEW, - - // Server-issued notifications. Compatible only with transitional - // notifications. - NOTIFICATION_SERVER, }; extern const NotificationMethod kDefaultNotificationMethod; @@ -57,8 +43,8 @@ extern const NotificationMethod kDefaultNotificationMethod; std::string NotificationMethodToString( NotificationMethod notification_method); -// If the given string is not one of "legacy", "transitional", "new", -// or "server", returns kDefaultNotificationMethod. +// If the given string is not one of "legacy", "transitional", or +// "new", returns kDefaultNotificationMethod. NotificationMethod StringToNotificationMethod(const std::string& str); } // namespace browser_sync diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc deleted file mode 100644 index dbc3d54..0000000 --- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" - -#include "base/base64.h" -#include "base/callback.h" -#include "base/logging.h" -#include "chrome/common/net/notifier/listener/xml_element_util.h" -#include "google/cacheinvalidation/invalidation-client.h" -#include "talk/xmpp/constants.h" -#include "talk/xmpp/jid.h" -#include "talk/xmpp/xmppclient.h" -#include "talk/xmpp/xmpptask.h" - -namespace sync_notifier { - -namespace { - -const char kBotJid[] = "tango@bot.talk.google.com"; - -// TODO(akalin): Eliminate use of 'tango' name. - -// TODO(akalin): Hash out details of tango IQ protocol. - -static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); -static const buzz::QName kQnTangoIqPacketContent( - "google:tango", "content"); - -// TODO(akalin): Move these task classes out so that they can be -// unit-tested. This'll probably be done easier once we consolidate -// all the packet sending/receiving classes. - -// A task that listens for ClientInvalidation messages and calls the -// given callback on them. -class CacheInvalidationListenTask : public buzz::XmppTask { - public: - // Takes ownership of callback. - CacheInvalidationListenTask(Task* parent, - Callback1<const std::string&>::Type* callback) - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} - virtual ~CacheInvalidationListenTask() {} - - virtual int ProcessStart() { - LOG(INFO) << "CacheInvalidationListenTask started"; - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationListenTask blocked"; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationListenTask response received"; - std::string data; - if (GetTangoIqPacketData(stanza, &data)) { - callback_->Run(data); - } else { - LOG(ERROR) << "Could not get packet data"; - } - // Acknowledge receipt of the iq to the buzz server. - // TODO(akalin): Send an error response for malformed packets. - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); - SendStanza(response_stanza.get()); - return STATE_RESPONSE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (IsValidTangoIqPacket(stanza)) { - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - LOG(INFO) << "Stanza skipped"; - return false; - } - - private: - bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { - return - (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && - (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); - } - - bool GetTangoIqPacketData(const buzz::XmlElement* stanza, - std::string* data) { - DCHECK(IsValidTangoIqPacket(stanza)); - const buzz::XmlElement* tango_iq_packet = - stanza->FirstNamed(kQnTangoIqPacket); - if (!tango_iq_packet) { - LOG(ERROR) << "Could not find tango IQ packet element"; - return false; - } - const buzz::XmlElement* tango_iq_packet_content = - tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); - if (!tango_iq_packet_content) { - LOG(ERROR) << "Could not find tango IQ packet content element"; - return false; - } - *data = tango_iq_packet_content->BodyText(); - return true; - } - - scoped_ptr<Callback1<const std::string&>::Type> callback_; - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); -}; - -// A task that sends a single outbound ClientInvalidation message. -class CacheInvalidationSendMessageTask : public buzz::XmppTask { - public: - CacheInvalidationSendMessageTask(Task* parent, - const buzz::Jid& to_jid, - const std::string& msg) - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), - to_jid_(to_jid), msg_(msg) {} - virtual ~CacheInvalidationSendMessageTask() {} - - virtual int ProcessStart() { - scoped_ptr<buzz::XmlElement> stanza( - MakeTangoIqPacket(to_jid_, task_id(), msg_)); - LOG(INFO) << "Sending message: " - << notifier::XmlElementToString(*stanza.get()); - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { - LOG(INFO) << "Error when sending message"; - return STATE_ERROR; - } - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationSendMessageTask response received: " - << notifier::XmlElementToString(*stanza); - // TODO(akalin): Handle errors here. - return STATE_DONE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (!MatchResponseIq(stanza, to_jid_, task_id())) { - LOG(INFO) << "Stanza skipped"; - return false; - } - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - - private: - static buzz::XmlElement* MakeTangoIqPacket( - const buzz::Jid& to_jid, - const std::string& task_id, - const std::string& msg) { - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); - buzz::XmlElement* tango_iq_packet = - new buzz::XmlElement(kQnTangoIqPacket, true); - iq->AddElement(tango_iq_packet); - buzz::XmlElement* tango_iq_packet_content = - new buzz::XmlElement(kQnTangoIqPacketContent, true); - tango_iq_packet->AddElement(tango_iq_packet_content); - tango_iq_packet_content->SetBodyText(msg); - return iq; - } - - const buzz::Jid to_jid_; - std::string msg_; - - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); -}; - -} // namespace - -CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( - buzz::XmppClient* xmpp_client, - invalidation::InvalidationClient* invalidation_client) - : xmpp_client_(xmpp_client), - invalidation_client_(invalidation_client) { - CHECK(xmpp_client_); - CHECK(invalidation_client_); - invalidation::NetworkEndpoint* network_endpoint = - invalidation_client_->network_endpoint(); - CHECK(network_endpoint); - network_endpoint->RegisterOutboundListener( - invalidation::NewPermanentCallback( - this, - &CacheInvalidationPacketHandler::HandleOutboundPacket)); - // Owned by xmpp_client. - CacheInvalidationListenTask* listen_task = - new CacheInvalidationListenTask( - xmpp_client, NewCallback( - this, &CacheInvalidationPacketHandler::HandleInboundPacket)); - listen_task->Start(); -} - -CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { - invalidation::NetworkEndpoint* network_endpoint = - invalidation_client_->network_endpoint(); - CHECK(network_endpoint); - network_endpoint->RegisterOutboundListener(NULL); -} - -void CacheInvalidationPacketHandler::HandleOutboundPacket( - invalidation::NetworkEndpoint* const& network_endpoint) { - CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint()); - invalidation::string message; - network_endpoint->TakeOutboundMessage(&message); - std::string encoded_message; - if (!base::Base64Encode(message, &encoded_message)) { - LOG(ERROR) << "Could not base64-encode message to send: " - << message; - return; - } - // Owned by xmpp_client. - CacheInvalidationSendMessageTask* send_message_task = - new CacheInvalidationSendMessageTask(xmpp_client_, - buzz::Jid(kBotJid), - encoded_message); - send_message_task->Start(); -} - -void CacheInvalidationPacketHandler::HandleInboundPacket( - const std::string& packet) { - invalidation::NetworkEndpoint* network_endpoint = - invalidation_client_->network_endpoint(); - std::string decoded_message; - if (!base::Base64Decode(packet, &decoded_message)) { - LOG(ERROR) << "Could not base64-decode received message: " - << packet; - return; - } - network_endpoint->HandleInboundMessage(decoded_message); -} - -} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h deleted file mode 100644 index 2ab1b3d..0000000 --- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// Class that handles the details of sending and receiving client -// invalidation packets. - -#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ -#define CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ - -#include <string> - -#include "base/basictypes.h" -#include "talk/xmpp/jid.h" - -namespace buzz { -class XmppClient; -} // namespace buzz - -namespace invalidation { -class InvalidationClient; -class NetworkEndpoint; -} // namespace invalidation - -namespace sync_notifier { - -// TODO(akalin): Add a NonThreadSafe member to this class and use it. - -class CacheInvalidationPacketHandler { - public: - // Starts routing packets from |invalidation_client| through - // |xmpp_client|. |invalidation_client| must not already be routing - // packets through something. Does not take ownership of - // |xmpp_client| or |invalidation_client|. - CacheInvalidationPacketHandler( - buzz::XmppClient* xmpp_client, - invalidation::InvalidationClient* invalidation_client); - - // Makes the invalidation client passed into the constructor not - // route packets through the XMPP client passed into the constructor - // anymore. - ~CacheInvalidationPacketHandler(); - - private: - void HandleOutboundPacket( - invalidation::NetworkEndpoint* const& network_endpoint); - - void HandleInboundPacket(const std::string& packet); - - buzz::XmppClient* xmpp_client_; - invalidation::InvalidationClient* invalidation_client_; - - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler); -}; - -} // namespace sync_notifier - -#endif // CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.cc b/chrome/browser/sync/notifier/chrome_invalidation_client.cc deleted file mode 100644 index 3191b8d..0000000 --- a/chrome/browser/sync/notifier/chrome_invalidation_client.cc +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" - -#include <string> -#include <vector> - -#include "base/logging.h" -#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" -#include "chrome/browser/sync/notifier/invalidation_util.h" -#include "google/cacheinvalidation/invalidation-client-impl.h" - -namespace sync_notifier { - -ChromeInvalidationClient::ChromeInvalidationClient() { - DCHECK(non_thread_safe_.CalledOnValidThread()); -} - -ChromeInvalidationClient::~ChromeInvalidationClient() { - DCHECK(non_thread_safe_.CalledOnValidThread()); - Stop(); -} - -void ChromeInvalidationClient::Start( - const std::string& app_name, - invalidation::InvalidationListener* listener, - buzz::XmppClient* xmpp_client) { - DCHECK(non_thread_safe_.CalledOnValidThread()); - Stop(); - - chrome_system_resources_.StartScheduler(); - - invalidation::ClientType client_type; - client_type.set_type(invalidation::ClientType::CHROME_SYNC); - invalidation::ClientConfig ticl_config; - invalidation_client_.reset( - new invalidation::InvalidationClientImpl( - &chrome_system_resources_, client_type, app_name, listener, - ticl_config)); - cache_invalidation_packet_handler_.reset( - new CacheInvalidationPacketHandler(xmpp_client, - invalidation_client_.get())); -} - -void ChromeInvalidationClient::Stop() { - DCHECK(non_thread_safe_.CalledOnValidThread()); - if (!invalidation_client_.get()) { - DCHECK(!cache_invalidation_packet_handler_.get()); - return; - } - - chrome_system_resources_.StopScheduler(); - - cache_invalidation_packet_handler_.reset(); - invalidation_client_.reset(); -} - -void ChromeInvalidationClient::Register( - const invalidation::ObjectId& oid, - invalidation::RegistrationCallback* callback) { - DCHECK(non_thread_safe_.CalledOnValidThread()); - invalidation_client_->Register(oid, callback); -} - -void ChromeInvalidationClient::Unregister( - const invalidation::ObjectId& oid, - invalidation::RegistrationCallback* callback) { - DCHECK(non_thread_safe_.CalledOnValidThread()); - invalidation_client_->Unregister(oid, callback); -} - -} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.h b/chrome/browser/sync/notifier/chrome_invalidation_client.h deleted file mode 100644 index e3f67b1..0000000 --- a/chrome/browser/sync/notifier/chrome_invalidation_client.h +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// A simple wrapper around invalidation::InvalidationClient that -// handles all the startup/shutdown details and hookups. - -#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ -#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ - -#include <string> - -#include "base/basictypes.h" -#include "base/non_thread_safe.h" -#include "base/scoped_ptr.h" -#include "chrome/browser/sync/notifier/chrome_system_resources.h" -#include "google/cacheinvalidation/invalidation-client.h" - -namespace buzz { -class XmppClient; -} // namespace - -namespace sync_notifier { - -class CacheInvalidationPacketHandler; - -// TODO(akalin): Hook this up to a NetworkChangeNotifier so we can -// properly notify invalidation_client_. - -class ChromeInvalidationClient { - public: - ChromeInvalidationClient(); - - ~ChromeInvalidationClient(); - - // Does not take ownership of |listener| nor |xmpp_client|. - void Start( - const std::string& app_name, - invalidation::InvalidationListener* listener, - buzz::XmppClient* xmpp_client); - - void Stop(); - - // The following functions must only be called between calls to - // Start() and Stop(). See invalidation-client.h for documentation. - - void Register(const invalidation::ObjectId& oid, - invalidation::RegistrationCallback* callback); - - void Unregister(const invalidation::ObjectId& oid, - invalidation::RegistrationCallback* callback); - - private: - NonThreadSafe non_thread_safe_; - ChromeSystemResources chrome_system_resources_; - scoped_ptr<invalidation::InvalidationClient> invalidation_client_; - scoped_ptr<CacheInvalidationPacketHandler> - cache_invalidation_packet_handler_; - - DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationClient); -}; - -} // namespace sync_notifier - -#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc deleted file mode 100644 index 0ec0210..0000000 --- a/chrome/browser/sync/notifier/chrome_system_resources.cc +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "chrome/browser/sync/notifier/chrome_system_resources.h" - -#include <cstdlib> -#include <string> - -#include "base/logging.h" -#include "base/message_loop.h" -#include "base/string_util.h" -#include "base/task.h" -#include "chrome/browser/sync/notifier/invalidation_util.h" - -namespace sync_notifier { - -ChromeSystemResources::ChromeSystemResources() - : scheduler_active_(false) {} - -ChromeSystemResources::~ChromeSystemResources() { - DCHECK(!scheduler_active_); -} - -invalidation::Time ChromeSystemResources::current_time() { - return base::Time::Now(); -} - -void ChromeSystemResources::StartScheduler() { - DCHECK(!scheduler_active_); - scheduler_active_ = true; -} - -void ChromeSystemResources::StopScheduler() { - DCHECK(scheduler_active_); - scheduler_active_ = false; -} - -void ChromeSystemResources::ScheduleWithDelay( - invalidation::TimeDelta delay, - invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - DCHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostDelayedTask( - FROM_HERE, - NewRunnableFunction(&RunAndDeleteClosure, task), - delay.InMillisecondsRoundedUp()); -} - -void ChromeSystemResources::ScheduleImmediately( - invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - DCHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostTask( - FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); -} - -void ChromeSystemResources::Log( - LogLevel level, const char* file, int line, - const char* format, ...) { - va_list ap; - va_start(ap, format); - std::string result; - StringAppendV(&result, format, ap); - logging::LogMessage(file, line).stream() << result; - va_end(ap); -} - -} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/chrome_system_resources.h b/chrome/browser/sync/notifier/chrome_system_resources.h deleted file mode 100644 index a8a98df..0000000 --- a/chrome/browser/sync/notifier/chrome_system_resources.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// Simple system resources class that uses the current message loop -// for scheduling. Assumes the current message loop is already -// running. - -#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ -#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ - -#include "google/cacheinvalidation/invalidation-client.h" - -namespace sync_notifier { - -// TODO(akalin): Add a NonThreadSafe member to this class and use it. - -class ChromeSystemResources : public invalidation::SystemResources { - public: - ChromeSystemResources(); - - ~ChromeSystemResources(); - - virtual invalidation::Time current_time(); - - virtual void StartScheduler(); - - // We assume that the current message loop is stopped shortly after - // this is called, i.e. that any in-flight delayed tasks won't get - // run. - // - // TODO(akalin): Make sure that the above actually holds. Use a - // ScopedRunnableMethodFactory for better safety. - virtual void StopScheduler(); - - virtual void ScheduleWithDelay(invalidation::TimeDelta delay, - invalidation::Closure* task); - - virtual void ScheduleImmediately(invalidation::Closure* task); - - virtual void Log(LogLevel level, const char* file, int line, - const char* format, ...); - - private: - bool scheduler_active_; -}; - -} // namespace sync_notifier - -#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ diff --git a/chrome/browser/sync/notifier/invalidation_util.cc b/chrome/browser/sync/notifier/invalidation_util.cc deleted file mode 100644 index 2683fae..0000000 --- a/chrome/browser/sync/notifier/invalidation_util.cc +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "chrome/browser/sync/notifier/invalidation_util.h" - -#include <sstream> - -namespace sync_notifier { - -void RunAndDeleteClosure(invalidation::Closure* task) { - task->Run(); - delete task; -} - -// We need to write our own protobuf to string functions because we -// use LITE_RUNTIME, which doesn't support DebugString(). - -std::string ObjectIdToString( - const invalidation::ObjectId& object_id) { - std::stringstream ss; - ss << "{ "; - ss << "name: " << object_id.name().string_value() << ", "; - ss << "source: " << object_id.source(); - ss << " }"; - return ss.str(); -} - -std::string StatusToString( - const invalidation::Status& status) { - std::stringstream ss; - ss << "{ "; - ss << "code: " << status.code() << ", "; - ss << "description: " << status.description(); - ss << " }"; - return ss.str(); -} - -std::string InvalidationToString( - const invalidation::Invalidation& invalidation) { - std::stringstream ss; - ss << "{ "; - ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; - ss << "version: " << invalidation.version() << ", "; - ss << "components: { "; - const invalidation::ComponentStampLog& component_stamp_log = - invalidation.component_stamp_log(); - for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { - const invalidation::ComponentStamp& component_stamp = - component_stamp_log.stamps(i); - ss << "component: " << component_stamp.component() << ", "; - ss << "time: " << component_stamp.time() << ", "; - } - ss << " }"; - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateToString( - const invalidation::RegistrationUpdate& update) { - std::stringstream ss; - ss << "{ "; - ss << "type: " << update.type() << ", "; - ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; - ss << "version: " << update.version() << ", "; - ss << "sequence_number: " << update.sequence_number(); - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateResultToString( - const invalidation::RegistrationUpdateResult& update_result) { - std::stringstream ss; - ss << "{ "; - ss << "operation: " - << RegistrationUpdateToString(update_result.operation()) << ", "; - ss << "status: " << StatusToString(update_result.status()); - ss << " }"; - return ss.str(); -} - -} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/invalidation_util.h b/chrome/browser/sync/notifier/invalidation_util.h deleted file mode 100644 index 3ca5fe9..0000000 --- a/chrome/browser/sync/notifier/invalidation_util.h +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// Various utilities for dealing with invalidation data types. - -#ifndef CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ -#define CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ - -#include <string> - -#include "google/cacheinvalidation/invalidation-client.h" - -namespace sync_notifier { - -void RunAndDeleteClosure(invalidation::Closure* task); - -// We need to write our own protobuf-to-string functions because we -// use LITE_RUNTIME, which doesn't support DebugString(). - -std::string ObjectIdToString(const invalidation::ObjectId& object_id); - -std::string StatusToString(const invalidation::Status& status); - -std::string InvalidationToString( - const invalidation::Invalidation& invalidation); - -std::string RegistrationUpdateToString( - const invalidation::RegistrationUpdate& update); - -std::string RegistrationUpdateResultToString( - const invalidation::RegistrationUpdateResult& update_result); - -} // namespace sync_notifier - -#endif // CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ diff --git a/chrome/browser/sync/notifier/server_notifier_thread.cc b/chrome/browser/sync/notifier/server_notifier_thread.cc deleted file mode 100644 index fbfc999..0000000 --- a/chrome/browser/sync/notifier/server_notifier_thread.cc +++ /dev/null @@ -1,201 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "chrome/browser/sync/notifier/server_notifier_thread.h" - -#include <string> -#include <vector> - -#include "base/logging.h" -#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" -#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" -#include "chrome/browser/sync/notifier/chrome_system_resources.h" -#include "chrome/browser/sync/notifier/invalidation_util.h" -#include "chrome/common/net/notifier/listener/notification_defines.h" -#include "google/cacheinvalidation/invalidation-client-impl.h" -#include "talk/xmpp/jid.h" - -namespace sync_notifier { - -ServerNotifierThread::ServerNotifierThread( - chrome_common_net::NetworkChangeNotifierThread* - network_change_notifier_thread) - : notifier::MediatorThreadImpl(network_change_notifier_thread) {} - -ServerNotifierThread::~ServerNotifierThread() {} - -void ServerNotifierThread::ListenForUpdates() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &ServerNotifierThread::StartInvalidationListener)); -} - -void ServerNotifierThread::SubscribeForUpdates( - const std::vector<std::string>& subscribed_services_list) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod( - this, &ServerNotifierThread::RegisterTypesAndSignalSubscribed)); -} - -void ServerNotifierThread::Logout() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &ServerNotifierThread::StopInvalidationListener)); - MediatorThreadImpl::Logout(); -} - -void ServerNotifierThread::SendNotification( - const OutgoingNotificationData& data) { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - NOTREACHED() << "Shouldn't send notifications if " - << "ServerNotifierThread is used"; -} - -void ServerNotifierThread::Invalidate( - const invalidation::Invalidation& invalidation, - invalidation::Closure* callback) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); - // Signal notification only for the invalidated types. - parent_message_loop_->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &ServerNotifierThread::SignalIncomingNotification)); - RunAndDeleteClosure(callback); - // A real implementation would respond to the invalidation for the - // given object (e.g., refetch the invalidated object). -} - -void ServerNotifierThread::InvalidateAll( - invalidation::Closure* callback) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "InvalidateAll"; - parent_message_loop_->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &ServerNotifierThread::SignalIncomingNotification)); - RunAndDeleteClosure(callback); -} - -void ServerNotifierThread::AllRegistrationsLost( - invalidation::Closure* callback) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "AllRegistrationsLost; reregistering"; - RegisterTypes(); - RunAndDeleteClosure(callback); -} - -void ServerNotifierThread::RegistrationLost( - const invalidation::ObjectId& object_id, - invalidation::Closure* callback) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "RegistrationLost; reregistering: " - << ObjectIdToString(object_id); - RegisterTypes(); - RunAndDeleteClosure(callback); -} - -void ServerNotifierThread::StartInvalidationListener() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - - StopInvalidationListener(); - chrome_invalidation_client_.reset(new ChromeInvalidationClient()); - - // TODO(akalin): If we can figure out a unique per-session key that - // is preserved by the server, we can use that instead of kAppName. - // What this buys us is that we then won't receive any notifications - // that were generated by ourselves. - const std::string kAppName = "cc_sync_listen_notifications"; - chrome_invalidation_client_->Start(kAppName, this, xmpp_client()); -} - -void ServerNotifierThread::RegisterTypesAndSignalSubscribed() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - RegisterTypes(); - parent_message_loop_->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &ServerNotifierThread::SignalSubscribed)); -} - -void ServerNotifierThread::RegisterTypes() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - - // TODO(akalin): This is a giant hack! Make this configurable. Add - // a mapping to/from ModelType. - std::vector<std::string> data_types; - data_types.push_back("AUTOFILL"); - data_types.push_back("BOOKMARK"); - data_types.push_back("EXTENSION"); - data_types.push_back("PASSWORD"); - data_types.push_back("THEME"); - data_types.push_back("TYPED_URL"); - data_types.push_back("PREFERENCE"); - - std::vector<invalidation::ObjectId> object_ids; - - for (std::vector<std::string>::const_iterator it = data_types.begin(); - it != data_types.end(); ++it) { - invalidation::ObjectId object_id; - object_id.mutable_name()->set_string_value(*it); - object_id.set_source(invalidation::ObjectId::CHROME_SYNC); - object_ids.push_back(object_id); - } - - for (std::vector<invalidation::ObjectId>::const_iterator it = - object_ids.begin(); it != object_ids.end(); ++it) { - chrome_invalidation_client_->Register( - *it, - invalidation::NewPermanentCallback( - this, &ServerNotifierThread::RegisterCallback)); - } -} - -void ServerNotifierThread::RegisterCallback( - const invalidation::RegistrationUpdateResult& result) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - // TODO(akalin): Do something meaningful here. - LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); -} - -void ServerNotifierThread::SignalSubscribed() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - if (delegate_) { - delegate_->OnSubscriptionStateChange(true); - } -} - -void ServerNotifierThread::SignalIncomingNotification() { - DCHECK_EQ(MessageLoop::current(), parent_message_loop_); - if (delegate_) { - // TODO(akalin): Fill this in with something meaningful. - IncomingNotificationData notification_data; - delegate_->OnIncomingNotification(notification_data); - } -} - -void ServerNotifierThread::StopInvalidationListener() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); - - if (chrome_invalidation_client_.get()) { - // TODO(akalin): Need to do unregisters here? - chrome_invalidation_client_->Stop(); - } - chrome_invalidation_client_.reset(); -} - -} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/server_notifier_thread.h b/chrome/browser/sync/notifier/server_notifier_thread.h deleted file mode 100644 index 9f079e2..0000000 --- a/chrome/browser/sync/notifier/server_notifier_thread.h +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. -// -// This class is the (hackish) way to use the XMPP parts of -// MediatorThread for server-issued notifications. -// -// TODO(akalin): Decomp MediatorThread into an XMPP service part and a -// notifications-specific part and use the XMPP service part for -// server-issued notifications. - -#ifndef CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ -#define CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ - -#include <string> -#include <vector> - -#include "base/scoped_ptr.h" -#include "base/task.h" -#include "chrome/common/net/notifier/listener/mediator_thread_impl.h" -#include "google/cacheinvalidation/invalidation-client.h" - -namespace chrome_common_net { -class NetworkChangeNotifierThread; -} // namespace chrome_common_net - -namespace sync_notifier { - -class ChromeInvalidationClient; - -class ServerNotifierThread - : public notifier::MediatorThreadImpl, - public invalidation::InvalidationListener { - public: - explicit ServerNotifierThread( - chrome_common_net::NetworkChangeNotifierThread* - network_change_notifier_thread); - - virtual ~ServerNotifierThread(); - - // Overridden to start listening to server notifications. - virtual void ListenForUpdates(); - - // Overridden to immediately notify the delegate that subscriptions - // (i.e., notifications) are on. Must be called only after a call - // to ListenForUpdates(). - virtual void SubscribeForUpdates( - const std::vector<std::string>& subscribed_services_list); - - // Overridden to also stop listening to server notifications. - virtual void Logout(); - - // Must not be called. - virtual void SendNotification(const OutgoingNotificationData& data); - - // invalidation::InvalidationListener implementation. - - virtual void Invalidate(const invalidation::Invalidation& invalidation, - invalidation::Closure* callback); - - virtual void InvalidateAll(invalidation::Closure* callback); - - virtual void AllRegistrationsLost(invalidation::Closure* callback); - - virtual void RegistrationLost(const invalidation::ObjectId& object_id, - invalidation::Closure* callback); - - private: - // Posted to the worker thread by ListenForUpdates(). - void StartInvalidationListener(); - - // Posted to the worker thread by SubscribeForUpdates(). - void RegisterTypesAndSignalSubscribed(); - - // Register the sync types that we're interested in getting - // notifications for. - void RegisterTypes(); - - // Called when we get a registration response back. - void RegisterCallback( - const invalidation::RegistrationUpdateResult& result); - - // Signal to the delegate that we're subscribed. - void SignalSubscribed(); - - // Signal to the delegate that we have an incoming notification. - void SignalIncomingNotification(); - - // Called by StartInvalidationListener() and posted to the worker - // thread by Stop(). - void StopInvalidationListener(); - - scoped_ptr<ChromeInvalidationClient> chrome_invalidation_client_; -}; - -} // namespace sync_notifier - -DISABLE_RUNNABLE_METHOD_REFCOUNT(sync_notifier::ServerNotifierThread); - -#endif // CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc index 2e762ac..899798ef 100644 --- a/chrome/browser/sync/tools/sync_listen_notifications.cc +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc @@ -2,10 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include <cstdlib> #include <string> +#include <sstream> #include <vector> #include "base/at_exit.h" +#include "base/base64.h" #include "base/command_line.h" #include "base/logging.h" #include "base/message_loop.h" @@ -13,17 +16,15 @@ #include "base/string_util.h" #include "base/task.h" #include "chrome/browser/sync/notification_method.h" -#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" -#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" -#include "chrome/browser/sync/notifier/chrome_system_resources.h" -#include "chrome/browser/sync/notifier/invalidation_util.h" #include "chrome/browser/sync/sync_constants.h" #include "chrome/common/net/notifier/base/task_pump.h" #include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h" #include "chrome/common/net/notifier/listener/listen_task.h" #include "chrome/common/net/notifier/listener/notification_constants.h" #include "chrome/common/net/notifier/listener/subscribe_task.h" +#include "chrome/common/net/notifier/listener/xml_element_util.h" #include "google/cacheinvalidation/invalidation-client.h" +#include "google/cacheinvalidation/invalidation-client-impl.h" #include "talk/base/cryptstring.h" #include "talk/base/logging.h" #include "talk/base/sigslot.h" @@ -193,6 +194,148 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate { } }; +// TODO(akalin): Move all the cache-invalidation-related stuff below +// out of this file once we have a real Chrome-integrated +// implementation. + +void RunAndDeleteClosure(invalidation::Closure* task) { + task->Run(); + delete task; +} + +// Simple system resources class that uses the current message loop +// for scheduling. Assumes the current message loop is already +// running. +class ChromeSystemResources : public invalidation::SystemResources { + public: + ChromeSystemResources() : scheduler_active_(false) {} + + ~ChromeSystemResources() { + CHECK(!scheduler_active_); + } + + virtual invalidation::Time current_time() { + return base::Time::Now(); + } + + virtual void StartScheduler() { + CHECK(!scheduler_active_); + scheduler_active_ = true; + } + + // We assume that the current message loop is stopped shortly after + // this is called, i.e. that any in-flight delayed tasks won't get + // run. + // + // TODO(akalin): Make sure that the above actually holds. + virtual void StopScheduler() { + CHECK(scheduler_active_); + scheduler_active_ = false; + } + + virtual void ScheduleWithDelay(invalidation::TimeDelta delay, + invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + CHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + NewRunnableFunction(&RunAndDeleteClosure, task), + delay.InMillisecondsRoundedUp()); + } + + virtual void ScheduleImmediately(invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + CHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostTask( + FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); + } + + virtual void Log(LogLevel level, const char* file, int line, + const char* format, ...) { + va_list ap; + va_start(ap, format); + std::string result; + StringAppendV(&result, format, ap); + logging::LogMessage(file, line).stream() << result; + va_end(ap); + } + + private: + bool scheduler_active_; +}; + +// We need to write our own protobuf to string functions because we +// use LITE_RUNTIME, which doesn't support DebugString(). + +std::string ObjectIdToString( + const invalidation::ObjectId& object_id) { + std::stringstream ss; + ss << "{ "; + ss << "name: " << object_id.name().string_value() << ", "; + ss << "source: " << object_id.source(); + ss << " }"; + return ss.str(); +} + +std::string StatusToString( + const invalidation::Status& status) { + std::stringstream ss; + ss << "{ "; + ss << "code: " << status.code() << ", "; + ss << "description: " << status.description(); + ss << " }"; + return ss.str(); +} + +std::string InvalidationToString( + const invalidation::Invalidation& invalidation) { + std::stringstream ss; + ss << "{ "; + ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; + ss << "version: " << invalidation.version() << ", "; + ss << "components: { "; + const invalidation::ComponentStampLog& component_stamp_log = + invalidation.component_stamp_log(); + for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { + const invalidation::ComponentStamp& component_stamp = + component_stamp_log.stamps(i); + ss << "component: " << component_stamp.component() << ", "; + ss << "time: " << component_stamp.time() << ", "; + } + ss << " }"; + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateToString( + const invalidation::RegistrationUpdate& update) { + std::stringstream ss; + ss << "{ "; + ss << "type: " << update.type() << ", "; + ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; + ss << "version: " << update.version() << ", "; + ss << "sequence_number: " << update.sequence_number(); + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateResultToString( + const invalidation::RegistrationUpdateResult& update_result) { + std::stringstream ss; + ss << "{ "; + ss << "operation: " + << RegistrationUpdateToString(update_result.operation()) << ", "; + ss << "status: " << StatusToString(update_result.status()); + ss << " }"; + return ss.str(); +} + // The actual listener for sync notifications from the cache // invalidation service. class ChromeInvalidationListener @@ -203,9 +346,8 @@ class ChromeInvalidationListener virtual void Invalidate(const invalidation::Invalidation& invalidation, invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "Invalidate: " - << sync_notifier::InvalidationToString(invalidation); - sync_notifier::RunAndDeleteClosure(callback); + LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); + RunAndDeleteClosure(callback); // A real implementation would respond to the invalidation for the // given object (e.g., refetch the invalidated object). } @@ -213,7 +355,7 @@ class ChromeInvalidationListener virtual void InvalidateAll(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "InvalidateAll"; - sync_notifier::RunAndDeleteClosure(callback); + RunAndDeleteClosure(callback); // A real implementation would loop over the current registered // data types and send notifications for those. } @@ -221,7 +363,7 @@ class ChromeInvalidationListener virtual void AllRegistrationsLost(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "AllRegistrationsLost"; - sync_notifier::RunAndDeleteClosure(callback); + RunAndDeleteClosure(callback); // A real implementation would try to re-register for all // registered data types. } @@ -230,8 +372,8 @@ class ChromeInvalidationListener invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "RegistrationLost: " - << sync_notifier::ObjectIdToString(object_id); - sync_notifier::RunAndDeleteClosure(callback); + << ObjectIdToString(object_id); + RunAndDeleteClosure(callback); // A real implementation would try to re-register for this // particular data type. } @@ -240,6 +382,199 @@ class ChromeInvalidationListener DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener); }; +static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); +static const buzz::QName kQnTangoIqPacketContent( + "google:tango", "content"); + +// A task that listens for ClientInvalidation messages and calls the +// given callback on them. +class CacheInvalidationListenTask : public buzz::XmppTask { + public: + // Takes ownership of callback. + CacheInvalidationListenTask(Task* parent, + Callback1<const std::string&>::Type* callback) + : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} + virtual ~CacheInvalidationListenTask() {} + + virtual int ProcessStart() { + LOG(INFO) << "CacheInvalidationListenTask started"; + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationListenTask blocked"; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationListenTask response received"; + std::string data; + if (GetTangoIqPacketData(stanza, &data)) { + callback_->Run(data); + } else { + LOG(ERROR) << "Could not get packet data"; + } + // Acknowledge receipt of the iq to the buzz server. + // TODO(akalin): Send an error response for malformed packets. + scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); + SendStanza(response_stanza.get()); + return STATE_RESPONSE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (IsValidTangoIqPacket(stanza)) { + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + LOG(INFO) << "Stanza skipped"; + return false; + } + + private: + bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { + return + (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && + (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); + } + + bool GetTangoIqPacketData(const buzz::XmlElement* stanza, + std::string* data) { + DCHECK(IsValidTangoIqPacket(stanza)); + const buzz::XmlElement* tango_iq_packet = + stanza->FirstNamed(kQnTangoIqPacket); + if (!tango_iq_packet) { + LOG(ERROR) << "Could not find tango IQ packet element"; + return false; + } + const buzz::XmlElement* tango_iq_packet_content = + tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); + if (!tango_iq_packet_content) { + LOG(ERROR) << "Could not find tango IQ packet content element"; + return false; + } + *data = tango_iq_packet_content->BodyText(); + return true; + } + + scoped_ptr<Callback1<const std::string&>::Type> callback_; + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); +}; + +// A task that sends a single outbound ClientInvalidation message. +class CacheInvalidationSendMessageTask : public buzz::XmppTask { + public: + CacheInvalidationSendMessageTask(Task* parent, + const buzz::Jid& to_jid, + const std::string& msg) + : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), + to_jid_(to_jid), msg_(msg) {} + virtual ~CacheInvalidationSendMessageTask() {} + + virtual int ProcessStart() { + scoped_ptr<buzz::XmlElement> stanza( + MakeTangoIqPacket(to_jid_, task_id(), msg_)); + LOG(INFO) << "Sending message: " + << notifier::XmlElementToString(*stanza.get()); + if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { + LOG(INFO) << "Error when sending message"; + return STATE_ERROR; + } + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationSendMessageTask response received: " + << notifier::XmlElementToString(*stanza); + return STATE_DONE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (!MatchResponseIq(stanza, to_jid_, task_id())) { + LOG(INFO) << "Stanza skipped"; + return false; + } + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + + private: + static buzz::XmlElement* MakeTangoIqPacket( + const buzz::Jid& to_jid, + const std::string& task_id, + const std::string& msg) { + buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); + buzz::XmlElement* tango_iq_packet = + new buzz::XmlElement(kQnTangoIqPacket, true); + iq->AddElement(tango_iq_packet); + buzz::XmlElement* tango_iq_packet_content = + new buzz::XmlElement(kQnTangoIqPacketContent, true); + tango_iq_packet->AddElement(tango_iq_packet_content); + tango_iq_packet_content->SetBodyText(msg); + return iq; + } + + buzz::Jid to_jid_; + std::string msg_; + + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); +}; + +// Class that handles the details of sending and receiving client +// invalidation packets. +class CacheInvalidationPacketHandler { + public: + CacheInvalidationPacketHandler( + buzz::XmppClient* xmpp_client, + invalidation::NetworkEndpoint* network_endpoint, + const buzz::Jid& to_jid) + : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint), + to_jid_(to_jid) { + // Owned by xmpp_client. + CacheInvalidationListenTask* listen_task = + new CacheInvalidationListenTask( + xmpp_client, NewCallback( + this, &CacheInvalidationPacketHandler::HandleInboundPacket)); + listen_task->Start(); + } + + void HandleOutboundPacket(invalidation::NetworkEndpoint* const & + network_endpoint) { + CHECK_EQ(network_endpoint, network_endpoint_); + invalidation::string message; + network_endpoint->TakeOutboundMessage(&message); + std::string encoded_message; + CHECK(base::Base64Encode(message, &encoded_message)); + // Owned by xmpp_client. + CacheInvalidationSendMessageTask* send_message_task = + new CacheInvalidationSendMessageTask(xmpp_client_, + to_jid_, + encoded_message); + send_message_task->Start(); + } + + private: + void HandleInboundPacket(const std::string& packet) { + std::string decoded_message; + CHECK(base::Base64Decode(packet, &decoded_message)); + network_endpoint_->HandleInboundMessage(decoded_message); + } + + buzz::XmppClient* xmpp_client_; + invalidation::NetworkEndpoint* network_endpoint_; + buzz::Jid to_jid_; +}; + // Delegate for server-side notifications. class CacheInvalidationNotifierDelegate : public XmppNotificationClient::Delegate { @@ -266,15 +601,36 @@ class CacheInvalidationNotifierDelegate buzz::XmppClient* xmpp_client) { LOG(INFO) << "Logged in"; + chrome_system_resources_.StartScheduler(); + + invalidation::ClientType client_type; + client_type.set_type(invalidation::ClientType::CHROME_SYNC); // TODO(akalin): app_name should be per-client unique. - const std::string kAppName = "cc_sync_listen_notifications"; - chrome_invalidation_client_.Start(kAppName, - &chrome_invalidation_listener_, - xmpp_client); + std::string app_name = "cc_sync_listen_notifications"; + invalidation::ClientConfig ticl_config; + invalidation_client_.reset( + new invalidation::InvalidationClientImpl( + &chrome_system_resources_, client_type, + app_name, &chrome_invalidation_listener_, ticl_config)); + + invalidation::NetworkEndpoint* network_endpoint = + invalidation_client_->network_endpoint(); + // TODO(akalin): Make tango jid configurable (so we can use + // staging). + buzz::Jid to_jid("tango@bot.talk.google.com/PROD"); + packet_handler_.reset( + new CacheInvalidationPacketHandler(xmpp_client, + network_endpoint, + to_jid)); + + network_endpoint->RegisterOutboundListener( + invalidation::NewPermanentCallback( + packet_handler_.get(), + &CacheInvalidationPacketHandler::HandleOutboundPacket)); for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - chrome_invalidation_client_.Register( + invalidation_client_->Register( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); @@ -287,39 +643,43 @@ class CacheInvalidationNotifierDelegate // TODO(akalin): Figure out the correct place to put this. for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - chrome_invalidation_client_.Unregister( + invalidation_client_->Unregister( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); } - chrome_invalidation_client_.Stop(); + packet_handler_.reset(); + invalidation_client_.reset(); + + chrome_system_resources_.StopScheduler(); } virtual void OnError(buzz::XmppEngine::Error error, int subcode) { LOG(INFO) << "Error: " << error << ", subcode: " << subcode; + packet_handler_.reset(); + invalidation_client_.reset(); - // TODO(akalin): Figure out whether we should unregister here, - // too. - chrome_invalidation_client_.Stop(); + // TODO(akalin): Figure out whether we should stop the scheduler + // here. } private: void RegisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Registered: " - << sync_notifier::RegistrationUpdateResultToString(result); + LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); } void UnregisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Unregistered: " - << sync_notifier::RegistrationUpdateResultToString(result); + LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result); } std::vector<invalidation::ObjectId> object_ids_; + ChromeSystemResources chrome_system_resources_; ChromeInvalidationListener chrome_invalidation_listener_; - sync_notifier::ChromeInvalidationClient chrome_invalidation_client_; + scoped_ptr<invalidation::InvalidationClient> invalidation_client_; + scoped_ptr<CacheInvalidationPacketHandler> packet_handler_; }; } // namespace diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp index 3343035..792bfbe 100644 --- a/chrome/browser/sync/tools/sync_tools.gyp +++ b/chrome/browser/sync/tools/sync_tools.gyp @@ -21,7 +21,6 @@ 'dependencies': [ '<(DEPTH)/base/base.gyp:base', '<(DEPTH)/chrome/chrome.gyp:notifier', - '<(DEPTH)/chrome/chrome.gyp:sync_notifier', '<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation', '<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle', ], |