summaryrefslogtreecommitdiffstats
path: root/chrome/browser/sync/tools
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-23 01:13:56 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-23 01:13:56 +0000
commit8516471942ded5ca17efd0681f1ae12ac86f3617 (patch)
tree2c2cfb3d1891c638dff5b62d3c68e916814237a7 /chrome/browser/sync/tools
parent11fb5e5771fb2d148e6c8d7245f28642a749a722 (diff)
downloadchromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.zip
chromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.tar.gz
chromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.tar.bz2
Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use --sync-notification-method=server to turn on). BUG=34647 TEST=manually Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50479 Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50550 Review URL: http://codereview.chromium.org/2827014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50560 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/tools')
-rw-r--r--chrome/browser/sync/tools/sync_listen_notifications.cc412
-rw-r--r--chrome/browser/sync/tools/sync_tools.gyp1
2 files changed, 27 insertions, 386 deletions
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc
index 899798ef..2e762ac 100644
--- a/chrome/browser/sync/tools/sync_listen_notifications.cc
+++ b/chrome/browser/sync/tools/sync_listen_notifications.cc
@@ -2,13 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <cstdlib>
#include <string>
-#include <sstream>
#include <vector>
#include "base/at_exit.h"
-#include "base/base64.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/message_loop.h"
@@ -16,15 +13,17 @@
#include "base/string_util.h"
#include "base/task.h"
#include "chrome/browser/sync/notification_method.h"
+#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
+#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
+#include "chrome/browser/sync/notifier/chrome_system_resources.h"
+#include "chrome/browser/sync/notifier/invalidation_util.h"
#include "chrome/browser/sync/sync_constants.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h"
#include "chrome/common/net/notifier/listener/listen_task.h"
#include "chrome/common/net/notifier/listener/notification_constants.h"
#include "chrome/common/net/notifier/listener/subscribe_task.h"
-#include "chrome/common/net/notifier/listener/xml_element_util.h"
#include "google/cacheinvalidation/invalidation-client.h"
-#include "google/cacheinvalidation/invalidation-client-impl.h"
#include "talk/base/cryptstring.h"
#include "talk/base/logging.h"
#include "talk/base/sigslot.h"
@@ -194,148 +193,6 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate {
}
};
-// TODO(akalin): Move all the cache-invalidation-related stuff below
-// out of this file once we have a real Chrome-integrated
-// implementation.
-
-void RunAndDeleteClosure(invalidation::Closure* task) {
- task->Run();
- delete task;
-}
-
-// Simple system resources class that uses the current message loop
-// for scheduling. Assumes the current message loop is already
-// running.
-class ChromeSystemResources : public invalidation::SystemResources {
- public:
- ChromeSystemResources() : scheduler_active_(false) {}
-
- ~ChromeSystemResources() {
- CHECK(!scheduler_active_);
- }
-
- virtual invalidation::Time current_time() {
- return base::Time::Now();
- }
-
- virtual void StartScheduler() {
- CHECK(!scheduler_active_);
- scheduler_active_ = true;
- }
-
- // We assume that the current message loop is stopped shortly after
- // this is called, i.e. that any in-flight delayed tasks won't get
- // run.
- //
- // TODO(akalin): Make sure that the above actually holds.
- virtual void StopScheduler() {
- CHECK(scheduler_active_);
- scheduler_active_ = false;
- }
-
- virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
- invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- CHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostDelayedTask(
- FROM_HERE,
- NewRunnableFunction(&RunAndDeleteClosure, task),
- delay.InMillisecondsRoundedUp());
- }
-
- virtual void ScheduleImmediately(invalidation::Closure* task) {
- if (!scheduler_active_) {
- delete task;
- return;
- }
- CHECK(invalidation::IsCallbackRepeatable(task));
- MessageLoop::current()->PostTask(
- FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
- }
-
- virtual void Log(LogLevel level, const char* file, int line,
- const char* format, ...) {
- va_list ap;
- va_start(ap, format);
- std::string result;
- StringAppendV(&result, format, ap);
- logging::LogMessage(file, line).stream() << result;
- va_end(ap);
- }
-
- private:
- bool scheduler_active_;
-};
-
-// We need to write our own protobuf to string functions because we
-// use LITE_RUNTIME, which doesn't support DebugString().
-
-std::string ObjectIdToString(
- const invalidation::ObjectId& object_id) {
- std::stringstream ss;
- ss << "{ ";
- ss << "name: " << object_id.name().string_value() << ", ";
- ss << "source: " << object_id.source();
- ss << " }";
- return ss.str();
-}
-
-std::string StatusToString(
- const invalidation::Status& status) {
- std::stringstream ss;
- ss << "{ ";
- ss << "code: " << status.code() << ", ";
- ss << "description: " << status.description();
- ss << " }";
- return ss.str();
-}
-
-std::string InvalidationToString(
- const invalidation::Invalidation& invalidation) {
- std::stringstream ss;
- ss << "{ ";
- ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
- ss << "version: " << invalidation.version() << ", ";
- ss << "components: { ";
- const invalidation::ComponentStampLog& component_stamp_log =
- invalidation.component_stamp_log();
- for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
- const invalidation::ComponentStamp& component_stamp =
- component_stamp_log.stamps(i);
- ss << "component: " << component_stamp.component() << ", ";
- ss << "time: " << component_stamp.time() << ", ";
- }
- ss << " }";
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateToString(
- const invalidation::RegistrationUpdate& update) {
- std::stringstream ss;
- ss << "{ ";
- ss << "type: " << update.type() << ", ";
- ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
- ss << "version: " << update.version() << ", ";
- ss << "sequence_number: " << update.sequence_number();
- ss << " }";
- return ss.str();
-}
-
-std::string RegistrationUpdateResultToString(
- const invalidation::RegistrationUpdateResult& update_result) {
- std::stringstream ss;
- ss << "{ ";
- ss << "operation: "
- << RegistrationUpdateToString(update_result.operation()) << ", ";
- ss << "status: " << StatusToString(update_result.status());
- ss << " }";
- return ss.str();
-}
-
// The actual listener for sync notifications from the cache
// invalidation service.
class ChromeInvalidationListener
@@ -346,8 +203,9 @@ class ChromeInvalidationListener
virtual void Invalidate(const invalidation::Invalidation& invalidation,
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
- LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
- RunAndDeleteClosure(callback);
+ LOG(INFO) << "Invalidate: "
+ << sync_notifier::InvalidationToString(invalidation);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would respond to the invalidation for the
// given object (e.g., refetch the invalidated object).
}
@@ -355,7 +213,7 @@ class ChromeInvalidationListener
virtual void InvalidateAll(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "InvalidateAll";
- RunAndDeleteClosure(callback);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would loop over the current registered
// data types and send notifications for those.
}
@@ -363,7 +221,7 @@ class ChromeInvalidationListener
virtual void AllRegistrationsLost(invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "AllRegistrationsLost";
- RunAndDeleteClosure(callback);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would try to re-register for all
// registered data types.
}
@@ -372,8 +230,8 @@ class ChromeInvalidationListener
invalidation::Closure* callback) {
CHECK(invalidation::IsCallbackRepeatable(callback));
LOG(INFO) << "RegistrationLost: "
- << ObjectIdToString(object_id);
- RunAndDeleteClosure(callback);
+ << sync_notifier::ObjectIdToString(object_id);
+ sync_notifier::RunAndDeleteClosure(callback);
// A real implementation would try to re-register for this
// particular data type.
}
@@ -382,199 +240,6 @@ class ChromeInvalidationListener
DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener);
};
-static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
-static const buzz::QName kQnTangoIqPacketContent(
- "google:tango", "content");
-
-// A task that listens for ClientInvalidation messages and calls the
-// given callback on them.
-class CacheInvalidationListenTask : public buzz::XmppTask {
- public:
- // Takes ownership of callback.
- CacheInvalidationListenTask(Task* parent,
- Callback1<const std::string&>::Type* callback)
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
- virtual ~CacheInvalidationListenTask() {}
-
- virtual int ProcessStart() {
- LOG(INFO) << "CacheInvalidationListenTask started";
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationListenTask blocked";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationListenTask response received";
- std::string data;
- if (GetTangoIqPacketData(stanza, &data)) {
- callback_->Run(data);
- } else {
- LOG(ERROR) << "Could not get packet data";
- }
- // Acknowledge receipt of the iq to the buzz server.
- // TODO(akalin): Send an error response for malformed packets.
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
- SendStanza(response_stanza.get());
- return STATE_RESPONSE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (IsValidTangoIqPacket(stanza)) {
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
- LOG(INFO) << "Stanza skipped";
- return false;
- }
-
- private:
- bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
- return
- (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
- (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
- }
-
- bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
- std::string* data) {
- DCHECK(IsValidTangoIqPacket(stanza));
- const buzz::XmlElement* tango_iq_packet =
- stanza->FirstNamed(kQnTangoIqPacket);
- if (!tango_iq_packet) {
- LOG(ERROR) << "Could not find tango IQ packet element";
- return false;
- }
- const buzz::XmlElement* tango_iq_packet_content =
- tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
- if (!tango_iq_packet_content) {
- LOG(ERROR) << "Could not find tango IQ packet content element";
- return false;
- }
- *data = tango_iq_packet_content->BodyText();
- return true;
- }
-
- scoped_ptr<Callback1<const std::string&>::Type> callback_;
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
-};
-
-// A task that sends a single outbound ClientInvalidation message.
-class CacheInvalidationSendMessageTask : public buzz::XmppTask {
- public:
- CacheInvalidationSendMessageTask(Task* parent,
- const buzz::Jid& to_jid,
- const std::string& msg)
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
- to_jid_(to_jid), msg_(msg) {}
- virtual ~CacheInvalidationSendMessageTask() {}
-
- virtual int ProcessStart() {
- scoped_ptr<buzz::XmlElement> stanza(
- MakeTangoIqPacket(to_jid_, task_id(), msg_));
- LOG(INFO) << "Sending message: "
- << notifier::XmlElementToString(*stanza.get());
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
- LOG(INFO) << "Error when sending message";
- return STATE_ERROR;
- }
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
- return STATE_BLOCKED;
- }
- LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
- << notifier::XmlElementToString(*stanza);
- return STATE_DONE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- LOG(INFO) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (!MatchResponseIq(stanza, to_jid_, task_id())) {
- LOG(INFO) << "Stanza skipped";
- return false;
- }
- LOG(INFO) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
-
- private:
- static buzz::XmlElement* MakeTangoIqPacket(
- const buzz::Jid& to_jid,
- const std::string& task_id,
- const std::string& msg) {
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
- buzz::XmlElement* tango_iq_packet =
- new buzz::XmlElement(kQnTangoIqPacket, true);
- iq->AddElement(tango_iq_packet);
- buzz::XmlElement* tango_iq_packet_content =
- new buzz::XmlElement(kQnTangoIqPacketContent, true);
- tango_iq_packet->AddElement(tango_iq_packet_content);
- tango_iq_packet_content->SetBodyText(msg);
- return iq;
- }
-
- buzz::Jid to_jid_;
- std::string msg_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
-};
-
-// Class that handles the details of sending and receiving client
-// invalidation packets.
-class CacheInvalidationPacketHandler {
- public:
- CacheInvalidationPacketHandler(
- buzz::XmppClient* xmpp_client,
- invalidation::NetworkEndpoint* network_endpoint,
- const buzz::Jid& to_jid)
- : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint),
- to_jid_(to_jid) {
- // Owned by xmpp_client.
- CacheInvalidationListenTask* listen_task =
- new CacheInvalidationListenTask(
- xmpp_client, NewCallback(
- this, &CacheInvalidationPacketHandler::HandleInboundPacket));
- listen_task->Start();
- }
-
- void HandleOutboundPacket(invalidation::NetworkEndpoint* const &
- network_endpoint) {
- CHECK_EQ(network_endpoint, network_endpoint_);
- invalidation::string message;
- network_endpoint->TakeOutboundMessage(&message);
- std::string encoded_message;
- CHECK(base::Base64Encode(message, &encoded_message));
- // Owned by xmpp_client.
- CacheInvalidationSendMessageTask* send_message_task =
- new CacheInvalidationSendMessageTask(xmpp_client_,
- to_jid_,
- encoded_message);
- send_message_task->Start();
- }
-
- private:
- void HandleInboundPacket(const std::string& packet) {
- std::string decoded_message;
- CHECK(base::Base64Decode(packet, &decoded_message));
- network_endpoint_->HandleInboundMessage(decoded_message);
- }
-
- buzz::XmppClient* xmpp_client_;
- invalidation::NetworkEndpoint* network_endpoint_;
- buzz::Jid to_jid_;
-};
-
// Delegate for server-side notifications.
class CacheInvalidationNotifierDelegate
: public XmppNotificationClient::Delegate {
@@ -601,36 +266,15 @@ class CacheInvalidationNotifierDelegate
buzz::XmppClient* xmpp_client) {
LOG(INFO) << "Logged in";
- chrome_system_resources_.StartScheduler();
-
- invalidation::ClientType client_type;
- client_type.set_type(invalidation::ClientType::CHROME_SYNC);
// TODO(akalin): app_name should be per-client unique.
- std::string app_name = "cc_sync_listen_notifications";
- invalidation::ClientConfig ticl_config;
- invalidation_client_.reset(
- new invalidation::InvalidationClientImpl(
- &chrome_system_resources_, client_type,
- app_name, &chrome_invalidation_listener_, ticl_config));
-
- invalidation::NetworkEndpoint* network_endpoint =
- invalidation_client_->network_endpoint();
- // TODO(akalin): Make tango jid configurable (so we can use
- // staging).
- buzz::Jid to_jid("tango@bot.talk.google.com/PROD");
- packet_handler_.reset(
- new CacheInvalidationPacketHandler(xmpp_client,
- network_endpoint,
- to_jid));
-
- network_endpoint->RegisterOutboundListener(
- invalidation::NewPermanentCallback(
- packet_handler_.get(),
- &CacheInvalidationPacketHandler::HandleOutboundPacket));
+ const std::string kAppName = "cc_sync_listen_notifications";
+ chrome_invalidation_client_.Start(kAppName,
+ &chrome_invalidation_listener_,
+ xmpp_client);
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- invalidation_client_->Register(
+ chrome_invalidation_client_.Register(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
@@ -643,43 +287,39 @@ class CacheInvalidationNotifierDelegate
// TODO(akalin): Figure out the correct place to put this.
for (std::vector<invalidation::ObjectId>::const_iterator it =
object_ids_.begin(); it != object_ids_.end(); ++it) {
- invalidation_client_->Unregister(
+ chrome_invalidation_client_.Unregister(
*it,
invalidation::NewPermanentCallback(
this, &CacheInvalidationNotifierDelegate::RegisterCallback));
}
- packet_handler_.reset();
- invalidation_client_.reset();
-
- chrome_system_resources_.StopScheduler();
+ chrome_invalidation_client_.Stop();
}
virtual void OnError(buzz::XmppEngine::Error error, int subcode) {
LOG(INFO) << "Error: " << error << ", subcode: " << subcode;
- packet_handler_.reset();
- invalidation_client_.reset();
- // TODO(akalin): Figure out whether we should stop the scheduler
- // here.
+ // TODO(akalin): Figure out whether we should unregister here,
+ // too.
+ chrome_invalidation_client_.Stop();
}
private:
void RegisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Registered: "
+ << sync_notifier::RegistrationUpdateResultToString(result);
}
void UnregisterCallback(
const invalidation::RegistrationUpdateResult& result) {
- LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result);
+ LOG(INFO) << "Unregistered: "
+ << sync_notifier::RegistrationUpdateResultToString(result);
}
std::vector<invalidation::ObjectId> object_ids_;
- ChromeSystemResources chrome_system_resources_;
ChromeInvalidationListener chrome_invalidation_listener_;
- scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
- scoped_ptr<CacheInvalidationPacketHandler> packet_handler_;
+ sync_notifier::ChromeInvalidationClient chrome_invalidation_client_;
};
} // namespace
diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp
index 792bfbe..3343035 100644
--- a/chrome/browser/sync/tools/sync_tools.gyp
+++ b/chrome/browser/sync/tools/sync_tools.gyp
@@ -21,6 +21,7 @@
'dependencies': [
'<(DEPTH)/base/base.gyp:base',
'<(DEPTH)/chrome/chrome.gyp:notifier',
+ '<(DEPTH)/chrome/chrome.gyp:sync_notifier',
'<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation',
'<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle',
],