diff options
author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-21 18:35:24 +0000 |
---|---|---|
committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-21 18:35:24 +0000 |
commit | f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e (patch) | |
tree | c7302913bc73ae2bca0d2dfa8ba87bd152e17a2e | |
parent | f3642e9ea5c41ae3dac690e29bad3df295bc52a9 (diff) | |
download | chromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.zip chromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.tar.gz chromium_src-f3c56e7003ecb777c9e981e7ccc0a5c5fe9ced1e.tar.bz2 |
Disk cache: Implement asynchronous IO for Posix.
BUG=16507
TEST=Unittests
Review URL: http://codereview.chromium.org/173170
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@23990 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/disk_cache/backend_impl.cc | 2 | ||||
-rw-r--r-- | net/disk_cache/backend_unittest.cc | 33 | ||||
-rw-r--r-- | net/disk_cache/cache_util.h | 3 | ||||
-rw-r--r-- | net/disk_cache/cache_util_posix.cc | 6 | ||||
-rw-r--r-- | net/disk_cache/cache_util_win.cc | 11 | ||||
-rw-r--r-- | net/disk_cache/entry_unittest.cc | 17 | ||||
-rw-r--r-- | net/disk_cache/file.h | 3 | ||||
-rw-r--r-- | net/disk_cache/file_posix.cc | 273 | ||||
-rw-r--r-- | net/disk_cache/file_win.cc | 25 |
9 files changed, 326 insertions, 47 deletions
diff --git a/net/disk_cache/backend_impl.cc b/net/disk_cache/backend_impl.cc index 426a130..f61fab8 100644 --- a/net/disk_cache/backend_impl.cc +++ b/net/disk_cache/backend_impl.cc @@ -346,7 +346,7 @@ BackendImpl::~BackendImpl() { timer_.Stop(); - WaitForPendingIO(&num_pending_io_); + File::WaitForPendingIO(&num_pending_io_); DCHECK(!num_refs_); } diff --git a/net/disk_cache/backend_unittest.cc b/net/disk_cache/backend_unittest.cc index ec9b61e..ad7ff66 100644 --- a/net/disk_cache/backend_unittest.cc +++ b/net/disk_cache/backend_unittest.cc @@ -208,6 +208,39 @@ TEST_F(DiskCacheBackendTest, ExternalFiles) { EXPECT_EQ(0, memcmp(buffer1->data(), buffer2->data(), kSize)); } +TEST_F(DiskCacheTest, ShutdownWithPendingIO) { + SimpleCallbackTest callback; + + { + std::wstring path = GetCachePath(); + ASSERT_TRUE(DeleteCache(path.c_str())); + + disk_cache::Backend* cache = + disk_cache::CreateCacheBackend(path, false, 0, net::DISK_CACHE); + + disk_cache::Entry* entry; + ASSERT_TRUE(cache->CreateEntry("some key", &entry)); + + const int kSize = 25000; + scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(kSize); + CacheTestFillBuffer(buffer->data(), kSize, false); + + for (int i = 0; i < 10 * 1024 * 1024; i += 64 * 1024) { + int rv = entry->WriteData(0, i, buffer, kSize, &callback, false); + if (rv == net::ERR_IO_PENDING) + break; + EXPECT_EQ(kSize, rv); + } + + entry->Close(); + + // The cache destructor will see one pending operation here. + delete cache; + } + + MessageLoop::current()->RunAllPending(); +} + void DiskCacheBackendTest::BackendSetSize() { SetDirectMode(); const int cache_size = 0x10000; // 64 kB diff --git a/net/disk_cache/cache_util.h b/net/disk_cache/cache_util.h index 78566a3..cb09521 100644 --- a/net/disk_cache/cache_util.h +++ b/net/disk_cache/cache_util.h @@ -22,9 +22,6 @@ void DeleteCache(const std::wstring& path, bool remove_folder); // Deletes a cache file. bool DeleteCacheFile(const std::wstring& name); -// Blocks until |num_pending_io| IO operations complete. -void WaitForPendingIO(int* num_pending_io); - } // namespace disk_cache #endif // NET_DISK_CACHE_CACHE_UTIL_H_ diff --git a/net/disk_cache/cache_util_posix.cc b/net/disk_cache/cache_util_posix.cc index 24ef4d5..cd8fdea 100644 --- a/net/disk_cache/cache_util_posix.cc +++ b/net/disk_cache/cache_util_posix.cc @@ -34,10 +34,4 @@ bool DeleteCacheFile(const std::wstring& name) { return file_util::Delete(name, false); } -void WaitForPendingIO(int* num_pending_io) { - if (*num_pending_io) { - NOTIMPLEMENTED(); - } -} - } // namespace disk_cache diff --git a/net/disk_cache/cache_util_win.cc b/net/disk_cache/cache_util_win.cc index a82231a..3a99836 100644 --- a/net/disk_cache/cache_util_win.cc +++ b/net/disk_cache/cache_util_win.cc @@ -39,9 +39,6 @@ void DeleteFiles(const wchar_t* path, const wchar_t* search_name) { namespace disk_cache { -// Implemented in file_win.cc. -MessageLoopForIO::IOHandler* GetFileIOHandler(); - bool MoveCache(const std::wstring& from_path, const std::wstring& to_path) { // I don't want to use the shell version of move because if something goes // wrong, that version will attempt to move file by file and fail at the end. @@ -64,12 +61,4 @@ bool DeleteCacheFile(const std::wstring& name) { return DeleteFile(name.c_str()) ? true : false; } -void WaitForPendingIO(int* num_pending_io) { - while (*num_pending_io) { - // Asynchronous IO operations may be in flight and the completion may end - // up calling us back so let's wait for them. - MessageLoopForIO::current()->WaitForIOCompletion(100, GetFileIOHandler()); - } -} - } // namespace disk_cache diff --git a/net/disk_cache/entry_unittest.cc b/net/disk_cache/entry_unittest.cc index df5a058..32ac93d 100644 --- a/net/disk_cache/entry_unittest.cc +++ b/net/disk_cache/entry_unittest.cc @@ -99,6 +99,16 @@ void DiskCacheEntryTest::InternalAsyncIO() { ASSERT_TRUE(cache_->CreateEntry("the first key", &entry1)); ASSERT_TRUE(NULL != entry1); + // Avoid using internal buffers for the test. We have to write something to + // the entry and close it so that we flush the internal buffer to disk. After + // that, IO operations will be really hitting the disk. We don't care about + // the content, so just extending the entry is enough (all extensions zero- + // fill any holes). + EXPECT_EQ(0, entry1->WriteData(0, 15 * 1024, NULL, 0, NULL, false)); + EXPECT_EQ(0, entry1->WriteData(1, 15 * 1024, NULL, 0, NULL, false)); + entry1->Close(); + ASSERT_TRUE(cache_->OpenEntry("the first key", &entry1)); + // Let's verify that each IO goes to the right callback object. CallbackTest callback1(false); CallbackTest callback2(false); @@ -129,7 +139,7 @@ void DiskCacheEntryTest::InternalAsyncIO() { CacheTestFillBuffer(buffer2->data(), kSize2, false); CacheTestFillBuffer(buffer3->data(), kSize3, false); - EXPECT_EQ(0, entry1->ReadData(0, 0, buffer1, kSize1, &callback1)); + EXPECT_EQ(0, entry1->ReadData(0, 15 * 1024, buffer1, kSize1, &callback1)); base::strlcpy(buffer1->data(), "the data", kSize1); int expected = 0; int ret = entry1->WriteData(0, 0, buffer1, kSize1, &callback2, false); @@ -147,7 +157,7 @@ void DiskCacheEntryTest::InternalAsyncIO() { EXPECT_STREQ("the data", buffer2->data()); base::strlcpy(buffer2->data(), "The really big data goes here", kSize2); - ret = entry1->WriteData(1, 1500, buffer2, kSize2, &callback4, false); + ret = entry1->WriteData(1, 1500, buffer2, kSize2, &callback4, true); EXPECT_TRUE(5000 == ret || net::ERR_IO_PENDING == ret); if (net::ERR_IO_PENDING == ret) expected++; @@ -174,13 +184,12 @@ void DiskCacheEntryTest::InternalAsyncIO() { if (net::ERR_IO_PENDING == ret) expected++; - EXPECT_EQ(0, entry1->ReadData(1, 6500, buffer2, kSize2, &callback8)); ret = entry1->ReadData(1, 0, buffer3, kSize3, &callback9); EXPECT_TRUE(6500 == ret || net::ERR_IO_PENDING == ret); if (net::ERR_IO_PENDING == ret) expected++; - ret = entry1->WriteData(1, 0, buffer3, 8192, &callback10, false); + ret = entry1->WriteData(1, 0, buffer3, 8192, &callback10, true); EXPECT_TRUE(8192 == ret || net::ERR_IO_PENDING == ret); if (net::ERR_IO_PENDING == ret) expected++; diff --git a/net/disk_cache/file.h b/net/disk_cache/file.h index bb0c56f..3c167db 100644 --- a/net/disk_cache/file.h +++ b/net/disk_cache/file.h @@ -67,6 +67,9 @@ class File : public base::RefCounted<File> { bool SetLength(size_t length); size_t GetLength(); + // Blocks until |num_pending_io| IO operations complete. + static void WaitForPendingIO(int* num_pending_io); + protected: virtual ~File(); diff --git a/net/disk_cache/file_posix.cc b/net/disk_cache/file_posix.cc index a7d1c24..b080f53 100644 --- a/net/disk_cache/file_posix.cc +++ b/net/disk_cache/file_posix.cc @@ -6,9 +6,239 @@ #include <fcntl.h> +#include <set> + #include "base/logging.h" +#include "base/message_loop.h" +#include "base/singleton.h" +#include "base/waitable_event.h" +#include "base/worker_pool.h" #include "net/disk_cache/disk_cache.h" +namespace { + +class InFlightIO; + +// This class represents a single asynchronous IO operation while it is being +// bounced between threads. +class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> { + public: + // Other than the actual parameters for the IO operation (including the + // |callback| that must be notified at the end), we need the controller that + // is keeping track of all operations. When done, we notify the controller + // (we do NOT invoke the callback), in the worker thead that completed the + // operation. + BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len, + size_t offset, disk_cache::FileIOCallback* callback, + InFlightIO* controller) + : io_completed_(true, false), callback_(callback), file_(file), buf_(buf), + buf_len_(buf_len), offset_(offset), controller_(controller), + bytes_(0) {} + + ~BackgroundIO() {} + + // Read and Write are the operations that can be performed asynchronously. + // The actual parameters for the operation are setup in the constructor of + // the object, with the exception of |delete_buffer|, that allows a write + // without a callback. Both methods should be called from a worker thread, by + // posting a task to the WorkerPool (they are RunnableMethods). When finished, + // controller->OnIOComplete() is called. + void Read(); + void Write(bool delete_buffer); + + // This method signals the controller that this operation is finished, in the + // original thread (presumably the IO-Thread). In practice, this is a + // RunableMethod that allows cancellation. + void OnIOSignalled(); + + // Allows the cancellation of the task to notify the controller (step number 7 + // in the diagram below). In practice, if the controller waits for the + // operation to finish it doesn't have to wait for the final task to be + // processed by the message loop so calling this method prevents its delivery. + // Note that this method is not intended to cancel the actual IO operation or + // to prevent the first notification to take place (OnIOComplete). + void Cancel(); + + // Retrieves the number of bytes transfered. + int Result(); + + base::WaitableEvent* io_completed() { + return &io_completed_; + } + + disk_cache::FileIOCallback* callback() { + return callback_; + } + + private: + // An event to signal when the operation completes, and the user callback tha + // has to be invoked. These members are accessed directly by the controller. + base::WaitableEvent io_completed_; + disk_cache::FileIOCallback* callback_; + + scoped_refptr<disk_cache::File> file_; + const void* buf_; + size_t buf_len_; + size_t offset_; + InFlightIO* controller_; // The controller that tracks all operations. + int bytes_; // Final operation result. + + DISALLOW_COPY_AND_ASSIGN(BackgroundIO); +}; + +// This class keeps track of every asynchronous IO operation. A single instance +// of this class is meant to be used to start an asynchronous operation (using +// PostRead/PostWrite). This class will post the operation to a worker thread, +// hanlde the notification when the operation finishes and perform the callback +// on the same thread that was used to start the operation. +// +// The regular sequence of calls is: +// Thread_1 Worker_thread +// 1. InFlightIO::PostRead() +// 2. -> PostTask -> +// 3. BackgroundIO::Read() +// 4. IO operation completes +// 5. InFlightIO::OnIOComplete() +// 6. <- PostTask <- +// 7. BackgroundIO::OnIOSignalled() +// 8. InFlightIO::InvokeCallback() +// 9. invoke callback +// +// Shutdown is a special case that is handled though WaitForPendingIO() instead +// of just waiting for step 7. +class InFlightIO { + public: + InFlightIO() : callback_thread_(MessageLoop::current()) {} + ~InFlightIO() {} + + // These methods start an asynchronous operation. The arguments have the same + // semantics of the File asynchronous operations, with the exception that the + // operation never finishes synchronously. + void PostRead(disk_cache::File* file, void* buf, size_t buf_len, + size_t offset, disk_cache::FileIOCallback* callback); + void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len, + size_t offset, disk_cache::FileIOCallback* callback, + bool delete_buffer); + + // Blocks the current thread until all IO operations tracked by this object + // complete. + void WaitForPendingIO(); + + // Called on a worker thread when |operation| completes. + void OnIOComplete(BackgroundIO* operation); + + // Invokes the users' completion callback at the end of the IO operation. + // |cancel_task| is true if the actual task posted to the thread is still + // queued (because we are inside WaitForPendingIO), and false if said task is + // the one performing the call. + void InvokeCallback(BackgroundIO* operation, bool cancel_task); + + private: + typedef std::set<scoped_refptr<BackgroundIO> > IOList; + + IOList io_list_; // List of pending io operations. + MessageLoop* callback_thread_; +}; + +// --------------------------------------------------------------------------- + +// Runs on a worker thread. +void BackgroundIO::Read() { + if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) { + bytes_ = static_cast<int>(buf_len_); + } else { + bytes_ = -1; + } + controller_->OnIOComplete(this); +} + +int BackgroundIO::Result() { + return bytes_; +} + +void BackgroundIO::Cancel() { + DCHECK(controller_); + controller_ = NULL; +} + +// Runs on a worker thread. +void BackgroundIO::Write(bool delete_buffer) { + bool rv = file_->Write(buf_, buf_len_, offset_); + if (delete_buffer) { + // TODO(rvargas): remove or update this code. + delete[] reinterpret_cast<const char*>(buf_); + } + + bytes_ = rv ? static_cast<int>(buf_len_) : -1; + controller_->OnIOComplete(this); +} + +// Runs on the IO thread. +void BackgroundIO::OnIOSignalled() { + if (controller_) + controller_->InvokeCallback(this, false); +} + +// --------------------------------------------------------------------------- + +void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, + size_t offset, disk_cache::FileIOCallback *callback) { + scoped_refptr<BackgroundIO> operation = + new BackgroundIO(file, buf, buf_len, offset, callback, this); + io_list_.insert(operation.get()); + + WorkerPool::PostTask(FROM_HERE, + NewRunnableMethod(operation.get(), &BackgroundIO::Read), + true); +} + +void InFlightIO::PostWrite(disk_cache::File* file, const void* buf, + size_t buf_len, size_t offset, + disk_cache::FileIOCallback* callback, + bool delete_buffer) { + scoped_refptr<BackgroundIO> operation = + new BackgroundIO(file, buf, buf_len, offset, callback, this); + io_list_.insert(operation.get()); + + WorkerPool::PostTask(FROM_HERE, + NewRunnableMethod(operation.get(), &BackgroundIO::Write, + delete_buffer), + true); +} + +void InFlightIO::WaitForPendingIO() { + while (!io_list_.empty()) { + // Block the current thread until all pending IO completes. + IOList::iterator it = io_list_.begin(); + InvokeCallback(*it, true); + } +} + +// Runs on a worker thread. +void InFlightIO::OnIOComplete(BackgroundIO* operation) { + callback_thread_->PostTask(FROM_HERE, + NewRunnableMethod(operation, + &BackgroundIO::OnIOSignalled)); + operation->io_completed()->Signal(); +} + +// Runs on the IO thread. +void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) { + operation->io_completed()->Wait(); + + if (cancel_task) + operation->Cancel(); + + disk_cache::FileIOCallback* callback = operation->callback(); + int bytes = operation->Result(); + + // Release the reference acquired in PostRead / PostWrite. + io_list_.erase(operation); + callback->OnFileIOComplete(bytes); +} + +} // namespace + namespace disk_cache { File::File(base::PlatformFile file) @@ -71,19 +301,31 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { bool File::Read(void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) { + if (completed) + *completed = true; + return Read(buffer, buffer_len, offset); + } + if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - // TODO: Implement async IO. - bool ret = Read(buffer, buffer_len, offset); - if (ret && completed) - *completed = true; - return ret; + InFlightIO* io_operations = Singleton<InFlightIO>::get(); + io_operations->PostRead(this, buffer, buffer_len, offset, callback); + + *completed = false; + return true; } bool File::Write(const void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) { + if (completed) + *completed = true; + return Write(buffer, buffer_len, offset); + } + return AsyncWrite(buffer, buffer_len, offset, true, callback, completed); } @@ -98,17 +340,12 @@ bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - // TODO: Implement async IO. - bool ret = Write(buffer, buffer_len, offset); - if (ret && completed) - *completed = true; - - // If we supply our own async callback, and the caller is not asking to be - // notified when completed, we are supposed to delete the buffer. - if (ret && !callback && !notify) - delete[] reinterpret_cast<const char*>(buffer); + InFlightIO* io_operations = Singleton<InFlightIO>::get(); + io_operations->PostWrite(this, buffer, buffer_len, offset, callback, !notify); - return ret; + if (completed) + *completed = false; + return true; } bool File::SetLength(size_t length) { @@ -125,4 +362,10 @@ size_t File::GetLength() { return ret; } +// Static. +void File::WaitForPendingIO(int* num_pending_io) { + if (*num_pending_io) + Singleton<InFlightIO>::get()->WaitForPendingIO(); +} + } // namespace disk_cache diff --git a/net/disk_cache/file_win.cc b/net/disk_cache/file_win.cc index c2e00b7..b341caf 100644 --- a/net/disk_cache/file_win.cc +++ b/net/disk_cache/file_win.cc @@ -72,11 +72,6 @@ MyOverlapped::~MyOverlapped() { namespace disk_cache { -// Used from WaitForPendingIO() when the cache is being destroyed. -MessageLoopForIO::IOHandler* GetFileIOHandler() { - return Singleton<CompletionHandler>::get(); -} - File::File(base::PlatformFile file) : init_(true), mixed_(true), platform_file_(INVALID_HANDLE_VALUE), sync_platform_file_(file) { @@ -171,8 +166,11 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { bool File::Read(void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); - if (!callback) + if (!callback) { + if (completed) + *completed = true; return Read(buffer, buffer_len, offset); + } if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; @@ -200,8 +198,11 @@ bool File::Read(void* buffer, size_t buffer_len, size_t offset, bool File::Write(const void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); - if (!callback) + if (!callback) { + if (completed) + *completed = true; return Write(buffer, buffer_len, offset); + } return AsyncWrite(buffer, buffer_len, offset, true, callback, completed); } @@ -270,4 +271,14 @@ size_t File::GetLength() { return static_cast<size_t>(size.LowPart); } +// Static. +void File::WaitForPendingIO(int* num_pending_io) { + while (*num_pending_io) { + // Asynchronous IO operations may be in flight and the completion may end + // up calling us back so let's wait for them. + MessageLoopForIO::IOHandler* handler = Singleton<CompletionHandler>::get(); + MessageLoopForIO::current()->WaitForIOCompletion(100, handler); + } +} + } // namespace disk_cache |