diff options
author | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 21:02:01 +0000 |
---|---|---|
committer | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 21:02:01 +0000 |
commit | 4c2048aace034bed3d2a6b47ef67438025144cbb (patch) | |
tree | 99cecccbd7e00b77ee5f96e2a0a94d6709339329 /net/base/file_stream_posix.cc | |
parent | 6da922a74869fec3178cd2263b90ef0b92a232aa (diff) | |
download | chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.zip chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.gz chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.bz2 |
* Switch the posix FileStream code over to using WorkerPool for asynchronous operations.
* Add a bunch of tests for asynchronous operations.
* Fix lint errors.
Before:
Summary
iterations 5
pages 20
milliseconds 18585
mean per set 3717.00
mean per page 185.85
timer lag 2663.00
timer lag per page 26.63
After:
Summary
iterations 5
pages 20
milliseconds 9279
mean per set 1855.80
mean per page 92.79
timer lag 689.00
timer lag per page 6.89
Review URL: http://codereview.chromium.org/48111
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12398 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/base/file_stream_posix.cc')
-rw-r--r-- | net/base/file_stream_posix.cc | 330 |
1 files changed, 282 insertions, 48 deletions
diff --git a/net/base/file_stream_posix.cc b/net/base/file_stream_posix.cc index d7f4811..d1b70eb 100644 --- a/net/base/file_stream_posix.cc +++ b/net/base/file_stream_posix.cc @@ -17,6 +17,8 @@ #include "base/logging.h" #include "base/message_loop.h" #include "base/string_util.h" +#include "base/waitable_event.h" +#include "base/worker_pool.h" #include "net/base/net_errors.h" // We cast back and forth, so make sure it's the size we're expecting. @@ -28,19 +30,263 @@ COMPILE_ASSERT(net::FROM_BEGIN == SEEK_SET && net::FROM_END == SEEK_END, whence_matches_system); namespace net { +namespace { + +// Map from errno to net error codes. +int64 MapErrorCode(int err) { + switch (err) { + case ENOENT: + return ERR_FILE_NOT_FOUND; + case EACCES: + return ERR_ACCESS_DENIED; + default: + LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; + return ERR_FAILED; + } +} + +// ReadFile() is a simple wrapper around read() that handles EINTR signals and +// calls MapErrorCode() to map errno to net error codes. +int ReadFile(base::PlatformFile file, char* buf, int buf_len) { + // read(..., 0) returns 0 to indicate end-of-file. + + // Loop in the case of getting interrupted by a signal. + for (;;) { + ssize_t res = read(file, buf, static_cast<size_t>(buf_len)); + if (res == static_cast<ssize_t>(-1)) { + if (errno == EINTR) + continue; + return MapErrorCode(errno); + } + return static_cast<int>(res); + } +} + +// WriteFile() is a simple wrapper around write() that handles EINTR signals and +// calls MapErrorCode() to map errno to net error codes. It tries to write to +// completion. +int WriteFile(base::PlatformFile file, const char* buf, int buf_len) { + while (true) { + ssize_t res = write(file, buf, buf_len); + if (res == -1) { + if (errno == EINTR) + continue; + return MapErrorCode(errno); + } + return res; + } +} + +// BackgroundReadTask is a simple task that reads a file and then runs +// |callback|. AsyncContext will post this task to the WorkerPool. +class BackgroundReadTask : public Task { + public: + BackgroundReadTask(base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback); + ~BackgroundReadTask(); + + virtual void Run(); + + private: + const base::PlatformFile file_; + char* const buf_; + const int buf_len_; + CompletionCallback* const callback_; + + DISALLOW_COPY_AND_ASSIGN(BackgroundReadTask); +}; + +BackgroundReadTask::BackgroundReadTask( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback) + : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {} + +BackgroundReadTask::~BackgroundReadTask() {} + +void BackgroundReadTask::Run() { + int result = ReadFile(file_, buf_, buf_len_); + callback_->Run(result); +} + +// BackgroundWriteTask is a simple task that writes to a file and then runs +// |callback|. AsyncContext will post this task to the WorkerPool. +class BackgroundWriteTask : public Task { + public: + BackgroundWriteTask(base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback); + ~BackgroundWriteTask(); + + virtual void Run(); + + private: + const base::PlatformFile file_; + const char* const buf_; + const int buf_len_; + CompletionCallback* const callback_; + + DISALLOW_COPY_AND_ASSIGN(BackgroundWriteTask); +}; + +BackgroundWriteTask::BackgroundWriteTask( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback) + : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {} + +BackgroundWriteTask::~BackgroundWriteTask() {} + +void BackgroundWriteTask::Run() { + int result = WriteFile(file_, buf_, buf_len_); + callback_->Run(result); +} + +} // namespace + +// CancelableCallbackTask takes ownership of the Callback. This task gets +// posted to the MessageLoopForIO instance. +class CancelableCallbackTask : public CancelableTask { + public: + explicit CancelableCallbackTask(Callback0::Type* callback) + : canceled_(false), callback_(callback) {} + + virtual void Run() { + if (!canceled_) + callback_->Run(); + } + + virtual void Cancel() { + canceled_ = true; + } + + private: + bool canceled_; + scoped_ptr<Callback0::Type> callback_; +}; // FileStream::AsyncContext ---------------------------------------------- -// TODO(deanm): Figure out how to best do async IO. class FileStream::AsyncContext { public: + AsyncContext(); + ~AsyncContext(); + + // These methods post synchronous read() and write() calls to a WorkerThread. + void InitiateAsyncRead( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback); + void InitiateAsyncWrite( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback); + + CompletionCallback* callback() const { return callback_; } + + // Called by the WorkerPool thread executing the IO after the IO completes. + // This method queues RunAsynchronousCallback() on the MessageLoop and signals + // |background_io_completed_callback_|, in case the destructor is waiting. In + // that case, the destructor will call RunAsynchronousCallback() instead, and + // cancel |message_loop_task_|. + // |result| is the result of the Read/Write task. + void OnBackgroundIOCompleted(int result); - CompletionCallback* callback() const { return NULL; } private: + // Always called on the IO thread, either directly by a task on the + // MessageLoop or by ~AsyncContext(). + void RunAsynchronousCallback(); + + // The MessageLoopForIO that this AsyncContext is running on. + MessageLoopForIO* const message_loop_; + CompletionCallback* callback_; // The user provided callback. + + // A callback wrapper around OnBackgroundIOCompleted(). Run by the WorkerPool + // thread doing the background IO on our behalf. + CompletionCallbackImpl<AsyncContext> background_io_completed_callback_; + + // This is used to synchronize between the AsyncContext destructor (which runs + // on the IO thread and OnBackgroundIOCompleted() which runs on the WorkerPool + // thread. + base::WaitableEvent background_io_completed_; + + // These variables are only valid when background_io_completed is signaled. + int result_; + CancelableCallbackTask* message_loop_task_; DISALLOW_COPY_AND_ASSIGN(AsyncContext); }; +FileStream::AsyncContext::AsyncContext() + : message_loop_(MessageLoopForIO::current()), + callback_(NULL), + background_io_completed_callback_( + this, &AsyncContext::OnBackgroundIOCompleted), + background_io_completed_(true, false), + message_loop_task_(NULL) {} + +FileStream::AsyncContext::~AsyncContext() { + if (callback_) { + // If |callback_| is non-NULL, that implies either the worker thread is + // still running the IO task, or the completion callback is queued up on the + // MessageLoopForIO, but AsyncContext() got deleted before then. + const bool need_to_wait = !background_io_completed_.IsSignaled(); + base::Time start = base::Time::Now(); + RunAsynchronousCallback(); + if (need_to_wait) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", base::Time::Now() - start); + } + } +} + +void FileStream::AsyncContext::InitiateAsyncRead( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback) { + DCHECK(!callback_); + callback_ = callback; + + WorkerPool::PostTask(FROM_HERE, + new BackgroundReadTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); +} + +void FileStream::AsyncContext::InitiateAsyncWrite( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback) { + DCHECK(!callback_); + callback_ = callback; + + WorkerPool::PostTask(FROM_HERE, + new BackgroundWriteTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); +} + +void FileStream::AsyncContext::OnBackgroundIOCompleted(int result) { + result_ = result; + message_loop_task_ = new CancelableCallbackTask( + NewCallback(this, &AsyncContext::RunAsynchronousCallback)); + message_loop_->PostTask(FROM_HERE, message_loop_task_); + background_io_completed_.Signal(); +} + +void FileStream::AsyncContext::RunAsynchronousCallback() { + // Wait() here ensures that all modifications from the WorkerPool thread are + // now visible. + background_io_completed_.Wait(); + + // Either we're in the MessageLoop's task, in which case Cancel() doesn't do + // anything, or we're in ~AsyncContext(), in which case this prevents the call + // from happening again. Must do it here after calling Wait(). + message_loop_task_->Cancel(); + message_loop_task_ = NULL; + + DCHECK(callback_); + CompletionCallback* temp = NULL; + std::swap(temp, callback_); + background_io_completed_.Reset(); + temp->Run(result_); +} + // FileStream ------------------------------------------------------------ FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) { @@ -49,8 +295,11 @@ FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) { FileStream::FileStream(base::PlatformFile file, int flags) : file_(file), open_flags_(flags) { - // TODO(hclam): initialize the aync_context_ if the file handle - // is opened as an asynchronous file handle. + // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to + // make sure we will perform asynchronous File IO to it. + if (flags & base::PLATFORM_FILE_ASYNC) { + async_context_.reset(new AsyncContext()); + } } FileStream::~FileStream() { @@ -58,26 +307,15 @@ FileStream::~FileStream() { } void FileStream::Close() { + // Abort any existing asynchronous operations. + async_context_.reset(); + if (file_ != base::kInvalidPlatformFileValue) { if (close(file_) != 0) { NOTREACHED(); } file_ = base::kInvalidPlatformFileValue; } - async_context_.reset(); -} - -// Map from errno to net error codes. -static int64 MapErrorCode(int err) { - switch(err) { - case ENOENT: - return ERR_FILE_NOT_FOUND; - case EACCES: - return ERR_ACCESS_DENIED; - default: - LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; - return ERR_FAILED; - } } int FileStream::Open(const FilePath& path, int open_flags) { @@ -93,6 +331,10 @@ int FileStream::Open(const FilePath& path, int open_flags) { return MapErrorCode(errno); } + if (open_flags_ & base::PLATFORM_FILE_ASYNC) { + async_context_.reset(new AsyncContext()); + } + return OK; } @@ -128,29 +370,28 @@ int64 FileStream::Available() { return MapErrorCode(errno); int64 size = static_cast<int64>(info.st_size); - DCHECK(size >= cur_pos); + DCHECK_GT(size, cur_pos); return size - cur_pos; } -// TODO(deanm): async. int FileStream::Read( char* buf, int buf_len, CompletionCallback* callback) { - // read(..., 0) will return 0, which indicates end-of-file. - DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); - if (!IsOpen()) return ERR_UNEXPECTED; - // Loop in the case of getting interrupted by a signal. - for (;;) { - ssize_t res = read(file_, buf, static_cast<size_t>(buf_len)); - if (res == static_cast<ssize_t>(-1)) { - if (errno == EINTR) - continue; - return MapErrorCode(errno); - } - return static_cast<int>(res); + // read(..., 0) will return 0, which indicates end-of-file. + DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); + DCHECK(open_flags_ & base::PLATFORM_FILE_READ); + + if (async_context_.get()) { + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + // If we're in async, make sure we don't have a request in flight. + DCHECK(!async_context_->callback()); + async_context_->InitiateAsyncRead(file_, buf, buf_len, callback); + return ERR_IO_PENDING; + } else { + return ReadFile(file_, buf, buf_len); } } @@ -175,30 +416,23 @@ int FileStream::ReadUntilComplete(char *buf, int buf_len) { return bytes_total; } -// TODO(deanm): async. int FileStream::Write( const char* buf, int buf_len, CompletionCallback* callback) { - - // read(..., 0) will return 0, which indicates end-of-file. + // write(..., 0) will return 0, which indicates end-of-file. DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); if (!IsOpen()) return ERR_UNEXPECTED; - int total_bytes_written = 0; - size_t len = static_cast<size_t>(buf_len); - while (total_bytes_written < buf_len) { - ssize_t res = write(file_, buf, len); - if (res == static_cast<ssize_t>(-1)) { - if (errno == EINTR) - continue; - return MapErrorCode(errno); - } - total_bytes_written += res; - buf += res; - len -= res; + if (async_context_.get()) { + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + // If we're in async, make sure we don't have a request in flight. + DCHECK(!async_context_->callback()); + async_context_->InitiateAsyncWrite(file_, buf, buf_len, callback); + return ERR_IO_PENDING; + } else { + return WriteFile(file_, buf, buf_len); } - return total_bytes_written; } int64 FileStream::Truncate(int64 bytes) { |