diff options
author | maxbogue <maxbogue@chromium.org> | 2016-02-24 18:09:17 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-02-25 02:10:28 +0000 |
commit | a39eea0ab8d5c18d789567c5ca9af3a1ecba3775 (patch) | |
tree | 3e720be1d153a49b0d19f05d91b6697dcfe96306 /sync | |
parent | 1152e786b74c1bbba74af386eb9757f3f8713add (diff) | |
download | chromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.zip chromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.tar.gz chromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.tar.bz2 |
[Sync] USS: Implement initial GetUpdates + merge handling in SMTP.
- MergeSyncData now takes a map of tag -> data because I need the
tag to generate the ModelTypeEntity anyways.
- Put and Delete are now noops until the initial sync is done.
BUG=569675
Review URL: https://codereview.chromium.org/1717903002
Cr-Commit-Position: refs/heads/master@{#377464}
Diffstat (limited to 'sync')
-rw-r--r-- | sync/api/entity_data.h | 2 | ||||
-rw-r--r-- | sync/api/model_type_service.h | 2 | ||||
-rw-r--r-- | sync/internal_api/public/shared_model_type_processor.h | 11 | ||||
-rw-r--r-- | sync/internal_api/public/test/fake_model_type_service.h | 2 | ||||
-rw-r--r-- | sync/internal_api/shared_model_type_processor.cc | 60 | ||||
-rw-r--r-- | sync/internal_api/shared_model_type_processor_unittest.cc | 149 | ||||
-rw-r--r-- | sync/internal_api/test/fake_model_type_service.cc | 2 |
7 files changed, 165 insertions, 63 deletions
diff --git a/sync/api/entity_data.h b/sync/api/entity_data.h index c1bbba9..842c512 100644 --- a/sync/api/entity_data.h +++ b/sync/api/entity_data.h @@ -5,6 +5,7 @@ #ifndef SYNC_API_ENTITY_DATA_H_ #define SYNC_API_ENTITY_DATA_H_ +#include <map> #include <string> #include <vector> @@ -26,6 +27,7 @@ struct SYNC_EXPORT EntityDataTraits { typedef syncer::ProtoValuePtr<EntityData, EntityDataTraits> EntityDataPtr; typedef std::vector<EntityDataPtr> EntityDataList; +typedef std::map<std::string, EntityDataPtr> EntityDataMap; // A light-weight container for sync entity data which represents either // local data created on the ModelTypeService side or remote data created diff --git a/sync/api/model_type_service.h b/sync/api/model_type_service.h index 469ed4a..950060e 100644 --- a/sync/api/model_type_service.h +++ b/sync/api/model_type_service.h @@ -42,7 +42,7 @@ class SYNC_EXPORT ModelTypeService { // to be called when sync is first turned on, not on every restart. virtual syncer::SyncError MergeSyncData( scoped_ptr<MetadataChangeList> metadata_change_list, - EntityDataList entity_data_list) = 0; + EntityDataMap entity_data_map) = 0; // Apply changes from the sync server locally. // Please note that |entity_changes| might have fewer entries than diff --git a/sync/internal_api/public/shared_model_type_processor.h b/sync/internal_api/public/shared_model_type_processor.h index 782cf8c..b9034fe 100644 --- a/sync/internal_api/public/shared_model_type_processor.h +++ b/sync/internal_api/public/shared_model_type_processor.h @@ -108,8 +108,7 @@ class SYNC_EXPORT SharedModelTypeProcessor : public ModelTypeProcessor, // Handle the first update received from the server after being enabled. void OnInitialUpdateReceived(const sync_pb::DataTypeState& type_state, - const UpdateResponseDataList& updates, - const UpdateResponseDataList& pending_updates); + const UpdateResponseDataList& updates); // Sends all commit requests that are due to be sent to the sync thread. void FlushPendingCommitRequests(); @@ -117,12 +116,16 @@ class SYNC_EXPORT SharedModelTypeProcessor : public ModelTypeProcessor, // Computes the client tag hash for the given client |tag|. std::string GetHashForTag(const std::string& tag); - // Gets the entity for the given tag, which must exist. + // Gets the entity for the given tag, or null if there isn't one. ModelTypeEntity* GetEntityForTag(const std::string& tag); - // Gets the entity for the given tag hash, which must exist. + // Gets the entity for the given tag hash, or null if there isn't one. ModelTypeEntity* GetEntityForTagHash(const std::string& tag_hash); + // Create an entity in the entity map for |tag| and return a pointer to it. + // Requires that no entity for |tag| already exists in the map. + ModelTypeEntity* CreateEntity(const std::string& tag, const EntityData& data); + syncer::ModelType type_; sync_pb::DataTypeState data_type_state_; diff --git a/sync/internal_api/public/test/fake_model_type_service.h b/sync/internal_api/public/test/fake_model_type_service.h index 76d2141..966680a 100644 --- a/sync/internal_api/public/test/fake_model_type_service.h +++ b/sync/internal_api/public/test/fake_model_type_service.h @@ -29,7 +29,7 @@ class FakeModelTypeService syncer::SyncError MergeSyncData( scoped_ptr<MetadataChangeList> metadata_change_list, - EntityDataList entity_data_list) override; + EntityDataMap entity_data_map) override; syncer::SyncError ApplySyncChanges( scoped_ptr<MetadataChangeList> metadata_change_list, diff --git a/sync/internal_api/shared_model_type_processor.cc b/sync/internal_api/shared_model_type_processor.cc index 65e9924..3db885f 100644 --- a/sync/internal_api/shared_model_type_processor.cc +++ b/sync/internal_api/shared_model_type_processor.cc @@ -225,6 +225,11 @@ void SharedModelTypeProcessor::Put(const std::string& client_tag, DCHECK(!entity_data->non_unique_name.empty()); DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(entity_data->specifics)); + if (!data_type_state_.initial_sync_done()) { + // Ignore changes before the initial sync is done. + return; + } + // If the service specified an overriding hash, use that, otherwise generate // one from the tag. // TODO(skym): This behavior should be delayed, once crbug.com/561818 is fixed @@ -263,6 +268,11 @@ void SharedModelTypeProcessor::Delete( MetadataChangeList* metadata_change_list) { DCHECK(IsAllowingChanges()); + if (!data_type_state_.initial_sync_done()) { + // Ignore changes before the initial sync is done. + return; + } + const std::string client_tag_hash( syncer::syncable::GenerateSyncableHash(type_, client_tag)); @@ -350,7 +360,8 @@ void SharedModelTypeProcessor::OnUpdateReceived( const UpdateResponseDataList& updates, const UpdateResponseDataList& pending_updates) { if (!data_type_state_.initial_sync_done()) { - OnInitialUpdateReceived(data_type_state, updates, pending_updates); + OnInitialUpdateReceived(data_type_state, updates); + return; } scoped_ptr<MetadataChangeList> metadata_changes = @@ -458,10 +469,38 @@ void SharedModelTypeProcessor::OnUpdateReceived( void SharedModelTypeProcessor::OnInitialUpdateReceived( const sync_pb::DataTypeState& data_type_state, - const UpdateResponseDataList& updates, - const UpdateResponseDataList& pending_updates) { - // TODO(maxbogue): crbug.com/569675: Generate metadata for all entities. - // TODO(maxbogue): crbug.com/569675: Call merge method on the service. + const UpdateResponseDataList& updates) { + DCHECK(entities_.empty()); + // Ensure that initial sync was not already done and that the worker + // correctly marked initial sync as done for this update. + DCHECK(!data_type_state_.initial_sync_done()); + DCHECK(data_type_state.initial_sync_done()); + + scoped_ptr<MetadataChangeList> metadata_changes = + service_->CreateMetadataChangeList(); + EntityDataMap data_map; + + data_type_state_ = data_type_state; + metadata_changes->UpdateDataTypeState(data_type_state_); + + for (const UpdateResponseData& update : updates) { + const EntityData& data = update.entity.value(); + + // Let the service define a client |tag| based on the entity data. + std::string tag = service_->GetClientTag(data); + + ModelTypeEntity* entity = CreateEntity(tag, data); + entity->ApplyUpdateFromServer(update); + metadata_changes->UpdateMetadata(tag, entity->metadata()); + data_map[tag] = update.entity; + } + + // Let the service handle associating and merging the data. + // TODO(stanisc): crbug.com/570085: Error handling. + service_->MergeSyncData(std::move(metadata_changes), data_map); + + // We may have new reasons to commit by the time this function is done. + FlushPendingCommitRequests(); } UpdateResponseDataList SharedModelTypeProcessor::GetPendingUpdates() { @@ -488,4 +527,15 @@ ModelTypeEntity* SharedModelTypeProcessor::GetEntityForTagHash( return it != entities_.end() ? it->second.get() : nullptr; } +ModelTypeEntity* SharedModelTypeProcessor::CreateEntity( + const std::string& tag, + const EntityData& data) { + DCHECK(entities_.find(data.client_tag_hash) == entities_.end()); + scoped_ptr<ModelTypeEntity> entity = ModelTypeEntity::CreateNew( + tag, data.client_tag_hash, data.id, data.creation_time); + ModelTypeEntity* entity_ptr = entity.get(); + entities_[data.client_tag_hash] = std::move(entity); + return entity_ptr; +} + } // namespace syncer_v2 diff --git a/sync/internal_api/shared_model_type_processor_unittest.cc b/sync/internal_api/shared_model_type_processor_unittest.cc index a79e5a6..5a8cd66 100644 --- a/sync/internal_api/shared_model_type_processor_unittest.cc +++ b/sync/internal_api/shared_model_type_processor_unittest.cc @@ -71,6 +71,11 @@ class SimpleStore { return metadata_store_.find(tag) != metadata_store_.end(); } + const std::unordered_map<std::string, scoped_ptr<EntityData>>& GetAllData() + const { + return data_store_; + } + const EntityData& GetData(const std::string& tag) const { return *data_store_.find(tag)->second; } @@ -189,10 +194,7 @@ class SharedModelTypeProcessorTest : public ::testing::Test, // Local data modification. Emulates signals from the model thread. void WriteItem(const std::string& tag, const std::string& value) { - scoped_ptr<EntityData> entity_data = make_scoped_ptr(new EntityData()); - entity_data->specifics = GenerateSpecifics(tag, value); - entity_data->non_unique_name = tag; - WriteItem(tag, std::move(entity_data)); + WriteItem(tag, GenerateEntityData(tag, value)); } // Overloaded form to allow passing of custom entity data. @@ -247,13 +249,24 @@ class SharedModelTypeProcessorTest : public ::testing::Test, clear_change_processor(); } - // Emulates an "initial sync done" message from the CommitQueue. - void OnInitialSyncDone() { + // Simulates an initial GetUpdates response from the worker with |updates|. + void OnInitialSyncDone(UpdateResponseDataList updates) { sync_pb::DataTypeState data_type_state(db_.data_type_state()); data_type_state.set_initial_sync_done(true); - UpdateResponseDataList empty_update_list; - type_processor()->OnUpdateReceived(data_type_state, empty_update_list, - empty_update_list); + type_processor()->OnUpdateReceived(data_type_state, updates, + UpdateResponseDataList()); + } + + // Overloaded form with no updates. + void OnInitialSyncDone() { OnInitialSyncDone(UpdateResponseDataList()); } + + // Overloaded form that constructs an update for a single entity. + void OnInitialSyncDone(const std::string& tag, const std::string& value) { + UpdateResponseDataList updates; + UpdateResponseData update; + update.entity = GenerateEntityData(tag, value)->PassToPtr(); + updates.push_back(update); + OnInitialSyncDone(updates); } // Emulate updates from the server. @@ -391,6 +404,16 @@ class SharedModelTypeProcessorTest : public ::testing::Test, EXPECT_EQ(value, data.specifics.preference().value()); } + // For each tag in |tags|, expect a corresponding request list of length one. + void ExpectCommitRequests(const std::vector<std::string>& tags) { + EXPECT_EQ(tags.size(), GetNumCommitRequestLists()); + for (size_t i = 0; i < tags.size(); i++) { + const CommitRequestDataList& commits = GetNthCommitRequestList(i); + EXPECT_EQ(1U, commits.size()); + EXPECT_EQ(GenerateTagHash(tags[i]), commits[0].entity->client_tag_hash); + } + } + const SimpleStore& db() const { return db_; } MockCommitQueue* mock_queue() { return mock_queue_; } @@ -425,6 +448,15 @@ class SharedModelTypeProcessorTest : public ::testing::Test, return specifics; } + static scoped_ptr<EntityData> GenerateEntityData(const std::string& tag, + const std::string& value) { + scoped_ptr<EntityData> entity_data = make_scoped_ptr(new EntityData()); + entity_data->client_tag_hash = GenerateTagHash(tag); + entity_data->specifics = GenerateSpecifics(tag, value); + entity_data->non_unique_name = tag; + return entity_data; + } + void OnReadyToConnect(syncer::SyncError error, scoped_ptr<ActivationContext> context) { scoped_ptr<MockCommitQueue> commit_queue(new MockCommitQueue()); @@ -446,6 +478,25 @@ class SharedModelTypeProcessorTest : public ::testing::Test, return scoped_ptr<MetadataChangeList>(new SimpleMetadataChangeList()); } + syncer::SyncError MergeSyncData( + scoped_ptr<MetadataChangeList> metadata_changes, + EntityDataMap data_map) override { + // Commit any local entities that aren't being overwritten by the server. + const auto& local_data = db_.GetAllData(); + for (auto it = local_data.begin(); it != local_data.end(); it++) { + if (data_map.find(it->first) == data_map.end()) { + type_processor()->Put(it->first, CopyEntityData(*it->second), + metadata_changes.get()); + } + } + // Store any new remote entities. + for (auto it = data_map.begin(); it != data_map.end(); it++) { + db_.PutData(it->first, it->second.value()); + } + ApplyMetadataChangeList(std::move(metadata_changes)); + return syncer::SyncError(); + } + syncer::SyncError ApplySyncChanges( scoped_ptr<MetadataChangeList> metadata_changes, EntityChangeList entity_changes) override { @@ -522,6 +573,34 @@ class SharedModelTypeProcessorTest : public ::testing::Test, SimpleStore db_; }; +// Test that an initial sync handles local and remote items properly. +TEST_F(SharedModelTypeProcessorTest, InitialSync) { + CreateProcessor(); + OnMetadataLoaded(); + OnSyncStarting(); + + // Local write before initial sync. + WriteItem("tag1", "value1"); + + // Has data, but no metadata, entity in the processor, or commit request. + EXPECT_EQ(1U, db().DataCount()); + EXPECT_EQ(0U, db().MetadataCount()); + EXPECT_EQ(0U, ProcessorEntityCount()); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + // Initial sync with one server item. + OnInitialSyncDone("tag2", "value2"); + + // Now have data and metadata for both items, as well as a commit request for + // the local item. + EXPECT_EQ(2U, db().DataCount()); + EXPECT_EQ(2U, db().MetadataCount()); + EXPECT_EQ(2U, ProcessorEntityCount()); + EXPECT_EQ(1, db().GetMetadata("tag1").sequence_number()); + EXPECT_EQ(0, db().GetMetadata("tag2").sequence_number()); + ExpectCommitRequests({"tag1"}); +} + // This test covers race conditions during loading pending data. All cases // start with no processor and one item with a pending commit. There are three // different events that can occur in any order once metadata is loaded: @@ -745,8 +824,7 @@ TEST_F(SharedModelTypeProcessorTest, CreateLocalItem) { WriteItem("tag1", "value1"); // Verify the commit request this operation has triggered. - EXPECT_EQ(1U, GetNumCommitRequestLists()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1"}); const CommitRequestData& tag1_request_data = GetLatestCommitRequestForTag("tag1"); const EntityData& tag1_data = tag1_request_data.entity.value(); @@ -839,22 +917,20 @@ TEST_F(SharedModelTypeProcessorTest, CreateAndModifyWithOverrides) { // Thoroughly tests data generated by modification of server-unknown item. TEST_F(SharedModelTypeProcessorTest, CreateAndModifyLocalItem) { InitializeToReadyState(); - EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); - EXPECT_EQ(1U, GetNumCommitRequestLists()); EXPECT_EQ(1U, db().MetadataCount()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1"}); + const CommitRequestData& request_data_v1 = GetLatestCommitRequestForTag("tag1"); const EntityData& data_v1 = request_data_v1.entity.value(); const sync_pb::EntityMetadata metadata_v1 = db().GetMetadata("tag1"); WriteItem("tag1", "value2"); - EXPECT_EQ(2U, GetNumCommitRequestLists()); EXPECT_EQ(1U, db().MetadataCount()); + ExpectCommitRequests({"tag1", "tag1"}); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); const CommitRequestData& request_data_v2 = GetLatestCommitRequestForTag("tag1"); const EntityData& data_v2 = request_data_v2.entity.value(); @@ -913,16 +989,14 @@ TEST_F(SharedModelTypeProcessorTest, DeleteServerUnknown) { // commit at all. WriteItem("tag1", "value1"); - EXPECT_EQ(1U, GetNumCommitRequestLists()); EXPECT_EQ(1U, db().MetadataCount()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1"}); const CommitRequestData& data_v1 = GetLatestCommitRequestForTag("tag1"); const sync_pb::EntityMetadata metadata_v1 = db().GetMetadata("tag1"); DeleteItem("tag1"); - EXPECT_EQ(2U, GetNumCommitRequestLists()); EXPECT_EQ(1U, db().MetadataCount()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1", "tag1"}); const CommitRequestData& data_v2 = GetLatestCommitRequestForTag("tag1"); const sync_pb::EntityMetadata metadata_v2 = db().GetMetadata("tag1"); @@ -959,18 +1033,16 @@ TEST_F(SharedModelTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { InitializeToReadyState(); WriteItem("tag1", "value1"); - EXPECT_EQ(1U, GetNumCommitRequestLists()); EXPECT_EQ(1U, db().DataCount()); EXPECT_EQ(1U, db().MetadataCount()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1"}); const CommitRequestData& data_v1 = GetLatestCommitRequestForTag("tag1"); EXPECT_FALSE(db().GetMetadata("tag1").is_deleted()); DeleteItem("tag1"); - EXPECT_EQ(2U, GetNumCommitRequestLists()); EXPECT_EQ(0U, db().DataCount()); EXPECT_EQ(1U, db().MetadataCount()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1", "tag1"}); EXPECT_TRUE(db().GetMetadata("tag1").is_deleted()); // This commit happened while the deletion was in progress, but the commit @@ -1010,9 +1082,7 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) { const sync_pb::EntityMetadata metadata1 = db().GetMetadata("tag1"); // There should be one commit request for this item only. - ASSERT_EQ(1U, GetNumCommitRequestLists()); - EXPECT_EQ(1U, GetNthCommitRequestList(0).size()); - ASSERT_TRUE(HasCommitRequestForTag("tag1")); + ExpectCommitRequests({"tag1"}); WriteItem("tag2", "value2"); EXPECT_EQ(2U, db().DataCount()); @@ -1020,9 +1090,7 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) { const sync_pb::EntityMetadata metadata2 = db().GetMetadata("tag2"); // The second write should trigger another single-item commit request. - ASSERT_EQ(2U, GetNumCommitRequestLists()); - EXPECT_EQ(1U, GetNthCommitRequestList(1).size()); - ASSERT_TRUE(HasCommitRequestForTag("tag2")); + ExpectCommitRequests({"tag1", "tag2"}); EXPECT_FALSE(metadata1.is_deleted()); EXPECT_EQ(1, metadata1.sequence_number()); @@ -1035,27 +1103,6 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) { EXPECT_EQ(kUncommittedVersion, metadata2.server_version()); } -// Starts the type sync proxy with no local state. -// Verify that it waits until initial sync is complete before requesting -// commits. -TEST_F(SharedModelTypeProcessorTest, NoCommitsUntilInitialSyncDone) { - CreateProcessor(); - OnMetadataLoaded(); - OnSyncStarting(); - - WriteItem("tag1", "value1"); - EXPECT_EQ(0U, GetNumCommitRequestLists()); - - // Even though there the item hasn't been committed its metadata should have - // already been updated and the sequence number changed. - EXPECT_EQ(1U, db().MetadataCount()); - EXPECT_EQ(1, db().GetMetadata("tag1").sequence_number()); - - OnInitialSyncDone(); - EXPECT_EQ(1U, GetNumCommitRequestLists()); - EXPECT_TRUE(HasCommitRequestForTag("tag1")); -} - // Test proper handling of disconnect and reconnect. // // Creates items in various states of commit and verifies they re-attempt to diff --git a/sync/internal_api/test/fake_model_type_service.cc b/sync/internal_api/test/fake_model_type_service.cc index 999d501..8f1e14c 100644 --- a/sync/internal_api/test/fake_model_type_service.cc +++ b/sync/internal_api/test/fake_model_type_service.cc @@ -17,7 +17,7 @@ FakeModelTypeService::CreateMetadataChangeList() { syncer::SyncError FakeModelTypeService::MergeSyncData( scoped_ptr<MetadataChangeList> metadata_change_list, - EntityDataList entity_data_list) { + EntityDataMap entity_data_map) { return syncer::SyncError(); } |