diff options
author | nick@chromium.org <nick@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-08 18:22:25 +0000 |
---|---|---|
committer | nick@chromium.org <nick@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-10-08 18:22:25 +0000 |
commit | 3a218efa0cc67f81736e05fa1838ebc46378cde1 (patch) | |
tree | 62c6908a74c41ce442676e28649873d309d53404 /net | |
parent | 2c1dba660624ecc04ba6e5f2116694d517b34677 (diff) | |
download | chromium_src-3a218efa0cc67f81736e05fa1838ebc46378cde1.zip chromium_src-3a218efa0cc67f81736e05fa1838ebc46378cde1.tar.gz chromium_src-3a218efa0cc67f81736e05fa1838ebc46378cde1.tar.bz2 |
Implement generic GetUpdates support in the python sync server. This was a
recent change to the sync protocol.
BUG=58217
TEST=chromiumsync_test.py
Review URL: http://codereview.chromium.org/3552010
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@61989 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rwxr-xr-x | net/tools/testserver/chromiumsync.py | 119 | ||||
-rwxr-xr-x | net/tools/testserver/chromiumsync_test.py | 191 |
2 files changed, 262 insertions, 48 deletions
diff --git a/net/tools/testserver/chromiumsync.py b/net/tools/testserver/chromiumsync.py index 843761a..62d1bf7 100755 --- a/net/tools/testserver/chromiumsync.py +++ b/net/tools/testserver/chromiumsync.py @@ -75,6 +75,10 @@ class ProtobufExtensionNotUnique(Error): """An entry should not have more than one protobuf extension present.""" +class DataTypeIdNotRecognized(Error): + """The requested data type is not recognized.""" + + def GetEntryType(entry): """Extract the sync type from a SyncEntry. @@ -119,13 +123,17 @@ def GetEntryTypesFromSpecifics(specifics): if specifics.HasExtension(extension)] -def GetRequestedTypes(get_updates_message): - """Determine the sync types requested by a client GetUpdates operation.""" - types = GetEntryTypesFromSpecifics( - get_updates_message.requested_types) - if types: - types.append(TOP_LEVEL) - return types +def SyncTypeToProtocolDataTypeId(data_type): + """Convert from a sync type (python enum) to the protocol's data type id.""" + return SYNC_TYPE_TO_EXTENSION[data_type].number + + +def ProtocolDataTypeIdToSyncType(protocol_data_type_id): + """Convert from the protocol's data type id to a sync type (python enum).""" + for data_type, protocol_extension in SYNC_TYPE_TO_EXTENSION.iteritems(): + if protocol_extension.number == protocol_data_type_id: + return data_type + raise DataTypeIdNotRecognized def GetDefaultEntitySpecifics(data_type): @@ -136,6 +144,7 @@ def GetDefaultEntitySpecifics(data_type): specifics.Extensions[extension_handle].SetInParent() return specifics + def DeepCopyOfProto(proto): """Return a deep copy of a protocol buffer.""" new_proto = type(proto)() @@ -165,6 +174,62 @@ class PermanentItem(object): self.sync_type = sync_type +class UpdateSieve(object): + """A filter to remove items the client has already seen.""" + def __init__(self, request): + self._original_request = request + self._state = {} + if request.from_progress_marker: + for marker in request.from_progress_marker: + if 
marker.HasField("timestamp_token_for_migration"): + timestamp = marker.timestamp_token_for_migration + elif marker.token: + timestamp = int(marker.token) + elif marker.HasField("token"): + timestamp = 0 + else: + raise ValueError("No timestamp information in progress marker.") + data_type = ProtocolDataTypeIdToSyncType(marker.data_type_id) + self._state[data_type] = timestamp + elif request.HasField("from_timestamp"): + for data_type in GetEntryTypesFromSpecifics(request.requested_types): + self._state[data_type] = request.from_timestamp + if self._state: + self._state[TOP_LEVEL] = min(self._state.itervalues()) + + def ClientWantsItem(self, item): + """Return true if the client hasn't already seen an item.""" + return self._state.get(GetEntryType(item), sys.maxint) < item.version + + def HasAnyTimestamp(self): + """Return true if at least one datatype was requested.""" + return bool(self._state) + + def GetMinTimestamp(self): + """Return true the smallest timestamp requested across all datatypes.""" + return min(self._state.itervalues()) + + def GetFirstTimeTypes(self): + """Return a list of datatypes requesting updates from timestamp zero.""" + return [datatype for datatype, timestamp in self._state.iteritems() + if timestamp == 0] + + def SaveProgress(self, new_timestamp, get_updates_response): + """Write the new_timestamp or new_progress_marker fields to a response.""" + if self._original_request.from_progress_marker: + for data_type, old_timestamp in self._state.iteritems(): + if data_type == TOP_LEVEL: + continue + new_marker = sync_pb2.DataTypeProgressMarker() + new_marker.data_type_id = SyncTypeToProtocolDataTypeId(data_type) + new_marker.token = str(max(old_timestamp, new_timestamp)) + if new_marker not in self._original_request.from_progress_marker: + get_updates_response.new_progress_marker.add().MergeFrom(new_marker) + elif self._original_request.HasField("from_timestamp"): + if self._original_request.from_timestamp < new_timestamp: + 
get_updates_response.new_timestamp = new_timestamp + + class SyncDataModel(object): """Models the account state of one sync user.""" _BATCH_SIZE = 100 @@ -383,19 +448,15 @@ class SyncDataModel(object): if spec.sync_type in requested_types: self._CreatePermanentItem(spec) - def GetChangesFromTimestamp(self, requested_types, timestamp): - """Get entries which have changed since a given timestamp, oldest first. + def GetChanges(self, sieve): + """Get entries which have changed, oldest first. The returned entries are limited to being _BATCH_SIZE many. The entries are returned in strict version order. Args: - requested_types: A list of sync data types from ALL_TYPES. - Only items of these types will be retrieved; others will be filtered - out. - timestamp: A timestamp / version number. Only items that have changed - more recently than this value will be retrieved; older items will - be filtered out. + sieve: An update sieve to use to filter out updates the client + has already seen. Returns: A tuple of (version, entries, changes_remaining). Version is a new timestamp value, which should be used as the starting point for the @@ -403,22 +464,24 @@ class SyncDataModel(object): timestamp query. Changes_remaining indicates the number of changes left on the server after this batch. """ - if timestamp == 0: - self._CreatePermanentItems(requested_types) + if not sieve.HasAnyTimestamp(): + return (0, [], 0) + min_timestamp = sieve.GetMinTimestamp() + self._CreatePermanentItems(sieve.GetFirstTimeTypes()) change_log = sorted(self._entries.values(), key=operator.attrgetter('version')) - new_changes = [x for x in change_log if x.version > timestamp] + new_changes = [x for x in change_log if x.version > min_timestamp] # Pick batch_size new changes, and then filter them. This matches # the RPC behavior of the production sync server. batch = new_changes[:self._BATCH_SIZE] if not batch: # Client is up to date. 
- return (timestamp, [], 0) + return (min_timestamp, [], 0) # Restrict batch to requested types. Tombstones are untyped # and will always get included. filtered = [DeepCopyOfProto(item) for item in batch - if item.deleted or (GetEntryType(item) in requested_types)] + if item.deleted or sieve.ClientWantsItem(item)] # The new client timestamp is the timestamp of the last item in the # batch, even if that item was filtered out. @@ -737,15 +800,11 @@ class TestServer(object): to the client request will be written. """ update_response.SetInParent() - requested_types = GetRequestedTypes(update_request) - new_timestamp, entries, remaining = self.account.GetChangesFromTimestamp( - requested_types, update_request.from_timestamp) + update_sieve = UpdateSieve(update_request) + new_timestamp, entries, remaining = self.account.GetChanges(update_sieve) update_response.changes_remaining = remaining - # If the client is up to date, we are careful not to set the - # new_timestamp field. - if new_timestamp != update_request.from_timestamp: - update_response.new_timestamp = new_timestamp - for entry in entries: - reply = update_response.entries.add() - reply.CopyFrom(entry) + for entry in entries: + reply = update_response.entries.add() + reply.CopyFrom(entry) + update_sieve.SaveProgress(new_timestamp, update_response) diff --git a/net/tools/testserver/chromiumsync_test.py b/net/tools/testserver/chromiumsync_test.py index f753c3b..844ba50 100755 --- a/net/tools/testserver/chromiumsync_test.py +++ b/net/tools/testserver/chromiumsync_test.py @@ -7,8 +7,10 @@ import unittest +import autofill_specifics_pb2 import chromiumsync import sync_pb2 +import theme_specifics_pb2 class SyncDataModelTest(unittest.TestCase): def setUp(self): @@ -17,6 +19,14 @@ class SyncDataModelTest(unittest.TestCase): def AddToModel(self, proto): self.model._entries[proto.id_string] = proto + def GetChangesFromTimestamp(self, requested_types, timestamp): + message = sync_pb2.GetUpdatesMessage() + 
message.from_timestamp = timestamp + for data_type in requested_types: + message.requested_types.Extensions[ + chromiumsync.SYNC_TYPE_TO_EXTENSION[data_type]].SetInParent() + return self.model.GetChanges(chromiumsync.UpdateSieve(message)) + def testPermanentItemSpecs(self): specs = chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS @@ -112,12 +122,13 @@ class SyncDataModelTest(unittest.TestCase): return 2 def testGetChangesFromTimestampZeroForEachType(self): - for sync_type in chromiumsync.ALL_TYPES: + all_types = chromiumsync.ALL_TYPES[1:] + for sync_type in all_types: self.model = chromiumsync.SyncDataModel() - request_types = [sync_type, chromiumsync.TOP_LEVEL] + request_types = [sync_type] version, changes, remaining = ( - self.model.GetChangesFromTimestamp(request_types, 0)) + self.GetChangesFromTimestamp(request_types, 0)) expected_count = self.ExpectedPermanentItemCount(sync_type) self.assertEqual(expected_count, version) @@ -130,20 +141,20 @@ class SyncDataModelTest(unittest.TestCase): # Test idempotence: another GetUpdates from ts=0 shouldn't recreate. version, changes, remaining = ( - self.model.GetChangesFromTimestamp(request_types, 0)) + self.GetChangesFromTimestamp(request_types, 0)) self.assertEqual(expected_count, version) self.assertEqual(expected_count, len(changes)) self.assertEqual(0, remaining) # Doing a wider GetUpdates from timestamp zero shouldn't recreate either. 
new_version, changes, remaining = ( - self.model.GetChangesFromTimestamp(chromiumsync.ALL_TYPES, 0)) + self.GetChangesFromTimestamp(all_types, 0)) self.assertEqual(len(chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS), new_version) self.assertEqual(new_version, len(changes)) self.assertEqual(0, remaining) version, changes, remaining = ( - self.model.GetChangesFromTimestamp(request_types, 0)) + self.GetChangesFromTimestamp(request_types, 0)) self.assertEqual(new_version, version) self.assertEqual(expected_count, len(changes)) self.assertEqual(0, remaining) @@ -152,7 +163,7 @@ class SyncDataModelTest(unittest.TestCase): for sync_type in chromiumsync.ALL_TYPES[1:]: specifics = chromiumsync.GetDefaultEntitySpecifics(sync_type) self.model = chromiumsync.SyncDataModel() - request_types = [sync_type, chromiumsync.TOP_LEVEL] + request_types = [sync_type] for i in range(self.model._BATCH_SIZE*3): entry = sync_pb2.SyncEntity() @@ -161,19 +172,19 @@ class SyncDataModelTest(unittest.TestCase): self.model._SaveEntry(entry) last_bit = self.ExpectedPermanentItemCount(sync_type) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, 0)) + self.GetChangesFromTimestamp(request_types, 0)) self.assertEqual(self.model._BATCH_SIZE, version) self.assertEqual(self.model._BATCH_SIZE*2 + last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(self.model._BATCH_SIZE*2, version) self.assertEqual(self.model._BATCH_SIZE + last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(self.model._BATCH_SIZE*3, version) self.assertEqual(last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + 
self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(self.model._BATCH_SIZE*3 + last_bit, version) self.assertEqual(0, changes_remaining) @@ -186,19 +197,19 @@ class SyncDataModelTest(unittest.TestCase): # The batch counts shouldn't change. version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, 0)) + self.GetChangesFromTimestamp(request_types, 0)) self.assertEqual(self.model._BATCH_SIZE, len(changes)) self.assertEqual(self.model._BATCH_SIZE*2 + last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(self.model._BATCH_SIZE, len(changes)) self.assertEqual(self.model._BATCH_SIZE + last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(self.model._BATCH_SIZE, len(changes)) self.assertEqual(last_bit, changes_remaining) version, changes, changes_remaining = ( - self.model.GetChangesFromTimestamp(request_types, version)) + self.GetChangesFromTimestamp(request_types, version)) self.assertEqual(last_bit, len(changes)) self.assertEqual(self.model._BATCH_SIZE*4 + last_bit, version) self.assertEqual(0, changes_remaining) @@ -213,7 +224,7 @@ class SyncDataModelTest(unittest.TestCase): # Start with a GetUpdates from timestamp 0, to populate permanent items. 
original_version, original_changes, changes_remaining = ( - self.model.GetChangesFromTimestamp([sync_type], 0)) + self.GetChangesFromTimestamp([sync_type], 0)) def DoCommit(original=None, id_string='', name=None, parent=None, prev=None): @@ -266,7 +277,7 @@ class SyncDataModelTest(unittest.TestCase): self.assertEqual(result1.parent_id_string, proto1.parent_id_string) self.assertEqual(result2.parent_id_string, result1.id_string) version, changes, remaining = ( - self.model.GetChangesFromTimestamp([sync_type], original_version)) + self.GetChangesFromTimestamp([sync_type], original_version)) self.assertEqual(3, len(changes)) self.assertEqual(0, remaining) self.assertEqual(original_version + 3, version) @@ -314,7 +325,7 @@ class SyncDataModelTest(unittest.TestCase): "Commit didn't make a defensive copy.") self.assertTrue(r.version > p.version) version, changes, remaining = ( - self.model.GetChangesFromTimestamp([sync_type], original_version)) + self.GetChangesFromTimestamp([sync_type], original_version)) self.assertEqual(5, len(changes)) self.assertEqual(0, remaining) self.assertEqual(original_version + 7, version) @@ -330,6 +341,150 @@ class SyncDataModelTest(unittest.TestCase): result1b.position_in_parent < result5.position_in_parent) + def testUpdateSieve(self): + # from_timestamp, legacy mode + autofill = autofill_specifics_pb2.autofill + theme = theme_specifics_pb2.theme + msg = sync_pb2.GetUpdatesMessage() + msg.from_timestamp = 15412 + msg.requested_types.Extensions[autofill].SetInParent() + msg.requested_types.Extensions[theme].SetInParent() + + sieve = chromiumsync.UpdateSieve(msg) + self.assertEqual(sieve._state, + {chromiumsync.TOP_LEVEL: 15412, + chromiumsync.AUTOFILL: 15412, + chromiumsync.THEME: 15412}) + + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(15412, response) + self.assertEqual(0, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + + response = sync_pb2.GetUpdatesResponse() + 
sieve.SaveProgress(15413, response) + self.assertEqual(0, len(response.new_progress_marker)) + self.assertTrue(response.HasField('new_timestamp')) + self.assertEqual(15413, response.new_timestamp) + + # Existing tokens + msg = sync_pb2.GetUpdatesMessage() + marker = msg.from_progress_marker.add() + marker.data_type_id = autofill.number + marker.token = '15412' + marker = msg.from_progress_marker.add() + marker.data_type_id = theme.number + marker.token = '15413' + sieve = chromiumsync.UpdateSieve(msg) + self.assertEqual(sieve._state, + {chromiumsync.TOP_LEVEL: 15412, + chromiumsync.AUTOFILL: 15412, + chromiumsync.THEME: 15413}) + + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(15413, response) + self.assertEqual(1, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, autofill.number) + self.assertEqual(marker.token, '15413') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + + # Empty tokens indicating from timestamp = 0 + msg = sync_pb2.GetUpdatesMessage() + marker = msg.from_progress_marker.add() + marker.data_type_id = autofill.number + marker.token = '412' + marker = msg.from_progress_marker.add() + marker.data_type_id = theme.number + marker.token = '' + sieve = chromiumsync.UpdateSieve(msg) + self.assertEqual(sieve._state, + {chromiumsync.TOP_LEVEL: 0, + chromiumsync.AUTOFILL: 412, + chromiumsync.THEME: 0}) + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(1, response) + self.assertEqual(1, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, theme.number) + self.assertEqual(marker.token, '1') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(412, response) + self.assertEqual(1, 
len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, theme.number) + self.assertEqual(marker.token, '412') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(413, response) + self.assertEqual(2, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, theme.number) + self.assertEqual(marker.token, '413') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + marker = response.new_progress_marker[1] + self.assertEqual(marker.data_type_id, autofill.number) + self.assertEqual(marker.token, '413') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + + # Migration token timestamps (client gives timestamp, server returns token) + msg = sync_pb2.GetUpdatesMessage() + marker = msg.from_progress_marker.add() + marker.data_type_id = autofill.number + marker.timestamp_token_for_migration = 15213 + marker = msg.from_progress_marker.add() + marker.data_type_id = theme.number + marker.timestamp_token_for_migration = 15211 + sieve = chromiumsync.UpdateSieve(msg) + self.assertEqual(sieve._state, + {chromiumsync.TOP_LEVEL: 15211, + chromiumsync.AUTOFILL: 15213, + chromiumsync.THEME: 15211}) + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(16000, response) # There were updates + self.assertEqual(2, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, theme.number) + self.assertEqual(marker.token, '16000') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + marker = response.new_progress_marker[1] + self.assertEqual(marker.data_type_id, autofill.number) + self.assertEqual(marker.token, 
'16000') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + + msg = sync_pb2.GetUpdatesMessage() + marker = msg.from_progress_marker.add() + marker.data_type_id = autofill.number + marker.timestamp_token_for_migration = 3000 + marker = msg.from_progress_marker.add() + marker.data_type_id = theme.number + marker.timestamp_token_for_migration = 3000 + sieve = chromiumsync.UpdateSieve(msg) + self.assertEqual(sieve._state, + {chromiumsync.TOP_LEVEL: 3000, + chromiumsync.AUTOFILL: 3000, + chromiumsync.THEME: 3000}) + response = sync_pb2.GetUpdatesResponse() + sieve.SaveProgress(3000, response) # Already up to date + self.assertEqual(2, len(response.new_progress_marker)) + self.assertFalse(response.HasField('new_timestamp')) + marker = response.new_progress_marker[0] + self.assertEqual(marker.data_type_id, theme.number) + self.assertEqual(marker.token, '3000') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + marker = response.new_progress_marker[1] + self.assertEqual(marker.data_type_id, autofill.number) + self.assertEqual(marker.token, '3000') + self.assertFalse(marker.HasField('timestamp_token_for_migration')) + if __name__ == '__main__': unittest.main() |