diff options
Diffstat (limited to 'sync/engine/non_blocking_type_processor.cc')
-rw-r--r-- | sync/engine/non_blocking_type_processor.cc | 149 |
1 files changed, 142 insertions, 7 deletions
diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc index 66a016b..0aaead4 100644 --- a/sync/engine/non_blocking_type_processor.cc +++ b/sync/engine/non_blocking_type_processor.cc @@ -4,9 +4,13 @@ #include "sync/engine/non_blocking_type_processor.h" +#include "base/bind.h" +#include "base/location.h" #include "base/message_loop/message_loop_proxy.h" -#include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/engine/model_thread_sync_entity.h" +#include "sync/engine/non_blocking_type_processor_core_interface.h" #include "sync/internal_api/public/sync_core_proxy.h" +#include "sync/syncable/syncable_util.h" namespace syncer { @@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type) : type_(type), is_preferred_(false), is_connected_(false), + entities_deleter_(&entities_), weak_ptr_factory_for_ui_(this), weak_ptr_factory_for_sync_(this) { } @@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable( scoped_ptr<SyncCoreProxy> sync_core_proxy) { DCHECK(CalledOnValidThread()); DVLOG(1) << "Asked to enable " << ModelTypeToString(type_); + is_preferred_ = true; + + // TODO(rlarocque): At some point, this should be loaded from storage. + data_type_state_.progress_marker.set_data_type_id( + GetSpecificsFieldNumberFromModelType(type_)); + data_type_state_.next_client_id = 0; + sync_core_proxy_ = sync_core_proxy.Pass(); sync_core_proxy_->ConnectTypeToCore(GetModelType(), + data_type_state_, weak_ptr_factory_for_sync_.GetWeakPtr()); } @@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() { } weak_ptr_factory_for_sync_.InvalidateWeakPtrs(); - core_ = base::WeakPtr<NonBlockingTypeProcessorCore>(); - sync_thread_ = scoped_refptr<base::SequencedTaskRunner>(); + core_interface_.reset(); } base::WeakPtr<NonBlockingTypeProcessor> @@ -74,13 +86,136 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() { } void NonBlockingTypeProcessor::OnConnect( - base::WeakPtr<NonBlockingTypeProcessorCore> core, - scoped_refptr<base::SequencedTaskRunner> sync_thread) { + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) { DCHECK(CalledOnValidThread()); DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); + is_connected_ = true; - core_ = core; - sync_thread_ = sync_thread; + core_interface_ = core_interface.Pass(); + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::Put(const std::string& client_tag, + const sync_pb::EntitySpecifics& specifics) { + DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics)); + + const std::string client_tag_hash( + syncable::GenerateSyncableHash(type_, client_tag)); + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + scoped_ptr<ModelThreadSyncEntity> entity( + ModelThreadSyncEntity::NewLocalItem( + client_tag, specifics, base::Time::Now())); + entities_.insert(std::make_pair(client_tag_hash, entity.release())); + } else { + ModelThreadSyncEntity* entity = it->second; + entity->MakeLocalChange(specifics); + } + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::Delete(const std::string& client_tag) { + const std::string client_tag_hash( + syncable::GenerateSyncableHash(type_, client_tag)); + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + // That's unusual, but not necessarily a bad thing. + // Missing is as good as deleted as far as the model is concerned. + DLOG(WARNING) << "Attempted to delete missing item." + << " client tag: " << client_tag; + } else { + ModelThreadSyncEntity* entity = it->second; + entity->Delete(); + } + + FlushPendingCommitRequests(); +} + +void NonBlockingTypeProcessor::FlushPendingCommitRequests() { + CommitRequestDataList commit_requests; + + // Don't bother sending anything if there's no one to send to. + if (!IsConnected()) + return; + + // TODO(rlarocque): Do something smarter than iterate here. + for (EntityMap::iterator it = entities_.begin(); it != entities_.end(); + ++it) { + if (it->second->RequiresCommitRequest()) { + CommitRequestData request; + it->second->InitializeCommitRequestData(&request); + commit_requests.push_back(request); + it->second->SetCommitRequestInProgress(); + } + } + + if (!commit_requests.empty()) + core_interface_->RequestCommits(commit_requests); +} + +void NonBlockingTypeProcessor::OnCommitCompletion( + const DataTypeState& type_state, + const CommitResponseDataList& response_list) { + data_type_state_ = type_state; + + for (CommitResponseDataList::const_iterator list_it = response_list.begin(); + list_it != response_list.end(); + ++list_it) { + const CommitResponseData& response_data = *list_it; + const std::string& client_tag_hash = response_data.client_tag_hash; + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + NOTREACHED() << "Received commit response for missing item." + << " type: " << type_ << " client_tag: " << client_tag_hash; + return; + } else { + it->second->ReceiveCommitResponse(response_data.id, + response_data.sequence_number, + response_data.response_version); + } + } +} + +void NonBlockingTypeProcessor::OnUpdateReceived( + const DataTypeState& data_type_state, + const UpdateResponseDataList& response_list) { + data_type_state_ = data_type_state; + + for (UpdateResponseDataList::const_iterator list_it = response_list.begin(); + list_it != response_list.end(); + ++list_it) { + const UpdateResponseData& response_data = *list_it; + const std::string& client_tag_hash = response_data.client_tag_hash; + + EntityMap::iterator it = entities_.find(client_tag_hash); + if (it == entities_.end()) { + scoped_ptr<ModelThreadSyncEntity> entity = + ModelThreadSyncEntity::FromServerUpdate( + response_data.id, + response_data.client_tag_hash, + response_data.non_unique_name, + response_data.response_version, + response_data.specifics, + response_data.deleted, + response_data.ctime, + response_data.mtime); + entities_.insert(std::make_pair(client_tag_hash, entity.release())); + } else { + ModelThreadSyncEntity* entity = it->second; + entity->ApplyUpdateFromServer(response_data.response_version, + response_data.deleted, + response_data.specifics, + response_data.mtime); + // TODO: Do something special when conflicts are detected. + } + } + + // TODO: Inform the model of the new or updated data. } } // namespace syncer |