diff options
author | dmurph <dmurph@chromium.org> | 2015-09-30 11:49:01 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-09-30 18:49:54 +0000 |
commit | 164f0b76c3a8156b95c36f17c95c452e8f3b9765 (patch) | |
tree | 2f79df2132a0451d8415a918edd966d33217d3f3 /storage/browser | |
parent | 72869406e9af49285a992c008a81bdfc99ce9f5b (diff) | |
download | chromium_src-164f0b76c3a8156b95c36f17c95c452e8f3b9765.zip chromium_src-164f0b76c3a8156b95c36f17c95c452e8f3b9765.tar.gz chromium_src-164f0b76c3a8156b95c36f17c95c452e8f3b9765.tar.bz2 |
[Blob] BlobReader class & tests, and removal of all redundant reading.
* New BlobReader class & tests
* Removal of other reading code, which now uses the BlobReader
* Removal of unnecessary UploadDiskCacheEntryElementReader
* Removal of blob expansion logic in UploadDataStreamBuilder
Now it's very easy for anyone in browserland to read blobs instead of
having to do url requests, and it's also easy for anyone to add new
blob backing storage.
This is a prerequisite for the new blob async transportation, see here:
goto/BlobPaging
BUG=138051,375297
Committed: https://crrev.com/02561552a57ed8792a8ebb2676bad485e1d99605
Cr-Commit-Position: refs/heads/master@{#351470}
Review URL: https://codereview.chromium.org/1337153002
Cr-Commit-Position: refs/heads/master@{#351611}
Diffstat (limited to 'storage/browser')
-rw-r--r-- | storage/browser/BUILD.gn | 4 | ||||
-rw-r--r-- | storage/browser/blob/blob_data_handle.cc | 90 | ||||
-rw-r--r-- | storage/browser/blob/blob_data_handle.h | 31 | ||||
-rw-r--r-- | storage/browser/blob/blob_data_snapshot.h | 1 | ||||
-rw-r--r-- | storage/browser/blob/blob_reader.cc | 568 | ||||
-rw-r--r-- | storage/browser/blob/blob_reader.h | 190 | ||||
-rw-r--r-- | storage/browser/blob/blob_storage_context.cc | 6 | ||||
-rw-r--r-- | storage/browser/blob/blob_url_request_job.cc | 540 | ||||
-rw-r--r-- | storage/browser/blob/blob_url_request_job.h | 63 | ||||
-rw-r--r-- | storage/browser/blob/blob_url_request_job_factory.cc | 29 | ||||
-rw-r--r-- | storage/browser/blob/blob_url_request_job_factory.h | 6 | ||||
-rw-r--r-- | storage/browser/blob/upload_blob_element_reader.cc | 67 | ||||
-rw-r--r-- | storage/browser/blob/upload_blob_element_reader.h | 54 |
13 files changed, 1103 insertions, 546 deletions
diff --git a/storage/browser/BUILD.gn b/storage/browser/BUILD.gn index 3594d96..6206f37 100644 --- a/storage/browser/BUILD.gn +++ b/storage/browser/BUILD.gn @@ -14,6 +14,8 @@ component("browser") { "blob/blob_data_item.h", "blob/blob_data_snapshot.cc", "blob/blob_data_snapshot.h", + "blob/blob_reader.cc", + "blob/blob_reader.h", "blob/blob_storage_context.cc", "blob/blob_storage_context.h", "blob/blob_url_request_job.cc", @@ -28,6 +30,8 @@ component("browser") { "blob/shareable_blob_data_item.h", "blob/shareable_file_reference.cc", "blob/shareable_file_reference.h", + "blob/upload_blob_element_reader.cc", + "blob/upload_blob_element_reader.h", "blob/view_blob_internals_job.cc", "blob/view_blob_internals_job.h", "database/database_quota_client.cc", diff --git a/storage/browser/blob/blob_data_handle.cc b/storage/browser/blob/blob_data_handle.cc index e3a4be9..3e864fa 100644 --- a/storage/browser/blob/blob_data_handle.cc +++ b/storage/browser/blob/blob_data_handle.cc @@ -8,38 +8,98 @@ #include "base/location.h" #include "base/logging.h" #include "base/sequenced_task_runner.h" +#include "base/task_runner.h" +#include "base/time/time.h" #include "storage/browser/blob/blob_data_snapshot.h" +#include "storage/browser/blob/blob_reader.h" #include "storage/browser/blob/blob_storage_context.h" +#include "storage/browser/fileapi/file_stream_reader.h" +#include "storage/browser/fileapi/file_system_context.h" +#include "storage/browser/fileapi/file_system_url.h" +#include "url/gurl.h" namespace storage { +namespace { + +class FileStreamReaderProviderImpl + : public BlobReader::FileStreamReaderProvider { + public: + FileStreamReaderProviderImpl(FileSystemContext* file_system_context) + : file_system_context_(file_system_context) {} + ~FileStreamReaderProviderImpl() override {} + + scoped_ptr<FileStreamReader> CreateForLocalFile( + base::TaskRunner* task_runner, + const base::FilePath& file_path, + int64_t initial_offset, + const base::Time& expected_modification_time) override { + return make_scoped_ptr(FileStreamReader::CreateForLocalFile( + task_runner, file_path, initial_offset, expected_modification_time)); + } + + scoped_ptr<FileStreamReader> CreateFileStreamReader( + const GURL& filesystem_url, + int64_t offset, + int64_t max_bytes_to_read, + const base::Time& expected_modification_time) override { + return file_system_context_->CreateFileStreamReader( + storage::FileSystemURL( + file_system_context_->CrackURL( + filesystem_url)), + offset, max_bytes_to_read, + expected_modification_time) + .Pass(); + } + + private: + scoped_refptr<FileSystemContext> file_system_context_; + DISALLOW_COPY_AND_ASSIGN(FileStreamReaderProviderImpl); +}; + +} // namespace + BlobDataHandle::BlobDataHandleShared::BlobDataHandleShared( const std::string& uuid, - BlobStorageContext* context, - base::SequencedTaskRunner* task_runner) - : uuid_(uuid), context_(context->AsWeakPtr()) { + const std::string& content_type, + const std::string& content_disposition, + BlobStorageContext* context) + : uuid_(uuid), + content_type_(content_type), + content_disposition_(content_disposition), + context_(context->AsWeakPtr()) { context_->IncrementBlobRefCount(uuid); } +scoped_ptr<BlobReader> BlobDataHandle::CreateReader( + FileSystemContext* file_system_context, + base::SequencedTaskRunner* file_task_runner) const { + return scoped_ptr<BlobReader>(new BlobReader( + this, scoped_ptr<BlobReader::FileStreamReaderProvider>( + new FileStreamReaderProviderImpl(file_system_context)), + file_task_runner)); +} + scoped_ptr<BlobDataSnapshot> BlobDataHandle::BlobDataHandleShared::CreateSnapshot() const { return context_->CreateSnapshot(uuid_).Pass(); } -const std::string& BlobDataHandle::BlobDataHandleShared::uuid() const { - return uuid_; -} - BlobDataHandle::BlobDataHandleShared::~BlobDataHandleShared() { if (context_.get()) context_->DecrementBlobRefCount(uuid_); } BlobDataHandle::BlobDataHandle(const std::string& uuid, + const std::string& content_type, + const std::string& content_disposition, BlobStorageContext* context, - base::SequencedTaskRunner* task_runner) - : io_task_runner_(task_runner), - shared_(new BlobDataHandleShared(uuid, context, task_runner)) { + base::SequencedTaskRunner* io_task_runner) + : io_task_runner_(io_task_runner), + shared_(new BlobDataHandleShared(uuid, + content_type, + content_disposition, + context)) { DCHECK(io_task_runner_.get()); DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); } @@ -62,7 +122,15 @@ scoped_ptr<BlobDataSnapshot> BlobDataHandle::CreateSnapshot() const { } const std::string& BlobDataHandle::uuid() const { - return shared_->uuid(); + return shared_->uuid_; +} + +const std::string& BlobDataHandle::content_type() const { + return shared_->content_type_; +} + +const std::string& BlobDataHandle::content_disposition() const { + return shared_->content_disposition_; } } // namespace storage diff --git a/storage/browser/blob/blob_data_handle.h b/storage/browser/blob/blob_data_handle.h index 3041241..8eba2c6 100644 --- a/storage/browser/blob/blob_data_handle.h +++ b/storage/browser/blob/blob_data_handle.h @@ -8,6 +8,7 @@ #include <string> #include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/supports_user_data.h" #include "storage/browser/storage_browser_export.h" @@ -19,7 +20,9 @@ class SequencedTaskRunner; namespace storage { class BlobDataSnapshot; +class BlobReader; class BlobStorageContext; +class FileSystemContext; // BlobDataHandle ensures that the underlying blob (keyed by the uuid) remains // in the BlobStorageContext's collection while this object is alive. Anything @@ -36,15 +39,25 @@ class STORAGE_EXPORT BlobDataHandle BlobDataHandle(const BlobDataHandle& other); // May be copied on any thread. ~BlobDataHandle() override; // May be deleted on any thread. - // A BlobDataSnapshot is used to read the data from the blob. This object is + // A BlobReader is used to read the data from the blob. This object is // intended to be transient and should not be stored for any extended period // of time. + scoped_ptr<BlobReader> CreateReader( + FileSystemContext* file_system_context, + base::SequencedTaskRunner* file_task_runner) const; + + // May be accessed on any thread. + const std::string& uuid() const; + // May be accessed on any thread. + const std::string& content_type() const; + // May be accessed on any thread. + const std::string& content_disposition() const; + // This call and the destruction of the returned snapshot must be called // on the IO thread. + // TODO(dmurph): Make this protected, where only the BlobReader can call it. scoped_ptr<BlobDataSnapshot> CreateSnapshot() const; - const std::string& uuid() const; // May be accessed on any thread. - private: // Internal class whose destructor is guarenteed to be called on the IO // thread. @@ -52,11 +65,11 @@ class STORAGE_EXPORT BlobDataHandle : public base::RefCountedThreadSafe<BlobDataHandleShared> { public: BlobDataHandleShared(const std::string& uuid, - BlobStorageContext* context, - base::SequencedTaskRunner* task_runner); + const std::string& content_type, + const std::string& content_disposition, + BlobStorageContext* context); scoped_ptr<BlobDataSnapshot> CreateSnapshot() const; - const std::string& uuid() const; private: friend class base::DeleteHelper<BlobDataHandleShared>; @@ -66,6 +79,8 @@ class STORAGE_EXPORT BlobDataHandle virtual ~BlobDataHandleShared(); const std::string uuid_; + const std::string content_type_; + const std::string content_disposition_; base::WeakPtr<BlobStorageContext> context_; DISALLOW_COPY_AND_ASSIGN(BlobDataHandleShared); @@ -73,8 +88,10 @@ class STORAGE_EXPORT BlobDataHandle friend class BlobStorageContext; BlobDataHandle(const std::string& uuid, + const std::string& content_type, + const std::string& content_disposition, BlobStorageContext* context, - base::SequencedTaskRunner* task_runner); + base::SequencedTaskRunner* io_task_runner); scoped_refptr<base::SequencedTaskRunner> io_task_runner_; scoped_refptr<BlobDataHandleShared> shared_; diff --git a/storage/browser/blob/blob_data_snapshot.h b/storage/browser/blob/blob_data_snapshot.h index f0d47227..01ea2ef 100644 --- a/storage/browser/blob/blob_data_snapshot.h +++ b/storage/browser/blob/blob_data_snapshot.h @@ -53,6 +53,7 @@ class STORAGE_EXPORT BlobDataSnapshot : public base::SupportsUserData::Data { const std::string uuid_; const std::string content_type_; const std::string content_disposition_; + // Non-const for constrution in BlobStorageContext std::vector<scoped_refptr<BlobDataItem>> items_; }; diff --git a/storage/browser/blob/blob_reader.cc b/storage/browser/blob/blob_reader.cc new file mode 100644 index 0000000..ccb4e55 --- /dev/null +++ b/storage/browser/blob/blob_reader.cc @@ -0,0 +1,568 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "storage/browser/blob/blob_reader.h" + +#include <algorithm> +#include <limits> + +#include "base/bind.h" +#include "base/sequenced_task_runner.h" +#include "base/stl_util.h" +#include "base/time/time.h" +#include "base/trace_event/trace_event.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/disk_cache/disk_cache.h" +#include "storage/browser/blob/blob_data_handle.h" +#include "storage/browser/blob/blob_data_snapshot.h" +#include "storage/browser/fileapi/file_stream_reader.h" +#include "storage/browser/fileapi/file_system_context.h" +#include "storage/browser/fileapi/file_system_url.h" +#include "storage/common/data_element.h" + +namespace storage { +namespace { +bool IsFileType(DataElement::Type type) { + switch (type) { + case DataElement::TYPE_FILE: + case DataElement::TYPE_FILE_FILESYSTEM: + return true; + default: + return false; + } +} +} // namespace + +BlobReader::FileStreamReaderProvider::~FileStreamReaderProvider() {} + +BlobReader::BlobReader( + const BlobDataHandle* blob_handle, + scoped_ptr<FileStreamReaderProvider> file_stream_provider, + base::SequencedTaskRunner* file_task_runner) + : file_stream_provider_(file_stream_provider.Pass()), + file_task_runner_(file_task_runner), + net_error_(net::OK), + weak_factory_(this) { + if (blob_handle) { + blob_data_ = blob_handle->CreateSnapshot().Pass(); + } +} + +BlobReader::~BlobReader() { + STLDeleteValues(&index_to_reader_); +} + +BlobReader::Status BlobReader::CalculateSize( + const net::CompletionCallback& done) { + DCHECK(!total_size_calculated_); + DCHECK(size_callback_.is_null()); + if (!blob_data_.get()) { + return ReportError(net::ERR_FILE_NOT_FOUND); + } + + net_error_ = net::OK; + total_size_ = 0; + const auto& items = blob_data_->items(); + item_length_list_.resize(items.size()); + pending_get_file_info_count_ = 0; + for (size_t i = 0; i < items.size(); ++i) { + const BlobDataItem& item = *items.at(i); + if (IsFileType(item.type())) { + ++pending_get_file_info_count_; + storage::FileStreamReader* const reader = GetOrCreateFileReaderAtIndex(i); + if (!reader) { + return ReportError(net::ERR_FAILED); + } + int64_t length_output = reader->GetLength(base::Bind( + &BlobReader::DidGetFileItemLength, weak_factory_.GetWeakPtr(), i)); + if (length_output == net::ERR_IO_PENDING) { + continue; + } + if (length_output < 0) { + return ReportError(length_output); + } + // We got the length right away + --pending_get_file_info_count_; + uint64_t resolved_length; + if (!ResolveFileItemLength(item, length_output, &resolved_length)) { + return ReportError(net::ERR_FILE_NOT_FOUND); + } + if (!AddItemLength(i, resolved_length)) { + return ReportError(net::ERR_FAILED); + } + continue; + } + + if (!AddItemLength(i, item.length())) + return ReportError(net::ERR_FAILED); + } + + if (pending_get_file_info_count_ == 0) { + DidCountSize(); + return Status::DONE; + } + // Note: We only set the callback if we know that we're an async operation. + size_callback_ = done; + return Status::IO_PENDING; +} + +BlobReader::Status BlobReader::SetReadRange(uint64_t offset, uint64_t length) { + if (!blob_data_.get()) { + return ReportError(net::ERR_FILE_NOT_FOUND); + } + if (!total_size_calculated_) { + return ReportError(net::ERR_FAILED); + } + if (offset + length > total_size_) { + return ReportError(net::ERR_FILE_NOT_FOUND); + } + // Skip the initial items that are not in the range. + remaining_bytes_ = length; + const auto& items = blob_data_->items(); + for (current_item_index_ = 0; + current_item_index_ < items.size() && + offset >= item_length_list_[current_item_index_]; + ++current_item_index_) { + offset -= item_length_list_[current_item_index_]; + } + + // Set the offset that need to jump to for the first item in the range. + current_item_offset_ = offset; + if (current_item_offset_ == 0) + return Status::DONE; + + // Adjust the offset of the first stream if it is of file type. + const BlobDataItem& item = *items.at(current_item_index_); + if (IsFileType(item.type())) { + SetFileReaderAtIndex(current_item_index_, + CreateFileStreamReader(item, offset)); + } + return Status::DONE; +} + +BlobReader::Status BlobReader::Read(net::IOBuffer* buffer, + size_t dest_size, + int* bytes_read, + net::CompletionCallback done) { + DCHECK(bytes_read); + DCHECK_GE(remaining_bytes_, 0ul); + DCHECK(read_callback_.is_null()); + + *bytes_read = 0; + if (!blob_data_.get()) { + return ReportError(net::ERR_FILE_NOT_FOUND); + } + if (!total_size_calculated_) { + return ReportError(net::ERR_FAILED); + } + + // Bail out immediately if we encountered an error. + if (net_error_ != net::OK) { + return Status::NET_ERROR; + } + + DCHECK_GE(dest_size, 0ul); + if (remaining_bytes_ < static_cast<uint64_t>(dest_size)) + dest_size = static_cast<int>(remaining_bytes_); + + // If we should copy zero bytes because |remaining_bytes_| is zero, short + // circuit here. + if (!dest_size) { + *bytes_read = 0; + return Status::DONE; + } + + // Keep track of the buffer. + DCHECK(!read_buf_.get()); + read_buf_ = new net::DrainableIOBuffer(buffer, dest_size); + + Status status = ReadLoop(bytes_read); + if (status == Status::IO_PENDING) + read_callback_ = done; + return status; +} + +void BlobReader::Kill() { + DeleteCurrentFileReader(); + weak_factory_.InvalidateWeakPtrs(); +} + +bool BlobReader::IsInMemory() const { + if (!blob_data_.get()) { + return true; + } + for (const auto& item : blob_data_->items()) { + if (item->type() != DataElement::TYPE_BYTES) { + return false; + } + } + return true; +} + +void BlobReader::InvalidateCallbacksAndDone(int net_error, + net::CompletionCallback done) { + net_error_ = net_error; + weak_factory_.InvalidateWeakPtrs(); + size_callback_.Reset(); + read_callback_.Reset(); + read_buf_ = nullptr; + done.Run(net_error); +} + +BlobReader::Status BlobReader::ReportError(int net_error) { + net_error_ = net_error; + return Status::NET_ERROR; +} + +bool BlobReader::AddItemLength(size_t index, uint64_t item_length) { + if (item_length > std::numeric_limits<uint64_t>::max() - total_size_) { + return false; + } + + // Cache the size and add it to the total size. + DCHECK_LT(index, item_length_list_.size()); + item_length_list_[index] = item_length; + total_size_ += item_length; + return true; +} + +bool BlobReader::ResolveFileItemLength(const BlobDataItem& item, + int64_t total_length, + uint64_t* output_length) { + DCHECK(IsFileType(item.type())); + DCHECK(output_length); + uint64_t file_length = total_length; + uint64_t item_offset = item.offset(); + uint64_t item_length = item.length(); + if (item_offset > file_length) { + return false; + } + + uint64 max_length = file_length - item_offset; + + // If item length is undefined, then we need to use the file size being + // resolved in the real time. + if (item_length == std::numeric_limits<uint64>::max()) { + item_length = max_length; + } else if (item_length > max_length) { + return false; + } + + *output_length = item_length; + return true; +} + +void BlobReader::DidGetFileItemLength(size_t index, int64_t result) { + // Do nothing if we have encountered an error. + if (net_error_) + return; + + if (result == net::ERR_UPLOAD_FILE_CHANGED) + result = net::ERR_FILE_NOT_FOUND; + if (result < 0) { + InvalidateCallbacksAndDone(result, size_callback_); + return; + } + + const auto& items = blob_data_->items(); + DCHECK_LT(index, items.size()); + const BlobDataItem& item = *items.at(index); + uint64_t length; + if (!ResolveFileItemLength(item, result, &length)) { + InvalidateCallbacksAndDone(net::ERR_FILE_NOT_FOUND, size_callback_); + return; + } + if (!AddItemLength(index, length)) { + InvalidateCallbacksAndDone(net::ERR_FAILED, size_callback_); + return; + } + + if (--pending_get_file_info_count_ == 0) + DidCountSize(); +} + +void BlobReader::DidCountSize() { + DCHECK(!net_error_); + total_size_calculated_ = true; + remaining_bytes_ = total_size_; + // This is set only if we're async. + if (!size_callback_.is_null()) { + net::CompletionCallback done = size_callback_; + size_callback_.Reset(); + done.Run(net::OK); + } +} + +BlobReader::Status BlobReader::ReadLoop(int* bytes_read) { + // Read until we encounter an error or could not get the data immediately. + while (remaining_bytes_ > 0 && read_buf_->BytesRemaining() > 0) { + Status read_status = ReadItem(); + if (read_status == Status::DONE) { + continue; + } + return read_status; + } + + *bytes_read = BytesReadCompleted(); + return Status::DONE; +} + +BlobReader::Status BlobReader::ReadItem() { + // Are we done with reading all the blob data? + if (remaining_bytes_ == 0) + return Status::DONE; + + const auto& items = blob_data_->items(); + // If we get to the last item but still expect something to read, bail out + // since something is wrong. + if (current_item_index_ >= items.size()) { + return ReportError(net::ERR_FAILED); + } + + // Compute the bytes to read for current item. + int bytes_to_read = ComputeBytesToRead(); + + // If nothing to read for current item, advance to next item. + if (bytes_to_read == 0) { + AdvanceItem(); + return Status::DONE; + } + + // Do the reading. + const BlobDataItem& item = *items.at(current_item_index_); + if (item.type() == DataElement::TYPE_BYTES) { + ReadBytesItem(item, bytes_to_read); + return Status::DONE; + } + if (item.type() == DataElement::TYPE_DISK_CACHE_ENTRY) + return ReadDiskCacheEntryItem(item, bytes_to_read); + if (!IsFileType(item.type())) { + NOTREACHED(); + return ReportError(net::ERR_FAILED); + } + storage::FileStreamReader* const reader = + GetOrCreateFileReaderAtIndex(current_item_index_); + if (!reader) { + return ReportError(net::ERR_FAILED); + } + + return ReadFileItem(reader, bytes_to_read); +} + +void BlobReader::AdvanceItem() { + // Close the file if the current item is a file. + DeleteCurrentFileReader(); + + // Advance to the next item. + current_item_index_++; + current_item_offset_ = 0; +} + +void BlobReader::AdvanceBytesRead(int result) { + DCHECK_GT(result, 0); + + // Do we finish reading the current item? + current_item_offset_ += result; + if (current_item_offset_ == item_length_list_[current_item_index_]) + AdvanceItem(); + + // Subtract the remaining bytes. + remaining_bytes_ -= result; + DCHECK_GE(remaining_bytes_, 0ul); + + // Adjust the read buffer. + read_buf_->DidConsume(result); + DCHECK_GE(read_buf_->BytesRemaining(), 0); +} + +void BlobReader::ReadBytesItem(const BlobDataItem& item, int bytes_to_read) { + TRACE_EVENT1("Blob", "BlobReader::ReadBytesItem", "uuid", blob_data_->uuid()); + DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); + + memcpy(read_buf_->data(), item.bytes() + item.offset() + current_item_offset_, + bytes_to_read); + + AdvanceBytesRead(bytes_to_read); +} + +BlobReader::Status BlobReader::ReadFileItem(FileStreamReader* reader, + int bytes_to_read) { + DCHECK(!io_pending_) + << "Can't begin IO while another IO operation is pending."; + DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); + DCHECK(reader); + TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest::ReadFileItem", this, "uuid", + blob_data_->uuid()); + const int result = reader->Read( + read_buf_.get(), bytes_to_read, + base::Bind(&BlobReader::DidReadFile, weak_factory_.GetWeakPtr())); + if (result >= 0) { + AdvanceBytesRead(result); + return Status::DONE; + } + if (result == net::ERR_IO_PENDING) { + io_pending_ = true; + return Status::IO_PENDING; + } + return ReportError(result); +} + +void BlobReader::DidReadFile(int result) { + TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadFileItem", this, "uuid", + blob_data_->uuid()); + DidReadItem(result); +} + +void BlobReader::ContinueAsyncReadLoop() { + int bytes_read = 0; + Status read_status = ReadLoop(&bytes_read); + switch (read_status) { + case Status::DONE: { + net::CompletionCallback done = read_callback_; + read_callback_.Reset(); + done.Run(bytes_read); + return; + } + case Status::NET_ERROR: + InvalidateCallbacksAndDone(net_error_, read_callback_); + return; + case Status::IO_PENDING: + return; + } +} + +void BlobReader::DeleteCurrentFileReader() { + SetFileReaderAtIndex(current_item_index_, scoped_ptr<FileStreamReader>()); +} + +BlobReader::Status BlobReader::ReadDiskCacheEntryItem(const BlobDataItem& item, + int bytes_to_read) { + DCHECK(!io_pending_) + << "Can't begin IO while another IO operation is pending."; + TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest::ReadDiskCacheItem", this, + "uuid", blob_data_->uuid()); + DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); + + const int result = item.disk_cache_entry()->ReadData( + item.disk_cache_stream_index(), current_item_offset_, read_buf_.get(), + bytes_to_read, base::Bind(&BlobReader::DidReadDiskCacheEntry, + weak_factory_.GetWeakPtr())); + if (result >= 0) { + AdvanceBytesRead(result); + return Status::DONE; + } + if (result == net::ERR_IO_PENDING) { + io_pending_ = true; + return Status::IO_PENDING; + } + return ReportError(result); +} + +void BlobReader::DidReadDiskCacheEntry(int result) { + TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadDiskCacheItem", this, "uuid", + blob_data_->uuid()); + DidReadItem(result); +} + +void BlobReader::DidReadItem(int result) { + DCHECK(io_pending_) << "Asynchronous IO completed while IO wasn't pending?"; + io_pending_ = false; + if (result <= 0) { + InvalidateCallbacksAndDone(result, read_callback_); + return; + } + AdvanceBytesRead(result); + ContinueAsyncReadLoop(); +} + +int BlobReader::BytesReadCompleted() { + int bytes_read = read_buf_->BytesConsumed(); + read_buf_ = nullptr; + return bytes_read; +} + +int BlobReader::ComputeBytesToRead() const { + uint64_t current_item_length = item_length_list_[current_item_index_]; + + uint64_t item_remaining = current_item_length - current_item_offset_; + uint64_t buf_remaining = read_buf_->BytesRemaining(); + uint64_t max_int_value = std::numeric_limits<int>::max(); + // Here we make sure we don't overflow 'max int'. + uint64_t min = std::min( + std::min(std::min(item_remaining, buf_remaining), remaining_bytes_), + max_int_value); + + return static_cast<int>(min); +} + +FileStreamReader* BlobReader::GetOrCreateFileReaderAtIndex(size_t index) { + const auto& items = blob_data_->items(); + DCHECK_LT(index, items.size()); + const BlobDataItem& item = *items.at(index); + if (!IsFileType(item.type())) + return nullptr; + auto it = index_to_reader_.find(index); + if (it != index_to_reader_.end()) { + DCHECK(it->second); + return it->second; + } + scoped_ptr<FileStreamReader> reader = CreateFileStreamReader(item, 0); + FileStreamReader* ret_value = reader.get(); + if (!ret_value) + return nullptr; + index_to_reader_[index] = reader.release(); + return ret_value; +} + +scoped_ptr<FileStreamReader> BlobReader::CreateFileStreamReader( + const BlobDataItem& item, + uint64_t additional_offset) { + DCHECK(IsFileType(item.type())); + + switch (item.type()) { + case DataElement::TYPE_FILE: + return file_stream_provider_->CreateForLocalFile( + file_task_runner_.get(), item.path(), + item.offset() + additional_offset, + item.expected_modification_time()) + .Pass(); + case DataElement::TYPE_FILE_FILESYSTEM: + return file_stream_provider_ + ->CreateFileStreamReader( + item.filesystem_url(), item.offset() + additional_offset, + item.length() == std::numeric_limits<uint64_t>::max() + ? storage::kMaximumLength + : item.length() - additional_offset, + item.expected_modification_time()) + .Pass(); + case DataElement::TYPE_BLOB: + case DataElement::TYPE_BYTES: + case DataElement::TYPE_DISK_CACHE_ENTRY: + case DataElement::TYPE_UNKNOWN: + break; + } + + NOTREACHED(); + return nullptr; +} + +void BlobReader::SetFileReaderAtIndex(size_t index, + scoped_ptr<FileStreamReader> reader) { + auto found = index_to_reader_.find(current_item_index_); + if (found != index_to_reader_.end()) { + if (found->second) { + delete found->second; + } + if (!reader.get()) { + index_to_reader_.erase(found); + return; + } + found->second = reader.release(); + } else if (reader.get()) { + index_to_reader_[current_item_index_] = reader.release(); + } +} + +} // namespace storage diff --git a/storage/browser/blob/blob_reader.h b/storage/browser/blob/blob_reader.h new file mode 100644 index 0000000..54b262f --- /dev/null +++ b/storage/browser/blob/blob_reader.h @@ -0,0 +1,190 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef STORAGE_BROWSER_BLOB_BLOB_READER_H_ +#define STORAGE_BROWSER_BLOB_BLOB_READER_H_ + +#include <stdint.h> +#include <map> +#include <vector> + +#include "base/macros.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "net/base/completion_callback.h" +#include "storage/browser/storage_browser_export.h" + +class GURL; + +namespace base { +class FilePath; +class SequencedTaskRunner; +class TaskRunner; +class Time; +} + +namespace net { +class DrainableIOBuffer; +class IOBuffer; +} + +namespace storage { +class BlobDataItem; +class BlobDataHandle; +class BlobDataSnapshot; +class FileStreamReader; +class FileSystemContext; + +// The blob reader is used to read a blob. This can only be used in the browser +// process, and we need to be on the IO thread. +// * There can only be one read happening at a time per reader. +// * If a status of Status::NET_ERROR is returned, that means there was an +// error and the net_error() variable contains the error code. +// Use a BlobDataHandle to create an instance. +class STORAGE_EXPORT BlobReader { + public: + class STORAGE_EXPORT FileStreamReaderProvider { + public: + virtual ~FileStreamReaderProvider(); + + virtual scoped_ptr<FileStreamReader> CreateForLocalFile( + base::TaskRunner* task_runner, + const base::FilePath& file_path, + int64_t initial_offset, + const base::Time& expected_modification_time) = 0; + + virtual scoped_ptr<FileStreamReader> CreateFileStreamReader( + const GURL& filesystem_url, + int64_t offset, + int64_t max_bytes_to_read, + const base::Time& expected_modification_time) = 0; + }; + enum class Status { NET_ERROR, IO_PENDING, DONE }; + virtual ~BlobReader(); + + // This calculates the total size of the blob, and initializes the reading + // cursor. + // * This should only be called once per reader. + // * Status::Done means that the total_size() value is populated and you can + // continue to SetReadRange or Read. + // * The 'done' callback is only called if Status::IO_PENDING is returned. + // The callback value contains the error code or net::OK. Please use the + // total_size() value to query the blob size, as it's uint64_t. + Status CalculateSize(const net::CompletionCallback& done); + + // Used to set the read position. + // * This should be called after CalculateSize and before Read. + // * Range can only be set once. + Status SetReadRange(uint64_t position, uint64_t length); + + // Reads a portion of the data. + // * CalculateSize (and optionally SetReadRange) must be called beforehand. + // * bytes_read is populated only if Status::DONE is returned. Otherwise the + // bytes read (or error code) is populated in the 'done' callback. + // * The done callback is only called if Status::IO_PENDING is returned. + // * This method can be called multiple times. A bytes_read value (either from + // the callback for Status::IO_PENDING or the bytes_read value for + // Status::DONE) of 0 means we're finished reading. + Status Read(net::IOBuffer* buffer, + size_t dest_size, + int* bytes_read, + net::CompletionCallback done); + + // Kills reading and invalidates all callbacks. The reader cannot be used + // after this call. + void Kill(); + + // Returns if all of the blob's items are in memory. + bool IsInMemory() const; + + // Returns the remaining bytes to be read in the blob. This is populated + // after CalculateSize, and is modified by SetReadRange. + uint64_t remaining_bytes() const { return remaining_bytes_; } + + // Returns the net error code if there was an error. Defaults to net::OK. + int net_error() const { return net_error_; } + + // Returns the total size of the blob. This is populated after CalculateSize + // is called. + uint64_t total_size() const { return total_size_; } + + protected: + friend class BlobDataHandle; + friend class BlobReaderTest; + + BlobReader(const BlobDataHandle* blob_handle, + scoped_ptr<FileStreamReaderProvider> file_stream_provider, + base::SequencedTaskRunner* file_task_runner); + + bool total_size_calculated() const { return total_size_calculated_; } + + private: + Status ReportError(int net_error); + void InvalidateCallbacksAndDone(int net_error, net::CompletionCallback done); + + bool AddItemLength(size_t index, uint64_t length); + bool ResolveFileItemLength(const BlobDataItem& item, + int64_t total_length, + uint64_t* output_length); + void DidGetFileItemLength(size_t index, int64_t result); + void DidCountSize(); + + // For reading the blob. + // Returns if we're done, PENDING_IO if we're waiting on async. + Status ReadLoop(int* bytes_read); + // Called from asynchronously called methods to continue the read loop. + void ContinueAsyncReadLoop(); + // PENDING_IO means we're waiting on async. + Status ReadItem(); + void AdvanceItem(); + void AdvanceBytesRead(int result); + void ReadBytesItem(const BlobDataItem& item, int bytes_to_read); + BlobReader::Status ReadFileItem(FileStreamReader* reader, int bytes_to_read); + void DidReadFile(int result); + void DeleteCurrentFileReader(); + Status ReadDiskCacheEntryItem(const BlobDataItem& item, int bytes_to_read); + void DidReadDiskCacheEntry(int result); + void DidReadItem(int result); + int ComputeBytesToRead() const; + int BytesReadCompleted(); + + // Returns a FileStreamReader for a blob item at |index|. + // If the item at |index| is not of file this returns NULL. + FileStreamReader* GetOrCreateFileReaderAtIndex(size_t index); + // If the reader is null, then this basically performs a delete operation. + void SetFileReaderAtIndex(size_t index, scoped_ptr<FileStreamReader> reader); + // Creates a FileStreamReader for the item with additional_offset. + scoped_ptr<FileStreamReader> CreateFileStreamReader( + const BlobDataItem& item, + uint64_t additional_offset); + + scoped_ptr<BlobDataSnapshot> blob_data_; + scoped_ptr<FileStreamReaderProvider> file_stream_provider_; + scoped_refptr<base::SequencedTaskRunner> file_task_runner_; + + int net_error_; + bool item_list_populated_ = false; + std::vector<uint64_t> item_length_list_; + + scoped_refptr<net::DrainableIOBuffer> read_buf_; + + bool total_size_calculated_ = false; + uint64_t total_size_ = 0; + uint64_t remaining_bytes_ = 0; + size_t pending_get_file_info_count_ = 0; + std::map<size_t, FileStreamReader*> index_to_reader_; + size_t current_item_index_ = 0; + uint64_t current_item_offset_ = 0; + + bool io_pending_ = false; + + net::CompletionCallback size_callback_; + net::CompletionCallback read_callback_; + + base::WeakPtrFactory<BlobReader> weak_factory_; + DISALLOW_COPY_AND_ASSIGN(BlobReader); +}; + +} // namespace storage +#endif // STORAGE_BROWSER_BLOB_BLOB_READER_H_ diff --git a/storage/browser/blob/blob_storage_context.cc b/storage/browser/blob/blob_storage_context.cc index d08ccb3..7114696 100644 --- a/storage/browser/blob/blob_storage_context.cc +++ b/storage/browser/blob/blob_storage_context.cc @@ -77,9 +77,9 @@ scoped_ptr<BlobDataHandle> BlobStorageContext::GetBlobDataFromUUID( if (entry->flags & EXCEEDED_MEMORY) return result.Pass(); DCHECK(!entry->IsBeingBuilt()); - result.reset( - new BlobDataHandle(uuid, this, - base::ThreadTaskRunnerHandle::Get().get())); + result.reset(new BlobDataHandle(uuid, entry->data->content_type(), + entry->data->content_disposition(), this, + base::ThreadTaskRunnerHandle::Get().get())); return result.Pass(); } diff --git a/storage/browser/blob/blob_url_request_job.cc b/storage/browser/blob/blob_url_request_job.cc index f429020..6ec2fe5 100644 --- a/storage/browser/blob/blob_url_request_job.cc +++ b/storage/browser/blob/blob_url_request_job.cc @@ -31,6 +31,8 @@ #include "net/url_request/url_request_context.h" #include "net/url_request/url_request_error_job.h" #include "net/url_request/url_request_status.h" +#include "storage/browser/blob/blob_data_handle.h" +#include "storage/browser/blob/blob_reader.h" #include "storage/browser/fileapi/file_stream_reader.h" #include "storage/browser/fileapi/file_system_context.h" #include "storage/browser/fileapi/file_system_url.h" @@ -38,41 +40,24 @@ namespace storage { -namespace { - -bool IsFileType(DataElement::Type type) { - switch (type) { - case DataElement::TYPE_FILE: - case DataElement::TYPE_FILE_FILESYSTEM: - return true; - default: - return false; - } -} - -} // namespace - BlobURLRequestJob::BlobURLRequestJob( net::URLRequest* request, net::NetworkDelegate* network_delegate, - scoped_ptr<BlobDataSnapshot> blob_data, - storage::FileSystemContext* file_system_context, + BlobDataHandle* blob_handle, + FileSystemContext* file_system_context, base::SingleThreadTaskRunner* file_task_runner) : net::URLRequestJob(request, network_delegate), - blob_data_(blob_data.Pass()), - file_system_context_(file_system_context), - file_task_runner_(file_task_runner), - total_size_(0), - remaining_bytes_(0), - pending_get_file_info_count_(0), - current_item_index_(0), - current_item_offset_(0), error_(false), byte_range_set_(false), weak_factory_(this) { TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest", this, "uuid", - blob_data_ ? blob_data_->uuid() : "NotFound"); - DCHECK(file_task_runner_.get()); + blob_handle ? blob_handle->uuid() : "NotFound"); + DCHECK(file_task_runner); + if (blob_handle) { + blob_handle_.reset(new BlobDataHandle(*blob_handle)); + blob_reader_ = + blob_handle_->CreateReader(file_system_context, file_task_runner); + } } void BlobURLRequestJob::Start() { @@ -83,8 +68,9 @@ void BlobURLRequestJob::Start() { } void BlobURLRequestJob::Kill() { - DeleteCurrentFileReader(); - + if (blob_reader_) { + blob_reader_->Kill(); + } net::URLRequestJob::Kill(); weak_factory_.InvalidateWeakPtrs(); } @@ -92,9 +78,10 @@ void BlobURLRequestJob::Kill() { bool BlobURLRequestJob::ReadRawData(net::IOBuffer* dest, int dest_size, int* bytes_read) { + TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest::ReadRawData", this, "uuid", + blob_handle_ ? blob_handle_->uuid() : "NotFound"); DCHECK_NE(dest_size, 0); DCHECK(bytes_read); - DCHECK_GE(remaining_bytes_, 0); // Bail out immediately if we encounter an error. if (error_) { @@ -102,21 +89,27 @@ bool BlobURLRequestJob::ReadRawData(net::IOBuffer* dest, return true; } - if (remaining_bytes_ < dest_size) - dest_size = static_cast<int>(remaining_bytes_); + BlobReader::Status read_status = + blob_reader_->Read(dest, dest_size, bytes_read, + base::Bind(&BlobURLRequestJob::DidReadRawData, + weak_factory_.GetWeakPtr())); - // If we should copy zero bytes because |remaining_bytes_| is zero, short - // circuit here. - if (!dest_size) { - *bytes_read = 0; - return true; + switch (read_status) { + case BlobReader::Status::NET_ERROR: + NotifyFailure(blob_reader_->net_error()); + TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadRawData", this, "uuid", + blob_handle_ ? blob_handle_->uuid() : "NotFound"); + return false; + case BlobReader::Status::IO_PENDING: + SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0)); + return false; + case BlobReader::Status::DONE: + TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadRawData", this, "uuid", + blob_handle_ ? blob_handle_->uuid() : "NotFound"); + return true; } - - // Keep track of the buffer. - DCHECK(!read_buf_.get()); - read_buf_ = new net::DrainableIOBuffer(dest, dest_size); - - return ReadLoop(bytes_read); + NOTREACHED(); + return true; } bool BlobURLRequestJob::GetMimeType(std::string* mime_type) const { @@ -159,13 +152,11 @@ void BlobURLRequestJob::SetExtraRequestHeaders( } BlobURLRequestJob::~BlobURLRequestJob() { - STLDeleteValues(&index_to_reader_); TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest", this, "uuid", - blob_data_ ? blob_data_->uuid() : "NotFound"); + blob_handle_ ? blob_handle_->uuid() : "NotFound"); } void BlobURLRequestJob::DidStart() { - current_file_chunk_number_ = 0; error_ = false; // We only support GET request per the spec. @@ -175,369 +166,69 @@ void BlobURLRequestJob::DidStart() { } // If the blob data is not present, bail out. - if (!blob_data_) { + if (!blob_handle_) { NotifyFailure(net::ERR_FILE_NOT_FOUND); return; } - CountSize(); -} - -bool BlobURLRequestJob::AddItemLength(size_t index, int64 item_length) { - if (item_length > kint64max - total_size_) { - TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::CountSize", this, "uuid", - blob_data_->uuid()); - NotifyFailure(net::ERR_FAILED); - return false; - } - - // Cache the size and add it to the total size. - DCHECK_LT(index, item_length_list_.size()); - item_length_list_[index] = item_length; - total_size_ += item_length; - return true; -} - -bool BlobURLRequestJob::CountSize() { TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest::CountSize", this, "uuid", - blob_data_->uuid()); - pending_get_file_info_count_ = 0; - total_size_ = 0; - const auto& items = blob_data_->items(); - item_length_list_.resize(items.size()); - - for (size_t i = 0; i < items.size(); ++i) { - const BlobDataItem& item = *items.at(i); - if (IsFileType(item.type())) { - ++pending_get_file_info_count_; - storage::FileStreamReader* const reader = GetFileStreamReader(i); - if (!reader) { - NotifyFailure(net::ERR_FAILED); - return false; - } - if (!reader->GetLength( - base::Bind(&BlobURLRequestJob::DidGetFileItemLength, - weak_factory_.GetWeakPtr(), i))) { - NotifyFailure(net::ERR_FILE_NOT_FOUND); - return false; - } - continue; - } - - if (!AddItemLength(i, item.length())) - return false; + blob_handle_->uuid()); + BlobReader::Status size_status = blob_reader_->CalculateSize(base::Bind( + &BlobURLRequestJob::DidCalculateSize, weak_factory_.GetWeakPtr())); + switch (size_status) { + case BlobReader::Status::NET_ERROR: + NotifyFailure(blob_reader_->net_error()); + return; + case BlobReader::Status::IO_PENDING: + SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0)); + return; + case BlobReader::Status::DONE: + DidCalculateSize(net::OK); + return; } - - if (pending_get_file_info_count_ == 0) - DidCountSize(net::OK); - - return true; } -void BlobURLRequestJob::DidCountSize(int error) { - DCHECK(!error_); +void BlobURLRequestJob::DidCalculateSize(int result) { TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::CountSize", this, "uuid", - blob_data_->uuid()); + blob_handle_->uuid()); + // Clear the IO_PENDING status + SetStatus(net::URLRequestStatus()); - // If an error occured, bail out. - if (error != net::OK) { - NotifyFailure(error); + if (result != net::OK) { + NotifyFailure(result); return; } // Apply the range requirement. - if (!byte_range_.ComputeBounds(total_size_)) { + if (!byte_range_.ComputeBounds(blob_reader_->total_size())) { NotifyFailure(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE); return; } - remaining_bytes_ = base::checked_cast<int64>( + DCHECK_LE(byte_range_.first_byte_position(), + byte_range_.last_byte_position() + 1); + uint64_t length = base::checked_cast<uint64_t>( byte_range_.last_byte_position() - byte_range_.first_byte_position() + 1); - DCHECK_GE(remaining_bytes_, 0); - // Do the seek at the beginning of the request. - if (byte_range_.first_byte_position()) - Seek(byte_range_.first_byte_position()); + if (byte_range_set_) + blob_reader_->SetReadRange(byte_range_.first_byte_position(), length); - NotifySuccess(); -} - -void BlobURLRequestJob::DidGetFileItemLength(size_t index, int64 result) { - // Do nothing if we have encountered an error. - if (error_) - return; - - if (result == net::ERR_UPLOAD_FILE_CHANGED) { - NotifyFailure(net::ERR_FILE_NOT_FOUND); - return; - } else if (result < 0) { - NotifyFailure(result); - return; - } - - const auto& items = blob_data_->items(); - DCHECK_LT(index, items.size()); - const BlobDataItem& item = *items.at(index); - DCHECK(IsFileType(item.type())); - - uint64 file_length = result; - uint64 item_offset = item.offset(); - uint64 item_length = item.length(); - - if (item_offset > file_length) { - NotifyFailure(net::ERR_FILE_NOT_FOUND); - return; - } - - uint64 max_length = file_length - item_offset; - - // If item length is undefined, then we need to use the file size being - // resolved in the real time. - if (item_length == std::numeric_limits<uint64>::max()) { - item_length = max_length; - } else if (item_length > max_length) { - NotifyFailure(net::ERR_FILE_NOT_FOUND); - return; - } - - if (!AddItemLength(index, item_length)) - return; - - if (--pending_get_file_info_count_ == 0) - DidCountSize(net::OK); -} - -void BlobURLRequestJob::Seek(int64 offset) { - // Skip the initial items that are not in the range. - const auto& items = blob_data_->items(); - for (current_item_index_ = 0; - current_item_index_ < items.size() && - offset >= item_length_list_[current_item_index_]; - ++current_item_index_) { - offset -= item_length_list_[current_item_index_]; - } - - // Set the offset that need to jump to for the first item in the range. - current_item_offset_ = offset; - - if (offset == 0) - return; - - // Adjust the offset of the first stream if it is of file type. - const BlobDataItem& item = *items.at(current_item_index_); - if (IsFileType(item.type())) { - DeleteCurrentFileReader(); - CreateFileStreamReader(current_item_index_, offset); - } -} - -bool BlobURLRequestJob::ReadItem() { - // Are we done with reading all the blob data? - if (remaining_bytes_ == 0) - return true; - - const auto& items = blob_data_->items(); - // If we get to the last item but still expect something to read, bail out - // since something is wrong. - if (current_item_index_ >= items.size()) { - NotifyFailure(net::ERR_FAILED); - return false; - } - - // Compute the bytes to read for current item. - int bytes_to_read = ComputeBytesToRead(); - - // If nothing to read for current item, advance to next item. - if (bytes_to_read == 0) { - AdvanceItem(); - return true; - } - - // Do the reading. - const BlobDataItem& item = *items.at(current_item_index_); - if (item.type() == DataElement::TYPE_BYTES) - return ReadBytesItem(item, bytes_to_read); - if (item.type() == DataElement::TYPE_DISK_CACHE_ENTRY) - return ReadDiskCacheEntryItem(item, bytes_to_read); - if (!IsFileType(item.type())) { - NOTREACHED(); - return false; - } - storage::FileStreamReader* const reader = - GetFileStreamReader(current_item_index_); - if (!reader) { - NotifyFailure(net::ERR_FAILED); - return false; - } - - return ReadFileItem(reader, bytes_to_read); -} - -void BlobURLRequestJob::AdvanceItem() { - // Close the file if the current item is a file. - DeleteCurrentFileReader(); - - // Advance to the next item. - current_item_index_++; - current_item_offset_ = 0; -} - -void BlobURLRequestJob::AdvanceBytesRead(int result) { - DCHECK_GT(result, 0); - - // Do we finish reading the current item? - current_item_offset_ += result; - if (current_item_offset_ == item_length_list_[current_item_index_]) - AdvanceItem(); - - // Subtract the remaining bytes. - remaining_bytes_ -= result; - DCHECK_GE(remaining_bytes_, 0); - - // Adjust the read buffer. - read_buf_->DidConsume(result); - DCHECK_GE(read_buf_->BytesRemaining(), 0); -} - -bool BlobURLRequestJob::ReadBytesItem(const BlobDataItem& item, - int bytes_to_read) { - TRACE_EVENT1("Blob", "BlobRequest::ReadBytesItem", "uuid", - blob_data_->uuid()); - DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); - - memcpy(read_buf_->data(), - item.bytes() + item.offset() + current_item_offset_, - bytes_to_read); - - AdvanceBytesRead(bytes_to_read); - return true; -} - -bool BlobURLRequestJob::ReadFileItem(FileStreamReader* reader, - int bytes_to_read) { - DCHECK(!GetStatus().is_io_pending()) - << "Can't begin IO while another IO operation is pending."; - DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); - DCHECK(reader); - int chunk_number = current_file_chunk_number_++; - TRACE_EVENT_ASYNC_BEGIN1("Blob", "BlobRequest::ReadFileItem", this, "uuid", - blob_data_->uuid()); - const int result = - reader->Read(read_buf_.get(), bytes_to_read, - base::Bind(&BlobURLRequestJob::DidReadFile, - weak_factory_.GetWeakPtr(), chunk_number)); - if (result >= 0) { - AdvanceBytesRead(result); - return true; - } - if (result == net::ERR_IO_PENDING) - SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0)); - else - NotifyFailure(result); - return false; -} - -void BlobURLRequestJob::DidReadFile(int chunk_number, int result) { - DCHECK(GetStatus().is_io_pending()) - << "Asynchronous IO completed while IO wasn't pending?"; - TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadFileItem", this, "uuid", - blob_data_->uuid()); - if (result <= 0) { - NotifyFailure(result); - return; - } - SetStatus(net::URLRequestStatus()); // Clear the IO_PENDING status - - AdvanceBytesRead(result); - - // Otherwise, continue the reading. - int bytes_read = 0; - if (ReadLoop(&bytes_read)) - NotifyReadComplete(bytes_read); -} - -void BlobURLRequestJob::DeleteCurrentFileReader() { - IndexToReaderMap::iterator found = index_to_reader_.find(current_item_index_); - if (found != index_to_reader_.end() && found->second) { - delete found->second; - index_to_reader_.erase(found); - } -} - -bool BlobURLRequestJob::ReadDiskCacheEntryItem(const BlobDataItem& item, - int bytes_to_read) { - DCHECK(!GetStatus().is_io_pending()) - << "Can't begin IO while another IO operation is pending."; - DCHECK_GE(read_buf_->BytesRemaining(), bytes_to_read); - - const int result = item.disk_cache_entry()->ReadData( - item.disk_cache_stream_index(), current_item_offset_, read_buf_.get(), - bytes_to_read, base::Bind(&BlobURLRequestJob::DidReadDiskCacheEntry, - weak_factory_.GetWeakPtr())); - if (result >= 0) { - AdvanceBytesRead(result); - return true; - } - if (result == net::ERR_IO_PENDING) - SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0)); - else - NotifyFailure(result); - return false; + net::HttpStatusCode status_code = net::HTTP_OK; + if (byte_range_set_ && byte_range_.IsValid()) + status_code = net::HTTP_PARTIAL_CONTENT; + HeadersCompleted(status_code); } -void BlobURLRequestJob::DidReadDiskCacheEntry(int result) { - DCHECK(GetStatus().is_io_pending()) - << "Asynchronous IO completed while IO wasn't pending?"; - if (result <= 0) { +void BlobURLRequestJob::DidReadRawData(int result) { + TRACE_EVENT_ASYNC_END1("Blob", "BlobRequest::ReadRawData", this, "uuid", + blob_handle_ ? blob_handle_->uuid() : "NotFound"); + if (result < 0) { NotifyFailure(result); return; } + // Clear the IO_PENDING status SetStatus(net::URLRequestStatus()); - - AdvanceBytesRead(result); - - int bytes_read = 0; - if (ReadLoop(&bytes_read)) - NotifyReadComplete(bytes_read); -} - -int BlobURLRequestJob::BytesReadCompleted() { - int bytes_read = read_buf_->BytesConsumed(); - read_buf_ = NULL; - return bytes_read; -} - -int BlobURLRequestJob::ComputeBytesToRead() const { - int64 current_item_length = item_length_list_[current_item_index_]; - - int64 item_remaining = current_item_length - current_item_offset_; - int64 buf_remaining = read_buf_->BytesRemaining(); - int64 max_remaining = std::numeric_limits<int>::max(); - - int64 min = std::min(std::min(std::min(item_remaining, - buf_remaining), - remaining_bytes_), - max_remaining); - - return static_cast<int>(min); -} - -bool BlobURLRequestJob::ReadLoop(int* bytes_read) { - // Read until we encounter an error or could not get the data immediately. - while (remaining_bytes_ > 0 && read_buf_->BytesRemaining() > 0) { - if (!ReadItem()) - return false; - } - - *bytes_read = BytesReadCompleted(); - return true; -} - -void BlobURLRequestJob::NotifySuccess() { - net::HttpStatusCode status_code = net::HTTP_OK; - if (byte_range_set_ && byte_range_.IsValid()) - status_code = net::HTTP_PARTIAL_CONTENT; - HeadersCompleted(status_code); + NotifyReadComplete(result); } void BlobURLRequestJob::NotifyFailure(int error_code) { @@ -546,8 +237,8 @@ void BlobURLRequestJob::NotifyFailure(int error_code) { // If we already return the headers on success, we can't change the headers // now. Instead, we just error out. if (response_info_) { - NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED, - error_code)); + NotifyDone( + net::URLRequestStatus(net::URLRequestStatus::FAILED, error_code)); return; } @@ -582,10 +273,14 @@ void BlobURLRequestJob::HeadersCompleted(net::HttpStatusCode status_code) { status.append("\0\0", 2); net::HttpResponseHeaders* headers = new net::HttpResponseHeaders(status); + set_expected_content_size(0); + if (status_code == net::HTTP_OK || status_code == net::HTTP_PARTIAL_CONTENT) { + set_expected_content_size(blob_reader_->remaining_bytes()); std::string content_length_header(net::HttpRequestHeaders::kContentLength); content_length_header.append(": "); - content_length_header.append(base::Int64ToString(remaining_bytes_)); + content_length_header.append( + base::Int64ToString(blob_reader_->remaining_bytes())); headers->AddHeader(content_length_header); if (status_code == net::HTTP_PARTIAL_CONTENT) { DCHECK(byte_range_set_); @@ -593,21 +288,22 @@ void BlobURLRequestJob::HeadersCompleted(net::HttpStatusCode status_code) { std::string content_range_header(net::HttpResponseHeaders::kContentRange); content_range_header.append(": bytes "); content_range_header.append(base::StringPrintf( - "%" PRId64 "-%" PRId64, - byte_range_.first_byte_position(), byte_range_.last_byte_position())); + "%" PRId64 "-%" PRId64, byte_range_.first_byte_position(), + byte_range_.last_byte_position())); content_range_header.append("/"); - content_range_header.append(base::StringPrintf("%" PRId64, total_size_)); + content_range_header.append( + base::StringPrintf("%" PRId64, blob_reader_->total_size())); headers->AddHeader(content_range_header); } - if (!blob_data_->content_type().empty()) { + if (!blob_handle_->content_type().empty()) { std::string content_type_header(net::HttpRequestHeaders::kContentType); content_type_header.append(": "); - content_type_header.append(blob_data_->content_type()); + content_type_header.append(blob_handle_->content_type()); headers->AddHeader(content_type_header); } - if (!blob_data_->content_disposition().empty()) { + if (!blob_handle_->content_disposition().empty()) { std::string content_disposition_header("Content-Disposition: "); - content_disposition_header.append(blob_data_->content_disposition()); + content_disposition_header.append(blob_handle_->content_disposition()); headers->AddHeader(content_disposition_header); } } @@ -615,69 +311,7 @@ void BlobURLRequestJob::HeadersCompleted(net::HttpStatusCode status_code) { response_info_.reset(new net::HttpResponseInfo()); response_info_->headers = headers; - set_expected_content_size(remaining_bytes_); - NotifyHeadersComplete(); } -FileStreamReader* BlobURLRequestJob::GetFileStreamReader(size_t index) { - const auto& items = blob_data_->items(); - DCHECK_LT(index, items.size()); - const BlobDataItem& item = *items.at(index); - if (!IsFileType(item.type())) - return nullptr; - if (index_to_reader_.find(index) == index_to_reader_.end()) { - if (!CreateFileStreamReader(index, 0)) - return nullptr; - } - DCHECK(index_to_reader_[index]); - return index_to_reader_[index]; -} - -bool BlobURLRequestJob::CreateFileStreamReader(size_t index, - int64 additional_offset) { - const auto& items = blob_data_->items(); - DCHECK_LT(index, items.size()); - const BlobDataItem& item = *items.at(index); - DCHECK(IsFileType(item.type())); - DCHECK_EQ(0U, index_to_reader_.count(index)); - - FileStreamReader* reader = nullptr; - switch (item.type()) { - case DataElement::TYPE_FILE: - reader = FileStreamReader::CreateForLocalFile( - file_task_runner_.get(), item.path(), - item.offset() + additional_offset, item.expected_modification_time()); - DCHECK(reader); - index_to_reader_[index] = reader; - return true; - - case DataElement::TYPE_FILE_FILESYSTEM: - reader = file_system_context_ - ->CreateFileStreamReader( - storage::FileSystemURL(file_system_context_->CrackURL( - item.filesystem_url())), - item.offset() + additional_offset, - item.length() == std::numeric_limits<uint64>::max() - ? storage::kMaximumLength - : item.length() - additional_offset, - item.expected_modification_time()) - .release(); - if (reader) { - index_to_reader_[index] = reader; - return true; - } - - // The file stream reader may not be obtainable if the file is on an - // isolated file system, which has been unmounted. - return false; - - default: - break; - } - - NOTREACHED(); - return false; -} - } // namespace storage diff --git a/storage/browser/blob/blob_url_request_job.h b/storage/browser/blob/blob_url_request_job.h index 74d07ad..21baa2c 100644 --- a/storage/browser/blob/blob_url_request_job.h +++ b/storage/browser/blob/blob_url_request_job.h @@ -13,7 +13,6 @@ #include "net/http/http_byte_range.h" #include "net/http/http_status_code.h" #include "net/url_request/url_request_job.h" -#include "storage/browser/blob/blob_data_snapshot.h" #include "storage/browser/storage_browser_export.h" namespace base { @@ -27,6 +26,8 @@ class IOBuffer; namespace storage { +class BlobDataHandle; +class BlobReader; class FileStreamReader; class FileSystemContext; @@ -36,7 +37,7 @@ class STORAGE_EXPORT BlobURLRequestJob public: BlobURLRequestJob(net::URLRequest* request, net::NetworkDelegate* network_delegate, - scoped_ptr<BlobDataSnapshot> blob_data, + BlobDataHandle* blob_handle, storage::FileSystemContext* file_system_context, base::SingleThreadTaskRunner* resolving_thread_task_runner); @@ -57,68 +58,20 @@ class STORAGE_EXPORT BlobURLRequestJob // For preparing for read: get the size, apply the range and perform seek. void DidStart(); - bool AddItemLength(size_t index, int64 item_length); - bool CountSize(); - void DidCountSize(int error); - void DidGetFileItemLength(size_t index, int64 result); - void Seek(int64 offset); - - // For reading the blob. - bool ReadLoop(int* bytes_read); - bool ReadItem(); - void AdvanceItem(); - void AdvanceBytesRead(int result); - bool ReadBytesItem(const BlobDataItem& item, int bytes_to_read); - - bool ReadFileItem(FileStreamReader* reader, int bytes_to_read); - void DidReadFile(int chunk_number, int result); - void DeleteCurrentFileReader(); - - bool ReadDiskCacheEntryItem(const BlobDataItem& item, int bytes_to_read); - void DidReadDiskCacheEntry(int result); - - int ComputeBytesToRead() const; - int BytesReadCompleted(); - - // These methods convert the result of blob data reading into response headers - // and pass it to URLRequestJob's NotifyDone() or NotifyHeadersComplete(). - void NotifySuccess(); + void DidCalculateSize(int result); + void DidReadRawData(int result); + void NotifyFailure(int); void HeadersCompleted(net::HttpStatusCode status_code); - // Returns a FileStreamReader for a blob item at |index|. - // If the item at |index| is not of file this returns NULL. - FileStreamReader* GetFileStreamReader(size_t index); - - // Creates a FileStreamReader for the item at |index| with additional_offset. - // If failed, then returns false. - bool CreateFileStreamReader(size_t index, int64 additional_offset); - - scoped_ptr<BlobDataSnapshot> blob_data_; - - // Variables for controlling read from |blob_data_|. - scoped_refptr<storage::FileSystemContext> file_system_context_; - scoped_refptr<base::SingleThreadTaskRunner> file_task_runner_; - std::vector<int64> item_length_list_; - int64 total_size_; - int64 remaining_bytes_; - int pending_get_file_info_count_; - IndexToReaderMap index_to_reader_; - size_t current_item_index_; - int64 current_item_offset_; - - // Holds the buffer for read data with the IOBuffer interface. - scoped_refptr<net::DrainableIOBuffer> read_buf_; - // Is set when NotifyFailure() is called and reset when DidStart is called. bool error_; bool byte_range_set_; net::HttpByteRange byte_range_; - // Used to create unique id's for tracing. - int current_file_chunk_number_; - + scoped_ptr<BlobDataHandle> blob_handle_; + scoped_ptr<BlobReader> blob_reader_; scoped_ptr<net::HttpResponseInfo> response_info_; base::WeakPtrFactory<BlobURLRequestJob> weak_factory_; diff --git a/storage/browser/blob/blob_url_request_job_factory.cc b/storage/browser/blob/blob_url_request_job_factory.cc index feb5df0..5961697 100644 --- a/storage/browser/blob/blob_url_request_job_factory.cc +++ b/storage/browser/blob/blob_url_request_job_factory.cc @@ -19,10 +19,6 @@ namespace { int kUserDataKey; // The value is not important, the addr is a key. -BlobDataHandle* GetRequestedBlobDataHandle(net::URLRequest* request) { - return static_cast<BlobDataHandle*>(request->GetUserData(&kUserDataKey)); -} - } // namespace // static @@ -44,6 +40,12 @@ void BlobProtocolHandler::SetRequestedBlobDataHandle( request->SetUserData(&kUserDataKey, blob_data_handle.release()); } +// static +BlobDataHandle* BlobProtocolHandler::GetRequestBlobDataHandle( + net::URLRequest* request) { + return static_cast<BlobDataHandle*>(request->GetUserData(&kUserDataKey)); +} + BlobProtocolHandler::BlobProtocolHandler( BlobStorageContext* context, storage::FileSystemContext* file_system_context, @@ -59,18 +61,16 @@ BlobProtocolHandler::~BlobProtocolHandler() { net::URLRequestJob* BlobProtocolHandler::MaybeCreateJob( net::URLRequest* request, net::NetworkDelegate* network_delegate) const { - return new storage::BlobURLRequestJob(request, - network_delegate, - LookupBlobData(request), - file_system_context_.get(), - file_task_runner_.get()); + return new storage::BlobURLRequestJob( + request, network_delegate, LookupBlobHandle(request), + file_system_context_.get(), file_task_runner_.get()); } -scoped_ptr<BlobDataSnapshot> BlobProtocolHandler::LookupBlobData( +BlobDataHandle* BlobProtocolHandler::LookupBlobHandle( net::URLRequest* request) const { - BlobDataHandle* blob_data_handle = GetRequestedBlobDataHandle(request); + BlobDataHandle* blob_data_handle = GetRequestBlobDataHandle(request); if (blob_data_handle) - return blob_data_handle->CreateSnapshot().Pass(); + return blob_data_handle; if (!context_.get()) return NULL; @@ -83,12 +83,11 @@ scoped_ptr<BlobDataSnapshot> BlobProtocolHandler::LookupBlobData( return NULL; std::string uuid = request->url().spec().substr(kPrefix.length()); scoped_ptr<BlobDataHandle> handle = context_->GetBlobDataFromUUID(uuid); - scoped_ptr<BlobDataSnapshot> snapshot; + BlobDataHandle* handle_ptr = handle.get(); if (handle) { - snapshot = handle->CreateSnapshot().Pass(); SetRequestedBlobDataHandle(request, handle.Pass()); } - return snapshot.Pass(); + return handle_ptr; } } // namespace storage diff --git a/storage/browser/blob/blob_url_request_job_factory.h b/storage/browser/blob/blob_url_request_job_factory.h index 7f7a550..dcb2fd6 100644 --- a/storage/browser/blob/blob_url_request_job_factory.h +++ b/storage/browser/blob/blob_url_request_job_factory.h @@ -26,7 +26,6 @@ class URLRequestContext; namespace storage { -class BlobDataSnapshot; class BlobDataHandle; class BlobStorageContext; @@ -45,6 +44,9 @@ class STORAGE_EXPORT BlobProtocolHandler net::URLRequest* request, scoped_ptr<BlobDataHandle> blob_data_handle); + // This gets the handle on the request if it exists. + static BlobDataHandle* GetRequestBlobDataHandle(net::URLRequest* request); + BlobProtocolHandler( BlobStorageContext* context, storage::FileSystemContext* file_system_context, @@ -56,7 +58,7 @@ class STORAGE_EXPORT BlobProtocolHandler net::NetworkDelegate* network_delegate) const override; private: - scoped_ptr<BlobDataSnapshot> LookupBlobData(net::URLRequest* request) const; + BlobDataHandle* LookupBlobHandle(net::URLRequest* request) const; base::WeakPtr<BlobStorageContext> context_; const scoped_refptr<storage::FileSystemContext> file_system_context_; diff --git a/storage/browser/blob/upload_blob_element_reader.cc b/storage/browser/blob/upload_blob_element_reader.cc new file mode 100644 index 0000000..dd0058f --- /dev/null +++ b/storage/browser/blob/upload_blob_element_reader.cc @@ -0,0 +1,67 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "storage/browser/blob/upload_blob_element_reader.h" + +#include "net/base/net_errors.h" +#include "storage/browser/blob/blob_data_handle.h" +#include "storage/browser/blob/blob_reader.h" + +namespace storage { + +UploadBlobElementReader::UploadBlobElementReader( + scoped_ptr<storage::BlobReader> reader, + scoped_ptr<BlobDataHandle> handle) + : reader_(reader.Pass()), handle_(handle.Pass()) {} + +UploadBlobElementReader::~UploadBlobElementReader() {} + +int UploadBlobElementReader::Init(const net::CompletionCallback& callback) { + BlobReader::Status status = reader_->CalculateSize(callback); + switch (status) { + case BlobReader::Status::NET_ERROR: + return reader_->net_error(); + case BlobReader::Status::IO_PENDING: + return net::ERR_IO_PENDING; + case BlobReader::Status::DONE: + return net::OK; + } + NOTREACHED(); + return net::ERR_FAILED; +} + +uint64_t UploadBlobElementReader::GetContentLength() const { + return reader_->total_size(); +} + +uint64_t UploadBlobElementReader::BytesRemaining() const { + return reader_->remaining_bytes(); +} + +bool UploadBlobElementReader::IsInMemory() const { + return reader_->IsInMemory(); +} + +int UploadBlobElementReader::Read(net::IOBuffer* buf, + int buf_length, + const net::CompletionCallback& callback) { + int length = 0; + BlobReader::Status status = reader_->Read(buf, buf_length, &length, callback); + switch (status) { + case BlobReader::Status::NET_ERROR: + return reader_->net_error(); + case BlobReader::Status::IO_PENDING: + return net::ERR_IO_PENDING; + case BlobReader::Status::DONE: + return length; + } + NOTREACHED(); + return net::ERR_FAILED; +} + +const std::string& UploadBlobElementReader::uuid() const { + return handle_->uuid(); +} + +} // namespace storage diff --git a/storage/browser/blob/upload_blob_element_reader.h b/storage/browser/blob/upload_blob_element_reader.h new file mode 100644 index 0000000..72b7244 --- /dev/null +++ b/storage/browser/blob/upload_blob_element_reader.h @@ -0,0 +1,54 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef STORAGE_BROWSER_BLOB_UPLOAD_BLOB_ELEMENT_READER_H_ +#define STORAGE_BROWSER_BLOB_UPLOAD_BLOB_ELEMENT_READER_H_ + +#include "base/macros.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/completion_callback.h" +#include "net/base/upload_element_reader.h" +#include "storage/browser/storage_browser_export.h" + +namespace net { +class IOBuffer; +} + +namespace storage { +class BlobDataHandle; +class BlobReader; + +// This class is a wrapper around the BlobReader to make it conform +// to the net::UploadElementReader interface, and it also holds around the +// handle to the blob so it stays in memory while we read it. +class STORAGE_EXPORT UploadBlobElementReader + : NON_EXPORTED_BASE(public net::UploadElementReader) { + public: + UploadBlobElementReader(scoped_ptr<BlobReader> reader, + scoped_ptr<BlobDataHandle> handle); + ~UploadBlobElementReader() override; + + int Init(const net::CompletionCallback& callback) override; + + uint64_t GetContentLength() const override; + + uint64_t BytesRemaining() const override; + + bool IsInMemory() const override; + + int Read(net::IOBuffer* buf, + int buf_length, + const net::CompletionCallback& callback) override; + + const std::string& uuid() const; + + private: + scoped_ptr<BlobReader> reader_; + scoped_ptr<BlobDataHandle> handle_; + + DISALLOW_COPY_AND_ASSIGN(UploadBlobElementReader); +}; + +} // namespace storage +#endif // STORAGE_BROWSER_BLOB_UPLOAD_BLOB_ELEMENT_READER_H_ |