diff options
author | maniscalco <maniscalco@chromium.org> | 2014-09-05 09:44:54 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-09-05 16:47:12 +0000 |
commit | 5dde06853d1d5020796184c5fc749b9730ce8f07 (patch) | |
tree | 032fd16c70b02694c18ca634c0d44dba141c9c01 | |
parent | 17267f48ca31c053628657ec7731bfb72f8ea8dc (diff) | |
download | chromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.zip chromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.tar.gz chromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.tar.bz2 |
Replace AttachmentStore's StoreAttachments with UploadAttachments.
This change is lays groundwork for making attachment upload operations
persistent.
Make GenericChangeProcessor responsible for writing attachments to the
AttachmentStore and calling UploadAttachments. In a future CL, datatype
code will be responsible for writing attachments to the store.
Queue up attachments for upload inside of AttachmentService. In a
future CL we'll add rate limiting, retry, and back-off logic.
Expose AttachmentService's AttachmentStore via GetStore method.
BUG=
Review URL: https://codereview.chromium.org/512413003
Cr-Commit-Position: refs/heads/master@{#293536}
10 files changed, 354 insertions, 236 deletions
diff --git a/components/sync_driver/generic_change_processor.cc b/components/sync_driver/generic_change_processor.cc index c44ce72..ddf6b67 100644 --- a/components/sync_driver/generic_change_processor.cc +++ b/components/sync_driver/generic_change_processor.cc @@ -85,6 +85,11 @@ syncer::SyncData BuildRemoteSyncData( attachment_service_proxy); } +const syncer::AttachmentId& AttachmentToAttachmentId( + const syncer::Attachment& attachment) { + return attachment.GetId(); +} + } // namespace GenericChangeProcessor::GenericChangeProcessor( @@ -102,7 +107,8 @@ GenericChangeProcessor::GenericChangeProcessor( attachment_service_weak_ptr_factory_(attachment_service_.get()), attachment_service_proxy_( base::MessageLoopProxy::current(), - attachment_service_weak_ptr_factory_.GetWeakPtr()) { + attachment_service_weak_ptr_factory_.GetWeakPtr()), + weak_ptr_factory_(this) { DCHECK(CalledOnValidThread()); DCHECK(attachment_service_); } @@ -387,25 +393,6 @@ syncer::SyncError AttemptDelete(const syncer::SyncChange& change, return syncer::SyncError(); } -// A callback invoked on completion of AttachmentService::StoreAttachment. -void IgnoreStoreResult(const syncer::AttachmentService::StoreResult&) { - // TODO(maniscalco): Here is where we're going to update the in-directory - // entry to indicate that the attachments have been successfully stored on - // disk. Why do we care? Because we might crash after persisting the - // directory to disk, but before we have persisted its attachments, leaving us - // with danging attachment ids. Having a flag that indicates we've stored the - // entry will allow us to detect and filter entries with dangling attachment - // ids (bug 368353). -} - -void StoreAttachments(syncer::AttachmentService* attachment_service, - const syncer::AttachmentList& attachments) { - DCHECK(attachment_service); - syncer::AttachmentService::StoreCallback ignore_store_result = - base::Bind(&IgnoreStoreResult); - attachment_service->StoreAttachments(attachments, ignore_store_result); -} - } // namespace syncer::SyncError GenericChangeProcessor::ProcessSyncChanges( @@ -465,7 +452,7 @@ syncer::SyncError GenericChangeProcessor::ProcessSyncChanges( } if (!new_attachments.empty()) { - StoreAttachments(attachment_service_.get(), new_attachments); + StoreAndUploadAttachments(new_attachments); } return syncer::SyncError(); @@ -711,4 +698,40 @@ syncer::UserShare* GenericChangeProcessor::share_handle() const { return share_handle_; } +void GenericChangeProcessor::StoreAndUploadAttachments( + const syncer::AttachmentList& attachments) { + DCHECK(CalledOnValidThread()); + attachment_service_->GetStore()->Write( + attachments, + base::Bind(&GenericChangeProcessor::WriteAttachmentsDone, + weak_ptr_factory_.GetWeakPtr(), + attachments)); +} + +void GenericChangeProcessor::WriteAttachmentsDone( + const syncer::AttachmentList& attachments, + const syncer::AttachmentStore::Result& result) { + DCHECK(CalledOnValidThread()); + if (result != syncer::AttachmentStore::SUCCESS) { + // TODO(maniscalco): Deal with case where an error occurred (bug 361251). + return; + } + + // TODO(maniscalco): Here is where we're going to update the in-directory + // entry to indicate that the attachments have been successfully stored on + // disk. Why do we care? Because we might crash after persisting the + // directory to disk, but before we have persisted its attachments, leaving us + // with danging attachment ids. Having a flag that indicates we've stored the + // entry will allow us to detect and filter entries with dangling attachment + // ids (bug 368353). + + // Begin uploading the attachments now that they are safe on disk. + syncer::AttachmentIdSet attachment_ids; + std::transform(attachments.begin(), + attachments.end(), + std::inserter(attachment_ids, attachment_ids.end()), + AttachmentToAttachmentId); + attachment_service_->UploadAttachments(attachment_ids); +} + } // namespace sync_driver diff --git a/components/sync_driver/generic_change_processor.h b/components/sync_driver/generic_change_processor.h index 1941600..a3139d8 100644 --- a/components/sync_driver/generic_change_processor.h +++ b/components/sync_driver/generic_change_processor.h @@ -13,6 +13,7 @@ #include "components/sync_driver/change_processor.h" #include "components/sync_driver/data_type_controller.h" #include "components/sync_driver/data_type_error_handler.h" +#include "sync/api/attachments/attachment_store.h" #include "sync/api/sync_change_processor.h" #include "sync/api/sync_merge_result.h" #include "sync/internal_api/public/attachments/attachment_service.h" @@ -126,6 +127,18 @@ class GenericChangeProcessor : public ChangeProcessor, syncer::WriteNode* sync_node, syncer::AttachmentList* new_attachments); + // Store |attachments| locally then upload them to the sync server. + // + // Store and uploading are asynchronous operations. |WriteAttachmentsDone| + // will be invoked once the attachments have been stored on the local device. + void StoreAndUploadAttachments(const syncer::AttachmentList& attachments); + + // Invoked once attachments have been stored locally. + // + // See also AttachmentStore::WriteCallback. + void WriteAttachmentsDone(const syncer::AttachmentList& attachments, + const syncer::AttachmentStore::Result& result); + // The SyncableService this change processor will forward changes on to. const base::WeakPtr<syncer::SyncableService> local_service_; @@ -155,6 +168,8 @@ class GenericChangeProcessor : public ChangeProcessor, attachment_service_weak_ptr_factory_; syncer::AttachmentServiceProxy attachment_service_proxy_; + base::WeakPtrFactory<GenericChangeProcessor> weak_ptr_factory_; + DISALLOW_COPY_AND_ASSIGN(GenericChangeProcessor); }; diff --git a/components/sync_driver/generic_change_processor_unittest.cc b/components/sync_driver/generic_change_processor_unittest.cc index bdd7266..f1b35e6 100644 --- a/components/sync_driver/generic_change_processor_unittest.cc +++ b/components/sync_driver/generic_change_processor_unittest.cc @@ -7,6 +7,7 @@ #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" +#include "base/run_loop.h" #include "base/strings/stringprintf.h" #include "components/sync_driver/data_type_error_handler_mock.h" #include "components/sync_driver/sync_api_component_factory.h" @@ -25,6 +26,7 @@ #include "sync/internal_api/public/user_share.h" #include "sync/internal_api/public/write_node.h" #include "sync/internal_api/public/write_transaction.h" +#include "testing/gmock/include/gmock/gmock-matchers.h" #include "testing/gtest/include/gtest/gtest.h" namespace sync_driver { @@ -33,17 +35,17 @@ namespace { const char kTestData[] = "some data"; -// A mock that keeps track of attachments passed to StoreAttachments. +// A mock that keeps track of attachments passed to UploadAttachments. class MockAttachmentService : public syncer::AttachmentServiceImpl { public: MockAttachmentService(); virtual ~MockAttachmentService(); - virtual void StoreAttachments(const syncer::AttachmentList& attachments, - const StoreCallback& callback) OVERRIDE; - std::vector<syncer::AttachmentList>* attachment_lists(); + virtual void UploadAttachments( + const syncer::AttachmentIdSet& attachment_ids) OVERRIDE; + std::vector<syncer::AttachmentIdSet>* attachment_id_sets(); private: - std::vector<syncer::AttachmentList> attachment_lists_; + std::vector<syncer::AttachmentIdSet> attachment_id_sets_; }; MockAttachmentService::MockAttachmentService() @@ -60,15 +62,15 @@ MockAttachmentService::MockAttachmentService() MockAttachmentService::~MockAttachmentService() { } -void MockAttachmentService::StoreAttachments( - const syncer::AttachmentList& attachments, - const StoreCallback& callback) { - attachment_lists_.push_back(attachments); - AttachmentServiceImpl::StoreAttachments(attachments, callback); +void MockAttachmentService::UploadAttachments( + const syncer::AttachmentIdSet& attachment_ids) { + attachment_id_sets_.push_back(attachment_ids); + AttachmentServiceImpl::UploadAttachments(attachment_ids); } -std::vector<syncer::AttachmentList>* MockAttachmentService::attachment_lists() { - return &attachment_lists_; +std::vector<syncer::AttachmentIdSet>* +MockAttachmentService::attachment_id_sets() { + return &attachment_id_sets_; } // MockSyncApiComponentFactory needed to initialize GenericChangeProcessor and @@ -161,6 +163,11 @@ class SyncGenericChangeProcessorTest : public testing::Test { return mock_attachment_service_; } + void RunLoop() { + base::RunLoop run_loop; + run_loop.RunUntilIdle(); + } + private: base::MessageLoopForUI loop_; @@ -349,19 +356,20 @@ TEST_F(SyncGenericChangeProcessorTest, tag, title, specifics, attachments))); ASSERT_FALSE( change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet()); + RunLoop(); // Check that the AttachmentService received the new attachments. - ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U); - const syncer::AttachmentList& attachments_added = - mock_attachment_service()->attachment_lists()->front(); - ASSERT_EQ(attachments_added.size(), 2U); - ASSERT_EQ(attachments_added[0].GetId(), attachments[0].GetId()); - ASSERT_EQ(attachments_added[1].GetId(), attachments[1].GetId()); + ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U); + const syncer::AttachmentIdSet& attachments_added = + mock_attachment_service()->attachment_id_sets()->front(); + ASSERT_THAT(attachments_added, + testing::UnorderedElementsAre(attachments[0].GetId(), + attachments[1].GetId())); // Update the SyncData, replacing its two attachments with one new attachment. syncer::AttachmentList new_attachments; new_attachments.push_back(syncer::Attachment::Create(attachment_data)); - mock_attachment_service()->attachment_lists()->clear(); + mock_attachment_service()->attachment_id_sets()->clear(); change_list.clear(); change_list.push_back( syncer::SyncChange(FROM_HERE, @@ -370,13 +378,14 @@ TEST_F(SyncGenericChangeProcessorTest, tag, title, specifics, new_attachments))); ASSERT_FALSE( change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet()); + RunLoop(); // Check that the AttachmentService received it. - ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U); - const syncer::AttachmentList& new_attachments_added = - mock_attachment_service()->attachment_lists()->front(); - ASSERT_EQ(new_attachments_added.size(), 1U); - ASSERT_EQ(new_attachments_added[0].GetId(), new_attachments[0].GetId()); + ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U); + const syncer::AttachmentIdSet& new_attachments_added = + mock_attachment_service()->attachment_id_sets()->front(); + ASSERT_THAT(new_attachments_added, + testing::UnorderedElementsAre(new_attachments[0].GetId())); } // Verify that after attachment is uploaded GenericChangeProcessor updates diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc index 61fb1c6..e9d3a82 100644 --- a/sync/internal_api/attachments/attachment_service_impl.cc +++ b/sync/internal_api/attachments/attachment_service_impl.cc @@ -4,6 +4,8 @@ #include "sync/internal_api/public/attachments/attachment_service_impl.h" +#include <iterator> + #include "base/bind.h" #include "base/message_loop/message_loop.h" #include "base/thread_task_runner_handle.h" @@ -141,6 +143,10 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { return attachment_service.Pass(); } +AttachmentStore* AttachmentServiceImpl::GetStore() { + return attachment_store_.get(); +} + void AttachmentServiceImpl::GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) { @@ -163,25 +169,6 @@ void AttachmentServiceImpl::DropAttachments( callback)); } -void AttachmentServiceImpl::StoreAttachments(const AttachmentList& attachments, - const StoreCallback& callback) { - DCHECK(CalledOnValidThread()); - attachment_store_->Write(attachments, - base::Bind(&AttachmentServiceImpl::WriteDone, - weak_ptr_factory_.GetWeakPtr(), - callback)); - if (attachment_uploader_.get()) { - for (AttachmentList::const_iterator iter = attachments.begin(); - iter != attachments.end(); - ++iter) { - attachment_uploader_->UploadAttachment( - *iter, - base::Bind(&AttachmentServiceImpl::UploadDone, - weak_ptr_factory_.GetWeakPtr())); - } - } -} - void AttachmentServiceImpl::ReadDone( const scoped_refptr<GetOrDownloadState>& state, const AttachmentStore::Result& result, @@ -226,21 +213,10 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, base::Bind(callback, drop_result)); } -void AttachmentServiceImpl::WriteDone(const StoreCallback& callback, - const AttachmentStore::Result& result) { - AttachmentService::StoreResult store_result = - AttachmentService::STORE_UNSPECIFIED_ERROR; - if (result == AttachmentStore::SUCCESS) { - store_result = AttachmentService::STORE_SUCCESS; - } - // TODO(maniscalco): Deal with case where an error occurred (bug 361251). - base::MessageLoop::current()->PostTask(FROM_HERE, - base::Bind(callback, store_result)); -} - void AttachmentServiceImpl::UploadDone( const AttachmentUploader::UploadResult& result, const AttachmentId& attachment_id) { + ids_in_queue_.erase(attachment_id); // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. if (result != AttachmentUploader::UPLOAD_SUCCESS) return; @@ -261,4 +237,60 @@ void AttachmentServiceImpl::DownloadDone( } } +void AttachmentServiceImpl::UploadAttachments( + const AttachmentIdSet& attachment_ids) { + DCHECK(CalledOnValidThread()); + if (!attachment_uploader_.get()) { + return; + } + + // Enqueue the attachment ids that aren't already in the queue. + AttachmentIdSet::const_iterator iter = attachment_ids.begin(); + AttachmentIdSet::const_iterator end = attachment_ids.end(); + for (; iter != end; ++iter) { + if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { + queue_.push_back(*iter); + ids_in_queue_.insert(*iter); + } + } + + ProcessQueuedUploads(); +} + +void AttachmentServiceImpl::ProcessQueuedUploads() { + DCHECK(CalledOnValidThread()); + // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of + // concurrent uploads and apply backoff on failure. + while (!queue_.empty()) { + const AttachmentId id = queue_.front(); + queue_.pop_front(); + AttachmentIdList attachment_ids; + attachment_ids.push_back(id); + attachment_store_->Read( + attachment_ids, + base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, + weak_ptr_factory_.GetWeakPtr())); + } +} + +void AttachmentServiceImpl::ReadDoneNowUpload( + const AttachmentStore::Result& result, + scoped_ptr<AttachmentMap> attachments, + scoped_ptr<AttachmentIdList> unavailable_attachment_ids) { + DCHECK(CalledOnValidThread()); + if (!unavailable_attachment_ids->empty()) { + // TODO(maniscalco): We failed to read some attachments. What should we do + // now? + } + + AttachmentMap::const_iterator iter = attachments->begin(); + AttachmentMap::const_iterator end = attachments->end(); + for (; iter != end; ++iter) { + attachment_uploader_->UploadAttachment( + iter->second, + base::Bind(&AttachmentServiceImpl::UploadDone, + weak_ptr_factory_.GetWeakPtr())); + } +} + } // namespace syncer diff --git a/sync/internal_api/attachments/attachment_service_impl_unittest.cc b/sync/internal_api/attachments/attachment_service_impl_unittest.cc index d8bf6b2..1b642e6 100644 --- a/sync/internal_api/attachments/attachment_service_impl_unittest.cc +++ b/sync/internal_api/attachments/attachment_service_impl_unittest.cc @@ -10,6 +10,7 @@ #include "base/run_loop.h" #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" #include "sync/internal_api/public/attachments/fake_attachment_uploader.h" +#include "testing/gmock/include/gmock/gmock-matchers.h" #include "testing/gtest/include/gtest/gtest.h" namespace syncer { @@ -196,21 +197,12 @@ class AttachmentServiceImplTest : public testing::Test, base::Unretained(this)); } - AttachmentService::StoreCallback store_callback() { - return base::Bind(&AttachmentServiceImplTest::StoreDone, - base::Unretained(this)); - } - void DownloadDone(const AttachmentService::GetOrDownloadResult& result, scoped_ptr<AttachmentMap> attachments) { download_results_.push_back(result); last_download_attachments_ = attachments.Pass(); } - void StoreDone(const AttachmentService::StoreResult& result) { - store_results_.push_back(result); - } - void RunLoop() { base::RunLoop run_loop; run_loop.RunUntilIdle(); @@ -225,10 +217,6 @@ class AttachmentServiceImplTest : public testing::Test, return *last_download_attachments_.get(); } - const std::vector<AttachmentService::StoreResult>& store_results() const { - return store_results_; - } - MockAttachmentStore* store() { return attachment_store_.get(); } MockAttachmentDownloader* downloader() { @@ -253,10 +241,12 @@ class AttachmentServiceImplTest : public testing::Test, std::vector<AttachmentService::GetOrDownloadResult> download_results_; scoped_ptr<AttachmentMap> last_download_attachments_; std::vector<AttachmentId> on_attachment_uploaded_list_; - - std::vector<AttachmentService::StoreResult> store_results_; }; +TEST_F(AttachmentServiceImplTest, GetStore) { + EXPECT_EQ(store(), attachment_service()->GetStore()); +} + TEST_F(AttachmentServiceImplTest, GetOrDownload_EmptyAttachmentList) { AttachmentIdList attachment_ids; attachment_service()->GetOrDownloadAttachments(attachment_ids, @@ -348,105 +338,151 @@ TEST_F(AttachmentServiceImplTest, GetOrDownload_NoDownloader) { EXPECT_TRUE(last_download_attachments().empty()); } -TEST_F(AttachmentServiceImplTest, StoreAttachments_Success) { - scoped_refptr<base::RefCountedString> data = new base::RefCountedString(); - Attachment attachment(Attachment::Create(data)); - AttachmentList attachments; - attachments.push_back(attachment); - attachment_service()->StoreAttachments(attachments, store_callback()); - EXPECT_EQ(1U, store()->write_attachments.size()); - EXPECT_EQ(1U, uploader()->upload_requests.size()); +TEST_F(AttachmentServiceImplTest, UploadAttachments_Success) { + AttachmentIdSet attachment_ids; + const size_t num_attachments = 3; + for (unsigned i = 0; i < num_attachments; ++i) { + attachment_ids.insert(AttachmentId::Create()); + } + attachment_service()->UploadAttachments(attachment_ids); + RunLoop(); + // See that the service has issued reads for the attachments, but not yet + // uploaded anything. + EXPECT_EQ(num_attachments, store()->read_ids.size()); + EXPECT_EQ(0U, uploader()->upload_requests.size()); + for (unsigned i = 0; i < num_attachments; ++i) { + store()->RespondToRead(attachment_ids); + } - store()->RespondToWrite(AttachmentStore::SUCCESS); - uploader()->RespondToUpload(attachment.GetId(), - AttachmentUploader::UPLOAD_SUCCESS); RunLoop(); - ASSERT_EQ(1U, store_results().size()); - EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]); - ASSERT_EQ(1U, on_attachment_uploaded_list().size()); - EXPECT_EQ(attachment.GetId(), on_attachment_uploaded_list()[0]); + EXPECT_EQ(0U, store()->read_ids.size()); + EXPECT_EQ(num_attachments, uploader()->upload_requests.size()); + AttachmentIdSet::const_iterator iter = attachment_ids.begin(); + const AttachmentIdSet::const_iterator end = attachment_ids.end(); + for (; iter != end; ++iter) { + uploader()->RespondToUpload(*iter, AttachmentUploader::UPLOAD_SUCCESS); + } + RunLoop(); + + // See that all the attachments were uploaded. + ASSERT_EQ(attachment_ids.size(), on_attachment_uploaded_list().size()); + for (iter = attachment_ids.begin(); iter != end; ++iter) { + EXPECT_THAT(on_attachment_uploaded_list(), testing::Contains(*iter)); + } } -TEST_F(AttachmentServiceImplTest, - StoreAttachments_StoreFailsWithUnspecifiedError) { - scoped_refptr<base::RefCountedString> data = new base::RefCountedString(); - Attachment attachment(Attachment::Create(data)); - AttachmentList attachments; - attachments.push_back(attachment); - attachment_service()->StoreAttachments(attachments, store_callback()); - EXPECT_EQ(1U, store()->write_attachments.size()); - EXPECT_EQ(1U, uploader()->upload_requests.size()); +TEST_F(AttachmentServiceImplTest, UploadAttachments_Success_NoDelegate) { + InitializeAttachmentService(make_scoped_ptr(new MockAttachmentUploader()), + make_scoped_ptr(new MockAttachmentDownloader()), + NULL); // No delegate. - store()->RespondToWrite(AttachmentStore::UNSPECIFIED_ERROR); - uploader()->RespondToUpload(attachment.GetId(), + AttachmentIdSet attachment_ids; + attachment_ids.insert(AttachmentId::Create()); + attachment_service()->UploadAttachments(attachment_ids); + RunLoop(); + EXPECT_EQ(1U, store()->read_ids.size()); + EXPECT_EQ(0U, uploader()->upload_requests.size()); + store()->RespondToRead(attachment_ids); + RunLoop(); + EXPECT_EQ(0U, store()->read_ids.size()); + EXPECT_EQ(1U, uploader()->upload_requests.size()); + uploader()->RespondToUpload(*attachment_ids.begin(), AttachmentUploader::UPLOAD_SUCCESS); RunLoop(); - ASSERT_EQ(1U, store_results().size()); - EXPECT_EQ(AttachmentService::STORE_UNSPECIFIED_ERROR, store_results()[0]); - ASSERT_EQ(1U, on_attachment_uploaded_list().size()); - EXPECT_EQ(attachment.GetId(), on_attachment_uploaded_list()[0]); + ASSERT_TRUE(on_attachment_uploaded_list().empty()); } -TEST_F(AttachmentServiceImplTest, - StoreAttachments_UploadFailsWithUnspecifiedError) { - scoped_refptr<base::RefCountedString> data = new base::RefCountedString(); - Attachment attachment(Attachment::Create(data)); - AttachmentList attachments; - attachments.push_back(attachment); - attachment_service()->StoreAttachments(attachments, store_callback()); - EXPECT_EQ(1U, store()->write_attachments.size()); - EXPECT_EQ(1U, uploader()->upload_requests.size()); +TEST_F(AttachmentServiceImplTest, UploadAttachments_SomeMissingFromStore) { + AttachmentIdSet attachment_ids; + attachment_ids.insert(AttachmentId::Create()); + attachment_ids.insert(AttachmentId::Create()); - store()->RespondToWrite(AttachmentStore::SUCCESS); - uploader()->RespondToUpload(attachment.GetId(), - AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR); + attachment_service()->UploadAttachments(attachment_ids); RunLoop(); - ASSERT_EQ(1U, store_results().size()); - // Even though the upload failed, the Store operation is successful. - EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]); - EXPECT_TRUE(on_attachment_uploaded_list().empty()); + EXPECT_EQ(2U, store()->read_ids.size()); + EXPECT_EQ(0U, uploader()->upload_requests.size()); + store()->RespondToRead(attachment_ids); + EXPECT_EQ(1U, store()->read_ids.size()); + // Not found! + store()->RespondToRead(AttachmentIdSet()); + EXPECT_EQ(0U, store()->read_ids.size()); + RunLoop(); + + // One attachment went missing so we should see only one upload request. + EXPECT_EQ(1U, uploader()->upload_requests.size()); + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + AttachmentUploader::UPLOAD_SUCCESS); + RunLoop(); + + // See that the delegate was called for only one. + ASSERT_EQ(1U, on_attachment_uploaded_list().size()); } -TEST_F(AttachmentServiceImplTest, StoreAttachments_NoDelegate) { - InitializeAttachmentService(make_scoped_ptr(new MockAttachmentUploader()), - make_scoped_ptr(new MockAttachmentDownloader()), - NULL); // No delegate. +TEST_F(AttachmentServiceImplTest, UploadAttachments_AllMissingFromStore) { + AttachmentIdSet attachment_ids; + attachment_ids.insert(AttachmentId::Create()); + attachment_ids.insert(AttachmentId::Create()); - scoped_refptr<base::RefCountedString> data = new base::RefCountedString(); - Attachment attachment(Attachment::Create(data)); - AttachmentList attachments; - attachments.push_back(attachment); - attachment_service()->StoreAttachments(attachments, store_callback()); - EXPECT_EQ(1U, store()->write_attachments.size()); - EXPECT_EQ(1U, uploader()->upload_requests.size()); + attachment_service()->UploadAttachments(attachment_ids); + RunLoop(); + EXPECT_EQ(2U, store()->read_ids.size()); + EXPECT_EQ(0U, uploader()->upload_requests.size()); + // None found! + store()->RespondToRead(AttachmentIdSet()); + store()->RespondToRead(AttachmentIdSet()); + EXPECT_EQ(0U, store()->read_ids.size()); + RunLoop(); - store()->RespondToWrite(AttachmentStore::SUCCESS); - uploader()->RespondToUpload(attachment.GetId(), - AttachmentUploader::UPLOAD_SUCCESS); + // Nothing uploaded. + EXPECT_EQ(0U, uploader()->upload_requests.size()); RunLoop(); - ASSERT_EQ(1U, store_results().size()); - EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]); - EXPECT_TRUE(on_attachment_uploaded_list().empty()); + + // See that the delegate was never called. + ASSERT_EQ(0U, on_attachment_uploaded_list().size()); } -TEST_F(AttachmentServiceImplTest, StoreAttachments_NoUploader) { - // No uploader. +TEST_F(AttachmentServiceImplTest, UploadAttachments_NoUploader) { InitializeAttachmentService(make_scoped_ptr<MockAttachmentUploader>(NULL), make_scoped_ptr(new MockAttachmentDownloader()), this); - scoped_refptr<base::RefCountedString> data = new base::RefCountedString(); - Attachment attachment(Attachment::Create(data)); - AttachmentList attachments; - attachments.push_back(attachment); - attachment_service()->StoreAttachments(attachments, store_callback()); - EXPECT_EQ(1U, store()->write_attachments.size()); + AttachmentIdSet attachment_ids; + attachment_ids.insert(AttachmentId::Create()); + attachment_service()->UploadAttachments(attachment_ids); + RunLoop(); + EXPECT_EQ(0U, store()->read_ids.size()); + ASSERT_EQ(0U, on_attachment_uploaded_list().size()); +} + +// Upload three attachments. For one of them, server responds with error. +TEST_F(AttachmentServiceImplTest, UploadAttachments_OneUploadFails) { + AttachmentIdSet attachment_ids; + attachment_ids.insert(AttachmentId::Create()); + attachment_ids.insert(AttachmentId::Create()); + attachment_ids.insert(AttachmentId::Create()); + + attachment_service()->UploadAttachments(attachment_ids); + RunLoop(); + EXPECT_EQ(3U, store()->read_ids.size()); + EXPECT_EQ(0U, uploader()->upload_requests.size()); - store()->RespondToWrite(AttachmentStore::SUCCESS); + // All attachments found. + store()->RespondToRead(attachment_ids); + store()->RespondToRead(attachment_ids); + store()->RespondToRead(attachment_ids); RunLoop(); - ASSERT_EQ(1U, store_results().size()); - EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]); - EXPECT_TRUE(on_attachment_uploaded_list().empty()); + + EXPECT_EQ(3U, uploader()->upload_requests.size()); + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + AttachmentUploader::UPLOAD_SUCCESS); + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR); + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + AttachmentUploader::UPLOAD_SUCCESS); + EXPECT_EQ(0U, uploader()->upload_requests.size()); + RunLoop(); + + EXPECT_EQ(2U, on_attachment_uploaded_list().size()); } } // namespace syncer diff --git a/sync/internal_api/attachments/attachment_service_proxy.cc b/sync/internal_api/attachments/attachment_service_proxy.cc index 1d41878..ef858dd 100644 --- a/sync/internal_api/attachments/attachment_service_proxy.cc +++ b/sync/internal_api/attachments/attachment_service_proxy.cc @@ -35,15 +35,6 @@ void ProxyDropCallback( task_runner->PostTask(FROM_HERE, base::Bind(callback, result)); } -// Invokes |callback| with |result| and |attachments| in the |task_runner| -// thread. -void ProxyStoreCallback( - const scoped_refptr<base::SequencedTaskRunner>& task_runner, - const AttachmentService::StoreCallback& callback, - const AttachmentService::StoreResult& result) { - task_runner->PostTask(FROM_HERE, base::Bind(callback, result)); -} - } // namespace AttachmentServiceProxy::AttachmentServiceProxy() { @@ -67,6 +58,10 @@ AttachmentServiceProxy::AttachmentServiceProxy( AttachmentServiceProxy::~AttachmentServiceProxy() { } +AttachmentStore* AttachmentServiceProxy::GetStore() { + return NULL; +} + void AttachmentServiceProxy::GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) { @@ -96,17 +91,12 @@ void AttachmentServiceProxy::DropAttachments( proxy_callback)); } -void AttachmentServiceProxy::StoreAttachments(const AttachmentList& attachments, - const StoreCallback& callback) { - DCHECK(wrapped_task_runner_.get()); - StoreCallback proxy_callback = base::Bind( - &ProxyStoreCallback, base::ThreadTaskRunnerHandle::Get(), callback); +void AttachmentServiceProxy::UploadAttachments( + const AttachmentIdSet& attachment_ids) { + DCHECK(wrapped_task_runner_); wrapped_task_runner_->PostTask( FROM_HERE, - base::Bind(&AttachmentService::StoreAttachments, - core_, - attachments, - proxy_callback)); + base::Bind(&AttachmentService::UploadAttachments, core_, attachment_ids)); } AttachmentServiceProxy::Core::Core( @@ -117,6 +107,10 @@ AttachmentServiceProxy::Core::Core( AttachmentServiceProxy::Core::~Core() { } +AttachmentStore* AttachmentServiceProxy::Core::GetStore() { + return NULL; +} + void AttachmentServiceProxy::Core::GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) { @@ -135,13 +129,12 @@ void AttachmentServiceProxy::Core::DropAttachments( wrapped_->DropAttachments(attachment_ids, callback); } -void AttachmentServiceProxy::Core::StoreAttachments( - const AttachmentList& attachments, - const StoreCallback& callback) { +void AttachmentServiceProxy::Core::UploadAttachments( + const AttachmentIdSet& attachment_ids) { if (!wrapped_) { return; } - wrapped_->StoreAttachments(attachments, callback); + wrapped_->UploadAttachments(attachment_ids); } } // namespace syncer diff --git a/sync/internal_api/attachments/attachment_service_proxy_unittest.cc b/sync/internal_api/attachments/attachment_service_proxy_unittest.cc index 545998d..a946382 100644 --- a/sync/internal_api/attachments/attachment_service_proxy_unittest.cc +++ b/sync/internal_api/attachments/attachment_service_proxy_unittest.cc @@ -34,6 +34,8 @@ class StubAttachmentService : public AttachmentService, virtual ~StubAttachmentService() {} + virtual AttachmentStore* GetStore() OVERRIDE { return NULL; } + virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) OVERRIDE { @@ -55,12 +57,10 @@ class StubAttachmentService : public AttachmentService, FROM_HERE, base::Bind(callback, AttachmentService::DROP_SUCCESS)); } - virtual void StoreAttachments(const AttachmentList& attachments, - const StoreCallback& callback) OVERRIDE { + virtual void UploadAttachments( + const AttachmentIdSet& attachments_ids) OVERRIDE { CalledOnValidThread(); Increment(); - base::MessageLoop::current()->PostTask( - FROM_HERE, base::Bind(callback, AttachmentService::STORE_SUCCESS)); } virtual base::WeakPtr<AttachmentService> AsWeakPtr() { @@ -105,11 +105,8 @@ class AttachmentServiceProxyTest : public testing::Test, base::Unretained(this)); callback_drop = base::Bind(&AttachmentServiceProxyTest::IncrementDrop, base::Unretained(this)); - callback_store = base::Bind(&AttachmentServiceProxyTest::IncrementStore, - base::Unretained(this)); count_callback_get_or_download = 0; count_callback_drop = 0; - count_callback_store = 0; } virtual void TearDown() @@ -136,12 +133,6 @@ class AttachmentServiceProxyTest : public testing::Test, ++count_callback_drop; } - // a StoreCallback - void IncrementStore(const AttachmentService::StoreResult&) { - CalledOnValidThread(); - ++count_callback_store; - } - void WaitForStubThread() { base::WaitableEvent done(false, false); stub_thread->message_loop()->PostTask( @@ -157,23 +148,24 @@ class AttachmentServiceProxyTest : public testing::Test, AttachmentService::GetOrDownloadCallback callback_get_or_download; AttachmentService::DropCallback callback_drop; - AttachmentService::StoreCallback callback_store; // number of times callback_get_or_download was invoked int count_callback_get_or_download; // number of times callback_drop was invoked int count_callback_drop; - // number of times callback_store was invoked - int count_callback_store; }; -// Verify that each of AttachmentServiceProxy's callback methods (those that -// take callbacks) are invoked on the stub and that the passed callbacks are -// invoked in this thread. -TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) { +TEST_F(AttachmentServiceProxyTest, GetStore) { + EXPECT_EQ(NULL, proxy->GetStore()); +} + +// Verify that each of AttachmentServiceProxy's methods are invoked on the stub. +// Verify that the methods that take callbacks invoke passed callbacks on this +// thread. +TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) { proxy->GetOrDownloadAttachments(AttachmentIdList(), callback_get_or_download); proxy->DropAttachments(AttachmentIdList(), callback_drop); - proxy->StoreAttachments(AttachmentList(), callback_store); + proxy->UploadAttachments(AttachmentIdSet()); // Wait for the posted calls to execute in the stub thread. WaitForStubThread(); EXPECT_EQ(3, stub->GetCallCount()); @@ -185,7 +177,6 @@ TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) { loop.RunUntilIdle(); EXPECT_EQ(1, count_callback_get_or_download); EXPECT_EQ(1, count_callback_drop); - EXPECT_EQ(1, count_callback_store); } // Verify that it's safe to use an AttachmentServiceProxy even after its wrapped diff --git a/sync/internal_api/public/attachments/attachment_service.h b/sync/internal_api/public/attachments/attachment_service.h index 6ac8cc6..758f960 100644 --- a/sync/internal_api/public/attachments/attachment_service.h +++ b/sync/internal_api/public/attachments/attachment_service.h @@ -14,6 +14,7 @@ namespace syncer { +class AttachmentStore; class SyncData; // AttachmentService is responsible for managing a model type's attachments. @@ -44,16 +45,6 @@ class SYNC_EXPORT AttachmentService { typedef base::Callback<void(const DropResult&)> DropCallback; - // The result of a StoreAttachments operation. - enum StoreResult { - STORE_SUCCESS, // No error, all attachments stored (at least - // locally). - STORE_UNSPECIFIED_ERROR, // An unspecified error occurred. Some or all - // attachments may not have been stored. - }; - - typedef base::Callback<void(const StoreResult&)> StoreCallback; - // An interface that embedder code implements to be notified about different // events that originate from AttachmentService. // This interface will be called from the same thread AttachmentService was @@ -70,6 +61,11 @@ class SYNC_EXPORT AttachmentService { AttachmentService(); virtual ~AttachmentService(); + // Return a pointer to the AttachmentStore owned by this object. + // + // May return NULL. + virtual AttachmentStore* GetStore() = 0; + // See SyncData::GetOrDownloadAttachments. virtual void GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, @@ -79,12 +75,18 @@ class SYNC_EXPORT AttachmentService { virtual void DropAttachments(const AttachmentIdList& attachment_ids, const DropCallback& callback) = 0; - // Store |attachments| on device and (eventually) upload them to the server. + // Schedules the attachments identified by |attachment_ids| to be uploaded to + // the server. + // + // Assumes the attachments are already in the attachment store. + // + // A request to upload attachments is persistent in that uploads will be + // automatically retried if transient errors occur. + // + // A request to upload attachments does not persist across restarts of Chrome. // - // Invokes |callback| once the attachments have been written to device - // storage. - virtual void StoreAttachments(const AttachmentList& attachments, - const StoreCallback& callback) = 0; + // Invokes OnAttachmentUploaded on the Delegate (if provided). + virtual void UploadAttachments(const AttachmentIdSet& attachment_ids) = 0; }; } // namespace syncer diff --git a/sync/internal_api/public/attachments/attachment_service_impl.h b/sync/internal_api/public/attachments/attachment_service_impl.h index 53f6889..1fa0c54 100644 --- a/sync/internal_api/public/attachments/attachment_service_impl.h +++ b/sync/internal_api/public/attachments/attachment_service_impl.h @@ -5,6 +5,8 @@ #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_ #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_ +#include <deque> + #include "base/memory/ref_counted.h" #include "base/memory/weak_ptr.h" #include "base/threading/non_thread_safe.h" @@ -43,13 +45,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, static scoped_ptr<syncer::AttachmentService> CreateForTest(); // AttachmentService implementation. - virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids, - const GetOrDownloadCallback& callback) - OVERRIDE; + virtual AttachmentStore* GetStore() OVERRIDE; + virtual void GetOrDownloadAttachments( + const AttachmentIdList& attachment_ids, + const GetOrDownloadCallback& callback) OVERRIDE; virtual void DropAttachments(const AttachmentIdList& attachment_ids, const DropCallback& callback) OVERRIDE; - virtual void StoreAttachments(const AttachmentList& attachments, - const StoreCallback& callback) OVERRIDE; + virtual void UploadAttachments( + const AttachmentIdSet& attachment_ids) OVERRIDE; private: class GetOrDownloadState; @@ -60,14 +63,17 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, scoped_ptr<AttachmentIdList> unavailable_attachment_ids); void DropDone(const DropCallback& callback, const AttachmentStore::Result& result); - void WriteDone(const StoreCallback& callback, - const AttachmentStore::Result& result); void UploadDone(const AttachmentUploader::UploadResult& result, const AttachmentId& attachment_id); void DownloadDone(const scoped_refptr<GetOrDownloadState>& state, const AttachmentId& attachment_id, const AttachmentDownloader::DownloadResult& result, scoped_ptr<Attachment> attachment); + void ProcessQueuedUploads(); + void ReadDoneNowUpload( + const AttachmentStore::Result& result, + scoped_ptr<AttachmentMap> attachments, + scoped_ptr<AttachmentIdList> unavailable_attachment_ids); const scoped_ptr<AttachmentStore> attachment_store_; @@ -80,6 +86,13 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, // May be null. Delegate* delegate_; + // Queue of attachment ids to be uploaded. Every entry in this queue should + // also exist in ids_in_queue_. + std::deque<AttachmentId> queue_; + + // Ids of attachments currently being uploaded or queued for upload. + AttachmentIdSet ids_in_queue_; + // Must be last data member. base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_; diff --git a/sync/internal_api/public/attachments/attachment_service_proxy.h b/sync/internal_api/public/attachments/attachment_service_proxy.h index fb7c08c..fa51457 100644 --- a/sync/internal_api/public/attachments/attachment_service_proxy.h +++ b/sync/internal_api/public/attachments/attachment_service_proxy.h @@ -51,13 +51,16 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService { virtual ~AttachmentServiceProxy(); // AttachmentService implementation. + // + // GetStore always returns NULL. + virtual AttachmentStore* GetStore() OVERRIDE; virtual void GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) OVERRIDE; virtual void DropAttachments(const AttachmentIdList& attachment_ids, const DropCallback& callback) OVERRIDE; - virtual void StoreAttachments(const AttachmentList& attachment, - const StoreCallback& callback) OVERRIDE; + virtual void UploadAttachments( + const AttachmentIdSet& attachment_ids) OVERRIDE; protected: // Core does the work of proxying calls to AttachmentService methods from one @@ -80,13 +83,14 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService { Core(const base::WeakPtr<syncer::AttachmentService>& wrapped); // AttachmentService implementation. + virtual AttachmentStore* GetStore() OVERRIDE; virtual void GetOrDownloadAttachments( const AttachmentIdList& attachment_ids, const GetOrDownloadCallback& callback) OVERRIDE; virtual void DropAttachments(const AttachmentIdList& attachment_ids, const DropCallback& callback) OVERRIDE; - virtual void StoreAttachments(const AttachmentList& attachment, - const StoreCallback& callback) OVERRIDE; + virtual void UploadAttachments( + const AttachmentIdSet& attachment_ids) OVERRIDE; protected: friend class base::RefCountedThreadSafe<Core>; |