// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/base/file_stream.h" #include #include "base/file_path.h" #include "base/logging.h" #include "base/message_loop.h" #include "base/metrics/histogram.h" #include "base/synchronization/waitable_event.h" #include "base/threading/thread_restrictions.h" #include "base/threading/worker_pool.h" #include "net/base/file_stream_metrics.h" #include "net/base/file_stream_net_log_parameters.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" namespace net { // Ensure that we can just use our Whence values directly. COMPILE_ASSERT(FROM_BEGIN == FILE_BEGIN, bad_whence_begin); COMPILE_ASSERT(FROM_CURRENT == FILE_CURRENT, bad_whence_current); COMPILE_ASSERT(FROM_END == FILE_END, bad_whence_end); namespace { void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) { overlapped->Offset = offset.LowPart; overlapped->OffsetHigh = offset.HighPart; } void IncrementOffset(OVERLAPPED* overlapped, DWORD count) { LARGE_INTEGER offset; offset.LowPart = overlapped->Offset; offset.HighPart = overlapped->OffsetHigh; offset.QuadPart += static_cast(count); SetOffset(overlapped, offset); } int RecordAndMapError(int error, FileErrorSource source, bool record_uma, const net::BoundNetLog& bound_net_log) { net::Error net_error = MapSystemError(error); bound_net_log.AddEvent( net::NetLog::TYPE_FILE_STREAM_ERROR, make_scoped_refptr( new FileStreamErrorParameters(GetFileErrorSourceName(source), error, net_error))); RecordFileError(error, source, record_uma); return net_error; } // Opens a file with some network logging. // The opened file and the result code are written to |file| and |result|. void OpenFile(const FilePath& path, int open_flags, bool record_uma, base::PlatformFile* file, int* result, const net::BoundNetLog& bound_net_log) { bound_net_log.BeginEvent( net::NetLog::TYPE_FILE_STREAM_OPEN, make_scoped_refptr( new net::NetLogStringParameter("file_name", path.AsUTF8Unsafe()))); *file = base::CreatePlatformFile(path, open_flags, NULL, NULL); if (*file == base::kInvalidPlatformFileValue) { DWORD error = GetLastError(); LOG(WARNING) << "Failed to open file: " << error; *result = RecordAndMapError(error, FILE_ERROR_SOURCE_OPEN, record_uma, bound_net_log); bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN, NULL); return; } } // Closes a file with some network logging. void CloseFile(base::PlatformFile file, const net::BoundNetLog& bound_net_log) { bound_net_log.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE, NULL); if (file == base::kInvalidPlatformFileValue) return; CancelIo(file); if (!base::ClosePlatformFile(file)) NOTREACHED(); bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN, NULL); } // Closes a file with CloseFile() and signals the completion. void CloseFileAndSignal(base::PlatformFile* file, base::WaitableEvent* on_io_complete, const net::BoundNetLog& bound_net_log) { CloseFile(*file, bound_net_log); *file = base::kInvalidPlatformFileValue; on_io_complete->Signal(); } // Invokes a given closure and signals the completion. void InvokeAndSignal(const base::Closure& closure, base::WaitableEvent* on_io_complete) { closure.Run(); on_io_complete->Signal(); } } // namespace // FileStreamWin::AsyncContext ---------------------------------------------- class FileStreamWin::AsyncContext : public MessageLoopForIO::IOHandler { public: explicit AsyncContext(const net::BoundNetLog& bound_net_log) : context_(), is_closing_(false), record_uma_(false), bound_net_log_(bound_net_log), error_source_(FILE_ERROR_SOURCE_COUNT) { context_.handler = this; } ~AsyncContext(); void IOCompletionIsPending(const CompletionCallback& callback, IOBuffer* buf); OVERLAPPED* overlapped() { return &context_.overlapped; } const CompletionCallback& callback() const { return callback_; } void set_error_source(FileErrorSource source) { error_source_ = source; } void EnableErrorStatistics() { record_uma_ = true; } private: virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) OVERRIDE; MessageLoopForIO::IOContext context_; CompletionCallback callback_; scoped_refptr in_flight_buf_; bool is_closing_; bool record_uma_; const net::BoundNetLog bound_net_log_; FileErrorSource error_source_; }; FileStreamWin::AsyncContext::~AsyncContext() { is_closing_ = true; bool waited = false; base::TimeTicks start = base::TimeTicks::Now(); while (!callback_.is_null()) { waited = true; MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); } if (waited) { // We want to see if we block the message loop for too long. UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", base::TimeTicks::Now() - start); } } void FileStreamWin::AsyncContext::IOCompletionIsPending( const CompletionCallback& callback, IOBuffer* buf) { DCHECK(callback_.is_null()); callback_ = callback; in_flight_buf_ = buf; // Hold until the async operation ends. } void FileStreamWin::AsyncContext::OnIOCompleted( MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) { DCHECK_EQ(&context_, context); DCHECK(!callback_.is_null()); if (is_closing_) { callback_.Reset(); in_flight_buf_ = NULL; return; } int result = static_cast(bytes_read); if (error && error != ERROR_HANDLE_EOF) { result = RecordAndMapError(error, error_source_, record_uma_, bound_net_log_); } if (bytes_read) IncrementOffset(&context->overlapped, bytes_read); CompletionCallback temp_callback = callback_; callback_.Reset(); scoped_refptr temp_buf = in_flight_buf_; in_flight_buf_ = NULL; temp_callback.Run(result); } // FileStream ------------------------------------------------------------ FileStreamWin::FileStreamWin(net::NetLog* net_log) : file_(base::kInvalidPlatformFileValue), open_flags_(0), auto_closed_(true), record_uma_(false), bound_net_log_(net::BoundNetLog::Make(net_log, net::NetLog::SOURCE_FILESTREAM)), weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE, NULL); } FileStreamWin::FileStreamWin( base::PlatformFile file, int flags, net::NetLog* net_log) : file_(file), open_flags_(flags), auto_closed_(false), record_uma_(false), bound_net_log_(net::BoundNetLog::Make(net_log, net::NetLog::SOURCE_FILESTREAM)), weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE, NULL); // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to // make sure we will perform asynchronous File IO to it. if (flags & base::PLATFORM_FILE_ASYNC) { async_context_.reset(new AsyncContext(bound_net_log_)); MessageLoopForIO::current()->RegisterIOHandler(file_, async_context_.get()); } } FileStreamWin::~FileStreamWin() { if (open_flags_ & base::PLATFORM_FILE_ASYNC) { // Block until the in-flight open/close operation is complete. // TODO(satorux): Ideally we should not block. crbug.com/115067 WaitForIOCompletion(); // Block until the last read/write operation is complete. async_context_.reset(); } if (auto_closed_) { if (open_flags_ & base::PLATFORM_FILE_ASYNC) { // Close the file in the background. if (IsOpen()) { const bool posted = base::WorkerPool::PostTask( FROM_HERE, base::Bind(&CloseFile, file_, bound_net_log_), true /* task_is_slow */); DCHECK(posted); } } else { CloseSync(); } } bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE, NULL); } void FileStreamWin::Close(const CompletionCallback& callback) { DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); // Passing &file_ to a thread pool looks unsafe but it's safe here as the // destructor ensures that the close operation is complete with // WaitForIOCompletion(). See also the destructor. const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, base::Bind(&CloseFileAndSignal, &file_, on_io_complete_.get(), bound_net_log_), base::Bind(&FileStreamWin::OnClosed, weak_ptr_factory_.GetWeakPtr(), callback), true /* task_is_slow */); DCHECK(posted); } void FileStreamWin::CloseSync() { // The logic here is similar to CloseFile() but async_context_.reset() is // caled in this function. // Block until the in-flight open operation is complete. // TODO(satorux): Replace this with a DCHECK(open_flags & ASYNC) once this // once all async clients are migrated to use Close(). crbug.com/114783 WaitForIOCompletion(); bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE, NULL); if (file_ != base::kInvalidPlatformFileValue) CancelIo(file_); // Block until the last read/write operation is complete. async_context_.reset(); if (file_ != base::kInvalidPlatformFileValue) { if (!base::ClosePlatformFile(file_)) NOTREACHED(); file_ = base::kInvalidPlatformFileValue; bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN, NULL); } } int FileStreamWin::Open(const FilePath& path, int open_flags, const CompletionCallback& callback) { if (IsOpen()) { DLOG(FATAL) << "File is already open!"; return ERR_UNEXPECTED; } open_flags_ = open_flags; DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); // Passing &file_ to a thread pool looks unsafe but it's safe here as the // destructor ensures that the open operation is complete with // WaitForIOCompletion(). See also the destructor. int* result = new int(OK); const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, base::Bind(&InvokeAndSignal, base::Bind(&OpenFile, path, open_flags, record_uma_, &file_, result, bound_net_log_), on_io_complete_.get()), base::Bind(&FileStreamWin::OnOpened, weak_ptr_factory_.GetWeakPtr(), callback, base::Owned(result)), true /* task_is_slow */); DCHECK(posted); return ERR_IO_PENDING; } int FileStreamWin::OpenSync(const FilePath& path, int open_flags) { if (IsOpen()) { DLOG(FATAL) << "File is already open!"; return ERR_UNEXPECTED; } open_flags_ = open_flags; int result = OK; OpenFile(path, open_flags_, record_uma_, &file_, &result, bound_net_log_); if (result != OK) return result; // TODO(satorux): Remove this once all async clients are migrated to use // Open(). crbug.com/114783 if (open_flags_ & base::PLATFORM_FILE_ASYNC) { async_context_.reset(new AsyncContext(bound_net_log_)); if (record_uma_) async_context_->EnableErrorStatistics(); MessageLoopForIO::current()->RegisterIOHandler(file_, async_context_.get()); } return OK; } bool FileStreamWin::IsOpen() const { return file_ != base::kInvalidPlatformFileValue; } int FileStreamWin::Seek(Whence whence, int64 offset, const Int64CompletionCallback& callback) { if (!IsOpen()) return ERR_UNEXPECTED; // Make sure we're async and we have no other in-flight async operations. DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); int64* result = new int64(-1); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, base::Bind(&InvokeAndSignal, // Unretained should be fine as we wait for a signal on // on_io_complete_ at the destructor. base::Bind(&FileStreamWin::SeekFile, base::Unretained(this), whence, offset, result), on_io_complete_.get()), base::Bind(&FileStreamWin::OnSeeked, weak_ptr_factory_.GetWeakPtr(), callback, base::Owned(result)), true /* task is slow */); DCHECK(posted); return ERR_IO_PENDING; } int64 FileStreamWin::SeekSync(Whence whence, int64 offset) { if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(!async_context_.get() || async_context_->callback().is_null()); int64 result = -1; SeekFile(whence, offset, &result); return result; } int64 FileStreamWin::Available() { base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) return ERR_UNEXPECTED; int64 cur_pos = SeekSync(FROM_CURRENT, 0); if (cur_pos < 0) return cur_pos; LARGE_INTEGER file_size; if (!GetFileSizeEx(file_, &file_size)) { DWORD error = GetLastError(); LOG(WARNING) << "GetFileSizeEx failed: " << error; return RecordAndMapError(error, FILE_ERROR_SOURCE_GET_SIZE, record_uma_, bound_net_log_); } return file_size.QuadPart - cur_pos; } int FileStreamWin::Read( IOBuffer* buf, int buf_len, const CompletionCallback& callback) { DCHECK(async_context_.get()); if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_READ); OVERLAPPED* overlapped = NULL; DCHECK(!callback.is_null()); DCHECK(async_context_->callback().is_null()); overlapped = async_context_->overlapped(); async_context_->set_error_source(FILE_ERROR_SOURCE_READ); int rv = 0; DWORD bytes_read; if (!ReadFile(file_, buf->data(), buf_len, &bytes_read, overlapped)) { DWORD error = GetLastError(); if (error == ERROR_IO_PENDING) { async_context_->IOCompletionIsPending(callback, buf); rv = ERR_IO_PENDING; } else if (error == ERROR_HANDLE_EOF) { rv = 0; // Report EOF by returning 0 bytes read. } else { LOG(WARNING) << "ReadFile failed: " << error; rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ, record_uma_, bound_net_log_); } } else if (overlapped) { async_context_->IOCompletionIsPending(callback, buf); rv = ERR_IO_PENDING; } else { rv = static_cast(bytes_read); } return rv; } int FileStreamWin::ReadSync(char* buf, int buf_len) { DCHECK(!async_context_.get()); base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_READ); int rv = 0; DWORD bytes_read; if (!ReadFile(file_, buf, buf_len, &bytes_read, NULL)) { DWORD error = GetLastError(); if (error == ERROR_HANDLE_EOF) { rv = 0; // Report EOF by returning 0 bytes read. } else { LOG(WARNING) << "ReadFile failed: " << error; rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ, record_uma_, bound_net_log_); } } else { rv = static_cast(bytes_read); } return rv; } int FileStreamWin::ReadUntilComplete(char *buf, int buf_len) { int to_read = buf_len; int bytes_total = 0; do { int bytes_read = ReadSync(buf, to_read); if (bytes_read <= 0) { if (bytes_total == 0) return bytes_read; return bytes_total; } bytes_total += bytes_read; buf += bytes_read; to_read -= bytes_read; } while (bytes_total < buf_len); return bytes_total; } int FileStreamWin::Write( IOBuffer* buf, int buf_len, const CompletionCallback& callback) { DCHECK(async_context_.get()); if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); OVERLAPPED* overlapped = NULL; DCHECK(!callback.is_null()); DCHECK(async_context_->callback().is_null()); overlapped = async_context_->overlapped(); async_context_->set_error_source(FILE_ERROR_SOURCE_WRITE); int rv = 0; DWORD bytes_written = 0; if (!WriteFile(file_, buf->data(), buf_len, &bytes_written, overlapped)) { DWORD error = GetLastError(); if (error == ERROR_IO_PENDING) { async_context_->IOCompletionIsPending(callback, buf); rv = ERR_IO_PENDING; } else { LOG(WARNING) << "WriteFile failed: " << error; rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE, record_uma_, bound_net_log_); } } else if (overlapped) { async_context_->IOCompletionIsPending(callback, buf); rv = ERR_IO_PENDING; } else { rv = static_cast(bytes_written); } return rv; } int FileStreamWin::WriteSync( const char* buf, int buf_len) { DCHECK(!async_context_.get()); base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); int rv = 0; DWORD bytes_written = 0; if (!WriteFile(file_, buf, buf_len, &bytes_written, NULL)) { DWORD error = GetLastError(); LOG(WARNING) << "WriteFile failed: " << error; rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE, record_uma_, bound_net_log_); } else { rv = static_cast(bytes_written); } return rv; } int FileStreamWin::Flush() { base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); if (FlushFileBuffers(file_)) { return OK; } return RecordAndMapError(GetLastError(), FILE_ERROR_SOURCE_FLUSH, record_uma_, bound_net_log_); } int64 FileStreamWin::Truncate(int64 bytes) { base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) return ERR_UNEXPECTED; // We'd better be open for writing. DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); // Seek to the position to truncate from. int64 seek_position = SeekSync(FROM_BEGIN, bytes); if (seek_position != bytes) return ERR_UNEXPECTED; // And truncate the file. BOOL result = SetEndOfFile(file_); if (!result) { DWORD error = GetLastError(); LOG(WARNING) << "SetEndOfFile failed: " << error; return RecordAndMapError(error, FILE_ERROR_SOURCE_SET_EOF, record_uma_, bound_net_log_); } // Success. return seek_position; } void FileStreamWin::EnableErrorStatistics() { record_uma_ = true; if (async_context_.get()) async_context_->EnableErrorStatistics(); } void FileStreamWin::SetBoundNetLogSource( const net::BoundNetLog& owner_bound_net_log) { if ((owner_bound_net_log.source().id == net::NetLog::Source::kInvalidId) && (bound_net_log_.source().id == net::NetLog::Source::kInvalidId)) { // Both |BoundNetLog|s are invalid. return; } // Should never connect to itself. DCHECK_NE(bound_net_log_.source().id, owner_bound_net_log.source().id); bound_net_log_.AddEvent( net::NetLog::TYPE_FILE_STREAM_BOUND_TO_OWNER, make_scoped_refptr( new net::NetLogSourceParameter("source_dependency", owner_bound_net_log.source()))); owner_bound_net_log.AddEvent( net::NetLog::TYPE_FILE_STREAM_SOURCE, make_scoped_refptr( new net::NetLogSourceParameter("source_dependency", bound_net_log_.source()))); } base::PlatformFile FileStreamWin::GetPlatformFileForTesting() { return file_; } void FileStreamWin::OnClosed(const CompletionCallback& callback) { file_ = base::kInvalidPlatformFileValue; // Reset this before Run() as Run() may issue a new async operation. ResetOnIOComplete(); callback.Run(OK); } void FileStreamWin::SeekFile(Whence whence, int64 offset, int64* result) { LARGE_INTEGER distance, res; distance.QuadPart = offset; DWORD move_method = static_cast(whence); if (!SetFilePointerEx(file_, distance, &res, move_method)) { DWORD error = GetLastError(); LOG(WARNING) << "SetFilePointerEx failed: " << error; *result = RecordAndMapError(error, FILE_ERROR_SOURCE_SEEK, record_uma_, bound_net_log_); return; } if (async_context_.get()) { async_context_->set_error_source(FILE_ERROR_SOURCE_SEEK); SetOffset(async_context_->overlapped(), res); } *result = res.QuadPart; } void FileStreamWin::OnOpened(const CompletionCallback& callback, int* result) { if (*result == OK) { async_context_.reset(new AsyncContext(bound_net_log_)); if (record_uma_) async_context_->EnableErrorStatistics(); MessageLoopForIO::current()->RegisterIOHandler(file_, async_context_.get()); } // Reset this before Run() as Run() may issue a new async operation. ResetOnIOComplete(); callback.Run(*result); } void FileStreamWin::OnSeeked( const Int64CompletionCallback& callback, int64* result) { // Reset this before Run() as Run() may issue a new async operation. ResetOnIOComplete(); callback.Run(*result); } void FileStreamWin::ResetOnIOComplete() { on_io_complete_.reset(); weak_ptr_factory_.InvalidateWeakPtrs(); } void FileStreamWin::WaitForIOCompletion() { // http://crbug.com/115067 base::ThreadRestrictions::ScopedAllowWait allow_wait; if (on_io_complete_.get()) { on_io_complete_->Wait(); on_io_complete_.reset(); } } } // namespace net