summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-21 18:35:24 +0000
committerrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-21 18:35:24 +0000
commitf3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e (patch)
treec7302913bc73ae2bca0d2dfa8ba87bd152e17a2e
parentf3642e9ea5c41ae3dac690e29bad3df295bc52a9 (diff)
downloadchromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.zip
chromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.tar.gz
chromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.tar.bz2
Disk cache: Implement asynchronous IO for Posix.
BUG=16507 TEST=Unittests Review URL: http://codereview.chromium.org/173170 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@23990 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--net/disk_cache/backend_impl.cc2
-rw-r--r--net/disk_cache/backend_unittest.cc33
-rw-r--r--net/disk_cache/cache_util.h3
-rw-r--r--net/disk_cache/cache_util_posix.cc6
-rw-r--r--net/disk_cache/cache_util_win.cc11
-rw-r--r--net/disk_cache/entry_unittest.cc17
-rw-r--r--net/disk_cache/file.h3
-rw-r--r--net/disk_cache/file_posix.cc273
-rw-r--r--net/disk_cache/file_win.cc25
9 files changed, 326 insertions, 47 deletions
diff --git a/net/disk_cache/backend_impl.cc b/net/disk_cache/backend_impl.cc
index 426a130..f61fab8 100644
--- a/net/disk_cache/backend_impl.cc
+++ b/net/disk_cache/backend_impl.cc
@@ -346,7 +346,7 @@ BackendImpl::~BackendImpl() {
timer_.Stop();
- WaitForPendingIO(&num_pending_io_);
+ File::WaitForPendingIO(&num_pending_io_);
DCHECK(!num_refs_);
}
diff --git a/net/disk_cache/backend_unittest.cc b/net/disk_cache/backend_unittest.cc
index ec9b61e..ad7ff66 100644
--- a/net/disk_cache/backend_unittest.cc
+++ b/net/disk_cache/backend_unittest.cc
@@ -208,6 +208,39 @@ TEST_F(DiskCacheBackendTest, ExternalFiles) {
EXPECT_EQ(0, memcmp(buffer1->data(), buffer2->data(), kSize));
}
+TEST_F(DiskCacheTest, ShutdownWithPendingIO) {
+ SimpleCallbackTest callback;
+
+ {
+ std::wstring path = GetCachePath();
+ ASSERT_TRUE(DeleteCache(path.c_str()));
+
+ disk_cache::Backend* cache =
+ disk_cache::CreateCacheBackend(path, false, 0, net::DISK_CACHE);
+
+ disk_cache::Entry* entry;
+ ASSERT_TRUE(cache->CreateEntry("some key", &entry));
+
+ const int kSize = 25000;
+ scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(kSize);
+ CacheTestFillBuffer(buffer->data(), kSize, false);
+
+ for (int i = 0; i < 10 * 1024 * 1024; i += 64 * 1024) {
+ int rv = entry->WriteData(0, i, buffer, kSize, &callback, false);
+ if (rv == net::ERR_IO_PENDING)
+ break;
+ EXPECT_EQ(kSize, rv);
+ }
+
+ entry->Close();
+
+ // The cache destructor will see one pending operation here.
+ delete cache;
+ }
+
+ MessageLoop::current()->RunAllPending();
+}
+
void DiskCacheBackendTest::BackendSetSize() {
SetDirectMode();
const int cache_size = 0x10000; // 64 kB
diff --git a/net/disk_cache/cache_util.h b/net/disk_cache/cache_util.h
index 78566a3..cb09521 100644
--- a/net/disk_cache/cache_util.h
+++ b/net/disk_cache/cache_util.h
@@ -22,9 +22,6 @@ void DeleteCache(const std::wstring& path, bool remove_folder);
// Deletes a cache file.
bool DeleteCacheFile(const std::wstring& name);
-// Blocks until |num_pending_io| IO operations complete.
-void WaitForPendingIO(int* num_pending_io);
-
} // namespace disk_cache
#endif // NET_DISK_CACHE_CACHE_UTIL_H_
diff --git a/net/disk_cache/cache_util_posix.cc b/net/disk_cache/cache_util_posix.cc
index 24ef4d5..cd8fdea 100644
--- a/net/disk_cache/cache_util_posix.cc
+++ b/net/disk_cache/cache_util_posix.cc
@@ -34,10 +34,4 @@ bool DeleteCacheFile(const std::wstring& name) {
return file_util::Delete(name, false);
}
-void WaitForPendingIO(int* num_pending_io) {
- if (*num_pending_io) {
- NOTIMPLEMENTED();
- }
-}
-
} // namespace disk_cache
diff --git a/net/disk_cache/cache_util_win.cc b/net/disk_cache/cache_util_win.cc
index a82231a..3a99836 100644
--- a/net/disk_cache/cache_util_win.cc
+++ b/net/disk_cache/cache_util_win.cc
@@ -39,9 +39,6 @@ void DeleteFiles(const wchar_t* path, const wchar_t* search_name) {
namespace disk_cache {
-// Implemented in file_win.cc.
-MessageLoopForIO::IOHandler* GetFileIOHandler();
-
bool MoveCache(const std::wstring& from_path, const std::wstring& to_path) {
// I don't want to use the shell version of move because if something goes
// wrong, that version will attempt to move file by file and fail at the end.
@@ -64,12 +61,4 @@ bool DeleteCacheFile(const std::wstring& name) {
return DeleteFile(name.c_str()) ? true : false;
}
-void WaitForPendingIO(int* num_pending_io) {
- while (*num_pending_io) {
- // Asynchronous IO operations may be in flight and the completion may end
- // up calling us back so let's wait for them.
- MessageLoopForIO::current()->WaitForIOCompletion(100, GetFileIOHandler());
- }
-}
-
} // namespace disk_cache
diff --git a/net/disk_cache/entry_unittest.cc b/net/disk_cache/entry_unittest.cc
index df5a058..32ac93d 100644
--- a/net/disk_cache/entry_unittest.cc
+++ b/net/disk_cache/entry_unittest.cc
@@ -99,6 +99,16 @@ void DiskCacheEntryTest::InternalAsyncIO() {
ASSERT_TRUE(cache_->CreateEntry("the first key", &entry1));
ASSERT_TRUE(NULL != entry1);
+ // Avoid using internal buffers for the test. We have to write something to
+ // the entry and close it so that we flush the internal buffer to disk. After
+ // that, IO operations will be really hitting the disk. We don't care about
+ // the content, so just extending the entry is enough (all extensions zero-
+ // fill any holes).
+ EXPECT_EQ(0, entry1->WriteData(0, 15 * 1024, NULL, 0, NULL, false));
+ EXPECT_EQ(0, entry1->WriteData(1, 15 * 1024, NULL, 0, NULL, false));
+ entry1->Close();
+ ASSERT_TRUE(cache_->OpenEntry("the first key", &entry1));
+
// Let's verify that each IO goes to the right callback object.
CallbackTest callback1(false);
CallbackTest callback2(false);
@@ -129,7 +139,7 @@ void DiskCacheEntryTest::InternalAsyncIO() {
CacheTestFillBuffer(buffer2->data(), kSize2, false);
CacheTestFillBuffer(buffer3->data(), kSize3, false);
- EXPECT_EQ(0, entry1->ReadData(0, 0, buffer1, kSize1, &callback1));
+ EXPECT_EQ(0, entry1->ReadData(0, 15 * 1024, buffer1, kSize1, &callback1));
base::strlcpy(buffer1->data(), "the data", kSize1);
int expected = 0;
int ret = entry1->WriteData(0, 0, buffer1, kSize1, &callback2, false);
@@ -147,7 +157,7 @@ void DiskCacheEntryTest::InternalAsyncIO() {
EXPECT_STREQ("the data", buffer2->data());
base::strlcpy(buffer2->data(), "The really big data goes here", kSize2);
- ret = entry1->WriteData(1, 1500, buffer2, kSize2, &callback4, false);
+ ret = entry1->WriteData(1, 1500, buffer2, kSize2, &callback4, true);
EXPECT_TRUE(5000 == ret || net::ERR_IO_PENDING == ret);
if (net::ERR_IO_PENDING == ret)
expected++;
@@ -174,13 +184,12 @@ void DiskCacheEntryTest::InternalAsyncIO() {
if (net::ERR_IO_PENDING == ret)
expected++;
- EXPECT_EQ(0, entry1->ReadData(1, 6500, buffer2, kSize2, &callback8));
ret = entry1->ReadData(1, 0, buffer3, kSize3, &callback9);
EXPECT_TRUE(6500 == ret || net::ERR_IO_PENDING == ret);
if (net::ERR_IO_PENDING == ret)
expected++;
- ret = entry1->WriteData(1, 0, buffer3, 8192, &callback10, false);
+ ret = entry1->WriteData(1, 0, buffer3, 8192, &callback10, true);
EXPECT_TRUE(8192 == ret || net::ERR_IO_PENDING == ret);
if (net::ERR_IO_PENDING == ret)
expected++;
diff --git a/net/disk_cache/file.h b/net/disk_cache/file.h
index bb0c56f..3c167db 100644
--- a/net/disk_cache/file.h
+++ b/net/disk_cache/file.h
@@ -67,6 +67,9 @@ class File : public base::RefCounted<File> {
bool SetLength(size_t length);
size_t GetLength();
+ // Blocks until |num_pending_io| IO operations complete.
+ static void WaitForPendingIO(int* num_pending_io);
+
protected:
virtual ~File();
diff --git a/net/disk_cache/file_posix.cc b/net/disk_cache/file_posix.cc
index a7d1c24..b080f53 100644
--- a/net/disk_cache/file_posix.cc
+++ b/net/disk_cache/file_posix.cc
@@ -6,9 +6,239 @@
#include <fcntl.h>
+#include <set>
+
#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/singleton.h"
+#include "base/waitable_event.h"
+#include "base/worker_pool.h"
#include "net/disk_cache/disk_cache.h"
+namespace {
+
+class InFlightIO;
+
+// This class represents a single asynchronous IO operation while it is being
+// bounced between threads.
+class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> {
+ public:
+ // Other than the actual parameters for the IO operation (including the
+ // |callback| that must be notified at the end), we need the controller that
+ // is keeping track of all operations. When done, we notify the controller
+ // (we do NOT invoke the callback), in the worker thead that completed the
+ // operation.
+ BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len,
+ size_t offset, disk_cache::FileIOCallback* callback,
+ InFlightIO* controller)
+ : io_completed_(true, false), callback_(callback), file_(file), buf_(buf),
+ buf_len_(buf_len), offset_(offset), controller_(controller),
+ bytes_(0) {}
+
+ ~BackgroundIO() {}
+
+ // Read and Write are the operations that can be performed asynchronously.
+ // The actual parameters for the operation are setup in the constructor of
+ // the object, with the exception of |delete_buffer|, that allows a write
+ // without a callback. Both methods should be called from a worker thread, by
+ // posting a task to the WorkerPool (they are RunnableMethods). When finished,
+ // controller->OnIOComplete() is called.
+ void Read();
+ void Write(bool delete_buffer);
+
+ // This method signals the controller that this operation is finished, in the
+ // original thread (presumably the IO-Thread). In practice, this is a
+ // RunableMethod that allows cancellation.
+ void OnIOSignalled();
+
+ // Allows the cancellation of the task to notify the controller (step number 7
+ // in the diagram below). In practice, if the controller waits for the
+ // operation to finish it doesn't have to wait for the final task to be
+ // processed by the message loop so calling this method prevents its delivery.
+ // Note that this method is not intended to cancel the actual IO operation or
+ // to prevent the first notification to take place (OnIOComplete).
+ void Cancel();
+
+ // Retrieves the number of bytes transfered.
+ int Result();
+
+ base::WaitableEvent* io_completed() {
+ return &io_completed_;
+ }
+
+ disk_cache::FileIOCallback* callback() {
+ return callback_;
+ }
+
+ private:
+ // An event to signal when the operation completes, and the user callback tha
+ // has to be invoked. These members are accessed directly by the controller.
+ base::WaitableEvent io_completed_;
+ disk_cache::FileIOCallback* callback_;
+
+ scoped_refptr<disk_cache::File> file_;
+ const void* buf_;
+ size_t buf_len_;
+ size_t offset_;
+ InFlightIO* controller_; // The controller that tracks all operations.
+ int bytes_; // Final operation result.
+
+ DISALLOW_COPY_AND_ASSIGN(BackgroundIO);
+};
+
+// This class keeps track of every asynchronous IO operation. A single instance
+// of this class is meant to be used to start an asynchronous operation (using
+// PostRead/PostWrite). This class will post the operation to a worker thread,
+// hanlde the notification when the operation finishes and perform the callback
+// on the same thread that was used to start the operation.
+//
+// The regular sequence of calls is:
+// Thread_1 Worker_thread
+// 1. InFlightIO::PostRead()
+// 2. -> PostTask ->
+// 3. BackgroundIO::Read()
+// 4. IO operation completes
+// 5. InFlightIO::OnIOComplete()
+// 6. <- PostTask <-
+// 7. BackgroundIO::OnIOSignalled()
+// 8. InFlightIO::InvokeCallback()
+// 9. invoke callback
+//
+// Shutdown is a special case that is handled though WaitForPendingIO() instead
+// of just waiting for step 7.
+class InFlightIO {
+ public:
+ InFlightIO() : callback_thread_(MessageLoop::current()) {}
+ ~InFlightIO() {}
+
+ // These methods start an asynchronous operation. The arguments have the same
+ // semantics of the File asynchronous operations, with the exception that the
+ // operation never finishes synchronously.
+ void PostRead(disk_cache::File* file, void* buf, size_t buf_len,
+ size_t offset, disk_cache::FileIOCallback* callback);
+ void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len,
+ size_t offset, disk_cache::FileIOCallback* callback,
+ bool delete_buffer);
+
+ // Blocks the current thread until all IO operations tracked by this object
+ // complete.
+ void WaitForPendingIO();
+
+ // Called on a worker thread when |operation| completes.
+ void OnIOComplete(BackgroundIO* operation);
+
+ // Invokes the users' completion callback at the end of the IO operation.
+ // |cancel_task| is true if the actual task posted to the thread is still
+ // queued (because we are inside WaitForPendingIO), and false if said task is
+ // the one performing the call.
+ void InvokeCallback(BackgroundIO* operation, bool cancel_task);
+
+ private:
+ typedef std::set<scoped_refptr<BackgroundIO> > IOList;
+
+ IOList io_list_; // List of pending io operations.
+ MessageLoop* callback_thread_;
+};
+
+// ---------------------------------------------------------------------------
+
+// Runs on a worker thread.
+void BackgroundIO::Read() {
+ if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) {
+ bytes_ = static_cast<int>(buf_len_);
+ } else {
+ bytes_ = -1;
+ }
+ controller_->OnIOComplete(this);
+}
+
+int BackgroundIO::Result() {
+ return bytes_;
+}
+
+void BackgroundIO::Cancel() {
+ DCHECK(controller_);
+ controller_ = NULL;
+}
+
+// Runs on a worker thread.
+void BackgroundIO::Write(bool delete_buffer) {
+ bool rv = file_->Write(buf_, buf_len_, offset_);
+ if (delete_buffer) {
+ // TODO(rvargas): remove or update this code.
+ delete[] reinterpret_cast<const char*>(buf_);
+ }
+
+ bytes_ = rv ? static_cast<int>(buf_len_) : -1;
+ controller_->OnIOComplete(this);
+}
+
+// Runs on the IO thread.
+void BackgroundIO::OnIOSignalled() {
+ if (controller_)
+ controller_->InvokeCallback(this, false);
+}
+
+// ---------------------------------------------------------------------------
+
+void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len,
+ size_t offset, disk_cache::FileIOCallback *callback) {
+ scoped_refptr<BackgroundIO> operation =
+ new BackgroundIO(file, buf, buf_len, offset, callback, this);
+ io_list_.insert(operation.get());
+
+ WorkerPool::PostTask(FROM_HERE,
+ NewRunnableMethod(operation.get(), &BackgroundIO::Read),
+ true);
+}
+
+void InFlightIO::PostWrite(disk_cache::File* file, const void* buf,
+ size_t buf_len, size_t offset,
+ disk_cache::FileIOCallback* callback,
+ bool delete_buffer) {
+ scoped_refptr<BackgroundIO> operation =
+ new BackgroundIO(file, buf, buf_len, offset, callback, this);
+ io_list_.insert(operation.get());
+
+ WorkerPool::PostTask(FROM_HERE,
+ NewRunnableMethod(operation.get(), &BackgroundIO::Write,
+ delete_buffer),
+ true);
+}
+
+void InFlightIO::WaitForPendingIO() {
+ while (!io_list_.empty()) {
+ // Block the current thread until all pending IO completes.
+ IOList::iterator it = io_list_.begin();
+ InvokeCallback(*it, true);
+ }
+}
+
+// Runs on a worker thread.
+void InFlightIO::OnIOComplete(BackgroundIO* operation) {
+ callback_thread_->PostTask(FROM_HERE,
+ NewRunnableMethod(operation,
+ &BackgroundIO::OnIOSignalled));
+ operation->io_completed()->Signal();
+}
+
+// Runs on the IO thread.
+void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) {
+ operation->io_completed()->Wait();
+
+ if (cancel_task)
+ operation->Cancel();
+
+ disk_cache::FileIOCallback* callback = operation->callback();
+ int bytes = operation->Result();
+
+ // Release the reference acquired in PostRead / PostWrite.
+ io_list_.erase(operation);
+ callback->OnFileIOComplete(bytes);
+}
+
+} // namespace
+
namespace disk_cache {
File::File(base::PlatformFile file)
@@ -71,19 +301,31 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
bool File::Read(void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
+ if (!callback) {
+ if (completed)
+ *completed = true;
+ return Read(buffer, buffer_len, offset);
+ }
+
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
- // TODO: Implement async IO.
- bool ret = Read(buffer, buffer_len, offset);
- if (ret && completed)
- *completed = true;
- return ret;
+ InFlightIO* io_operations = Singleton<InFlightIO>::get();
+ io_operations->PostRead(this, buffer, buffer_len, offset, callback);
+
+ *completed = false;
+ return true;
}
bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
+ if (!callback) {
+ if (completed)
+ *completed = true;
+ return Write(buffer, buffer_len, offset);
+ }
+
return AsyncWrite(buffer, buffer_len, offset, true, callback, completed);
}
@@ -98,17 +340,12 @@ bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset,
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
- // TODO: Implement async IO.
- bool ret = Write(buffer, buffer_len, offset);
- if (ret && completed)
- *completed = true;
-
- // If we supply our own async callback, and the caller is not asking to be
- // notified when completed, we are supposed to delete the buffer.
- if (ret && !callback && !notify)
- delete[] reinterpret_cast<const char*>(buffer);
+ InFlightIO* io_operations = Singleton<InFlightIO>::get();
+ io_operations->PostWrite(this, buffer, buffer_len, offset, callback, !notify);
- return ret;
+ if (completed)
+ *completed = false;
+ return true;
}
bool File::SetLength(size_t length) {
@@ -125,4 +362,10 @@ size_t File::GetLength() {
return ret;
}
+// Static.
+void File::WaitForPendingIO(int* num_pending_io) {
+ if (*num_pending_io)
+ Singleton<InFlightIO>::get()->WaitForPendingIO();
+}
+
} // namespace disk_cache
diff --git a/net/disk_cache/file_win.cc b/net/disk_cache/file_win.cc
index c2e00b7..b341caf 100644
--- a/net/disk_cache/file_win.cc
+++ b/net/disk_cache/file_win.cc
@@ -72,11 +72,6 @@ MyOverlapped::~MyOverlapped() {
namespace disk_cache {
-// Used from WaitForPendingIO() when the cache is being destroyed.
-MessageLoopForIO::IOHandler* GetFileIOHandler() {
- return Singleton<CompletionHandler>::get();
-}
-
File::File(base::PlatformFile file)
: init_(true), mixed_(true), platform_file_(INVALID_HANDLE_VALUE),
sync_platform_file_(file) {
@@ -171,8 +166,11 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
bool File::Read(void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
- if (!callback)
+ if (!callback) {
+ if (completed)
+ *completed = true;
return Read(buffer, buffer_len, offset);
+ }
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
@@ -200,8 +198,11 @@ bool File::Read(void* buffer, size_t buffer_len, size_t offset,
bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
- if (!callback)
+ if (!callback) {
+ if (completed)
+ *completed = true;
return Write(buffer, buffer_len, offset);
+ }
return AsyncWrite(buffer, buffer_len, offset, true, callback, completed);
}
@@ -270,4 +271,14 @@ size_t File::GetLength() {
return static_cast<size_t>(size.LowPart);
}
+// Static.
+void File::WaitForPendingIO(int* num_pending_io) {
+ while (*num_pending_io) {
+ // Asynchronous IO operations may be in flight and the completion may end
+ // up calling us back so let's wait for them.
+ MessageLoopForIO::IOHandler* handler = Singleton<CompletionHandler>::get();
+ MessageLoopForIO::current()->WaitForIOCompletion(100, handler);
+ }
+}
+
} // namespace disk_cache