diff options
author | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-17 07:01:11 +0000 |
---|---|---|
committer | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-17 07:01:11 +0000 |
commit | 55d56cf76e7070bfaaf8f795b8c35e9befdf2372 (patch) | |
tree | f938933da60c9c5f387dcdd5e8c0581a15db1670 /sync | |
parent | 1fc99a487f94fc6fcfdbc9d5cdd1d3e77b7336fa (diff) | |
download | chromium_src-55d56cf76e7070bfaaf8f795b8c35e9befdf2372.zip chromium_src-55d56cf76e7070bfaaf8f795b8c35e9befdf2372.tar.gz chromium_src-55d56cf76e7070bfaaf8f795b8c35e9befdf2372.tar.bz2 |
sync: Use invalidation local acknowledgements
As of r237421, the invalidations system now supports a new and improved
form of local acknowledgements. Any invalidations not explicitly
dropped or acknowledged will be redelivered after a restart.
This CL makes use of the new functionality in sync. It uses the
DataTypeTrackers to track the status of pending invalidations and
acknowledge or drop invalidations as appropriate.
Also notable in this CL is that the 'server_dropped_hints' flag has now
been uncommented in the sync protocol. We've known for a long time that
we wanted to support such a flag, but haven't been able to implement it
correctly until now.
BUG=78462,233437
Review URL: https://codereview.chromium.org/102353005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@241170 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync')
-rw-r--r-- | sync/internal_api/public/base/model_type_test_util.cc | 13 | ||||
-rw-r--r-- | sync/internal_api/public/base/model_type_test_util.h | 7 | ||||
-rw-r--r-- | sync/notifier/dropped_invalidation_tracker.cc | 7 | ||||
-rw-r--r-- | sync/notifier/dropped_invalidation_tracker.h | 11 | ||||
-rw-r--r-- | sync/notifier/mock_ack_handler.cc | 38 | ||||
-rw-r--r-- | sync/notifier/mock_ack_handler.h | 19 | ||||
-rw-r--r-- | sync/notifier/single_object_invalidation_set.cc | 4 | ||||
-rw-r--r-- | sync/notifier/single_object_invalidation_set.h | 1 | ||||
-rw-r--r-- | sync/protocol/sync.proto | 12 | ||||
-rw-r--r-- | sync/sessions/data_type_tracker.cc | 132 | ||||
-rw-r--r-- | sync/sessions/data_type_tracker.h | 26 | ||||
-rw-r--r-- | sync/sessions/nudge_tracker.cc | 41 | ||||
-rw-r--r-- | sync/sessions/nudge_tracker_unittest.cc | 291 |
13 files changed, 517 insertions, 85 deletions
diff --git a/sync/internal_api/public/base/model_type_test_util.cc b/sync/internal_api/public/base/model_type_test_util.cc index d9621bb..4876962 100644 --- a/sync/internal_api/public/base/model_type_test_util.cc +++ b/sync/internal_api/public/base/model_type_test_util.cc @@ -7,15 +7,22 @@ namespace syncer { -ObjectIdInvalidationMap BuildInvalidationMap( +syncer::Invalidation BuildInvalidation( ModelType type, int version, const std::string& payload) { - ObjectIdInvalidationMap map; invalidation::ObjectId id; bool result = RealModelTypeToObjectId(type, &id); DCHECK(result); - map.Insert(Invalidation::Init(id, version, payload)); + return Invalidation::Init(id, version, payload); +} + +ObjectIdInvalidationMap BuildInvalidationMap( + ModelType type, + int version, + const std::string& payload) { + ObjectIdInvalidationMap map; + map.Insert(BuildInvalidation(type, version, payload)); return map; } diff --git a/sync/internal_api/public/base/model_type_test_util.h b/sync/internal_api/public/base/model_type_test_util.h index c724bf5..1412f66 100644 --- a/sync/internal_api/public/base/model_type_test_util.h +++ b/sync/internal_api/public/base/model_type_test_util.h @@ -21,6 +21,13 @@ ObjectIdInvalidationMap BuildInvalidationMap( int version, const std::string& payload); +// Builds an invalidation. Similar to Invalidation::Init, but its first +// parameter is a ModelType rather than an ObjectId. +syncer::Invalidation BuildInvalidation( + ModelType type, + int version, + const std::string& payload); + // Defined for googletest. Forwards to ModelTypeSetToString(). void PrintTo(ModelTypeSet model_types, ::std::ostream* os); diff --git a/sync/notifier/dropped_invalidation_tracker.cc b/sync/notifier/dropped_invalidation_tracker.cc index 8599cc2..fc576e8 100644 --- a/sync/notifier/dropped_invalidation_tracker.cc +++ b/sync/notifier/dropped_invalidation_tracker.cc @@ -11,7 +11,8 @@ namespace syncer { DroppedInvalidationTracker::DroppedInvalidationTracker( const invalidation::ObjectId& id) : id_(id), - drop_ack_handle_(AckHandle::InvalidAckHandle()) {} + drop_ack_handle_(AckHandle::InvalidAckHandle()), + recovering_from_drop_(false) {} DroppedInvalidationTracker::~DroppedInvalidationTracker() {} @@ -23,6 +24,7 @@ void DroppedInvalidationTracker::RecordDropEvent( WeakHandle<AckHandler> handler, AckHandle handle) { drop_ack_handler_ = handler; drop_ack_handle_ = handle; + recovering_from_drop_ = true; } void DroppedInvalidationTracker::RecordRecoveryFromDropEvent() { @@ -33,10 +35,11 @@ void DroppedInvalidationTracker::RecordRecoveryFromDropEvent() { drop_ack_handle_); } drop_ack_handler_ = syncer::WeakHandle<AckHandler>(); + recovering_from_drop_ = false; } bool DroppedInvalidationTracker::IsRecoveringFromDropEvent() const { - return drop_ack_handler_.IsInitialized(); + return recovering_from_drop_; } } // namespace syncer diff --git a/sync/notifier/dropped_invalidation_tracker.h b/sync/notifier/dropped_invalidation_tracker.h index 877187e..20f32af 100644 --- a/sync/notifier/dropped_invalidation_tracker.h +++ b/sync/notifier/dropped_invalidation_tracker.h @@ -27,6 +27,9 @@ class Invalidation; // an instance of this class to that Invalidation's Drop() method. In order to // indicate recovery from a drop, the handler can call this class' // RecordRecoveryFromDropEvent(). +// +// Copy and assign are allowed for this class so we can use it in STL +// containers. class SYNC_EXPORT DroppedInvalidationTracker { public: explicit DroppedInvalidationTracker(const invalidation::ObjectId& id); @@ -51,6 +54,12 @@ class SYNC_EXPORT DroppedInvalidationTracker { invalidation::ObjectId id_; AckHandle drop_ack_handle_; + // This flag is set to true when we have dropped an invalidation and have not + // yet recovered from this drop event. Note that this may not always coincide + // with drop_ack_handler_ being initialized because a null AckHandler could be + // passed in to RecordDropEvent(). + bool recovering_from_drop_; + // A WeakHandle to the enitity responsible for persisting invalidation // acknowledgement state on disk. We can get away with using a WeakHandle // because we don't care if our drop recovery message doesn't gets delivered @@ -58,8 +67,6 @@ class SYNC_EXPORT DroppedInvalidationTracker { // invalidation state again on the next restart. It would be a waste of time // and resources, but otherwise not particularly harmful. WeakHandle<AckHandler> drop_ack_handler_; - - DISALLOW_COPY_AND_ASSIGN(DroppedInvalidationTracker); }; } // namespace syncer diff --git a/sync/notifier/mock_ack_handler.cc b/sync/notifier/mock_ack_handler.cc index 6a4c834..60300a2 100644 --- a/sync/notifier/mock_ack_handler.cc +++ b/sync/notifier/mock_ack_handler.cc @@ -50,6 +50,24 @@ bool MockAckHandler::IsUnacked(const Invalidation& invalidation) const { return it != unacked_invalidations_.end(); } +bool MockAckHandler::IsAcknowledged(const Invalidation& invalidation) const { + AckHandleMatcher matcher(invalidation.ack_handle()); + InvalidationVector::const_iterator it = std::find_if( + acked_invalidations_.begin(), + acked_invalidations_.end(), + matcher); + return it != acked_invalidations_.end(); +} + +bool MockAckHandler::IsDropped(const Invalidation& invalidation) const { + AckHandleMatcher matcher(invalidation.ack_handle()); + InvalidationVector::const_iterator it = std::find_if( + dropped_invalidations_.begin(), + dropped_invalidations_.end(), + matcher); + return it != dropped_invalidations_.end(); +} + bool MockAckHandler::IsUnsent(const Invalidation& invalidation) const { AckHandleMatcher matcher(invalidation.ack_handle()); InvalidationVector::const_iterator it1 = std::find_if( @@ -59,6 +77,10 @@ bool MockAckHandler::IsUnsent(const Invalidation& invalidation) const { return it1 != unsent_invalidations_.end(); } +bool MockAckHandler::AllInvalidationsAccountedFor() const { + return unacked_invalidations_.empty() && unrecovered_drop_events_.empty(); +} + void MockAckHandler::Acknowledge( const invalidation::ObjectId& id, const AckHandle& handle) { @@ -71,11 +93,27 @@ void MockAckHandler::Acknowledge( acked_invalidations_.push_back(*it); unacked_invalidations_.erase(it); } + + IdHandleMap::iterator it2 = unrecovered_drop_events_.find(id); + if (it2 != unrecovered_drop_events_.end() && it2->second.Equals(handle)) { + unrecovered_drop_events_.erase(it2); + } } void MockAckHandler::Drop( const invalidation::ObjectId& id, const AckHandle& handle) { + AckHandleMatcher matcher(handle); + InvalidationVector::iterator it = std::find_if( + unacked_invalidations_.begin(), + unacked_invalidations_.end(), + matcher); + if (it != unacked_invalidations_.end()) { + dropped_invalidations_.push_back(*it); + unacked_invalidations_.erase(it); + } + unrecovered_drop_events_.erase(id); + unrecovered_drop_events_.insert(std::make_pair(id, handle)); } WeakHandle<AckHandler> MockAckHandler::WeakHandleThis() { diff --git a/sync/notifier/mock_ack_handler.h b/sync/notifier/mock_ack_handler.h index bf6ecc9..08f0539 100644 --- a/sync/notifier/mock_ack_handler.h +++ b/sync/notifier/mock_ack_handler.h @@ -5,6 +5,7 @@ #ifndef SYNC_NOTIFIER_MOCK_ACK_HANDLER_H_ #define SYNC_NOTIFIER_MOCK_ACK_HANDLER_H_ +#include <map> #include <vector> #include "base/compiler_specific.h" @@ -12,6 +13,7 @@ #include "sync/base/sync_export.h" #include "sync/internal_api/public/util/weak_handle.h" #include "sync/notifier/ack_handler.h" +#include "sync/notifier/invalidation_util.h" namespace syncer { @@ -38,9 +40,20 @@ class SYNC_EXPORT MockAckHandler // been acknowledged yet. bool IsUnacked(const Invalidation& invalidation) const; + // Returns true if the specified invalidation has been delivered and + // acknowledged. + bool IsAcknowledged(const Invalidation& invalidation) const; + + // Returns true if the specified invalidation has been delivered and + // dropped. + bool IsDropped(const Invalidation& invalidation) const; + // Returns true if the specified invalidation was never delivered. bool IsUnsent(const Invalidation& invalidation) const; + // Retruns true if all invalidations have been acked and all drops recovered. + bool AllInvalidationsAccountedFor() const; + // Implementation of AckHandler. virtual void Acknowledge( const invalidation::ObjectId& id, @@ -51,12 +64,18 @@ class SYNC_EXPORT MockAckHandler private: typedef std::vector<syncer::Invalidation> InvalidationVector; + typedef std::map<invalidation::ObjectId, + AckHandle, + ObjectIdLessThan> IdHandleMap; WeakHandle<AckHandler> WeakHandleThis(); InvalidationVector unsent_invalidations_; InvalidationVector unacked_invalidations_; InvalidationVector acked_invalidations_; + InvalidationVector dropped_invalidations_; + + IdHandleMap unrecovered_drop_events_; }; } // namespace syncer diff --git a/sync/notifier/single_object_invalidation_set.cc b/sync/notifier/single_object_invalidation_set.cc index 6da3972..b204c64 100644 --- a/sync/notifier/single_object_invalidation_set.cc +++ b/sync/notifier/single_object_invalidation_set.cc @@ -26,6 +26,10 @@ void SingleObjectInvalidationSet::Clear() { invalidations_.clear(); } +void SingleObjectInvalidationSet::Erase(const_iterator it) { + invalidations_.erase(*it); +} + bool SingleObjectInvalidationSet::StartsWithUnknownVersion() const { return !invalidations_.empty() && invalidations_.begin()->is_unknown_version(); diff --git a/sync/notifier/single_object_invalidation_set.h b/sync/notifier/single_object_invalidation_set.h index e6f4d75..579e55f 100644 --- a/sync/notifier/single_object_invalidation_set.h +++ b/sync/notifier/single_object_invalidation_set.h @@ -35,6 +35,7 @@ class SYNC_EXPORT SingleObjectInvalidationSet { void Insert(const Invalidation& invalidation); void InsertAll(const SingleObjectInvalidationSet& other); void Clear(); + void Erase(const_iterator it); // Returns true if this list contains an unknown version. // diff --git a/sync/protocol/sync.proto b/sync/protocol/sync.proto index ecaceef..fa6d8ea 100644 --- a/sync/protocol/sync.proto +++ b/sync/protocol/sync.proto @@ -425,14 +425,6 @@ message GetUpdateTriggers { // first. Introduced in M29. optional bool client_dropped_hints = 2; - // This flag is set if the invalidation server reports that it may have - // dropped some invalidations at some point. The client will also drop any - // locally cached hints that are older than the server-did-drop notification. - // - // TODO(sync): Determine the format for this. - // - // optional bool server_dropped_hints = 6; - // This flag is set when the client suspects that its list of invalidation // hints may be incomplete. This may be the case if: // - The client is syncing for the first time. @@ -468,6 +460,10 @@ message GetUpdateTriggers { // invalidations. // Introduced in M29. optional int64 datatype_refresh_nudges = 5; + + // This flag is set if the invalidation server reports that it may have + // dropped some invalidations at some point. Introduced in M33. + optional bool server_dropped_hints = 6; } message DataTypeProgressMarker { diff --git a/sync/sessions/data_type_tracker.cc b/sync/sessions/data_type_tracker.cc index b0b4649..9e5c870 100644 --- a/sync/sessions/data_type_tracker.cc +++ b/sync/sessions/data_type_tracker.cc @@ -6,18 +6,18 @@ #include "base/logging.h" #include "sync/internal_api/public/base/invalidation.h" +#include "sync/notifier/invalidation_util.h" #include "sync/notifier/single_object_invalidation_set.h" #include "sync/sessions/nudge_tracker.h" namespace syncer { namespace sessions { -DataTypeTracker::DataTypeTracker() +DataTypeTracker::DataTypeTracker(const invalidation::ObjectId& object_id) : local_nudge_count_(0), local_refresh_request_count_(0), - local_payload_overflow_(false), - server_payload_overflow_(false), - payload_buffer_size_(NudgeTracker::kDefaultMaxPayloadsPerType) { } + payload_buffer_size_(NudgeTracker::kDefaultMaxPayloadsPerType), + drop_tracker_(object_id) { } DataTypeTracker::~DataTypeTracker() { } @@ -29,21 +29,74 @@ void DataTypeTracker::RecordLocalRefreshRequest() { local_refresh_request_count_++; } +namespace { + +bool IsInvalidationVersionLessThan( + const Invalidation& a, + const Invalidation& b) { + InvalidationVersionLessThan comparator; + return comparator(a, b); +} + +} // namespace + void DataTypeTracker::RecordRemoteInvalidations( const SingleObjectInvalidationSet& invalidations) { - for (SingleObjectInvalidationSet::const_iterator it = - invalidations.begin(); it != invalidations.end(); ++it) { - if (it->is_unknown_version()) { - server_payload_overflow_ = true; + // Merge the incoming invalidations into our list of pending invalidations. + // + // We won't use STL algorithms here because our concept of equality doesn't + // quite fit the expectations of set_intersection. In particular, two + // invalidations can be equal according to the SingleObjectInvalidationSet's + // rules (ie. have equal versions), but still have different AckHandle values + // and need to be acknowledged separately. + // + // The invalidaitons service can only track one outsanding invalidation per + // type and version, so the acknowledgement here should be redundant. We'll + // acknowledge them anyway since it should do no harm, and makes this code a + // bit easier to test. + // + // Overlaps should be extremely rare for most invalidations. They can happen + // for unknown version invalidations, though. + SingleObjectInvalidationSet::const_iterator incoming_it = + invalidations.begin(); + SingleObjectInvalidationSet::const_iterator existing_it = + pending_invalidations_.begin(); + + while (incoming_it != invalidations.end()) { + // Keep existing_it ahead of incoming_it. + while (existing_it != pending_invalidations_.end() + && IsInvalidationVersionLessThan(*existing_it, *incoming_it)) { + existing_it++; + } + + if (existing_it != pending_invalidations_.end() + && !IsInvalidationVersionLessThan(*incoming_it, *existing_it) + && !IsInvalidationVersionLessThan(*existing_it, *incoming_it)) { + // Incoming overlaps with existing. Either both are unknown versions + // (likely) or these two have the same version number (very unlikely). + // Acknowledge and overwrite existing. + SingleObjectInvalidationSet::const_iterator old_inv = existing_it; + existing_it++; + old_inv->Acknowledge(); + pending_invalidations_.Erase(old_inv); + pending_invalidations_.Insert(*incoming_it); + incoming_it++; } else { - pending_payloads_.push_back(it->payload()); - if (pending_payloads_.size() > payload_buffer_size_) { - // Drop the oldest payload if we've overflowed. - pending_payloads_.pop_front(); - local_payload_overflow_ = true; - } + DCHECK(existing_it == pending_invalidations_.end() + || IsInvalidationVersionLessThan(*incoming_it, *existing_it)); + // The incoming_it points at a version not in the pending_invalidations_ + // list. Add it to the list then increment past it. + pending_invalidations_.Insert(*incoming_it); + incoming_it++; } } + + // Those incoming invalidations may have caused us to exceed our buffer size. + // Trim some items from our list, if necessary. + while (pending_invalidations_.GetSize() > payload_buffer_size_) { + pending_invalidations_.begin()->Drop(&drop_tracker_); + pending_invalidations_.Erase(pending_invalidations_.begin()); + } } void DataTypeTracker::RecordSuccessfulSyncCycle() { @@ -54,9 +107,21 @@ void DataTypeTracker::RecordSuccessfulSyncCycle() { local_nudge_count_ = 0; local_refresh_request_count_ = 0; - pending_payloads_.clear(); - local_payload_overflow_ = false; - server_payload_overflow_ = false; + + // TODO(rlarocque): If we want this to be correct even if we should happen to + // crash before writing all our state, we should wait until the results of + // this sync cycle have been written to disk before updating the invalidations + // state. See crbug.com/324996. + for (SingleObjectInvalidationSet::const_iterator it = + pending_invalidations_.begin(); + it != pending_invalidations_.end(); ++it) { + it->Acknowledge(); + } + pending_invalidations_.Clear(); + + if (drop_tracker_.IsRecoveringFromDropEvent()) { + drop_tracker_.RecordRecoveryFromDropEvent(); + } } // This limit will take effect on all future invalidations received. @@ -69,16 +134,14 @@ bool DataTypeTracker::IsSyncRequired() const { (local_nudge_count_ > 0 || local_refresh_request_count_ > 0 || HasPendingInvalidation() || - local_payload_overflow_ || - server_payload_overflow_); + drop_tracker_.IsRecoveringFromDropEvent()); } bool DataTypeTracker::IsGetUpdatesRequired() const { return !IsThrottled() && (local_refresh_request_count_ > 0 || HasPendingInvalidation() || - local_payload_overflow_ || - server_payload_overflow_); + drop_tracker_.IsRecoveringFromDropEvent()); } bool DataTypeTracker::HasLocalChangePending() const { @@ -86,11 +149,7 @@ bool DataTypeTracker::HasLocalChangePending() const { } bool DataTypeTracker::HasPendingInvalidation() const { - return !pending_payloads_.empty(); -} - -std::string DataTypeTracker::GetMostRecentInvalidationPayload() const { - return pending_payloads_.back(); + return !pending_invalidations_.IsEmpty(); } void DataTypeTracker::SetLegacyNotificationHint( @@ -98,10 +157,11 @@ void DataTypeTracker::SetLegacyNotificationHint( DCHECK(!IsThrottled()) << "We should not make requests if the type is throttled."; - if (HasPendingInvalidation()) { + if (!pending_invalidations_.IsEmpty() && + !pending_invalidations_.back().is_unknown_version()) { // The old-style source info can contain only one hint per type. We grab // the most recent, to mimic the old coalescing behaviour. - progress->set_notification_hint(GetMostRecentInvalidationPayload()); + progress->set_notification_hint(pending_invalidations_.back().payload()); } else if (HasLocalChangePending()) { // The old-style source info sent up an empty string (as opposed to // nothing at all) when the type was locally nudged, but had not received @@ -115,17 +175,19 @@ void DataTypeTracker::FillGetUpdatesTriggersMessage( // Fill the list of payloads, if applicable. The payloads must be ordered // oldest to newest, so we insert them in the same order as we've been storing // them internally. - for (PayloadList::const_iterator payload_it = pending_payloads_.begin(); - payload_it != pending_payloads_.end(); ++payload_it) { - msg->add_notification_hint(*payload_it); + for (SingleObjectInvalidationSet::const_iterator it = + pending_invalidations_.begin(); + it != pending_invalidations_.end(); ++it) { + if (!it->is_unknown_version()) { + msg->add_notification_hint(it->payload()); + } } - msg->set_client_dropped_hints(local_payload_overflow_); + msg->set_server_dropped_hints( + pending_invalidations_.StartsWithUnknownVersion()); + msg->set_client_dropped_hints(drop_tracker_.IsRecoveringFromDropEvent()); msg->set_local_modification_nudges(local_nudge_count_); msg->set_datatype_refresh_nudges(local_refresh_request_count_); - - // TODO(rlarocque): Support Tango trickles. See crbug.com/223437. - // msg->set_server_dropped_hints(server_payload_oveflow_); } bool DataTypeTracker::IsThrottled() const { diff --git a/sync/sessions/data_type_tracker.h b/sync/sessions/data_type_tracker.h index 6ecaa0e..849d229 100644 --- a/sync/sessions/data_type_tracker.h +++ b/sync/sessions/data_type_tracker.h @@ -11,6 +11,8 @@ #include "base/basictypes.h" #include "base/time/time.h" +#include "sync/notifier/dropped_invalidation_tracker.h" +#include "sync/notifier/single_object_invalidation_set.h" #include "sync/protocol/sync.pb.h" namespace syncer { @@ -24,7 +26,7 @@ typedef std::deque<std::string> PayloadList; class DataTypeTracker { public: - DataTypeTracker(); + explicit DataTypeTracker(const invalidation::ObjectId& object_id); ~DataTypeTracker(); // For STL compatibility, we do not forbid the creation of a default copy @@ -65,9 +67,6 @@ class DataTypeTracker { // updates. bool HasPendingInvalidation() const; - // Returns the most recent invalidation payload. - std::string GetMostRecentInvalidationPayload() const; - // Fills in the legacy invalidaiton payload information fields. void SetLegacyNotificationHint( sync_pb::DataTypeProgressMarker* progress) const; @@ -100,24 +99,19 @@ class DataTypeTracker { // successful sync cycle. int local_refresh_request_count_; - // The list of invalidation payloads received since the last successful sync - // cycle. This list may be incomplete. See also: local_payload_overflow_ and - // server_payload_overflow_. - PayloadList pending_payloads_; - - // This flag is set if the the local buffer space was been exhausted, causing - // us to prematurely discard the invalidation payloads stored locally. - bool local_payload_overflow_; - - // This flag is set if the server buffer space was exchauseted, causing the - // server to prematurely discard some invalidation payloads. - bool server_payload_overflow_; + // The list of invalidations received since the last successful sync cycle. + // This list may be incomplete. See also: + // drop_tracker_.IsRecoveringFromDropEvent() and server_payload_overflow_. + SingleObjectInvalidationSet pending_invalidations_; size_t payload_buffer_size_; // If !unthrottle_time_.is_null(), this type is throttled and may not download // or commit data until the specified time. base::TimeTicks unthrottle_time_; + + // A helper to keep track invalidations we dropped due to overflow. + DroppedInvalidationTracker drop_tracker_; }; } // namespace syncer diff --git a/sync/sessions/nudge_tracker.cc b/sync/sessions/nudge_tracker.cc index 94bef81..d109294 100644 --- a/sync/sessions/nudge_tracker.cc +++ b/sync/sessions/nudge_tracker.cc @@ -23,7 +23,12 @@ NudgeTracker::NudgeTracker() // Default initialize all the type trackers. for (ModelTypeSet::Iterator it = protocol_types.First(); it.Good(); it.Inc()) { - type_trackers_[it.Get()] = DataTypeTracker(); + invalidation::ObjectId id; + if (!RealModelTypeToObjectId(it.Get(), &id)) { + NOTREACHED(); + } else { + type_trackers_.insert(std::make_pair(it.Get(), DataTypeTracker(id))); + } } } @@ -73,9 +78,11 @@ void NudgeTracker::RecordLocalChange(ModelTypeSet types) { updates_source_ = sync_pb::GetUpdatesCallerInfo::LOCAL; } - for (ModelTypeSet::Iterator it = types.First(); it.Good(); it.Inc()) { - DCHECK(type_trackers_.find(it.Get()) != type_trackers_.end()); - type_trackers_[it.Get()].RecordLocalChange(); + for (ModelTypeSet::Iterator type_it = types.First(); type_it.Good(); + type_it.Inc()) { + TypeTrackerMap::iterator tracker_it = type_trackers_.find(type_it.Get()); + DCHECK(tracker_it != type_trackers_.end()); + tracker_it->second.RecordLocalChange(); } } @@ -89,8 +96,9 @@ void NudgeTracker::RecordLocalRefreshRequest(ModelTypeSet types) { } for (ModelTypeSet::Iterator it = types.First(); it.Good(); it.Inc()) { - DCHECK(type_trackers_.find(it.Get()) != type_trackers_.end()); - type_trackers_[it.Get()].RecordLocalRefreshRequest(); + TypeTrackerMap::iterator tracker_it = type_trackers_.find(it.Get()); + DCHECK(tracker_it != type_trackers_.end()); + tracker_it->second.RecordLocalRefreshRequest(); } } @@ -98,16 +106,26 @@ void NudgeTracker::RecordRemoteInvalidation( const ObjectIdInvalidationMap& invalidation_map) { updates_source_ = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; - ObjectIdSet ids = invalidation_map.GetObjectIds(); - for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { + // Be very careful here. The invalidations acknowledgement system requires a + // sort of manual memory management. We'll leak a small amount of memory if + // we fail to acknowledge or drop any of these incoming invalidations. + + ObjectIdSet id_set = invalidation_map.GetObjectIds(); + for (ObjectIdSet::iterator it = id_set.begin(); it != id_set.end(); ++it) { ModelType type; + + // This should never happen. If it does, we'll start to leak memory. if (!ObjectIdToRealModelType(*it, &type)) { NOTREACHED() << "Object ID " << ObjectIdToString(*it) << " does not map to valid model type"; + continue; } - DCHECK(type_trackers_.find(type) != type_trackers_.end()); - type_trackers_[type].RecordRemoteInvalidations( + + // Forward the invalidations to the proper recipient. + TypeTrackerMap::iterator tracker_it = type_trackers_.find(type); + DCHECK(tracker_it != type_trackers_.end()); + tracker_it->second.RecordRemoteInvalidations( invalidation_map.ForObject(*it)); } } @@ -126,7 +144,8 @@ void NudgeTracker::SetTypesThrottledUntil( base::TimeDelta length, base::TimeTicks now) { for (ModelTypeSet::Iterator it = types.First(); it.Good(); it.Inc()) { - type_trackers_[it.Get()].ThrottleType(length, now); + TypeTrackerMap::iterator tracker_it = type_trackers_.find(it.Get()); + tracker_it->second.ThrottleType(length, now); } } diff --git a/sync/sessions/nudge_tracker_unittest.cc b/sync/sessions/nudge_tracker_unittest.cc index 450d17f..8fdf2f5 100644 --- a/sync/sessions/nudge_tracker_unittest.cc +++ b/sync/sessions/nudge_tracker_unittest.cc @@ -2,8 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" #include "sync/internal_api/public/base/model_type_test_util.h" #include "sync/notifier/invalidation_util.h" +#include "sync/notifier/mock_ack_handler.h" #include "sync/notifier/object_id_invalidation_map.h" #include "sync/sessions/nudge_tracker.h" #include "testing/gtest/include/gtest/gtest.h" @@ -22,6 +25,14 @@ testing::AssertionResult ModelTypeSetEquals(ModelTypeSet a, ModelTypeSet b) { } } +syncer::Invalidation BuildUnknownVersionInvalidation(ModelType type) { + invalidation::ObjectId id; + bool result = RealModelTypeToObjectId(type, &id); + DCHECK(result); + return Invalidation::InitUnknownVersion(id); +} + + } // namespace namespace sessions { @@ -115,6 +126,7 @@ TEST_F(NudgeTrackerTest, SourcePriorities) { nudge_tracker_.updates_source()); } +// Verifies the management of invalidation hints and GU trigger fields. TEST_F(NudgeTrackerTest, HintCoalescing) { // Easy case: record one hint. { @@ -169,11 +181,11 @@ TEST_F(NudgeTrackerTest, HintCoalescing) { } } -TEST_F(NudgeTrackerTest, DropHintsLocally) { - ObjectIdInvalidationMap invalidation_map = - BuildInvalidationMap(BOOKMARKS, 1, "hint"); - +// Test the dropping of invalidation hints. Receives invalidations one by one. +TEST_F(NudgeTrackerTest, DropHintsLocally_OneAtATime) { for (size_t i = 0; i < GetHintBufferSize(); ++i) { + ObjectIdInvalidationMap invalidation_map = + BuildInvalidationMap(BOOKMARKS, i, "hint"); nudge_tracker_.RecordRemoteInvalidation(invalidation_map); } { @@ -192,9 +204,9 @@ TEST_F(NudgeTrackerTest, DropHintsLocally) { { sync_pb::GetUpdateTriggers gu_trigger; nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); - EXPECT_EQ(GetHintBufferSize(), - static_cast<size_t>(gu_trigger.notification_hint_size())); EXPECT_TRUE(gu_trigger.client_dropped_hints()); + ASSERT_EQ(GetHintBufferSize(), + static_cast<size_t>(gu_trigger.notification_hint_size())); // Verify the newest hint was not dropped and is the last in the list. EXPECT_EQ("new_hint", gu_trigger.notification_hint(GetHintBufferSize()-1)); @@ -204,8 +216,99 @@ TEST_F(NudgeTrackerTest, DropHintsLocally) { } } -// TODO(rlarocque): Add trickles support. See crbug.com/223437. -// TEST_F(NudgeTrackerTest, DropHintsAtServer); +// Test the dropping of invalidation hints. +// Receives invalidations in large batches. +TEST_F(NudgeTrackerTest, DropHintsLocally_ManyHints) { + ObjectIdInvalidationMap invalidation_map; + for (size_t i = 0; i < GetHintBufferSize(); ++i) { + invalidation_map.Insert(BuildInvalidation(BOOKMARKS, i, "hint")); + } + nudge_tracker_.RecordRemoteInvalidation(invalidation_map); + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_EQ(GetHintBufferSize(), + static_cast<size_t>(gu_trigger.notification_hint_size())); + EXPECT_FALSE(gu_trigger.client_dropped_hints()); + } + + // Force an overflow. + ObjectIdInvalidationMap invalidation_map2; + invalidation_map2.Insert(BuildInvalidation(BOOKMARKS, 1000, "new_hint")); + invalidation_map2.Insert(BuildInvalidation(BOOKMARKS, 1001, "newer_hint")); + nudge_tracker_.RecordRemoteInvalidation(invalidation_map2); + + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_TRUE(gu_trigger.client_dropped_hints()); + ASSERT_EQ(GetHintBufferSize(), + static_cast<size_t>(gu_trigger.notification_hint_size())); + + // Verify the newest hints were not dropped and are the last in the list. + EXPECT_EQ("newer_hint", + gu_trigger.notification_hint(GetHintBufferSize()-1)); + EXPECT_EQ("new_hint", gu_trigger.notification_hint(GetHintBufferSize()-2)); + + // Verify the oldest hint, too. + EXPECT_EQ("hint", gu_trigger.notification_hint(0)); + } +} + +// Tests the receipt of 'unknown version' invalidations. +TEST_F(NudgeTrackerTest, DropHintsAtServer_Alone) { + ObjectIdInvalidationMap invalidation_map; + invalidation_map.Insert(BuildUnknownVersionInvalidation(BOOKMARKS)); + + // Record the unknown version invalidation. + nudge_tracker_.RecordRemoteInvalidation(invalidation_map); + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_TRUE(gu_trigger.server_dropped_hints()); + EXPECT_FALSE(gu_trigger.client_dropped_hints()); + ASSERT_EQ(0, gu_trigger.notification_hint_size()); + } + + // Clear status then verify. + nudge_tracker_.RecordSuccessfulSyncCycle(); + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_FALSE(gu_trigger.client_dropped_hints()); + EXPECT_FALSE(gu_trigger.server_dropped_hints()); + ASSERT_EQ(0, gu_trigger.notification_hint_size()); + } +} + +// Tests the receipt of 'unknown version' invalidations. This test also +// includes a known version invalidation to mix things up a bit. +TEST_F(NudgeTrackerTest, DropHintsAtServer_WithOtherInvalidations) { + ObjectIdInvalidationMap invalidation_map; + invalidation_map.Insert(BuildUnknownVersionInvalidation(BOOKMARKS)); + invalidation_map.Insert(BuildInvalidation(BOOKMARKS, 10, "hint")); + + // Record the two invalidations, one with unknown version, the other unknown. + nudge_tracker_.RecordRemoteInvalidation(invalidation_map); + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_TRUE(gu_trigger.server_dropped_hints()); + EXPECT_FALSE(gu_trigger.client_dropped_hints()); + ASSERT_EQ(1, gu_trigger.notification_hint_size()); + EXPECT_EQ("hint", gu_trigger.notification_hint(0)); + } + + // Clear status then verify. + nudge_tracker_.RecordSuccessfulSyncCycle(); + { + sync_pb::GetUpdateTriggers gu_trigger; + nudge_tracker_.FillProtoMessage(BOOKMARKS, &gu_trigger); + EXPECT_FALSE(gu_trigger.client_dropped_hints()); + EXPECT_FALSE(gu_trigger.server_dropped_hints()); + ASSERT_EQ(0, gu_trigger.notification_hint_size()); + } +} // Checks the behaviour of the invalidations-out-of-sync flag. TEST_F(NudgeTrackerTest, EnableDisableInvalidations) { @@ -463,5 +566,177 @@ TEST_F(NudgeTrackerTest, OverlappingThrottleIntervals) { EXPECT_TRUE(nudge_tracker_.GetThrottledTypes().Empty()); } +class NudgeTrackerAckTrackingTest : public NudgeTrackerTest { + public: + NudgeTrackerAckTrackingTest() {} + + bool IsInvalidationUnacknowledged(const syncer::Invalidation& invalidation) { + // Run pending tasks before checking with the MockAckHandler. + // The WeakHandle may have posted some tasks for it. + base::RunLoop().RunUntilIdle(); + return mock_ack_handler_.IsUnacked(invalidation); + } + + bool IsInvalidationAcknowledged(const syncer::Invalidation& invalidation) { + // Run pending tasks before checking with the MockAckHandler. + // The WeakHandle may have posted some tasks for it. + base::RunLoop().RunUntilIdle(); + return mock_ack_handler_.IsAcknowledged(invalidation); + } + + bool IsInvalidationDropped(const syncer::Invalidation& invalidation) { + // Run pending tasks before checking with the MockAckHandler. + // The WeakHandle may have posted some tasks for it. + base::RunLoop().RunUntilIdle(); + return mock_ack_handler_.IsDropped(invalidation); + } + + bool AllInvalidationsAccountedFor() { + return mock_ack_handler_.AllInvalidationsAccountedFor(); + } + + Invalidation SendInvalidation( + ModelType type, + int64 version, + const std::string& hint) { + // Build and register the invalidation. + syncer::Invalidation invalidation = BuildInvalidation(type, version, hint); + mock_ack_handler_.RegisterInvalidation(&invalidation); + + // Send it to the NudgeTracker. + ObjectIdInvalidationMap invalidation_map; + invalidation_map.Insert(invalidation); + nudge_tracker_.RecordRemoteInvalidation(invalidation_map); + + // Return it to the test framework for use in assertions. + return invalidation; + } + + Invalidation SendUnknownVersionInvalidation(ModelType type) { + // Build and register the invalidation. + syncer::Invalidation invalidation = BuildUnknownVersionInvalidation(type); + mock_ack_handler_.RegisterInvalidation(&invalidation); + + // Send it to the NudgeTracker. + ObjectIdInvalidationMap invalidation_map; + invalidation_map.Insert(invalidation); + nudge_tracker_.RecordRemoteInvalidation(invalidation_map); + + // Return it to the test framework for use in assertions. + return invalidation; + } + + void RecordSuccessfulSyncCycle() { + nudge_tracker_.RecordSuccessfulSyncCycle(); + } + + private: + syncer::MockAckHandler mock_ack_handler_; + base::MessageLoop loop_; +}; + +// Test the acknowledgement of a single invalidation. +TEST_F(NudgeTrackerAckTrackingTest, SimpleAcknowledgement) { + Invalidation inv = SendInvalidation(BOOKMARKS, 10, "hint"); + + EXPECT_TRUE(IsInvalidationUnacknowledged(inv)); + + RecordSuccessfulSyncCycle(); + EXPECT_TRUE(IsInvalidationAcknowledged(inv)); + + EXPECT_TRUE(AllInvalidationsAccountedFor()); +} + +// Test the acknowledgement of many invalidations. +TEST_F(NudgeTrackerAckTrackingTest, ManyAcknowledgements) { + Invalidation inv1 = SendInvalidation(BOOKMARKS, 10, "hint"); + Invalidation inv2 = SendInvalidation(BOOKMARKS, 14, "hint2"); + Invalidation inv3 = SendInvalidation(PREFERENCES, 8, "hint3"); + + EXPECT_TRUE(IsInvalidationUnacknowledged(inv1)); + EXPECT_TRUE(IsInvalidationUnacknowledged(inv2)); + EXPECT_TRUE(IsInvalidationUnacknowledged(inv3)); + + RecordSuccessfulSyncCycle(); + EXPECT_TRUE(IsInvalidationAcknowledged(inv1)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv2)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv3)); + + EXPECT_TRUE(AllInvalidationsAccountedFor()); +} + +// Test dropping when the buffer overflows and subsequent drop recovery. +TEST_F(NudgeTrackerAckTrackingTest, OverflowAndRecover) { + std::vector<Invalidation> invalidations; + + Invalidation inv10 = SendInvalidation(BOOKMARKS, 10, "hint"); + for (size_t i = 1; i < GetHintBufferSize(); ++i) { + invalidations.push_back(SendInvalidation(BOOKMARKS, i+10, "hint")); + } + + for (std::vector<Invalidation>::iterator it = invalidations.begin(); + it != invalidations.end(); ++it) { + EXPECT_TRUE(IsInvalidationUnacknowledged(*it)); + } + + // This invalidation, though arriving the most recently, has the oldest + // version number so it should be dropped first. + Invalidation inv5 = SendInvalidation(BOOKMARKS, 5, "old_hint"); + EXPECT_TRUE(IsInvalidationDropped(inv5)); + + // This invalidation has a larger version number, so it will force a + // previously delivered invalidation to be dropped. + Invalidation inv100 = SendInvalidation(BOOKMARKS, 100, "new_hint"); + EXPECT_TRUE(IsInvalidationDropped(inv10)); + + // This should recover from the drop and bring us back into sync. + RecordSuccessfulSyncCycle(); + + for (std::vector<Invalidation>::iterator it = invalidations.begin(); + it != invalidations.end(); ++it) { + EXPECT_TRUE(IsInvalidationAcknowledged(*it)); + } + EXPECT_TRUE(IsInvalidationAcknowledged(inv100)); + + EXPECT_TRUE(AllInvalidationsAccountedFor()); +} + +// Test receipt of an unknown version invalidation from the server. +TEST_F(NudgeTrackerAckTrackingTest, UnknownVersionFromServer_Simple) { + Invalidation inv = SendUnknownVersionInvalidation(BOOKMARKS); + EXPECT_TRUE(IsInvalidationUnacknowledged(inv)); + RecordSuccessfulSyncCycle(); + EXPECT_TRUE(IsInvalidationAcknowledged(inv)); + EXPECT_TRUE(AllInvalidationsAccountedFor()); +} + +// Test receipt of multiple unknown version invalidations from the server. +TEST_F(NudgeTrackerAckTrackingTest, UnknownVersionFromServer_Complex) { + Invalidation inv1 = SendUnknownVersionInvalidation(BOOKMARKS); + Invalidation inv2 = SendInvalidation(BOOKMARKS, 10, "hint"); + Invalidation inv3 = SendUnknownVersionInvalidation(BOOKMARKS); + Invalidation inv4 = SendUnknownVersionInvalidation(BOOKMARKS); + Invalidation inv5 = SendInvalidation(BOOKMARKS, 20, "hint2"); + + // These invalidations have been overridden, so they got acked early. + EXPECT_TRUE(IsInvalidationAcknowledged(inv1)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv3)); + + // These invalidations are still waiting to be used. + EXPECT_TRUE(IsInvalidationUnacknowledged(inv2)); + EXPECT_TRUE(IsInvalidationUnacknowledged(inv4)); + EXPECT_TRUE(IsInvalidationUnacknowledged(inv5)); + + // Finish the sync cycle and expect all remaining invalidations to be acked. + RecordSuccessfulSyncCycle(); + EXPECT_TRUE(IsInvalidationAcknowledged(inv1)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv2)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv3)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv4)); + EXPECT_TRUE(IsInvalidationAcknowledged(inv5)); + + EXPECT_TRUE(AllInvalidationsAccountedFor()); +} + } // namespace sessions } // namespace syncer |