summaryrefslogtreecommitdiffstats
path: root/sync
diff options
context:
space:
mode:
authormaxbogue <maxbogue@chromium.org>2016-02-24 18:09:17 -0800
committerCommit bot <commit-bot@chromium.org>2016-02-25 02:10:28 +0000
commita39eea0ab8d5c18d789567c5ca9af3a1ecba3775 (patch)
tree3e720be1d153a49b0d19f05d91b6697dcfe96306 /sync
parent1152e786b74c1bbba74af386eb9757f3f8713add (diff)
downloadchromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.zip
chromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.tar.gz
chromium_src-a39eea0ab8d5c18d789567c5ca9af3a1ecba3775.tar.bz2
[Sync] USS: Implement initial GetUpdates + merge handling in SMTP.
- MergeSyncData now takes a map of tag -> data because I need the tag to generate the ModelTypeEntity anyways. - Put and Delete are now noops until the initial sync is done. BUG=569675 Review URL: https://codereview.chromium.org/1717903002 Cr-Commit-Position: refs/heads/master@{#377464}
Diffstat (limited to 'sync')
-rw-r--r--sync/api/entity_data.h2
-rw-r--r--sync/api/model_type_service.h2
-rw-r--r--sync/internal_api/public/shared_model_type_processor.h11
-rw-r--r--sync/internal_api/public/test/fake_model_type_service.h2
-rw-r--r--sync/internal_api/shared_model_type_processor.cc60
-rw-r--r--sync/internal_api/shared_model_type_processor_unittest.cc149
-rw-r--r--sync/internal_api/test/fake_model_type_service.cc2
7 files changed, 165 insertions, 63 deletions
diff --git a/sync/api/entity_data.h b/sync/api/entity_data.h
index c1bbba9..842c512 100644
--- a/sync/api/entity_data.h
+++ b/sync/api/entity_data.h
@@ -5,6 +5,7 @@
#ifndef SYNC_API_ENTITY_DATA_H_
#define SYNC_API_ENTITY_DATA_H_
+#include <map>
#include <string>
#include <vector>
@@ -26,6 +27,7 @@ struct SYNC_EXPORT EntityDataTraits {
typedef syncer::ProtoValuePtr<EntityData, EntityDataTraits> EntityDataPtr;
typedef std::vector<EntityDataPtr> EntityDataList;
+typedef std::map<std::string, EntityDataPtr> EntityDataMap;
// A light-weight container for sync entity data which represents either
// local data created on the ModelTypeService side or remote data created
diff --git a/sync/api/model_type_service.h b/sync/api/model_type_service.h
index 469ed4a..950060e 100644
--- a/sync/api/model_type_service.h
+++ b/sync/api/model_type_service.h
@@ -42,7 +42,7 @@ class SYNC_EXPORT ModelTypeService {
// to be called when sync is first turned on, not on every restart.
virtual syncer::SyncError MergeSyncData(
scoped_ptr<MetadataChangeList> metadata_change_list,
- EntityDataList entity_data_list) = 0;
+ EntityDataMap entity_data_map) = 0;
// Apply changes from the sync server locally.
// Please note that |entity_changes| might have fewer entries than
diff --git a/sync/internal_api/public/shared_model_type_processor.h b/sync/internal_api/public/shared_model_type_processor.h
index 782cf8c..b9034fe 100644
--- a/sync/internal_api/public/shared_model_type_processor.h
+++ b/sync/internal_api/public/shared_model_type_processor.h
@@ -108,8 +108,7 @@ class SYNC_EXPORT SharedModelTypeProcessor : public ModelTypeProcessor,
// Handle the first update received from the server after being enabled.
void OnInitialUpdateReceived(const sync_pb::DataTypeState& type_state,
- const UpdateResponseDataList& updates,
- const UpdateResponseDataList& pending_updates);
+ const UpdateResponseDataList& updates);
// Sends all commit requests that are due to be sent to the sync thread.
void FlushPendingCommitRequests();
@@ -117,12 +116,16 @@ class SYNC_EXPORT SharedModelTypeProcessor : public ModelTypeProcessor,
// Computes the client tag hash for the given client |tag|.
std::string GetHashForTag(const std::string& tag);
- // Gets the entity for the given tag, which must exist.
+ // Gets the entity for the given tag, or null if there isn't one.
ModelTypeEntity* GetEntityForTag(const std::string& tag);
- // Gets the entity for the given tag hash, which must exist.
+ // Gets the entity for the given tag hash, or null if there isn't one.
ModelTypeEntity* GetEntityForTagHash(const std::string& tag_hash);
+ // Create an entity in the entity map for |tag| and return a pointer to it.
+ // Requires that no entity for |tag| already exists in the map.
+ ModelTypeEntity* CreateEntity(const std::string& tag, const EntityData& data);
+
syncer::ModelType type_;
sync_pb::DataTypeState data_type_state_;
diff --git a/sync/internal_api/public/test/fake_model_type_service.h b/sync/internal_api/public/test/fake_model_type_service.h
index 76d2141..966680a 100644
--- a/sync/internal_api/public/test/fake_model_type_service.h
+++ b/sync/internal_api/public/test/fake_model_type_service.h
@@ -29,7 +29,7 @@ class FakeModelTypeService
syncer::SyncError MergeSyncData(
scoped_ptr<MetadataChangeList> metadata_change_list,
- EntityDataList entity_data_list) override;
+ EntityDataMap entity_data_map) override;
syncer::SyncError ApplySyncChanges(
scoped_ptr<MetadataChangeList> metadata_change_list,
diff --git a/sync/internal_api/shared_model_type_processor.cc b/sync/internal_api/shared_model_type_processor.cc
index 65e9924..3db885f 100644
--- a/sync/internal_api/shared_model_type_processor.cc
+++ b/sync/internal_api/shared_model_type_processor.cc
@@ -225,6 +225,11 @@ void SharedModelTypeProcessor::Put(const std::string& client_tag,
DCHECK(!entity_data->non_unique_name.empty());
DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(entity_data->specifics));
+ if (!data_type_state_.initial_sync_done()) {
+ // Ignore changes before the initial sync is done.
+ return;
+ }
+
// If the service specified an overriding hash, use that, otherwise generate
// one from the tag.
// TODO(skym): This behavior should be delayed, once crbug.com/561818 is fixed
@@ -263,6 +268,11 @@ void SharedModelTypeProcessor::Delete(
MetadataChangeList* metadata_change_list) {
DCHECK(IsAllowingChanges());
+ if (!data_type_state_.initial_sync_done()) {
+ // Ignore changes before the initial sync is done.
+ return;
+ }
+
const std::string client_tag_hash(
syncer::syncable::GenerateSyncableHash(type_, client_tag));
@@ -350,7 +360,8 @@ void SharedModelTypeProcessor::OnUpdateReceived(
const UpdateResponseDataList& updates,
const UpdateResponseDataList& pending_updates) {
if (!data_type_state_.initial_sync_done()) {
- OnInitialUpdateReceived(data_type_state, updates, pending_updates);
+ OnInitialUpdateReceived(data_type_state, updates);
+ return;
}
scoped_ptr<MetadataChangeList> metadata_changes =
@@ -458,10 +469,38 @@ void SharedModelTypeProcessor::OnUpdateReceived(
void SharedModelTypeProcessor::OnInitialUpdateReceived(
const sync_pb::DataTypeState& data_type_state,
- const UpdateResponseDataList& updates,
- const UpdateResponseDataList& pending_updates) {
- // TODO(maxbogue): crbug.com/569675: Generate metadata for all entities.
- // TODO(maxbogue): crbug.com/569675: Call merge method on the service.
+ const UpdateResponseDataList& updates) {
+ DCHECK(entities_.empty());
+ // Ensure that initial sync was not already done and that the worker
+ // correctly marked initial sync as done for this update.
+ DCHECK(!data_type_state_.initial_sync_done());
+ DCHECK(data_type_state.initial_sync_done());
+
+ scoped_ptr<MetadataChangeList> metadata_changes =
+ service_->CreateMetadataChangeList();
+ EntityDataMap data_map;
+
+ data_type_state_ = data_type_state;
+ metadata_changes->UpdateDataTypeState(data_type_state_);
+
+ for (const UpdateResponseData& update : updates) {
+ const EntityData& data = update.entity.value();
+
+ // Let the service define a client |tag| based on the entity data.
+ std::string tag = service_->GetClientTag(data);
+
+ ModelTypeEntity* entity = CreateEntity(tag, data);
+ entity->ApplyUpdateFromServer(update);
+ metadata_changes->UpdateMetadata(tag, entity->metadata());
+ data_map[tag] = update.entity;
+ }
+
+ // Let the service handle associating and merging the data.
+ // TODO(stanisc): crbug.com/570085: Error handling.
+ service_->MergeSyncData(std::move(metadata_changes), data_map);
+
+ // We may have new reasons to commit by the time this function is done.
+ FlushPendingCommitRequests();
}
UpdateResponseDataList SharedModelTypeProcessor::GetPendingUpdates() {
@@ -488,4 +527,15 @@ ModelTypeEntity* SharedModelTypeProcessor::GetEntityForTagHash(
return it != entities_.end() ? it->second.get() : nullptr;
}
+ModelTypeEntity* SharedModelTypeProcessor::CreateEntity(
+ const std::string& tag,
+ const EntityData& data) {
+ DCHECK(entities_.find(data.client_tag_hash) == entities_.end());
+ scoped_ptr<ModelTypeEntity> entity = ModelTypeEntity::CreateNew(
+ tag, data.client_tag_hash, data.id, data.creation_time);
+ ModelTypeEntity* entity_ptr = entity.get();
+ entities_[data.client_tag_hash] = std::move(entity);
+ return entity_ptr;
+}
+
} // namespace syncer_v2
diff --git a/sync/internal_api/shared_model_type_processor_unittest.cc b/sync/internal_api/shared_model_type_processor_unittest.cc
index a79e5a6..5a8cd66 100644
--- a/sync/internal_api/shared_model_type_processor_unittest.cc
+++ b/sync/internal_api/shared_model_type_processor_unittest.cc
@@ -71,6 +71,11 @@ class SimpleStore {
return metadata_store_.find(tag) != metadata_store_.end();
}
+ const std::unordered_map<std::string, scoped_ptr<EntityData>>& GetAllData()
+ const {
+ return data_store_;
+ }
+
const EntityData& GetData(const std::string& tag) const {
return *data_store_.find(tag)->second;
}
@@ -189,10 +194,7 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
// Local data modification. Emulates signals from the model thread.
void WriteItem(const std::string& tag, const std::string& value) {
- scoped_ptr<EntityData> entity_data = make_scoped_ptr(new EntityData());
- entity_data->specifics = GenerateSpecifics(tag, value);
- entity_data->non_unique_name = tag;
- WriteItem(tag, std::move(entity_data));
+ WriteItem(tag, GenerateEntityData(tag, value));
}
// Overloaded form to allow passing of custom entity data.
@@ -247,13 +249,24 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
clear_change_processor();
}
- // Emulates an "initial sync done" message from the CommitQueue.
- void OnInitialSyncDone() {
+ // Simulates an initial GetUpdates response from the worker with |updates|.
+ void OnInitialSyncDone(UpdateResponseDataList updates) {
sync_pb::DataTypeState data_type_state(db_.data_type_state());
data_type_state.set_initial_sync_done(true);
- UpdateResponseDataList empty_update_list;
- type_processor()->OnUpdateReceived(data_type_state, empty_update_list,
- empty_update_list);
+ type_processor()->OnUpdateReceived(data_type_state, updates,
+ UpdateResponseDataList());
+ }
+
+ // Overloaded form with no updates.
+ void OnInitialSyncDone() { OnInitialSyncDone(UpdateResponseDataList()); }
+
+ // Overloaded form that constructs an update for a single entity.
+ void OnInitialSyncDone(const std::string& tag, const std::string& value) {
+ UpdateResponseDataList updates;
+ UpdateResponseData update;
+ update.entity = GenerateEntityData(tag, value)->PassToPtr();
+ updates.push_back(update);
+ OnInitialSyncDone(updates);
}
// Emulate updates from the server.
@@ -391,6 +404,16 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
EXPECT_EQ(value, data.specifics.preference().value());
}
+ // For each tag in |tags|, expect a corresponding request list of length one.
+ void ExpectCommitRequests(const std::vector<std::string>& tags) {
+ EXPECT_EQ(tags.size(), GetNumCommitRequestLists());
+ for (size_t i = 0; i < tags.size(); i++) {
+ const CommitRequestDataList& commits = GetNthCommitRequestList(i);
+ EXPECT_EQ(1U, commits.size());
+ EXPECT_EQ(GenerateTagHash(tags[i]), commits[0].entity->client_tag_hash);
+ }
+ }
+
const SimpleStore& db() const { return db_; }
MockCommitQueue* mock_queue() { return mock_queue_; }
@@ -425,6 +448,15 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
return specifics;
}
+ static scoped_ptr<EntityData> GenerateEntityData(const std::string& tag,
+ const std::string& value) {
+ scoped_ptr<EntityData> entity_data = make_scoped_ptr(new EntityData());
+ entity_data->client_tag_hash = GenerateTagHash(tag);
+ entity_data->specifics = GenerateSpecifics(tag, value);
+ entity_data->non_unique_name = tag;
+ return entity_data;
+ }
+
void OnReadyToConnect(syncer::SyncError error,
scoped_ptr<ActivationContext> context) {
scoped_ptr<MockCommitQueue> commit_queue(new MockCommitQueue());
@@ -446,6 +478,25 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
return scoped_ptr<MetadataChangeList>(new SimpleMetadataChangeList());
}
+ syncer::SyncError MergeSyncData(
+ scoped_ptr<MetadataChangeList> metadata_changes,
+ EntityDataMap data_map) override {
+ // Commit any local entities that aren't being overwritten by the server.
+ const auto& local_data = db_.GetAllData();
+ for (auto it = local_data.begin(); it != local_data.end(); it++) {
+ if (data_map.find(it->first) == data_map.end()) {
+ type_processor()->Put(it->first, CopyEntityData(*it->second),
+ metadata_changes.get());
+ }
+ }
+ // Store any new remote entities.
+ for (auto it = data_map.begin(); it != data_map.end(); it++) {
+ db_.PutData(it->first, it->second.value());
+ }
+ ApplyMetadataChangeList(std::move(metadata_changes));
+ return syncer::SyncError();
+ }
+
syncer::SyncError ApplySyncChanges(
scoped_ptr<MetadataChangeList> metadata_changes,
EntityChangeList entity_changes) override {
@@ -522,6 +573,34 @@ class SharedModelTypeProcessorTest : public ::testing::Test,
SimpleStore db_;
};
+// Test that an initial sync handles local and remote items properly.
+TEST_F(SharedModelTypeProcessorTest, InitialSync) {
+ CreateProcessor();
+ OnMetadataLoaded();
+ OnSyncStarting();
+
+ // Local write before initial sync.
+ WriteItem("tag1", "value1");
+
+ // Has data, but no metadata, entity in the processor, or commit request.
+ EXPECT_EQ(1U, db().DataCount());
+ EXPECT_EQ(0U, db().MetadataCount());
+ EXPECT_EQ(0U, ProcessorEntityCount());
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ // Initial sync with one server item.
+ OnInitialSyncDone("tag2", "value2");
+
+ // Now have data and metadata for both items, as well as a commit request for
+ // the local item.
+ EXPECT_EQ(2U, db().DataCount());
+ EXPECT_EQ(2U, db().MetadataCount());
+ EXPECT_EQ(2U, ProcessorEntityCount());
+ EXPECT_EQ(1, db().GetMetadata("tag1").sequence_number());
+ EXPECT_EQ(0, db().GetMetadata("tag2").sequence_number());
+ ExpectCommitRequests({"tag1"});
+}
+
// This test covers race conditions during loading pending data. All cases
// start with no processor and one item with a pending commit. There are three
// different events that can occur in any order once metadata is loaded:
@@ -745,8 +824,7 @@ TEST_F(SharedModelTypeProcessorTest, CreateLocalItem) {
WriteItem("tag1", "value1");
// Verify the commit request this operation has triggered.
- EXPECT_EQ(1U, GetNumCommitRequestLists());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1"});
const CommitRequestData& tag1_request_data =
GetLatestCommitRequestForTag("tag1");
const EntityData& tag1_data = tag1_request_data.entity.value();
@@ -839,22 +917,20 @@ TEST_F(SharedModelTypeProcessorTest, CreateAndModifyWithOverrides) {
// Thoroughly tests data generated by modification of server-unknown item.
TEST_F(SharedModelTypeProcessorTest, CreateAndModifyLocalItem) {
InitializeToReadyState();
- EXPECT_EQ(0U, GetNumCommitRequestLists());
WriteItem("tag1", "value1");
- EXPECT_EQ(1U, GetNumCommitRequestLists());
EXPECT_EQ(1U, db().MetadataCount());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1"});
+
const CommitRequestData& request_data_v1 =
GetLatestCommitRequestForTag("tag1");
const EntityData& data_v1 = request_data_v1.entity.value();
const sync_pb::EntityMetadata metadata_v1 = db().GetMetadata("tag1");
WriteItem("tag1", "value2");
- EXPECT_EQ(2U, GetNumCommitRequestLists());
EXPECT_EQ(1U, db().MetadataCount());
+ ExpectCommitRequests({"tag1", "tag1"});
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
const CommitRequestData& request_data_v2 =
GetLatestCommitRequestForTag("tag1");
const EntityData& data_v2 = request_data_v2.entity.value();
@@ -913,16 +989,14 @@ TEST_F(SharedModelTypeProcessorTest, DeleteServerUnknown) {
// commit at all.
WriteItem("tag1", "value1");
- EXPECT_EQ(1U, GetNumCommitRequestLists());
EXPECT_EQ(1U, db().MetadataCount());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1"});
const CommitRequestData& data_v1 = GetLatestCommitRequestForTag("tag1");
const sync_pb::EntityMetadata metadata_v1 = db().GetMetadata("tag1");
DeleteItem("tag1");
- EXPECT_EQ(2U, GetNumCommitRequestLists());
EXPECT_EQ(1U, db().MetadataCount());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1", "tag1"});
const CommitRequestData& data_v2 = GetLatestCommitRequestForTag("tag1");
const sync_pb::EntityMetadata metadata_v2 = db().GetMetadata("tag1");
@@ -959,18 +1033,16 @@ TEST_F(SharedModelTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) {
InitializeToReadyState();
WriteItem("tag1", "value1");
- EXPECT_EQ(1U, GetNumCommitRequestLists());
EXPECT_EQ(1U, db().DataCount());
EXPECT_EQ(1U, db().MetadataCount());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1"});
const CommitRequestData& data_v1 = GetLatestCommitRequestForTag("tag1");
EXPECT_FALSE(db().GetMetadata("tag1").is_deleted());
DeleteItem("tag1");
- EXPECT_EQ(2U, GetNumCommitRequestLists());
EXPECT_EQ(0U, db().DataCount());
EXPECT_EQ(1U, db().MetadataCount());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1", "tag1"});
EXPECT_TRUE(db().GetMetadata("tag1").is_deleted());
// This commit happened while the deletion was in progress, but the commit
@@ -1010,9 +1082,7 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) {
const sync_pb::EntityMetadata metadata1 = db().GetMetadata("tag1");
// There should be one commit request for this item only.
- ASSERT_EQ(1U, GetNumCommitRequestLists());
- EXPECT_EQ(1U, GetNthCommitRequestList(0).size());
- ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ ExpectCommitRequests({"tag1"});
WriteItem("tag2", "value2");
EXPECT_EQ(2U, db().DataCount());
@@ -1020,9 +1090,7 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) {
const sync_pb::EntityMetadata metadata2 = db().GetMetadata("tag2");
// The second write should trigger another single-item commit request.
- ASSERT_EQ(2U, GetNumCommitRequestLists());
- EXPECT_EQ(1U, GetNthCommitRequestList(1).size());
- ASSERT_TRUE(HasCommitRequestForTag("tag2"));
+ ExpectCommitRequests({"tag1", "tag2"});
EXPECT_FALSE(metadata1.is_deleted());
EXPECT_EQ(1, metadata1.sequence_number());
@@ -1035,27 +1103,6 @@ TEST_F(SharedModelTypeProcessorTest, TwoIndependentItems) {
EXPECT_EQ(kUncommittedVersion, metadata2.server_version());
}
-// Starts the type sync proxy with no local state.
-// Verify that it waits until initial sync is complete before requesting
-// commits.
-TEST_F(SharedModelTypeProcessorTest, NoCommitsUntilInitialSyncDone) {
- CreateProcessor();
- OnMetadataLoaded();
- OnSyncStarting();
-
- WriteItem("tag1", "value1");
- EXPECT_EQ(0U, GetNumCommitRequestLists());
-
- // Even though there the item hasn't been committed its metadata should have
- // already been updated and the sequence number changed.
- EXPECT_EQ(1U, db().MetadataCount());
- EXPECT_EQ(1, db().GetMetadata("tag1").sequence_number());
-
- OnInitialSyncDone();
- EXPECT_EQ(1U, GetNumCommitRequestLists());
- EXPECT_TRUE(HasCommitRequestForTag("tag1"));
-}
-
// Test proper handling of disconnect and reconnect.
//
// Creates items in various states of commit and verifies they re-attempt to
diff --git a/sync/internal_api/test/fake_model_type_service.cc b/sync/internal_api/test/fake_model_type_service.cc
index 999d501..8f1e14c 100644
--- a/sync/internal_api/test/fake_model_type_service.cc
+++ b/sync/internal_api/test/fake_model_type_service.cc
@@ -17,7 +17,7 @@ FakeModelTypeService::CreateMetadataChangeList() {
syncer::SyncError FakeModelTypeService::MergeSyncData(
scoped_ptr<MetadataChangeList> metadata_change_list,
- EntityDataList entity_data_list) {
+ EntityDataMap entity_data_map) {
return syncer::SyncError();
}