# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from file_system import FileSystem, FileNotFoundError
from future import Future
from test_file_system import TestFileSystem


class MockFileSystem(FileSystem):
  '''Wraps FileSystems to add a selection of mock behaviour:
  - asserting how often Stat/Read calls are being made to it.
  - primitive changes/versioning via applying object "diffs", mapping paths to
    new content (similar to how TestFileSystem works).
  '''

  def __init__(self, file_system):
    '''Wraps |file_system| with no updates applied and both call counters at
    zero.
    '''
    self._file_system = file_system
    # Updates are stored as TestFileSystems because they've already
    # implemented a bunch of logic to interpret paths into dictionaries.
    # The list is ordered oldest-first; position + 1 is the update's revision.
    self._updates = []
    self._read_count = 0
    self._stat_count = 0

  @staticmethod
  def Create(file_system, updates):
    '''Returns a MockFileSystem wrapping |file_system| with every item of
    |updates| applied, in order, via Update().
    '''
    mock_file_system = MockFileSystem(file_system)
    for update in updates:
      mock_file_system.Update(update)
    return mock_file_system

  #
  # FileSystem implementation.
  #

  def Read(self, paths, binary=False):
    '''Reads |paths| from |_file_system|, then applies the most recent update
    from |_updates|, if any.

    Every call increments the read count by one. Returns a Future. If
    resolving the wrapped file system's Future raises, that original Future is
    handed back unchanged, leaving it to the caller to resolve it.
    '''
    self._read_count += 1
    future_result = self._file_system.Read(paths, binary=binary)
    try:
      result = future_result.Get()
    except Exception:
      # Best-effort by design: do not surface the failure here. Only
      # Exception is caught so that KeyboardInterrupt/SystemExit still
      # propagate instead of being silently absorbed.
      return future_result
    # NOTE: |result| is the mapping produced by the wrapped file system and it
    # is overwritten in place. Iterate over a snapshot of the keys so that the
    # assignment below never interacts with a live iterator.
    for path in list(result):
      _, update = self._GetMostRecentUpdate(path)
      if update is not None:
        result[path] = update
    return Future(value=result)

  def _GetMostRecentUpdate(self, path):
    '''Finds the newest update that contains |path|.

    Returns a tuple (revision, content) where |revision| is the 1-based
    position of that update in |_updates| and |content| is what the update's
    ReadSingle() returned for |path|. Returns (0, None) when no update
    contains |path|.
    '''
    # Walk newest-to-oldest so that the first hit is the most recent one.
    for revision in range(len(self._updates), 0, -1):
      try:
        return (revision, self._updates[revision - 1].ReadSingle(path))
      except FileNotFoundError:
        # This update doesn't touch |path|; try the next older one.
        continue
    return (0, None)

  def Stat(self, path):
    '''Stats |path|, with versions adjusted for the applied updates.

    Every call increments the stat count by one.
    '''
    self._stat_count += 1
    return self._StatImpl(path)

  def _StatImpl(self, path):
    '''Does the work of Stat() without touching the stat count, so that the
    recursive directory walk in _UpdateStat() is not counted.

    NOTE: the object returned by the wrapped file system's Stat() is modified
    in place (both its |version| and the entries of its |child_versions|).
    '''
    result = self._file_system.Stat(path)
    result.version = self._UpdateStat(result.version, path)
    child_versions = result.child_versions
    if child_versions is not None:
      # Snapshot the keys; the values are reassigned inside the loop.
      for child_path in list(child_versions):
        child_versions[child_path] = self._UpdateStat(
            child_versions[child_path],
            '%s%s' % (path, child_path))
    return result

  def _UpdateStat(self, version, path):
    '''Returns |version| adjusted for the updates affecting |path|.

    For a file (no trailing '/'), the revision of the most recent update
    containing |path| is added to |version| and the sum is returned as a
    string. For a directory, the result is the maximum of |version| and the
    adjusted versions of all of its children, as a string; if the wrapped file
    system lists no children, |version| is returned unchanged.
    '''
    if not path.endswith('/'):
      return str(int(version) + self._GetMostRecentUpdate(path)[0])
    # Bleh, it's a directory, need to recursively search all the children.
    child_paths = self._file_system.ReadSingle(path)
    if not child_paths:
      return version
    candidate_versions = [int(version)]
    candidate_versions.extend(
        int(self._StatImpl('%s%s' % (path, child_path)).version)
        for child_path in child_paths)
    return str(max(candidate_versions))

  def GetIdentity(self):
    '''Returns the identity of the wrapped file system.'''
    return self._file_system.GetIdentity()

  def __str__(self):
    return repr(self)

  def __repr__(self):
    return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
        self._read_count, self._stat_count, len(self._updates))

  #
  # Testing methods.
  #

  def GetReadCount(self):
    '''Returns the number of Read() calls since construction or the last
    Reset().
    '''
    return self._read_count

  def GetStatCount(self):
    '''Returns the number of Stat() calls since construction or the last
    Reset().
    '''
    return self._stat_count

  def CheckAndReset(self, stat_count=0, read_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*object_store.CheckAndReset(...))

    |success| is True when both counters equal the expected values; |error|
    is a comma-separated description of each mismatch (empty on success).
    The counters are reset whether or not the check succeeded.
    '''
    errors = []
    for desc, expected, actual in (
        ('read_count', read_count, self._read_count),
        ('stat_count', stat_count, self._stat_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    # Build the verdict from the current counters before clearing them.
    outcome = (not errors, ', '.join(errors))
    self.Reset()
    return outcome

  def Reset(self):
    '''Sets the read and stat counters back to zero. Applied updates are
    kept.
    '''
    self._read_count = 0
    self._stat_count = 0

  def Update(self, update):
    '''Appends |update| (wrapped in a TestFileSystem) as the newest update.'''
    self._updates.append(TestFileSystem(update))