summaryrefslogtreecommitdiffstats
path: root/sync/engine
diff options
context:
space:
mode:
Diffstat (limited to 'sync/engine')
-rw-r--r--sync/engine/model_thread_sync_entity.cc157
-rw-r--r--sync/engine/model_thread_sync_entity.h186
-rw-r--r--sync/engine/model_thread_sync_entity_unittest.cc178
-rw-r--r--sync/engine/non_blocking_sync_common.cc33
-rw-r--r--sync/engine/non_blocking_sync_common.h98
-rw-r--r--sync/engine/non_blocking_type_processor.cc149
-rw-r--r--sync/engine/non_blocking_type_processor.h49
-rw-r--r--sync/engine/non_blocking_type_processor_core.cc5
-rw-r--r--sync/engine/non_blocking_type_processor_core.h4
-rw-r--r--sync/engine/non_blocking_type_processor_core_interface.cc16
-rw-r--r--sync/engine/non_blocking_type_processor_core_interface.h24
-rw-r--r--sync/engine/non_blocking_type_processor_unittest.cc524
12 files changed, 1409 insertions, 14 deletions
diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc
new file mode 100644
index 0000000..fde2eb3
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity.cc
@@ -0,0 +1,157 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/model_thread_sync_entity.h"
+#include "sync/syncable/syncable_util.h"
+
+namespace syncer {
+
+scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem(
+ const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time now) {
+ return scoped_ptr<ModelThreadSyncEntity>(new ModelThreadSyncEntity(
+ 1,
+ 0,
+ 0,
+ 0,
+ true,
+ std::string(), // Sync thread will assign the initial ID.
+ syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics),
+ client_tag),
+ client_tag, // As non-unique name.
+ specifics,
+ false,
+ now,
+ now));
+}
+
+scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::FromServerUpdate(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ int64 version,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime) {
+ return scoped_ptr<ModelThreadSyncEntity>(
+ new ModelThreadSyncEntity(0,
+ 0,
+ 0,
+ version,
+ true,
+ id,
+ client_tag_hash,
+ non_unique_name,
+ specifics,
+ deleted,
+ ctime,
+ mtime));
+}
+
+ModelThreadSyncEntity::ModelThreadSyncEntity(
+ int64 sequence_number,
+ int64 commit_requested_sequence_number,
+ int64 acked_sequence_number,
+ int64 base_version,
+ bool is_dirty,
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime)
+ : sequence_number_(sequence_number),
+ commit_requested_sequence_number_(commit_requested_sequence_number),
+ acked_sequence_number_(acked_sequence_number),
+ base_version_(base_version),
+ is_dirty_(is_dirty),
+ id_(id),
+ client_tag_hash_(client_tag_hash),
+ non_unique_name_(non_unique_name),
+ specifics_(specifics),
+ deleted_(deleted),
+ ctime_(ctime),
+ mtime_(mtime) {
+}
+
+ModelThreadSyncEntity::~ModelThreadSyncEntity() {
+}
+
+bool ModelThreadSyncEntity::IsWriteRequired() const {
+ return is_dirty_;
+}
+
+bool ModelThreadSyncEntity::IsUnsynced() const {
+ return sequence_number_ > acked_sequence_number_;
+}
+
+bool ModelThreadSyncEntity::RequiresCommitRequest() const {
+ return sequence_number_ > commit_requested_sequence_number_;
+}
+
+bool ModelThreadSyncEntity::UpdateIsReflection(int64 update_version) const {
+ return base_version_ >= update_version;
+}
+
+bool ModelThreadSyncEntity::UpdateIsInConflict(int64 update_version) const {
+ return IsUnsynced() && !UpdateIsReflection(update_version);
+}
+
+void ModelThreadSyncEntity::ApplyUpdateFromServer(
+ int64 update_version,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time mtime) {
+ // There was a conflict and the server just won it.
+ // This implicitly acks all outstanding commits because a received update
+ // will clobber any pending commits on the sync thread.
+ acked_sequence_number_ = sequence_number_;
+ commit_requested_sequence_number_ = sequence_number_;
+
+ base_version_ = update_version;
+ specifics_ = specifics;
+ mtime_ = mtime;
+}
+
+void ModelThreadSyncEntity::MakeLocalChange(
+ const sync_pb::EntitySpecifics& specifics) {
+ sequence_number_++;
+ specifics_ = specifics;
+}
+
+void ModelThreadSyncEntity::Delete() {
+ sequence_number_++;
+ specifics_.Clear();
+ deleted_ = true;
+}
+
+void ModelThreadSyncEntity::InitializeCommitRequestData(
+ CommitRequestData* request) const {
+ request->id = id_;
+ request->client_tag_hash = client_tag_hash_;
+ request->sequence_number = sequence_number_;
+ request->base_version = base_version_;
+ request->ctime = ctime_;
+ request->mtime = mtime_;
+ request->non_unique_name = non_unique_name_;
+ request->deleted = deleted_;
+ request->specifics.CopyFrom(specifics_);
+}
+
+void ModelThreadSyncEntity::SetCommitRequestInProgress() {
+ commit_requested_sequence_number_ = sequence_number_;
+}
+
+void ModelThreadSyncEntity::ReceiveCommitResponse(const std::string& id,
+ int64 sequence_number,
+ int64 response_version) {
+ id_ = id; // The server can assign us a new ID in a commit response.
+ acked_sequence_number_ = sequence_number;
+ base_version_ = response_version;
+}
+
+} // namespace syncer
diff --git a/sync/engine/model_thread_sync_entity.h b/sync/engine/model_thread_sync_entity.h
new file mode 100644
index 0000000..31cc2b8
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity.h
@@ -0,0 +1,186 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
+#define SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
+
+#include <string>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/time/time.h"
+#include "sync/base/sync_export.h"
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+// This is the model thread's representation of a SyncEntity.
+//
+// The model thread sync entity receives updates from the model itself and
+// (asynchronously) from the sync server via the sync thread. From the point
+// of view of this class, updates from the server take precedence over local
+// changes, though the model may be given an opportunity to overwrite this
+// decision later.
+//
+// Sync will try to commit this entity's data to the sync server and local
+// storage.
+//
+// Most of the logic related to those processes live outside this class. This
+// class helps out a bit by offering some functions to serialize its data to
+// various formats and query the entity's status.
+class SYNC_EXPORT_PRIVATE ModelThreadSyncEntity {
+ public:
+ // Construct an instance representing a new locally-created item.
+ static scoped_ptr<ModelThreadSyncEntity> NewLocalItem(
+ const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time now);
+
+ // Construct an instance representing an item newly received from the server.
+ static scoped_ptr<ModelThreadSyncEntity> FromServerUpdate(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ int64 version,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime);
+
+ // TODO(rlarocque): Implement FromDisk constructor when we implement storage.
+
+ ~ModelThreadSyncEntity();
+
+ // Returns true if this data is out of sync with local storage.
+ bool IsWriteRequired() const;
+
+ // Returns true if this data is out of sync with the server.
+ // A commit may or may not be in progress at this time.
+ bool IsUnsynced() const;
+
+ // Returns true if this data is out of sync with the sync thread.
+ //
+ // There may or may not be a commit in progress for this item, but there's
+ // definitely no commit in progress for this (most up to date) version of
+ // this item.
+ bool RequiresCommitRequest() const;
+
+ // Returns true if the specified update version does not contain new data.
+ bool UpdateIsReflection(int64 update_version) const;
+
+ // Returns true if the specified update version conflicts with local changes.
+ bool UpdateIsInConflict(int64 update_version) const;
+
+ // Applies an update from the sync server.
+ //
+ // Overrides any local changes. Check UpdateIsInConflict() before calling
+ // this function if you want to handle conflicts differently.
+ void ApplyUpdateFromServer(int64 update_version,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics,
+ base::Time mtime);
+
+ // Applies a local change to this item.
+ void MakeLocalChange(const sync_pb::EntitySpecifics& specifics);
+
+ // Applies a local deletion to this item.
+ void Delete();
+
+ // Initializes a message representing this item's uncommitted state
+ // to be forwarded to the sync server for committing.
+ void InitializeCommitRequestData(CommitRequestData* request) const;
+
+ // Notes that the current version of this item has been queued for commit.
+ void SetCommitRequestInProgress();
+
+ // Receives a successful commit response.
+ //
+ // Sucssful commit responses can overwrite an item's ID.
+ //
+ // Note that the receipt of a successful commit response does not necessarily
+ // unset IsUnsynced(). If many local changes occur in quick succession, it's
+ // possible that the committed item was already out of date by the time it
+ // reached the server.
+ void ReceiveCommitResponse(const std::string& id,
+ int64 sequence_number,
+ int64 response_version);
+
+ private:
+ ModelThreadSyncEntity(int64 sequence_number,
+ int64 commit_requested_sequence_number,
+ int64 acked_sequence_number,
+ int64 base_version,
+ bool is_dirty,
+ const std::string& id,
+ const std::string& client_tag_hash,
+ const std::string& non_unique_name,
+ const sync_pb::EntitySpecifics& specifics,
+ bool deleted,
+ base::Time ctime,
+ base::Time mtime);
+
+ // A sequence number used to track in-progress commits. Each local change
+ // increments this number.
+ int64 sequence_number_;
+
+ // The sequence number of the last item sent to the sync thread.
+ int64 commit_requested_sequence_number_;
+
+ // The sequence number of the last item known to be successfully committed.
+ int64 acked_sequence_number_;
+
+ // The server version on which this item is based.
+ //
+ // If there are no local changes, this is the version of the entity as we see
+ // it here.
+ //
+ // If there are local changes, this is the version of the entity on which
+ // those changes are based.
+ int64 base_version_;
+
+ // True if this entity is out of sync with local storage.
+ bool is_dirty_;
+
+ // The entity's ID.
+ //
+ // Most of the time, this is a server-assigned value.
+ //
+ // Prior to the item's first commit, we leave this value as an empty string.
+ // The initial ID for a newly created item has to meet certain uniqueness
+ // requirements, and we handle those on the sync thread.
+ std::string id_;
+
+ // A hash based on the client tag and model type.
+ // Used for various map lookups. Should always be available.
+ std::string client_tag_hash_;
+
+ // A non-unique name associated with this entity.
+ //
+ // It is sometimes used for debugging. It gets saved to and restored from
+ // the sync server.
+ //
+ // Its value is often related to the item's unhashed client tag, though this
+ // is not guaranteed and should not be relied on. May be hidden when
+ // encryption is enabled.
+ std::string non_unique_name_;
+
+ // A protobuf filled with type-specific information. Contains the most
+ // up-to-date specifics, whether it be from the server or a locally modified
+ // version.
+ sync_pb::EntitySpecifics specifics_;
+
+ // Whether or not the item is deleted. The |specifics_| field may be empty
+ // if this flag is true.
+ bool deleted_;
+
+ // Entity creation and modification timestamps.
+ // Assigned by the client and synced by the server, though the server usually
+ // doesn't bother to inspect their values.
+ base::Time ctime_;
+ base::Time mtime_;
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_
diff --git a/sync/engine/model_thread_sync_entity_unittest.cc b/sync/engine/model_thread_sync_entity_unittest.cc
new file mode 100644
index 0000000..6b158dd
--- /dev/null
+++ b/sync/engine/model_thread_sync_entity_unittest.cc
@@ -0,0 +1,178 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/model_thread_sync_entity.h"
+
+#include "base/memory/scoped_ptr.h"
+#include "base/time/time.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/syncable/syncable_util.h"
+
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace syncer {
+
+// Some simple sanity tests for the ModelThreadSyncEntity.
+//
+// A lot of the more complicated sync logic is implemented in the
+// NonBlockingTypeProcessor that owns the ModelThreadSyncEntity. We
+// can't unit test it here.
+//
+// Instead, we focus on simple tests to make sure that variables are getting
+// properly intialized and flags properly set. Anything more complicated would
+// be a redundant and incomplete version of the NonBlockingTypeProcessor tests.
+class ModelThreadSyncEntityTest : public ::testing::Test {
+ public:
+ ModelThreadSyncEntityTest()
+ : kServerId("ServerID"),
+ kClientTag("sample.pref.name"),
+ kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)),
+ kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)),
+ kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) {
+ sync_pb::PreferenceSpecifics* pref_specifics =
+ specifics.mutable_preference();
+ pref_specifics->set_name(kClientTag);
+ pref_specifics->set_value("pref.value");
+ }
+
+ const std::string kServerId;
+ const std::string kClientTag;
+ const std::string kClientTagHash;
+ const base::Time kCtime;
+ const base::Time kMtime;
+ sync_pb::EntitySpecifics specifics;
+};
+
+TEST_F(ModelThreadSyncEntityTest, NewLocalItem) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::NewLocalItem("asdf", specifics, kCtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+ EXPECT_FALSE(entity->UpdateIsReflection(1));
+ EXPECT_TRUE(entity->UpdateIsInConflict(1));
+}
+
+TEST_F(ModelThreadSyncEntityTest, FromServerUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(9));
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsInConflict(11));
+}
+
+// Tombstones should behave just like regular updates. Mostly. The strangest
+// thing about them is that they don't have specifics, so it can be hard to
+// detect their type. Fortunately, this class doesn't care about types in
+// received updates.
+TEST_F(ModelThreadSyncEntityTest, TombstoneUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ sync_pb::EntitySpecifics(),
+ true,
+ kCtime,
+ kMtime));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(9));
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsInConflict(11));
+}
+
+// Apply a deletion update.
+TEST_F(ModelThreadSyncEntityTest, ApplyUpdate) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ // A deletion update one version later.
+ entity->ApplyUpdateFromServer(11,
+ true,
+ sync_pb::EntitySpecifics(),
+ kMtime + base::TimeDelta::FromSeconds(10));
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_FALSE(entity->IsUnsynced());
+ EXPECT_TRUE(entity->UpdateIsReflection(11));
+ EXPECT_FALSE(entity->UpdateIsReflection(12));
+}
+
+TEST_F(ModelThreadSyncEntityTest, LocalChange) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ sync_pb::EntitySpecifics specifics2;
+ specifics2.CopyFrom(specifics);
+ specifics2.mutable_preference()->set_value("new.pref.value");
+
+ entity->MakeLocalChange(specifics2);
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsInConflict(10));
+
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_TRUE(entity->UpdateIsInConflict(11));
+}
+
+TEST_F(ModelThreadSyncEntityTest, LocalDeletion) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::FromServerUpdate(
+ kServerId,
+ kClientTagHash,
+ kClientTag, // As non-unique name.
+ 10,
+ specifics,
+ false,
+ kCtime,
+ kMtime));
+
+ entity->Delete();
+
+ EXPECT_TRUE(entity->IsWriteRequired());
+ EXPECT_TRUE(entity->IsUnsynced());
+
+ EXPECT_TRUE(entity->UpdateIsReflection(10));
+ EXPECT_FALSE(entity->UpdateIsInConflict(10));
+
+ EXPECT_FALSE(entity->UpdateIsReflection(11));
+ EXPECT_TRUE(entity->UpdateIsInConflict(11));
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc
new file mode 100644
index 0000000..c5a3656
--- /dev/null
+++ b/sync/engine/non_blocking_sync_common.cc
@@ -0,0 +1,33 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_sync_common.h"
+
+namespace syncer {
+
+DataTypeState::DataTypeState() {
+}
+
+DataTypeState::~DataTypeState() {
+}
+
+CommitRequestData::CommitRequestData() {
+}
+
+CommitRequestData::~CommitRequestData() {
+}
+
+CommitResponseData::CommitResponseData() {
+}
+
+CommitResponseData::~CommitResponseData() {
+}
+
+UpdateResponseData::UpdateResponseData() {
+}
+
+UpdateResponseData::~UpdateResponseData() {
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h
new file mode 100644
index 0000000..f07fdb2
--- /dev/null
+++ b/sync/engine/non_blocking_sync_common.h
@@ -0,0 +1,98 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
+#define SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
+
+#include <string>
+#include <vector>
+
+#include "base/time/time.h"
+#include "sync/base/sync_export.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+// Data-type global state that must be accessed and updated on the sync thread,
+// but persisted on or through the model thread.
+struct SYNC_EXPORT_PRIVATE DataTypeState {
+ DataTypeState();
+ ~DataTypeState();
+
+ // The latest progress markers received from the server.
+ sync_pb::DataTypeProgressMarker progress_marker;
+
+ // A data type context. Sent to the server in every commit or update
+ // request. May be updated by either by responses from the server or
+ // requests made on the model thread. The interpretation of this value may
+ // be data-type specific. Many data types ignore it.
+ sync_pb::DataTypeContext type_context;
+
+ // The ID of the folder node that sits at the top of this type's folder
+ // hierarchy. We keep this around for legacy reasons. The protocol expects
+ // that all nodes of a certain type are children of the same type root
+ // entity. This entity is delivered by the server, and may not be available
+ // until the first download cycle has completed.
+ std::string type_root_id;
+
+ // A strictly increasing counter used to generate unique values for the
+ // client-assigned IDs. The incrementing and ID assignment happens on the
+ // sync thread, but we store the value here so we can pass it back to the
+ // model thread for persistence. This is probably unnecessary for the
+ // client-tagged data types supported by non-blocking sync, but we will
+ // continue to emulate the directory sync's behavior for now.
+ int64 next_client_id;
+};
+
+struct SYNC_EXPORT_PRIVATE CommitRequestData {
+ CommitRequestData();
+ ~CommitRequestData();
+
+ std::string id;
+ std::string client_tag_hash;
+
+ // Strictly incrementing number for in-progress commits. More information
+ // about its meaning can be found in comments in the files that make use of
+ // this struct.
+ int64 sequence_number;
+
+ int64 base_version;
+ base::Time ctime;
+ base::Time mtime;
+ std::string non_unique_name;
+ bool deleted;
+ sync_pb::EntitySpecifics specifics;
+};
+
+struct SYNC_EXPORT_PRIVATE CommitResponseData {
+ CommitResponseData();
+ ~CommitResponseData();
+
+ std::string id;
+ std::string client_tag_hash;
+ int64 sequence_number;
+ int64 response_version;
+};
+
+struct SYNC_EXPORT_PRIVATE UpdateResponseData {
+ UpdateResponseData();
+ ~UpdateResponseData();
+
+ std::string id;
+ std::string client_tag_hash;
+ int64 response_version;
+ base::Time ctime;
+ base::Time mtime;
+ std::string non_unique_name;
+ bool deleted;
+ sync_pb::EntitySpecifics specifics;
+};
+
+typedef std::vector<CommitRequestData> CommitRequestDataList;
+typedef std::vector<CommitResponseData> CommitResponseDataList;
+typedef std::vector<UpdateResponseData> UpdateResponseDataList;
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_
diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc
index 66a016b..0aaead4 100644
--- a/sync/engine/non_blocking_type_processor.cc
+++ b/sync/engine/non_blocking_type_processor.cc
@@ -4,9 +4,13 @@
#include "sync/engine/non_blocking_type_processor.h"
+#include "base/bind.h"
+#include "base/location.h"
#include "base/message_loop/message_loop_proxy.h"
-#include "sync/engine/non_blocking_type_processor_core.h"
+#include "sync/engine/model_thread_sync_entity.h"
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
#include "sync/internal_api/public/sync_core_proxy.h"
+#include "sync/syncable/syncable_util.h"
namespace syncer {
@@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
: type_(type),
is_preferred_(false),
is_connected_(false),
+ entities_deleter_(&entities_),
weak_ptr_factory_for_ui_(this),
weak_ptr_factory_for_sync_(this) {
}
@@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable(
scoped_ptr<SyncCoreProxy> sync_core_proxy) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
+
is_preferred_ = true;
+
+ // TODO(rlarocque): At some point, this should be loaded from storage.
+ data_type_state_.progress_marker.set_data_type_id(
+ GetSpecificsFieldNumberFromModelType(type_));
+ data_type_state_.next_client_id = 0;
+
sync_core_proxy_ = sync_core_proxy.Pass();
sync_core_proxy_->ConnectTypeToCore(GetModelType(),
+ data_type_state_,
weak_ptr_factory_for_sync_.GetWeakPtr());
}
@@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() {
}
weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
- core_ = base::WeakPtr<NonBlockingTypeProcessorCore>();
- sync_thread_ = scoped_refptr<base::SequencedTaskRunner>();
+ core_interface_.reset();
}
base::WeakPtr<NonBlockingTypeProcessor>
@@ -74,13 +86,136 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() {
}
void NonBlockingTypeProcessor::OnConnect(
- base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread) {
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
+
is_connected_ = true;
- core_ = core;
- sync_thread_ = sync_thread;
+ core_interface_ = core_interface.Pass();
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Put(const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics) {
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
+
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ scoped_ptr<ModelThreadSyncEntity> entity(
+ ModelThreadSyncEntity::NewLocalItem(
+ client_tag, specifics, base::Time::Now()));
+ entities_.insert(std::make_pair(client_tag_hash, entity.release()));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->MakeLocalChange(specifics);
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ // That's unusual, but not necessarily a bad thing.
+ // Missing is as good as deleted as far as the model is concerned.
+ DLOG(WARNING) << "Attempted to delete missing item."
+ << " client tag: " << client_tag;
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->Delete();
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
+ CommitRequestDataList commit_requests;
+
+ // Don't bother sending anything if there's no one to send to.
+ if (!IsConnected())
+ return;
+
+ // TODO(rlarocque): Do something smarter than iterate here.
+ for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
+ ++it) {
+ if (it->second->RequiresCommitRequest()) {
+ CommitRequestData request;
+ it->second->InitializeCommitRequestData(&request);
+ commit_requests.push_back(request);
+ it->second->SetCommitRequestInProgress();
+ }
+ }
+
+ if (!commit_requests.empty())
+ core_interface_->RequestCommits(commit_requests);
+}
+
+void NonBlockingTypeProcessor::OnCommitCompletion(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ data_type_state_ = type_state;
+
+ for (CommitResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const CommitResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ NOTREACHED() << "Received commit response for missing item."
+ << " type: " << type_ << " client_tag: " << client_tag_hash;
+ return;
+ } else {
+ it->second->ReceiveCommitResponse(response_data.id,
+ response_data.sequence_number,
+ response_data.response_version);
+ }
+ }
+}
+
+void NonBlockingTypeProcessor::OnUpdateReceived(
+ const DataTypeState& data_type_state,
+ const UpdateResponseDataList& response_list) {
+ data_type_state_ = data_type_state;
+
+ for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const UpdateResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ scoped_ptr<ModelThreadSyncEntity> entity =
+ ModelThreadSyncEntity::FromServerUpdate(
+ response_data.id,
+ response_data.client_tag_hash,
+ response_data.non_unique_name,
+ response_data.response_version,
+ response_data.specifics,
+ response_data.deleted,
+ response_data.ctime,
+ response_data.mtime);
+ entities_.insert(std::make_pair(client_tag_hash, entity.release()));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->ApplyUpdateFromServer(response_data.response_version,
+ response_data.deleted,
+ response_data.specifics,
+ response_data.mtime);
+ // TODO: Do something special when conflicts are detected.
+ }
+ }
+
+ // TODO: Inform the model of the new or updated data.
}
} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor.h b/sync/engine/non_blocking_type_processor.h
index 2db2c32..2274c5d 100644
--- a/sync/engine/non_blocking_type_processor.h
+++ b/sync/engine/non_blocking_type_processor.h
@@ -6,16 +6,18 @@
#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_
#include "base/memory/weak_ptr.h"
-#include "base/sequenced_task_runner.h"
+#include "base/stl_util.h"
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
+#include "sync/engine/non_blocking_sync_common.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/protocol/sync.pb.h"
namespace syncer {
class SyncCoreProxy;
-class NonBlockingTypeProcessorCore;
+class ModelThreadSyncEntity;
+class NonBlockingTypeProcessorCoreInterface;
// A sync component embedded on the synced type's thread that helps to handle
// communication between sync and model type threads.
@@ -54,16 +56,38 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe {
void Disconnect();
// Callback used to process the handshake response.
- void OnConnect(base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread);
+ void OnConnect(
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface);
+
+ // Requests that an item be stored in sync.
+ void Put(const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics);
+
+ // Deletes an item from sync.
+ void Delete(const std::string& client_tag);
+
+ // Informs this object that some of its commit requests have been
+ // successfully serviced.
+ void OnCommitCompletion(const DataTypeState& type_state,
+ const CommitResponseDataList& response_list);
+
+ // Informs this object that there are some incoming updates is should
+ // handle.
+ void OnUpdateReceived(const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list);
// Returns the long-lived WeakPtr that is intended to be registered with the
// ProfileSyncService.
base::WeakPtr<NonBlockingTypeProcessor> AsWeakPtrForUI();
private:
+ typedef std::map<std::string, ModelThreadSyncEntity*> EntityMap;
+
+ // Sends all commit requests that are due to be sent to the sync thread.
+ void FlushPendingCommitRequests();
+
ModelType type_;
- sync_pb::DataTypeProgressMarker progress_marker_;
+ DataTypeState data_type_state_;
// Whether or not sync is preferred for this type. This is a cached copy of
// the canonical copy information on the UI thread.
@@ -75,10 +99,21 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe {
// Our link to data type management on the sync thread.
// Used for enabling and disabling sync for this type.
+ //
+ // Beware of NULL pointers: This object is uninitialized when we are not
+ // connected to sync.
scoped_ptr<SyncCoreProxy> sync_core_proxy_;
- base::WeakPtr<NonBlockingTypeProcessorCore> core_;
- scoped_refptr<base::SequencedTaskRunner> sync_thread_;
+ // Reference to the NonBlockingTypeProcessorCore.
+ //
+ // The interface hides the posting of tasks across threads as well as the
+ // NonBlockingTypeProcessorCore's implementation. Both of these features are
+ // useful in tests.
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface_;
+
+ // The set of sync entities known to this object.
+ EntityMap entities_;
+ STLValueDeleter<EntityMap> entities_deleter_;
// We use two different WeakPtrFactories because we want the pointers they
// issue to have different lifetimes. When asked to disconnect from the sync
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
index ea0f918..236c9c5 100644
--- a/sync/engine/non_blocking_type_processor_core.cc
+++ b/sync/engine/non_blocking_type_processor_core.cc
@@ -63,6 +63,11 @@ void NonBlockingTypeProcessorCore::ApplyUpdates(
DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_);
}
+void NonBlockingTypeProcessorCore::RequestCommits(
+ const CommitRequestDataList& request_list) {
+ // TODO(rlarocque): Implement this. crbug.com/351005.
+}
+
void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
sessions::StatusController* status) {
NOTREACHED()
diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h
index c9cec88..e226349 100644
--- a/sync/engine/non_blocking_type_processor_core.h
+++ b/sync/engine/non_blocking_type_processor_core.h
@@ -9,6 +9,7 @@
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
#include "sync/engine/commit_contributor.h"
+#include "sync/engine/non_blocking_sync_common.h"
#include "sync/engine/update_handler.h"
#include "sync/internal_api/public/base/model_type.h"
#include "sync/protocol/sync.pb.h"
@@ -67,6 +68,9 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore
virtual void ApplyUpdates(sessions::StatusController* status) OVERRIDE;
virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE;
+ // Entry point for NonBlockingTypeProcessor to send commit requests.
+ void RequestCommits(const CommitRequestDataList& request_list);
+
// CommitContributor implementation.
virtual scoped_ptr<CommitContribution> GetContribution(
size_t max_entries) OVERRIDE;
diff --git a/sync/engine/non_blocking_type_processor_core_interface.cc b/sync/engine/non_blocking_type_processor_core_interface.cc
new file mode 100644
index 0000000..6ce74cc
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_core_interface.cc
@@ -0,0 +1,16 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
+
+namespace syncer {
+
+NonBlockingTypeProcessorCoreInterface::NonBlockingTypeProcessorCoreInterface() {
+}
+
+NonBlockingTypeProcessorCoreInterface::
+ ~NonBlockingTypeProcessorCoreInterface() {
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor_core_interface.h b/sync/engine/non_blocking_type_processor_core_interface.h
new file mode 100644
index 0000000..56675c2
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_core_interface.h
@@ -0,0 +1,24 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
+#define SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
+
+#include "sync/engine/non_blocking_sync_common.h"
+
+namespace syncer {
+
+// An interface representing a NonBlockingTypeProcessorCore and its thread.
+// This abstraction is useful in tests.
+class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorCoreInterface {
+ public:
+ NonBlockingTypeProcessorCoreInterface();
+ virtual ~NonBlockingTypeProcessorCoreInterface();
+
+ virtual void RequestCommits(const CommitRequestDataList& list) = 0;
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_
diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc
new file mode 100644
index 0000000..9ab6ca3
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_unittest.cc
@@ -0,0 +1,524 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor.h"
+
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/internal_api/public/sync_core_proxy.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/syncable/syncable_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace syncer {
+
+namespace {
+
+class MockNonBlockingTypeProcessorCore
+ : public NonBlockingTypeProcessorCoreInterface {
+ public:
+ MockNonBlockingTypeProcessorCore();
+ virtual ~MockNonBlockingTypeProcessorCore();
+
+ virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
+
+ bool is_connected_;
+
+ std::vector<CommitRequestDataList> commit_request_lists_;
+};
+
+MockNonBlockingTypeProcessorCore::MockNonBlockingTypeProcessorCore()
+ : is_connected_(false) {
+}
+
+MockNonBlockingTypeProcessorCore::~MockNonBlockingTypeProcessorCore() {
+}
+
+void MockNonBlockingTypeProcessorCore::RequestCommits(
+ const CommitRequestDataList& list) {
+ commit_request_lists_.push_back(list);
+}
+
+class MockSyncCoreProxy : public syncer::SyncCoreProxy {
+ public:
+ MockSyncCoreProxy();
+ virtual ~MockSyncCoreProxy();
+
+ virtual void ConnectTypeToCore(
+ syncer::ModelType type,
+ const DataTypeState& data_type_state,
+ base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) OVERRIDE;
+ virtual void Disconnect(syncer::ModelType type) OVERRIDE;
+ virtual scoped_ptr<SyncCoreProxy> Clone() const OVERRIDE;
+
+ MockNonBlockingTypeProcessorCore* GetMockProcessorCore();
+
+ private:
+ explicit MockSyncCoreProxy(MockNonBlockingTypeProcessorCore*);
+
+ // The NonBlockingTypeProcessor's contract expects that it gets to own this
+ // object, so we can retain only a non-owned pointer to it.
+ //
+ // This is very unsafe, but we can get away with it since these tests are not
+ // exercising the processor <-> processor_core connection code.
+ MockNonBlockingTypeProcessorCore* mock_core_;
+};
+
+MockSyncCoreProxy::MockSyncCoreProxy()
+ : mock_core_(new MockNonBlockingTypeProcessorCore) {
+}
+
+MockSyncCoreProxy::MockSyncCoreProxy(MockNonBlockingTypeProcessorCore* core)
+ : mock_core_(core) {
+}
+
+MockSyncCoreProxy::~MockSyncCoreProxy() {
+}
+
+void MockSyncCoreProxy::ConnectTypeToCore(
+ syncer::ModelType type,
+ const DataTypeState& data_type_state,
+ base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) {
+ // This class is allowed to participate in only one connection.
+ DCHECK(!mock_core_->is_connected_);
+ mock_core_->is_connected_ = true;
+
+ // Hands off ownership of our member to the type_processor, while keeping
+ // an unsafe pointer to it. This is why we can only connect once.
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core(mock_core_);
+
+ type_processor->OnConnect(core.Pass());
+}
+
+void MockSyncCoreProxy::Disconnect(syncer::ModelType type) {
+ // This mock object is not meant for connect and disconnect tests.
+ NOTREACHED() << "Not implemented";
+}
+
+scoped_ptr<SyncCoreProxy> MockSyncCoreProxy::Clone() const {
+ // There's no sensible way to clone this MockSyncCoreProxy.
+ return scoped_ptr<SyncCoreProxy>(new MockSyncCoreProxy(mock_core_));
+}
+
+MockNonBlockingTypeProcessorCore* MockSyncCoreProxy::GetMockProcessorCore() {
+ return mock_core_;
+}
+
+} // namespace
+
+// Tests the sync engine parts of NonBlockingTypeProcessor.
+//
+// The NonBlockingTypeProcessor contains a non-trivial amount of code dedicated
+// to turning the sync engine on and off again. That code is fairly well
+// tested in the NonBlockingDataTypeController unit tests and it doesn't need
+// to be re-tested here.
+//
+// These tests skip past initialization and focus on steady state sync engine
+// behvior. This is where we test how the processor responds to the model's
+// requests to make changes to its data, the messages incoming fro the sync
+// server, and what happens when the two conflict.
+//
+// Inputs:
+// - Initial state from permanent storage. (TODO)
+// - Create, update or delete requests from the model.
+// - Update responses and commit responses from the server.
+//
+// Outputs:
+// - Writes to permanent storage. (TODO)
+// - Callbacks into the model. (TODO)
+// - Requests to the sync thread. Tested with MockNonBlockingTypeProcessorCore.
+class NonBlockingTypeProcessorTest : public ::testing::Test {
+ public:
+ NonBlockingTypeProcessorTest();
+ virtual ~NonBlockingTypeProcessorTest();
+
+ // Explicit initialization step. Kept separate to allow tests to inject
+ // on-disk state before the test begins.
+ void Initialize();
+
+ // Local data modification. Emulates signals from the model thread.
+ void WriteItem(const std::string& tag, const std::string& value);
+ void DeleteItem(const std::string& tag);
+
+ // Emulate updates from the server.
+ // This harness has some functionality to help emulate server behavior.
+ // See the definitions of these methods for more information.
+ void UpdateFromServer(int64 version_offset,
+ const std::string& tag,
+ const std::string& value);
+ void TombstoneFromServer(int64 version_offset, const std::string& tag);
+
+ // Read emitted commit requests as batches.
+ size_t GetNumCommitRequestLists();
+ CommitRequestDataList GetNthCommitRequestList(size_t n);
+
+ // Read emitted commit requests by tag, most recent only.
+ bool HasCommitRequestForTag(const std::string& tag);
+ CommitRequestData GetLatestCommitRequestForTag(const std::string& tag);
+
+ // Sends the processor a successful commit response.
+ void SuccessfulCommitResponse(const CommitRequestData& request_data);
+
+ private:
+ std::string GenerateId(const std::string& tag) const;
+ std::string GenerateTagHash(const std::string& tag) const;
+ sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag,
+ const std::string& value) const;
+
+ int64 GetServerVersion(const std::string& tag);
+ void SetServerVersion(const std::string& tag, int64 version);
+
+ const ModelType type_;
+
+ scoped_ptr<MockSyncCoreProxy> mock_sync_core_proxy_;
+ scoped_ptr<NonBlockingTypeProcessor> processor_;
+ MockNonBlockingTypeProcessorCore* mock_processor_core_;
+
+ DataTypeState data_type_state_;
+
+ std::map<const std::string, int64> server_versions_;
+};
+
+NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest()
+ : type_(PREFERENCES),
+ mock_sync_core_proxy_(new MockSyncCoreProxy()),
+ processor_(new NonBlockingTypeProcessor(type_)) {
+}
+
+NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() {
+}
+
+void NonBlockingTypeProcessorTest::Initialize() {
+ processor_->Enable(mock_sync_core_proxy_->Clone());
+ mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore();
+}
+
+void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ processor_->Put(tag, GenerateSpecifics(tag, value));
+}
+
+void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) {
+ processor_->Delete(tag);
+}
+
+void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset,
+ const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+
+ // Overwrite the existing server version if this is the new highest version.
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ UpdateResponseData data;
+ data.id = GenerateId(tag_hash);
+ data.client_tag_hash = tag_hash;
+ data.response_version = version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
+ data.non_unique_name = tag;
+ data.deleted = false;
+ data.specifics = GenerateSpecifics(tag, value);
+
+ UpdateResponseDataList list;
+ list.push_back(data);
+
+ processor_->OnUpdateReceived(data_type_state_, list);
+}
+
+void NonBlockingTypeProcessorTest::TombstoneFromServer(int64 version_offset,
+ const std::string& tag) {
+ // Overwrite the existing server version if this is the new highest version.
+ std::string tag_hash = GenerateTagHash(tag);
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ UpdateResponseData data;
+ data.id = GenerateId(tag_hash);
+ data.client_tag_hash = tag_hash;
+ data.response_version = version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
+ data.non_unique_name = tag;
+ data.deleted = true;
+
+ UpdateResponseDataList list;
+ list.push_back(data);
+
+ processor_->OnUpdateReceived(data_type_state_, list);
+}
+
+void NonBlockingTypeProcessorTest::SuccessfulCommitResponse(
+ const CommitRequestData& request_data) {
+ const std::string& client_tag_hash = request_data.client_tag_hash;
+ CommitResponseData response_data;
+
+ if (request_data.base_version == 0) {
+ // Server assigns new ID to newly committed items.
+ DCHECK(request_data.id.empty());
+ response_data.id = request_data.id;
+ } else {
+ // Otherwise we reuse the ID from the request.
+ response_data.id = GenerateId(client_tag_hash);
+ }
+
+ response_data.client_tag_hash = client_tag_hash;
+ response_data.sequence_number = request_data.sequence_number;
+
+ // Increment the server version on successful commit.
+ int64 version = GetServerVersion(client_tag_hash);
+ version++;
+ SetServerVersion(client_tag_hash, version);
+
+ response_data.response_version = version;
+
+ CommitResponseDataList list;
+ list.push_back(response_data);
+
+ processor_->OnCommitCompletion(data_type_state_, list);
+}
+
+int64 NonBlockingTypeProcessorTest::GetServerVersion(const std::string& tag) {
+ std::map<const std::string, int64>::const_iterator it;
+ it = server_versions_.find(tag);
+ if (it == server_versions_.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+void NonBlockingTypeProcessorTest::SetServerVersion(const std::string& tag_hash,
+ int64 version) {
+ server_versions_[tag_hash] = version;
+}
+
+std::string NonBlockingTypeProcessorTest::GenerateId(
+ const std::string& tag) const {
+ return "FakeId:" + tag;
+}
+
+std::string NonBlockingTypeProcessorTest::GenerateTagHash(
+ const std::string& tag) const {
+ return syncable::GenerateSyncableHash(type_, tag);
+}
+
+sync_pb::EntitySpecifics NonBlockingTypeProcessorTest::GenerateSpecifics(
+ const std::string& tag,
+ const std::string& value) const {
+ sync_pb::EntitySpecifics specifics;
+ specifics.mutable_preference()->set_name(tag);
+ specifics.mutable_preference()->set_value(value);
+ return specifics;
+}
+
+size_t NonBlockingTypeProcessorTest::GetNumCommitRequestLists() {
+ return mock_processor_core_->commit_request_lists_.size();
+}
+
+CommitRequestDataList NonBlockingTypeProcessorTest::GetNthCommitRequestList(
+ size_t n) {
+ DCHECK_LT(n, GetNumCommitRequestLists());
+ return mock_processor_core_->commit_request_lists_[n];
+}
+
+bool NonBlockingTypeProcessorTest::HasCommitRequestForTag(
+ const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const std::vector<CommitRequestDataList>& lists =
+ mock_processor_core_->commit_request_lists_;
+
+ // Iterate backward through the sets of commit requests to find the most
+ // recent one that applies to the specified tag.
+ for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it =
+ lists.rbegin();
+ lists_it != lists.rend();
+ ++lists_it) {
+ for (CommitRequestDataList::const_iterator it = lists_it->begin();
+ it != lists_it->end();
+ ++it) {
+ if (it->client_tag_hash == tag_hash) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag(
+ const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const std::vector<CommitRequestDataList>& lists =
+ mock_processor_core_->commit_request_lists_;
+
+ // Iterate backward through the sets of commit requests to find the most
+ // recent one that applies to the specified tag.
+ for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it =
+ lists.rbegin();
+ lists_it != lists.rend();
+ ++lists_it) {
+ for (CommitRequestDataList::const_iterator it = lists_it->begin();
+ it != lists_it->end();
+ ++it) {
+ if (it->client_tag_hash == tag_hash) {
+ return *it;
+ }
+ }
+ }
+
+ NOTREACHED() << "Could not find any commits for given tag " << tag << ". "
+ << "Test should have checked HasCommitRequestForTag() first.";
+ return CommitRequestData();
+}
+
+// Creates a new item locally.
+// Thoroughly tests the data generated by a local item creation.
+TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+
+ // Verify the commit request this operation has triggered.
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1");
+
+ EXPECT_TRUE(tag1_data.id.empty());
+ EXPECT_EQ(0, tag1_data.base_version);
+ EXPECT_FALSE(tag1_data.ctime.is_null());
+ EXPECT_FALSE(tag1_data.mtime.is_null());
+ EXPECT_EQ("tag1", tag1_data.non_unique_name);
+ EXPECT_FALSE(tag1_data.deleted);
+ EXPECT_EQ("tag1", tag1_data.specifics.preference().name());
+ EXPECT_EQ("value1", tag1_data.specifics.preference().value());
+}
+
+// Creates a new local item then modifies it.
+// Thoroughly tests data generated by modification of server-unknown item.
+TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ WriteItem("tag1", "value2");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1");
+
+ // Test some of the relations between old and new commit requests.
+ EXPECT_EQ(tag1_v1_data.specifics.preference().value(), "value1");
+ EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number);
+
+ // Perform a thorough examination of the update-generated request.
+ EXPECT_TRUE(tag1_v2_data.id.empty());
+ EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_FALSE(tag1_v2_data.ctime.is_null());
+ EXPECT_FALSE(tag1_v2_data.mtime.is_null());
+ EXPECT_EQ("tag1", tag1_v2_data.non_unique_name);
+ EXPECT_FALSE(tag1_v2_data.deleted);
+ EXPECT_EQ("tag1", tag1_v2_data.specifics.preference().name());
+ EXPECT_EQ("value2", tag1_v2_data.specifics.preference().value());
+}
+
+// Deletes an item we've never seen before.
+// Should have no effect and not crash.
+TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) {
+ Initialize();
+
+ DeleteItem("tag1");
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+}
+
+// Creates an item locally then deletes it.
+//
+// In this test, no commit responses are received, so the deleted item is
+// server-unknown as far as the model thread is concerned. That behavior
+// is race-dependent; other tests are used to test other races.
+TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) {
+ Initialize();
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ DeleteItem("tag1");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1");
+
+ EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number);
+
+ EXPECT_TRUE(tag1_v2_data.id.empty());
+ EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_TRUE(tag1_v2_data.deleted);
+}
+
+// Creates an item locally then deletes it.
+//
+// The item is created locally then enqueued for commit. The sync thread
+// successfully commits it, but, before the commit response is picked up
+// by the model thread, the item is deleted by the model thread.
+TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) {
+ Initialize();
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+ const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1");
+
+ DeleteItem("tag1");
+ EXPECT_EQ(2U, GetNumCommitRequestLists());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+
+ // This commit happened while the deletion was in progress, but the commit
+ // response didn't arrive on our thread until after the delete was issued to
+ // the sync thread. It will update some metadata, but won't do much else.
+ SuccessfulCommitResponse(tag1_v1_data);
+
+ // TODO(rlarocque): Verify the state of the item is correct once we get
+ // storage hooked up in these tests. For example, verify the item is still
+ // marked as deleted.
+}
+
+// Creates two different sync items.
+// Verifies that the second has no effect on the first.
+TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) {
+ Initialize();
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ WriteItem("tag1", "value1");
+
+ // There should be one commit request for this item only.
+ ASSERT_EQ(1U, GetNumCommitRequestLists());
+ EXPECT_EQ(1U, GetNthCommitRequestList(0).size());
+ ASSERT_TRUE(HasCommitRequestForTag("tag1"));
+
+ WriteItem("tag2", "value2");
+
+ // The second write should trigger another single-item commit request.
+ ASSERT_EQ(2U, GetNumCommitRequestLists());
+ EXPECT_EQ(1U, GetNthCommitRequestList(1).size());
+ ASSERT_TRUE(HasCommitRequestForTag("tag2"));
+}
+
+// TODO(rlarocque): Add more testing of non_unique_name fields.
+
+} // namespace syncer