summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--chrome/browser/sync/profile_sync_components_factory_impl.cc12
-rw-r--r--components/sync_driver/generic_change_processor_unittest.cc4
-rw-r--r--sync/internal_api/attachments/attachment_downloader_impl.cc21
-rw-r--r--sync/internal_api/attachments/attachment_downloader_impl_unittest.cc6
-rw-r--r--sync/internal_api/attachments/attachment_service_impl.cc92
-rw-r--r--sync/internal_api/attachments/attachment_service_impl_unittest.cc122
-rw-r--r--sync/internal_api/attachments/attachment_uploader_impl.cc29
-rw-r--r--sync/internal_api/attachments/attachment_uploader_impl_unittest.cc41
-rw-r--r--sync/internal_api/attachments/task_queue.cc5
-rw-r--r--sync/internal_api/attachments/task_queue_unittest.cc197
-rw-r--r--sync/internal_api/public/attachments/attachment_downloader.h1
-rw-r--r--sync/internal_api/public/attachments/attachment_service_impl.h34
-rw-r--r--sync/internal_api/public/attachments/attachment_uploader.h1
-rw-r--r--sync/internal_api/public/attachments/task_queue.h270
-rw-r--r--sync/sync.gyp2
-rw-r--r--sync/sync_tests.gypi1
16 files changed, 700 insertions, 138 deletions
diff --git a/chrome/browser/sync/profile_sync_components_factory_impl.cc b/chrome/browser/sync/profile_sync_components_factory_impl.cc
index 09b2d08..d68f582 100644
--- a/chrome/browser/sync/profile_sync_components_factory_impl.cc
+++ b/chrome/browser/sync/profile_sync_components_factory_impl.cc
@@ -630,12 +630,20 @@ ProfileSyncComponentsFactoryImpl::CreateAttachmentService(
token_service_provider);
}
+ // It is important that the initial backoff delay is relatively large. For
+ // whatever reason, the server may fail all requests for a short period of
+ // time. When this happens we don't want to overwhelm the server with
+ // requests so we use a large initial backoff.
+ const base::TimeDelta initial_backoff_delay =
+ base::TimeDelta::FromMinutes(30);
+ const base::TimeDelta max_backoff_delay = base::TimeDelta::FromHours(4);
scoped_ptr<syncer::AttachmentService> attachment_service(
new syncer::AttachmentServiceImpl(attachment_store,
attachment_uploader.Pass(),
attachment_downloader.Pass(),
- delegate));
-
+ delegate,
+ initial_backoff_delay,
+ max_backoff_delay));
return attachment_service.Pass();
}
diff --git a/components/sync_driver/generic_change_processor_unittest.cc b/components/sync_driver/generic_change_processor_unittest.cc
index 73bd92d..9ea4e43 100644
--- a/components/sync_driver/generic_change_processor_unittest.cc
+++ b/components/sync_driver/generic_change_processor_unittest.cc
@@ -56,7 +56,9 @@ MockAttachmentService::MockAttachmentService(
new syncer::FakeAttachmentUploader),
scoped_ptr<syncer::AttachmentDownloader>(
new syncer::FakeAttachmentDownloader),
- NULL) {
+ NULL,
+ base::TimeDelta(),
+ base::TimeDelta()) {
}
MockAttachmentService::~MockAttachmentService() {
diff --git a/sync/internal_api/attachments/attachment_downloader_impl.cc b/sync/internal_api/attachments/attachment_downloader_impl.cc
index d43b3ea..a7a43b4 100644
--- a/sync/internal_api/attachments/attachment_downloader_impl.cc
+++ b/sync/internal_api/attachments/attachment_downloader_impl.cc
@@ -115,7 +115,7 @@ void AttachmentDownloaderImpl::OnGetTokenFailure(
DownloadState* download_state = *iter;
scoped_refptr<base::RefCountedString> null_attachment_data;
ReportResult(
- *download_state, DOWNLOAD_UNSPECIFIED_ERROR, null_attachment_data);
+ *download_state, DOWNLOAD_TRANSIENT_ERROR, null_attachment_data);
DCHECK(state_map_.find(download_state->attachment_url) != state_map_.end());
state_map_.erase(download_state->attachment_url);
}
@@ -133,22 +133,28 @@ void AttachmentDownloaderImpl::OnURLFetchComplete(
const DownloadState& download_state = *iter->second;
DCHECK(source == download_state.url_fetcher.get());
- DownloadResult result = DOWNLOAD_UNSPECIFIED_ERROR;
+ DownloadResult result = DOWNLOAD_TRANSIENT_ERROR;
scoped_refptr<base::RefCountedString> attachment_data;
- if (source->GetResponseCode() == net::HTTP_OK) {
+ const int response_code = source->GetResponseCode();
+ if (response_code == net::HTTP_OK) {
result = DOWNLOAD_SUCCESS;
std::string data_as_string;
source->GetResponseAsString(&data_as_string);
attachment_data = base::RefCountedString::TakeString(&data_as_string);
- } else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) {
+ } else if (response_code == net::HTTP_UNAUTHORIZED) {
+ // Server tells us we've got a bad token so invalidate it.
OAuth2TokenServiceRequest::InvalidateToken(token_service_provider_.get(),
account_id_,
oauth2_scopes_,
download_state.access_token);
- // TODO(pavely): crbug/380437. This is transient error. Request new access
- // token for this DownloadState. The only trick is to do it with exponential
- // backoff.
+ // Fail the request, but indicate that it may be successful if retried.
+ result = DOWNLOAD_TRANSIENT_ERROR;
+ } else if (response_code == net::HTTP_FORBIDDEN) {
+ // User is not allowed to use attachments. Retrying won't help.
+ result = DOWNLOAD_UNSPECIFIED_ERROR;
+ } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
+ result = DOWNLOAD_TRANSIENT_ERROR;
}
ReportResult(download_state, result, attachment_data);
state_map_.erase(iter);
@@ -159,6 +165,7 @@ scoped_ptr<net::URLFetcher> AttachmentDownloaderImpl::CreateFetcher(
const std::string& access_token) {
scoped_ptr<net::URLFetcher> url_fetcher(
net::URLFetcher::Create(GURL(url), net::URLFetcher::GET, this));
+ url_fetcher->SetAutomaticallyRetryOn5xx(false);
const std::string auth_header("Authorization: Bearer " + access_token);
url_fetcher->AddExtraRequestHeader(auth_header);
url_fetcher->SetRequestContext(url_request_context_getter_.get());
diff --git a/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc b/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc
index 01b12c1..ee57a5b 100644
--- a/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc
+++ b/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc
@@ -319,7 +319,7 @@ TEST_F(AttachmentDownloaderImplTest, RequestAccessTokenFails) {
GoogleServiceAuthError(GoogleServiceAuthError::INVALID_GAIA_CREDENTIALS));
RunMessageLoop();
// Only id2 should fail.
- VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
+ VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
// Complete request for id1.
CompleteDownload(net::HTTP_OK);
VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_SUCCESS);
@@ -337,7 +337,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_BadToken) {
// invalidation.
CompleteDownload(net::HTTP_UNAUTHORIZED);
EXPECT_EQ(1, token_service()->num_invalidate_token());
- VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
+ VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
}
TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) {
@@ -352,7 +352,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) {
// shouldn't be invalidated.
CompleteDownload(net::HTTP_SERVICE_UNAVAILABLE);
EXPECT_EQ(0, token_service()->num_invalidate_token());
- VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR);
+ VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR);
}
} // namespace syncer
diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc
index 94ffead..5ba540b 100644
--- a/sync/internal_api/attachments/attachment_service_impl.cc
+++ b/sync/internal_api/attachments/attachment_service_impl.cc
@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "base/thread_task_runner_handle.h"
+#include "base/time/time.h"
#include "sync/api/attachments/attachment.h"
#include "sync/api/attachments/fake_attachment_store.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
@@ -113,7 +114,9 @@ AttachmentServiceImpl::AttachmentServiceImpl(
scoped_refptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
- Delegate* delegate)
+ Delegate* delegate,
+ const base::TimeDelta& initial_backoff_delay,
+ const base::TimeDelta& max_backoff_delay)
: attachment_store_(attachment_store),
attachment_uploader_(attachment_uploader.Pass()),
attachment_downloader_(attachment_downloader.Pass()),
@@ -121,6 +124,16 @@ AttachmentServiceImpl::AttachmentServiceImpl(
weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread());
DCHECK(attachment_store_.get());
+
+ // TODO(maniscalco): Observe network connectivity change events. When the
+ // network becomes disconnected, consider suspending queue dispatch. When
+ // connectivity is restored, consider clearing any dispatch backoff (bug
+ // 411981).
+ upload_task_queue_.reset(new TaskQueue<AttachmentId>(
+ base::Bind(&AttachmentServiceImpl::BeginUpload,
+ weak_ptr_factory_.GetWeakPtr()),
+ initial_backoff_delay,
+ max_backoff_delay));
}
AttachmentServiceImpl::~AttachmentServiceImpl() {
@@ -139,7 +152,9 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
new syncer::AttachmentServiceImpl(attachment_store,
attachment_uploader.Pass(),
attachment_downloader.Pass(),
- NULL));
+ NULL,
+ base::TimeDelta(),
+ base::TimeDelta()));
return attachment_service.Pass();
}
@@ -216,12 +231,22 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback,
void AttachmentServiceImpl::UploadDone(
const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id) {
- ids_in_queue_.erase(attachment_id);
- // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
- if (result != AttachmentUploader::UPLOAD_SUCCESS)
- return;
- if (delegate_) {
- delegate_->OnAttachmentUploaded(attachment_id);
+ DCHECK(CalledOnValidThread());
+ switch (result) {
+ case AttachmentUploader::UPLOAD_SUCCESS:
+ upload_task_queue_->MarkAsSucceeded(attachment_id);
+ if (delegate_) {
+ delegate_->OnAttachmentUploaded(attachment_id);
+ }
+ break;
+ case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
+ upload_task_queue_->MarkAsFailed(attachment_id);
+ upload_task_queue_->AddToQueue(attachment_id);
+ break;
+ case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
+ // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
+ upload_task_queue_->MarkAsFailed(attachment_id);
+ break;
}
}
@@ -230,46 +255,36 @@ void AttachmentServiceImpl::DownloadDone(
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment) {
- if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) {
- state->AddAttachment(*attachment.get());
- } else {
- state->AddUnavailableAttachmentId(attachment_id);
+ switch (result) {
+ case AttachmentDownloader::DOWNLOAD_SUCCESS:
+ state->AddAttachment(*attachment.get());
+ break;
+ case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
+ case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
+ state->AddUnavailableAttachmentId(attachment_id);
+ break;
}
}
+void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
+ DCHECK(CalledOnValidThread());
+ AttachmentIdList attachment_ids;
+ attachment_ids.push_back(attachment_id);
+ attachment_store_->Read(attachment_ids,
+ base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
void AttachmentServiceImpl::UploadAttachments(
const AttachmentIdSet& attachment_ids) {
DCHECK(CalledOnValidThread());
if (!attachment_uploader_.get()) {
return;
}
-
- // Enqueue the attachment ids that aren't already in the queue.
AttachmentIdSet::const_iterator iter = attachment_ids.begin();
AttachmentIdSet::const_iterator end = attachment_ids.end();
for (; iter != end; ++iter) {
- if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) {
- queue_.push_back(*iter);
- ids_in_queue_.insert(*iter);
- }
- }
-
- ProcessQueuedUploads();
-}
-
-void AttachmentServiceImpl::ProcessQueuedUploads() {
- DCHECK(CalledOnValidThread());
- // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of
- // concurrent uploads and apply backoff on failure.
- while (!queue_.empty()) {
- const AttachmentId id = queue_.front();
- queue_.pop_front();
- AttachmentIdList attachment_ids;
- attachment_ids.push_back(id);
- attachment_store_->Read(
- attachment_ids,
- base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
- weak_ptr_factory_.GetWeakPtr()));
+ upload_task_queue_->AddToQueue(*iter);
}
}
@@ -281,6 +296,11 @@ void AttachmentServiceImpl::ReadDoneNowUpload(
if (!unavailable_attachment_ids->empty()) {
// TODO(maniscalco): We failed to read some attachments. What should we do
// now?
+ AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
+ AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
+ for (; iter != end; ++iter) {
+ upload_task_queue_->Cancel(*iter);
+ }
}
AttachmentMap::const_iterator iter = attachments->begin();
diff --git a/sync/internal_api/attachments/attachment_service_impl_unittest.cc b/sync/internal_api/attachments/attachment_service_impl_unittest.cc
index 6148033..dbbf89f 100644
--- a/sync/internal_api/attachments/attachment_service_impl_unittest.cc
+++ b/sync/internal_api/attachments/attachment_service_impl_unittest.cc
@@ -191,7 +191,9 @@ class AttachmentServiceImplTest : public testing::Test,
new AttachmentServiceImpl(attachment_store,
uploader.PassAs<AttachmentUploader>(),
downloader.PassAs<AttachmentDownloader>(),
- delegate));
+ delegate,
+ base::TimeDelta(),
+ base::TimeDelta()));
}
AttachmentService* attachment_service() { return attachment_service_.get(); }
@@ -344,32 +346,31 @@ TEST_F(AttachmentServiceImplTest, GetOrDownload_NoDownloader) {
TEST_F(AttachmentServiceImplTest, UploadAttachments_Success) {
AttachmentIdSet attachment_ids;
- const size_t num_attachments = 3;
+ const unsigned num_attachments = 3;
for (unsigned i = 0; i < num_attachments; ++i) {
attachment_ids.insert(AttachmentId::Create());
}
attachment_service()->UploadAttachments(attachment_ids);
- RunLoop();
- // See that the service has issued reads for the attachments, but not yet
- // uploaded anything.
- EXPECT_EQ(num_attachments, store()->read_ids.size());
- EXPECT_EQ(0U, uploader()->upload_requests.size());
+
for (unsigned i = 0; i < num_attachments; ++i) {
+ RunLoop();
+ // See that the service has issued a read for at least one of the
+ // attachments.
+ ASSERT_GE(1U, store()->read_ids.size());
store()->RespondToRead(attachment_ids);
- }
-
- RunLoop();
- EXPECT_EQ(0U, store()->read_ids.size());
- EXPECT_EQ(num_attachments, uploader()->upload_requests.size());
- AttachmentIdSet::const_iterator iter = attachment_ids.begin();
- const AttachmentIdSet::const_iterator end = attachment_ids.end();
- for (; iter != end; ++iter) {
- uploader()->RespondToUpload(*iter, AttachmentUploader::UPLOAD_SUCCESS);
+ RunLoop();
+ ASSERT_GE(1U, uploader()->upload_requests.size());
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ AttachmentUploader::UPLOAD_SUCCESS);
}
RunLoop();
+ ASSERT_EQ(0U, store()->read_ids.size());
+ ASSERT_EQ(0U, uploader()->upload_requests.size());
// See that all the attachments were uploaded.
ASSERT_EQ(attachment_ids.size(), on_attachment_uploaded_list().size());
+ AttachmentIdSet::const_iterator iter = attachment_ids.begin();
+ const AttachmentIdSet::const_iterator end = attachment_ids.end();
for (iter = attachment_ids.begin(); iter != end; ++iter) {
EXPECT_THAT(on_attachment_uploaded_list(), testing::Contains(*iter));
}
@@ -400,47 +401,45 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_SomeMissingFromStore) {
AttachmentIdSet attachment_ids;
attachment_ids.insert(AttachmentId::Create());
attachment_ids.insert(AttachmentId::Create());
-
attachment_service()->UploadAttachments(attachment_ids);
RunLoop();
- EXPECT_EQ(2U, store()->read_ids.size());
- EXPECT_EQ(0U, uploader()->upload_requests.size());
+ ASSERT_GE(1U, store()->read_ids.size());
+
+ ASSERT_EQ(0U, uploader()->upload_requests.size());
store()->RespondToRead(attachment_ids);
- EXPECT_EQ(1U, store()->read_ids.size());
- // Not found!
- store()->RespondToRead(AttachmentIdSet());
- EXPECT_EQ(0U, store()->read_ids.size());
RunLoop();
+ ASSERT_EQ(1U, uploader()->upload_requests.size());
- // One attachment went missing so we should see only one upload request.
- EXPECT_EQ(1U, uploader()->upload_requests.size());
uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
AttachmentUploader::UPLOAD_SUCCESS);
RunLoop();
-
- // See that the delegate was called for only one.
ASSERT_EQ(1U, on_attachment_uploaded_list().size());
+ ASSERT_GE(1U, store()->read_ids.size());
+ // Not found!
+ store()->RespondToRead(AttachmentIdSet());
+ RunLoop();
+ // No upload requests since the read failed.
+ ASSERT_EQ(0U, uploader()->upload_requests.size());
}
TEST_F(AttachmentServiceImplTest, UploadAttachments_AllMissingFromStore) {
AttachmentIdSet attachment_ids;
- attachment_ids.insert(AttachmentId::Create());
- attachment_ids.insert(AttachmentId::Create());
-
+ const unsigned num_attachments = 2;
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ attachment_ids.insert(AttachmentId::Create());
+ }
attachment_service()->UploadAttachments(attachment_ids);
- RunLoop();
- EXPECT_EQ(2U, store()->read_ids.size());
- EXPECT_EQ(0U, uploader()->upload_requests.size());
- // None found!
- store()->RespondToRead(AttachmentIdSet());
- store()->RespondToRead(AttachmentIdSet());
- EXPECT_EQ(0U, store()->read_ids.size());
+
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ RunLoop();
+ ASSERT_GE(1U, store()->read_ids.size());
+ // None found!
+ store()->RespondToRead(AttachmentIdSet());
+ }
RunLoop();
// Nothing uploaded.
EXPECT_EQ(0U, uploader()->upload_requests.size());
- RunLoop();
-
// See that the delegate was never called.
ASSERT_EQ(0U, on_attachment_uploaded_list().size());
}
@@ -461,32 +460,31 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_NoUploader) {
// Upload three attachments. For one of them, server responds with error.
TEST_F(AttachmentServiceImplTest, UploadAttachments_OneUploadFails) {
AttachmentIdSet attachment_ids;
- attachment_ids.insert(AttachmentId::Create());
- attachment_ids.insert(AttachmentId::Create());
- attachment_ids.insert(AttachmentId::Create());
-
+ const unsigned num_attachments = 3;
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ attachment_ids.insert(AttachmentId::Create());
+ }
attachment_service()->UploadAttachments(attachment_ids);
- RunLoop();
- EXPECT_EQ(3U, store()->read_ids.size());
- EXPECT_EQ(0U, uploader()->upload_requests.size());
- // All attachments found.
- store()->RespondToRead(attachment_ids);
- store()->RespondToRead(attachment_ids);
- store()->RespondToRead(attachment_ids);
- RunLoop();
-
- EXPECT_EQ(3U, uploader()->upload_requests.size());
- uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
- AttachmentUploader::UPLOAD_SUCCESS);
- uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
- AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR);
- uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
- AttachmentUploader::UPLOAD_SUCCESS);
- EXPECT_EQ(0U, uploader()->upload_requests.size());
+ for (unsigned i = 0; i < num_attachments; ++i) {
+ RunLoop();
+ ASSERT_GE(1U, store()->read_ids.size());
+ store()->RespondToRead(attachment_ids);
+ RunLoop();
+ ASSERT_EQ(1U, uploader()->upload_requests.size());
+ AttachmentUploader::UploadResult result =
+ AttachmentUploader::UPLOAD_SUCCESS;
+ // Fail the 2nd one.
+ if (i == 2U) {
+ result = AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR;
+ } else {
+ result = AttachmentUploader::UPLOAD_SUCCESS;
+ }
+ uploader()->RespondToUpload(uploader()->upload_requests.begin()->first,
+ result);
+ }
RunLoop();
-
- EXPECT_EQ(2U, on_attachment_uploaded_list().size());
+ ASSERT_EQ(2U, on_attachment_uploaded_list().size());
}
} // namespace syncer
diff --git a/sync/internal_api/attachments/attachment_uploader_impl.cc b/sync/internal_api/attachments/attachment_uploader_impl.cc
index 4d197eb..59885b7 100644
--- a/sync/internal_api/attachments/attachment_uploader_impl.cc
+++ b/sync/internal_api/attachments/attachment_uploader_impl.cc
@@ -133,20 +133,22 @@ const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() {
void AttachmentUploaderImpl::UploadState::OnURLFetchComplete(
const net::URLFetcher* source) {
DCHECK(CalledOnValidThread());
- UploadResult result = UPLOAD_UNSPECIFIED_ERROR;
+ UploadResult result = UPLOAD_TRANSIENT_ERROR;
AttachmentId attachment_id = attachment_.GetId();
- if (source->GetResponseCode() == net::HTTP_OK) {
+ const int response_code = source->GetResponseCode();
+ if (response_code == net::HTTP_OK) {
result = UPLOAD_SUCCESS;
- } else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) {
- // TODO(maniscalco): One possibility is that we received a 401 because our
- // access token has expired. We should probably fetch a new access token
- // and retry this upload before giving up and reporting failure to our
- // caller (bug 380437).
+ } else if (response_code == net::HTTP_UNAUTHORIZED) {
+ // Server tells us we've got a bad token so invalidate it.
OAuth2TokenServiceRequest::InvalidateToken(
token_service_provider_, account_id_, scopes_, access_token_);
- } else {
- // TODO(maniscalco): Once the protocol is better defined, deal with the
- // various HTTP response codes we may encounter.
+ // Fail the request, but indicate that it may be successful if retried.
+ result = UPLOAD_TRANSIENT_ERROR;
+ } else if (response_code == net::HTTP_FORBIDDEN) {
+ // User is not allowed to use attachments. Retrying won't help.
+ result = UPLOAD_UNSPECIFIED_ERROR;
+ } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) {
+ result = UPLOAD_TRANSIENT_ERROR;
}
ReportResult(result, attachment_id);
}
@@ -160,6 +162,7 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess(
access_token_ = access_token;
fetcher_.reset(
net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this));
+ fetcher_->SetAutomaticallyRetryOn5xx(false);
fetcher_->SetRequestContext(url_request_context_getter_.get());
// TODO(maniscalco): Is there a better way? Copying the attachment data into
// a string feels wrong given how large attachments may be (several MBs). If
@@ -184,7 +187,11 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenFailure(
const GoogleServiceAuthError& error) {
DCHECK_EQ(access_token_request_.get(), request);
access_token_request_.reset();
- ReportResult(UPLOAD_UNSPECIFIED_ERROR, attachment_.GetId());
+ // TODO(maniscalco): We treat this as a transient error, but it may in fact be
+ // a very long lived error and require user action. Consider differentiating
+ // between the causes of GetToken failure and act accordingly. Think about
+ // the causes of GetToken failure. Are there (bug 412802).
+ ReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId());
}
void AttachmentUploaderImpl::UploadState::GetToken() {
diff --git a/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc b/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc
index 9cfc625..e682253 100644
--- a/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc
+++ b/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc
@@ -509,7 +509,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_FailToGetToken) {
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
- EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
+ EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
@@ -531,6 +531,43 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_ServiceUnavilable) {
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
+ EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
+ ASSERT_EQ(1U, attachment_ids().size());
+ EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
+
+ // See that the HTTP server received one request.
+ ASSERT_EQ(1U, http_requests_received().size());
+ const HttpRequest& http_request = http_requests_received().front();
+ EXPECT_EQ(net::test_server::METHOD_POST, http_request.method);
+ std::string expected_relative_url(kAttachments +
+ attachment.GetId().GetProto().unique_id());
+ EXPECT_EQ(expected_relative_url, http_request.relative_url);
+ EXPECT_TRUE(http_request.has_content);
+ EXPECT_EQ(kAttachmentData, http_request.content);
+ std::string expected_header(kAuthorization);
+ const std::string header_name(kAuthorization);
+ const std::string header_value(std::string("Bearer ") + kAccessToken);
+ EXPECT_THAT(http_request.headers,
+ testing::Contains(testing::Pair(header_name, header_value)));
+
+ // See that we did not invalidate the token.
+ ASSERT_EQ(0, token_service().num_invalidate_token());
+}
+
+// Verify that we "403 Forbidden" as a non-transient error.
+TEST_F(AttachmentUploaderImplTest, UploadAttachment_Forbidden) {
+ token_service().AddAccount(kAccountId);
+ request_handler().SetStatusCode(net::HTTP_FORBIDDEN);
+
+ scoped_refptr<base::RefCountedString> some_data(new base::RefCountedString);
+ some_data->data() = kAttachmentData;
+ Attachment attachment = Attachment::Create(some_data);
+ uploader()->UploadAttachment(attachment, upload_callback());
+
+ RunAndWaitFor(1);
+
+ // See that the done callback was invoked.
+ ASSERT_EQ(1U, upload_results().size());
EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
@@ -569,7 +606,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_BadToken) {
// See that the done callback was invoked.
ASSERT_EQ(1U, upload_results().size());
- EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]);
+ EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]);
ASSERT_EQ(1U, attachment_ids().size());
EXPECT_EQ(attachment.GetId(), attachment_ids()[0]);
diff --git a/sync/internal_api/attachments/task_queue.cc b/sync/internal_api/attachments/task_queue.cc
new file mode 100644
index 0000000..a89e496
--- /dev/null
+++ b/sync/internal_api/attachments/task_queue.cc
@@ -0,0 +1,5 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/internal_api/public/attachments/task_queue.h"
diff --git a/sync/internal_api/attachments/task_queue_unittest.cc b/sync/internal_api/attachments/task_queue_unittest.cc
new file mode 100644
index 0000000..ac4ecd6
--- /dev/null
+++ b/sync/internal_api/attachments/task_queue_unittest.cc
@@ -0,0 +1,197 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/internal_api/public/attachments/task_queue.h"
+
+#include <vector>
+
+#include "base/bind.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "base/timer/mock_timer.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using base::TimeDelta;
+
+namespace syncer {
+
+namespace {
+
+const TimeDelta kZero;
+
+} // namespace
+
+class TaskQueueTest : public testing::Test {
+ protected:
+ TaskQueueTest() : weak_ptr_factory_(this) {
+ queue_.reset(new TaskQueue<int>(
+ base::Bind(&TaskQueueTest::Process, weak_ptr_factory_.GetWeakPtr()),
+ TimeDelta::FromMinutes(1),
+ TimeDelta::FromMinutes(8)));
+ }
+
+ void RunLoop() {
+ base::RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
+
+ void Process(const int& task) { dispatched_.push_back(task); }
+
+ base::MessageLoop message_loop_;
+ scoped_ptr<TaskQueue<int> > queue_;
+ std::vector<int> dispatched_;
+ base::WeakPtrFactory<TaskQueueTest> weak_ptr_factory_;
+};
+
+// See that at most one task is dispatched at a time.
+TEST_F(TaskQueueTest, AddToQueue_NoConcurrentTasks) {
+ queue_->AddToQueue(1);
+ queue_->AddToQueue(2);
+ RunLoop();
+
+ // Only one has been dispatched.
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ RunLoop();
+
+ // Still only one.
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(1);
+ RunLoop();
+
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(2, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(2);
+ RunLoop();
+
+ ASSERT_TRUE(dispatched_.empty());
+}
+
+// See that that the queue ignores duplicate adds.
+TEST_F(TaskQueueTest, AddToQueue_NoDuplicates) {
+ queue_->AddToQueue(1);
+ queue_->AddToQueue(1);
+ queue_->AddToQueue(2);
+ queue_->AddToQueue(1);
+ ASSERT_TRUE(dispatched_.empty());
+ RunLoop();
+
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(1);
+ RunLoop();
+
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(2, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(2);
+ RunLoop();
+
+ ASSERT_TRUE(dispatched_.empty());
+}
+
+// See that Retry works as expected.
+TEST_F(TaskQueueTest, Retry) {
+ scoped_ptr<base::MockTimer> timer_to_pass(new base::MockTimer(false, false));
+ base::MockTimer* mock_timer = timer_to_pass.get();
+ queue_->SetTimerForTest(timer_to_pass.PassAs<base::Timer>());
+
+ // 1st attempt.
+ queue_->AddToQueue(1);
+ ASSERT_TRUE(mock_timer->IsRunning());
+ ASSERT_EQ(TimeDelta(), mock_timer->GetCurrentDelay());
+ TimeDelta last_delay = mock_timer->GetCurrentDelay();
+ mock_timer->Fire();
+ RunLoop();
+
+ // 2nd attempt.
+ ASSERT_FALSE(mock_timer->IsRunning());
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsFailed(1);
+ queue_->AddToQueue(1);
+ ASSERT_TRUE(mock_timer->IsRunning());
+ EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay);
+ EXPECT_LE(mock_timer->GetCurrentDelay(), TimeDelta::FromMinutes(1));
+ last_delay = mock_timer->GetCurrentDelay();
+ mock_timer->Fire();
+ RunLoop();
+
+ // 3rd attempt.
+ ASSERT_FALSE(mock_timer->IsRunning());
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsFailed(1);
+ queue_->AddToQueue(1);
+ ASSERT_TRUE(mock_timer->IsRunning());
+ EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay);
+ last_delay = mock_timer->GetCurrentDelay();
+ mock_timer->Fire();
+ RunLoop();
+
+ // Give up.
+ ASSERT_FALSE(mock_timer->IsRunning());
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->Cancel(1);
+ ASSERT_FALSE(mock_timer->IsRunning());
+
+ // Try a different task. See the timer remains unchanged because the previous
+ // task was cancelled.
+ ASSERT_TRUE(dispatched_.empty());
+ queue_->AddToQueue(2);
+ ASSERT_TRUE(mock_timer->IsRunning());
+ EXPECT_GE(last_delay, mock_timer->GetCurrentDelay());
+ last_delay = mock_timer->GetCurrentDelay();
+ mock_timer->Fire();
+ RunLoop();
+
+ // Mark this one as succeeding, which will clear the backoff delay.
+ ASSERT_FALSE(mock_timer->IsRunning());
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(2, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(2);
+ ASSERT_FALSE(mock_timer->IsRunning());
+
+ // Add one last task and see that it's dispatched without delay because the
+ // previous one succeeded.
+ ASSERT_TRUE(dispatched_.empty());
+ queue_->AddToQueue(3);
+ ASSERT_TRUE(mock_timer->IsRunning());
+ EXPECT_LT(mock_timer->GetCurrentDelay(), last_delay);
+ last_delay = mock_timer->GetCurrentDelay();
+ mock_timer->Fire();
+ RunLoop();
+
+ // Clean up.
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(3, dispatched_.front());
+ dispatched_.clear();
+ queue_->MarkAsSucceeded(3);
+ ASSERT_FALSE(mock_timer->IsRunning());
+}
+
+TEST_F(TaskQueueTest, Cancel) {
+ queue_->AddToQueue(1);
+ RunLoop();
+
+ ASSERT_EQ(1U, dispatched_.size());
+ EXPECT_EQ(1, dispatched_.front());
+ dispatched_.clear();
+ queue_->Cancel(1);
+ RunLoop();
+
+ ASSERT_TRUE(dispatched_.empty());
+}
+
+} // namespace syncer
diff --git a/sync/internal_api/public/attachments/attachment_downloader.h b/sync/internal_api/public/attachments/attachment_downloader.h
index 93498ed..12db422 100644
--- a/sync/internal_api/public/attachments/attachment_downloader.h
+++ b/sync/internal_api/public/attachments/attachment_downloader.h
@@ -24,6 +24,7 @@ class SYNC_EXPORT AttachmentDownloader {
enum DownloadResult {
DOWNLOAD_SUCCESS, // No error, attachment was downloaded
// successfully.
+ DOWNLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later.
DOWNLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred.
};
diff --git a/sync/internal_api/public/attachments/attachment_service_impl.h b/sync/internal_api/public/attachments/attachment_service_impl.h
index 619b131..423656b 100644
--- a/sync/internal_api/public/attachments/attachment_service_impl.h
+++ b/sync/internal_api/public/attachments/attachment_service_impl.h
@@ -15,6 +15,7 @@
#include "sync/internal_api/public/attachments/attachment_service.h"
#include "sync/internal_api/public/attachments/attachment_service_proxy.h"
#include "sync/internal_api/public/attachments/attachment_uploader.h"
+#include "sync/internal_api/public/attachments/task_queue.h"
namespace syncer {
@@ -22,11 +23,6 @@ namespace syncer {
class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
public base::NonThreadSafe {
public:
- // |delegate| is optional delegate for AttachmentService to notify about
- // asynchronous events (AttachmentUploaded). Pass NULL if delegate is not
- // provided. AttachmentService doesn't take ownership of delegate, the pointer
- // must be valid throughout AttachmentService lifetime.
- //
// |attachment_uploader| is optional. If null, attachments will never be
// uploaded to the sync server and |delegate|'s OnAttachmentUploaded will
// never be invoked.
@@ -34,11 +30,26 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// |attachment_downloader| is optional. If null, attachments will never be
// downloaded. Only attachments in |attachment_store| will be returned from
// GetOrDownloadAttachments.
-
+ //
+ // |delegate| is optional delegate for AttachmentService to notify about
+ // asynchronous events (AttachmentUploaded). Pass NULL if delegate is not
+ // provided. AttachmentService doesn't take ownership of delegate, the pointer
+ // must be valid throughout AttachmentService lifetime.
+ //
+ // |initial_backoff_delay| the initial delay between upload attempts. This
+ // class automatically retries failed uploads. After the first failure, it
+ // will wait this amount of time until it tries again. After each failure,
+ // the delay is doubled until the |max_backoff_delay| is reached. A
+ // successful upload clears the delay.
+ //
+ // |max_backoff_delay| the maxmium delay between upload attempts when backed
+ // off.
AttachmentServiceImpl(scoped_refptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
- Delegate* delegate);
+ Delegate* delegate,
+ const base::TimeDelta& initial_backoff_delay,
+ const base::TimeDelta& max_backoff_delay);
virtual ~AttachmentServiceImpl();
// Create an AttachmentServiceImpl suitable for use in tests.
@@ -69,7 +80,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment);
- void ProcessQueuedUploads();
+ void BeginUpload(const AttachmentId& attachment_id);
void ReadDoneNowUpload(
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
@@ -86,12 +97,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// May be null.
Delegate* delegate_;
- // Queue of attachment ids to be uploaded. Every entry in this queue should
- // also exist in ids_in_queue_.
- std::deque<AttachmentId> queue_;
-
- // Ids of attachments currently being uploaded or queued for upload.
- AttachmentIdSet ids_in_queue_;
+ scoped_ptr<TaskQueue<AttachmentId> > upload_task_queue_;
// Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
diff --git a/sync/internal_api/public/attachments/attachment_uploader.h b/sync/internal_api/public/attachments/attachment_uploader.h
index 7c2f39c..0339e67 100644
--- a/sync/internal_api/public/attachments/attachment_uploader.h
+++ b/sync/internal_api/public/attachments/attachment_uploader.h
@@ -20,6 +20,7 @@ class SYNC_EXPORT AttachmentUploader {
enum UploadResult {
UPLOAD_SUCCESS, // No error, attachment was uploaded
// successfully.
+ UPLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later.
UPLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred.
};
diff --git a/sync/internal_api/public/attachments/task_queue.h b/sync/internal_api/public/attachments/task_queue.h
new file mode 100644
index 0000000..2466f3a
--- /dev/null
+++ b/sync/internal_api/public/attachments/task_queue.h
@@ -0,0 +1,270 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
+#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
+
+#include <deque>
+#include <set>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/gtest_prod_util.h"
+#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/threading/non_thread_safe.h"
+#include "base/time/time.h"
+#include "base/timer/timer.h"
+#include "net/base/backoff_entry.h"
+
+namespace syncer {
+
+// A queue that dispatches tasks, ignores duplicates, and provides backoff
+// semantics.
+//
+// |T| is the task type.
+//
+// For each task added to the queue, the HandleTaskCallback will eventually be
+// invoked. For each invocation, the user of TaskQueue must call exactly one of
+// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
+//
+// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
+//
+// Example usage:
+//
+// void Handle(const Foo& foo);
+// ...
+// TaskQueue<Foo> queue(base::Bind(&Handle),
+// base::TimeDelta::FromSeconds(1),
+// base::TimeDelta::FromMinutes(1));
+// ...
+// {
+// Foo foo;
+// // Add foo to the queue. At some point, Handle will be invoked in this
+// // message loop.
+// queue.AddToQueue(foo);
+// }
+// ...
+// void Handle(const Foo& foo) {
+// DoSomethingWith(foo);
+// // We must call one of the three methods to tell the queue how we're
+// // dealing with foo. Of course, we are free to call in the the context of
+// // this HandleTaskCallback or outside the context if we so choose.
+// if (SuccessfullyHandled(foo)) {
+// queue.MarkAsSucceeded(foo);
+// } else if (Failed(foo)) {
+// queue.MarkAsFailed(foo);
+// if (ShouldRetry(foo)) {
+// queue.AddToQueue(foo);
+// }
+// } else {
+// Cancel(foo);
+// }
+// }
+//
+template <typename T>
+class TaskQueue : base::NonThreadSafe {
+ public:
+ // A callback provided by users of the TaskQueue to handle tasks.
+ //
+ // This callback is invoked by the queue with a task to be handled. The
+ // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
+ // or |Cancel| to signify completion of the task.
+ typedef base::Callback<void(const T&)> HandleTaskCallback;
+
+ // Construct a TaskQueue.
+ //
+ // |callback| the callback to be invoked for handling tasks.
+ //
+ // |initial_backoff_delay| the initial amount of time the queue will wait
+ // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
+ // zero. Subsequent failures will increase the delay up to
+ // |max_backoff_delay|.
+ //
+ // |max_backoff_delay| the maximum amount of time the queue will wait before
+ // dispatching tasks. May be zero. Must be greater than or equal to
+ // |initial_backoff_delay|.
+ TaskQueue(const HandleTaskCallback& callback,
+ const base::TimeDelta& initial_backoff_delay,
+ const base::TimeDelta& max_backoff_delay);
+
+ // Add |task| to the end of the queue.
+ //
+ // If |task| is already present (as determined by operator==) it is not added.
+ void AddToQueue(const T& task);
+
+ // Mark |task| as completing successfully.
+ //
+ // Marking a task as completing successfully will reduce or eliminate any
+ // backoff delay in effect.
+ //
+ // May only be called after the HandleTaskCallback has been invoked with
+ // |task|.
+ void MarkAsSucceeded(const T& task);
+
+ // Mark |task| as failed.
+ //
+ // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
+ // of subsequent tasks. Repeated failures will increase the delay.
+ //
+ // May only be called after the HandleTaskCallback has been invoked with
+ // |task|.
+ void MarkAsFailed(const T& task);
+
+ // Cancel |task|.
+ //
+ // |task| is removed from the queue and will not be retried. Does not affect
+ // the backoff delay.
+ //
+ // May only be called after the HandleTaskCallback has been invoked with
+ // |task|.
+ void Cancel(const T& task);
+
+ private:
+ FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry);
+
+ // Use |timer| for scheduled events.
+ //
+ // Used in tests. See also MockTimer.
+ void SetTimerForTest(scoped_ptr<base::Timer> timer);
+ void FinishTask(const T& task);
+ void ScheduleDispatch();
+ void Dispatch();
+ // Return true if we should dispatch tasks.
+ bool ShouldDispatch();
+
+ const HandleTaskCallback process_callback_;
+ net::BackoffEntry::Policy backoff_policy_;
+ scoped_ptr<net::BackoffEntry> backoff_entry_;
+ // The number of tasks currently being handled.
+ int num_in_progress_;
+ std::deque<T> queue_;
+ // The set of tasks in queue_ or currently being handled.
+ std::set<T> tasks_;
+ base::Closure dispatch_closure_;
+ scoped_ptr<base::Timer> backoff_timer_;
+ base::TimeDelta delay_;
+
+ // Must be last data member.
+ base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(TaskQueue);
+};
+
+// The maximum number of tasks that may be concurrently executed. Think
+// carefully before changing this value. The desired behavior of backoff may
+// not be obvious when there is more than one concurrent task
+const int kMaxConcurrentTasks = 1;
+
+template <typename T>
+TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
+ const base::TimeDelta& initial_backoff_delay,
+ const base::TimeDelta& max_backoff_delay)
+ : process_callback_(callback),
+ backoff_policy_({}),
+ num_in_progress_(0),
+ weak_ptr_factory_(this) {
+ DCHECK_LE(initial_backoff_delay.InMicroseconds(),
+ max_backoff_delay.InMicroseconds());
+ backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
+ backoff_policy_.multiply_factor = 2.0;
+ backoff_policy_.jitter_factor = 0.1;
+ backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
+ backoff_policy_.entry_lifetime_ms = -1;
+ backoff_policy_.always_use_initial_delay = false;
+ backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
+ dispatch_closure_ =
+ base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
+ backoff_timer_.reset(new base::Timer(false, false));
+}
+
+template <typename T>
+void TaskQueue<T>::AddToQueue(const T& task) {
+ DCHECK(CalledOnValidThread());
+ // Ignore duplicates.
+ if (tasks_.find(task) == tasks_.end()) {
+ queue_.push_back(task);
+ tasks_.insert(task);
+ }
+ ScheduleDispatch();
+}
+
+template <typename T>
+void TaskQueue<T>::MarkAsSucceeded(const T& task) {
+ DCHECK(CalledOnValidThread());
+ FinishTask(task);
+ // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
+ // reschedule a dispatch.
+ backoff_timer_->Stop();
+ backoff_entry_->Reset();
+ ScheduleDispatch();
+}
+
+template <typename T>
+void TaskQueue<T>::MarkAsFailed(const T& task) {
+ DCHECK(CalledOnValidThread());
+ FinishTask(task);
+ backoff_entry_->InformOfRequest(false);
+ ScheduleDispatch();
+}
+
+template <typename T>
+void TaskQueue<T>::Cancel(const T& task) {
+ DCHECK(CalledOnValidThread());
+ FinishTask(task);
+ ScheduleDispatch();
+}
+
+template <typename T>
+void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(timer.get());
+ backoff_timer_ = timer.Pass();
+}
+
+template <typename T>
+void TaskQueue<T>::FinishTask(const T& task) {
+ DCHECK(CalledOnValidThread());
+ DCHECK_GE(num_in_progress_, 1);
+ --num_in_progress_;
+ const size_t num_erased = tasks_.erase(task);
+ DCHECK_EQ(1U, num_erased);
+}
+
+template <typename T>
+void TaskQueue<T>::ScheduleDispatch() {
+ DCHECK(CalledOnValidThread());
+ if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
+ return;
+ }
+
+ backoff_timer_->Start(
+ FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
+}
+
+template <typename T>
+void TaskQueue<T>::Dispatch() {
+ DCHECK(CalledOnValidThread());
+ if (!ShouldDispatch()) {
+ return;
+ }
+
+ DCHECK(!queue_.empty());
+ const T& task = queue_.front();
+ ++num_in_progress_;
+ DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(process_callback_, task));
+ queue_.pop_front();
+}
+
+template <typename T>
+bool TaskQueue<T>::ShouldDispatch() {
+ return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
+}
+
+} // namespace syncer
+
+#endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
diff --git a/sync/sync.gyp b/sync/sync.gyp
index 8646013..f01ea81 100644
--- a/sync/sync.gyp
+++ b/sync/sync.gyp
@@ -169,6 +169,7 @@
'internal_api/attachments/attachment_uploader_impl.cc',
'internal_api/attachments/fake_attachment_downloader.cc',
'internal_api/attachments/fake_attachment_uploader.cc',
+ 'internal_api/attachments/task_queue.cc',
'internal_api/base_node.cc',
'internal_api/base_transaction.cc',
'internal_api/change_record.cc',
@@ -205,6 +206,7 @@
'internal_api/public/attachments/attachment_uploader_impl.h',
'internal_api/public/attachments/fake_attachment_downloader.h',
'internal_api/public/attachments/fake_attachment_uploader.h',
+ 'internal_api/public/attachments/task_queue.h',
'internal_api/public/base/attachment_id_proto.cc',
'internal_api/public/base/attachment_id_proto.h',
'internal_api/public/base/cancelation_observer.cc',
diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi
index cb728f1..32ed597 100644
--- a/sync/sync_tests.gypi
+++ b/sync/sync_tests.gypi
@@ -284,6 +284,7 @@
'internal_api/attachments/attachment_uploader_impl_unittest.cc',
'internal_api/attachments/fake_attachment_downloader_unittest.cc',
'internal_api/attachments/fake_attachment_uploader_unittest.cc',
+ 'internal_api/attachments/task_queue_unittest.cc',
'internal_api/debug_info_event_listener_unittest.cc',
'internal_api/http_bridge_unittest.cc',
'internal_api/js_mutation_event_observer_unittest.cc',