diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-23 00:16:45 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-23 00:16:45 +0000 |
commit | 3d47a26aed8fd135280d4924d9e9bb36cb624946 (patch) | |
tree | f515a3604302e123d67a04c80079df4da6fc6b35 /chrome/browser/sync | |
parent | 51e413c3e9f11186b5026123a4610dd1dc005416 (diff) | |
download | chromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.zip chromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.tar.gz chromium_src-3d47a26aed8fd135280d4924d9e9bb36cb624946.tar.bz2 |
Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use
--sync-notification-method=server to turn on).
BUG=34647
TEST=manually
Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50479
Review URL: http://codereview.chromium.org/2827014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50550 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r-- | chrome/browser/sync/engine/syncapi.cc | 42 | ||||
-rw-r--r-- | chrome/browser/sync/notification_method.cc | 5 | ||||
-rw-r--r-- | chrome/browser/sync/notification_method.h | 24 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc | 243 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/cache_invalidation_packet_handler.h | 58 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_invalidation_client.cc | 74 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_invalidation_client.h | 65 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_system_resources.cc | 75 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/chrome_system_resources.h | 50 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/invalidation_util.cc | 82 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/invalidation_util.h | 36 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/server_notifier_thread.cc | 201 | ||||
-rw-r--r-- | chrome/browser/sync/notifier/server_notifier_thread.h | 100 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_listen_notifications.cc | 412 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_tools.gyp | 1 |
15 files changed, 1064 insertions, 404 deletions
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc index 20d6a2c..c242d47 100644 --- a/chrome/browser/sync/engine/syncapi.cc +++ b/chrome/browser/sync/engine/syncapi.cc @@ -32,6 +32,7 @@ #include "chrome/browser/sync/engine/net/syncapi_server_connection_manager.h" #include "chrome/browser/sync/engine/syncer.h" #include "chrome/browser/sync/engine/syncer_thread.h" +#include "chrome/browser/sync/notifier/server_notifier_thread.h" #include "chrome/browser/sync/protocol/autofill_specifics.pb.h" #include "chrome/browser/sync/protocol/bookmark_specifics.pb.h" #include "chrome/browser/sync/protocol/extension_specifics.pb.h" @@ -1285,12 +1286,23 @@ bool SyncManager::SyncInternal::Init( // jank if we try to shut down sync. Fix this. connection_manager()->CheckServerReachable(); + // NOTIFICATION_SERVER uses a substantially different notification + // method, so it has its own MediatorThread implementation. + // Everything else just uses MediatorThreadImpl. + notifier::MediatorThread* mediator_thread = + (notification_method == browser_sync::NOTIFICATION_SERVER) ? + static_cast<notifier::MediatorThread*>( + new sync_notifier::ServerNotifierThread( + network_change_notifier_thread)) : + static_cast<notifier::MediatorThread*>( + new notifier::MediatorThreadImpl(network_change_notifier_thread)); const bool kInitializeSsl = true; const bool kConnectImmediately = false; talk_mediator_.reset(new TalkMediatorImpl( - new notifier::MediatorThreadImpl(network_change_notifier_thread), + mediator_thread, kInitializeSsl, kConnectImmediately, invalidate_xmpp_auth_token)); - if (notification_method != browser_sync::NOTIFICATION_LEGACY) { + if (notification_method != browser_sync::NOTIFICATION_LEGACY && + notification_method != browser_sync::NOTIFICATION_SERVER) { if (notification_method == browser_sync::NOTIFICATION_TRANSITIONAL) { talk_mediator_->AddSubscribedServiceUrl( browser_sync::kSyncLegacyServiceUrl); @@ -1379,6 +1391,7 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { void SyncManager::SyncInternal::SendPendingXMPPNotification( bool new_pending_notification) { DCHECK_EQ(MessageLoop::current(), core_message_loop_); + DCHECK_NE(notification_method_, browser_sync::NOTIFICATION_SERVER); notification_pending_ = notification_pending_ || new_pending_notification; if (!notification_pending_) { LOG(INFO) << "Not sending notification: no pending notification"; @@ -1772,16 +1785,18 @@ void SyncManager::SyncInternal::HandleChannelEvent(const SyncerEvent& event) { observer_->OnSyncCycleCompleted(event.snapshot); } - // TODO(chron): Consider changing this back to track has_more_to_sync - // only notify peers if a successful commit has occurred. - bool new_pending_notification = - (event.snapshot->syncer_status.num_successful_commits > 0); - core_message_loop_->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &SyncManager::SyncInternal::SendPendingXMPPNotification, - new_pending_notification)); + if (notification_method_ != browser_sync::NOTIFICATION_SERVER) { + // TODO(chron): Consider changing this back to track has_more_to_sync + // only notify peers if a successful commit has occurred. + bool new_pending_notification = + (event.snapshot->syncer_status.num_successful_commits > 0); + core_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &SyncManager::SyncInternal::SendPendingXMPPNotification, + new_pending_notification)); + } } if (event.what_happened == SyncerEvent::PAUSED) { @@ -1904,7 +1919,8 @@ void SyncManager::SyncInternal::OnNotificationStateChange( if (syncer_thread()) { syncer_thread()->SetNotificationsEnabled(notifications_enabled); } - if (notifications_enabled) { + if ((notification_method_ != browser_sync::NOTIFICATION_SERVER) && + notifications_enabled) { // Send a notification as soon as subscriptions are on // (see http://code.google.com/p/chromium/issues/detail?id=38563 ). core_message_loop_->PostTask( diff --git a/chrome/browser/sync/notification_method.cc b/chrome/browser/sync/notification_method.cc index aec4a9c..9c00344 100644 --- a/chrome/browser/sync/notification_method.cc +++ b/chrome/browser/sync/notification_method.cc @@ -24,6 +24,9 @@ std::string NotificationMethodToString( case NOTIFICATION_NEW: return "NOTIFICATION_NEW"; break; + case NOTIFICATION_SERVER: + return "NOTIFICATION_SERVER"; + break; default: LOG(WARNING) << "Unknown value for notification method: " << notification_method; @@ -39,6 +42,8 @@ NotificationMethod StringToNotificationMethod(const std::string& str) { return NOTIFICATION_TRANSITIONAL; } else if (str == "new") { return NOTIFICATION_NEW; + } else if (str == "server") { + return NOTIFICATION_SERVER; } LOG(WARNING) << "Unknown notification method \"" << str << "\"; using method " diff --git a/chrome/browser/sync/notification_method.h b/chrome/browser/sync/notification_method.h index 569f8d2..4e1bcd1 100644 --- a/chrome/browser/sync/notification_method.h +++ b/chrome/browser/sync/notification_method.h @@ -10,7 +10,7 @@ namespace browser_sync { // This is the matrix for the interaction between clients with -// different notification methods: +// different notification methods (except for NOTIFICATION_SERVER): // // Listen // L T N @@ -24,6 +24,12 @@ namespace browser_sync { // will receive notifications from a client sending with the row // notification method. 'E' means means that the notification will be // an empty one, which may be dropped by the server in the future. +// +// As for NOTIFICATION_SERVER, server-issued notifications will also +// simulate a peer-issued notification, so that any client with +// NOTIFICATION_TRANSITIONAL or NOTIFICATION_NEW will be able to +// receive those, too. This support will be removed once everyone is +// on NOTIFICATION_SERVER. enum NotificationMethod { // Old, broken notification method. Works only if notification @@ -33,9 +39,17 @@ enum NotificationMethod { // notifications if the notification servers don't drop empty // notifications. NOTIFICATION_TRANSITIONAL, - // New, ideal notification method. Compatible only with - // transitional notifications. + // New notification method. Compatible only with transitional + // notifications. + // + // NOTE: "New" is kind of a misnomer, as it refers only to + // peer-issued notifications; the plan is to migrate everyone to + // using NOTIFICATION_SERVER. NOTIFICATION_NEW, + + // Server-issued notifications. Compatible only with transitional + // notifications. + NOTIFICATION_SERVER, }; extern const NotificationMethod kDefaultNotificationMethod; @@ -43,8 +57,8 @@ extern const NotificationMethod kDefaultNotificationMethod; std::string NotificationMethodToString( NotificationMethod notification_method); -// If the given string is not one of "legacy", "transitional", or -// "new", returns kDefaultNotificationMethod. +// If the given string is not one of "legacy", "transitional", "new", +// or "server", returns kDefaultNotificationMethod. NotificationMethod StringToNotificationMethod(const std::string& str); } // namespace browser_sync diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc new file mode 100644 index 0000000..dbc3d54 --- /dev/null +++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc @@ -0,0 +1,243 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" + +#include "base/base64.h" +#include "base/callback.h" +#include "base/logging.h" +#include "chrome/common/net/notifier/listener/xml_element_util.h" +#include "google/cacheinvalidation/invalidation-client.h" +#include "talk/xmpp/constants.h" +#include "talk/xmpp/jid.h" +#include "talk/xmpp/xmppclient.h" +#include "talk/xmpp/xmpptask.h" + +namespace sync_notifier { + +namespace { + +const char kBotJid[] = "tango@bot.talk.google.com"; + +// TODO(akalin): Eliminate use of 'tango' name. + +// TODO(akalin): Hash out details of tango IQ protocol. + +static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); +static const buzz::QName kQnTangoIqPacketContent( + "google:tango", "content"); + +// TODO(akalin): Move these task classes out so that they can be +// unit-tested. This'll probably be done easier once we consolidate +// all the packet sending/receiving classes. + +// A task that listens for ClientInvalidation messages and calls the +// given callback on them. +class CacheInvalidationListenTask : public buzz::XmppTask { + public: + // Takes ownership of callback. + CacheInvalidationListenTask(Task* parent, + Callback1<const std::string&>::Type* callback) + : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} + virtual ~CacheInvalidationListenTask() {} + + virtual int ProcessStart() { + LOG(INFO) << "CacheInvalidationListenTask started"; + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationListenTask blocked"; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationListenTask response received"; + std::string data; + if (GetTangoIqPacketData(stanza, &data)) { + callback_->Run(data); + } else { + LOG(ERROR) << "Could not get packet data"; + } + // Acknowledge receipt of the iq to the buzz server. + // TODO(akalin): Send an error response for malformed packets. + scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); + SendStanza(response_stanza.get()); + return STATE_RESPONSE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (IsValidTangoIqPacket(stanza)) { + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + LOG(INFO) << "Stanza skipped"; + return false; + } + + private: + bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { + return + (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && + (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); + } + + bool GetTangoIqPacketData(const buzz::XmlElement* stanza, + std::string* data) { + DCHECK(IsValidTangoIqPacket(stanza)); + const buzz::XmlElement* tango_iq_packet = + stanza->FirstNamed(kQnTangoIqPacket); + if (!tango_iq_packet) { + LOG(ERROR) << "Could not find tango IQ packet element"; + return false; + } + const buzz::XmlElement* tango_iq_packet_content = + tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); + if (!tango_iq_packet_content) { + LOG(ERROR) << "Could not find tango IQ packet content element"; + return false; + } + *data = tango_iq_packet_content->BodyText(); + return true; + } + + scoped_ptr<Callback1<const std::string&>::Type> callback_; + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); +}; + +// A task that sends a single outbound ClientInvalidation message. +class CacheInvalidationSendMessageTask : public buzz::XmppTask { + public: + CacheInvalidationSendMessageTask(Task* parent, + const buzz::Jid& to_jid, + const std::string& msg) + : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), + to_jid_(to_jid), msg_(msg) {} + virtual ~CacheInvalidationSendMessageTask() {} + + virtual int ProcessStart() { + scoped_ptr<buzz::XmlElement> stanza( + MakeTangoIqPacket(to_jid_, task_id(), msg_)); + LOG(INFO) << "Sending message: " + << notifier::XmlElementToString(*stanza.get()); + if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { + LOG(INFO) << "Error when sending message"; + return STATE_ERROR; + } + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationSendMessageTask response received: " + << notifier::XmlElementToString(*stanza); + // TODO(akalin): Handle errors here. + return STATE_DONE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (!MatchResponseIq(stanza, to_jid_, task_id())) { + LOG(INFO) << "Stanza skipped"; + return false; + } + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + + private: + static buzz::XmlElement* MakeTangoIqPacket( + const buzz::Jid& to_jid, + const std::string& task_id, + const std::string& msg) { + buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); + buzz::XmlElement* tango_iq_packet = + new buzz::XmlElement(kQnTangoIqPacket, true); + iq->AddElement(tango_iq_packet); + buzz::XmlElement* tango_iq_packet_content = + new buzz::XmlElement(kQnTangoIqPacketContent, true); + tango_iq_packet->AddElement(tango_iq_packet_content); + tango_iq_packet_content->SetBodyText(msg); + return iq; + } + + const buzz::Jid to_jid_; + std::string msg_; + + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); +}; + +} // namespace + +CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( + buzz::XmppClient* xmpp_client, + invalidation::InvalidationClient* invalidation_client) + : xmpp_client_(xmpp_client), + invalidation_client_(invalidation_client) { + CHECK(xmpp_client_); + CHECK(invalidation_client_); + invalidation::NetworkEndpoint* network_endpoint = + invalidation_client_->network_endpoint(); + CHECK(network_endpoint); + network_endpoint->RegisterOutboundListener( + invalidation::NewPermanentCallback( + this, + &CacheInvalidationPacketHandler::HandleOutboundPacket)); + // Owned by xmpp_client. + CacheInvalidationListenTask* listen_task = + new CacheInvalidationListenTask( + xmpp_client, NewCallback( + this, &CacheInvalidationPacketHandler::HandleInboundPacket)); + listen_task->Start(); +} + +CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { + invalidation::NetworkEndpoint* network_endpoint = + invalidation_client_->network_endpoint(); + CHECK(network_endpoint); + network_endpoint->RegisterOutboundListener(NULL); +} + +void CacheInvalidationPacketHandler::HandleOutboundPacket( + invalidation::NetworkEndpoint* const& network_endpoint) { + CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint()); + invalidation::string message; + network_endpoint->TakeOutboundMessage(&message); + std::string encoded_message; + if (!base::Base64Encode(message, &encoded_message)) { + LOG(ERROR) << "Could not base64-encode message to send: " + << message; + return; + } + // Owned by xmpp_client. + CacheInvalidationSendMessageTask* send_message_task = + new CacheInvalidationSendMessageTask(xmpp_client_, + buzz::Jid(kBotJid), + encoded_message); + send_message_task->Start(); +} + +void CacheInvalidationPacketHandler::HandleInboundPacket( + const std::string& packet) { + invalidation::NetworkEndpoint* network_endpoint = + invalidation_client_->network_endpoint(); + std::string decoded_message; + if (!base::Base64Decode(packet, &decoded_message)) { + LOG(ERROR) << "Could not base64-decode received message: " + << packet; + return; + } + network_endpoint->HandleInboundMessage(decoded_message); +} + +} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h new file mode 100644 index 0000000..2ab1b3d --- /dev/null +++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h @@ -0,0 +1,58 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Class that handles the details of sending and receiving client +// invalidation packets. + +#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ +#define CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ + +#include <string> + +#include "base/basictypes.h" +#include "talk/xmpp/jid.h" + +namespace buzz { +class XmppClient; +} // namespace buzz + +namespace invalidation { +class InvalidationClient; +class NetworkEndpoint; +} // namespace invalidation + +namespace sync_notifier { + +// TODO(akalin): Add a NonThreadSafe member to this class and use it. + +class CacheInvalidationPacketHandler { + public: + // Starts routing packets from |invalidation_client| through + // |xmpp_client|. |invalidation_client| must not already be routing + // packets through something. Does not take ownership of + // |xmpp_client| or |invalidation_client|. + CacheInvalidationPacketHandler( + buzz::XmppClient* xmpp_client, + invalidation::InvalidationClient* invalidation_client); + + // Makes the invalidation client passed into the constructor not + // route packets through the XMPP client passed into the constructor + // anymore. + ~CacheInvalidationPacketHandler(); + + private: + void HandleOutboundPacket( + invalidation::NetworkEndpoint* const& network_endpoint); + + void HandleInboundPacket(const std::string& packet); + + buzz::XmppClient* xmpp_client_; + invalidation::InvalidationClient* invalidation_client_; + + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler); +}; + +} // namespace sync_notifier + +#endif // CHROME_BROWSER_SYNC_NOTIFIER_CACHE_INVALIDATION_PACKET_HANDLER_H_ diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.cc b/chrome/browser/sync/notifier/chrome_invalidation_client.cc new file mode 100644 index 0000000..3191b8d --- /dev/null +++ b/chrome/browser/sync/notifier/chrome_invalidation_client.cc @@ -0,0 +1,74 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" + +#include <string> +#include <vector> + +#include "base/logging.h" +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" +#include "chrome/browser/sync/notifier/invalidation_util.h" +#include "google/cacheinvalidation/invalidation-client-impl.h" + +namespace sync_notifier { + +ChromeInvalidationClient::ChromeInvalidationClient() { + DCHECK(non_thread_safe_.CalledOnValidThread()); +} + +ChromeInvalidationClient::~ChromeInvalidationClient() { + DCHECK(non_thread_safe_.CalledOnValidThread()); + Stop(); +} + +void ChromeInvalidationClient::Start( + const std::string& app_name, + invalidation::InvalidationListener* listener, + buzz::XmppClient* xmpp_client) { + DCHECK(non_thread_safe_.CalledOnValidThread()); + Stop(); + + chrome_system_resources_.StartScheduler(); + + invalidation::ClientType client_type; + client_type.set_type(invalidation::ClientType::CHROME_SYNC); + invalidation::ClientConfig ticl_config; + invalidation_client_.reset( + new invalidation::InvalidationClientImpl( + &chrome_system_resources_, client_type, app_name, listener, + ticl_config)); + cache_invalidation_packet_handler_.reset( + new CacheInvalidationPacketHandler(xmpp_client, + invalidation_client_.get())); +} + +void ChromeInvalidationClient::Stop() { + DCHECK(non_thread_safe_.CalledOnValidThread()); + if (!invalidation_client_.get()) { + DCHECK(!cache_invalidation_packet_handler_.get()); + return; + } + + chrome_system_resources_.StopScheduler(); + + cache_invalidation_packet_handler_.reset(); + invalidation_client_.reset(); +} + +void ChromeInvalidationClient::Register( + const invalidation::ObjectId& oid, + invalidation::RegistrationCallback* callback) { + DCHECK(non_thread_safe_.CalledOnValidThread()); + invalidation_client_->Register(oid, callback); +} + +void ChromeInvalidationClient::Unregister( + const invalidation::ObjectId& oid, + invalidation::RegistrationCallback* callback) { + DCHECK(non_thread_safe_.CalledOnValidThread()); + invalidation_client_->Unregister(oid, callback); +} + +} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/chrome_invalidation_client.h b/chrome/browser/sync/notifier/chrome_invalidation_client.h new file mode 100644 index 0000000..e3f67b1 --- /dev/null +++ b/chrome/browser/sync/notifier/chrome_invalidation_client.h @@ -0,0 +1,65 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A simple wrapper around invalidation::InvalidationClient that +// handles all the startup/shutdown details and hookups. + +#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ +#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ + +#include <string> + +#include "base/basictypes.h" +#include "base/non_thread_safe.h" +#include "base/scoped_ptr.h" +#include "chrome/browser/sync/notifier/chrome_system_resources.h" +#include "google/cacheinvalidation/invalidation-client.h" + +namespace buzz { +class XmppClient; +} // namespace + +namespace sync_notifier { + +class CacheInvalidationPacketHandler; + +// TODO(akalin): Hook this up to a NetworkChangeNotifier so we can +// properly notify invalidation_client_. + +class ChromeInvalidationClient { + public: + ChromeInvalidationClient(); + + ~ChromeInvalidationClient(); + + // Does not take ownership of |listener| nor |xmpp_client|. + void Start( + const std::string& app_name, + invalidation::InvalidationListener* listener, + buzz::XmppClient* xmpp_client); + + void Stop(); + + // The following functions must only be called between calls to + // Start() and Stop(). See invalidation-client.h for documentation. + + void Register(const invalidation::ObjectId& oid, + invalidation::RegistrationCallback* callback); + + void Unregister(const invalidation::ObjectId& oid, + invalidation::RegistrationCallback* callback); + + private: + NonThreadSafe non_thread_safe_; + ChromeSystemResources chrome_system_resources_; + scoped_ptr<invalidation::InvalidationClient> invalidation_client_; + scoped_ptr<CacheInvalidationPacketHandler> + cache_invalidation_packet_handler_; + + DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationClient); +}; + +} // namespace sync_notifier + +#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_INVALIDATION_CLIENT_H_ diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc new file mode 100644 index 0000000..0ec0210 --- /dev/null +++ b/chrome/browser/sync/notifier/chrome_system_resources.cc @@ -0,0 +1,75 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/notifier/chrome_system_resources.h" + +#include <cstdlib> +#include <string> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/string_util.h" +#include "base/task.h" +#include "chrome/browser/sync/notifier/invalidation_util.h" + +namespace sync_notifier { + +ChromeSystemResources::ChromeSystemResources() + : scheduler_active_(false) {} + +ChromeSystemResources::~ChromeSystemResources() { + DCHECK(!scheduler_active_); +} + +invalidation::Time ChromeSystemResources::current_time() { + return base::Time::Now(); +} + +void ChromeSystemResources::StartScheduler() { + DCHECK(!scheduler_active_); + scheduler_active_ = true; +} + +void ChromeSystemResources::StopScheduler() { + DCHECK(scheduler_active_); + scheduler_active_ = false; +} + +void ChromeSystemResources::ScheduleWithDelay( + invalidation::TimeDelta delay, + invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + DCHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + NewRunnableFunction(&RunAndDeleteClosure, task), + delay.InMillisecondsRoundedUp()); +} + +void ChromeSystemResources::ScheduleImmediately( + invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + DCHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostTask( + FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); +} + +void ChromeSystemResources::Log( + LogLevel level, const char* file, int line, + const char* format, ...) { + va_list ap; + va_start(ap, format); + std::string result; + StringAppendV(&result, format, ap); + logging::LogMessage(file, line).stream() << result; + va_end(ap); +} + +} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/chrome_system_resources.h b/chrome/browser/sync/notifier/chrome_system_resources.h new file mode 100644 index 0000000..a8a98df --- /dev/null +++ b/chrome/browser/sync/notifier/chrome_system_resources.h @@ -0,0 +1,50 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Simple system resources class that uses the current message loop +// for scheduling. Assumes the current message loop is already +// running. + +#ifndef CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ +#define CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ + +#include "google/cacheinvalidation/invalidation-client.h" + +namespace sync_notifier { + +// TODO(akalin): Add a NonThreadSafe member to this class and use it. + +class ChromeSystemResources : public invalidation::SystemResources { + public: + ChromeSystemResources(); + + ~ChromeSystemResources(); + + virtual invalidation::Time current_time(); + + virtual void StartScheduler(); + + // We assume that the current message loop is stopped shortly after + // this is called, i.e. that any in-flight delayed tasks won't get + // run. + // + // TODO(akalin): Make sure that the above actually holds. Use a + // ScopedRunnableMethodFactory for better safety. + virtual void StopScheduler(); + + virtual void ScheduleWithDelay(invalidation::TimeDelta delay, + invalidation::Closure* task); + + virtual void ScheduleImmediately(invalidation::Closure* task); + + virtual void Log(LogLevel level, const char* file, int line, + const char* format, ...); + + private: + bool scheduler_active_; +}; + +} // namespace sync_notifier + +#endif // CHROME_BROWSER_SYNC_NOTIFIER_CHROME_SYSTEM_RESOURCES_H_ diff --git a/chrome/browser/sync/notifier/invalidation_util.cc b/chrome/browser/sync/notifier/invalidation_util.cc new file mode 100644 index 0000000..2683fae --- /dev/null +++ b/chrome/browser/sync/notifier/invalidation_util.cc @@ -0,0 +1,82 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/notifier/invalidation_util.h" + +#include <sstream> + +namespace sync_notifier { + +void RunAndDeleteClosure(invalidation::Closure* task) { + task->Run(); + delete task; +} + +// We need to write our own protobuf to string functions because we +// use LITE_RUNTIME, which doesn't support DebugString(). + +std::string ObjectIdToString( + const invalidation::ObjectId& object_id) { + std::stringstream ss; + ss << "{ "; + ss << "name: " << object_id.name().string_value() << ", "; + ss << "source: " << object_id.source(); + ss << " }"; + return ss.str(); +} + +std::string StatusToString( + const invalidation::Status& status) { + std::stringstream ss; + ss << "{ "; + ss << "code: " << status.code() << ", "; + ss << "description: " << status.description(); + ss << " }"; + return ss.str(); +} + +std::string InvalidationToString( + const invalidation::Invalidation& invalidation) { + std::stringstream ss; + ss << "{ "; + ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; + ss << "version: " << invalidation.version() << ", "; + ss << "components: { "; + const invalidation::ComponentStampLog& component_stamp_log = + invalidation.component_stamp_log(); + for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { + const invalidation::ComponentStamp& component_stamp = + component_stamp_log.stamps(i); + ss << "component: " << component_stamp.component() << ", "; + ss << "time: " << component_stamp.time() << ", "; + } + ss << " }"; + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateToString( + const invalidation::RegistrationUpdate& update) { + std::stringstream ss; + ss << "{ "; + ss << "type: " << update.type() << ", "; + ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; + ss << "version: " << update.version() << ", "; + ss << "sequence_number: " << update.sequence_number(); + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateResultToString( + const invalidation::RegistrationUpdateResult& update_result) { + std::stringstream ss; + ss << "{ "; + ss << "operation: " + << RegistrationUpdateToString(update_result.operation()) << ", "; + ss << "status: " << StatusToString(update_result.status()); + ss << " }"; + return ss.str(); +} + +} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/invalidation_util.h b/chrome/browser/sync/notifier/invalidation_util.h new file mode 100644 index 0000000..3ca5fe9 --- /dev/null +++ b/chrome/browser/sync/notifier/invalidation_util.h @@ -0,0 +1,36 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Various utilities for dealing with invalidation data types. + +#ifndef CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ +#define CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ + +#include <string> + +#include "google/cacheinvalidation/invalidation-client.h" + +namespace sync_notifier { + +void RunAndDeleteClosure(invalidation::Closure* task); + +// We need to write our own protobuf-to-string functions because we +// use LITE_RUNTIME, which doesn't support DebugString(). + +std::string ObjectIdToString(const invalidation::ObjectId& object_id); + +std::string StatusToString(const invalidation::Status& status); + +std::string InvalidationToString( + const invalidation::Invalidation& invalidation); + +std::string RegistrationUpdateToString( + const invalidation::RegistrationUpdate& update); + +std::string RegistrationUpdateResultToString( + const invalidation::RegistrationUpdateResult& update_result); + +} // namespace sync_notifier + +#endif // CHROME_BROWSER_SYNC_NOTIFIER_INVALIDATION_UTIL_H_ diff --git a/chrome/browser/sync/notifier/server_notifier_thread.cc b/chrome/browser/sync/notifier/server_notifier_thread.cc new file mode 100644 index 0000000..fbfc999 --- /dev/null +++ b/chrome/browser/sync/notifier/server_notifier_thread.cc @@ -0,0 +1,201 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/browser/sync/notifier/server_notifier_thread.h" + +#include <string> +#include <vector> + +#include "base/logging.h" +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" +#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" +#include "chrome/browser/sync/notifier/chrome_system_resources.h" +#include "chrome/browser/sync/notifier/invalidation_util.h" +#include "chrome/common/net/notifier/listener/notification_defines.h" +#include "google/cacheinvalidation/invalidation-client-impl.h" +#include "talk/xmpp/jid.h" + +namespace sync_notifier { + +ServerNotifierThread::ServerNotifierThread( + chrome_common_net::NetworkChangeNotifierThread* + network_change_notifier_thread) + : notifier::MediatorThreadImpl(network_change_notifier_thread) {} + +ServerNotifierThread::~ServerNotifierThread() {} + +void ServerNotifierThread::ListenForUpdates() { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, + &ServerNotifierThread::StartInvalidationListener)); +} + +void ServerNotifierThread::SubscribeForUpdates( + const std::vector<std::string>& subscribed_services_list) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod( + this, &ServerNotifierThread::RegisterTypesAndSignalSubscribed)); +} + +void ServerNotifierThread::Logout() { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + worker_message_loop()->PostTask( + FROM_HERE, + NewRunnableMethod(this, + &ServerNotifierThread::StopInvalidationListener)); + MediatorThreadImpl::Logout(); +} + +void ServerNotifierThread::SendNotification( + const OutgoingNotificationData& data) { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + NOTREACHED() << "Shouldn't send notifications if " + << "ServerNotifierThread is used"; +} + +void ServerNotifierThread::Invalidate( + const invalidation::Invalidation& invalidation, + invalidation::Closure* callback) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + CHECK(invalidation::IsCallbackRepeatable(callback)); + LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); + // Signal notification only for the invalidated types. + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &ServerNotifierThread::SignalIncomingNotification)); + RunAndDeleteClosure(callback); + // A real implementation would respond to the invalidation for the + // given object (e.g., refetch the invalidated object). +} + +void ServerNotifierThread::InvalidateAll( + invalidation::Closure* callback) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + CHECK(invalidation::IsCallbackRepeatable(callback)); + LOG(INFO) << "InvalidateAll"; + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &ServerNotifierThread::SignalIncomingNotification)); + RunAndDeleteClosure(callback); +} + +void ServerNotifierThread::AllRegistrationsLost( + invalidation::Closure* callback) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + CHECK(invalidation::IsCallbackRepeatable(callback)); + LOG(INFO) << "AllRegistrationsLost; reregistering"; + RegisterTypes(); + RunAndDeleteClosure(callback); +} + +void ServerNotifierThread::RegistrationLost( + const invalidation::ObjectId& object_id, + invalidation::Closure* callback) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + CHECK(invalidation::IsCallbackRepeatable(callback)); + LOG(INFO) << "RegistrationLost; reregistering: " + << ObjectIdToString(object_id); + RegisterTypes(); + RunAndDeleteClosure(callback); +} + +void ServerNotifierThread::StartInvalidationListener() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + + StopInvalidationListener(); + chrome_invalidation_client_.reset(new ChromeInvalidationClient()); + + // TODO(akalin): If we can figure out a unique per-session key that + // is preserved by the server, we can use that instead of kAppName. + // What this buys us is that we then won't receive any notifications + // that were generated by ourselves. + const std::string kAppName = "cc_sync_listen_notifications"; + chrome_invalidation_client_->Start(kAppName, this, xmpp_client()); +} + +void ServerNotifierThread::RegisterTypesAndSignalSubscribed() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + RegisterTypes(); + parent_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod( + this, + &ServerNotifierThread::SignalSubscribed)); +} + +void ServerNotifierThread::RegisterTypes() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + + // TODO(akalin): This is a giant hack! Make this configurable. Add + // a mapping to/from ModelType. + std::vector<std::string> data_types; + data_types.push_back("AUTOFILL"); + data_types.push_back("BOOKMARK"); + data_types.push_back("EXTENSION"); + data_types.push_back("PASSWORD"); + data_types.push_back("THEME"); + data_types.push_back("TYPED_URL"); + data_types.push_back("PREFERENCE"); + + std::vector<invalidation::ObjectId> object_ids; + + for (std::vector<std::string>::const_iterator it = data_types.begin(); + it != data_types.end(); ++it) { + invalidation::ObjectId object_id; + object_id.mutable_name()->set_string_value(*it); + object_id.set_source(invalidation::ObjectId::CHROME_SYNC); + object_ids.push_back(object_id); + } + + for (std::vector<invalidation::ObjectId>::const_iterator it = + object_ids.begin(); it != object_ids.end(); ++it) { + chrome_invalidation_client_->Register( + *it, + invalidation::NewPermanentCallback( + this, &ServerNotifierThread::RegisterCallback)); + } +} + +void ServerNotifierThread::RegisterCallback( + const invalidation::RegistrationUpdateResult& result) { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + // TODO(akalin): Do something meaningful here. + LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); +} + +void ServerNotifierThread::SignalSubscribed() { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + if (delegate_) { + delegate_->OnSubscriptionStateChange(true); + } +} + +void ServerNotifierThread::SignalIncomingNotification() { + DCHECK_EQ(MessageLoop::current(), parent_message_loop_); + if (delegate_) { + // TODO(akalin): Fill this in with something meaningful. + IncomingNotificationData notification_data; + delegate_->OnIncomingNotification(notification_data); + } +} + +void ServerNotifierThread::StopInvalidationListener() { + DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + + if (chrome_invalidation_client_.get()) { + // TODO(akalin): Need to do unregisters here? + chrome_invalidation_client_->Stop(); + } + chrome_invalidation_client_.reset(); +} + +} // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/server_notifier_thread.h b/chrome/browser/sync/notifier/server_notifier_thread.h new file mode 100644 index 0000000..9f079e2 --- /dev/null +++ b/chrome/browser/sync/notifier/server_notifier_thread.h @@ -0,0 +1,100 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// This class is the (hackish) way to use the XMPP parts of +// MediatorThread for server-issued notifications. +// +// TODO(akalin): Decomp MediatorThread into an XMPP service part and a +// notifications-specific part and use the XMPP service part for +// server-issued notifications. + +#ifndef CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ +#define CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ + +#include <string> +#include <vector> + +#include "base/scoped_ptr.h" +#include "base/task.h" +#include "chrome/common/net/notifier/listener/mediator_thread_impl.h" +#include "google/cacheinvalidation/invalidation-client.h" + +namespace chrome_common_net { +class NetworkChangeNotifierThread; +} // namespace chrome_common_net + +namespace sync_notifier { + +class ChromeInvalidationClient; + +class ServerNotifierThread + : public notifier::MediatorThreadImpl, + public invalidation::InvalidationListener { + public: + explicit ServerNotifierThread( + chrome_common_net::NetworkChangeNotifierThread* + network_change_notifier_thread); + + virtual ~ServerNotifierThread(); + + // Overridden to start listening to server notifications. + virtual void ListenForUpdates(); + + // Overridden to immediately notify the delegate that subscriptions + // (i.e., notifications) are on. Must be called only after a call + // to ListenForUpdates(). + virtual void SubscribeForUpdates( + const std::vector<std::string>& subscribed_services_list); + + // Overridden to also stop listening to server notifications. + virtual void Logout(); + + // Must not be called. + virtual void SendNotification(const OutgoingNotificationData& data); + + // invalidation::InvalidationListener implementation. + + virtual void Invalidate(const invalidation::Invalidation& invalidation, + invalidation::Closure* callback); + + virtual void InvalidateAll(invalidation::Closure* callback); + + virtual void AllRegistrationsLost(invalidation::Closure* callback); + + virtual void RegistrationLost(const invalidation::ObjectId& object_id, + invalidation::Closure* callback); + + private: + // Posted to the worker thread by ListenForUpdates(). + void StartInvalidationListener(); + + // Posted to the worker thread by SubscribeForUpdates(). + void RegisterTypesAndSignalSubscribed(); + + // Register the sync types that we're interested in getting + // notifications for. + void RegisterTypes(); + + // Called when we get a registration response back. + void RegisterCallback( + const invalidation::RegistrationUpdateResult& result); + + // Signal to the delegate that we're subscribed. + void SignalSubscribed(); + + // Signal to the delegate that we have an incoming notification. + void SignalIncomingNotification(); + + // Called by StartInvalidationListener() and posted to the worker + // thread by Stop(). + void StopInvalidationListener(); + + scoped_ptr<ChromeInvalidationClient> chrome_invalidation_client_; +}; + +} // namespace sync_notifier + +DISABLE_RUNNABLE_METHOD_REFCOUNT(sync_notifier::ServerNotifierThread); + +#endif // CHROME_BROWSER_SYNC_NOTIFIER_SERVER_NOTIFIER_THREAD_H_ diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc index 899798ef..2e762ac 100644 --- a/chrome/browser/sync/tools/sync_listen_notifications.cc +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc @@ -2,13 +2,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include <cstdlib> #include <string> -#include <sstream> #include <vector> #include "base/at_exit.h" -#include "base/base64.h" #include "base/command_line.h" #include "base/logging.h" #include "base/message_loop.h" @@ -16,15 +13,17 @@ #include "base/string_util.h" #include "base/task.h" #include "chrome/browser/sync/notification_method.h" +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" +#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" +#include "chrome/browser/sync/notifier/chrome_system_resources.h" +#include "chrome/browser/sync/notifier/invalidation_util.h" #include "chrome/browser/sync/sync_constants.h" #include "chrome/common/net/notifier/base/task_pump.h" #include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h" #include "chrome/common/net/notifier/listener/listen_task.h" #include "chrome/common/net/notifier/listener/notification_constants.h" #include "chrome/common/net/notifier/listener/subscribe_task.h" -#include "chrome/common/net/notifier/listener/xml_element_util.h" #include "google/cacheinvalidation/invalidation-client.h" -#include "google/cacheinvalidation/invalidation-client-impl.h" #include "talk/base/cryptstring.h" #include "talk/base/logging.h" #include "talk/base/sigslot.h" @@ -194,148 +193,6 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate { } }; -// TODO(akalin): Move all the cache-invalidation-related stuff below -// out of this file once we have a real Chrome-integrated -// implementation. - -void RunAndDeleteClosure(invalidation::Closure* task) { - task->Run(); - delete task; -} - -// Simple system resources class that uses the current message loop -// for scheduling. Assumes the current message loop is already -// running. -class ChromeSystemResources : public invalidation::SystemResources { - public: - ChromeSystemResources() : scheduler_active_(false) {} - - ~ChromeSystemResources() { - CHECK(!scheduler_active_); - } - - virtual invalidation::Time current_time() { - return base::Time::Now(); - } - - virtual void StartScheduler() { - CHECK(!scheduler_active_); - scheduler_active_ = true; - } - - // We assume that the current message loop is stopped shortly after - // this is called, i.e. that any in-flight delayed tasks won't get - // run. - // - // TODO(akalin): Make sure that the above actually holds. - virtual void StopScheduler() { - CHECK(scheduler_active_); - scheduler_active_ = false; - } - - virtual void ScheduleWithDelay(invalidation::TimeDelta delay, - invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - CHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostDelayedTask( - FROM_HERE, - NewRunnableFunction(&RunAndDeleteClosure, task), - delay.InMillisecondsRoundedUp()); - } - - virtual void ScheduleImmediately(invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - CHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostTask( - FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); - } - - virtual void Log(LogLevel level, const char* file, int line, - const char* format, ...) { - va_list ap; - va_start(ap, format); - std::string result; - StringAppendV(&result, format, ap); - logging::LogMessage(file, line).stream() << result; - va_end(ap); - } - - private: - bool scheduler_active_; -}; - -// We need to write our own protobuf to string functions because we -// use LITE_RUNTIME, which doesn't support DebugString(). - -std::string ObjectIdToString( - const invalidation::ObjectId& object_id) { - std::stringstream ss; - ss << "{ "; - ss << "name: " << object_id.name().string_value() << ", "; - ss << "source: " << object_id.source(); - ss << " }"; - return ss.str(); -} - -std::string StatusToString( - const invalidation::Status& status) { - std::stringstream ss; - ss << "{ "; - ss << "code: " << status.code() << ", "; - ss << "description: " << status.description(); - ss << " }"; - return ss.str(); -} - -std::string InvalidationToString( - const invalidation::Invalidation& invalidation) { - std::stringstream ss; - ss << "{ "; - ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; - ss << "version: " << invalidation.version() << ", "; - ss << "components: { "; - const invalidation::ComponentStampLog& component_stamp_log = - invalidation.component_stamp_log(); - for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { - const invalidation::ComponentStamp& component_stamp = - component_stamp_log.stamps(i); - ss << "component: " << component_stamp.component() << ", "; - ss << "time: " << component_stamp.time() << ", "; - } - ss << " }"; - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateToString( - const invalidation::RegistrationUpdate& update) { - std::stringstream ss; - ss << "{ "; - ss << "type: " << update.type() << ", "; - ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; - ss << "version: " << update.version() << ", "; - ss << "sequence_number: " << update.sequence_number(); - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateResultToString( - const invalidation::RegistrationUpdateResult& update_result) { - std::stringstream ss; - ss << "{ "; - ss << "operation: " - << RegistrationUpdateToString(update_result.operation()) << ", "; - ss << "status: " << StatusToString(update_result.status()); - ss << " }"; - return ss.str(); -} - // The actual listener for sync notifications from the cache // invalidation service. class ChromeInvalidationListener @@ -346,8 +203,9 @@ class ChromeInvalidationListener virtual void Invalidate(const invalidation::Invalidation& invalidation, invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); - RunAndDeleteClosure(callback); + LOG(INFO) << "Invalidate: " + << sync_notifier::InvalidationToString(invalidation); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would respond to the invalidation for the // given object (e.g., refetch the invalidated object). } @@ -355,7 +213,7 @@ class ChromeInvalidationListener virtual void InvalidateAll(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "InvalidateAll"; - RunAndDeleteClosure(callback); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would loop over the current registered // data types and send notifications for those. } @@ -363,7 +221,7 @@ class ChromeInvalidationListener virtual void AllRegistrationsLost(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "AllRegistrationsLost"; - RunAndDeleteClosure(callback); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would try to re-register for all // registered data types. } @@ -372,8 +230,8 @@ class ChromeInvalidationListener invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "RegistrationLost: " - << ObjectIdToString(object_id); - RunAndDeleteClosure(callback); + << sync_notifier::ObjectIdToString(object_id); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would try to re-register for this // particular data type. } @@ -382,199 +240,6 @@ class ChromeInvalidationListener DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener); }; -static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); -static const buzz::QName kQnTangoIqPacketContent( - "google:tango", "content"); - -// A task that listens for ClientInvalidation messages and calls the -// given callback on them. -class CacheInvalidationListenTask : public buzz::XmppTask { - public: - // Takes ownership of callback. - CacheInvalidationListenTask(Task* parent, - Callback1<const std::string&>::Type* callback) - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} - virtual ~CacheInvalidationListenTask() {} - - virtual int ProcessStart() { - LOG(INFO) << "CacheInvalidationListenTask started"; - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationListenTask blocked"; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationListenTask response received"; - std::string data; - if (GetTangoIqPacketData(stanza, &data)) { - callback_->Run(data); - } else { - LOG(ERROR) << "Could not get packet data"; - } - // Acknowledge receipt of the iq to the buzz server. - // TODO(akalin): Send an error response for malformed packets. - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); - SendStanza(response_stanza.get()); - return STATE_RESPONSE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (IsValidTangoIqPacket(stanza)) { - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - LOG(INFO) << "Stanza skipped"; - return false; - } - - private: - bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { - return - (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && - (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); - } - - bool GetTangoIqPacketData(const buzz::XmlElement* stanza, - std::string* data) { - DCHECK(IsValidTangoIqPacket(stanza)); - const buzz::XmlElement* tango_iq_packet = - stanza->FirstNamed(kQnTangoIqPacket); - if (!tango_iq_packet) { - LOG(ERROR) << "Could not find tango IQ packet element"; - return false; - } - const buzz::XmlElement* tango_iq_packet_content = - tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); - if (!tango_iq_packet_content) { - LOG(ERROR) << "Could not find tango IQ packet content element"; - return false; - } - *data = tango_iq_packet_content->BodyText(); - return true; - } - - scoped_ptr<Callback1<const std::string&>::Type> callback_; - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); -}; - -// A task that sends a single outbound ClientInvalidation message. -class CacheInvalidationSendMessageTask : public buzz::XmppTask { - public: - CacheInvalidationSendMessageTask(Task* parent, - const buzz::Jid& to_jid, - const std::string& msg) - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), - to_jid_(to_jid), msg_(msg) {} - virtual ~CacheInvalidationSendMessageTask() {} - - virtual int ProcessStart() { - scoped_ptr<buzz::XmlElement> stanza( - MakeTangoIqPacket(to_jid_, task_id(), msg_)); - LOG(INFO) << "Sending message: " - << notifier::XmlElementToString(*stanza.get()); - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { - LOG(INFO) << "Error when sending message"; - return STATE_ERROR; - } - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationSendMessageTask response received: " - << notifier::XmlElementToString(*stanza); - return STATE_DONE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (!MatchResponseIq(stanza, to_jid_, task_id())) { - LOG(INFO) << "Stanza skipped"; - return false; - } - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - - private: - static buzz::XmlElement* MakeTangoIqPacket( - const buzz::Jid& to_jid, - const std::string& task_id, - const std::string& msg) { - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); - buzz::XmlElement* tango_iq_packet = - new buzz::XmlElement(kQnTangoIqPacket, true); - iq->AddElement(tango_iq_packet); - buzz::XmlElement* tango_iq_packet_content = - new buzz::XmlElement(kQnTangoIqPacketContent, true); - tango_iq_packet->AddElement(tango_iq_packet_content); - tango_iq_packet_content->SetBodyText(msg); - return iq; - } - - buzz::Jid to_jid_; - std::string msg_; - - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); -}; - -// Class that handles the details of sending and receiving client -// invalidation packets. -class CacheInvalidationPacketHandler { - public: - CacheInvalidationPacketHandler( - buzz::XmppClient* xmpp_client, - invalidation::NetworkEndpoint* network_endpoint, - const buzz::Jid& to_jid) - : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint), - to_jid_(to_jid) { - // Owned by xmpp_client. - CacheInvalidationListenTask* listen_task = - new CacheInvalidationListenTask( - xmpp_client, NewCallback( - this, &CacheInvalidationPacketHandler::HandleInboundPacket)); - listen_task->Start(); - } - - void HandleOutboundPacket(invalidation::NetworkEndpoint* const & - network_endpoint) { - CHECK_EQ(network_endpoint, network_endpoint_); - invalidation::string message; - network_endpoint->TakeOutboundMessage(&message); - std::string encoded_message; - CHECK(base::Base64Encode(message, &encoded_message)); - // Owned by xmpp_client. - CacheInvalidationSendMessageTask* send_message_task = - new CacheInvalidationSendMessageTask(xmpp_client_, - to_jid_, - encoded_message); - send_message_task->Start(); - } - - private: - void HandleInboundPacket(const std::string& packet) { - std::string decoded_message; - CHECK(base::Base64Decode(packet, &decoded_message)); - network_endpoint_->HandleInboundMessage(decoded_message); - } - - buzz::XmppClient* xmpp_client_; - invalidation::NetworkEndpoint* network_endpoint_; - buzz::Jid to_jid_; -}; - // Delegate for server-side notifications. class CacheInvalidationNotifierDelegate : public XmppNotificationClient::Delegate { @@ -601,36 +266,15 @@ class CacheInvalidationNotifierDelegate buzz::XmppClient* xmpp_client) { LOG(INFO) << "Logged in"; - chrome_system_resources_.StartScheduler(); - - invalidation::ClientType client_type; - client_type.set_type(invalidation::ClientType::CHROME_SYNC); // TODO(akalin): app_name should be per-client unique. - std::string app_name = "cc_sync_listen_notifications"; - invalidation::ClientConfig ticl_config; - invalidation_client_.reset( - new invalidation::InvalidationClientImpl( - &chrome_system_resources_, client_type, - app_name, &chrome_invalidation_listener_, ticl_config)); - - invalidation::NetworkEndpoint* network_endpoint = - invalidation_client_->network_endpoint(); - // TODO(akalin): Make tango jid configurable (so we can use - // staging). - buzz::Jid to_jid("tango@bot.talk.google.com/PROD"); - packet_handler_.reset( - new CacheInvalidationPacketHandler(xmpp_client, - network_endpoint, - to_jid)); - - network_endpoint->RegisterOutboundListener( - invalidation::NewPermanentCallback( - packet_handler_.get(), - &CacheInvalidationPacketHandler::HandleOutboundPacket)); + const std::string kAppName = "cc_sync_listen_notifications"; + chrome_invalidation_client_.Start(kAppName, + &chrome_invalidation_listener_, + xmpp_client); for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - invalidation_client_->Register( + chrome_invalidation_client_.Register( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); @@ -643,43 +287,39 @@ class CacheInvalidationNotifierDelegate // TODO(akalin): Figure out the correct place to put this. for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - invalidation_client_->Unregister( + chrome_invalidation_client_.Unregister( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); } - packet_handler_.reset(); - invalidation_client_.reset(); - - chrome_system_resources_.StopScheduler(); + chrome_invalidation_client_.Stop(); } virtual void OnError(buzz::XmppEngine::Error error, int subcode) { LOG(INFO) << "Error: " << error << ", subcode: " << subcode; - packet_handler_.reset(); - invalidation_client_.reset(); - // TODO(akalin): Figure out whether we should stop the scheduler - // here. + // TODO(akalin): Figure out whether we should unregister here, + // too. + chrome_invalidation_client_.Stop(); } private: void RegisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); + LOG(INFO) << "Registered: " + << sync_notifier::RegistrationUpdateResultToString(result); } void UnregisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result); + LOG(INFO) << "Unregistered: " + << sync_notifier::RegistrationUpdateResultToString(result); } std::vector<invalidation::ObjectId> object_ids_; - ChromeSystemResources chrome_system_resources_; ChromeInvalidationListener chrome_invalidation_listener_; - scoped_ptr<invalidation::InvalidationClient> invalidation_client_; - scoped_ptr<CacheInvalidationPacketHandler> packet_handler_; + sync_notifier::ChromeInvalidationClient chrome_invalidation_client_; }; } // namespace diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp index 792bfbe..3343035 100644 --- a/chrome/browser/sync/tools/sync_tools.gyp +++ b/chrome/browser/sync/tools/sync_tools.gyp @@ -21,6 +21,7 @@ 'dependencies': [ '<(DEPTH)/base/base.gyp:base', '<(DEPTH)/chrome/chrome.gyp:notifier', + '<(DEPTH)/chrome/chrome.gyp:sync_notifier', '<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation', '<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle', ], |