diff options
Diffstat (limited to 'sync/engine')
-rw-r--r-- | sync/engine/model_thread_sync_entity.cc | 2 | ||||
-rw-r--r-- | sync/engine/non_blocking_sync_common.cc | 15 | ||||
-rw-r--r-- | sync/engine/non_blocking_sync_common.h | 7 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_commit_contribution.cc | 119 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_commit_contribution.h | 66 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor.cc | 11 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_core.cc | 242 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_core.h | 59 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_core_unittest.cc | 961 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_interface.cc | 15 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_interface.h | 31 | ||||
-rw-r--r-- | sync/engine/non_blocking_type_processor_unittest.cc | 63 | ||||
-rw-r--r-- | sync/engine/sync_thread_sync_entity.cc | 242 | ||||
-rw-r--r-- | sync/engine/sync_thread_sync_entity.h | 155 | ||||
-rw-r--r-- | sync/engine/sync_thread_sync_entity_unittest.cc | 164 |
15 files changed, 2099 insertions, 53 deletions
diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc index fde2eb3..6e466f5 100644 --- a/sync/engine/model_thread_sync_entity.cc +++ b/sync/engine/model_thread_sync_entity.cc @@ -15,7 +15,7 @@ scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem( 1, 0, 0, - 0, + kUncommittedVersion, true, std::string(), // Sync thread will assign the initial ID. syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics), diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc index c5a3656..0042990 100644 --- a/sync/engine/non_blocking_sync_common.cc +++ b/sync/engine/non_blocking_sync_common.cc @@ -6,25 +6,32 @@ namespace syncer { -DataTypeState::DataTypeState() { +DataTypeState::DataTypeState() : next_client_id(0), initial_sync_done(false) { } DataTypeState::~DataTypeState() { } -CommitRequestData::CommitRequestData() { +CommitRequestData::CommitRequestData() + : sequence_number(0), + base_version(0), + deleted(false) { } CommitRequestData::~CommitRequestData() { } -CommitResponseData::CommitResponseData() { +CommitResponseData::CommitResponseData() + : sequence_number(0), + response_version(0) { } CommitResponseData::~CommitResponseData() { } -UpdateResponseData::UpdateResponseData() { +UpdateResponseData::UpdateResponseData() + : response_version(0), + deleted(false) { } UpdateResponseData::~UpdateResponseData() { diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h index f07fdb2..d0e0f9d 100644 --- a/sync/engine/non_blocking_sync_common.h +++ b/sync/engine/non_blocking_sync_common.h @@ -14,6 +14,8 @@ namespace syncer { +static const int64 kUncommittedVersion = -1; + // Data-type global state that must be accessed and updated on the sync thread, // but persisted on or through the model thread. struct SYNC_EXPORT_PRIVATE DataTypeState { @@ -43,6 +45,11 @@ struct SYNC_EXPORT_PRIVATE DataTypeState { // client-tagged data types supported by non-blocking sync, but we will // continue to emulate the directory sync's behavior for now. int64 next_client_id; + + // This flag is set to true when the first download cycle is complete. The + // NonBlockingTypeProcessor should not attempt to commit any items until this + // flag is set. + bool initial_sync_done; }; struct SYNC_EXPORT_PRIVATE CommitRequestData { diff --git a/sync/engine/non_blocking_type_commit_contribution.cc b/sync/engine/non_blocking_type_commit_contribution.cc new file mode 100644 index 0000000..9e48ad6 --- /dev/null +++ b/sync/engine/non_blocking_type_commit_contribution.cc @@ -0,0 +1,119 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_commit_contribution.h" + +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/protocol/proto_value_conversions.h" + +namespace syncer { + +NonBlockingTypeCommitContribution::NonBlockingTypeCommitContribution( + const sync_pb::DataTypeContext& context, + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities, + const std::vector<int64>& sequence_numbers, + NonBlockingTypeProcessorCore* processor_core) + : processor_core_(processor_core), + context_(context), + entities_(entities), + sequence_numbers_(sequence_numbers), + cleaned_up_(false) { +} + +NonBlockingTypeCommitContribution::~NonBlockingTypeCommitContribution() { + DCHECK(cleaned_up_); +} + +void NonBlockingTypeCommitContribution::AddToCommitMessage( + sync_pb::ClientToServerMessage* msg) { + sync_pb::CommitMessage* commit_message = msg->mutable_commit(); + entries_start_index_ = commit_message->entries_size(); + + std::copy(entities_.begin(), + entities_.end(), + RepeatedPtrFieldBackInserter(commit_message->mutable_entries())); + if (!context_.context().empty()) + commit_message->add_client_contexts()->CopyFrom(context_); +} + +SyncerError NonBlockingTypeCommitContribution::ProcessCommitResponse( + const sync_pb::ClientToServerResponse& response, + sessions::StatusController* status) { + const sync_pb::CommitResponse& commit_response = response.commit(); + + bool transient_error = false; + bool commit_conflict = false; + bool unknown_error = false; + + CommitResponseDataList response_list; + + for (size_t i = 0; i < sequence_numbers_.size(); ++i) { + const sync_pb::CommitResponse_EntryResponse& entry_response = + commit_response.entryresponse(entries_start_index_ + i); + + switch (entry_response.response_type()) { + case sync_pb::CommitResponse::INVALID_MESSAGE: + LOG(ERROR) << "Server reports commit message is invalid."; + DLOG(ERROR) << "Message was: " << SyncEntityToValue(entities_.Get(i), + false); + unknown_error = true; + break; + case sync_pb::CommitResponse::CONFLICT: + DVLOG(1) << "Server reports conflict for commit message."; + DVLOG(1) << "Message was: " << SyncEntityToValue(entities_.Get(i), + false); + commit_conflict = true; + break; + case sync_pb::CommitResponse::SUCCESS: { + CommitResponseData response_data; + response_data.id = entry_response.id_string(); + response_data.client_tag_hash = + entities_.Get(i).client_defined_unique_tag(); + response_data.sequence_number = sequence_numbers_[i]; + response_data.response_version = entry_response.version(); + response_list.push_back(response_data); + break; + } + case sync_pb::CommitResponse::OVER_QUOTA: + case sync_pb::CommitResponse::RETRY: + case sync_pb::CommitResponse::TRANSIENT_ERROR: + DLOG(WARNING) << "Entity commit blocked by transient error."; + transient_error = true; + break; + default: + LOG(ERROR) << "Bad return from ProcessSingleCommitResponse."; + unknown_error = true; + } + } + + // Send whatever successful responses we did get back to our parent. + // It's the schedulers job to handle the failures. + processor_core_->OnCommitResponse(response_list); + + // Let the scheduler know about the failures. + if (unknown_error) { + return SERVER_RETURN_UNKNOWN_ERROR; + } else if (transient_error) { + return SERVER_RETURN_TRANSIENT_ERROR; + } else if (commit_conflict) { + return SERVER_RETURN_CONFLICT; + } else { + return SYNCER_OK; + } +} + +void NonBlockingTypeCommitContribution::CleanUp() { + cleaned_up_ = true; + + // We could inform our parent NonBlockingCommitContributor that a commit is + // no longer in progress. The current implementation doesn't really care + // either way, so we don't bother sending the signal. +} + +size_t NonBlockingTypeCommitContribution::GetNumEntries() const { + return sequence_numbers_.size(); +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_commit_contribution.h b/sync/engine/non_blocking_type_commit_contribution.h new file mode 100644 index 0000000..4d415ca --- /dev/null +++ b/sync/engine/non_blocking_type_commit_contribution.h @@ -0,0 +1,66 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ + +#include <vector> + +#include "base/basictypes.h" +#include "sync/engine/commit_contribution.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +class NonBlockingTypeProcessorCore; + +// A non-blocking sync type's contribution to an outgoing commit message. +// +// Helps build a commit message and process its response. It collaborates +// closely with the NonBlockingTypeProcessorCore. +class NonBlockingTypeCommitContribution : public CommitContribution { + public: + NonBlockingTypeCommitContribution( + const sync_pb::DataTypeContext& context, + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities, + const std::vector<int64>& sequence_numbers, + NonBlockingTypeProcessorCore* processor_core); + virtual ~NonBlockingTypeCommitContribution(); + + // Implementation of CommitContribution + virtual void AddToCommitMessage(sync_pb::ClientToServerMessage* msg) OVERRIDE; + virtual SyncerError ProcessCommitResponse( + const sync_pb::ClientToServerResponse& response, + sessions::StatusController* status) OVERRIDE; + virtual void CleanUp() OVERRIDE; + virtual size_t GetNumEntries() const OVERRIDE; + + private: + // A non-owned pointer back to the object that created this contribution. + NonBlockingTypeProcessorCore* const processor_core_; + + // The type-global context information. + const sync_pb::DataTypeContext context_; + + // The set of entities to be committed, serialized as SyncEntities. + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> entities_; + + // The sequence numbers associated with the pending commits. These match up + // with the entities_ vector. + const std::vector<int64> sequence_numbers_; + + // The index in the commit message where this contribution's entities are + // added. Used to correlate per-item requests with per-item responses. + size_t entries_start_index_; + + // A flag used to ensure this object's contract is respected. Helps to check + // that CleanUp() is called before the object is destructed. + bool cleaned_up_; + + DISALLOW_COPY_AND_ASSIGN(NonBlockingTypeCommitContribution); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc index 0aaead4..c1982b3 100644 --- a/sync/engine/non_blocking_type_processor.cc +++ b/sync/engine/non_blocking_type_processor.cc @@ -51,7 +51,6 @@ void NonBlockingTypeProcessor::Enable( // TODO(rlarocque): At some point, this should be loaded from storage. data_type_state_.progress_marker.set_data_type_id( GetSpecificsFieldNumberFromModelType(type_)); - data_type_state_.next_client_id = 0; sync_core_proxy_ = sync_core_proxy.Pass(); sync_core_proxy_->ConnectTypeToCore(GetModelType(), @@ -142,6 +141,10 @@ void NonBlockingTypeProcessor::FlushPendingCommitRequests() { if (!IsConnected()) return; + // Don't send anything if the type is not ready to handle commits. + if (!data_type_state_.initial_sync_done) + return; + // TODO(rlarocque): Do something smarter than iterate here. for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); ++it) { @@ -184,6 +187,9 @@ void NonBlockingTypeProcessor::OnCommitCompletion( void NonBlockingTypeProcessor::OnUpdateReceived( const DataTypeState& data_type_state, const UpdateResponseDataList& response_list) { + bool initial_sync_just_finished = + !data_type_state_.initial_sync_done && data_type_state.initial_sync_done; + data_type_state_ = data_type_state; for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); @@ -215,6 +221,9 @@ void NonBlockingTypeProcessor::OnUpdateReceived( } } + if (initial_sync_just_finished) + FlushPendingCommitRequests(); + // TODO: Inform the model of the new or updated data. } diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc index 236c9c5..a14a8d0 100644 --- a/sync/engine/non_blocking_type_processor_core.cc +++ b/sync/engine/non_blocking_type_processor_core.cc @@ -4,20 +4,28 @@ #include "sync/engine/non_blocking_type_processor_core.h" +#include "base/bind.h" +#include "base/format_macros.h" #include "base/logging.h" +#include "base/strings/stringprintf.h" #include "sync/engine/commit_contribution.h" +#include "sync/engine/non_blocking_type_commit_contribution.h" +#include "sync/engine/non_blocking_type_processor_interface.h" +#include "sync/engine/sync_thread_sync_entity.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" namespace syncer { NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( - ModelType type, - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> processor) + ModelType type, + const DataTypeState& initial_state, + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) : type_(type), - processor_task_runner_(processor_task_runner), - processor_(processor), + data_type_state_(initial_state), + processor_interface_(processor_interface.Pass()), + entities_deleter_(&entities_), weak_ptr_factory_(this) { - progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); } NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { @@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const { void NonBlockingTypeProcessorCore::GetDownloadProgress( sync_pb::DataTypeProgressMarker* progress_marker) const { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_); - *progress_marker = progress_marker_; + progress_marker->CopyFrom(data_type_state_.progress_marker); } void NonBlockingTypeProcessorCore::GetDataTypeContext( sync_pb::DataTypeContext* context) const { - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting context for: " << ModelTypeToString(type_); - context->Clear(); + DCHECK(CalledOnValidThread()); + context->CopyFrom(data_type_state_.type_context); } SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( @@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( const SyncEntityList& applicable_updates, sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_); - progress_marker_ = progress_marker; + + // TODO(rlarocque): Handle data type context conflicts. + data_type_state_.type_context = mutated_context; + data_type_state_.progress_marker = progress_marker; + + UpdateResponseDataList response_datas; + + for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); + update_it != applicable_updates.end(); + ++update_it) { + const sync_pb::SyncEntity* update_entity = *update_it; + if (!update_entity->server_defined_unique_tag().empty()) { + // We can't commit an item unless we know its parent ID. This is where + // we learn that ID and remember it forever. + DCHECK_EQ(ModelTypeToRootTag(type_), + update_entity->server_defined_unique_tag()); + if (!data_type_state_.type_root_id.empty()) { + DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); + } + data_type_state_.type_root_id = update_entity->id_string(); + } else { + // Normal updates are handled here. + const std::string& client_tag_hash = + update_entity->client_defined_unique_tag(); + DCHECK(!client_tag_hash.empty()); + EntityMap::const_iterator map_it = entities_.find(client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), + client_tag_hash, + update_entity->version()); + entities_.insert(std::make_pair(client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveUpdate(update_entity->version()); + } + + // Prepare the message for the model thread. + UpdateResponseData response_data; + response_data.id = update_entity->id_string(); + response_data.client_tag_hash = client_tag_hash; + response_data.response_version = update_entity->version(); + response_data.ctime = ProtoTimeToTime(update_entity->ctime()); + response_data.mtime = ProtoTimeToTime(update_entity->mtime()); + response_data.non_unique_name = update_entity->name(); + response_data.deleted = update_entity->deleted(); + response_data.specifics = update_entity->specifics(); + + response_datas.push_back(response_data); + } + } + + // Forward these updates to the model thread so it can do the rest. + processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); + return SYNCER_OK; } void NonBlockingTypeProcessorCore::ApplyUpdates( sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); -} + // This function is called only when we've finished a download cycle, ie. we + // got a response with changes_remaining == 0. If this is our first download + // cycle, we should update our state so the NonBlockingTypeProcessor knows + // that it's safe to commit items now. + if (!data_type_state_.initial_sync_done) { + data_type_state_.initial_sync_done = true; -void NonBlockingTypeProcessorCore::RequestCommits( - const CommitRequestDataList& request_list) { - // TODO(rlarocque): Implement this. crbug.com/351005. + UpdateResponseDataList empty_update_list; + processor_interface_->ReceiveUpdateResponse(data_type_state_, + empty_update_list); + } } void NonBlockingTypeProcessorCore::PassiveApplyUpdates( @@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates( << "ModelType is: " << ModelTypeToString(type_); } +void NonBlockingTypeProcessorCore::EnqueueForCommit( + const CommitRequestDataList& list) { + DCHECK(CalledOnValidThread()); + + DCHECK(CanCommitItems()) + << "Asked to commit items before type was initialized. " + << "ModelType is: " << ModelTypeToString(type_); + + for (CommitRequestDataList::const_iterator it = list.begin(); + it != list.end(); + ++it) { + StorePendingCommit(*it); + } +} + // CommitContributor implementation. scoped_ptr<CommitContribution> NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_); - return scoped_ptr<CommitContribution>(); + + size_t space_remaining = max_entries; + std::vector<int64> sequence_numbers; + google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; + + if (!CanCommitItems()) + return scoped_ptr<CommitContribution>(); + + // TODO(rlarocque): Avoid iterating here. + for (EntityMap::const_iterator it = entities_.begin(); + it != entities_.end() && space_remaining > 0; + ++it) { + SyncThreadSyncEntity* entity = it->second; + if (entity->IsCommitPending()) { + sync_pb::SyncEntity* commit_entity = commit_entities.Add(); + int64 sequence_number = -1; + + entity->PrepareCommitProto(commit_entity, &sequence_number); + HelpInitializeCommitEntity(commit_entity); + sequence_numbers.push_back(sequence_number); + + space_remaining--; + } + } + + if (commit_entities.size() == 0) + return scoped_ptr<CommitContribution>(); + + return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( + data_type_state_.type_context, commit_entities, sequence_numbers, this)); +} + +void NonBlockingTypeProcessorCore::StorePendingCommit( + const CommitRequestData& request) { + if (!request.deleted) { + DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); + } + + EntityMap::iterator map_it = entities_.find(request.client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromCommitRequest(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + entities_.insert(std::make_pair(request.client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->RequestCommit(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + } + + // TODO: Nudge SyncScheduler. +} + +void NonBlockingTypeProcessorCore::OnCommitResponse( + const CommitResponseDataList& response_list) { + for (CommitResponseDataList::const_iterator response_it = + response_list.begin(); + response_it != response_list.end(); + ++response_it) { + const std::string client_tag_hash = response_it->client_tag_hash; + EntityMap::iterator map_it = entities_.find(client_tag_hash); + + // There's no way we could have committed an entry we know nothing about. + if (map_it == entities_.end()) { + NOTREACHED() << "Received commit response for item unknown to us." + << " Model type: " << ModelTypeToString(type_) + << " ID: " << response_it->id; + continue; + } + + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveCommitResponse(response_it->id, + response_it->response_version, + response_it->sequence_number); + } + + // Send the responses back to the model thread. It needs to know which + // items have been successfully committed so it can save that information in + // permanent storage. + processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); } base::WeakPtr<NonBlockingTypeProcessorCore> @@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } +bool NonBlockingTypeProcessorCore::CanCommitItems() const { + // We can't commit anything until we know the type's parent node. + // We'll get it in the first update response. + return !data_type_state_.type_root_id.empty() && + data_type_state_.initial_sync_done; +} + +void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( + sync_pb::SyncEntity* sync_entity) { + // Initial commits need our help to generate a client ID. + if (!sync_entity->has_id_string()) { + DCHECK_EQ(kUncommittedVersion, sync_entity->version()); + const int64 id = data_type_state_.next_client_id++; + sync_entity->set_id_string( + base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); + } + + // Always include enough specifics to identify the type. Do this even in + // deletion requests, where the specifics are otherwise invalid. + if (!sync_entity->has_specifics()) { + AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); + } + + // We're always responsible for the parent ID. + sync_entity->set_parent_id_string(data_type_state_.type_root_id); +} + } // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h index e226349..93d4215 100644 --- a/sync/engine/non_blocking_type_processor_core.h +++ b/sync/engine/non_blocking_type_processor_core.h @@ -6,6 +6,7 @@ #define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_CORE_H_ #include "base/memory/weak_ptr.h" +#include "base/stl_util.h" #include "base/threading/non_thread_safe.h" #include "sync/base/sync_export.h" #include "sync/engine/commit_contributor.h" @@ -20,7 +21,8 @@ class SingleThreadTaskRunner; namespace syncer { -class NonBlockingTypeProcessor; +class NonBlockingTypeProcessorInterface; +class SyncThreadSyncEntity; // A smart cache for sync types that use message passing (rather than // transactions and the syncable::Directory) to communicate with the sync @@ -28,9 +30,9 @@ class NonBlockingTypeProcessor; // // When the non-blocking sync type wants to talk with the sync server, it will // send a message from its thread to this object on the sync thread. This -// object is responsible for helping to ensure the appropriate sync server -// communication gets scheduled and executed. The response, if any, will be -// returned to the non-blocking sync type's thread eventually. +// object ensures the appropriate sync server communication gets scheduled and +// executed. The response, if any, will be returned to the non-blocking sync +// type's thread eventually. // // This object also has a role to play in communications in the opposite // direction. Sometimes the sync thread will receive changes from the sync @@ -49,8 +51,8 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore public: NonBlockingTypeProcessorCore( ModelType type, - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> processor); + const DataTypeState& initial_state, + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface); virtual ~NonBlockingTypeProcessorCore(); ModelType GetModelType() const; @@ -69,20 +71,57 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE; // Entry point for NonBlockingTypeProcessor to send commit requests. - void RequestCommits(const CommitRequestDataList& request_list); + void EnqueueForCommit(const CommitRequestDataList& request_list); // CommitContributor implementation. virtual scoped_ptr<CommitContribution> GetContribution( size_t max_entries) OVERRIDE; + // Callback for when our contribution gets a response. + void OnCommitResponse(const CommitResponseDataList& response_list); + base::WeakPtr<NonBlockingTypeProcessorCore> AsWeakPtr(); private: + typedef std::map<std::string, SyncThreadSyncEntity*> EntityMap; + + // Stores a single commit request in this object's internal state. + void StorePendingCommit(const CommitRequestData& request); + + // Returns true if all data type state required for commits is available. In + // practice, this means that it returns true from the time this object first + // receives notice of a successful update fetch from the server. + bool CanCommitItems() const; + + // Initializes the parts of a commit entity that are the responsibility of + // this class, and not the SyncThreadSyncEntity. Some fields, like the + // client-assigned ID, can only be set by an entity with knowledge of the + // entire data type's state. + void HelpInitializeCommitEntity(sync_pb::SyncEntity* commit_entity); + ModelType type_; - sync_pb::DataTypeProgressMarker progress_marker_; - scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; - base::WeakPtr<NonBlockingTypeProcessor> processor_; + // State that applies to the entire model type. + DataTypeState data_type_state_; + + // Abstraction around the NonBlockingTypeProcessor so this class + // doesn't need to know about its specific implementation or + // which thread it's on. This makes it easier to write tests. + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface_; + + // A map of per-entity information known to this object. + // + // When commits are pending, their information is stored here. This + // information is dropped from memory when the commit succeeds or gets + // cancelled. + // + // This also stores some information related to received server state in + // order to implement reflection blocking and conflict detection. This + // information is kept in memory indefinitely. With a bit more coordination + // with the model thread, we could optimize this to reduce memory usage in + // the steady state. + EntityMap entities_; + STLValueDeleter<EntityMap> entities_deleter_; base::WeakPtrFactory<NonBlockingTypeProcessorCore> weak_ptr_factory_; }; diff --git a/sync/engine/non_blocking_type_processor_core_unittest.cc b/sync/engine/non_blocking_type_processor_core_unittest.cc new file mode 100644 index 0000000..87b5647 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_core_unittest.cc @@ -0,0 +1,961 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor_core.h" + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/callback.h" +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/engine/non_blocking_type_commit_contribution.h" +#include "sync/engine/non_blocking_type_processor_interface.h" +#include "sync/protocol/sync.pb.h" +#include "sync/sessions/status_controller.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" + +#include "testing/gtest/include/gtest/gtest.h" + +using google::protobuf::RepeatedPtrField; + +static const std::string kTypeParentId = "PrefsRootNodeID"; + +namespace syncer { + +class NonBlockingTypeProcessorCoreTest; + +namespace { + +class MockNonBlockingTypeProcessor : public NonBlockingTypeProcessorInterface { + public: + MockNonBlockingTypeProcessor(NonBlockingTypeProcessorCoreTest* parent); + virtual ~MockNonBlockingTypeProcessor(); + + virtual void ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) OVERRIDE; + virtual void ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) OVERRIDE; + + private: + NonBlockingTypeProcessorCoreTest* parent_; + + DISALLOW_COPY_AND_ASSIGN(MockNonBlockingTypeProcessor); +}; + +} // namespace + +// Tests the NonBlockingTypeProcessorCore. +// +// This class passes messages between the model thread and sync server. +// As such, its code is subject to lots of different race conditions. This +// test harness lets us exhaustively test all possible races. We try to +// focus on just a few interesting cases. +// +// Inputs: +// - Initial data type state from the model thread. +// - Commit requests from the model thread. +// - Update responses from the server. +// - Commit responses from the server. +// +// Outputs: +// - Commit requests to the server. +// - Commit responses to the model thread. +// - Update responses to the model thread. +// - Nudges to the sync scheduler. +// +// We use the MockNonBlockingTypeProcessor to stub out all communication +// with the model thread. That interface is synchronous, which makes it +// much easier to test races. +// +// The interface with the server is built around "pulling" data from this +// class, so we don't have to mock out any of it. We wrap it with some +// convenience functions to we can emulate server behavior. +class NonBlockingTypeProcessorCoreTest : public ::testing::Test { + public: + NonBlockingTypeProcessorCoreTest(); + virtual ~NonBlockingTypeProcessorCoreTest(); + + // One of these Initialize functions should be called at the beginning of + // each test. + + // Initializes with no data type state. We will be unable to perform any + // significant server action until we receive an update response that + // contains the type root node for this type. + void FirstInitialize(); + + // Initializes with some existing data type state. Allows us to start + // committing items right away. + void NormalInitialize(); + + // Initialize with a custom initial DataTypeState. + void InitializeWithState(const DataTypeState& state); + + // Modifications on the model thread that get sent to the core under test. + void CommitRequest(const std::string& tag, const std::string& value); + void DeleteRequest(const std::string& tag); + + // Pretends to receive update messages from the server. + void TriggerTypeRootUpdateFromServer(); + void TriggerUpdateFromServer(int64 version_offset, + const std::string& tag, + const std::string& value); + void TriggerTombstoneFromServer(int64 version_offset, const std::string& tag); + + // Callbacks from the mock processor. Called when the |core_| tries to send + // messages to its associated processor on the model thread. + void OnModelThreadReceivedCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list); + void OnModelThreadReceivedUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list); + + // By default, this harness behaves as if all tasks posted to the model + // thread are executed immediately. However, this is not necessarily true. + // The model's TaskRunner has a queue, and the tasks we post to it could + // linger there for a while. In the meantime, the model thread could + // continue posting tasks to the core based on its stale state. + // + // If you want to test those race cases, then these functions are for you. + void SetModelThreadIsSynchronous(bool is_synchronous); + void PumpModelThread(); + + // Returns true if the |core_| is ready to commit something. + bool WillCommit(); + + // Pretend to successfully commit all outstanding unsynced items. + // It is safe to call this only if WillCommit() returns true. + void DoSuccessfulCommit(); + + // Read commit messages the core_ sent to the emulated server. + size_t GetNumCommitMessagesOnServer() const; + sync_pb::ClientToServerMessage GetNthCommitMessageOnServer(size_t n) const; + + // Read the latest version of sync entities committed to the emulated server. + bool HasCommitEntityOnServer(const std::string& tag) const; + sync_pb::SyncEntity GetLatestCommitEntityOnServer( + const std::string& tag) const; + + // Read the latest update messages received on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + size_t GetNumModelThreadUpdateResponses() const; + UpdateResponseDataList GetNthModelThreadUpdateResponse(size_t n) const; + DataTypeState GetNthModelThreadUpdateState(size_t n) const; + + // Reads the latest update response datas on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + bool HasUpdateResponseOnModelThread(const std::string& tag) const; + UpdateResponseData GetUpdateResponseOnModelThread( + const std::string& tag) const; + + // Read the latest commit messages received on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + size_t GetNumModelThreadCommitResponses() const; + CommitResponseDataList GetNthModelThreadCommitResponse(size_t n) const; + DataTypeState GetNthModelThreadCommitState(size_t n) const; + + // Reads the latest commit response datas on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + bool HasCommitResponseOnModelThread(const std::string& tag) const; + CommitResponseData GetCommitResponseOnModelThread( + const std::string& tag) const; + + // Helpers for building various messages and structures. + static std::string GenerateId(const std::string& tag_hash); + static std::string GenerateTagHash(const std::string& tag); + static sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag, + const std::string& value); + + private: + // Get and set our emulated server state. + int64 GetServerVersion(const std::string& tag_hash); + void SetServerVersion(const std::string& tag_hash, int64 version); + + // Get and set our emulated model thread state. + int64 GetCurrentSequenceNumber(const std::string& tag_hash) const; + int64 GetNextSequenceNumber(const std::string& tag_hash); + int64 GetModelVersion(const std::string& tag_hash) const; + void SetModelVersion(const std::string& tag_hash, int64 version); + + // Receive a commit response in the emulated model thread. + // + // Kept in a separate Impl method so we can emulate deferred task processing. + // See SetModelThreadIsSynchronous() for details. + void ModelThreadReceiveCommitResponseImpl( + const DataTypeState& type_state, + const CommitResponseDataList& response_list); + + // Receive an update response in the emulated model thread. + // + // Kept in a separate Impl method so we can emulate deferred task processing. + // See SetModelThreadIsSynchronous() for details. + void ModelThreadReceiveUpdateResponseImpl( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list); + + // Builds a fake progress marker for our response. + sync_pb::DataTypeProgressMarker GenerateResponseProgressMarker() const; + + scoped_ptr<NonBlockingTypeProcessorCore> core_; + MockNonBlockingTypeProcessor* mock_processor_; + + // Model thread state maps. + std::map<const std::string, int64> model_sequence_numbers_; + std::map<const std::string, int64> model_base_versions_; + + // Server state maps. + std::map<const std::string, int64> server_versions_; + + // Logs of messages sent to the server. Used in assertions. + std::map<const std::string, sync_pb::SyncEntity> committed_items_; + std::vector<sync_pb::ClientToServerMessage> commit_messages_; + + // State related to emulation of the model thread's task queue. Used to + // defer model thread work to simulate a full model thread task runner queue. + bool model_thread_is_synchronous_; + std::vector<base::Closure> model_thread_tasks_; + + // A cache of messages sent to the model thread. + std::vector<CommitResponseDataList> commit_responses_to_model_thread_; + std::vector<UpdateResponseDataList> updates_responses_to_model_thread_; + std::vector<DataTypeState> updates_states_to_model_thread_; + std::vector<DataTypeState> commit_states_to_model_thread_; + + // A cache of the latest responses on the model thread, by client tag. + std::map<const std::string, CommitResponseData> + model_thread_commit_response_items_; + std::map<const std::string, UpdateResponseData> + model_thread_update_response_items_; +}; + +// These had to wait until the class definition of +// NonBlockingTypeProcessorCoreTest +MockNonBlockingTypeProcessor::MockNonBlockingTypeProcessor( + NonBlockingTypeProcessorCoreTest* parent) + : parent_(parent) { +} + +MockNonBlockingTypeProcessor::~MockNonBlockingTypeProcessor() { +} + +void MockNonBlockingTypeProcessor::ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + parent_->OnModelThreadReceivedCommitResponse(type_state, response_list); +} + +void MockNonBlockingTypeProcessor::ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + parent_->OnModelThreadReceivedUpdateResponse(type_state, response_list); +} + +NonBlockingTypeProcessorCoreTest::NonBlockingTypeProcessorCoreTest() + : model_thread_is_synchronous_(true) { +} + +NonBlockingTypeProcessorCoreTest::~NonBlockingTypeProcessorCoreTest() { +} + +void NonBlockingTypeProcessorCoreTest::FirstInitialize() { + DataTypeState initial_state; + initial_state.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(PREFERENCES)); + initial_state.next_client_id = 0; + + InitializeWithState(initial_state); +} + +void NonBlockingTypeProcessorCoreTest::NormalInitialize() { + DataTypeState initial_state; + initial_state.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(PREFERENCES)); + initial_state.progress_marker.set_token("some_saved_progress_token"); + + initial_state.next_client_id = 10; + initial_state.type_root_id = kTypeParentId; + initial_state.initial_sync_done = true; + + InitializeWithState(initial_state); +} + +void NonBlockingTypeProcessorCoreTest::InitializeWithState( + const DataTypeState& state) { + DCHECK(!core_); + + // We don't get to own this interace. The |core_| keeps a scoped_ptr to it. + mock_processor_ = new MockNonBlockingTypeProcessor(this); + scoped_ptr<NonBlockingTypeProcessorInterface> interface(mock_processor_); + + core_.reset( + new NonBlockingTypeProcessorCore(PREFERENCES, state, interface.Pass())); +} + +void NonBlockingTypeProcessorCoreTest::CommitRequest(const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + const int64 base_version = GetModelVersion(tag_hash); + + CommitRequestData data; + + // Initial commits don't have IDs. Everything else does. + if (base_version > kUncommittedVersion) { + data.id = GenerateId(tag_hash); + } + + data.client_tag_hash = tag_hash; + data.sequence_number = GetNextSequenceNumber(tag_hash); + + data.base_version = base_version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version); + data.non_unique_name = tag; + + data.deleted = false; + data.specifics = GenerateSpecifics(tag, value); + + CommitRequestDataList list; + list.push_back(data); + + core_->EnqueueForCommit(list); +} + +void NonBlockingTypeProcessorCoreTest::DeleteRequest(const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + const int64 base_version = GetModelVersion(tag_hash); + CommitRequestData data; + + // Requests to commit server-unknown items don't have IDs. + // We'll never send a deletion for a server-unknown item, but the model is + // allowed to request that we do. + if (base_version > kUncommittedVersion) { + data.id = GenerateId(tag_hash); + } + + data.client_tag_hash = tag_hash; + data.sequence_number = GetNextSequenceNumber(tag_hash); + + data.base_version = base_version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.client_tag_hash = tag_hash; + data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version); + data.deleted = true; + + CommitRequestDataList list; + list.push_back(data); + + core_->EnqueueForCommit(list); +} + +void NonBlockingTypeProcessorCoreTest::TriggerTypeRootUpdateFromServer() { + sync_pb::SyncEntity entity; + + entity.set_id_string(kTypeParentId); + entity.set_parent_id_string("r"); + entity.set_version(1000); + entity.set_ctime(TimeToProtoTime(base::Time::UnixEpoch())); + entity.set_mtime(TimeToProtoTime(base::Time::UnixEpoch())); + entity.set_server_defined_unique_tag(ModelTypeToRootTag(PREFERENCES)); + entity.set_deleted(false); + AddDefaultFieldValue(PREFERENCES, entity.mutable_specifics()); + + const sync_pb::DataTypeProgressMarker& progress = + GenerateResponseProgressMarker(); + const sync_pb::DataTypeContext blank_context; + sessions::StatusController dummy_status; + + SyncEntityList entity_list; + entity_list.push_back(&entity); + + core_->ProcessGetUpdatesResponse( + progress, blank_context, entity_list, &dummy_status); + core_->ApplyUpdates(&dummy_status); +} + +void NonBlockingTypeProcessorCoreTest::TriggerUpdateFromServer( + int64 version_offset, + const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + sync_pb::SyncEntity entity; + + entity.set_id_string(GenerateId(tag_hash)); + entity.set_parent_id_string(kTypeParentId); + entity.set_version(version); + + base::Time ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + base::Time mtime = ctime + base::TimeDelta::FromSeconds(version); + entity.set_ctime(TimeToProtoTime(ctime)); + entity.set_mtime(TimeToProtoTime(mtime)); + + entity.set_name(tag); + entity.set_client_defined_unique_tag(GenerateTagHash(tag)); + entity.set_deleted(false); + entity.mutable_specifics()->CopyFrom(GenerateSpecifics(tag, value)); + + SyncEntityList entity_list; + entity_list.push_back(&entity); + + const sync_pb::DataTypeProgressMarker& progress = + GenerateResponseProgressMarker(); + const sync_pb::DataTypeContext blank_context; + sessions::StatusController dummy_status; + + core_->ProcessGetUpdatesResponse( + progress, blank_context, entity_list, &dummy_status); + core_->ApplyUpdates(&dummy_status); +} + +void NonBlockingTypeProcessorCoreTest::TriggerTombstoneFromServer( + int64 version_offset, + const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + UpdateResponseData data; + + data.id = GenerateId(tag_hash); + data.client_tag_hash = tag_hash; + data.response_version = version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(version); + data.non_unique_name = tag; + data.deleted = true; +} + +void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + base::Closure task = base::Bind( + &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl, + base::Unretained(this), + type_state, + response_list); + model_thread_tasks_.push_back(task); + if (model_thread_is_synchronous_) + PumpModelThread(); +} + +void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + base::Closure task = base::Bind( + &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl, + base::Unretained(this), + type_state, + response_list); + model_thread_tasks_.push_back(task); + if (model_thread_is_synchronous_) + PumpModelThread(); +} + +void NonBlockingTypeProcessorCoreTest::SetModelThreadIsSynchronous( + bool is_synchronous) { + model_thread_is_synchronous_ = is_synchronous; +} + +void NonBlockingTypeProcessorCoreTest::PumpModelThread() { + for (std::vector<base::Closure>::iterator it = model_thread_tasks_.begin(); + it != model_thread_tasks_.end(); + ++it) { + it->Run(); + } + model_thread_tasks_.clear(); +} + +bool NonBlockingTypeProcessorCoreTest::WillCommit() { + scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX)); + + if (contribution) { + contribution->CleanUp(); // Gracefully abort the commit. + return true; + } else { + return false; + } +} + +// Conveniently, this is all one big synchronous operation. The sync thread +// remains blocked while the commit is in progress, so we don't need to worry +// about other tasks being run between the time when the commit request is +// issued and the time when the commit response is received. +void NonBlockingTypeProcessorCoreTest::DoSuccessfulCommit() { + DCHECK(WillCommit()); + scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX)); + + sync_pb::ClientToServerMessage message; + contribution->AddToCommitMessage(&message); + commit_messages_.push_back(message); + + sync_pb::ClientToServerResponse response; + sync_pb::CommitResponse* commit_response = response.mutable_commit(); + + const RepeatedPtrField<sync_pb::SyncEntity>& entries = + message.commit().entries(); + for (RepeatedPtrField<sync_pb::SyncEntity>::const_iterator it = + entries.begin(); + it != entries.end(); + ++it) { + const std::string tag_hash = it->client_defined_unique_tag(); + + committed_items_[tag_hash] = *it; + + // Every commit increments the version number. + int64 version = GetServerVersion(tag_hash); + version++; + SetServerVersion(tag_hash, version); + + sync_pb::CommitResponse_EntryResponse* entryresponse = + commit_response->add_entryresponse(); + entryresponse->set_response_type(sync_pb::CommitResponse::SUCCESS); + entryresponse->set_id_string(GenerateId(tag_hash)); + entryresponse->set_parent_id_string(it->parent_id_string()); + entryresponse->set_version(version); + entryresponse->set_name(it->name()); + entryresponse->set_mtime(it->mtime()); + } + + sessions::StatusController dummy_status; + contribution->ProcessCommitResponse(response, &dummy_status); + contribution->CleanUp(); +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumCommitMessagesOnServer() const { + return commit_messages_.size(); +} + +sync_pb::ClientToServerMessage +NonBlockingTypeProcessorCoreTest::GetNthCommitMessageOnServer(size_t n) const { + DCHECK_LT(n, GetNumCommitMessagesOnServer()); + return commit_messages_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasCommitEntityOnServer( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, sync_pb::SyncEntity>::const_iterator it = + committed_items_.find(tag_hash); + return it != committed_items_.end(); +} + +sync_pb::SyncEntity +NonBlockingTypeProcessorCoreTest::GetLatestCommitEntityOnServer( + const std::string& tag) const { + DCHECK(HasCommitEntityOnServer(tag)); + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, sync_pb::SyncEntity>::const_iterator it = + committed_items_.find(tag_hash); + return it->second; +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadUpdateResponses() + const { + return updates_responses_to_model_thread_.size(); +} + +UpdateResponseDataList +NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateResponse( + size_t n) const { + DCHECK(GetNumModelThreadUpdateResponses()); + return updates_responses_to_model_thread_[n]; +} + +DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateState( + size_t n) const { + DCHECK(GetNumModelThreadUpdateResponses()); + return updates_states_to_model_thread_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasUpdateResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, UpdateResponseData>::const_iterator it = + model_thread_update_response_items_.find(tag_hash); + return it != model_thread_update_response_items_.end(); +} + +UpdateResponseData +NonBlockingTypeProcessorCoreTest::GetUpdateResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + DCHECK(HasUpdateResponseOnModelThread(tag)); + std::map<const std::string, UpdateResponseData>::const_iterator it = + model_thread_update_response_items_.find(tag_hash); + return it->second; +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadCommitResponses() + const { + return commit_responses_to_model_thread_.size(); +} + +CommitResponseDataList +NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitResponse( + size_t n) const { + DCHECK(GetNumModelThreadCommitResponses()); + return commit_responses_to_model_thread_[n]; +} + +DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitState( + size_t n) const { + DCHECK(GetNumModelThreadCommitResponses()); + return commit_states_to_model_thread_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasCommitResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, CommitResponseData>::const_iterator it = + model_thread_commit_response_items_.find(tag_hash); + return it != model_thread_commit_response_items_.end(); +} + +CommitResponseData +NonBlockingTypeProcessorCoreTest::GetCommitResponseOnModelThread( + const std::string& tag) const { + DCHECK(HasCommitResponseOnModelThread(tag)); + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, CommitResponseData>::const_iterator it = + model_thread_commit_response_items_.find(tag_hash); + return it->second; +} + +std::string NonBlockingTypeProcessorCoreTest::GenerateId( + const std::string& tag_hash) { + return "FakeId:" + tag_hash; +} + +std::string NonBlockingTypeProcessorCoreTest::GenerateTagHash( + const std::string& tag) { + const std::string& client_tag_hash = + syncable::GenerateSyncableHash(PREFERENCES, tag); + + return client_tag_hash; +} + +sync_pb::EntitySpecifics NonBlockingTypeProcessorCoreTest::GenerateSpecifics( + const std::string& tag, + const std::string& value) { + sync_pb::EntitySpecifics specifics; + specifics.mutable_preference()->set_name(tag); + specifics.mutable_preference()->set_value(value); + return specifics; +} + +int64 NonBlockingTypeProcessorCoreTest::GetServerVersion( + const std::string& tag_hash) { + std::map<const std::string, int64>::const_iterator it; + it = server_versions_.find(tag_hash); + // Server versions do not necessarily start at 1 or 0. + if (it == server_versions_.end()) { + return 2048; + } else { + return it->second; + } +} + +void NonBlockingTypeProcessorCoreTest::SetServerVersion( + const std::string& tag_hash, + int64 version) { + server_versions_[tag_hash] = version; +} + +// Fetches the sequence number as of the most recent update request. +int64 NonBlockingTypeProcessorCoreTest::GetCurrentSequenceNumber( + const std::string& tag_hash) const { + std::map<const std::string, int64>::const_iterator it = + model_sequence_numbers_.find(tag_hash); + if (it == model_sequence_numbers_.end()) { + return 0; + } else { + return it->second; + } +} + +// The model thread should be sending us items with strictly increasing +// sequence numbers. Here's where we emulate that behavior. +int64 NonBlockingTypeProcessorCoreTest::GetNextSequenceNumber( + const std::string& tag_hash) { + int64 sequence_number = GetCurrentSequenceNumber(tag_hash); + sequence_number++; + model_sequence_numbers_[tag_hash] = sequence_number; + return sequence_number; +} + +// Fetches the model's base version. +int64 NonBlockingTypeProcessorCoreTest::GetModelVersion( + const std::string& tag_hash) const { + std::map<const std::string, int64>::const_iterator it = + model_base_versions_.find(tag_hash); + if (it == model_base_versions_.end()) { + return kUncommittedVersion; + } else { + return it->second; + } +} + +void NonBlockingTypeProcessorCoreTest::SetModelVersion( + const std::string& tag_hash, + int64 version) { + model_base_versions_[tag_hash] = version; +} + +void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + commit_responses_to_model_thread_.push_back(response_list); + commit_states_to_model_thread_.push_back(type_state); + for (CommitResponseDataList::const_iterator it = response_list.begin(); + it != response_list.end(); + ++it) { + model_thread_commit_response_items_.insert( + std::make_pair(it->client_tag_hash, *it)); + + // Server wins. Set the model's base version. + SetModelVersion(it->client_tag_hash, it->response_version); + } +} + +void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + updates_responses_to_model_thread_.push_back(response_list); + updates_states_to_model_thread_.push_back(type_state); + for (UpdateResponseDataList::const_iterator it = response_list.begin(); + it != response_list.end(); + ++it) { + model_thread_update_response_items_.insert( + std::make_pair(it->client_tag_hash, *it)); + + // Server wins. Set the model's base version. + SetModelVersion(it->client_tag_hash, it->response_version); + } +} + +sync_pb::DataTypeProgressMarker +NonBlockingTypeProcessorCoreTest::GenerateResponseProgressMarker() const { + sync_pb::DataTypeProgressMarker progress; + progress.set_data_type_id(PREFERENCES); + progress.set_token("non_null_progress_token"); + return progress; +} + +// Requests a commit and verifies the messages sent to the client and server as +// a result. +// +// This test performs sanity checks on most of the fields in these messages. +// For the most part this is checking that the test code behaves as expected +// and the |core_| doesn't mess up its simple task of moving around these +// values. It makes sense to have one or two tests that are this thorough, but +// we shouldn't be this verbose in all tests. +TEST_F(NonBlockingTypeProcessorCoreTest, SimpleCommit) { + NormalInitialize(); + + EXPECT_FALSE(WillCommit()); + EXPECT_EQ(0U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(0U, GetNumModelThreadCommitResponses()); + + CommitRequest("tag1", "value1"); + + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + const std::string& client_tag_hash = GenerateTagHash("tag1"); + + // Exhaustively verify the SyncEntity sent in the commit message. + ASSERT_EQ(1U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1"); + EXPECT_FALSE(entity.id_string().empty()); + EXPECT_EQ(kTypeParentId, entity.parent_id_string()); + EXPECT_EQ(kUncommittedVersion, entity.version()); + EXPECT_NE(0, entity.mtime()); + EXPECT_NE(0, entity.ctime()); + EXPECT_EQ("tag1", entity.name()); + EXPECT_EQ(client_tag_hash, entity.client_defined_unique_tag()); + EXPECT_EQ("tag1", entity.specifics().preference().name()); + EXPECT_FALSE(entity.deleted()); + EXPECT_EQ("value1", entity.specifics().preference().value()); + + // Exhaustively verify the commit response returned to the model thread. + ASSERT_EQ(1U, GetNumModelThreadCommitResponses()); + EXPECT_EQ(1U, GetNthModelThreadCommitResponse(0).size()); + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& commit_response = + GetCommitResponseOnModelThread("tag1"); + + // The ID changes in a commit response to initial commit. + EXPECT_FALSE(commit_response.id.empty()); + EXPECT_NE(entity.id_string(), commit_response.id); + + EXPECT_EQ(client_tag_hash, commit_response.client_tag_hash); + EXPECT_LT(0, commit_response.response_version); +} + +TEST_F(NonBlockingTypeProcessorCoreTest, SimpleDelete) { + NormalInitialize(); + + // We can't delete an entity that was never committed. + // Step 1 is to create and commit a new entity. + CommitRequest("tag1", "value1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& initial_commit_response = + GetCommitResponseOnModelThread("tag1"); + int64 base_version = initial_commit_response.response_version; + + // Now that we have an entity, we can delete it. + DeleteRequest("tag1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + // Verify the SyncEntity sent in the commit message. + ASSERT_EQ(2U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1"); + EXPECT_FALSE(entity.id_string().empty()); + EXPECT_EQ(GenerateTagHash("tag1"), entity.client_defined_unique_tag()); + EXPECT_EQ(base_version, entity.version()); + EXPECT_TRUE(entity.deleted()); + + // Deletions should contain enough specifics to identify the type. + EXPECT_TRUE(entity.has_specifics()); + EXPECT_EQ(PREFERENCES, GetModelTypeFromSpecifics(entity.specifics())); + + // Verify the commit response returned to the model thread. + ASSERT_EQ(2U, GetNumModelThreadCommitResponses()); + EXPECT_EQ(1U, GetNthModelThreadCommitResponse(1).size()); + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& commit_response = + GetCommitResponseOnModelThread("tag1"); + + EXPECT_EQ(entity.id_string(), commit_response.id); + EXPECT_EQ(entity.client_defined_unique_tag(), + commit_response.client_tag_hash); + EXPECT_EQ(entity.version(), commit_response.response_version); +} + +// The server doesn't like it when we try to delete an entity it's never heard +// of before. This test helps ensure we avoid that scenario. +TEST_F(NonBlockingTypeProcessorCoreTest, NoDeleteUncommitted) { + NormalInitialize(); + + // Request the commit of a new, never-before-seen item. + CommitRequest("tag1", "value1"); + EXPECT_TRUE(WillCommit()); + + // Request a deletion of that item before we've had a chance to commit it. + DeleteRequest("tag1"); + EXPECT_FALSE(WillCommit()); +} + +// Verifies the sending of an "initial sync done" signal. +TEST_F(NonBlockingTypeProcessorCoreTest, SendInitialSyncDone) { + FirstInitialize(); // Initialize with no saved sync state. + EXPECT_EQ(0U, GetNumModelThreadUpdateResponses()); + + // Receive an update response that contains only the type root node. + TriggerTypeRootUpdateFromServer(); + + // Two updates: + // - One triggered by process updates to forward the type root ID. + // - One triggered by apply updates, which the core interprets to mean + // "initial sync done". This triggers a model thread update, too. + EXPECT_EQ(2U, GetNumModelThreadUpdateResponses()); + + // The type root and initial sync done updates both contain no entities. + EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(0).size()); + EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(1).size()); + + const DataTypeState& state = GetNthModelThreadUpdateState(1); + EXPECT_FALSE(state.progress_marker.token().empty()); + EXPECT_FALSE(state.type_root_id.empty()); + EXPECT_TRUE(state.initial_sync_done); +} + +// Commit two new entities in two separate commit messages. +TEST_F(NonBlockingTypeProcessorCoreTest, TwoNewItemsCommittedSeparately) { + NormalInitialize(); + + // Commit the first of two entities. + CommitRequest("tag1", "value1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + ASSERT_EQ(1U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& tag1_entity = + GetLatestCommitEntityOnServer("tag1"); + + // Commit the second of two entities. + CommitRequest("tag2", "value2"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + ASSERT_EQ(2U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag2")); + const sync_pb::SyncEntity& tag2_entity = + GetLatestCommitEntityOnServer("tag2"); + + EXPECT_FALSE(WillCommit()); + + // The IDs assigned by the |core_| should be unique. + EXPECT_NE(tag1_entity.id_string(), tag2_entity.id_string()); + + // Check that the committed specifics values are sane. + EXPECT_EQ(tag1_entity.specifics().preference().value(), "value1"); + EXPECT_EQ(tag2_entity.specifics().preference().value(), "value2"); + + // There should have been two separate commit responses sent to the model + // thread. They should be uninteresting, so we don't bother inspecting them. + EXPECT_EQ(2U, GetNumModelThreadCommitResponses()); +} + +TEST_F(NonBlockingTypeProcessorCoreTest, ReceiveUpdates) { + NormalInitialize(); + + const std::string& tag_hash = GenerateTagHash("tag1"); + + TriggerUpdateFromServer(10, "tag1", "value1"); + + ASSERT_EQ(1U, GetNumModelThreadUpdateResponses()); + UpdateResponseDataList updates_list = GetNthModelThreadUpdateResponse(0); + ASSERT_EQ(1U, updates_list.size()); + + ASSERT_TRUE(HasUpdateResponseOnModelThread("tag1")); + UpdateResponseData update = GetUpdateResponseOnModelThread("tag1"); + + EXPECT_FALSE(update.id.empty()); + EXPECT_EQ(tag_hash, update.client_tag_hash); + EXPECT_LT(0, update.response_version); + EXPECT_FALSE(update.ctime.is_null()); + EXPECT_FALSE(update.mtime.is_null()); + EXPECT_EQ("tag1", update.non_unique_name); + EXPECT_FALSE(update.deleted); + EXPECT_EQ("tag1", update.specifics.preference().name()); + EXPECT_EQ("value1", update.specifics.preference().value()); +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_interface.cc b/sync/engine/non_blocking_type_processor_interface.cc new file mode 100644 index 0000000..b936239 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_interface.cc @@ -0,0 +1,15 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor_interface.h" + +namespace syncer { + +NonBlockingTypeProcessorInterface::NonBlockingTypeProcessorInterface() { +} + +NonBlockingTypeProcessorInterface::~NonBlockingTypeProcessorInterface() { +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_interface.h b/sync/engine/non_blocking_type_processor_interface.h new file mode 100644 index 0000000..ccc504b --- /dev/null +++ b/sync/engine/non_blocking_type_processor_interface.h @@ -0,0 +1,31 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ + +#include "sync/base/sync_export.h" + +struct CommitResponseDataList; +struct DataTypeState; +struct UpdateResponseDataList; + +namespace syncer { + +class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorInterface { + public: + NonBlockingTypeProcessorInterface(); + virtual ~NonBlockingTypeProcessorInterface(); + + virtual void ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) = 0; + virtual void ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) = 0; +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc index 9ab6ca3..3a3ca5c 100644 --- a/sync/engine/non_blocking_type_processor_unittest.cc +++ b/sync/engine/non_blocking_type_processor_unittest.cc @@ -134,14 +134,21 @@ class NonBlockingTypeProcessorTest : public ::testing::Test { NonBlockingTypeProcessorTest(); virtual ~NonBlockingTypeProcessorTest(); - // Explicit initialization step. Kept separate to allow tests to inject - // on-disk state before the test begins. - void Initialize(); + // Initialize with no local state. The processor will be unable to commit + // until it receives notification that initial sync has completed. + void FirstTimeInitialize(); + + // Initialize to a "ready-to-commit" state. + void InitializeToReadyState(); // Local data modification. Emulates signals from the model thread. void WriteItem(const std::string& tag, const std::string& value); void DeleteItem(const std::string& tag); + // Emulates an "initial sync done" message from the + // NonBlockingTypeProcessorCore. + void OnInitialSyncDone(); + // Emulate updates from the server. // This harness has some functionality to help emulate server behavior. // See the definitions of these methods for more information. @@ -190,11 +197,20 @@ NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest() NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() { } -void NonBlockingTypeProcessorTest::Initialize() { +void NonBlockingTypeProcessorTest::FirstTimeInitialize() { processor_->Enable(mock_sync_core_proxy_->Clone()); mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore(); } +void NonBlockingTypeProcessorTest::InitializeToReadyState() { + // TODO(rlarocque): This should be updated to inject on-disk state. + // At the time this code was written, there was no support for on-disk + // state so this was the only way to inject a data_type_state into + // the |processor_|. + FirstTimeInitialize(); + OnInitialSyncDone(); +} + void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag, const std::string& value) { const std::string tag_hash = GenerateTagHash(tag); @@ -205,6 +221,13 @@ void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) { processor_->Delete(tag); } +void NonBlockingTypeProcessorTest::OnInitialSyncDone() { + data_type_state_.initial_sync_done = true; + UpdateResponseDataList empty_update_list; + + processor_->OnUpdateReceived(data_type_state_, empty_update_list); +} + void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset, const std::string& tag, const std::string& value) { @@ -385,7 +408,7 @@ CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag( // Creates a new item locally. // Thoroughly tests the data generated by a local item creation. TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -396,7 +419,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1"); EXPECT_TRUE(tag1_data.id.empty()); - EXPECT_EQ(0, tag1_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_data.base_version); EXPECT_FALSE(tag1_data.ctime.is_null()); EXPECT_FALSE(tag1_data.mtime.is_null()); EXPECT_EQ("tag1", tag1_data.non_unique_name); @@ -408,7 +431,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { // Creates a new local item then modifies it. // Thoroughly tests data generated by modification of server-unknown item. TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -428,7 +451,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { // Perform a thorough examination of the update-generated request. EXPECT_TRUE(tag1_v2_data.id.empty()); - EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version); EXPECT_FALSE(tag1_v2_data.ctime.is_null()); EXPECT_FALSE(tag1_v2_data.mtime.is_null()); EXPECT_EQ("tag1", tag1_v2_data.non_unique_name); @@ -440,7 +463,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { // Deletes an item we've never seen before. // Should have no effect and not crash. TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) { - Initialize(); + InitializeToReadyState(); DeleteItem("tag1"); EXPECT_EQ(0U, GetNumCommitRequestLists()); @@ -452,7 +475,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) { // server-unknown as far as the model thread is concerned. That behavior // is race-dependent; other tests are used to test other races. TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { - Initialize(); + InitializeToReadyState(); WriteItem("tag1", "value1"); EXPECT_EQ(1U, GetNumCommitRequestLists()); @@ -467,7 +490,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number); EXPECT_TRUE(tag1_v2_data.id.empty()); - EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version); EXPECT_TRUE(tag1_v2_data.deleted); } @@ -477,7 +500,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { // successfully commits it, but, before the commit response is picked up // by the model thread, the item is deleted by the model thread. TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { - Initialize(); + InitializeToReadyState(); WriteItem("tag1", "value1"); EXPECT_EQ(1U, GetNumCommitRequestLists()); @@ -501,7 +524,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { // Creates two different sync items. // Verifies that the second has no effect on the first. TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -519,6 +542,20 @@ TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) { ASSERT_TRUE(HasCommitRequestForTag("tag2")); } +// Starts the processor with no local state. +// Verify that it waits until initial sync is complete before requesting +// commits. +TEST_F(NonBlockingTypeProcessorTest, NoCommitsUntilInitialSyncDone) { + FirstTimeInitialize(); + + WriteItem("tag1", "value1"); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + OnInitialSyncDone(); + EXPECT_EQ(1U, GetNumCommitRequestLists()); + EXPECT_TRUE(HasCommitRequestForTag("tag1")); +} + // TODO(rlarocque): Add more testing of non_unique_name fields. } // namespace syncer diff --git a/sync/engine/sync_thread_sync_entity.cc b/sync/engine/sync_thread_sync_entity.cc new file mode 100644 index 0000000..21899d2 --- /dev/null +++ b/sync/engine/sync_thread_sync_entity.cc @@ -0,0 +1,242 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_thread_sync_entity.h" + +#include "base/logging.h" +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" + +namespace syncer { + +SyncThreadSyncEntity* SyncThreadSyncEntity::FromServerUpdate( + const std::string& id_string, + const std::string& client_tag_hash, + int64 received_version) { + return new SyncThreadSyncEntity( + id_string, client_tag_hash, 0, received_version); +} + +SyncThreadSyncEntity* SyncThreadSyncEntity::FromCommitRequest( + const std::string& id_string, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) { + return new SyncThreadSyncEntity(id_string, + client_tag_hash, + 0, + 0, + true, + sequence_number, + base_version, + ctime, + mtime, + non_unique_name, + deleted, + specifics); +} + +// Constructor that does not set any pending commit fields. +SyncThreadSyncEntity::SyncThreadSyncEntity( + const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version) + : id_(id), + client_tag_hash_(client_tag_hash), + highest_commit_response_version_(highest_commit_response_version), + highest_gu_response_version_(highest_gu_response_version), + is_commit_pending_(false), + sequence_number_(0), + base_version_(0), + deleted_(false) { +} + +SyncThreadSyncEntity::SyncThreadSyncEntity( + const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version, + bool is_commit_pending, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) + : id_(id), + client_tag_hash_(client_tag_hash), + highest_commit_response_version_(highest_commit_response_version), + highest_gu_response_version_(highest_gu_response_version), + is_commit_pending_(is_commit_pending), + sequence_number_(sequence_number), + base_version_(base_version), + ctime_(ctime), + mtime_(mtime), + non_unique_name_(non_unique_name), + deleted_(deleted), + specifics_(specifics) { +} + +SyncThreadSyncEntity::~SyncThreadSyncEntity() { +} + +bool SyncThreadSyncEntity::IsCommitPending() const { + return is_commit_pending_; +} + +void SyncThreadSyncEntity::PrepareCommitProto( + sync_pb::SyncEntity* commit_entity, + int64* sequence_number) const { + // Set ID if we have a server-assigned ID. Otherwise, it will be up to + // our caller to assign a client-unique initial ID. + if (base_version_ != kUncommittedVersion) { + commit_entity->set_id_string(id_); + } + + commit_entity->set_client_defined_unique_tag(client_tag_hash_); + commit_entity->set_version(base_version_); + commit_entity->set_deleted(deleted_); + commit_entity->set_folder(false); + commit_entity->set_name(non_unique_name_); + if (!deleted_) { + commit_entity->set_ctime(TimeToProtoTime(ctime_)); + commit_entity->set_mtime(TimeToProtoTime(mtime_)); + commit_entity->mutable_specifics()->CopyFrom(specifics_); + } + + *sequence_number = sequence_number_; +} + +void SyncThreadSyncEntity::RequestCommit( + const std::string& id, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) { + DCHECK_GE(base_version, base_version_) + << "Base version should never decrease"; + + DCHECK_GE(sequence_number, sequence_number_) + << "Sequence number should never decrease"; + + // Update our book-keeping counters. + base_version_ = base_version; + sequence_number_ = sequence_number; + + // Do our counter values indicate a conflict? If so, don't commit. + // + // There's no need to inform the model thread of the conflict. The + // conflicting update has already been posted to its task runner; it will + // figure it out as soon as it runs that task. + is_commit_pending_ = true; + if (IsInConflict()) { + ClearPendingCommit(); + return; + } + + // We don't commit deletions of server-unknown items. + if (deleted && !IsServerKnown()) { + ClearPendingCommit(); + return; + } + + // Otherwise, we should store the data associated with this pending commit + // so we're ready to commit at the next possible opportunity. + + // We intentionally don't update the id_ here. Good ID values come from the + // server and always pass through the sync thread first. There's no way the + // model thread could have a better ID value than we do. + + // This entity is identified by its client tag. That value can never change. + DCHECK_EQ(client_tag_hash_, client_tag_hash); + + // Set the fields for the pending commit. + ctime_ = ctime; + mtime_ = mtime; + non_unique_name_ = non_unique_name; + deleted_ = deleted; + specifics_ = specifics; +} + +void SyncThreadSyncEntity::ReceiveCommitResponse(const std::string& response_id, + int64 response_version, + int64 sequence_number) { + // Commit responses, especially after the first commit, can update our ID. + id_ = response_id; + + DCHECK_GT(response_version, highest_commit_response_version_) + << "Had expected higher response version." + << " id: " << id_; + + // Commits are synchronous, so there's no reason why the sequence numbers + // wouldn't match. + DCHECK_EQ(sequence_number_, sequence_number) + << "Unexpected sequence number mismatch." + << " id: " << id_; + + highest_commit_response_version_ = response_version; + + // Because an in-progress commit blocks the sync thread, we can assume that + // the item we just committed successfully is exactly the one we have now. + // Nothing changed it while the commit was happening. Since we're now in + // sync with the server, we can clear the pending commit. + ClearPendingCommit(); +} + +void SyncThreadSyncEntity::ReceiveUpdate(int64 version) { + highest_gu_response_version_ = + std::max(highest_gu_response_version_, version); + + if (IsInConflict()) { + // Incoming update clobbers the pending commit on the sync thread. + // The model thread can re-request this commit later if it wants to. + ClearPendingCommit(); + } +} + +bool SyncThreadSyncEntity::IsInConflict() const { + if (!is_commit_pending_) + return false; + + if (highest_gu_response_version_ <= highest_commit_response_version_) { + // The most recent server state was created in a commit made by this + // client. We're fully up to date, and therefore not in conflict. + return false; + } else { + // The most recent server state was written by someone else. + // Did the model thread have the most up to date version when it issued the + // commit request? + if (base_version_ >= highest_gu_response_version_) { + return false; // Yes. + } else { + return true; // No. + } + } +} + +bool SyncThreadSyncEntity::IsServerKnown() const { + return base_version_ != kUncommittedVersion; +} + +void SyncThreadSyncEntity::ClearPendingCommit() { + is_commit_pending_ = false; + + // Clearing the specifics might free up some memory. It can't hurt to try. + specifics_.Clear(); +} + +} // namespace syncer diff --git a/sync/engine/sync_thread_sync_entity.h b/sync/engine/sync_thread_sync_entity.h new file mode 100644 index 0000000..6ed9685 --- /dev/null +++ b/sync/engine/sync_thread_sync_entity.h @@ -0,0 +1,155 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ +#define SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ + +#include <string> + +#include "base/basictypes.h" +#include "base/time/time.h" +#include "sync/base/sync_export.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +// Manages the pending commit and update state for an entity on the sync +// thread. +// +// It should be considered a helper class internal to the +// NonBlockingTypeProcessorCore. +// +// Maintains the state associated with a particular sync entity which is +// necessary for decision-making on the sync thread. It can track pending +// commit state, received update state, and can detect conflicts. +// +// This object may or may not contain state associated with a pending commit. +// If no commit is pending, the |is_commit_pending_| flag will be set to false +// and many of this object's fields will be cleared. +class SYNC_EXPORT SyncThreadSyncEntity { + public: + ~SyncThreadSyncEntity(); + + // Initialize a new entity based on an update response. + static SyncThreadSyncEntity* FromServerUpdate( + const std::string& id_string, + const std::string& client_tag_hash, + int64 version); + + // Initialize a new entity based on a commit request. + static SyncThreadSyncEntity* FromCommitRequest( + const std::string& id_string, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Returns true if this entity should be commited to the server. + bool IsCommitPending() const; + + // Populates a sync_pb::SyncEntity for a commit. Also sets the + // |sequence_number|, so we can track it throughout the commit process. + void PrepareCommitProto(sync_pb::SyncEntity* commit_entity, + int64* sequence_number) const; + + // Updates this entity with data from the latest version that the + // model asked us to commit. May clobber state related to the + // model's previous commit attempt(s). + void RequestCommit(const std::string& id, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Handles the receipt of a commit response. + // + // Since commits happen entirely on the sync thread, we can safely assume + // that our item's state at the end of the commit is the same as it was at + // the start. + void ReceiveCommitResponse(const std::string& response_id, + int64 response_version, + int64 sequence_number); + + // Handles receipt of an update from the server. + void ReceiveUpdate(int64 version); + + private: + // Initializes received update state. Does not initialize state related to + // pending commits and sets |is_commit_pending_| to false. + SyncThreadSyncEntity(const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version); + + // Initializes all fields. Sets |is_commit_pending_| to true. + SyncThreadSyncEntity(const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version, + bool is_commit_pending, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Checks if the current state indicates a conflict. + // + // This can be true only while a call to this object is in progress. + // Conflicts are always cleared before the method call ends. + bool IsInConflict() const; + + // Checks if the server knows about this item. + bool IsServerKnown() const; + + // Clears flag and optionally clears state associated with a pending commit. + void ClearPendingCommit(); + + // The ID for this entry. May be empty if the entry has never been committed. + std::string id_; + + // The hashed client tag for this entry. + std::string client_tag_hash_; + + // The highest version seen in a commit response for this entry. + int64 highest_commit_response_version_; + + // The highest version seen in a GU response for this entry. + int64 highest_gu_response_version_; + + // Flag that indicates whether or not we're waiting for a chance to commit + // this item. + bool is_commit_pending_; + + // Used to track in-flight commit requests on the model thread. All we need + // to do here is return it back to the model thread when the pending commit + // is completed and confirmed. Not valid if no commit is pending. + int64 sequence_number_; + + // The following fields are valid only when a commit is pending. + // This is where we store the data that is to be sent up to the server + // at the next possible opportunity. + int64 base_version_; + base::Time ctime_; + base::Time mtime_; + std::string non_unique_name_; + bool deleted_; + sync_pb::EntitySpecifics specifics_; + + DISALLOW_COPY_AND_ASSIGN(SyncThreadSyncEntity); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ diff --git a/sync/engine/sync_thread_sync_entity_unittest.cc b/sync/engine/sync_thread_sync_entity_unittest.cc new file mode 100644 index 0000000..be2604b --- /dev/null +++ b/sync/engine/sync_thread_sync_entity_unittest.cc @@ -0,0 +1,164 @@ + +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_thread_sync_entity.h" + +#include "base/memory/scoped_ptr.h" +#include "base/time/time.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace syncer { + +// Some simple tests for the SyncThreadSyncEntity. +// +// The SyncThreadSyncEntity is an implementation detail of the +// NonBlockingTypeProcessorCore. As such, it doesn't make much sense to test +// it exhaustively, since it already gets a lot of test coverage from the +// NonBlockingTypeProcessorCore unit tests. +// +// These tests are intended as a basic sanity check. Anything more complicated +// would be redundant. +class SyncThreadSyncEntityTest : public ::testing::Test { + public: + SyncThreadSyncEntityTest() + : kServerId("ServerID"), + kClientTag("some.sample.tag"), + kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)), + kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)), + kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) { + specifics.mutable_preference()->set_name(kClientTag); + specifics.mutable_preference()->set_value("pref.value"); + } + + virtual ~SyncThreadSyncEntityTest() {} + + const std::string kServerId; + const std::string kClientTag; + const std::string kClientTagHash; + const base::Time kCtime; + const base::Time kMtime; + sync_pb::EntitySpecifics specifics; +}; + +// Construct a new entity from a server update. Then receive another update. +TEST_F(SyncThreadSyncEntityTest, FromServerUpdate) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + EXPECT_FALSE(entity->IsCommitPending()); + + entity->ReceiveUpdate(20); + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Construct a new entity from a commit request. Then serialize it. +TEST_F(SyncThreadSyncEntityTest, FromCommitRequest) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + ASSERT_TRUE(entity->IsCommitPending()); + sync_pb::SyncEntity pb_entity; + int64 sequence_number = 0; + entity->PrepareCommitProto(&pb_entity, &sequence_number); + EXPECT_EQ(22, sequence_number); + EXPECT_EQ(kServerId, pb_entity.id_string()); + EXPECT_EQ(kClientTagHash, pb_entity.client_defined_unique_tag()); + EXPECT_EQ(33, pb_entity.version()); + EXPECT_EQ(kCtime, ProtoTimeToTime(pb_entity.ctime())); + EXPECT_EQ(kMtime, ProtoTimeToTime(pb_entity.mtime())); + EXPECT_FALSE(pb_entity.deleted()); + EXPECT_EQ(specifics.preference().name(), + pb_entity.specifics().preference().name()); + EXPECT_EQ(specifics.preference().value(), + pb_entity.specifics().preference().value()); +} + +// Start with a server initiated entity. Commit over top of it. +TEST_F(SyncThreadSyncEntityTest, RequestCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + + entity->RequestCommit(kServerId, + kClientTagHash, + 1, + 10, + kCtime, + kMtime, + kClientTag, + false, + specifics); + + EXPECT_TRUE(entity->IsCommitPending()); +} + +// Start with a server initiated entity. Fail to request a commit because of +// an out of date base version. +TEST_F(SyncThreadSyncEntityTest, RequestCommitFailure) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + EXPECT_FALSE(entity->IsCommitPending()); + + entity->RequestCommit(kServerId, + kClientTagHash, + 23, + 5, // Version 5 < 10 + kCtime, + kMtime, + kClientTag, + false, + specifics); + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Start with a pending commit. Clobber it with an incoming update. +TEST_F(SyncThreadSyncEntityTest, UpdateClobbersCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + EXPECT_TRUE(entity->IsCommitPending()); + + entity->ReceiveUpdate(400); // Version 400 > 33. + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Start with a pending commit. Send it a reflected update that +// will not override the in-progress commit. +TEST_F(SyncThreadSyncEntityTest, ReflectedUpdateDoesntClobberCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + EXPECT_TRUE(entity->IsCommitPending()); + + entity->ReceiveUpdate(33); // Version 33 == 33. + EXPECT_TRUE(entity->IsCommitPending()); +} + +} // namespace syncer |