summaryrefslogtreecommitdiffstats
path: root/net/tools/testserver/chromiumsync_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'net/tools/testserver/chromiumsync_test.py')
-rwxr-xr-xnet/tools/testserver/chromiumsync_test.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/net/tools/testserver/chromiumsync_test.py b/net/tools/testserver/chromiumsync_test.py
new file mode 100755
index 0000000..bb73d05
--- /dev/null
+++ b/net/tools/testserver/chromiumsync_test.py
@@ -0,0 +1,317 @@
+#!/usr/bin/python2.4
+# Copyright (c) 2010 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests exercising chromiumsync and SyncDataModel."""
+
+import unittest
+
+from google.protobuf import text_format
+
+import chromiumsync
+import sync_pb2
+
+class SyncDataModelTest(unittest.TestCase):
+ def setUp(self):
+ self.model = chromiumsync.SyncDataModel()
+
+ def AddToModel(self, proto):
+ self.model._entries[proto.id_string] = proto
+
+ def testPermanentItemSpecs(self):
+ SPECS = chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS
+ # parent_tags must be declared before use.
+ declared_specs = set(['0'])
+ for spec in SPECS:
+ self.assertTrue(spec.parent_tag in declared_specs)
+ declared_specs.add(spec.tag)
+ # Every sync datatype should have a permanent folder associated with it.
+ unique_datatypes = set([x.sync_type for x in SPECS])
+ self.assertEqual(unique_datatypes,
+ set(chromiumsync.ALL_TYPES))
+
+ def testSaveEntry(self):
+ proto = sync_pb2.SyncEntity()
+ proto.id_string = 'abcd';
+ proto.version = 0;
+ self.assertFalse(self.model._ItemExists(proto.id_string))
+ self.model._SaveEntry(proto)
+ self.assertEqual(1, proto.version)
+ self.assertTrue(self.model._ItemExists(proto.id_string))
+ self.model._SaveEntry(proto)
+ self.assertEqual(2, proto.version)
+ proto.version = 0
+ self.assertTrue(self.model._ItemExists(proto.id_string))
+ self.assertEqual(2, self.model._entries[proto.id_string].version)
+
+ def testWritePosition(self):
+ def MakeProto(id_string, parent, position):
+ proto = sync_pb2.SyncEntity()
+ proto.id_string = id_string
+ proto.position_in_parent = position
+ proto.parent_id_string = parent
+ self.AddToModel(proto)
+
+ MakeProto('a', 'X', 1000)
+ MakeProto('b', 'X', 1800)
+ MakeProto('c', 'X', 2600)
+ MakeProto('a1', 'Z', 1007)
+ MakeProto('a2', 'Z', 1807)
+ MakeProto('a3', 'Z', 2607)
+ MakeProto('s', 'Y', 10000)
+
+ def AssertPositionResult(my_id, parent_id, prev_id, expected_position):
+ entry = sync_pb2.SyncEntity()
+ entry.id_string = my_id
+ self.model._WritePosition(entry, parent_id, prev_id)
+ self.assertEqual(expected_position, entry.position_in_parent)
+ self.assertEqual(parent_id, entry.parent_id_string)
+ self.assertFalse(entry.HasField('insert_after_item_id'))
+
+ AssertPositionResult('new', 'new_parent', '', 0)
+ AssertPositionResult('new', 'Y', '', 10000 - (2 ** 20))
+ AssertPositionResult('new', 'Y', 's', 10000 + (2 ** 20))
+ AssertPositionResult('s', 'Y', '', 10000)
+ AssertPositionResult('s', 'Y', 's', 10000)
+ AssertPositionResult('a1', 'Z', '', 1007)
+
+ AssertPositionResult('new', 'X', '', 1000 - (2 ** 20))
+ AssertPositionResult('new', 'X', 'a', 1100)
+ AssertPositionResult('new', 'X', 'b', 1900)
+ AssertPositionResult('new', 'X', 'c', 2600 + (2 ** 20))
+
+ AssertPositionResult('a1', 'X', '', 1000 - (2 ** 20))
+ AssertPositionResult('a1', 'X', 'a', 1100)
+ AssertPositionResult('a1', 'X', 'b', 1900)
+ AssertPositionResult('a1', 'X', 'c', 2600 + (2 ** 20))
+
+ AssertPositionResult('a', 'X', '', 1000)
+ AssertPositionResult('a', 'X', 'b', 1900)
+ AssertPositionResult('a', 'X', 'c', 2600 + (2 ** 20))
+
+ AssertPositionResult('b', 'X', '', 1000 - (2 ** 20))
+ AssertPositionResult('b', 'X', 'a', 1800)
+ AssertPositionResult('b', 'X', 'c', 2600 + (2 ** 20))
+
+ AssertPositionResult('c', 'X', '', 1000 - (2 ** 20))
+ AssertPositionResult('c', 'X', 'a', 1100)
+ AssertPositionResult('c', 'X', 'b', 2600)
+
+ def testCreatePermanentItems(self):
+ self.model._CreatePermanentItems(chromiumsync.ALL_TYPES)
+ self.assertEqual(len(chromiumsync.ALL_TYPES) + 2,
+ len(self.model._entries))
+
+ def ExpectedPermanentItemCount(self, sync_type):
+ if sync_type == chromiumsync.BOOKMARK:
+ return 4
+ elif sync_type == chromiumsync.TOP_LEVEL:
+ return 1
+ else:
+ return 2
+
+ def testGetChangesFromTimestampZeroForEachType(self):
+ for sync_type in chromiumsync.ALL_TYPES:
+ self.model = chromiumsync.SyncDataModel()
+ request_types = [sync_type, chromiumsync.TOP_LEVEL]
+
+ version, changes = self.model.GetChangesFromTimestamp(request_types, 0)
+
+ expected_count = self.ExpectedPermanentItemCount(sync_type)
+ self.assertEqual(expected_count, version)
+ self.assertEqual(expected_count, len(changes))
+ self.assertEqual('google_chrome', changes[0].server_defined_unique_tag)
+ for change in changes:
+ self.assertTrue(change.HasField('server_defined_unique_tag'))
+ self.assertEqual(change.version, change.sync_timestamp)
+ self.assertTrue(change.version <= version)
+
+ # Test idempotence: another GetUpdates from ts=0 shouldn't recreate.
+ version, changes = self.model.GetChangesFromTimestamp(request_types, 0)
+ self.assertEqual(expected_count, version)
+ self.assertEqual(expected_count, len(changes))
+
+ # Doing a wider GetUpdates from timestamp zero shouldn't recreate either.
+ new_version, changes = self.model.GetChangesFromTimestamp(
+ chromiumsync.ALL_TYPES, 0)
+ self.assertEqual(len(chromiumsync.SyncDataModel._PERMANENT_ITEM_SPECS),
+ new_version)
+ self.assertEqual(new_version, len(changes))
+ version, changes = self.model.GetChangesFromTimestamp(request_types, 0)
+ self.assertEqual(new_version, version)
+ self.assertEqual(expected_count, len(changes))
+
+ def testBatchSize(self):
+ for sync_type in chromiumsync.ALL_TYPES[1:]:
+ specifics = chromiumsync.GetDefaultEntitySpecifics(sync_type)
+ self.model = chromiumsync.SyncDataModel()
+ request_types = [sync_type, chromiumsync.TOP_LEVEL]
+
+ for i in range(self.model._BATCH_SIZE*3):
+ entry = sync_pb2.SyncEntity()
+ entry.id_string = 'batch test %d' % i
+ entry.specifics.CopyFrom(specifics)
+ self.model._SaveEntry(entry)
+ version, changes = self.model.GetChangesFromTimestamp(request_types, 0)
+ self.assertEqual(self.model._BATCH_SIZE, version)
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(self.model._BATCH_SIZE*2, version)
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(self.model._BATCH_SIZE*3, version)
+ expected_dingleberry = self.ExpectedPermanentItemCount(sync_type)
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(self.model._BATCH_SIZE*3 + expected_dingleberry,
+ version)
+
+ # Now delete a third of the items.
+ for i in xrange(self.model._BATCH_SIZE*3 - 1, 0, -3):
+ entry = sync_pb2.SyncEntity()
+ entry.id_string = 'batch test %d' % i
+ entry.deleted = True
+ self.model._SaveEntry(entry)
+
+ # The batch counts shouldn't change.
+ version, changes = self.model.GetChangesFromTimestamp(request_types, 0)
+ self.assertEqual(self.model._BATCH_SIZE, len(changes))
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(self.model._BATCH_SIZE, len(changes))
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(self.model._BATCH_SIZE, len(changes))
+ expected_dingleberry = self.ExpectedPermanentItemCount(sync_type)
+ version, changes = self.model.GetChangesFromTimestamp(request_types,
+ version)
+ self.assertEqual(expected_dingleberry, len(changes))
+ self.assertEqual(self.model._BATCH_SIZE*4 + expected_dingleberry, version)
+
+ def testCommitEachDataType(self):
+ for sync_type in chromiumsync.ALL_TYPES[1:]:
+ specifics = chromiumsync.GetDefaultEntitySpecifics(sync_type)
+ self.model = chromiumsync.SyncDataModel()
+ my_cache_guid = '112358132134'
+ parent = 'foobar'
+ commit_session = {}
+
+ # Start with a GetUpdates from timestamp 0, to populate permanent items.
+ original_version, original_changes = (
+ self.model.GetChangesFromTimestamp([sync_type], 0))
+
+ def DoCommit(original=None, id='', name=None, parent=None, prev=None):
+ proto = sync_pb2.SyncEntity()
+ if original is not None:
+ proto.version = original.version
+ proto.id_string = original.id_string
+ proto.parent_id_string = original.parent_id_string
+ proto.name = original.name
+ else:
+ proto.id_string = id
+ proto.version = 0
+ proto.specifics.CopyFrom(specifics)
+ if name is not None:
+ proto.name = name
+ if parent:
+ proto.parent_id_string = parent.id_string
+ if prev:
+ proto.insert_after_item_id = prev.id_string
+ else:
+ proto.insert_after_item_id = ''
+ proto.folder = True
+ proto.deleted = False
+ result = self.model.CommitEntry(proto, my_cache_guid, commit_session)
+ self.assertTrue(result)
+ return (proto, result)
+
+ # Commit a new item.
+ proto1, result1 = DoCommit(name='namae', id='Foo',
+ parent=original_changes[-1])
+ # Commit an item whose parent is another item (referenced via the
+ # pre-commit ID).
+ proto2, result2 = DoCommit(name='Secondo', id='Bar',
+ parent=proto1)
+ # Commit a sibling of the second item.
+ proto3, result3 = DoCommit(name='Third!', id='Baz',
+ parent=proto1, prev=proto2)
+
+ self.assertEqual(3, len(commit_session))
+ for p, r in [(proto1, result1), (proto2, result2), (proto3, result3)]:
+ self.assertNotEqual(r.id_string, p.id_string)
+ self.assertEqual(r.originator_client_item_id, p.id_string)
+ self.assertEqual(r.originator_cache_guid, my_cache_guid)
+ self.assertTrue(r is not self.model._entries[r.id_string],
+ "Commit result didn't make a defensive copy.")
+ self.assertTrue(p is not self.model._entries[r.id_string],
+ "Commit result didn't make a defensive copy.")
+ self.assertEqual(commit_session.get(p.id_string), r.id_string)
+ self.assertTrue(r.version > original_version)
+ self.assertEqual(result1.parent_id_string, proto1.parent_id_string)
+ self.assertEqual(result2.parent_id_string, result1.id_string)
+ version, changes = self.model.GetChangesFromTimestamp([sync_type],
+ original_version)
+ self.assertEqual(3, len(changes))
+ self.assertEqual(original_version + 3, version)
+ self.assertEqual([result1, result2, result3], changes)
+ for c in changes:
+ self.assertTrue(c is not self.model._entries[c.id_string],
+ "GetChanges didn't make a defensive copy.")
+ self.assertTrue(result2.position_in_parent < result3.position_in_parent)
+ self.assertEqual(0, result2.position_in_parent)
+
+ # Now update the items so that the second item is the parent of the
+ # first; with the first sandwiched between two new items (4 and 5).
+ # Do this in a new commit session, meaning we'll reference items from
+ # the first batch by their post-commit, server IDs.
+ commit_session = {}
+ old_cache_guid = my_cache_guid
+ my_cache_guid = 'A different GUID'
+ proto2b, result2b = DoCommit(original=result2,
+ parent=original_changes[-1])
+ proto4, result4 = DoCommit(id='ID4', name='Four',
+ parent=result2, prev=None)
+ proto1b, result1b = DoCommit(original=result1,
+ parent=result2, prev=proto4)
+ proto5, result5 = DoCommit(id='ID5', name='Five', parent=result2,
+ prev=result1)
+
+ self.assertEqual(2, len(commit_session),
+ 'Only new items in second batch should be in the session')
+ for p, r, original in [(proto2b, result2b, proto2),
+ (proto4, result4, proto4),
+ (proto1b, result1b, proto1),
+ (proto5, result5, proto5)]:
+ self.assertEqual(r.originator_client_item_id, original.id_string)
+ if original is not p:
+ self.assertEqual(r.id_string, p.id_string,
+ 'Ids should be stable after first commit')
+ self.assertEqual(r.originator_cache_guid, old_cache_guid)
+ else:
+ self.assertNotEqual(r.id_string, p.id_string)
+ self.assertEqual(r.originator_cache_guid, my_cache_guid)
+ self.assertEqual(commit_session.get(p.id_string), r.id_string)
+ self.assertTrue(r is not self.model._entries[r.id_string],
+ "Commit result didn't make a defensive copy.")
+ self.assertTrue(p is not self.model._entries[r.id_string],
+ "Commit didn't make a defensive copy.")
+ self.assertTrue(r.version > p.version)
+ version, changes = self.model.GetChangesFromTimestamp([sync_type],
+ original_version)
+ self.assertEqual(5, len(changes))
+ self.assertEqual(original_version + 7, version)
+ self.assertEqual([result3, result2b, result4, result1b, result5], changes)
+ for c in changes:
+ self.assertTrue(c is not self.model._entries[c.id_string],
+ "GetChanges didn't make a defensive copy.")
+ self.assertTrue(result4.parent_id_string ==
+ result1b.parent_id_string ==
+ result5.parent_id_string ==
+ result2b.id_string)
+ self.assertTrue(result4.position_in_parent <
+ result1b.position_in_parent <
+ result5.position_in_parent)
+
+if __name__ == '__main__':
+ unittest.main() \ No newline at end of file