summaryrefslogtreecommitdiffstats
path: root/sync/engine
diff options
context:
space:
mode:
authorrlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-22 21:40:12 +0000
committerrlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-05-22 21:40:12 +0000
commitfff8e6f15c42b509dcd3966976c03d69fd53593b (patch)
tree96699fd92538b7a518991712fdb13d35bde9d499 /sync/engine
parent99993e49b65e58a6e972426abd408cb1963c5539 (diff)
downloadchromium_src-fff8e6f15c42b509dcd3966976c03d69fd53593b.zip
chromium_src-fff8e6f15c42b509dcd3966976c03d69fd53593b.tar.gz
chromium_src-fff8e6f15c42b509dcd3966976c03d69fd53593b.tar.bz2
Implement sync in the NonBlockingTypeProcessor
Introduces the NonBlockingTypeProcessor's sync logic. When combined with the NonBlockingTypeProcessorCore's sync logic (which will be introduced in a follow-up commit), this will be an alternative to the existing sync engine implemented with DirectoryUpdateHandler and DirectoryCommitContributor. Adds non_blocking_sync_common.h, which defines structs to be used to pass messages between the processor and processor core. Adds DataTypeState as a parameter to the processor to processor core connection methods. Eventually this will be used to initialize the processor core with state that the processor loaded from disk. Adds a lot of unit tests and unit test framework intrastructure. The NonBlockingTypeProcessor and NonBlockingTypeProcessorCore's communications with each other will be very racy. These tests are intended to help manage the complexity this will cause by allowing us to test all the possible race conditions individually. BUG=351005 Review URL: https://codereview.chromium.org/280983002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@272329 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync/engine')
-rw-r--r--sync/engine/model_thread_sync_entity.cc157
-rw-r--r--sync/engine/model_thread_sync_entity.h186
-rw-r--r--sync/engine/model_thread_sync_entity_unittest.cc178
-rw-r--r--sync/engine/non_blocking_sync_common.cc33
-rw-r--r--sync/engine/non_blocking_sync_common.h98
-rw-r--r--sync/engine/non_blocking_type_processor.cc149
-rw-r--r--sync/engine/non_blocking_type_processor.h49
-rw-r--r--sync/engine/non_blocking_type_processor_core.cc5
-rw-r--r--sync/engine/non_blocking_type_processor_core.h4
-rw-r--r--sync/engine/non_blocking_type_processor_core_interface.cc16
-rw-r--r--sync/engine/non_blocking_type_processor_core_interface.h24
-rw-r--r--sync/engine/non_blocking_type_processor_unittest.cc524
12 files changed, 1409 insertions, 14 deletions
diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc
new file mode 100644
index 0000000..fde2eb3
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity.cc
@@ -0,0 +1,157 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/model_thread_sync_entity.h"
+#include "sync/syncable/syncable_util.h"
+
+namespace syncer {
+
+scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem(
+ const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time now) {
+ return scoped_ptr<ModelThreadSyncEntity>(new ModelThreadSyncEntity(
+ 1,
+ 0,
+ 0,
+ 0,
+ true,
+ std::string(), // Sync thread will assign the initial ID.
+ syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics),
+ client_tag),
+ client_tag, // As non-unique name.
+ specifics,
+ false,
+ now,
+ now));
+}
+
+scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::FromServerUpdate(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ int64 version,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime) {
+ return scoped_ptr<ModelThreadSyncEntity>(
+ new ModelThreadSyncEntity(0,
+ 0,
+ 0,
+ version,
+ true,
+ id,
+ client_tag_hash,
+ non_unique_name,
+ specifics,
+ deleted,
+ ctime,
+ mtime));
+}
+
+ModelThreadSyncEntity::ModelThreadSyncEntity(
+ int64 sequence_number,
+ int64 commit_requested_sequence_number,
+ int64 acked_sequence_number,
+ int64 base_version,
+ bool is_dirty,
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime)
+ : sequence_number_(sequence_number),
+ commit_requested_sequence_number_(commit_requested_sequence_number),
+ acked_sequence_number_(acked_sequence_number),
+ base_version_(base_version),
+ is_dirty_(is_dirty),
+ id_(id),
+ client_tag_hash_(client_tag_hash),
+ non_unique_name_(non_unique_name),
+ specifics_(specifics),
+ deleted_(deleted),
+ ctime_(ctime),
+ mtime_(mtime) {
+}
+
+ModelThreadSyncEntity::~ModelThreadSyncEntity() {
+}
+
+bool ModelThreadSyncEntity::IsWriteRequired() const {
+ return is_dirty_;
+}
+
+bool ModelThreadSyncEntity::IsUnsynced() const {
+ return sequence_number_ > acked_sequence_number_;
+}
+
+bool ModelThreadSyncEntity::RequiresCommitRequest() const {
+ return sequence_number_ > commit_requested_sequence_number_;
+}
+
+bool ModelThreadSyncEntity::UpdateIsReflection(int64 update_version) const {
+ return base_version_ >= update_version;
+}
+
+bool ModelThreadSyncEntity::UpdateIsInConflict(int64 update_version) const {
+ return IsUnsynced() && !UpdateIsReflection(update_version);
+}
+
+void ModelThreadSyncEntity::ApplyUpdateFromServer(
+ int64 update_version,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time mtime) {
+ // There was a conflict and the server just won it.
+ // This implicitly acks all outstanding commits because a received update
+ // will clobber any pending commits on the sync thread.
+ acked_sequence_number_ = sequence_number_;
+ commit_requested_sequence_number_ = sequence_number_;
+
+ base_version_ = update_version;
+ specifics_ = specifics;
+ mtime_ = mtime;
+}
+
+void ModelThreadSyncEntity::MakeLocalChange(
+ const sync_pb::EntitySpecifics& specifics) {
+ sequence_number_++;
+ specifics_ = specifics;
+}
+
+void ModelThreadSyncEntity::Delete() {
+ sequence_number_++;
+ specifics_.Clear();
+ deleted_ = true;
+}
+
+void ModelThreadSyncEntity::InitializeCommitRequestData(
+ CommitRequestData* request) const {
+ request->id = id_;
+ request->client_tag_hash = client_tag_hash_;
+ request->sequence_number = sequence_number_;
+ request->base_version = base_version_;
+ request->ctime = ctime_;
+ request->mtime = mtime_;
+ request->non_unique_name = non_unique_name_;
+ request->deleted = deleted_;
+ request->specifics.CopyFrom(specifics_);
+}
+
+void ModelThreadSyncEntity::SetCommitRequestInProgress() {
+ commit_requested_sequence_number_ = sequence_number_;
+}
+
+void ModelThreadSyncEntity::ReceiveCommitResponse(const std::string& id,
+ int64 sequence_number,
+ int64 response_version) {
+ id_ = id; // The server can assign us a new ID in a commit response.
+ acked_sequence_number_ = sequence_number;
+ base_version_ = response_version;
+}
+
+} // namespace syncer
diff --git a/sync/engine/model_thread_sync_entity.h b/sync/engine/model_thread_sync_entity.h
new file mode 100644
index 0000000..31cc2b8
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity.h
@@ -0,0 +1,186 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
+#define SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
+
+#include <string>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/time/time.h"
+#include "sync/base/sync_export.h"
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+// This is the model thread's representation of a SyncEntity.
+//
+// The model thread sync entity receives updates from the model itself and
+// (asynchronously) from the sync server via the sync thread. From the point
+// of view of this class, updates from the server take precedence over local
+// changes, though the model may be given an opportunity to overwrite this
+// decision later.
+//
+// Sync will try to commit this entity's data to the sync server and local
+// storage.
+//
+// Most of the logic related to those processes live outside this class. This
+// class helps out a bit by offering some functions to serialize its data to
+// various formats and query the entity's status.
+class SYNC_EXPORT_PRIVATE ModelThreadSyncEntity {
+ public:
+ // Construct an instance representing a new locally-created item.
+ static scoped_ptr<ModelThreadSyncEntity> NewLocalItem(
+ const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time now);
+
+ // Construct an instance representing an item newly received from the server.
+ static scoped_ptr<ModelThreadSyncEntity> FromServerUpdate(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ int64 version,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime);
+
+ // TODO(rlarocque): Implement FromDisk constructor when we implement storage.
+
+ ~ModelThreadSyncEntity();
+
+ // Returns true if this data is out of sync with local storage.
+ bool IsWriteRequired() const;
+
+ // Returns true if this data is out of sync with the server.
+ // A commit may or may not be in progress at this time.
+ bool IsUnsynced() const;
+
+ // Returns true if this data is out of sync with the sync thread.
+ //
+ // There may or may not be a commit in progress for this item, but there's
+ // definitely no commit in progress for this (most up to date) version of
+ // this item.
+ bool RequiresCommitRequest() const;
+
+ // Returns true if the specified update version does not contain new data.
+ bool UpdateIsReflection(int64 update_version) const;
+
+ // Returns true if the specified update version conflicts with local changes.
+ bool UpdateIsInConflict(int64 update_version) const;
+
+ // Applies an update from the sync server.
+ //
+ // Overrides any local changes. Check UpdateIsInConflict() before calling
+ // this function if you want to handle conflicts differently.
+ void ApplyUpdateFromServer(int64 update_version,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time mtime);
+
+ // Applies a local change to this item.
+ void MakeLocalChange(const sync_pb::EntitySpecifics& specifics);
+
+ // Applies a local deletion to this item.
+ void Delete();
+
+ // Initializes a message representing this item's uncommitted state
+ // to be forwarded to the sync server for committing.
+ void InitializeCommitRequestData(CommitRequestData* request) const;
+
+ // Notes that the current version of this item has been queued for commit.
+ void SetCommitRequestInProgress();
+
+ // Receives a successful commit response.
+ //
+ // Sucssful commit responses can overwrite an item's ID.
+ //
+ // Note that the receipt of a successful commit response does not necessarily
+ // unset IsUnsynced(). If many local changes occur in quick succession, it's
+ // possible that the committed item was already out of date by the time it
+ // reached the server.
+ void ReceiveCommitResponse(const std::string& id,
+ int64 sequence_number,
+ int64 response_version);
+
+ private:
+ ModelThreadSyncEntity(int64 sequence_number,
+ int64 commit_requested_sequence_number,
+ int64 acked_sequence_number,
+ int64 base_version,
+ bool is_dirty,
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime);
+
+ // A sequence number used to track in-progress commits. Each local change
+ // increments this number.
+ int64 sequence_number_;
+
+ // The sequence number of the last item sent to the sync thread.
+ int64 commit_requested_sequence_number_;
+
+ // The sequence number of the last item known to be successfully committed.
+ int64 acked_sequence_number_;
+
+ // The server version on which this item is based.
+ //
+ // If there are no local changes, this is the version of the entity as we see
+ // it here.
+ //
+ // If there are local changes, this is the version of the entity on which
+ // those changes are based.
+ int64 base_version_;
+
+ // True if this entity is out of sync with local storage.
+ bool is_dirty_;
+
+ // The entity's ID.
+ //
+ // Most of the time, this is a server-assigned value.
+ //
+ // Prior to the item's first commit, we leave this value as an empty string.
+ // The initial ID for a newly created item has to meet certain uniqueness
+ // requirements, and we handle those on the sync thread.
+ std::string id_;
+
+ // A hash based on the client tag and model type.
+ // Used for various map lookups. Should always be available.
+ std::string client_tag_hash_;
+
+ // A non-unique name associated with this entity.
+ //
+ // It is sometimes used for debugging. It gets saved to and restored from
+ // the sync server.
+ //
+ // Its value is often related to the item's unhashed client tag, though this
+ // is not guaranteed and should not be relied on. May be hidden when
+ // encryption is enabled.
+ std::string non_unique_name_;
+
+ // A protobuf filled with type-specific information. Contains the most
+ // up-to-date specifics, whether it be from the server or a locally modified
+ // version.
+ sync_pb::EntitySpecifics specifics_;
+
+ // Whether or not the item is deleted. The |specifics_| field may be empty
+ // if this flag is true.
+ bool deleted_;
+
+ // Entity creation and modification timestamps.
+ // Assigned by the client and synced by the server, though the server usually
+ // doesn't bother to inspect their values.
+ base::Time ctime_;
+ base::Time mtime_;
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
diff --git a/sync/engine/model_thread_sync_entity_unittest.cc b/sync/engine/model_thread_sync_entity_unittest.cc
new file mode 100644
index 0000000..6b158dd
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity_unittest.cc
@@ -0,0 +1,178 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/model_thread_sync_entity.h"
+
+#include "base/memory/scoped_ptr.h"
+#include "base/time/time.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/syncable/syncable_util.h"
+
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace syncer {
+
+// Some simple sanity tests for the ModelThreadSyncEntity.
+//
+// A lot of the more complicated sync logic is implemented in the
+// NonBlockingTypeProcessor that owns the ModelThreadSyncEntity. We
+// can't unit test it here.
+//
+// Instead, we focus on simple tests to make sure that variables are getting
+// properly intialized and flags properly set. Anything more complicated would
+// be a redundant and incomplete version of the NonBlockingTypeProcessor tests.
+class ModelThreadSyncEntityTest : public ::testing::Test {
+ public:
+ ModelThreadSyncEntityTest()
+ : kServerId("ServerID"),
+ kClientTag("sample.pref.name"),
+ kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)),
+ kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)),
+ kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) {
+ sync_pb::PreferenceSpecifics* pref_specifics =
+ specifics.mutable_preference();
+ pref_specifics->set_name(kClientTag);
+ pref_specifics->set_value("pref.value");
+ }
+
+ const std::string kServerId;
+ const std::string kClientTag;
+ const std::string kClientTagHash;
+ const base::Time kCtime;
+ const base::Time kMtime;
+ sync_pb::EntitySpecifics specifics;
+};
+
+TEST_F(ModelThreadSyncEntityTest, NewLocalItem) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::NewLocalItem("asdf", specifics, kCtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+ EXPECT_FALSE(entity->UpdateIsReflection(1));
+ EXPECT_TRUE(entity->UpdateIsInConflict(1));
+}
+
+TEST_F(ModelThreadSyncEntityTest, FromServerUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(9));
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsInConflict(11));
+}
+
+// Tombstones should behave just like regular updates. Mostly. The strangest
+// thing about them is that they don't have specifics, so it can be hard to
+// detect their type. Fortunately, this class doesn't care about types in
+// received updates.
+TEST_F(ModelThreadSyncEntityTest, TombstoneUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ sync_pb::EntitySpecifics(),
+ true,
+ kCtime,
+ kMtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(9));
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsInConflict(11));
+}
+
+// Apply a deletion update.
+TEST_F(ModelThreadSyncEntityTest, ApplyUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ // A deletion update one version later.
+ entity->ApplyUpdateFromServer(11,
+ true,
+ sync_pb::EntitySpecifics(),
+ kMtime + base::TimeDelta::FromSeconds(10));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsReflection(12));
+}
+
+TEST_F(ModelThreadSyncEntityTest, LocalChange) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ sync_pb::EntitySpecifics specifics2;
+ specifics2.CopyFrom(specifics);
+ specifics2.mutable_preference()->set_value("new.pref.value");
+
+ entity->MakeLocalChange(specifics2);
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsInConflict(10));
+
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_TRUE(entity->UpdateIsInConflict(11));
+}
+
+TEST_F(ModelThreadSyncEntityTest, LocalDeletion) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ entity->Delete();
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsInConflict(10));
+
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_TRUE(entity->UpdateIsInConflict(11));
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc
new file mode 100644
index 0000000..c5a3656
--- /dev/null
+++ b/sync/engine/non_blocking_sync_common.cc
@@ -0,0 +1,33 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_sync_common.h"
+
+namespace syncer {
+
+DataTypeState::DataTypeState() {
+}
+
+DataTypeState::~DataTypeState() {
+}
+
+CommitRequestData::CommitRequestData() {
+}
+
+CommitRequestData::~CommitRequestData() {
+}
+
+CommitResponseData::CommitResponseData() {
+}
+
+CommitResponseData::~CommitResponseData() {
+}
+
+UpdateResponseData::UpdateResponseData() {
+}
+
+UpdateResponseData::~UpdateResponseData() {
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h
new file mode 100644
index 0000000..f07fdb2
--- /dev/null
+++ b/sync/engine/non_blocking_sync_common.h
@@ -0,0 +1,98 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
+#define SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
+
+#include <string>
+#include <vector>
+
+#include "base/time/time.h"
+#include "sync/base/sync_export.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+// Data-type global state that must be accessed and updated on the sync thread,
+// but persisted on or through the model thread.
+struct SYNC_EXPORT_PRIVATE DataTypeState {
+ DataTypeState();
+ ~DataTypeState();
+
+ // The latest progress markers received from the server.
+ sync_pb::DataTypeProgressMarker progress_marker;
+
+ // A data type context. Sent to the server in every commit or update
+ // request. May be updated by either by responses from the server or
+ // requests made on the model thread. The interpretation of this value may
+ // be data-type specific. Many data types ignore it.
+ sync_pb::DataTypeContext type_context;
+
+ // The ID of the folder node that sits at the top of this type's folder
+ // hierarchy. We keep this around for legacy reasons. The protocol expects
+ // that all nodes of a certain type are children of the same type root
+ // entity. This entity is delivered by the server, and may not be available
+ // until the first download cycle has completed.
+ std::string type_root_id;
+
+ // A strictly increasing counter used to generate unique values for the
+ // client-assigned IDs. The incrementing and ID assignment happens on the
+ // sync thread, but we store the value here so we can pass it back to the
+ // model thread for persistence. This is probably unnecessary for the
+ // client-tagged data types supported by non-blocking sync, but we will
+ // continue to emulate the directory sync's behavior for now.
+ int64 next_client_id;
+};
+
+struct SYNC_EXPORT_PRIVATE CommitRequestData {
+ CommitRequestData();
+ ~CommitRequestData();
+
+ std::string id;
+ std::string client_tag_hash;
+
+ // Strictly incrementing number for in-progress commits. More information
+ // about its meaning can be found in comments in the files that make use of
+ // this struct.
+ int64 sequence_number;
+
+ int64 base_version;
+ base::Time ctime;
+ base::Time mtime;
+ std::string non_unique_name;
+ bool deleted;
+ sync_pb::EntitySpecifics specifics;
+};
+
+struct SYNC_EXPORT_PRIVATE CommitResponseData {
+ CommitResponseData();
+ ~CommitResponseData();
+
+ std::string id;
+ std::string client_tag_hash;
+ int64 sequence_number;
+ int64 response_version;
+};
+
+struct SYNC_EXPORT_PRIVATE UpdateResponseData {
+ UpdateResponseData();
+ ~UpdateResponseData();
+
+ std::string id;
+ std::string client_tag_hash;
+ int64 response_version;
+ base::Time ctime;
+ base::Time mtime;
+ std::string non_unique_name;
+ bool deleted;
+ sync_pb::EntitySpecifics specifics;
+};
+
+typedef std::vector<CommitRequestData> CommitRequestDataList;
+typedef std::vector<CommitResponseData> CommitResponseDataList;
+typedef std::vector<UpdateResponseData> UpdateResponseDataList;
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc
index 66a016b..0aaead4 100644
--- a/sync/engine/non_blocking_type_processor.cc
+++ b/sync/engine/non_blocking_type_processor.cc
@@ -4,9 +4,13 @@
#include "sync/engine/non_blocking_type_processor.h"
+#include "base/bind.h"
+#include "base/location.h"
#include "base/message_loop/message_loop_proxy.h"
-#include "sync/engine/non_blocking_type_processor_core.h"
+#include "sync/engine/model_thread_sync_entity.h"
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
#include "sync/internal_api/public/sync_core_proxy.h"
+#include "sync/syncable/syncable_util.h"
namespace syncer {
@@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
: type_(type),
is_preferred_(false),
is_connected_(false),
+ entities_deleter_(&entities_),
weak_ptr_factory_for_ui_(this),
weak_ptr_factory_for_sync_(this) {
}
@@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable(
scoped_ptr<SyncCoreProxy> sync_core_proxy) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
+
is_preferred_ = true;
+
+ // TODO(rlarocque): At some point, this should be loaded from storage.
+ data_type_state_.progress_marker.set_data_type_id(
+ GetSpecificsFieldNumberFromModelType(type_));
+ data_type_state_.next_client_id = 0;
+
sync_core_proxy_ = sync_core_proxy.Pass();
sync_core_proxy_->ConnectTypeToCore(GetModelType(),
+ data_type_state_,
weak_ptr_factory_for_sync_.GetWeakPtr());
}
@@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() {
}
weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
- core_ = base::WeakPtr<NonBlockingTypeProcessorCore>();
- sync_thread_ = scoped_refptr<base::SequencedTaskRunner>();
+ core_interface_.reset();
}
base::WeakPtr<NonBlockingTypeProcessor>
@@ -74,13 +86,136 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() {
}
void NonBlockingTypeProcessor::OnConnect(
- base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread) {
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
+
is_connected_ = true;
- core_ = core;
- sync_thread_ = sync_thread;
+ core_interface_ = core_interface.Pass();
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Put(const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics) {
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
+
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::NewLocalItem(
+ client_tag, specifics, base::Time::Now()));
+ entities_.insert(std::make_pair(client_tag_hash, entity.release()));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->MakeLocalChange(specifics);
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ // That's unusual, but not necessarily a bad thing.
+ // Missing is as good as deleted as far as the model is concerned.
+ DLOG(WARNING) << "Attempted to delete missing item."
+ << " client tag: " << client_tag;
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->Delete();
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
+ CommitRequestDataList commit_requests;
+
+ // Don't bother sending anything if there's no one to send to.
+ if (!IsConnected())
+ return;
+
+ // TODO(rlarocque): Do something smarter than iterate here.
+ for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
+ ++it) {
+ if (it->second->RequiresCommitRequest()) {
+ CommitRequestData request;
+ it->second->InitializeCommitRequestData(&request);
+ commit_requests.push_back(request);
+ it->second->SetCommitRequestInProgress();
+ }
+ }
+
+ if (!commit_requests.empty())
+ core_interface_->RequestCommits(commit_requests);
+}
+
+void NonBlockingTypeProcessor::OnCommitCompletion(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ data_type_state_ = type_state;
+
+ for (CommitResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const CommitResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ NOTREACHED() << "Received commit response for missing item."
+ << " type: " << type_ << " client_tag: " << client_tag_hash;
+ return;
+ } else {
+ it->second->ReceiveCommitResponse(response_data.id,
+ response_data.sequence_number,
+ response_data.response_version);
+ }
+ }
+}
+
+void NonBlockingTypeProcessor::OnUpdateReceived(
+ const DataTypeState& data_type_state,
+ const UpdateResponseDataList& response_list) {
+ data_type_state_ = data_type_state;
+
+ for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const UpdateResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ scoped_ptr<ModelThreadSyncEntity> entity =
+ ModelThreadSyncEntity::FromServerUpdate(
+ response_data.id,
+ response_data.client_tag_hash,
+ response_data.non_unique_name,
+ response_data.response_version,
+ response_data.specifics,
+ response_data.deleted,
+ response_data.ctime,
+ response_data.mtime);
+ entities_.insert(std::make_pair(client_tag_hash, entity.release()));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->ApplyUpdateFromServer(response_data.response_version,
+ response_data.deleted,
+ response_data.specifics,
+ response_data.mtime);
+ // TODO: Do something special when conflicts are detected.
+ }
+ }
+
+ // TODO: Inform the model of the new or updated data.
}
} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor.h b/sync/engine/non_blocking_type_processor.h
index 2db2c32..2274c5d 100644
--- a/sync/engine/non_blocking_type_processor.h
+++ b/sync/engine/non_blocking_type_processor.h
@@ -6,16 +6,18 @@
#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_
#include "base/memory/weak_ptr.h"
-#include "base/sequenced_task_runner.h"
+#include "base/stl_util.h"
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
+#include "sync/engine/non_blocking_sync_common.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/protocol/sync.pb.h"
namespace syncer {
class SyncCoreProxy;
-class NonBlockingTypeProcessorCore;
+class ModelThreadSyncEntity;
+class NonBlockingTypeProcessorCoreInterface;
// A sync component embedded on the synced type's thread that helps to handle
// communication between sync and model type threads.
@@ -54,16 +56,38 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe {
void Disconnect();
// Callback used to process the handshake response.
- void OnConnect(base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread);
+ void OnConnect(
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface);
+
+ // Requests that an item be stored in sync.
+ void Put(const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics);
+
+ // Deletes an item from sync.
+ void Delete(const std::string& client_tag);
+
+ // Informs this object that some of its commit requests have been
+ // successfully serviced.
+ void OnCommitCompletion(const DataTypeState& type_state,
+ const CommitResponseDataList& response_list);
+
+ // Informs this object that there are some incoming updates is should
+ // handle.
+ void OnUpdateReceived(const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list);
// Returns the long-lived WeakPtr that is intended to be registered with the
// ProfileSyncService.
base::WeakPtr<NonBlockingTypeProcessor> AsWeakPtrForUI();
private:
+ typedef std::map<std::string, ModelThreadSyncEntity*> EntityMap;
+
+ // Sends all commit requests that are due to be sent to the sync thread.
+ void FlushPendingCommitRequests();
+
ModelType type_;
- sync_pb::DataTypeProgressMarker progress_marker_;
+ DataTypeState data_type_state_;
// Whether or not sync is preferred for this type. This is a cached copy of
// the canonical copy information on the UI thread.
@@ -75,10 +99,21 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe {
// Our link to data type management on the sync thread.
// Used for enabling and disabling sync for this type.
+ //
+ // Beware of NULL pointers: This object is uninitialized when we are not
+ // connected to sync.
scoped_ptr<SyncCoreProxy> sync_core_proxy_;
- base::WeakPtr<NonBlockingTypeProcessorCore> core_;
- scoped_refptr<base::SequencedTaskRunner> sync_thread_;
+ // Reference to the NonBlockingTypeProcessorCore.
+ //
+ // The interface hides the posting of tasks across threads as well as the
+ // NonBlockingTypeProcessorCore's implementation. Both of these features are
+ // useful in tests.
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface_;
+
+ // The set of sync entities known to this object.
+ EntityMap entities_;
+ STLValueDeleter<EntityMap> entities_deleter_;
// We use two different WeakPtrFactories because we want the pointers they
// issue to have different lifetimes. When asked to disconnect from the sync
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
index ea0f918..236c9c5 100644
--- a/sync/engine/non_blocking_type_processor_core.cc
+++ b/sync/engine/non_blocking_type_processor_core.cc
@@ -63,6 +63,11 @@ void NonBlockingTypeProcessorCore::ApplyUpdates(
DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_);
}
+void NonBlockingTypeProcessorCore::RequestCommits(
+ const CommitRequestDataList& request_list) {
+ // TODO(rlarocque): Implement this. crbug.com/351005.
+}
+
void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
sessions::StatusController* status) {
NOTREACHED()
diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h
index c9cec88..e226349 100644
--- a/sync/engine/non_blocking_type_processor_core.h
+++ b/sync/engine/non_blocking_type_processor_core.h
@@ -9,6 +9,7 @@
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
#include "sync/engine/commit_contributor.h"
+#include "sync/engine/non_blocking_sync_common.h"
#include "sync/engine/update_handler.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/protocol/sync.pb.h"
@@ -67,6 +68,9 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore
virtual void ApplyUpdates(sessions::StatusController* status) OVERRIDE;
virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE;
+ // Entry point for NonBlockingTypeProcessor to send commit requests.
+ void RequestCommits(const CommitRequestDataList& request_list);
+
// CommitContributor implementation.
virtual scoped_ptr<CommitContribution> GetContribution(
size_t max_entries) OVERRIDE;
diff --git a/sync/engine/non_blocking_type_processor_core_interface.cc b/sync/engine/non_blocking_type_processor_core_interface.cc
new file mode 100644
index 0000000..6ce74cc
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_core_interface.cc
@@ -0,0 +1,16 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
+
+namespace syncer {
+
+NonBlockingTypeProcessorCoreInterface::NonBlockingTypeProcessorCoreInterface() {
+}
+
+NonBlockingTypeProcessorCoreInterface::
+ ~NonBlockingTypeProcessorCoreInterface() {
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor_core_interface.h b/sync/engine/non_blocking_type_processor_core_interface.h
new file mode 100644
index 0000000..56675c2
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_core_interface.h
@@ -0,0 +1,24 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
+#define SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
+
+#include "sync/engine/non_blocking_sync_common.h"
+
+namespace syncer {
+
+// An interface representing a NonBlockingTypeProcessorCore and its thread.
+// This abstraction is useful in tests.
+class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorCoreInterface {
+ public:
+ NonBlockingTypeProcessorCoreInterface();
+ virtual ~NonBlockingTypeProcessorCoreInterface();
+
+ virtual void RequestCommits(const CommitRequestDataList& list) = 0;
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc
new file mode 100644
index 0000000..9ab6ca3
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_unittest.cc
@@ -0,0 +1,524 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor.h"
+
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/internal_api/public/sync_core_proxy.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/syncable/syncable_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace syncer {
+
+namespace {
+
+class MockNonBlockingTypeProcessorCore
+ : public NonBlockingTypeProcessorCoreInterface {
+ public:
+ MockNonBlockingTypeProcessorCore();
+ virtual ~MockNonBlockingTypeProcessorCore();
+
+ virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
+
+ bool is_connected_;
+
+ std::vector<CommitRequestDataList> commit_request_lists_;
+};
+
+MockNonBlockingTypeProcessorCore::MockNonBlockingTypeProcessorCore()
+ : is_connected_(false) {
+}
+
+MockNonBlockingTypeProcessorCore::~MockNonBlockingTypeProcessorCore() {
+}
+
+void MockNonBlockingTypeProcessorCore::RequestCommits(
+ const CommitRequestDataList& list) {
+ commit_request_lists_.push_back(list);
+}
+
+class MockSyncCoreProxy : public syncer::SyncCoreProxy {
+ public:
+ MockSyncCoreProxy();
+ virtual ~MockSyncCoreProxy();
+
+ virtual void ConnectTypeToCore(
+ syncer::ModelType type,
+ const DataTypeState& data_type_state,
+ base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) OVERRIDE;
+ virtual void Disconnect(syncer::ModelType type) OVERRIDE;
+ virtual scoped_ptr<SyncCoreProxy> Clone() const OVERRIDE;
+
+ MockNonBlockingTypeProcessorCore* GetMockProcessorCore();
+
+ private:
+ explicit MockSyncCoreProxy(MockNonBlockingTypeProcessorCore*);
+
+ // The NonBlockingTypeProcessor's contract expects that it gets to own this
+ // object, so we can retain only a non-owned pointer to it.
+ //
+ // This is very unsafe, but we can get away with it since these tests are not
+ // exercising the processor <-> processor_core connection code.
+ MockNonBlockingTypeProcessorCore* mock_core_;
+};
+
+MockSyncCoreProxy::MockSyncCoreProxy()
+ : mock_core_(new MockNonBlockingTypeProcessorCore) {
+}
+
+MockSyncCoreProxy::MockSyncCoreProxy(MockNonBlockingTypeProcessorCore* core)
+ : mock_core_(core) {
+}
+
+MockSyncCoreProxy::~MockSyncCoreProxy() {
+}
+
+void MockSyncCoreProxy::ConnectTypeToCore(
+ syncer::ModelType type,
+ const DataTypeState& data_type_state,
+ base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) {
+ // This class is allowed to participate in only one connection.
+ DCHECK(!mock_core_->is_connected_);
+ mock_core_->is_connected_ = true;
+
+ // Hands off ownership of our member to the type_processor, while keeping
+ // an unsafe pointer to it. This is why we can only connect once.
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core(mock_core_);
+
+ type_processor->OnConnect(core.Pass());
+}
+
+void MockSyncCoreProxy::Disconnect(syncer::ModelType type) {
+ // This mock object is not meant for connect and disconnect tests.
+ NOTREACHED() << "Not implemented";
+}
+
+scoped_ptr<SyncCoreProxy> MockSyncCoreProxy::Clone() const {
+ // There's no sensible way to clone this MockSyncCoreProxy.
+ return scoped_ptr<SyncCoreProxy>(new MockSyncCoreProxy(mock_core_));
+}
+
+MockNonBlockingTypeProcessorCore* MockSyncCoreProxy::GetMockProcessorCore() {
+ return mock_core_;
+}
+
+} // namespace
+
+// Tests the sync engine parts of NonBlockingTypeProcessor.
+//
+// The NonBlockingTypeProcessor contains a non-trivial amount of code dedicated
+// to turning the sync engine on and off again. That code is fairly well
+// tested in the NonBlockingDataTypeController unit tests and it doesn't need
+// to be re-tested here.
+//
+// These tests skip past initialization and focus on steady state sync engine
+// behvior. This is where we test how the processor responds to the model's
+// requests to make changes to its data, the messages incoming fro the sync
+// server, and what happens when the two conflict.
+//
+// Inputs:
+// - Initial state from permanent storage. (TODO)
+// - Create, update or delete requests from the model.
+// - Update responses and commit responses from the server.
+//
+// Outputs:
+// - Writes to permanent storage. (TODO)
+// - Callbacks into the model. (TODO)
+// - Requests to the sync thread. Tested with MockNonBlockingTypeProcessorCore.
+class NonBlockingTypeProcessorTest : public ::testing::Test {
+ public:
+ NonBlockingTypeProcessorTest();
+ virtual ~NonBlockingTypeProcessorTest();
+
+ // Explicit initialization step. Kept separate to allow tests to inject
+ // on-disk state before the test begins.
+ void Initialize();
+
+ // Local data modification. Emulates signals from the model thread.
+ void WriteItem(const std::string& tag, const std::string& value);
+ void DeleteItem(const std::string& tag);
+
+ // Emulate updates from the server.
+ // This harness has some functionality to help emulate server behavior.
+ // See the definitions of these methods for more information.
+ void UpdateFromServer(int64 version_offset,
+ const std::string& tag,
+ const std::string& value);
+ void TombstoneFromServer(int64 version_offset, const std::string& tag);
+
+ // Read emitted commit requests as batches.
+ size_t GetNumCommitRequestLists();
+ CommitRequestDataList GetNthCommitRequestList(size_t n);
+
+ // Read emitted commit requests by tag, most recent only.
+ bool HasCommitRequestForTag(const std::string& tag);
+ CommitRequestData GetLatestCommitRequestForTag(const std::string& tag);
+
+ // Sends the processor a successful commit response.
+ void SuccessfulCommitResponse(const CommitRequestData& request_data);
+
+ private:
+ std::string GenerateId(const std::string& tag) const;
+ std::string GenerateTagHash(const std::string& tag) const;
+ sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag,
+ const std::string& value) const;
+
+ int64 GetServerVersion(const std::string& tag);
+ void SetServerVersion(const std::string& tag, int64 version);
+
+ const ModelType type_;
+
+ scoped_ptr<MockSyncCoreProxy> mock_sync_core_proxy_;
+ scoped_ptr<NonBlockingTypeProcessor> processor_;
+ MockNonBlockingTypeProcessorCore* mock_processor_core_;
+
+ DataTypeState data_type_state_;
+
+ std::map<const std::string, int64> server_versions_;
+};
+
+NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest()
+ : type_(PREFERENCES),
+ mock_sync_core_proxy_(new MockSyncCoreProxy()),
+ processor_(new NonBlockingTypeProcessor(type_)) {
+}
+
+NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() {
+}
+
+void NonBlockingTypeProcessorTest::Initialize() {
+ processor_->Enable(mock_sync_core_proxy_->Clone());
+ mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore();
+}
+
+void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ processor_->Put(tag, GenerateSpecifics(tag, value));
+}
+
+void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) {
+ processor_->Delete(tag);
+}
+
+void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset,
+ const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+
+ // Overwrite the existing server version if this is the new highest version.
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ UpdateResponseData data;
+ data.id = GenerateId(tag_hash);
+ data.client_tag_hash = tag_hash;
+ data.response_version = version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
+ data.non_unique_name = tag;
+ data.deleted = false;
+ data.specifics = GenerateSpecifics(tag, value);
+
+ UpdateResponseDataList list;
+ list.push_back(data);
+
+ processor_->OnUpdateReceived(data_type_state_, list);
+}
+
+void NonBlockingTypeProcessorTest::TombstoneFromServer(int64 version_offset,
+ const std::string& tag) {
+ // Overwrite the existing server version if this is the new highest version.
+ std::string tag_hash = GenerateTagHash(tag);
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ UpdateResponseData data;
+ data.id = GenerateId(tag_hash);
+ data.client_tag_hash = tag_hash;
+ data.response_version = version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
+ data.non_unique_name = tag;
+ data.deleted = true;
+
+ UpdateResponseDataList list;
+ list.push_back(data);
+
+ processor_->OnUpdateReceived(data_type_state_, list);
+}
+
+void NonBlockingTypeProcessorTest::SuccessfulCommitResponse(
+ const CommitRequestData& request_data) {
+ const std::string& client_tag_hash = request_data.client_tag_hash;
+ CommitResponseData response_data;
+
+ if (request_data.base_version == 0) {
+ // Server assigns new ID to newly committed items.
+ DCHECK(request_data.id.empty());
+ response_data.id = request_data.id;
+ } else {
+ // Otherwise we reuse the ID from the request.
+ response_data.id = GenerateId(client_tag_hash);
+ }
+
+ response_data.client_tag_hash = client_tag_hash;
+ response_data.sequence_number = request_data.sequence_number;
+
+ // Increment the server version on successful commit.
+ int64 version = GetServerVersion(client_tag_hash);
+ version++;
+ SetServerVersion(client_tag_hash, version);
+
+ response_data.response_version = version;
+
+ CommitResponseDataList list;
+ list.push_back(response_data);
+
+ processor_->OnCommitCompletion(data_type_state_, list);
+}
+
+int64 NonBlockingTypeProcessorTest::GetServerVersion(const std::string& tag) {
+ std::map<const std::string, int64>::const_iterator it;
+ it = server_versions_.find(tag);
+ if (it == server_versions_.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+void NonBlockingTypeProcessorTest::SetServerVersion(const std::string& tag_hash,
+ int64 version) {
+ server_versions_[tag_hash] = version;
+}
+
+std::string NonBlockingTypeProcessorTest::GenerateId(
+ const std::string& tag) const {
+ return "FakeId:" + tag;
+}
+
+std::string NonBlockingTypeProcessorTest::GenerateTagHash(
+ const std::string& tag) const {
+ return syncable::GenerateSyncableHash(type_, tag);
+}
+
+sync_pb::EntitySpecifics NonBlockingTypeProcessorTest::GenerateSpecifics(
+ const std::string& tag,
+ const std::string& value) const {
+ sync_pb::EntitySpecifics specifics;
+ specifics.mutable_preference()->set_name(tag);
+ specifics.mutable_preference()->set_value(value);
+ return specifics;
+}
+
+size_t NonBlockingTypeProcessorTest::GetNumCommitRequestLists() {
+ return mock_processor_core_->commit_request_lists_.size();
+}
+
+CommitRequestDataList NonBlockingTypeProcessorTest::GetNthCommitRequestList(
+ size_t n) {
+ DCHECK_LT(n, GetNumCommitRequestLists());
+ return mock_processor_core_->commit_request_lists_[n];
+}
+
+bool NonBlockingTypeProcessorTest::HasCommitRequestForTag(
+ const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const std::vector<CommitRequestDataList>& lists =
+ mock_processor_core_->commit_request_lists_;
+
+ // Iterate backward through the sets of commit requests to find the most
+ // recent one that applies to the specified tag.
+ for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it =
+ lists.rbegin();
+ lists_it != lists.rend();
+ ++lists_it) {
+ for (CommitRequestDataList::const_iterator it = lists_it->begin();
+ it != lists_it->end();
+ ++it) {
+ if (it->client_tag_hash == tag_hash) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag(
+ const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const std::vector<CommitRequestDataList>& lists =
+ mock_processor_core_->commit_request_lists_;
+
+ // Iterate backward through the sets of commit requests to find the most
+ // recent one that applies to the specified tag.
+ for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it =
+ lists.rbegin();
+ lists_it != lists.rend();
+ ++lists_it) {
+ for (CommitRequestDataList::const_iterator it = lists_it->begin();
+ it != lists_it->end();
+ ++it) {
+ if (it->client_tag_hash == tag_hash) {
+ return *it;
+ }
+ }
+ }
+
+ NOTREACHED() << "Could not find any commits for given tag " << tag << ". "
+ << "Test should have checked HasCommitRequestForTag() first.";
+ return CommitRequestData();
+}
+
+// Creates a new item locally.
+// Thoroughly tests the data generated by a local item creation.
+TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+
+ // Verify the commit request this operation has triggered.
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1");
+
+ EXPECT_TRUE(tag1_data.id.empty());
+ EXPECT_EQ(0, tag1_data.base_version);
+ EXPECT_FALSE(tag1_data.ctime.is_null());
+ EXPECT_FALSE(tag1_data.mtime.is_null());
+ EXPECT_EQ("tag1", tag1_data.non_unique_name);
+ EXPECT_FALSE(tag1_data.deleted);
+ EXPECT_EQ("tag1", tag1_data.specifics.preference().name());
+ EXPECT_EQ("value1", tag1_data.specifics.preference().value());
+}
+
+// Creates a new local item then modifies it.
+// Thoroughly tests data generated by modification of server-unknown item.
+TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ WriteItem("tag1", "value2");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1");
+
+ // Test some of the relations between old and new commit requests.
+ EXPECT_EQ(tag1_v1_data.specifics.preference().value(), "value1");
+ EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number);
+
+ // Perform a thorough examination of the update-generated request.
+ EXPECT_TRUE(tag1_v2_data.id.empty());
+ EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_FALSE(tag1_v2_data.ctime.is_null());
+ EXPECT_FALSE(tag1_v2_data.mtime.is_null());
+ EXPECT_EQ("tag1", tag1_v2_data.non_unique_name);
+ EXPECT_FALSE(tag1_v2_data.deleted);
+ EXPECT_EQ("tag1", tag1_v2_data.specifics.preference().name());
+ EXPECT_EQ("value2", tag1_v2_data.specifics.preference().value());
+}
+
+// Deletes an item we've never seen before.
+// Should have no effect and not crash.
+TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) {
+ Initialize();
+
+ DeleteItem("tag1");
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+}
+
+// Creates an item locally then deletes it.
+//
+// In this test, no commit responses are received, so the deleted item is
+// server-unknown as far as the model thread is concerned. That behavior
+// is race-dependent; other tests are used to test other races.
+TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) {
+ Initialize();
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ DeleteItem("tag1");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1");
+
+ EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number);
+
+ EXPECT_TRUE(tag1_v2_data.id.empty());
+ EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_TRUE(tag1_v2_data.deleted);
+}
+
+// Creates an item locally then deletes it.
+//
+// The item is created locally then enqueued for commit. The sync thread
+// successfully commits it, but, before the commit response is picked up
+// by the model thread, the item is deleted by the model thread.
+TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) {
+ Initialize();
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ DeleteItem("tag1");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+
+ // This commit happened while the deletion was in progress, but the commit
+ // response didn't arrive on our thread until after the delete was issued to
+ // the sync thread. It will update some metadata, but won't do much else.
+ SuccessfulCommitResponse(tag1_v1_data);
+
+ // TODO(rlarocque): Verify the state of the item is correct once we get
+ // storage hooked up in these tests. For example, verify the item is still
+ // marked as deleted.
+}
+
+// Creates two different sync items.
+// Verifies that the second has no effect on the first.
+TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+
+ // There should be one commit request for this item only.
+ ASSERT_EQ(1U, GetNumCommitRequestLists());
+ EXPECT_EQ(1U, GetNthCommitRequestList(0).size());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+
+ WriteItem("tag2", "value2");
+
+ // The second write should trigger another single-item commit request.
+ ASSERT_EQ(2U, GetNumCommitRequestLists());
+ EXPECT_EQ(1U, GetNthCommitRequestList(1).size());
+ ASSERT_TRUE(HasCommitRequestForTag("tag2"));
+}
+
+// TODO(rlarocque): Add more testing of non_unique_name fields.
+
+} // namespace syncer