diff options
16 files changed, 700 insertions, 138 deletions
diff --git a/chrome/browser/sync/profile_sync_components_factory_impl.cc b/chrome/browser/sync/profile_sync_components_factory_impl.cc index 09b2d08..d68f582 100644 --- a/chrome/browser/sync/profile_sync_components_factory_impl.cc +++ b/chrome/browser/sync/profile_sync_components_factory_impl.cc @@ -630,12 +630,20 @@ ProfileSyncComponentsFactoryImpl::CreateAttachmentService( token_service_provider); } + // It is important that the initial backoff delay is relatively large. For + // whatever reason, the server may fail all requests for a short period of + // time. When this happens we don't want to overwhelm the server with + // requests so we use a large initial backoff. + const base::TimeDelta initial_backoff_delay = + base::TimeDelta::FromMinutes(30); + const base::TimeDelta max_backoff_delay = base::TimeDelta::FromHours(4); scoped_ptr<syncer::AttachmentService> attachment_service( new syncer::AttachmentServiceImpl(attachment_store, attachment_uploader.Pass(), attachment_downloader.Pass(), - delegate)); - + delegate, + initial_backoff_delay, + max_backoff_delay)); return attachment_service.Pass(); } diff --git a/components/sync_driver/generic_change_processor_unittest.cc b/components/sync_driver/generic_change_processor_unittest.cc index 73bd92d..9ea4e43 100644 --- a/components/sync_driver/generic_change_processor_unittest.cc +++ b/components/sync_driver/generic_change_processor_unittest.cc @@ -56,7 +56,9 @@ MockAttachmentService::MockAttachmentService( new syncer::FakeAttachmentUploader), scoped_ptr<syncer::AttachmentDownloader>( new syncer::FakeAttachmentDownloader), - NULL) { + NULL, + base::TimeDelta(), + base::TimeDelta()) { } MockAttachmentService::~MockAttachmentService() { diff --git a/sync/internal_api/attachments/attachment_downloader_impl.cc b/sync/internal_api/attachments/attachment_downloader_impl.cc index d43b3ea..a7a43b4 100644 --- a/sync/internal_api/attachments/attachment_downloader_impl.cc +++ b/sync/internal_api/attachments/attachment_downloader_impl.cc @@ -115,7 +115,7 @@ void AttachmentDownloaderImpl::OnGetTokenFailure( DownloadState* download_state = *iter; scoped_refptr<base::RefCountedString> null_attachment_data; ReportResult( - *download_state, DOWNLOAD_UNSPECIFIED_ERROR, null_attachment_data); + *download_state, DOWNLOAD_TRANSIENT_ERROR, null_attachment_data); DCHECK(state_map_.find(download_state->attachment_url) != state_map_.end()); state_map_.erase(download_state->attachment_url); } @@ -133,22 +133,28 @@ void AttachmentDownloaderImpl::OnURLFetchComplete( const DownloadState& download_state = *iter->second; DCHECK(source == download_state.url_fetcher.get()); - DownloadResult result = DOWNLOAD_UNSPECIFIED_ERROR; + DownloadResult result = DOWNLOAD_TRANSIENT_ERROR; scoped_refptr<base::RefCountedString> attachment_data; - if (source->GetResponseCode() == net::HTTP_OK) { + const int response_code = source->GetResponseCode(); + if (response_code == net::HTTP_OK) { result = DOWNLOAD_SUCCESS; std::string data_as_string; source->GetResponseAsString(&data_as_string); attachment_data = base::RefCountedString::TakeString(&data_as_string); - } else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) { + } else if (response_code == net::HTTP_UNAUTHORIZED) { + // Server tells us we've got a bad token so invalidate it. OAuth2TokenServiceRequest::InvalidateToken(token_service_provider_.get(), account_id_, oauth2_scopes_, download_state.access_token); - // TODO(pavely): crbug/380437. This is transient error. Request new access - // token for this DownloadState. The only trick is to do it with exponential - // backoff. + // Fail the request, but indicate that it may be successful if retried. + result = DOWNLOAD_TRANSIENT_ERROR; + } else if (response_code == net::HTTP_FORBIDDEN) { + // User is not allowed to use attachments. Retrying won't help. + result = DOWNLOAD_UNSPECIFIED_ERROR; + } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) { + result = DOWNLOAD_TRANSIENT_ERROR; } ReportResult(download_state, result, attachment_data); state_map_.erase(iter); @@ -159,6 +165,7 @@ scoped_ptr<net::URLFetcher> AttachmentDownloaderImpl::CreateFetcher( const std::string& access_token) { scoped_ptr<net::URLFetcher> url_fetcher( net::URLFetcher::Create(GURL(url), net::URLFetcher::GET, this)); + url_fetcher->SetAutomaticallyRetryOn5xx(false); const std::string auth_header("Authorization: Bearer " + access_token); url_fetcher->AddExtraRequestHeader(auth_header); url_fetcher->SetRequestContext(url_request_context_getter_.get()); diff --git a/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc b/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc index 01b12c1..ee57a5b 100644 --- a/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc +++ b/sync/internal_api/attachments/attachment_downloader_impl_unittest.cc @@ -319,7 +319,7 @@ TEST_F(AttachmentDownloaderImplTest, RequestAccessTokenFails) { GoogleServiceAuthError(GoogleServiceAuthError::INVALID_GAIA_CREDENTIALS)); RunMessageLoop(); // Only id2 should fail. - VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR); + VerifyDownloadResult(id2, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR); // Complete request for id1. CompleteDownload(net::HTTP_OK); VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_SUCCESS); @@ -337,7 +337,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_BadToken) { // invalidation. CompleteDownload(net::HTTP_UNAUTHORIZED); EXPECT_EQ(1, token_service()->num_invalidate_token()); - VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR); + VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR); } TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) { @@ -352,7 +352,7 @@ TEST_F(AttachmentDownloaderImplTest, URLFetcher_ServiceUnavailable) { // shouldn't be invalidated. CompleteDownload(net::HTTP_SERVICE_UNAVAILABLE); EXPECT_EQ(0, token_service()->num_invalidate_token()); - VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR); + VerifyDownloadResult(id1, AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR); } } // namespace syncer diff --git a/sync/internal_api/attachments/attachment_service_impl.cc b/sync/internal_api/attachments/attachment_service_impl.cc index 94ffead..5ba540b 100644 --- a/sync/internal_api/attachments/attachment_service_impl.cc +++ b/sync/internal_api/attachments/attachment_service_impl.cc @@ -9,6 +9,7 @@ #include "base/bind.h" #include "base/message_loop/message_loop.h" #include "base/thread_task_runner_handle.h" +#include "base/time/time.h" #include "sync/api/attachments/attachment.h" #include "sync/api/attachments/fake_attachment_store.h" #include "sync/internal_api/public/attachments/fake_attachment_downloader.h" @@ -113,7 +114,9 @@ AttachmentServiceImpl::AttachmentServiceImpl( scoped_refptr<AttachmentStore> attachment_store, scoped_ptr<AttachmentUploader> attachment_uploader, scoped_ptr<AttachmentDownloader> attachment_downloader, - Delegate* delegate) + Delegate* delegate, + const base::TimeDelta& initial_backoff_delay, + const base::TimeDelta& max_backoff_delay) : attachment_store_(attachment_store), attachment_uploader_(attachment_uploader.Pass()), attachment_downloader_(attachment_downloader.Pass()), @@ -121,6 +124,16 @@ AttachmentServiceImpl::AttachmentServiceImpl( weak_ptr_factory_(this) { DCHECK(CalledOnValidThread()); DCHECK(attachment_store_.get()); + + // TODO(maniscalco): Observe network connectivity change events. When the + // network becomes disconnected, consider suspending queue dispatch. When + // connectivity is restored, consider clearing any dispatch backoff (bug + // 411981). + upload_task_queue_.reset(new TaskQueue<AttachmentId>( + base::Bind(&AttachmentServiceImpl::BeginUpload, + weak_ptr_factory_.GetWeakPtr()), + initial_backoff_delay, + max_backoff_delay)); } AttachmentServiceImpl::~AttachmentServiceImpl() { @@ -139,7 +152,9 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() { new syncer::AttachmentServiceImpl(attachment_store, attachment_uploader.Pass(), attachment_downloader.Pass(), - NULL)); + NULL, + base::TimeDelta(), + base::TimeDelta())); return attachment_service.Pass(); } @@ -216,12 +231,22 @@ void AttachmentServiceImpl::DropDone(const DropCallback& callback, void AttachmentServiceImpl::UploadDone( const AttachmentUploader::UploadResult& result, const AttachmentId& attachment_id) { - ids_in_queue_.erase(attachment_id); - // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. - if (result != AttachmentUploader::UPLOAD_SUCCESS) - return; - if (delegate_) { - delegate_->OnAttachmentUploaded(attachment_id); + DCHECK(CalledOnValidThread()); + switch (result) { + case AttachmentUploader::UPLOAD_SUCCESS: + upload_task_queue_->MarkAsSucceeded(attachment_id); + if (delegate_) { + delegate_->OnAttachmentUploaded(attachment_id); + } + break; + case AttachmentUploader::UPLOAD_TRANSIENT_ERROR: + upload_task_queue_->MarkAsFailed(attachment_id); + upload_task_queue_->AddToQueue(attachment_id); + break; + case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR: + // TODO(pavely): crbug/372622: Deal with UploadAttachment failures. + upload_task_queue_->MarkAsFailed(attachment_id); + break; } } @@ -230,46 +255,36 @@ void AttachmentServiceImpl::DownloadDone( const AttachmentId& attachment_id, const AttachmentDownloader::DownloadResult& result, scoped_ptr<Attachment> attachment) { - if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) { - state->AddAttachment(*attachment.get()); - } else { - state->AddUnavailableAttachmentId(attachment_id); + switch (result) { + case AttachmentDownloader::DOWNLOAD_SUCCESS: + state->AddAttachment(*attachment.get()); + break; + case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR: + case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR: + state->AddUnavailableAttachmentId(attachment_id); + break; } } +void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) { + DCHECK(CalledOnValidThread()); + AttachmentIdList attachment_ids; + attachment_ids.push_back(attachment_id); + attachment_store_->Read(attachment_ids, + base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, + weak_ptr_factory_.GetWeakPtr())); +} + void AttachmentServiceImpl::UploadAttachments( const AttachmentIdSet& attachment_ids) { DCHECK(CalledOnValidThread()); if (!attachment_uploader_.get()) { return; } - - // Enqueue the attachment ids that aren't already in the queue. AttachmentIdSet::const_iterator iter = attachment_ids.begin(); AttachmentIdSet::const_iterator end = attachment_ids.end(); for (; iter != end; ++iter) { - if (ids_in_queue_.find(*iter) == ids_in_queue_.end()) { - queue_.push_back(*iter); - ids_in_queue_.insert(*iter); - } - } - - ProcessQueuedUploads(); -} - -void AttachmentServiceImpl::ProcessQueuedUploads() { - DCHECK(CalledOnValidThread()); - // TODO(maniscalco): Don't dequeue them all. Instead, limit the number of - // concurrent uploads and apply backoff on failure. - while (!queue_.empty()) { - const AttachmentId id = queue_.front(); - queue_.pop_front(); - AttachmentIdList attachment_ids; - attachment_ids.push_back(id); - attachment_store_->Read( - attachment_ids, - base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload, - weak_ptr_factory_.GetWeakPtr())); + upload_task_queue_->AddToQueue(*iter); } } @@ -281,6 +296,11 @@ void AttachmentServiceImpl::ReadDoneNowUpload( if (!unavailable_attachment_ids->empty()) { // TODO(maniscalco): We failed to read some attachments. What should we do // now? + AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin(); + AttachmentIdList::const_iterator end = unavailable_attachment_ids->end(); + for (; iter != end; ++iter) { + upload_task_queue_->Cancel(*iter); + } } AttachmentMap::const_iterator iter = attachments->begin(); diff --git a/sync/internal_api/attachments/attachment_service_impl_unittest.cc b/sync/internal_api/attachments/attachment_service_impl_unittest.cc index 6148033..dbbf89f 100644 --- a/sync/internal_api/attachments/attachment_service_impl_unittest.cc +++ b/sync/internal_api/attachments/attachment_service_impl_unittest.cc @@ -191,7 +191,9 @@ class AttachmentServiceImplTest : public testing::Test, new AttachmentServiceImpl(attachment_store, uploader.PassAs<AttachmentUploader>(), downloader.PassAs<AttachmentDownloader>(), - delegate)); + delegate, + base::TimeDelta(), + base::TimeDelta())); } AttachmentService* attachment_service() { return attachment_service_.get(); } @@ -344,32 +346,31 @@ TEST_F(AttachmentServiceImplTest, GetOrDownload_NoDownloader) { TEST_F(AttachmentServiceImplTest, UploadAttachments_Success) { AttachmentIdSet attachment_ids; - const size_t num_attachments = 3; + const unsigned num_attachments = 3; for (unsigned i = 0; i < num_attachments; ++i) { attachment_ids.insert(AttachmentId::Create()); } attachment_service()->UploadAttachments(attachment_ids); - RunLoop(); - // See that the service has issued reads for the attachments, but not yet - // uploaded anything. - EXPECT_EQ(num_attachments, store()->read_ids.size()); - EXPECT_EQ(0U, uploader()->upload_requests.size()); + for (unsigned i = 0; i < num_attachments; ++i) { + RunLoop(); + // See that the service has issued a read for at least one of the + // attachments. + ASSERT_GE(1U, store()->read_ids.size()); store()->RespondToRead(attachment_ids); - } - - RunLoop(); - EXPECT_EQ(0U, store()->read_ids.size()); - EXPECT_EQ(num_attachments, uploader()->upload_requests.size()); - AttachmentIdSet::const_iterator iter = attachment_ids.begin(); - const AttachmentIdSet::const_iterator end = attachment_ids.end(); - for (; iter != end; ++iter) { - uploader()->RespondToUpload(*iter, AttachmentUploader::UPLOAD_SUCCESS); + RunLoop(); + ASSERT_GE(1U, uploader()->upload_requests.size()); + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + AttachmentUploader::UPLOAD_SUCCESS); } RunLoop(); + ASSERT_EQ(0U, store()->read_ids.size()); + ASSERT_EQ(0U, uploader()->upload_requests.size()); // See that all the attachments were uploaded. ASSERT_EQ(attachment_ids.size(), on_attachment_uploaded_list().size()); + AttachmentIdSet::const_iterator iter = attachment_ids.begin(); + const AttachmentIdSet::const_iterator end = attachment_ids.end(); for (iter = attachment_ids.begin(); iter != end; ++iter) { EXPECT_THAT(on_attachment_uploaded_list(), testing::Contains(*iter)); } @@ -400,47 +401,45 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_SomeMissingFromStore) { AttachmentIdSet attachment_ids; attachment_ids.insert(AttachmentId::Create()); attachment_ids.insert(AttachmentId::Create()); - attachment_service()->UploadAttachments(attachment_ids); RunLoop(); - EXPECT_EQ(2U, store()->read_ids.size()); - EXPECT_EQ(0U, uploader()->upload_requests.size()); + ASSERT_GE(1U, store()->read_ids.size()); + + ASSERT_EQ(0U, uploader()->upload_requests.size()); store()->RespondToRead(attachment_ids); - EXPECT_EQ(1U, store()->read_ids.size()); - // Not found! - store()->RespondToRead(AttachmentIdSet()); - EXPECT_EQ(0U, store()->read_ids.size()); RunLoop(); + ASSERT_EQ(1U, uploader()->upload_requests.size()); - // One attachment went missing so we should see only one upload request. - EXPECT_EQ(1U, uploader()->upload_requests.size()); uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, AttachmentUploader::UPLOAD_SUCCESS); RunLoop(); - - // See that the delegate was called for only one. ASSERT_EQ(1U, on_attachment_uploaded_list().size()); + ASSERT_GE(1U, store()->read_ids.size()); + // Not found! + store()->RespondToRead(AttachmentIdSet()); + RunLoop(); + // No upload requests since the read failed. + ASSERT_EQ(0U, uploader()->upload_requests.size()); } TEST_F(AttachmentServiceImplTest, UploadAttachments_AllMissingFromStore) { AttachmentIdSet attachment_ids; - attachment_ids.insert(AttachmentId::Create()); - attachment_ids.insert(AttachmentId::Create()); - + const unsigned num_attachments = 2; + for (unsigned i = 0; i < num_attachments; ++i) { + attachment_ids.insert(AttachmentId::Create()); + } attachment_service()->UploadAttachments(attachment_ids); - RunLoop(); - EXPECT_EQ(2U, store()->read_ids.size()); - EXPECT_EQ(0U, uploader()->upload_requests.size()); - // None found! - store()->RespondToRead(AttachmentIdSet()); - store()->RespondToRead(AttachmentIdSet()); - EXPECT_EQ(0U, store()->read_ids.size()); + + for (unsigned i = 0; i < num_attachments; ++i) { + RunLoop(); + ASSERT_GE(1U, store()->read_ids.size()); + // None found! + store()->RespondToRead(AttachmentIdSet()); + } RunLoop(); // Nothing uploaded. EXPECT_EQ(0U, uploader()->upload_requests.size()); - RunLoop(); - // See that the delegate was never called. ASSERT_EQ(0U, on_attachment_uploaded_list().size()); } @@ -461,32 +460,31 @@ TEST_F(AttachmentServiceImplTest, UploadAttachments_NoUploader) { // Upload three attachments. For one of them, server responds with error. TEST_F(AttachmentServiceImplTest, UploadAttachments_OneUploadFails) { AttachmentIdSet attachment_ids; - attachment_ids.insert(AttachmentId::Create()); - attachment_ids.insert(AttachmentId::Create()); - attachment_ids.insert(AttachmentId::Create()); - + const unsigned num_attachments = 3; + for (unsigned i = 0; i < num_attachments; ++i) { + attachment_ids.insert(AttachmentId::Create()); + } attachment_service()->UploadAttachments(attachment_ids); - RunLoop(); - EXPECT_EQ(3U, store()->read_ids.size()); - EXPECT_EQ(0U, uploader()->upload_requests.size()); - // All attachments found. - store()->RespondToRead(attachment_ids); - store()->RespondToRead(attachment_ids); - store()->RespondToRead(attachment_ids); - RunLoop(); - - EXPECT_EQ(3U, uploader()->upload_requests.size()); - uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, - AttachmentUploader::UPLOAD_SUCCESS); - uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, - AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR); - uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, - AttachmentUploader::UPLOAD_SUCCESS); - EXPECT_EQ(0U, uploader()->upload_requests.size()); + for (unsigned i = 0; i < num_attachments; ++i) { + RunLoop(); + ASSERT_GE(1U, store()->read_ids.size()); + store()->RespondToRead(attachment_ids); + RunLoop(); + ASSERT_EQ(1U, uploader()->upload_requests.size()); + AttachmentUploader::UploadResult result = + AttachmentUploader::UPLOAD_SUCCESS; + // Fail the 2nd one. + if (i == 2U) { + result = AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR; + } else { + result = AttachmentUploader::UPLOAD_SUCCESS; + } + uploader()->RespondToUpload(uploader()->upload_requests.begin()->first, + result); + } RunLoop(); - - EXPECT_EQ(2U, on_attachment_uploaded_list().size()); + ASSERT_EQ(2U, on_attachment_uploaded_list().size()); } } // namespace syncer diff --git a/sync/internal_api/attachments/attachment_uploader_impl.cc b/sync/internal_api/attachments/attachment_uploader_impl.cc index 4d197eb..59885b7 100644 --- a/sync/internal_api/attachments/attachment_uploader_impl.cc +++ b/sync/internal_api/attachments/attachment_uploader_impl.cc @@ -133,20 +133,22 @@ const Attachment& AttachmentUploaderImpl::UploadState::GetAttachment() { void AttachmentUploaderImpl::UploadState::OnURLFetchComplete( const net::URLFetcher* source) { DCHECK(CalledOnValidThread()); - UploadResult result = UPLOAD_UNSPECIFIED_ERROR; + UploadResult result = UPLOAD_TRANSIENT_ERROR; AttachmentId attachment_id = attachment_.GetId(); - if (source->GetResponseCode() == net::HTTP_OK) { + const int response_code = source->GetResponseCode(); + if (response_code == net::HTTP_OK) { result = UPLOAD_SUCCESS; - } else if (source->GetResponseCode() == net::HTTP_UNAUTHORIZED) { - // TODO(maniscalco): One possibility is that we received a 401 because our - // access token has expired. We should probably fetch a new access token - // and retry this upload before giving up and reporting failure to our - // caller (bug 380437). + } else if (response_code == net::HTTP_UNAUTHORIZED) { + // Server tells us we've got a bad token so invalidate it. OAuth2TokenServiceRequest::InvalidateToken( token_service_provider_, account_id_, scopes_, access_token_); - } else { - // TODO(maniscalco): Once the protocol is better defined, deal with the - // various HTTP response codes we may encounter. + // Fail the request, but indicate that it may be successful if retried. + result = UPLOAD_TRANSIENT_ERROR; + } else if (response_code == net::HTTP_FORBIDDEN) { + // User is not allowed to use attachments. Retrying won't help. + result = UPLOAD_UNSPECIFIED_ERROR; + } else if (response_code == net::URLFetcher::RESPONSE_CODE_INVALID) { + result = UPLOAD_TRANSIENT_ERROR; } ReportResult(result, attachment_id); } @@ -160,6 +162,7 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenSuccess( access_token_ = access_token; fetcher_.reset( net::URLFetcher::Create(upload_url_, net::URLFetcher::POST, this)); + fetcher_->SetAutomaticallyRetryOn5xx(false); fetcher_->SetRequestContext(url_request_context_getter_.get()); // TODO(maniscalco): Is there a better way? Copying the attachment data into // a string feels wrong given how large attachments may be (several MBs). If @@ -184,7 +187,11 @@ void AttachmentUploaderImpl::UploadState::OnGetTokenFailure( const GoogleServiceAuthError& error) { DCHECK_EQ(access_token_request_.get(), request); access_token_request_.reset(); - ReportResult(UPLOAD_UNSPECIFIED_ERROR, attachment_.GetId()); + // TODO(maniscalco): We treat this as a transient error, but it may in fact be + // a very long lived error and require user action. Consider differentiating + // between the causes of GetToken failure and act accordingly. Think about + // the causes of GetToken failure. Are there (bug 412802). + ReportResult(UPLOAD_TRANSIENT_ERROR, attachment_.GetId()); } void AttachmentUploaderImpl::UploadState::GetToken() { diff --git a/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc b/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc index 9cfc625..e682253 100644 --- a/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc +++ b/sync/internal_api/attachments/attachment_uploader_impl_unittest.cc @@ -509,7 +509,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_FailToGetToken) { // See that the done callback was invoked. ASSERT_EQ(1U, upload_results().size()); - EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]); + EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]); ASSERT_EQ(1U, attachment_ids().size()); EXPECT_EQ(attachment.GetId(), attachment_ids()[0]); @@ -531,6 +531,43 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_ServiceUnavilable) { // See that the done callback was invoked. ASSERT_EQ(1U, upload_results().size()); + EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]); + ASSERT_EQ(1U, attachment_ids().size()); + EXPECT_EQ(attachment.GetId(), attachment_ids()[0]); + + // See that the HTTP server received one request. + ASSERT_EQ(1U, http_requests_received().size()); + const HttpRequest& http_request = http_requests_received().front(); + EXPECT_EQ(net::test_server::METHOD_POST, http_request.method); + std::string expected_relative_url(kAttachments + + attachment.GetId().GetProto().unique_id()); + EXPECT_EQ(expected_relative_url, http_request.relative_url); + EXPECT_TRUE(http_request.has_content); + EXPECT_EQ(kAttachmentData, http_request.content); + std::string expected_header(kAuthorization); + const std::string header_name(kAuthorization); + const std::string header_value(std::string("Bearer ") + kAccessToken); + EXPECT_THAT(http_request.headers, + testing::Contains(testing::Pair(header_name, header_value))); + + // See that we did not invalidate the token. + ASSERT_EQ(0, token_service().num_invalidate_token()); +} + +// Verify that we "403 Forbidden" as a non-transient error. +TEST_F(AttachmentUploaderImplTest, UploadAttachment_Forbidden) { + token_service().AddAccount(kAccountId); + request_handler().SetStatusCode(net::HTTP_FORBIDDEN); + + scoped_refptr<base::RefCountedString> some_data(new base::RefCountedString); + some_data->data() = kAttachmentData; + Attachment attachment = Attachment::Create(some_data); + uploader()->UploadAttachment(attachment, upload_callback()); + + RunAndWaitFor(1); + + // See that the done callback was invoked. + ASSERT_EQ(1U, upload_results().size()); EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]); ASSERT_EQ(1U, attachment_ids().size()); EXPECT_EQ(attachment.GetId(), attachment_ids()[0]); @@ -569,7 +606,7 @@ TEST_F(AttachmentUploaderImplTest, UploadAttachment_BadToken) { // See that the done callback was invoked. ASSERT_EQ(1U, upload_results().size()); - EXPECT_EQ(AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR, upload_results()[0]); + EXPECT_EQ(AttachmentUploader::UPLOAD_TRANSIENT_ERROR, upload_results()[0]); ASSERT_EQ(1U, attachment_ids().size()); EXPECT_EQ(attachment.GetId(), attachment_ids()[0]); diff --git a/sync/internal_api/attachments/task_queue.cc b/sync/internal_api/attachments/task_queue.cc new file mode 100644 index 0000000..a89e496 --- /dev/null +++ b/sync/internal_api/attachments/task_queue.cc @@ -0,0 +1,5 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/internal_api/public/attachments/task_queue.h" diff --git a/sync/internal_api/attachments/task_queue_unittest.cc b/sync/internal_api/attachments/task_queue_unittest.cc new file mode 100644 index 0000000..ac4ecd6 --- /dev/null +++ b/sync/internal_api/attachments/task_queue_unittest.cc @@ -0,0 +1,197 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "sync/internal_api/public/attachments/task_queue.h" + +#include <vector> + +#include "base/bind.h" +#include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/timer/mock_timer.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::TimeDelta; + +namespace syncer { + +namespace { + +const TimeDelta kZero; + +} // namespace + +class TaskQueueTest : public testing::Test { + protected: + TaskQueueTest() : weak_ptr_factory_(this) { + queue_.reset(new TaskQueue<int>( + base::Bind(&TaskQueueTest::Process, weak_ptr_factory_.GetWeakPtr()), + TimeDelta::FromMinutes(1), + TimeDelta::FromMinutes(8))); + } + + void RunLoop() { + base::RunLoop run_loop; + run_loop.RunUntilIdle(); + } + + void Process(const int& task) { dispatched_.push_back(task); } + + base::MessageLoop message_loop_; + scoped_ptr<TaskQueue<int> > queue_; + std::vector<int> dispatched_; + base::WeakPtrFactory<TaskQueueTest> weak_ptr_factory_; +}; + +// See that at most one task is dispatched at a time. +TEST_F(TaskQueueTest, AddToQueue_NoConcurrentTasks) { + queue_->AddToQueue(1); + queue_->AddToQueue(2); + RunLoop(); + + // Only one has been dispatched. + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + RunLoop(); + + // Still only one. + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(1); + RunLoop(); + + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(2, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(2); + RunLoop(); + + ASSERT_TRUE(dispatched_.empty()); +} + +// See that that the queue ignores duplicate adds. +TEST_F(TaskQueueTest, AddToQueue_NoDuplicates) { + queue_->AddToQueue(1); + queue_->AddToQueue(1); + queue_->AddToQueue(2); + queue_->AddToQueue(1); + ASSERT_TRUE(dispatched_.empty()); + RunLoop(); + + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(1); + RunLoop(); + + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(2, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(2); + RunLoop(); + + ASSERT_TRUE(dispatched_.empty()); +} + +// See that Retry works as expected. +TEST_F(TaskQueueTest, Retry) { + scoped_ptr<base::MockTimer> timer_to_pass(new base::MockTimer(false, false)); + base::MockTimer* mock_timer = timer_to_pass.get(); + queue_->SetTimerForTest(timer_to_pass.PassAs<base::Timer>()); + + // 1st attempt. + queue_->AddToQueue(1); + ASSERT_TRUE(mock_timer->IsRunning()); + ASSERT_EQ(TimeDelta(), mock_timer->GetCurrentDelay()); + TimeDelta last_delay = mock_timer->GetCurrentDelay(); + mock_timer->Fire(); + RunLoop(); + + // 2nd attempt. + ASSERT_FALSE(mock_timer->IsRunning()); + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsFailed(1); + queue_->AddToQueue(1); + ASSERT_TRUE(mock_timer->IsRunning()); + EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay); + EXPECT_LE(mock_timer->GetCurrentDelay(), TimeDelta::FromMinutes(1)); + last_delay = mock_timer->GetCurrentDelay(); + mock_timer->Fire(); + RunLoop(); + + // 3rd attempt. + ASSERT_FALSE(mock_timer->IsRunning()); + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsFailed(1); + queue_->AddToQueue(1); + ASSERT_TRUE(mock_timer->IsRunning()); + EXPECT_GT(mock_timer->GetCurrentDelay(), last_delay); + last_delay = mock_timer->GetCurrentDelay(); + mock_timer->Fire(); + RunLoop(); + + // Give up. + ASSERT_FALSE(mock_timer->IsRunning()); + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->Cancel(1); + ASSERT_FALSE(mock_timer->IsRunning()); + + // Try a different task. See the timer remains unchanged because the previous + // task was cancelled. + ASSERT_TRUE(dispatched_.empty()); + queue_->AddToQueue(2); + ASSERT_TRUE(mock_timer->IsRunning()); + EXPECT_GE(last_delay, mock_timer->GetCurrentDelay()); + last_delay = mock_timer->GetCurrentDelay(); + mock_timer->Fire(); + RunLoop(); + + // Mark this one as succeeding, which will clear the backoff delay. + ASSERT_FALSE(mock_timer->IsRunning()); + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(2, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(2); + ASSERT_FALSE(mock_timer->IsRunning()); + + // Add one last task and see that it's dispatched without delay because the + // previous one succeeded. + ASSERT_TRUE(dispatched_.empty()); + queue_->AddToQueue(3); + ASSERT_TRUE(mock_timer->IsRunning()); + EXPECT_LT(mock_timer->GetCurrentDelay(), last_delay); + last_delay = mock_timer->GetCurrentDelay(); + mock_timer->Fire(); + RunLoop(); + + // Clean up. + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(3, dispatched_.front()); + dispatched_.clear(); + queue_->MarkAsSucceeded(3); + ASSERT_FALSE(mock_timer->IsRunning()); +} + +TEST_F(TaskQueueTest, Cancel) { + queue_->AddToQueue(1); + RunLoop(); + + ASSERT_EQ(1U, dispatched_.size()); + EXPECT_EQ(1, dispatched_.front()); + dispatched_.clear(); + queue_->Cancel(1); + RunLoop(); + + ASSERT_TRUE(dispatched_.empty()); +} + +} // namespace syncer diff --git a/sync/internal_api/public/attachments/attachment_downloader.h b/sync/internal_api/public/attachments/attachment_downloader.h index 93498ed..12db422 100644 --- a/sync/internal_api/public/attachments/attachment_downloader.h +++ b/sync/internal_api/public/attachments/attachment_downloader.h @@ -24,6 +24,7 @@ class SYNC_EXPORT AttachmentDownloader { enum DownloadResult { DOWNLOAD_SUCCESS, // No error, attachment was downloaded // successfully. + DOWNLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later. DOWNLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred. }; diff --git a/sync/internal_api/public/attachments/attachment_service_impl.h b/sync/internal_api/public/attachments/attachment_service_impl.h index 619b131..423656b 100644 --- a/sync/internal_api/public/attachments/attachment_service_impl.h +++ b/sync/internal_api/public/attachments/attachment_service_impl.h @@ -15,6 +15,7 @@ #include "sync/internal_api/public/attachments/attachment_service.h" #include "sync/internal_api/public/attachments/attachment_service_proxy.h" #include "sync/internal_api/public/attachments/attachment_uploader.h" +#include "sync/internal_api/public/attachments/task_queue.h" namespace syncer { @@ -22,11 +23,6 @@ namespace syncer { class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, public base::NonThreadSafe { public: - // |delegate| is optional delegate for AttachmentService to notify about - // asynchronous events (AttachmentUploaded). Pass NULL if delegate is not - // provided. AttachmentService doesn't take ownership of delegate, the pointer - // must be valid throughout AttachmentService lifetime. - // // |attachment_uploader| is optional. If null, attachments will never be // uploaded to the sync server and |delegate|'s OnAttachmentUploaded will // never be invoked. @@ -34,11 +30,26 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, // |attachment_downloader| is optional. If null, attachments will never be // downloaded. Only attachments in |attachment_store| will be returned from // GetOrDownloadAttachments. - + // + // |delegate| is optional delegate for AttachmentService to notify about + // asynchronous events (AttachmentUploaded). Pass NULL if delegate is not + // provided. AttachmentService doesn't take ownership of delegate, the pointer + // must be valid throughout AttachmentService lifetime. + // + // |initial_backoff_delay| the initial delay between upload attempts. This + // class automatically retries failed uploads. After the first failure, it + // will wait this amount of time until it tries again. After each failure, + // the delay is doubled until the |max_backoff_delay| is reached. A + // successful upload clears the delay. + // + // |max_backoff_delay| the maxmium delay between upload attempts when backed + // off. AttachmentServiceImpl(scoped_refptr<AttachmentStore> attachment_store, scoped_ptr<AttachmentUploader> attachment_uploader, scoped_ptr<AttachmentDownloader> attachment_downloader, - Delegate* delegate); + Delegate* delegate, + const base::TimeDelta& initial_backoff_delay, + const base::TimeDelta& max_backoff_delay); virtual ~AttachmentServiceImpl(); // Create an AttachmentServiceImpl suitable for use in tests. @@ -69,7 +80,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, const AttachmentId& attachment_id, const AttachmentDownloader::DownloadResult& result, scoped_ptr<Attachment> attachment); - void ProcessQueuedUploads(); + void BeginUpload(const AttachmentId& attachment_id); void ReadDoneNowUpload( const AttachmentStore::Result& result, scoped_ptr<AttachmentMap> attachments, @@ -86,12 +97,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService, // May be null. Delegate* delegate_; - // Queue of attachment ids to be uploaded. Every entry in this queue should - // also exist in ids_in_queue_. - std::deque<AttachmentId> queue_; - - // Ids of attachments currently being uploaded or queued for upload. - AttachmentIdSet ids_in_queue_; + scoped_ptr<TaskQueue<AttachmentId> > upload_task_queue_; // Must be last data member. base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_; diff --git a/sync/internal_api/public/attachments/attachment_uploader.h b/sync/internal_api/public/attachments/attachment_uploader.h index 7c2f39c..0339e67 100644 --- a/sync/internal_api/public/attachments/attachment_uploader.h +++ b/sync/internal_api/public/attachments/attachment_uploader.h @@ -20,6 +20,7 @@ class SYNC_EXPORT AttachmentUploader { enum UploadResult { UPLOAD_SUCCESS, // No error, attachment was uploaded // successfully. + UPLOAD_TRANSIENT_ERROR, // A transient error occurred, try again later. UPLOAD_UNSPECIFIED_ERROR, // An unspecified error occurred. }; diff --git a/sync/internal_api/public/attachments/task_queue.h b/sync/internal_api/public/attachments/task_queue.h new file mode 100644 index 0000000..2466f3a --- /dev/null +++ b/sync/internal_api/public/attachments/task_queue.h @@ -0,0 +1,270 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ +#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ + +#include <deque> +#include <set> + +#include "base/bind.h" +#include "base/callback.h" +#include "base/gtest_prod_util.h" +#include "base/macros.h" +#include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/threading/non_thread_safe.h" +#include "base/time/time.h" +#include "base/timer/timer.h" +#include "net/base/backoff_entry.h" + +namespace syncer { + +// A queue that dispatches tasks, ignores duplicates, and provides backoff +// semantics. +// +// |T| is the task type. +// +// For each task added to the queue, the HandleTaskCallback will eventually be +// invoked. For each invocation, the user of TaskQueue must call exactly one of +// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|. +// +// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task). +// +// Example usage: +// +// void Handle(const Foo& foo); +// ... +// TaskQueue<Foo> queue(base::Bind(&Handle), +// base::TimeDelta::FromSeconds(1), +// base::TimeDelta::FromMinutes(1)); +// ... +// { +// Foo foo; +// // Add foo to the queue. At some point, Handle will be invoked in this +// // message loop. +// queue.AddToQueue(foo); +// } +// ... +// void Handle(const Foo& foo) { +// DoSomethingWith(foo); +// // We must call one of the three methods to tell the queue how we're +// // dealing with foo. Of course, we are free to call in the the context of +// // this HandleTaskCallback or outside the context if we so choose. +// if (SuccessfullyHandled(foo)) { +// queue.MarkAsSucceeded(foo); +// } else if (Failed(foo)) { +// queue.MarkAsFailed(foo); +// if (ShouldRetry(foo)) { +// queue.AddToQueue(foo); +// } +// } else { +// Cancel(foo); +// } +// } +// +template <typename T> +class TaskQueue : base::NonThreadSafe { + public: + // A callback provided by users of the TaskQueue to handle tasks. + // + // This callback is invoked by the queue with a task to be handled. The + // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, + // or |Cancel| to signify completion of the task. + typedef base::Callback<void(const T&)> HandleTaskCallback; + + // Construct a TaskQueue. + // + // |callback| the callback to be invoked for handling tasks. + // + // |initial_backoff_delay| the initial amount of time the queue will wait + // before dispatching tasks after a failed task (see |MarkAsFailed|). May be + // zero. Subsequent failures will increase the delay up to + // |max_backoff_delay|. + // + // |max_backoff_delay| the maximum amount of time the queue will wait before + // dispatching tasks. May be zero. Must be greater than or equal to + // |initial_backoff_delay|. + TaskQueue(const HandleTaskCallback& callback, + const base::TimeDelta& initial_backoff_delay, + const base::TimeDelta& max_backoff_delay); + + // Add |task| to the end of the queue. + // + // If |task| is already present (as determined by operator==) it is not added. + void AddToQueue(const T& task); + + // Mark |task| as completing successfully. + // + // Marking a task as completing successfully will reduce or eliminate any + // backoff delay in effect. + // + // May only be called after the HandleTaskCallback has been invoked with + // |task|. + void MarkAsSucceeded(const T& task); + + // Mark |task| as failed. + // + // Marking a task as failed will cause a backoff, i.e. a delay in dispatching + // of subsequent tasks. Repeated failures will increase the delay. + // + // May only be called after the HandleTaskCallback has been invoked with + // |task|. + void MarkAsFailed(const T& task); + + // Cancel |task|. + // + // |task| is removed from the queue and will not be retried. Does not affect + // the backoff delay. + // + // May only be called after the HandleTaskCallback has been invoked with + // |task|. + void Cancel(const T& task); + + private: + FRIEND_TEST_ALL_PREFIXES(TaskQueueTest, Retry); + + // Use |timer| for scheduled events. + // + // Used in tests. See also MockTimer. + void SetTimerForTest(scoped_ptr<base::Timer> timer); + void FinishTask(const T& task); + void ScheduleDispatch(); + void Dispatch(); + // Return true if we should dispatch tasks. + bool ShouldDispatch(); + + const HandleTaskCallback process_callback_; + net::BackoffEntry::Policy backoff_policy_; + scoped_ptr<net::BackoffEntry> backoff_entry_; + // The number of tasks currently being handled. + int num_in_progress_; + std::deque<T> queue_; + // The set of tasks in queue_ or currently being handled. + std::set<T> tasks_; + base::Closure dispatch_closure_; + scoped_ptr<base::Timer> backoff_timer_; + base::TimeDelta delay_; + + // Must be last data member. + base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; + + DISALLOW_COPY_AND_ASSIGN(TaskQueue); +}; + +// The maximum number of tasks that may be concurrently executed. Think +// carefully before changing this value. The desired behavior of backoff may +// not be obvious when there is more than one concurrent task +const int kMaxConcurrentTasks = 1; + +template <typename T> +TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback, + const base::TimeDelta& initial_backoff_delay, + const base::TimeDelta& max_backoff_delay) + : process_callback_(callback), + backoff_policy_({}), + num_in_progress_(0), + weak_ptr_factory_(this) { + DCHECK_LE(initial_backoff_delay.InMicroseconds(), + max_backoff_delay.InMicroseconds()); + backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds(); + backoff_policy_.multiply_factor = 2.0; + backoff_policy_.jitter_factor = 0.1; + backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); + backoff_policy_.entry_lifetime_ms = -1; + backoff_policy_.always_use_initial_delay = false; + backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_)); + dispatch_closure_ = + base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); + backoff_timer_.reset(new base::Timer(false, false)); +} + +template <typename T> +void TaskQueue<T>::AddToQueue(const T& task) { + DCHECK(CalledOnValidThread()); + // Ignore duplicates. + if (tasks_.find(task) == tasks_.end()) { + queue_.push_back(task); + tasks_.insert(task); + } + ScheduleDispatch(); +} + +template <typename T> +void TaskQueue<T>::MarkAsSucceeded(const T& task) { + DCHECK(CalledOnValidThread()); + FinishTask(task); + // The task succeeded. Stop any pending timer, reset (clear) the backoff, and + // reschedule a dispatch. + backoff_timer_->Stop(); + backoff_entry_->Reset(); + ScheduleDispatch(); +} + +template <typename T> +void TaskQueue<T>::MarkAsFailed(const T& task) { + DCHECK(CalledOnValidThread()); + FinishTask(task); + backoff_entry_->InformOfRequest(false); + ScheduleDispatch(); +} + +template <typename T> +void TaskQueue<T>::Cancel(const T& task) { + DCHECK(CalledOnValidThread()); + FinishTask(task); + ScheduleDispatch(); +} + +template <typename T> +void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) { + DCHECK(CalledOnValidThread()); + DCHECK(timer.get()); + backoff_timer_ = timer.Pass(); +} + +template <typename T> +void TaskQueue<T>::FinishTask(const T& task) { + DCHECK(CalledOnValidThread()); + DCHECK_GE(num_in_progress_, 1); + --num_in_progress_; + const size_t num_erased = tasks_.erase(task); + DCHECK_EQ(1U, num_erased); +} + +template <typename T> +void TaskQueue<T>::ScheduleDispatch() { + DCHECK(CalledOnValidThread()); + if (backoff_timer_->IsRunning() || !ShouldDispatch()) { + return; + } + + backoff_timer_->Start( + FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_); +} + +template <typename T> +void TaskQueue<T>::Dispatch() { + DCHECK(CalledOnValidThread()); + if (!ShouldDispatch()) { + return; + } + + DCHECK(!queue_.empty()); + const T& task = queue_.front(); + ++num_in_progress_; + DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(process_callback_, task)); + queue_.pop_front(); +} + +template <typename T> +bool TaskQueue<T>::ShouldDispatch() { + return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); +} + +} // namespace syncer + +#endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ diff --git a/sync/sync.gyp b/sync/sync.gyp index 8646013..f01ea81 100644 --- a/sync/sync.gyp +++ b/sync/sync.gyp @@ -169,6 +169,7 @@ 'internal_api/attachments/attachment_uploader_impl.cc', 'internal_api/attachments/fake_attachment_downloader.cc', 'internal_api/attachments/fake_attachment_uploader.cc', + 'internal_api/attachments/task_queue.cc', 'internal_api/base_node.cc', 'internal_api/base_transaction.cc', 'internal_api/change_record.cc', @@ -205,6 +206,7 @@ 'internal_api/public/attachments/attachment_uploader_impl.h', 'internal_api/public/attachments/fake_attachment_downloader.h', 'internal_api/public/attachments/fake_attachment_uploader.h', + 'internal_api/public/attachments/task_queue.h', 'internal_api/public/base/attachment_id_proto.cc', 'internal_api/public/base/attachment_id_proto.h', 'internal_api/public/base/cancelation_observer.cc', diff --git a/sync/sync_tests.gypi b/sync/sync_tests.gypi index cb728f1..32ed597 100644 --- a/sync/sync_tests.gypi +++ b/sync/sync_tests.gypi @@ -284,6 +284,7 @@ 'internal_api/attachments/attachment_uploader_impl_unittest.cc', 'internal_api/attachments/fake_attachment_downloader_unittest.cc', 'internal_api/attachments/fake_attachment_uploader_unittest.cc', + 'internal_api/attachments/task_queue_unittest.cc', 'internal_api/debug_info_event_listener_unittest.cc', 'internal_api/http_bridge_unittest.cc', 'internal_api/js_mutation_event_observer_unittest.cc', |