diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-23 01:13:56 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-23 01:13:56 +0000 |
commit | 8516471942ded5ca17efd0681f1ae12ac86f3617 (patch) | |
tree | 2c2cfb3d1891c638dff5b62d3c68e916814237a7 /chrome/browser/sync/tools | |
parent | 11fb5e5771fb2d148e6c8d7245f28642a749a722 (diff) | |
download | chromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.zip chromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.tar.gz chromium_src-8516471942ded5ca17efd0681f1ae12ac86f3617.tar.bz2 |
Implemented initial version of server-issued notification client.
Added NOTIFICATION_SERVER notification method (use
--sync-notification-method=server to turn on).
BUG=34647
TEST=manually
Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50479
Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=50550
Review URL: http://codereview.chromium.org/2827014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50560 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/browser/sync/tools')
-rw-r--r-- | chrome/browser/sync/tools/sync_listen_notifications.cc | 412 | ||||
-rw-r--r-- | chrome/browser/sync/tools/sync_tools.gyp | 1 |
2 files changed, 27 insertions, 386 deletions
diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc index 899798ef..2e762ac 100644 --- a/chrome/browser/sync/tools/sync_listen_notifications.cc +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc @@ -2,13 +2,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include <cstdlib> #include <string> -#include <sstream> #include <vector> #include "base/at_exit.h" -#include "base/base64.h" #include "base/command_line.h" #include "base/logging.h" #include "base/message_loop.h" @@ -16,15 +13,17 @@ #include "base/string_util.h" #include "base/task.h" #include "chrome/browser/sync/notification_method.h" +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" +#include "chrome/browser/sync/notifier/chrome_invalidation_client.h" +#include "chrome/browser/sync/notifier/chrome_system_resources.h" +#include "chrome/browser/sync/notifier/invalidation_util.h" #include "chrome/browser/sync/sync_constants.h" #include "chrome/common/net/notifier/base/task_pump.h" #include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h" #include "chrome/common/net/notifier/listener/listen_task.h" #include "chrome/common/net/notifier/listener/notification_constants.h" #include "chrome/common/net/notifier/listener/subscribe_task.h" -#include "chrome/common/net/notifier/listener/xml_element_util.h" #include "google/cacheinvalidation/invalidation-client.h" -#include "google/cacheinvalidation/invalidation-client-impl.h" #include "talk/base/cryptstring.h" #include "talk/base/logging.h" #include "talk/base/sigslot.h" @@ -194,148 +193,6 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate { } }; -// TODO(akalin): Move all the cache-invalidation-related stuff below -// out of this file once we have a real Chrome-integrated -// implementation. - -void RunAndDeleteClosure(invalidation::Closure* task) { - task->Run(); - delete task; -} - -// Simple system resources class that uses the current message loop -// for scheduling. Assumes the current message loop is already -// running. -class ChromeSystemResources : public invalidation::SystemResources { - public: - ChromeSystemResources() : scheduler_active_(false) {} - - ~ChromeSystemResources() { - CHECK(!scheduler_active_); - } - - virtual invalidation::Time current_time() { - return base::Time::Now(); - } - - virtual void StartScheduler() { - CHECK(!scheduler_active_); - scheduler_active_ = true; - } - - // We assume that the current message loop is stopped shortly after - // this is called, i.e. that any in-flight delayed tasks won't get - // run. - // - // TODO(akalin): Make sure that the above actually holds. - virtual void StopScheduler() { - CHECK(scheduler_active_); - scheduler_active_ = false; - } - - virtual void ScheduleWithDelay(invalidation::TimeDelta delay, - invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - CHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostDelayedTask( - FROM_HERE, - NewRunnableFunction(&RunAndDeleteClosure, task), - delay.InMillisecondsRoundedUp()); - } - - virtual void ScheduleImmediately(invalidation::Closure* task) { - if (!scheduler_active_) { - delete task; - return; - } - CHECK(invalidation::IsCallbackRepeatable(task)); - MessageLoop::current()->PostTask( - FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task)); - } - - virtual void Log(LogLevel level, const char* file, int line, - const char* format, ...) { - va_list ap; - va_start(ap, format); - std::string result; - StringAppendV(&result, format, ap); - logging::LogMessage(file, line).stream() << result; - va_end(ap); - } - - private: - bool scheduler_active_; -}; - -// We need to write our own protobuf to string functions because we -// use LITE_RUNTIME, which doesn't support DebugString(). - -std::string ObjectIdToString( - const invalidation::ObjectId& object_id) { - std::stringstream ss; - ss << "{ "; - ss << "name: " << object_id.name().string_value() << ", "; - ss << "source: " << object_id.source(); - ss << " }"; - return ss.str(); -} - -std::string StatusToString( - const invalidation::Status& status) { - std::stringstream ss; - ss << "{ "; - ss << "code: " << status.code() << ", "; - ss << "description: " << status.description(); - ss << " }"; - return ss.str(); -} - -std::string InvalidationToString( - const invalidation::Invalidation& invalidation) { - std::stringstream ss; - ss << "{ "; - ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", "; - ss << "version: " << invalidation.version() << ", "; - ss << "components: { "; - const invalidation::ComponentStampLog& component_stamp_log = - invalidation.component_stamp_log(); - for (int i = 0; i < component_stamp_log.stamps_size(); ++i) { - const invalidation::ComponentStamp& component_stamp = - component_stamp_log.stamps(i); - ss << "component: " << component_stamp.component() << ", "; - ss << "time: " << component_stamp.time() << ", "; - } - ss << " }"; - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateToString( - const invalidation::RegistrationUpdate& update) { - std::stringstream ss; - ss << "{ "; - ss << "type: " << update.type() << ", "; - ss << "object_id: " << ObjectIdToString(update.object_id()) << ", "; - ss << "version: " << update.version() << ", "; - ss << "sequence_number: " << update.sequence_number(); - ss << " }"; - return ss.str(); -} - -std::string RegistrationUpdateResultToString( - const invalidation::RegistrationUpdateResult& update_result) { - std::stringstream ss; - ss << "{ "; - ss << "operation: " - << RegistrationUpdateToString(update_result.operation()) << ", "; - ss << "status: " << StatusToString(update_result.status()); - ss << " }"; - return ss.str(); -} - // The actual listener for sync notifications from the cache // invalidation service. class ChromeInvalidationListener @@ -346,8 +203,9 @@ class ChromeInvalidationListener virtual void Invalidate(const invalidation::Invalidation& invalidation, invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); - LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation); - RunAndDeleteClosure(callback); + LOG(INFO) << "Invalidate: " + << sync_notifier::InvalidationToString(invalidation); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would respond to the invalidation for the // given object (e.g., refetch the invalidated object). } @@ -355,7 +213,7 @@ class ChromeInvalidationListener virtual void InvalidateAll(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "InvalidateAll"; - RunAndDeleteClosure(callback); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would loop over the current registered // data types and send notifications for those. } @@ -363,7 +221,7 @@ class ChromeInvalidationListener virtual void AllRegistrationsLost(invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "AllRegistrationsLost"; - RunAndDeleteClosure(callback); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would try to re-register for all // registered data types. } @@ -372,8 +230,8 @@ class ChromeInvalidationListener invalidation::Closure* callback) { CHECK(invalidation::IsCallbackRepeatable(callback)); LOG(INFO) << "RegistrationLost: " - << ObjectIdToString(object_id); - RunAndDeleteClosure(callback); + << sync_notifier::ObjectIdToString(object_id); + sync_notifier::RunAndDeleteClosure(callback); // A real implementation would try to re-register for this // particular data type. } @@ -382,199 +240,6 @@ class ChromeInvalidationListener DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener); }; -static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); -static const buzz::QName kQnTangoIqPacketContent( - "google:tango", "content"); - -// A task that listens for ClientInvalidation messages and calls the -// given callback on them. -class CacheInvalidationListenTask : public buzz::XmppTask { - public: - // Takes ownership of callback. - CacheInvalidationListenTask(Task* parent, - Callback1<const std::string&>::Type* callback) - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} - virtual ~CacheInvalidationListenTask() {} - - virtual int ProcessStart() { - LOG(INFO) << "CacheInvalidationListenTask started"; - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationListenTask blocked"; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationListenTask response received"; - std::string data; - if (GetTangoIqPacketData(stanza, &data)) { - callback_->Run(data); - } else { - LOG(ERROR) << "Could not get packet data"; - } - // Acknowledge receipt of the iq to the buzz server. - // TODO(akalin): Send an error response for malformed packets. - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); - SendStanza(response_stanza.get()); - return STATE_RESPONSE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (IsValidTangoIqPacket(stanza)) { - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - LOG(INFO) << "Stanza skipped"; - return false; - } - - private: - bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { - return - (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && - (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); - } - - bool GetTangoIqPacketData(const buzz::XmlElement* stanza, - std::string* data) { - DCHECK(IsValidTangoIqPacket(stanza)); - const buzz::XmlElement* tango_iq_packet = - stanza->FirstNamed(kQnTangoIqPacket); - if (!tango_iq_packet) { - LOG(ERROR) << "Could not find tango IQ packet element"; - return false; - } - const buzz::XmlElement* tango_iq_packet_content = - tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); - if (!tango_iq_packet_content) { - LOG(ERROR) << "Could not find tango IQ packet content element"; - return false; - } - *data = tango_iq_packet_content->BodyText(); - return true; - } - - scoped_ptr<Callback1<const std::string&>::Type> callback_; - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); -}; - -// A task that sends a single outbound ClientInvalidation message. -class CacheInvalidationSendMessageTask : public buzz::XmppTask { - public: - CacheInvalidationSendMessageTask(Task* parent, - const buzz::Jid& to_jid, - const std::string& msg) - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), - to_jid_(to_jid), msg_(msg) {} - virtual ~CacheInvalidationSendMessageTask() {} - - virtual int ProcessStart() { - scoped_ptr<buzz::XmlElement> stanza( - MakeTangoIqPacket(to_jid_, task_id(), msg_)); - LOG(INFO) << "Sending message: " - << notifier::XmlElementToString(*stanza.get()); - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { - LOG(INFO) << "Error when sending message"; - return STATE_ERROR; - } - return STATE_RESPONSE; - } - - virtual int ProcessResponse() { - const buzz::XmlElement* stanza = NextStanza(); - if (stanza == NULL) { - LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; - return STATE_BLOCKED; - } - LOG(INFO) << "CacheInvalidationSendMessageTask response received: " - << notifier::XmlElementToString(*stanza); - return STATE_DONE; - } - - virtual bool HandleStanza(const buzz::XmlElement* stanza) { - LOG(INFO) << "Stanza received: " - << notifier::XmlElementToString(*stanza); - if (!MatchResponseIq(stanza, to_jid_, task_id())) { - LOG(INFO) << "Stanza skipped"; - return false; - } - LOG(INFO) << "Queueing stanza"; - QueueStanza(stanza); - return true; - } - - private: - static buzz::XmlElement* MakeTangoIqPacket( - const buzz::Jid& to_jid, - const std::string& task_id, - const std::string& msg) { - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); - buzz::XmlElement* tango_iq_packet = - new buzz::XmlElement(kQnTangoIqPacket, true); - iq->AddElement(tango_iq_packet); - buzz::XmlElement* tango_iq_packet_content = - new buzz::XmlElement(kQnTangoIqPacketContent, true); - tango_iq_packet->AddElement(tango_iq_packet_content); - tango_iq_packet_content->SetBodyText(msg); - return iq; - } - - buzz::Jid to_jid_; - std::string msg_; - - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); -}; - -// Class that handles the details of sending and receiving client -// invalidation packets. -class CacheInvalidationPacketHandler { - public: - CacheInvalidationPacketHandler( - buzz::XmppClient* xmpp_client, - invalidation::NetworkEndpoint* network_endpoint, - const buzz::Jid& to_jid) - : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint), - to_jid_(to_jid) { - // Owned by xmpp_client. - CacheInvalidationListenTask* listen_task = - new CacheInvalidationListenTask( - xmpp_client, NewCallback( - this, &CacheInvalidationPacketHandler::HandleInboundPacket)); - listen_task->Start(); - } - - void HandleOutboundPacket(invalidation::NetworkEndpoint* const & - network_endpoint) { - CHECK_EQ(network_endpoint, network_endpoint_); - invalidation::string message; - network_endpoint->TakeOutboundMessage(&message); - std::string encoded_message; - CHECK(base::Base64Encode(message, &encoded_message)); - // Owned by xmpp_client. - CacheInvalidationSendMessageTask* send_message_task = - new CacheInvalidationSendMessageTask(xmpp_client_, - to_jid_, - encoded_message); - send_message_task->Start(); - } - - private: - void HandleInboundPacket(const std::string& packet) { - std::string decoded_message; - CHECK(base::Base64Decode(packet, &decoded_message)); - network_endpoint_->HandleInboundMessage(decoded_message); - } - - buzz::XmppClient* xmpp_client_; - invalidation::NetworkEndpoint* network_endpoint_; - buzz::Jid to_jid_; -}; - // Delegate for server-side notifications. class CacheInvalidationNotifierDelegate : public XmppNotificationClient::Delegate { @@ -601,36 +266,15 @@ class CacheInvalidationNotifierDelegate buzz::XmppClient* xmpp_client) { LOG(INFO) << "Logged in"; - chrome_system_resources_.StartScheduler(); - - invalidation::ClientType client_type; - client_type.set_type(invalidation::ClientType::CHROME_SYNC); // TODO(akalin): app_name should be per-client unique. - std::string app_name = "cc_sync_listen_notifications"; - invalidation::ClientConfig ticl_config; - invalidation_client_.reset( - new invalidation::InvalidationClientImpl( - &chrome_system_resources_, client_type, - app_name, &chrome_invalidation_listener_, ticl_config)); - - invalidation::NetworkEndpoint* network_endpoint = - invalidation_client_->network_endpoint(); - // TODO(akalin): Make tango jid configurable (so we can use - // staging). - buzz::Jid to_jid("tango@bot.talk.google.com/PROD"); - packet_handler_.reset( - new CacheInvalidationPacketHandler(xmpp_client, - network_endpoint, - to_jid)); - - network_endpoint->RegisterOutboundListener( - invalidation::NewPermanentCallback( - packet_handler_.get(), - &CacheInvalidationPacketHandler::HandleOutboundPacket)); + const std::string kAppName = "cc_sync_listen_notifications"; + chrome_invalidation_client_.Start(kAppName, + &chrome_invalidation_listener_, + xmpp_client); for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - invalidation_client_->Register( + chrome_invalidation_client_.Register( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); @@ -643,43 +287,39 @@ class CacheInvalidationNotifierDelegate // TODO(akalin): Figure out the correct place to put this. for (std::vector<invalidation::ObjectId>::const_iterator it = object_ids_.begin(); it != object_ids_.end(); ++it) { - invalidation_client_->Unregister( + chrome_invalidation_client_.Unregister( *it, invalidation::NewPermanentCallback( this, &CacheInvalidationNotifierDelegate::RegisterCallback)); } - packet_handler_.reset(); - invalidation_client_.reset(); - - chrome_system_resources_.StopScheduler(); + chrome_invalidation_client_.Stop(); } virtual void OnError(buzz::XmppEngine::Error error, int subcode) { LOG(INFO) << "Error: " << error << ", subcode: " << subcode; - packet_handler_.reset(); - invalidation_client_.reset(); - // TODO(akalin): Figure out whether we should stop the scheduler - // here. + // TODO(akalin): Figure out whether we should unregister here, + // too. + chrome_invalidation_client_.Stop(); } private: void RegisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result); + LOG(INFO) << "Registered: " + << sync_notifier::RegistrationUpdateResultToString(result); } void UnregisterCallback( const invalidation::RegistrationUpdateResult& result) { - LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result); + LOG(INFO) << "Unregistered: " + << sync_notifier::RegistrationUpdateResultToString(result); } std::vector<invalidation::ObjectId> object_ids_; - ChromeSystemResources chrome_system_resources_; ChromeInvalidationListener chrome_invalidation_listener_; - scoped_ptr<invalidation::InvalidationClient> invalidation_client_; - scoped_ptr<CacheInvalidationPacketHandler> packet_handler_; + sync_notifier::ChromeInvalidationClient chrome_invalidation_client_; }; } // namespace diff --git a/chrome/browser/sync/tools/sync_tools.gyp b/chrome/browser/sync/tools/sync_tools.gyp index 792bfbe..3343035 100644 --- a/chrome/browser/sync/tools/sync_tools.gyp +++ b/chrome/browser/sync/tools/sync_tools.gyp @@ -21,6 +21,7 @@ 'dependencies': [ '<(DEPTH)/base/base.gyp:base', '<(DEPTH)/chrome/chrome.gyp:notifier', + '<(DEPTH)/chrome/chrome.gyp:sync_notifier', '<(DEPTH)/third_party/cacheinvalidation/cacheinvalidation.gyp:cacheinvalidation', '<(DEPTH)/third_party/libjingle/libjingle.gyp:libjingle', ], |