diff options
author | ghc@google.com <ghc@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-27 00:40:21 +0000 |
---|---|---|
committer | ghc@google.com <ghc@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-27 00:40:21 +0000 |
commit | 6f765db725292f9eb8349afc004c933f5cffa352 (patch) | |
tree | eb70892adbd652b07593b1d19f119904ef3504e3 /chrome/browser/sync | |
parent | 449a7fcf7a00939468e8de7f1889db9b9d7be49f (diff) | |
download | chromium_src-6f765db725292f9eb8349afc004c933f5cffa352.zip chromium_src-6f765db725292f9eb8349afc004c933f5cffa352.tar.gz chromium_src-6f765db725292f9eb8349afc004c933f5cffa352.tar.bz2 |
- replace custom <iq>-stanza protocol with push notifications (using PushNotifications* classes) in CacheInvalidationPacketHandler, and update unit test accordingly
- extend notifier::Notification and PushNotificationsSendUpdateTask to allow specification of recipients (and recipient-specific data)
Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=119171
Review URL: http://codereview.chromium.org/9190029
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@119316 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
4 files changed, 97 insertions, 241 deletions
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc index 754984f..53f7ce2 100644 --- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc +++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -6,16 +6,18 @@ #include <string> -#include "base/bind.h" #include "base/base64.h" #include "base/callback.h" #include "base/compiler_specific.h" #include "base/logging.h" #include "base/rand_util.h" #include "base/string_number_conversions.h" +#include "google/cacheinvalidation/v2/client_gateway.pb.h" #include "google/cacheinvalidation/v2/constants.h" #include "google/cacheinvalidation/v2/invalidation-client.h" #include "google/cacheinvalidation/v2/system-resources.h" +#include "jingle/notifier/listener/notification_constants.h" +#include "jingle/notifier/listener/push_notifications_send_update_task.h" #include "jingle/notifier/listener/xml_element_util.h" #include "talk/xmpp/constants.h" #include "talk/xmpp/jid.h" @@ -27,198 +29,7 @@ namespace sync_notifier { namespace { const char kBotJid[] = "tango@bot.talk.google.com"; -const char kServiceUrl[] = "http://www.google.com/chrome/sync"; - -buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); } -buzz::QName GetQnSeq() { return buzz::QName("", "seq"); } -buzz::QName GetQnSid() { return buzz::QName("", "sid"); } -buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); } -buzz::QName GetQnProtocolVersion() { - return buzz::QName("", "protocolVersion"); -} -buzz::QName GetQnChannelContext() { - return buzz::QName("", "channelContext"); -} - -// TODO(akalin): Move these task classes out so that they can be -// unit-tested. This'll probably be done easier once we consolidate -// all the packet sending/receiving classes. - -// A task that listens for ClientInvalidation messages and calls the -// given callback on them. -class CacheInvalidationListenTask : public buzz::XmppTask { - public: - // Takes ownership of callback. - CacheInvalidationListenTask( - buzz::XmppTaskParentInterface* parent, - const base::Callback<void(const std::string&)>& callback, - const base::Callback<void(const std::string&)>& context_change_callback) - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), - callback_(callback), - context_change_callback_(context_change_callback) {} - virtual ~CacheInvalidationListenTask() {} - - virtual int ProcessStart() { - DVLOG(2) << "CacheInvalidationListenTask started"; - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - DVLOG(2) << "CacheInvalidationListenTask blocked"; - return STATE_BLOCKED; - } - DVLOG(2) << "CacheInvalidationListenTask response received"; - std::string data; - if (GetCacheInvalidationIqPacketData(stanza, &data)) { - callback_.Run(data); - } else { - LOG(ERROR) << "Could not get packet data"; - } - // Acknowledge receipt of the iq to the buzz server. - // TODO(akalin): Send an error response for malformed packets. - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); - SendStanza(response_stanza.get()); - return STATE_RESPONSE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - DVLOG(1) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (IsValidCacheInvalidationIqPacket(stanza)) { - DVLOG(2) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - DVLOG(2) << "Stanza skipped"; - return false; - } - - private: - bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { - // We deliberately minimize the verification we do here: see - // http://crbug.com/71285 . - return MatchRequestIq(stanza, buzz::STR_SET, GetQnData()); - } - - bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, - std::string* data) { - DCHECK(IsValidCacheInvalidationIqPacket(stanza)); - const buzz::XmlElement* cache_invalidation_iq_packet = - stanza->FirstNamed(GetQnData()); - if (!cache_invalidation_iq_packet) { - LOG(ERROR) << "Could not find cache invalidation IQ packet element"; - return false; - } - // Look for a channelContext attribute in the content of the stanza. If - // present, remember it so it can be echoed back. - if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) { - context_change_callback_.Run( - cache_invalidation_iq_packet->Attr(GetQnChannelContext())); - } - *data = cache_invalidation_iq_packet->BodyText(); - return true; - } - - base::Callback<void(const std::string&)> callback_; - base::Callback<void(const std::string&)> context_change_callback_; - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); -}; - -std::string MakeProtocolVersion() { - return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) + - "." + - base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion); -} - -// A task that sends a single outbound ClientInvalidation message. -class CacheInvalidationSendMessageTask : public buzz::XmppTask { - public: - CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent, - const buzz::Jid& to_jid, - const std::string& msg, - int seq, - const std::string& sid, - const std::string& channel_context) - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), - to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid), - channel_context_(channel_context) {} - virtual ~CacheInvalidationSendMessageTask() {} - - virtual int ProcessStart() { - scoped_ptr<buzz::XmlElement> stanza( - MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, - seq_, sid_, channel_context_)); - DVLOG(1) << "Sending message: " - << notifier::XmlElementToString(*stanza.get()); - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { - DVLOG(2) << "Error when sending message"; - return STATE_ERROR; - } - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - DVLOG(2) << "CacheInvalidationSendMessageTask blocked..."; - return STATE_BLOCKED; - } - DVLOG(2) << "CacheInvalidationSendMessageTask response received: " - << notifier::XmlElementToString(*stanza); - // TODO(akalin): Handle errors here. - return STATE_DONE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - DVLOG(1) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (!MatchResponseIq(stanza, to_jid_, task_id())) { - DVLOG(2) << "Stanza skipped"; - return false; - } - DVLOG(2) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - - private: - static buzz::XmlElement* MakeCacheInvalidationIqPacket( - const buzz::Jid& to_jid, - const std::string& task_id, - const std::string& msg, - int seq, const std::string& sid, const std::string& channel_context) { - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); - buzz::XmlElement* cache_invalidation_iq_packet = - new buzz::XmlElement(GetQnData(), true); - iq->AddElement(cache_invalidation_iq_packet); - cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq)); - cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid); - cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl); - cache_invalidation_iq_packet->SetAttr( - GetQnProtocolVersion(), MakeProtocolVersion()); - if (!channel_context.empty()) { - cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(), - channel_context); - } - cache_invalidation_iq_packet->SetBodyText(msg); - return iq; - } - - const buzz::Jid to_jid_; - std::string msg_; - int seq_; - std::string sid_; - const std::string channel_context_; - - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); -}; - -std::string MakeSid() { - uint64 sid = base::RandUint64(); - return std::string("chrome-sync-") + base::Uint64ToString(sid); -} +const char kChannelName[] = "tango_raw"; } // namespace @@ -227,17 +38,11 @@ CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), base_task_(base_task), seq_(0), - sid_(MakeSid()) { + scheduling_hash_(0) { CHECK(base_task_.get()); // Owned by base_task. Takes ownership of the callback. - CacheInvalidationListenTask* listen_task = - new CacheInvalidationListenTask( - base_task_, base::Bind( - &CacheInvalidationPacketHandler::HandleInboundPacket, - weak_factory_.GetWeakPtr()), - base::Bind( - &CacheInvalidationPacketHandler::HandleChannelContextChange, - weak_factory_.GetWeakPtr())); + notifier::PushNotificationsListenTask* listen_task = + new notifier::PushNotificationsListenTask(base_task_, this); listen_task->Start(); } @@ -251,20 +56,25 @@ void CacheInvalidationPacketHandler::SendMessage( if (!base_task_.get()) { return; } - std::string encoded_message; - if (!base::Base64Encode(message, &encoded_message)) { - LOG(ERROR) << "Could not base64-encode message to send: " - << message; - return; + ipc::invalidation::ClientGatewayMessage envelope; + envelope.set_is_client_to_server(true); + if (!service_context_.empty()) { + envelope.set_service_context(service_context_); + envelope.set_rpc_scheduling_hash(scheduling_hash_); } + envelope.set_network_message(message); + + notifier::Recipient recipient; + recipient.to = kBotJid; + notifier::Notification notification; + notification.channel = kChannelName; + notification.recipients.push_back(recipient); + envelope.SerializeToString(¬ification.data); + // Owned by base_task_. - CacheInvalidationSendMessageTask* send_message_task = - new CacheInvalidationSendMessageTask(base_task_, - buzz::Jid(kBotJid), - encoded_message, - seq_, sid_, channel_context_); + notifier::PushNotificationsSendUpdateTask* send_message_task = + new notifier::PushNotificationsSendUpdateTask(base_task_, notification); send_message_task->Start(); - ++seq_; } void CacheInvalidationPacketHandler::SetMessageReceiver( @@ -272,22 +82,45 @@ void CacheInvalidationPacketHandler::SetMessageReceiver( incoming_receiver_.reset(incoming_receiver); } -void CacheInvalidationPacketHandler::HandleInboundPacket( - const std::string& packet) { - DCHECK(non_thread_safe_.CalledOnValidThread()); - std::string decoded_message; - if (!base::Base64Decode(packet, &decoded_message)) { - LOG(ERROR) << "Could not base64-decode received message: " - << packet; - return; - } - incoming_receiver_->Run(decoded_message); +void CacheInvalidationPacketHandler::SendSubscriptionRequest() { + notifier::Subscription subscription; + subscription.channel = kChannelName; + subscription.from = ""; + notifier::SubscriptionList subscription_list; + subscription_list.push_back(subscription); + // Owned by base_task_. + notifier::PushNotificationsSubscribeTask* push_subscription_task = + new notifier::PushNotificationsSubscribeTask( + base_task_, subscription_list, this); + push_subscription_task->Start(); } -void CacheInvalidationPacketHandler::HandleChannelContextChange( - const std::string& context) { +void CacheInvalidationPacketHandler::OnSubscribed() { + // TODO(ghc): Consider whether we should do more here. +} + +void CacheInvalidationPacketHandler::OnSubscriptionError() { + // TODO(ghc): Consider whether we should do more here. +} + +void CacheInvalidationPacketHandler::OnNotificationReceived( + const notifier::Notification& notification) { DCHECK(non_thread_safe_.CalledOnValidThread()); - channel_context_ = context; + const std::string& decoded_message = notification.data; + ipc::invalidation::ClientGatewayMessage envelope; + envelope.ParseFromString(decoded_message); + if (!envelope.IsInitialized()) { + LOG(ERROR) << "Could not parse ClientGatewayMessage: " + << decoded_message; + return; + } + if (envelope.has_service_context()) { + service_context_ = envelope.service_context(); + } + if (envelope.has_rpc_scheduling_hash()) { + scheduling_hash_ = envelope.rpc_scheduling_hash(); + } + incoming_receiver_->Run(envelope.network_message()); } } // namespace sync_notifier diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h index 2316ec6..5378c5e 100644 --- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h +++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // @@ -17,6 +17,8 @@ #include "base/memory/weak_ptr.h" #include "base/threading/non_thread_safe.h" #include "google/cacheinvalidation/v2/system-resources.h" +#include "jingle/notifier/listener/push_notifications_listen_task.h" +#include "jingle/notifier/listener/push_notifications_subscribe_task.h" namespace buzz { class XmppTaskParentInterface; @@ -24,7 +26,9 @@ class XmppTaskParentInterface; namespace sync_notifier { -class CacheInvalidationPacketHandler { +class CacheInvalidationPacketHandler + : public notifier::PushNotificationsSubscribeTaskDelegate, + public notifier::PushNotificationsListenTaskDelegate { public: // Starts routing packets from |invalidation_client| using // |base_task|. |base_task.get()| must still be non-NULL. @@ -44,12 +48,20 @@ class CacheInvalidationPacketHandler { virtual void SetMessageReceiver( invalidation::MessageCallback* incoming_receiver); + // Sends a message requesting a subscription to the notification channel. + virtual void SendSubscriptionRequest(); + + // PushNotificationsSubscribeTaskDelegate implementation. + virtual void OnSubscribed() OVERRIDE; + virtual void OnSubscriptionError() OVERRIDE; + + // PushNotificationsListenTaskDelegate implementation. + virtual void OnNotificationReceived( + const notifier::Notification& notification) OVERRIDE; + private: FRIEND_TEST_ALL_PREFIXES(CacheInvalidationPacketHandlerTest, Basic); - void HandleInboundPacket(const std::string& packet); - void HandleChannelContextChange(const std::string& context); - base::NonThreadSafe non_thread_safe_; base::WeakPtrFactory<CacheInvalidationPacketHandler> weak_factory_; @@ -61,10 +73,10 @@ class CacheInvalidationPacketHandler { // Monotonically increasing sequence number. int seq_; - // Unique session token. - const std::string sid_; - // Channel context. - std::string channel_context_; + // Service context. + std::string service_context_; + // Scheduling hash. + int64 scheduling_hash_; DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler); }; diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc index 08cdcb6..e7d8d06 100644 --- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc +++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc @@ -8,8 +8,10 @@ #include "base/memory/weak_ptr.h" #include "base/message_loop.h" #include "google/cacheinvalidation/v2/callback.h" +#include "google/cacheinvalidation/v2/client_gateway.pb.h" #include "google/cacheinvalidation/v2/system-resources.h" #include "jingle/notifier/base/fake_base_task.h" +#include "jingle/notifier/listener/notification_defines.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" #include "talk/base/task.h" @@ -32,6 +34,13 @@ class MockMessageCallback { class CacheInvalidationPacketHandlerTest : public testing::Test { public: virtual ~CacheInvalidationPacketHandlerTest() {} + + notifier::Notification MakeNotification(const std::string& data) { + notifier::Notification notification; + notification.channel = "tango_raw"; + notification.data = data; + return notification; + } }; TEST_F(CacheInvalidationPacketHandlerTest, Basic) { @@ -46,6 +55,10 @@ TEST_F(CacheInvalidationPacketHandlerTest, Basic) { &callback, &MockMessageCallback::StoreMessage); const char kInboundMessage[] = "non-bogus"; + ipc::invalidation::ClientGatewayMessage envelope; + envelope.set_network_message(kInboundMessage); + std::string serialized; + envelope.SerializeToString(&serialized); { CacheInvalidationPacketHandler handler(fake_base_task.AsWeakPtr()); handler.SetMessageReceiver(mock_message_callback); @@ -54,11 +67,8 @@ TEST_F(CacheInvalidationPacketHandlerTest, Basic) { message_loop.RunAllPending(); { - handler.HandleInboundPacket("bogus"); - std::string inbound_message_encoded; - EXPECT_TRUE( - base::Base64Encode(kInboundMessage, &inbound_message_encoded)); - handler.HandleInboundPacket(inbound_message_encoded); + handler.OnNotificationReceived(MakeNotification("bogus")); + handler.OnNotificationReceived(MakeNotification(serialized)); } // Take care of any tasks posted by HandleOutboundPacket(). diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc index 474d0b6..db13317 100644 --- a/chrome/browser/sync/notifier/chrome_system_resources.cc +++ b/chrome/browser/sync/notifier/chrome_system_resources.cc @@ -230,6 +230,7 @@ void ChromeNetwork::UpdatePacketHandler( base::Bind(&ChromeNetwork::HandleInboundMessage, weak_factory_.GetWeakPtr()))); } + packet_handler_->SendSubscriptionRequest(); } void ChromeNetwork::HandleInboundMessage(const std::string& incoming_message) { |