summaryrefslogtreecommitdiffstats
path: root/net/base/file_stream_posix.cc
diff options
context:
space:
mode:
authorwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-24 21:02:01 +0000
committerwillchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-24 21:02:01 +0000
commit4c2048aace034bed3d2a6b47ef67438025144cbb (patch)
tree99cecccbd7e00b77ee5f96e2a0a94d6709339329 /net/base/file_stream_posix.cc
parent6da922a74869fec3178cd2263b90ef0b92a232aa (diff)
downloadchromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.zip
chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.gz
chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.bz2
* Switch the posix FileStream code over to using WorkerPool for asynchronous operations.
* Add a bunch of tests for asynchronous operations. * Fix lint errors. Before: Summary iterations 5 pages 20 milliseconds 18585 mean per set 3717.00 mean per page 185.85 timer lag 2663.00 timer lag per page 26.63 After: Summary iterations 5 pages 20 milliseconds 9279 mean per set 1855.80 mean per page 92.79 timer lag 689.00 timer lag per page 6.89 Review URL: http://codereview.chromium.org/48111 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12398 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/base/file_stream_posix.cc')
-rw-r--r--net/base/file_stream_posix.cc330
1 files changed, 282 insertions, 48 deletions
diff --git a/net/base/file_stream_posix.cc b/net/base/file_stream_posix.cc
index d7f4811..d1b70eb 100644
--- a/net/base/file_stream_posix.cc
+++ b/net/base/file_stream_posix.cc
@@ -17,6 +17,8 @@
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/string_util.h"
+#include "base/waitable_event.h"
+#include "base/worker_pool.h"
#include "net/base/net_errors.h"
// We cast back and forth, so make sure it's the size we're expecting.
@@ -28,19 +30,263 @@ COMPILE_ASSERT(net::FROM_BEGIN == SEEK_SET &&
net::FROM_END == SEEK_END, whence_matches_system);
namespace net {
+namespace {
+
+// Map from errno to net error codes.
+int64 MapErrorCode(int err) {
+ switch (err) {
+ case ENOENT:
+ return ERR_FILE_NOT_FOUND;
+ case EACCES:
+ return ERR_ACCESS_DENIED;
+ default:
+ LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED";
+ return ERR_FAILED;
+ }
+}
+
+// ReadFile() is a simple wrapper around read() that handles EINTR signals and
+// calls MapErrorCode() to map errno to net error codes.
+int ReadFile(base::PlatformFile file, char* buf, int buf_len) {
+ // read(..., 0) returns 0 to indicate end-of-file.
+
+ // Loop in the case of getting interrupted by a signal.
+ for (;;) {
+ ssize_t res = read(file, buf, static_cast<size_t>(buf_len));
+ if (res == static_cast<ssize_t>(-1)) {
+ if (errno == EINTR)
+ continue;
+ return MapErrorCode(errno);
+ }
+ return static_cast<int>(res);
+ }
+}
+
+// WriteFile() is a simple wrapper around write() that handles EINTR signals and
+// calls MapErrorCode() to map errno to net error codes. It tries to write to
+// completion.
+int WriteFile(base::PlatformFile file, const char* buf, int buf_len) {
+ while (true) {
+ ssize_t res = write(file, buf, buf_len);
+ if (res == -1) {
+ if (errno == EINTR)
+ continue;
+ return MapErrorCode(errno);
+ }
+ return res;
+ }
+}
+
+// BackgroundReadTask is a simple task that reads a file and then runs
+// |callback|. AsyncContext will post this task to the WorkerPool.
+class BackgroundReadTask : public Task {
+ public:
+ BackgroundReadTask(base::PlatformFile file, char* buf, int buf_len,
+ CompletionCallback* callback);
+ ~BackgroundReadTask();
+
+ virtual void Run();
+
+ private:
+ const base::PlatformFile file_;
+ char* const buf_;
+ const int buf_len_;
+ CompletionCallback* const callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(BackgroundReadTask);
+};
+
+BackgroundReadTask::BackgroundReadTask(
+ base::PlatformFile file, char* buf, int buf_len,
+ CompletionCallback* callback)
+ : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {}
+
+BackgroundReadTask::~BackgroundReadTask() {}
+
+void BackgroundReadTask::Run() {
+ int result = ReadFile(file_, buf_, buf_len_);
+ callback_->Run(result);
+}
+
+// BackgroundWriteTask is a simple task that writes to a file and then runs
+// |callback|. AsyncContext will post this task to the WorkerPool.
+class BackgroundWriteTask : public Task {
+ public:
+ BackgroundWriteTask(base::PlatformFile file, const char* buf, int buf_len,
+ CompletionCallback* callback);
+ ~BackgroundWriteTask();
+
+ virtual void Run();
+
+ private:
+ const base::PlatformFile file_;
+ const char* const buf_;
+ const int buf_len_;
+ CompletionCallback* const callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(BackgroundWriteTask);
+};
+
+BackgroundWriteTask::BackgroundWriteTask(
+ base::PlatformFile file, const char* buf, int buf_len,
+ CompletionCallback* callback)
+ : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {}
+
+BackgroundWriteTask::~BackgroundWriteTask() {}
+
+void BackgroundWriteTask::Run() {
+ int result = WriteFile(file_, buf_, buf_len_);
+ callback_->Run(result);
+}
+
+} // namespace
+
+// CancelableCallbackTask takes ownership of the Callback. This task gets
+// posted to the MessageLoopForIO instance.
+class CancelableCallbackTask : public CancelableTask {
+ public:
+ explicit CancelableCallbackTask(Callback0::Type* callback)
+ : canceled_(false), callback_(callback) {}
+
+ virtual void Run() {
+ if (!canceled_)
+ callback_->Run();
+ }
+
+ virtual void Cancel() {
+ canceled_ = true;
+ }
+
+ private:
+ bool canceled_;
+ scoped_ptr<Callback0::Type> callback_;
+};
// FileStream::AsyncContext ----------------------------------------------
-// TODO(deanm): Figure out how to best do async IO.
class FileStream::AsyncContext {
public:
+ AsyncContext();
+ ~AsyncContext();
+
+ // These methods post synchronous read() and write() calls to a WorkerThread.
+ void InitiateAsyncRead(
+ base::PlatformFile file, char* buf, int buf_len,
+ CompletionCallback* callback);
+ void InitiateAsyncWrite(
+ base::PlatformFile file, const char* buf, int buf_len,
+ CompletionCallback* callback);
+
+ CompletionCallback* callback() const { return callback_; }
+
+ // Called by the WorkerPool thread executing the IO after the IO completes.
+ // This method queues RunAsynchronousCallback() on the MessageLoop and signals
+ // |background_io_completed_callback_|, in case the destructor is waiting. In
+ // that case, the destructor will call RunAsynchronousCallback() instead, and
+ // cancel |message_loop_task_|.
+ // |result| is the result of the Read/Write task.
+ void OnBackgroundIOCompleted(int result);
- CompletionCallback* callback() const { return NULL; }
private:
+ // Always called on the IO thread, either directly by a task on the
+ // MessageLoop or by ~AsyncContext().
+ void RunAsynchronousCallback();
+
+ // The MessageLoopForIO that this AsyncContext is running on.
+ MessageLoopForIO* const message_loop_;
+ CompletionCallback* callback_; // The user provided callback.
+
+ // A callback wrapper around OnBackgroundIOCompleted(). Run by the WorkerPool
+ // thread doing the background IO on our behalf.
+ CompletionCallbackImpl<AsyncContext> background_io_completed_callback_;
+
+ // This is used to synchronize between the AsyncContext destructor (which runs
+ // on the IO thread and OnBackgroundIOCompleted() which runs on the WorkerPool
+ // thread.
+ base::WaitableEvent background_io_completed_;
+
+ // These variables are only valid when background_io_completed is signaled.
+ int result_;
+ CancelableCallbackTask* message_loop_task_;
DISALLOW_COPY_AND_ASSIGN(AsyncContext);
};
+FileStream::AsyncContext::AsyncContext()
+ : message_loop_(MessageLoopForIO::current()),
+ callback_(NULL),
+ background_io_completed_callback_(
+ this, &AsyncContext::OnBackgroundIOCompleted),
+ background_io_completed_(true, false),
+ message_loop_task_(NULL) {}
+
+FileStream::AsyncContext::~AsyncContext() {
+ if (callback_) {
+ // If |callback_| is non-NULL, that implies either the worker thread is
+ // still running the IO task, or the completion callback is queued up on the
+ // MessageLoopForIO, but AsyncContext() got deleted before then.
+ const bool need_to_wait = !background_io_completed_.IsSignaled();
+ base::Time start = base::Time::Now();
+ RunAsynchronousCallback();
+ if (need_to_wait) {
+ // We want to see if we block the message loop for too long.
+ UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", base::Time::Now() - start);
+ }
+ }
+}
+
+void FileStream::AsyncContext::InitiateAsyncRead(
+ base::PlatformFile file, char* buf, int buf_len,
+ CompletionCallback* callback) {
+ DCHECK(!callback_);
+ callback_ = callback;
+
+ WorkerPool::PostTask(FROM_HERE,
+ new BackgroundReadTask(
+ file, buf, buf_len,
+ &background_io_completed_callback_),
+ true /* task_is_slow */);
+}
+
+void FileStream::AsyncContext::InitiateAsyncWrite(
+ base::PlatformFile file, const char* buf, int buf_len,
+ CompletionCallback* callback) {
+ DCHECK(!callback_);
+ callback_ = callback;
+
+ WorkerPool::PostTask(FROM_HERE,
+ new BackgroundWriteTask(
+ file, buf, buf_len,
+ &background_io_completed_callback_),
+ true /* task_is_slow */);
+}
+
+void FileStream::AsyncContext::OnBackgroundIOCompleted(int result) {
+ result_ = result;
+ message_loop_task_ = new CancelableCallbackTask(
+ NewCallback(this, &AsyncContext::RunAsynchronousCallback));
+ message_loop_->PostTask(FROM_HERE, message_loop_task_);
+ background_io_completed_.Signal();
+}
+
+void FileStream::AsyncContext::RunAsynchronousCallback() {
+ // Wait() here ensures that all modifications from the WorkerPool thread are
+ // now visible.
+ background_io_completed_.Wait();
+
+ // Either we're in the MessageLoop's task, in which case Cancel() doesn't do
+ // anything, or we're in ~AsyncContext(), in which case this prevents the call
+ // from happening again. Must do it here after calling Wait().
+ message_loop_task_->Cancel();
+ message_loop_task_ = NULL;
+
+ DCHECK(callback_);
+ CompletionCallback* temp = NULL;
+ std::swap(temp, callback_);
+ background_io_completed_.Reset();
+ temp->Run(result_);
+}
+
// FileStream ------------------------------------------------------------
FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) {
@@ -49,8 +295,11 @@ FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) {
FileStream::FileStream(base::PlatformFile file, int flags)
: file_(file), open_flags_(flags) {
- // TODO(hclam): initialize the aync_context_ if the file handle
- // is opened as an asynchronous file handle.
+ // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to
+ // make sure we will perform asynchronous File IO to it.
+ if (flags & base::PLATFORM_FILE_ASYNC) {
+ async_context_.reset(new AsyncContext());
+ }
}
FileStream::~FileStream() {
@@ -58,26 +307,15 @@ FileStream::~FileStream() {
}
void FileStream::Close() {
+ // Abort any existing asynchronous operations.
+ async_context_.reset();
+
if (file_ != base::kInvalidPlatformFileValue) {
if (close(file_) != 0) {
NOTREACHED();
}
file_ = base::kInvalidPlatformFileValue;
}
- async_context_.reset();
-}
-
-// Map from errno to net error codes.
-static int64 MapErrorCode(int err) {
- switch(err) {
- case ENOENT:
- return ERR_FILE_NOT_FOUND;
- case EACCES:
- return ERR_ACCESS_DENIED;
- default:
- LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED";
- return ERR_FAILED;
- }
}
int FileStream::Open(const FilePath& path, int open_flags) {
@@ -93,6 +331,10 @@ int FileStream::Open(const FilePath& path, int open_flags) {
return MapErrorCode(errno);
}
+ if (open_flags_ & base::PLATFORM_FILE_ASYNC) {
+ async_context_.reset(new AsyncContext());
+ }
+
return OK;
}
@@ -128,29 +370,28 @@ int64 FileStream::Available() {
return MapErrorCode(errno);
int64 size = static_cast<int64>(info.st_size);
- DCHECK(size >= cur_pos);
+ DCHECK_GT(size, cur_pos);
return size - cur_pos;
}
-// TODO(deanm): async.
int FileStream::Read(
char* buf, int buf_len, CompletionCallback* callback) {
- // read(..., 0) will return 0, which indicates end-of-file.
- DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX);
-
if (!IsOpen())
return ERR_UNEXPECTED;
- // Loop in the case of getting interrupted by a signal.
- for (;;) {
- ssize_t res = read(file_, buf, static_cast<size_t>(buf_len));
- if (res == static_cast<ssize_t>(-1)) {
- if (errno == EINTR)
- continue;
- return MapErrorCode(errno);
- }
- return static_cast<int>(res);
+ // read(..., 0) will return 0, which indicates end-of-file.
+ DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX);
+ DCHECK(open_flags_ & base::PLATFORM_FILE_READ);
+
+ if (async_context_.get()) {
+ DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
+ // If we're in async, make sure we don't have a request in flight.
+ DCHECK(!async_context_->callback());
+ async_context_->InitiateAsyncRead(file_, buf, buf_len, callback);
+ return ERR_IO_PENDING;
+ } else {
+ return ReadFile(file_, buf, buf_len);
}
}
@@ -175,30 +416,23 @@ int FileStream::ReadUntilComplete(char *buf, int buf_len) {
return bytes_total;
}
-// TODO(deanm): async.
int FileStream::Write(
const char* buf, int buf_len, CompletionCallback* callback) {
-
- // read(..., 0) will return 0, which indicates end-of-file.
+ // write(..., 0) will return 0, which indicates end-of-file.
DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX);
if (!IsOpen())
return ERR_UNEXPECTED;
- int total_bytes_written = 0;
- size_t len = static_cast<size_t>(buf_len);
- while (total_bytes_written < buf_len) {
- ssize_t res = write(file_, buf, len);
- if (res == static_cast<ssize_t>(-1)) {
- if (errno == EINTR)
- continue;
- return MapErrorCode(errno);
- }
- total_bytes_written += res;
- buf += res;
- len -= res;
+ if (async_context_.get()) {
+ DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
+ // If we're in async, make sure we don't have a request in flight.
+ DCHECK(!async_context_->callback());
+ async_context_->InitiateAsyncWrite(file_, buf, buf_len, callback);
+ return ERR_IO_PENDING;
+ } else {
+ return WriteFile(file_, buf, buf_len);
}
- return total_bytes_written;
}
int64 FileStream::Truncate(int64 bytes) {