diff options
25 files changed, 1559 insertions, 51 deletions
diff --git a/components/sync_driver/non_blocking_data_type_controller_unittest.cc b/components/sync_driver/non_blocking_data_type_controller_unittest.cc index 32e475f..2c9076c 100644 --- a/components/sync_driver/non_blocking_data_type_controller_unittest.cc +++ b/components/sync_driver/non_blocking_data_type_controller_unittest.cc @@ -12,12 +12,38 @@ #include "base/test/test_simple_task_runner.h" #include "components/sync_driver/non_blocking_data_type_controller.h" #include "sync/engine/non_blocking_type_processor.h" +#include "sync/engine/non_blocking_type_processor_core_interface.h" #include "sync/internal_api/public/base/model_type.h" #include "sync/internal_api/public/sync_core_proxy.h" #include "testing/gtest/include/gtest/gtest.h" +namespace syncer { + +class NonBlockingTypeProcessorCore; + namespace { +// A useless instance of NonBlockingTypeProcessorCore. +class NullNonBlockingTypeProcessorCore + : public NonBlockingTypeProcessorCoreInterface { + public: + NullNonBlockingTypeProcessorCore(); + virtual ~NullNonBlockingTypeProcessorCore(); + + virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; +}; + +NullNonBlockingTypeProcessorCore::NullNonBlockingTypeProcessorCore() { +} + +NullNonBlockingTypeProcessorCore::~NullNonBlockingTypeProcessorCore() { +} + +void NullNonBlockingTypeProcessorCore::RequestCommits( + const CommitRequestDataList& list) { + NOTREACHED() << "Not implemented."; +} + // A class that pretends to be the sync backend. class MockSyncCore { public: @@ -28,10 +54,11 @@ class MockSyncCore { enabled_types_.Put(type); model_task_runner->PostTask( FROM_HERE, - base::Bind(&syncer::NonBlockingTypeProcessor::OnConnect, - type_processor, - base::WeakPtr<syncer::NonBlockingTypeProcessorCore>(), - scoped_refptr<base::SequencedTaskRunner>())); + base::Bind( + &syncer::NonBlockingTypeProcessor::OnConnect, + type_processor, + base::Passed(scoped_ptr<NonBlockingTypeProcessorCoreInterface>( + new NullNonBlockingTypeProcessorCore()).Pass()))); } void Disconnect(syncer::ModelType type) { @@ -58,6 +85,7 @@ class MockSyncCoreProxy : public syncer::SyncCoreProxy { virtual void ConnectTypeToCore( syncer::ModelType type, + const DataTypeState& data_type_state, base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) OVERRIDE { // Normally we'd use MessageLoopProxy::current() as the TaskRunner argument // to Connect(). That won't work here in this test, so we use the @@ -88,6 +116,8 @@ class MockSyncCoreProxy : public syncer::SyncCoreProxy { scoped_refptr<base::TestSimpleTaskRunner> sync_task_runner_; }; +} // namespace + class NonBlockingDataTypeControllerTest : public testing::Test { public: NonBlockingDataTypeControllerTest() @@ -406,4 +436,4 @@ TEST_F(NonBlockingDataTypeControllerTest, EnableDisableEnableRace) { EXPECT_TRUE(processor_.IsConnected()); } -} // namespace +} // namespace syncer diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc new file mode 100644 index 0000000..fde2eb3 --- /dev/null +++ b/sync/engine/model_thread_sync_entity.cc @@ -0,0 +1,157 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/model_thread_sync_entity.h" +#include "sync/syncable/syncable_util.h" + +namespace syncer { + +scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem( + const std::string& client_tag, + const sync_pb::EntitySpecifics& specifics, + base::Time now) { + return scoped_ptr<ModelThreadSyncEntity>(new ModelThreadSyncEntity( + 1, + 0, + 0, + 0, + true, + std::string(), // Sync thread will assign the initial ID. + syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics), + client_tag), + client_tag, // As non-unique name. + specifics, + false, + now, + now)); +} + +scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::FromServerUpdate( + const std::string& id, + const std::string& client_tag_hash, + const std::string& non_unique_name, + int64 version, + const sync_pb::EntitySpecifics& specifics, + bool deleted, + base::Time ctime, + base::Time mtime) { + return scoped_ptr<ModelThreadSyncEntity>( + new ModelThreadSyncEntity(0, + 0, + 0, + version, + true, + id, + client_tag_hash, + non_unique_name, + specifics, + deleted, + ctime, + mtime)); +} + +ModelThreadSyncEntity::ModelThreadSyncEntity( + int64 sequence_number, + int64 commit_requested_sequence_number, + int64 acked_sequence_number, + int64 base_version, + bool is_dirty, + const std::string& id, + const std::string& client_tag_hash, + const std::string& non_unique_name, + const sync_pb::EntitySpecifics& specifics, + bool deleted, + base::Time ctime, + base::Time mtime) + : sequence_number_(sequence_number), + commit_requested_sequence_number_(commit_requested_sequence_number), + acked_sequence_number_(acked_sequence_number), + base_version_(base_version), + is_dirty_(is_dirty), + id_(id), + client_tag_hash_(client_tag_hash), + non_unique_name_(non_unique_name), + specifics_(specifics), + deleted_(deleted), + ctime_(ctime), + mtime_(mtime) { +} + +ModelThreadSyncEntity::~ModelThreadSyncEntity() { +} + +bool ModelThreadSyncEntity::IsWriteRequired() const { + return is_dirty_; +} + +bool ModelThreadSyncEntity::IsUnsynced() const { + return sequence_number_ > acked_sequence_number_; +} + +bool ModelThreadSyncEntity::RequiresCommitRequest() const { + return sequence_number_ > commit_requested_sequence_number_; +} + +bool ModelThreadSyncEntity::UpdateIsReflection(int64 update_version) const { + return base_version_ >= update_version; +} + +bool ModelThreadSyncEntity::UpdateIsInConflict(int64 update_version) const { + return IsUnsynced() && !UpdateIsReflection(update_version); +} + +void ModelThreadSyncEntity::ApplyUpdateFromServer( + int64 update_version, + bool deleted, + const sync_pb::EntitySpecifics& specifics, + base::Time mtime) { + // There was a conflict and the server just won it. + // This implicitly acks all outstanding commits because a received update + // will clobber any pending commits on the sync thread. + acked_sequence_number_ = sequence_number_; + commit_requested_sequence_number_ = sequence_number_; + + base_version_ = update_version; + specifics_ = specifics; + mtime_ = mtime; +} + +void ModelThreadSyncEntity::MakeLocalChange( + const sync_pb::EntitySpecifics& specifics) { + sequence_number_++; + specifics_ = specifics; +} + +void ModelThreadSyncEntity::Delete() { + sequence_number_++; + specifics_.Clear(); + deleted_ = true; +} + +void ModelThreadSyncEntity::InitializeCommitRequestData( + CommitRequestData* request) const { + request->id = id_; + request->client_tag_hash = client_tag_hash_; + request->sequence_number = sequence_number_; + request->base_version = base_version_; + request->ctime = ctime_; + request->mtime = mtime_; + request->non_unique_name = non_unique_name_; + request->deleted = deleted_; + request->specifics.CopyFrom(specifics_); +} + +void ModelThreadSyncEntity::SetCommitRequestInProgress() { + commit_requested_sequence_number_ = sequence_number_; +} + +void ModelThreadSyncEntity::ReceiveCommitResponse(const std::string& id, + int64 sequence_number, + int64 response_version) { + id_ = id; // The server can assign us a new ID in a commit response. + acked_sequence_number_ = sequence_number; + base_version_ = response_version; +} + +} // namespace syncer diff --git a/sync/engine/model_thread_sync_entity.h b/sync/engine/model_thread_sync_entity.h new file mode 100644 index 0000000..31cc2b8 --- /dev/null +++ b/sync/engine/model_thread_sync_entity.h @@ -0,0 +1,186 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_ +#define SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_ + +#include <string> + +#include "base/memory/scoped_ptr.h" +#include "base/time/time.h" +#include "sync/base/sync_export.h" +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +// This is the model thread's representation of a SyncEntity. +// +// The model thread sync entity receives updates from the model itself and +// (asynchronously) from the sync server via the sync thread. From the point +// of view of this class, updates from the server take precedence over local +// changes, though the model may be given an opportunity to overwrite this +// decision later. +// +// Sync will try to commit this entity's data to the sync server and local +// storage. +// +// Most of the logic related to those processes live outside this class. This +// class helps out a bit by offering some functions to serialize its data to +// various formats and query the entity's status. +class SYNC_EXPORT_PRIVATE ModelThreadSyncEntity { + public: + // Construct an instance representing a new locally-created item. + static scoped_ptr<ModelThreadSyncEntity> NewLocalItem( + const std::string& client_tag, + const sync_pb::EntitySpecifics& specifics, + base::Time now); + + // Construct an instance representing an item newly received from the server. + static scoped_ptr<ModelThreadSyncEntity> FromServerUpdate( + const std::string& id, + const std::string& client_tag_hash, + const std::string& non_unique_name, + int64 version, + const sync_pb::EntitySpecifics& specifics, + bool deleted, + base::Time ctime, + base::Time mtime); + + // TODO(rlarocque): Implement FromDisk constructor when we implement storage. + + ~ModelThreadSyncEntity(); + + // Returns true if this data is out of sync with local storage. + bool IsWriteRequired() const; + + // Returns true if this data is out of sync with the server. + // A commit may or may not be in progress at this time. + bool IsUnsynced() const; + + // Returns true if this data is out of sync with the sync thread. + // + // There may or may not be a commit in progress for this item, but there's + // definitely no commit in progress for this (most up to date) version of + // this item. + bool RequiresCommitRequest() const; + + // Returns true if the specified update version does not contain new data. + bool UpdateIsReflection(int64 update_version) const; + + // Returns true if the specified update version conflicts with local changes. + bool UpdateIsInConflict(int64 update_version) const; + + // Applies an update from the sync server. + // + // Overrides any local changes. Check UpdateIsInConflict() before calling + // this function if you want to handle conflicts differently. + void ApplyUpdateFromServer(int64 update_version, + bool deleted, + const sync_pb::EntitySpecifics& specifics, + base::Time mtime); + + // Applies a local change to this item. + void MakeLocalChange(const sync_pb::EntitySpecifics& specifics); + + // Applies a local deletion to this item. + void Delete(); + + // Initializes a message representing this item's uncommitted state + // to be forwarded to the sync server for committing. + void InitializeCommitRequestData(CommitRequestData* request) const; + + // Notes that the current version of this item has been queued for commit. + void SetCommitRequestInProgress(); + + // Receives a successful commit response. + // + // Sucssful commit responses can overwrite an item's ID. + // + // Note that the receipt of a successful commit response does not necessarily + // unset IsUnsynced(). If many local changes occur in quick succession, it's + // possible that the committed item was already out of date by the time it + // reached the server. + void ReceiveCommitResponse(const std::string& id, + int64 sequence_number, + int64 response_version); + + private: + ModelThreadSyncEntity(int64 sequence_number, + int64 commit_requested_sequence_number, + int64 acked_sequence_number, + int64 base_version, + bool is_dirty, + const std::string& id, + const std::string& client_tag_hash, + const std::string& non_unique_name, + const sync_pb::EntitySpecifics& specifics, + bool deleted, + base::Time ctime, + base::Time mtime); + + // A sequence number used to track in-progress commits. Each local change + // increments this number. + int64 sequence_number_; + + // The sequence number of the last item sent to the sync thread. + int64 commit_requested_sequence_number_; + + // The sequence number of the last item known to be successfully committed. + int64 acked_sequence_number_; + + // The server version on which this item is based. + // + // If there are no local changes, this is the version of the entity as we see + // it here. + // + // If there are local changes, this is the version of the entity on which + // those changes are based. + int64 base_version_; + + // True if this entity is out of sync with local storage. + bool is_dirty_; + + // The entity's ID. + // + // Most of the time, this is a server-assigned value. + // + // Prior to the item's first commit, we leave this value as an empty string. + // The initial ID for a newly created item has to meet certain uniqueness + // requirements, and we handle those on the sync thread. + std::string id_; + + // A hash based on the client tag and model type. + // Used for various map lookups. Should always be available. + std::string client_tag_hash_; + + // A non-unique name associated with this entity. + // + // It is sometimes used for debugging. It gets saved to and restored from + // the sync server. + // + // Its value is often related to the item's unhashed client tag, though this + // is not guaranteed and should not be relied on. May be hidden when + // encryption is enabled. + std::string non_unique_name_; + + // A protobuf filled with type-specific information. Contains the most + // up-to-date specifics, whether it be from the server or a locally modified + // version. + sync_pb::EntitySpecifics specifics_; + + // Whether or not the item is deleted. The |specifics_| field may be empty + // if this flag is true. + bool deleted_; + + // Entity creation and modification timestamps. + // Assigned by the client and synced by the server, though the server usually + // doesn't bother to inspect their values. + base::Time ctime_; + base::Time mtime_; +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_MODEL_THREAD_SYNC_ENTITY_H_ diff --git a/sync/engine/model_thread_sync_entity_unittest.cc b/sync/engine/model_thread_sync_entity_unittest.cc new file mode 100644 index 0000000..6b158dd --- /dev/null +++ b/sync/engine/model_thread_sync_entity_unittest.cc @@ -0,0 +1,178 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/model_thread_sync_entity.h" + +#include "base/memory/scoped_ptr.h" +#include "base/time/time.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/protocol/sync.pb.h" +#include "sync/syncable/syncable_util.h" + +#include "testing/gtest/include/gtest/gtest.h" + +namespace syncer { + +// Some simple sanity tests for the ModelThreadSyncEntity. +// +// A lot of the more complicated sync logic is implemented in the +// NonBlockingTypeProcessor that owns the ModelThreadSyncEntity. We +// can't unit test it here. +// +// Instead, we focus on simple tests to make sure that variables are getting +// properly intialized and flags properly set. Anything more complicated would +// be a redundant and incomplete version of the NonBlockingTypeProcessor tests. +class ModelThreadSyncEntityTest : public ::testing::Test { + public: + ModelThreadSyncEntityTest() + : kServerId("ServerID"), + kClientTag("sample.pref.name"), + kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)), + kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)), + kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) { + sync_pb::PreferenceSpecifics* pref_specifics = + specifics.mutable_preference(); + pref_specifics->set_name(kClientTag); + pref_specifics->set_value("pref.value"); + } + + const std::string kServerId; + const std::string kClientTag; + const std::string kClientTagHash; + const base::Time kCtime; + const base::Time kMtime; + sync_pb::EntitySpecifics specifics; +}; + +TEST_F(ModelThreadSyncEntityTest, NewLocalItem) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::NewLocalItem("asdf", specifics, kCtime)); + + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_TRUE(entity->IsUnsynced()); + EXPECT_FALSE(entity->UpdateIsReflection(1)); + EXPECT_TRUE(entity->UpdateIsInConflict(1)); +} + +TEST_F(ModelThreadSyncEntityTest, FromServerUpdate) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::FromServerUpdate( + kServerId, + kClientTagHash, + kClientTag, // As non-unique name. + 10, + specifics, + false, + kCtime, + kMtime)); + + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_FALSE(entity->IsUnsynced()); + EXPECT_TRUE(entity->UpdateIsReflection(9)); + EXPECT_TRUE(entity->UpdateIsReflection(10)); + EXPECT_FALSE(entity->UpdateIsReflection(11)); + EXPECT_FALSE(entity->UpdateIsInConflict(11)); +} + +// Tombstones should behave just like regular updates. Mostly. The strangest +// thing about them is that they don't have specifics, so it can be hard to +// detect their type. Fortunately, this class doesn't care about types in +// received updates. +TEST_F(ModelThreadSyncEntityTest, TombstoneUpdate) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::FromServerUpdate( + kServerId, + kClientTagHash, + kClientTag, // As non-unique name. + 10, + sync_pb::EntitySpecifics(), + true, + kCtime, + kMtime)); + + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_FALSE(entity->IsUnsynced()); + EXPECT_TRUE(entity->UpdateIsReflection(9)); + EXPECT_TRUE(entity->UpdateIsReflection(10)); + EXPECT_FALSE(entity->UpdateIsReflection(11)); + EXPECT_FALSE(entity->UpdateIsInConflict(11)); +} + +// Apply a deletion update. +TEST_F(ModelThreadSyncEntityTest, ApplyUpdate) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::FromServerUpdate( + kServerId, + kClientTagHash, + kClientTag, // As non-unique name. + 10, + specifics, + false, + kCtime, + kMtime)); + + // A deletion update one version later. + entity->ApplyUpdateFromServer(11, + true, + sync_pb::EntitySpecifics(), + kMtime + base::TimeDelta::FromSeconds(10)); + + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_FALSE(entity->IsUnsynced()); + EXPECT_TRUE(entity->UpdateIsReflection(11)); + EXPECT_FALSE(entity->UpdateIsReflection(12)); +} + +TEST_F(ModelThreadSyncEntityTest, LocalChange) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::FromServerUpdate( + kServerId, + kClientTagHash, + kClientTag, // As non-unique name. + 10, + specifics, + false, + kCtime, + kMtime)); + + sync_pb::EntitySpecifics specifics2; + specifics2.CopyFrom(specifics); + specifics2.mutable_preference()->set_value("new.pref.value"); + + entity->MakeLocalChange(specifics2); + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_TRUE(entity->IsUnsynced()); + + EXPECT_TRUE(entity->UpdateIsReflection(10)); + EXPECT_FALSE(entity->UpdateIsInConflict(10)); + + EXPECT_FALSE(entity->UpdateIsReflection(11)); + EXPECT_TRUE(entity->UpdateIsInConflict(11)); +} + +TEST_F(ModelThreadSyncEntityTest, LocalDeletion) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::FromServerUpdate( + kServerId, + kClientTagHash, + kClientTag, // As non-unique name. + 10, + specifics, + false, + kCtime, + kMtime)); + + entity->Delete(); + + EXPECT_TRUE(entity->IsWriteRequired()); + EXPECT_TRUE(entity->IsUnsynced()); + + EXPECT_TRUE(entity->UpdateIsReflection(10)); + EXPECT_FALSE(entity->UpdateIsInConflict(10)); + + EXPECT_FALSE(entity->UpdateIsReflection(11)); + EXPECT_TRUE(entity->UpdateIsInConflict(11)); +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc new file mode 100644 index 0000000..c5a3656 --- /dev/null +++ b/sync/engine/non_blocking_sync_common.cc @@ -0,0 +1,33 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_sync_common.h" + +namespace syncer { + +DataTypeState::DataTypeState() { +} + +DataTypeState::~DataTypeState() { +} + +CommitRequestData::CommitRequestData() { +} + +CommitRequestData::~CommitRequestData() { +} + +CommitResponseData::CommitResponseData() { +} + +CommitResponseData::~CommitResponseData() { +} + +UpdateResponseData::UpdateResponseData() { +} + +UpdateResponseData::~UpdateResponseData() { +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h new file mode 100644 index 0000000..f07fdb2 --- /dev/null +++ b/sync/engine/non_blocking_sync_common.h @@ -0,0 +1,98 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_ +#define SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_ + +#include <string> +#include <vector> + +#include "base/time/time.h" +#include "sync/base/sync_export.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +// Data-type global state that must be accessed and updated on the sync thread, +// but persisted on or through the model thread. +struct SYNC_EXPORT_PRIVATE DataTypeState { + DataTypeState(); + ~DataTypeState(); + + // The latest progress markers received from the server. + sync_pb::DataTypeProgressMarker progress_marker; + + // A data type context. Sent to the server in every commit or update + // request. May be updated by either by responses from the server or + // requests made on the model thread. The interpretation of this value may + // be data-type specific. Many data types ignore it. + sync_pb::DataTypeContext type_context; + + // The ID of the folder node that sits at the top of this type's folder + // hierarchy. We keep this around for legacy reasons. The protocol expects + // that all nodes of a certain type are children of the same type root + // entity. This entity is delivered by the server, and may not be available + // until the first download cycle has completed. + std::string type_root_id; + + // A strictly increasing counter used to generate unique values for the + // client-assigned IDs. The incrementing and ID assignment happens on the + // sync thread, but we store the value here so we can pass it back to the + // model thread for persistence. This is probably unnecessary for the + // client-tagged data types supported by non-blocking sync, but we will + // continue to emulate the directory sync's behavior for now. + int64 next_client_id; +}; + +struct SYNC_EXPORT_PRIVATE CommitRequestData { + CommitRequestData(); + ~CommitRequestData(); + + std::string id; + std::string client_tag_hash; + + // Strictly incrementing number for in-progress commits. More information + // about its meaning can be found in comments in the files that make use of + // this struct. + int64 sequence_number; + + int64 base_version; + base::Time ctime; + base::Time mtime; + std::string non_unique_name; + bool deleted; + sync_pb::EntitySpecifics specifics; +}; + +struct SYNC_EXPORT_PRIVATE CommitResponseData { + CommitResponseData(); + ~CommitResponseData(); + + std::string id; + std::string client_tag_hash; + int64 sequence_number; + int64 response_version; +}; + +struct SYNC_EXPORT_PRIVATE UpdateResponseData { + UpdateResponseData(); + ~UpdateResponseData(); + + std::string id; + std::string client_tag_hash; + int64 response_version; + base::Time ctime; + base::Time mtime; + std::string non_unique_name; + bool deleted; + sync_pb::EntitySpecifics specifics; +}; + +typedef std::vector<CommitRequestData> CommitRequestDataList; +typedef std::vector<CommitResponseData> CommitResponseDataList; +typedef std::vector<UpdateResponseData> UpdateResponseDataList; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_SYNC_COMMON_H_ diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc index 66a016b..0aaead4 100644 --- a/sync/engine/non_blocking_type_processor.cc +++ b/sync/engine/non_blocking_type_processor.cc @@ -4,9 +4,13 @@ #include "sync/engine/non_blocking_type_processor.h" +#include "base/bind.h" +#include "base/location.h" #include "base/message_loop/message_loop_proxy.h" -#include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/engine/model_thread_sync_entity.h" +#include "sync/engine/non_blocking_type_processor_core_interface.h" #include "sync/internal_api/public/sync_core_proxy.h" +#include "sync/syncable/syncable_util.h" namespace syncer { @@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type) : type_(type), is_preferred_(false), is_connected_(false), + entities_deleter_(&entities_), weak_ptr_factory_for_ui_(this), weak_ptr_factory_for_sync_(this) { } @@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable( scoped_ptr<SyncCoreProxy> sync_core_proxy) { DCHECK(CalledOnValidThread()); DVLOG(1) << "Asked to enable " << ModelTypeToString(type_); + is_preferred_ = true; + + // TODO(rlarocque): At some point, this should be loaded from storage. + data_type_state_.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(type_)); + data_type_state_.next_client_id = 0; + sync_core_proxy_ = sync_core_proxy.Pass(); sync_core_proxy_->ConnectTypeToCore(GetModelType(), + data_type_state_, weak_ptr_factory_for_sync_.GetWeakPtr()); } @@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() { } weak_ptr_factory_for_sync_.InvalidateWeakPtrs(); - core_ = base::WeakPtr<NonBlockingTypeProcessorCore>(); - sync_thread_ = scoped_refptr<base::SequencedTaskRunner>(); + core_interface_.reset(); } base::WeakPtr<NonBlockingTypeProcessor> @@ -74,13 +86,136 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() { } void NonBlockingTypeProcessor::OnConnect( - base::WeakPtr<NonBlockingTypeProcessorCore> core, - scoped_refptr<base::SequencedTaskRunner> sync_thread) { + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) { DCHECK(CalledOnValidThread()); DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); + is_connected_ = true; - core_ = core; - sync_thread_ = sync_thread; + core_interface_ = core_interface.Pass(); + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::Put(const std::string& client_tag, + const sync_pb::EntitySpecifics& specifics) { + DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics)); + + const std::string client_tag_hash( + syncable::GenerateSyncableHash(type_, client_tag)); + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::NewLocalItem( + client_tag, specifics, base::Time::Now())); + entities_.insert(std::make_pair(client_tag_hash, entity.release())); + } else { + ModelThreadSyncEntity* entity = it->second; + entity->MakeLocalChange(specifics); + } + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::Delete(const std::string& client_tag) { + const std::string client_tag_hash( + syncable::GenerateSyncableHash(type_, client_tag)); + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + // That's unusual, but not necessarily a bad thing. + // Missing is as good as deleted as far as the model is concerned. + DLOG(WARNING) << "Attempted to delete missing item." + << " client tag: " << client_tag; + } else { + ModelThreadSyncEntity* entity = it->second; + entity->Delete(); + } + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::FlushPendingCommitRequests() { + CommitRequestDataList commit_requests; + + // Don't bother sending anything if there's no one to send to. + if (!IsConnected()) + return; + + // TODO(rlarocque): Do something smarter than iterate here. + for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); + ++it) { + if (it->second->RequiresCommitRequest()) { + CommitRequestData request; + it->second->InitializeCommitRequestData(&request); + commit_requests.push_back(request); + it->second->SetCommitRequestInProgress(); + } + } + + if (!commit_requests.empty()) + core_interface_->RequestCommits(commit_requests); +} + +void NonBlockingTypeProcessor::OnCommitCompletion( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + data_type_state_ = type_state; + + for (CommitResponseDataList::const_iterator list_it = response_list.begin(); + list_it != response_list.end(); + ++list_it) { + const CommitResponseData& response_data = *list_it; + const std::string& client_tag_hash = response_data.client_tag_hash; + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + NOTREACHED() << "Received commit response for missing item." + << " type: " << type_ << " client_tag: " << client_tag_hash; + return; + } else { + it->second->ReceiveCommitResponse(response_data.id, + response_data.sequence_number, + response_data.response_version); + } + } +} + +void NonBlockingTypeProcessor::OnUpdateReceived( + const DataTypeState& data_type_state, + const UpdateResponseDataList& response_list) { + data_type_state_ = data_type_state; + + for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); + list_it != response_list.end(); + ++list_it) { + const UpdateResponseData& response_data = *list_it; + const std::string& client_tag_hash = response_data.client_tag_hash; + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + scoped_ptr<ModelThreadSyncEntity> entity = + ModelThreadSyncEntity::FromServerUpdate( + response_data.id, + response_data.client_tag_hash, + response_data.non_unique_name, + response_data.response_version, + response_data.specifics, + response_data.deleted, + response_data.ctime, + response_data.mtime); + entities_.insert(std::make_pair(client_tag_hash, entity.release())); + } else { + ModelThreadSyncEntity* entity = it->second; + entity->ApplyUpdateFromServer(response_data.response_version, + response_data.deleted, + response_data.specifics, + response_data.mtime); + // TODO: Do something special when conflicts are detected. + } + } + + // TODO: Inform the model of the new or updated data. } } // namespace syncer diff --git a/sync/engine/non_blocking_type_processor.h b/sync/engine/non_blocking_type_processor.h index 2db2c32..2274c5d 100644 --- a/sync/engine/non_blocking_type_processor.h +++ b/sync/engine/non_blocking_type_processor.h @@ -6,16 +6,18 @@ #define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_ #include "base/memory/weak_ptr.h" -#include "base/sequenced_task_runner.h" +#include "base/stl_util.h" #include "base/threading/non_thread_safe.h" #include "sync/base/sync_export.h" +#include "sync/engine/non_blocking_sync_common.h" #include "sync/internal_api/public/base/model_type.h" #include "sync/protocol/sync.pb.h" namespace syncer { class SyncCoreProxy; -class NonBlockingTypeProcessorCore; +class ModelThreadSyncEntity; +class NonBlockingTypeProcessorCoreInterface; // A sync component embedded on the synced type's thread that helps to handle // communication between sync and model type threads. @@ -54,16 +56,38 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe { void Disconnect(); // Callback used to process the handshake response. - void OnConnect(base::WeakPtr<NonBlockingTypeProcessorCore> core, - scoped_refptr<base::SequencedTaskRunner> sync_thread); + void OnConnect( + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface); + + // Requests that an item be stored in sync. + void Put(const std::string& client_tag, + const sync_pb::EntitySpecifics& specifics); + + // Deletes an item from sync. + void Delete(const std::string& client_tag); + + // Informs this object that some of its commit requests have been + // successfully serviced. + void OnCommitCompletion(const DataTypeState& type_state, + const CommitResponseDataList& response_list); + + // Informs this object that there are some incoming updates is should + // handle. + void OnUpdateReceived(const DataTypeState& type_state, + const UpdateResponseDataList& response_list); // Returns the long-lived WeakPtr that is intended to be registered with the // ProfileSyncService. base::WeakPtr<NonBlockingTypeProcessor> AsWeakPtrForUI(); private: + typedef std::map<std::string, ModelThreadSyncEntity*> EntityMap; + + // Sends all commit requests that are due to be sent to the sync thread. + void FlushPendingCommitRequests(); + ModelType type_; - sync_pb::DataTypeProgressMarker progress_marker_; + DataTypeState data_type_state_; // Whether or not sync is preferred for this type. This is a cached copy of // the canonical copy information on the UI thread. @@ -75,10 +99,21 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe { // Our link to data type management on the sync thread. // Used for enabling and disabling sync for this type. + // + // Beware of NULL pointers: This object is uninitialized when we are not + // connected to sync. scoped_ptr<SyncCoreProxy> sync_core_proxy_; - base::WeakPtr<NonBlockingTypeProcessorCore> core_; - scoped_refptr<base::SequencedTaskRunner> sync_thread_; + // Reference to the NonBlockingTypeProcessorCore. + // + // The interface hides the posting of tasks across threads as well as the + // NonBlockingTypeProcessorCore's implementation. Both of these features are + // useful in tests. + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface_; + + // The set of sync entities known to this object. + EntityMap entities_; + STLValueDeleter<EntityMap> entities_deleter_; // We use two different WeakPtrFactories because we want the pointers they // issue to have different lifetimes. When asked to disconnect from the sync diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc index ea0f918..236c9c5 100644 --- a/sync/engine/non_blocking_type_processor_core.cc +++ b/sync/engine/non_blocking_type_processor_core.cc @@ -63,6 +63,11 @@ void NonBlockingTypeProcessorCore::ApplyUpdates( DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); } +void NonBlockingTypeProcessorCore::RequestCommits( + const CommitRequestDataList& request_list) { + // TODO(rlarocque): Implement this. crbug.com/351005. +} + void NonBlockingTypeProcessorCore::PassiveApplyUpdates( sessions::StatusController* status) { NOTREACHED() diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h index c9cec88..e226349 100644 --- a/sync/engine/non_blocking_type_processor_core.h +++ b/sync/engine/non_blocking_type_processor_core.h @@ -9,6 +9,7 @@ #include "base/threading/non_thread_safe.h" #include "sync/base/sync_export.h" #include "sync/engine/commit_contributor.h" +#include "sync/engine/non_blocking_sync_common.h" #include "sync/engine/update_handler.h" #include "sync/internal_api/public/base/model_type.h" #include "sync/protocol/sync.pb.h" @@ -67,6 +68,9 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore virtual void ApplyUpdates(sessions::StatusController* status) OVERRIDE; virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE; + // Entry point for NonBlockingTypeProcessor to send commit requests. + void RequestCommits(const CommitRequestDataList& request_list); + // CommitContributor implementation. virtual scoped_ptr<CommitContribution> GetContribution( size_t max_entries) OVERRIDE; diff --git a/sync/engine/non_blocking_type_processor_core_interface.cc b/sync/engine/non_blocking_type_processor_core_interface.cc new file mode 100644 index 0000000..6ce74cc --- /dev/null +++ b/sync/engine/non_blocking_type_processor_core_interface.cc @@ -0,0 +1,16 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor_core_interface.h" + +namespace syncer { + +NonBlockingTypeProcessorCoreInterface::NonBlockingTypeProcessorCoreInterface() { +} + +NonBlockingTypeProcessorCoreInterface:: + ~NonBlockingTypeProcessorCoreInterface() { +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_core_interface.h b/sync/engine/non_blocking_type_processor_core_interface.h new file mode 100644 index 0000000..56675c2 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_core_interface.h @@ -0,0 +1,24 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_ + +#include "sync/engine/non_blocking_sync_common.h" + +namespace syncer { + +// An interface representing a NonBlockingTypeProcessorCore and its thread. +// This abstraction is useful in tests. +class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorCoreInterface { + public: + NonBlockingTypeProcessorCoreInterface(); + virtual ~NonBlockingTypeProcessorCoreInterface(); + + virtual void RequestCommits(const CommitRequestDataList& list) = 0; +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_CORE_INTERFACE_H_ diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc new file mode 100644 index 0000000..9ab6ca3 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_unittest.cc @@ -0,0 +1,524 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor.h" + +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/engine/non_blocking_type_processor_core_interface.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/internal_api/public/sync_core_proxy.h" +#include "sync/protocol/sync.pb.h" +#include "sync/syncable/syncable_util.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace syncer { + +namespace { + +class MockNonBlockingTypeProcessorCore + : public NonBlockingTypeProcessorCoreInterface { + public: + MockNonBlockingTypeProcessorCore(); + virtual ~MockNonBlockingTypeProcessorCore(); + + virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; + + bool is_connected_; + + std::vector<CommitRequestDataList> commit_request_lists_; +}; + +MockNonBlockingTypeProcessorCore::MockNonBlockingTypeProcessorCore() + : is_connected_(false) { +} + +MockNonBlockingTypeProcessorCore::~MockNonBlockingTypeProcessorCore() { +} + +void MockNonBlockingTypeProcessorCore::RequestCommits( + const CommitRequestDataList& list) { + commit_request_lists_.push_back(list); +} + +class MockSyncCoreProxy : public syncer::SyncCoreProxy { + public: + MockSyncCoreProxy(); + virtual ~MockSyncCoreProxy(); + + virtual void ConnectTypeToCore( + syncer::ModelType type, + const DataTypeState& data_type_state, + base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) OVERRIDE; + virtual void Disconnect(syncer::ModelType type) OVERRIDE; + virtual scoped_ptr<SyncCoreProxy> Clone() const OVERRIDE; + + MockNonBlockingTypeProcessorCore* GetMockProcessorCore(); + + private: + explicit MockSyncCoreProxy(MockNonBlockingTypeProcessorCore*); + + // The NonBlockingTypeProcessor's contract expects that it gets to own this + // object, so we can retain only a non-owned pointer to it. + // + // This is very unsafe, but we can get away with it since these tests are not + // exercising the processor <-> processor_core connection code. + MockNonBlockingTypeProcessorCore* mock_core_; +}; + +MockSyncCoreProxy::MockSyncCoreProxy() + : mock_core_(new MockNonBlockingTypeProcessorCore) { +} + +MockSyncCoreProxy::MockSyncCoreProxy(MockNonBlockingTypeProcessorCore* core) + : mock_core_(core) { +} + +MockSyncCoreProxy::~MockSyncCoreProxy() { +} + +void MockSyncCoreProxy::ConnectTypeToCore( + syncer::ModelType type, + const DataTypeState& data_type_state, + base::WeakPtr<syncer::NonBlockingTypeProcessor> type_processor) { + // This class is allowed to participate in only one connection. + DCHECK(!mock_core_->is_connected_); + mock_core_->is_connected_ = true; + + // Hands off ownership of our member to the type_processor, while keeping + // an unsafe pointer to it. This is why we can only connect once. + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core(mock_core_); + + type_processor->OnConnect(core.Pass()); +} + +void MockSyncCoreProxy::Disconnect(syncer::ModelType type) { + // This mock object is not meant for connect and disconnect tests. + NOTREACHED() << "Not implemented"; +} + +scoped_ptr<SyncCoreProxy> MockSyncCoreProxy::Clone() const { + // There's no sensible way to clone this MockSyncCoreProxy. + return scoped_ptr<SyncCoreProxy>(new MockSyncCoreProxy(mock_core_)); +} + +MockNonBlockingTypeProcessorCore* MockSyncCoreProxy::GetMockProcessorCore() { + return mock_core_; +} + +} // namespace + +// Tests the sync engine parts of NonBlockingTypeProcessor. +// +// The NonBlockingTypeProcessor contains a non-trivial amount of code dedicated +// to turning the sync engine on and off again. That code is fairly well +// tested in the NonBlockingDataTypeController unit tests and it doesn't need +// to be re-tested here. +// +// These tests skip past initialization and focus on steady state sync engine +// behvior. This is where we test how the processor responds to the model's +// requests to make changes to its data, the messages incoming fro the sync +// server, and what happens when the two conflict. +// +// Inputs: +// - Initial state from permanent storage. (TODO) +// - Create, update or delete requests from the model. +// - Update responses and commit responses from the server. +// +// Outputs: +// - Writes to permanent storage. (TODO) +// - Callbacks into the model. (TODO) +// - Requests to the sync thread. Tested with MockNonBlockingTypeProcessorCore. +class NonBlockingTypeProcessorTest : public ::testing::Test { + public: + NonBlockingTypeProcessorTest(); + virtual ~NonBlockingTypeProcessorTest(); + + // Explicit initialization step. Kept separate to allow tests to inject + // on-disk state before the test begins. + void Initialize(); + + // Local data modification. Emulates signals from the model thread. + void WriteItem(const std::string& tag, const std::string& value); + void DeleteItem(const std::string& tag); + + // Emulate updates from the server. + // This harness has some functionality to help emulate server behavior. + // See the definitions of these methods for more information. + void UpdateFromServer(int64 version_offset, + const std::string& tag, + const std::string& value); + void TombstoneFromServer(int64 version_offset, const std::string& tag); + + // Read emitted commit requests as batches. + size_t GetNumCommitRequestLists(); + CommitRequestDataList GetNthCommitRequestList(size_t n); + + // Read emitted commit requests by tag, most recent only. + bool HasCommitRequestForTag(const std::string& tag); + CommitRequestData GetLatestCommitRequestForTag(const std::string& tag); + + // Sends the processor a successful commit response. + void SuccessfulCommitResponse(const CommitRequestData& request_data); + + private: + std::string GenerateId(const std::string& tag) const; + std::string GenerateTagHash(const std::string& tag) const; + sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag, + const std::string& value) const; + + int64 GetServerVersion(const std::string& tag); + void SetServerVersion(const std::string& tag, int64 version); + + const ModelType type_; + + scoped_ptr<MockSyncCoreProxy> mock_sync_core_proxy_; + scoped_ptr<NonBlockingTypeProcessor> processor_; + MockNonBlockingTypeProcessorCore* mock_processor_core_; + + DataTypeState data_type_state_; + + std::map<const std::string, int64> server_versions_; +}; + +NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest() + : type_(PREFERENCES), + mock_sync_core_proxy_(new MockSyncCoreProxy()), + processor_(new NonBlockingTypeProcessor(type_)) { +} + +NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() { +} + +void NonBlockingTypeProcessorTest::Initialize() { + processor_->Enable(mock_sync_core_proxy_->Clone()); + mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore(); +} + +void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + processor_->Put(tag, GenerateSpecifics(tag, value)); +} + +void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) { + processor_->Delete(tag); +} + +void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset, + const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + + // Overwrite the existing server version if this is the new highest version. + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + UpdateResponseData data; + data.id = GenerateId(tag_hash); + data.client_tag_hash = tag_hash; + data.response_version = version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(version); + data.non_unique_name = tag; + data.deleted = false; + data.specifics = GenerateSpecifics(tag, value); + + UpdateResponseDataList list; + list.push_back(data); + + processor_->OnUpdateReceived(data_type_state_, list); +} + +void NonBlockingTypeProcessorTest::TombstoneFromServer(int64 version_offset, + const std::string& tag) { + // Overwrite the existing server version if this is the new highest version. + std::string tag_hash = GenerateTagHash(tag); + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + UpdateResponseData data; + data.id = GenerateId(tag_hash); + data.client_tag_hash = tag_hash; + data.response_version = version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(version); + data.non_unique_name = tag; + data.deleted = true; + + UpdateResponseDataList list; + list.push_back(data); + + processor_->OnUpdateReceived(data_type_state_, list); +} + +void NonBlockingTypeProcessorTest::SuccessfulCommitResponse( + const CommitRequestData& request_data) { + const std::string& client_tag_hash = request_data.client_tag_hash; + CommitResponseData response_data; + + if (request_data.base_version == 0) { + // Server assigns new ID to newly committed items. + DCHECK(request_data.id.empty()); + response_data.id = request_data.id; + } else { + // Otherwise we reuse the ID from the request. + response_data.id = GenerateId(client_tag_hash); + } + + response_data.client_tag_hash = client_tag_hash; + response_data.sequence_number = request_data.sequence_number; + + // Increment the server version on successful commit. + int64 version = GetServerVersion(client_tag_hash); + version++; + SetServerVersion(client_tag_hash, version); + + response_data.response_version = version; + + CommitResponseDataList list; + list.push_back(response_data); + + processor_->OnCommitCompletion(data_type_state_, list); +} + +int64 NonBlockingTypeProcessorTest::GetServerVersion(const std::string& tag) { + std::map<const std::string, int64>::const_iterator it; + it = server_versions_.find(tag); + if (it == server_versions_.end()) { + return 0; + } else { + return it->second; + } +} + +void NonBlockingTypeProcessorTest::SetServerVersion(const std::string& tag_hash, + int64 version) { + server_versions_[tag_hash] = version; +} + +std::string NonBlockingTypeProcessorTest::GenerateId( + const std::string& tag) const { + return "FakeId:" + tag; +} + +std::string NonBlockingTypeProcessorTest::GenerateTagHash( + const std::string& tag) const { + return syncable::GenerateSyncableHash(type_, tag); +} + +sync_pb::EntitySpecifics NonBlockingTypeProcessorTest::GenerateSpecifics( + const std::string& tag, + const std::string& value) const { + sync_pb::EntitySpecifics specifics; + specifics.mutable_preference()->set_name(tag); + specifics.mutable_preference()->set_value(value); + return specifics; +} + +size_t NonBlockingTypeProcessorTest::GetNumCommitRequestLists() { + return mock_processor_core_->commit_request_lists_.size(); +} + +CommitRequestDataList NonBlockingTypeProcessorTest::GetNthCommitRequestList( + size_t n) { + DCHECK_LT(n, GetNumCommitRequestLists()); + return mock_processor_core_->commit_request_lists_[n]; +} + +bool NonBlockingTypeProcessorTest::HasCommitRequestForTag( + const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + const std::vector<CommitRequestDataList>& lists = + mock_processor_core_->commit_request_lists_; + + // Iterate backward through the sets of commit requests to find the most + // recent one that applies to the specified tag. + for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it = + lists.rbegin(); + lists_it != lists.rend(); + ++lists_it) { + for (CommitRequestDataList::const_iterator it = lists_it->begin(); + it != lists_it->end(); + ++it) { + if (it->client_tag_hash == tag_hash) { + return true; + } + } + } + + return false; +} + +CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag( + const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + const std::vector<CommitRequestDataList>& lists = + mock_processor_core_->commit_request_lists_; + + // Iterate backward through the sets of commit requests to find the most + // recent one that applies to the specified tag. + for (std::vector<CommitRequestDataList>::const_reverse_iterator lists_it = + lists.rbegin(); + lists_it != lists.rend(); + ++lists_it) { + for (CommitRequestDataList::const_iterator it = lists_it->begin(); + it != lists_it->end(); + ++it) { + if (it->client_tag_hash == tag_hash) { + return *it; + } + } + } + + NOTREACHED() << "Could not find any commits for given tag " << tag << ". " + << "Test should have checked HasCommitRequestForTag() first."; + return CommitRequestData(); +} + +// Creates a new item locally. +// Thoroughly tests the data generated by a local item creation. +TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { + Initialize(); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + WriteItem("tag1", "value1"); + + // Verify the commit request this operation has triggered. + EXPECT_EQ(1U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1"); + + EXPECT_TRUE(tag1_data.id.empty()); + EXPECT_EQ(0, tag1_data.base_version); + EXPECT_FALSE(tag1_data.ctime.is_null()); + EXPECT_FALSE(tag1_data.mtime.is_null()); + EXPECT_EQ("tag1", tag1_data.non_unique_name); + EXPECT_FALSE(tag1_data.deleted); + EXPECT_EQ("tag1", tag1_data.specifics.preference().name()); + EXPECT_EQ("value1", tag1_data.specifics.preference().value()); +} + +// Creates a new local item then modifies it. +// Thoroughly tests data generated by modification of server-unknown item. +TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { + Initialize(); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + WriteItem("tag1", "value1"); + EXPECT_EQ(1U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1"); + + WriteItem("tag1", "value2"); + EXPECT_EQ(2U, GetNumCommitRequestLists()); + + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1"); + + // Test some of the relations between old and new commit requests. + EXPECT_EQ(tag1_v1_data.specifics.preference().value(), "value1"); + EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number); + + // Perform a thorough examination of the update-generated request. + EXPECT_TRUE(tag1_v2_data.id.empty()); + EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_FALSE(tag1_v2_data.ctime.is_null()); + EXPECT_FALSE(tag1_v2_data.mtime.is_null()); + EXPECT_EQ("tag1", tag1_v2_data.non_unique_name); + EXPECT_FALSE(tag1_v2_data.deleted); + EXPECT_EQ("tag1", tag1_v2_data.specifics.preference().name()); + EXPECT_EQ("value2", tag1_v2_data.specifics.preference().value()); +} + +// Deletes an item we've never seen before. +// Should have no effect and not crash. +TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) { + Initialize(); + + DeleteItem("tag1"); + EXPECT_EQ(0U, GetNumCommitRequestLists()); +} + +// Creates an item locally then deletes it. +// +// In this test, no commit responses are received, so the deleted item is +// server-unknown as far as the model thread is concerned. That behavior +// is race-dependent; other tests are used to test other races. +TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { + Initialize(); + + WriteItem("tag1", "value1"); + EXPECT_EQ(1U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1"); + + DeleteItem("tag1"); + EXPECT_EQ(2U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_v2_data = GetLatestCommitRequestForTag("tag1"); + + EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number); + + EXPECT_TRUE(tag1_v2_data.id.empty()); + EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_TRUE(tag1_v2_data.deleted); +} + +// Creates an item locally then deletes it. +// +// The item is created locally then enqueued for commit. The sync thread +// successfully commits it, but, before the commit response is picked up +// by the model thread, the item is deleted by the model thread. +TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { + Initialize(); + + WriteItem("tag1", "value1"); + EXPECT_EQ(1U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + const CommitRequestData& tag1_v1_data = GetLatestCommitRequestForTag("tag1"); + + DeleteItem("tag1"); + EXPECT_EQ(2U, GetNumCommitRequestLists()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + + // This commit happened while the deletion was in progress, but the commit + // response didn't arrive on our thread until after the delete was issued to + // the sync thread. It will update some metadata, but won't do much else. + SuccessfulCommitResponse(tag1_v1_data); + + // TODO(rlarocque): Verify the state of the item is correct once we get + // storage hooked up in these tests. For example, verify the item is still + // marked as deleted. +} + +// Creates two different sync items. +// Verifies that the second has no effect on the first. +TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) { + Initialize(); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + WriteItem("tag1", "value1"); + + // There should be one commit request for this item only. + ASSERT_EQ(1U, GetNumCommitRequestLists()); + EXPECT_EQ(1U, GetNthCommitRequestList(0).size()); + ASSERT_TRUE(HasCommitRequestForTag("tag1")); + + WriteItem("tag2", "value2"); + + // The second write should trigger another single-item commit request. + ASSERT_EQ(2U, GetNumCommitRequestLists()); + EXPECT_EQ(1U, GetNthCommitRequestList(1).size()); + ASSERT_TRUE(HasCommitRequestForTag("tag2")); +} + +// TODO(rlarocque): Add more testing of non_unique_name fields. + +} // namespace syncer diff --git a/sync/internal_api/public/sync_core_proxy.h b/sync/internal_api/public/sync_core_proxy.h index 98f481d..099b5f9 100644 --- a/sync/internal_api/public/sync_core_proxy.h +++ b/sync/internal_api/public/sync_core_proxy.h @@ -11,6 +11,7 @@ namespace syncer { class NonBlockingTypeProcessor; +struct DataTypeState; // Interface for the datatype integration logic from non-sync threads. // @@ -25,6 +26,7 @@ class SYNC_EXPORT_PRIVATE SyncCoreProxy { // Must be called from the thread where the data type lives. virtual void ConnectTypeToCore( syncer::ModelType type, + const DataTypeState& data_type_state, base::WeakPtr<NonBlockingTypeProcessor> type_processor) = 0; // Tells the syncer that we're no longer interested in syncing this type. diff --git a/sync/internal_api/public/test/null_sync_core_proxy.h b/sync/internal_api/public/test/null_sync_core_proxy.h index d957aec5..4c8fc67 100644 --- a/sync/internal_api/public/test/null_sync_core_proxy.h +++ b/sync/internal_api/public/test/null_sync_core_proxy.h @@ -22,6 +22,7 @@ class NullSyncCoreProxy : public SyncCoreProxy { virtual void ConnectTypeToCore( syncer::ModelType type, + const DataTypeState& data_type_state, base::WeakPtr<NonBlockingTypeProcessor> processor) OVERRIDE; virtual void Disconnect(syncer::ModelType type) OVERRIDE; virtual scoped_ptr<SyncCoreProxy> Clone() const OVERRIDE; diff --git a/sync/internal_api/sync_core.cc b/sync/internal_api/sync_core.cc index dbeb76d..6707176 100644 --- a/sync/internal_api/sync_core.cc +++ b/sync/internal_api/sync_core.cc @@ -16,13 +16,14 @@ SyncCore::~SyncCore() {} void SyncCore::ConnectSyncTypeToCore( ModelType type, + const DataTypeState& data_type_state, scoped_refptr<base::SequencedTaskRunner> task_runner, base::WeakPtr<NonBlockingTypeProcessor> processor) { - // Initialize the processor's sync-thread sibling and the // processor <-> processor_core (ie. model thread <-> sync thread) // communication channel. - model_type_registry_->InitializeNonBlockingType(type, task_runner, processor); + model_type_registry_->InitializeNonBlockingType( + type, data_type_state, task_runner, processor); } void SyncCore::Disconnect(ModelType type) { diff --git a/sync/internal_api/sync_core.h b/sync/internal_api/sync_core.h index 6f63070..417a296 100644 --- a/sync/internal_api/sync_core.h +++ b/sync/internal_api/sync_core.h @@ -15,6 +15,7 @@ namespace syncer { class ModelTypeRegistry; class NonBlockingTypeProcessor; +struct DataTypeState; // An interface of the core parts of sync. // @@ -32,9 +33,10 @@ class SYNC_EXPORT_PRIVATE SyncCore { // Initializes the connection between the sync core and its delegate on the // sync client's thread. void ConnectSyncTypeToCore( - syncer::ModelType type, - scoped_refptr<base::SequencedTaskRunner> datatype_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> sync_client); + syncer::ModelType type, + const DataTypeState& data_type_state, + scoped_refptr<base::SequencedTaskRunner> datatype_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> sync_client); // Disconnects the syncer from the model and stops syncing the type. // diff --git a/sync/internal_api/sync_core_proxy_impl.cc b/sync/internal_api/sync_core_proxy_impl.cc index 7e4ecce..81c0fa2 100644 --- a/sync/internal_api/sync_core_proxy_impl.cc +++ b/sync/internal_api/sync_core_proxy_impl.cc @@ -7,6 +7,7 @@ #include "base/bind.h" #include "base/location.h" #include "base/message_loop/message_loop_proxy.h" +#include "sync/engine/non_blocking_sync_common.h" #include "sync/internal_api/sync_core.h" namespace syncer { @@ -21,15 +22,16 @@ SyncCoreProxyImpl::~SyncCoreProxyImpl() {} void SyncCoreProxyImpl::ConnectTypeToCore( ModelType type, + const DataTypeState& data_type_state, base::WeakPtr<NonBlockingTypeProcessor> type_processor) { VLOG(1) << "ConnectTypeToCore: " << ModelTypeToString(type); - sync_task_runner_->PostTask( - FROM_HERE, - base::Bind(&SyncCore::ConnectSyncTypeToCore, - sync_core_, - type, - base::MessageLoopProxy::current(), - type_processor)); + sync_task_runner_->PostTask(FROM_HERE, + base::Bind(&SyncCore::ConnectSyncTypeToCore, + sync_core_, + type, + data_type_state, + base::MessageLoopProxy::current(), + type_processor)); } void SyncCoreProxyImpl::Disconnect(ModelType type) { diff --git a/sync/internal_api/sync_core_proxy_impl.h b/sync/internal_api/sync_core_proxy_impl.h index 3622d2b..91061fe 100644 --- a/sync/internal_api/sync_core_proxy_impl.h +++ b/sync/internal_api/sync_core_proxy_impl.h @@ -16,6 +16,7 @@ namespace syncer { class SyncCore; class NonBlockingTypeProcessor; +struct DataTypeState; // Encapsulates a reference to the sync core and the thread it's running on. // Used by sync's data types to connect with the sync core. @@ -38,6 +39,7 @@ class SYNC_EXPORT_PRIVATE SyncCoreProxyImpl : public SyncCoreProxy { // Must be called from the thread where the data type lives. virtual void ConnectTypeToCore( syncer::ModelType type, + const DataTypeState& data_type_state, base::WeakPtr<NonBlockingTypeProcessor> type_processor) OVERRIDE; // Disables syncing for the given type on the sync thread. diff --git a/sync/internal_api/test/null_sync_core_proxy.cc b/sync/internal_api/test/null_sync_core_proxy.cc index e0c93f6..14d9d65 100644 --- a/sync/internal_api/test/null_sync_core_proxy.cc +++ b/sync/internal_api/test/null_sync_core_proxy.cc @@ -12,7 +12,8 @@ NullSyncCoreProxy::~NullSyncCoreProxy() {} void NullSyncCoreProxy::ConnectTypeToCore( syncer::ModelType type, - base::WeakPtr<NonBlockingTypeProcessor> processor) { + const DataTypeState& data_type_state, + base::WeakPtr<NonBlockingTypeProcessor> processor) { NOTREACHED() << "NullSyncCoreProxy is not meant to be used"; } diff --git a/sync/sessions/model_type_registry.cc b/sync/sessions/model_type_registry.cc index 9cc6869..7ca68c7 100644 --- a/sync/sessions/model_type_registry.cc +++ b/sync/sessions/model_type_registry.cc @@ -9,12 +9,49 @@ #include "base/observer_list.h" #include "sync/engine/directory_commit_contributor.h" #include "sync/engine/directory_update_handler.h" +#include "sync/engine/non_blocking_sync_common.h" #include "sync/engine/non_blocking_type_processor.h" #include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/engine/non_blocking_type_processor_core_interface.h" #include "sync/sessions/directory_type_debug_info_emitter.h" namespace syncer { +namespace { + +class NonBlockingTypeProcessorCoreWrapper + : public NonBlockingTypeProcessorCoreInterface { + public: + NonBlockingTypeProcessorCoreWrapper( + base::WeakPtr<NonBlockingTypeProcessorCore> core, + scoped_refptr<base::SequencedTaskRunner> sync_thread); + virtual ~NonBlockingTypeProcessorCoreWrapper(); + + virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; + + private: + base::WeakPtr<NonBlockingTypeProcessorCore> core_; + scoped_refptr<base::SequencedTaskRunner> sync_thread_; +}; + +NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper( + base::WeakPtr<NonBlockingTypeProcessorCore> core, + scoped_refptr<base::SequencedTaskRunner> sync_thread) + : core_(core), sync_thread_(sync_thread) { +} + +NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() { +} + +void NonBlockingTypeProcessorCoreWrapper::RequestCommits( + const CommitRequestDataList& list) { + sync_thread_->PostTask( + FROM_HERE, + base::Bind(&NonBlockingTypeProcessorCore::RequestCommits, core_, list)); +} + +} // namespace + ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} ModelTypeRegistry::ModelTypeRegistry( @@ -96,6 +133,7 @@ void ModelTypeRegistry::SetEnabledDirectoryTypes( void ModelTypeRegistry::InitializeNonBlockingType( ModelType type, + const DataTypeState& data_type_state, scoped_refptr<base::SequencedTaskRunner> type_task_runner, base::WeakPtr<NonBlockingTypeProcessor> processor) { DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); @@ -104,14 +142,18 @@ void ModelTypeRegistry::InitializeNonBlockingType( scoped_ptr<NonBlockingTypeProcessorCore> core( new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); + // TODO(rlarocque): DataTypeState should be forwarded to core here. + // Initialize Processor -> CoreProcessor communication channel. - type_task_runner->PostTask( - FROM_HERE, - base::Bind(&NonBlockingTypeProcessor::OnConnect, - processor, - core->AsWeakPtr(), - scoped_refptr<base::SequencedTaskRunner>( - base::MessageLoopProxy::current()))); + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( + new NonBlockingTypeProcessorCoreWrapper( + core->AsWeakPtr(), + scoped_refptr<base::SequencedTaskRunner>( + base::MessageLoopProxy::current()))); + type_task_runner->PostTask(FROM_HERE, + base::Bind(&NonBlockingTypeProcessor::OnConnect, + processor, + base::Passed(&core_interface))); DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); diff --git a/sync/sessions/model_type_registry.h b/sync/sessions/model_type_registry.h index 20d6d20..6d05dfc 100644 --- a/sync/sessions/model_type_registry.h +++ b/sync/sessions/model_type_registry.h @@ -28,6 +28,7 @@ class DirectoryTypeDebugInfoEmitter; class NonBlockingTypeProcessorCore; class NonBlockingTypeProcessor; class UpdateHandler; +struct DataTypeState; typedef std::map<ModelType, UpdateHandler*> UpdateHandlerMap; typedef std::map<ModelType, CommitContributor*> CommitContributorMap; @@ -56,6 +57,7 @@ class SYNC_EXPORT_PRIVATE ModelTypeRegistry { // Expects that the processor's ModelType is not currently enabled. void InitializeNonBlockingType( syncer::ModelType type, + const DataTypeState& data_type_state, scoped_refptr<base::SequencedTaskRunner> type_task_runner, base::WeakPtr<NonBlockingTypeProcessor> processor); diff --git a/sync/sessions/model_type_registry_unittest.cc b/sync/sessions/model_type_registry_unittest.cc index 6b344eb..2d42215 100644 --- a/sync/sessions/model_type_registry_unittest.cc +++ b/sync/sessions/model_type_registry_unittest.cc @@ -23,6 +23,14 @@ class ModelTypeRegistryTest : public ::testing::Test { ModelTypeRegistry* registry(); + static DataTypeState MakeInitialDataTypeState(ModelType type) { + DataTypeState state; + state.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(type)); + state.next_client_id = 0; + return state; + } + private: syncable::Directory* directory(); @@ -139,13 +147,17 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); - registry()->InitializeNonBlockingType( - syncer::THEMES, task_runner, themes_processor.AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::THEMES, + MakeInitialDataTypeState(THEMES), + task_runner, + themes_processor.AsWeakPtrForUI()); EXPECT_TRUE(registry()->GetEnabledTypes().Equals( ModelTypeSet(syncer::THEMES))); - registry()->InitializeNonBlockingType( - syncer::SESSIONS, task_runner, sessions_processor.AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::SESSIONS, + MakeInitialDataTypeState(SESSIONS), + task_runner, + sessions_processor.AsWeakPtrForUI()); EXPECT_TRUE(registry()->GetEnabledTypes().Equals( ModelTypeSet(syncer::THEMES, syncer::SESSIONS))); @@ -172,8 +184,10 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); // Add the themes non-blocking type. - registry()->InitializeNonBlockingType( - syncer::THEMES, task_runner, themes_processor.AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::THEMES, + MakeInitialDataTypeState(THEMES), + task_runner, + themes_processor.AsWeakPtrForUI()); current_types.Put(syncer::THEMES); EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); @@ -183,8 +197,10 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); // Add sessions non-blocking type. - registry()->InitializeNonBlockingType( - syncer::SESSIONS, task_runner, sessions_processor.AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::SESSIONS, + MakeInitialDataTypeState(SESSIONS), + task_runner, + sessions_processor.AsWeakPtrForUI()); current_types.Put(syncer::SESSIONS); EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); @@ -210,10 +226,14 @@ TEST_F(ModelTypeRegistryTest, DeletionOrdering) { EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); - registry()->InitializeNonBlockingType( - syncer::THEMES, task_runner, themes_processor->AsWeakPtrForUI()); - registry()->InitializeNonBlockingType( - syncer::SESSIONS, task_runner, sessions_processor->AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::THEMES, + MakeInitialDataTypeState(THEMES), + task_runner, + themes_processor->AsWeakPtrForUI()); + registry()->InitializeNonBlockingType(syncer::SESSIONS, + MakeInitialDataTypeState(SESSIONS), + task_runner, + sessions_processor->AsWeakPtrForUI()); EXPECT_TRUE(registry()->GetEnabledTypes().Equals( ModelTypeSet(syncer::THEMES, syncer::SESSIONS))); diff --git a/sync/sync_core.gypi b/sync/sync_core.gypi index 074ddd4..bec8952 100644 --- a/sync/sync_core.gypi +++ b/sync/sync_core.gypi @@ -60,14 +60,20 @@ 'engine/get_updates_delegate.h', 'engine/get_updates_processor.cc', 'engine/get_updates_processor.h', + 'engine/model_thread_sync_entity.cc', + 'engine/model_thread_sync_entity.h', 'engine/net/server_connection_manager.cc', 'engine/net/server_connection_manager.h', 'engine/net/url_translator.cc', 'engine/net/url_translator.h', + 'engine/non_blocking_sync_common.cc', + 'engine/non_blocking_sync_common.h', 'engine/non_blocking_type_processor.cc', 'engine/non_blocking_type_processor.h', 'engine/non_blocking_type_processor_core.cc', 'engine/non_blocking_type_processor_core.h', + 'engine/non_blocking_type_processor_core_interface.cc', + 'engine/non_blocking_type_processor_core_interface.h', 'engine/nudge_source.cc', 'engine/nudge_source.h', 'engine/process_updates_util.cc', diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi index af63ad9..0948f00b 100644 --- a/sync/sync_tests.gypi +++ b/sync/sync_tests.gypi @@ -299,6 +299,8 @@ 'engine/directory_commit_contribution_unittest.cc', 'engine/directory_update_handler_unittest.cc', 'engine/get_updates_processor_unittest.cc', + 'engine/model_thread_sync_entity_unittest.cc', + 'engine/non_blocking_type_processor_unittest.cc', 'engine/sync_scheduler_unittest.cc', 'engine/syncer_proto_util_unittest.cc', 'engine/syncer_unittest.cc', |