diff options
author | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 21:02:01 +0000 |
---|---|---|
committer | willchan@chromium.org <willchan@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-24 21:02:01 +0000 |
commit | 4c2048aace034bed3d2a6b47ef67438025144cbb (patch) | |
tree | 99cecccbd7e00b77ee5f96e2a0a94d6709339329 /net/base | |
parent | 6da922a74869fec3178cd2263b90ef0b92a232aa (diff) | |
download | chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.zip chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.gz chromium_src-4c2048aace034bed3d2a6b47ef67438025144cbb.tar.bz2 |
* Switch the posix FileStream code over to using WorkerPool for asynchronous operations.
* Add a bunch of tests for asynchronous operations.
* Fix lint errors.
Before:
Summary
iterations 5
pages 20
milliseconds 18585
mean per set 3717.00
mean per page 185.85
timer lag 2663.00
timer lag per page 26.63
After:
Summary
iterations 5
pages 20
milliseconds 9279
mean per set 1855.80
mean per page 92.79
timer lag 689.00
timer lag per page 6.89
Review URL: http://codereview.chromium.org/48111
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12398 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/base')
-rw-r--r-- | net/base/file_stream_posix.cc | 330 | ||||
-rw-r--r-- | net/base/file_stream_unittest.cc | 491 |
2 files changed, 752 insertions, 69 deletions
diff --git a/net/base/file_stream_posix.cc b/net/base/file_stream_posix.cc index d7f4811..d1b70eb 100644 --- a/net/base/file_stream_posix.cc +++ b/net/base/file_stream_posix.cc @@ -17,6 +17,8 @@ #include "base/logging.h" #include "base/message_loop.h" #include "base/string_util.h" +#include "base/waitable_event.h" +#include "base/worker_pool.h" #include "net/base/net_errors.h" // We cast back and forth, so make sure it's the size we're expecting. @@ -28,19 +30,263 @@ COMPILE_ASSERT(net::FROM_BEGIN == SEEK_SET && net::FROM_END == SEEK_END, whence_matches_system); namespace net { +namespace { + +// Map from errno to net error codes. +int64 MapErrorCode(int err) { + switch (err) { + case ENOENT: + return ERR_FILE_NOT_FOUND; + case EACCES: + return ERR_ACCESS_DENIED; + default: + LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; + return ERR_FAILED; + } +} + +// ReadFile() is a simple wrapper around read() that handles EINTR signals and +// calls MapErrorCode() to map errno to net error codes. +int ReadFile(base::PlatformFile file, char* buf, int buf_len) { + // read(..., 0) returns 0 to indicate end-of-file. + + // Loop in the case of getting interrupted by a signal. + for (;;) { + ssize_t res = read(file, buf, static_cast<size_t>(buf_len)); + if (res == static_cast<ssize_t>(-1)) { + if (errno == EINTR) + continue; + return MapErrorCode(errno); + } + return static_cast<int>(res); + } +} + +// WriteFile() is a simple wrapper around write() that handles EINTR signals and +// calls MapErrorCode() to map errno to net error codes. It tries to write to +// completion. +int WriteFile(base::PlatformFile file, const char* buf, int buf_len) { + while (true) { + ssize_t res = write(file, buf, buf_len); + if (res == -1) { + if (errno == EINTR) + continue; + return MapErrorCode(errno); + } + return res; + } +} + +// BackgroundReadTask is a simple task that reads a file and then runs +// |callback|. AsyncContext will post this task to the WorkerPool. +class BackgroundReadTask : public Task { + public: + BackgroundReadTask(base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback); + ~BackgroundReadTask(); + + virtual void Run(); + + private: + const base::PlatformFile file_; + char* const buf_; + const int buf_len_; + CompletionCallback* const callback_; + + DISALLOW_COPY_AND_ASSIGN(BackgroundReadTask); +}; + +BackgroundReadTask::BackgroundReadTask( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback) + : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {} + +BackgroundReadTask::~BackgroundReadTask() {} + +void BackgroundReadTask::Run() { + int result = ReadFile(file_, buf_, buf_len_); + callback_->Run(result); +} + +// BackgroundWriteTask is a simple task that writes to a file and then runs +// |callback|. AsyncContext will post this task to the WorkerPool. +class BackgroundWriteTask : public Task { + public: + BackgroundWriteTask(base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback); + ~BackgroundWriteTask(); + + virtual void Run(); + + private: + const base::PlatformFile file_; + const char* const buf_; + const int buf_len_; + CompletionCallback* const callback_; + + DISALLOW_COPY_AND_ASSIGN(BackgroundWriteTask); +}; + +BackgroundWriteTask::BackgroundWriteTask( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback) + : file_(file), buf_(buf), buf_len_(buf_len), callback_(callback) {} + +BackgroundWriteTask::~BackgroundWriteTask() {} + +void BackgroundWriteTask::Run() { + int result = WriteFile(file_, buf_, buf_len_); + callback_->Run(result); +} + +} // namespace + +// CancelableCallbackTask takes ownership of the Callback. This task gets +// posted to the MessageLoopForIO instance. +class CancelableCallbackTask : public CancelableTask { + public: + explicit CancelableCallbackTask(Callback0::Type* callback) + : canceled_(false), callback_(callback) {} + + virtual void Run() { + if (!canceled_) + callback_->Run(); + } + + virtual void Cancel() { + canceled_ = true; + } + + private: + bool canceled_; + scoped_ptr<Callback0::Type> callback_; +}; // FileStream::AsyncContext ---------------------------------------------- -// TODO(deanm): Figure out how to best do async IO. class FileStream::AsyncContext { public: + AsyncContext(); + ~AsyncContext(); + + // These methods post synchronous read() and write() calls to a WorkerThread. + void InitiateAsyncRead( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback); + void InitiateAsyncWrite( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback); + + CompletionCallback* callback() const { return callback_; } + + // Called by the WorkerPool thread executing the IO after the IO completes. + // This method queues RunAsynchronousCallback() on the MessageLoop and signals + // |background_io_completed_callback_|, in case the destructor is waiting. In + // that case, the destructor will call RunAsynchronousCallback() instead, and + // cancel |message_loop_task_|. + // |result| is the result of the Read/Write task. + void OnBackgroundIOCompleted(int result); - CompletionCallback* callback() const { return NULL; } private: + // Always called on the IO thread, either directly by a task on the + // MessageLoop or by ~AsyncContext(). + void RunAsynchronousCallback(); + + // The MessageLoopForIO that this AsyncContext is running on. + MessageLoopForIO* const message_loop_; + CompletionCallback* callback_; // The user provided callback. + + // A callback wrapper around OnBackgroundIOCompleted(). Run by the WorkerPool + // thread doing the background IO on our behalf. + CompletionCallbackImpl<AsyncContext> background_io_completed_callback_; + + // This is used to synchronize between the AsyncContext destructor (which runs + // on the IO thread and OnBackgroundIOCompleted() which runs on the WorkerPool + // thread. + base::WaitableEvent background_io_completed_; + + // These variables are only valid when background_io_completed is signaled. + int result_; + CancelableCallbackTask* message_loop_task_; DISALLOW_COPY_AND_ASSIGN(AsyncContext); }; +FileStream::AsyncContext::AsyncContext() + : message_loop_(MessageLoopForIO::current()), + callback_(NULL), + background_io_completed_callback_( + this, &AsyncContext::OnBackgroundIOCompleted), + background_io_completed_(true, false), + message_loop_task_(NULL) {} + +FileStream::AsyncContext::~AsyncContext() { + if (callback_) { + // If |callback_| is non-NULL, that implies either the worker thread is + // still running the IO task, or the completion callback is queued up on the + // MessageLoopForIO, but AsyncContext() got deleted before then. + const bool need_to_wait = !background_io_completed_.IsSignaled(); + base::Time start = base::Time::Now(); + RunAsynchronousCallback(); + if (need_to_wait) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", base::Time::Now() - start); + } + } +} + +void FileStream::AsyncContext::InitiateAsyncRead( + base::PlatformFile file, char* buf, int buf_len, + CompletionCallback* callback) { + DCHECK(!callback_); + callback_ = callback; + + WorkerPool::PostTask(FROM_HERE, + new BackgroundReadTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); +} + +void FileStream::AsyncContext::InitiateAsyncWrite( + base::PlatformFile file, const char* buf, int buf_len, + CompletionCallback* callback) { + DCHECK(!callback_); + callback_ = callback; + + WorkerPool::PostTask(FROM_HERE, + new BackgroundWriteTask( + file, buf, buf_len, + &background_io_completed_callback_), + true /* task_is_slow */); +} + +void FileStream::AsyncContext::OnBackgroundIOCompleted(int result) { + result_ = result; + message_loop_task_ = new CancelableCallbackTask( + NewCallback(this, &AsyncContext::RunAsynchronousCallback)); + message_loop_->PostTask(FROM_HERE, message_loop_task_); + background_io_completed_.Signal(); +} + +void FileStream::AsyncContext::RunAsynchronousCallback() { + // Wait() here ensures that all modifications from the WorkerPool thread are + // now visible. + background_io_completed_.Wait(); + + // Either we're in the MessageLoop's task, in which case Cancel() doesn't do + // anything, or we're in ~AsyncContext(), in which case this prevents the call + // from happening again. Must do it here after calling Wait(). + message_loop_task_->Cancel(); + message_loop_task_ = NULL; + + DCHECK(callback_); + CompletionCallback* temp = NULL; + std::swap(temp, callback_); + background_io_completed_.Reset(); + temp->Run(result_); +} + // FileStream ------------------------------------------------------------ FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) { @@ -49,8 +295,11 @@ FileStream::FileStream() : file_(base::kInvalidPlatformFileValue) { FileStream::FileStream(base::PlatformFile file, int flags) : file_(file), open_flags_(flags) { - // TODO(hclam): initialize the aync_context_ if the file handle - // is opened as an asynchronous file handle. + // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to + // make sure we will perform asynchronous File IO to it. + if (flags & base::PLATFORM_FILE_ASYNC) { + async_context_.reset(new AsyncContext()); + } } FileStream::~FileStream() { @@ -58,26 +307,15 @@ FileStream::~FileStream() { } void FileStream::Close() { + // Abort any existing asynchronous operations. + async_context_.reset(); + if (file_ != base::kInvalidPlatformFileValue) { if (close(file_) != 0) { NOTREACHED(); } file_ = base::kInvalidPlatformFileValue; } - async_context_.reset(); -} - -// Map from errno to net error codes. -static int64 MapErrorCode(int err) { - switch(err) { - case ENOENT: - return ERR_FILE_NOT_FOUND; - case EACCES: - return ERR_ACCESS_DENIED; - default: - LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; - return ERR_FAILED; - } } int FileStream::Open(const FilePath& path, int open_flags) { @@ -93,6 +331,10 @@ int FileStream::Open(const FilePath& path, int open_flags) { return MapErrorCode(errno); } + if (open_flags_ & base::PLATFORM_FILE_ASYNC) { + async_context_.reset(new AsyncContext()); + } + return OK; } @@ -128,29 +370,28 @@ int64 FileStream::Available() { return MapErrorCode(errno); int64 size = static_cast<int64>(info.st_size); - DCHECK(size >= cur_pos); + DCHECK_GT(size, cur_pos); return size - cur_pos; } -// TODO(deanm): async. int FileStream::Read( char* buf, int buf_len, CompletionCallback* callback) { - // read(..., 0) will return 0, which indicates end-of-file. - DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); - if (!IsOpen()) return ERR_UNEXPECTED; - // Loop in the case of getting interrupted by a signal. - for (;;) { - ssize_t res = read(file_, buf, static_cast<size_t>(buf_len)); - if (res == static_cast<ssize_t>(-1)) { - if (errno == EINTR) - continue; - return MapErrorCode(errno); - } - return static_cast<int>(res); + // read(..., 0) will return 0, which indicates end-of-file. + DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); + DCHECK(open_flags_ & base::PLATFORM_FILE_READ); + + if (async_context_.get()) { + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + // If we're in async, make sure we don't have a request in flight. + DCHECK(!async_context_->callback()); + async_context_->InitiateAsyncRead(file_, buf, buf_len, callback); + return ERR_IO_PENDING; + } else { + return ReadFile(file_, buf, buf_len); } } @@ -175,30 +416,23 @@ int FileStream::ReadUntilComplete(char *buf, int buf_len) { return bytes_total; } -// TODO(deanm): async. int FileStream::Write( const char* buf, int buf_len, CompletionCallback* callback) { - - // read(..., 0) will return 0, which indicates end-of-file. + // write(..., 0) will return 0, which indicates end-of-file. DCHECK(buf_len > 0 && buf_len <= SSIZE_MAX); if (!IsOpen()) return ERR_UNEXPECTED; - int total_bytes_written = 0; - size_t len = static_cast<size_t>(buf_len); - while (total_bytes_written < buf_len) { - ssize_t res = write(file_, buf, len); - if (res == static_cast<ssize_t>(-1)) { - if (errno == EINTR) - continue; - return MapErrorCode(errno); - } - total_bytes_written += res; - buf += res; - len -= res; + if (async_context_.get()) { + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + // If we're in async, make sure we don't have a request in flight. + DCHECK(!async_context_->callback()); + async_context_->InitiateAsyncWrite(file_, buf, buf_len, callback); + return ERR_IO_PENDING; + } else { + return WriteFile(file_, buf, buf_len); } - return total_bytes_written; } int64 FileStream::Truncate(int64 bytes) { diff --git a/net/base/file_stream_unittest.cc b/net/base/file_stream_unittest.cc index 966b9ff..947aa3c 100644 --- a/net/base/file_stream_unittest.cc +++ b/net/base/file_stream_unittest.cc @@ -11,8 +11,10 @@ #include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" -static const char kTestData[] = "0123456789"; -static const int kTestDataSize = arraysize(kTestData) - 1; +namespace { + +const char kTestData[] = "0123456789"; +const int kTestDataSize = arraysize(kTestData) - 1; class FileStreamTest : public PlatformTest { public: @@ -93,7 +95,7 @@ TEST_F(FileStreamTest, UseClosedStream) { // Try reading... char buf[10]; - int rv = stream.Read(buf, sizeof(buf), NULL); + int rv = stream.Read(buf, arraysize(buf), NULL); EXPECT_EQ(net::ERR_UNEXPECTED, rv); } @@ -111,12 +113,12 @@ TEST_F(FileStreamTest, BasicRead) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 total_bytes_read = 0; + int total_bytes_read = 0; std::string data_read; for (;;) { char buf[4]; - rv = stream.Read(buf, sizeof(buf), NULL); + rv = stream.Read(buf, arraysize(buf), NULL); EXPECT_LE(0, rv); if (rv <= 0) break; @@ -124,7 +126,7 @@ TEST_F(FileStreamTest, BasicRead) { data_read.append(buf, rv); } EXPECT_EQ(file_size, total_bytes_read); - EXPECT_TRUE(data_read == kTestData); + EXPECT_EQ(kTestData, data_read); } TEST_F(FileStreamTest, AsyncRead) { @@ -144,12 +146,12 @@ TEST_F(FileStreamTest, AsyncRead) { TestCompletionCallback callback; - int64 total_bytes_read = 0; + int total_bytes_read = 0; std::string data_read; for (;;) { char buf[4]; - rv = stream.Read(buf, sizeof(buf), &callback); + rv = stream.Read(buf, arraysize(buf), &callback); if (rv == net::ERR_IO_PENDING) rv = callback.WaitForResult(); EXPECT_LE(0, rv); @@ -159,7 +161,33 @@ TEST_F(FileStreamTest, AsyncRead) { data_read.append(buf, rv); } EXPECT_EQ(file_size, total_bytes_read); - EXPECT_TRUE(data_read == kTestData); + EXPECT_EQ(kTestData, data_read); +} + +TEST_F(FileStreamTest, AsyncRead_EarlyClose) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + TestCompletionCallback callback; + + char buf[4]; + rv = stream.Read(buf, arraysize(buf), &callback); + stream.Close(); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + ASSERT_LE(0, rv); + EXPECT_EQ(std::string(kTestData, rv), std::string(buf, rv)); } TEST_F(FileStreamTest, BasicRead_FromOffset) { @@ -185,7 +213,7 @@ TEST_F(FileStreamTest, BasicRead_FromOffset) { std::string data_read; for (;;) { char buf[4]; - rv = stream.Read(buf, sizeof(buf), NULL); + rv = stream.Read(buf, arraysize(buf), NULL); EXPECT_LE(0, rv); if (rv <= 0) break; @@ -194,6 +222,7 @@ TEST_F(FileStreamTest, BasicRead_FromOffset) { } EXPECT_EQ(file_size - kOffset, total_bytes_read); EXPECT_TRUE(data_read == kTestData + kOffset); + EXPECT_EQ(kTestData + kOffset, data_read); } TEST_F(FileStreamTest, AsyncRead_FromOffset) { @@ -217,12 +246,12 @@ TEST_F(FileStreamTest, AsyncRead_FromOffset) { TestCompletionCallback callback; - int64 total_bytes_read = 0; + int total_bytes_read = 0; std::string data_read; for (;;) { char buf[4]; - rv = stream.Read(buf, sizeof(buf), &callback); + rv = stream.Read(buf, arraysize(buf), &callback); if (rv == net::ERR_IO_PENDING) rv = callback.WaitForResult(); EXPECT_LE(0, rv); @@ -232,7 +261,7 @@ TEST_F(FileStreamTest, AsyncRead_FromOffset) { data_read.append(buf, rv); } EXPECT_EQ(file_size - kOffset, total_bytes_read); - EXPECT_TRUE(data_read == kTestData + kOffset); + EXPECT_EQ(kTestData + kOffset, data_read); } TEST_F(FileStreamTest, SeekAround) { @@ -293,10 +322,12 @@ TEST_F(FileStreamTest, AsyncWrite) { EXPECT_EQ(0, file_size); TestCompletionCallback callback; - int64 total_bytes_written = 0; + int total_bytes_written = 0; while (total_bytes_written != kTestDataSize) { - rv = stream.Write(kTestData, kTestDataSize, &callback); + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - total_bytes_written, + &callback); if (rv == net::ERR_IO_PENDING) rv = callback.WaitForResult(); EXPECT_LT(0, rv); @@ -309,6 +340,34 @@ TEST_F(FileStreamTest, AsyncWrite) { EXPECT_EQ(file_size, total_bytes_written); } +TEST_F(FileStreamTest, AsyncWrite_EarlyClose) { + net::FileStream stream; + int flags = base::PLATFORM_FILE_CREATE_ALWAYS | + base::PLATFORM_FILE_WRITE | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(0, file_size); + + TestCompletionCallback callback; + int total_bytes_written = 0; + + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - total_bytes_written, + &callback); + stream.Close(); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + ASSERT_LT(0, rv); + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(file_size, rv); +} + TEST_F(FileStreamTest, BasicWrite_FromOffset) { net::FileStream stream; int flags = base::PLATFORM_FILE_OPEN | @@ -351,10 +410,12 @@ TEST_F(FileStreamTest, AsyncWrite_FromOffset) { EXPECT_EQ(kTestDataSize, new_offset); TestCompletionCallback callback; - int64 total_bytes_written = 0; + int total_bytes_written = 0; while (total_bytes_written != kTestDataSize) { - rv = stream.Write(kTestData, kTestDataSize, &callback); + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - total_bytes_written, + &callback); if (rv == net::ERR_IO_PENDING) rv = callback.WaitForResult(); EXPECT_LT(0, rv); @@ -382,12 +443,12 @@ TEST_F(FileStreamTest, BasicReadWrite) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 total_bytes_read = 0; + int total_bytes_read = 0; std::string data_read; for (;;) { char buf[4]; - rv = stream.Read(buf, sizeof(buf), NULL); + rv = stream.Read(buf, arraysize(buf), NULL); EXPECT_LE(0, rv); if (rv <= 0) break; @@ -406,6 +467,394 @@ TEST_F(FileStreamTest, BasicReadWrite) { EXPECT_EQ(kTestDataSize * 2, file_size); } +TEST_F(FileStreamTest, BasicWriteRead) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_WRITE; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + int64 offset = stream.Seek(net::FROM_END, 0); + EXPECT_EQ(offset, file_size); + + rv = stream.Write(kTestData, kTestDataSize, NULL); + EXPECT_EQ(kTestDataSize, rv); + + offset = stream.Seek(net::FROM_BEGIN, 0); + EXPECT_EQ(0, offset); + + int64 total_bytes_read = 0; + + std::string data_read; + for (;;) { + char buf[4]; + rv = stream.Read(buf, arraysize(buf), NULL); + EXPECT_LE(0, rv); + if (rv <= 0) + break; + total_bytes_read += rv; + data_read.append(buf, rv); + } + stream.Close(); + + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(kTestDataSize * 2, file_size); + EXPECT_EQ(kTestDataSize * 2, total_bytes_read); + + const std::string kExpectedFileData = + std::string(kTestData) + std::string(kTestData); + EXPECT_EQ(kExpectedFileData, data_read); +} + +TEST_F(FileStreamTest, BasicAsyncReadWrite) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_WRITE | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + TestCompletionCallback callback; + int64 total_bytes_read = 0; + + std::string data_read; + for (;;) { + char buf[4]; + rv = stream.Read(buf, arraysize(buf), &callback); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + EXPECT_LE(0, rv); + if (rv <= 0) + break; + total_bytes_read += rv; + data_read.append(buf, rv); + } + EXPECT_EQ(file_size, total_bytes_read); + EXPECT_TRUE(data_read == kTestData); + + int total_bytes_written = 0; + + while (total_bytes_written != kTestDataSize) { + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - total_bytes_written, + &callback); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + EXPECT_LT(0, rv); + if (rv <= 0) + break; + total_bytes_written += rv; + } + + stream.Close(); + + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(kTestDataSize * 2, file_size); +} + +TEST_F(FileStreamTest, BasicAsyncWriteRead) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_WRITE | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + int64 offset = stream.Seek(net::FROM_END, 0); + EXPECT_EQ(offset, file_size); + + TestCompletionCallback callback; + int total_bytes_written = 0; + + while (total_bytes_written != kTestDataSize) { + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - total_bytes_written, + &callback); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + EXPECT_LT(0, rv); + if (rv <= 0) + break; + total_bytes_written += rv; + } + + EXPECT_EQ(kTestDataSize, total_bytes_written); + + offset = stream.Seek(net::FROM_BEGIN, 0); + EXPECT_EQ(0, offset); + + int total_bytes_read = 0; + + std::string data_read; + for (;;) { + char buf[4]; + rv = stream.Read(buf, arraysize(buf), &callback); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + EXPECT_LE(0, rv); + if (rv <= 0) + break; + total_bytes_read += rv; + data_read.append(buf, rv); + } + stream.Close(); + + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(kTestDataSize * 2, file_size); + + EXPECT_EQ(kTestDataSize * 2, total_bytes_read); + const std::string kExpectedFileData = + std::string(kTestData) + std::string(kTestData); + EXPECT_EQ(kExpectedFileData, data_read); +} + +class TestWriteReadCompletionCallback : public Callback1<int>::Type { + public: + explicit TestWriteReadCompletionCallback( + net::FileStream* stream, + int* total_bytes_written, + int* total_bytes_read, + std::string* data_read) + : result_(0), + have_result_(false), + waiting_for_result_(false), + stream_(stream), + total_bytes_written_(total_bytes_written), + total_bytes_read_(total_bytes_read), + data_read_(data_read) {} + + int WaitForResult() { + DCHECK(!waiting_for_result_); + while (!have_result_) { + waiting_for_result_ = true; + MessageLoop::current()->Run(); + waiting_for_result_ = false; + } + have_result_ = false; // auto-reset for next callback + return result_; + } + + private: + virtual void RunWithParams(const Tuple1<int>& params) { + DCHECK_LT(0, params.a); + *total_bytes_written_ += params.a; + + int rv; + + if (*total_bytes_written_ != kTestDataSize) { + // Recurse to finish writing all data. + int total_bytes_written = 0, total_bytes_read = 0; + std::string data_read; + TestWriteReadCompletionCallback callback( + stream_, &total_bytes_written, &total_bytes_read, &data_read); + rv = stream_->Write(kTestData + *total_bytes_written_, + kTestDataSize - *total_bytes_written_, + &callback); + DCHECK_EQ(net::ERR_IO_PENDING, rv); + rv = callback.WaitForResult(); + *total_bytes_written_ += total_bytes_written; + *total_bytes_read_ += total_bytes_read; + *data_read_ += data_read; + } else { // We're done writing all data. Start reading the data. + stream_->Seek(net::FROM_BEGIN, 0); + + TestCompletionCallback callback; + for (;;) { + char buf[4]; + rv = stream_->Read(buf, arraysize(buf), &callback); + if (rv == net::ERR_IO_PENDING) { + bool old_state = MessageLoop::current()->NestableTasksAllowed(); + MessageLoop::current()->SetNestableTasksAllowed(true); + rv = callback.WaitForResult(); + MessageLoop::current()->SetNestableTasksAllowed(old_state); + } + EXPECT_LE(0, rv); + if (rv <= 0) + break; + *total_bytes_read_ += rv; + data_read_->append(buf, rv); + } + } + + result_ = *total_bytes_written_; + have_result_ = true; + if (waiting_for_result_) + MessageLoop::current()->Quit(); + } + + int result_; + bool have_result_; + bool waiting_for_result_; + net::FileStream* stream_; + int* total_bytes_written_; + int* total_bytes_read_; + std::string* data_read_; + + DISALLOW_COPY_AND_ASSIGN(TestWriteReadCompletionCallback); +}; + +TEST_F(FileStreamTest, AsyncWriteRead) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_WRITE | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + int64 offset = stream.Seek(net::FROM_END, 0); + EXPECT_EQ(offset, file_size); + + int total_bytes_written = 0; + int total_bytes_read = 0; + std::string data_read; + TestWriteReadCompletionCallback callback(&stream, &total_bytes_written, + &total_bytes_read, &data_read); + + rv = stream.Write(kTestData + total_bytes_written, + kTestDataSize - static_cast<int>(total_bytes_written), + &callback); + if (rv == net::ERR_IO_PENDING) + rv = callback.WaitForResult(); + EXPECT_LT(0, rv); + EXPECT_EQ(kTestDataSize, total_bytes_written); + + stream.Close(); + + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(kTestDataSize * 2, file_size); + + EXPECT_EQ(kTestDataSize * 2, total_bytes_read); + const std::string kExpectedFileData = + std::string(kTestData) + std::string(kTestData); + EXPECT_EQ(kExpectedFileData, data_read); +} + +class TestWriteCloseCompletionCallback : public Callback1<int>::Type { + public: + explicit TestWriteCloseCompletionCallback(net::FileStream* stream, + int* total_bytes_written) + : result_(0), + have_result_(false), + waiting_for_result_(false), + stream_(stream), + total_bytes_written_(total_bytes_written) {} + + int WaitForResult() { + DCHECK(!waiting_for_result_); + while (!have_result_) { + waiting_for_result_ = true; + MessageLoop::current()->Run(); + waiting_for_result_ = false; + } + have_result_ = false; // auto-reset for next callback + return result_; + } + + private: + virtual void RunWithParams(const Tuple1<int>& params) { + DCHECK_LT(0, params.a); + *total_bytes_written_ += params.a; + + int rv; + + if (*total_bytes_written_ != kTestDataSize) { + // Recurse to finish writing all data. + int total_bytes_written = 0; + TestWriteCloseCompletionCallback callback(stream_, &total_bytes_written); + rv = stream_->Write(kTestData + *total_bytes_written_, + kTestDataSize - *total_bytes_written_, + &callback); + DCHECK_EQ(net::ERR_IO_PENDING, rv); + rv = callback.WaitForResult(); + *total_bytes_written_ += total_bytes_written; + } else { // We're done writing all data. Close the file. + stream_->Close(); + } + + result_ = *total_bytes_written_; + have_result_ = true; + if (waiting_for_result_) + MessageLoop::current()->Quit(); + } + + int result_; + bool have_result_; + bool waiting_for_result_; + net::FileStream* stream_; + int* total_bytes_written_; + + DISALLOW_COPY_AND_ASSIGN(TestWriteCloseCompletionCallback); +}; + +TEST_F(FileStreamTest, AsyncWriteClose) { + int64 file_size; + bool ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + + net::FileStream stream; + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_READ | + base::PLATFORM_FILE_WRITE | + base::PLATFORM_FILE_ASYNC; + int rv = stream.Open(temp_file_path(), flags); + EXPECT_EQ(net::OK, rv); + + int64 total_bytes_avail = stream.Available(); + EXPECT_EQ(file_size, total_bytes_avail); + + int64 offset = stream.Seek(net::FROM_END, 0); + EXPECT_EQ(offset, file_size); + + int total_bytes_written = 0; + TestWriteCloseCompletionCallback callback(&stream, &total_bytes_written); + + rv = stream.Write(kTestData, kTestDataSize, &callback); + if (rv == net::ERR_IO_PENDING) + total_bytes_written = callback.WaitForResult(); + EXPECT_LT(0, total_bytes_written); + EXPECT_EQ(kTestDataSize, total_bytes_written); + + ok = file_util::GetFileSize(temp_file_path(), &file_size); + EXPECT_TRUE(ok); + EXPECT_EQ(kTestDataSize * 2, file_size); +} + // Tests truncating a file. TEST_F(FileStreamTest, Truncate) { int flags = base::PLATFORM_FILE_CREATE_ALWAYS | base::PLATFORM_FILE_WRITE; @@ -430,7 +879,7 @@ TEST_F(FileStreamTest, Truncate) { std::string read_contents; file_util::ReadFileToString(temp_file_path(), &read_contents); - ASSERT_TRUE(read_contents == "01230123"); + EXPECT_EQ("01230123", read_contents); } -// TODO(erikkay): more READ_WRITE tests? +} // namespace |