diff options
author | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-27 17:13:43 +0000 |
---|---|---|
committer | rlarocque@chromium.org <rlarocque@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-27 17:13:43 +0000 |
commit | bd7c4fda2b543725ec3ab29952d64674d86d6b27 (patch) | |
tree | 2707e3dfc1c1e04fbf6b07e87053a2bac48cafec | |
parent | 6c8715d3e19279d7b28be72c703d1af8fbfef80c (diff) | |
download | chromium_src-bd7c4fda2b543725ec3ab29952d64674d86d6b27.zip chromium_src-bd7c4fda2b543725ec3ab29952d64674d86d6b27.tar.gz chromium_src-bd7c4fda2b543725ec3ab29952d64674d86d6b27.tar.bz2 |
sync: Add NonBlockingTypeProcesssor and SyncCore
Introduces the model-thread sibling of the NonBlockingTypeProcessorCore
that was added in r258390. Also adds SyncCore and SyncCoreProxy, the
classes that will be used to connect the two.
The SyncCore lives on the sync thread and carries out requests sent to
it by the SyncCoreProxy. The SyncCoreProxy is a thread-safe wrapper
around the SyncCore that can be easily copied and whose methods can be
called from any thread.
The NonBlockingTypeProcessor is instantiated on the same thread as the
data to be synced. It connects to a NonBlockingTypeProcessorCore on the
sync thread by sending a request through a SyncCoreProxy. Keeping data
in sync will be a collaborative effort involving both the
NonBlockingTypeProcessor and NonBlockingTypeProcesssorCore, though none
of this functionality has been implemented yet.
As of this CL, none these classes are instantiated outside of tests.
This CL should have no effect on current sync behavior.
BUG=351005
Review URL: https://codereview.chromium.org/208893004
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@259921 0039d316-1c4b-4281-b951-d872f2087c98
20 files changed, 541 insertions, 19 deletions
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc index 6d6abc4..fdb9e10 100644 --- a/sync/engine/non_blocking_type_processor_core.cc +++ b/sync/engine/non_blocking_type_processor_core.cc @@ -10,7 +10,13 @@ namespace syncer { NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( - ModelType type) : type_(type), weak_ptr_factory_(this) { + ModelType type, + scoped_refptr<base::SequencedTaskRunner> processor_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> processor) + : type_(type), + processor_task_runner_(processor_task_runner), + processor_(processor), + weak_ptr_factory_(this) { progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); } diff --git a/sync/engine/non_blocking_type_processor_core.h b/sync/engine/non_blocking_type_processor_core.h index 028bb3a..412d2dd 100644 --- a/sync/engine/non_blocking_type_processor_core.h +++ b/sync/engine/non_blocking_type_processor_core.h @@ -13,8 +13,14 @@ #include "sync/internal_api/public/base/model_type.h" #include "sync/protocol/sync.pb.h" +namespace base { +class SingleThreadTaskRunner; +} + namespace syncer { +class NonBlockingTypeProcessor; + // A smart cache for sync types that use message passing (rather than // transactions and the syncable::Directory) to communicate with the sync // thread. @@ -35,12 +41,15 @@ namespace syncer { // example, if the sync server sends down an update for a sync entity that is // currently pending for commit, this object will detect this condition and // cancel the pending commit. -class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorCore +class SYNC_EXPORT NonBlockingTypeProcessorCore : public UpdateHandler, public CommitContributor, public base::NonThreadSafe { public: - explicit NonBlockingTypeProcessorCore(ModelType type); + NonBlockingTypeProcessorCore( + ModelType type, + scoped_refptr<base::SequencedTaskRunner> processor_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> processor); virtual ~NonBlockingTypeProcessorCore(); ModelType GetModelType() const; @@ -65,6 +74,9 @@ class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessorCore ModelType type_; sync_pb::DataTypeProgressMarker progress_marker_; + scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; + base::WeakPtr<NonBlockingTypeProcessor> processor_; + base::WeakPtrFactory<NonBlockingTypeProcessorCore> weak_ptr_factory_; }; diff --git a/sync/internal_api/non_blocking_type_processor.cc b/sync/internal_api/non_blocking_type_processor.cc new file mode 100644 index 0000000..242c42f --- /dev/null +++ b/sync/internal_api/non_blocking_type_processor.cc @@ -0,0 +1,57 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/internal_api/public/non_blocking_type_processor.h" + +#include "base/message_loop/message_loop_proxy.h" +#include "sync/engine/non_blocking_type_processor_core.h" + +namespace syncer { + +NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type) + : type_(type), enabled_(false), weak_ptr_factory_(this) {} + +NonBlockingTypeProcessor::~NonBlockingTypeProcessor() { +} + +bool NonBlockingTypeProcessor::IsEnabled() const { + DCHECK(CalledOnValidThread()); + return enabled_; +} + +ModelType NonBlockingTypeProcessor::GetModelType() const { + DCHECK(CalledOnValidThread()); + return type_; +} + +void NonBlockingTypeProcessor::Enable(SyncCoreProxy core_proxy_) { + DCHECK(CalledOnValidThread()); + core_proxy_.ConnectTypeToCore( + GetModelType(), + AsWeakPtr()); +} + +void NonBlockingTypeProcessor::Disable() { + DCHECK(CalledOnValidThread()); + enabled_ = false; + weak_ptr_factory_.InvalidateWeakPtrs(); + core_ = base::WeakPtr<NonBlockingTypeProcessorCore>(); + sync_thread_ = scoped_refptr<base::SequencedTaskRunner>(); +} + +base::WeakPtr<NonBlockingTypeProcessor> NonBlockingTypeProcessor::AsWeakPtr() { + DCHECK(CalledOnValidThread()); + return weak_ptr_factory_.GetWeakPtr(); +} + +void NonBlockingTypeProcessor::OnConnect( + base::WeakPtr<NonBlockingTypeProcessorCore> core, + scoped_refptr<base::SequencedTaskRunner> sync_thread) { + DCHECK(CalledOnValidThread()); + enabled_ = true; + core_ = core; + sync_thread_ = sync_thread; +} + +} // namespace syncer diff --git a/sync/internal_api/public/non_blocking_type_processor.h b/sync/internal_api/public/non_blocking_type_processor.h new file mode 100644 index 0000000..a9079d5a --- /dev/null +++ b/sync/internal_api/public/non_blocking_type_processor.h @@ -0,0 +1,60 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_ +#define SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_ + +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/sequenced_task_runner.h" +#include "base/threading/non_thread_safe.h" +#include "sync/base/sync_export.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/internal_api/public/sync_core_proxy.h" +#include "sync/protocol/sync.pb.h" + +namespace syncer { + +class NonBlockingTypeProcessorCore; + +// A sync component embedded on the synced type's thread that helps to handle +// communication between sync and model type threads. +class SYNC_EXPORT_PRIVATE NonBlockingTypeProcessor : base::NonThreadSafe { + public: + NonBlockingTypeProcessor(ModelType type); + virtual ~NonBlockingTypeProcessor(); + + // Returns true if the handshake with sync thread is complete. + bool IsEnabled() const; + + // Returns the model type handled by this processor. + ModelType GetModelType() const; + + // Starts the handshake with the sync thread. + void Enable(SyncCoreProxy core_proxy_); + + // Severs all ties to the sync thread. + // Another call to Enable() can be used to re-establish this connection. + void Disable(); + + // Callback used to process the handshake response. + void OnConnect(base::WeakPtr<NonBlockingTypeProcessorCore> core, + scoped_refptr<base::SequencedTaskRunner> sync_thread); + + base::WeakPtr<NonBlockingTypeProcessor> AsWeakPtr(); + + private: + ModelType type_; + sync_pb::DataTypeProgressMarker progress_marker_; + bool enabled_; + + base::WeakPtr<NonBlockingTypeProcessorCore> core_; + scoped_refptr<base::SequencedTaskRunner> sync_thread_; + + base::WeakPtrFactory<NonBlockingTypeProcessor> weak_ptr_factory_; +}; + +} // namespace syncer + +#endif // SYNC_ENGINE_NON_BLOCKING_TYPE_PROCESSOR_H_ diff --git a/sync/internal_api/public/sync_core_proxy.h b/sync/internal_api/public/sync_core_proxy.h new file mode 100644 index 0000000..dd5a4ac --- /dev/null +++ b/sync/internal_api/public/sync_core_proxy.h @@ -0,0 +1,55 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_PROXY_H_ +#define SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_PROXY_H_ + +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/sequenced_task_runner.h" +#include "sync/base/sync_export.h" +#include "sync/internal_api/public/base/model_type.h" + +namespace syncer { + +class SyncCore; +class NonBlockingTypeProcessor; + +// Encapsulates a reference to the sync core and the thread it's running on. +// Used by sync's data types to connect with the sync core. +// +// It is epxected that this object will be copied to and used on many different +// threads. It is small and safe to pass by value. +class SYNC_EXPORT_PRIVATE SyncCoreProxy { + public: + SyncCoreProxy( + scoped_refptr<base::SequencedTaskRunner> sync_task_runner, + base::WeakPtr<SyncCore> sync_core); + ~SyncCoreProxy(); + + // Attempts to connect a non-blocking type to the sync core. + // + // This may fail under some unusual circumstances, like shutdown. Due to the + // nature of WeakPtrs and cross-thread communication, the caller will be + // unable to distinguish a slow success from failure. + // + // Must be called from the thread where the data type lives. + void ConnectTypeToCore( + syncer::ModelType type, + base::WeakPtr<NonBlockingTypeProcessor> type_processor); + + // Constructs and returns a useless instance of this object. + static SyncCoreProxy GetInvalidSyncCoreProxyForTest(); + + private: + // A SequencedTaskRunner representing the thread where the SyncCore lives. + scoped_refptr<base::SequencedTaskRunner> sync_task_runner_; + + // The SyncCore this object is wrapping. + base::WeakPtr<SyncCore> sync_core_; +}; + +} // namespace syncer + +#endif // SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_PROXY_H_ diff --git a/sync/internal_api/public/sync_manager.h b/sync/internal_api/public/sync_manager.h index 6f6d9fc..38bad07 100644 --- a/sync/internal_api/public/sync_manager.h +++ b/sync/internal_api/public/sync_manager.h @@ -35,19 +35,20 @@ class EncryptedData; namespace syncer { class BaseTransaction; +class CancelationSignal; class DataTypeDebugInfoListener; class Encryptor; -struct Experiments; class ExtensionsActivity; class HttpPostProviderFactory; class InternalComponentsFactory; class JsBackend; class JsEventHandler; +class SyncCore; class SyncEncryptionHandler; class ProtocolEvent; class SyncScheduler; +struct Experiments; struct UserShare; -class CancelationSignal; namespace sessions { class SyncSessionSnapshot; @@ -331,6 +332,9 @@ class SYNC_EXPORT SyncManager : public syncer::InvalidationHandler { // May be called from any thread. virtual UserShare* GetUserShare() = 0; + // Returns an instance of the main interface for non-blocking sync types. + virtual syncer::SyncCore* GetSyncCore() = 0; + // Returns the cache_guid of the currently open database. // Requires that the SyncManager be initialized. virtual const std::string cache_guid() = 0; diff --git a/sync/internal_api/public/test/fake_sync_manager.h b/sync/internal_api/public/test/fake_sync_manager.h index 1194399..81b7160 100644 --- a/sync/internal_api/public/test/fake_sync_manager.h +++ b/sync/internal_api/public/test/fake_sync_manager.h @@ -119,6 +119,7 @@ class FakeSyncManager : public SyncManager { virtual void SaveChanges() OVERRIDE; virtual void ShutdownOnSyncThread() OVERRIDE; virtual UserShare* GetUserShare() OVERRIDE; + virtual syncer::SyncCore* GetSyncCore() OVERRIDE; virtual const std::string cache_guid() OVERRIDE; virtual bool ReceivedExperiment(Experiments* experiments) OVERRIDE; virtual bool HasUnsyncedItems() OVERRIDE; diff --git a/sync/internal_api/sync_core.cc b/sync/internal_api/sync_core.cc new file mode 100644 index 0000000..9c26095 --- /dev/null +++ b/sync/internal_api/sync_core.cc @@ -0,0 +1,32 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/internal_api/sync_core.h" + +#include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/sessions/model_type_registry.h" + +namespace syncer { + +SyncCore::SyncCore(ModelTypeRegistry* model_type_registry) + : model_type_registry_(model_type_registry), weak_ptr_factory_(this) {} + +SyncCore::~SyncCore() {} + +void SyncCore::ConnectSyncTypeToCore( + ModelType type, + scoped_refptr<base::SequencedTaskRunner> task_runner, + base::WeakPtr<NonBlockingTypeProcessor> processor) { + + // Initialize the processor's sync-thread sibling and the + // processor <-> processor_core (ie. model thread <-> sync thread) + // communication channel. + model_type_registry_->InitializeNonBlockingType(type, task_runner, processor); +} + +base::WeakPtr<SyncCore> SyncCore::AsWeakPtr() { + return weak_ptr_factory_.GetWeakPtr(); +} + +} // namespace syncer diff --git a/sync/internal_api/sync_core.h b/sync/internal_api/sync_core.h new file mode 100644 index 0000000..504304c --- /dev/null +++ b/sync/internal_api/sync_core.h @@ -0,0 +1,51 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_H_ +#define SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_H_ + +#include "base/basictypes.h" +#include "base/memory/weak_ptr.h" +#include "base/sequenced_task_runner.h" +#include "sync/base/sync_export.h" +#include "sync/internal_api/public/base/model_type.h" + +namespace syncer { + +class ModelTypeRegistry; +class NonBlockingTypeProcessor; + +// An interface of the core parts of sync. +// +// In theory, this is the component that provides off-thread sync types with +// functionality to schedule and execute communication with the sync server. In +// practice, this class delegates most of the responsibilty of implemeting this +// functionality to other classes, and most of the interface is exposed not +// directly here but instead through a per-ModelType class that this class helps +// instantiate. +class SYNC_EXPORT_PRIVATE SyncCore { + public: + explicit SyncCore(ModelTypeRegistry* model_type_registry); + ~SyncCore(); + + // Initializes the connection between the sync core and its delegate on the + // sync client's thread. + void ConnectSyncTypeToCore( + syncer::ModelType type, + scoped_refptr<base::SequencedTaskRunner> datatype_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> sync_client); + + base::WeakPtr<SyncCore> AsWeakPtr(); + + private: + ModelTypeRegistry* model_type_registry_; + base::WeakPtrFactory<SyncCore> weak_ptr_factory_; + + DISALLOW_COPY_AND_ASSIGN(SyncCore); +}; + +} // namespace syncer + +#endif // SYNC_INTERNAL_API_PUBLIC_SYNC_CORE_H_ + diff --git a/sync/internal_api/sync_core_proxy.cc b/sync/internal_api/sync_core_proxy.cc new file mode 100644 index 0000000..5ba3135 --- /dev/null +++ b/sync/internal_api/sync_core_proxy.cc @@ -0,0 +1,40 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/internal_api/public/sync_core_proxy.h" + +#include "base/bind.h" +#include "base/location.h" +#include "base/message_loop/message_loop_proxy.h" +#include "sync/internal_api/sync_core.h" + +namespace syncer { + +SyncCoreProxy::SyncCoreProxy( + scoped_refptr<base::SequencedTaskRunner> sync_task_runner, + base::WeakPtr<SyncCore> sync_core) + : sync_task_runner_(sync_task_runner), + sync_core_(sync_core) {} + +SyncCoreProxy::~SyncCoreProxy() {} + +void SyncCoreProxy::ConnectTypeToCore( + ModelType type, + base::WeakPtr<NonBlockingTypeProcessor> type_processor) { + VLOG(1) << "ConnectSyncTypeToCore: " << ModelTypeToString(type); + sync_task_runner_->PostTask( + FROM_HERE, + base::Bind(&SyncCore::ConnectSyncTypeToCore, + sync_core_, + type, + base::MessageLoopProxy::current(), + type_processor)); +} + +SyncCoreProxy SyncCoreProxy::GetInvalidSyncCoreProxyForTest() { + return SyncCoreProxy(scoped_refptr<base::SequencedTaskRunner>(), + base::WeakPtr<SyncCore>()); +} + +} // namespace syncer diff --git a/sync/internal_api/sync_core_proxy_unittest.cc b/sync/internal_api/sync_core_proxy_unittest.cc new file mode 100644 index 0000000..d6885bf --- /dev/null +++ b/sync/internal_api/sync_core_proxy_unittest.cc @@ -0,0 +1,95 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop/message_loop.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/run_loop.h" +#include "base/sequenced_task_runner.h" +#include "sync/internal_api/public/base/model_type.h" +#include "sync/internal_api/public/non_blocking_type_processor.h" +#include "sync/internal_api/public/sync_core_proxy.h" +#include "sync/internal_api/sync_core.h" +#include "sync/sessions/model_type_registry.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace syncer { + +class SyncCoreProxyTest : public ::testing::Test { + public: + SyncCoreProxyTest() + : sync_task_runner_(base::MessageLoopProxy::current()), + type_task_runner_(base::MessageLoopProxy::current()), + core_(new SyncCore(®istry_)), + core_proxy_( + sync_task_runner_, + core_->AsWeakPtr()) {} + + // The sync thread could be shut down at any time without warning. This + // function simulates such an event. + void DisableSync() { + core_.reset(); + } + + SyncCoreProxy GetProxy() { + return core_proxy_; + } + + private: + base::MessageLoop loop_; + scoped_refptr<base::SequencedTaskRunner> sync_task_runner_; + scoped_refptr<base::SequencedTaskRunner> type_task_runner_; + ModelTypeRegistry registry_; + scoped_ptr<SyncCore> core_; + SyncCoreProxy core_proxy_; +}; + +// Try to connect a type to a SyncCore that has already shut down. +TEST_F(SyncCoreProxyTest, FailToConnect1) { + NonBlockingTypeProcessor themes_processor(syncer::THEMES); + DisableSync(); + themes_processor.Enable(GetProxy()); + + base::RunLoop run_loop_; + run_loop_.RunUntilIdle(); + EXPECT_FALSE(themes_processor.IsEnabled()); +} + +// Try to connect a type to a SyncCore as it shuts down. +TEST_F(SyncCoreProxyTest, FailToConnect2) { + NonBlockingTypeProcessor themes_processor(syncer::THEMES); + themes_processor.Enable(GetProxy()); + DisableSync(); + + base::RunLoop run_loop_; + run_loop_.RunUntilIdle(); + EXPECT_FALSE(themes_processor.IsEnabled()); +} + +// Tests the case where the type's processor shuts down first. +TEST_F(SyncCoreProxyTest, TypeDisconnectsFirst) { + scoped_ptr<NonBlockingTypeProcessor> themes_processor + (new NonBlockingTypeProcessor(syncer::THEMES)); + themes_processor->Enable(GetProxy()); + + base::RunLoop run_loop_; + run_loop_.RunUntilIdle(); + + EXPECT_TRUE(themes_processor->IsEnabled()); + themes_processor.reset(); +} + +// Tests the case where the sync thread shuts down first. +TEST_F(SyncCoreProxyTest, SyncDisconnectsFirst) { + scoped_ptr<NonBlockingTypeProcessor> themes_processor + (new NonBlockingTypeProcessor(syncer::THEMES)); + themes_processor->Enable(GetProxy()); + + base::RunLoop run_loop_; + run_loop_.RunUntilIdle(); + + EXPECT_TRUE(themes_processor->IsEnabled()); + DisableSync(); +} + +} // namespace syncer diff --git a/sync/internal_api/sync_manager_impl.cc b/sync/internal_api/sync_manager_impl.cc index 3b4a1fd..224b38b 100644 --- a/sync/internal_api/sync_manager_impl.cc +++ b/sync/internal_api/sync_manager_impl.cc @@ -32,6 +32,7 @@ #include "sync/internal_api/public/util/experiments.h" #include "sync/internal_api/public/write_node.h" #include "sync/internal_api/public/write_transaction.h" +#include "sync/internal_api/sync_core.h" #include "sync/internal_api/syncapi_internal.h" #include "sync/internal_api/syncapi_server_connection_manager.h" #include "sync/js/js_arg_list.h" @@ -410,6 +411,8 @@ void SyncManagerImpl::Init( model_type_registry_.reset(new ModelTypeRegistry(workers, directory())); + sync_core_.reset(new SyncCore(model_type_registry_.get())); + // Build a SyncSessionContext and store the worker in it. DVLOG(1) << "Sync is bringing up SyncSessionContext."; std::vector<SyncEngineEventListener*> listeners; @@ -1066,6 +1069,11 @@ UserShare* SyncManagerImpl::GetUserShare() { return &share_; } +syncer::SyncCore* SyncManagerImpl::GetSyncCore() { + DCHECK(initialized_); + return sync_core_.get(); +} + const std::string SyncManagerImpl::cache_guid() { DCHECK(initialized_); return directory()->cache_guid(); diff --git a/sync/internal_api/sync_manager_impl.h b/sync/internal_api/sync_manager_impl.h index 7189026..5cb3908 100644 --- a/sync/internal_api/sync_manager_impl.h +++ b/sync/internal_api/sync_manager_impl.h @@ -110,6 +110,7 @@ class SYNC_EXPORT_PRIVATE SyncManagerImpl : virtual void SaveChanges() OVERRIDE; virtual void ShutdownOnSyncThread() OVERRIDE; virtual UserShare* GetUserShare() OVERRIDE; + virtual syncer::SyncCore* GetSyncCore() OVERRIDE; virtual const std::string cache_guid() OVERRIDE; virtual bool ReceivedExperiment(Experiments* experiments) OVERRIDE; virtual bool HasUnsyncedItems() OVERRIDE; @@ -311,6 +312,9 @@ class SYNC_EXPORT_PRIVATE SyncManagerImpl : // This state changes when entering or exiting a configuration cycle. scoped_ptr<ModelTypeRegistry> model_type_registry_; + // The main interface for non-blocking sync types. + scoped_ptr<SyncCore> sync_core_; + // A container of various bits of information used by the SyncScheduler to // create SyncSessions. Must outlive the SyncScheduler. scoped_ptr<sessions::SyncSessionContext> session_context_; diff --git a/sync/internal_api/test/fake_sync_manager.cc b/sync/internal_api/test/fake_sync_manager.cc index d6b538f..883da74 100644 --- a/sync/internal_api/test/fake_sync_manager.cc +++ b/sync/internal_api/test/fake_sync_manager.cc @@ -218,6 +218,10 @@ UserShare* FakeSyncManager::GetUserShare() { return test_user_share_.user_share(); } +syncer::SyncCore* FakeSyncManager::GetSyncCore() { + return NULL; +} + const std::string FakeSyncManager::cache_guid() { return test_user_share_.user_share()->directory->cache_guid(); } diff --git a/sync/sessions/DEPS b/sync/sessions/DEPS index b4042e3..36fed20 100644 --- a/sync/sessions/DEPS +++ b/sync/sessions/DEPS @@ -1,9 +1,7 @@ include_rules = [ "+sync/base", "+sync/engine", - "+sync/internal_api/public/base", - "+sync/internal_api/public/engine", - "+sync/internal_api/public/sessions", + "+sync/internal_api/public", "+sync/notifier", "+sync/protocol", "+sync/syncable", diff --git a/sync/sessions/model_type_registry.cc b/sync/sessions/model_type_registry.cc index ab93c68..f93cba3e 100644 --- a/sync/sessions/model_type_registry.cc +++ b/sync/sessions/model_type_registry.cc @@ -4,12 +4,17 @@ #include "sync/sessions/model_type_registry.h" +#include "base/bind.h" +#include "base/message_loop/message_loop_proxy.h" #include "sync/engine/directory_commit_contributor.h" #include "sync/engine/directory_update_handler.h" #include "sync/engine/non_blocking_type_processor_core.h" +#include "sync/internal_api/public/non_blocking_type_processor.h" namespace syncer { +ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} + ModelTypeRegistry::ModelTypeRegistry( const std::vector<scoped_refptr<ModelSafeWorker> >& workers, syncable::Directory* directory) @@ -67,13 +72,28 @@ void ModelTypeRegistry::SetEnabledDirectoryTypes( } enabled_directory_types_ = GetRoutingInfoTypes(routing_info); + DCHECK(Intersection(GetEnabledDirectoryTypes(), + GetEnabledNonBlockingTypes()).Empty()); } -void ModelTypeRegistry::InitializeNonBlockingType(syncer::ModelType type) { +void ModelTypeRegistry::InitializeNonBlockingType( + ModelType type, + scoped_refptr<base::SequencedTaskRunner> type_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> processor) { DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); + // Initialize CoreProcessor -> Processor communication channel. scoped_ptr<NonBlockingTypeProcessorCore> core( - new NonBlockingTypeProcessorCore(type)); + new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); + + // Initialize Processor -> CoreProcessor communication channel. + type_task_runner->PostTask( + FROM_HERE, + base::Bind(&NonBlockingTypeProcessor::OnConnect, + processor->AsWeakPtr(), + core->AsWeakPtr(), + scoped_refptr<base::SequencedTaskRunner>( + base::MessageLoopProxy::current()))); DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); @@ -83,6 +103,9 @@ void ModelTypeRegistry::InitializeNonBlockingType(syncer::ModelType type) { // The container takes ownership. non_blocking_type_processor_cores_.push_back(core.release()); + + DCHECK(Intersection(GetEnabledDirectoryTypes(), + GetEnabledNonBlockingTypes()).Empty()); } void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) { diff --git a/sync/sessions/model_type_registry.h b/sync/sessions/model_type_registry.h index 65445d4..1317a6a 100644 --- a/sync/sessions/model_type_registry.h +++ b/sync/sessions/model_type_registry.h @@ -24,6 +24,7 @@ class CommitContributor; class DirectoryCommitContributor; class DirectoryUpdateHandler; class NonBlockingTypeProcessorCore; +class NonBlockingTypeProcessor; class UpdateHandler; typedef std::map<ModelType, UpdateHandler*> UpdateHandlerMap; @@ -32,6 +33,11 @@ typedef std::map<ModelType, CommitContributor*> CommitContributorMap; // Keeps track of the sets of active update handlers and commit contributors. class SYNC_EXPORT_PRIVATE ModelTypeRegistry { public: + // This alternative constructor does not support any directory types. + // It is used only in tests. + ModelTypeRegistry(); + + // Constructs a ModelTypeRegistry that supports directory types. ModelTypeRegistry( const std::vector<scoped_refptr<ModelSafeWorker> >& workers, syncable::Directory* directory); @@ -40,10 +46,14 @@ class SYNC_EXPORT_PRIVATE ModelTypeRegistry { // Sets the set of enabled types. void SetEnabledDirectoryTypes(const ModelSafeRoutingInfo& routing_info); - // Enables an off-thread type for syncing. + // Enables an off-thread type for syncing. Connects the given processor + // and its task_runner to the newly created processor core. // - // Expects that the type is not currently enabled. - void InitializeNonBlockingType(syncer::ModelType type); + // Expects that the processor's ModelType is not currently enabled. + void InitializeNonBlockingType( + syncer::ModelType type, + scoped_refptr<base::SequencedTaskRunner> type_task_runner, + base::WeakPtr<NonBlockingTypeProcessor> processor); // Disables the syncing of an off-thread type. // diff --git a/sync/sessions/model_type_registry_unittest.cc b/sync/sessions/model_type_registry_unittest.cc index 7f174e5..0248e8d 100644 --- a/sync/sessions/model_type_registry_unittest.cc +++ b/sync/sessions/model_type_registry_unittest.cc @@ -4,9 +4,10 @@ #include <vector> +#include "base/deferred_sequenced_task_runner.h" #include "base/message_loop/message_loop.h" -#include "sync/engine/non_blocking_type_processor_core.h" #include "sync/internal_api/public/base/model_type.h" +#include "sync/internal_api/public/non_blocking_type_processor.h" #include "sync/sessions/model_type_registry.h" #include "sync/test/engine/fake_model_worker.h" #include "sync/test/engine/test_directory_setter_upper.h" @@ -113,13 +114,24 @@ TEST_F(ModelTypeRegistryTest, SetEnabledDirectoryTypes_Clear) { } TEST_F(ModelTypeRegistryTest, NonBlockingTypes) { + NonBlockingTypeProcessor themes_processor(syncer::THEMES); + NonBlockingTypeProcessor sessions_processor(syncer::SESSIONS); + scoped_refptr<base::DeferredSequencedTaskRunner> task_runner = + new base::DeferredSequencedTaskRunner(base::MessageLoopProxy::current()); + EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); - registry()->InitializeNonBlockingType(syncer::THEMES); + registry()->InitializeNonBlockingType( + syncer::THEMES, + task_runner, + themes_processor.AsWeakPtr()); EXPECT_TRUE(registry()->GetEnabledTypes().Equals( ModelTypeSet(syncer::THEMES))); - registry()->InitializeNonBlockingType(syncer::SESSIONS); + registry()->InitializeNonBlockingType( + syncer::SESSIONS, + task_runner, + sessions_processor.AsWeakPtr()); EXPECT_TRUE(registry()->GetEnabledTypes().Equals( ModelTypeSet(syncer::THEMES, syncer::SESSIONS))); @@ -132,6 +144,11 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypes) { } TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { + NonBlockingTypeProcessor themes_processor(syncer::THEMES); + NonBlockingTypeProcessor sessions_processor(syncer::SESSIONS); + scoped_refptr<base::DeferredSequencedTaskRunner> task_runner = + new base::DeferredSequencedTaskRunner(base::MessageLoopProxy::current()); + ModelSafeRoutingInfo routing_info1; routing_info1.insert(std::make_pair(NIGORI, GROUP_PASSIVE)); routing_info1.insert(std::make_pair(BOOKMARKS, GROUP_UI)); @@ -141,7 +158,10 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); // Add the themes non-blocking type. - registry()->InitializeNonBlockingType(syncer::THEMES); + registry()->InitializeNonBlockingType( + syncer::THEMES, + task_runner, + themes_processor.AsWeakPtr()); current_types.Put(syncer::THEMES); EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); @@ -151,7 +171,10 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); // Add sessions non-blocking type. - registry()->InitializeNonBlockingType(syncer::SESSIONS); + registry()->InitializeNonBlockingType( + syncer::SESSIONS, + task_runner, + sessions_processor.AsWeakPtr()); current_types.Put(syncer::SESSIONS); EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); @@ -167,4 +190,36 @@ TEST_F(ModelTypeRegistryTest, NonBlockingTypesWithDirectoryTypes) { EXPECT_TRUE(registry()->GetEnabledTypes().Equals(current_types)); } +TEST_F(ModelTypeRegistryTest, DeletionOrdering) { + scoped_ptr<NonBlockingTypeProcessor> themes_processor( + new NonBlockingTypeProcessor(syncer::THEMES)); + scoped_ptr<NonBlockingTypeProcessor> sessions_processor( + new NonBlockingTypeProcessor(syncer::SESSIONS)); + scoped_refptr<base::DeferredSequencedTaskRunner> task_runner = + new base::DeferredSequencedTaskRunner(base::MessageLoopProxy::current()); + + EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); + + registry()->InitializeNonBlockingType( + syncer::THEMES, + task_runner, + themes_processor->AsWeakPtr()); + registry()->InitializeNonBlockingType( + syncer::SESSIONS, + task_runner, + sessions_processor->AsWeakPtr()); + EXPECT_TRUE(registry()->GetEnabledTypes().Equals( + ModelTypeSet(syncer::THEMES, syncer::SESSIONS))); + + // Tear down themes processing, starting with the ProcessorCore. + registry()->RemoveNonBlockingType(syncer::THEMES); + themes_processor.reset(); + + // Tear down sessions processing, starting with the Processor. + sessions_processor.reset(); + registry()->RemoveNonBlockingType(syncer::SESSIONS); + + EXPECT_TRUE(registry()->GetEnabledTypes().Empty()); +} + } // namespace syncer diff --git a/sync/sync_internal_api.gypi b/sync/sync_internal_api.gypi index 8f46442..20ac6a5 100644 --- a/sync/sync_internal_api.gypi +++ b/sync/sync_internal_api.gypi @@ -40,6 +40,7 @@ 'internal_api/js_sync_encryption_handler_observer.h', 'internal_api/js_sync_manager_observer.cc', 'internal_api/js_sync_manager_observer.h', + 'internal_api/non_blocking_type_processor.cc', 'internal_api/public/base/ack_handle.cc', 'internal_api/public/base/ack_handle.h', 'internal_api/public/base/cancelation_observer.cc', @@ -89,12 +90,14 @@ 'internal_api/public/internal_components_factory.h', 'internal_api/public/internal_components_factory_impl.h', 'internal_api/public/network_resources.h', + 'internal_api/public/non_blocking_type_processor.h', 'internal_api/public/read_node.h', 'internal_api/public/read_transaction.h', 'internal_api/public/sessions/model_neutral_state.cc', 'internal_api/public/sessions/model_neutral_state.h', 'internal_api/public/sessions/sync_session_snapshot.cc', 'internal_api/public/sessions/sync_session_snapshot.h', + 'internal_api/public/sync_core_proxy.h', 'internal_api/public/sync_encryption_handler.cc', 'internal_api/public/sync_encryption_handler.h', 'internal_api/public/sync_manager.cc', @@ -117,6 +120,9 @@ 'internal_api/public/write_transaction.h', 'internal_api/read_node.cc', 'internal_api/read_transaction.cc', + 'internal_api/sync_core.cc', + 'internal_api/sync_core.h', + 'internal_api/sync_core_proxy.cc', 'internal_api/sync_encryption_handler_impl.cc', 'internal_api/sync_encryption_handler_impl.h', 'internal_api/sync_manager_factory.cc', diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi index 7c89c99..a8ac4a0 100644 --- a/sync/sync_tests.gypi +++ b/sync/sync_tests.gypi @@ -419,9 +419,10 @@ 'internal_api/js_sync_manager_observer_unittest.cc', 'internal_api/public/change_record_unittest.cc', 'internal_api/public/sessions/sync_session_snapshot_unittest.cc', - 'internal_api/syncapi_server_connection_manager_unittest.cc', + 'internal_api/sync_core_proxy_unittest.cc', 'internal_api/sync_encryption_handler_impl_unittest.cc', 'internal_api/sync_manager_impl_unittest.cc', + 'internal_api/syncapi_server_connection_manager_unittest.cc', ], 'conditions': [ ['OS == "ios"', { |