diff options
author | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-06-05 18:05:33 +0000 |
---|---|---|
committer | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-06-05 18:05:33 +0000 |
commit | f233b14dadfea6651036f2fcfce3c6cdb5f1503d (patch) | |
tree | f0b38cd358b6f67fc241d34d6326a6091141efdf /sync | |
parent | 8a3f8248312a5c0e49006489ee7bb67d9a48cfd4 (diff) | |
download | chromium_src-f233b14dadfea6651036f2fcfce3c6cdb5f1503d.zip chromium_src-f233b14dadfea6651036f2fcfce3c6cdb5f1503d.tar.gz chromium_src-f233b14dadfea6651036f2fcfce3c6cdb5f1503d.tar.bz2 |
sync: Implement NonBlockingTypeProcessorCore
Introduces the second half of the non-blocking sync engine. For now,
most of the classes invovled are never instantiated outside of tests.
Adds NonBlockingTypeProcessorCore, the sync thread component of
non-blocking sync. It coordinates between the sync server and the
NonBlockingTypeProcessor that lives on the model thread. The
SyncThreadSyncEntity exists to help it handle keep track of the
in-flight sync entities.
The NonBlockingTypeProcessorCore interacts with the sync thread
components by implementing both the UpdateHandler and CommitContributor
interfaces. This allows it to take part in commit and update operations
that are managed by the syncer.
As part of its implementation of the CommitContributor interface, the
NonBlockingTypeProcessorCore introduces
a NonBlockingTypeProcessorCommitContribution class to manage its
contribution to a commit request and associated it with the response.
This CL includes a large amount of test framework code to help test the
NonBlockingTypeProcessorCore.
Makes the SyncEntityToValue function in proto_value_conversions.h
public to enable more informative debug messages.
BUG=351005
Review URL: https://codereview.chromium.org/299963002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@275187 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'sync')
20 files changed, 2173 insertions, 57 deletions
diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc index fde2eb3..6e466f5 100644 --- a/sync/engine/model_thread_sync_entity.cc +++ b/sync/engine/model_thread_sync_entity.cc @@ -15,7 +15,7 @@ scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem( 1, 0, 0, - 0, + kUncommittedVersion, true, std::string(), // Sync thread will assign the initial ID. syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics), diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc index c5a3656..0042990 100644 --- a/sync/engine/non_blocking_sync_common.cc +++ b/sync/engine/non_blocking_sync_common.cc @@ -6,25 +6,32 @@ namespace syncer { -DataTypeState::DataTypeState() { +DataTypeState::DataTypeState() : next_client_id(0), initial_sync_done(false) { } DataTypeState::~DataTypeState() { } -CommitRequestData::CommitRequestData() { +CommitRequestData::CommitRequestData() + : sequence_number(0), + base_version(0), + deleted(false) { } CommitRequestData::~CommitRequestData() { } -CommitResponseData::CommitResponseData() { +CommitResponseData::CommitResponseData() + : sequence_number(0), + response_version(0) { } CommitResponseData::~CommitResponseData() { } -UpdateResponseData::UpdateResponseData() { +UpdateResponseData::UpdateResponseData() + : response_version(0), + deleted(false) { } UpdateResponseData::~UpdateResponseData() { diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h index f07fdb2..d0e0f9d 100644 --- a/sync/engine/non_blocking_sync_common.h +++ b/sync/engine/non_blocking_sync_common.h @@ -14,6 +14,8 @@ namespace syncer { +static const int64 kUncommittedVersion = -1; + // Data-type global state that must be accessed and updated on the sync thread, // but persisted on or through the model thread. struct SYNC_EXPORT_PRIVATE DataTypeState { @@ -43,6 +45,11 @@ struct SYNC_EXPORT_PRIVATE DataTypeState { // client-tagged data types supported by non-blocking sync, but we will // continue to emulate the directory sync's behavior for now. int64 next_client_id; + + // This flag is set to true when the first download cycle is complete. The + // NonBlockingTypeProcessor should not attempt to commit any items until this + // flag is set. + bool initial_sync_done; }; struct SYNC_EXPORT_PRIVATE CommitRequestData { diff --git a/sync/engine/non_blocking_type_commit_contribution.cc b/sync/engine/non_blocking_type_commit_contribution.cc new file mode 100644 index 0000000..9e48ad6 --- /dev/null +++ b/sync/engine/non_blocking_type_commit_contribution.cc @@ -0,0 +1,119 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_commit_contribution.h" + +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/protocol/proto_value_conversions.h" + +namespace syncer { + +NonBlockingTypeCommitContribution::NonBlockingTypeCommitContribution( + const sync_pb::DataTypeContext& context, + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities, + const std::vector<int64>& sequence_numbers, + NonBlockingTypeProcessorCore* processor_core) + : processor_core_(processor_core), + context_(context), + entities_(entities), + sequence_numbers_(sequence_numbers), + cleaned_up_(false) { +} + +NonBlockingTypeCommitContribution::~NonBlockingTypeCommitContribution() { + DCHECK(cleaned_up_); +} + +void NonBlockingTypeCommitContribution::AddToCommitMessage( + sync_pb::ClientToServerMessage* msg) { + sync_pb::CommitMessage* commit_message = msg->mutable_commit(); + entries_start_index_ = commit_message->entries_size(); + + std::copy(entities_.begin(), + entities_.end(), + RepeatedPtrFieldBackInserter(commit_message->mutable_entries())); + if (!context_.context().empty()) + commit_message->add_client_contexts()->CopyFrom(context_); +} + +SyncerError NonBlockingTypeCommitContribution::ProcessCommitResponse( + const sync_pb::ClientToServerResponse& response, + sessions::StatusController* status) { + const sync_pb::CommitResponse& commit_response = response.commit(); + + bool transient_error = false; + bool commit_conflict = false; + bool unknown_error = false; + + CommitResponseDataList response_list; + + for (size_t i = 0; i < sequence_numbers_.size(); ++i) { + const sync_pb::CommitResponse_EntryResponse& entry_response = + commit_response.entryresponse(entries_start_index_ + i); + + switch (entry_response.response_type()) { + case sync_pb::CommitResponse::INVALID_MESSAGE: + LOG(ERROR) << "Server reports commit message is invalid."; + DLOG(ERROR) << "Message was: " << SyncEntityToValue(entities_.Get(i), + false); + unknown_error = true; + break; + case sync_pb::CommitResponse::CONFLICT: + DVLOG(1) << "Server reports conflict for commit message."; + DVLOG(1) << "Message was: " << SyncEntityToValue(entities_.Get(i), + false); + commit_conflict = true; + break; + case sync_pb::CommitResponse::SUCCESS: { + CommitResponseData response_data; + response_data.id = entry_response.id_string(); + response_data.client_tag_hash = + entities_.Get(i).client_defined_unique_tag(); + response_data.sequence_number = sequence_numbers_[i]; + response_data.response_version = entry_response.version(); + response_list.push_back(response_data); + break; + } + case sync_pb::CommitResponse::OVER_QUOTA: + case sync_pb::CommitResponse::RETRY: + case sync_pb::CommitResponse::TRANSIENT_ERROR: + DLOG(WARNING) << "Entity commit blocked by transient error."; + transient_error = true; + break; + default: + LOG(ERROR) << "Bad return from ProcessSingleCommitResponse."; + unknown_error = true; + } + } + + // Send whatever successful responses we did get back to our parent. + // It's the schedulers job to handle the failures. + processor_core_->OnCommitResponse(response_list); + + // Let the scheduler know about the failures. + if (unknown_error) { + return SERVER_RETURN_UNKNOWN_ERROR; + } else if (transient_error) { + return SERVER_RETURN_TRANSIENT_ERROR; + } else if (commit_conflict) { + return SERVER_RETURN_CONFLICT; + } else { + return SYNCER_OK; + } +} + +void NonBlockingTypeCommitContribution::CleanUp() { + cleaned_up_ = true; + + // We could inform our parent NonBlockingCommitContributor that a commit is + // no longer in progress. The current implementation doesn't really care + // either way, so we don't bother sending the signal. +} + +size_t NonBlockingTypeCommitContribution::GetNumEntries() const { + return sequence_numbers_.size(); +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_commit_contribution.h b/sync/engine/non_blocking_type_commit_contribution.h new file mode 100644 index 0000000..4d415ca --- /dev/null +++ b/sync/engine/non_blocking_type_commit_contribution.h @@ -0,0 +1,66 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ + +#include <vector> + +#include "base/basictypes.h" +#include "sync/engine/commit_contribution.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +class NonBlockingTypeProcessorCore; + +// A non-blocking sync type's contribution to an outgoing commit message. +// +// Helps build a commit message and process its response. It collaborates +// closely with the NonBlockingTypeProcessorCore. +class NonBlockingTypeCommitContribution : public CommitContribution { + public: + NonBlockingTypeCommitContribution( + const sync_pb::DataTypeContext& context, + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities, + const std::vector<int64>& sequence_numbers, + NonBlockingTypeProcessorCore* processor_core); + virtual ~NonBlockingTypeCommitContribution(); + + // Implementation of CommitContribution + virtual void AddToCommitMessage(sync_pb::ClientToServerMessage* msg) OVERRIDE; + virtual SyncerError ProcessCommitResponse( + const sync_pb::ClientToServerResponse& response, + sessions::StatusController* status) OVERRIDE; + virtual void CleanUp() OVERRIDE; + virtual size_t GetNumEntries() const OVERRIDE; + + private: + // A non-owned pointer back to the object that created this contribution. + NonBlockingTypeProcessorCore* const processor_core_; + + // The type-global context information. + const sync_pb::DataTypeContext context_; + + // The set of entities to be committed, serialized as SyncEntities. + const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> entities_; + + // The sequence numbers associated with the pending commits. These match up + // with the entities_ vector. + const std::vector<int64> sequence_numbers_; + + // The index in the commit message where this contribution's entities are + // added. Used to correlate per-item requests with per-item responses. + size_t entries_start_index_; + + // A flag used to ensure this object's contract is respected. Helps to check + // that CleanUp() is called before the object is destructed. + bool cleaned_up_; + + DISALLOW_COPY_AND_ASSIGN(NonBlockingTypeCommitContribution); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_ diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc index 0aaead4..c1982b3 100644 --- a/sync/engine/non_blocking_type_processor.cc +++ b/sync/engine/non_blocking_type_processor.cc @@ -51,7 +51,6 @@ void NonBlockingTypeProcessor::Enable( // TODO(rlarocque): At some point, this should be loaded from storage. data_type_state_.progress_marker.set_data_type_id( GetSpecificsFieldNumberFromModelType(type_)); - data_type_state_.next_client_id = 0; sync_core_proxy_ = sync_core_proxy.Pass(); sync_core_proxy_->ConnectTypeToCore(GetModelType(), @@ -142,6 +141,10 @@ void NonBlockingTypeProcessor::FlushPendingCommitRequests() { if (!IsConnected()) return; + // Don't send anything if the type is not ready to handle commits. + if (!data_type_state_.initial_sync_done) + return; + // TODO(rlarocque): Do something smarter than iterate here. for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); ++it) { @@ -184,6 +187,9 @@ void NonBlockingTypeProcessor::OnCommitCompletion( void NonBlockingTypeProcessor::OnUpdateReceived( const DataTypeState& data_type_state, const UpdateResponseDataList& response_list) { + bool initial_sync_just_finished = + !data_type_state_.initial_sync_done && data_type_state.initial_sync_done; + data_type_state_ = data_type_state; for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); @@ -215,6 +221,9 @@ void NonBlockingTypeProcessor::OnUpdateReceived( } } + if (initial_sync_just_finished) + FlushPendingCommitRequests(); + // TODO: Inform the model of the new or updated data. } diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc index 236c9c5..a14a8d0 100644 --- a/sync/engine/non_blocking_type_processor_core.cc +++ b/sync/engine/non_blocking_type_processor_core.cc @@ -4,20 +4,28 @@ #include "sync/engine/non_blocking_type_processor_core.h" +#include "base/bind.h" +#include "base/format_macros.h" #include "base/logging.h" +#include "base/strings/stringprintf.h" #include "sync/engine/commit_contribution.h" +#include "sync/engine/non_blocking_type_commit_contribution.h" +#include "sync/engine/non_blocking_type_processor_interface.h" +#include "sync/engine/sync_thread_sync_entity.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" namespace syncer { NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( - ModelType type, - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> processor) + ModelType type, + const DataTypeState& initial_state, + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) : type_(type), - processor_task_runner_(processor_task_runner), - processor_(processor), + data_type_state_(initial_state), + processor_interface_(processor_interface.Pass()), + entities_deleter_(&entities_), weak_ptr_factory_(this) { - progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); } NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { @@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const { void NonBlockingTypeProcessorCore::GetDownloadProgress( sync_pb::DataTypeProgressMarker* progress_marker) const { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_); - *progress_marker = progress_marker_; + progress_marker->CopyFrom(data_type_state_.progress_marker); } void NonBlockingTypeProcessorCore::GetDataTypeContext( sync_pb::DataTypeContext* context) const { - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting context for: " << ModelTypeToString(type_); - context->Clear(); + DCHECK(CalledOnValidThread()); + context->CopyFrom(data_type_state_.type_context); } SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( @@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( const SyncEntityList& applicable_updates, sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_); - progress_marker_ = progress_marker; + + // TODO(rlarocque): Handle data type context conflicts. + data_type_state_.type_context = mutated_context; + data_type_state_.progress_marker = progress_marker; + + UpdateResponseDataList response_datas; + + for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); + update_it != applicable_updates.end(); + ++update_it) { + const sync_pb::SyncEntity* update_entity = *update_it; + if (!update_entity->server_defined_unique_tag().empty()) { + // We can't commit an item unless we know its parent ID. This is where + // we learn that ID and remember it forever. + DCHECK_EQ(ModelTypeToRootTag(type_), + update_entity->server_defined_unique_tag()); + if (!data_type_state_.type_root_id.empty()) { + DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); + } + data_type_state_.type_root_id = update_entity->id_string(); + } else { + // Normal updates are handled here. + const std::string& client_tag_hash = + update_entity->client_defined_unique_tag(); + DCHECK(!client_tag_hash.empty()); + EntityMap::const_iterator map_it = entities_.find(client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), + client_tag_hash, + update_entity->version()); + entities_.insert(std::make_pair(client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveUpdate(update_entity->version()); + } + + // Prepare the message for the model thread. + UpdateResponseData response_data; + response_data.id = update_entity->id_string(); + response_data.client_tag_hash = client_tag_hash; + response_data.response_version = update_entity->version(); + response_data.ctime = ProtoTimeToTime(update_entity->ctime()); + response_data.mtime = ProtoTimeToTime(update_entity->mtime()); + response_data.non_unique_name = update_entity->name(); + response_data.deleted = update_entity->deleted(); + response_data.specifics = update_entity->specifics(); + + response_datas.push_back(response_data); + } + } + + // Forward these updates to the model thread so it can do the rest. + processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); + return SYNCER_OK; } void NonBlockingTypeProcessorCore::ApplyUpdates( sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); -} + // This function is called only when we've finished a download cycle, ie. we + // got a response with changes_remaining == 0. If this is our first download + // cycle, we should update our state so the NonBlockingTypeProcessor knows + // that it's safe to commit items now. + if (!data_type_state_.initial_sync_done) { + data_type_state_.initial_sync_done = true; -void NonBlockingTypeProcessorCore::RequestCommits( - const CommitRequestDataList& request_list) { - // TODO(rlarocque): Implement this. crbug.com/351005. + UpdateResponseDataList empty_update_list; + processor_interface_->ReceiveUpdateResponse(data_type_state_, + empty_update_list); + } } void NonBlockingTypeProcessorCore::PassiveApplyUpdates( @@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates( << "ModelType is: " << ModelTypeToString(type_); } +void NonBlockingTypeProcessorCore::EnqueueForCommit( + const CommitRequestDataList& list) { + DCHECK(CalledOnValidThread()); + + DCHECK(CanCommitItems()) + << "Asked to commit items before type was initialized. " + << "ModelType is: " << ModelTypeToString(type_); + + for (CommitRequestDataList::const_iterator it = list.begin(); + it != list.end(); + ++it) { + StorePendingCommit(*it); + } +} + // CommitContributor implementation. scoped_ptr<CommitContribution> NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_); - return scoped_ptr<CommitContribution>(); + + size_t space_remaining = max_entries; + std::vector<int64> sequence_numbers; + google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; + + if (!CanCommitItems()) + return scoped_ptr<CommitContribution>(); + + // TODO(rlarocque): Avoid iterating here. + for (EntityMap::const_iterator it = entities_.begin(); + it != entities_.end() && space_remaining > 0; + ++it) { + SyncThreadSyncEntity* entity = it->second; + if (entity->IsCommitPending()) { + sync_pb::SyncEntity* commit_entity = commit_entities.Add(); + int64 sequence_number = -1; + + entity->PrepareCommitProto(commit_entity, &sequence_number); + HelpInitializeCommitEntity(commit_entity); + sequence_numbers.push_back(sequence_number); + + space_remaining--; + } + } + + if (commit_entities.size() == 0) + return scoped_ptr<CommitContribution>(); + + return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( + data_type_state_.type_context, commit_entities, sequence_numbers, this)); +} + +void NonBlockingTypeProcessorCore::StorePendingCommit( + const CommitRequestData& request) { + if (!request.deleted) { + DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); + } + + EntityMap::iterator map_it = entities_.find(request.client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromCommitRequest(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + entities_.insert(std::make_pair(request.client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->RequestCommit(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + } + + // TODO: Nudge SyncScheduler. +} + +void NonBlockingTypeProcessorCore::OnCommitResponse( + const CommitResponseDataList& response_list) { + for (CommitResponseDataList::const_iterator response_it = + response_list.begin(); + response_it != response_list.end(); + ++response_it) { + const std::string client_tag_hash = response_it->client_tag_hash; + EntityMap::iterator map_it = entities_.find(client_tag_hash); + + // There's no way we could have committed an entry we know nothing about. + if (map_it == entities_.end()) { + NOTREACHED() << "Received commit response for item unknown to us." + << " Model type: " << ModelTypeToString(type_) + << " ID: " << response_it->id; + continue; + } + + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveCommitResponse(response_it->id, + response_it->response_version, + response_it->sequence_number); + } + + // Send the responses back to the model thread. It needs to know which + // items have been successfully committed so it can save that information in + // permanent storage. + processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); } base::WeakPtr<NonBlockingTypeProcessorCore> @@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } +bool NonBlockingTypeProcessorCore::CanCommitItems() const { + // We can't commit anything until we know the type's parent node. + // We'll get it in the first update response. + return !data_type_state_.type_root_id.empty() && + data_type_state_.initial_sync_done; +} + +void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( + sync_pb::SyncEntity* sync_entity) { + // Initial commits need our help to generate a client ID. + if (!sync_entity->has_id_string()) { + DCHECK_EQ(kUncommittedVersion, sync_entity->version()); + const int64 id = data_type_state_.next_client_id++; + sync_entity->set_id_string( + base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); + } + + // Always include enough specifics to identify the type. Do this even in + // deletion requests, where the specifics are otherwise invalid. + if (!sync_entity->has_specifics()) { + AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); + } + + // We're always responsible for the parent ID. + sync_entity->set_parent_id_string(data_type_state_.type_root_id); +} + } // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h index e226349..93d4215 100644 --- a/sync/engine/non_blocking_type_processor_core.h +++ b/sync/engine/non_blocking_type_processor_core.h @@ -6,6 +6,7 @@ #define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_CORE_H_ #include "base/memory/weak_ptr.h" +#include "base/stl_util.h" #include "base/threading/non_thread_safe.h" #include "sync/base/sync_export.h" #include "sync/engine/commit_contributor.h" @@ -20,7 +21,8 @@ class SingleThreadTaskRunner; namespace syncer { -class NonBlockingTypeProcessor; +class NonBlockingTypeProcessorInterface; +class SyncThreadSyncEntity; // A smart cache for sync types that use message passing (rather than // transactions and the syncable::Directory) to communicate with the sync @@ -28,9 +30,9 @@ class NonBlockingTypeProcessor; // // When the non-blocking sync type wants to talk with the sync server, it will // send a message from its thread to this object on the sync thread. This -// object is responsible for helping to ensure the appropriate sync server -// communication gets scheduled and executed. The response, if any, will be -// returned to the non-blocking sync type's thread eventually. +// object ensures the appropriate sync server communication gets scheduled and +// executed. The response, if any, will be returned to the non-blocking sync +// type's thread eventually. // // This object also has a role to play in communications in the opposite // direction. Sometimes the sync thread will receive changes from the sync @@ -49,8 +51,8 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore public: NonBlockingTypeProcessorCore( ModelType type, - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> processor); + const DataTypeState& initial_state, + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface); virtual ~NonBlockingTypeProcessorCore(); ModelType GetModelType() const; @@ -69,20 +71,57 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE; // Entry point for NonBlockingTypeProcessor to send commit requests. - void RequestCommits(const CommitRequestDataList& request_list); + void EnqueueForCommit(const CommitRequestDataList& request_list); // CommitContributor implementation. virtual scoped_ptr<CommitContribution> GetContribution( size_t max_entries) OVERRIDE; + // Callback for when our contribution gets a response. + void OnCommitResponse(const CommitResponseDataList& response_list); + base::WeakPtr<NonBlockingTypeProcessorCore> AsWeakPtr(); private: + typedef std::map<std::string, SyncThreadSyncEntity*> EntityMap; + + // Stores a single commit request in this object's internal state. + void StorePendingCommit(const CommitRequestData& request); + + // Returns true if all data type state required for commits is available. In + // practice, this means that it returns true from the time this object first + // receives notice of a successful update fetch from the server. + bool CanCommitItems() const; + + // Initializes the parts of a commit entity that are the responsibility of + // this class, and not the SyncThreadSyncEntity. Some fields, like the + // client-assigned ID, can only be set by an entity with knowledge of the + // entire data type's state. + void HelpInitializeCommitEntity(sync_pb::SyncEntity* commit_entity); + ModelType type_; - sync_pb::DataTypeProgressMarker progress_marker_; - scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; - base::WeakPtr<NonBlockingTypeProcessor> processor_; + // State that applies to the entire model type. + DataTypeState data_type_state_; + + // Abstraction around the NonBlockingTypeProcessor so this class + // doesn't need to know about its specific implementation or + // which thread it's on. This makes it easier to write tests. + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface_; + + // A map of per-entity information known to this object. + // + // When commits are pending, their information is stored here. This + // information is dropped from memory when the commit succeeds or gets + // cancelled. + // + // This also stores some information related to received server state in + // order to implement reflection blocking and conflict detection. This + // information is kept in memory indefinitely. With a bit more coordination + // with the model thread, we could optimize this to reduce memory usage in + // the steady state. + EntityMap entities_; + STLValueDeleter<EntityMap> entities_deleter_; base::WeakPtrFactory<NonBlockingTypeProcessorCore> weak_ptr_factory_; }; diff --git a/sync/engine/non_blocking_type_processor_core_unittest.cc b/sync/engine/non_blocking_type_processor_core_unittest.cc new file mode 100644 index 0000000..87b5647 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_core_unittest.cc @@ -0,0 +1,961 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor_core.h" + +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/callback.h" +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/engine/non_blocking_type_commit_contribution.h" +#include "sync/engine/non_blocking_type_processor_interface.h" +#include "sync/protocol/sync.pb.h" +#include "sync/sessions/status_controller.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" + +#include "testing/gtest/include/gtest/gtest.h" + +using google::protobuf::RepeatedPtrField; + +static const std::string kTypeParentId = "PrefsRootNodeID"; + +namespace syncer { + +class NonBlockingTypeProcessorCoreTest; + +namespace { + +class MockNonBlockingTypeProcessor : public NonBlockingTypeProcessorInterface { + public: + MockNonBlockingTypeProcessor(NonBlockingTypeProcessorCoreTest* parent); + virtual ~MockNonBlockingTypeProcessor(); + + virtual void ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) OVERRIDE; + virtual void ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) OVERRIDE; + + private: + NonBlockingTypeProcessorCoreTest* parent_; + + DISALLOW_COPY_AND_ASSIGN(MockNonBlockingTypeProcessor); +}; + +} // namespace + +// Tests the NonBlockingTypeProcessorCore. +// +// This class passes messages between the model thread and sync server. +// As such, its code is subject to lots of different race conditions. This +// test harness lets us exhaustively test all possible races. We try to +// focus on just a few interesting cases. +// +// Inputs: +// - Initial data type state from the model thread. +// - Commit requests from the model thread. +// - Update responses from the server. +// - Commit responses from the server. +// +// Outputs: +// - Commit requests to the server. +// - Commit responses to the model thread. +// - Update responses to the model thread. +// - Nudges to the sync scheduler. +// +// We use the MockNonBlockingTypeProcessor to stub out all communication +// with the model thread. That interface is synchronous, which makes it +// much easier to test races. +// +// The interface with the server is built around "pulling" data from this +// class, so we don't have to mock out any of it. We wrap it with some +// convenience functions to we can emulate server behavior. +class NonBlockingTypeProcessorCoreTest : public ::testing::Test { + public: + NonBlockingTypeProcessorCoreTest(); + virtual ~NonBlockingTypeProcessorCoreTest(); + + // One of these Initialize functions should be called at the beginning of + // each test. + + // Initializes with no data type state. We will be unable to perform any + // significant server action until we receive an update response that + // contains the type root node for this type. + void FirstInitialize(); + + // Initializes with some existing data type state. Allows us to start + // committing items right away. + void NormalInitialize(); + + // Initialize with a custom initial DataTypeState. + void InitializeWithState(const DataTypeState& state); + + // Modifications on the model thread that get sent to the core under test. + void CommitRequest(const std::string& tag, const std::string& value); + void DeleteRequest(const std::string& tag); + + // Pretends to receive update messages from the server. + void TriggerTypeRootUpdateFromServer(); + void TriggerUpdateFromServer(int64 version_offset, + const std::string& tag, + const std::string& value); + void TriggerTombstoneFromServer(int64 version_offset, const std::string& tag); + + // Callbacks from the mock processor. Called when the |core_| tries to send + // messages to its associated processor on the model thread. + void OnModelThreadReceivedCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list); + void OnModelThreadReceivedUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list); + + // By default, this harness behaves as if all tasks posted to the model + // thread are executed immediately. However, this is not necessarily true. + // The model's TaskRunner has a queue, and the tasks we post to it could + // linger there for a while. In the meantime, the model thread could + // continue posting tasks to the core based on its stale state. + // + // If you want to test those race cases, then these functions are for you. + void SetModelThreadIsSynchronous(bool is_synchronous); + void PumpModelThread(); + + // Returns true if the |core_| is ready to commit something. + bool WillCommit(); + + // Pretend to successfully commit all outstanding unsynced items. + // It is safe to call this only if WillCommit() returns true. + void DoSuccessfulCommit(); + + // Read commit messages the core_ sent to the emulated server. + size_t GetNumCommitMessagesOnServer() const; + sync_pb::ClientToServerMessage GetNthCommitMessageOnServer(size_t n) const; + + // Read the latest version of sync entities committed to the emulated server. + bool HasCommitEntityOnServer(const std::string& tag) const; + sync_pb::SyncEntity GetLatestCommitEntityOnServer( + const std::string& tag) const; + + // Read the latest update messages received on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + size_t GetNumModelThreadUpdateResponses() const; + UpdateResponseDataList GetNthModelThreadUpdateResponse(size_t n) const; + DataTypeState GetNthModelThreadUpdateState(size_t n) const; + + // Reads the latest update response datas on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + bool HasUpdateResponseOnModelThread(const std::string& tag) const; + UpdateResponseData GetUpdateResponseOnModelThread( + const std::string& tag) const; + + // Read the latest commit messages received on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + size_t GetNumModelThreadCommitResponses() const; + CommitResponseDataList GetNthModelThreadCommitResponse(size_t n) const; + DataTypeState GetNthModelThreadCommitState(size_t n) const; + + // Reads the latest commit response datas on the model thread. + // Note that if the model thread is in non-blocking mode, this data will not + // be updated until the response is actually processed by the model thread. + bool HasCommitResponseOnModelThread(const std::string& tag) const; + CommitResponseData GetCommitResponseOnModelThread( + const std::string& tag) const; + + // Helpers for building various messages and structures. + static std::string GenerateId(const std::string& tag_hash); + static std::string GenerateTagHash(const std::string& tag); + static sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag, + const std::string& value); + + private: + // Get and set our emulated server state. + int64 GetServerVersion(const std::string& tag_hash); + void SetServerVersion(const std::string& tag_hash, int64 version); + + // Get and set our emulated model thread state. + int64 GetCurrentSequenceNumber(const std::string& tag_hash) const; + int64 GetNextSequenceNumber(const std::string& tag_hash); + int64 GetModelVersion(const std::string& tag_hash) const; + void SetModelVersion(const std::string& tag_hash, int64 version); + + // Receive a commit response in the emulated model thread. + // + // Kept in a separate Impl method so we can emulate deferred task processing. + // See SetModelThreadIsSynchronous() for details. + void ModelThreadReceiveCommitResponseImpl( + const DataTypeState& type_state, + const CommitResponseDataList& response_list); + + // Receive an update response in the emulated model thread. + // + // Kept in a separate Impl method so we can emulate deferred task processing. + // See SetModelThreadIsSynchronous() for details. + void ModelThreadReceiveUpdateResponseImpl( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list); + + // Builds a fake progress marker for our response. + sync_pb::DataTypeProgressMarker GenerateResponseProgressMarker() const; + + scoped_ptr<NonBlockingTypeProcessorCore> core_; + MockNonBlockingTypeProcessor* mock_processor_; + + // Model thread state maps. + std::map<const std::string, int64> model_sequence_numbers_; + std::map<const std::string, int64> model_base_versions_; + + // Server state maps. + std::map<const std::string, int64> server_versions_; + + // Logs of messages sent to the server. Used in assertions. + std::map<const std::string, sync_pb::SyncEntity> committed_items_; + std::vector<sync_pb::ClientToServerMessage> commit_messages_; + + // State related to emulation of the model thread's task queue. Used to + // defer model thread work to simulate a full model thread task runner queue. + bool model_thread_is_synchronous_; + std::vector<base::Closure> model_thread_tasks_; + + // A cache of messages sent to the model thread. + std::vector<CommitResponseDataList> commit_responses_to_model_thread_; + std::vector<UpdateResponseDataList> updates_responses_to_model_thread_; + std::vector<DataTypeState> updates_states_to_model_thread_; + std::vector<DataTypeState> commit_states_to_model_thread_; + + // A cache of the latest responses on the model thread, by client tag. + std::map<const std::string, CommitResponseData> + model_thread_commit_response_items_; + std::map<const std::string, UpdateResponseData> + model_thread_update_response_items_; +}; + +// These had to wait until the class definition of +// NonBlockingTypeProcessorCoreTest +MockNonBlockingTypeProcessor::MockNonBlockingTypeProcessor( + NonBlockingTypeProcessorCoreTest* parent) + : parent_(parent) { +} + +MockNonBlockingTypeProcessor::~MockNonBlockingTypeProcessor() { +} + +void MockNonBlockingTypeProcessor::ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + parent_->OnModelThreadReceivedCommitResponse(type_state, response_list); +} + +void MockNonBlockingTypeProcessor::ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + parent_->OnModelThreadReceivedUpdateResponse(type_state, response_list); +} + +NonBlockingTypeProcessorCoreTest::NonBlockingTypeProcessorCoreTest() + : model_thread_is_synchronous_(true) { +} + +NonBlockingTypeProcessorCoreTest::~NonBlockingTypeProcessorCoreTest() { +} + +void NonBlockingTypeProcessorCoreTest::FirstInitialize() { + DataTypeState initial_state; + initial_state.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(PREFERENCES)); + initial_state.next_client_id = 0; + + InitializeWithState(initial_state); +} + +void NonBlockingTypeProcessorCoreTest::NormalInitialize() { + DataTypeState initial_state; + initial_state.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(PREFERENCES)); + initial_state.progress_marker.set_token("some_saved_progress_token"); + + initial_state.next_client_id = 10; + initial_state.type_root_id = kTypeParentId; + initial_state.initial_sync_done = true; + + InitializeWithState(initial_state); +} + +void NonBlockingTypeProcessorCoreTest::InitializeWithState( + const DataTypeState& state) { + DCHECK(!core_); + + // We don't get to own this interace. The |core_| keeps a scoped_ptr to it. + mock_processor_ = new MockNonBlockingTypeProcessor(this); + scoped_ptr<NonBlockingTypeProcessorInterface> interface(mock_processor_); + + core_.reset( + new NonBlockingTypeProcessorCore(PREFERENCES, state, interface.Pass())); +} + +void NonBlockingTypeProcessorCoreTest::CommitRequest(const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + const int64 base_version = GetModelVersion(tag_hash); + + CommitRequestData data; + + // Initial commits don't have IDs. Everything else does. + if (base_version > kUncommittedVersion) { + data.id = GenerateId(tag_hash); + } + + data.client_tag_hash = tag_hash; + data.sequence_number = GetNextSequenceNumber(tag_hash); + + data.base_version = base_version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version); + data.non_unique_name = tag; + + data.deleted = false; + data.specifics = GenerateSpecifics(tag, value); + + CommitRequestDataList list; + list.push_back(data); + + core_->EnqueueForCommit(list); +} + +void NonBlockingTypeProcessorCoreTest::DeleteRequest(const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + const int64 base_version = GetModelVersion(tag_hash); + CommitRequestData data; + + // Requests to commit server-unknown items don't have IDs. + // We'll never send a deletion for a server-unknown item, but the model is + // allowed to request that we do. + if (base_version > kUncommittedVersion) { + data.id = GenerateId(tag_hash); + } + + data.client_tag_hash = tag_hash; + data.sequence_number = GetNextSequenceNumber(tag_hash); + + data.base_version = base_version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.client_tag_hash = tag_hash; + data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version); + data.deleted = true; + + CommitRequestDataList list; + list.push_back(data); + + core_->EnqueueForCommit(list); +} + +void NonBlockingTypeProcessorCoreTest::TriggerTypeRootUpdateFromServer() { + sync_pb::SyncEntity entity; + + entity.set_id_string(kTypeParentId); + entity.set_parent_id_string("r"); + entity.set_version(1000); + entity.set_ctime(TimeToProtoTime(base::Time::UnixEpoch())); + entity.set_mtime(TimeToProtoTime(base::Time::UnixEpoch())); + entity.set_server_defined_unique_tag(ModelTypeToRootTag(PREFERENCES)); + entity.set_deleted(false); + AddDefaultFieldValue(PREFERENCES, entity.mutable_specifics()); + + const sync_pb::DataTypeProgressMarker& progress = + GenerateResponseProgressMarker(); + const sync_pb::DataTypeContext blank_context; + sessions::StatusController dummy_status; + + SyncEntityList entity_list; + entity_list.push_back(&entity); + + core_->ProcessGetUpdatesResponse( + progress, blank_context, entity_list, &dummy_status); + core_->ApplyUpdates(&dummy_status); +} + +void NonBlockingTypeProcessorCoreTest::TriggerUpdateFromServer( + int64 version_offset, + const std::string& tag, + const std::string& value) { + const std::string tag_hash = GenerateTagHash(tag); + + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + sync_pb::SyncEntity entity; + + entity.set_id_string(GenerateId(tag_hash)); + entity.set_parent_id_string(kTypeParentId); + entity.set_version(version); + + base::Time ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + base::Time mtime = ctime + base::TimeDelta::FromSeconds(version); + entity.set_ctime(TimeToProtoTime(ctime)); + entity.set_mtime(TimeToProtoTime(mtime)); + + entity.set_name(tag); + entity.set_client_defined_unique_tag(GenerateTagHash(tag)); + entity.set_deleted(false); + entity.mutable_specifics()->CopyFrom(GenerateSpecifics(tag, value)); + + SyncEntityList entity_list; + entity_list.push_back(&entity); + + const sync_pb::DataTypeProgressMarker& progress = + GenerateResponseProgressMarker(); + const sync_pb::DataTypeContext blank_context; + sessions::StatusController dummy_status; + + core_->ProcessGetUpdatesResponse( + progress, blank_context, entity_list, &dummy_status); + core_->ApplyUpdates(&dummy_status); +} + +void NonBlockingTypeProcessorCoreTest::TriggerTombstoneFromServer( + int64 version_offset, + const std::string& tag) { + const std::string tag_hash = GenerateTagHash(tag); + int64 old_version = GetServerVersion(tag_hash); + int64 version = old_version + version_offset; + if (version > old_version) { + SetServerVersion(tag_hash, version); + } + + UpdateResponseData data; + + data.id = GenerateId(tag_hash); + data.client_tag_hash = tag_hash; + data.response_version = version; + data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1); + data.mtime = data.ctime + base::TimeDelta::FromSeconds(version); + data.non_unique_name = tag; + data.deleted = true; +} + +void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + base::Closure task = base::Bind( + &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl, + base::Unretained(this), + type_state, + response_list); + model_thread_tasks_.push_back(task); + if (model_thread_is_synchronous_) + PumpModelThread(); +} + +void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + base::Closure task = base::Bind( + &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl, + base::Unretained(this), + type_state, + response_list); + model_thread_tasks_.push_back(task); + if (model_thread_is_synchronous_) + PumpModelThread(); +} + +void NonBlockingTypeProcessorCoreTest::SetModelThreadIsSynchronous( + bool is_synchronous) { + model_thread_is_synchronous_ = is_synchronous; +} + +void NonBlockingTypeProcessorCoreTest::PumpModelThread() { + for (std::vector<base::Closure>::iterator it = model_thread_tasks_.begin(); + it != model_thread_tasks_.end(); + ++it) { + it->Run(); + } + model_thread_tasks_.clear(); +} + +bool NonBlockingTypeProcessorCoreTest::WillCommit() { + scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX)); + + if (contribution) { + contribution->CleanUp(); // Gracefully abort the commit. + return true; + } else { + return false; + } +} + +// Conveniently, this is all one big synchronous operation. The sync thread +// remains blocked while the commit is in progress, so we don't need to worry +// about other tasks being run between the time when the commit request is +// issued and the time when the commit response is received. +void NonBlockingTypeProcessorCoreTest::DoSuccessfulCommit() { + DCHECK(WillCommit()); + scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX)); + + sync_pb::ClientToServerMessage message; + contribution->AddToCommitMessage(&message); + commit_messages_.push_back(message); + + sync_pb::ClientToServerResponse response; + sync_pb::CommitResponse* commit_response = response.mutable_commit(); + + const RepeatedPtrField<sync_pb::SyncEntity>& entries = + message.commit().entries(); + for (RepeatedPtrField<sync_pb::SyncEntity>::const_iterator it = + entries.begin(); + it != entries.end(); + ++it) { + const std::string tag_hash = it->client_defined_unique_tag(); + + committed_items_[tag_hash] = *it; + + // Every commit increments the version number. + int64 version = GetServerVersion(tag_hash); + version++; + SetServerVersion(tag_hash, version); + + sync_pb::CommitResponse_EntryResponse* entryresponse = + commit_response->add_entryresponse(); + entryresponse->set_response_type(sync_pb::CommitResponse::SUCCESS); + entryresponse->set_id_string(GenerateId(tag_hash)); + entryresponse->set_parent_id_string(it->parent_id_string()); + entryresponse->set_version(version); + entryresponse->set_name(it->name()); + entryresponse->set_mtime(it->mtime()); + } + + sessions::StatusController dummy_status; + contribution->ProcessCommitResponse(response, &dummy_status); + contribution->CleanUp(); +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumCommitMessagesOnServer() const { + return commit_messages_.size(); +} + +sync_pb::ClientToServerMessage +NonBlockingTypeProcessorCoreTest::GetNthCommitMessageOnServer(size_t n) const { + DCHECK_LT(n, GetNumCommitMessagesOnServer()); + return commit_messages_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasCommitEntityOnServer( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, sync_pb::SyncEntity>::const_iterator it = + committed_items_.find(tag_hash); + return it != committed_items_.end(); +} + +sync_pb::SyncEntity +NonBlockingTypeProcessorCoreTest::GetLatestCommitEntityOnServer( + const std::string& tag) const { + DCHECK(HasCommitEntityOnServer(tag)); + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, sync_pb::SyncEntity>::const_iterator it = + committed_items_.find(tag_hash); + return it->second; +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadUpdateResponses() + const { + return updates_responses_to_model_thread_.size(); +} + +UpdateResponseDataList +NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateResponse( + size_t n) const { + DCHECK(GetNumModelThreadUpdateResponses()); + return updates_responses_to_model_thread_[n]; +} + +DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateState( + size_t n) const { + DCHECK(GetNumModelThreadUpdateResponses()); + return updates_states_to_model_thread_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasUpdateResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, UpdateResponseData>::const_iterator it = + model_thread_update_response_items_.find(tag_hash); + return it != model_thread_update_response_items_.end(); +} + +UpdateResponseData +NonBlockingTypeProcessorCoreTest::GetUpdateResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + DCHECK(HasUpdateResponseOnModelThread(tag)); + std::map<const std::string, UpdateResponseData>::const_iterator it = + model_thread_update_response_items_.find(tag_hash); + return it->second; +} + +size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadCommitResponses() + const { + return commit_responses_to_model_thread_.size(); +} + +CommitResponseDataList +NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitResponse( + size_t n) const { + DCHECK(GetNumModelThreadCommitResponses()); + return commit_responses_to_model_thread_[n]; +} + +DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitState( + size_t n) const { + DCHECK(GetNumModelThreadCommitResponses()); + return commit_states_to_model_thread_[n]; +} + +bool NonBlockingTypeProcessorCoreTest::HasCommitResponseOnModelThread( + const std::string& tag) const { + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, CommitResponseData>::const_iterator it = + model_thread_commit_response_items_.find(tag_hash); + return it != model_thread_commit_response_items_.end(); +} + +CommitResponseData +NonBlockingTypeProcessorCoreTest::GetCommitResponseOnModelThread( + const std::string& tag) const { + DCHECK(HasCommitResponseOnModelThread(tag)); + const std::string tag_hash = GenerateTagHash(tag); + std::map<const std::string, CommitResponseData>::const_iterator it = + model_thread_commit_response_items_.find(tag_hash); + return it->second; +} + +std::string NonBlockingTypeProcessorCoreTest::GenerateId( + const std::string& tag_hash) { + return "FakeId:" + tag_hash; +} + +std::string NonBlockingTypeProcessorCoreTest::GenerateTagHash( + const std::string& tag) { + const std::string& client_tag_hash = + syncable::GenerateSyncableHash(PREFERENCES, tag); + + return client_tag_hash; +} + +sync_pb::EntitySpecifics NonBlockingTypeProcessorCoreTest::GenerateSpecifics( + const std::string& tag, + const std::string& value) { + sync_pb::EntitySpecifics specifics; + specifics.mutable_preference()->set_name(tag); + specifics.mutable_preference()->set_value(value); + return specifics; +} + +int64 NonBlockingTypeProcessorCoreTest::GetServerVersion( + const std::string& tag_hash) { + std::map<const std::string, int64>::const_iterator it; + it = server_versions_.find(tag_hash); + // Server versions do not necessarily start at 1 or 0. + if (it == server_versions_.end()) { + return 2048; + } else { + return it->second; + } +} + +void NonBlockingTypeProcessorCoreTest::SetServerVersion( + const std::string& tag_hash, + int64 version) { + server_versions_[tag_hash] = version; +} + +// Fetches the sequence number as of the most recent update request. +int64 NonBlockingTypeProcessorCoreTest::GetCurrentSequenceNumber( + const std::string& tag_hash) const { + std::map<const std::string, int64>::const_iterator it = + model_sequence_numbers_.find(tag_hash); + if (it == model_sequence_numbers_.end()) { + return 0; + } else { + return it->second; + } +} + +// The model thread should be sending us items with strictly increasing +// sequence numbers. Here's where we emulate that behavior. +int64 NonBlockingTypeProcessorCoreTest::GetNextSequenceNumber( + const std::string& tag_hash) { + int64 sequence_number = GetCurrentSequenceNumber(tag_hash); + sequence_number++; + model_sequence_numbers_[tag_hash] = sequence_number; + return sequence_number; +} + +// Fetches the model's base version. +int64 NonBlockingTypeProcessorCoreTest::GetModelVersion( + const std::string& tag_hash) const { + std::map<const std::string, int64>::const_iterator it = + model_base_versions_.find(tag_hash); + if (it == model_base_versions_.end()) { + return kUncommittedVersion; + } else { + return it->second; + } +} + +void NonBlockingTypeProcessorCoreTest::SetModelVersion( + const std::string& tag_hash, + int64 version) { + model_base_versions_[tag_hash] = version; +} + +void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + commit_responses_to_model_thread_.push_back(response_list); + commit_states_to_model_thread_.push_back(type_state); + for (CommitResponseDataList::const_iterator it = response_list.begin(); + it != response_list.end(); + ++it) { + model_thread_commit_response_items_.insert( + std::make_pair(it->client_tag_hash, *it)); + + // Server wins. Set the model's base version. + SetModelVersion(it->client_tag_hash, it->response_version); + } +} + +void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + updates_responses_to_model_thread_.push_back(response_list); + updates_states_to_model_thread_.push_back(type_state); + for (UpdateResponseDataList::const_iterator it = response_list.begin(); + it != response_list.end(); + ++it) { + model_thread_update_response_items_.insert( + std::make_pair(it->client_tag_hash, *it)); + + // Server wins. Set the model's base version. + SetModelVersion(it->client_tag_hash, it->response_version); + } +} + +sync_pb::DataTypeProgressMarker +NonBlockingTypeProcessorCoreTest::GenerateResponseProgressMarker() const { + sync_pb::DataTypeProgressMarker progress; + progress.set_data_type_id(PREFERENCES); + progress.set_token("non_null_progress_token"); + return progress; +} + +// Requests a commit and verifies the messages sent to the client and server as +// a result. +// +// This test performs sanity checks on most of the fields in these messages. +// For the most part this is checking that the test code behaves as expected +// and the |core_| doesn't mess up its simple task of moving around these +// values. It makes sense to have one or two tests that are this thorough, but +// we shouldn't be this verbose in all tests. +TEST_F(NonBlockingTypeProcessorCoreTest, SimpleCommit) { + NormalInitialize(); + + EXPECT_FALSE(WillCommit()); + EXPECT_EQ(0U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(0U, GetNumModelThreadCommitResponses()); + + CommitRequest("tag1", "value1"); + + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + const std::string& client_tag_hash = GenerateTagHash("tag1"); + + // Exhaustively verify the SyncEntity sent in the commit message. + ASSERT_EQ(1U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1"); + EXPECT_FALSE(entity.id_string().empty()); + EXPECT_EQ(kTypeParentId, entity.parent_id_string()); + EXPECT_EQ(kUncommittedVersion, entity.version()); + EXPECT_NE(0, entity.mtime()); + EXPECT_NE(0, entity.ctime()); + EXPECT_EQ("tag1", entity.name()); + EXPECT_EQ(client_tag_hash, entity.client_defined_unique_tag()); + EXPECT_EQ("tag1", entity.specifics().preference().name()); + EXPECT_FALSE(entity.deleted()); + EXPECT_EQ("value1", entity.specifics().preference().value()); + + // Exhaustively verify the commit response returned to the model thread. + ASSERT_EQ(1U, GetNumModelThreadCommitResponses()); + EXPECT_EQ(1U, GetNthModelThreadCommitResponse(0).size()); + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& commit_response = + GetCommitResponseOnModelThread("tag1"); + + // The ID changes in a commit response to initial commit. + EXPECT_FALSE(commit_response.id.empty()); + EXPECT_NE(entity.id_string(), commit_response.id); + + EXPECT_EQ(client_tag_hash, commit_response.client_tag_hash); + EXPECT_LT(0, commit_response.response_version); +} + +TEST_F(NonBlockingTypeProcessorCoreTest, SimpleDelete) { + NormalInitialize(); + + // We can't delete an entity that was never committed. + // Step 1 is to create and commit a new entity. + CommitRequest("tag1", "value1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& initial_commit_response = + GetCommitResponseOnModelThread("tag1"); + int64 base_version = initial_commit_response.response_version; + + // Now that we have an entity, we can delete it. + DeleteRequest("tag1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + + // Verify the SyncEntity sent in the commit message. + ASSERT_EQ(2U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1"); + EXPECT_FALSE(entity.id_string().empty()); + EXPECT_EQ(GenerateTagHash("tag1"), entity.client_defined_unique_tag()); + EXPECT_EQ(base_version, entity.version()); + EXPECT_TRUE(entity.deleted()); + + // Deletions should contain enough specifics to identify the type. + EXPECT_TRUE(entity.has_specifics()); + EXPECT_EQ(PREFERENCES, GetModelTypeFromSpecifics(entity.specifics())); + + // Verify the commit response returned to the model thread. + ASSERT_EQ(2U, GetNumModelThreadCommitResponses()); + EXPECT_EQ(1U, GetNthModelThreadCommitResponse(1).size()); + ASSERT_TRUE(HasCommitResponseOnModelThread("tag1")); + const CommitResponseData& commit_response = + GetCommitResponseOnModelThread("tag1"); + + EXPECT_EQ(entity.id_string(), commit_response.id); + EXPECT_EQ(entity.client_defined_unique_tag(), + commit_response.client_tag_hash); + EXPECT_EQ(entity.version(), commit_response.response_version); +} + +// The server doesn't like it when we try to delete an entity it's never heard +// of before. This test helps ensure we avoid that scenario. +TEST_F(NonBlockingTypeProcessorCoreTest, NoDeleteUncommitted) { + NormalInitialize(); + + // Request the commit of a new, never-before-seen item. + CommitRequest("tag1", "value1"); + EXPECT_TRUE(WillCommit()); + + // Request a deletion of that item before we've had a chance to commit it. + DeleteRequest("tag1"); + EXPECT_FALSE(WillCommit()); +} + +// Verifies the sending of an "initial sync done" signal. +TEST_F(NonBlockingTypeProcessorCoreTest, SendInitialSyncDone) { + FirstInitialize(); // Initialize with no saved sync state. + EXPECT_EQ(0U, GetNumModelThreadUpdateResponses()); + + // Receive an update response that contains only the type root node. + TriggerTypeRootUpdateFromServer(); + + // Two updates: + // - One triggered by process updates to forward the type root ID. + // - One triggered by apply updates, which the core interprets to mean + // "initial sync done". This triggers a model thread update, too. + EXPECT_EQ(2U, GetNumModelThreadUpdateResponses()); + + // The type root and initial sync done updates both contain no entities. + EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(0).size()); + EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(1).size()); + + const DataTypeState& state = GetNthModelThreadUpdateState(1); + EXPECT_FALSE(state.progress_marker.token().empty()); + EXPECT_FALSE(state.type_root_id.empty()); + EXPECT_TRUE(state.initial_sync_done); +} + +// Commit two new entities in two separate commit messages. +TEST_F(NonBlockingTypeProcessorCoreTest, TwoNewItemsCommittedSeparately) { + NormalInitialize(); + + // Commit the first of two entities. + CommitRequest("tag1", "value1"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + ASSERT_EQ(1U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag1")); + const sync_pb::SyncEntity& tag1_entity = + GetLatestCommitEntityOnServer("tag1"); + + // Commit the second of two entities. + CommitRequest("tag2", "value2"); + ASSERT_TRUE(WillCommit()); + DoSuccessfulCommit(); + ASSERT_EQ(2U, GetNumCommitMessagesOnServer()); + EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size()); + ASSERT_TRUE(HasCommitEntityOnServer("tag2")); + const sync_pb::SyncEntity& tag2_entity = + GetLatestCommitEntityOnServer("tag2"); + + EXPECT_FALSE(WillCommit()); + + // The IDs assigned by the |core_| should be unique. + EXPECT_NE(tag1_entity.id_string(), tag2_entity.id_string()); + + // Check that the committed specifics values are sane. + EXPECT_EQ(tag1_entity.specifics().preference().value(), "value1"); + EXPECT_EQ(tag2_entity.specifics().preference().value(), "value2"); + + // There should have been two separate commit responses sent to the model + // thread. They should be uninteresting, so we don't bother inspecting them. + EXPECT_EQ(2U, GetNumModelThreadCommitResponses()); +} + +TEST_F(NonBlockingTypeProcessorCoreTest, ReceiveUpdates) { + NormalInitialize(); + + const std::string& tag_hash = GenerateTagHash("tag1"); + + TriggerUpdateFromServer(10, "tag1", "value1"); + + ASSERT_EQ(1U, GetNumModelThreadUpdateResponses()); + UpdateResponseDataList updates_list = GetNthModelThreadUpdateResponse(0); + ASSERT_EQ(1U, updates_list.size()); + + ASSERT_TRUE(HasUpdateResponseOnModelThread("tag1")); + UpdateResponseData update = GetUpdateResponseOnModelThread("tag1"); + + EXPECT_FALSE(update.id.empty()); + EXPECT_EQ(tag_hash, update.client_tag_hash); + EXPECT_LT(0, update.response_version); + EXPECT_FALSE(update.ctime.is_null()); + EXPECT_FALSE(update.mtime.is_null()); + EXPECT_EQ("tag1", update.non_unique_name); + EXPECT_FALSE(update.deleted); + EXPECT_EQ("tag1", update.specifics.preference().name()); + EXPECT_EQ("value1", update.specifics.preference().value()); +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_interface.cc b/sync/engine/non_blocking_type_processor_interface.cc new file mode 100644 index 0000000..b936239 --- /dev/null +++ b/sync/engine/non_blocking_type_processor_interface.cc @@ -0,0 +1,15 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/non_blocking_type_processor_interface.h" + +namespace syncer { + +NonBlockingTypeProcessorInterface::NonBlockingTypeProcessorInterface() { +} + +NonBlockingTypeProcessorInterface::~NonBlockingTypeProcessorInterface() { +} + +} // namespace syncer diff --git a/sync/engine/non_blocking_type_processor_interface.h b/sync/engine/non_blocking_type_processor_interface.h new file mode 100644 index 0000000..ccc504b --- /dev/null +++ b/sync/engine/non_blocking_type_processor_interface.h @@ -0,0 +1,31 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ + +#include "sync/base/sync_export.h" + +struct CommitResponseDataList; +struct DataTypeState; +struct UpdateResponseDataList; + +namespace syncer { + +class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorInterface { + public: + NonBlockingTypeProcessorInterface(); + virtual ~NonBlockingTypeProcessorInterface(); + + virtual void ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) = 0; + virtual void ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) = 0; +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_ diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc index 9ab6ca3..3a3ca5c 100644 --- a/sync/engine/non_blocking_type_processor_unittest.cc +++ b/sync/engine/non_blocking_type_processor_unittest.cc @@ -134,14 +134,21 @@ class NonBlockingTypeProcessorTest : public ::testing::Test { NonBlockingTypeProcessorTest(); virtual ~NonBlockingTypeProcessorTest(); - // Explicit initialization step. Kept separate to allow tests to inject - // on-disk state before the test begins. - void Initialize(); + // Initialize with no local state. The processor will be unable to commit + // until it receives notification that initial sync has completed. + void FirstTimeInitialize(); + + // Initialize to a "ready-to-commit" state. + void InitializeToReadyState(); // Local data modification. Emulates signals from the model thread. void WriteItem(const std::string& tag, const std::string& value); void DeleteItem(const std::string& tag); + // Emulates an "initial sync done" message from the + // NonBlockingTypeProcessorCore. + void OnInitialSyncDone(); + // Emulate updates from the server. // This harness has some functionality to help emulate server behavior. // See the definitions of these methods for more information. @@ -190,11 +197,20 @@ NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest() NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() { } -void NonBlockingTypeProcessorTest::Initialize() { +void NonBlockingTypeProcessorTest::FirstTimeInitialize() { processor_->Enable(mock_sync_core_proxy_->Clone()); mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore(); } +void NonBlockingTypeProcessorTest::InitializeToReadyState() { + // TODO(rlarocque): This should be updated to inject on-disk state. + // At the time this code was written, there was no support for on-disk + // state so this was the only way to inject a data_type_state into + // the |processor_|. + FirstTimeInitialize(); + OnInitialSyncDone(); +} + void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag, const std::string& value) { const std::string tag_hash = GenerateTagHash(tag); @@ -205,6 +221,13 @@ void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) { processor_->Delete(tag); } +void NonBlockingTypeProcessorTest::OnInitialSyncDone() { + data_type_state_.initial_sync_done = true; + UpdateResponseDataList empty_update_list; + + processor_->OnUpdateReceived(data_type_state_, empty_update_list); +} + void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset, const std::string& tag, const std::string& value) { @@ -385,7 +408,7 @@ CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag( // Creates a new item locally. // Thoroughly tests the data generated by a local item creation. TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -396,7 +419,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1"); EXPECT_TRUE(tag1_data.id.empty()); - EXPECT_EQ(0, tag1_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_data.base_version); EXPECT_FALSE(tag1_data.ctime.is_null()); EXPECT_FALSE(tag1_data.mtime.is_null()); EXPECT_EQ("tag1", tag1_data.non_unique_name); @@ -408,7 +431,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) { // Creates a new local item then modifies it. // Thoroughly tests data generated by modification of server-unknown item. TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -428,7 +451,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { // Perform a thorough examination of the update-generated request. EXPECT_TRUE(tag1_v2_data.id.empty()); - EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version); EXPECT_FALSE(tag1_v2_data.ctime.is_null()); EXPECT_FALSE(tag1_v2_data.mtime.is_null()); EXPECT_EQ("tag1", tag1_v2_data.non_unique_name); @@ -440,7 +463,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) { // Deletes an item we've never seen before. // Should have no effect and not crash. TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) { - Initialize(); + InitializeToReadyState(); DeleteItem("tag1"); EXPECT_EQ(0U, GetNumCommitRequestLists()); @@ -452,7 +475,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) { // server-unknown as far as the model thread is concerned. That behavior // is race-dependent; other tests are used to test other races. TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { - Initialize(); + InitializeToReadyState(); WriteItem("tag1", "value1"); EXPECT_EQ(1U, GetNumCommitRequestLists()); @@ -467,7 +490,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number); EXPECT_TRUE(tag1_v2_data.id.empty()); - EXPECT_EQ(0, tag1_v2_data.base_version); + EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version); EXPECT_TRUE(tag1_v2_data.deleted); } @@ -477,7 +500,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) { // successfully commits it, but, before the commit response is picked up // by the model thread, the item is deleted by the model thread. TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { - Initialize(); + InitializeToReadyState(); WriteItem("tag1", "value1"); EXPECT_EQ(1U, GetNumCommitRequestLists()); @@ -501,7 +524,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) { // Creates two different sync items. // Verifies that the second has no effect on the first. TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) { - Initialize(); + InitializeToReadyState(); EXPECT_EQ(0U, GetNumCommitRequestLists()); WriteItem("tag1", "value1"); @@ -519,6 +542,20 @@ TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) { ASSERT_TRUE(HasCommitRequestForTag("tag2")); } +// Starts the processor with no local state. +// Verify that it waits until initial sync is complete before requesting +// commits. +TEST_F(NonBlockingTypeProcessorTest, NoCommitsUntilInitialSyncDone) { + FirstTimeInitialize(); + + WriteItem("tag1", "value1"); + EXPECT_EQ(0U, GetNumCommitRequestLists()); + + OnInitialSyncDone(); + EXPECT_EQ(1U, GetNumCommitRequestLists()); + EXPECT_TRUE(HasCommitRequestForTag("tag1")); +} + // TODO(rlarocque): Add more testing of non_unique_name fields. } // namespace syncer diff --git a/sync/engine/sync_thread_sync_entity.cc b/sync/engine/sync_thread_sync_entity.cc new file mode 100644 index 0000000..21899d2 --- /dev/null +++ b/sync/engine/sync_thread_sync_entity.cc @@ -0,0 +1,242 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_thread_sync_entity.h" + +#include "base/logging.h" +#include "sync/engine/non_blocking_sync_common.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" + +namespace syncer { + +SyncThreadSyncEntity* SyncThreadSyncEntity::FromServerUpdate( + const std::string& id_string, + const std::string& client_tag_hash, + int64 received_version) { + return new SyncThreadSyncEntity( + id_string, client_tag_hash, 0, received_version); +} + +SyncThreadSyncEntity* SyncThreadSyncEntity::FromCommitRequest( + const std::string& id_string, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) { + return new SyncThreadSyncEntity(id_string, + client_tag_hash, + 0, + 0, + true, + sequence_number, + base_version, + ctime, + mtime, + non_unique_name, + deleted, + specifics); +} + +// Constructor that does not set any pending commit fields. +SyncThreadSyncEntity::SyncThreadSyncEntity( + const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version) + : id_(id), + client_tag_hash_(client_tag_hash), + highest_commit_response_version_(highest_commit_response_version), + highest_gu_response_version_(highest_gu_response_version), + is_commit_pending_(false), + sequence_number_(0), + base_version_(0), + deleted_(false) { +} + +SyncThreadSyncEntity::SyncThreadSyncEntity( + const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version, + bool is_commit_pending, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) + : id_(id), + client_tag_hash_(client_tag_hash), + highest_commit_response_version_(highest_commit_response_version), + highest_gu_response_version_(highest_gu_response_version), + is_commit_pending_(is_commit_pending), + sequence_number_(sequence_number), + base_version_(base_version), + ctime_(ctime), + mtime_(mtime), + non_unique_name_(non_unique_name), + deleted_(deleted), + specifics_(specifics) { +} + +SyncThreadSyncEntity::~SyncThreadSyncEntity() { +} + +bool SyncThreadSyncEntity::IsCommitPending() const { + return is_commit_pending_; +} + +void SyncThreadSyncEntity::PrepareCommitProto( + sync_pb::SyncEntity* commit_entity, + int64* sequence_number) const { + // Set ID if we have a server-assigned ID. Otherwise, it will be up to + // our caller to assign a client-unique initial ID. + if (base_version_ != kUncommittedVersion) { + commit_entity->set_id_string(id_); + } + + commit_entity->set_client_defined_unique_tag(client_tag_hash_); + commit_entity->set_version(base_version_); + commit_entity->set_deleted(deleted_); + commit_entity->set_folder(false); + commit_entity->set_name(non_unique_name_); + if (!deleted_) { + commit_entity->set_ctime(TimeToProtoTime(ctime_)); + commit_entity->set_mtime(TimeToProtoTime(mtime_)); + commit_entity->mutable_specifics()->CopyFrom(specifics_); + } + + *sequence_number = sequence_number_; +} + +void SyncThreadSyncEntity::RequestCommit( + const std::string& id, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics) { + DCHECK_GE(base_version, base_version_) + << "Base version should never decrease"; + + DCHECK_GE(sequence_number, sequence_number_) + << "Sequence number should never decrease"; + + // Update our book-keeping counters. + base_version_ = base_version; + sequence_number_ = sequence_number; + + // Do our counter values indicate a conflict? If so, don't commit. + // + // There's no need to inform the model thread of the conflict. The + // conflicting update has already been posted to its task runner; it will + // figure it out as soon as it runs that task. + is_commit_pending_ = true; + if (IsInConflict()) { + ClearPendingCommit(); + return; + } + + // We don't commit deletions of server-unknown items. + if (deleted && !IsServerKnown()) { + ClearPendingCommit(); + return; + } + + // Otherwise, we should store the data associated with this pending commit + // so we're ready to commit at the next possible opportunity. + + // We intentionally don't update the id_ here. Good ID values come from the + // server and always pass through the sync thread first. There's no way the + // model thread could have a better ID value than we do. + + // This entity is identified by its client tag. That value can never change. + DCHECK_EQ(client_tag_hash_, client_tag_hash); + + // Set the fields for the pending commit. + ctime_ = ctime; + mtime_ = mtime; + non_unique_name_ = non_unique_name; + deleted_ = deleted; + specifics_ = specifics; +} + +void SyncThreadSyncEntity::ReceiveCommitResponse(const std::string& response_id, + int64 response_version, + int64 sequence_number) { + // Commit responses, especially after the first commit, can update our ID. + id_ = response_id; + + DCHECK_GT(response_version, highest_commit_response_version_) + << "Had expected higher response version." + << " id: " << id_; + + // Commits are synchronous, so there's no reason why the sequence numbers + // wouldn't match. + DCHECK_EQ(sequence_number_, sequence_number) + << "Unexpected sequence number mismatch." + << " id: " << id_; + + highest_commit_response_version_ = response_version; + + // Because an in-progress commit blocks the sync thread, we can assume that + // the item we just committed successfully is exactly the one we have now. + // Nothing changed it while the commit was happening. Since we're now in + // sync with the server, we can clear the pending commit. + ClearPendingCommit(); +} + +void SyncThreadSyncEntity::ReceiveUpdate(int64 version) { + highest_gu_response_version_ = + std::max(highest_gu_response_version_, version); + + if (IsInConflict()) { + // Incoming update clobbers the pending commit on the sync thread. + // The model thread can re-request this commit later if it wants to. + ClearPendingCommit(); + } +} + +bool SyncThreadSyncEntity::IsInConflict() const { + if (!is_commit_pending_) + return false; + + if (highest_gu_response_version_ <= highest_commit_response_version_) { + // The most recent server state was created in a commit made by this + // client. We're fully up to date, and therefore not in conflict. + return false; + } else { + // The most recent server state was written by someone else. + // Did the model thread have the most up to date version when it issued the + // commit request? + if (base_version_ >= highest_gu_response_version_) { + return false; // Yes. + } else { + return true; // No. + } + } +} + +bool SyncThreadSyncEntity::IsServerKnown() const { + return base_version_ != kUncommittedVersion; +} + +void SyncThreadSyncEntity::ClearPendingCommit() { + is_commit_pending_ = false; + + // Clearing the specifics might free up some memory. It can't hurt to try. + specifics_.Clear(); +} + +} // namespace syncer diff --git a/sync/engine/sync_thread_sync_entity.h b/sync/engine/sync_thread_sync_entity.h new file mode 100644 index 0000000..6ed9685 --- /dev/null +++ b/sync/engine/sync_thread_sync_entity.h @@ -0,0 +1,155 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ +#define SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ + +#include <string> + +#include "base/basictypes.h" +#include "base/time/time.h" +#include "sync/base/sync_export.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +// Manages the pending commit and update state for an entity on the sync +// thread. +// +// It should be considered a helper class internal to the +// NonBlockingTypeProcessorCore. +// +// Maintains the state associated with a particular sync entity which is +// necessary for decision-making on the sync thread. It can track pending +// commit state, received update state, and can detect conflicts. +// +// This object may or may not contain state associated with a pending commit. +// If no commit is pending, the |is_commit_pending_| flag will be set to false +// and many of this object's fields will be cleared. +class SYNC_EXPORT SyncThreadSyncEntity { + public: + ~SyncThreadSyncEntity(); + + // Initialize a new entity based on an update response. + static SyncThreadSyncEntity* FromServerUpdate( + const std::string& id_string, + const std::string& client_tag_hash, + int64 version); + + // Initialize a new entity based on a commit request. + static SyncThreadSyncEntity* FromCommitRequest( + const std::string& id_string, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Returns true if this entity should be commited to the server. + bool IsCommitPending() const; + + // Populates a sync_pb::SyncEntity for a commit. Also sets the + // |sequence_number|, so we can track it throughout the commit process. + void PrepareCommitProto(sync_pb::SyncEntity* commit_entity, + int64* sequence_number) const; + + // Updates this entity with data from the latest version that the + // model asked us to commit. May clobber state related to the + // model's previous commit attempt(s). + void RequestCommit(const std::string& id, + const std::string& client_tag_hash, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Handles the receipt of a commit response. + // + // Since commits happen entirely on the sync thread, we can safely assume + // that our item's state at the end of the commit is the same as it was at + // the start. + void ReceiveCommitResponse(const std::string& response_id, + int64 response_version, + int64 sequence_number); + + // Handles receipt of an update from the server. + void ReceiveUpdate(int64 version); + + private: + // Initializes received update state. Does not initialize state related to + // pending commits and sets |is_commit_pending_| to false. + SyncThreadSyncEntity(const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version); + + // Initializes all fields. Sets |is_commit_pending_| to true. + SyncThreadSyncEntity(const std::string& id, + const std::string& client_tag_hash, + int64 highest_commit_response_version, + int64 highest_gu_response_version, + bool is_commit_pending, + int64 sequence_number, + int64 base_version, + base::Time ctime, + base::Time mtime, + const std::string& non_unique_name, + bool deleted, + const sync_pb::EntitySpecifics& specifics); + + // Checks if the current state indicates a conflict. + // + // This can be true only while a call to this object is in progress. + // Conflicts are always cleared before the method call ends. + bool IsInConflict() const; + + // Checks if the server knows about this item. + bool IsServerKnown() const; + + // Clears flag and optionally clears state associated with a pending commit. + void ClearPendingCommit(); + + // The ID for this entry. May be empty if the entry has never been committed. + std::string id_; + + // The hashed client tag for this entry. + std::string client_tag_hash_; + + // The highest version seen in a commit response for this entry. + int64 highest_commit_response_version_; + + // The highest version seen in a GU response for this entry. + int64 highest_gu_response_version_; + + // Flag that indicates whether or not we're waiting for a chance to commit + // this item. + bool is_commit_pending_; + + // Used to track in-flight commit requests on the model thread. All we need + // to do here is return it back to the model thread when the pending commit + // is completed and confirmed. Not valid if no commit is pending. + int64 sequence_number_; + + // The following fields are valid only when a commit is pending. + // This is where we store the data that is to be sent up to the server + // at the next possible opportunity. + int64 base_version_; + base::Time ctime_; + base::Time mtime_; + std::string non_unique_name_; + bool deleted_; + sync_pb::EntitySpecifics specifics_; + + DISALLOW_COPY_AND_ASSIGN(SyncThreadSyncEntity); +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_ diff --git a/sync/engine/sync_thread_sync_entity_unittest.cc b/sync/engine/sync_thread_sync_entity_unittest.cc new file mode 100644 index 0000000..be2604b --- /dev/null +++ b/sync/engine/sync_thread_sync_entity_unittest.cc @@ -0,0 +1,164 @@ + +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/engine/sync_thread_sync_entity.h" + +#include "base/memory/scoped_ptr.h" +#include "base/time/time.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace syncer { + +// Some simple tests for the SyncThreadSyncEntity. +// +// The SyncThreadSyncEntity is an implementation detail of the +// NonBlockingTypeProcessorCore. As such, it doesn't make much sense to test +// it exhaustively, since it already gets a lot of test coverage from the +// NonBlockingTypeProcessorCore unit tests. +// +// These tests are intended as a basic sanity check. Anything more complicated +// would be redundant. +class SyncThreadSyncEntityTest : public ::testing::Test { + public: + SyncThreadSyncEntityTest() + : kServerId("ServerID"), + kClientTag("some.sample.tag"), + kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)), + kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)), + kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) { + specifics.mutable_preference()->set_name(kClientTag); + specifics.mutable_preference()->set_value("pref.value"); + } + + virtual ~SyncThreadSyncEntityTest() {} + + const std::string kServerId; + const std::string kClientTag; + const std::string kClientTagHash; + const base::Time kCtime; + const base::Time kMtime; + sync_pb::EntitySpecifics specifics; +}; + +// Construct a new entity from a server update. Then receive another update. +TEST_F(SyncThreadSyncEntityTest, FromServerUpdate) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + EXPECT_FALSE(entity->IsCommitPending()); + + entity->ReceiveUpdate(20); + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Construct a new entity from a commit request. Then serialize it. +TEST_F(SyncThreadSyncEntityTest, FromCommitRequest) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + ASSERT_TRUE(entity->IsCommitPending()); + sync_pb::SyncEntity pb_entity; + int64 sequence_number = 0; + entity->PrepareCommitProto(&pb_entity, &sequence_number); + EXPECT_EQ(22, sequence_number); + EXPECT_EQ(kServerId, pb_entity.id_string()); + EXPECT_EQ(kClientTagHash, pb_entity.client_defined_unique_tag()); + EXPECT_EQ(33, pb_entity.version()); + EXPECT_EQ(kCtime, ProtoTimeToTime(pb_entity.ctime())); + EXPECT_EQ(kMtime, ProtoTimeToTime(pb_entity.mtime())); + EXPECT_FALSE(pb_entity.deleted()); + EXPECT_EQ(specifics.preference().name(), + pb_entity.specifics().preference().name()); + EXPECT_EQ(specifics.preference().value(), + pb_entity.specifics().preference().value()); +} + +// Start with a server initiated entity. Commit over top of it. +TEST_F(SyncThreadSyncEntityTest, RequestCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + + entity->RequestCommit(kServerId, + kClientTagHash, + 1, + 10, + kCtime, + kMtime, + kClientTag, + false, + specifics); + + EXPECT_TRUE(entity->IsCommitPending()); +} + +// Start with a server initiated entity. Fail to request a commit because of +// an out of date base version. +TEST_F(SyncThreadSyncEntityTest, RequestCommitFailure) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10)); + EXPECT_FALSE(entity->IsCommitPending()); + + entity->RequestCommit(kServerId, + kClientTagHash, + 23, + 5, // Version 5 < 10 + kCtime, + kMtime, + kClientTag, + false, + specifics); + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Start with a pending commit. Clobber it with an incoming update. +TEST_F(SyncThreadSyncEntityTest, UpdateClobbersCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + EXPECT_TRUE(entity->IsCommitPending()); + + entity->ReceiveUpdate(400); // Version 400 > 33. + EXPECT_FALSE(entity->IsCommitPending()); +} + +// Start with a pending commit. Send it a reflected update that +// will not override the in-progress commit. +TEST_F(SyncThreadSyncEntityTest, ReflectedUpdateDoesntClobberCommit) { + scoped_ptr<SyncThreadSyncEntity> entity( + SyncThreadSyncEntity::FromCommitRequest(kServerId, + kClientTagHash, + 22, + 33, + kCtime, + kMtime, + kClientTag, + false, + specifics)); + + EXPECT_TRUE(entity->IsCommitPending()); + + entity->ReceiveUpdate(33); // Version 33 == 33. + EXPECT_TRUE(entity->IsCommitPending()); +} + +} // namespace syncer diff --git a/sync/protocol/proto_value_conversions.cc b/sync/protocol/proto_value_conversions.cc index e1c8851..5ea434c 100644 --- a/sync/protocol/proto_value_conversions.cc +++ b/sync/protocol/proto_value_conversions.cc @@ -813,6 +813,8 @@ base::StringValue* UniquePositionToStringValue( return new base::StringValue(pos.ToDebugString()); } +} // namespace + base::DictionaryValue* SyncEntityToValue(const sync_pb::SyncEntity& proto, bool include_specifics) { base::DictionaryValue* value = new base::DictionaryValue(); @@ -839,6 +841,8 @@ base::DictionaryValue* SyncEntityToValue(const sync_pb::SyncEntity& proto, return value; } +namespace { + base::ListValue* SyncEntitiesToValue( const ::google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities, bool include_specifics) { diff --git a/sync/protocol/proto_value_conversions.h b/sync/protocol/proto_value_conversions.h index c8e5166c..2a3c1ae 100644 --- a/sync/protocol/proto_value_conversions.h +++ b/sync/protocol/proto_value_conversions.h @@ -63,6 +63,7 @@ class SessionTab; class SessionWindow; class SimpleCollapsedLayout; class SyncCycleCompletedEventInfo; +class SyncEntity; class SyncedNotification; class SyncedNotificationAction; class SyncedNotificationAppInfo; @@ -278,6 +279,10 @@ SYNC_EXPORT_PRIVATE base::DictionaryValue* TypedUrlSpecificsToValue( SYNC_EXPORT_PRIVATE base::DictionaryValue* EntitySpecificsToValue( const sync_pb::EntitySpecifics& specifics); +SYNC_EXPORT_PRIVATE base::DictionaryValue* SyncEntityToValue( + const sync_pb::SyncEntity& entity, + bool include_specifics); + SYNC_EXPORT_PRIVATE base::DictionaryValue* ClientToServerMessageToValue( const sync_pb::ClientToServerMessage& proto, bool include_specifics); diff --git a/sync/sessions/model_type_registry.cc b/sync/sessions/model_type_registry.cc index 7ca68c7..9ceac74 100644 --- a/sync/sessions/model_type_registry.cc +++ b/sync/sessions/model_type_registry.cc @@ -13,12 +13,64 @@ #include "sync/engine/non_blocking_type_processor.h" #include "sync/engine/non_blocking_type_processor_core.h" #include "sync/engine/non_blocking_type_processor_core_interface.h" +#include "sync/engine/non_blocking_type_processor_interface.h" #include "sync/sessions/directory_type_debug_info_emitter.h" namespace syncer { namespace { +class NonBlockingTypeProcessorWrapper + : public NonBlockingTypeProcessorInterface { + public: + NonBlockingTypeProcessorWrapper( + base::WeakPtr<NonBlockingTypeProcessor> processor, + scoped_refptr<base::SequencedTaskRunner> processor_task_runner); + virtual ~NonBlockingTypeProcessorWrapper(); + + virtual void ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) OVERRIDE; + virtual void ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) OVERRIDE; + + private: + base::WeakPtr<NonBlockingTypeProcessor> processor_; + scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; +}; + +NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper( + base::WeakPtr<NonBlockingTypeProcessor> processor, + scoped_refptr<base::SequencedTaskRunner> processor_task_runner) + : processor_(processor), processor_task_runner_(processor_task_runner) { +} + +NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() { +} + +void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + processor_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion, + processor_, + type_state, + response_list)); +} + +void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse( + const DataTypeState& type_state, + const UpdateResponseDataList& response_list) { + processor_task_runner_->PostTask( + FROM_HERE, + base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived, + processor_, + type_state, + response_list)); +} + class NonBlockingTypeProcessorCoreWrapper : public NonBlockingTypeProcessorCoreInterface { public: @@ -47,7 +99,7 @@ void NonBlockingTypeProcessorCoreWrapper::RequestCommits( const CommitRequestDataList& list) { sync_thread_->PostTask( FROM_HERE, - base::Bind(&NonBlockingTypeProcessorCore::RequestCommits, core_, list)); + base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list)); } } // namespace @@ -139,10 +191,11 @@ void ModelTypeRegistry::InitializeNonBlockingType( DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); // Initialize CoreProcessor -> Processor communication channel. + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface( + new NonBlockingTypeProcessorWrapper(processor, type_task_runner)); scoped_ptr<NonBlockingTypeProcessorCore> core( - new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); - - // TODO(rlarocque): DataTypeState should be forwarded to core here. + new NonBlockingTypeProcessorCore( + type, data_type_state, processor_interface.Pass())); // Initialize Processor -> CoreProcessor communication channel. scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( diff --git a/sync/sync_core.gypi b/sync/sync_core.gypi index bec8952..8772d43 100644 --- a/sync/sync_core.gypi +++ b/sync/sync_core.gypi @@ -68,12 +68,16 @@ 'engine/net/url_translator.h', 'engine/non_blocking_sync_common.cc', 'engine/non_blocking_sync_common.h', + 'engine/non_blocking_type_commit_contribution.cc', + 'engine/non_blocking_type_commit_contribution.h', 'engine/non_blocking_type_processor.cc', 'engine/non_blocking_type_processor.h', 'engine/non_blocking_type_processor_core.cc', 'engine/non_blocking_type_processor_core.h', 'engine/non_blocking_type_processor_core_interface.cc', 'engine/non_blocking_type_processor_core_interface.h', + 'engine/non_blocking_type_processor_interface.cc', + 'engine/non_blocking_type_processor_interface.h', 'engine/nudge_source.cc', 'engine/nudge_source.h', 'engine/process_updates_util.cc', @@ -86,6 +90,8 @@ 'engine/sync_scheduler.h', 'engine/sync_scheduler_impl.cc', 'engine/sync_scheduler_impl.h', + 'engine/sync_thread_sync_entity.cc', + 'engine/sync_thread_sync_entity.h', 'engine/syncer.cc', 'engine/syncer.h', 'engine/syncer_proto_util.cc', diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi index cd4f933..eea0d38 100644 --- a/sync/sync_tests.gypi +++ b/sync/sync_tests.gypi @@ -300,8 +300,10 @@ 'engine/directory_update_handler_unittest.cc', 'engine/get_updates_processor_unittest.cc', 'engine/model_thread_sync_entity_unittest.cc', + 'engine/non_blocking_type_processor_core_unittest.cc', 'engine/non_blocking_type_processor_unittest.cc', 'engine/sync_scheduler_unittest.cc', + 'engine/sync_thread_sync_entity_unittest.cc', 'engine/syncer_proto_util_unittest.cc', 'engine/syncer_unittest.cc', 'engine/syncer_util_unittest.cc', |