summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sync/engine/model_thread_sync_entity.cc2
-rw-r--r--sync/engine/non_blocking_sync_common.cc15
-rw-r--r--sync/engine/non_blocking_sync_common.h7
-rw-r--r--sync/engine/non_blocking_type_commit_contribution.cc119
-rw-r--r--sync/engine/non_blocking_type_commit_contribution.h66
-rw-r--r--sync/engine/non_blocking_type_processor.cc11
-rw-r--r--sync/engine/non_blocking_type_processor_core.cc242
-rw-r--r--sync/engine/non_blocking_type_processor_core.h59
-rw-r--r--sync/engine/non_blocking_type_processor_core_unittest.cc961
-rw-r--r--sync/engine/non_blocking_type_processor_interface.cc15
-rw-r--r--sync/engine/non_blocking_type_processor_interface.h31
-rw-r--r--sync/engine/non_blocking_type_processor_unittest.cc63
-rw-r--r--sync/engine/sync_thread_sync_entity.cc242
-rw-r--r--sync/engine/sync_thread_sync_entity.h155
-rw-r--r--sync/engine/sync_thread_sync_entity_unittest.cc164
-rw-r--r--sync/protocol/proto_value_conversions.cc4
-rw-r--r--sync/protocol/proto_value_conversions.h5
-rw-r--r--sync/sessions/model_type_registry.cc61
-rw-r--r--sync/sync_core.gypi6
-rw-r--r--sync/sync_tests.gypi2
20 files changed, 2173 insertions, 57 deletions
diff --git a/sync/engine/model_thread_sync_entity.cc b/sync/engine/model_thread_sync_entity.cc
index fde2eb3..6e466f5 100644
--- a/sync/engine/model_thread_sync_entity.cc
+++ b/sync/engine/model_thread_sync_entity.cc
@@ -15,7 +15,7 @@ scoped_ptr<ModelThreadSyncEntity> ModelThreadSyncEntity::NewLocalItem(
1,
0,
0,
- 0,
+ kUncommittedVersion,
true,
std::string(), // Sync thread will assign the initial ID.
syncable::GenerateSyncableHash(GetModelTypeFromSpecifics(specifics),
diff --git a/sync/engine/non_blocking_sync_common.cc b/sync/engine/non_blocking_sync_common.cc
index c5a3656..0042990 100644
--- a/sync/engine/non_blocking_sync_common.cc
+++ b/sync/engine/non_blocking_sync_common.cc
@@ -6,25 +6,32 @@
namespace syncer {
-DataTypeState::DataTypeState() {
+DataTypeState::DataTypeState() : next_client_id(0), initial_sync_done(false) {
}
DataTypeState::~DataTypeState() {
}
-CommitRequestData::CommitRequestData() {
+CommitRequestData::CommitRequestData()
+ : sequence_number(0),
+ base_version(0),
+ deleted(false) {
}
CommitRequestData::~CommitRequestData() {
}
-CommitResponseData::CommitResponseData() {
+CommitResponseData::CommitResponseData()
+ : sequence_number(0),
+ response_version(0) {
}
CommitResponseData::~CommitResponseData() {
}
-UpdateResponseData::UpdateResponseData() {
+UpdateResponseData::UpdateResponseData()
+ : response_version(0),
+ deleted(false) {
}
UpdateResponseData::~UpdateResponseData() {
diff --git a/sync/engine/non_blocking_sync_common.h b/sync/engine/non_blocking_sync_common.h
index f07fdb2..d0e0f9d 100644
--- a/sync/engine/non_blocking_sync_common.h
+++ b/sync/engine/non_blocking_sync_common.h
@@ -14,6 +14,8 @@
namespace syncer {
+static const int64 kUncommittedVersion = -1;
+
// Data-type global state that must be accessed and updated on the sync thread,
// but persisted on or through the model thread.
struct SYNC_EXPORT_PRIVATE DataTypeState {
@@ -43,6 +45,11 @@ struct SYNC_EXPORT_PRIVATE DataTypeState {
// client-tagged data types supported by non-blocking sync, but we will
// continue to emulate the directory sync's behavior for now.
int64 next_client_id;
+
+ // This flag is set to true when the first download cycle is complete. The
+ // NonBlockingTypeProcessor should not attempt to commit any items until this
+ // flag is set.
+ bool initial_sync_done;
};
struct SYNC_EXPORT_PRIVATE CommitRequestData {
diff --git a/sync/engine/non_blocking_type_commit_contribution.cc b/sync/engine/non_blocking_type_commit_contribution.cc
new file mode 100644
index 0000000..9e48ad6
--- /dev/null
+++ b/sync/engine/non_blocking_type_commit_contribution.cc
@@ -0,0 +1,119 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_commit_contribution.h"
+
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/engine/non_blocking_type_processor_core.h"
+#include "sync/protocol/proto_value_conversions.h"
+
+namespace syncer {
+
+NonBlockingTypeCommitContribution::NonBlockingTypeCommitContribution(
+ const sync_pb::DataTypeContext& context,
+ const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities,
+ const std::vector<int64>& sequence_numbers,
+ NonBlockingTypeProcessorCore* processor_core)
+ : processor_core_(processor_core),
+ context_(context),
+ entities_(entities),
+ sequence_numbers_(sequence_numbers),
+ cleaned_up_(false) {
+}
+
+NonBlockingTypeCommitContribution::~NonBlockingTypeCommitContribution() {
+ DCHECK(cleaned_up_);
+}
+
+void NonBlockingTypeCommitContribution::AddToCommitMessage(
+ sync_pb::ClientToServerMessage* msg) {
+ sync_pb::CommitMessage* commit_message = msg->mutable_commit();
+ entries_start_index_ = commit_message->entries_size();
+
+ std::copy(entities_.begin(),
+ entities_.end(),
+ RepeatedPtrFieldBackInserter(commit_message->mutable_entries()));
+ if (!context_.context().empty())
+ commit_message->add_client_contexts()->CopyFrom(context_);
+}
+
+SyncerError NonBlockingTypeCommitContribution::ProcessCommitResponse(
+ const sync_pb::ClientToServerResponse& response,
+ sessions::StatusController* status) {
+ const sync_pb::CommitResponse& commit_response = response.commit();
+
+ bool transient_error = false;
+ bool commit_conflict = false;
+ bool unknown_error = false;
+
+ CommitResponseDataList response_list;
+
+ for (size_t i = 0; i < sequence_numbers_.size(); ++i) {
+ const sync_pb::CommitResponse_EntryResponse& entry_response =
+ commit_response.entryresponse(entries_start_index_ + i);
+
+ switch (entry_response.response_type()) {
+ case sync_pb::CommitResponse::INVALID_MESSAGE:
+ LOG(ERROR) << "Server reports commit message is invalid.";
+ DLOG(ERROR) << "Message was: " << SyncEntityToValue(entities_.Get(i),
+ false);
+ unknown_error = true;
+ break;
+ case sync_pb::CommitResponse::CONFLICT:
+ DVLOG(1) << "Server reports conflict for commit message.";
+ DVLOG(1) << "Message was: " << SyncEntityToValue(entities_.Get(i),
+ false);
+ commit_conflict = true;
+ break;
+ case sync_pb::CommitResponse::SUCCESS: {
+ CommitResponseData response_data;
+ response_data.id = entry_response.id_string();
+ response_data.client_tag_hash =
+ entities_.Get(i).client_defined_unique_tag();
+ response_data.sequence_number = sequence_numbers_[i];
+ response_data.response_version = entry_response.version();
+ response_list.push_back(response_data);
+ break;
+ }
+ case sync_pb::CommitResponse::OVER_QUOTA:
+ case sync_pb::CommitResponse::RETRY:
+ case sync_pb::CommitResponse::TRANSIENT_ERROR:
+ DLOG(WARNING) << "Entity commit blocked by transient error.";
+ transient_error = true;
+ break;
+ default:
+ LOG(ERROR) << "Bad return from ProcessSingleCommitResponse.";
+ unknown_error = true;
+ }
+ }
+
+ // Send whatever successful responses we did get back to our parent.
+ // It's the schedulers job to handle the failures.
+ processor_core_->OnCommitResponse(response_list);
+
+ // Let the scheduler know about the failures.
+ if (unknown_error) {
+ return SERVER_RETURN_UNKNOWN_ERROR;
+ } else if (transient_error) {
+ return SERVER_RETURN_TRANSIENT_ERROR;
+ } else if (commit_conflict) {
+ return SERVER_RETURN_CONFLICT;
+ } else {
+ return SYNCER_OK;
+ }
+}
+
+void NonBlockingTypeCommitContribution::CleanUp() {
+ cleaned_up_ = true;
+
+ // We could inform our parent NonBlockingCommitContributor that a commit is
+ // no longer in progress. The current implementation doesn't really care
+ // either way, so we don't bother sending the signal.
+}
+
+size_t NonBlockingTypeCommitContribution::GetNumEntries() const {
+ return sequence_numbers_.size();
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_type_commit_contribution.h b/sync/engine/non_blocking_type_commit_contribution.h
new file mode 100644
index 0000000..4d415ca
--- /dev/null
+++ b/sync/engine/non_blocking_type_commit_contribution.h
@@ -0,0 +1,66 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_
+#define SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "sync/engine/commit_contribution.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+class NonBlockingTypeProcessorCore;
+
+// A non-blocking sync type's contribution to an outgoing commit message.
+//
+// Helps build a commit message and process its response. It collaborates
+// closely with the NonBlockingTypeProcessorCore.
+class NonBlockingTypeCommitContribution : public CommitContribution {
+ public:
+ NonBlockingTypeCommitContribution(
+ const sync_pb::DataTypeContext& context,
+ const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities,
+ const std::vector<int64>& sequence_numbers,
+ NonBlockingTypeProcessorCore* processor_core);
+ virtual ~NonBlockingTypeCommitContribution();
+
+ // Implementation of CommitContribution
+ virtual void AddToCommitMessage(sync_pb::ClientToServerMessage* msg) OVERRIDE;
+ virtual SyncerError ProcessCommitResponse(
+ const sync_pb::ClientToServerResponse& response,
+ sessions::StatusController* status) OVERRIDE;
+ virtual void CleanUp() OVERRIDE;
+ virtual size_t GetNumEntries() const OVERRIDE;
+
+ private:
+ // A non-owned pointer back to the object that created this contribution.
+ NonBlockingTypeProcessorCore* const processor_core_;
+
+ // The type-global context information.
+ const sync_pb::DataTypeContext context_;
+
+ // The set of entities to be committed, serialized as SyncEntities.
+ const google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> entities_;
+
+ // The sequence numbers associated with the pending commits. These match up
+ // with the entities_ vector.
+ const std::vector<int64> sequence_numbers_;
+
+ // The index in the commit message where this contribution's entities are
+ // added. Used to correlate per-item requests with per-item responses.
+ size_t entries_start_index_;
+
+ // A flag used to ensure this object's contract is respected. Helps to check
+ // that CleanUp() is called before the object is destructed.
+ bool cleaned_up_;
+
+ DISALLOW_COPY_AND_ASSIGN(NonBlockingTypeCommitContribution);
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_COMMIT_CONTRIBUTION_H_
diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc
index 0aaead4..c1982b3 100644
--- a/sync/engine/non_blocking_type_processor.cc
+++ b/sync/engine/non_blocking_type_processor.cc
@@ -51,7 +51,6 @@ void NonBlockingTypeProcessor::Enable(
// TODO(rlarocque): At some point, this should be loaded from storage.
data_type_state_.progress_marker.set_data_type_id(
GetSpecificsFieldNumberFromModelType(type_));
- data_type_state_.next_client_id = 0;
sync_core_proxy_ = sync_core_proxy.Pass();
sync_core_proxy_->ConnectTypeToCore(GetModelType(),
@@ -142,6 +141,10 @@ void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
if (!IsConnected())
return;
+ // Don't send anything if the type is not ready to handle commits.
+ if (!data_type_state_.initial_sync_done)
+ return;
+
// TODO(rlarocque): Do something smarter than iterate here.
for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
++it) {
@@ -184,6 +187,9 @@ void NonBlockingTypeProcessor::OnCommitCompletion(
void NonBlockingTypeProcessor::OnUpdateReceived(
const DataTypeState& data_type_state,
const UpdateResponseDataList& response_list) {
+ bool initial_sync_just_finished =
+ !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
+
data_type_state_ = data_type_state;
for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
@@ -215,6 +221,9 @@ void NonBlockingTypeProcessor::OnUpdateReceived(
}
}
+ if (initial_sync_just_finished)
+ FlushPendingCommitRequests();
+
// TODO: Inform the model of the new or updated data.
}
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
index 236c9c5..a14a8d0 100644
--- a/sync/engine/non_blocking_type_processor_core.cc
+++ b/sync/engine/non_blocking_type_processor_core.cc
@@ -4,20 +4,28 @@
#include "sync/engine/non_blocking_type_processor_core.h"
+#include "base/bind.h"
+#include "base/format_macros.h"
#include "base/logging.h"
+#include "base/strings/stringprintf.h"
#include "sync/engine/commit_contribution.h"
+#include "sync/engine/non_blocking_type_commit_contribution.h"
+#include "sync/engine/non_blocking_type_processor_interface.h"
+#include "sync/engine/sync_thread_sync_entity.h"
+#include "sync/syncable/syncable_util.h"
+#include "sync/util/time.h"
namespace syncer {
NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
- ModelType type,
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner,
- base::WeakPtr<NonBlockingTypeProcessor> processor)
+ ModelType type,
+ const DataTypeState& initial_state,
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
: type_(type),
- processor_task_runner_(processor_task_runner),
- processor_(processor),
+ data_type_state_(initial_state),
+ processor_interface_(processor_interface.Pass()),
+ entities_deleter_(&entities_),
weak_ptr_factory_(this) {
- progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type));
}
NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
@@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const {
void NonBlockingTypeProcessorCore::GetDownloadProgress(
sync_pb::DataTypeProgressMarker* progress_marker) const {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_);
- *progress_marker = progress_marker_;
+ progress_marker->CopyFrom(data_type_state_.progress_marker);
}
void NonBlockingTypeProcessorCore::GetDataTypeContext(
sync_pb::DataTypeContext* context) const {
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting context for: " << ModelTypeToString(type_);
- context->Clear();
+ DCHECK(CalledOnValidThread());
+ context->CopyFrom(data_type_state_.type_context);
}
SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
@@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
const SyncEntityList& applicable_updates,
sessions::StatusController* status) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_);
- progress_marker_ = progress_marker;
+
+ // TODO(rlarocque): Handle data type context conflicts.
+ data_type_state_.type_context = mutated_context;
+ data_type_state_.progress_marker = progress_marker;
+
+ UpdateResponseDataList response_datas;
+
+ for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
+ update_it != applicable_updates.end();
+ ++update_it) {
+ const sync_pb::SyncEntity* update_entity = *update_it;
+ if (!update_entity->server_defined_unique_tag().empty()) {
+ // We can't commit an item unless we know its parent ID. This is where
+ // we learn that ID and remember it forever.
+ DCHECK_EQ(ModelTypeToRootTag(type_),
+ update_entity->server_defined_unique_tag());
+ if (!data_type_state_.type_root_id.empty()) {
+ DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
+ }
+ data_type_state_.type_root_id = update_entity->id_string();
+ } else {
+ // Normal updates are handled here.
+ const std::string& client_tag_hash =
+ update_entity->client_defined_unique_tag();
+ DCHECK(!client_tag_hash.empty());
+ EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
+ if (map_it == entities_.end()) {
+ SyncThreadSyncEntity* entity =
+ SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
+ client_tag_hash,
+ update_entity->version());
+ entities_.insert(std::make_pair(client_tag_hash, entity));
+ } else {
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->ReceiveUpdate(update_entity->version());
+ }
+
+ // Prepare the message for the model thread.
+ UpdateResponseData response_data;
+ response_data.id = update_entity->id_string();
+ response_data.client_tag_hash = client_tag_hash;
+ response_data.response_version = update_entity->version();
+ response_data.ctime = ProtoTimeToTime(update_entity->ctime());
+ response_data.mtime = ProtoTimeToTime(update_entity->mtime());
+ response_data.non_unique_name = update_entity->name();
+ response_data.deleted = update_entity->deleted();
+ response_data.specifics = update_entity->specifics();
+
+ response_datas.push_back(response_data);
+ }
+ }
+
+ // Forward these updates to the model thread so it can do the rest.
+ processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
+
return SYNCER_OK;
}
void NonBlockingTypeProcessorCore::ApplyUpdates(
sessions::StatusController* status) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_);
-}
+ // This function is called only when we've finished a download cycle, ie. we
+ // got a response with changes_remaining == 0. If this is our first download
+ // cycle, we should update our state so the NonBlockingTypeProcessor knows
+ // that it's safe to commit items now.
+ if (!data_type_state_.initial_sync_done) {
+ data_type_state_.initial_sync_done = true;
-void NonBlockingTypeProcessorCore::RequestCommits(
- const CommitRequestDataList& request_list) {
- // TODO(rlarocque): Implement this. crbug.com/351005.
+ UpdateResponseDataList empty_update_list;
+ processor_interface_->ReceiveUpdateResponse(data_type_state_,
+ empty_update_list);
+ }
}
void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
@@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
<< "ModelType is: " << ModelTypeToString(type_);
}
+void NonBlockingTypeProcessorCore::EnqueueForCommit(
+ const CommitRequestDataList& list) {
+ DCHECK(CalledOnValidThread());
+
+ DCHECK(CanCommitItems())
+ << "Asked to commit items before type was initialized. "
+ << "ModelType is: " << ModelTypeToString(type_);
+
+ for (CommitRequestDataList::const_iterator it = list.begin();
+ it != list.end();
+ ++it) {
+ StorePendingCommit(*it);
+ }
+}
+
// CommitContributor implementation.
scoped_ptr<CommitContribution>
NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
DCHECK(CalledOnValidThread());
- // TODO(rlarocque): Implement this properly. crbug.com/351005.
- DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_);
- return scoped_ptr<CommitContribution>();
+
+ size_t space_remaining = max_entries;
+ std::vector<int64> sequence_numbers;
+ google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
+
+ if (!CanCommitItems())
+ return scoped_ptr<CommitContribution>();
+
+ // TODO(rlarocque): Avoid iterating here.
+ for (EntityMap::const_iterator it = entities_.begin();
+ it != entities_.end() && space_remaining > 0;
+ ++it) {
+ SyncThreadSyncEntity* entity = it->second;
+ if (entity->IsCommitPending()) {
+ sync_pb::SyncEntity* commit_entity = commit_entities.Add();
+ int64 sequence_number = -1;
+
+ entity->PrepareCommitProto(commit_entity, &sequence_number);
+ HelpInitializeCommitEntity(commit_entity);
+ sequence_numbers.push_back(sequence_number);
+
+ space_remaining--;
+ }
+ }
+
+ if (commit_entities.size() == 0)
+ return scoped_ptr<CommitContribution>();
+
+ return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
+ data_type_state_.type_context, commit_entities, sequence_numbers, this));
+}
+
+void NonBlockingTypeProcessorCore::StorePendingCommit(
+ const CommitRequestData& request) {
+ if (!request.deleted) {
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
+ }
+
+ EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
+ if (map_it == entities_.end()) {
+ SyncThreadSyncEntity* entity =
+ SyncThreadSyncEntity::FromCommitRequest(request.id,
+ request.client_tag_hash,
+ request.sequence_number,
+ request.base_version,
+ request.ctime,
+ request.mtime,
+ request.non_unique_name,
+ request.deleted,
+ request.specifics);
+ entities_.insert(std::make_pair(request.client_tag_hash, entity));
+ } else {
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->RequestCommit(request.id,
+ request.client_tag_hash,
+ request.sequence_number,
+ request.base_version,
+ request.ctime,
+ request.mtime,
+ request.non_unique_name,
+ request.deleted,
+ request.specifics);
+ }
+
+ // TODO: Nudge SyncScheduler.
+}
+
+void NonBlockingTypeProcessorCore::OnCommitResponse(
+ const CommitResponseDataList& response_list) {
+ for (CommitResponseDataList::const_iterator response_it =
+ response_list.begin();
+ response_it != response_list.end();
+ ++response_it) {
+ const std::string client_tag_hash = response_it->client_tag_hash;
+ EntityMap::iterator map_it = entities_.find(client_tag_hash);
+
+ // There's no way we could have committed an entry we know nothing about.
+ if (map_it == entities_.end()) {
+ NOTREACHED() << "Received commit response for item unknown to us."
+ << " Model type: " << ModelTypeToString(type_)
+ << " ID: " << response_it->id;
+ continue;
+ }
+
+ SyncThreadSyncEntity* entity = map_it->second;
+ entity->ReceiveCommitResponse(response_it->id,
+ response_it->response_version,
+ response_it->sequence_number);
+ }
+
+ // Send the responses back to the model thread. It needs to know which
+ // items have been successfully committed so it can save that information in
+ // permanent storage.
+ processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
}
base::WeakPtr<NonBlockingTypeProcessorCore>
@@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
+bool NonBlockingTypeProcessorCore::CanCommitItems() const {
+ // We can't commit anything until we know the type's parent node.
+ // We'll get it in the first update response.
+ return !data_type_state_.type_root_id.empty() &&
+ data_type_state_.initial_sync_done;
+}
+
+void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
+ sync_pb::SyncEntity* sync_entity) {
+ // Initial commits need our help to generate a client ID.
+ if (!sync_entity->has_id_string()) {
+ DCHECK_EQ(kUncommittedVersion, sync_entity->version());
+ const int64 id = data_type_state_.next_client_id++;
+ sync_entity->set_id_string(
+ base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
+ }
+
+ // Always include enough specifics to identify the type. Do this even in
+ // deletion requests, where the specifics are otherwise invalid.
+ if (!sync_entity->has_specifics()) {
+ AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
+ }
+
+ // We're always responsible for the parent ID.
+ sync_entity->set_parent_id_string(data_type_state_.type_root_id);
+}
+
} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h
index e226349..93d4215 100644
--- a/sync/engine/non_blocking_type_processor_core.h
+++ b/sync/engine/non_blocking_type_processor_core.h
@@ -6,6 +6,7 @@
#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_CORE_H_
#include "base/memory/weak_ptr.h"
+#include "base/stl_util.h"
#include "base/threading/non_thread_safe.h"
#include "sync/base/sync_export.h"
#include "sync/engine/commit_contributor.h"
@@ -20,7 +21,8 @@ class SingleThreadTaskRunner;
namespace syncer {
-class NonBlockingTypeProcessor;
+class NonBlockingTypeProcessorInterface;
+class SyncThreadSyncEntity;
// A smart cache for sync types that use message passing (rather than
// transactions and the syncable::Directory) to communicate with the sync
@@ -28,9 +30,9 @@ class NonBlockingTypeProcessor;
//
// When the non-blocking sync type wants to talk with the sync server, it will
// send a message from its thread to this object on the sync thread. This
-// object is responsible for helping to ensure the appropriate sync server
-// communication gets scheduled and executed. The response, if any, will be
-// returned to the non-blocking sync type's thread eventually.
+// object ensures the appropriate sync server communication gets scheduled and
+// executed. The response, if any, will be returned to the non-blocking sync
+// type's thread eventually.
//
// This object also has a role to play in communications in the opposite
// direction. Sometimes the sync thread will receive changes from the sync
@@ -49,8 +51,8 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore
public:
NonBlockingTypeProcessorCore(
ModelType type,
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner,
- base::WeakPtr<NonBlockingTypeProcessor> processor);
+ const DataTypeState& initial_state,
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface);
virtual ~NonBlockingTypeProcessorCore();
ModelType GetModelType() const;
@@ -69,20 +71,57 @@ class SYNC_EXPORT NonBlockingTypeProcessorCore
virtual void PassiveApplyUpdates(sessions::StatusController* status) OVERRIDE;
// Entry point for NonBlockingTypeProcessor to send commit requests.
- void RequestCommits(const CommitRequestDataList& request_list);
+ void EnqueueForCommit(const CommitRequestDataList& request_list);
// CommitContributor implementation.
virtual scoped_ptr<CommitContribution> GetContribution(
size_t max_entries) OVERRIDE;
+ // Callback for when our contribution gets a response.
+ void OnCommitResponse(const CommitResponseDataList& response_list);
+
base::WeakPtr<NonBlockingTypeProcessorCore> AsWeakPtr();
private:
+ typedef std::map<std::string, SyncThreadSyncEntity*> EntityMap;
+
+ // Stores a single commit request in this object's internal state.
+ void StorePendingCommit(const CommitRequestData& request);
+
+ // Returns true if all data type state required for commits is available. In
+ // practice, this means that it returns true from the time this object first
+ // receives notice of a successful update fetch from the server.
+ bool CanCommitItems() const;
+
+ // Initializes the parts of a commit entity that are the responsibility of
+ // this class, and not the SyncThreadSyncEntity. Some fields, like the
+ // client-assigned ID, can only be set by an entity with knowledge of the
+ // entire data type's state.
+ void HelpInitializeCommitEntity(sync_pb::SyncEntity* commit_entity);
+
ModelType type_;
- sync_pb::DataTypeProgressMarker progress_marker_;
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
- base::WeakPtr<NonBlockingTypeProcessor> processor_;
+ // State that applies to the entire model type.
+ DataTypeState data_type_state_;
+
+ // Abstraction around the NonBlockingTypeProcessor so this class
+ // doesn't need to know about its specific implementation or
+ // which thread it's on. This makes it easier to write tests.
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface_;
+
+ // A map of per-entity information known to this object.
+ //
+ // When commits are pending, their information is stored here. This
+ // information is dropped from memory when the commit succeeds or gets
+ // cancelled.
+ //
+ // This also stores some information related to received server state in
+ // order to implement reflection blocking and conflict detection. This
+ // information is kept in memory indefinitely. With a bit more coordination
+ // with the model thread, we could optimize this to reduce memory usage in
+ // the steady state.
+ EntityMap entities_;
+ STLValueDeleter<EntityMap> entities_deleter_;
base::WeakPtrFactory<NonBlockingTypeProcessorCore> weak_ptr_factory_;
};
diff --git a/sync/engine/non_blocking_type_processor_core_unittest.cc b/sync/engine/non_blocking_type_processor_core_unittest.cc
new file mode 100644
index 0000000..87b5647
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_core_unittest.cc
@@ -0,0 +1,961 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor_core.h"
+
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/engine/non_blocking_type_commit_contribution.h"
+#include "sync/engine/non_blocking_type_processor_interface.h"
+#include "sync/protocol/sync.pb.h"
+#include "sync/sessions/status_controller.h"
+#include "sync/syncable/syncable_util.h"
+#include "sync/util/time.h"
+
+#include "testing/gtest/include/gtest/gtest.h"
+
+using google::protobuf::RepeatedPtrField;
+
+static const std::string kTypeParentId = "PrefsRootNodeID";
+
+namespace syncer {
+
+class NonBlockingTypeProcessorCoreTest;
+
+namespace {
+
+class MockNonBlockingTypeProcessor : public NonBlockingTypeProcessorInterface {
+ public:
+ MockNonBlockingTypeProcessor(NonBlockingTypeProcessorCoreTest* parent);
+ virtual ~MockNonBlockingTypeProcessor();
+
+ virtual void ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) OVERRIDE;
+ virtual void ReceiveUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) OVERRIDE;
+
+ private:
+ NonBlockingTypeProcessorCoreTest* parent_;
+
+ DISALLOW_COPY_AND_ASSIGN(MockNonBlockingTypeProcessor);
+};
+
+} // namespace
+
+// Tests the NonBlockingTypeProcessorCore.
+//
+// This class passes messages between the model thread and sync server.
+// As such, its code is subject to lots of different race conditions. This
+// test harness lets us exhaustively test all possible races. We try to
+// focus on just a few interesting cases.
+//
+// Inputs:
+// - Initial data type state from the model thread.
+// - Commit requests from the model thread.
+// - Update responses from the server.
+// - Commit responses from the server.
+//
+// Outputs:
+// - Commit requests to the server.
+// - Commit responses to the model thread.
+// - Update responses to the model thread.
+// - Nudges to the sync scheduler.
+//
+// We use the MockNonBlockingTypeProcessor to stub out all communication
+// with the model thread. That interface is synchronous, which makes it
+// much easier to test races.
+//
+// The interface with the server is built around "pulling" data from this
+// class, so we don't have to mock out any of it. We wrap it with some
+// convenience functions to we can emulate server behavior.
+class NonBlockingTypeProcessorCoreTest : public ::testing::Test {
+ public:
+ NonBlockingTypeProcessorCoreTest();
+ virtual ~NonBlockingTypeProcessorCoreTest();
+
+ // One of these Initialize functions should be called at the beginning of
+ // each test.
+
+ // Initializes with no data type state. We will be unable to perform any
+ // significant server action until we receive an update response that
+ // contains the type root node for this type.
+ void FirstInitialize();
+
+ // Initializes with some existing data type state. Allows us to start
+ // committing items right away.
+ void NormalInitialize();
+
+ // Initialize with a custom initial DataTypeState.
+ void InitializeWithState(const DataTypeState& state);
+
+ // Modifications on the model thread that get sent to the core under test.
+ void CommitRequest(const std::string& tag, const std::string& value);
+ void DeleteRequest(const std::string& tag);
+
+ // Pretends to receive update messages from the server.
+ void TriggerTypeRootUpdateFromServer();
+ void TriggerUpdateFromServer(int64 version_offset,
+ const std::string& tag,
+ const std::string& value);
+ void TriggerTombstoneFromServer(int64 version_offset, const std::string& tag);
+
+ // Callbacks from the mock processor. Called when the |core_| tries to send
+ // messages to its associated processor on the model thread.
+ void OnModelThreadReceivedCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list);
+ void OnModelThreadReceivedUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list);
+
+ // By default, this harness behaves as if all tasks posted to the model
+ // thread are executed immediately. However, this is not necessarily true.
+ // The model's TaskRunner has a queue, and the tasks we post to it could
+ // linger there for a while. In the meantime, the model thread could
+ // continue posting tasks to the core based on its stale state.
+ //
+ // If you want to test those race cases, then these functions are for you.
+ void SetModelThreadIsSynchronous(bool is_synchronous);
+ void PumpModelThread();
+
+ // Returns true if the |core_| is ready to commit something.
+ bool WillCommit();
+
+ // Pretend to successfully commit all outstanding unsynced items.
+ // It is safe to call this only if WillCommit() returns true.
+ void DoSuccessfulCommit();
+
+ // Read commit messages the core_ sent to the emulated server.
+ size_t GetNumCommitMessagesOnServer() const;
+ sync_pb::ClientToServerMessage GetNthCommitMessageOnServer(size_t n) const;
+
+ // Read the latest version of sync entities committed to the emulated server.
+ bool HasCommitEntityOnServer(const std::string& tag) const;
+ sync_pb::SyncEntity GetLatestCommitEntityOnServer(
+ const std::string& tag) const;
+
+ // Read the latest update messages received on the model thread.
+ // Note that if the model thread is in non-blocking mode, this data will not
+ // be updated until the response is actually processed by the model thread.
+ size_t GetNumModelThreadUpdateResponses() const;
+ UpdateResponseDataList GetNthModelThreadUpdateResponse(size_t n) const;
+ DataTypeState GetNthModelThreadUpdateState(size_t n) const;
+
+ // Reads the latest update response datas on the model thread.
+ // Note that if the model thread is in non-blocking mode, this data will not
+ // be updated until the response is actually processed by the model thread.
+ bool HasUpdateResponseOnModelThread(const std::string& tag) const;
+ UpdateResponseData GetUpdateResponseOnModelThread(
+ const std::string& tag) const;
+
+ // Read the latest commit messages received on the model thread.
+ // Note that if the model thread is in non-blocking mode, this data will not
+ // be updated until the response is actually processed by the model thread.
+ size_t GetNumModelThreadCommitResponses() const;
+ CommitResponseDataList GetNthModelThreadCommitResponse(size_t n) const;
+ DataTypeState GetNthModelThreadCommitState(size_t n) const;
+
+ // Reads the latest commit response datas on the model thread.
+ // Note that if the model thread is in non-blocking mode, this data will not
+ // be updated until the response is actually processed by the model thread.
+ bool HasCommitResponseOnModelThread(const std::string& tag) const;
+ CommitResponseData GetCommitResponseOnModelThread(
+ const std::string& tag) const;
+
+ // Helpers for building various messages and structures.
+ static std::string GenerateId(const std::string& tag_hash);
+ static std::string GenerateTagHash(const std::string& tag);
+ static sync_pb::EntitySpecifics GenerateSpecifics(const std::string& tag,
+ const std::string& value);
+
+ private:
+ // Get and set our emulated server state.
+ int64 GetServerVersion(const std::string& tag_hash);
+ void SetServerVersion(const std::string& tag_hash, int64 version);
+
+ // Get and set our emulated model thread state.
+ int64 GetCurrentSequenceNumber(const std::string& tag_hash) const;
+ int64 GetNextSequenceNumber(const std::string& tag_hash);
+ int64 GetModelVersion(const std::string& tag_hash) const;
+ void SetModelVersion(const std::string& tag_hash, int64 version);
+
+ // Receive a commit response in the emulated model thread.
+ //
+ // Kept in a separate Impl method so we can emulate deferred task processing.
+ // See SetModelThreadIsSynchronous() for details.
+ void ModelThreadReceiveCommitResponseImpl(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list);
+
+ // Receive an update response in the emulated model thread.
+ //
+ // Kept in a separate Impl method so we can emulate deferred task processing.
+ // See SetModelThreadIsSynchronous() for details.
+ void ModelThreadReceiveUpdateResponseImpl(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list);
+
+ // Builds a fake progress marker for our response.
+ sync_pb::DataTypeProgressMarker GenerateResponseProgressMarker() const;
+
+ scoped_ptr<NonBlockingTypeProcessorCore> core_;
+ MockNonBlockingTypeProcessor* mock_processor_;
+
+ // Model thread state maps.
+ std::map<const std::string, int64> model_sequence_numbers_;
+ std::map<const std::string, int64> model_base_versions_;
+
+ // Server state maps.
+ std::map<const std::string, int64> server_versions_;
+
+ // Logs of messages sent to the server. Used in assertions.
+ std::map<const std::string, sync_pb::SyncEntity> committed_items_;
+ std::vector<sync_pb::ClientToServerMessage> commit_messages_;
+
+ // State related to emulation of the model thread's task queue. Used to
+ // defer model thread work to simulate a full model thread task runner queue.
+ bool model_thread_is_synchronous_;
+ std::vector<base::Closure> model_thread_tasks_;
+
+ // A cache of messages sent to the model thread.
+ std::vector<CommitResponseDataList> commit_responses_to_model_thread_;
+ std::vector<UpdateResponseDataList> updates_responses_to_model_thread_;
+ std::vector<DataTypeState> updates_states_to_model_thread_;
+ std::vector<DataTypeState> commit_states_to_model_thread_;
+
+ // A cache of the latest responses on the model thread, by client tag.
+ std::map<const std::string, CommitResponseData>
+ model_thread_commit_response_items_;
+ std::map<const std::string, UpdateResponseData>
+ model_thread_update_response_items_;
+};
+
+// These had to wait until the class definition of
+// NonBlockingTypeProcessorCoreTest
+MockNonBlockingTypeProcessor::MockNonBlockingTypeProcessor(
+ NonBlockingTypeProcessorCoreTest* parent)
+ : parent_(parent) {
+}
+
+MockNonBlockingTypeProcessor::~MockNonBlockingTypeProcessor() {
+}
+
+void MockNonBlockingTypeProcessor::ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ parent_->OnModelThreadReceivedCommitResponse(type_state, response_list);
+}
+
+void MockNonBlockingTypeProcessor::ReceiveUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) {
+ parent_->OnModelThreadReceivedUpdateResponse(type_state, response_list);
+}
+
+NonBlockingTypeProcessorCoreTest::NonBlockingTypeProcessorCoreTest()
+ : model_thread_is_synchronous_(true) {
+}
+
+NonBlockingTypeProcessorCoreTest::~NonBlockingTypeProcessorCoreTest() {
+}
+
+void NonBlockingTypeProcessorCoreTest::FirstInitialize() {
+ DataTypeState initial_state;
+ initial_state.progress_marker.set_data_type_id(
+ GetSpecificsFieldNumberFromModelType(PREFERENCES));
+ initial_state.next_client_id = 0;
+
+ InitializeWithState(initial_state);
+}
+
+void NonBlockingTypeProcessorCoreTest::NormalInitialize() {
+ DataTypeState initial_state;
+ initial_state.progress_marker.set_data_type_id(
+ GetSpecificsFieldNumberFromModelType(PREFERENCES));
+ initial_state.progress_marker.set_token("some_saved_progress_token");
+
+ initial_state.next_client_id = 10;
+ initial_state.type_root_id = kTypeParentId;
+ initial_state.initial_sync_done = true;
+
+ InitializeWithState(initial_state);
+}
+
+void NonBlockingTypeProcessorCoreTest::InitializeWithState(
+ const DataTypeState& state) {
+ DCHECK(!core_);
+
+ // We don't get to own this interace. The |core_| keeps a scoped_ptr to it.
+ mock_processor_ = new MockNonBlockingTypeProcessor(this);
+ scoped_ptr<NonBlockingTypeProcessorInterface> interface(mock_processor_);
+
+ core_.reset(
+ new NonBlockingTypeProcessorCore(PREFERENCES, state, interface.Pass()));
+}
+
+void NonBlockingTypeProcessorCoreTest::CommitRequest(const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const int64 base_version = GetModelVersion(tag_hash);
+
+ CommitRequestData data;
+
+ // Initial commits don't have IDs. Everything else does.
+ if (base_version > kUncommittedVersion) {
+ data.id = GenerateId(tag_hash);
+ }
+
+ data.client_tag_hash = tag_hash;
+ data.sequence_number = GetNextSequenceNumber(tag_hash);
+
+ data.base_version = base_version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version);
+ data.non_unique_name = tag;
+
+ data.deleted = false;
+ data.specifics = GenerateSpecifics(tag, value);
+
+ CommitRequestDataList list;
+ list.push_back(data);
+
+ core_->EnqueueForCommit(list);
+}
+
+void NonBlockingTypeProcessorCoreTest::DeleteRequest(const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ const int64 base_version = GetModelVersion(tag_hash);
+ CommitRequestData data;
+
+ // Requests to commit server-unknown items don't have IDs.
+ // We'll never send a deletion for a server-unknown item, but the model is
+ // allowed to request that we do.
+ if (base_version > kUncommittedVersion) {
+ data.id = GenerateId(tag_hash);
+ }
+
+ data.client_tag_hash = tag_hash;
+ data.sequence_number = GetNextSequenceNumber(tag_hash);
+
+ data.base_version = base_version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.client_tag_hash = tag_hash;
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(base_version);
+ data.deleted = true;
+
+ CommitRequestDataList list;
+ list.push_back(data);
+
+ core_->EnqueueForCommit(list);
+}
+
+void NonBlockingTypeProcessorCoreTest::TriggerTypeRootUpdateFromServer() {
+ sync_pb::SyncEntity entity;
+
+ entity.set_id_string(kTypeParentId);
+ entity.set_parent_id_string("r");
+ entity.set_version(1000);
+ entity.set_ctime(TimeToProtoTime(base::Time::UnixEpoch()));
+ entity.set_mtime(TimeToProtoTime(base::Time::UnixEpoch()));
+ entity.set_server_defined_unique_tag(ModelTypeToRootTag(PREFERENCES));
+ entity.set_deleted(false);
+ AddDefaultFieldValue(PREFERENCES, entity.mutable_specifics());
+
+ const sync_pb::DataTypeProgressMarker& progress =
+ GenerateResponseProgressMarker();
+ const sync_pb::DataTypeContext blank_context;
+ sessions::StatusController dummy_status;
+
+ SyncEntityList entity_list;
+ entity_list.push_back(&entity);
+
+ core_->ProcessGetUpdatesResponse(
+ progress, blank_context, entity_list, &dummy_status);
+ core_->ApplyUpdates(&dummy_status);
+}
+
+void NonBlockingTypeProcessorCoreTest::TriggerUpdateFromServer(
+ int64 version_offset,
+ const std::string& tag,
+ const std::string& value) {
+ const std::string tag_hash = GenerateTagHash(tag);
+
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ sync_pb::SyncEntity entity;
+
+ entity.set_id_string(GenerateId(tag_hash));
+ entity.set_parent_id_string(kTypeParentId);
+ entity.set_version(version);
+
+ base::Time ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ base::Time mtime = ctime + base::TimeDelta::FromSeconds(version);
+ entity.set_ctime(TimeToProtoTime(ctime));
+ entity.set_mtime(TimeToProtoTime(mtime));
+
+ entity.set_name(tag);
+ entity.set_client_defined_unique_tag(GenerateTagHash(tag));
+ entity.set_deleted(false);
+ entity.mutable_specifics()->CopyFrom(GenerateSpecifics(tag, value));
+
+ SyncEntityList entity_list;
+ entity_list.push_back(&entity);
+
+ const sync_pb::DataTypeProgressMarker& progress =
+ GenerateResponseProgressMarker();
+ const sync_pb::DataTypeContext blank_context;
+ sessions::StatusController dummy_status;
+
+ core_->ProcessGetUpdatesResponse(
+ progress, blank_context, entity_list, &dummy_status);
+ core_->ApplyUpdates(&dummy_status);
+}
+
+void NonBlockingTypeProcessorCoreTest::TriggerTombstoneFromServer(
+ int64 version_offset,
+ const std::string& tag) {
+ const std::string tag_hash = GenerateTagHash(tag);
+ int64 old_version = GetServerVersion(tag_hash);
+ int64 version = old_version + version_offset;
+ if (version > old_version) {
+ SetServerVersion(tag_hash, version);
+ }
+
+ UpdateResponseData data;
+
+ data.id = GenerateId(tag_hash);
+ data.client_tag_hash = tag_hash;
+ data.response_version = version;
+ data.ctime = base::Time::UnixEpoch() + base::TimeDelta::FromDays(1);
+ data.mtime = data.ctime + base::TimeDelta::FromSeconds(version);
+ data.non_unique_name = tag;
+ data.deleted = true;
+}
+
+void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ base::Closure task = base::Bind(
+ &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl,
+ base::Unretained(this),
+ type_state,
+ response_list);
+ model_thread_tasks_.push_back(task);
+ if (model_thread_is_synchronous_)
+ PumpModelThread();
+}
+
+void NonBlockingTypeProcessorCoreTest::OnModelThreadReceivedUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) {
+ base::Closure task = base::Bind(
+ &NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl,
+ base::Unretained(this),
+ type_state,
+ response_list);
+ model_thread_tasks_.push_back(task);
+ if (model_thread_is_synchronous_)
+ PumpModelThread();
+}
+
+void NonBlockingTypeProcessorCoreTest::SetModelThreadIsSynchronous(
+ bool is_synchronous) {
+ model_thread_is_synchronous_ = is_synchronous;
+}
+
+void NonBlockingTypeProcessorCoreTest::PumpModelThread() {
+ for (std::vector<base::Closure>::iterator it = model_thread_tasks_.begin();
+ it != model_thread_tasks_.end();
+ ++it) {
+ it->Run();
+ }
+ model_thread_tasks_.clear();
+}
+
+bool NonBlockingTypeProcessorCoreTest::WillCommit() {
+ scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX));
+
+ if (contribution) {
+ contribution->CleanUp(); // Gracefully abort the commit.
+ return true;
+ } else {
+ return false;
+ }
+}
+
+// Conveniently, this is all one big synchronous operation. The sync thread
+// remains blocked while the commit is in progress, so we don't need to worry
+// about other tasks being run between the time when the commit request is
+// issued and the time when the commit response is received.
+void NonBlockingTypeProcessorCoreTest::DoSuccessfulCommit() {
+ DCHECK(WillCommit());
+ scoped_ptr<CommitContribution> contribution(core_->GetContribution(INT_MAX));
+
+ sync_pb::ClientToServerMessage message;
+ contribution->AddToCommitMessage(&message);
+ commit_messages_.push_back(message);
+
+ sync_pb::ClientToServerResponse response;
+ sync_pb::CommitResponse* commit_response = response.mutable_commit();
+
+ const RepeatedPtrField<sync_pb::SyncEntity>& entries =
+ message.commit().entries();
+ for (RepeatedPtrField<sync_pb::SyncEntity>::const_iterator it =
+ entries.begin();
+ it != entries.end();
+ ++it) {
+ const std::string tag_hash = it->client_defined_unique_tag();
+
+ committed_items_[tag_hash] = *it;
+
+ // Every commit increments the version number.
+ int64 version = GetServerVersion(tag_hash);
+ version++;
+ SetServerVersion(tag_hash, version);
+
+ sync_pb::CommitResponse_EntryResponse* entryresponse =
+ commit_response->add_entryresponse();
+ entryresponse->set_response_type(sync_pb::CommitResponse::SUCCESS);
+ entryresponse->set_id_string(GenerateId(tag_hash));
+ entryresponse->set_parent_id_string(it->parent_id_string());
+ entryresponse->set_version(version);
+ entryresponse->set_name(it->name());
+ entryresponse->set_mtime(it->mtime());
+ }
+
+ sessions::StatusController dummy_status;
+ contribution->ProcessCommitResponse(response, &dummy_status);
+ contribution->CleanUp();
+}
+
+size_t NonBlockingTypeProcessorCoreTest::GetNumCommitMessagesOnServer() const {
+ return commit_messages_.size();
+}
+
+sync_pb::ClientToServerMessage
+NonBlockingTypeProcessorCoreTest::GetNthCommitMessageOnServer(size_t n) const {
+ DCHECK_LT(n, GetNumCommitMessagesOnServer());
+ return commit_messages_[n];
+}
+
+bool NonBlockingTypeProcessorCoreTest::HasCommitEntityOnServer(
+ const std::string& tag) const {
+ const std::string tag_hash = GenerateTagHash(tag);
+ std::map<const std::string, sync_pb::SyncEntity>::const_iterator it =
+ committed_items_.find(tag_hash);
+ return it != committed_items_.end();
+}
+
+sync_pb::SyncEntity
+NonBlockingTypeProcessorCoreTest::GetLatestCommitEntityOnServer(
+ const std::string& tag) const {
+ DCHECK(HasCommitEntityOnServer(tag));
+ const std::string tag_hash = GenerateTagHash(tag);
+ std::map<const std::string, sync_pb::SyncEntity>::const_iterator it =
+ committed_items_.find(tag_hash);
+ return it->second;
+}
+
+size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadUpdateResponses()
+ const {
+ return updates_responses_to_model_thread_.size();
+}
+
+UpdateResponseDataList
+NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateResponse(
+ size_t n) const {
+ DCHECK(GetNumModelThreadUpdateResponses());
+ return updates_responses_to_model_thread_[n];
+}
+
+DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadUpdateState(
+ size_t n) const {
+ DCHECK(GetNumModelThreadUpdateResponses());
+ return updates_states_to_model_thread_[n];
+}
+
+bool NonBlockingTypeProcessorCoreTest::HasUpdateResponseOnModelThread(
+ const std::string& tag) const {
+ const std::string tag_hash = GenerateTagHash(tag);
+ std::map<const std::string, UpdateResponseData>::const_iterator it =
+ model_thread_update_response_items_.find(tag_hash);
+ return it != model_thread_update_response_items_.end();
+}
+
+UpdateResponseData
+NonBlockingTypeProcessorCoreTest::GetUpdateResponseOnModelThread(
+ const std::string& tag) const {
+ const std::string tag_hash = GenerateTagHash(tag);
+ DCHECK(HasUpdateResponseOnModelThread(tag));
+ std::map<const std::string, UpdateResponseData>::const_iterator it =
+ model_thread_update_response_items_.find(tag_hash);
+ return it->second;
+}
+
+size_t NonBlockingTypeProcessorCoreTest::GetNumModelThreadCommitResponses()
+ const {
+ return commit_responses_to_model_thread_.size();
+}
+
+CommitResponseDataList
+NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitResponse(
+ size_t n) const {
+ DCHECK(GetNumModelThreadCommitResponses());
+ return commit_responses_to_model_thread_[n];
+}
+
+DataTypeState NonBlockingTypeProcessorCoreTest::GetNthModelThreadCommitState(
+ size_t n) const {
+ DCHECK(GetNumModelThreadCommitResponses());
+ return commit_states_to_model_thread_[n];
+}
+
+bool NonBlockingTypeProcessorCoreTest::HasCommitResponseOnModelThread(
+ const std::string& tag) const {
+ const std::string tag_hash = GenerateTagHash(tag);
+ std::map<const std::string, CommitResponseData>::const_iterator it =
+ model_thread_commit_response_items_.find(tag_hash);
+ return it != model_thread_commit_response_items_.end();
+}
+
+CommitResponseData
+NonBlockingTypeProcessorCoreTest::GetCommitResponseOnModelThread(
+ const std::string& tag) const {
+ DCHECK(HasCommitResponseOnModelThread(tag));
+ const std::string tag_hash = GenerateTagHash(tag);
+ std::map<const std::string, CommitResponseData>::const_iterator it =
+ model_thread_commit_response_items_.find(tag_hash);
+ return it->second;
+}
+
+std::string NonBlockingTypeProcessorCoreTest::GenerateId(
+ const std::string& tag_hash) {
+ return "FakeId:" + tag_hash;
+}
+
+std::string NonBlockingTypeProcessorCoreTest::GenerateTagHash(
+ const std::string& tag) {
+ const std::string& client_tag_hash =
+ syncable::GenerateSyncableHash(PREFERENCES, tag);
+
+ return client_tag_hash;
+}
+
+sync_pb::EntitySpecifics NonBlockingTypeProcessorCoreTest::GenerateSpecifics(
+ const std::string& tag,
+ const std::string& value) {
+ sync_pb::EntitySpecifics specifics;
+ specifics.mutable_preference()->set_name(tag);
+ specifics.mutable_preference()->set_value(value);
+ return specifics;
+}
+
+int64 NonBlockingTypeProcessorCoreTest::GetServerVersion(
+ const std::string& tag_hash) {
+ std::map<const std::string, int64>::const_iterator it;
+ it = server_versions_.find(tag_hash);
+ // Server versions do not necessarily start at 1 or 0.
+ if (it == server_versions_.end()) {
+ return 2048;
+ } else {
+ return it->second;
+ }
+}
+
+void NonBlockingTypeProcessorCoreTest::SetServerVersion(
+ const std::string& tag_hash,
+ int64 version) {
+ server_versions_[tag_hash] = version;
+}
+
+// Fetches the sequence number as of the most recent update request.
+int64 NonBlockingTypeProcessorCoreTest::GetCurrentSequenceNumber(
+ const std::string& tag_hash) const {
+ std::map<const std::string, int64>::const_iterator it =
+ model_sequence_numbers_.find(tag_hash);
+ if (it == model_sequence_numbers_.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+// The model thread should be sending us items with strictly increasing
+// sequence numbers. Here's where we emulate that behavior.
+int64 NonBlockingTypeProcessorCoreTest::GetNextSequenceNumber(
+ const std::string& tag_hash) {
+ int64 sequence_number = GetCurrentSequenceNumber(tag_hash);
+ sequence_number++;
+ model_sequence_numbers_[tag_hash] = sequence_number;
+ return sequence_number;
+}
+
+// Fetches the model's base version.
+int64 NonBlockingTypeProcessorCoreTest::GetModelVersion(
+ const std::string& tag_hash) const {
+ std::map<const std::string, int64>::const_iterator it =
+ model_base_versions_.find(tag_hash);
+ if (it == model_base_versions_.end()) {
+ return kUncommittedVersion;
+ } else {
+ return it->second;
+ }
+}
+
+void NonBlockingTypeProcessorCoreTest::SetModelVersion(
+ const std::string& tag_hash,
+ int64 version) {
+ model_base_versions_[tag_hash] = version;
+}
+
+void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveCommitResponseImpl(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ commit_responses_to_model_thread_.push_back(response_list);
+ commit_states_to_model_thread_.push_back(type_state);
+ for (CommitResponseDataList::const_iterator it = response_list.begin();
+ it != response_list.end();
+ ++it) {
+ model_thread_commit_response_items_.insert(
+ std::make_pair(it->client_tag_hash, *it));
+
+ // Server wins. Set the model's base version.
+ SetModelVersion(it->client_tag_hash, it->response_version);
+ }
+}
+
+void NonBlockingTypeProcessorCoreTest::ModelThreadReceiveUpdateResponseImpl(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) {
+ updates_responses_to_model_thread_.push_back(response_list);
+ updates_states_to_model_thread_.push_back(type_state);
+ for (UpdateResponseDataList::const_iterator it = response_list.begin();
+ it != response_list.end();
+ ++it) {
+ model_thread_update_response_items_.insert(
+ std::make_pair(it->client_tag_hash, *it));
+
+ // Server wins. Set the model's base version.
+ SetModelVersion(it->client_tag_hash, it->response_version);
+ }
+}
+
+sync_pb::DataTypeProgressMarker
+NonBlockingTypeProcessorCoreTest::GenerateResponseProgressMarker() const {
+ sync_pb::DataTypeProgressMarker progress;
+ progress.set_data_type_id(PREFERENCES);
+ progress.set_token("non_null_progress_token");
+ return progress;
+}
+
+// Requests a commit and verifies the messages sent to the client and server as
+// a result.
+//
+// This test performs sanity checks on most of the fields in these messages.
+// For the most part this is checking that the test code behaves as expected
+// and the |core_| doesn't mess up its simple task of moving around these
+// values. It makes sense to have one or two tests that are this thorough, but
+// we shouldn't be this verbose in all tests.
+TEST_F(NonBlockingTypeProcessorCoreTest, SimpleCommit) {
+ NormalInitialize();
+
+ EXPECT_FALSE(WillCommit());
+ EXPECT_EQ(0U, GetNumCommitMessagesOnServer());
+ EXPECT_EQ(0U, GetNumModelThreadCommitResponses());
+
+ CommitRequest("tag1", "value1");
+
+ ASSERT_TRUE(WillCommit());
+ DoSuccessfulCommit();
+
+ const std::string& client_tag_hash = GenerateTagHash("tag1");
+
+ // Exhaustively verify the SyncEntity sent in the commit message.
+ ASSERT_EQ(1U, GetNumCommitMessagesOnServer());
+ EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size());
+ ASSERT_TRUE(HasCommitEntityOnServer("tag1"));
+ const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1");
+ EXPECT_FALSE(entity.id_string().empty());
+ EXPECT_EQ(kTypeParentId, entity.parent_id_string());
+ EXPECT_EQ(kUncommittedVersion, entity.version());
+ EXPECT_NE(0, entity.mtime());
+ EXPECT_NE(0, entity.ctime());
+ EXPECT_EQ("tag1", entity.name());
+ EXPECT_EQ(client_tag_hash, entity.client_defined_unique_tag());
+ EXPECT_EQ("tag1", entity.specifics().preference().name());
+ EXPECT_FALSE(entity.deleted());
+ EXPECT_EQ("value1", entity.specifics().preference().value());
+
+ // Exhaustively verify the commit response returned to the model thread.
+ ASSERT_EQ(1U, GetNumModelThreadCommitResponses());
+ EXPECT_EQ(1U, GetNthModelThreadCommitResponse(0).size());
+ ASSERT_TRUE(HasCommitResponseOnModelThread("tag1"));
+ const CommitResponseData& commit_response =
+ GetCommitResponseOnModelThread("tag1");
+
+ // The ID changes in a commit response to initial commit.
+ EXPECT_FALSE(commit_response.id.empty());
+ EXPECT_NE(entity.id_string(), commit_response.id);
+
+ EXPECT_EQ(client_tag_hash, commit_response.client_tag_hash);
+ EXPECT_LT(0, commit_response.response_version);
+}
+
+TEST_F(NonBlockingTypeProcessorCoreTest, SimpleDelete) {
+ NormalInitialize();
+
+ // We can't delete an entity that was never committed.
+ // Step 1 is to create and commit a new entity.
+ CommitRequest("tag1", "value1");
+ ASSERT_TRUE(WillCommit());
+ DoSuccessfulCommit();
+
+ ASSERT_TRUE(HasCommitResponseOnModelThread("tag1"));
+ const CommitResponseData& initial_commit_response =
+ GetCommitResponseOnModelThread("tag1");
+ int64 base_version = initial_commit_response.response_version;
+
+ // Now that we have an entity, we can delete it.
+ DeleteRequest("tag1");
+ ASSERT_TRUE(WillCommit());
+ DoSuccessfulCommit();
+
+ // Verify the SyncEntity sent in the commit message.
+ ASSERT_EQ(2U, GetNumCommitMessagesOnServer());
+ EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size());
+ ASSERT_TRUE(HasCommitEntityOnServer("tag1"));
+ const sync_pb::SyncEntity& entity = GetLatestCommitEntityOnServer("tag1");
+ EXPECT_FALSE(entity.id_string().empty());
+ EXPECT_EQ(GenerateTagHash("tag1"), entity.client_defined_unique_tag());
+ EXPECT_EQ(base_version, entity.version());
+ EXPECT_TRUE(entity.deleted());
+
+ // Deletions should contain enough specifics to identify the type.
+ EXPECT_TRUE(entity.has_specifics());
+ EXPECT_EQ(PREFERENCES, GetModelTypeFromSpecifics(entity.specifics()));
+
+ // Verify the commit response returned to the model thread.
+ ASSERT_EQ(2U, GetNumModelThreadCommitResponses());
+ EXPECT_EQ(1U, GetNthModelThreadCommitResponse(1).size());
+ ASSERT_TRUE(HasCommitResponseOnModelThread("tag1"));
+ const CommitResponseData& commit_response =
+ GetCommitResponseOnModelThread("tag1");
+
+ EXPECT_EQ(entity.id_string(), commit_response.id);
+ EXPECT_EQ(entity.client_defined_unique_tag(),
+ commit_response.client_tag_hash);
+ EXPECT_EQ(entity.version(), commit_response.response_version);
+}
+
+// The server doesn't like it when we try to delete an entity it's never heard
+// of before. This test helps ensure we avoid that scenario.
+TEST_F(NonBlockingTypeProcessorCoreTest, NoDeleteUncommitted) {
+ NormalInitialize();
+
+ // Request the commit of a new, never-before-seen item.
+ CommitRequest("tag1", "value1");
+ EXPECT_TRUE(WillCommit());
+
+ // Request a deletion of that item before we've had a chance to commit it.
+ DeleteRequest("tag1");
+ EXPECT_FALSE(WillCommit());
+}
+
+// Verifies the sending of an "initial sync done" signal.
+TEST_F(NonBlockingTypeProcessorCoreTest, SendInitialSyncDone) {
+ FirstInitialize(); // Initialize with no saved sync state.
+ EXPECT_EQ(0U, GetNumModelThreadUpdateResponses());
+
+ // Receive an update response that contains only the type root node.
+ TriggerTypeRootUpdateFromServer();
+
+ // Two updates:
+ // - One triggered by process updates to forward the type root ID.
+ // - One triggered by apply updates, which the core interprets to mean
+ // "initial sync done". This triggers a model thread update, too.
+ EXPECT_EQ(2U, GetNumModelThreadUpdateResponses());
+
+ // The type root and initial sync done updates both contain no entities.
+ EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(0).size());
+ EXPECT_EQ(0U, GetNthModelThreadUpdateResponse(1).size());
+
+ const DataTypeState& state = GetNthModelThreadUpdateState(1);
+ EXPECT_FALSE(state.progress_marker.token().empty());
+ EXPECT_FALSE(state.type_root_id.empty());
+ EXPECT_TRUE(state.initial_sync_done);
+}
+
+// Commit two new entities in two separate commit messages.
+TEST_F(NonBlockingTypeProcessorCoreTest, TwoNewItemsCommittedSeparately) {
+ NormalInitialize();
+
+ // Commit the first of two entities.
+ CommitRequest("tag1", "value1");
+ ASSERT_TRUE(WillCommit());
+ DoSuccessfulCommit();
+ ASSERT_EQ(1U, GetNumCommitMessagesOnServer());
+ EXPECT_EQ(1, GetNthCommitMessageOnServer(0).commit().entries_size());
+ ASSERT_TRUE(HasCommitEntityOnServer("tag1"));
+ const sync_pb::SyncEntity& tag1_entity =
+ GetLatestCommitEntityOnServer("tag1");
+
+ // Commit the second of two entities.
+ CommitRequest("tag2", "value2");
+ ASSERT_TRUE(WillCommit());
+ DoSuccessfulCommit();
+ ASSERT_EQ(2U, GetNumCommitMessagesOnServer());
+ EXPECT_EQ(1, GetNthCommitMessageOnServer(1).commit().entries_size());
+ ASSERT_TRUE(HasCommitEntityOnServer("tag2"));
+ const sync_pb::SyncEntity& tag2_entity =
+ GetLatestCommitEntityOnServer("tag2");
+
+ EXPECT_FALSE(WillCommit());
+
+ // The IDs assigned by the |core_| should be unique.
+ EXPECT_NE(tag1_entity.id_string(), tag2_entity.id_string());
+
+ // Check that the committed specifics values are sane.
+ EXPECT_EQ(tag1_entity.specifics().preference().value(), "value1");
+ EXPECT_EQ(tag2_entity.specifics().preference().value(), "value2");
+
+ // There should have been two separate commit responses sent to the model
+ // thread. They should be uninteresting, so we don't bother inspecting them.
+ EXPECT_EQ(2U, GetNumModelThreadCommitResponses());
+}
+
+TEST_F(NonBlockingTypeProcessorCoreTest, ReceiveUpdates) {
+ NormalInitialize();
+
+ const std::string& tag_hash = GenerateTagHash("tag1");
+
+ TriggerUpdateFromServer(10, "tag1", "value1");
+
+ ASSERT_EQ(1U, GetNumModelThreadUpdateResponses());
+ UpdateResponseDataList updates_list = GetNthModelThreadUpdateResponse(0);
+ ASSERT_EQ(1U, updates_list.size());
+
+ ASSERT_TRUE(HasUpdateResponseOnModelThread("tag1"));
+ UpdateResponseData update = GetUpdateResponseOnModelThread("tag1");
+
+ EXPECT_FALSE(update.id.empty());
+ EXPECT_EQ(tag_hash, update.client_tag_hash);
+ EXPECT_LT(0, update.response_version);
+ EXPECT_FALSE(update.ctime.is_null());
+ EXPECT_FALSE(update.mtime.is_null());
+ EXPECT_EQ("tag1", update.non_unique_name);
+ EXPECT_FALSE(update.deleted);
+ EXPECT_EQ("tag1", update.specifics.preference().name());
+ EXPECT_EQ("value1", update.specifics.preference().value());
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor_interface.cc b/sync/engine/non_blocking_type_processor_interface.cc
new file mode 100644
index 0000000..b936239
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_interface.cc
@@ -0,0 +1,15 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/non_blocking_type_processor_interface.h"
+
+namespace syncer {
+
+NonBlockingTypeProcessorInterface::NonBlockingTypeProcessorInterface() {
+}
+
+NonBlockingTypeProcessorInterface::~NonBlockingTypeProcessorInterface() {
+}
+
+} // namespace syncer
diff --git a/sync/engine/non_blocking_type_processor_interface.h b/sync/engine/non_blocking_type_processor_interface.h
new file mode 100644
index 0000000..ccc504b
--- /dev/null
+++ b/sync/engine/non_blocking_type_processor_interface.h
@@ -0,0 +1,31 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_
+#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_
+
+#include "sync/base/sync_export.h"
+
+struct CommitResponseDataList;
+struct DataTypeState;
+struct UpdateResponseDataList;
+
+namespace syncer {
+
+class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorInterface {
+ public:
+ NonBlockingTypeProcessorInterface();
+ virtual ~NonBlockingTypeProcessorInterface();
+
+ virtual void ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) = 0;
+ virtual void ReceiveUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) = 0;
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_INTERFACE_H_
diff --git a/sync/engine/non_blocking_type_processor_unittest.cc b/sync/engine/non_blocking_type_processor_unittest.cc
index 9ab6ca3..3a3ca5c 100644
--- a/sync/engine/non_blocking_type_processor_unittest.cc
+++ b/sync/engine/non_blocking_type_processor_unittest.cc
@@ -134,14 +134,21 @@ class NonBlockingTypeProcessorTest : public ::testing::Test {
NonBlockingTypeProcessorTest();
virtual ~NonBlockingTypeProcessorTest();
- // Explicit initialization step. Kept separate to allow tests to inject
- // on-disk state before the test begins.
- void Initialize();
+ // Initialize with no local state. The processor will be unable to commit
+ // until it receives notification that initial sync has completed.
+ void FirstTimeInitialize();
+
+ // Initialize to a "ready-to-commit" state.
+ void InitializeToReadyState();
// Local data modification. Emulates signals from the model thread.
void WriteItem(const std::string& tag, const std::string& value);
void DeleteItem(const std::string& tag);
+ // Emulates an "initial sync done" message from the
+ // NonBlockingTypeProcessorCore.
+ void OnInitialSyncDone();
+
// Emulate updates from the server.
// This harness has some functionality to help emulate server behavior.
// See the definitions of these methods for more information.
@@ -190,11 +197,20 @@ NonBlockingTypeProcessorTest::NonBlockingTypeProcessorTest()
NonBlockingTypeProcessorTest::~NonBlockingTypeProcessorTest() {
}
-void NonBlockingTypeProcessorTest::Initialize() {
+void NonBlockingTypeProcessorTest::FirstTimeInitialize() {
processor_->Enable(mock_sync_core_proxy_->Clone());
mock_processor_core_ = mock_sync_core_proxy_->GetMockProcessorCore();
}
+void NonBlockingTypeProcessorTest::InitializeToReadyState() {
+ // TODO(rlarocque): This should be updated to inject on-disk state.
+ // At the time this code was written, there was no support for on-disk
+ // state so this was the only way to inject a data_type_state into
+ // the |processor_|.
+ FirstTimeInitialize();
+ OnInitialSyncDone();
+}
+
void NonBlockingTypeProcessorTest::WriteItem(const std::string& tag,
const std::string& value) {
const std::string tag_hash = GenerateTagHash(tag);
@@ -205,6 +221,13 @@ void NonBlockingTypeProcessorTest::DeleteItem(const std::string& tag) {
processor_->Delete(tag);
}
+void NonBlockingTypeProcessorTest::OnInitialSyncDone() {
+ data_type_state_.initial_sync_done = true;
+ UpdateResponseDataList empty_update_list;
+
+ processor_->OnUpdateReceived(data_type_state_, empty_update_list);
+}
+
void NonBlockingTypeProcessorTest::UpdateFromServer(int64 version_offset,
const std::string& tag,
const std::string& value) {
@@ -385,7 +408,7 @@ CommitRequestData NonBlockingTypeProcessorTest::GetLatestCommitRequestForTag(
// Creates a new item locally.
// Thoroughly tests the data generated by a local item creation.
TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) {
- Initialize();
+ InitializeToReadyState();
EXPECT_EQ(0U, GetNumCommitRequestLists());
WriteItem("tag1", "value1");
@@ -396,7 +419,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) {
const CommitRequestData& tag1_data = GetLatestCommitRequestForTag("tag1");
EXPECT_TRUE(tag1_data.id.empty());
- EXPECT_EQ(0, tag1_data.base_version);
+ EXPECT_EQ(kUncommittedVersion, tag1_data.base_version);
EXPECT_FALSE(tag1_data.ctime.is_null());
EXPECT_FALSE(tag1_data.mtime.is_null());
EXPECT_EQ("tag1", tag1_data.non_unique_name);
@@ -408,7 +431,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateLocalItem) {
// Creates a new local item then modifies it.
// Thoroughly tests data generated by modification of server-unknown item.
TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) {
- Initialize();
+ InitializeToReadyState();
EXPECT_EQ(0U, GetNumCommitRequestLists());
WriteItem("tag1", "value1");
@@ -428,7 +451,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) {
// Perform a thorough examination of the update-generated request.
EXPECT_TRUE(tag1_v2_data.id.empty());
- EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version);
EXPECT_FALSE(tag1_v2_data.ctime.is_null());
EXPECT_FALSE(tag1_v2_data.mtime.is_null());
EXPECT_EQ("tag1", tag1_v2_data.non_unique_name);
@@ -440,7 +463,7 @@ TEST_F(NonBlockingTypeProcessorTest, CreateAndModifyLocalItem) {
// Deletes an item we've never seen before.
// Should have no effect and not crash.
TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) {
- Initialize();
+ InitializeToReadyState();
DeleteItem("tag1");
EXPECT_EQ(0U, GetNumCommitRequestLists());
@@ -452,7 +475,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteUnknown) {
// server-unknown as far as the model thread is concerned. That behavior
// is race-dependent; other tests are used to test other races.
TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) {
- Initialize();
+ InitializeToReadyState();
WriteItem("tag1", "value1");
EXPECT_EQ(1U, GetNumCommitRequestLists());
@@ -467,7 +490,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) {
EXPECT_GT(tag1_v2_data.sequence_number, tag1_v1_data.sequence_number);
EXPECT_TRUE(tag1_v2_data.id.empty());
- EXPECT_EQ(0, tag1_v2_data.base_version);
+ EXPECT_EQ(kUncommittedVersion, tag1_v2_data.base_version);
EXPECT_TRUE(tag1_v2_data.deleted);
}
@@ -477,7 +500,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown) {
// successfully commits it, but, before the commit response is picked up
// by the model thread, the item is deleted by the model thread.
TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) {
- Initialize();
+ InitializeToReadyState();
WriteItem("tag1", "value1");
EXPECT_EQ(1U, GetNumCommitRequestLists());
@@ -501,7 +524,7 @@ TEST_F(NonBlockingTypeProcessorTest, DeleteServerUnknown_RacyCommitResponse) {
// Creates two different sync items.
// Verifies that the second has no effect on the first.
TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) {
- Initialize();
+ InitializeToReadyState();
EXPECT_EQ(0U, GetNumCommitRequestLists());
WriteItem("tag1", "value1");
@@ -519,6 +542,20 @@ TEST_F(NonBlockingTypeProcessorTest, TwoIndependentItems) {
ASSERT_TRUE(HasCommitRequestForTag("tag2"));
}
+// Starts the processor with no local state.
+// Verify that it waits until initial sync is complete before requesting
+// commits.
+TEST_F(NonBlockingTypeProcessorTest, NoCommitsUntilInitialSyncDone) {
+ FirstTimeInitialize();
+
+ WriteItem("tag1", "value1");
+ EXPECT_EQ(0U, GetNumCommitRequestLists());
+
+ OnInitialSyncDone();
+ EXPECT_EQ(1U, GetNumCommitRequestLists());
+ EXPECT_TRUE(HasCommitRequestForTag("tag1"));
+}
+
// TODO(rlarocque): Add more testing of non_unique_name fields.
} // namespace syncer
diff --git a/sync/engine/sync_thread_sync_entity.cc b/sync/engine/sync_thread_sync_entity.cc
new file mode 100644
index 0000000..21899d2
--- /dev/null
+++ b/sync/engine/sync_thread_sync_entity.cc
@@ -0,0 +1,242 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/sync_thread_sync_entity.h"
+
+#include "base/logging.h"
+#include "sync/engine/non_blocking_sync_common.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/syncable/syncable_util.h"
+#include "sync/util/time.h"
+
+namespace syncer {
+
+SyncThreadSyncEntity* SyncThreadSyncEntity::FromServerUpdate(
+ const std::string& id_string,
+ const std::string& client_tag_hash,
+ int64 received_version) {
+ return new SyncThreadSyncEntity(
+ id_string, client_tag_hash, 0, received_version);
+}
+
+SyncThreadSyncEntity* SyncThreadSyncEntity::FromCommitRequest(
+ const std::string& id_string,
+ const std::string& client_tag_hash,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics) {
+ return new SyncThreadSyncEntity(id_string,
+ client_tag_hash,
+ 0,
+ 0,
+ true,
+ sequence_number,
+ base_version,
+ ctime,
+ mtime,
+ non_unique_name,
+ deleted,
+ specifics);
+}
+
+// Constructor that does not set any pending commit fields.
+SyncThreadSyncEntity::SyncThreadSyncEntity(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ int64 highest_commit_response_version,
+ int64 highest_gu_response_version)
+ : id_(id),
+ client_tag_hash_(client_tag_hash),
+ highest_commit_response_version_(highest_commit_response_version),
+ highest_gu_response_version_(highest_gu_response_version),
+ is_commit_pending_(false),
+ sequence_number_(0),
+ base_version_(0),
+ deleted_(false) {
+}
+
+SyncThreadSyncEntity::SyncThreadSyncEntity(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ int64 highest_commit_response_version,
+ int64 highest_gu_response_version,
+ bool is_commit_pending,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics)
+ : id_(id),
+ client_tag_hash_(client_tag_hash),
+ highest_commit_response_version_(highest_commit_response_version),
+ highest_gu_response_version_(highest_gu_response_version),
+ is_commit_pending_(is_commit_pending),
+ sequence_number_(sequence_number),
+ base_version_(base_version),
+ ctime_(ctime),
+ mtime_(mtime),
+ non_unique_name_(non_unique_name),
+ deleted_(deleted),
+ specifics_(specifics) {
+}
+
+SyncThreadSyncEntity::~SyncThreadSyncEntity() {
+}
+
+bool SyncThreadSyncEntity::IsCommitPending() const {
+ return is_commit_pending_;
+}
+
+void SyncThreadSyncEntity::PrepareCommitProto(
+ sync_pb::SyncEntity* commit_entity,
+ int64* sequence_number) const {
+ // Set ID if we have a server-assigned ID. Otherwise, it will be up to
+ // our caller to assign a client-unique initial ID.
+ if (base_version_ != kUncommittedVersion) {
+ commit_entity->set_id_string(id_);
+ }
+
+ commit_entity->set_client_defined_unique_tag(client_tag_hash_);
+ commit_entity->set_version(base_version_);
+ commit_entity->set_deleted(deleted_);
+ commit_entity->set_folder(false);
+ commit_entity->set_name(non_unique_name_);
+ if (!deleted_) {
+ commit_entity->set_ctime(TimeToProtoTime(ctime_));
+ commit_entity->set_mtime(TimeToProtoTime(mtime_));
+ commit_entity->mutable_specifics()->CopyFrom(specifics_);
+ }
+
+ *sequence_number = sequence_number_;
+}
+
+void SyncThreadSyncEntity::RequestCommit(
+ const std::string& id,
+ const std::string& client_tag_hash,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics) {
+ DCHECK_GE(base_version, base_version_)
+ << "Base version should never decrease";
+
+ DCHECK_GE(sequence_number, sequence_number_)
+ << "Sequence number should never decrease";
+
+ // Update our book-keeping counters.
+ base_version_ = base_version;
+ sequence_number_ = sequence_number;
+
+ // Do our counter values indicate a conflict? If so, don't commit.
+ //
+ // There's no need to inform the model thread of the conflict. The
+ // conflicting update has already been posted to its task runner; it will
+ // figure it out as soon as it runs that task.
+ is_commit_pending_ = true;
+ if (IsInConflict()) {
+ ClearPendingCommit();
+ return;
+ }
+
+ // We don't commit deletions of server-unknown items.
+ if (deleted && !IsServerKnown()) {
+ ClearPendingCommit();
+ return;
+ }
+
+ // Otherwise, we should store the data associated with this pending commit
+ // so we're ready to commit at the next possible opportunity.
+
+ // We intentionally don't update the id_ here. Good ID values come from the
+ // server and always pass through the sync thread first. There's no way the
+ // model thread could have a better ID value than we do.
+
+ // This entity is identified by its client tag. That value can never change.
+ DCHECK_EQ(client_tag_hash_, client_tag_hash);
+
+ // Set the fields for the pending commit.
+ ctime_ = ctime;
+ mtime_ = mtime;
+ non_unique_name_ = non_unique_name;
+ deleted_ = deleted;
+ specifics_ = specifics;
+}
+
+void SyncThreadSyncEntity::ReceiveCommitResponse(const std::string& response_id,
+ int64 response_version,
+ int64 sequence_number) {
+ // Commit responses, especially after the first commit, can update our ID.
+ id_ = response_id;
+
+ DCHECK_GT(response_version, highest_commit_response_version_)
+ << "Had expected higher response version."
+ << " id: " << id_;
+
+ // Commits are synchronous, so there's no reason why the sequence numbers
+ // wouldn't match.
+ DCHECK_EQ(sequence_number_, sequence_number)
+ << "Unexpected sequence number mismatch."
+ << " id: " << id_;
+
+ highest_commit_response_version_ = response_version;
+
+ // Because an in-progress commit blocks the sync thread, we can assume that
+ // the item we just committed successfully is exactly the one we have now.
+ // Nothing changed it while the commit was happening. Since we're now in
+ // sync with the server, we can clear the pending commit.
+ ClearPendingCommit();
+}
+
+void SyncThreadSyncEntity::ReceiveUpdate(int64 version) {
+ highest_gu_response_version_ =
+ std::max(highest_gu_response_version_, version);
+
+ if (IsInConflict()) {
+ // Incoming update clobbers the pending commit on the sync thread.
+ // The model thread can re-request this commit later if it wants to.
+ ClearPendingCommit();
+ }
+}
+
+bool SyncThreadSyncEntity::IsInConflict() const {
+ if (!is_commit_pending_)
+ return false;
+
+ if (highest_gu_response_version_ <= highest_commit_response_version_) {
+ // The most recent server state was created in a commit made by this
+ // client. We're fully up to date, and therefore not in conflict.
+ return false;
+ } else {
+ // The most recent server state was written by someone else.
+ // Did the model thread have the most up to date version when it issued the
+ // commit request?
+ if (base_version_ >= highest_gu_response_version_) {
+ return false; // Yes.
+ } else {
+ return true; // No.
+ }
+ }
+}
+
+bool SyncThreadSyncEntity::IsServerKnown() const {
+ return base_version_ != kUncommittedVersion;
+}
+
+void SyncThreadSyncEntity::ClearPendingCommit() {
+ is_commit_pending_ = false;
+
+ // Clearing the specifics might free up some memory. It can't hurt to try.
+ specifics_.Clear();
+}
+
+} // namespace syncer
diff --git a/sync/engine/sync_thread_sync_entity.h b/sync/engine/sync_thread_sync_entity.h
new file mode 100644
index 0000000..6ed9685
--- /dev/null
+++ b/sync/engine/sync_thread_sync_entity.h
@@ -0,0 +1,155 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_
+#define SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_
+
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/time/time.h"
+#include "sync/base/sync_export.h"
+#include "sync/protocol/sync.pb.h"
+
+namespace syncer {
+
+// Manages the pending commit and update state for an entity on the sync
+// thread.
+//
+// It should be considered a helper class internal to the
+// NonBlockingTypeProcessorCore.
+//
+// Maintains the state associated with a particular sync entity which is
+// necessary for decision-making on the sync thread. It can track pending
+// commit state, received update state, and can detect conflicts.
+//
+// This object may or may not contain state associated with a pending commit.
+// If no commit is pending, the |is_commit_pending_| flag will be set to false
+// and many of this object's fields will be cleared.
+class SYNC_EXPORT SyncThreadSyncEntity {
+ public:
+ ~SyncThreadSyncEntity();
+
+ // Initialize a new entity based on an update response.
+ static SyncThreadSyncEntity* FromServerUpdate(
+ const std::string& id_string,
+ const std::string& client_tag_hash,
+ int64 version);
+
+ // Initialize a new entity based on a commit request.
+ static SyncThreadSyncEntity* FromCommitRequest(
+ const std::string& id_string,
+ const std::string& client_tag_hash,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics);
+
+ // Returns true if this entity should be commited to the server.
+ bool IsCommitPending() const;
+
+ // Populates a sync_pb::SyncEntity for a commit. Also sets the
+ // |sequence_number|, so we can track it throughout the commit process.
+ void PrepareCommitProto(sync_pb::SyncEntity* commit_entity,
+ int64* sequence_number) const;
+
+ // Updates this entity with data from the latest version that the
+ // model asked us to commit. May clobber state related to the
+ // model's previous commit attempt(s).
+ void RequestCommit(const std::string& id,
+ const std::string& client_tag_hash,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics);
+
+ // Handles the receipt of a commit response.
+ //
+ // Since commits happen entirely on the sync thread, we can safely assume
+ // that our item's state at the end of the commit is the same as it was at
+ // the start.
+ void ReceiveCommitResponse(const std::string& response_id,
+ int64 response_version,
+ int64 sequence_number);
+
+ // Handles receipt of an update from the server.
+ void ReceiveUpdate(int64 version);
+
+ private:
+ // Initializes received update state. Does not initialize state related to
+ // pending commits and sets |is_commit_pending_| to false.
+ SyncThreadSyncEntity(const std::string& id,
+ const std::string& client_tag_hash,
+ int64 highest_commit_response_version,
+ int64 highest_gu_response_version);
+
+ // Initializes all fields. Sets |is_commit_pending_| to true.
+ SyncThreadSyncEntity(const std::string& id,
+ const std::string& client_tag_hash,
+ int64 highest_commit_response_version,
+ int64 highest_gu_response_version,
+ bool is_commit_pending,
+ int64 sequence_number,
+ int64 base_version,
+ base::Time ctime,
+ base::Time mtime,
+ const std::string& non_unique_name,
+ bool deleted,
+ const sync_pb::EntitySpecifics& specifics);
+
+ // Checks if the current state indicates a conflict.
+ //
+ // This can be true only while a call to this object is in progress.
+ // Conflicts are always cleared before the method call ends.
+ bool IsInConflict() const;
+
+ // Checks if the server knows about this item.
+ bool IsServerKnown() const;
+
+ // Clears flag and optionally clears state associated with a pending commit.
+ void ClearPendingCommit();
+
+ // The ID for this entry. May be empty if the entry has never been committed.
+ std::string id_;
+
+ // The hashed client tag for this entry.
+ std::string client_tag_hash_;
+
+ // The highest version seen in a commit response for this entry.
+ int64 highest_commit_response_version_;
+
+ // The highest version seen in a GU response for this entry.
+ int64 highest_gu_response_version_;
+
+ // Flag that indicates whether or not we're waiting for a chance to commit
+ // this item.
+ bool is_commit_pending_;
+
+ // Used to track in-flight commit requests on the model thread. All we need
+ // to do here is return it back to the model thread when the pending commit
+ // is completed and confirmed. Not valid if no commit is pending.
+ int64 sequence_number_;
+
+ // The following fields are valid only when a commit is pending.
+ // This is where we store the data that is to be sent up to the server
+ // at the next possible opportunity.
+ int64 base_version_;
+ base::Time ctime_;
+ base::Time mtime_;
+ std::string non_unique_name_;
+ bool deleted_;
+ sync_pb::EntitySpecifics specifics_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncThreadSyncEntity);
+};
+
+} // namespace syncer
+
+#endif // SYNC_ENGINE_SYNC_THREAD_SYNC_ENTITY_H_
diff --git a/sync/engine/sync_thread_sync_entity_unittest.cc b/sync/engine/sync_thread_sync_entity_unittest.cc
new file mode 100644
index 0000000..be2604b
--- /dev/null
+++ b/sync/engine/sync_thread_sync_entity_unittest.cc
@@ -0,0 +1,164 @@
+
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/engine/sync_thread_sync_entity.h"
+
+#include "base/memory/scoped_ptr.h"
+#include "base/time/time.h"
+#include "sync/internal_api/public/base/model_type.h"
+#include "sync/syncable/syncable_util.h"
+#include "sync/util/time.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace syncer {
+
+// Some simple tests for the SyncThreadSyncEntity.
+//
+// The SyncThreadSyncEntity is an implementation detail of the
+// NonBlockingTypeProcessorCore. As such, it doesn't make much sense to test
+// it exhaustively, since it already gets a lot of test coverage from the
+// NonBlockingTypeProcessorCore unit tests.
+//
+// These tests are intended as a basic sanity check. Anything more complicated
+// would be redundant.
+class SyncThreadSyncEntityTest : public ::testing::Test {
+ public:
+ SyncThreadSyncEntityTest()
+ : kServerId("ServerID"),
+ kClientTag("some.sample.tag"),
+ kClientTagHash(syncable::GenerateSyncableHash(PREFERENCES, kClientTag)),
+ kCtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(10)),
+ kMtime(base::Time::UnixEpoch() + base::TimeDelta::FromDays(20)) {
+ specifics.mutable_preference()->set_name(kClientTag);
+ specifics.mutable_preference()->set_value("pref.value");
+ }
+
+ virtual ~SyncThreadSyncEntityTest() {}
+
+ const std::string kServerId;
+ const std::string kClientTag;
+ const std::string kClientTagHash;
+ const base::Time kCtime;
+ const base::Time kMtime;
+ sync_pb::EntitySpecifics specifics;
+};
+
+// Construct a new entity from a server update. Then receive another update.
+TEST_F(SyncThreadSyncEntityTest, FromServerUpdate) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10));
+ EXPECT_FALSE(entity->IsCommitPending());
+
+ entity->ReceiveUpdate(20);
+ EXPECT_FALSE(entity->IsCommitPending());
+}
+
+// Construct a new entity from a commit request. Then serialize it.
+TEST_F(SyncThreadSyncEntityTest, FromCommitRequest) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromCommitRequest(kServerId,
+ kClientTagHash,
+ 22,
+ 33,
+ kCtime,
+ kMtime,
+ kClientTag,
+ false,
+ specifics));
+
+ ASSERT_TRUE(entity->IsCommitPending());
+ sync_pb::SyncEntity pb_entity;
+ int64 sequence_number = 0;
+ entity->PrepareCommitProto(&pb_entity, &sequence_number);
+ EXPECT_EQ(22, sequence_number);
+ EXPECT_EQ(kServerId, pb_entity.id_string());
+ EXPECT_EQ(kClientTagHash, pb_entity.client_defined_unique_tag());
+ EXPECT_EQ(33, pb_entity.version());
+ EXPECT_EQ(kCtime, ProtoTimeToTime(pb_entity.ctime()));
+ EXPECT_EQ(kMtime, ProtoTimeToTime(pb_entity.mtime()));
+ EXPECT_FALSE(pb_entity.deleted());
+ EXPECT_EQ(specifics.preference().name(),
+ pb_entity.specifics().preference().name());
+ EXPECT_EQ(specifics.preference().value(),
+ pb_entity.specifics().preference().value());
+}
+
+// Start with a server initiated entity. Commit over top of it.
+TEST_F(SyncThreadSyncEntityTest, RequestCommit) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10));
+
+ entity->RequestCommit(kServerId,
+ kClientTagHash,
+ 1,
+ 10,
+ kCtime,
+ kMtime,
+ kClientTag,
+ false,
+ specifics);
+
+ EXPECT_TRUE(entity->IsCommitPending());
+}
+
+// Start with a server initiated entity. Fail to request a commit because of
+// an out of date base version.
+TEST_F(SyncThreadSyncEntityTest, RequestCommitFailure) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromServerUpdate(kServerId, kClientTagHash, 10));
+ EXPECT_FALSE(entity->IsCommitPending());
+
+ entity->RequestCommit(kServerId,
+ kClientTagHash,
+ 23,
+ 5, // Version 5 < 10
+ kCtime,
+ kMtime,
+ kClientTag,
+ false,
+ specifics);
+ EXPECT_FALSE(entity->IsCommitPending());
+}
+
+// Start with a pending commit. Clobber it with an incoming update.
+TEST_F(SyncThreadSyncEntityTest, UpdateClobbersCommit) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromCommitRequest(kServerId,
+ kClientTagHash,
+ 22,
+ 33,
+ kCtime,
+ kMtime,
+ kClientTag,
+ false,
+ specifics));
+
+ EXPECT_TRUE(entity->IsCommitPending());
+
+ entity->ReceiveUpdate(400); // Version 400 > 33.
+ EXPECT_FALSE(entity->IsCommitPending());
+}
+
+// Start with a pending commit. Send it a reflected update that
+// will not override the in-progress commit.
+TEST_F(SyncThreadSyncEntityTest, ReflectedUpdateDoesntClobberCommit) {
+ scoped_ptr<SyncThreadSyncEntity> entity(
+ SyncThreadSyncEntity::FromCommitRequest(kServerId,
+ kClientTagHash,
+ 22,
+ 33,
+ kCtime,
+ kMtime,
+ kClientTag,
+ false,
+ specifics));
+
+ EXPECT_TRUE(entity->IsCommitPending());
+
+ entity->ReceiveUpdate(33); // Version 33 == 33.
+ EXPECT_TRUE(entity->IsCommitPending());
+}
+
+} // namespace syncer
diff --git a/sync/protocol/proto_value_conversions.cc b/sync/protocol/proto_value_conversions.cc
index e1c8851..5ea434c 100644
--- a/sync/protocol/proto_value_conversions.cc
+++ b/sync/protocol/proto_value_conversions.cc
@@ -813,6 +813,8 @@ base::StringValue* UniquePositionToStringValue(
return new base::StringValue(pos.ToDebugString());
}
+} // namespace
+
base::DictionaryValue* SyncEntityToValue(const sync_pb::SyncEntity& proto,
bool include_specifics) {
base::DictionaryValue* value = new base::DictionaryValue();
@@ -839,6 +841,8 @@ base::DictionaryValue* SyncEntityToValue(const sync_pb::SyncEntity& proto,
return value;
}
+namespace {
+
base::ListValue* SyncEntitiesToValue(
const ::google::protobuf::RepeatedPtrField<sync_pb::SyncEntity>& entities,
bool include_specifics) {
diff --git a/sync/protocol/proto_value_conversions.h b/sync/protocol/proto_value_conversions.h
index c8e5166c..2a3c1ae 100644
--- a/sync/protocol/proto_value_conversions.h
+++ b/sync/protocol/proto_value_conversions.h
@@ -63,6 +63,7 @@ class SessionTab;
class SessionWindow;
class SimpleCollapsedLayout;
class SyncCycleCompletedEventInfo;
+class SyncEntity;
class SyncedNotification;
class SyncedNotificationAction;
class SyncedNotificationAppInfo;
@@ -278,6 +279,10 @@ SYNC_EXPORT_PRIVATE base::DictionaryValue* TypedUrlSpecificsToValue(
SYNC_EXPORT_PRIVATE base::DictionaryValue* EntitySpecificsToValue(
const sync_pb::EntitySpecifics& specifics);
+SYNC_EXPORT_PRIVATE base::DictionaryValue* SyncEntityToValue(
+ const sync_pb::SyncEntity& entity,
+ bool include_specifics);
+
SYNC_EXPORT_PRIVATE base::DictionaryValue* ClientToServerMessageToValue(
const sync_pb::ClientToServerMessage& proto,
bool include_specifics);
diff --git a/sync/sessions/model_type_registry.cc b/sync/sessions/model_type_registry.cc
index 7ca68c7..9ceac74 100644
--- a/sync/sessions/model_type_registry.cc
+++ b/sync/sessions/model_type_registry.cc
@@ -13,12 +13,64 @@
#include "sync/engine/non_blocking_type_processor.h"
#include "sync/engine/non_blocking_type_processor_core.h"
#include "sync/engine/non_blocking_type_processor_core_interface.h"
+#include "sync/engine/non_blocking_type_processor_interface.h"
#include "sync/sessions/directory_type_debug_info_emitter.h"
namespace syncer {
namespace {
+class NonBlockingTypeProcessorWrapper
+ : public NonBlockingTypeProcessorInterface {
+ public:
+ NonBlockingTypeProcessorWrapper(
+ base::WeakPtr<NonBlockingTypeProcessor> processor,
+ scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
+ virtual ~NonBlockingTypeProcessorWrapper();
+
+ virtual void ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) OVERRIDE;
+ virtual void ReceiveUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) OVERRIDE;
+
+ private:
+ base::WeakPtr<NonBlockingTypeProcessor> processor_;
+ scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
+};
+
+NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
+ base::WeakPtr<NonBlockingTypeProcessor> processor,
+ scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
+ : processor_(processor), processor_task_runner_(processor_task_runner) {
+}
+
+NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
+}
+
+void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ processor_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
+ processor_,
+ type_state,
+ response_list));
+}
+
+void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
+ const DataTypeState& type_state,
+ const UpdateResponseDataList& response_list) {
+ processor_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
+ processor_,
+ type_state,
+ response_list));
+}
+
class NonBlockingTypeProcessorCoreWrapper
: public NonBlockingTypeProcessorCoreInterface {
public:
@@ -47,7 +99,7 @@ void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
const CommitRequestDataList& list) {
sync_thread_->PostTask(
FROM_HERE,
- base::Bind(&NonBlockingTypeProcessorCore::RequestCommits, core_, list));
+ base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
}
} // namespace
@@ -139,10 +191,11 @@ void ModelTypeRegistry::InitializeNonBlockingType(
DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
// Initialize CoreProcessor -> Processor communication channel.
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
+ new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
scoped_ptr<NonBlockingTypeProcessorCore> core(
- new NonBlockingTypeProcessorCore(type, type_task_runner, processor));
-
- // TODO(rlarocque): DataTypeState should be forwarded to core here.
+ new NonBlockingTypeProcessorCore(
+ type, data_type_state, processor_interface.Pass()));
// Initialize Processor -> CoreProcessor communication channel.
scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
diff --git a/sync/sync_core.gypi b/sync/sync_core.gypi
index bec8952..8772d43 100644
--- a/sync/sync_core.gypi
+++ b/sync/sync_core.gypi
@@ -68,12 +68,16 @@
'engine/net/url_translator.h',
'engine/non_blocking_sync_common.cc',
'engine/non_blocking_sync_common.h',
+ 'engine/non_blocking_type_commit_contribution.cc',
+ 'engine/non_blocking_type_commit_contribution.h',
'engine/non_blocking_type_processor.cc',
'engine/non_blocking_type_processor.h',
'engine/non_blocking_type_processor_core.cc',
'engine/non_blocking_type_processor_core.h',
'engine/non_blocking_type_processor_core_interface.cc',
'engine/non_blocking_type_processor_core_interface.h',
+ 'engine/non_blocking_type_processor_interface.cc',
+ 'engine/non_blocking_type_processor_interface.h',
'engine/nudge_source.cc',
'engine/nudge_source.h',
'engine/process_updates_util.cc',
@@ -86,6 +90,8 @@
'engine/sync_scheduler.h',
'engine/sync_scheduler_impl.cc',
'engine/sync_scheduler_impl.h',
+ 'engine/sync_thread_sync_entity.cc',
+ 'engine/sync_thread_sync_entity.h',
'engine/syncer.cc',
'engine/syncer.h',
'engine/syncer_proto_util.cc',
diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi
index cd4f933..eea0d38 100644
--- a/sync/sync_tests.gypi
+++ b/sync/sync_tests.gypi
@@ -300,8 +300,10 @@
'engine/directory_update_handler_unittest.cc',
'engine/get_updates_processor_unittest.cc',
'engine/model_thread_sync_entity_unittest.cc',
+ 'engine/non_blocking_type_processor_core_unittest.cc',
'engine/non_blocking_type_processor_unittest.cc',
'engine/sync_scheduler_unittest.cc',
+ 'engine/sync_thread_sync_entity_unittest.cc',
'engine/syncer_proto_util_unittest.cc',
'engine/syncer_unittest.cc',
'engine/syncer_util_unittest.cc',