diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-22 18:00:32 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-22 18:00:32 +0000 |
commit | 995b3daaaf10741c1fa8cb2c33b3b36a47a0f917 (patch) | |
tree | b4240ff03da3c5cedff01d0e612b002096c85e05 /chrome/browser/sync/tools | |
parent | 50b997c85d4dfaf2d01de3448314ca15efcec789 (diff) | |
download | chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.zip chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.gz chromium_src-995b3daaaf10741c1fa8cb2c33b3b36a47a0f917.tar.bz2 |
Revert 50479 - Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use
--sync-notification-method=server to turn on).
BUG=34647
TEST=manually
Review URL: http://codereview.chromium.org/2827014
TBR=akalin@chromium.org
Review URL: http://codereview.chromium.org/2805023
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50482 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/tools')
-rw-r--r-- | chrome/browser/sync/tools/sync_listen_notifications.cc | 412 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_tools.gyp | 1 |
2 files changed, 386 insertions, 27 deletions
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc index 2e762ac..899798ef 100644 --- a/chrome/browser/sync/tools/sync_listen_notifications.cc +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc @@ -2,10 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include <cstdlib> #include <string> +#include <sstream> #include <vector> #include "base/at_exit.h" +#include "base/base64.h" #include "base/command_line.h" #include "base/logging.h" #include "base/message_loop.h" @@ -13,17 +16,15 @@ #include "base/string_util.h" #include "base/task.h" #include "chrome/browser/sync/notification_method.h" -#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" -#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" -#include "chrome/browser/sync/notifier/chrome_system_resources.h" -#include "chrome/browser/sync/notifier/invalidation_util.h" #include "chrome/browser/sync/sync_constants.h" #include "chrome/common/net/notifier/base/task_pump.h" #include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h" #include "chrome/common/net/notifier/listener/listen_task.h" #include "chrome/common/net/notifier/listener/notification_constants.h" #include "chrome/common/net/notifier/listener/subscribe_task.h" +#include "chrome/common/net/notifier/listener/xml_element_util.h" #include "google/cacheinvalidation/invalidation-client.h" +#include "google/cacheinvalidation/invalidation-client-impl.h" #include "talk/base/cryptstring.h" #include "talk/base/logging.h" #include "talk/base/sigslot.h" @@ -193,6 +194,148 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate { } }; +// TODO(akalin): Move all the cache-invalidation-related stuff below +// out of this file once we have a real Chrome-integrated +// implementation. + +void RunAndDeleteClosure(invalidation::Closure* task) { + task->Run(); + delete task; +} + +// Simple system resources class that uses the current message loop +// for scheduling. Assumes the current message loop is already +// running. +class ChromeSystemResources : public invalidation::SystemResources { + public: + ChromeSystemResources() : scheduler_active_(false) {} + + ~ChromeSystemResources() { + CHECK(!scheduler_active_); + } + + virtual invalidation::Time current_time() { + return base::Time::Now(); + } + + virtual void StartScheduler() { + CHECK(!scheduler_active_); + scheduler_active_ = true; + } + + // We assume that the current message loop is stopped shortly after + // this is called, i.e. that any in-flight delayed tasks won't get + // run. + // + // TODO(akalin): Make sure that the above actually holds. + virtual void StopScheduler() { + CHECK(scheduler_active_); + scheduler_active_ = false; + } + + virtual void ScheduleWithDelay(invalidation::TimeDelta delay, + invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + CHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostDelayedTask( + FROM_HERE, + NewRunnableFunction(&RunAndDeleteClosure, task), + delay.InMillisecondsRoundedUp()); + } + + virtual void ScheduleImmediately(invalidation::Closure* task) { + if (!scheduler_active_) { + delete task; + return; + } + CHECK(invalidation::IsCallbackRepeatable(task)); + MessageLoop::current()->PostTask( + FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); + } + + virtual void Log(LogLevel level, const char* file, int line, + const char* format, ...) { + va_list ap; + va_start(ap, format); + std::string result; + StringAppendV(&result, format, ap); + logging::LogMessage(file, line).stream() << result; + va_end(ap); + } + + private: + bool scheduler_active_; +}; + +// We need to write our own protobuf to string functions because we +// use LITE_RUNTIME, which doesn't support DebugString(). + +std::string ObjectIdToString( + const invalidation::ObjectId& object_id) { + std::stringstream ss; + ss << "{ "; + ss << "name: " << object_id.name().string_value() << ", "; + ss << "source: " << object_id.source(); + ss << " }"; + return ss.str(); +} + +std::string StatusToString( + const invalidation::Status& status) { + std::stringstream ss; + ss << "{ "; + ss << "code: " << status.code() << ", "; + ss << "description: " << status.description(); + ss << " }"; + return ss.str(); +} + +std::string InvalidationToString( + const invalidation::Invalidation& invalidation) { + std::stringstream ss; + ss << "{ "; + ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; + ss << "version: " << invalidation.version() << ", "; + ss << "components: { "; + const invalidation::ComponentStampLog& component_stamp_log = + invalidation.component_stamp_log(); + for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { + const invalidation::ComponentStamp& component_stamp = + component_stamp_log.stamps(i); + ss << "component: " << component_stamp.component() << ", "; + ss << "time: " << component_stamp.time() << ", "; + } + ss << " }"; + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateToString( + const invalidation::RegistrationUpdate& update) { + std::stringstream ss; + ss << "{ "; + ss << "type: " << update.type() << ", "; + ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; + ss << "version: " << update.version() << ", "; + ss << "sequence_number: " << update.sequence_number(); + ss << " }"; + return ss.str(); +} + +std::string RegistrationUpdateResultToString( + const invalidation::RegistrationUpdateResult& update_result) { + std::stringstream ss; + ss << "{ "; + ss << "operation: " + << RegistrationUpdateToString(update_result.operation()) << ", "; + ss << "status: " << StatusToString(update_result.status()); + ss << " }"; + return ss.str(); +} + // The actual listener for sync notifications from the cache // invalidation service. class ChromeInvalidationListener @@ -203,9 +346,8 @@ class ChromeInvalidationListener virtual void Invalidate(const invalidation::Invalidation& invalidation, invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "Invalidate: " - << sync_notifier::InvalidationToString(invalidation); - sync_notifier::RunAndDeleteClosure(callback); + LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); + RunAndDeleteClosure(callback); // A real implementation would respond to the invalidation for the // given object (e.g., refetch the invalidated object). } @@ -213,7 +355,7 @@ class ChromeInvalidationListener virtual void InvalidateAll(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "InvalidateAll"; - sync_notifier::RunAndDeleteClosure(callback); + RunAndDeleteClosure(callback); // A real implementation would loop over the current registered // data types and send notifications for those. } @@ -221,7 +363,7 @@ class ChromeInvalidationListener virtual void AllRegistrationsLost(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "AllRegistrationsLost"; - sync_notifier::RunAndDeleteClosure(callback); + RunAndDeleteClosure(callback); // A real implementation would try to re-register for all // registered data types. } @@ -230,8 +372,8 @@ class ChromeInvalidationListener invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "RegistrationLost: " - << sync_notifier::ObjectIdToString(object_id); - sync_notifier::RunAndDeleteClosure(callback); + << ObjectIdToString(object_id); + RunAndDeleteClosure(callback); // A real implementation would try to re-register for this // particular data type. } @@ -240,6 +382,199 @@ class ChromeInvalidationListener DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener); }; +static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); +static const buzz::QName kQnTangoIqPacketContent( + "google:tango", "content"); + +// A task that listens for ClientInvalidation messages and calls the +// given callback on them. +class CacheInvalidationListenTask : public buzz::XmppTask { + public: + // Takes ownership of callback. + CacheInvalidationListenTask(Task* parent, + Callback1<const std::string&>::Type* callback) + : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} + virtual ~CacheInvalidationListenTask() {} + + virtual int ProcessStart() { + LOG(INFO) << "CacheInvalidationListenTask started"; + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationListenTask blocked"; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationListenTask response received"; + std::string data; + if (GetTangoIqPacketData(stanza, &data)) { + callback_->Run(data); + } else { + LOG(ERROR) << "Could not get packet data"; + } + // Acknowledge receipt of the iq to the buzz server. + // TODO(akalin): Send an error response for malformed packets. + scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); + SendStanza(response_stanza.get()); + return STATE_RESPONSE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (IsValidTangoIqPacket(stanza)) { + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + LOG(INFO) << "Stanza skipped"; + return false; + } + + private: + bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { + return + (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && + (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); + } + + bool GetTangoIqPacketData(const buzz::XmlElement* stanza, + std::string* data) { + DCHECK(IsValidTangoIqPacket(stanza)); + const buzz::XmlElement* tango_iq_packet = + stanza->FirstNamed(kQnTangoIqPacket); + if (!tango_iq_packet) { + LOG(ERROR) << "Could not find tango IQ packet element"; + return false; + } + const buzz::XmlElement* tango_iq_packet_content = + tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); + if (!tango_iq_packet_content) { + LOG(ERROR) << "Could not find tango IQ packet content element"; + return false; + } + *data = tango_iq_packet_content->BodyText(); + return true; + } + + scoped_ptr<Callback1<const std::string&>::Type> callback_; + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); +}; + +// A task that sends a single outbound ClientInvalidation message. +class CacheInvalidationSendMessageTask : public buzz::XmppTask { + public: + CacheInvalidationSendMessageTask(Task* parent, + const buzz::Jid& to_jid, + const std::string& msg) + : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), + to_jid_(to_jid), msg_(msg) {} + virtual ~CacheInvalidationSendMessageTask() {} + + virtual int ProcessStart() { + scoped_ptr<buzz::XmlElement> stanza( + MakeTangoIqPacket(to_jid_, task_id(), msg_)); + LOG(INFO) << "Sending message: " + << notifier::XmlElementToString(*stanza.get()); + if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { + LOG(INFO) << "Error when sending message"; + return STATE_ERROR; + } + return STATE_RESPONSE; + } + + virtual int ProcessResponse() { + const buzz::XmlElement* stanza = NextStanza(); + if (stanza == NULL) { + LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; + return STATE_BLOCKED; + } + LOG(INFO) << "CacheInvalidationSendMessageTask response received: " + << notifier::XmlElementToString(*stanza); + return STATE_DONE; + } + + virtual bool HandleStanza(const buzz::XmlElement* stanza) { + LOG(INFO) << "Stanza received: " + << notifier::XmlElementToString(*stanza); + if (!MatchResponseIq(stanza, to_jid_, task_id())) { + LOG(INFO) << "Stanza skipped"; + return false; + } + LOG(INFO) << "Queueing stanza"; + QueueStanza(stanza); + return true; + } + + private: + static buzz::XmlElement* MakeTangoIqPacket( + const buzz::Jid& to_jid, + const std::string& task_id, + const std::string& msg) { + buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); + buzz::XmlElement* tango_iq_packet = + new buzz::XmlElement(kQnTangoIqPacket, true); + iq->AddElement(tango_iq_packet); + buzz::XmlElement* tango_iq_packet_content = + new buzz::XmlElement(kQnTangoIqPacketContent, true); + tango_iq_packet->AddElement(tango_iq_packet_content); + tango_iq_packet_content->SetBodyText(msg); + return iq; + } + + buzz::Jid to_jid_; + std::string msg_; + + DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); +}; + +// Class that handles the details of sending and receiving client +// invalidation packets. +class CacheInvalidationPacketHandler { + public: + CacheInvalidationPacketHandler( + buzz::XmppClient* xmpp_client, + invalidation::NetworkEndpoint* network_endpoint, + const buzz::Jid& to_jid) + : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint), + to_jid_(to_jid) { + // Owned by xmpp_client. + CacheInvalidationListenTask* listen_task = + new CacheInvalidationListenTask( + xmpp_client, NewCallback( + this, &CacheInvalidationPacketHandler::HandleInboundPacket)); + listen_task->Start(); + } + + void HandleOutboundPacket(invalidation::NetworkEndpoint* const & + network_endpoint) { + CHECK_EQ(network_endpoint, network_endpoint_); + invalidation::string message; + network_endpoint->TakeOutboundMessage(&message); + std::string encoded_message; + CHECK(base::Base64Encode(message, &encoded_message)); + // Owned by xmpp_client. + CacheInvalidationSendMessageTask* send_message_task = + new CacheInvalidationSendMessageTask(xmpp_client_, + to_jid_, + encoded_message); + send_message_task->Start(); + } + + private: + void HandleInboundPacket(const std::string& packet) { + std::string decoded_message; + CHECK(base::Base64Decode(packet, &decoded_message)); + network_endpoint_->HandleInboundMessage(decoded_message); + } + + buzz::XmppClient* xmpp_client_; + invalidation::NetworkEndpoint* network_endpoint_; + buzz::Jid to_jid_; +}; + // Delegate for server-side notifications. class CacheInvalidationNotifierDelegate : public XmppNotificationClient::Delegate { @@ -266,15 +601,36 @@ class CacheInvalidationNotifierDelegate buzz::XmppClient* xmpp_client) { LOG(INFO) << "Logged in"; + chrome_system_resources_.StartScheduler(); + + invalidation::ClientType client_type; + client_type.set_type(invalidation::ClientType::CHROME_SYNC); // TODO(akalin): app_name should be per-client unique. - const std::string kAppName = "cc_sync_listen_notifications"; - chrome_invalidation_client_.Start(kAppName, - &chrome_invalidation_listener_, - xmpp_client); + std::string app_name = "cc_sync_listen_notifications"; + invalidation::ClientConfig ticl_config; + invalidation_client_.reset( + new invalidation::InvalidationClientImpl( + &chrome_system_resources_, client_type, + app_name, &chrome_invalidation_listener_, ticl_config)); + + invalidation::NetworkEndpoint* network_endpoint = + invalidation_client_->network_endpoint(); + // TODO(akalin): Make tango jid configurable (so we can use + // staging). + buzz::Jid to_jid("tango@bot.talk.google.com/PROD"); + packet_handler_.reset( + new CacheInvalidationPacketHandler(xmpp_client, + network_endpoint, + to_jid)); + + network_endpoint->RegisterOutboundListener( + invalidation::NewPermanentCallback( + packet_handler_.get(), + &CacheInvalidationPacketHandler::HandleOutboundPacket)); for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - chrome_invalidation_client_.Register( + invalidation_client_->Register( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); @@ -287,39 +643,43 @@ class CacheInvalidationNotifierDelegate // TODO(akalin): Figure out the correct place to put this. for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - chrome_invalidation_client_.Unregister( + invalidation_client_->Unregister( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); } - chrome_invalidation_client_.Stop(); + packet_handler_.reset(); + invalidation_client_.reset(); + + chrome_system_resources_.StopScheduler(); } virtual void OnError(buzz::XmppEngine::Error error, int subcode) { LOG(INFO) << "Error: " << error << ", subcode: " << subcode; + packet_handler_.reset(); + invalidation_client_.reset(); - // TODO(akalin): Figure out whether we should unregister here, - // too. - chrome_invalidation_client_.Stop(); + // TODO(akalin): Figure out whether we should stop the scheduler + // here. } private: void RegisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Registered: " - << sync_notifier::RegistrationUpdateResultToString(result); + LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); } void UnregisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Unregistered: " - << sync_notifier::RegistrationUpdateResultToString(result); + LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result); } std::vector<invalidation::ObjectId> object_ids_; + ChromeSystemResources chrome_system_resources_; ChromeInvalidationListener chrome_invalidation_listener_; - sync_notifier::ChromeInvalidationClient chrome_invalidation_client_; + scoped_ptr<invalidation::InvalidationClient> invalidation_client_; + scoped_ptr<CacheInvalidationPacketHandler> packet_handler_; }; } // namespace diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp index 3343035..792bfbe 100644 --- a/chrome/browser/sync/tools/sync_tools.gyp +++ b/chrome/browser/sync/tools/sync_tools.gyp @@ -21,7 +21,6 @@ 'dependencies': [ '<(DEPTH)/base/base.gyp:base', '<(DEPTH)/chrome/chrome.gyp:notifier', - '<(DEPTH)/chrome/chrome.gyp:sync_notifier', '<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation', '<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle', ], |