summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync
diff options
context:
space:
mode:
authorghc@google.com <ghc@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-27 00:40:21 +0000
committerghc@google.com <ghc@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-27 00:40:21 +0000
commit6f765db725292f9eb8349afc004c933f5cffa352 (patch)
treeeb70892adbd652b07593b1d19f119904ef3504e3 /chrome/browser/sync
parent449a7fcf7a00939468e8de7f1889db9b9d7be49f (diff)
downloadchromium_src-6f765db725292f9eb8349afc004c933f5cffa352.zip
chromium_src-6f765db725292f9eb8349afc004c933f5cffa352.tar.gz
chromium_src-6f765db725292f9eb8349afc004c933f5cffa352.tar.bz2
- replace custom <iq>-stanza protocol with push notifications (using PushNotifications* classes) in CacheInvalidationPacketHandler, and update unit test accordingly
- extend notifier::Notification and PushNotificationsSendUpdateTask to allow specification of recipients (and recipient-specific data) Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=119171 Review URL: http://codereview.chromium.org/9190029 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@119316 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync')
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc287
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.h30
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc20
-rw-r--r--chrome/browser/sync/notifier/chrome_system_resources.cc1
4 files changed, 97 insertions, 241 deletions
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
index 754984f..53f7ce2 100644
--- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -6,16 +6,18 @@
#include <string>
-#include "base/bind.h"
#include "base/base64.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/rand_util.h"
#include "base/string_number_conversions.h"
+#include "google/cacheinvalidation/v2/client_gateway.pb.h"
#include "google/cacheinvalidation/v2/constants.h"
#include "google/cacheinvalidation/v2/invalidation-client.h"
#include "google/cacheinvalidation/v2/system-resources.h"
+#include "jingle/notifier/listener/notification_constants.h"
+#include "jingle/notifier/listener/push_notifications_send_update_task.h"
#include "jingle/notifier/listener/xml_element_util.h"
#include "talk/xmpp/constants.h"
#include "talk/xmpp/jid.h"
@@ -27,198 +29,7 @@ namespace sync_notifier {
namespace {
const char kBotJid[] = "tango@bot.talk.google.com";
-const char kServiceUrl[] = "http://www.google.com/chrome/sync";
-
-buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
-buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
-buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
-buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
-buzz::QName GetQnProtocolVersion() {
- return buzz::QName("", "protocolVersion");
-}
-buzz::QName GetQnChannelContext() {
- return buzz::QName("", "channelContext");
-}
-
-// TODO(akalin): Move these task classes out so that they can be
-// unit-tested. This'll probably be done easier once we consolidate
-// all the packet sending/receiving classes.
-
-// A task that listens for ClientInvalidation messages and calls the
-// given callback on them.
-class CacheInvalidationListenTask : public buzz::XmppTask {
- public:
- // Takes ownership of callback.
- CacheInvalidationListenTask(
- buzz::XmppTaskParentInterface* parent,
- const base::Callback<void(const std::string&)>& callback,
- const base::Callback<void(const std::string&)>& context_change_callback)
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
- callback_(callback),
- context_change_callback_(context_change_callback) {}
- virtual ~CacheInvalidationListenTask() {}
-
- virtual int ProcessStart() {
- DVLOG(2) << "CacheInvalidationListenTask started";
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- DVLOG(2) << "CacheInvalidationListenTask blocked";
- return STATE_BLOCKED;
- }
- DVLOG(2) << "CacheInvalidationListenTask response received";
- std::string data;
- if (GetCacheInvalidationIqPacketData(stanza, &data)) {
- callback_.Run(data);
- } else {
- LOG(ERROR) << "Could not get packet data";
- }
- // Acknowledge receipt of the iq to the buzz server.
- // TODO(akalin): Send an error response for malformed packets.
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
- SendStanza(response_stanza.get());
- return STATE_RESPONSE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- DVLOG(1) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (IsValidCacheInvalidationIqPacket(stanza)) {
- DVLOG(2) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
- DVLOG(2) << "Stanza skipped";
- return false;
- }
-
- private:
- bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
- // We deliberately minimize the verification we do here: see
- // http://crbug.com/71285 .
- return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
- }
-
- bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
- std::string* data) {
- DCHECK(IsValidCacheInvalidationIqPacket(stanza));
- const buzz::XmlElement* cache_invalidation_iq_packet =
- stanza->FirstNamed(GetQnData());
- if (!cache_invalidation_iq_packet) {
- LOG(ERROR) << "Could not find cache invalidation IQ packet element";
- return false;
- }
- // Look for a channelContext attribute in the content of the stanza. If
- // present, remember it so it can be echoed back.
- if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
- context_change_callback_.Run(
- cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
- }
- *data = cache_invalidation_iq_packet->BodyText();
- return true;
- }
-
- base::Callback<void(const std::string&)> callback_;
- base::Callback<void(const std::string&)> context_change_callback_;
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
-};
-
-std::string MakeProtocolVersion() {
- return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
- "." +
- base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
-}
-
-// A task that sends a single outbound ClientInvalidation message.
-class CacheInvalidationSendMessageTask : public buzz::XmppTask {
- public:
- CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
- const buzz::Jid& to_jid,
- const std::string& msg,
- int seq,
- const std::string& sid,
- const std::string& channel_context)
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
- to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
- channel_context_(channel_context) {}
- virtual ~CacheInvalidationSendMessageTask() {}
-
- virtual int ProcessStart() {
- scoped_ptr<buzz::XmlElement> stanza(
- MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
- seq_, sid_, channel_context_));
- DVLOG(1) << "Sending message: "
- << notifier::XmlElementToString(*stanza.get());
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
- DVLOG(2) << "Error when sending message";
- return STATE_ERROR;
- }
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
- return STATE_BLOCKED;
- }
- DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
- << notifier::XmlElementToString(*stanza);
- // TODO(akalin): Handle errors here.
- return STATE_DONE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- DVLOG(1) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (!MatchResponseIq(stanza, to_jid_, task_id())) {
- DVLOG(2) << "Stanza skipped";
- return false;
- }
- DVLOG(2) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
-
- private:
- static buzz::XmlElement* MakeCacheInvalidationIqPacket(
- const buzz::Jid& to_jid,
- const std::string& task_id,
- const std::string& msg,
- int seq, const std::string& sid, const std::string& channel_context) {
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
- buzz::XmlElement* cache_invalidation_iq_packet =
- new buzz::XmlElement(GetQnData(), true);
- iq->AddElement(cache_invalidation_iq_packet);
- cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
- cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
- cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
- cache_invalidation_iq_packet->SetAttr(
- GetQnProtocolVersion(), MakeProtocolVersion());
- if (!channel_context.empty()) {
- cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
- channel_context);
- }
- cache_invalidation_iq_packet->SetBodyText(msg);
- return iq;
- }
-
- const buzz::Jid to_jid_;
- std::string msg_;
- int seq_;
- std::string sid_;
- const std::string channel_context_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
-};
-
-std::string MakeSid() {
- uint64 sid = base::RandUint64();
- return std::string("chrome-sync-") + base::Uint64ToString(sid);
-}
+const char kChannelName[] = "tango_raw";
} // namespace
@@ -227,17 +38,11 @@ CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
: weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
base_task_(base_task),
seq_(0),
- sid_(MakeSid()) {
+ scheduling_hash_(0) {
CHECK(base_task_.get());
// Owned by base_task. Takes ownership of the callback.
- CacheInvalidationListenTask* listen_task =
- new CacheInvalidationListenTask(
- base_task_, base::Bind(
- &CacheInvalidationPacketHandler::HandleInboundPacket,
- weak_factory_.GetWeakPtr()),
- base::Bind(
- &CacheInvalidationPacketHandler::HandleChannelContextChange,
- weak_factory_.GetWeakPtr()));
+ notifier::PushNotificationsListenTask* listen_task =
+ new notifier::PushNotificationsListenTask(base_task_, this);
listen_task->Start();
}
@@ -251,20 +56,25 @@ void CacheInvalidationPacketHandler::SendMessage(
if (!base_task_.get()) {
return;
}
- std::string encoded_message;
- if (!base::Base64Encode(message, &encoded_message)) {
- LOG(ERROR) << "Could not base64-encode message to send: "
- << message;
- return;
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.set_is_client_to_server(true);
+ if (!service_context_.empty()) {
+ envelope.set_service_context(service_context_);
+ envelope.set_rpc_scheduling_hash(scheduling_hash_);
}
+ envelope.set_network_message(message);
+
+ notifier::Recipient recipient;
+ recipient.to = kBotJid;
+ notifier::Notification notification;
+ notification.channel = kChannelName;
+ notification.recipients.push_back(recipient);
+ envelope.SerializeToString(&notification.data);
+
// Owned by base_task_.
- CacheInvalidationSendMessageTask* send_message_task =
- new CacheInvalidationSendMessageTask(base_task_,
- buzz::Jid(kBotJid),
- encoded_message,
- seq_, sid_, channel_context_);
+ notifier::PushNotificationsSendUpdateTask* send_message_task =
+ new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
send_message_task->Start();
- ++seq_;
}
void CacheInvalidationPacketHandler::SetMessageReceiver(
@@ -272,22 +82,45 @@ void CacheInvalidationPacketHandler::SetMessageReceiver(
incoming_receiver_.reset(incoming_receiver);
}
-void CacheInvalidationPacketHandler::HandleInboundPacket(
- const std::string& packet) {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- std::string decoded_message;
- if (!base::Base64Decode(packet, &decoded_message)) {
- LOG(ERROR) << "Could not base64-decode received message: "
- << packet;
- return;
- }
- incoming_receiver_->Run(decoded_message);
+void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
+ notifier::Subscription subscription;
+ subscription.channel = kChannelName;
+ subscription.from = "";
+ notifier::SubscriptionList subscription_list;
+ subscription_list.push_back(subscription);
+ // Owned by base_task_.
+ notifier::PushNotificationsSubscribeTask* push_subscription_task =
+ new notifier::PushNotificationsSubscribeTask(
+ base_task_, subscription_list, this);
+ push_subscription_task->Start();
}
-void CacheInvalidationPacketHandler::HandleChannelContextChange(
- const std::string& context) {
+void CacheInvalidationPacketHandler::OnSubscribed() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnSubscriptionError() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnNotificationReceived(
+ const notifier::Notification& notification) {
DCHECK(non_thread_safe_.CalledOnValidThread());
- channel_context_ = context;
+ const std::string& decoded_message = notification.data;
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.ParseFromString(decoded_message);
+ if (!envelope.IsInitialized()) {
+ LOG(ERROR) << "Could not parse ClientGatewayMessage: "
+ << decoded_message;
+ return;
+ }
+ if (envelope.has_service_context()) {
+ service_context_ = envelope.service_context();
+ }
+ if (envelope.has_rpc_scheduling_hash()) {
+ scheduling_hash_ = envelope.rpc_scheduling_hash();
+ }
+ incoming_receiver_->Run(envelope.network_message());
}
} // namespace sync_notifier
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
index 2316ec6..5378c5e 100644
--- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
@@ -17,6 +17,8 @@
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
#include "google/cacheinvalidation/v2/system-resources.h"
+#include "jingle/notifier/listener/push_notifications_listen_task.h"
+#include "jingle/notifier/listener/push_notifications_subscribe_task.h"
namespace buzz {
class XmppTaskParentInterface;
@@ -24,7 +26,9 @@ class XmppTaskParentInterface;
namespace sync_notifier {
-class CacheInvalidationPacketHandler {
+class CacheInvalidationPacketHandler
+ : public notifier::PushNotificationsSubscribeTaskDelegate,
+ public notifier::PushNotificationsListenTaskDelegate {
public:
// Starts routing packets from |invalidation_client| using
// |base_task|. |base_task.get()| must still be non-NULL.
@@ -44,12 +48,20 @@ class CacheInvalidationPacketHandler {
virtual void SetMessageReceiver(
invalidation::MessageCallback* incoming_receiver);
+ // Sends a message requesting a subscription to the notification channel.
+ virtual void SendSubscriptionRequest();
+
+ // PushNotificationsSubscribeTaskDelegate implementation.
+ virtual void OnSubscribed() OVERRIDE;
+ virtual void OnSubscriptionError() OVERRIDE;
+
+ // PushNotificationsListenTaskDelegate implementation.
+ virtual void OnNotificationReceived(
+ const notifier::Notification& notification) OVERRIDE;
+
private:
FRIEND_TEST_ALL_PREFIXES(CacheInvalidationPacketHandlerTest, Basic);
- void HandleInboundPacket(const std::string& packet);
- void HandleChannelContextChange(const std::string& context);
-
base::NonThreadSafe non_thread_safe_;
base::WeakPtrFactory<CacheInvalidationPacketHandler> weak_factory_;
@@ -61,10 +73,10 @@ class CacheInvalidationPacketHandler {
// Monotonically increasing sequence number.
int seq_;
- // Unique session token.
- const std::string sid_;
- // Channel context.
- std::string channel_context_;
+ // Service context.
+ std::string service_context_;
+ // Scheduling hash.
+ int64 scheduling_hash_;
DISALLOW_COPY_AND_ASSIGN(CacheInvalidationPacketHandler);
};
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc
index 08cdcb6..e7d8d06 100644
--- a/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler_unittest.cc
@@ -8,8 +8,10 @@
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
#include "google/cacheinvalidation/v2/callback.h"
+#include "google/cacheinvalidation/v2/client_gateway.pb.h"
#include "google/cacheinvalidation/v2/system-resources.h"
#include "jingle/notifier/base/fake_base_task.h"
+#include "jingle/notifier/listener/notification_defines.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "talk/base/task.h"
@@ -32,6 +34,13 @@ class MockMessageCallback {
class CacheInvalidationPacketHandlerTest : public testing::Test {
public:
virtual ~CacheInvalidationPacketHandlerTest() {}
+
+ notifier::Notification MakeNotification(const std::string& data) {
+ notifier::Notification notification;
+ notification.channel = "tango_raw";
+ notification.data = data;
+ return notification;
+ }
};
TEST_F(CacheInvalidationPacketHandlerTest, Basic) {
@@ -46,6 +55,10 @@ TEST_F(CacheInvalidationPacketHandlerTest, Basic) {
&callback, &MockMessageCallback::StoreMessage);
const char kInboundMessage[] = "non-bogus";
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.set_network_message(kInboundMessage);
+ std::string serialized;
+ envelope.SerializeToString(&serialized);
{
CacheInvalidationPacketHandler handler(fake_base_task.AsWeakPtr());
handler.SetMessageReceiver(mock_message_callback);
@@ -54,11 +67,8 @@ TEST_F(CacheInvalidationPacketHandlerTest, Basic) {
message_loop.RunAllPending();
{
- handler.HandleInboundPacket("bogus");
- std::string inbound_message_encoded;
- EXPECT_TRUE(
- base::Base64Encode(kInboundMessage, &inbound_message_encoded));
- handler.HandleInboundPacket(inbound_message_encoded);
+ handler.OnNotificationReceived(MakeNotification("bogus"));
+ handler.OnNotificationReceived(MakeNotification(serialized));
}
// Take care of any tasks posted by HandleOutboundPacket().
diff --git a/chrome/browser/sync/notifier/chrome_system_resources.cc b/chrome/browser/sync/notifier/chrome_system_resources.cc
index 474d0b6..db13317 100644
--- a/chrome/browser/sync/notifier/chrome_system_resources.cc
+++ b/chrome/browser/sync/notifier/chrome_system_resources.cc
@@ -230,6 +230,7 @@ void ChromeNetwork::UpdatePacketHandler(
base::Bind(&ChromeNetwork::HandleInboundMessage,
weak_factory_.GetWeakPtr())));
}
+ packet_handler_->SendSubscriptionRequest();
}
void ChromeNetwork::HandleInboundMessage(const std::string& incoming_message) {