summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync/tools
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-22 18:00:32 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-22 18:00:32 +0000
commit995b3daaaf10741c1fa8cb2c33b3b36a47a0f917 (patch)
treeb4240ff03da3c5cedff01d0e612b002096c85e05 /chrome/browser/sync/tools
parent50b997c85d4dfaf2d01de3448314ca15efcec789 (diff)
downloadchromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.zip
chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.gz
chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.bz2
Revert 50479 - Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use --sync-notification-method=server to turn on). BUG=34647 TEST=manually Review URL: http://codereview.chromium.org/2827014 TBR=akalin@chromium.org Review URL: http://codereview.chromium.org/2805023 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50482 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/tools')
-rw-r--r--chrome/browser/sync/tools/sync_listen_notifications.cc412
-rw-r--r--chrome/browser/sync/tools/sync_tools.gyp1
2 files changed, 386 insertions, 27 deletions
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc
index 2e762ac..899798ef 100644
--- a/chrome/browser/sync/tools/sync_listen_notifications.cc
+++ b/chrome/browser/sync/tools/sync_listen_notifications.cc
@@ -2,10 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include <cstdlib>
#include <string>
+#include <sstream>
#include <vector>
#include "base/at_exit.h"
+#include "base/base64.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -13,17 +16,15 @@
#include "base/string_util.h"
#include "base/task.h"
#include "chrome/browser/sync/notification_method.h"
-#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
-#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
-#include "chrome/browser/sync/notifier/chrome_system_resources.h"
-#include "chrome/browser/sync/notifier/invalidation_util.h"
#include "chrome/browser/sync/sync_constants.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h"
#include "chrome/common/net/notifier/listener/listen_task.h"
#include "chrome/common/net/notifier/listener/notification_constants.h"
#include "chrome/common/net/notifier/listener/subscribe_task.h"
+#include "chrome/common/net/notifier/listener/xml_element_util.h"
#include "google/cacheinvalidation/invalidation-client.h"
+#include "google/cacheinvalidation/invalidation-client-impl.h"
#include "talk/base/cryptstring.h"
#include "talk/base/logging.h"
#include "talk/base/sigslot.h"
@@ -193,6 +194,148 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate {
}
};
+// TODO(akalin): Move all the cache-invalidation-related stuff below
+// out of this file once we have a real Chrome-integrated
+// implementation.
+
+void RunAndDeleteClosure(invalidation::Closure* task) {
+ task->Run();
+ delete task;
+}
+
+// Simple system resources class that uses the current message loop
+// for scheduling. Assumes the current message loop is already
+// running.
+class ChromeSystemResources : public invalidation::SystemResources {
+ public:
+ ChromeSystemResources() : scheduler_active_(false) {}
+
+ ~ChromeSystemResources() {
+ CHECK(!scheduler_active_);
+ }
+
+ virtual invalidation::Time current_time() {
+ return base::Time::Now();
+ }
+
+ virtual void StartScheduler() {
+ CHECK(!scheduler_active_);
+ scheduler_active_ = true;
+ }
+
+ // We assume that the current message loop is stopped shortly after
+ // this is called, i.e. that any in-flight delayed tasks won't get
+ // run.
+ //
+ // TODO(akalin): Make sure that the above actually holds.
+ virtual void StopScheduler() {
+ CHECK(scheduler_active_);
+ scheduler_active_ = false;
+ }
+
+ virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
+ invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ CHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostDelayedTask(
+ FROM_HERE,
+ NewRunnableFunction(&RunAndDeleteClosure, task),
+ delay.InMillisecondsRoundedUp());
+ }
+
+ virtual void ScheduleImmediately(invalidation::Closure* task) {
+ if (!scheduler_active_) {
+ delete task;
+ return;
+ }
+ CHECK(invalidation::IsCallbackRepeatable(task));
+ MessageLoop::current()->PostTask(
+ FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
+ }
+
+ virtual void Log(LogLevel level, const char* file, int line,
+ const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ std::string result;
+ StringAppendV(&result, format, ap);
+ logging::LogMessage(file, line).stream() << result;
+ va_end(ap);
+ }
+
+ private:
+ bool scheduler_active_;
+};
+
+// We need to write our own protobuf to string functions because we
+// use LITE_RUNTIME, which doesn't support DebugString().
+
+std::string ObjectIdToString(
+ const invalidation::ObjectId& object_id) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "name: " << object_id.name().string_value() << ", ";
+ ss << "source: " << object_id.source();
+ ss << " }";
+ return ss.str();
+}
+
+std::string StatusToString(
+ const invalidation::Status& status) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "code: " << status.code() << ", ";
+ ss << "description: " << status.description();
+ ss << " }";
+ return ss.str();
+}
+
+std::string InvalidationToString(
+ const invalidation::Invalidation& invalidation) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
+ ss << "version: " << invalidation.version() << ", ";
+ ss << "components: { ";
+ const invalidation::ComponentStampLog& component_stamp_log =
+ invalidation.component_stamp_log();
+ for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
+ const invalidation::ComponentStamp& component_stamp =
+ component_stamp_log.stamps(i);
+ ss << "component: " << component_stamp.component() << ", ";
+ ss << "time: " << component_stamp.time() << ", ";
+ }
+ ss << " }";
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateToString(
+ const invalidation::RegistrationUpdate& update) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "type: " << update.type() << ", ";
+ ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
+ ss << "version: " << update.version() << ", ";
+ ss << "sequence_number: " << update.sequence_number();
+ ss << " }";
+ return ss.str();
+}
+
+std::string RegistrationUpdateResultToString(
+ const invalidation::RegistrationUpdateResult& update_result) {
+ std::stringstream ss;
+ ss << "{ ";
+ ss << "operation: "
+ << RegistrationUpdateToString(update_result.operation()) << ", ";
+ ss << "status: " << StatusToString(update_result.status());
+ ss << " }";
+ return ss.str();
+}
+
// The actual listener for sync notifications from the cache
// invalidation service.
class ChromeInvalidationListener
@@ -203,9 +346,8 @@ class ChromeInvalidationListener
virtual void Invalidate(const invalidation::Invalidation& invalidation,
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "Invalidate: "
- << sync_notifier::InvalidationToString(invalidation);
- sync_notifier::RunAndDeleteClosure(callback);
+ LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
+ RunAndDeleteClosure(callback);
// A real implementation would respond to the invalidation for the
// given object (e.g., refetch the invalidated object).
}
@@ -213,7 +355,7 @@ class ChromeInvalidationListener
virtual void InvalidateAll(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "InvalidateAll";
- sync_notifier::RunAndDeleteClosure(callback);
+ RunAndDeleteClosure(callback);
// A real implementation would loop over the current registered
// data types and send notifications for those.
}
@@ -221,7 +363,7 @@ class ChromeInvalidationListener
virtual void AllRegistrationsLost(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "AllRegistrationsLost";
- sync_notifier::RunAndDeleteClosure(callback);
+ RunAndDeleteClosure(callback);
// A real implementation would try to re-register for all
// registered data types.
}
@@ -230,8 +372,8 @@ class ChromeInvalidationListener
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "RegistrationLost: "
- << sync_notifier::ObjectIdToString(object_id);
- sync_notifier::RunAndDeleteClosure(callback);
+ << ObjectIdToString(object_id);
+ RunAndDeleteClosure(callback);
// A real implementation would try to re-register for this
// particular data type.
}
@@ -240,6 +382,199 @@ class ChromeInvalidationListener
DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener);
};
+static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
+static const buzz::QName kQnTangoIqPacketContent(
+ "google:tango", "content");
+
+// A task that listens for ClientInvalidation messages and calls the
+// given callback on them.
+class CacheInvalidationListenTask : public buzz::XmppTask {
+ public:
+ // Takes ownership of callback.
+ CacheInvalidationListenTask(Task* parent,
+ Callback1<const std::string&>::Type* callback)
+ : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
+ virtual ~CacheInvalidationListenTask() {}
+
+ virtual int ProcessStart() {
+ LOG(INFO) << "CacheInvalidationListenTask started";
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationListenTask blocked";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationListenTask response received";
+ std::string data;
+ if (GetTangoIqPacketData(stanza, &data)) {
+ callback_->Run(data);
+ } else {
+ LOG(ERROR) << "Could not get packet data";
+ }
+ // Acknowledge receipt of the iq to the buzz server.
+ // TODO(akalin): Send an error response for malformed packets.
+ scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
+ SendStanza(response_stanza.get());
+ return STATE_RESPONSE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (IsValidTangoIqPacket(stanza)) {
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+
+ private:
+ bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
+ return
+ (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
+ (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
+ }
+
+ bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
+ std::string* data) {
+ DCHECK(IsValidTangoIqPacket(stanza));
+ const buzz::XmlElement* tango_iq_packet =
+ stanza->FirstNamed(kQnTangoIqPacket);
+ if (!tango_iq_packet) {
+ LOG(ERROR) << "Could not find tango IQ packet element";
+ return false;
+ }
+ const buzz::XmlElement* tango_iq_packet_content =
+ tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
+ if (!tango_iq_packet_content) {
+ LOG(ERROR) << "Could not find tango IQ packet content element";
+ return false;
+ }
+ *data = tango_iq_packet_content->BodyText();
+ return true;
+ }
+
+ scoped_ptr<Callback1<const std::string&>::Type> callback_;
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
+};
+
+// A task that sends a single outbound ClientInvalidation message.
+class CacheInvalidationSendMessageTask : public buzz::XmppTask {
+ public:
+ CacheInvalidationSendMessageTask(Task* parent,
+ const buzz::Jid& to_jid,
+ const std::string& msg)
+ : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
+ to_jid_(to_jid), msg_(msg) {}
+ virtual ~CacheInvalidationSendMessageTask() {}
+
+ virtual int ProcessStart() {
+ scoped_ptr<buzz::XmlElement> stanza(
+ MakeTangoIqPacket(to_jid_, task_id(), msg_));
+ LOG(INFO) << "Sending message: "
+ << notifier::XmlElementToString(*stanza.get());
+ if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
+ LOG(INFO) << "Error when sending message";
+ return STATE_ERROR;
+ }
+ return STATE_RESPONSE;
+ }
+
+ virtual int ProcessResponse() {
+ const buzz::XmlElement* stanza = NextStanza();
+ if (stanza == NULL) {
+ LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
+ return STATE_BLOCKED;
+ }
+ LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
+ << notifier::XmlElementToString(*stanza);
+ return STATE_DONE;
+ }
+
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) {
+ LOG(INFO) << "Stanza received: "
+ << notifier::XmlElementToString(*stanza);
+ if (!MatchResponseIq(stanza, to_jid_, task_id())) {
+ LOG(INFO) << "Stanza skipped";
+ return false;
+ }
+ LOG(INFO) << "Queueing stanza";
+ QueueStanza(stanza);
+ return true;
+ }
+
+ private:
+ static buzz::XmlElement* MakeTangoIqPacket(
+ const buzz::Jid& to_jid,
+ const std::string& task_id,
+ const std::string& msg) {
+ buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
+ buzz::XmlElement* tango_iq_packet =
+ new buzz::XmlElement(kQnTangoIqPacket, true);
+ iq->AddElement(tango_iq_packet);
+ buzz::XmlElement* tango_iq_packet_content =
+ new buzz::XmlElement(kQnTangoIqPacketContent, true);
+ tango_iq_packet->AddElement(tango_iq_packet_content);
+ tango_iq_packet_content->SetBodyText(msg);
+ return iq;
+ }
+
+ buzz::Jid to_jid_;
+ std::string msg_;
+
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
+};
+
+// Class that handles the details of sending and receiving client
+// invalidation packets.
+class CacheInvalidationPacketHandler {
+ public:
+ CacheInvalidationPacketHandler(
+ buzz::XmppClient* xmpp_client,
+ invalidation::NetworkEndpoint* network_endpoint,
+ const buzz::Jid& to_jid)
+ : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint),
+ to_jid_(to_jid) {
+ // Owned by xmpp_client.
+ CacheInvalidationListenTask* listen_task =
+ new CacheInvalidationListenTask(
+ xmpp_client, NewCallback(
+ this, &CacheInvalidationPacketHandler::HandleInboundPacket));
+ listen_task->Start();
+ }
+
+ void HandleOutboundPacket(invalidation::NetworkEndpoint* const &
+ network_endpoint) {
+ CHECK_EQ(network_endpoint, network_endpoint_);
+ invalidation::string message;
+ network_endpoint->TakeOutboundMessage(&message);
+ std::string encoded_message;
+ CHECK(base::Base64Encode(message, &encoded_message));
+ // Owned by xmpp_client.
+ CacheInvalidationSendMessageTask* send_message_task =
+ new CacheInvalidationSendMessageTask(xmpp_client_,
+ to_jid_,
+ encoded_message);
+ send_message_task->Start();
+ }
+
+ private:
+ void HandleInboundPacket(const std::string& packet) {
+ std::string decoded_message;
+ CHECK(base::Base64Decode(packet, &decoded_message));
+ network_endpoint_->HandleInboundMessage(decoded_message);
+ }
+
+ buzz::XmppClient* xmpp_client_;
+ invalidation::NetworkEndpoint* network_endpoint_;
+ buzz::Jid to_jid_;
+};
+
// Delegate for server-side notifications.
class CacheInvalidationNotifierDelegate
: public XmppNotificationClient::Delegate {
@@ -266,15 +601,36 @@ class CacheInvalidationNotifierDelegate
buzz::XmppClient* xmpp_client) {
LOG(INFO) << "Logged in";
+ chrome_system_resources_.StartScheduler();
+
+ invalidation::ClientType client_type;
+ client_type.set_type(invalidation::ClientType::CHROME_SYNC);
// TODO(akalin): app_name should be per-client unique.
- const std::string kAppName = "cc_sync_listen_notifications";
- chrome_invalidation_client_.Start(kAppName,
- &chrome_invalidation_listener_,
- xmpp_client);
+ std::string app_name = "cc_sync_listen_notifications";
+ invalidation::ClientConfig ticl_config;
+ invalidation_client_.reset(
+ new invalidation::InvalidationClientImpl(
+ &chrome_system_resources_, client_type,
+ app_name, &chrome_invalidation_listener_, ticl_config));
+
+ invalidation::NetworkEndpoint* network_endpoint =
+ invalidation_client_->network_endpoint();
+ // TODO(akalin): Make tango jid configurable (so we can use
+ // staging).
+ buzz::Jid to_jid("tango@bot.talk.google.com/PROD");
+ packet_handler_.reset(
+ new CacheInvalidationPacketHandler(xmpp_client,
+ network_endpoint,
+ to_jid));
+
+ network_endpoint->RegisterOutboundListener(
+ invalidation::NewPermanentCallback(
+ packet_handler_.get(),
+ &CacheInvalidationPacketHandler::HandleOutboundPacket));
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- chrome_invalidation_client_.Register(
+ invalidation_client_->Register(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
@@ -287,39 +643,43 @@ class CacheInvalidationNotifierDelegate
// TODO(akalin): Figure out the correct place to put this.
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- chrome_invalidation_client_.Unregister(
+ invalidation_client_->Unregister(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
}
- chrome_invalidation_client_.Stop();
+ packet_handler_.reset();
+ invalidation_client_.reset();
+
+ chrome_system_resources_.StopScheduler();
}
virtual void OnError(buzz::XmppEngine::Error error, int subcode) {
LOG(INFO) << "Error: " << error << ", subcode: " << subcode;
+ packet_handler_.reset();
+ invalidation_client_.reset();
- // TODO(akalin): Figure out whether we should unregister here,
- // too.
- chrome_invalidation_client_.Stop();
+ // TODO(akalin): Figure out whether we should stop the scheduler
+ // here.
}
private:
void RegisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Registered: "
- << sync_notifier::RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
}
void UnregisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Unregistered: "
- << sync_notifier::RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result);
}
std::vector<invalidation::ObjectId> object_ids_;
+ ChromeSystemResources chrome_system_resources_;
ChromeInvalidationListener chrome_invalidation_listener_;
- sync_notifier::ChromeInvalidationClient chrome_invalidation_client_;
+ scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
+ scoped_ptr<CacheInvalidationPacketHandler> packet_handler_;
};
} // namespace
diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp
index 3343035..792bfbe 100644
--- a/chrome/browser/sync/tools/sync_tools.gyp
+++ b/chrome/browser/sync/tools/sync_tools.gyp
@@ -21,7 +21,6 @@
'dependencies': [
'<(DEPTH)/base/base.gyp:base',
'<(DEPTH)/chrome/chrome.gyp:notifier',
- '<(DEPTH)/chrome/chrome.gyp:sync_notifier',
'<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
'<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle',
],