diff options
author | maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-08-28 22:04:53 +0000 |
---|---|---|
committer | maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-08-28 22:04:53 +0000 |
commit | 5eea271e9cb548e53ac1fb8385eb88f82cc6786e (patch) | |
tree | 7e4f76fabdbc8a0586eb02c8c9a71216ac46f6aa /tools | |
parent | 7a58fec1f2d37466fe97e9d5acefb391f47c30ca (diff) | |
download | chromium_src-5eea271e9cb548e53ac1fb8385eb88f82cc6786e.zip chromium_src-5eea271e9cb548e53ac1fb8385eb88f82cc6786e.tar.gz chromium_src-5eea271e9cb548e53ac1fb8385eb88f82cc6786e.tar.bz2 |
Add class Remote to handle asynchronous fetches.
Keeps the fetches completely synchronous for now to simplify the CL. The next
CL will use the asynchronous support to fetch multiple items simultaneously and
prioritize fetches.
R=cmp@chromium.org
NOTRY=true
BUG=
Review URL: https://chromiumcodereview.appspot.com/10871044
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@153754 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/isolate/run_test_from_archive.py | 149 |
1 files changed, 133 insertions, 16 deletions
diff --git a/tools/isolate/run_test_from_archive.py b/tools/isolate/run_test_from_archive.py index e568899..f0c0393 100755 --- a/tools/isolate/run_test_from_archive.py +++ b/tools/isolate/run_test_from_archive.py @@ -13,12 +13,14 @@ import json import logging import optparse import os +import Queue import re import shutil import stat import subprocess import sys import tempfile +import threading import time import urllib @@ -149,17 +151,6 @@ def open_remote(file_or_url): return open(file_or_url, 'rb') -def download_or_copy(file_or_url, dest): - """Copies a file or download an url.""" - if re.match(r'^https?://.+$', file_or_url): - try: - urllib.URLopener().retrieve(file_or_url, dest) - except IOError: - logging.error('Failed to download ' + file_or_url) - else: - shutil.copy(file_or_url, dest) - - def get_free_space(path): """Returns the number of free bytes.""" if sys.platform == 'win32': @@ -182,7 +173,11 @@ def make_temp_dir(prefix, root_dir): def load_manifest(content): """Verifies the manifest is valid and loads this object with the json data. """ - data = json.loads(content) + try: + data = json.loads(content) + except ValueError: + raise ConfigError('Failed to parse: %s...' % content[:100]) + if not isinstance(data, dict): raise ConfigError('Expected dict, got %r' % data) @@ -260,6 +255,126 @@ class Profiler(object): self.name, time_taken) +class Remote(object): + """Priority based worker queue to fetch files from a content-address server. + + Supports local file system, CIFS or http remotes. + + When the priority of items is equals, works in strict FIFO mode. + """ + # Initial and maximum number of worker threads. + INITIAL_WORKERS = 2 + MAX_WORKERS = 16 + # Priorities. + LOW, MED, HIGH = (1<<8, 2<<8, 3<<8) + INTERNAL_PRIORITY_BITS = (1<<8) - 1 + + def __init__(self, file_or_url): + # Function to fetch a remote object. + self._do_item = self._get_remote_fetcher(file_or_url) + # Contains tuple(priority, index, obj, destination). 
+ self._queue = Queue.PriorityQueue() + # Contains tuple(priority, index, obj). + self._done = Queue.PriorityQueue() + + # To keep FIFO ordering in self._queue. It is assumed xrange's iterator is + # thread-safe. + self._next_index = xrange(0, 1<<31).__iter__().next + + # Control access to the following member. + self._ready_lock = threading.Lock() + # Number of threads in wait state. + self._ready = 0 + + # Control access to the following member. + self._workers_lock = threading.Lock() + self._workers = [] + for _ in range(self.INITIAL_WORKERS): + self._add_worker() + + def fetch_item(self, priority, obj, dest): + """Retrieves an object from the remote data store. + + The smaller |priority| gets fetched first. + + Thread-safe. + """ + assert (priority & self.INTERNAL_PRIORITY_BITS) == 0 + self._fetch(priority, obj, dest) + + def get_result(self): + """Returns the next file that was successfully fetched.""" + r = self._done.get() + if r[0] == '-1': + # It's an exception. + raise r[2][0], r[2][1], r[2][2] + return r[2] + + def _fetch(self, priority, obj, dest): + with self._ready_lock: + start_new_worker = not self._ready + self._queue.put((priority, self._next_index(), obj, dest)) + if start_new_worker: + self._add_worker() + + def _add_worker(self): + """Add one worker thread if there isn't too many. Thread-safe.""" + with self._workers_lock: + if len(self._workers) >= self.MAX_WORKERS: + return False + worker = threading.Thread(target=self._run) + self._workers.append(worker) + worker.daemon = True + worker.start() + + def _run(self): + """Worker thread loop.""" + while True: + try: + with self._ready_lock: + self._ready += 1 + item = self._queue.get() + finally: + with self._ready_lock: + self._ready -= 1 + if not item: + return + priority, index, obj, dest = item + try: + self._do_item(obj, dest) + except IOError: + # Retry a few times, lowering the priority. 
+ if (priority & self.INTERNAL_PRIORITY_BITS) < 5: + self._fetch(priority + 1, obj, dest) + continue + # Transfers the exception back. It has maximum priority. + self._done.put((-1, 0, sys.exc_info())) + except: + # Transfers the exception back. It has maximum priority. + self._done.put((-1, 0, sys.exc_info())) + else: + self._done.put((priority, index, obj)) + + @staticmethod + def _get_remote_fetcher(file_or_url): + """Returns a object to retrieve objects from a remote.""" + if re.match(r'^https?://.+$', file_or_url): + file_or_url = file_or_url.rstrip('/') + '/' + def download_file(item, dest): + # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this + # easy. + source = file_or_url + item + logging.debug('download_file(%s, %s)', source, dest) + urllib.urlretrieve(source, dest) + return download_file + + def copy_file(item, dest): + source = os.path.join(file_or_url, item) + logging.debug('copy_file(%s, %s)', source, dest) + shutil.copy(source, dest) + return copy_file + + class CachePolicies(object): def __init__(self, max_cache_size, min_free_space, max_items): """ @@ -287,8 +402,7 @@ class Cache(object): """ Arguments: - cache_dir: Directory where to place the cache. - - remote: Remote directory (NFS, SMB, etc) or HTTP url to fetch the objects - from + - remote: Remote where to fetch items from. - policies: cache retention policies. """ self.cache_dir = cache_dir @@ -342,6 +456,7 @@ class Cache(object): logging.debug(self.files_added) def remove_lru_file(self): + """Removes the last recently used file.""" try: filename = self.state.pop(0) full_path = self.path(filename) @@ -407,7 +522,9 @@ class Cache(object): except ValueError: out = self.path(item) start_retrieve = time.time() - download_or_copy(self.remote.rstrip('/') + '/' + item, out) + self.remote.fetch_item(Remote.MED, item, out) + # TODO(maruel): Temporarily fetch the files serially. 
+ self.remote.get_result() if os.path.exists(out): self.state.append(item) self.files_added.append((out, os.stat(out).st_size)) @@ -431,7 +548,7 @@ def run_tha_test(manifest, cache_dir, remote, policies): """Downloads the dependencies in the cache, hardlinks them into a temporary directory and runs the executable. """ - with Cache(cache_dir, remote, policies) as cache: + with Cache(cache_dir, Remote(remote), policies) as cache: outdir = make_temp_dir('run_tha_test', cache_dir) if not 'files' in manifest: |