summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authormaruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-19 18:15:11 +0000
committermaruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-07-19 18:15:11 +0000
commit1c7a70157c2dae86faba2afd36c7da51398b779b (patch)
tree2c63ea73d9f0dc79fb808af6f8c8148447d45343 /tools
parentdca4c16aafe0fe480a9857ec8ea83f80b4d1fccf (diff)
downloadchromium_src-1c7a70157c2dae86faba2afd36c7da51398b779b.zip
chromium_src-1c7a70157c2dae86faba2afd36c7da51398b779b.tar.gz
chromium_src-1c7a70157c2dae86faba2afd36c7da51398b779b.tar.bz2
Merge worker_pool into run_test_cases.
This makes run_test_cases.py completely standalone so it can be easily used on the swarm slave. R=cmp@chromium.org NOTRY=true BUG= TEST= Review URL: https://chromiumcodereview.appspot.com/10803030 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@147492 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-xtools/isolate/run_test_cases.py147
-rwxr-xr-xtools/isolate/run_test_cases_test.py25
-rwxr-xr-xtools/isolate/trace_inputs_smoke_test.py4
-rwxr-xr-xtools/isolate/trace_test_cases.py5
-rw-r--r--tools/isolate/worker_pool.py149
-rwxr-xr-xtools/isolate/worker_pool_test.py37
6 files changed, 172 insertions, 195 deletions
diff --git a/tools/isolate/run_test_cases.py b/tools/isolate/run_test_cases.py
index f9fffc8..0fdf269 100755
--- a/tools/isolate/run_test_cases.py
+++ b/tools/isolate/run_test_cases.py
@@ -15,12 +15,12 @@ import logging
import multiprocessing
import optparse
import os
+import Queue
import subprocess
import sys
+import threading
import time
-import worker_pool
-
if subprocess.mswindows:
import msvcrt # pylint: disable=F0401
@@ -153,6 +153,145 @@ def call_with_timeout(cmd, timeout, **kwargs):
return output, proc.returncode
+class QueueWithTimeout(Queue.Queue):
+ """Implements timeout support in join()."""
+
+ # QueueWithTimeout.join: Arguments number differs from overridden method
+ # pylint: disable=W0221
+ def join(self, timeout=None):
+ """Returns True if all tasks are finished."""
+ if not timeout:
+ return Queue.Queue.join(self)
+ start = time.time()
+ self.all_tasks_done.acquire()
+ try:
+ while self.unfinished_tasks:
+ remaining = time.time() - start - timeout
+ if remaining <= 0:
+ break
+ self.all_tasks_done.wait(remaining)
+ return not self.unfinished_tasks
+ finally:
+ self.all_tasks_done.release()
+
+
+class WorkerThread(threading.Thread):
+ """Keeps the results of each task in a thread-local outputs variable."""
+ def __init__(self, tasks, *args, **kwargs):
+ super(WorkerThread, self).__init__(*args, **kwargs)
+ self._tasks = tasks
+ self.outputs = []
+ self.exceptions = []
+
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ """Runs until a None task is queued."""
+ while True:
+ task = self._tasks.get()
+ if task is None:
+ # We're done.
+ return
+ try:
+ func, args, kwargs = task
+ self.outputs.append(func(*args, **kwargs))
+ except Exception, e:
+ logging.error('Caught exception! %s' % e)
+ self.exceptions.append(sys.exc_info())
+ finally:
+ self._tasks.task_done()
+
+
+class ThreadPool(object):
+ """Implements a multithreaded worker pool oriented for mapping jobs with
+ thread-local result storage.
+ """
+ def __init__(self, num_threads):
+ self._tasks = QueueWithTimeout()
+ self._workers = [
+ WorkerThread(self._tasks, name='worker-%d' % i)
+ for i in range(num_threads)
+ ]
+
+ def add_task(self, func, *args, **kwargs):
+ """Adds a task, a function to be executed by a worker.
+
+ The function's return value will be stored in the the worker's thread local
+ outputs list.
+ """
+ self._tasks.put((func, args, kwargs))
+
+ def join(self, progress=None, timeout=None):
+ """Extracts all the results from each threads unordered."""
+ if progress and timeout:
+ while not self._tasks.join(timeout):
+ progress.print_update()
+ else:
+ self._tasks.join()
+ out = []
+ for w in self._workers:
+ if w.exceptions:
+ raise w.exceptions[0][0], w.exceptions[0][1], w.exceptions[0][2]
+ out.extend(w.outputs)
+ w.outputs = []
+ # Look for exceptions.
+ return out
+
+ def close(self):
+ """Closes all the threads."""
+ for _ in range(len(self._workers)):
+ # Enqueueing None causes the worker to stop.
+ self._tasks.put(None)
+ for t in self._workers:
+ t.join()
+
+ def __enter__(self):
+ """Enables 'with' statement."""
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Enables 'with' statement."""
+ self.close()
+
+
+class Progress(object):
+ """Prints progress and accepts updates thread-safely."""
+ def __init__(self, size):
+ self.last_printed_line = ''
+ self.next_line = ''
+ self.index = -1
+ self.size = size
+ self.start = time.time()
+ self.lock = threading.Lock()
+ self.update_item('')
+
+ def update_item(self, name):
+ with self.lock:
+ self.index += 1
+ self.next_line = '%d of %d (%.1f%%), %.1fs: %s' % (
+ self.index,
+ self.size,
+ self.index * 100. / self.size,
+ time.time() - self.start,
+ name)
+
+ def print_update(self):
+ """Prints the current status."""
+ with self.lock:
+ if self.next_line == self.last_printed_line:
+ return
+ line = '\r%s%s' % (
+ self.next_line,
+ ' ' * max(0, len(self.last_printed_line) - len(self.next_line)))
+ self.last_printed_line = self.next_line
+ sys.stderr.write(line)
+
+ def increase_count(self):
+ with self.lock:
+ self.size += 1
+
+
def fix_python_path(cmd):
"""Returns the fixed command line to call the right python executable."""
out = cmd[:]
@@ -322,8 +461,8 @@ def run_test_cases(
if not test_cases:
return
- progress = worker_pool.Progress(len(test_cases))
- with worker_pool.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
+ progress = Progress(len(test_cases))
+ with ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
function = Runner(executable, os.getcwd(), timeout, progress).map
for test_case in test_cases:
pool.add_task(function, test_case)
diff --git a/tools/isolate/run_test_cases_test.py b/tools/isolate/run_test_cases_test.py
index 29313b1..8f06a6c 100755
--- a/tools/isolate/run_test_cases_test.py
+++ b/tools/isolate/run_test_cases_test.py
@@ -97,6 +97,31 @@ class RunTestCases(unittest.TestCase):
self.assertEquals(0, code)
+class WorkerPoolTest(unittest.TestCase):
+ def test_normal(self):
+ mapper = lambda value: -value
+ with run_test_cases.ThreadPool(8) as pool:
+ for i in range(32):
+ pool.add_task(mapper, i)
+ results = pool.join()
+ self.assertEquals(range(-31, 1), sorted(results))
+
+ def test_exception(self):
+ class FearsomeException(Exception):
+ pass
+ def mapper(value):
+ raise FearsomeException(value)
+ task_added = False
+ try:
+ with run_test_cases.ThreadPool(8) as pool:
+ pool.add_task(mapper, 0)
+ task_added = True
+ pool.join()
+ self.fail()
+ except FearsomeException:
+ self.assertEquals(True, task_added)
+
+
if __name__ == '__main__':
VERBOSE = '-v' in sys.argv
logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
diff --git a/tools/isolate/trace_inputs_smoke_test.py b/tools/isolate/trace_inputs_smoke_test.py
index 0dc3d72..9da8b78 100755
--- a/tools/isolate/trace_inputs_smoke_test.py
+++ b/tools/isolate/trace_inputs_smoke_test.py
@@ -12,7 +12,7 @@ import sys
import tempfile
import unittest
-import worker_pool
+import run_test_cases
FULLNAME = os.path.abspath(__file__)
ROOT_DIR = os.path.dirname(FULLNAME)
@@ -432,7 +432,7 @@ class TraceInputsImport(TraceInputsBase):
cmd, cwd, tracename, True)
return (tracename, resultcode, output)
- with worker_pool.ThreadPool(PARALLEL) as pool:
+ with run_test_cases.ThreadPool(PARALLEL) as pool:
api = self.trace_inputs.get_api()
with api.get_tracer(self.log) as tracer:
pool.add_task(
diff --git a/tools/isolate/trace_test_cases.py b/tools/isolate/trace_test_cases.py
index dee2b06..92e7097 100755
--- a/tools/isolate/trace_test_cases.py
+++ b/tools/isolate/trace_test_cases.py
@@ -20,7 +20,6 @@ import time
import isolate_common
import run_test_cases
import trace_inputs
-import worker_pool
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(os.path.dirname(BASE_DIR))
@@ -111,8 +110,8 @@ def trace_test_cases(
assert os.path.isdir(full_cwd_dir)
logname = output_file + '.logs'
- progress = worker_pool.Progress(len(test_cases))
- with worker_pool.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
+ progress = run_test_cases.Progress(len(test_cases))
+ with run_test_cases.ThreadPool(jobs or multiprocessing.cpu_count()) as pool:
api = trace_inputs.get_api()
api.clean_trace(logname)
with api.get_tracer(logname) as tracer:
diff --git a/tools/isolate/worker_pool.py b/tools/isolate/worker_pool.py
deleted file mode 100644
index af0d4c9..0000000
--- a/tools/isolate/worker_pool.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Implements a multithreaded worker pool oriented for mapping jobs with
-thread-local result storage.
-"""
-
-import logging
-import Queue
-import sys
-import time
-import threading
-
-
-class QueueWithTimeout(Queue.Queue):
- """Implements timeout support in join()."""
-
- # QueueWithTimeout.join: Arguments number differs from overridden method
- # pylint: disable=W0221
- def join(self, timeout=None):
- """Returns True if all tasks are finished."""
- if not timeout:
- return Queue.Queue.join(self)
- start = time.time()
- self.all_tasks_done.acquire()
- try:
- while self.unfinished_tasks:
- remaining = time.time() - start - timeout
- if remaining <= 0:
- break
- self.all_tasks_done.wait(remaining)
- return not self.unfinished_tasks
- finally:
- self.all_tasks_done.release()
-
-
-class WorkerThread(threading.Thread):
- """Keeps the results of each task in a thread-local outputs variable."""
- def __init__(self, tasks, *args, **kwargs):
- super(WorkerThread, self).__init__(*args, **kwargs)
- self._tasks = tasks
- self.outputs = []
- self.exceptions = []
-
- self.daemon = True
- self.start()
-
- def run(self):
- """Runs until a None task is queued."""
- while True:
- task = self._tasks.get()
- if task is None:
- # We're done.
- return
- try:
- func, args, kwargs = task
- self.outputs.append(func(*args, **kwargs))
- except Exception, e:
- logging.error('Caught exception! %s' % e)
- self.exceptions.append(sys.exc_info())
- finally:
- self._tasks.task_done()
-
-
-class ThreadPool(object):
- def __init__(self, num_threads):
- self._tasks = QueueWithTimeout()
- self._workers = [
- WorkerThread(self._tasks, name='worker-%d' % i)
- for i in range(num_threads)
- ]
-
- def add_task(self, func, *args, **kwargs):
- """Adds a task, a function to be executed by a worker.
-
- The function's return value will be stored in the the worker's thread local
- outputs list.
- """
- self._tasks.put((func, args, kwargs))
-
- def join(self, progress=None, timeout=None):
- """Extracts all the results from each threads unordered."""
- if progress and timeout:
- while not self._tasks.join(timeout):
- progress.print_update()
- else:
- self._tasks.join()
- out = []
- for w in self._workers:
- if w.exceptions:
- raise w.exceptions[0][0], w.exceptions[0][1], w.exceptions[0][2]
- out.extend(w.outputs)
- w.outputs = []
- # Look for exceptions.
- return out
-
- def close(self):
- """Closes all the threads."""
- for _ in range(len(self._workers)):
- # Enqueueing None causes the worker to stop.
- self._tasks.put(None)
- for t in self._workers:
- t.join()
-
- def __enter__(self):
- """Enables 'with' statement."""
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- """Enables 'with' statement."""
- self.close()
-
-
-class Progress(object):
- """Prints progress and accepts updates thread-safely."""
- def __init__(self, size):
- self.last_printed_line = ''
- self.next_line = ''
- self.index = -1
- self.size = size
- self.start = time.time()
- self.lock = threading.Lock()
- self.update_item('')
-
- def update_item(self, name):
- with self.lock:
- self.index += 1
- self.next_line = '%d of %d (%.1f%%), %.1fs: %s' % (
- self.index,
- self.size,
- self.index * 100. / self.size,
- time.time() - self.start,
- name)
-
- def print_update(self):
- """Prints the current status."""
- with self.lock:
- if self.next_line == self.last_printed_line:
- return
- line = '\r%s%s' % (
- self.next_line,
- ' ' * max(0, len(self.last_printed_line) - len(self.next_line)))
- self.last_printed_line = self.next_line
- sys.stderr.write(line)
-
- def increase_count(self):
- with self.lock:
- self.size += 1
diff --git a/tools/isolate/worker_pool_test.py b/tools/isolate/worker_pool_test.py
deleted file mode 100755
index 577a38d..0000000
--- a/tools/isolate/worker_pool_test.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import unittest
-
-import worker_pool
-
-
-class WorkerPoolTest(unittest.TestCase):
- def test_normal(self):
- mapper = lambda value: -value
- with worker_pool.ThreadPool(8) as pool:
- for i in range(32):
- pool.add_task(mapper, i)
- results = pool.join()
- self.assertEquals(range(-31, 1), sorted(results))
-
- def test_exception(self):
- class FearsomeException(Exception):
- pass
- def mapper(value):
- raise FearsomeException(value)
- task_added = False
- try:
- with worker_pool.ThreadPool(8) as pool:
- pool.add_task(mapper, 0)
- task_added = True
- pool.join()
- self.fail()
- except FearsomeException:
- self.assertEquals(True, task_added)
-
-
-if __name__ == '__main__':
- unittest.main()