summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaniscalco <maniscalco@chromium.org>2014-09-05 09:44:54 -0700
committerCommit bot <commit-bot@chromium.org>2014-09-05 16:47:12 +0000
commit5dde06853d1d5020796184c5fc749b9730ce8f07 (patch)
tree032fd16c70b02694c18ca634c0d44dba141c9c01
parent17267f48ca31c053628657ec7731bfb72f8ea8dc (diff)
downloadchromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.zip
chromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.tar.gz
chromium_src-5dde06853d1d5020796184c5fc749b9730ce8f07.tar.bz2
Replace AttachmentStore's StoreAttachments with UploadAttachments.
This change is lays groundwork for making attachment upload operations persistent. Make GenericChangeProcessor responsible for writing attachments to the AttachmentStore and calling UploadAttachments. In a future CL, datatype code will be responsible for writing attachments to the store. Queue up attachments for upload inside of AttachmentService. In a future CL we'll add rate limiting, retry, and back-off logic. Expose AttachmentService's AttachmentStore via GetStore method. BUG= Review URL: https://codereview.chromium.org/512413003 Cr-Commit-Position: refs/heads/master@{#293536}
-rw-r--r--components/sync_driver/generic_change_processor.cc65
-rw-r--r--components/sync_driver/generic_change_processor.h15
-rw-r--r--components/sync_driver/generic_change_processor_unittest.cc57
-rw-r--r--sync/internal_api/attachments/attachment_service_impl.cc94
-rw-r--r--sync/internal_api/attachments/attachment_service_impl_unittest.cc216
-rw-r--r--sync/internal_api/attachments/attachment_service_proxy.cc37
-rw-r--r--sync/internal_api/attachments/attachment_service_proxy_unittest.cc35
-rw-r--r--sync/internal_api/public/attachments/attachment_service.h32
-rw-r--r--sync/internal_api/public/attachments/attachment_service_impl.h27
-rw-r--r--sync/internal_api/public/attachments/attachment_service_proxy.h12
10 files changed, 354 insertions, 236 deletions
diff --git a/components/sync_driver/generic_change_processor.cc b/components/sync_driver/generic_change_processor.cc
index c44ce72..ddf6b67 100644
--- a/components/sync_driver/generic_change_processor.cc
+++ b/components/sync_driver/generic_change_processor.cc
@@ -85,6 +85,11 @@ syncer::SyncData BuildRemoteSyncData(
attachment_service_proxy);
}
+const syncer::AttachmentId& AttachmentToAttachmentId(
+ const syncer::Attachment& attachment) {
+ return attachment.GetId();
+}
+
} // namespace
GenericChangeProcessor::GenericChangeProcessor(
@@ -102,7 +107,8 @@ GenericChangeProcessor::GenericChangeProcessor(
attachment_service_weak_ptr_factory_(attachment_service_.get()),
attachment_service_proxy_(
base::MessageLoopProxy::current(),
- attachment_service_weak_ptr_factory_.GetWeakPtr()) {
+ attachment_service_weak_ptr_factory_.GetWeakPtr()),
+ weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread());
DCHECK(attachment_service_);
}
@@ -387,25 +393,6 @@ syncer::SyncError AttemptDelete(const syncer::SyncChange& change,
return syncer::SyncError();
}
-// A callback invoked on completion of AttachmentService::StoreAttachment.
-void IgnoreStoreResult(const syncer::AttachmentService::StoreResult&) {
- // TODO(maniscalco): Here is where we're going to update the in-directory
- // entry to indicate that the attachments have been successfully stored on
- // disk. Why do we care? Because we might crash after persisting the
- // directory to disk, but before we have persisted its attachments, leaving us
- // with danging attachment ids. Having a flag that indicates we've stored the
- // entry will allow us to detect and filter entries with dangling attachment
- // ids (bug 368353).
-}
-
-void StoreAttachments(syncer::AttachmentService* attachment_service,
- const syncer::AttachmentList& attachments) {
- DCHECK(attachment_service);
- syncer::AttachmentService::StoreCallback ignore_store_result =
- base::Bind(&IgnoreStoreResult);
- attachment_service->StoreAttachments(attachments, ignore_store_result);
-}
-
} // namespace
syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
@@ -465,7 +452,7 @@ syncer::SyncError GenericChangeProcessor::ProcessSyncChanges(
}
if (!new_attachments.empty()) {
- StoreAttachments(attachment_service_.get(), new_attachments);
+ StoreAndUploadAttachments(new_attachments);
}
return syncer::SyncError();
@@ -711,4 +698,40 @@ syncer::UserShare* GenericChangeProcessor::share_handle() const {
return share_handle_;
}
+void GenericChangeProcessor::StoreAndUploadAttachments(
+ const syncer::AttachmentList& attachments) {
+ DCHECK(CalledOnValidThread());
+ attachment_service_->GetStore()->Write(
+ attachments,
+ base::Bind(&GenericChangeProcessor::WriteAttachmentsDone,
+ weak_ptr_factory_.GetWeakPtr(),
+ attachments));
+}
+
+void GenericChangeProcessor::WriteAttachmentsDone(
+ const syncer::AttachmentList& attachments,
+ const syncer::AttachmentStore::Result& result) {
+ DCHECK(CalledOnValidThread());
+ if (result != syncer::AttachmentStore::SUCCESS) {
+ // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
+ return;
+ }
+
+ // TODO(maniscalco): Here is where we're going to update the in-directory
+ // entry to indicate that the attachments have been successfully stored on
+ // disk. Why do we care? Because we might crash after persisting the
+ // directory to disk, but before we have persisted its attachments, leaving us
+ // with danging attachment ids. Having a flag that indicates we've stored the
+ // entry will allow us to detect and filter entries with dangling attachment
+ // ids (bug 368353).
+
+ // Begin uploading the attachments now that they are safe on disk.
+ syncer::AttachmentIdSet attachment_ids;
+ std::transform(attachments.begin(),
+ attachments.end(),
+ std::inserter(attachment_ids, attachment_ids.end()),
+ AttachmentToAttachmentId);
+ attachment_service_->UploadAttachments(attachment_ids);
+}
+
} // namespace sync_driver
diff --git a/components/sync_driver/generic_change_processor.h b/components/sync_driver/generic_change_processor.h
index 1941600..a3139d8 100644
--- a/components/sync_driver/generic_change_processor.h
+++ b/components/sync_driver/generic_change_processor.h
@@ -13,6 +13,7 @@
#include "components/sync_driver/change_processor.h"
#include "components/sync_driver/data_type_controller.h"
#include "components/sync_driver/data_type_error_handler.h"
+#include "sync/api/attachments/attachment_store.h"
#include "sync/api/sync_change_processor.h"
#include "sync/api/sync_merge_result.h"
#include "sync/internal_api/public/attachments/attachment_service.h"
@@ -126,6 +127,18 @@ class GenericChangeProcessor : public ChangeProcessor,
syncer::WriteNode* sync_node,
syncer::AttachmentList* new_attachments);
+ // Store |attachments| locally then upload them to the sync server.
+ //
+ // Store and uploading are asynchronous operations. |WriteAttachmentsDone|
+ // will be invoked once the attachments have been stored on the local device.
+ void StoreAndUploadAttachments(const syncer::AttachmentList& attachments);
+
+ // Invoked once attachments have been stored locally.
+ //
+ // See also AttachmentStore::WriteCallback.
+ void WriteAttachmentsDone(const syncer::AttachmentList& attachments,
+ const syncer::AttachmentStore::Result& result);
+
// The SyncableService this change processor will forward changes on to.
const base::WeakPtr<syncer::SyncableService> local_service_;
@@ -155,6 +168,8 @@ class GenericChangeProcessor : public ChangeProcessor,
attachment_service_weak_ptr_factory_;
syncer::AttachmentServiceProxy attachment_service_proxy_;
+ base::WeakPtrFactory<GenericChangeProcessor> weak_ptr_factory_;
+
DISALLOW_COPY_AND_ASSIGN(GenericChangeProcessor);
};
diff --git a/components/sync_driver/generic_change_processor_unittest.cc b/components/sync_driver/generic_change_processor_unittest.cc
index bdd7266..f1b35e6 100644
--- a/components/sync_driver/generic_change_processor_unittest.cc
+++ b/components/sync_driver/generic_change_processor_unittest.cc
@@ -7,6 +7,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
#include "base/strings/stringprintf.h"
#include "components/sync_driver/data_type_error_handler_mock.h"
#include "components/sync_driver/sync_api_component_factory.h"
@@ -25,6 +26,7 @@
#include "sync/internal_api/public/user_share.h"
#include "sync/internal_api/public/write_node.h"
#include "sync/internal_api/public/write_transaction.h"
+#include "testing/gmock/include/gmock/gmock-matchers.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace sync_driver {
@@ -33,17 +35,17 @@ namespace {
const char kTestData[] = "some data";
-// A mock that keeps track of attachments passed to StoreAttachments.
+// A mock that keeps track of attachments passed to UploadAttachments.
class MockAttachmentService : public syncer::AttachmentServiceImpl {
public:
MockAttachmentService();
virtual ~MockAttachmentService();
- virtual void StoreAttachments(const syncer::AttachmentList& attachments,
- const StoreCallback& callback) OVERRIDE;
- std::vector<syncer::AttachmentList>* attachment_lists();
+ virtual void UploadAttachments(
+ const syncer::AttachmentIdSet& attachment_ids) OVERRIDE;
+ std::vector<syncer::AttachmentIdSet>* attachment_id_sets();
private:
- std::vector<syncer::AttachmentList> attachment_lists_;
+ std::vector<syncer::AttachmentIdSet> attachment_id_sets_;
};
MockAttachmentService::MockAttachmentService()
@@ -60,15 +62,15 @@ MockAttachmentService::MockAttachmentService()
MockAttachmentService::~MockAttachmentService() {
}
-void MockAttachmentService::StoreAttachments(
- const syncer::AttachmentList& attachments,
- const StoreCallback& callback) {
- attachment_lists_.push_back(attachments);
- AttachmentServiceImpl::StoreAttachments(attachments, callback);
+void MockAttachmentService::UploadAttachments(
+ const syncer::AttachmentIdSet& attachment_ids) {
+ attachment_id_sets_.push_back(attachment_ids);
+ AttachmentServiceImpl::UploadAttachments(attachment_ids);
}
-std::vector<syncer::AttachmentList>* MockAttachmentService::attachment_lists() {
- return &attachment_lists_;
+std::vector<syncer::AttachmentIdSet>*
+MockAttachmentService::attachment_id_sets() {
+ return &attachment_id_sets_;
}
// MockSyncApiComponentFactory needed to initialize GenericChangeProcessor and
@@ -161,6 +163,11 @@ class SyncGenericChangeProcessorTest : public testing::Test {
return mock_attachment_service_;
}
+ void RunLoop() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
+
private:
base::MessageLoopForUI loop_;
@@ -349,19 +356,20 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, attachments)));
ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
+ RunLoop();
// Check that the AttachmentService received the new attachments.
- ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
- const syncer::AttachmentList& attachments_added =
- mock_attachment_service()->attachment_lists()->front();
- ASSERT_EQ(attachments_added.size(), 2U);
- ASSERT_EQ(attachments_added[0].GetId(), attachments[0].GetId());
- ASSERT_EQ(attachments_added[1].GetId(), attachments[1].GetId());
+ ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U);
+ const syncer::AttachmentIdSet& attachments_added =
+ mock_attachment_service()->attachment_id_sets()->front();
+ ASSERT_THAT(attachments_added,
+ testing::UnorderedElementsAre(attachments[0].GetId(),
+ attachments[1].GetId()));
// Update the SyncData, replacing its two attachments with one new attachment.
syncer::AttachmentList new_attachments;
new_attachments.push_back(syncer::Attachment::Create(attachment_data));
- mock_attachment_service()->attachment_lists()->clear();
+ mock_attachment_service()->attachment_id_sets()->clear();
change_list.clear();
change_list.push_back(
syncer::SyncChange(FROM_HERE,
@@ -370,13 +378,14 @@ TEST_F(SyncGenericChangeProcessorTest,
tag, title, specifics, new_attachments)));
ASSERT_FALSE(
change_processor()->ProcessSyncChanges(FROM_HERE, change_list).IsSet());
+ RunLoop();
// Check that the AttachmentService received it.
- ASSERT_EQ(mock_attachment_service()->attachment_lists()->size(), 1U);
- const syncer::AttachmentList& new_attachments_added =
- mock_attachment_service()->attachment_lists()->front();
- ASSERT_EQ(new_attachments_added.size(), 1U);
- ASSERT_EQ(new_attachments_added[0].GetId(), new_attachments[0].GetId());
+ ASSERT_EQ(mock_attachment_service()->attachment_id_sets()->size(), 1U);
+ const syncer::AttachmentIdSet& new_attachments_added =
+ mock_attachment_service()->attachment_id_sets()->front();
+ ASSERT_THAT(new_attachments_added,
+ testing::UnorderedElementsAre(new_attachments[0].GetId()));
}
// Verify that after attachment is uploaded GenericChangeProcessor updates
diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc
index 61fb1c6..e9d3a82 100644
--- a/sync/internal_api/attachments/attachment_service_impl.cc
+++ b/sync/internal_api/attachments/attachment_service_impl.cc
@@ -4,6 +4,8 @@
#include "sync/internal_api/public/attachments/attachment_service_impl.h"
+#include <iterator>
+
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "base/thread_task_runner_handle.h"
@@ -141,6 +143,10 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
return attachment_service.Pass();
}
+AttachmentStore* AttachmentServiceImpl::GetStore() {
+ return attachment_store_.get();
+}
+
void AttachmentServiceImpl::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
@@ -163,25 +169,6 @@ void AttachmentServiceImpl::DropAttachments(
callback));
}
-void AttachmentServiceImpl::StoreAttachments(const AttachmentList& attachments,
- const StoreCallback& callback) {
- DCHECK(CalledOnValidThread());
- attachment_store_->Write(attachments,
- base::Bind(&AttachmentServiceImpl::WriteDone,
- weak_ptr_factory_.GetWeakPtr(),
- callback));
- if (attachment_uploader_.get()) {
- for (AttachmentList::const_iterator iter = attachments.begin();
- iter != attachments.end();
- ++iter) {
- attachment_uploader_->UploadAttachment(
- *iter,
- base::Bind(&AttachmentServiceImpl::UploadDone,
- weak_ptr_factory_.GetWeakPtr()));
- }
- }
-}
-
void AttachmentServiceImpl::ReadDone(
const scoped_refptr<GetOrDownloadState>& state,
const AttachmentStore::Result& result,
@@ -226,21 +213,10 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback,
base::Bind(callback, drop_result));
}
-void AttachmentServiceImpl::WriteDone(const StoreCallback& callback,
- const AttachmentStore::Result& result) {
- AttachmentService::StoreResult store_result =
- AttachmentService::STORE_UNSPECIFIED_ERROR;
- if (result == AttachmentStore::SUCCESS) {
- store_result = AttachmentService::STORE_SUCCESS;
- }
- // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
- base::MessageLoop::current()->PostTask(FROM_HERE,
- base::Bind(callback, store_result));
-}
-
void AttachmentServiceImpl::UploadDone(
const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id) {
+ ids_in_queue_.erase(attachment_id);
// TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
if (result != AttachmentUploader::UPLOAD_SUCCESS)
return;
@@ -261,4 +237,60 @@ void AttachmentServiceImpl::DownloadDone(
}
}
+void AttachmentServiceImpl::UploadAttachments(
+ const AttachmentIdSet& attachment_ids) {
+ DCHECK(CalledOnValidThread());
+ if (!attachment_uploader_.get()) {
+ return;
+ }
+
+ // Enqueue the attachment ids that aren't already in the queue.
+ AttachmentIdSet::const_iterator iter = attachment_ids.begin();
+ AttachmentIdSet::const_iterator end = attachment_ids.end();
+ for (; iter != end; ++iter) {
+ if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) {
+ queue_.push_back(*iter);
+ ids_in_queue_.insert(*iter);
+ }
+ }
+
+ ProcessQueuedUploads();
+}
+
+void AttachmentServiceImpl::ProcessQueuedUploads() {
+ DCHECK(CalledOnValidThread());
+ // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
+ // concurrent uploads and apply backoff on failure.
+ while (!queue_.empty()) {
+ const AttachmentId id = queue_.front();
+ queue_.pop_front();
+ AttachmentIdList attachment_ids;
+ attachment_ids.push_back(id);
+ attachment_store_->Read(
+ attachment_ids,
+ base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+}
+
+void AttachmentServiceImpl::ReadDoneNowUpload(
+ const AttachmentStore::Result& result,
+ scoped_ptr<AttachmentMap> attachments,
+ scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
+ DCHECK(CalledOnValidThread());
+ if (!unavailable_attachment_ids->empty()) {
+ // TODO(maniscalco): We failed to read some attachments. What should we do
+ // now?
+ }
+
+ AttachmentMap::const_iterator iter = attachments->begin();
+ AttachmentMap::const_iterator end = attachments->end();
+ for (; iter != end; ++iter) {
+ attachment_uploader_->UploadAttachment(
+ iter->second,
+ base::Bind(&AttachmentServiceImpl::UploadDone,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+}
+
} // namespace syncer
diff --git a/sync/internal_api/attachments/attachment_service_impl_unittest.cc b/sync/internal_api/attachments/attachment_service_impl_unittest.cc
index d8bf6b2..1b642e6 100644
--- a/sync/internal_api/attachments/attachment_service_impl_unittest.cc
+++ b/sync/internal_api/attachments/attachment_service_impl_unittest.cc
@@ -10,6 +10,7 @@
#include "base/run_loop.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
#include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
+#include "testing/gmock/include/gmock/gmock-matchers.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace syncer {
@@ -196,21 +197,12 @@ class AttachmentServiceImplTest : public testing::Test,
base::Unretained(this));
}
- AttachmentService::StoreCallback store_callback() {
- return base::Bind(&AttachmentServiceImplTest::StoreDone,
- base::Unretained(this));
- }
-
void DownloadDone(const AttachmentService::GetOrDownloadResult& result,
scoped_ptr<AttachmentMap> attachments) {
download_results_.push_back(result);
last_download_attachments_ = attachments.Pass();
}
- void StoreDone(const AttachmentService::StoreResult& result) {
- store_results_.push_back(result);
- }
-
void RunLoop() {
base::RunLoop run_loop;
run_loop.RunUntilIdle();
@@ -225,10 +217,6 @@ class AttachmentServiceImplTest : public testing::Test,
return *last_download_attachments_.get();
}
- const std::vector<AttachmentService::StoreResult>& store_results() const {
- return store_results_;
- }
-
MockAttachmentStore* store() { return attachment_store_.get(); }
MockAttachmentDownloader* downloader() {
@@ -253,10 +241,12 @@ class AttachmentServiceImplTest : public testing::Test,
std::vector<AttachmentService::GetOrDownloadResult> download_results_;
scoped_ptr<AttachmentMap> last_download_attachments_;
std::vector<AttachmentId> on_attachment_uploaded_list_;
-
- std::vector<AttachmentService::StoreResult> store_results_;
};
+TEST_F(AttachmentServiceImplTest, GetStore) {
+ EXPECT_EQ(store(), attachment_service()->GetStore());
+}
+
TEST_F(AttachmentServiceImplTest, GetOrDownload_EmptyAttachmentList) {
AttachmentIdList attachment_ids;
attachment_service()->GetOrDownloadAttachments(attachment_ids,
@@ -348,105 +338,151 @@ TEST_F(AttachmentServiceImplTest, GetOrDownload_NoDownloader) {
EXPECT_TRUE(last_download_attachments().empty());
}
-TEST_F(AttachmentServiceImplTest, StoreAttachments_Success) {
- scoped_refptr<base::RefCountedString> data = new base::RefCountedString();
- Attachment attachment(Attachment::Create(data));
- AttachmentList attachments;
- attachments.push_back(attachment);
- attachment_service()->StoreAttachments(attachments, store_callback());
- EXPECT_EQ(1U, store()->write_attachments.size());
- EXPECT_EQ(1U, uploader()->upload_requests.size());
+TEST_F(AttachmentServiceImplTest, UploadAttachments_Success) {
+ AttachmentIdSet attachment_ids;
+ const size_t num_attachments = 3;
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ attachment_ids.insert(AttachmentId::Create());
+ }
+ attachment_service()->UploadAttachments(attachment_ids);
+ RunLoop();
+ // See that the service has issued reads for the attachments, but not yet
+ // uploaded anything.
+ EXPECT_EQ(num_attachments, store()->read_ids.size());
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ store()->RespondToRead(attachment_ids);
+ }
- store()->RespondToWrite(AttachmentStore::SUCCESS);
- uploader()->RespondToUpload(attachment.GetId(),
- AttachmentUploader::UPLOAD_SUCCESS);
RunLoop();
- ASSERT_EQ(1U, store_results().size());
- EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]);
- ASSERT_EQ(1U, on_attachment_uploaded_list().size());
- EXPECT_EQ(attachment.GetId(), on_attachment_uploaded_list()[0]);
+ EXPECT_EQ(0U, store()->read_ids.size());
+ EXPECT_EQ(num_attachments, uploader()->upload_requests.size());
+ AttachmentIdSet::const_iterator iter = attachment_ids.begin();
+ const AttachmentIdSet::const_iterator end = attachment_ids.end();
+ for (; iter != end; ++iter) {
+ uploader()->RespondToUpload(*iter, AttachmentUploader::UPLOAD_SUCCESS);
+ }
+ RunLoop();
+
+ // See that all the attachments were uploaded.
+ ASSERT_EQ(attachment_ids.size(), on_attachment_uploaded_list().size());
+ for (iter = attachment_ids.begin(); iter != end; ++iter) {
+ EXPECT_THAT(on_attachment_uploaded_list(), testing::Contains(*iter));
+ }
}
-TEST_F(AttachmentServiceImplTest,
- StoreAttachments_StoreFailsWithUnspecifiedError) {
- scoped_refptr<base::RefCountedString> data = new base::RefCountedString();
- Attachment attachment(Attachment::Create(data));
- AttachmentList attachments;
- attachments.push_back(attachment);
- attachment_service()->StoreAttachments(attachments, store_callback());
- EXPECT_EQ(1U, store()->write_attachments.size());
- EXPECT_EQ(1U, uploader()->upload_requests.size());
+TEST_F(AttachmentServiceImplTest, UploadAttachments_Success_NoDelegate) {
+ InitializeAttachmentService(make_scoped_ptr(new MockAttachmentUploader()),
+ make_scoped_ptr(new MockAttachmentDownloader()),
+ NULL); // No delegate.
- store()->RespondToWrite(AttachmentStore::UNSPECIFIED_ERROR);
- uploader()->RespondToUpload(attachment.GetId(),
+ AttachmentIdSet attachment_ids;
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_service()->UploadAttachments(attachment_ids);
+ RunLoop();
+ EXPECT_EQ(1U, store()->read_ids.size());
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
+ store()->RespondToRead(attachment_ids);
+ RunLoop();
+ EXPECT_EQ(0U, store()->read_ids.size());
+ EXPECT_EQ(1U, uploader()->upload_requests.size());
+ uploader()->RespondToUpload(*attachment_ids.begin(),
AttachmentUploader::UPLOAD_SUCCESS);
RunLoop();
- ASSERT_EQ(1U, store_results().size());
- EXPECT_EQ(AttachmentService::STORE_UNSPECIFIED_ERROR, store_results()[0]);
- ASSERT_EQ(1U, on_attachment_uploaded_list().size());
- EXPECT_EQ(attachment.GetId(), on_attachment_uploaded_list()[0]);
+ ASSERT_TRUE(on_attachment_uploaded_list().empty());
}
-TEST_F(AttachmentServiceImplTest,
- StoreAttachments_UploadFailsWithUnspecifiedError) {
- scoped_refptr<base::RefCountedString> data = new base::RefCountedString();
- Attachment attachment(Attachment::Create(data));
- AttachmentList attachments;
- attachments.push_back(attachment);
- attachment_service()->StoreAttachments(attachments, store_callback());
- EXPECT_EQ(1U, store()->write_attachments.size());
- EXPECT_EQ(1U, uploader()->upload_requests.size());
+TEST_F(AttachmentServiceImplTest, UploadAttachments_SomeMissingFromStore) {
+ AttachmentIdSet attachment_ids;
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_ids.insert(AttachmentId::Create());
- store()->RespondToWrite(AttachmentStore::SUCCESS);
- uploader()->RespondToUpload(attachment.GetId(),
- AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR);
+ attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
- ASSERT_EQ(1U, store_results().size());
- // Even though the upload failed, the Store operation is successful.
- EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]);
- EXPECT_TRUE(on_attachment_uploaded_list().empty());
+ EXPECT_EQ(2U, store()->read_ids.size());
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
+ store()->RespondToRead(attachment_ids);
+ EXPECT_EQ(1U, store()->read_ids.size());
+ // Not found!
+ store()->RespondToRead(AttachmentIdSet());
+ EXPECT_EQ(0U, store()->read_ids.size());
+ RunLoop();
+
+ // One attachment went missing so we should see only one upload request.
+ EXPECT_EQ(1U, uploader()->upload_requests.size());
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ AttachmentUploader::UPLOAD_SUCCESS);
+ RunLoop();
+
+ // See that the delegate was called for only one.
+ ASSERT_EQ(1U, on_attachment_uploaded_list().size());
}
-TEST_F(AttachmentServiceImplTest, StoreAttachments_NoDelegate) {
- InitializeAttachmentService(make_scoped_ptr(new MockAttachmentUploader()),
- make_scoped_ptr(new MockAttachmentDownloader()),
- NULL); // No delegate.
+TEST_F(AttachmentServiceImplTest, UploadAttachments_AllMissingFromStore) {
+ AttachmentIdSet attachment_ids;
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_ids.insert(AttachmentId::Create());
- scoped_refptr<base::RefCountedString> data = new base::RefCountedString();
- Attachment attachment(Attachment::Create(data));
- AttachmentList attachments;
- attachments.push_back(attachment);
- attachment_service()->StoreAttachments(attachments, store_callback());
- EXPECT_EQ(1U, store()->write_attachments.size());
- EXPECT_EQ(1U, uploader()->upload_requests.size());
+ attachment_service()->UploadAttachments(attachment_ids);
+ RunLoop();
+ EXPECT_EQ(2U, store()->read_ids.size());
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
+ // None found!
+ store()->RespondToRead(AttachmentIdSet());
+ store()->RespondToRead(AttachmentIdSet());
+ EXPECT_EQ(0U, store()->read_ids.size());
+ RunLoop();
- store()->RespondToWrite(AttachmentStore::SUCCESS);
- uploader()->RespondToUpload(attachment.GetId(),
- AttachmentUploader::UPLOAD_SUCCESS);
+ // Nothing uploaded.
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
RunLoop();
- ASSERT_EQ(1U, store_results().size());
- EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]);
- EXPECT_TRUE(on_attachment_uploaded_list().empty());
+
+ // See that the delegate was never called.
+ ASSERT_EQ(0U, on_attachment_uploaded_list().size());
}
-TEST_F(AttachmentServiceImplTest, StoreAttachments_NoUploader) {
- // No uploader.
+TEST_F(AttachmentServiceImplTest, UploadAttachments_NoUploader) {
InitializeAttachmentService(make_scoped_ptr<MockAttachmentUploader>(NULL),
make_scoped_ptr(new MockAttachmentDownloader()),
this);
- scoped_refptr<base::RefCountedString> data = new base::RefCountedString();
- Attachment attachment(Attachment::Create(data));
- AttachmentList attachments;
- attachments.push_back(attachment);
- attachment_service()->StoreAttachments(attachments, store_callback());
- EXPECT_EQ(1U, store()->write_attachments.size());
+ AttachmentIdSet attachment_ids;
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_service()->UploadAttachments(attachment_ids);
+ RunLoop();
+ EXPECT_EQ(0U, store()->read_ids.size());
+ ASSERT_EQ(0U, on_attachment_uploaded_list().size());
+}
+
+// Upload three attachments. For one of them, server responds with error.
+TEST_F(AttachmentServiceImplTest, UploadAttachments_OneUploadFails) {
+ AttachmentIdSet attachment_ids;
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_ids.insert(AttachmentId::Create());
+ attachment_ids.insert(AttachmentId::Create());
+
+ attachment_service()->UploadAttachments(attachment_ids);
+ RunLoop();
+ EXPECT_EQ(3U, store()->read_ids.size());
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
- store()->RespondToWrite(AttachmentStore::SUCCESS);
+ // All attachments found.
+ store()->RespondToRead(attachment_ids);
+ store()->RespondToRead(attachment_ids);
+ store()->RespondToRead(attachment_ids);
RunLoop();
- ASSERT_EQ(1U, store_results().size());
- EXPECT_EQ(AttachmentService::STORE_SUCCESS, store_results()[0]);
- EXPECT_TRUE(on_attachment_uploaded_list().empty());
+
+ EXPECT_EQ(3U, uploader()->upload_requests.size());
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ AttachmentUploader::UPLOAD_SUCCESS);
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR);
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ AttachmentUploader::UPLOAD_SUCCESS);
+ EXPECT_EQ(0U, uploader()->upload_requests.size());
+ RunLoop();
+
+ EXPECT_EQ(2U, on_attachment_uploaded_list().size());
}
} // namespace syncer
diff --git a/sync/internal_api/attachments/attachment_service_proxy.cc b/sync/internal_api/attachments/attachment_service_proxy.cc
index 1d41878..ef858dd 100644
--- a/sync/internal_api/attachments/attachment_service_proxy.cc
+++ b/sync/internal_api/attachments/attachment_service_proxy.cc
@@ -35,15 +35,6 @@ void ProxyDropCallback(
task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
}
-// Invokes |callback| with |result| and |attachments| in the |task_runner|
-// thread.
-void ProxyStoreCallback(
- const scoped_refptr<base::SequencedTaskRunner>& task_runner,
- const AttachmentService::StoreCallback& callback,
- const AttachmentService::StoreResult& result) {
- task_runner->PostTask(FROM_HERE, base::Bind(callback, result));
-}
-
} // namespace
AttachmentServiceProxy::AttachmentServiceProxy() {
@@ -67,6 +58,10 @@ AttachmentServiceProxy::AttachmentServiceProxy(
AttachmentServiceProxy::~AttachmentServiceProxy() {
}
+AttachmentStore* AttachmentServiceProxy::GetStore() {
+ return NULL;
+}
+
void AttachmentServiceProxy::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
@@ -96,17 +91,12 @@ void AttachmentServiceProxy::DropAttachments(
proxy_callback));
}
-void AttachmentServiceProxy::StoreAttachments(const AttachmentList& attachments,
- const StoreCallback& callback) {
- DCHECK(wrapped_task_runner_.get());
- StoreCallback proxy_callback = base::Bind(
- &ProxyStoreCallback, base::ThreadTaskRunnerHandle::Get(), callback);
+void AttachmentServiceProxy::UploadAttachments(
+ const AttachmentIdSet& attachment_ids) {
+ DCHECK(wrapped_task_runner_);
wrapped_task_runner_->PostTask(
FROM_HERE,
- base::Bind(&AttachmentService::StoreAttachments,
- core_,
- attachments,
- proxy_callback));
+ base::Bind(&AttachmentService::UploadAttachments, core_, attachment_ids));
}
AttachmentServiceProxy::Core::Core(
@@ -117,6 +107,10 @@ AttachmentServiceProxy::Core::Core(
AttachmentServiceProxy::Core::~Core() {
}
+AttachmentStore* AttachmentServiceProxy::Core::GetStore() {
+ return NULL;
+}
+
void AttachmentServiceProxy::Core::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
@@ -135,13 +129,12 @@ void AttachmentServiceProxy::Core::DropAttachments(
wrapped_->DropAttachments(attachment_ids, callback);
}
-void AttachmentServiceProxy::Core::StoreAttachments(
- const AttachmentList& attachments,
- const StoreCallback& callback) {
+void AttachmentServiceProxy::Core::UploadAttachments(
+ const AttachmentIdSet& attachment_ids) {
if (!wrapped_) {
return;
}
- wrapped_->StoreAttachments(attachments, callback);
+ wrapped_->UploadAttachments(attachment_ids);
}
} // namespace syncer
diff --git a/sync/internal_api/attachments/attachment_service_proxy_unittest.cc b/sync/internal_api/attachments/attachment_service_proxy_unittest.cc
index 545998d..a946382 100644
--- a/sync/internal_api/attachments/attachment_service_proxy_unittest.cc
+++ b/sync/internal_api/attachments/attachment_service_proxy_unittest.cc
@@ -34,6 +34,8 @@ class StubAttachmentService : public AttachmentService,
virtual ~StubAttachmentService() {}
+ virtual AttachmentStore* GetStore() OVERRIDE { return NULL; }
+
virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback)
OVERRIDE {
@@ -55,12 +57,10 @@ class StubAttachmentService : public AttachmentService,
FROM_HERE, base::Bind(callback, AttachmentService::DROP_SUCCESS));
}
- virtual void StoreAttachments(const AttachmentList& attachments,
- const StoreCallback& callback) OVERRIDE {
+ virtual void UploadAttachments(
+ const AttachmentIdSet& attachments_ids) OVERRIDE {
CalledOnValidThread();
Increment();
- base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(callback, AttachmentService::STORE_SUCCESS));
}
virtual base::WeakPtr<AttachmentService> AsWeakPtr() {
@@ -105,11 +105,8 @@ class AttachmentServiceProxyTest : public testing::Test,
base::Unretained(this));
callback_drop = base::Bind(&AttachmentServiceProxyTest::IncrementDrop,
base::Unretained(this));
- callback_store = base::Bind(&AttachmentServiceProxyTest::IncrementStore,
- base::Unretained(this));
count_callback_get_or_download = 0;
count_callback_drop = 0;
- count_callback_store = 0;
}
virtual void TearDown()
@@ -136,12 +133,6 @@ class AttachmentServiceProxyTest : public testing::Test,
++count_callback_drop;
}
- // a StoreCallback
- void IncrementStore(const AttachmentService::StoreResult&) {
- CalledOnValidThread();
- ++count_callback_store;
- }
-
void WaitForStubThread() {
base::WaitableEvent done(false, false);
stub_thread->message_loop()->PostTask(
@@ -157,23 +148,24 @@ class AttachmentServiceProxyTest : public testing::Test,
AttachmentService::GetOrDownloadCallback callback_get_or_download;
AttachmentService::DropCallback callback_drop;
- AttachmentService::StoreCallback callback_store;
// number of times callback_get_or_download was invoked
int count_callback_get_or_download;
// number of times callback_drop was invoked
int count_callback_drop;
- // number of times callback_store was invoked
- int count_callback_store;
};
-// Verify that each of AttachmentServiceProxy's callback methods (those that
-// take callbacks) are invoked on the stub and that the passed callbacks are
-// invoked in this thread.
-TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) {
+TEST_F(AttachmentServiceProxyTest, GetStore) {
+ EXPECT_EQ(NULL, proxy->GetStore());
+}
+
+// Verify that each of AttachmentServiceProxy's methods are invoked on the stub.
+// Verify that the methods that take callbacks invoke passed callbacks on this
+// thread.
+TEST_F(AttachmentServiceProxyTest, MethodsAreProxied) {
proxy->GetOrDownloadAttachments(AttachmentIdList(), callback_get_or_download);
proxy->DropAttachments(AttachmentIdList(), callback_drop);
- proxy->StoreAttachments(AttachmentList(), callback_store);
+ proxy->UploadAttachments(AttachmentIdSet());
// Wait for the posted calls to execute in the stub thread.
WaitForStubThread();
EXPECT_EQ(3, stub->GetCallCount());
@@ -185,7 +177,6 @@ TEST_F(AttachmentServiceProxyTest, MethodsWithCallbacksAreProxied) {
loop.RunUntilIdle();
EXPECT_EQ(1, count_callback_get_or_download);
EXPECT_EQ(1, count_callback_drop);
- EXPECT_EQ(1, count_callback_store);
}
// Verify that it's safe to use an AttachmentServiceProxy even after its wrapped
diff --git a/sync/internal_api/public/attachments/attachment_service.h b/sync/internal_api/public/attachments/attachment_service.h
index 6ac8cc6..758f960 100644
--- a/sync/internal_api/public/attachments/attachment_service.h
+++ b/sync/internal_api/public/attachments/attachment_service.h
@@ -14,6 +14,7 @@
namespace syncer {
+class AttachmentStore;
class SyncData;
// AttachmentService is responsible for managing a model type's attachments.
@@ -44,16 +45,6 @@ class SYNC_EXPORT AttachmentService {
typedef base::Callback<void(const DropResult&)> DropCallback;
- // The result of a StoreAttachments operation.
- enum StoreResult {
- STORE_SUCCESS, // No error, all attachments stored (at least
- // locally).
- STORE_UNSPECIFIED_ERROR, // An unspecified error occurred. Some or all
- // attachments may not have been stored.
- };
-
- typedef base::Callback<void(const StoreResult&)> StoreCallback;
-
// An interface that embedder code implements to be notified about different
// events that originate from AttachmentService.
// This interface will be called from the same thread AttachmentService was
@@ -70,6 +61,11 @@ class SYNC_EXPORT AttachmentService {
AttachmentService();
virtual ~AttachmentService();
+ // Return a pointer to the AttachmentStore owned by this object.
+ //
+ // May return NULL.
+ virtual AttachmentStore* GetStore() = 0;
+
// See SyncData::GetOrDownloadAttachments.
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
@@ -79,12 +75,18 @@ class SYNC_EXPORT AttachmentService {
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) = 0;
- // Store |attachments| on device and (eventually) upload them to the server.
+ // Schedules the attachments identified by |attachment_ids| to be uploaded to
+ // the server.
+ //
+ // Assumes the attachments are already in the attachment store.
+ //
+ // A request to upload attachments is persistent in that uploads will be
+ // automatically retried if transient errors occur.
+ //
+ // A request to upload attachments does not persist across restarts of Chrome.
//
- // Invokes |callback| once the attachments have been written to device
- // storage.
- virtual void StoreAttachments(const AttachmentList& attachments,
- const StoreCallback& callback) = 0;
+ // Invokes OnAttachmentUploaded on the Delegate (if provided).
+ virtual void UploadAttachments(const AttachmentIdSet& attachment_ids) = 0;
};
} // namespace syncer
diff --git a/sync/internal_api/public/attachments/attachment_service_impl.h b/sync/internal_api/public/attachments/attachment_service_impl.h
index 53f6889..1fa0c54 100644
--- a/sync/internal_api/public/attachments/attachment_service_impl.h
+++ b/sync/internal_api/public/attachments/attachment_service_impl.h
@@ -5,6 +5,8 @@
#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
+#include <deque>
+
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
@@ -43,13 +45,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
static scoped_ptr<syncer::AttachmentService> CreateForTest();
// AttachmentService implementation.
- virtual void GetOrDownloadAttachments(const AttachmentIdList& attachment_ids,
- const GetOrDownloadCallback& callback)
- OVERRIDE;
+ virtual AttachmentStore* GetStore() OVERRIDE;
+ virtual void GetOrDownloadAttachments(
+ const AttachmentIdList& attachment_ids,
+ const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
- virtual void StoreAttachments(const AttachmentList& attachments,
- const StoreCallback& callback) OVERRIDE;
+ virtual void UploadAttachments(
+ const AttachmentIdSet& attachment_ids) OVERRIDE;
private:
class GetOrDownloadState;
@@ -60,14 +63,17 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
void DropDone(const DropCallback& callback,
const AttachmentStore::Result& result);
- void WriteDone(const StoreCallback& callback,
- const AttachmentStore::Result& result);
void UploadDone(const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id);
void DownloadDone(const scoped_refptr<GetOrDownloadState>& state,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment);
+ void ProcessQueuedUploads();
+ void ReadDoneNowUpload(
+ const AttachmentStore::Result& result,
+ scoped_ptr<AttachmentMap> attachments,
+ scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
const scoped_ptr<AttachmentStore> attachment_store_;
@@ -80,6 +86,13 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// May be null.
Delegate* delegate_;
+ // Queue of attachment ids to be uploaded. Every entry in this queue should
+ // also exist in ids_in_queue_.
+ std::deque<AttachmentId> queue_;
+
+ // Ids of attachments currently being uploaded or queued for upload.
+ AttachmentIdSet ids_in_queue_;
+
// Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
diff --git a/sync/internal_api/public/attachments/attachment_service_proxy.h b/sync/internal_api/public/attachments/attachment_service_proxy.h
index fb7c08c..fa51457 100644
--- a/sync/internal_api/public/attachments/attachment_service_proxy.h
+++ b/sync/internal_api/public/attachments/attachment_service_proxy.h
@@ -51,13 +51,16 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
virtual ~AttachmentServiceProxy();
// AttachmentService implementation.
+ //
+ // GetStore always returns NULL.
+ virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
- virtual void StoreAttachments(const AttachmentList& attachment,
- const StoreCallback& callback) OVERRIDE;
+ virtual void UploadAttachments(
+ const AttachmentIdSet& attachment_ids) OVERRIDE;
protected:
// Core does the work of proxying calls to AttachmentService methods from one
@@ -80,13 +83,14 @@ class SYNC_EXPORT AttachmentServiceProxy : public AttachmentService {
Core(const base::WeakPtr<syncer::AttachmentService>& wrapped);
// AttachmentService implementation.
+ virtual AttachmentStore* GetStore() OVERRIDE;
virtual void GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) OVERRIDE;
virtual void DropAttachments(const AttachmentIdList& attachment_ids,
const DropCallback& callback) OVERRIDE;
- virtual void StoreAttachments(const AttachmentList& attachment,
- const StoreCallback& callback) OVERRIDE;
+ virtual void UploadAttachments(
+ const AttachmentIdSet& attachment_ids) OVERRIDE;
protected:
friend class base::RefCountedThreadSafe<Core>;