diff options
Diffstat (limited to 'sync/engine/non_blocking_type_processor_core.cc')
-rw-r--r-- | sync/engine/non_blocking_type_processor_core.cc | 242 |
1 files changed, 218 insertions, 24 deletions
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc index 236c9c5..a14a8d0 100644 --- a/sync/engine/non_blocking_type_processor_core.cc +++ b/sync/engine/non_blocking_type_processor_core.cc @@ -4,20 +4,28 @@ #include "sync/engine/non_blocking_type_processor_core.h" +#include "base/bind.h" +#include "base/format_macros.h" #include "base/logging.h" +#include "base/strings/stringprintf.h" #include "sync/engine/commit_contribution.h" +#include "sync/engine/non_blocking_type_commit_contribution.h" +#include "sync/engine/non_blocking_type_processor_interface.h" +#include "sync/engine/sync_thread_sync_entity.h" +#include "sync/syncable/syncable_util.h" +#include "sync/util/time.h" namespace syncer { NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( - ModelType type, - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, - base::WeakPtr<NonBlockingTypeProcessor> processor) + ModelType type, + const DataTypeState& initial_state, + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) : type_(type), - processor_task_runner_(processor_task_runner), - processor_(processor), + data_type_state_(initial_state), + processor_interface_(processor_interface.Pass()), + entities_deleter_(&entities_), weak_ptr_factory_(this) { - progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); } NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { @@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const { void NonBlockingTypeProcessorCore::GetDownloadProgress( sync_pb::DataTypeProgressMarker* progress_marker) const { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_); - *progress_marker = progress_marker_; + progress_marker->CopyFrom(data_type_state_.progress_marker); } void NonBlockingTypeProcessorCore::GetDataTypeContext( sync_pb::DataTypeContext* context) const { - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting context for: " << ModelTypeToString(type_); - context->Clear(); + DCHECK(CalledOnValidThread()); + context->CopyFrom(data_type_state_.type_context); } SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( @@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( const SyncEntityList& applicable_updates, sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_); - progress_marker_ = progress_marker; + + // TODO(rlarocque): Handle data type context conflicts. + data_type_state_.type_context = mutated_context; + data_type_state_.progress_marker = progress_marker; + + UpdateResponseDataList response_datas; + + for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); + update_it != applicable_updates.end(); + ++update_it) { + const sync_pb::SyncEntity* update_entity = *update_it; + if (!update_entity->server_defined_unique_tag().empty()) { + // We can't commit an item unless we know its parent ID. This is where + // we learn that ID and remember it forever. + DCHECK_EQ(ModelTypeToRootTag(type_), + update_entity->server_defined_unique_tag()); + if (!data_type_state_.type_root_id.empty()) { + DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); + } + data_type_state_.type_root_id = update_entity->id_string(); + } else { + // Normal updates are handled here. + const std::string& client_tag_hash = + update_entity->client_defined_unique_tag(); + DCHECK(!client_tag_hash.empty()); + EntityMap::const_iterator map_it = entities_.find(client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), + client_tag_hash, + update_entity->version()); + entities_.insert(std::make_pair(client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveUpdate(update_entity->version()); + } + + // Prepare the message for the model thread. + UpdateResponseData response_data; + response_data.id = update_entity->id_string(); + response_data.client_tag_hash = client_tag_hash; + response_data.response_version = update_entity->version(); + response_data.ctime = ProtoTimeToTime(update_entity->ctime()); + response_data.mtime = ProtoTimeToTime(update_entity->mtime()); + response_data.non_unique_name = update_entity->name(); + response_data.deleted = update_entity->deleted(); + response_data.specifics = update_entity->specifics(); + + response_datas.push_back(response_data); + } + } + + // Forward these updates to the model thread so it can do the rest. + processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); + return SYNCER_OK; } void NonBlockingTypeProcessorCore::ApplyUpdates( sessions::StatusController* status) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); -} + // This function is called only when we've finished a download cycle, ie. we + // got a response with changes_remaining == 0. If this is our first download + // cycle, we should update our state so the NonBlockingTypeProcessor knows + // that it's safe to commit items now. + if (!data_type_state_.initial_sync_done) { + data_type_state_.initial_sync_done = true; -void NonBlockingTypeProcessorCore::RequestCommits( - const CommitRequestDataList& request_list) { - // TODO(rlarocque): Implement this. crbug.com/351005. + UpdateResponseDataList empty_update_list; + processor_interface_->ReceiveUpdateResponse(data_type_state_, + empty_update_list); + } } void NonBlockingTypeProcessorCore::PassiveApplyUpdates( @@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates( << "ModelType is: " << ModelTypeToString(type_); } +void NonBlockingTypeProcessorCore::EnqueueForCommit( + const CommitRequestDataList& list) { + DCHECK(CalledOnValidThread()); + + DCHECK(CanCommitItems()) + << "Asked to commit items before type was initialized. " + << "ModelType is: " << ModelTypeToString(type_); + + for (CommitRequestDataList::const_iterator it = list.begin(); + it != list.end(); + ++it) { + StorePendingCommit(*it); + } +} + // CommitContributor implementation. scoped_ptr<CommitContribution> NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { DCHECK(CalledOnValidThread()); - // TODO(rlarocque): Implement this properly. crbug.com/351005. - DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_); - return scoped_ptr<CommitContribution>(); + + size_t space_remaining = max_entries; + std::vector<int64> sequence_numbers; + google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; + + if (!CanCommitItems()) + return scoped_ptr<CommitContribution>(); + + // TODO(rlarocque): Avoid iterating here. + for (EntityMap::const_iterator it = entities_.begin(); + it != entities_.end() && space_remaining > 0; + ++it) { + SyncThreadSyncEntity* entity = it->second; + if (entity->IsCommitPending()) { + sync_pb::SyncEntity* commit_entity = commit_entities.Add(); + int64 sequence_number = -1; + + entity->PrepareCommitProto(commit_entity, &sequence_number); + HelpInitializeCommitEntity(commit_entity); + sequence_numbers.push_back(sequence_number); + + space_remaining--; + } + } + + if (commit_entities.size() == 0) + return scoped_ptr<CommitContribution>(); + + return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( + data_type_state_.type_context, commit_entities, sequence_numbers, this)); +} + +void NonBlockingTypeProcessorCore::StorePendingCommit( + const CommitRequestData& request) { + if (!request.deleted) { + DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); + } + + EntityMap::iterator map_it = entities_.find(request.client_tag_hash); + if (map_it == entities_.end()) { + SyncThreadSyncEntity* entity = + SyncThreadSyncEntity::FromCommitRequest(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + entities_.insert(std::make_pair(request.client_tag_hash, entity)); + } else { + SyncThreadSyncEntity* entity = map_it->second; + entity->RequestCommit(request.id, + request.client_tag_hash, + request.sequence_number, + request.base_version, + request.ctime, + request.mtime, + request.non_unique_name, + request.deleted, + request.specifics); + } + + // TODO: Nudge SyncScheduler. +} + +void NonBlockingTypeProcessorCore::OnCommitResponse( + const CommitResponseDataList& response_list) { + for (CommitResponseDataList::const_iterator response_it = + response_list.begin(); + response_it != response_list.end(); + ++response_it) { + const std::string client_tag_hash = response_it->client_tag_hash; + EntityMap::iterator map_it = entities_.find(client_tag_hash); + + // There's no way we could have committed an entry we know nothing about. + if (map_it == entities_.end()) { + NOTREACHED() << "Received commit response for item unknown to us." + << " Model type: " << ModelTypeToString(type_) + << " ID: " << response_it->id; + continue; + } + + SyncThreadSyncEntity* entity = map_it->second; + entity->ReceiveCommitResponse(response_it->id, + response_it->response_version, + response_it->sequence_number); + } + + // Send the responses back to the model thread. It needs to know which + // items have been successfully committed so it can save that information in + // permanent storage. + processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); } base::WeakPtr<NonBlockingTypeProcessorCore> @@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() { return weak_ptr_factory_.GetWeakPtr(); } +bool NonBlockingTypeProcessorCore::CanCommitItems() const { + // We can't commit anything until we know the type's parent node. + // We'll get it in the first update response. + return !data_type_state_.type_root_id.empty() && + data_type_state_.initial_sync_done; +} + +void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( + sync_pb::SyncEntity* sync_entity) { + // Initial commits need our help to generate a client ID. + if (!sync_entity->has_id_string()) { + DCHECK_EQ(kUncommittedVersion, sync_entity->version()); + const int64 id = data_type_state_.next_client_id++; + sync_entity->set_id_string( + base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); + } + + // Always include enough specifics to identify the type. Do this even in + // deletion requests, where the specifics are otherwise invalid. + if (!sync_entity->has_specifics()) { + AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); + } + + // We're always responsible for the parent ID. + sync_entity->set_parent_id_string(data_type_state_.type_root_id); +} + } // namespace syncer |