author	maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>	2012-08-28 22:04:53 +0000
committer	maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>	2012-08-28 22:04:53 +0000
commit	5eea271e9cb548e53ac1fb8385eb88f82cc6786e (patch)
tree	7e4f76fabdbc8a0586eb02c8c9a71216ac46f6aa /tools
parent	7a58fec1f2d37466fe97e9d5acefb391f47c30ca (diff)
Add class Remote to handle asynchronous fetches.
Keeps the fetches completely synchronous for now to simplify the CL. The next CL will use the asynchronous support to fetch multiple items simultaneously and prioritize fetches.

R=cmp@chromium.org
NOTRY=true
BUG=

Review URL: https://chromiumcodereview.appspot.com/10871044

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@153754 0039d316-1c4b-4281-b951-d872f2087c98
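For context, a minimal, hypothetical sketch of how the new queue is driven. The class and method names come from this CL; the server URL, item hash and destination path are placeholders:

  # Hypothetical usage; in this CL, Cache stays effectively synchronous by
  # pairing every fetch_item() call with an immediate get_result().
  remote = Remote('http://example.com/content-store')  # placeholder URL
  remote.fetch_item(Remote.MED, 'deadbeef', '/tmp/cache/deadbeef')  # placeholder hash and path
  remote.get_result()  # blocks until the item is fetched; re-raises on failure

  # The queue already accepts several pending items, so a follow-up CL can
  # enqueue many fetch_item() calls up front and drain get_result() afterwards.
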
Diffstat (limited to 'tools')
-rwxr-xr-x  tools/isolate/run_test_from_archive.py  |  149
1 file changed, 133 insertions, 16 deletions
diff --git a/tools/isolate/run_test_from_archive.py b/tools/isolate/run_test_from_archive.py
index e568899..f0c0393 100755
--- a/tools/isolate/run_test_from_archive.py
+++ b/tools/isolate/run_test_from_archive.py
@@ -13,12 +13,14 @@ import json
import logging
import optparse
import os
+import Queue
import re
import shutil
import stat
import subprocess
import sys
import tempfile
+import threading
import time
import urllib
@@ -149,17 +151,6 @@ def open_remote(file_or_url):
return open(file_or_url, 'rb')
-def download_or_copy(file_or_url, dest):
- """Copies a file or download an url."""
- if re.match(r'^https?://.+$', file_or_url):
- try:
- urllib.URLopener().retrieve(file_or_url, dest)
- except IOError:
- logging.error('Failed to download ' + file_or_url)
- else:
- shutil.copy(file_or_url, dest)
-
-
def get_free_space(path):
"""Returns the number of free bytes."""
if sys.platform == 'win32':
@@ -182,7 +173,11 @@ def make_temp_dir(prefix, root_dir):
def load_manifest(content):
"""Verifies the manifest is valid and loads this object with the json data.
"""
- data = json.loads(content)
+ try:
+ data = json.loads(content)
+ except ValueError:
+ raise ConfigError('Failed to parse: %s...' % content[:100])
+
if not isinstance(data, dict):
raise ConfigError('Expected dict, got %r' % data)
@@ -260,6 +255,126 @@ class Profiler(object):
self.name, time_taken)
+class Remote(object):
+ """Priority based worker queue to fetch files from a content-address server.
+
+ Supports local file system, CIFS or http remotes.
+
+ When the priority of items is equals, works in strict FIFO mode.
+ """
+ # Initial and maximum number of worker threads.
+ INITIAL_WORKERS = 2
+ MAX_WORKERS = 16
+ # Priorities.
+ LOW, MED, HIGH = (1<<8, 2<<8, 3<<8)
+ INTERNAL_PRIORITY_BITS = (1<<8) - 1
+
+ def __init__(self, file_or_url):
+ # Function to fetch a remote object.
+ self._do_item = self._get_remote_fetcher(file_or_url)
+ # Contains tuple(priority, index, obj, destination).
+ self._queue = Queue.PriorityQueue()
+ # Contains tuple(priority, index, obj).
+ self._done = Queue.PriorityQueue()
+
+ # To keep FIFO ordering in self._queue. It is assumed xrange's iterator is
+ # thread-safe.
+ self._next_index = xrange(0, 1<<31).__iter__().next
+
+ # Control access to the following member.
+ self._ready_lock = threading.Lock()
+ # Number of threads in wait state.
+ self._ready = 0
+
+ # Control access to the following member.
+ self._workers_lock = threading.Lock()
+ self._workers = []
+ for _ in range(self.INITIAL_WORKERS):
+ self._add_worker()
+
+ def fetch_item(self, priority, obj, dest):
+ """Retrieves an object from the remote data store.
+
+ Items with a smaller |priority| value are fetched first.
+
+ Thread-safe.
+ """
+ assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
+ self._fetch(priority, obj, dest)
+
+ def get_result(self):
+ """Returns the next file that was successfully fetched."""
+ r = self._done.get()
+ if r[0] == -1:
+ # It's an exception.
+ raise r[2][0], r[2][1], r[2][2]
+ return r[2]
+
+ def _fetch(self, priority, obj, dest):
+ with self._ready_lock:
+ start_new_worker = not self._ready
+ self._queue.put((priority, self._next_index(), obj, dest))
+ if start_new_worker:
+ self._add_worker()
+
+ def _add_worker(self):
+ """Add one worker thread if there isn't too many. Thread-safe."""
+ with self._workers_lock:
+ if len(self._workers) >= self.MAX_WORKERS:
+ return False
+ worker = threading.Thread(target=self._run)
+ self._workers.append(worker)
+ worker.daemon = True
+ worker.start()
+
+ def _run(self):
+ """Worker thread loop."""
+ while True:
+ try:
+ with self._ready_lock:
+ self._ready += 1
+ item = self._queue.get()
+ finally:
+ with self._ready_lock:
+ self._ready -= 1
+ if not item:
+ return
+ priority, index, obj, dest = item
+ try:
+ self._do_item(obj, dest)
+ except IOError:
+ # Retry a few times, lowering the priority.
+ if (priority & self.INTERNAL_PRIORITY_BITS) < 5:
+ self._fetch(priority + 1, obj, dest)
+ continue
+ # Transfers the exception back. It has maximum priority.
+ self._done.put((-1, 0, sys.exc_info()))
+ except:
+ # Transfers the exception back. It has maximum priority.
+ self._done.put((-1, 0, sys.exc_info()))
+ else:
+ self._done.put((priority, index, obj))
+
+ @staticmethod
+ def _get_remote_fetcher(file_or_url):
+ """Returns a object to retrieve objects from a remote."""
+ if re.match(r'^https?://.+$', file_or_url):
+ file_or_url = file_or_url.rstrip('/') + '/'
+ def download_file(item, dest):
+ # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
+ # easy.
+ source = file_or_url + item
+ logging.debug('download_file(%s, %s)', source, dest)
+ urllib.urlretrieve(source, dest)
+ return download_file
+
+ def copy_file(item, dest):
+ source = os.path.join(file_or_url, item)
+ logging.debug('copy_file(%s, %s)', source, dest)
+ shutil.copy(source, dest)
+ return copy_file
+
+
class CachePolicies(object):
def __init__(self, max_cache_size, min_free_space, max_items):
"""
@@ -287,8 +402,7 @@ class Cache(object):
"""
Arguments:
- cache_dir: Directory where to place the cache.
- - remote: Remote directory (NFS, SMB, etc) or HTTP url to fetch the objects
- from
+ - remote: Remote object to fetch items from.
- policies: cache retention policies.
"""
self.cache_dir = cache_dir
@@ -342,6 +456,7 @@ class Cache(object):
logging.debug(self.files_added)
def remove_lru_file(self):
+ """Removes the last recently used file."""
try:
filename = self.state.pop(0)
full_path = self.path(filename)
@@ -407,7 +522,9 @@ class Cache(object):
except ValueError:
out = self.path(item)
start_retrieve = time.time()
- download_or_copy(self.remote.rstrip('/') + '/' + item, out)
+ self.remote.fetch_item(Remote.MED, item, out)
+ # TODO(maruel): The files are fetched serially for now; parallelize later.
+ self.remote.get_result()
if os.path.exists(out):
self.state.append(item)
self.files_added.append((out, os.stat(out).st_size))
@@ -431,7 +548,7 @@ def run_tha_test(manifest, cache_dir, remote, policies):
"""Downloads the dependencies in the cache, hardlinks them into a temporary
directory and runs the executable.
"""
- with Cache(cache_dir, remote, policies) as cache:
+ with Cache(cache_dir, Remote(remote), policies) as cache:
outdir = make_temp_dir('run_tha_test', cache_dir)
if not 'files' in manifest: