diff options
Diffstat (limited to 'net/tools/testserver/chromiumsync_test.py')
-rwxr-xr-x | net/tools/testserver/chromiumsync_test.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/net/tools/testserver/chromiumsync_test.py b/net/tools/testserver/chromiumsync_test.py new file mode 100755 index 0000000..bb73d05 --- /dev/null +++ b/net/tools/testserver/chromiumsync_test.py @@ -0,0 +1,317 @@ +#!/usr/bin/python2.4 +# Copyright (c) 2010 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Tests exercising chromiumsync and SyncDataModel.""" + +import unittest + +from google.protobuf import text_format + +import chromiumsync +import sync_pb2 + +class SyncDataModelTest(unittest.TestCase): + def setUp(self): + self.model = chromiumsync.SyncDataModel() + + def AddToModel(self, proto): + self.model._entries[proto.id_string] = proto + + def testPermanentItemSpecs(self): + SPECS = chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS + # parent_tags must be declared before use. + declared_specs = set(['0']) + for spec in SPECS: + self.assertTrue(spec.parent_tag in declared_specs) + declared_specs.add(spec.tag) + # Every sync datatype should have a permanent folder associated with it. + unique_datatypes = set([x.sync_type for x in SPECS]) + self.assertEqual(unique_datatypes, + set(chromiumsync.ALL_TYPES)) + + def testSaveEntry(self): + proto = sync_pb2.SyncEntity() + proto.id_string = 'abcd'; + proto.version = 0; + self.assertFalse(self.model._ItemExists(proto.id_string)) + self.model._SaveEntry(proto) + self.assertEqual(1, proto.version) + self.assertTrue(self.model._ItemExists(proto.id_string)) + self.model._SaveEntry(proto) + self.assertEqual(2, proto.version) + proto.version = 0 + self.assertTrue(self.model._ItemExists(proto.id_string)) + self.assertEqual(2, self.model._entries[proto.id_string].version) + + def testWritePosition(self): + def MakeProto(id_string, parent, position): + proto = sync_pb2.SyncEntity() + proto.id_string = id_string + proto.position_in_parent = position + proto.parent_id_string = parent + self.AddToModel(proto) + + MakeProto('a', 'X', 1000) + MakeProto('b', 'X', 1800) + MakeProto('c', 'X', 2600) + MakeProto('a1', 'Z', 1007) + MakeProto('a2', 'Z', 1807) + MakeProto('a3', 'Z', 2607) + MakeProto('s', 'Y', 10000) + + def AssertPositionResult(my_id, parent_id, prev_id, expected_position): + entry = sync_pb2.SyncEntity() + entry.id_string = my_id + self.model._WritePosition(entry, parent_id, prev_id) + self.assertEqual(expected_position, entry.position_in_parent) + self.assertEqual(parent_id, entry.parent_id_string) + self.assertFalse(entry.HasField('insert_after_item_id')) + + AssertPositionResult('new', 'new_parent', '', 0) + AssertPositionResult('new', 'Y', '', 10000 - (2 ** 20)) + AssertPositionResult('new', 'Y', 's', 10000 + (2 ** 20)) + AssertPositionResult('s', 'Y', '', 10000) + AssertPositionResult('s', 'Y', 's', 10000) + AssertPositionResult('a1', 'Z', '', 1007) + + AssertPositionResult('new', 'X', '', 1000 - (2 ** 20)) + AssertPositionResult('new', 'X', 'a', 1100) + AssertPositionResult('new', 'X', 'b', 1900) + AssertPositionResult('new', 'X', 'c', 2600 + (2 ** 20)) + + AssertPositionResult('a1', 'X', '', 1000 - (2 ** 20)) + AssertPositionResult('a1', 'X', 'a', 1100) + AssertPositionResult('a1', 'X', 'b', 1900) + AssertPositionResult('a1', 'X', 'c', 2600 + (2 ** 20)) + + AssertPositionResult('a', 'X', '', 1000) + AssertPositionResult('a', 'X', 'b', 1900) + AssertPositionResult('a', 'X', 'c', 2600 + (2 ** 20)) + + AssertPositionResult('b', 'X', '', 1000 - (2 ** 20)) + AssertPositionResult('b', 'X', 'a', 1800) + AssertPositionResult('b', 'X', 'c', 2600 + (2 ** 20)) + + AssertPositionResult('c', 'X', '', 1000 - (2 ** 20)) + AssertPositionResult('c', 'X', 'a', 1100) + AssertPositionResult('c', 'X', 'b', 2600) + + def testCreatePermanentItems(self): + self.model._CreatePermanentItems(chromiumsync.ALL_TYPES) + self.assertEqual(len(chromiumsync.ALL_TYPES) + 2, + len(self.model._entries)) + + def ExpectedPermanentItemCount(self, sync_type): + if sync_type == chromiumsync.BOOKMARK: + return 4 + elif sync_type == chromiumsync.TOP_LEVEL: + return 1 + else: + return 2 + + def testGetChangesFromTimestampZeroForEachType(self): + for sync_type in chromiumsync.ALL_TYPES: + self.model = chromiumsync.SyncDataModel() + request_types = [sync_type, chromiumsync.TOP_LEVEL] + + version, changes = self.model.GetChangesFromTimestamp(request_types, 0) + + expected_count = self.ExpectedPermanentItemCount(sync_type) + self.assertEqual(expected_count, version) + self.assertEqual(expected_count, len(changes)) + self.assertEqual('google_chrome', changes[0].server_defined_unique_tag) + for change in changes: + self.assertTrue(change.HasField('server_defined_unique_tag')) + self.assertEqual(change.version, change.sync_timestamp) + self.assertTrue(change.version <= version) + + # Test idempotence: another GetUpdates from ts=0 shouldn't recreate. + version, changes = self.model.GetChangesFromTimestamp(request_types, 0) + self.assertEqual(expected_count, version) + self.assertEqual(expected_count, len(changes)) + + # Doing a wider GetUpdates from timestamp zero shouldn't recreate either. + new_version, changes = self.model.GetChangesFromTimestamp( + chromiumsync.ALL_TYPES, 0) + self.assertEqual(len(chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS), + new_version) + self.assertEqual(new_version, len(changes)) + version, changes = self.model.GetChangesFromTimestamp(request_types, 0) + self.assertEqual(new_version, version) + self.assertEqual(expected_count, len(changes)) + + def testBatchSize(self): + for sync_type in chromiumsync.ALL_TYPES[1:]: + specifics = chromiumsync.GetDefaultEntitySpecifics(sync_type) + self.model = chromiumsync.SyncDataModel() + request_types = [sync_type, chromiumsync.TOP_LEVEL] + + for i in range(self.model._BATCH_SIZE*3): + entry = sync_pb2.SyncEntity() + entry.id_string = 'batch test %d' % i + entry.specifics.CopyFrom(specifics) + self.model._SaveEntry(entry) + version, changes = self.model.GetChangesFromTimestamp(request_types, 0) + self.assertEqual(self.model._BATCH_SIZE, version) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(self.model._BATCH_SIZE*2, version) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(self.model._BATCH_SIZE*3, version) + expected_dingleberry = self.ExpectedPermanentItemCount(sync_type) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(self.model._BATCH_SIZE*3 + expected_dingleberry, + version) + + # Now delete a third of the items. + for i in xrange(self.model._BATCH_SIZE*3 - 1, 0, -3): + entry = sync_pb2.SyncEntity() + entry.id_string = 'batch test %d' % i + entry.deleted = True + self.model._SaveEntry(entry) + + # The batch counts shouldn't change. + version, changes = self.model.GetChangesFromTimestamp(request_types, 0) + self.assertEqual(self.model._BATCH_SIZE, len(changes)) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(self.model._BATCH_SIZE, len(changes)) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(self.model._BATCH_SIZE, len(changes)) + expected_dingleberry = self.ExpectedPermanentItemCount(sync_type) + version, changes = self.model.GetChangesFromTimestamp(request_types, + version) + self.assertEqual(expected_dingleberry, len(changes)) + self.assertEqual(self.model._BATCH_SIZE*4 + expected_dingleberry, version) + + def testCommitEachDataType(self): + for sync_type in chromiumsync.ALL_TYPES[1:]: + specifics = chromiumsync.GetDefaultEntitySpecifics(sync_type) + self.model = chromiumsync.SyncDataModel() + my_cache_guid = '112358132134' + parent = 'foobar' + commit_session = {} + + # Start with a GetUpdates from timestamp 0, to populate permanent items. + original_version, original_changes = ( + self.model.GetChangesFromTimestamp([sync_type], 0)) + + def DoCommit(original=None, id='', name=None, parent=None, prev=None): + proto = sync_pb2.SyncEntity() + if original is not None: + proto.version = original.version + proto.id_string = original.id_string + proto.parent_id_string = original.parent_id_string + proto.name = original.name + else: + proto.id_string = id + proto.version = 0 + proto.specifics.CopyFrom(specifics) + if name is not None: + proto.name = name + if parent: + proto.parent_id_string = parent.id_string + if prev: + proto.insert_after_item_id = prev.id_string + else: + proto.insert_after_item_id = '' + proto.folder = True + proto.deleted = False + result = self.model.CommitEntry(proto, my_cache_guid, commit_session) + self.assertTrue(result) + return (proto, result) + + # Commit a new item. + proto1, result1 = DoCommit(name='namae', id='Foo', + parent=original_changes[-1]) + # Commit an item whose parent is another item (referenced via the + # pre-commit ID). + proto2, result2 = DoCommit(name='Secondo', id='Bar', + parent=proto1) + # Commit a sibling of the second item. + proto3, result3 = DoCommit(name='Third!', id='Baz', + parent=proto1, prev=proto2) + + self.assertEqual(3, len(commit_session)) + for p, r in [(proto1, result1), (proto2, result2), (proto3, result3)]: + self.assertNotEqual(r.id_string, p.id_string) + self.assertEqual(r.originator_client_item_id, p.id_string) + self.assertEqual(r.originator_cache_guid, my_cache_guid) + self.assertTrue(r is not self.model._entries[r.id_string], + "Commit result didn't make a defensive copy.") + self.assertTrue(p is not self.model._entries[r.id_string], + "Commit result didn't make a defensive copy.") + self.assertEqual(commit_session.get(p.id_string), r.id_string) + self.assertTrue(r.version > original_version) + self.assertEqual(result1.parent_id_string, proto1.parent_id_string) + self.assertEqual(result2.parent_id_string, result1.id_string) + version, changes = self.model.GetChangesFromTimestamp([sync_type], + original_version) + self.assertEqual(3, len(changes)) + self.assertEqual(original_version + 3, version) + self.assertEqual([result1, result2, result3], changes) + for c in changes: + self.assertTrue(c is not self.model._entries[c.id_string], + "GetChanges didn't make a defensive copy.") + self.assertTrue(result2.position_in_parent < result3.position_in_parent) + self.assertEqual(0, result2.position_in_parent) + + # Now update the items so that the second item is the parent of the + # first; with the first sandwiched between two new items (4 and 5). + # Do this in a new commit session, meaning we'll reference items from + # the first batch by their post-commit, server IDs. + commit_session = {} + old_cache_guid = my_cache_guid + my_cache_guid = 'A different GUID' + proto2b, result2b = DoCommit(original=result2, + parent=original_changes[-1]) + proto4, result4 = DoCommit(id='ID4', name='Four', + parent=result2, prev=None) + proto1b, result1b = DoCommit(original=result1, + parent=result2, prev=proto4) + proto5, result5 = DoCommit(id='ID5', name='Five', parent=result2, + prev=result1) + + self.assertEqual(2, len(commit_session), + 'Only new items in second batch should be in the session') + for p, r, original in [(proto2b, result2b, proto2), + (proto4, result4, proto4), + (proto1b, result1b, proto1), + (proto5, result5, proto5)]: + self.assertEqual(r.originator_client_item_id, original.id_string) + if original is not p: + self.assertEqual(r.id_string, p.id_string, + 'Ids should be stable after first commit') + self.assertEqual(r.originator_cache_guid, old_cache_guid) + else: + self.assertNotEqual(r.id_string, p.id_string) + self.assertEqual(r.originator_cache_guid, my_cache_guid) + self.assertEqual(commit_session.get(p.id_string), r.id_string) + self.assertTrue(r is not self.model._entries[r.id_string], + "Commit result didn't make a defensive copy.") + self.assertTrue(p is not self.model._entries[r.id_string], + "Commit didn't make a defensive copy.") + self.assertTrue(r.version > p.version) + version, changes = self.model.GetChangesFromTimestamp([sync_type], + original_version) + self.assertEqual(5, len(changes)) + self.assertEqual(original_version + 7, version) + self.assertEqual([result3, result2b, result4, result1b, result5], changes) + for c in changes: + self.assertTrue(c is not self.model._entries[c.id_string], + "GetChanges didn't make a defensive copy.") + self.assertTrue(result4.parent_id_string == + result1b.parent_id_string == + result5.parent_id_string == + result2b.id_string) + self.assertTrue(result4.position_in_parent < + result1b.position_in_parent < + result5.position_in_parent) + +if __name__ == '__main__': + unittest.main()
\ No newline at end of file |