// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/base/upload_data_stream.h" #include "base/logging.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/base/upload_bytes_element_reader.h" #include "net/base/upload_element_reader.h" namespace net { UploadDataStream::UploadDataStream( ScopedVector element_readers, int64 identifier) : element_readers_(element_readers.Pass()), element_index_(0), total_size_(0), current_position_(0), identifier_(identifier), is_chunked_(false), last_chunk_appended_(false), read_failed_(false), initialized_successfully_(false), weak_ptr_factory_(this) { } UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier) : element_index_(0), total_size_(0), current_position_(0), identifier_(identifier), is_chunked_(true), last_chunk_appended_(false), read_failed_(false), initialized_successfully_(false), weak_ptr_factory_(this) { } UploadDataStream::~UploadDataStream() { } UploadDataStream* UploadDataStream::CreateWithReader( scoped_ptr reader, int64 identifier) { ScopedVector readers; readers.push_back(reader.release()); return new UploadDataStream(readers.Pass(), identifier); } int UploadDataStream::Init(const CompletionCallback& callback) { Reset(); return InitInternal(0, callback); } int UploadDataStream::Read(IOBuffer* buf, int buf_len, const CompletionCallback& callback) { DCHECK(initialized_successfully_); DCHECK_GT(buf_len, 0); return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback); } bool UploadDataStream::IsEOF() const { DCHECK(initialized_successfully_); if (!is_chunked_) return current_position_ == total_size_; // If the upload data is chunked, check if the last chunk is appended and all // elements are consumed. return element_index_ == element_readers_.size() && last_chunk_appended_; } bool UploadDataStream::IsInMemory() const { // Chunks are in memory, but UploadData does not have all the chunks at // once. Chunks are provided progressively with AppendChunk() as chunks // are ready. Check is_chunked_ here, rather than relying on the loop // below, as there is a case that is_chunked_ is set to true, but the // first chunk is not yet delivered. if (is_chunked_) return false; for (size_t i = 0; i < element_readers_.size(); ++i) { if (!element_readers_[i]->IsInMemory()) return false; } return true; } void UploadDataStream::AppendChunk(const char* bytes, int bytes_len, bool is_last_chunk) { DCHECK(is_chunked_); DCHECK(!last_chunk_appended_); last_chunk_appended_ = is_last_chunk; // Initialize a reader for the newly appended chunk. We leave |total_size_| at // zero, since for chunked uploads, we may not know the total size. std::vector data(bytes, bytes + bytes_len); UploadElementReader* reader = new UploadOwnedBytesElementReader(&data); const int rv = reader->Init(net::CompletionCallback()); DCHECK_EQ(OK, rv); element_readers_.push_back(reader); // Resume pending read. if (!pending_chunked_read_callback_.is_null()) { base::Closure callback = pending_chunked_read_callback_; pending_chunked_read_callback_.Reset(); callback.Run(); } } void UploadDataStream::Reset() { weak_ptr_factory_.InvalidateWeakPtrs(); pending_chunked_read_callback_.Reset(); initialized_successfully_ = false; read_failed_ = false; current_position_ = 0; total_size_ = 0; element_index_ = 0; } int UploadDataStream::InitInternal(int start_index, const CompletionCallback& callback) { DCHECK(!initialized_successfully_); // Call Init() for all elements. for (size_t i = start_index; i < element_readers_.size(); ++i) { UploadElementReader* reader = element_readers_[i]; // When new_result is ERR_IO_PENDING, InitInternal() will be called // with start_index == i + 1 when reader->Init() finishes. const int result = reader->Init( base::Bind(&UploadDataStream::ResumePendingInit, weak_ptr_factory_.GetWeakPtr(), i + 1, callback)); if (result != OK) { DCHECK(result != ERR_IO_PENDING || !callback.is_null()); return result; } } // Finalize initialization. if (!is_chunked_) { uint64 total_size = 0; for (size_t i = 0; i < element_readers_.size(); ++i) { UploadElementReader* reader = element_readers_[i]; total_size += reader->GetContentLength(); } total_size_ = total_size; } initialized_successfully_ = true; return OK; } void UploadDataStream::ResumePendingInit(int start_index, const CompletionCallback& callback, int previous_result) { DCHECK(!initialized_successfully_); DCHECK(!callback.is_null()); DCHECK_NE(ERR_IO_PENDING, previous_result); // Check the last result. if (previous_result != OK) { callback.Run(previous_result); return; } const int result = InitInternal(start_index, callback); if (result != ERR_IO_PENDING) callback.Run(result); } int UploadDataStream::ReadInternal(scoped_refptr buf, const CompletionCallback& callback) { DCHECK(initialized_successfully_); while (!read_failed_ && element_index_ < element_readers_.size()) { UploadElementReader* reader = element_readers_[element_index_]; if (reader->BytesRemaining() == 0) { ++element_index_; continue; } if (buf->BytesRemaining() == 0) break; int result = reader->Read( buf.get(), buf->BytesRemaining(), base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead), weak_ptr_factory_.GetWeakPtr(), buf, callback)); if (result == ERR_IO_PENDING) { DCHECK(!callback.is_null()); return ERR_IO_PENDING; } ProcessReadResult(buf, result); } if (read_failed_) { // Chunked transfers may only contain byte readers, so cannot have read // failures. DCHECK(!is_chunked_); // If an error occured during read operation, then pad with zero. // Otherwise the server will hang waiting for the rest of the data. const int num_bytes_to_fill = std::min(static_cast(buf->BytesRemaining()), size() - position() - buf->BytesConsumed()); DCHECK_LE(0, num_bytes_to_fill); memset(buf->data(), 0, num_bytes_to_fill); buf->DidConsume(num_bytes_to_fill); } const int bytes_copied = buf->BytesConsumed(); current_position_ += bytes_copied; DCHECK(is_chunked_ || total_size_ >= current_position_); if (is_chunked_ && !IsEOF() && bytes_copied == 0) { DCHECK(!callback.is_null()); DCHECK(pending_chunked_read_callback_.is_null()); pending_chunked_read_callback_ = base::Bind(&UploadDataStream::ResumePendingRead, weak_ptr_factory_.GetWeakPtr(), buf, callback, OK); return ERR_IO_PENDING; } // Returning 0 is allowed only when IsEOF() == true. DCHECK(bytes_copied != 0 || IsEOF()); return bytes_copied; } void UploadDataStream::ResumePendingRead(scoped_refptr buf, const CompletionCallback& callback, int previous_result) { DCHECK(!callback.is_null()); ProcessReadResult(buf, previous_result); const int result = ReadInternal(buf, callback); if (result != ERR_IO_PENDING) callback.Run(result); } void UploadDataStream::ProcessReadResult(scoped_refptr buf, int result) { DCHECK_NE(ERR_IO_PENDING, result); DCHECK(!read_failed_); if (result >= 0) buf->DidConsume(result); else read_failed_ = true; } } // namespace net