| author | maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-19 18:15:11 +0000 |
| --- | --- | --- |
| committer | maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-07-19 18:15:11 +0000 |
| commit | 1c7a70157c2dae86faba2afd36c7da51398b779b | (patch) |
| tree | 2c63ea73d9f0dc79fb808af6f8c8148447d45343 | /tools |
| parent | dca4c16aafe0fe480a9857ec8ea83f80b4d1fccf | (diff) |
Merge worker_pool into run_test_cases.
This makes run_test_cases.py completely standalone so it can be easily used on
the swarm slave.
R=cmp@chromium.org
NOTRY=true
BUG=
TEST=
Review URL: https://chromiumcodereview.appspot.com/10803030
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@147492 0039d316-1c4b-4281-b951-d872f2087c98
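For callers of the old module, the change is essentially an import swap: the ThreadPool and Progress classes now live in run_test_cases. The snippet below is a minimal sketch of the before/after usage, not part of the commit; it mirrors the updates to trace_test_cases.py and trace_inputs_smoke_test.py in the diff below, and the lambda and values are made up for illustration.

```python
# Before this change, callers imported the standalone module:
#   import worker_pool
#   with worker_pool.ThreadPool(8) as pool:
#     ...
# After this change, the same classes are reached through run_test_cases:
import run_test_cases

with run_test_cases.ThreadPool(8) as pool:  # 8 worker threads
  pool.add_task(lambda x: x + 1, 41)        # queue one task
  print pool.join()                         # prints [42]
```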
Diffstat (limited to 'tools')
| Mode | Path | Lines changed |
| --- | --- | --- |
| -rwxr-xr-x | tools/isolate/run_test_cases.py | 147 |
| -rwxr-xr-x | tools/isolate/run_test_cases_test.py | 25 |
| -rwxr-xr-x | tools/isolate/trace_inputs_smoke_test.py | 4 |
| -rwxr-xr-x | tools/isolate/trace_test_cases.py | 5 |
| -rw-r--r-- | tools/isolate/worker_pool.py | 149 |
| -rwxr-xr-x | tools/isolate/worker_pool_test.py | 37 |
6 files changed, 172 insertions, 195 deletions
diff --git a/tools/isolate/run_test_cases.py b/tools/isolate/run_test_cases.py
index f9fffc8..0fdf269 100755
--- a/tools/isolate/run_test_cases.py
+++ b/tools/isolate/run_test_cases.py
@@ -15,12 +15,12 @@
 import logging
 import multiprocessing
 import optparse
 import os
+import Queue
 import subprocess
 import sys
+import threading
 import time
 
-import worker_pool
-
 if subprocess.mswindows:
   import msvcrt  # pylint: disable=F0401
@@ -153,6 +153,145 @@ def call_with_timeout(cmd, timeout, **kwargs):
   return output, proc.returncode
 
 
+class QueueWithTimeout(Queue.Queue):
+  """Implements timeout support in join()."""
+
+  # QueueWithTimeout.join: Arguments number differs from overridden method
+  # pylint: disable=W0221
+  def join(self, timeout=None):
+    """Returns True if all tasks are finished."""
+    if not timeout:
+      return Queue.Queue.join(self)
+    start = time.time()
+    self.all_tasks_done.acquire()
+    try:
+      while self.unfinished_tasks:
+        remaining = time.time() - start - timeout
+        if remaining <= 0:
+          break
+        self.all_tasks_done.wait(remaining)
+      return not self.unfinished_tasks
+    finally:
+      self.all_tasks_done.release()
+
+
+class WorkerThread(threading.Thread):
+  """Keeps the results of each task in a thread-local outputs variable."""
+  def __init__(self, tasks, *args, **kwargs):
+    super(WorkerThread, self).__init__(*args, **kwargs)
+    self._tasks = tasks
+    self.outputs = []
+    self.exceptions = []
+
+    self.daemon = True
+    self.start()
+
+  def run(self):
+    """Runs until a None task is queued."""
+    while True:
+      task = self._tasks.get()
+      if task is None:
+        # We're done.
+        return
+      try:
+        func, args, kwargs = task
+        self.outputs.append(func(*args, **kwargs))
+      except Exception, e:
+        logging.error('Caught exception! %s' % e)
+        self.exceptions.append(sys.exc_info())
+      finally:
+        self._tasks.task_done()
+
+
+class ThreadPool(object):
+  """Implements a multithreaded worker pool oriented for mapping jobs with
+  thread-local result storage.
+  """
+  def __init__(self, num_threads):
+    self._tasks = QueueWithTimeout()
+    self._workers = [
+      WorkerThread(self._tasks, name='worker-%d' % i)
+      for i in range(num_threads)
+    ]
+
+  def add_task(self, func, *args, **kwargs):
+    """Adds a task, a function to be executed by a worker.
+
+    The function's return value will be stored in the the worker's thread local
+    outputs list.
+    """
+    self._tasks.put((func, args, kwargs))
+
+  def join(self, progress=None, timeout=None):
+    """Extracts all the results from each threads unordered."""
+    if progress and timeout:
+      while not self._tasks.join(timeout):
+        progress.print_update()
+    else:
+      self._tasks.join()
+    out = []
+    for w in self._workers:
+      if w.exceptions:
+        raise w.exceptions[0][0], w.exceptions[0][1], w.exceptions[0][2]
+      out.extend(w.outputs)
+      w.outputs = []
+    # Look for exceptions.
+    return out
+
+  def close(self):
+    """Closes all the threads."""
+    for _ in range(len(self._workers)):
+      # Enqueueing None causes the worker to stop.
+      self._tasks.put(None)
+    for t in self._workers:
+      t.join()
+
+  def __enter__(self):
+    """Enables 'with' statement."""
+    return self
+
+  def __exit__(self, exc_type, exc_value, traceback):
+    """Enables 'with' statement."""
+    self.close()
+
+
+class Progress(object):
+  """Prints progress and accepts updates thread-safely."""
+  def __init__(self, size):
+    self.last_printed_line = ''
+    self.next_line = ''
+    self.index = -1
+    self.size = size
+    self.start = time.time()
+    self.lock = threading.Lock()
+    self.update_item('')
+
+  def update_item(self, name):
+    with self.lock:
+      self.index += 1
+      self.next_line = '%d of %d (%.1f%%), %.1fs: %s' % (
+          self.index,
+          self.size,
+          self.index * 100. / self.size,
+          time.time() - self.start,
+          name)
+
+  def print_update(self):
+    """Prints the current status."""
+    with self.lock:
+      if self.next_line == self.last_printed_line:
+        return
+      line = '\r%s%s' % (
+          self.next_line,
+          ' ' * max(0, len(self.last_printed_line) - len(self.next_line)))
+      self.last_printed_line = self.next_line
+      sys.stderr.write(line)
+
+  def increase_count(self):
+    with self.lock:
+      self.size += 1
+
+
 def fix_python_path(cmd):
   """Returns the fixed command line to call the right python executable."""
   out = cmd[:]
@@ -322,8 +461,8 @@ def run_test_cases(
   if not test_cases:
     return
 
-  progress = worker_pool.Progress(len(test_cases))
-  with worker_pool.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
+  progress = Progress(len(test_cases))
+  with ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
     function = Runner(executable, os.getcwd(), timeout, progress).map
     for test_case in test_cases:
       pool.add_task(function, test_case)
diff --git a/tools/isolate/run_test_cases_test.py b/tools/isolate/run_test_cases_test.py
index 29313b1..8f06a6c 100755
--- a/tools/isolate/run_test_cases_test.py
+++ b/tools/isolate/run_test_cases_test.py
@@ -97,6 +97,31 @@ class RunTestCases(unittest.TestCase):
     self.assertEquals(0, code)
 
 
+class WorkerPoolTest(unittest.TestCase):
+  def test_normal(self):
+    mapper = lambda value: -value
+    with run_test_cases.ThreadPool(8) as pool:
+      for i in range(32):
+        pool.add_task(mapper, i)
+      results = pool.join()
+    self.assertEquals(range(-31, 1), sorted(results))
+
+  def test_exception(self):
+    class FearsomeException(Exception):
+      pass
+    def mapper(value):
+      raise FearsomeException(value)
+    task_added = False
+    try:
+      with run_test_cases.ThreadPool(8) as pool:
+        pool.add_task(mapper, 0)
+        task_added = True
+        pool.join()
+        self.fail()
+    except FearsomeException:
+      self.assertEquals(True, task_added)
+
+
 if __name__ == '__main__':
   VERBOSE = '-v' in sys.argv
   logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
diff --git a/tools/isolate/trace_inputs_smoke_test.py b/tools/isolate/trace_inputs_smoke_test.py
index 0dc3d72..9da8b78 100755
--- a/tools/isolate/trace_inputs_smoke_test.py
+++ b/tools/isolate/trace_inputs_smoke_test.py
@@ -12,7 +12,7 @@ import sys
 import tempfile
 import unittest
 
-import worker_pool
+import run_test_cases
 
 FULLNAME = os.path.abspath(__file__)
 ROOT_DIR = os.path.dirname(FULLNAME)
@@ -432,7 +432,7 @@ class TraceInputsImport(TraceInputsBase):
           cmd, cwd, tracename, True)
       return (tracename, resultcode, output)
 
-    with worker_pool.ThreadPool(PARALLEL) as pool:
+    with run_test_cases.ThreadPool(PARALLEL) as pool:
       api = self.trace_inputs.get_api()
       with api.get_tracer(self.log) as tracer:
         pool.add_task(
diff --git a/tools/isolate/trace_test_cases.py b/tools/isolate/trace_test_cases.py
index dee2b06..92e7097 100755
--- a/tools/isolate/trace_test_cases.py
+++ b/tools/isolate/trace_test_cases.py
@@ -20,7 +20,6 @@ import time
 import isolate_common
 import run_test_cases
 import trace_inputs
-import worker_pool
 
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 ROOT_DIR = os.path.dirname(os.path.dirname(BASE_DIR))
@@ -111,8 +110,8 @@ def trace_test_cases(
   assert os.path.isdir(full_cwd_dir)
   logname = output_file + '.logs'
 
-  progress = worker_pool.Progress(len(test_cases))
-  with worker_pool.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
+  progress = run_test_cases.Progress(len(test_cases))
+  with run_test_cases.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
     api = trace_inputs.get_api()
     api.clean_trace(logname)
     with api.get_tracer(logname) as tracer:
diff --git a/tools/isolate/worker_pool.py b/tools/isolate/worker_pool.py
deleted file mode 100644
index af0d4c9..0000000
--- a/tools/isolate/worker_pool.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Implements a multithreaded worker pool oriented for mapping jobs with
-thread-local result storage.
-"""
-
-import logging
-import Queue
-import sys
-import time
-import threading
-
-
-class QueueWithTimeout(Queue.Queue):
-  """Implements timeout support in join()."""
-
-  # QueueWithTimeout.join: Arguments number differs from overridden method
-  # pylint: disable=W0221
-  def join(self, timeout=None):
-    """Returns True if all tasks are finished."""
-    if not timeout:
-      return Queue.Queue.join(self)
-    start = time.time()
-    self.all_tasks_done.acquire()
-    try:
-      while self.unfinished_tasks:
-        remaining = time.time() - start - timeout
-        if remaining <= 0:
-          break
-        self.all_tasks_done.wait(remaining)
-      return not self.unfinished_tasks
-    finally:
-      self.all_tasks_done.release()
-
-
-class WorkerThread(threading.Thread):
-  """Keeps the results of each task in a thread-local outputs variable."""
-  def __init__(self, tasks, *args, **kwargs):
-    super(WorkerThread, self).__init__(*args, **kwargs)
-    self._tasks = tasks
-    self.outputs = []
-    self.exceptions = []
-
-    self.daemon = True
-    self.start()
-
-  def run(self):
-    """Runs until a None task is queued."""
-    while True:
-      task = self._tasks.get()
-      if task is None:
-        # We're done.
-        return
-      try:
-        func, args, kwargs = task
-        self.outputs.append(func(*args, **kwargs))
-      except Exception, e:
-        logging.error('Caught exception! %s' % e)
-        self.exceptions.append(sys.exc_info())
-      finally:
-        self._tasks.task_done()
-
-
-class ThreadPool(object):
-  def __init__(self, num_threads):
-    self._tasks = QueueWithTimeout()
-    self._workers = [
-      WorkerThread(self._tasks, name='worker-%d' % i)
-      for i in range(num_threads)
-    ]
-
-  def add_task(self, func, *args, **kwargs):
-    """Adds a task, a function to be executed by a worker.
-
-    The function's return value will be stored in the the worker's thread local
-    outputs list.
-    """
-    self._tasks.put((func, args, kwargs))
-
-  def join(self, progress=None, timeout=None):
-    """Extracts all the results from each threads unordered."""
-    if progress and timeout:
-      while not self._tasks.join(timeout):
-        progress.print_update()
-    else:
-      self._tasks.join()
-    out = []
-    for w in self._workers:
-      if w.exceptions:
-        raise w.exceptions[0][0], w.exceptions[0][1], w.exceptions[0][2]
-      out.extend(w.outputs)
-      w.outputs = []
-    # Look for exceptions.
-    return out
-
-  def close(self):
-    """Closes all the threads."""
-    for _ in range(len(self._workers)):
-      # Enqueueing None causes the worker to stop.
-      self._tasks.put(None)
-    for t in self._workers:
-      t.join()
-
-  def __enter__(self):
-    """Enables 'with' statement."""
-    return self
-
-  def __exit__(self, exc_type, exc_value, traceback):
-    """Enables 'with' statement."""
-    self.close()
-
-
-class Progress(object):
-  """Prints progress and accepts updates thread-safely."""
-  def __init__(self, size):
-    self.last_printed_line = ''
-    self.next_line = ''
-    self.index = -1
-    self.size = size
-    self.start = time.time()
-    self.lock = threading.Lock()
-    self.update_item('')
-
-  def update_item(self, name):
-    with self.lock:
-      self.index += 1
-      self.next_line = '%d of %d (%.1f%%), %.1fs: %s' % (
-          self.index,
-          self.size,
-          self.index * 100. / self.size,
-          time.time() - self.start,
-          name)
-
-  def print_update(self):
-    """Prints the current status."""
-    with self.lock:
-      if self.next_line == self.last_printed_line:
-        return
-      line = '\r%s%s' % (
-          self.next_line,
-          ' ' * max(0, len(self.last_printed_line) - len(self.next_line)))
-      self.last_printed_line = self.next_line
-      sys.stderr.write(line)
-
-  def increase_count(self):
-    with self.lock:
-      self.size += 1
diff --git a/tools/isolate/worker_pool_test.py b/tools/isolate/worker_pool_test.py
deleted file mode 100755
index 577a38d..0000000
--- a/tools/isolate/worker_pool_test.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import unittest
-
-import worker_pool
-
-
-class WorkerPoolTest(unittest.TestCase):
-  def test_normal(self):
-    mapper = lambda value: -value
-    with worker_pool.ThreadPool(8) as pool:
-      for i in range(32):
-        pool.add_task(mapper, i)
-      results = pool.join()
-    self.assertEquals(range(-31, 1), sorted(results))
-
-  def test_exception(self):
-    class FearsomeException(Exception):
-      pass
-    def mapper(value):
-      raise FearsomeException(value)
-    task_added = False
-    try:
-      with worker_pool.ThreadPool(8) as pool:
-        pool.add_task(mapper, 0)
-        task_added = True
-        pool.join()
-        self.fail()
-    except FearsomeException:
-      self.assertEquals(True, task_added)
-
-
-if __name__ == '__main__':
-  unittest.main()
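The run_test_cases.py portion of the diff defines the whole pool API that the rest of the tools rely on: add_task(), join(progress, timeout), close(), and the Progress status printer. The following is a minimal usage sketch based only on the signatures shown above; it is not part of the commit, and the work function, item list, thread count and 0.1-second poll interval are made-up values for illustration.

```python
import run_test_cases

def work(item):
  # Placeholder task; any callable queued with add_task() is run on a worker
  # thread and its return value lands in that worker's thread-local outputs.
  return item * item

items = range(10)
progress = run_test_cases.Progress(len(items))
with run_test_cases.ThreadPool(4) as pool:
  for item in items:
    pool.add_task(work, item)
  # With a progress object and a timeout, join() keeps reprinting the status
  # line until every queued task is done, then returns the results unordered.
  results = pool.join(progress, 0.1)
print sorted(results)
```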