summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc')
-rw-r--r--chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc243
1 files changed, 243 insertions, 0 deletions
diff --git a/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
new file mode 100644
index 0000000..dbc3d54
--- /dev/null
+++ b/chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
@@ -0,0 +1,243 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+
+#include "base/base64.h"
+#include "base/callback.h"
+#include "base/logging.h"
+#include "chrome/common/net/notifier/listener/xml_element_util.h"
+#include "google/cacheinvalidation/invalidation-client.h"
+#include "talk/xmpp/constants.h"
+#include "talk/xmpp/jid.h"
+#include "talk/xmpp/xmppclient.h"
+#include "talk/xmpp/xmpptask.h"
+
+namespace sync_notifier {
+
+namespace {
+
+const char kBotJid[] = "tango@bot.talk.google.com";
+
+// TODO(akalin): Eliminate use of 'tango' name.
+
+// TODO(akalin): Hash out details of tango IQ protocol.
+
+static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
+static const buzz::QName kQnTangoIqPacketContent(
+ "google:tango", "content");
+
+// TODO(akalin): Move these task classes out so that they can be
+// unit-tested. This'll probably be done easier once we consolidate
+// all the packet sending/receiving classes.
+
+// A task that listens for ClientInvalidation messages and calls the
+// given callback on them.
+class CacheInvalidationListenTask : public buzz::XmppTask {
+ public:
+ // Takes ownership of callback.
+ CacheInvalidationListenTask(Task* parent,
+ Callback1<const std::string&>::Type* callback)
+ : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
+ virtual ~CacheInvalidationListenTask() {}
+
+ virtual int ProcessStart() {
+ LOG(INFO) << "CacheInvalidationListenTask started";
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationListenTask blocked";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationListenTask response received";
+ std::string data;
+ if (GetTangoIqPacketData(stanza, &data)) {
+ callback_->Run(data);
+ } else {
+ LOG(ERROR) << "Could not get packet data";
+ }
+ // Acknowledge receipt of the iq to the buzz server.
+ // TODO(akalin): Send an error response for malformed packets.
+ scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
+ SendStanza(response_stanza.get());
+ return STATE_RESPONSE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (IsValidTangoIqPacket(stanza)) {
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+
+ private:
+ bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
+ return
+ (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
+ (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
+ }
+
+ bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
+ std::string* data) {
+ DCHECK(IsValidTangoIqPacket(stanza));
+ const buzz::XmlElement* tango_iq_packet =
+ stanza->FirstNamed(kQnTangoIqPacket);
+ if (!tango_iq_packet) {
+ LOG(ERROR) << "Could not find tango IQ packet element";
+ return false;
+ }
+ const buzz::XmlElement* tango_iq_packet_content =
+ tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
+ if (!tango_iq_packet_content) {
+ LOG(ERROR) << "Could not find tango IQ packet content element";
+ return false;
+ }
+ *data = tango_iq_packet_content->BodyText();
+ return true;
+ }
+
+ scoped_ptr<Callback1<const std::string&>::Type> callback_;
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
+};
+
+// A task that sends a single outbound ClientInvalidation message.
+class CacheInvalidationSendMessageTask : public buzz::XmppTask {
+ public:
+ CacheInvalidationSendMessageTask(Task* parent,
+ const buzz::Jid& to_jid,
+ const std::string& msg)
+ : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
+ to_jid_(to_jid), msg_(msg) {}
+ virtual ~CacheInvalidationSendMessageTask() {}
+
+ virtual int ProcessStart() {
+ scoped_ptr<buzz::XmlElement> stanza(
+ MakeTangoIqPacket(to_jid_, task_id(), msg_));
+ LOG(INFO) << "Sending message: "
+ << notifier::XmlElementToString(*stanza.get());
+ if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
+ LOG(INFO) << "Error when sending message";
+ return STATE_ERROR;
+ }
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
+ << notifier::XmlElementToString(*stanza);
+ // TODO(akalin): Handle errors here.
+ return STATE_DONE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (!MatchResponseIq(stanza, to_jid_, task_id())) {
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+
+ private:
+ static buzz::XmlElement* MakeTangoIqPacket(
+ const buzz::Jid& to_jid,
+ const std::string& task_id,
+ const std::string& msg) {
+ buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
+ buzz::XmlElement* tango_iq_packet =
+ new buzz::XmlElement(kQnTangoIqPacket, true);
+ iq->AddElement(tango_iq_packet);
+ buzz::XmlElement* tango_iq_packet_content =
+ new buzz::XmlElement(kQnTangoIqPacketContent, true);
+ tango_iq_packet->AddElement(tango_iq_packet_content);
+ tango_iq_packet_content->SetBodyText(msg);
+ return iq;
+ }
+
+ const buzz::Jid to_jid_;
+ std::string msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
+};
+
+} // namespace
+
+CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
+ buzz::XmppClient* xmpp_client,
+ invalidation::InvalidationClient* invalidation_client)
+ : xmpp_client_(xmpp_client),
+ invalidation_client_(invalidation_client) {
+ CHECK(xmpp_client_);
+ CHECK(invalidation_client_);
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ CHECK(network_endpoint);
+ network_endpoint->RegisterOutboundListener(
+ invalidation::NewPermanentCallback(
+ this,
+ &CacheInvalidationPacketHandler::HandleOutboundPacket));
+ // Owned by xmpp_client.
+ CacheInvalidationListenTask* listen_task =
+ new CacheInvalidationListenTask(
+ xmpp_client, NewCallback(
+ this, &CacheInvalidationPacketHandler::HandleInboundPacket));
+ listen_task->Start();
+}
+
+CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ CHECK(network_endpoint);
+ network_endpoint->RegisterOutboundListener(NULL);
+}
+
+void CacheInvalidationPacketHandler::HandleOutboundPacket(
+ invalidation::NetworkEndpoint* const& network_endpoint) {
+ CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
+ invalidation::string message;
+ network_endpoint->TakeOutboundMessage(&message);
+ std::string encoded_message;
+ if (!base::Base64Encode(message, &encoded_message)) {
+ LOG(ERROR) << "Could not base64-encode message to send: "
+ << message;
+ return;
+ }
+ // Owned by xmpp_client.
+ CacheInvalidationSendMessageTask* send_message_task =
+ new CacheInvalidationSendMessageTask(xmpp_client_,
+ buzz::Jid(kBotJid),
+ encoded_message);
+ send_message_task->Start();
+}
+
+void CacheInvalidationPacketHandler::HandleInboundPacket(
+ const std::string& packet) {
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ std::string decoded_message;
+ if (!base::Base64Decode(packet, &decoded_message)) {
+ LOG(ERROR) << "Could not base64-decode received message: "
+ << packet;
+ return;
+ }
+ network_endpoint->HandleInboundMessage(decoded_message);
+}
+
+} // namespace sync_notifier