diff options
author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-07-13 18:00:56 +0000 |
---|---|---|
committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-07-13 18:00:56 +0000 |
commit | fb2622f6816ed20ffd8a35994f7372b67613ba92 (patch) | |
tree | 2aa33016e72361032264904916c4374e4784fd11 /net | |
parent | ea9a4ee67732b90e834def1cf98be1d047a93063 (diff) | |
download | chromium_src-fb2622f6816ed20ffd8a35994f7372b67613ba92.zip chromium_src-fb2622f6816ed20ffd8a35994f7372b67613ba92.tar.gz chromium_src-fb2622f6816ed20ffd8a35994f7372b67613ba92.tar.bz2 |
Disk cache: Switch the disk cache to use the cache_thread.
Add an InFlightBackendIO class that handles posting of
cacheoperations back and forth between the IO thread and
the cachethread.
BUG=26730
TEST=unit tests
Review URL: http://codereview.chromium.org/2945002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@52185 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/disk_cache/backend_impl.cc | 332 | ||||
-rw-r--r-- | net/disk_cache/backend_impl.h | 49 | ||||
-rw-r--r-- | net/disk_cache/backend_unittest.cc | 97 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.cc | 33 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.h | 8 | ||||
-rw-r--r-- | net/disk_cache/entry_impl.cc | 168 | ||||
-rw-r--r-- | net/disk_cache/entry_impl.h | 17 | ||||
-rw-r--r-- | net/disk_cache/entry_unittest.cc | 72 | ||||
-rw-r--r-- | net/disk_cache/eviction.cc | 6 | ||||
-rw-r--r-- | net/disk_cache/file_posix.cc | 13 | ||||
-rw-r--r-- | net/disk_cache/in_flight_backend_io.cc | 448 | ||||
-rw-r--r-- | net/disk_cache/in_flight_backend_io.h | 200 | ||||
-rw-r--r-- | net/disk_cache/in_flight_io.cc | 73 | ||||
-rw-r--r-- | net/disk_cache/in_flight_io.h | 134 | ||||
-rw-r--r-- | net/disk_cache/sparse_control.cc | 18 | ||||
-rw-r--r-- | net/disk_cache/stress_cache.cc | 1 | ||||
-rw-r--r-- | net/http/http_cache.cc | 1 | ||||
-rw-r--r-- | net/http/http_cache_unittest.cc | 34 | ||||
-rw-r--r-- | net/net.gyp | 4 | ||||
-rw-r--r-- | net/tools/crash_cache/crash_cache.cc | 30 | ||||
-rw-r--r-- | net/tools/dump_cache/upgrade.cc | 45 |
21 files changed, 1543 insertions, 240 deletions
diff --git a/net/disk_cache/backend_impl.cc b/net/disk_cache/backend_impl.cc index 075360c..0194bb9 100644 --- a/net/disk_cache/backend_impl.cc +++ b/net/disk_cache/backend_impl.cc @@ -162,6 +162,106 @@ void SetFieldTrialInfo(int size_group) { trial1->AppendGroup(group1, FieldTrial::kAllRemainingProbability); } +// ------------------------------------------------------------------------ + +// This class takes care of building an instance of the backend. +class CacheCreator { + public: + CacheCreator(const FilePath& path, bool force, int max_bytes, + net::CacheType type, uint32 flags, + base::MessageLoopProxy* thread, disk_cache::Backend** backend, + net::CompletionCallback* callback) + : path_(path), force_(force), retry_(false), max_bytes_(max_bytes), + type_(type), flags_(flags), thread_(thread), backend_(backend), + callback_(callback), cache_(NULL), + ALLOW_THIS_IN_INITIALIZER_LIST( + my_callback_(this, &CacheCreator::OnIOComplete)) { + } + ~CacheCreator() {} + + // Creates the backend. + int Run(); + + // Callback implementation. + void OnIOComplete(int result); + + private: + void DoCallback(int result); + + const FilePath& path_; + bool force_; + bool retry_; + int max_bytes_; + net::CacheType type_; + uint32 flags_; + scoped_refptr<base::MessageLoopProxy> thread_; + disk_cache::Backend** backend_; + net::CompletionCallback* callback_; + disk_cache::BackendImpl* cache_; + net::CompletionCallbackImpl<CacheCreator> my_callback_; + + DISALLOW_COPY_AND_ASSIGN(CacheCreator); +}; + +int CacheCreator::Run() { + cache_ = new disk_cache::BackendImpl(path_, thread_); + cache_->SetMaxSize(max_bytes_); + cache_->SetType(type_); + cache_->SetFlags(flags_); + int rv = cache_->Init(&my_callback_); + DCHECK_EQ(net::ERR_IO_PENDING, rv); + return rv; +} + +void CacheCreator::OnIOComplete(int result) { + if (result == net::OK || !force_ || retry_) + return DoCallback(result); + + // This is a failure and we are supposed to try again, so delete the object, + // delete all the files, and try again. + retry_ = true; + delete cache_; + cache_ = NULL; + if (!DelayedCacheCleanup(path_)) + return DoCallback(result); + + // The worker thread will start deleting files soon, but the original folder + // is not there anymore... let's create a new set of files. + int rv = Run(); + DCHECK_EQ(net::ERR_IO_PENDING, rv); +} + +void CacheCreator::DoCallback(int result) { + DCHECK_NE(net::ERR_IO_PENDING, result); + if (result == net::OK) { + *backend_ = cache_; + } else { + LOG(ERROR) << "Unable to create cache"; + *backend_ = NULL; + delete cache_; + } + callback_->Run(result); + delete this; +} + +// ------------------------------------------------------------------------ + +// A task to perform final cleanup on the background thread. +class FinalCleanup : public Task { + public: + explicit FinalCleanup(disk_cache::BackendImpl* backend) : backend_(backend) {} + ~FinalCleanup() {} + + virtual void Run(); + private: + disk_cache::BackendImpl* backend_; + DISALLOW_EVIL_CONSTRUCTORS(FinalCleanup); +}; + +void FinalCleanup::Run() { + backend_->CleanupCache(); +} + } // namespace // ------------------------------------------------------------------------ @@ -228,36 +328,16 @@ int BackendImpl::CreateBackend(const FilePath& full_path, bool force, uint32 flags, base::MessageLoopProxy* thread, Backend** backend, CompletionCallback* callback) { - BackendImpl* cache = new BackendImpl(full_path, thread); - cache->SetMaxSize(max_bytes); - cache->SetType(type); - cache->SetFlags(flags); - if (cache->Init()) { - *backend = cache; - return net::OK; - } - - *backend = NULL; - delete cache; - if (!force) - return net::ERR_FAILED; - - if (!DelayedCacheCleanup(full_path)) - return net::ERR_FAILED; + CacheCreator* creator = new CacheCreator(full_path, force, max_bytes, type, + flags, thread, backend, callback); + // This object will self-destroy when finished. + return creator->Run(); +} - // The worker thread will start deleting files soon, but the original folder - // is not there anymore... let's create a new set of files. - cache = new BackendImpl(full_path, thread); - cache->SetMaxSize(max_bytes); - cache->SetType(type); - cache->SetFlags(flags); - if (cache->Init()) { - *backend = cache; +int BackendImpl::SyncInit() { + if (Init()) return net::OK; - } - delete cache; - LOG(ERROR) << "Unable to create cache"; return net::ERR_FAILED; } @@ -305,7 +385,7 @@ bool BackendImpl::Init() { // We don't care if the value overflows. The only thing we care about is that // the id cannot be zero, because that value is used as "not dirty". - // Increasing the value once per second gives us many years before a we start + // Increasing the value once per second gives us many years before we start // having collisions. data_->header.this_id++; if (!data_->header.this_id) @@ -336,18 +416,36 @@ bool BackendImpl::Init() { return !disabled_; } +int BackendImpl::Init(CompletionCallback* callback) { + background_queue_.Init(callback); + return net::ERR_IO_PENDING; +} + BackendImpl::~BackendImpl() { - Trace("Backend destructor"); - if (!init_) - return; + background_queue_.WaitForPendingIO(); - if (data_) - data_->header.crash = 0; + if (background_queue_.BackgroundIsCurrentThread()) { + // Unit tests may use the same thread for everything. + CleanupCache(); + } else { + background_queue_.background_thread()->PostTask(FROM_HERE, + new FinalCleanup(this)); + done_.Wait(); + } +} - timer_.Stop(); +void BackendImpl::CleanupCache() { + Trace("Backend Cleanup"); + if (init_) { + if (data_) + data_->header.crash = 0; - File::WaitForPendingIO(&num_pending_io_); - DCHECK(!num_refs_); + timer_.Stop(); + File::WaitForPendingIO(&num_pending_io_); + DCHECK(!num_refs_); + } + factory_.RevokeAll(); + done_.Signal(); } // ------------------------------------------------------------------------ @@ -395,18 +493,17 @@ EntryImpl* BackendImpl::OpenEntryImpl(const std::string& key) { return cache_entry; } -bool BackendImpl::OpenEntry(const std::string& key, Entry** entry) { +int BackendImpl::SyncOpenEntry(const std::string& key, Entry** entry) { DCHECK(entry); *entry = OpenEntryImpl(key); - return (*entry) ? true : false; + return (*entry) ? net::OK : net::ERR_FAILED; } int BackendImpl::OpenEntry(const std::string& key, Entry** entry, CompletionCallback* callback) { - if (OpenEntry(key, entry)) - return net::OK; - - return net::ERR_FAILED; + DCHECK(callback); + background_queue_.OpenEntry(key, entry, callback); + return net::ERR_IO_PENDING; } EntryImpl* BackendImpl::CreateEntryImpl(const std::string& key) { @@ -487,15 +584,21 @@ EntryImpl* BackendImpl::CreateEntryImpl(const std::string& key) { return cache_entry.release(); } -bool BackendImpl::CreateEntry(const std::string& key, Entry** entry) { +int BackendImpl::SyncCreateEntry(const std::string& key, Entry** entry) { DCHECK(entry); *entry = CreateEntryImpl(key); - return (*entry) ? true : false; + return (*entry) ? net::OK : net::ERR_FAILED; } int BackendImpl::CreateEntry(const std::string& key, Entry** entry, CompletionCallback* callback) { - if (CreateEntry(key, entry)) + DCHECK(callback); + background_queue_.CreateEntry(key, entry, callback); + return net::ERR_IO_PENDING; +} + +int BackendImpl::SyncDoomEntry(const std::string& key) { + if (DoomEntry(key)) return net::OK; return net::ERR_FAILED; @@ -505,21 +608,24 @@ bool BackendImpl::DoomEntry(const std::string& key) { if (disabled_) return false; - Entry* entry; - if (!OpenEntry(key, &entry)) + EntryImpl* entry = OpenEntryImpl(key); + if (!entry) return false; - // Note that you'd think you could just pass &entry_impl to OpenEntry, - // but that triggers strict aliasing problems with gcc. - EntryImpl* entry_impl = reinterpret_cast<EntryImpl*>(entry); - entry_impl->Doom(); - entry_impl->Release(); + entry->DoomImpl(); + entry->Release(); return true; } int BackendImpl::DoomEntry(const std::string& key, CompletionCallback* callback) { - if (DoomEntry(key)) + DCHECK(callback); + background_queue_.DoomEntry(key, callback); + return net::ERR_IO_PENDING; +} + +int BackendImpl::SyncDoomAllEntries() { + if (DoomAllEntries()) return net::OK; return net::ERR_FAILED; @@ -541,7 +647,14 @@ bool BackendImpl::DoomAllEntries() { } int BackendImpl::DoomAllEntries(CompletionCallback* callback) { - if (DoomAllEntries()) + DCHECK(callback); + background_queue_.DoomAllEntries(callback); + return net::ERR_IO_PENDING; +} + +int BackendImpl::SyncDoomEntriesBetween(const base::Time initial_time, + const base::Time end_time) { + if (DoomEntriesBetween(initial_time, end_time)) return net::OK; return net::ERR_FAILED; @@ -557,27 +670,27 @@ bool BackendImpl::DoomEntriesBetween(const Time initial_time, if (disabled_) return false; - Entry* node, *next; + EntryImpl* node; void* iter = NULL; - if (!OpenNextEntry(&iter, &next)) + EntryImpl* next = OpenNextEntryImpl(&iter); + if (!next) return true; while (next) { node = next; - if (!OpenNextEntry(&iter, &next)) - next = NULL; + next = OpenNextEntryImpl(&iter); if (node->GetLastUsed() >= initial_time && node->GetLastUsed() < end_time) { - node->Doom(); + node->DoomImpl(); } else if (node->GetLastUsed() < initial_time) { if (next) - next->Close(); + next->Release(); next = NULL; - EndEnumeration(&iter); + SyncEndEnumeration(iter); } - node->Close(); + node->Release(); } return true; @@ -586,59 +699,72 @@ bool BackendImpl::DoomEntriesBetween(const Time initial_time, int BackendImpl::DoomEntriesBetween(const base::Time initial_time, const base::Time end_time, CompletionCallback* callback) { - if (DoomEntriesBetween(initial_time, end_time)) + DCHECK(callback); + background_queue_.DoomEntriesBetween(initial_time, end_time, callback); + return net::ERR_IO_PENDING; +} + +int BackendImpl::SyncDoomEntriesSince(const base::Time initial_time) { + if (DoomEntriesSince(initial_time)) return net::OK; return net::ERR_FAILED; } -// We use OpenNextEntry to retrieve elements from the cache, until we get +// We use OpenNextEntryImpl to retrieve elements from the cache, until we get // entries that are too old. bool BackendImpl::DoomEntriesSince(const Time initial_time) { if (disabled_) return false; for (;;) { - Entry* entry; void* iter = NULL; - if (!OpenNextEntry(&iter, &entry)) + EntryImpl* entry = OpenNextEntryImpl(&iter); + if (!entry) return true; if (initial_time > entry->GetLastUsed()) { - entry->Close(); - EndEnumeration(&iter); + entry->Release(); + SyncEndEnumeration(iter); return true; } - entry->Doom(); - entry->Close(); - EndEnumeration(&iter); // Dooming the entry invalidates the iterator. + entry->DoomImpl(); + entry->Release(); + SyncEndEnumeration(iter); // Dooming the entry invalidates the iterator. } } int BackendImpl::DoomEntriesSince(const base::Time initial_time, CompletionCallback* callback) { - if (DoomEntriesSince(initial_time)) - return net::OK; + DCHECK(callback); + background_queue_.DoomEntriesSince(initial_time, callback); + return net::ERR_IO_PENDING; +} - return net::ERR_FAILED; +int BackendImpl::SyncOpenNextEntry(void** iter, Entry** next_entry) { + *next_entry = OpenNextEntryImpl(iter); + return (*next_entry) ? net::OK : net::ERR_FAILED; } -bool BackendImpl::OpenNextEntry(void** iter, Entry** next_entry) { - return OpenFollowingEntry(true, iter, next_entry); +EntryImpl* BackendImpl::OpenNextEntryImpl(void** iter) { + return OpenFollowingEntry(true, iter); } int BackendImpl::OpenNextEntry(void** iter, Entry** next_entry, CompletionCallback* callback) { - if (OpenNextEntry(iter, next_entry)) - return net::OK; + DCHECK(callback); + background_queue_.OpenNextEntry(iter, next_entry, callback); + return net::ERR_IO_PENDING; +} - return net::ERR_FAILED; +void BackendImpl::SyncEndEnumeration(void* iter) { + scoped_ptr<Rankings::Iterator> iterator( + reinterpret_cast<Rankings::Iterator*>(iter)); } void BackendImpl::EndEnumeration(void** iter) { - scoped_ptr<Rankings::Iterator> iterator( - reinterpret_cast<Rankings::Iterator*>(*iter)); + background_queue_.EndEnumeration(*iter); *iter = NULL; } @@ -1043,6 +1169,11 @@ void BackendImpl::ClearRefCountForTest() { num_refs_ = 0; } +int BackendImpl::FlushQueueForTest(CompletionCallback* callback) { + background_queue_.FlushQueue(callback); + return net::ERR_IO_PENDING; +} + int BackendImpl::SelfCheck() { if (!init_) { LOG(ERROR) << "Init failed"; @@ -1063,8 +1194,20 @@ int BackendImpl::SelfCheck() { return CheckAllEntries(); } -bool BackendImpl::OpenPrevEntry(void** iter, Entry** prev_entry) { - return OpenFollowingEntry(false, iter, prev_entry); +int BackendImpl::SyncOpenPrevEntry(void** iter, Entry** prev_entry) { + *prev_entry = OpenPrevEntryImpl(iter); + return (*prev_entry) ? net::OK : net::ERR_FAILED; +} + +int BackendImpl::OpenPrevEntry(void** iter, Entry** prev_entry, + CompletionCallback* callback) { + DCHECK(callback); + background_queue_.OpenPrevEntry(iter, prev_entry, callback); + return net::ERR_IO_PENDING; +} + +EntryImpl* BackendImpl::OpenPrevEntryImpl(void** iter) { + return OpenFollowingEntry(false, iter); } // ------------------------------------------------------------------------ @@ -1178,7 +1321,7 @@ void BackendImpl::RestartCache() { // trying to re-enable the cache. if (unit_test_) init_ = true; // Let the destructor do proper cleanup. - else if (Init()) + else if (SyncInit()) stats_.SetCounter(Stats::FATAL_ERROR, errors + 1); } @@ -1328,14 +1471,11 @@ EntryImpl* BackendImpl::MatchEntry(const std::string& key, uint32 hash, } // This is the actual implementation for OpenNextEntry and OpenPrevEntry. -bool BackendImpl::OpenFollowingEntry(bool forward, void** iter, - Entry** next_entry) { +EntryImpl* BackendImpl::OpenFollowingEntry(bool forward, void** iter) { if (disabled_) - return false; + return NULL; DCHECK(iter); - DCHECK(next_entry); - *next_entry = NULL; const int kListsToSearch = 3; scoped_refptr<EntryImpl> entries[kListsToSearch]; @@ -1355,7 +1495,7 @@ bool BackendImpl::OpenFollowingEntry(bool forward, void** iter, entries[i].swap(&temp); // The entry was already addref'd. } if (!ret) - return false; + return NULL; } else { // Get the next entry from the last list, and the actual entries for the // elements on the other lists. @@ -1391,18 +1531,19 @@ bool BackendImpl::OpenFollowingEntry(bool forward, void** iter, } if (newest < 0 || oldest < 0) - return false; + return NULL; + EntryImpl* next_entry; if (forward) { - entries[newest].swap(reinterpret_cast<EntryImpl**>(next_entry)); + next_entry = entries[newest].release(); iterator->list = static_cast<Rankings::List>(newest); } else { - entries[oldest].swap(reinterpret_cast<EntryImpl**>(next_entry)); + next_entry = entries[oldest].release(); iterator->list = static_cast<Rankings::List>(oldest); } *iter = iterator.release(); - return true; + return next_entry; } bool BackendImpl::OpenFollowingEntryFromList(bool forward, Rankings::List list, @@ -1451,6 +1592,9 @@ EntryImpl* BackendImpl::GetEnumeratedEntry(CacheRankingsBlock* next, return NULL; } + // Make sure that we save the key for later. + entry->GetKey(); + return entry; } diff --git a/net/disk_cache/backend_impl.h b/net/disk_cache/backend_impl.h index f0a8aab..e15e429 100644 --- a/net/disk_cache/backend_impl.h +++ b/net/disk_cache/backend_impl.h @@ -13,6 +13,7 @@ #include "net/disk_cache/block_files.h" #include "net/disk_cache/disk_cache.h" #include "net/disk_cache/eviction.h" +#include "net/disk_cache/in_flight_backend_io.h" #include "net/disk_cache/rankings.h" #include "net/disk_cache/stats.h" #include "net/disk_cache/trace.h" @@ -36,18 +37,20 @@ class BackendImpl : public Backend { friend class Eviction; public: BackendImpl(const FilePath& path, base::MessageLoopProxy* cache_thread) - : path_(path), block_files_(path), mask_(0), max_size_(0), + : ALLOW_THIS_IN_INITIALIZER_LIST(background_queue_(this, cache_thread)), + path_(path), block_files_(path), mask_(0), max_size_(0), cache_type_(net::DISK_CACHE), uma_report_(0), user_flags_(0), init_(false), restarted_(false), unit_test_(false), read_only_(false), - new_eviction_(false), first_timer_(true), + new_eviction_(false), first_timer_(true), done_(true, false), ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {} // mask can be used to limit the usable size of the hash table, for testing. BackendImpl(const FilePath& path, uint32 mask, base::MessageLoopProxy* cache_thread) - : path_(path), block_files_(path), mask_(mask), max_size_(0), + : ALLOW_THIS_IN_INITIALIZER_LIST(background_queue_(this, cache_thread)), + path_(path), block_files_(path), mask_(mask), max_size_(0), cache_type_(net::DISK_CACHE), uma_report_(0), user_flags_(kMask), init_(false), restarted_(false), unit_test_(false), read_only_(false), - new_eviction_(false), first_timer_(true), + new_eviction_(false), first_timer_(true), done_(true, false), ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {} ~BackendImpl(); @@ -59,7 +62,12 @@ class BackendImpl : public Backend { Backend** backend, CompletionCallback* callback); // Performs general initialization for this current instance of the cache. - bool Init(); + bool Init(); // Deprecated. + int Init(CompletionCallback* callback); + int SyncInit(); + + // Performs final cleanup on destruction. + void CleanupCache(); // Backend interface. virtual int32 GetEntryCount() const; @@ -79,6 +87,17 @@ class BackendImpl : public Backend { virtual void EndEnumeration(void** iter); virtual void GetStats(StatsItems* stats); + // Synchronous implementation of the asynchronous interface. + int SyncOpenEntry(const std::string& key, Entry** entry); + int SyncCreateEntry(const std::string& key, Entry** entry); + int SyncDoomEntry(const std::string& key); + int SyncDoomAllEntries(); + int SyncDoomEntriesBetween(const base::Time initial_time, + const base::Time end_time); + int SyncDoomEntriesSince(const base::Time initial_time); + int SyncOpenNextEntry(void** iter, Entry** next_entry); + void SyncEndEnumeration(void* iter); + // Sets the maximum size for the total amount of data stored by this instance. bool SetMaxSize(int max_bytes); @@ -91,6 +110,10 @@ class BackendImpl : public Backend { // Returns the actual file used to store a given (non-external) address. MappedFile* File(Addr address); + InFlightBackendIO* background_queue() { + return &background_queue_; + } + // Creates an external storage file. bool CreateExternalFile(Addr* address); @@ -197,12 +220,17 @@ class BackendImpl : public Backend { // Clears the counter of references to test handling of corruptions. void ClearRefCountForTest(); + // Sends a dummy operation through the operation queue, for unit tests. + int FlushQueueForTest(CompletionCallback* callback); + // Peforms a simple self-check, and returns the number of dirty items // or an error code (negative value). int SelfCheck(); // Same bahavior as OpenNextEntry but walks the list from back to front. - bool OpenPrevEntry(void** iter, Entry** prev_entry); + int OpenPrevEntry(void** iter, Entry** prev_entry, + CompletionCallback* callback); + int SyncOpenPrevEntry(void** iter, Entry** prev_entry); // Old Backend interface. bool OpenEntry(const std::string& key, Entry** entry); @@ -212,11 +240,12 @@ class BackendImpl : public Backend { bool DoomEntriesBetween(const base::Time initial_time, const base::Time end_time); bool DoomEntriesSince(const base::Time initial_time); - bool OpenNextEntry(void** iter, Entry** next_entry); - // Open or create an entry for the given |key|. + // Open or create an entry for the given |key| or |iter|. EntryImpl* OpenEntryImpl(const std::string& key); EntryImpl* CreateEntryImpl(const std::string& key); + EntryImpl* OpenNextEntryImpl(void** iter); + EntryImpl* OpenPrevEntryImpl(void** iter); private: typedef base::hash_map<CacheAddr, EntryImpl*> EntriesMap; @@ -240,7 +269,7 @@ class BackendImpl : public Backend { EntryImpl* MatchEntry(const std::string& key, uint32 hash, bool find_parent); // Opens the next or previous entry on a cache iteration. - bool OpenFollowingEntry(bool forward, void** iter, Entry** next_entry); + EntryImpl* OpenFollowingEntry(bool forward, void** iter); // Opens the next or previous entry on a single list. If successfull, // |from_entry| will be updated to point to the new entry, otherwise it will @@ -287,6 +316,7 @@ class BackendImpl : public Backend { // Part of the self test. Returns false if the entry is corrupt. bool CheckEntry(EntryImpl* cache_entry); + InFlightBackendIO background_queue_; // The controller of pending operations. scoped_refptr<MappedFile> index_; // The main cache index. FilePath path_; // Path to the folder used as backing storage. Index* data_; // Pointer to the index data. @@ -314,6 +344,7 @@ class BackendImpl : public Backend { Stats stats_; // Usage statistcs. base::RepeatingTimer<BackendImpl> timer_; // Usage timer. + base::WaitableEvent done_; // Signals the end of background work. scoped_refptr<TraceObject> trace_object_; // Inits internal tracing. ScopedRunnableMethodFactory<BackendImpl> factory_; diff --git a/net/disk_cache/backend_unittest.cc b/net/disk_cache/backend_unittest.cc index efb7925..a9ab2a2 100644 --- a/net/disk_cache/backend_unittest.cc +++ b/net/disk_cache/backend_unittest.cc @@ -7,7 +7,6 @@ #include "base/path_service.h" #include "base/platform_thread.h" #include "base/string_util.h" -#include "base/thread.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/base/test_completion_callback.h" @@ -254,8 +253,9 @@ TEST_F(DiskCacheBackendTest, ExternalFiles) { EXPECT_EQ(0, memcmp(buffer1->data(), buffer2->data(), kSize)); } +// Tests that we deal with file-level pending operations at destruction time. TEST_F(DiskCacheTest, ShutdownWithPendingIO) { - TestCompletionCallback callback; + TestCompletionCallback cb; { FilePath path = GetCacheFilePath(); @@ -267,28 +267,75 @@ TEST_F(DiskCacheTest, ShutdownWithPendingIO) { disk_cache::Backend* cache; int rv = disk_cache::BackendImpl::CreateBackend( path, false, 0, net::DISK_CACHE, disk_cache::kNoRandom, - cache_thread.message_loop_proxy(), &cache, &callback); - ASSERT_EQ(net::OK, callback.GetResult(rv)); + base::MessageLoopProxy::CreateForCurrentThread(), &cache, &cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); - disk_cache::Entry* entry; - rv = cache->CreateEntry("some key", &entry, &callback); - ASSERT_EQ(net::OK, callback.GetResult(rv)); + disk_cache::EntryImpl* entry; + rv = cache->CreateEntry("some key", + reinterpret_cast<disk_cache::Entry**>(&entry), &cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); const int kSize = 25000; scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(kSize); CacheTestFillBuffer(buffer->data(), kSize, false); for (int i = 0; i < 10 * 1024 * 1024; i += 64 * 1024) { - int rv = entry->WriteData(0, i, buffer, kSize, &callback, false); + // We are using the current thread as the cache thread because we want to + // be able to call directly this method to make sure that the OS (instead + // of us switching thread) is returning IO pending. + rv = entry->WriteDataImpl(0, i, buffer, kSize, &cb, false); if (rv == net::ERR_IO_PENDING) break; EXPECT_EQ(kSize, rv); } - entry->Close(); + // Don't call Close() to avoid going through the queue or we'll deadlock + // waiting for the operation to finish. + entry->Release(); // The cache destructor will see one pending operation here. delete cache; + + if (rv == net::ERR_IO_PENDING) { + EXPECT_TRUE(cb.have_result()); + } + } + + MessageLoop::current()->RunAllPending(); +} + +// Tests that we deal with background-thread pending operations. +TEST_F(DiskCacheTest, ShutdownWithPendingIO2) { + TestCompletionCallback cb; + + { + FilePath path = GetCacheFilePath(); + ASSERT_TRUE(DeleteCache(path)); + base::Thread cache_thread("CacheThread"); + ASSERT_TRUE(cache_thread.StartWithOptions( + base::Thread::Options(MessageLoop::TYPE_IO, 0))); + + disk_cache::Backend* cache; + int rv = disk_cache::BackendImpl::CreateBackend( + path, false, 0, net::DISK_CACHE, disk_cache::kNoRandom, + cache_thread.message_loop_proxy(), &cache, &cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); + + disk_cache::Entry* entry; + rv = cache->CreateEntry("some key", &entry, &cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); + + const int kSize = 25000; + scoped_refptr<net::IOBuffer> buffer = new net::IOBuffer(kSize); + CacheTestFillBuffer(buffer->data(), kSize, false); + + rv = entry->WriteData(0, 0, buffer, kSize, &cb, false); + EXPECT_EQ(net::ERR_IO_PENDING, rv); + + entry->Close(); + + // The cache destructor will see two pending operations here. + delete cache; } MessageLoop::current()->RunAllPending(); @@ -344,6 +391,7 @@ void DiskCacheBackendTest::BackendSetSize() { EXPECT_EQ(cache_size * 3 / 4, entry->WriteData(0, 0, buffer, cache_size * 3 / 4, NULL, false)); entry->Close(); + FlushQueueForTest(); SetMaxSize(cache_size); @@ -351,10 +399,20 @@ void DiskCacheBackendTest::BackendSetSize() { ASSERT_EQ(net::OK, CreateEntry(second, &entry)); EXPECT_EQ(cache_size / 10, entry->WriteData(0, 0, buffer, cache_size / 10, - NULL, false)) << "trim the cache"; - entry->Close(); + NULL, false)); - EXPECT_NE(net::OK, OpenEntry(first, &entry)); + disk_cache::Entry* entry2; + ASSERT_EQ(net::OK, CreateEntry("an extra key", &entry2)); + EXPECT_EQ(cache_size / 10, entry2->WriteData(0, 0, buffer, cache_size / 10, + NULL, false)); + entry2->Close(); // This will trigger the cache trim. + + EXPECT_NE(net::OK, OpenEntry(first, &entry2)); + + FlushQueueForTest(); // Make sure that we are done trimming the cache. + FlushQueueForTest(); // We may have posted two tasks to evict stuff. + + entry->Close(); ASSERT_EQ(net::OK, OpenEntry(second, &entry)); EXPECT_EQ(cache_size / 10, entry->GetDataSize(0)); entry->Close(); @@ -402,6 +460,7 @@ void DiskCacheBackendTest::BackendLoad() { entries[i]->Doom(); entries[i]->Close(); } + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } @@ -625,6 +684,7 @@ void DiskCacheBackendTest::BackendTrimInvalidEntry() { EXPECT_EQ(2, cache_->GetEntryCount()); SetMaxSize(kSize); entry->Close(); // Trim the cache. + FlushQueueForTest(); // If we evicted the entry in less than 20mS, we have one entry in the cache; // if it took more than that, we posted a task and we'll delete the second @@ -685,6 +745,7 @@ void DiskCacheBackendTest::BackendTrimInvalidEntry2() { } entry->Close(); // Trim the cache. + FlushQueueForTest(); // We may abort the eviction before cleaning up everything. MessageLoop::current()->RunAllPending(); @@ -1259,7 +1320,7 @@ void DiskCacheBackendTest::BackendInvalidRankings() { EXPECT_EQ(2, cache_->GetEntryCount()); EXPECT_NE(net::OK, OpenNextEntry(&iter, &entry)); - MessageLoop::current()->RunAllPending(); + FlushQueueForTest(); // Allow the restart to finish. EXPECT_EQ(0, cache_->GetEntryCount()); } @@ -1310,7 +1371,8 @@ void DiskCacheBackendTest::BackendDisable() { EXPECT_NE(net::OK, CreateEntry("Something new", &entry2)); entry1->Close(); - MessageLoop::current()->RunAllPending(); + FlushQueueForTest(); // Flushing the Close posts a task to restart the cache. + FlushQueueForTest(); // This one actually allows that task to complete. EXPECT_EQ(0, cache_->GetEntryCount()); } @@ -1365,7 +1427,7 @@ void DiskCacheBackendTest::BackendDisable2() { ASSERT_LT(count, 9); }; - MessageLoop::current()->RunAllPending(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } @@ -1414,7 +1476,7 @@ void DiskCacheBackendTest::BackendDisable3() { entry1->Close(); EXPECT_NE(net::OK, OpenNextEntry(&iter, &entry2)); - MessageLoop::current()->RunAllPending(); + FlushQueueForTest(); ASSERT_EQ(net::OK, CreateEntry("Something new", &entry2)); entry2->Close(); @@ -1482,7 +1544,8 @@ void DiskCacheBackendTest::BackendDisable4() { entry1->Close(); entry2->Close(); entry3->Close(); - MessageLoop::current()->RunAllPending(); + FlushQueueForTest(); // Flushing the Close posts a task to restart the cache. + FlushQueueForTest(); // This one actually allows that task to complete. EXPECT_EQ(0, cache_->GetEntryCount()); } diff --git a/net/disk_cache/disk_cache_test_base.cc b/net/disk_cache/disk_cache_test_base.cc index 7b0cc8e..0add3c7 100644 --- a/net/disk_cache/disk_cache_test_base.cc +++ b/net/disk_cache/disk_cache_test_base.cc @@ -4,6 +4,7 @@ #include "net/disk_cache/disk_cache_test_base.h" +#include "net/base/net_errors.h" #include "net/base/test_completion_callback.h" #include "net/disk_cache/backend_impl.h" #include "net/disk_cache/disk_cache_test_util.h" @@ -66,21 +67,25 @@ void DiskCacheTestWithCache::InitDiskCache() { if (implementation_) return InitDiskCacheImpl(path); + scoped_refptr<base::MessageLoopProxy> thread = + use_current_thread_ ? base::MessageLoopProxy::CreateForCurrentThread() : + cache_thread_.message_loop_proxy(); + TestCompletionCallback cb; int rv = disk_cache::BackendImpl::CreateBackend( path, force_creation_, size_, net::DISK_CACHE, - disk_cache::kNoRandom, cache_thread_.message_loop_proxy(), - &cache_, &cb); + disk_cache::kNoRandom, thread, &cache_, &cb); ASSERT_EQ(net::OK, cb.GetResult(rv)); } void DiskCacheTestWithCache::InitDiskCacheImpl(const FilePath& path) { + scoped_refptr<base::MessageLoopProxy> thread = + use_current_thread_ ? base::MessageLoopProxy::CreateForCurrentThread() : + cache_thread_.message_loop_proxy(); if (mask_) - cache_impl_ = new disk_cache::BackendImpl( - path, mask_, cache_thread_.message_loop_proxy()); + cache_impl_ = new disk_cache::BackendImpl(path, mask_, thread); else - cache_impl_ = new disk_cache::BackendImpl( - path, cache_thread_.message_loop_proxy()); + cache_impl_ = new disk_cache::BackendImpl(path, thread); cache_ = cache_impl_; ASSERT_TRUE(NULL != cache_); @@ -92,7 +97,9 @@ void DiskCacheTestWithCache::InitDiskCacheImpl(const FilePath& path) { cache_impl_->SetNewEviction(); cache_impl_->SetFlags(disk_cache::kNoRandom); - ASSERT_TRUE(cache_impl_->Init()); + TestCompletionCallback cb; + int rv = cache_impl_->Init(&cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); } void DiskCacheTestWithCache::TearDown() { @@ -112,6 +119,9 @@ void DiskCacheTestWithCache::TearDown() { // We are expected to leak memory when simulating crashes. void DiskCacheTestWithCache::SimulateCrash() { ASSERT_TRUE(implementation_ && !memory_only_); + TestCompletionCallback cb; + int rv = cache_impl_->FlushQueueForTest(&cb); + ASSERT_EQ(net::OK, cb.GetResult(rv)); cache_impl_->ClearRefCountForTest(); delete cache_impl_; @@ -171,3 +181,12 @@ int DiskCacheTestWithCache::OpenNextEntry(void** iter, int rv = cache_->OpenNextEntry(iter, next_entry, &cb); return cb.GetResult(rv); } + +void DiskCacheTestWithCache::FlushQueueForTest() { + if (memory_only_ || !cache_impl_) + return; + + TestCompletionCallback cb; + int rv = cache_impl_->FlushQueueForTest(&cb); + EXPECT_EQ(net::OK, cb.GetResult(rv)); +} diff --git a/net/disk_cache/disk_cache_test_base.h b/net/disk_cache/disk_cache_test_base.h index f3c8040..6c6b54b 100644 --- a/net/disk_cache/disk_cache_test_base.h +++ b/net/disk_cache/disk_cache_test_base.h @@ -36,7 +36,7 @@ class DiskCacheTestWithCache : public DiskCacheTest { : cache_(NULL), cache_impl_(NULL), mem_cache_(NULL), mask_(0), size_(0), memory_only_(false), implementation_(false), force_creation_(false), new_eviction_(false), first_cleanup_(true), integrity_(true), - cache_thread_("CacheThread") {} + use_current_thread_(false), cache_thread_("CacheThread") {} void InitCache(); virtual void TearDown(); @@ -75,6 +75,10 @@ class DiskCacheTestWithCache : public DiskCacheTest { integrity_ = false; } + void UseCurrentThread() { + use_current_thread_ = true; + } + // Utility methods to access the cache and wait for each operation to finish. int OpenEntry(const std::string& key, disk_cache::Entry** entry); int CreateEntry(const std::string& key, disk_cache::Entry** entry); @@ -84,6 +88,7 @@ class DiskCacheTestWithCache : public DiskCacheTest { const base::Time end_time); int DoomEntriesSince(const base::Time initial_time); int OpenNextEntry(void** iter, disk_cache::Entry** next_entry); + void FlushQueueForTest(); // cache_ will always have a valid object, regardless of how the cache was // initialized. The implementation pointers can be NULL. @@ -99,6 +104,7 @@ class DiskCacheTestWithCache : public DiskCacheTest { bool new_eviction_; bool first_cleanup_; bool integrity_; + bool use_current_thread_; // This is intentionally left uninitialized, to be used by any test. bool success_; diff --git a/net/disk_cache/entry_impl.cc b/net/disk_cache/entry_impl.cc index db7c284..549cea2 100644 --- a/net/disk_cache/entry_impl.cc +++ b/net/disk_cache/entry_impl.cc @@ -89,7 +89,6 @@ EntryImpl::EntryImpl(BackendImpl* backend, Addr address) for (int i = 0; i < kNumStreams; i++) { unreported_size_[i] = 0; } - key_file_ = NULL; } // When an entry is deleted from the cache, we clean up all the data associated @@ -131,7 +130,7 @@ EntryImpl::~EntryImpl() { backend_->CacheEntryDestroyed(entry_.address()); } -void EntryImpl::Doom() { +void EntryImpl::DoomImpl() { if (doomed_) return; @@ -139,8 +138,12 @@ void EntryImpl::Doom() { backend_->InternalDoomEntry(this); } +void EntryImpl::Doom() { + backend_->background_queue()->DoomEntryImpl(this); +} + void EntryImpl::Close() { - Release(); + backend_->background_queue()->CloseEntryImpl(this); } std::string EntryImpl::GetKey() const { @@ -148,26 +151,26 @@ std::string EntryImpl::GetKey() const { if (entry->Data()->key_len <= kMaxInternalKeyLength) return std::string(entry->Data()->key); + // We keep a copy of the key so that we can always return it, even if the + // backend is disabled. + if (!key_.empty()) + return key_; + Addr address(entry->Data()->long_key); DCHECK(address.is_initialized()); size_t offset = 0; if (address.is_block_file()) offset = address.start_block() * address.BlockSize() + kBlockHeaderSize; - if (!key_file_) { - // We keep a copy of the file needed to access the key so that we can - // always return this object's key, even if the backend is disabled. - COMPILE_ASSERT(kNumStreams == kKeyFileIndex, invalid_key_index); - key_file_ = const_cast<EntryImpl*>(this)->GetBackingFile(address, - kKeyFileIndex); - } + COMPILE_ASSERT(kNumStreams == kKeyFileIndex, invalid_key_index); + File* key_file = const_cast<EntryImpl*>(this)->GetBackingFile(address, + kKeyFileIndex); - std::string key; - if (!key_file_ || - !key_file_->Read(WriteInto(&key, entry->Data()->key_len + 1), - entry->Data()->key_len + 1, offset)) - key.clear(); - return key; + if (!key_file || + !key_file->Read(WriteInto(&key_, entry->Data()->key_len + 1), + entry->Data()->key_len + 1, offset)) + key_.clear(); + return key_; } Time EntryImpl::GetLastUsed() const { @@ -188,8 +191,8 @@ int32 EntryImpl::GetDataSize(int index) const { return entry->Data()->data_size[index]; } -int EntryImpl::ReadData(int index, int offset, net::IOBuffer* buf, int buf_len, - net::CompletionCallback* completion_callback) { +int EntryImpl::ReadDataImpl(int index, int offset, net::IOBuffer* buf, + int buf_len, CompletionCallback* callback) { DCHECK(node_.Data()->dirty); if (index < 0 || index >= kNumStreams) return net::ERR_INVALID_ARGUMENT; @@ -234,8 +237,8 @@ int EntryImpl::ReadData(int index, int offset, net::IOBuffer* buf, int buf_len, kBlockHeaderSize; SyncCallback* io_callback = NULL; - if (completion_callback) - io_callback = new SyncCallback(this, buf, completion_callback); + if (callback) + io_callback = new SyncCallback(this, buf, callback); bool completed; if (!file->Read(buf->data(), buf_len, file_offset, io_callback, &completed)) { @@ -248,12 +251,33 @@ int EntryImpl::ReadData(int index, int offset, net::IOBuffer* buf, int buf_len, io_callback->Discard(); ReportIOTime(kRead, start); - return (completed || !completion_callback) ? buf_len : net::ERR_IO_PENDING; + return (completed || !callback) ? buf_len : net::ERR_IO_PENDING; } -int EntryImpl::WriteData(int index, int offset, net::IOBuffer* buf, int buf_len, - net::CompletionCallback* completion_callback, - bool truncate) { +int EntryImpl::ReadData(int index, int offset, net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + if (!callback) + return ReadDataImpl(index, offset, buf, buf_len, callback); + + DCHECK(node_.Data()->dirty); + if (index < 0 || index >= kNumStreams) + return net::ERR_INVALID_ARGUMENT; + + int entry_size = entry_.Data()->data_size[index]; + if (offset >= entry_size || offset < 0 || !buf_len) + return 0; + + if (buf_len < 0) + return net::ERR_INVALID_ARGUMENT; + + backend_->background_queue()->ReadData(this, index, offset, buf, buf_len, + callback); + return net::ERR_IO_PENDING; +} + +int EntryImpl::WriteDataImpl(int index, int offset, net::IOBuffer* buf, + int buf_len, CompletionCallback* callback, + bool truncate) { DCHECK(node_.Data()->dirty); if (index < 0 || index >= kNumStreams) return net::ERR_INVALID_ARGUMENT; @@ -263,7 +287,7 @@ int EntryImpl::WriteData(int index, int offset, net::IOBuffer* buf, int buf_len, int max_file_size = backend_->MaxFileSize(); - // offset of buf_len could be negative numbers. + // offset or buf_len could be negative numbers. if (offset > max_file_size || buf_len > max_file_size || offset + buf_len > max_file_size) { int size = offset + buf_len; @@ -334,8 +358,8 @@ int EntryImpl::WriteData(int index, int offset, net::IOBuffer* buf, int buf_len, return 0; SyncCallback* io_callback = NULL; - if (completion_callback) - io_callback = new SyncCallback(this, buf, completion_callback); + if (callback) + io_callback = new SyncCallback(this, buf, callback); bool completed; if (!file->Write(buf->data(), buf_len, file_offset, io_callback, @@ -349,11 +373,28 @@ int EntryImpl::WriteData(int index, int offset, net::IOBuffer* buf, int buf_len, io_callback->Discard(); ReportIOTime(kWrite, start); - return (completed || !completion_callback) ? buf_len : net::ERR_IO_PENDING; + return (completed || !callback) ? buf_len : net::ERR_IO_PENDING; } -int EntryImpl::ReadSparseData(int64 offset, net::IOBuffer* buf, int buf_len, - net::CompletionCallback* completion_callback) { +int EntryImpl::WriteData(int index, int offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback, bool truncate) { + if (!callback) + return WriteDataImpl(index, offset, buf, buf_len, callback, truncate); + + DCHECK(node_.Data()->dirty); + if (index < 0 || index >= kNumStreams) + return net::ERR_INVALID_ARGUMENT; + + if (offset < 0 || buf_len < 0) + return net::ERR_INVALID_ARGUMENT; + + backend_->background_queue()->WriteData(this, index, offset, buf, buf_len, + truncate, callback); + return net::ERR_IO_PENDING; +} + +int EntryImpl::ReadSparseDataImpl(int64 offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback) { DCHECK(node_.Data()->dirty); int result = InitSparseData(); if (net::OK != result) @@ -361,13 +402,23 @@ int EntryImpl::ReadSparseData(int64 offset, net::IOBuffer* buf, int buf_len, TimeTicks start = TimeTicks::Now(); result = sparse_->StartIO(SparseControl::kReadOperation, offset, buf, buf_len, - completion_callback); + callback); ReportIOTime(kSparseRead, start); return result; } -int EntryImpl::WriteSparseData(int64 offset, net::IOBuffer* buf, int buf_len, - net::CompletionCallback* completion_callback) { +int EntryImpl::ReadSparseData(int64 offset, net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + if (!callback) + return ReadSparseDataImpl(offset, buf, buf_len, callback); + + backend_->background_queue()->ReadSparseData(this, offset, buf, buf_len, + callback); + return net::ERR_IO_PENDING; +} + +int EntryImpl::WriteSparseDataImpl(int64 offset, net::IOBuffer* buf, + int buf_len, CompletionCallback* callback) { DCHECK(node_.Data()->dirty); int result = InitSparseData(); if (net::OK != result) @@ -375,11 +426,25 @@ int EntryImpl::WriteSparseData(int64 offset, net::IOBuffer* buf, int buf_len, TimeTicks start = TimeTicks::Now(); result = sparse_->StartIO(SparseControl::kWriteOperation, offset, buf, - buf_len, completion_callback); + buf_len, callback); ReportIOTime(kSparseWrite, start); return result; } +int EntryImpl::WriteSparseData(int64 offset, net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback) { + if (!callback) + return WriteSparseDataImpl(offset, buf, buf_len, callback); + + backend_->background_queue()->WriteSparseData(this, offset, buf, buf_len, + callback); + return net::ERR_IO_PENDING; +} + +int EntryImpl::GetAvailableRangeImpl(int64 offset, int len, int64* start) { + return GetAvailableRange(offset, len, start); +} + int EntryImpl::GetAvailableRange(int64 offset, int len, int64* start) { int result = InitSparseData(); if (net::OK != result) @@ -390,7 +455,9 @@ int EntryImpl::GetAvailableRange(int64 offset, int len, int64* start) { int EntryImpl::GetAvailableRange(int64 offset, int len, int64* start, CompletionCallback* callback) { - return GetAvailableRange(offset, len, start); + backend_->background_queue()->GetAvailableRange(this, offset, len, start, + callback); + return net::ERR_IO_PENDING; } bool EntryImpl::CouldBeSparse() const { @@ -403,17 +470,27 @@ bool EntryImpl::CouldBeSparse() const { } void EntryImpl::CancelSparseIO() { + backend_->background_queue()->CancelSparseIO(this); +} + +void EntryImpl::CancelSparseIOImpl() { if (!sparse_.get()) return; sparse_->CancelIO(); } -int EntryImpl::ReadyForSparseIO(net::CompletionCallback* completion_callback) { +int EntryImpl::ReadyForSparseIOImpl(CompletionCallback* callback) { + DCHECK(sparse_.get()); + return sparse_->ReadyToUse(callback); +} + +int EntryImpl::ReadyForSparseIO(net::CompletionCallback* callback) { if (!sparse_.get()) return net::OK; - return sparse_->ReadyToUse(completion_callback); + backend_->background_queue()->ReadyForSparseIO(this, callback); + return net::ERR_IO_PENDING; } // ------------------------------------------------------------------------ @@ -444,19 +521,20 @@ bool EntryImpl::CreateEntry(Addr node_address, const std::string& key, return false; entry_store->long_key = address.value(); - key_file_ = GetBackingFile(address, kKeyFileIndex); + File* key_file = GetBackingFile(address, kKeyFileIndex); + key_ = key; size_t offset = 0; if (address.is_block_file()) offset = address.start_block() * address.BlockSize() + kBlockHeaderSize; - if (!key_file_ || !key_file_->Write(key.data(), key.size(), offset)) { + if (!key_file || !key_file->Write(key.data(), key.size(), offset)) { DeleteData(address, kKeyFileIndex); return false; } if (address.is_separate_file()) - key_file_->SetLength(key.size() + 1); + key_file->SetLength(key.size() + 1); } else { memcpy(entry_store->key, key.data(), key.size()); entry_store->key[key.size()] = '\0'; @@ -888,10 +966,12 @@ int EntryImpl::InitSparseData() { if (sparse_.get()) return net::OK; - sparse_.reset(new SparseControl(this)); - int result = sparse_->Init(); - if (net::OK != result) - sparse_.reset(); + // Use a local variable so that sparse_ never goes from 'valid' to NULL. + scoped_ptr<SparseControl> sparse(new SparseControl(this)); + int result = sparse->Init(); + if (net::OK == result) + sparse_.swap(sparse); + return result; } diff --git a/net/disk_cache/entry_impl.h b/net/disk_cache/entry_impl.h index 8ab4d44..1caece2 100644 --- a/net/disk_cache/entry_impl.h +++ b/net/disk_cache/entry_impl.h @@ -53,6 +53,20 @@ class EntryImpl : public Entry, public base::RefCounted<EntryImpl> { virtual void CancelSparseIO(); virtual int ReadyForSparseIO(net::CompletionCallback* completion_callback); + // Background implementation of the Entry interface. + void DoomImpl(); + int ReadDataImpl(int index, int offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback); + int WriteDataImpl(int index, int offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback, bool truncate); + int ReadSparseDataImpl(int64 offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback); + int WriteSparseDataImpl(int64 offset, net::IOBuffer* buf, int buf_len, + CompletionCallback* callback); + int GetAvailableRangeImpl(int64 offset, int len, int64* start); + void CancelSparseIOImpl(); + int ReadyForSparseIOImpl(CompletionCallback* callback); + inline CacheEntryBlock* entry() { return &entry_; } @@ -186,8 +200,7 @@ class EntryImpl : public Entry, public base::RefCounted<EntryImpl> { scoped_array<char> user_buffers_[kNumStreams]; // Store user data. // Files to store external user data and key. scoped_refptr<File> files_[kNumStreams + 1]; - // Copy of the file used to store the key. We don't own this object. - mutable File* key_file_; + mutable std::string key_; // Copy of the key. int unreported_size_[kNumStreams]; // Bytes not reported yet to the backend. bool doomed_; // True if this entry was removed from the cache. scoped_ptr<SparseControl> sparse_; // Support for sparse entries. diff --git a/net/disk_cache/entry_unittest.cc b/net/disk_cache/entry_unittest.cc index 95bbb47..067f260 100644 --- a/net/disk_cache/entry_unittest.cc +++ b/net/disk_cache/entry_unittest.cc @@ -82,10 +82,12 @@ void DiskCacheEntryTest::InternalSyncIO() { entry1->Doom(); entry1->Close(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } TEST_F(DiskCacheEntryTest, InternalSyncIO) { + SetDirectMode(); InitCache(); InternalSyncIO(); } @@ -223,10 +225,12 @@ void DiskCacheEntryTest::InternalAsyncIO() { entry1->Doom(); entry1->Close(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } TEST_F(DiskCacheEntryTest, InternalAsyncIO) { + SetDirectMode(); InitCache(); InternalAsyncIO(); } @@ -269,10 +273,12 @@ void DiskCacheEntryTest::ExternalSyncIO() { entry1->Doom(); entry1->Close(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } TEST_F(DiskCacheEntryTest, ExternalSyncIO) { + SetDirectMode(); InitCache(); ExternalSyncIO(); } @@ -367,19 +373,21 @@ void DiskCacheEntryTest::ExternalAsyncIO() { EXPECT_TRUE(17000 == ret || net::ERR_IO_PENDING == ret); if (net::ERR_IO_PENDING == ret) expected++; - EXPECT_EQ(37000, entry1->GetDataSize(1)); EXPECT_TRUE(helper.WaitUntilCacheIoFinished(expected)); + EXPECT_EQ(37000, entry1->GetDataSize(1)); EXPECT_FALSE(g_cache_tests_error); EXPECT_EQ(expected, g_cache_tests_received); entry1->Doom(); entry1->Close(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } TEST_F(DiskCacheEntryTest, ExternalAsyncIO) { + SetDirectMode(); InitCache(); ExternalAsyncIO(); } @@ -788,10 +796,12 @@ void DiskCacheEntryTest::DoomNormalEntry() { entry1->Doom(); entry1->Close(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); } TEST_F(DiskCacheEntryTest, DoomEntry) { + SetDirectMode(); InitCache(); DoomNormalEntry(); } @@ -809,6 +819,7 @@ void DiskCacheEntryTest::DoomedEntry() { ASSERT_EQ(net::OK, CreateEntry(key, &entry)); entry->Doom(); + FlushQueueForTest(); EXPECT_EQ(0, cache_->GetEntryCount()); Time initial = Time::Now(); PlatformThread::Sleep(20); @@ -831,6 +842,7 @@ void DiskCacheEntryTest::DoomedEntry() { } TEST_F(DiskCacheEntryTest, DoomedEntry) { + SetDirectMode(); InitCache(); DoomedEntry(); } @@ -1263,6 +1275,8 @@ void DiskCacheEntryTest::DoomSparseEntry() { } TEST_F(DiskCacheEntryTest, DoomSparseEntry) { + SetDirectMode(); + UseCurrentThread(); InitCache(); DoomSparseEntry(); } @@ -1421,6 +1435,7 @@ TEST_F(DiskCacheEntryTest, CleanupSparseEntry) { } TEST_F(DiskCacheEntryTest, CancelSparseIO) { + UseCurrentThread(); InitCache(); std::string key("the first key"); disk_cache::Entry* entry; @@ -1430,25 +1445,20 @@ TEST_F(DiskCacheEntryTest, CancelSparseIO) { scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kSize); CacheTestFillBuffer(buf->data(), kSize, false); + // This will open and write two "real" entries. TestCompletionCallback cb1, cb2, cb3, cb4, cb5; + int rv = entry->WriteSparseData(1024 * 1024 - 4096, buf, kSize, &cb1); + EXPECT_EQ(net::ERR_IO_PENDING, rv); + int64 offset = 0; - int tries = 0; - const int maxtries = 100; // Avoid hang on infinitely fast disks. - for (int ret = 0; ret != net::ERR_IO_PENDING; offset += kSize * 4) { - ret = entry->WriteSparseData(offset, buf, kSize, &cb1); - if (++tries > maxtries) { - LOG(ERROR) << "Data writes never come back PENDING; skipping test"; - entry->Close(); - return; - } + rv = entry->GetAvailableRange(offset, kSize, &offset, &cb5); + rv = cb5.GetResult(rv); + if (!cb1.have_result()) { + // We may or may not have finished writing to the entry. If we have not, + // we cannot start another operation at this time. + EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, rv); } - // Cannot use the entry at this point. - offset = 0; - int rv = entry->GetAvailableRange(offset, kSize, &offset, &cb5); - EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, cb5.GetResult(rv)); - EXPECT_EQ(net::OK, entry->ReadyForSparseIO(&cb2)); - // We cancel the pending operation, and register multiple notifications. entry->CancelSparseIO(); EXPECT_EQ(net::ERR_IO_PENDING, entry->ReadyForSparseIO(&cb2)); @@ -1456,22 +1466,22 @@ TEST_F(DiskCacheEntryTest, CancelSparseIO) { entry->CancelSparseIO(); // Should be a no op at this point. EXPECT_EQ(net::ERR_IO_PENDING, entry->ReadyForSparseIO(&cb4)); - offset = 0; - rv = entry->GetAvailableRange(offset, kSize, &offset, &cb5); - EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, cb5.GetResult(rv)); - EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, - entry->ReadSparseData(offset, buf, kSize, NULL)); - EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, - entry->WriteSparseData(offset, buf, kSize, NULL)); - - // Now see if we receive all notifications. - EXPECT_EQ(kSize, cb1.GetResult(net::ERR_IO_PENDING)); - EXPECT_EQ(net::OK, cb2.GetResult(net::ERR_IO_PENDING)); - EXPECT_EQ(net::OK, cb3.GetResult(net::ERR_IO_PENDING)); - EXPECT_EQ(net::OK, cb4.GetResult(net::ERR_IO_PENDING)); + if (!cb1.have_result()) { + EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, + entry->ReadSparseData(offset, buf, kSize, NULL)); + EXPECT_EQ(net::ERR_CACHE_OPERATION_NOT_SUPPORTED, + entry->WriteSparseData(offset, buf, kSize, NULL)); + } + + // Now see if we receive all notifications. Note that we should not be able + // to write everything (unless the timing of the system is really weird). + rv = cb1.WaitForResult(); + EXPECT_TRUE(rv == 4096 || rv == kSize); + EXPECT_EQ(net::OK, cb2.WaitForResult()); + EXPECT_EQ(net::OK, cb3.WaitForResult()); + EXPECT_EQ(net::OK, cb4.WaitForResult()); rv = entry->GetAvailableRange(offset, kSize, &offset, &cb5); - EXPECT_EQ(kSize, cb5.GetResult(rv)); - EXPECT_EQ(net::OK, entry->ReadyForSparseIO(&cb2)); + EXPECT_EQ(0, cb5.GetResult(rv)); entry->Close(); } diff --git a/net/disk_cache/eviction.cc b/net/disk_cache/eviction.cc index 0122b7a..e2e810a 100644 --- a/net/disk_cache/eviction.cc +++ b/net/disk_cache/eviction.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Copyright (c) 2006-2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -224,7 +224,7 @@ bool Eviction::EvictEntry(CacheRankingsBlock* node, bool empty) { ReportTrimTimes(entry); if (empty || !new_eviction_) { - entry->Doom(); + entry->DoomImpl(); } else { entry->DeleteEntryData(false); EntryStore* info = entry->entry()->Data(); @@ -453,7 +453,7 @@ bool Eviction::RemoveDeletedNode(CacheRankingsBlock* node) { } bool doomed = (entry->entry()->Data()->state == ENTRY_DOOMED); entry->entry()->Data()->state = ENTRY_DOOMED; - entry->Doom(); + entry->DoomImpl(); entry->Release(); return !doomed; } diff --git a/net/disk_cache/file_posix.cc b/net/disk_cache/file_posix.cc index cf621f4..295f744 100644 --- a/net/disk_cache/file_posix.cc +++ b/net/disk_cache/file_posix.cc @@ -193,6 +193,9 @@ void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, io_list_.insert(operation.get()); file->AddRef(); // Balanced on InvokeCallback() + if (!callback_thread_) + callback_thread_ = MessageLoop::current(); + WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(operation.get(), &BackgroundIO::Read), true); @@ -207,6 +210,9 @@ void InFlightIO::PostWrite(disk_cache::File* file, const void* buf, io_list_.insert(operation.get()); file->AddRef(); // Balanced on InvokeCallback() + if (!callback_thread_) + callback_thread_ = MessageLoop::current(); + WorkerPool::PostTask(FROM_HERE, NewRunnableMethod(operation.get(), &BackgroundIO::Write, delete_buffer), @@ -219,6 +225,8 @@ void InFlightIO::WaitForPendingIO() { IOList::iterator it = io_list_.begin(); InvokeCallback(*it, true); } + // Unit tests can use different threads. + callback_thread_ = NULL; } // Runs on a worker thread. @@ -372,8 +380,9 @@ size_t File::GetLength() { // Static. void File::WaitForPendingIO(int* num_pending_io) { - if (*num_pending_io) - Singleton<InFlightIO>::get()->WaitForPendingIO(); + // We may be running unit tests so we should allow InFlightIO to reset the + // message loop. + Singleton<InFlightIO>::get()->WaitForPendingIO(); } } // namespace disk_cache diff --git a/net/disk_cache/in_flight_backend_io.cc b/net/disk_cache/in_flight_backend_io.cc new file mode 100644 index 0000000..1e2ce9f --- /dev/null +++ b/net/disk_cache/in_flight_backend_io.cc @@ -0,0 +1,448 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/disk_cache/in_flight_backend_io.h" + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "net/base/net_errors.h" +#include "net/disk_cache/backend_impl.h" +#include "net/disk_cache/entry_impl.h" + +namespace disk_cache { + +BackendIO::BackendIO(InFlightIO* controller, BackendImpl* backend, + net::CompletionCallback* callback) + : BackgroundIO(controller), backend_(backend), callback_(callback), + operation_(OP_NONE), + ALLOW_THIS_IN_INITIALIZER_LIST( + my_callback_(this, &BackendIO::OnIOComplete)) { +} + +// Runs on the background thread. +void BackendIO::ExecuteOperation() { + if (IsEntryOperation()) + return ExecuteEntryOperation(); + + ExecuteBackendOperation(); +} + +// Runs on the background thread. +void BackendIO::OnIOComplete(int result) { + DCHECK(IsEntryOperation()); + DCHECK_NE(result, net::ERR_IO_PENDING); + result_ = result; + controller_->OnIOComplete(this); +} + +bool BackendIO::IsEntryOperation() { + return operation_ > OP_MAX_BACKEND; +} + +void BackendIO::ReleaseEntry() { + entry_ = NULL; +} + +void BackendIO::Init() { + operation_ = OP_INIT; +} + +void BackendIO::OpenEntry(const std::string& key, Entry** entry) { + operation_ = OP_OPEN; + key_ = key; + entry_ptr_ = entry; +} + +void BackendIO::CreateEntry(const std::string& key, Entry** entry) { + operation_ = OP_CREATE; + key_ = key; + entry_ptr_ = entry; +} + +void BackendIO::DoomEntry(const std::string& key) { + operation_ = OP_DOOM; + key_ = key; +} + +void BackendIO::DoomAllEntries() { + operation_ = OP_DOOM_ALL; +} + +void BackendIO::DoomEntriesBetween(const base::Time initial_time, + const base::Time end_time) { + operation_ = OP_DOOM_BETWEEN; + initial_time_ = initial_time; + end_time_ = end_time; +} + +void BackendIO::DoomEntriesSince(const base::Time initial_time) { + operation_ = OP_DOOM_SINCE; + initial_time_ = initial_time; +} + +void BackendIO::OpenNextEntry(void** iter, Entry** next_entry) { + operation_ = OP_OPEN_NEXT; + iter_ptr_ = iter; + entry_ptr_ = next_entry; +} + +void BackendIO::OpenPrevEntry(void** iter, Entry** prev_entry) { + operation_ = OP_OPEN_PREV; + iter_ptr_ = iter; + entry_ptr_ = prev_entry; +} + +void BackendIO::EndEnumeration(void* iterator) { + operation_ = OP_END_ENUMERATION; + iter_ = iterator; +} + +void BackendIO::CloseEntryImpl(EntryImpl* entry) { + operation_ = OP_CLOSE_ENTRY; + entry_ = entry; +} + +void BackendIO::DoomEntryImpl(EntryImpl* entry) { + operation_ = OP_DOOM_ENTRY; + entry_ = entry; +} + +void BackendIO::FlushQueue() { + operation_ = OP_FLUSH_QUEUE; +} + +void BackendIO::ReadData(EntryImpl* entry, int index, int offset, + net::IOBuffer* buf, int buf_len) { + operation_ = OP_READ; + entry_ = entry; + index_ = index; + offset_ = offset; + buf_ = buf; + buf_len_ = buf_len; +} + +void BackendIO::WriteData(EntryImpl* entry, int index, int offset, + net::IOBuffer* buf, int buf_len, bool truncate) { + operation_ = OP_WRITE; + entry_ = entry; + index_ = index; + offset_ = offset; + buf_ = buf; + buf_len_ = buf_len; + truncate_ = truncate; +} + +void BackendIO::ReadSparseData(EntryImpl* entry, int64 offset, + net::IOBuffer* buf, int buf_len) { + operation_ = OP_READ_SPARSE; + entry_ = entry; + offset64_ = offset; + buf_ = buf; + buf_len_ = buf_len; +} + +void BackendIO::WriteSparseData(EntryImpl* entry, int64 offset, + net::IOBuffer* buf, int buf_len) { + operation_ = OP_WRITE_SPARSE; + entry_ = entry; + offset64_ = offset; + buf_ = buf; + buf_len_ = buf_len; +} + +void BackendIO::GetAvailableRange(EntryImpl* entry, int64 offset, int len, + int64* start) { + operation_ = OP_GET_RANGE; + entry_ = entry; + offset64_ = offset; + buf_len_ = len; + start_ = start; +} + +void BackendIO::CancelSparseIO(EntryImpl* entry) { + operation_ = OP_CANCEL_IO; + entry_ = entry; +} + +void BackendIO::ReadyForSparseIO(EntryImpl* entry) { + operation_ = OP_IS_READY; + entry_ = entry; +} + +// Runs on the background thread. +void BackendIO::ExecuteBackendOperation() { + switch (operation_) { + case OP_INIT: + result_ = backend_->SyncInit(); + break; + case OP_OPEN: + result_ = backend_->SyncOpenEntry(key_, entry_ptr_); + break; + case OP_CREATE: + result_ = backend_->SyncCreateEntry(key_, entry_ptr_); + break; + case OP_DOOM: + result_ = backend_->SyncDoomEntry(key_); + break; + case OP_DOOM_ALL: + result_ = backend_->SyncDoomAllEntries(); + break; + case OP_DOOM_BETWEEN: + result_ = backend_->SyncDoomEntriesBetween(initial_time_, end_time_); + break; + case OP_DOOM_SINCE: + result_ = backend_->SyncDoomEntriesSince(initial_time_); + break; + case OP_OPEN_NEXT: + result_ = backend_->SyncOpenNextEntry(iter_ptr_, entry_ptr_); + break; + case OP_OPEN_PREV: + result_ = backend_->SyncOpenPrevEntry(iter_ptr_, entry_ptr_); + break; + case OP_END_ENUMERATION: + backend_->SyncEndEnumeration(iter_); + result_ = net::OK; + break; + case OP_CLOSE_ENTRY: + entry_->Release(); + result_ = net::OK; + break; + case OP_DOOM_ENTRY: + entry_->DoomImpl(); + result_ = net::OK; + break; + case OP_FLUSH_QUEUE: + result_ = net::OK; + break; + default: + NOTREACHED() << "Invalid Operation"; + result_ = net::ERR_UNEXPECTED; + } + DCHECK_NE(net::ERR_IO_PENDING, result_); + controller_->OnIOComplete(this); +} + +// Runs on the background thread. +void BackendIO::ExecuteEntryOperation() { + switch (operation_) { + case OP_READ: + result_ = entry_->ReadDataImpl(index_, offset_, buf_, buf_len_, + &my_callback_); + break; + case OP_WRITE: + result_ = entry_->WriteDataImpl(index_, offset_, buf_, buf_len_, + &my_callback_, truncate_); + break; + case OP_READ_SPARSE: + result_ = entry_->ReadSparseDataImpl(offset64_, buf_, buf_len_, + &my_callback_); + break; + case OP_WRITE_SPARSE: + result_ = entry_->WriteSparseDataImpl(offset64_, buf_, buf_len_, + &my_callback_); + break; + case OP_GET_RANGE: + result_ = entry_->GetAvailableRangeImpl(offset64_, buf_len_, start_); + break; + case OP_CANCEL_IO: + entry_->CancelSparseIOImpl(); + result_ = net::OK; + break; + case OP_IS_READY: + result_ = entry_->ReadyForSparseIOImpl(&my_callback_); + break; + default: + NOTREACHED() << "Invalid Operation"; + result_ = net::ERR_UNEXPECTED; + } + if (result_ != net::ERR_IO_PENDING) + controller_->OnIOComplete(this); +} + +// --------------------------------------------------------------------------- + +void InFlightBackendIO::Init(CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->Init(); + QueueOperation(operation); +} + +void InFlightBackendIO::OpenEntry(const std::string& key, Entry** entry, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->OpenEntry(key, entry); + QueueOperation(operation); +} + +void InFlightBackendIO::CreateEntry(const std::string& key, Entry** entry, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->CreateEntry(key, entry); + QueueOperation(operation); +} + +void InFlightBackendIO::DoomEntry(const std::string& key, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->DoomEntry(key); + QueueOperation(operation); +} + +void InFlightBackendIO::DoomAllEntries(CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->DoomAllEntries(); + QueueOperation(operation); +} + +void InFlightBackendIO::DoomEntriesBetween(const base::Time initial_time, + const base::Time end_time, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->DoomEntriesBetween(initial_time, end_time); + QueueOperation(operation); +} + +void InFlightBackendIO::DoomEntriesSince(const base::Time initial_time, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->DoomEntriesSince(initial_time); + QueueOperation(operation); +} + +void InFlightBackendIO::OpenNextEntry(void** iter, Entry** next_entry, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->OpenNextEntry(iter, next_entry); + QueueOperation(operation); +} + +void InFlightBackendIO::OpenPrevEntry(void** iter, Entry** prev_entry, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->OpenPrevEntry(iter, prev_entry); + QueueOperation(operation); +} + +void InFlightBackendIO::EndEnumeration(void* iterator) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, NULL); + operation->EndEnumeration(iterator); + QueueOperation(operation); +} + +void InFlightBackendIO::CloseEntryImpl(EntryImpl* entry) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, NULL); + operation->CloseEntryImpl(entry); + QueueOperation(operation); +} + +void InFlightBackendIO::DoomEntryImpl(EntryImpl* entry) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, NULL); + operation->DoomEntryImpl(entry); + QueueOperation(operation); +} + +void InFlightBackendIO::FlushQueue(net::CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->FlushQueue(); + QueueOperation(operation); +} + +void InFlightBackendIO::ReadData(EntryImpl* entry, int index, int offset, + net::IOBuffer* buf, int buf_len, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->ReadData(entry, index, offset, buf, buf_len); + QueueOperation(operation); +} + +void InFlightBackendIO::WriteData(EntryImpl* entry, int index, int offset, + net::IOBuffer* buf, int buf_len, + bool truncate, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->WriteData(entry, index, offset, buf, buf_len, truncate); + QueueOperation(operation); +} + +void InFlightBackendIO::ReadSparseData(EntryImpl* entry, int64 offset, + net::IOBuffer* buf, int buf_len, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->ReadSparseData(entry, offset, buf, buf_len); + QueueOperation(operation); +} + +void InFlightBackendIO::WriteSparseData(EntryImpl* entry, int64 offset, + net::IOBuffer* buf, int buf_len, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->WriteSparseData(entry, offset, buf, buf_len); + QueueOperation(operation); +} + +void InFlightBackendIO::GetAvailableRange(EntryImpl* entry, int64 offset, + int len, int64* start, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->GetAvailableRange(entry, offset, len, start); + QueueOperation(operation); +} + +void InFlightBackendIO::CancelSparseIO(EntryImpl* entry) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, NULL); + operation->CancelSparseIO(entry); + QueueOperation(operation); +} + +void InFlightBackendIO::ReadyForSparseIO(EntryImpl* entry, + CompletionCallback* callback) { + scoped_refptr<BackendIO> operation = new BackendIO(this, backend_, callback); + operation->ReadyForSparseIO(entry); + QueueOperation(operation); +} + +void InFlightBackendIO::WaitForPendingIO() { + // We clear the list first so that we don't post more operations after this + // point. + pending_ops_.clear(); + InFlightIO::WaitForPendingIO(); +} + +void InFlightBackendIO::OnOperationComplete(BackgroundIO* operation, + bool cancel) { + BackendIO* op = static_cast<BackendIO*>(operation); + + if (!op->IsEntryOperation() && !pending_ops_.empty()) { + // Process the next request. Note that invoking the callback may result + // in the backend destruction (and with it this object), so we should deal + // with the next operation before invoking the callback. + scoped_refptr<BackendIO> next_op = pending_ops_.front(); + pending_ops_.pop_front(); + PostOperation(next_op); + } + + if (op->callback() && (!cancel || op->IsEntryOperation())) + op->callback()->Run(op->result()); + + if (cancel) + op->ReleaseEntry(); +} + +void InFlightBackendIO::QueueOperation(BackendIO* operation) { + if (operation->IsEntryOperation()) + return PostOperation(operation); + + if (pending_ops_.empty()) + return PostOperation(operation); + + pending_ops_.push_back(operation); +} + +void InFlightBackendIO::PostOperation(BackendIO* operation) { + background_thread_->PostTask(FROM_HERE, + NewRunnableMethod(operation, &BackendIO::ExecuteOperation)); + OnOperationPosted(operation); +} + +} // namespace diff --git a/net/disk_cache/in_flight_backend_io.h b/net/disk_cache/in_flight_backend_io.h new file mode 100644 index 0000000..4db2d11 --- /dev/null +++ b/net/disk_cache/in_flight_backend_io.h @@ -0,0 +1,200 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_DISK_CACHE_IN_FLIGHT_BACKEND_IO_H_ +#define NET_DISK_CACHE_IN_FLIGHT_BACKEND_IO_H_ + +#include <string> + +#include "base/message_loop_proxy.h" +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/disk_cache/entry_impl.h" +#include "net/disk_cache/in_flight_io.h" + +namespace disk_cache { + +class BackendImpl; + +// This class represents a single asynchronous disk cache IO operation while it +// is being bounced between threads. +class BackendIO : public BackgroundIO { + public: + BackendIO(InFlightIO* controller, BackendImpl* backend, + net::CompletionCallback* callback); + + // Runs the actual operation on the background thread. + void ExecuteOperation(); + + // Callback implementation. + void OnIOComplete(int result); + + // Returns true if this operation is directed to an entry (vs. the backend). + bool IsEntryOperation(); + + net::CompletionCallback* callback() { return callback_; } + + void ReleaseEntry(); + + // The operations we proxy: + void Init(); + void OpenEntry(const std::string& key, Entry** entry); + void CreateEntry(const std::string& key, Entry** entry); + void DoomEntry(const std::string& key); + void DoomAllEntries(); + void DoomEntriesBetween(const base::Time initial_time, + const base::Time end_time); + void DoomEntriesSince(const base::Time initial_time); + void OpenNextEntry(void** iter, Entry** next_entry); + void OpenPrevEntry(void** iter, Entry** prev_entry); + void EndEnumeration(void* iterator); + void CloseEntryImpl(EntryImpl* entry); + void DoomEntryImpl(EntryImpl* entry); + void FlushQueue(); // Dummy operation. + void ReadData(EntryImpl* entry, int index, int offset, net::IOBuffer* buf, + int buf_len); + void WriteData(EntryImpl* entry, int index, int offset, net::IOBuffer* buf, + int buf_len, bool truncate); + void ReadSparseData(EntryImpl* entry, int64 offset, net::IOBuffer* buf, + int buf_len); + void WriteSparseData(EntryImpl* entry, int64 offset, net::IOBuffer* buf, + int buf_len); + void GetAvailableRange(EntryImpl* entry, int64 offset, int len, int64* start); + void CancelSparseIO(EntryImpl* entry); + void ReadyForSparseIO(EntryImpl* entry); + + private: + // There are two types of operations to proxy: regular backend operations are + // queued so that we don't have more than one operation going on at the same + // time (for instance opening an entry and creating the same entry). On the + // other hand, operations targeted to a given entry can be long lived and + // support multiple simultaneous users (multiple reads or writes to the same + // entry), so they are not queued, just posted to the worker thread as they + // come. + enum Operation { + OP_NONE = 0, + OP_INIT, + OP_OPEN, + OP_CREATE, + OP_DOOM, + OP_DOOM_ALL, + OP_DOOM_BETWEEN, + OP_DOOM_SINCE, + OP_OPEN_NEXT, + OP_OPEN_PREV, + OP_END_ENUMERATION, + OP_CLOSE_ENTRY, + OP_DOOM_ENTRY, + OP_FLUSH_QUEUE, + OP_MAX_BACKEND, + OP_READ, + OP_WRITE, + OP_READ_SPARSE, + OP_WRITE_SPARSE, + OP_GET_RANGE, + OP_CANCEL_IO, + OP_IS_READY + }; + + ~BackendIO() {} + + void ExecuteBackendOperation(); + void ExecuteEntryOperation(); + + BackendImpl* backend_; + net::CompletionCallback* callback_; + Operation operation_; + net::CompletionCallbackImpl<BackendIO> my_callback_; + + // The arguments of all the operations we proxy: + std::string key_; + Entry** entry_ptr_; + base::Time initial_time_; + base::Time end_time_; + void** iter_ptr_; + void* iter_; + EntryImpl* entry_; + int index_; + int offset_; + scoped_refptr<net::IOBuffer> buf_; + int buf_len_; + bool truncate_; + int64 offset64_; + int64* start_; + + DISALLOW_COPY_AND_ASSIGN(BackendIO); +}; + +// The specialized controller that keeps track of current operations. +class InFlightBackendIO : public InFlightIO { + public: + InFlightBackendIO(BackendImpl* backend, + base::MessageLoopProxy* background_thread) + : backend_(backend), background_thread_(background_thread) {} + ~InFlightBackendIO() {} + + // The operations we proxy: + void Init(net::CompletionCallback* callback); + void OpenEntry(const std::string& key, Entry** entry, + net::CompletionCallback* callback); + void CreateEntry(const std::string& key, Entry** entry, + net::CompletionCallback* callback); + void DoomEntry(const std::string& key, net::CompletionCallback* callback); + void DoomAllEntries(net::CompletionCallback* callback); + void DoomEntriesBetween(const base::Time initial_time, + const base::Time end_time, + net::CompletionCallback* callback); + void DoomEntriesSince(const base::Time initial_time, + net::CompletionCallback* callback); + void OpenNextEntry(void** iter, Entry** next_entry, + net::CompletionCallback* callback); + void OpenPrevEntry(void** iter, Entry** prev_entry, + net::CompletionCallback* callback); + void EndEnumeration(void* iterator); + void CloseEntryImpl(EntryImpl* entry); + void DoomEntryImpl(EntryImpl* entry); + void FlushQueue(net::CompletionCallback* callback); + void ReadData(EntryImpl* entry, int index, int offset, net::IOBuffer* buf, + int buf_len, net::CompletionCallback* callback); + void WriteData(EntryImpl* entry, int index, int offset, net::IOBuffer* buf, + int buf_len, bool truncate, net::CompletionCallback* callback); + void ReadSparseData(EntryImpl* entry, int64 offset, net::IOBuffer* buf, + int buf_len, net::CompletionCallback* callback); + void WriteSparseData(EntryImpl* entry, int64 offset, net::IOBuffer* buf, + int buf_len, net::CompletionCallback* callback); + void GetAvailableRange(EntryImpl* entry, int64 offset, int len, int64* start, + net::CompletionCallback* callback); + void CancelSparseIO(EntryImpl* entry); + void ReadyForSparseIO(EntryImpl* entry, net::CompletionCallback* callback); + + // Blocks until all operations are cancelled or completed. + void WaitForPendingIO(); + + scoped_refptr<base::MessageLoopProxy> background_thread() { + return background_thread_; + } + + // Returns true if the current thread is the background thread. + bool BackgroundIsCurrentThread() { + return background_thread_->BelongsToCurrentThread(); + } + + protected: + virtual void OnOperationComplete(BackgroundIO* operation, bool cancel); + + private: + typedef std::list<scoped_refptr<BackendIO> > OperationList; + void QueueOperation(BackendIO* operation); + void PostOperation(BackendIO* operation); + + BackendImpl* backend_; + scoped_refptr<base::MessageLoopProxy> background_thread_; + OperationList pending_ops_; // The list of operations to be posted. + + DISALLOW_COPY_AND_ASSIGN(InFlightBackendIO); +}; + +} // namespace disk_cache + +#endif // NET_DISK_CACHE_IN_FLIGHT_BACKEND_IO_H_ diff --git a/net/disk_cache/in_flight_io.cc b/net/disk_cache/in_flight_io.cc new file mode 100644 index 0000000..24b0e9c --- /dev/null +++ b/net/disk_cache/in_flight_io.cc @@ -0,0 +1,73 @@ +// Copyright (c) 2006-2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/disk_cache/in_flight_io.h" + +#include "base/logging.h" + +namespace disk_cache { + +// Runs on the primary thread. +void BackgroundIO::OnIOSignalled() { + if (controller_) + controller_->InvokeCallback(this, false); +} + +void BackgroundIO::Cancel() { + DCHECK(controller_); + controller_ = NULL; +} + +// Runs on the background thread. +void BackgroundIO::NotifyController() { + controller_->OnIOComplete(this); +} + +// --------------------------------------------------------------------------- + +void InFlightIO::WaitForPendingIO() { + while (!io_list_.empty()) { + // Block the current thread until all pending IO completes. + IOList::iterator it = io_list_.begin(); + InvokeCallback(*it, true); + } +} + +// Runs on a background thread. +void InFlightIO::OnIOComplete(BackgroundIO* operation) { +#ifndef NDEBUG + if (callback_thread_ == MessageLoop::current()) { + DCHECK(single_thread_ || !running_); + single_thread_ = true; + } + running_ = true; +#endif + + callback_thread_->PostTask(FROM_HERE, + NewRunnableMethod(operation, + &BackgroundIO::OnIOSignalled)); + operation->io_completed()->Signal(); +} + +// Runs on the primary thread. +void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) { + operation->io_completed()->Wait(); + + if (cancel_task) + operation->Cancel(); + + // Make sure that we remove the operation from the list before invoking the + // callback (so that a subsequent cancel does not invoke the callback again). + DCHECK(io_list_.find(operation) != io_list_.end()); + io_list_.erase(operation); + OnOperationComplete(operation, cancel_task); +} + +// Runs on the primary thread. +void InFlightIO::OnOperationPosted(BackgroundIO* operation) { + DCHECK(callback_thread_ == MessageLoop::current()); + io_list_.insert(operation); +} + +} // namespace disk_cache diff --git a/net/disk_cache/in_flight_io.h b/net/disk_cache/in_flight_io.h new file mode 100644 index 0000000..75552cb7 --- /dev/null +++ b/net/disk_cache/in_flight_io.h @@ -0,0 +1,134 @@ +// Copyright (c) 2006-2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_DISK_CACHE_IN_FLIGHT_IO_H_ +#define NET_DISK_CACHE_IN_FLIGHT_IO_H_ + +#include <set> + +#include "base/message_loop.h" +#include "base/waitable_event.h" + +namespace disk_cache { + +class InFlightIO; + +// This class represents a single asynchronous IO operation while it is being +// bounced between threads. +class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> { + public: + // Other than the actual parameters for the IO operation (including the + // |callback| that must be notified at the end), we need the controller that + // is keeping track of all operations. When done, we notify the controller + // (we do NOT invoke the callback), in the worker thead that completed the + // operation. + explicit BackgroundIO(InFlightIO* controller) + : controller_(controller), result_(-1), io_completed_(true, false) {} + + // This method signals the controller that this operation is finished, in the + // original thread. In practice, this is a RunableMethod that allows + // cancellation. + void OnIOSignalled(); + + // Allows the cancellation of the task to notify the controller (step number 8 + // in the diagram below). In practice, if the controller waits for the + // operation to finish it doesn't have to wait for the final task to be + // processed by the message loop so calling this method prevents its delivery. + // Note that this method is not intended to cancel the actual IO operation or + // to prevent the first notification to take place (OnIOComplete). + void Cancel(); + + int result() { return result_; } + + base::WaitableEvent* io_completed() { + return &io_completed_; + } + + protected: + virtual ~BackgroundIO() {} + + InFlightIO* controller_; // The controller that tracks all operations. + int result_; // Final operation result. + + private: + friend class base::RefCountedThreadSafe<BackgroundIO>; + + // Notifies the controller about the end of the operation, from the background + // thread. + void NotifyController(); + + // An event to signal when the operation completes. + base::WaitableEvent io_completed_; + + DISALLOW_COPY_AND_ASSIGN(BackgroundIO); +}; + +// This class keeps track of asynchronous IO operations. A single instance +// of this class is meant to be used to start an asynchronous operation (using +// PostXX, exposed by a derived class). This class will post the operation to a +// worker thread, hanlde the notification when the operation finishes and +// perform the callback on the same thread that was used to start the operation. +// +// The regular sequence of calls is: +// Thread_1 Worker_thread +// 1. DerivedInFlightIO::PostXX() +// 2. -> PostTask -> +// 3. InFlightIO::OnOperationPosted() +// 4. DerivedBackgroundIO::XX() +// 5. IO operation completes +// 6. InFlightIO::OnIOComplete() +// 7. <- PostTask <- +// 8. BackgroundIO::OnIOSignalled() +// 9. InFlightIO::InvokeCallback() +// 10. DerivedInFlightIO::OnOperationComplete() +// 11. invoke callback +// +// Shutdown is a special case that is handled though WaitForPendingIO() instead +// of just waiting for step 7. +class InFlightIO { + public: + InFlightIO() + : callback_thread_(MessageLoop::current()), running_(false), + single_thread_(false) {} + virtual ~InFlightIO() {} + + // Blocks the current thread until all IO operations tracked by this object + // complete. + void WaitForPendingIO(); + + // Called on a background thread when |operation| completes. + void OnIOComplete(BackgroundIO* operation); + + // Invokes the users' completion callback at the end of the IO operation. + // |cancel_task| is true if the actual task posted to the thread is still + // queued (because we are inside WaitForPendingIO), and false if said task is + // the one performing the call. + void InvokeCallback(BackgroundIO* operation, bool cancel_task); + + protected: + // This method is called to signal the completion of the |operation|. |cancel| + // is true if the operation is being cancelled. This method is called on the + // thread that created this object. + virtual void OnOperationComplete(BackgroundIO* operation, bool cancel) = 0; + + // Signals this object that the derived class just posted the |operation| to + // be executed on a background thread. This method must be called on the same + // thread used to create this object. + void OnOperationPosted(BackgroundIO* operation); + + private: + typedef std::set<scoped_refptr<BackgroundIO> > IOList; + + IOList io_list_; // List of pending, in-flight io operations. + MessageLoop* callback_thread_; + + bool running_; // True after the first posted operation completes. + bool single_thread_; // True if we only have one thread. + + DISALLOW_COPY_AND_ASSIGN(InFlightIO); +}; + +} // namespace disk_cache + +#endif // NET_DISK_CACHE_IN_FLIGHT_IO_H_ diff --git a/net/disk_cache/sparse_control.cc b/net/disk_cache/sparse_control.cc index 2d9d3c1..884a1b7 100644 --- a/net/disk_cache/sparse_control.cc +++ b/net/disk_cache/sparse_control.cc @@ -126,7 +126,7 @@ void ChildrenDeleter::DeleteChildren() { return Release(); } std::string child_name = GenerateChildName(name_, signature_, child_id); - backend_->DoomEntry(child_name); + backend_->SyncDoomEntry(child_name); children_map_.Set(child_id, false); // Post a task to delete the next child. @@ -373,7 +373,7 @@ bool SparseControl::OpenChild() { CloseChild(); } - // Se if we are tracking this child. + // See if we are tracking this child. if (!ChildPresent()) return ContinueWithoutChild(key); @@ -418,7 +418,7 @@ void SparseControl::CloseChild() { NULL, false); if (rv != sizeof(child_data_)) DLOG(ERROR) << "Failed to save child data"; - child_->Close(); + child_->Release(); child_ = NULL; } @@ -430,8 +430,8 @@ std::string SparseControl::GenerateChildKey() { // We are deleting the child because something went wrong. bool SparseControl::KillChildAndContinue(const std::string& key, bool fatal) { SetChildBit(false); - child_->Doom(); - child_->Close(); + child_->DoomImpl(); + child_->Release(); child_ = NULL; if (fatal) { result_ = net::ERR_CACHE_READ_FAILURE; @@ -617,12 +617,12 @@ bool SparseControl::DoChildIO() { int rv = 0; switch (operation_) { case kReadOperation: - rv = child_->ReadData(kSparseData, child_offset_, user_buf_, child_len_, - callback); + rv = child_->ReadDataImpl(kSparseData, child_offset_, user_buf_, + child_len_, callback); break; case kWriteOperation: - rv = child_->WriteData(kSparseData, child_offset_, user_buf_, child_len_, - callback, false); + rv = child_->WriteDataImpl(kSparseData, child_offset_, user_buf_, + child_len_, callback, false); break; case kGetRangeOperation: rv = DoGetAvailableRange(); diff --git a/net/disk_cache/stress_cache.cc b/net/disk_cache/stress_cache.cc index db84fd8..b153659 100644 --- a/net/disk_cache/stress_cache.cc +++ b/net/disk_cache/stress_cache.cc @@ -140,7 +140,6 @@ void StressTheCache(int iteration) { if (!(i % 100)) printf("Entries: %d \r", i); - MessageLoop::current()->RunAllPending(); } } diff --git a/net/http/http_cache.cc b/net/http/http_cache.cc index 7e73d94..3758a51 100644 --- a/net/http/http_cache.cc +++ b/net/http/http_cache.cc @@ -902,6 +902,7 @@ void HttpCache::OnIOComplete(int result, PendingOp* pending_op) { if (op == WI_CREATE_ENTRY) pending_op->disk_entry->Doom(); pending_op->disk_entry->Close(); + pending_op->disk_entry = NULL; fail_requests = true; } } diff --git a/net/http/http_cache_unittest.cc b/net/http/http_cache_unittest.cc index f48899c..dc114da 100644 --- a/net/http/http_cache_unittest.cc +++ b/net/http/http_cache_unittest.cc @@ -369,9 +369,7 @@ class MockDiskCache : public disk_cache::Backend { } ~MockDiskCache() { - EntryMap::iterator it = entries_.begin(); - for (; it != entries_.end(); ++it) - it->second->Release(); + ReleaseAll(); } virtual int32 GetEntryCount() const { @@ -496,6 +494,13 @@ class MockDiskCache : public disk_cache::Backend { // Return entries that fail some of their requests. void set_soft_failures(bool value) { soft_failures_ = value; } + void ReleaseAll() { + EntryMap::iterator it = entries_.begin(); + for (; it != entries_.end(); ++it) + it->second->Release(); + entries_.clear(); + } + private: typedef base::hash_map<std::string, MockDiskEntry*> EntryMap; @@ -1720,6 +1725,29 @@ TEST(HttpCache, SimpleGET_ManyWriters_CancelCreate) { } } +// Tests that we can cancel a single request to open a disk cache entry. +TEST(HttpCache, SimpleGET_CancelCreate) { + MockHttpCache cache; + + MockHttpRequest request(kSimpleGET_Transaction); + + Context* c = new Context(); + + c->result = cache.http_cache()->CreateTransaction(&c->trans); + EXPECT_EQ(net::OK, c->result); + + c->result = c->trans->Start(&request, &c->callback, net::BoundNetLog()); + EXPECT_EQ(net::ERR_IO_PENDING, c->result); + + // Release the reference that the mock disk cache keeps for this entry, so + // that we test that the http cache handles the cancelation correctly. + cache.disk_cache()->ReleaseAll(); + delete c; + + MessageLoop::current()->RunAllPending(); + EXPECT_EQ(1, cache.disk_cache()->create_count()); +} + // Tests that we delete/create entries even if multiple requests are queued. TEST(HttpCache, SimpleGET_ManyWriters_BypassCache) { MockHttpCache cache; diff --git a/net/net.gyp b/net/net.gyp index 908e17d..6bbde7f 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -265,6 +265,10 @@ 'disk_cache/hash.cc', 'disk_cache/hash.h', 'disk_cache/histogram_macros.h', + 'disk_cache/in_flight_backend_io.cc', + 'disk_cache/in_flight_backend_io.h', + 'disk_cache/in_flight_io.cc', + 'disk_cache/in_flight_io.h', 'disk_cache/mapped_file.h', 'disk_cache/mapped_file_posix.cc', 'disk_cache/mapped_file_win.cc', diff --git a/net/tools/crash_cache/crash_cache.cc b/net/tools/crash_cache/crash_cache.cc index a26c873..1d11013 100644 --- a/net/tools/crash_cache/crash_cache.cc +++ b/net/tools/crash_cache/crash_cache.cc @@ -118,6 +118,14 @@ bool CreateTargetFolder(const FilePath& path, RankCrashes action, return file_util::CreateDirectory(*full_path); } +// Makes sure that any pending task is processed. +void FlushQueue(disk_cache::Backend* cache) { + TestCompletionCallback cb; + int rv = + reinterpret_cast<disk_cache::BackendImpl*>(cache)->FlushQueueForTest(&cb); + cb.GetResult(rv); // Ignore the result; +} + // Generates the files for an empty and one item cache. int SimpleInsert(const FilePath& path, RankCrashes action, base::Thread* cache_thread) { @@ -142,6 +150,7 @@ int SimpleInsert(const FilePath& path, RankCrashes action, return GENERIC; entry->Close(); + FlushQueue(cache); DCHECK(action <= disk_cache::INSERT_ONE_3); g_rankings_crash = action; @@ -162,7 +171,8 @@ int SimpleRemove(const FilePath& path, RankCrashes action, TestCompletionCallback cb; disk_cache::Backend* cache; - int rv = disk_cache::CreateCacheBackend(net::DISK_CACHE, path, 0, false, + // Use a simple LRU for eviction. + int rv = disk_cache::CreateCacheBackend(net::MEDIA_CACHE, path, 0, false, cache_thread->message_loop_proxy(), &cache, &cb); if (cb.GetResult(rv) != net::OK || cache->GetEntryCount()) @@ -174,6 +184,7 @@ int SimpleRemove(const FilePath& path, RankCrashes action, return GENERIC; entry->Close(); + FlushQueue(cache); if (action >= disk_cache::REMOVE_TAIL_1) { rv = cache->CreateEntry("some other key", &entry, &cb); @@ -181,6 +192,7 @@ int SimpleRemove(const FilePath& path, RankCrashes action, return GENERIC; entry->Close(); + FlushQueue(cache); } rv = cache->OpenEntry(kCrashEntryName, &entry, &cb); @@ -190,6 +202,7 @@ int SimpleRemove(const FilePath& path, RankCrashes action, g_rankings_crash = action; entry->Doom(); entry->Close(); + FlushQueue(cache); return NOT_REACHED; } @@ -201,7 +214,8 @@ int HeadRemove(const FilePath& path, RankCrashes action, TestCompletionCallback cb; disk_cache::Backend* cache; - int rv = disk_cache::CreateCacheBackend(net::DISK_CACHE, path, 0, false, + // Use a simple LRU for eviction. + int rv = disk_cache::CreateCacheBackend(net::MEDIA_CACHE, path, 0, false, cache_thread->message_loop_proxy(), &cache, &cb); if (cb.GetResult(rv) != net::OK || cache->GetEntryCount()) @@ -213,11 +227,13 @@ int HeadRemove(const FilePath& path, RankCrashes action, return GENERIC; entry->Close(); + FlushQueue(cache); rv = cache->CreateEntry(kCrashEntryName, &entry, &cb); if (cb.GetResult(rv) != net::OK) return GENERIC; entry->Close(); + FlushQueue(cache); rv = cache->OpenEntry(kCrashEntryName, &entry, &cb); if (cb.GetResult(rv) != net::OK) @@ -226,6 +242,7 @@ int HeadRemove(const FilePath& path, RankCrashes action, g_rankings_crash = action; entry->Doom(); entry->Close(); + FlushQueue(cache); return NOT_REACHED; } @@ -241,14 +258,14 @@ int LoadOperations(const FilePath& path, RankCrashes action, if (!cache || !cache->SetMaxSize(0x100000)) return GENERIC; - if (!cache->Init() || cache->GetEntryCount()) + TestCompletionCallback cb; + int rv = cache->Init(&cb); + if (cb.GetResult(rv) != net::OK || cache->GetEntryCount()) return GENERIC; int seed = static_cast<int>(Time::Now().ToInternalValue()); srand(seed); - TestCompletionCallback cb; - int rv; disk_cache::Entry* entry; for (int i = 0; i < 100; i++) { std::string key = GenerateKey(true); @@ -256,11 +273,13 @@ int LoadOperations(const FilePath& path, RankCrashes action, if (cb.GetResult(rv) != net::OK) return GENERIC; entry->Close(); + FlushQueue(cache); if (50 == i && action >= disk_cache::REMOVE_LOAD_1) { rv = cache->CreateEntry(kCrashEntryName, &entry, &cb); if (cb.GetResult(rv) != net::OK) return GENERIC; entry->Close(); + FlushQueue(cache); } } @@ -280,6 +299,7 @@ int LoadOperations(const FilePath& path, RankCrashes action, entry->Doom(); entry->Close(); + FlushQueue(cache); return NOT_REACHED; } diff --git a/net/tools/dump_cache/upgrade.cc b/net/tools/dump_cache/upgrade.cc index 5a3cd67..60099b2 100644 --- a/net/tools/dump_cache/upgrade.cc +++ b/net/tools/dump_cache/upgrade.cc @@ -104,7 +104,8 @@ enum { RESULT_OK = 0, RESULT_UNKNOWN_COMMAND, RESULT_INVALID_PARAMETER, - RESULT_NAME_OVERFLOW + RESULT_NAME_OVERFLOW, + RESULT_PENDING // This error code is NOT expected by the master process. }; // ----------------------------------------------------------------------- @@ -575,6 +576,7 @@ class SlaveSM : public BaseSM { void DoGetNextEntry(); void DoGetPrevEntry(); int32 GetEntryFromList(); + void DoGetEntryComplete(int result); void DoCloseEntry(); void DoGetKey(); void DoGetUseTimes(); @@ -585,16 +587,19 @@ class SlaveSM : public BaseSM { void Fail(); void* iterator_; - Message msg_; // Only used for DoReadDataComplete. + Message msg_; // Used for DoReadDataComplete and DoGetEntryComplete. net::CompletionCallbackImpl<SlaveSM> read_callback_; + net::CompletionCallbackImpl<SlaveSM> next_callback_; scoped_ptr<disk_cache::BackendImpl> cache_; }; SlaveSM::SlaveSM(const std::wstring& path, HANDLE channel) : BaseSM(channel), iterator_(NULL), ALLOW_THIS_IN_INITIALIZER_LIST( - read_callback_(this, &SlaveSM::DoReadDataComplete)) { + read_callback_(this, &SlaveSM::DoReadDataComplete)), + ALLOW_THIS_IN_INITIALIZER_LIST( + next_callback_(this, &SlaveSM::DoGetEntryComplete)) { disk_cache::Backend* cache; TestCompletionCallback cb; int rv = disk_cache::CreateCacheBackend(net::DISK_CACHE, @@ -672,6 +677,9 @@ bool SlaveSM::DoInit() { DEBUGMSG("\t\t\tSlave DoInit\n"); DCHECK(state_ == SLAVE_INITIAL); state_ = SLAVE_WAITING; + if (!cache_.get()) + return false; + return ReceiveMsg(); } @@ -700,6 +708,11 @@ void SlaveSM::DoGetPrevEntry() { msg.result = RESULT_UNKNOWN_COMMAND; } else { msg.result = GetEntryFromList(); + if (msg.result == RESULT_PENDING) { + // We are not done yet. + msg_ = msg; + return; + } msg.long_arg1 = reinterpret_cast<int64>(entry_); } SendMsg(msg); @@ -715,23 +728,31 @@ int32 SlaveSM::GetEntryFromList() { if (entry_) entry_->Close(); - bool ret; + int rv; if (input_->msg.command == GET_NEXT_ENTRY) { - ret = cache_->OpenNextEntry(&iterator_, - reinterpret_cast<disk_cache::Entry**>(&entry_)); + rv = cache_->OpenNextEntry(&iterator_, + reinterpret_cast<disk_cache::Entry**>(&entry_), + &next_callback_); } else { DCHECK(input_->msg.command == GET_PREV_ENTRY); - ret = cache_->OpenPrevEntry(&iterator_, - reinterpret_cast<disk_cache::Entry**>(&entry_)); + rv = cache_->OpenPrevEntry(&iterator_, + reinterpret_cast<disk_cache::Entry**>(&entry_), + &next_callback_); } + DCHECK_EQ(net::ERR_IO_PENDING, rv); + return RESULT_PENDING; +} - if (!ret) +void SlaveSM::DoGetEntryComplete(int result) { + DEBUGMSG("\t\t\tSlave DoGetEntryComplete\n"); + if (result != net::OK) { entry_ = NULL; - - if (!entry_) DEBUGMSG("\t\t\tSlave end of list\n"); + } - return RESULT_OK; + msg_.result = RESULT_OK; + msg_.long_arg1 = reinterpret_cast<int64>(entry_); + SendMsg(msg_); } void SlaveSM::DoCloseEntry() { |