summaryrefslogtreecommitdiffstats
path: root/sync/engine/non_blocking_type_processor_core.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sync/engine/non_blocking_type_processor_core.cc')
-rw-r--r--sync/engine/non_blocking_type_processor_core.cc242
1 files changed, 218 insertions, 24 deletions
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
index 236c9c5..a14a8d0 100644
--- a/sync/engine/non_blocking_type_processor_core.cc
+++ b/sync/engine/non_blocking_type_processor_core.cc
@@ -4,20 +4,28 @@
#include "sync/engine/non_blocking_type_processor_core.h"
+#include "base/bind.h"
+#include "base/format_macros.h"
#include "base/logging.h"
+#include "base/strings/stringprintf.h"
#include "sync/engine/commit_contribution.h"
+#include "sync/engine/non_blocking_type_commit_contribution.h"
+#include "sync/engine/non_blocking_type_processor_interface.h"
+#include "sync/engine/sync_thread_sync_entity.h"
+#include "sync/syncable/syncable_util.h"
+#include "sync/util/time.h"
namespace syncer {
NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
- ModelType type,
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner,
- base::WeakPtr<NonBlockingTypeProcessor> processor)
+ ModelType type,
+ const DataTypeState& initial_state,
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
: type_(type),
- processor_task_runner_(processor_task_runner),
- processor_(processor),
+ data_type_state_(initial_state),
+ processor_interface_(processor_interface.Pass()),
+ entities_deleter_(&entities_),
weak_ptr_factory_(this) {
- progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type));
}
NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
@@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const {
void NonBlockingTypeProcessorCore::GetDownloadProgress(
sync_pb::DataTypeProgressMarker* progress_marker) const {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_);
- *progress_marker = progress_marker_;
+ progress_marker->CopyFrom(data_type_state_.progress_marker);
}
void NonBlockingTypeProcessorCore::GetDataTypeContext(
sync_pb::DataTypeContext* context) const {
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting context for: " << ModelTypeToString(type_);
- context->Clear();
+ DCHECK(CalledOnValidThread());
+ context->CopyFrom(data_type_state_.type_context);
}
SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
@@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
const SyncEntityList& applicable_updates,
sessions::StatusController* status) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_);
- progress_marker_ = progress_marker;
+
+ // TODO(rlarocque): Handle data type context conflicts.
+ data_type_state_.type_context = mutated_context;
+ data_type_state_.progress_marker = progress_marker;
+
+ UpdateResponseDataList response_datas;
+
+ for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
+ update_it != applicable_updates.end();
+ ++update_it) {
+ const sync_pb::SyncEntity* update_entity = *update_it;
+ if (!update_entity->server_defined_unique_tag().empty()) {
+ // We can't commit an item unless we know its parent ID. This is where
+ // we learn that ID and remember it forever.
+ DCHECK_EQ(ModelTypeToRootTag(type_),
+ update_entity->server_defined_unique_tag());
+ if (!data_type_state_.type_root_id.empty()) {
+ DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
+ }
+ data_type_state_.type_root_id = update_entity->id_string();
+ } else {
+ // Normal updates are handled here.
+ const std::string& client_tag_hash =
+ update_entity->client_defined_unique_tag();
+ DCHECK(!client_tag_hash.empty());
+ EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
+ if (map_it == entities_.end()) {
+ SyncThreadSyncEntity* entity =
+ SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
+ client_tag_hash,
+ update_entity->version());
+ entities_.insert(std::make_pair(client_tag_hash, entity));
+ } else {
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->ReceiveUpdate(update_entity->version());
+ }
+
+ // Prepare the message for the model thread.
+ UpdateResponseData response_data;
+ response_data.id = update_entity->id_string();
+ response_data.client_tag_hash = client_tag_hash;
+ response_data.response_version = update_entity->version();
+ response_data.ctime = ProtoTimeToTime(update_entity->ctime());
+ response_data.mtime = ProtoTimeToTime(update_entity->mtime());
+ response_data.non_unique_name = update_entity->name();
+ response_data.deleted = update_entity->deleted();
+ response_data.specifics = update_entity->specifics();
+
+ response_datas.push_back(response_data);
+ }
+ }
+
+ // Forward these updates to the model thread so it can do the rest.
+ processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
+
return SYNCER_OK;
}
void NonBlockingTypeProcessorCore::ApplyUpdates(
sessions::StatusController* status) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_);
-}
+ // This function is called only when we've finished a download cycle, ie. we
+ // got a response with changes_remaining == 0. If this is our first download
+ // cycle, we should update our state so the NonBlockingTypeProcessor knows
+ // that it's safe to commit items now.
+ if (!data_type_state_.initial_sync_done) {
+ data_type_state_.initial_sync_done = true;
-void NonBlockingTypeProcessorCore::RequestCommits(
- const CommitRequestDataList& request_list) {
- // TODO(rlarocque): Implement this. crbug.com/351005.
+ UpdateResponseDataList empty_update_list;
+ processor_interface_->ReceiveUpdateResponse(data_type_state_,
+ empty_update_list);
+ }
}
void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
@@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
<< "ModelType is: " << ModelTypeToString(type_);
}
+void NonBlockingTypeProcessorCore::EnqueueForCommit(
+ const CommitRequestDataList& list) {
+ DCHECK(CalledOnValidThread());
+
+ DCHECK(CanCommitItems())
+ << "Asked to commit items before type was initialized. "
+ << "ModelType is: " << ModelTypeToString(type_);
+
+ for (CommitRequestDataList::const_iterator it = list.begin();
+ it != list.end();
+ ++it) {
+ StorePendingCommit(*it);
+ }
+}
+
// CommitContributor implementation.
scoped_ptr<CommitContribution>
NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_);
- return scoped_ptr<CommitContribution>();
+
+ size_t space_remaining = max_entries;
+ std::vector<int64> sequence_numbers;
+ google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
+
+ if (!CanCommitItems())
+ return scoped_ptr<CommitContribution>();
+
+ // TODO(rlarocque): Avoid iterating here.
+ for (EntityMap::const_iterator it = entities_.begin();
+ it != entities_.end() && space_remaining > 0;
+ ++it) {
+ SyncThreadSyncEntity* entity = it->second;
+ if (entity->IsCommitPending()) {
+ sync_pb::SyncEntity* commit_entity = commit_entities.Add();
+ int64 sequence_number = -1;
+
+ entity->PrepareCommitProto(commit_entity, &sequence_number);
+ HelpInitializeCommitEntity(commit_entity);
+ sequence_numbers.push_back(sequence_number);
+
+ space_remaining--;
+ }
+ }
+
+ if (commit_entities.size() == 0)
+ return scoped_ptr<CommitContribution>();
+
+ return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
+ data_type_state_.type_context, commit_entities, sequence_numbers, this));
+}
+
+void NonBlockingTypeProcessorCore::StorePendingCommit(
+ const CommitRequestData& request) {
+ if (!request.deleted) {
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
+ }
+
+ EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
+ if (map_it == entities_.end()) {
+ SyncThreadSyncEntity* entity =
+ SyncThreadSyncEntity::FromCommitRequest(request.id,
+ request.client_tag_hash,
+ request.sequence_number,
+ request.base_version,
+ request.ctime,
+ request.mtime,
+ request.non_unique_name,
+ request.deleted,
+ request.specifics);
+ entities_.insert(std::make_pair(request.client_tag_hash, entity));
+ } else {
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->RequestCommit(request.id,
+ request.client_tag_hash,
+ request.sequence_number,
+ request.base_version,
+ request.ctime,
+ request.mtime,
+ request.non_unique_name,
+ request.deleted,
+ request.specifics);
+ }
+
+ // TODO: Nudge SyncScheduler.
+}
+
+void NonBlockingTypeProcessorCore::OnCommitResponse(
+ const CommitResponseDataList& response_list) {
+ for (CommitResponseDataList::const_iterator response_it =
+ response_list.begin();
+ response_it != response_list.end();
+ ++response_it) {
+ const std::string client_tag_hash = response_it->client_tag_hash;
+ EntityMap::iterator map_it = entities_.find(client_tag_hash);
+
+ // There's no way we could have committed an entry we know nothing about.
+ if (map_it == entities_.end()) {
+ NOTREACHED() << "Received commit response for item unknown to us."
+ << " Model type: " << ModelTypeToString(type_)
+ << " ID: " << response_it->id;
+ continue;
+ }
+
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->ReceiveCommitResponse(response_it->id,
+ response_it->response_version,
+ response_it->sequence_number);
+ }
+
+ // Send the responses back to the model thread. It needs to know which
+ // items have been successfully committed so it can save that information in
+ // permanent storage.
+ processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
}
base::WeakPtr<NonBlockingTypeProcessorCore>
@@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
+bool NonBlockingTypeProcessorCore::CanCommitItems() const {
+ // We can't commit anything until we know the type's parent node.
+ // We'll get it in the first update response.
+ return !data_type_state_.type_root_id.empty() &&
+ data_type_state_.initial_sync_done;
+}
+
+void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
+ sync_pb::SyncEntity* sync_entity) {
+ // Initial commits need our help to generate a client ID.
+ if (!sync_entity->has_id_string()) {
+ DCHECK_EQ(kUncommittedVersion, sync_entity->version());
+ const int64 id = data_type_state_.next_client_id++;
+ sync_entity->set_id_string(
+ base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
+ }
+
+ // Always include enough specifics to identify the type. Do this even in
+ // deletion requests, where the specifics are otherwise invalid.
+ if (!sync_entity->has_specifics()) {
+ AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
+ }
+
+ // We're always responsible for the parent ID.
+ sync_entity->set_parent_id_string(data_type_state_.type_root_id);
+}
+
} // namespace syncer